# This service expects the following ArangoDB setup: # - Collection "talks": contains all speeches/talks (main search target) # - Collection "people": contains person/speaker info # - View "talks_search": ArangoSearch view for fulltext search on "talks" # If you only have these, you don't need to change any collection/view names. from __future__ import annotations import re from collections import Counter from dataclasses import dataclass, field from typing import Iterable, Sequence from arango_client import arango from info import debate_types, party_colors from backend.services.snippets import make_snippet SELECT_COLUMNS = [ "_id", "_key", "id", "anforandetext", "anforande_nummer", "kammaraktivitet", "talare", "datum", "year", "debateurl", "parti", "intressent_id", ] @dataclass class ParsedQuery: must_terms: list[str] = field(default_factory=list) should_groups: list[list[str]] = field(default_factory=list) exclude_terms: list[str] = field(default_factory=list) years: tuple[int, int] | None = None class SearchService: def __init__( self, db=None, collection_name: str = "talks", # Default: "talks" collection view_name: str = "talks_search", # Default: "talks_search" ArangoSearch view analyzer: str = "text_sv", prefix_analyzer_index: str = "edge_ngram", prefix_analyzer_match: str = "match_edge_ngram", ): """ Initialize the search service for ArangoDB. Parameters: ----------- db : Database connection (optional) If not provided, uses arango.db from arango_client collection_name : str Name of the main collection to search (default: "talks") view_name : str Name of the ArangoSearch view (default: "talks_search") analyzer : str Text analyzer for Swedish text tokenization (default: "text_sv") prefix_analyzer_index : str Analyzer for prefix matching at index time (default: "edge_ngram_sv") prefix_analyzer_match : str Analyzer for prefix matching at query time (default: "match_edge_ngram_sv") The service works with both arangosearch views and search-alias views. For arangosearch views, you need to specify the Analyzer context. For search-alias views, Analyzers are inferred from inverted index definitions. """ if db is not None and hasattr(db, "collection"): self.db = db else: self.db = arango.db self.collection = self.db.collection(collection_name) self.collection_name = self.collection.name self.view_name = view_name self.analyzer = analyzer self.prefix_analyzer_index = prefix_analyzer_index self.prefix_analyzer_match = prefix_analyzer_match # Try to get the view - will be None if it doesn't exist try: self.view = self.db.view(self.view_name) except Exception: self.view = None # Try to get people collection try: self.people_collection = self.db.collection("people") except Exception: self.people_collection = None def _normalize_term(self, term: str) -> str: term = term.strip() if not term: return term term = term.lower() term = term.replace("*", "%") if "%" not in term: term = f"%{term}%" if not term.startswith("%"): term = f"%{term}" if not term.endswith("%"): term = f"{term}%" return term def _prepare_search_term(self, term: str) -> tuple[str, bool]: """Clean a raw query term and flag whether it should be handled as a phrase.""" clean = term.strip() clean = clean.replace("*", "") is_phrase = " " in clean return clean, is_phrase def _arangosearch_condition(self, bind_key: str, term: str, is_phrase: bool) -> str: """Create a single ArangoSearch predicate using the Swedish analyzer.""" analyzer = "text_sv" if is_phrase: return f"ANALYZER(PHRASE(doc.anforandetext, @{bind_key}), '{analyzer}')" return f"ANALYZER(doc.anforandetext == @{bind_key}, '{analyzer}')" def _build_search_clause(self, parsed: ParsedQuery) -> tuple[str, dict[str, str], list[str]]: """Produce the SEARCH expression, bind parameters, and terms used for snippets.""" clauses: list[str] = [] bind_vars: dict[str, str] = {} snippet_terms: list[str] = [] seen_terms: set[str] = set() for idx, term in enumerate(parsed.must_terms): clean, is_phrase = self._prepare_search_term(term) if not clean: continue key = f"include_{idx}" bind_vars[key] = clean clauses.append(self._arangosearch_condition(key, clean, is_phrase)) if clean not in seen_terms: snippet_terms.append(clean) seen_terms.add(clean) for group_idx, group in enumerate(parsed.should_groups): sub_clauses: list[str] = [] for term_idx, term in enumerate(group): clean, is_phrase = self._prepare_search_term(term) if not clean: continue key = f"should_{group_idx}_{term_idx}" bind_vars[key] = clean sub_clauses.append(self._arangosearch_condition(key, clean, is_phrase)) if clean not in seen_terms: snippet_terms.append(clean) seen_terms.add(clean) if sub_clauses: clauses.append("(" + " OR ".join(sub_clauses) + ")") for idx, term in enumerate(parsed.exclude_terms): clean, is_phrase = self._prepare_search_term(term) if not clean: continue key = f"exclude_{idx}" bind_vars[key] = clean clauses.append("NOT " + self._arangosearch_condition(key, clean, is_phrase)) return " AND ".join(clauses), bind_vars, snippet_terms def parse_query(self, query: str) -> ParsedQuery: """Parse a raw query string into must/should/exclude buckets and optional year span.""" parsed = ParsedQuery() if not query: return parsed parts = re.findall(r'"[^"]+"|\S+', query.replace("'", '"')) tokens = [token.strip('"') for token in parts] idx = 0 while idx < len(tokens): token = tokens[idx] if not token: idx += 1 continue if token.lower().startswith("år:") and len(token) >= 8: try: start, end = token[3:].split("-", 1) parsed.years = (int(start), int(end)) except ValueError: pass idx += 1 continue is_negative = token.startswith("-") clean = token[1:] if is_negative else token group: list[str] = [clean] j = idx + 1 while j + 1 < len(tokens) and tokens[j].upper() == "OR": group.append(tokens[j + 1]) j += 2 if len(group) > 1: target = parsed.exclude_terms if is_negative else parsed.should_groups if is_negative: target.extend(group) else: target.append(group) idx = j continue if is_negative: parsed.exclude_terms.append(clean) else: parsed.must_terms.append(clean) idx += 1 return parsed def _build_text_predicate(self, parsed: ParsedQuery): """Legacy LIKE-based predicate builder, used only when the ArangoSearch view is unavailable.""" clauses = [] params: dict[str, str] = {} for i, term in enumerate(parsed.must_terms): key = f"include_{i}" clauses.append(f"LIKE(text_lower, @{key})") params[key] = self._normalize_term(term) for i, group in enumerate(parsed.should_groups): sub_clauses = [] for j, term in enumerate(group): key = f"should_{i}_{j}" sub_clauses.append(f"LIKE(text_lower, @{key})") params[key] = self._normalize_term(term) if sub_clauses: clauses.append("(" + " OR ".join(sub_clauses) + ")") for i, term in enumerate(parsed.exclude_terms): key = f"exclude_{i}" clauses.append(f"NOT LIKE(text_lower, @{key})") params[key] = self._normalize_term(term) return clauses, params def _build_arangosearch_predicate( self, parsed: ParsedQuery, ) -> tuple[str, dict[str, str], list[str]]: """ Convert parsed query terms into an ArangoSearch SEARCH clause. Returns: -------- tuple of (search_expression, bind_vars, snippet_terms) - search_expression: The AQL SEARCH clause (without the "SEARCH" keyword) - bind_vars: Dictionary of bind variables to pass to the query - snippet_terms: List of terms to use for snippet highlighting Note: This uses the Swedish text analyzer "text_sv" for tokenization. For phrase searches, PHRASE() is used with the analyzer as third parameter. For single terms, we check if the field contains the token using IN TOKENS(). For prefix searches (ending with * or %), we use the edge_ngram analyzers. Important: The pattern is "doc.field IN TOKENS(term, analyzer)" not the reverse! This checks if any of the tokens from the search term appear in the field. """ analyzer = self.analyzer # "text_sv" for Swedish text prefix_index = self.prefix_analyzer_index # "edge_ngram_sv" prefix_match = self.prefix_analyzer_match # "match_edge_ngram_sv" clauses: list[str] = [] bind_vars: dict[str, str] = {} snippet_terms: list[str] = [] seen_terms: set[str] = set() def _clean(term: str) -> tuple[str, bool, bool]: """ Clean a search term and determine its type. Returns: -------- tuple of (cleaned_term, is_phrase, is_prefix) - cleaned_term: The term with wildcards removed - is_phrase: True if term contains spaces (multi-word phrase) - is_prefix: True if term ends with * or % (prefix search) """ raw = term.strip() is_prefix = raw.endswith(("*", "%")) if is_prefix: raw = raw.rstrip("*%") raw = raw.strip() is_phrase = " " in raw return raw, is_phrase, is_prefix # Process MUST terms (all must match) for idx, term in enumerate(parsed.must_terms): cleaned, is_phrase, is_prefix = _clean(term) if not cleaned: continue key = f"must_{idx}" bind_vars[key] = cleaned if is_prefix: # Use STARTS_WITH instead of edge n-grams clauses.append( f"ANALYZER(STARTS_WITH(doc.anforandetext, @{key}), '{analyzer}')" ) elif is_phrase: # Phrase search: "klimat förändring" matches words in this exact order # PHRASE(field, search_phrase, analyzer) according to documentation clauses.append(f"PHRASE(doc.anforandetext, @{key}, '{analyzer}')") else: # Single word search: tokenize and check if token exists in the field # Pattern: doc.field IN TOKENS(term, analyzer) - field contains any of the tokens clauses.append( f"ANALYZER(doc.anforandetext IN TOKENS(@{key}, '{analyzer}'), '{analyzer}')" ) # Add to snippet terms if not already seen if cleaned not in seen_terms: snippet_terms.append(cleaned) seen_terms.add(cleaned) # Process SHOULD groups (at least one term in each group must match) # Example: "klimat OR miljö" means either "klimat" or "miljö" must appear for group_idx, group in enumerate(parsed.should_groups): or_parts: list[str] = [] for term_idx, term in enumerate(group): cleaned, is_phrase, is_prefix = _clean(term) if not cleaned: continue key = f"should_{group_idx}_{term_idx}" bind_vars[key] = cleaned if is_prefix: or_parts.append( f"ANALYZER(doc.anforandetext IN TOKENS(@{key}, '{prefix_match}'), '{prefix_index}')" ) elif is_phrase: or_parts.append(f"PHRASE(doc.anforandetext, @{key}, '{analyzer}')") else: or_parts.append( f"ANALYZER(doc.anforandetext IN TOKENS(@{key}, '{analyzer}'), '{analyzer}')" ) if cleaned not in seen_terms: snippet_terms.append(cleaned) seen_terms.add(cleaned) # Combine OR parts into a single clause if or_parts: clauses.append("(" + " OR ".join(or_parts) + ")") # Process EXCLUDE terms (must NOT match) # Example: "-riksdag" excludes documents containing "riksdag" for idx, term in enumerate(parsed.exclude_terms): cleaned, is_phrase, is_prefix = _clean(term) if not cleaned: continue key = f"exclude_{idx}" bind_vars[key] = cleaned if is_prefix: clauses.append( f"NOT ANALYZER(doc.anforandetext IN TOKENS(@{key}, '{prefix_match}'), '{prefix_index}')" ) elif is_phrase: clauses.append(f"NOT PHRASE(doc.anforandetext, @{key}, '{analyzer}')") else: clauses.append( f"NOT ANALYZER(doc.anforandetext IN TOKENS(@{key}, '{analyzer}'), '{analyzer}')" ) # Combine all clauses with AND (all conditions must be true) return " AND ".join(clauses), bind_vars, snippet_terms def search( self, payload, include_snippets: bool = True, return_snippets: bool = False, focus_ids: Sequence[str] | None = None, return_fields: Iterable[str] = SELECT_COLUMNS, ): """ Run the search using the ArangoSearch view when available. This method uses the "talks_search" ArangoSearch view if it exists, otherwise falls back to slower LIKE-based search on the "talks" collection. Parameters: ----------- payload : SearchPayload Search parameters including: - q: Query string (supports AND, OR, NOT, phrases with quotes, year ranges) - limit: Maximum number of results - parties: List of party codes to filter by - people: List of speaker names to filter by - debates: List of debate types to filter by - from_year, to_year: Year range filters - speaker: Specific speaker to filter by - speaker_ids: List of speaker ID:s to filter by include_snippets : bool Whether to generate text snippets with highlights (default: True) return_snippets : bool Wheter to only return snippets (default: False) focus_ids : Sequence[str] | None Optional list of document ids to constrain queries to; useful for follow-up searches in chat mode. Returns: -------- tuple of (results, stats, limit_reached) - results: List of matching documents with snippets and metadata - stats: Dictionary with per_party, per_year counts and total - limit_reached: Boolean indicating if more results exist if return_snippets is True, returns only the list of snippets. Query syntax examples: - "klimat" - single word - "klimat förändring" - phrase (exact word order) - klimat OR miljö - either word - klimat -politik - klimat but not politik - klima* - prefix search (klimat, klimatet, etc.) - år:2020-2023 - year range """ use_view = self.view is not None # True if "talks_search" view exists bind_vars: dict[str, object] = {} if payload.limit: limit_string = f"LIMIT 0, @limit" # +1 to check if limit reached bind_vars["limit"] = payload.limit + 1 else: limit_string = "" limit_reached = False filters: list[str] = [] # Parse the query string into structured components parsed = self.parse_query(payload.q) snippet_terms: list[str] = [] search_expression = "" # Focus ID filter: optional list of document IDs to restrict the search to focus_ids: list[str] | None = getattr(payload, "focus_ids", None) if use_view: # Use ArangoSearch view for fast full-text search bind_vars["@view"] = self.view_name # Should be "talks_search" search_expression, search_params, snippet_terms = self._build_arangosearch_predicate(parsed) bind_vars.update(search_params) else: # Fallback to legacy LIKE-based filtering when view is missing # This is much slower but works without a view configured text_clauses, text_params = self._build_text_predicate(parsed) filters.extend(text_clauses) bind_vars.update(text_params) # Add party filter if specified if payload.parties: bind_vars["parties"] = payload.parties filters.append("doc.parti IN @parties") # Add person/speaker filter if specified if payload.people: bind_vars["people"] = payload.people filters.append("doc.talare IN @people") # Add speaker_ids filter - this should take precedence over speaker name if payload.speaker_ids: if isinstance(payload.speaker_ids, str): payload.speaker_ids = [payload.speaker_ids] bind_vars["speaker_ids"] = payload.speaker_ids filters.append("doc.intressent_id in @speaker_ids") elif getattr(payload, "speaker", None): # Fallback to speaker name if no ID provided print(f'Adding speaker name filter: {payload.speaker}') bind_vars["speaker"] = payload.speaker filters.append("doc.talare == @speaker") # Add debate type filter if specified if payload.debates: bind_vars["debates"] = payload.debates filters.append("doc.kammaraktivitet IN @debates") # Handle year range (from query or from payload) year_start = parsed.years[0] if parsed.years else payload.from_year year_end = parsed.years[1] if parsed.years else payload.to_year if year_start is not None: bind_vars["year_start"] = year_start filters.append("doc.year >= @year_start") if year_end is not None: bind_vars["year_end"] = year_end filters.append("doc.year <= @year_end") # Add specific speaker filter if provided if getattr(payload, "speaker", None): bind_vars["speaker"] = payload.speaker filters.append("doc.talare == @speaker") # Add focus ID filter if provided if focus_ids: bind_vars["focus_ids"] = focus_ids filters.append("doc._id IN @focus_ids") # Build the FILTER clause block filters_block = "" if filters: filters_block = "\n " + "\n ".join(f"FILTER {clause}" for clause in filters) if return_fields: select_fields_dict = {field: f"doc.{field}" for field in return_fields} select_fields = str(select_fields_dict).replace("'", "") # Build the complete AQL query if use_view: if search_expression and include_snippets: # With snippets: use OFFSET_INFO() to get match positions for highlighting query = f""" FOR doc IN @@view SEARCH {search_expression} {filters_block} SORT BM25(doc) DESC, doc.dok_datum, doc.anforande_nummer {limit_string} RETURN MERGE({select_fields}, {{ bm25: BM25(doc), _highlight_matches: ( FOR offsetInfo IN OFFSET_INFO(doc, ["anforandetext"]) RETURN {{ name: offsetInfo.name, matches: offsetInfo.offsets[* RETURN {{ offset: CURRENT, match: SUBSTRING_BYTES(VALUE(doc, offsetInfo.name), CURRENT[0], CURRENT[1]) }}] }} ) }}) """.strip() elif search_expression: # Without snippets: simpler query with just BM25 score query = f""" FOR doc IN @@view SEARCH {search_expression}{filters_block} SORT BM25(doc) DESC, doc.dok_datum, doc.anforande_nummer {limit_string} RETURN MERGE(doc, {{ bm25: BM25(doc) }}) """.strip() else: # No search expression: just filter and sort by date query = f""" FOR doc IN @@view{filters_block} SORT doc.dok_datum, doc.anforande_nummer {limit_string} RETURN {select_fields} """.strip() else: # Fallback to collection scan with LIKE-based text search bind_vars["@collection"] = self.collection_name query = f""" FOR doc IN @@collection LET text_lower = LOWER(TO_STRING(doc.anforandetext)){filters_block} SORT doc.dok_datum, doc.anforande_nummer {limit_string} RETURN {select_fields} """.strip() # Execute the query cursor = self.db.aql.execute(query, bind_vars=bind_vars) rows = list(cursor) print(len(rows), 'rows returned from ArangoDB') # Check if we hit the limit if payload.limit: limit_reached = len(rows) > payload.limit if limit_reached: rows = rows[: payload.limit] # Determine which terms to use for snippet generation include_terms = ( snippet_terms or parsed.must_terms or [t for group in parsed.should_groups for t in group] ) # Process results and generate snippets results = [] for doc in rows: print('Document ID:', doc.get("_key")) # Extract highlight information if available highlights = doc.pop("_highlight_matches", None) if isinstance(doc, dict) else None text = doc.get("anforandetext") or "" # Get the _id (primary identifier) _id_value = doc.get("_key") or doc.get("anforande_id") or "" _id = str(_id_value) # Parse audio start position raw_start = doc.get("startpos") try: start_seconds = int(raw_start) if raw_start is not None else None except (TypeError, ValueError): start_seconds = None # Generate snippets with highlights snippet = None snippet_long = None if include_snippets: if highlights: # Use ArangoSearch's OFFSET_INFO for precise highlighting try: byte_text = text.encode("utf-8") matches = [ (info_match["offset"][0], info_match["offset"][1], info_match["match"]) for info in highlights for info_match in info.get("matches", []) ] matches.sort(key=lambda item: item[0]) if matches: # Short snippet: first match with context start, length, matched = matches[0] snippet_bytes = byte_text[ max(0, start - 60) : min(len(byte_text), start + length + 60) ] snippet = snippet_bytes.decode("utf-8", errors="replace").replace( matched, f"**{matched}**", 1 ) # Long snippet: up to 3 matches with context long_segments: list[str] = [] for seg_start, seg_length, seg_match in matches[:3]: seg_bytes = byte_text[ max(0, seg_start - 60) : min(len(byte_text), seg_start + seg_length + 60) ] long_segments.append( seg_bytes.decode("utf-8", errors="replace").replace( seg_match, f"**{seg_match}**", 1 ) ) snippet_long = " ... ".join(long_segments) if long_segments else snippet else: # Fallback to manual snippet generation snippet = make_snippet(text, include_terms, long=False) snippet_long = make_snippet(text, include_terms, long=True) except Exception: # If highlighting fails, fall back to manual snippet generation snippet = make_snippet(text, include_terms, long=False) snippet_long = make_snippet(text, include_terms, long=True) else: # No highlight info available, use manual snippet generation snippet = make_snippet(text, include_terms, long=False) snippet_long = make_snippet(text, include_terms, long=True) # Build result object results.append( { "_id": doc.get("_id"), # "id": doc.get("_id"), # Optional: add for debugging "text": text, "snippet": snippet, "snippet_long": snippet_long, "number": doc.get("anforande_nummer"), "debate_type": debate_types.get(doc.get("kammaraktivitet"), doc.get("kammaraktivitet")), "speaker": doc.get("talare"), "date": doc.get("datum") or doc.get("dok_datum"), "year": doc.get("year"), "url_session": doc.get("debateurl"), "party": doc.get("parti"), "url_audio": doc.get("audiofileurl"), "audio_start_seconds": start_seconds, "intressent_id": doc.get("intressent_id"), "bm25": doc.get("bm25") if isinstance(doc, dict) else None, } ) # Generate statistics per_party = Counter(hit["party"] for hit in results if hit["party"]) per_year = Counter(hit["year"] for hit in results if hit["year"]) stats = { "per_party": dict(per_party), "per_year": {int(k): v for k, v in per_year.items()}, "total": len(results), } if return_snippets: snippets_result = [] for res in results: snippets_result.append( { "_id": res["_id"], "snippet_long": res["snippet_long"], "speaker": res["speaker"], "date": res["date"], "party": res["party"], "debate_type": res["debate_type"], } ) return snippets_result, stats, limit_reached print(f'Search returning {len(results)} results, limit reached: {limit_reached}') return results, stats, limit_reached # --- TESTING CODE --- if __name__ == "__main__": service = SearchService() from dataclasses import dataclass @dataclass class Payload: q: str = 'bidrag' parties: list[str] | None = None people: list[str] | None = None debates: list[str] | None = None from_year: int | None = 1990 to_year: int | None = 2023 speaker: str | None = None limit: int = 10 speaker_ids: str | None = "0958072321310" payload = Payload() results, stats, limited = service.search(payload) print(results)