Files
blechreiz-website/eventplanner_gcal/models.py

128 lines
3.9 KiB
Python

import logging
import uuid
from django.contrib.auth.models import User
from django.db import models
from eventplanner.models import Event
logger = logging.getLogger(__name__)
class UserGCalCoupling(models.Model):
"""For every user in this table the gcal coupling is activated."""
user = models.OneToOneField(User, on_delete=models.CASCADE)
email = models.CharField(max_length=1024)
class Meta:
verbose_name = "User Google Calendar Coupling"
verbose_name_plural = "User Google Calendar Couplings"
def __str__(self):
return f"{self.user.username} <-> {self.email}"
class GCalMapping(models.Model):
"""Mapping between event id at google and local event id."""
gcal_id = models.CharField(max_length=64)
event = models.OneToOneField(Event, on_delete=models.CASCADE, primary_key=True)
class Meta:
verbose_name = "Google Calendar Mapping"
verbose_name_plural = "Google Calendar Mappings"
def __str__(self):
return f"GCal:{self.gcal_id} <-> Event:{self.event_id}"
class GCalPushChannel(models.Model):
"""This table has either zero or one entry. Required to store if a channel already exists,
when it expires and how to stop (renew) the channel.
"""
id = models.CharField(max_length=128, primary_key=True)
address = models.CharField(max_length=256)
token = models.CharField(max_length=128)
resource_id = models.CharField(max_length=128)
expiration = models.BigIntegerField()
class Meta:
verbose_name = "Google Calendar Push Channel"
verbose_name_plural = "Google Calendar Push Channels"
def __str__(self):
return f"Channel {self.id} expires at {self.expiration}"
def to_channel_dict(self):
"""Return a dictionary representation for the Google API."""
return {
"id": self.id,
"type": "web_hook",
"address": self.address,
"token": self.token,
"expiration": self.expiration,
"resourceId": self.resource_id,
}
@classmethod
def from_response(cls, response, address, token):
"""Create a GCalPushChannel from a Google API response."""
return cls(
id=response["id"],
address=address,
token=token,
expiration=int(response.get("expiration", 0)),
resource_id=response.get("resourceId", ""),
)
@classmethod
def create_new(cls, callback_url, service, ttl=None):
"""Create a new push channel subscription."""
channel_id = str(uuid.uuid4())
token = "blechreizGcal"
body = {
"id": channel_id,
"type": "web_hook",
"address": callback_url,
"token": token,
}
if ttl:
body["params"] = {"ttl": str(int(ttl))}
response = service.events().watch(calendarId="primary", body=body).execute()
db_channel = cls.from_response(response, callback_url, token)
db_channel.save()
return db_channel
def stop(self, service):
"""Stop this push channel subscription."""
body = {
"id": self.id,
"resourceId": self.resource_id,
}
try:
service.channels().stop(body=body).execute()
except Exception as e:
logger.warning(f"Failed to stop channel {self.id}: {e}")
self.delete()
@classmethod
def stop_channel(cls, service, channel_id, resource_id):
"""Stop a push channel by ID and resource ID."""
body = {
"id": channel_id,
"resourceId": resource_id,
}
try:
service.channels().stop(body=body).execute()
except Exception as e:
logger.warning(f"Failed to stop channel {channel_id}: {e}")
# Also delete from database if it exists
cls.objects.filter(id=channel_id).delete()