Files
eventlens/backend/app/source_scanner.py
T
2026-04-18 14:23:24 +02:00

447 lines
16 KiB
Python

import json
import re
from datetime import datetime
from html import unescape
from urllib.parse import urljoin
import requests
from bs4 import BeautifulSoup
from app.models import RegionScope, WatchItem, WatchSource, WatchType
from app.providers.utils import normalize_search_text
# Lookup table from German month names / abbreviations to the month number
# (1-12). Keys are lower-case ASCII; the umlaut in "März" appears transliterated
# as "ae" -- presumably the form produced by normalize_search_text (confirm
# against that helper). Each key is listed exactly once: the original literal
# repeated "maerz", which Python accepts silently but linters flag.
MONTH_ALIASES: dict[str, int] = {
    "jan": 1,
    "januar": 1,
    "feb": 2,
    "februar": 2,
    "maer": 3,
    "maerz": 3,
    "mar": 3,
    "apr": 4,
    "april": 4,
    "mai": 5,
    "jun": 6,
    "juni": 6,
    "jul": 7,
    "juli": 7,
    "aug": 8,
    "august": 8,
    "sep": 9,
    "sept": 9,
    "september": 9,
    "okt": 10,
    "oktober": 10,
    "nov": 11,
    "november": 11,
    "dez": 12,
    "dezember": 12,
}
class SourceScanner:
    """Download a watch source and extract upcoming events for a watch item.

    A source is fetched over HTTP. Structured data (JSON responses or JSON-LD
    ``<script>`` blocks embedded in HTML) is preferred; when it yields no
    result, the visible page text is searched heuristically for the watch
    item's name next to a German-style date.

    Every public/private extraction method returns plain ``dict`` records with
    the keys ``external_id``, ``title``, ``matched_term``, ``venue_name``,
    ``city``, ``country_code``, ``event_date``, ``ticket_url``, ``image_url``
    and ``raw_payload``.

    Text comparisons go through ``normalize_search_text`` (project helper; its
    exact folding rules are not visible here).
    """

    # HTTP headers sent with every source request.
    headers = {
        "User-Agent": "eventlens/0.1 (+https://local)",
        "Accept": "text/html,application/xhtml+xml,application/json",
        "Accept-Language": "de-DE,de;q=0.9,en;q=0.7",
    }

    # German month names and abbreviations accepted in free text.
    _MONTH_NAME_PATTERN = (
        r"jan(?:uar)?|feb(?:ruar)?|m(?:ae|ä)r(?:z)?|apr(?:il)?|mai|jun(?:i)?|"
        r"jul(?:i)?|aug(?:ust)?|sep(?:t|tember)?|okt(?:ober)?|nov(?:ember)?|dez(?:ember)?"
    )

    # Numeric dates: "12.05.2026", "12.05.26" and the year-less "12.05.".
    # The year-less form ends with a negative look-ahead instead of ``\b``:
    # a word boundary after "." only exists when a word character follows, so
    # ``\b`` rejected "12.05. Hamburg" yet accepted the "12.05." prefix of
    # "12.05.2028" (which was then re-dated to the current/next year).
    _NUMERIC_DATE_PATTERNS = (
        re.compile(r"\b(\d{1,2}\.\d{1,2}\.\d{4})\b"),
        re.compile(r"\b(\d{1,2}\.\d{1,2}\.\d{2})\b"),
        re.compile(r"\b(\d{1,2}\.\d{1,2}\.)(?!\d)"),
    )

    # "12. Januar 2026" / "12 Jan" (day first, year optional).
    _DAY_MONTH_PATTERN = re.compile(
        rf"\b(\d{{1,2}})\.?\s+({_MONTH_NAME_PATTERN})\.?\s*(\d{{4}})?\b",
        re.IGNORECASE,
    )

    # "Januar 12 2026" / "Jan 12" (month first, year optional).
    _MONTH_DAY_PATTERN = re.compile(
        rf"\b({_MONTH_NAME_PATTERN})\.?\s+(\d{{1,2}})\.?\s*(\d{{4}})?\b",
        re.IGNORECASE,
    )

    # CSS selectors tried, in this order, when looking for the DOM node that
    # describes a single event in the HTML text fallback.
    _CONTEXT_SELECTORS = (
        "li.card",
        ".tourplan .row",
        "[class*=event]",
        "[class*=termin]",
        "article",
        "tr",
        "li",
        ".row",
    )

    @staticmethod
    def _utc_today():
        """Return today's date in UTC as a ``datetime.date``.

        Replaces the deprecated ``datetime.utcnow().date()`` and gives the
        class one definition of "today" for every past/future comparison.
        """
        # Function-scope import: the module header only imports ``datetime``.
        from datetime import timezone

        return datetime.now(timezone.utc).date()

    def scan(self, watch_item: WatchItem, source: WatchSource) -> list[dict]:
        """Fetch ``source.url`` and return the matching event records.

        JSON responses (including ``application/ld+json``) are treated as
        JSON-LD; everything else is parsed as HTML.

        Raises:
            requests.RequestException: on network errors or a non-2xx status
                (via ``raise_for_status``).
            ValueError: if a response declared as JSON cannot be decoded.
        """
        response = requests.get(source.url, headers=self.headers, timeout=30)
        response.raise_for_status()

        content_type = (response.headers.get("content-type") or "").lower()
        if "application/json" in content_type or "application/ld+json" in content_type:
            return self._scan_json(watch_item, source, response.json())
        return self._scan_html(watch_item, source, response.text)

    def _scan_json(self, watch_item: WatchItem, source: WatchSource, payload) -> list[dict]:
        """Extract event records from an already decoded JSON(-LD) payload."""
        events = self._extract_jsonld_events(payload)
        return self._events_from_jsonld(watch_item, source, events)

    def _scan_html(self, watch_item: WatchItem, source: WatchSource, html: str) -> list[dict]:
        """Extract event records from an HTML document.

        Embedded JSON-LD blocks are tried first; the visible-text heuristic is
        only used when they produce no record.
        """
        soup = BeautifulSoup(html, "html.parser")

        jsonld_events: list[dict] = []
        for script in soup.find_all("script", type="application/ld+json"):
            raw_payload = script.string or script.get_text()
            if not raw_payload:
                continue
            payload = self._load_jsonld(raw_payload)
            if payload is None:
                continue
            jsonld_events.extend(self._extract_jsonld_events(payload))

        jsonld_results = self._events_from_jsonld(watch_item, source, jsonld_events)
        if jsonld_results:
            return jsonld_results
        return self._events_from_html_text(watch_item, source, soup)

    @staticmethod
    def _load_jsonld(raw_payload: str):
        """Decode one JSON-LD script body; return ``None`` if it is not JSON.

        The entity-unescaped text is tried first (the established behavior).
        If unescaping is what breaks the document -- e.g. ``&quot;`` inside a
        JSON string becomes a bare quote -- the raw text is tried as well
        instead of discarding the block.
        """
        for candidate in (unescape(raw_payload), raw_payload):
            try:
                return json.loads(candidate)
            except json.JSONDecodeError:
                continue
        return None

    @staticmethod
    def _is_event_type(item_type) -> bool:
        """Return True if a JSON-LD ``@type`` value denotes a schema.org event.

        Accepts a single type or a list of types, plain names as well as
        prefixed/IRI forms ("schema:Event", "https://schema.org/MusicEvent").
        Besides ``Event`` itself, the schema.org subtypes are recognized: they
        all end in "Event" (MusicEvent, TheaterEvent, ...) except ``Festival``.
        """
        declared = item_type if isinstance(item_type, list) else [item_type]
        for entry in declared:
            if not isinstance(entry, str):
                continue
            local_name = re.split(r"[/#:]", entry)[-1]
            if local_name.endswith("Event") or local_name == "Festival":
                return True
        return False

    def _extract_jsonld_events(self, payload) -> list[dict]:
        """Collect all event objects from a JSON-LD payload.

        Lists and ``@graph`` arrays are walked recursively. Anything that is
        neither a list nor a dict yields no events.
        """
        if isinstance(payload, list):
            collected: list[dict] = []
            for item in payload:
                collected.extend(self._extract_jsonld_events(item))
            return collected
        if not isinstance(payload, dict):
            return []

        events: list[dict] = []
        graph = payload.get("@graph")
        if isinstance(graph, list):
            for item in graph:
                events.extend(self._extract_jsonld_events(item))
        if self._is_event_type(payload.get("@type")):
            events.append(payload)
        return events

    @staticmethod
    def _extract_location(event: dict) -> tuple[str | None, str | None]:
        """Return ``(venue_name, city)`` for a JSON-LD event.

        ``location`` may be an object, a list of objects/strings or a plain
        string; ``address`` may be an object or a plain string. The city is
        the address locality when available, otherwise the venue name (same
        fallback the structured case always used).
        """
        location = event.get("location")
        if isinstance(location, list):
            # Use the first usable entry of a multi-location event.
            location = next(
                (item for item in location if item and isinstance(item, (dict, str))),
                None,
            )
        if isinstance(location, str):
            text = location.strip() or None
            return text, text
        if not isinstance(location, dict):
            return None, None

        venue = location.get("name")
        if not isinstance(venue, str) or not venue:
            venue = None

        address = location.get("address")
        locality = address.get("addressLocality") if isinstance(address, dict) else None
        if not isinstance(locality, str) or not locality:
            locality = None

        return venue, locality or venue

    def _events_from_jsonld(
        self,
        watch_item: WatchItem,
        source: WatchSource,
        events: list[dict],
    ) -> list[dict]:
        """Turn JSON-LD event objects into event records.

        An event is kept when the watch item's name occurs in its title or
        performer names, it is not in the past, and -- for the Hamburg region
        scope -- its city normalizes to exactly "hamburg".
        """
        normalized_term = normalize_search_text(watch_item.name)
        if not normalized_term:
            # An empty term is a substring of everything and would match
            # every event on the page.
            return []

        today = self._utc_today()
        results: list[dict] = []
        for event in events:
            name = event.get("name")
            title = name if isinstance(name, str) else ""
            performers = self._extract_performer_names(event)
            haystack = normalize_search_text(" ".join([title, *performers]))
            if normalized_term not in haystack:
                continue

            venue, city = self._extract_location(event)
            if (
                watch_item.region_scope == RegionScope.hamburg
                and normalize_search_text(city or "") != "hamburg"
            ):
                continue

            event_date = self._parse_datetime(event.get("startDate"))
            if event_date is not None and event_date.date() < today:
                continue

            event_url = event.get("url")
            if not isinstance(event_url, str) or not event_url:
                event_url = None
            # Resolve relative event links against the page they came from.
            ticket_url = urljoin(source.url, event_url) if event_url else source.url

            # Identity: the event's own @id or URL. Only when it has neither,
            # build a per-event key. (Falling back to the page URL gave every
            # such event on one page the same id.)
            external_id = event.get("@id") or event_url
            if not external_id:
                date_part = event_date.date().isoformat() if event_date else "undated"
                external_id = f"{source.id}:{normalize_search_text(title)}:{date_part}"

            results.append(
                {
                    "external_id": str(external_id),
                    "title": title or watch_item.name,
                    "matched_term": watch_item.name,
                    "venue_name": venue or source.label,
                    "city": city,
                    "country_code": "DE",
                    "event_date": event_date,
                    "ticket_url": ticket_url,
                    "image_url": self._extract_image(event),
                    "raw_payload": event,
                }
            )
        return results

    def _events_from_html_text(
        self,
        watch_item: WatchItem,
        source: WatchSource,
        soup: BeautifulSoup,
    ) -> list[dict]:
        """Heuristically extract event records from the visible page text.

        Each DOM context that mentions the watch item and contains a date
        becomes at most one record; records are de-duplicated on
        ``source.id`` + normalized title + date. Contexts without a date, with
        a past date, or (Hamburg scope) without "hamburg" in their text are
        skipped.
        """
        normalized_term = normalize_search_text(watch_item.name)
        if not normalized_term:
            # See _events_from_jsonld: an empty term would match everything.
            return []
        page_text = soup.get_text(" ", strip=True)
        if normalized_term not in normalize_search_text(page_text):
            return []

        today = self._utc_today()
        hamburg_only = watch_item.region_scope == RegionScope.hamburg
        results: list[dict] = []
        seen_keys: set[str] = set()

        for context in self._find_matching_contexts(soup, watch_item):
            context_text = context.get_text(" ", strip=True)

            event_date = self._find_nearest_date(context_text, watch_item.name)
            if event_date is None or event_date.date() < today:
                continue
            if hamburg_only and "hamburg" not in normalize_search_text(context_text):
                continue

            title = self._find_title(context, watch_item.name)
            link = self._find_nearest_link(context, watch_item.name, source.url) or source.url

            key = f"{source.id}:{normalize_search_text(title)}:{event_date.date().isoformat()}"
            if key in seen_keys:
                continue
            seen_keys.add(key)

            results.append(
                {
                    "external_id": key,
                    "title": title,
                    "matched_term": watch_item.name,
                    "venue_name": self._find_venue(context_text, source.label),
                    "city": "Hamburg" if hamburg_only else None,
                    "country_code": "DE",
                    "event_date": event_date,
                    "ticket_url": link,
                    "image_url": None,
                    "raw_payload": {
                        "source_url": source.url,
                        "parser": "html_text",
                        "context": context_text[:1000],
                    },
                }
            )
        return results

    def _extract_performer_names(self, event: dict) -> list[str]:
        """Return the non-empty performer names of a JSON-LD event.

        Reads ``performer`` (or ``performers``), given as a single entry or a
        list. Entries may be objects with a ``name`` or plain strings;
        anything else -- including a null ``name`` -- is ignored so the
        result can always be joined into a search string.
        """
        performer = event.get("performer") or event.get("performers")
        entries = performer if isinstance(performer, list) else [performer]

        names: list[str] = []
        for entry in entries:
            name = entry.get("name") if isinstance(entry, dict) else entry
            if isinstance(name, str) and name:
                names.append(name)
        return names

    def _extract_image(self, event: dict) -> str | None:
        """Return the first image URL of a JSON-LD event, or ``None``.

        ``image`` may be a URL string, an ImageObject-style dict (``url`` or
        ``contentUrl``), or a list mixing both.
        """
        image = event.get("image")
        candidates = image if isinstance(image, list) else [image]
        for candidate in candidates:
            if isinstance(candidate, str):
                return candidate
            if isinstance(candidate, dict):
                url = candidate.get("url") or candidate.get("contentUrl")
                if isinstance(url, str) and url:
                    return url
        return None

    def _parse_datetime(self, value: str | None) -> datetime | None:
        """Parse a JSON-LD date/time value into a naive ``datetime``.

        Accepts ISO 8601 (a trailing ``Z`` is read as ``+00:00``) and, as a
        fallback on the first ten characters, ``DD.MM.YYYY`` / ``YYYY-MM-DD``.
        A UTC offset is dropped, not converted, so the wall-clock time of the
        source is kept. Returns ``None`` for empty, non-string or unparseable
        input.
        """
        if not value or not isinstance(value, str):
            return None
        text = value.strip()

        try:
            return datetime.fromisoformat(text.replace("Z", "+00:00")).replace(tzinfo=None)
        except ValueError:
            pass

        for fmt in ("%d.%m.%Y", "%Y-%m-%d"):
            try:
                return datetime.strptime(text[:10], fmt)
            except ValueError:
                continue
        return None

    def _find_nearest_date(self, text: str, term: str) -> datetime | None:
        """Return the most plausible event date found around ``term``.

        Only a window of 300 characters before and 500 after the first
        occurrence of ``term`` is searched (the whole text if the term is not
        found). Among all recognized dates the earliest one that is today or
        later wins; if every date is in the past, the earliest of those is
        returned. ``None`` means no date was recognized.
        """
        term_index = normalize_search_text(text).find(normalize_search_text(term))
        search_area = text
        if term_index >= 0:
            # NOTE(review): the index is measured in the normalized text but
            # applied to the raw text; if normalization changes string length
            # the window is only approximately centered on the term.
            start = max(0, term_index - 300)
            end = min(len(text), term_index + 500)
            search_area = text[start:end]

        candidates: list[datetime] = []

        for pattern in self._NUMERIC_DATE_PATTERNS:
            for match in pattern.finditer(search_area):
                parsed = self._parse_german_date(match.group(1))
                if parsed is not None:
                    candidates.append(parsed)

        for match in self._DAY_MONTH_PATTERN.finditer(search_area):
            parsed = self._parse_named_month_date(match.group(1), match.group(2), match.group(3))
            if parsed is not None:
                candidates.append(parsed)

        for match in self._MONTH_DAY_PATTERN.finditer(search_area):
            parsed = self._parse_named_month_date(match.group(2), match.group(1), match.group(3))
            if parsed is not None:
                candidates.append(parsed)

        if not candidates:
            return None
        today = self._utc_today()
        upcoming = [candidate for candidate in candidates if candidate.date() >= today]
        return min(upcoming) if upcoming else min(candidates)

    def _parse_german_date(self, value: str) -> datetime | None:
        """Parse ``D.M.YYYY``, ``D.M.YY`` or the year-less ``D.M.``.

        Two-digit years are read as 20YY. A year-less date is placed in the
        current year, or the next one if that would be in the past. Dates
        whose year had to be inferred or expanded are only returned when they
        are not in the past; a literal four-digit date is returned as is.
        """
        cleaned = value.strip()
        today = self._utc_today()

        attempts = [cleaned]
        if re.fullmatch(r"\d{1,2}\.\d{1,2}\.", cleaned):
            attempts.append(f"{cleaned}{today.year}")
            attempts.append(f"{cleaned}{today.year + 1}")
        elif re.fullmatch(r"\d{1,2}\.\d{1,2}\.\d{2}", cleaned):
            day, month, year = cleaned.split(".")
            attempts.append(f"{day}.{month}.20{year}")

        for attempt in attempts:
            try:
                parsed = datetime.strptime(attempt, "%d.%m.%Y")
            except ValueError:
                continue
            # Only completed (inferred) dates are rejected for being past.
            if attempt != cleaned and parsed.date() < today:
                continue
            return parsed
        return None

    def _parse_named_month_date(
        self,
        day_value: str,
        month_value: str,
        year_value: str | None,
    ) -> datetime | None:
        """Build a date from a day, a German month name and an optional year.

        Without a year, the current year is used, or the next one if the date
        has already passed. Returns ``None`` for unknown month names and
        impossible dates (e.g. 31 February).
        """
        month = MONTH_ALIASES.get(normalize_search_text(month_value).rstrip("."))
        if month is None:
            # Fallback in case the normalizer leaves umlauts untouched:
            # transliterate "mär(z)" to the "maer(z)" spelling of the table.
            fallback_key = month_value.strip().rstrip(".").lower().replace("ä", "ae")
            month = MONTH_ALIASES.get(fallback_key)
        if month is None:
            return None

        day = int(day_value)
        today = self._utc_today()
        years = [int(year_value)] if year_value else [today.year, today.year + 1]
        for year in years:
            try:
                parsed = datetime(year, month, day)
            except ValueError:
                continue
            if year_value or parsed.date() >= today:
                return parsed
        return None

    def _find_matching_contexts(self, soup: BeautifulSoup, watch_item: WatchItem) -> list:
        """Return the DOM nodes that look like one event entry each.

        A node qualifies when it matches one of the known selectors, mentions
        the watch item, holds at most 3500 characters of text and contains a
        recognizable date. If no node qualifies, the single best ancestor of
        a text match is returned instead (or an empty list).
        """
        normalized_term = normalize_search_text(watch_item.name)
        matches = []
        visited: set[int] = set()

        for selector in self._CONTEXT_SELECTORS:
            for node in soup.select(selector):
                node_key = id(node)
                if node_key in visited:
                    continue
                visited.add(node_key)

                text = node.get_text(" ", strip=True)
                if normalized_term not in normalize_search_text(text):
                    continue
                # Very large nodes are page sections, not single events.
                if len(text) > 3500:
                    continue
                if self._find_nearest_date(text, watch_item.name):
                    matches.append(node)

        if matches:
            return matches
        fallback = self._find_best_context(soup, watch_item.name)
        return [fallback] if fallback is not None else []

    def _find_venue(self, text: str, default: str) -> str:
        """Return the first short text segment mentioning Hamburg, else ``default``.

        Segments are separated by line breaks or runs of two or more
        whitespace characters; only segments of at most 120 characters count.
        """
        for segment in re.split(r"\s{2,}|\n|\r", text):
            line = segment.strip()
            if not line:
                continue
            if "hamburg" in normalize_search_text(line) and len(line) <= 120:
                return line
        return default

    def _find_best_context(self, soup: BeautifulSoup, term: str):
        """Return the best ancestor node of any text node containing ``term``.

        Candidates that contain a date are preferred; among equals the one
        with the shortest text wins. Returns ``None`` if the term does not
        occur in any text node.
        """
        normalized_term = normalize_search_text(term)
        ranked = []
        for node in soup.find_all(string=True):
            if normalized_term not in normalize_search_text(str(node)):
                continue
            parent = node.parent
            if parent is None:
                continue
            context = self._climb_to_context_with_date(parent, term)
            text = context.get_text(" ", strip=True)
            has_no_date = 0 if self._find_nearest_date(text, term) else 1
            ranked.append((has_no_date, len(text), context))

        if not ranked:
            return None
        # min() returns the first minimal entry, like a stable sort would.
        return min(ranked, key=lambda item: (item[0], item[1]))[2]

    def _climb_to_context_with_date(self, node, term: str):
        """Walk up at most six levels until a node's text contains a date.

        Returns the first such ancestor (or ``node`` itself). If none is
        found, the highest node that was inspected is returned.
        """
        current = node
        best = node
        for _ in range(6):
            if current is None:
                break
            if self._find_nearest_date(current.get_text(" ", strip=True), term):
                return current
            best = current
            current = current.parent
        return best

    def _find_title(self, soup: BeautifulSoup, term: str) -> str:
        """Derive an event title from a context node.

        Preference order: a heading/emphasis/link element containing the
        term; a text excerpt that starts near a numeric date and includes the
        term; an excerpt of up to 80 characters either side of the term; the
        term itself.
        """
        if soup is None:
            return term

        normalized_term = normalize_search_text(term)
        for heading in soup.find_all(["h1", "h2", "h3", "h4", "strong", "b", "a"]):
            title = heading.get_text(" ", strip=True)
            if normalized_term in normalize_search_text(title):
                return title

        text = soup.get_text(" ", strip=True)
        escaped_term = re.escape(term)
        excerpt_patterns = (
            r"(.{0,40}\d{1,2}\.\d{1,2}\.(?:\d{2,4})?.{0,100}" + escaped_term + r".{0,100})",
            r"(.{0,80}" + escaped_term + r".{0,80})",
        )
        for pattern in excerpt_patterns:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                # Collapse internal whitespace runs to single spaces.
                return " ".join(match.group(1).split())
        return term

    def _find_nearest_link(self, soup: BeautifulSoup, term: str, base_url: str) -> str | None:
        """Return the absolute URL of the first link whose text contains ``term``.

        Relative ``href`` values are resolved against ``base_url``. Returns
        ``None`` when no link text mentions the term.
        """
        normalized_term = normalize_search_text(term)
        for link in soup.find_all("a", href=True):
            link_text = link.get_text(" ", strip=True)
            if normalized_term in normalize_search_text(link_text):
                return urljoin(base_url, link["href"])
        return None