# This service expects the following ArangoDB setup:
# - Collection "talks": contains all speeches/talks (main search target)
# - Collection "people": contains person/speaker info
# - View "talks_search": ArangoSearch view for fulltext search on "talks"
# If you only have these, you don't need to change any collection/view names.
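#
# A minimal sketch (an assumption, not part of this module) of how the
# "talks_search" view could be created with python-arango, given a "text_sv"
# analyzer that already exists on the server:
#
#   arango.db.create_arangosearch_view(
#       name="talks_search",
#       properties={
#           "links": {
#               "talks": {
#                   "fields": {"anforandetext": {"analyzers": ["text_sv"]}},
#               }
#           }
#       },
#   )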
from __future__ import annotations

import re
from collections import Counter
from dataclasses import dataclass, field
from typing import Iterable, Sequence

from arango_client import arango
from info import debate_types, party_colors

from backend.services.snippets import make_snippet

SELECT_COLUMNS = [
    "_id",
    "_key",
    "id",
    "anforandetext",
    "anforande_nummer",
    "kammaraktivitet",
    "talare",
    "datum",
    "year",
    "debateurl",
    "parti",
    "intressent_id",
]
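# Note: fields referenced later when building results but not listed here (for
# example "startpos" and "audiofileurl") come back as None whenever this
# projection is applied, since the AQL RETURN only includes the columns above.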

@dataclass
class ParsedQuery:
    must_terms: list[str] = field(default_factory=list)
    should_groups: list[list[str]] = field(default_factory=list)
    exclude_terms: list[str] = field(default_factory=list)
    years: tuple[int, int] | None = None


class SearchService:
    def __init__(
        self,
        db=None,
        collection_name: str = "talks",  # Default: "talks" collection
        view_name: str = "talks_search",  # Default: "talks_search" ArangoSearch view
        analyzer: str = "text_sv",
        prefix_analyzer_index: str = "edge_ngram",
        prefix_analyzer_match: str = "match_edge_ngram",
    ):
        """
        Initialize the search service for ArangoDB.

        Parameters:
        -----------
        db : Database connection (optional)
            If not provided, uses arango.db from arango_client
        collection_name : str
            Name of the main collection to search (default: "talks")
        view_name : str
            Name of the ArangoSearch view (default: "talks_search")
        analyzer : str
            Text analyzer for Swedish text tokenization (default: "text_sv")
        prefix_analyzer_index : str
            Analyzer for prefix matching at index time (default: "edge_ngram")
        prefix_analyzer_match : str
            Analyzer for prefix matching at query time (default: "match_edge_ngram")

        The service works with both arangosearch views and search-alias views.
        For arangosearch views, you need to specify the Analyzer context.
        For search-alias views, Analyzers are inferred from inverted index definitions.
        """
        if db is not None and hasattr(db, "collection"):
            self.db = db
        else:
            self.db = arango.db
        self.collection = self.db.collection(collection_name)
        self.collection_name = self.collection.name
        self.view_name = view_name
        self.analyzer = analyzer
        self.prefix_analyzer_index = prefix_analyzer_index
        self.prefix_analyzer_match = prefix_analyzer_match
        # Try to get the view - will be None if it doesn't exist
        try:
            self.view = self.db.view(self.view_name)
        except Exception:
            self.view = None
        # Try to get the people collection
        try:
            self.people_collection = self.db.collection("people")
        except Exception:
            self.people_collection = None

    def _normalize_term(self, term: str) -> str:
        term = term.strip()
        if not term:
            return term
        term = term.lower()
        term = term.replace("*", "%")
        if "%" not in term:
            term = f"%{term}%"
        if not term.startswith("%"):
            term = f"%{term}"
        if not term.endswith("%"):
            term = f"{term}%"
        return term
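
    # Illustrative behaviour of _normalize_term (derived from the rules above):
    #   "Miljö"   -> "%miljö%"   (lowercased and wrapped for LIKE matching)
    #   "skatte*" -> "%skatte%"  ("*" becomes "%" and the term is padded on both sides)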

    def _prepare_search_term(self, term: str) -> tuple[str, bool]:
        """Clean a raw query term and flag whether it should be handled as a phrase."""
        clean = term.strip()
        clean = clean.replace("*", "")
        is_phrase = " " in clean
        return clean, is_phrase

    def _arangosearch_condition(self, bind_key: str, term: str, is_phrase: bool) -> str:
        """Create a single ArangoSearch predicate using the configured analyzer."""
        analyzer = self.analyzer  # use the configured analyzer rather than a hardcoded name
        if is_phrase:
            return f"ANALYZER(PHRASE(doc.anforandetext, @{bind_key}), '{analyzer}')"
        return f"ANALYZER(doc.anforandetext == @{bind_key}, '{analyzer}')"

    def _build_search_clause(self, parsed: ParsedQuery) -> tuple[str, dict[str, str], list[str]]:
        """Produce the SEARCH expression, bind parameters, and terms used for snippets.

        Note: not called by search() itself; kept as a simpler alternative to
        _build_arangosearch_predicate.
        """
        clauses: list[str] = []
        bind_vars: dict[str, str] = {}
        snippet_terms: list[str] = []
        seen_terms: set[str] = set()
        for idx, term in enumerate(parsed.must_terms):
            clean, is_phrase = self._prepare_search_term(term)
            if not clean:
                continue
            key = f"include_{idx}"
            bind_vars[key] = clean
            clauses.append(self._arangosearch_condition(key, clean, is_phrase))
            if clean not in seen_terms:
                snippet_terms.append(clean)
                seen_terms.add(clean)
        for group_idx, group in enumerate(parsed.should_groups):
            sub_clauses: list[str] = []
            for term_idx, term in enumerate(group):
                clean, is_phrase = self._prepare_search_term(term)
                if not clean:
                    continue
                key = f"should_{group_idx}_{term_idx}"
                bind_vars[key] = clean
                sub_clauses.append(self._arangosearch_condition(key, clean, is_phrase))
                if clean not in seen_terms:
                    snippet_terms.append(clean)
                    seen_terms.add(clean)
            if sub_clauses:
                clauses.append("(" + " OR ".join(sub_clauses) + ")")
        for idx, term in enumerate(parsed.exclude_terms):
            clean, is_phrase = self._prepare_search_term(term)
            if not clean:
                continue
            key = f"exclude_{idx}"
            bind_vars[key] = clean
            clauses.append("NOT " + self._arangosearch_condition(key, clean, is_phrase))
        return " AND ".join(clauses), bind_vars, snippet_terms

    def parse_query(self, query: str) -> ParsedQuery:
        """Parse a raw query string into must/should/exclude buckets and optional year span."""
        parsed = ParsedQuery()
        if not query:
            return parsed
        parts = re.findall(r'"[^"]+"|\S+', query.replace("'", '"'))
        tokens = [token.strip('"') for token in parts]
        idx = 0
        while idx < len(tokens):
            token = tokens[idx]
            if not token:
                idx += 1
                continue
            if token.lower().startswith("år:") and len(token) >= 8:
                try:
                    start, end = token[3:].split("-", 1)
                    parsed.years = (int(start), int(end))
                except ValueError:
                    pass
                idx += 1
                continue
            is_negative = token.startswith("-")
            clean = token[1:] if is_negative else token
            group: list[str] = [clean]
            j = idx + 1
            while j + 1 < len(tokens) and tokens[j].upper() == "OR":
                group.append(tokens[j + 1])
                j += 2
            if len(group) > 1:
                if is_negative:
                    parsed.exclude_terms.extend(group)
                else:
                    parsed.should_groups.append(group)
                idx = j
                continue
            if is_negative:
                parsed.exclude_terms.append(clean)
            else:
                parsed.must_terms.append(clean)
            idx += 1
        return parsed
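
    # Illustrative parses (derived from the rules above):
    #   parse_query('klimat OR miljö -kärnkraft år:2010-2015')
    #     -> must_terms=[], should_groups=[["klimat", "miljö"]],
    #        exclude_terms=["kärnkraft"], years=(2010, 2015)
    #   parse_query('"högre utbildning" budget')
    #     -> must_terms=["högre utbildning", "budget"], no OR groups, no exclusions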

    def _build_text_predicate(self, parsed: ParsedQuery):
        """Legacy LIKE-based predicate builder, used only when the ArangoSearch view is unavailable."""
        clauses = []
        params: dict[str, str] = {}
        for i, term in enumerate(parsed.must_terms):
            key = f"include_{i}"
            clauses.append(f"LIKE(text_lower, @{key})")
            params[key] = self._normalize_term(term)
        for i, group in enumerate(parsed.should_groups):
            sub_clauses = []
            for j, term in enumerate(group):
                key = f"should_{i}_{j}"
                sub_clauses.append(f"LIKE(text_lower, @{key})")
                params[key] = self._normalize_term(term)
            if sub_clauses:
                clauses.append("(" + " OR ".join(sub_clauses) + ")")
        for i, term in enumerate(parsed.exclude_terms):
            key = f"exclude_{i}"
            clauses.append(f"NOT LIKE(text_lower, @{key})")
            params[key] = self._normalize_term(term)
        return clauses, params
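
    # For the first parse example above, _build_text_predicate would produce (illustrative):
    #   clauses = ["(LIKE(text_lower, @should_0_0) OR LIKE(text_lower, @should_0_1))",
    #              "NOT LIKE(text_lower, @exclude_0)"]
    #   params  = {"should_0_0": "%klimat%", "should_0_1": "%miljö%", "exclude_0": "%kärnkraft%"}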

    def _build_arangosearch_predicate(
        self,
        parsed: ParsedQuery,
    ) -> tuple[str, dict[str, str], list[str]]:
        """
        Convert parsed query terms into an ArangoSearch SEARCH clause.

        Returns:
        --------
        tuple of (search_expression, bind_vars, snippet_terms)
        - search_expression: The AQL SEARCH clause (without the "SEARCH" keyword)
        - bind_vars: Dictionary of bind variables to pass to the query
        - snippet_terms: List of terms to use for snippet highlighting

        Note: This uses the Swedish text analyzer "text_sv" for tokenization.
        For phrase searches, PHRASE() is used with the analyzer as third parameter.
        For single terms, we check if the field contains the token using IN TOKENS().
        For prefix searches (ending with * or %), STARTS_WITH is used for must terms
        and the edge n-gram analyzers for should/exclude terms.
        Important: The pattern is "doc.field IN TOKENS(term, analyzer)", not the reverse!
        This checks whether any of the tokens from the search term appear in the field.
        """
        analyzer = self.analyzer  # "text_sv" by default
        prefix_index = self.prefix_analyzer_index  # "edge_ngram" by default
        prefix_match = self.prefix_analyzer_match  # "match_edge_ngram" by default
        clauses: list[str] = []
        bind_vars: dict[str, str] = {}
        snippet_terms: list[str] = []
        seen_terms: set[str] = set()

        def _clean(term: str) -> tuple[str, bool, bool]:
            """
            Clean a search term and determine its type.

            Returns:
            --------
            tuple of (cleaned_term, is_phrase, is_prefix)
            - cleaned_term: The term with wildcards removed
            - is_phrase: True if term contains spaces (multi-word phrase)
            - is_prefix: True if term ends with * or % (prefix search)
            """
            raw = term.strip()
            is_prefix = raw.endswith(("*", "%"))
            if is_prefix:
                raw = raw.rstrip("*%")
                raw = raw.strip()
            is_phrase = " " in raw
            return raw, is_phrase, is_prefix

        # Process MUST terms (all must match)
        for idx, term in enumerate(parsed.must_terms):
            cleaned, is_phrase, is_prefix = _clean(term)
            if not cleaned:
                continue
            key = f"must_{idx}"
            bind_vars[key] = cleaned
            if is_prefix:
                # Use STARTS_WITH instead of edge n-grams
                clauses.append(
                    f"ANALYZER(STARTS_WITH(doc.anforandetext, @{key}), '{analyzer}')"
                )
            elif is_phrase:
                # Phrase search: "klimat förändring" matches words in this exact order
                # PHRASE(field, search_phrase, analyzer) according to documentation
                clauses.append(f"PHRASE(doc.anforandetext, @{key}, '{analyzer}')")
            else:
                # Single word search: tokenize and check if a token exists in the field
                # Pattern: doc.field IN TOKENS(term, analyzer) - field contains any of the tokens
                clauses.append(
                    f"ANALYZER(doc.anforandetext IN TOKENS(@{key}, '{analyzer}'), '{analyzer}')"
                )
            # Add to snippet terms if not already seen
            if cleaned not in seen_terms:
                snippet_terms.append(cleaned)
                seen_terms.add(cleaned)

        # Process SHOULD groups (at least one term in each group must match)
        # Example: "klimat OR miljö" means either "klimat" or "miljö" must appear
        for group_idx, group in enumerate(parsed.should_groups):
            or_parts: list[str] = []
            for term_idx, term in enumerate(group):
                cleaned, is_phrase, is_prefix = _clean(term)
                if not cleaned:
                    continue
                key = f"should_{group_idx}_{term_idx}"
                bind_vars[key] = cleaned
                if is_prefix:
                    or_parts.append(
                        f"ANALYZER(doc.anforandetext IN TOKENS(@{key}, '{prefix_match}'), '{prefix_index}')"
                    )
                elif is_phrase:
                    or_parts.append(f"PHRASE(doc.anforandetext, @{key}, '{analyzer}')")
                else:
                    or_parts.append(
                        f"ANALYZER(doc.anforandetext IN TOKENS(@{key}, '{analyzer}'), '{analyzer}')"
                    )
                if cleaned not in seen_terms:
                    snippet_terms.append(cleaned)
                    seen_terms.add(cleaned)
            # Combine OR parts into a single clause
            if or_parts:
                clauses.append("(" + " OR ".join(or_parts) + ")")

        # Process EXCLUDE terms (must NOT match)
        # Example: "-riksdag" excludes documents containing "riksdag"
        for idx, term in enumerate(parsed.exclude_terms):
            cleaned, is_phrase, is_prefix = _clean(term)
            if not cleaned:
                continue
            key = f"exclude_{idx}"
            bind_vars[key] = cleaned
            if is_prefix:
                clauses.append(
                    f"NOT ANALYZER(doc.anforandetext IN TOKENS(@{key}, '{prefix_match}'), '{prefix_index}')"
                )
            elif is_phrase:
                clauses.append(f"NOT PHRASE(doc.anforandetext, @{key}, '{analyzer}')")
            else:
                clauses.append(
                    f"NOT ANALYZER(doc.anforandetext IN TOKENS(@{key}, '{analyzer}'), '{analyzer}')"
                )

        # Combine all clauses with AND (all conditions must be true)
        return " AND ".join(clauses), bind_vars, snippet_terms

    def search(
        self,
        payload,
        include_snippets: bool = True,
        return_snippets: bool = False,
        focus_ids: Sequence[str] | None = None,
        return_fields: Iterable[str] = SELECT_COLUMNS,
    ):
        """
        Run the search using the ArangoSearch view when available.

        This method uses the "talks_search" ArangoSearch view if it exists,
        otherwise falls back to slower LIKE-based search on the "talks" collection.

        Parameters:
        -----------
        payload : SearchPayload
            Search parameters including:
            - q: Query string (supports AND, OR, NOT, phrases with quotes, year ranges)
            - limit: Maximum number of results
            - parties: List of party codes to filter by
            - people: List of speaker names to filter by
            - debates: List of debate types to filter by
            - from_year, to_year: Year range filters
            - speaker: Specific speaker to filter by
            - speaker_ids: List of speaker IDs to filter by
        include_snippets : bool
            Whether to generate text snippets with highlights (default: True)
        return_snippets : bool
            Whether to return only snippet fields per hit (default: False)
        focus_ids : Sequence[str] | None
            Optional list of document ids to constrain queries to; useful for follow-up searches in chat mode.

        Returns:
        --------
        tuple of (results, stats, limit_reached)
        - results: List of matching documents with snippets and metadata
        - stats: Dictionary with per_party, per_year counts and total
        - limit_reached: Boolean indicating if more results exist
        If return_snippets is True, the results list contains only snippet fields
        (_id, long snippet, speaker, date, party, debate type) instead of full documents.

        Query syntax examples:
        - "klimat" - single word
        - "klimat förändring" - phrase (exact word order)
        - klimat OR miljö - either word
        - klimat -politik - klimat but not politik
        - klima* - prefix search (klimat, klimatet, etc.)
        - år:2020-2023 - year range
        """
        use_view = self.view is not None  # True if the "talks_search" view exists
        bind_vars: dict[str, object] = {}
        if payload.limit:
            limit_string = "LIMIT 0, @limit"
            bind_vars["limit"] = payload.limit + 1  # fetch one extra row to detect whether the limit was reached
        else:
            limit_string = ""
        limit_reached = False
        filters: list[str] = []
        # Parse the query string into structured components
        parsed = self.parse_query(payload.q)
        snippet_terms: list[str] = []
        search_expression = ""
        # Focus ID filter: optional list of document IDs to restrict the search to.
        # The explicit focus_ids argument wins; otherwise fall back to payload.focus_ids if present.
        if focus_ids is None:
            focus_ids = getattr(payload, "focus_ids", None)
        if use_view:
            # Use the ArangoSearch view for fast full-text search
            bind_vars["@view"] = self.view_name  # Should be "talks_search"
            search_expression, search_params, snippet_terms = self._build_arangosearch_predicate(parsed)
            bind_vars.update(search_params)
        else:
            # Fall back to legacy LIKE-based filtering when the view is missing.
            # This is much slower but works without a view configured.
            text_clauses, text_params = self._build_text_predicate(parsed)
            filters.extend(text_clauses)
            bind_vars.update(text_params)
        # Add party filter if specified
        if payload.parties:
            bind_vars["parties"] = payload.parties
            filters.append("doc.parti IN @parties")
        # Add person/speaker filter if specified
        if payload.people:
            bind_vars["people"] = payload.people
            filters.append("doc.talare IN @people")
        # Add speaker_ids filter - this takes precedence over the speaker name
        if payload.speaker_ids:
            if isinstance(payload.speaker_ids, str):
                payload.speaker_ids = [payload.speaker_ids]
            bind_vars["speaker_ids"] = payload.speaker_ids
            filters.append("doc.intressent_id IN @speaker_ids")
        elif getattr(payload, "speaker", None):
            # Fall back to the speaker name if no ID is provided
            print(f"Adding speaker name filter: {payload.speaker}")
            bind_vars["speaker"] = payload.speaker
            filters.append("doc.talare == @speaker")
        # Add debate type filter if specified
        if payload.debates:
            bind_vars["debates"] = payload.debates
            filters.append("doc.kammaraktivitet IN @debates")
        # Handle year range (from query or from payload)
        year_start = parsed.years[0] if parsed.years else payload.from_year
        year_end = parsed.years[1] if parsed.years else payload.to_year
        if year_start is not None:
            bind_vars["year_start"] = year_start
            filters.append("doc.year >= @year_start")
        if year_end is not None:
            bind_vars["year_end"] = year_end
            filters.append("doc.year <= @year_end")
        # Add focus ID filter if provided
        if focus_ids:
            bind_vars["focus_ids"] = list(focus_ids)
            filters.append("doc._id IN @focus_ids")
        # Build the FILTER clause block
        filters_block = ""
        if filters:
            filters_block = "\n " + "\n ".join(f"FILTER {clause}" for clause in filters)
        if return_fields:
            select_fields_dict = {field: f"doc.{field}" for field in return_fields}
            select_fields = str(select_fields_dict).replace("'", "")
        else:
            select_fields = "doc"
        # Build the complete AQL query
        if use_view:
            if search_expression and include_snippets:
                # With snippets: use OFFSET_INFO() to get match positions for highlighting
                query = f"""
                FOR doc IN @@view
                    SEARCH {search_expression}
                    {filters_block}
                    SORT BM25(doc) DESC, doc.dok_datum, doc.anforande_nummer
                    {limit_string}
                    RETURN MERGE({select_fields}, {{
                        bm25: BM25(doc),
                        _highlight_matches: (
                            FOR offsetInfo IN OFFSET_INFO(doc, ["anforandetext"])
                                RETURN {{
                                    name: offsetInfo.name,
                                    matches: offsetInfo.offsets[* RETURN {{
                                        offset: CURRENT,
                                        match: SUBSTRING_BYTES(VALUE(doc, offsetInfo.name), CURRENT[0], CURRENT[1])
                                    }}]
                                }}
                        )
                    }})
                """.strip()
            elif search_expression:
                # Without snippets: simpler query with just the BM25 score
                query = f"""
                FOR doc IN @@view
                    SEARCH {search_expression}{filters_block}
                    SORT BM25(doc) DESC, doc.dok_datum, doc.anforande_nummer
                    {limit_string}
                    RETURN MERGE(doc, {{ bm25: BM25(doc) }})
                """.strip()
            else:
                # No search expression: just filter and sort by date
                query = f"""
                FOR doc IN @@view{filters_block}
                    SORT doc.dok_datum, doc.anforande_nummer
                    {limit_string}
                    RETURN {select_fields}
                """.strip()
        else:
            # Fall back to a collection scan with LIKE-based text search
            bind_vars["@collection"] = self.collection_name
            query = f"""
            FOR doc IN @@collection
                LET text_lower = LOWER(TO_STRING(doc.anforandetext)){filters_block}
                SORT doc.dok_datum, doc.anforande_nummer
                {limit_string}
                RETURN {select_fields}
            """.strip()
        # Execute the query
        cursor = self.db.aql.execute(query, bind_vars=bind_vars)
        rows = list(cursor)
        print(len(rows), "rows returned from ArangoDB")
        # Check if we hit the limit
        if payload.limit:
            limit_reached = len(rows) > payload.limit
            if limit_reached:
                rows = rows[: payload.limit]
        # Determine which terms to use for snippet generation
        include_terms = (
            snippet_terms
            or parsed.must_terms
            or [t for group in parsed.should_groups for t in group]
        )
        # Process results and generate snippets
        results = []
        for doc in rows:
            print("Document ID:", doc.get("_key"))
            # Extract highlight information if available
            highlights = doc.pop("_highlight_matches", None) if isinstance(doc, dict) else None
            text = doc.get("anforandetext") or ""
            # Get the _id (primary identifier)
            _id_value = doc.get("_key") or doc.get("anforande_id") or ""
            _id = str(_id_value)
            # Parse audio start position
            raw_start = doc.get("startpos")
            try:
                start_seconds = int(raw_start) if raw_start is not None else None
            except (TypeError, ValueError):
                start_seconds = None
            # Generate snippets with highlights
            snippet = None
            snippet_long = None
            if include_snippets:
                if highlights:
                    # Use ArangoSearch's OFFSET_INFO for precise highlighting
                    try:
                        byte_text = text.encode("utf-8")
                        matches = [
                            (info_match["offset"][0], info_match["offset"][1], info_match["match"])
                            for info in highlights
                            for info_match in info.get("matches", [])
                        ]
                        matches.sort(key=lambda item: item[0])
                        if matches:
                            # Short snippet: first match with context
                            start, length, matched = matches[0]
                            snippet_bytes = byte_text[
                                max(0, start - 60) : min(len(byte_text), start + length + 60)
                            ]
                            snippet = snippet_bytes.decode("utf-8", errors="replace").replace(
                                matched, f"**{matched}**", 1
                            )
                            # Long snippet: up to 3 matches with context
                            long_segments: list[str] = []
                            for seg_start, seg_length, seg_match in matches[:3]:
                                seg_bytes = byte_text[
                                    max(0, seg_start - 60) : min(len(byte_text), seg_start + seg_length + 60)
                                ]
                                long_segments.append(
                                    seg_bytes.decode("utf-8", errors="replace").replace(
                                        seg_match, f"**{seg_match}**", 1
                                    )
                                )
                            snippet_long = " ... ".join(long_segments) if long_segments else snippet
                        else:
                            # Fall back to manual snippet generation
                            snippet = make_snippet(text, include_terms, long=False)
                            snippet_long = make_snippet(text, include_terms, long=True)
                    except Exception:
                        # If highlighting fails, fall back to manual snippet generation
                        snippet = make_snippet(text, include_terms, long=False)
                        snippet_long = make_snippet(text, include_terms, long=True)
                else:
                    # No highlight info available, use manual snippet generation
                    snippet = make_snippet(text, include_terms, long=False)
                    snippet_long = make_snippet(text, include_terms, long=True)
            # Build the result object
            results.append(
                {
                    "_id": doc.get("_id"),
                    # "id": doc.get("_id"),  # Optional: add for debugging
                    "text": text,
                    "snippet": snippet,
                    "snippet_long": snippet_long,
                    "number": doc.get("anforande_nummer"),
                    "debate_type": debate_types.get(doc.get("kammaraktivitet"), doc.get("kammaraktivitet")),
                    "speaker": doc.get("talare"),
                    "date": doc.get("datum") or doc.get("dok_datum"),
                    "year": doc.get("year"),
                    "url_session": doc.get("debateurl"),
                    "party": doc.get("parti"),
                    "url_audio": doc.get("audiofileurl"),
                    "audio_start_seconds": start_seconds,
                    "intressent_id": doc.get("intressent_id"),
                    "bm25": doc.get("bm25") if isinstance(doc, dict) else None,
                }
            )
        # Generate statistics
        per_party = Counter(hit["party"] for hit in results if hit["party"])
        per_year = Counter(hit["year"] for hit in results if hit["year"])
        stats = {
            "per_party": dict(per_party),
            "per_year": {int(k): v for k, v in per_year.items()},
            "total": len(results),
        }
        if return_snippets:
            snippets_result = []
            for res in results:
                snippets_result.append(
                    {
                        "_id": res["_id"],
                        "snippet_long": res["snippet_long"],
                        "speaker": res["speaker"],
                        "date": res["date"],
                        "party": res["party"],
                        "debate_type": res["debate_type"],
                    }
                )
            return snippets_result, stats, limit_reached
        print(f"Search returning {len(results)} results, limit reached: {limit_reached}")
        return results, stats, limit_reached


# --- TESTING CODE ---
if __name__ == "__main__":
    service = SearchService()

    @dataclass
    class Payload:
        q: str = "bidrag"
        parties: list[str] | None = None
        people: list[str] | None = None
        debates: list[str] | None = None
        from_year: int | None = 1990
        to_year: int | None = 2023
        speaker: str | None = None
        limit: int = 10
        speaker_ids: str | None = "0958072321310"

    payload = Payload()
    results, stats, limited = service.search(payload)
    print(results)
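
    # A follow-up, snippets-only search restricted to the documents found above.
    # Illustrative sketch only: it assumes a live ArangoDB with the collections and
    # view described at the top of this file, and uses the focus_ids parameter of
    # search() to narrow the second query to the first result set.
    focus_ids = [hit["_id"] for hit in results if hit.get("_id")]
    if focus_ids:
        snippets, snippet_stats, _ = service.search(
            Payload(q="bidrag", limit=5),
            return_snippets=True,
            focus_ids=focus_ids,
        )
        print(snippets)
        print(snippet_stats)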