# Source: blechreiz-website/eventplanner_gcal/google_sync.py
# Exported 2026-04-09; the full file is 683 lines / 23 KiB — this is an excerpt.
"""
Google Calendar synchronization module.
This module handles synchronization between the local event database
and Google Calendar, including push notifications for real-time updates.
"""
import datetime
import logging
import time
from django.conf import settings
from django.contrib.auth.models import User
from eventplanner.models import Event, EventParticipation
from eventplanner_gcal.models import GCalMapping, GCalPushChannel, UserGCalCoupling
# Module-level logger, named after this module per Django convention.
logger = logging.getLogger(__name__)

# Module-level cache for the Google Calendar API service object.
# Populated lazily by get_service_object(); cleared by reset_service_object()
# and _invalidate_service_on_error().
_service_object = None
def create_gcal_service_object():
    """
    Creates a Google API service object.
    This object is required whenever a Google API call is made.
    Uses the new google-auth library instead of oauth2client.

    Returns:
        A googleapiclient service object for the Calendar v3 API, or None
        when the client libraries are missing, the stored credentials are
        invalid/unrefreshable, or the service could not be built.
    """
    # Import lazily so this module can still be imported (e.g. for
    # migrations or tests) when the Google client libraries are absent.
    try:
        import os
        import pickle
        from google.auth.transport.requests import Request
        from google.oauth2.credentials import Credentials
        from google_auth_oauthlib.flow import InstalledAppFlow
        from googleapiclient.discovery import build
    except ImportError as e:
        logger.error(f"Required Google API libraries not installed: {e}")
        return None
    gcal_settings = settings.GCAL_COUPLING
    credentials_file = gcal_settings["credentials_file"]
    creds = None
    # Try to load existing credentials.
    # NOTE(review): credentials are stored via pickle — only ever load this
    # file from a trusted location; unpickling untrusted data is unsafe.
    if os.path.exists(credentials_file):
        try:
            with open(credentials_file, "rb") as token:
                creds = pickle.load(token)
        except Exception as e:
            logger.warning(f"Could not load credentials from {credentials_file}: {e}")
    # Refresh expired credentials when a refresh token exists, and persist
    # the refreshed credentials so the next run skips this step.
    if creds and creds.expired and creds.refresh_token:
        try:
            creds.refresh(Request())
            # Save refreshed credentials
            with open(credentials_file, "wb") as token:
                pickle.dump(creds, token)
        except Exception as e:
            logger.error(f"Failed to refresh credentials: {e}")
            creds = None
    if not creds or not creds.valid:
        logger.error(
            "Invalid or missing Google Calendar credentials. "
            "Please run the credential setup process."
        )
        return None
    try:
        service = build("calendar", "v3", credentials=creds)
        return service
    except Exception as e:
        logger.error(f"Failed to build Google Calendar service: {e}")
        return None
def _invalidate_service_on_error(exc):
    """Drop the cached service object after an auth-related failure.

    The next call to get_service_object() will then retry the whole
    credential-loading path.

    Args:
        exc: the exception that triggered the invalidation (logged only).
    """
    global _service_object
    _service_object = None
    logger.warning(f"Invalidating cached GCal service due to error: {exc}")
def get_service_object():
    """Return the cached Google Calendar service object, creating it lazily.

    Returns None (after logging an error) when creation fails.
    """
    global _service_object
    if _service_object is not None:
        return _service_object
    _service_object = create_gcal_service_object()
    if _service_object is None:
        logger.error("Failed to create Google Calendar service object")
    return _service_object
def reset_service_object():
    """Discard the cached service object so the next access rebuilds it.

    Useful in tests and after credentials have been replaced on disk.
    """
    global _service_object
    _service_object = None
def get_service_object_fresh():
    """Build and cache a brand-new service object, discarding any cached one."""
    reset_service_object()
    service = get_service_object()
    return service
# --------------------- Building GCal event representation ------------------------------------
def build_gcal_attendees_obj(event):
    """
    Builds an attendees object that is inserted into the GCal event.

    Attendees are all users that have a Google mail address (i.e. a
    UserGCalCoupling row). Each attendee carries the user's local
    participation status translated to Google's responseStatus vocabulary.

    Args:
        event: the local Event whose participation info is exported.

    Returns:
        list of attendee dicts in the format expected by the GCal API.
    """
    # Local participation status -> Google responseStatus.
    # Anything unknown/unset (including "-") maps to "needsAction".
    status_map = {
        "?": "tentative",
        "Yes": "accepted",
        "No": "declined",
    }
    # Fetch all participations for this event with one query instead of
    # one EventParticipation.objects.get() per coupled user (avoids N+1).
    participations = {
        p.user_id: p for p in EventParticipation.objects.filter(event=event)
    }
    result = []
    # select_related("user") joins the User rows into the same query,
    # avoiding one extra query per coupling for user_mapping.user.
    for user_mapping in UserGCalCoupling.objects.select_related("user"):
        u = user_mapping.user
        participation = participations.get(u.pk)
        if participation is not None:
            local_status = participation.status
            local_comment = participation.comment
        else:
            local_status = "-"
            local_comment = ""
        result.append(
            {
                "email": user_mapping.email,
                "displayName": u.username,
                "comment": local_comment,
                "responseStatus": status_map.get(local_status, "needsAction"),
            }
        )
    return result
def build_gcal_event(event, timezone="Europe/Berlin",
                     default_end_time=datetime.time(22, 30)):
    """Builds a GCal event body (dict) from a local event.

    Args:
        event: the local Event to export.
        timezone: IANA timezone name applied to start/end times.
        default_end_time: end time assumed for timed events, since the
            local model stores no explicit end time. Defaults to 22:30,
            which preserves the previous hard-coded behavior.

    Returns:
        dict in the GCal events API format, tagged with private extended
        properties (blechreizEvent/blechreizID) so the event can be found
        and mapped back later.
    """
    def create_datetime_obj(date, time_val):
        # All-day events use "date"; timed events use "dateTime".
        # NOTE(review): for all-day events Google treats end.date as
        # exclusive; using start == end for single-day events relies on
        # the API accepting that — confirm against real API behavior.
        if time_val is None:
            return {"date": str(date), "timeZone": timezone}
        return {
            "dateTime": f"{date}T{time_val}",
            "timeZone": timezone,
        }

    start_date = event.date
    # Single-day events carry no end_date; reuse the start date.
    end_date = event.end_date if event.end_date is not None else start_date

    # Prefer the meeting time; fall back to the event time. If neither is
    # set, export as an all-day event (end_time stays None as well).
    start_time = event.meeting_time if event.meeting_time is not None else event.time
    end_time = None if start_time is None else default_end_time

    # Prefer exact map coordinates over the textual location description.
    g_location = str(event.location)
    if event.map_location:
        # Map location has the following format: latitude,longitude,zoomlevel
        # only the first two components are needed for GCal.
        parts = event.map_location.split(",")
        if len(parts) >= 2:
            g_location = f"{parts[0]},{parts[1]}"

    gcal_settings = settings.GCAL_COUPLING
    return {
        "summary": gcal_settings["eventPrefix"] + event.title,
        "description": str(event.desc),
        "location": g_location,
        "start": create_datetime_obj(start_date, start_time),
        "end": create_datetime_obj(end_date, end_time),
        # These private properties let get_all_gcal_events() find every
        # event this script owns and map it back to the local Event id.
        "extendedProperties": {
            "private": {
                "blechreizEvent": "true",
                "blechreizID": str(event.id),
            }
        },
        "attendees": build_gcal_attendees_obj(event),
    }
# ------------------------------ Callback Functions ------------------------------------------------
def on_gcal_event_created(request_id, response, exception=None):
    """Batch callback for insert requests.

    Records the Google-assigned id of a freshly created GCal event in the
    local GCalMapping table so it can be patched/deleted later.
    """
    if exception is not None:
        logger.error(f"Error creating GCal event: {exception}")
        return  # Don't raise — let the batch continue processing other events
    google_id = response["id"]
    django_id = response["extendedProperties"]["private"]["blechreizID"]
    try:
        GCalMapping(gcal_id=google_id, event=Event.objects.get(pk=django_id)).save()
    except Event.DoesNotExist:
        logger.error(f"Event {django_id} not found when creating GCal mapping")
    else:
        logger.info(f"Created mapping: GCal {google_id} <-> Event {django_id}")
# ------------------------------ GCal Api Calls -------------------------------------------------
def get_all_gcal_events(service, from_now=False):
    """
    Retrieves all gcal events with custom property blechreizEvent=True.
    These are all events that have been created by this script.
    Handles pagination so all events are returned regardless of count.

    Args:
        service: GCal API service object.
        from_now: when True, only events starting at/after the current
            moment are returned.

    Returns:
        list of GCal event dicts; empty list if the API call fails.
    """
    if from_now:
        # Bug fix: the "-00:00" suffix declares the timestamp to be UTC,
        # so it must be generated from UTC, not naive local time.
        now = datetime.datetime.now(datetime.timezone.utc)
        min_time = now.strftime("%Y-%m-%dT%H:%M:%S-00:00")
    else:
        min_time = "2000-01-01T00:00:00-00:00"
    all_items = []
    page_token = None
    try:
        # Follow nextPageToken until every page has been collected.
        while True:
            kwargs = dict(
                calendarId="primary",
                singleEvents=True,
                maxResults=250,  # max allowed per page by the API
                orderBy="startTime",
                timeMin=min_time,
                timeMax="2100-01-01T00:00:00-00:00",
                privateExtendedProperty="blechreizEvent=true",
            )
            if page_token:
                kwargs["pageToken"] = page_token
            response = service.events().list(**kwargs).execute()
            all_items.extend(response.get("items", []))
            page_token = response.get("nextPageToken")
            if not page_token:
                break
        logger.info(f"Fetched {len(all_items)} GCal events (all pages)")
        return all_items
    except Exception as e:
        logger.error(f"Failed to retrieve GCal events: {e}")
        return []
def create_gcal_event_request(service, event, timezone="Europe/Berlin"):
    """Return an (unexecuted) insert request that creates a GCal event
    mirroring the given local event."""
    body = build_gcal_event(event, timezone)
    return service.events().insert(calendarId="primary", body=body)
def update_gcal_event_request(service, event, timezone="Europe/Berlin"):
    """Return an (unexecuted) patch request that updates the GCal event
    mapped to the given local event.

    Falls back to an insert request when no mapping exists yet.
    """
    body = build_gcal_event(event, timezone)
    try:
        gcal_id = GCalMapping.objects.get(event=event).gcal_id
    except GCalMapping.DoesNotExist:
        # Not known at Google yet — create instead of patch.
        return create_gcal_event_request(service, event, timezone)
    return service.events().patch(calendarId="primary", eventId=gcal_id, body=body)
def delete_gcal_event_request(service, event):
    """Return an (unexecuted) delete request for the GCal event mapped to
    the given local event, or None when no mapping exists.

    The local mapping row is removed immediately, before the returned
    request is executed; the repair step in sync_from_local_to_google()
    re-creates a mapping from Google's copy if the delete never runs.
    """
    try:
        mapping = GCalMapping.objects.get(event=event)
    except GCalMapping.DoesNotExist:
        logger.warning(f"No GCal mapping found for event {event.id}")
        return None
    gcal_id = mapping.gcal_id
    mapping.delete()
    return service.events().delete(calendarId="primary", eventId=gcal_id)
# ------------------------------------- Synchronization ----------------------------------------------------
def delete_all_gcal_events(service=None):
    """Deletes all gcal events that have been created by this script.

    Args:
        service: optional GCal service object; created on demand.

    Returns:
        number of Google events for which a delete was requested.
    """
    if service is None:
        service = get_service_object()
    if service is None:
        logger.error("No service object available")
        return 0
    gcal_events = get_all_gcal_events(service)
    gcal_ids = [ev["id"] for ev in gcal_events]
    count = len(gcal_ids)
    if count == 0:
        return 0
    # Delete in rate-limited chunks, consistent with the sync path, instead
    # of one unbounded batch request (which can exceed batch/rate limits and
    # duplicated the 401/403 invalidation logic here). _execute_in_chunks
    # performs the same service invalidation on auth errors.
    delete_pairs = [
        (service.events().delete(calendarId="primary", eventId=gcal_id), None)
        for gcal_id in gcal_ids
    ]
    _execute_in_chunks(service, delete_pairs)
    # Local mappings are cleared regardless of partial failures (same as
    # before); the next sync repairs any mapping that is still needed.
    GCalMapping.objects.all().delete()
    return count
def _execute_in_chunks(service, request_callback_pairs, chunk_size=30, delay=12):
    """
    Execute API requests in small batches, sleeping between batches.

    Google Calendar API allows 500 requests per 100 seconds per user
    (~5 req/s). The defaults — 30 requests per chunk with a 12 s pause —
    keep the average around 2.5 req/s.

    Args:
        service: GCal API service object.
        request_callback_pairs: list of (request, callback_or_None) tuples.
        chunk_size: number of requests per batch.
        delay: seconds to sleep between consecutive batches.
    """
    total = len(request_callback_pairs)
    for start in range(0, total, chunk_size):
        batch = service.new_batch_http_request()
        for request, callback in request_callback_pairs[start:start + chunk_size]:
            if callback is None:
                batch.add(request)
            else:
                batch.add(request, callback=callback)
        try:
            batch.execute()
        except Exception as e:
            logger.error(f"Error executing batch chunk {start // chunk_size + 1}: {e}")
            status = getattr(e, "status_code", None) or getattr(e, "resp", {}).get("status")
            if str(status) in ("401", "403"):
                _invalidate_service_on_error(e)
                return  # auth broken, no point continuing
        if start + chunk_size < total:
            logger.info(
                f"Chunk {start // chunk_size + 1} done "
                f"({min(start + chunk_size, total)}/{total}), "
                f"sleeping {delay}s to stay within rate limits..."
            )
            time.sleep(delay)
def sync_from_local_to_google(service=None):
    """
    Creates a google event for each local event (if it does not exist yet) and
    deletes all google events that are not found in local database.
    Updates participation info of gcal events using local data.
    Creates are processed in chunks (future events first) to avoid rate limits.

    Args:
        service: optional GCal service object; created on demand.

    Returns:
        (number of events to create, number of Google events to delete).
    """
    if service is None:
        service = get_service_object()
    if service is None:
        logger.error("No service object available for sync")
        return 0, 0
    all_events = get_all_gcal_events(service)
    # Map gcal_id -> django_id for every blechreiz-owned event at Google
    gcal_id_to_django_id = {}
    events_at_google_django_id = set()
    for gcal_ev in all_events:
        try:
            django_id = int(gcal_ev["extendedProperties"]["private"]["blechreizID"])
            events_at_google_django_id.add(django_id)
            gcal_id_to_django_id[gcal_ev["id"]] = django_id
        except (KeyError, ValueError) as e:
            # Event lacks our private properties or carries a non-integer
            # id — skip it rather than aborting the whole sync.
            logger.warning(f"Invalid GCal event structure: {e}")
    local_events_django_id = set(Event.objects.all().values_list("pk", flat=True))
    # Repair GCalMapping for events that exist at Google but have no local mapping
    # (can happen when a previous batch failed mid-way)
    for gcal_id, django_id in gcal_id_to_django_id.items():
        if django_id in local_events_django_id and not GCalMapping.objects.filter(gcal_id=gcal_id).exists():
            try:
                event = Event.objects.get(pk=django_id)
                # get_or_create guards against a mapping for this Event
                # already existing under a different gcal_id.
                GCalMapping.objects.get_or_create(event=event, defaults={"gcal_id": gcal_id})
                logger.info(f"Repaired missing mapping: GCal {gcal_id} <-> Event {django_id}")
            except Event.DoesNotExist:
                pass
    events_to_create_django_id = local_events_django_id - events_at_google_django_id
    # Only delete Google events whose local Event no longer exists
    # (never delete based on missing GCalMapping — that's just a local cache)
    events_to_delete_google_id = {
        gcal_id
        for gcal_id, django_id in gcal_id_to_django_id.items()
        if django_id not in local_events_django_id
    }
    # --- Deletes (usually few, single batch is fine) ---
    if events_to_delete_google_id:
        delete_pairs = [
            (service.events().delete(calendarId="primary", eventId=gcal_id), None)
            for gcal_id in events_to_delete_google_id
        ]
        _execute_in_chunks(service, delete_pairs)
    # --- Creates: future events first (soonest upcoming), then past events ---
    today = datetime.date.today()
    future_ids = list(
        Event.objects.filter(pk__in=events_to_create_django_id, date__gte=today)
        .order_by("date")
        .values_list("pk", flat=True)
    )
    past_ids = list(
        Event.objects.filter(pk__in=events_to_create_django_id, date__lt=today)
        .order_by("-date")
        .values_list("pk", flat=True)
    )
    ordered_create_ids = future_ids + past_ids
    create_pairs = []
    for event_django_id in ordered_create_ids:
        try:
            event = Event.objects.get(pk=event_django_id)
            # on_gcal_event_created records the new gcal_id in GCalMapping.
            create_pairs.append(
                (create_gcal_event_request(service, event), on_gcal_event_created)
            )
        except Event.DoesNotExist:
            pass
    if create_pairs:
        _execute_in_chunks(service, create_pairs)
    # --- Updates: attendee status changes ---
    # NOTE(review): this compares whole attendee dicts; if Google adds extra
    # fields (e.g. "organizer", "optional") to attendees it returns, the
    # comparison would always be unequal and every event would be re-patched
    # on each sync — verify against real API responses.
    update_pairs = []
    for gcal_ev in all_events:
        try:
            event_django_id = int(
                gcal_ev["extendedProperties"]["private"]["blechreizID"]
            )
            django_ev = Event.objects.get(pk=event_django_id)
            gcal_attendees = gcal_ev.get("attendees", [])
            local_attendees = build_gcal_attendees_obj(django_ev)
            if gcal_attendees != local_attendees:
                update_pairs.append((update_gcal_event_request(service, django_ev), None))
        except Event.DoesNotExist:
            pass
        except (KeyError, ValueError):
            pass
    if update_pairs:
        _execute_in_chunks(service, update_pairs)
    return len(events_to_create_django_id), len(events_to_delete_google_id)
def sync_from_google_to_local(service=None):
    """
    Retrieves only participation infos for all events and
    updates local database if anything has changed.

    Args:
        service: optional GCal service object; created on demand.

    Returns:
        True if at least one participation status/comment changed locally.
    """
    if service is None:
        service = get_service_object()
    if service is None:
        logger.error("No service object available for sync")
        return False
    new_status_received = False
    # Only events from now on are fetched — past participation is not
    # synced back from Google.
    all_events = get_all_gcal_events(service, from_now=True)
    for e in all_events:
        try:
            local_id = e["extendedProperties"]["private"]["blechreizID"]
            local_event = Event.objects.get(pk=local_id)
            for a in e.get("attendees", []):
                try:
                    user_coupling = UserGCalCoupling.objects.get(email=a["email"])
                    user = user_coupling.user
                    # NOTE(review): assumes EventParticipation.get_or_create
                    # is a custom classmethod returning a single instance
                    # (Django's manager version returns an (obj, created)
                    # tuple) — confirm on the model.
                    part = EventParticipation.get_or_create(user, local_event)
                    if "comment" in a:
                        part.comment = a["comment"]
                    # Map Google's responseStatus vocabulary back onto the
                    # local status values.
                    response_status = a.get("responseStatus", "needsAction")
                    if response_status == "needsAction":
                        part.status = "-"
                    elif response_status == "tentative":
                        part.status = "?"
                    elif response_status == "accepted":
                        part.status = "Yes"
                    elif response_status == "declined":
                        part.status = "No"
                    else:
                        logger.error(
                            f"Unknown response status when mapping gcal event: {response_status}"
                        )
                    # Re-read the stored row to compare against the
                    # (possibly modified) in-memory copy.
                    prev = EventParticipation.objects.get(
                        event=part.event, user=part.user
                    )
                    # Important: Save only if the participation info has changed
                    # otherwise everything is synced back to google via the post save signal
                    # and an endless loop is entered
                    if prev.status != part.status or prev.comment != part.comment:
                        part.save()
                        new_status_received = True
                except UserGCalCoupling.DoesNotExist:
                    # Attendee is not a coupled local user — ignore.
                    pass
        except Event.DoesNotExist:
            logger.warning(f"Event with id {local_id} not found in local database")
        except KeyError as e:
            logger.warning(f"Invalid event structure: {e}")
    return new_status_received
# ------------------------------------- Push Channel Management ----------------------------------------------------
def check_gcal_subscription(
    service=None, time_to_live=14 * 24 * 3600, renew_before_expiry=None
):
    """
    Ensure a valid Google push-notification channel exists.

    Google offers a push service that notifies us when event information
    changes; such a channel has a limited time to live. This method:
      - creates a new channel when none exists,
      - renews an existing channel that will expire soon,
      - triggers a sync and creates a new channel when the old one has
        already expired.

    Args:
        service: optional GCal service object; created on demand.
        time_to_live: requested channel lifetime in seconds.
        renew_before_expiry: renew when less than this many seconds remain
            until expiry (defaults to 80% of time_to_live).
    """
    if service is None:
        service = get_service_object()
    if service is None:
        logger.error("No service object available")
        return
    if renew_before_expiry is None:
        renew_before_expiry = 0.8 * time_to_live

    callback_url = settings.GCAL_COUPLING["push_url"]
    channels = GCalPushChannel.objects.filter(address=callback_url)
    if channels.count() > 1:
        # Inconsistent state: stop every channel and start from scratch.
        logger.warning(
            f"Multiple GCal channels found for {callback_url}. Stopping all and creating fresh one."
        )
        for ch in channels:
            ch.stop(service)
        channels = GCalPushChannel.objects.none()

    db_channel = channels.first()
    if db_channel is None:
        logger.info(f"No GCalCallback Channel exists yet for: {callback_url}")
        sync_from_local_to_google(service)
        GCalPushChannel.create_new(callback_url, service, time_to_live)
        return

    cur_time = int(time.time() * 1000)  # channel expiration is stored in ms
    if db_channel.expiration <= cur_time:
        # Already expired: changes may have gone unnoticed, so sync first.
        logger.info(
            "Google calendar subscription had expired - getting new subscription"
        )
        sync_from_local_to_google(service)
        GCalPushChannel.create_new(callback_url, service, time_to_live)
        return

    if cur_time + renew_before_expiry * 1000 > db_channel.expiration:
        # Still valid, but expires within "renew_before_expiry" seconds.
        logger.info(f"Renewing Google Calendar Subscription: {callback_url}")
        db_channel.stop(service)
        GCalPushChannel.create_new(callback_url, service, time_to_live)
    else:
        logger.info(f"Channel active until {db_channel.expiration}")
def stop_all_gcal_subscriptions(service=None):
    """Stop every registered push-channel subscription.

    Args:
        service: optional GCal service object; created on demand.
    """
    if service is None:
        service = get_service_object()
    if service is None:
        logger.error("No service object available")
        return
    for channel in GCalPushChannel.objects.all():
        logger.info(f"Stopping channel {channel.id} expiry at {channel.expiration}")
        channel.stop(service)
def check_if_google_callback_is_valid(token, channel_id, resource_id, service=None):
    """Validate an incoming Google Calendar push notification.

    Args:
        token: the token from the notification headers.
        channel_id: the channel id from the notification headers.
        resource_id: the resource id from the notification headers.
        service: optional GCal service object; created on demand.

    Returns:
        True when the notification matches the single known channel,
        False otherwise (unknown/stale channels are stopped at Google).
    """
    if service is None:
        service = get_service_object()
    all_channels = GCalPushChannel.objects.all()
    if len(all_channels) == 0:
        return False  # no known subscriptions -> callback has to be from an old channel
    if len(all_channels) > 1:
        logger.warning(
            "Multiple GCal subscriptions! This is strange and probably an error. "
            "All channels are closed and one new is created."
        )
        stop_all_gcal_subscriptions(service)
        # Fix: pass the already-created service object instead of letting
        # check_gcal_subscription() build another one (consistent with the
        # stop_all_gcal_subscriptions call above).
        check_gcal_subscription(service)
        all_channels = GCalPushChannel.objects.all()
        if len(all_channels) != 1:
            return False
    the_channel = all_channels[0]
    if (
        channel_id != the_channel.id
        or resource_id != the_channel.resource_id
        or token != the_channel.token
    ):
        logger.warning(
            f"Got GCal Response from an unexpected Channel. "
            f"Got ({channel_id}, {resource_id}, {token}) "
            f"expected ({the_channel.id}, {the_channel.resource_id}, {the_channel.token}). "
            f"Old Channel is stopped."
        )
        GCalPushChannel.stop_channel(service, channel_id, resource_id)
        return False
    return True
# Backwards compatibility aliases: the old camelCase names remain importable
# so existing callers keep working after the snake_case rename.
syncFromLocalToGoogle = sync_from_local_to_google
syncFromGoogleToLocal = sync_from_google_to_local
checkIfGoogleCallbackIsValid = check_if_google_callback_is_valid
checkGCalSubscription = check_gcal_subscription
stopAllGCalSubscriptions = stop_all_gcal_subscriptions
deleteAllGCalEvents = delete_all_gcal_events
getServiceObject = get_service_object