eventlens/backend/app/services.py

276 lines
9.4 KiB
Python

from datetime import datetime, timedelta
import re
from sqlalchemy import desc, select
from sqlalchemy.orm import Session
from app.models import NotificationStatus, NotificationType, TrackedEvent, WatchItem
from app.notifications import send_email_notification
from app.providers.registry import get_providers
from app.schemas import SyncResult
PROVIDER_PRIORITY = {
"ticketmaster": 3,
"eventim": 2,
"bandsintown": 1,
}
def list_watch_items(db: Session) -> list[WatchItem]:
return list(db.scalars(select(WatchItem).order_by(WatchItem.name)))
def list_events(db: Session) -> list[TrackedEvent]:
events = list(db.scalars(select(TrackedEvent).order_by(desc(TrackedEvent.event_date))))
deduped: list[TrackedEvent] = []
for event in events:
duplicate_index = next(
(
index
for index, existing in enumerate(deduped)
if events_are_equivalent(existing, event)
),
None,
)
if duplicate_index is None:
deduped.append(event)
continue
if is_preferred_event(event, deduped[duplicate_index]):
deduped[duplicate_index] = event
return deduped
def list_notifications(db: Session):
from app.models import NotificationLog
return list(db.scalars(select(NotificationLog).order_by(desc(NotificationLog.created_at))))
def normalize_event_text(value: str | None) -> str:
if not value:
return ""
cleaned = re.sub(r"[^a-z0-9]+", " ", value.casefold())
return " ".join(cleaned.split())
def titles_match(left: str | None, right: str | None) -> bool:
normalized_left = normalize_event_text(left)
normalized_right = normalize_event_text(right)
if not normalized_left or not normalized_right:
return False
return (
normalized_left == normalized_right
or normalized_left in normalized_right
or normalized_right in normalized_left
)
def get_event_date_key(value: datetime | None):
return value.date() if value else None
def events_are_equivalent(left: TrackedEvent, right: TrackedEvent) -> bool:
if left.watch_item_id != right.watch_item_id:
return False
if get_event_date_key(left.event_date) != get_event_date_key(right.event_date):
return False
left_city = normalize_event_text(left.city)
right_city = normalize_event_text(right.city)
if left_city and right_city and left_city != right_city:
return False
title_matches = titles_match(left.title, right.title)
venue_matches = titles_match(left.venue_name, right.venue_name)
return title_matches or venue_matches
def is_preferred_event(candidate: TrackedEvent, current: TrackedEvent) -> bool:
candidate_score = (
1 if candidate.is_ticket_purchased else 0,
PROVIDER_PRIORITY.get(candidate.source, 0),
1 if candidate.ticket_url else 0,
candidate.last_seen_at or datetime.min,
)
current_score = (
1 if current.is_ticket_purchased else 0,
PROVIDER_PRIORITY.get(current.source, 0),
1 if current.ticket_url else 0,
current.last_seen_at or datetime.min,
)
return candidate_score > current_score
def has_equivalent_existing_event(db: Session, tracked_event: TrackedEvent) -> bool:
stmt = select(TrackedEvent).where(
TrackedEvent.watch_item_id == tracked_event.watch_item_id,
TrackedEvent.id != tracked_event.id,
)
existing_events = list(db.scalars(stmt))
return any(events_are_equivalent(tracked_event, existing) for existing in existing_events)
def upsert_event(
db: Session,
watch_item: WatchItem,
provider_name: str,
event_data: dict,
) -> tuple[TrackedEvent, bool]:
stmt = select(TrackedEvent).where(
TrackedEvent.watch_item_id == watch_item.id,
TrackedEvent.source == provider_name,
TrackedEvent.external_id == event_data["external_id"],
)
tracked_event = db.scalar(stmt)
is_new = tracked_event is None
if tracked_event is None:
tracked_event = TrackedEvent(
watch_item=watch_item,
source=provider_name,
external_id=event_data["external_id"],
title=event_data["title"],
matched_term=event_data["matched_term"],
venue_name=event_data.get("venue_name"),
city=event_data.get("city"),
country_code=event_data.get("country_code"),
event_date=event_data.get("event_date"),
ticket_url=event_data.get("ticket_url"),
image_url=event_data.get("image_url"),
raw_payload=event_data.get("raw_payload"),
)
db.add(tracked_event)
db.flush()
else:
tracked_event.title = event_data["title"]
tracked_event.matched_term = event_data["matched_term"]
tracked_event.venue_name = event_data.get("venue_name")
tracked_event.city = event_data.get("city")
tracked_event.country_code = event_data.get("country_code")
tracked_event.event_date = event_data.get("event_date")
tracked_event.ticket_url = event_data.get("ticket_url")
tracked_event.image_url = event_data.get("image_url")
tracked_event.raw_payload = event_data.get("raw_payload")
tracked_event.last_seen_at = datetime.utcnow()
return tracked_event, is_new
def run_sync(db: Session) -> SyncResult:
providers = get_providers()
active_items = list(
db.scalars(select(WatchItem).where(WatchItem.is_active.is_(True)).order_by(WatchItem.name))
)
new_events = 0
updated_events = 0
notifications_sent = 0
notifications_skipped = 0
for watch_item in active_items:
for provider in providers:
try:
events = provider.search_events(
term=watch_item.name,
watch_type=watch_item.watch_type,
region_scope=watch_item.region_scope,
)
except Exception:
db.rollback()
continue
for event_data in events:
tracked_event, is_new = upsert_event(
db=db,
watch_item=watch_item,
provider_name=provider.source_name,
event_data=event_data,
)
if is_new:
new_events += 1
else:
updated_events += 1
should_notify = (
is_new
and tracked_event.discovery_notified_at is None
and not has_equivalent_existing_event(db, tracked_event)
)
if should_notify:
status = send_email_notification(
db=db,
tracked_event=tracked_event,
notification_type=NotificationType.discovery,
subject=f"Neuer Termin fuer {watch_item.name}",
body=(
f"Es wurde ein neuer Termin fuer '{watch_item.name}' gefunden.\n\n"
f"Quelle: {tracked_event.source}\n"
f"Titel: {tracked_event.title}\n"
f"Ort: {tracked_event.venue_name or 'unbekannt'}\n"
f"Stadt: {tracked_event.city or 'unbekannt'}\n"
f"Datum: {tracked_event.event_date or 'unbekannt'}\n"
f"Tickets: {tracked_event.ticket_url or 'keine URL'}\n"
),
)
if status == NotificationStatus.sent:
tracked_event.discovery_notified_at = datetime.utcnow()
notifications_sent += 1
else:
notifications_skipped += 1
db.commit()
return SyncResult(
scanned_watch_items=len(active_items),
new_events=new_events,
updated_events=updated_events,
notifications_sent=notifications_sent,
notifications_skipped=notifications_skipped,
)
def send_reminders(db: Session) -> int:
now = datetime.utcnow()
start = now + timedelta(days=6)
end = now + timedelta(days=7, hours=12)
stmt = select(TrackedEvent).where(
TrackedEvent.is_ticket_purchased.is_(True),
TrackedEvent.event_date.is_not(None),
TrackedEvent.reminder_notified_at.is_(None),
)
candidates = list(db.scalars(stmt))
sent_count = 0
for tracked_event in candidates:
if tracked_event.event_date is None:
continue
if not (start <= tracked_event.event_date <= end):
continue
status = send_email_notification(
db=db,
tracked_event=tracked_event,
notification_type=NotificationType.reminder,
subject=f"Erinnerung: {tracked_event.title} in einer Woche",
body=(
f"Du hast Karten fuer '{tracked_event.title}' markiert.\n\n"
f"Veranstaltung: {tracked_event.title}\n"
f"Datum: {tracked_event.event_date}\n"
f"Venue: {tracked_event.venue_name or 'unbekannt'}\n"
f"Stadt: {tracked_event.city or 'unbekannt'}\n"
f"Tickets: {tracked_event.ticket_url or 'keine URL'}\n"
),
)
if status == NotificationStatus.sent:
tracked_event.reminder_notified_at = datetime.utcnow()
sent_count += 1
db.commit()
return sent_count