|
|
import multiprocessing |
|
|
from _llm import LLM as LLM_garda |
|
|
from _openai import LLM_OpenAI as LLM |
|
|
#from _llm import LLM |
|
|
from _arango import arango |
|
|
from langchain_text_splitters import CharacterTextSplitter |
|
|
import difflib |
|
|
import re |
|
|
import random |
|
|
from time import sleep |
|
|
from pprint import pprint |
|
|
from print_color import * |
|
|
|
|
|
|
|
|
class Interrogation:
    """Plain container for one interrogation and the data derived from it."""

    def __init__(self, _key, text):
        """Store the identifier and text; derived fields start out empty.

        Args:
            _key: Identifier of the interrogation.
            text: Full text of the interrogation.
        """
        # Derived data: nothing has been split or extracted yet.
        self.chunks = None
        self.mentioned_persons = list()
        # Values supplied by the caller.
        self.text = text
        self._key = _key
|
|
|
|
|
|
|
|
def _llm_confirms_name(name_kind, name, text):
    """Ask the LLM whether `name` (a first or last name) occurs in `text`.

    Args:
        name_kind: Swedish noun phrase inserted into the prompt, either
            "efternamnet" (the last name) or "förnamnet" (the first name).
        name: The name to look for.
        text: Interrogation text; only the first 5000 characters are sent.

    Returns:
        True when the answer contains "JA", otherwise False.
    """
    llm = LLM_garda()
    answer = llm.generate(
        f'Nämns någon med {name_kind} "{name}" i texten nedan?\n\n"""{text[:5000]}"""\n\nNamnet behöver inte vara stavat på exakt samma sätt, men det ska vara samma namn. Svara "JA" eller "NEJ"'
    )
    return "JA" in answer


def check_name(person, answer_person, text):
    """Decide whether `person` plausibly refers to the known `answer_person`.

    A name consisting of several words is compared to the full candidate name
    and accepted above a similarity of 0.85. A single-word name is compared to
    the candidate's first and last name separately (threshold 0.9); the other
    half of the candidate's name must then be supported by the text, by the
    first name being unique among all persons in the database, or by the LLM.

    Relies on the module-level `db` handle for the first-name uniqueness check.

    Args:
        person: Name as it was mentioned in the interrogation.
        answer_person: Full name of the candidate person.
        text: Full interrogation text used to look for the rest of the name.

    Returns:
        True if the two names are judged to be the same person, else False.
    """
    print_yellow(person, " - ", answer_person)

    # Surrounding whitespace must not lower the similarity scores.
    mentioned = person.strip()
    mentioned_parts = mentioned.split()
    print('Length person:', len(mentioned_parts))

    if len(mentioned_parts) != 1:
        # Several words (or none): compare against the whole candidate name.
        name_similarity = difflib.SequenceMatcher(None, mentioned, answer_person).ratio()
        print("Similarity:", name_similarity)
        return name_similarity > 0.85

    answer_parts = answer_person.split()
    if not answer_parts:
        # Nothing to compare against; indexing below would raise IndexError.
        return False
    answer_first_name = answer_parts[0]
    answer_last_name = answer_parts[-1]

    first_name_similarity = difflib.SequenceMatcher(
        None, mentioned, answer_first_name
    ).ratio()
    last_name_similarity = difflib.SequenceMatcher(
        None, mentioned, answer_last_name
    ).ratio()
    print("First name similarity:", first_name_similarity)
    print("Last name similarity:", last_name_similarity)

    if first_name_similarity > 0.9:
        if answer_last_name in text:
            return True
        # Count how many persons in the database share this first name;
        # documents with an empty name are skipped instead of crashing.
        first_name_count = sum(
            1
            for doc in db.collection("persons").all()
            if doc["name"].split()[:1] == [answer_first_name]
        )
        print("First name count:", first_name_count)
        if first_name_count == 1:
            return True
        return _llm_confirms_name("efternamnet", answer_last_name, text)

    if last_name_similarity > 0.9:
        if answer_first_name in text:
            return True
        return _llm_confirms_name("förnamnet", answer_first_name, text)

    return False
|
|
|
|
|
|
|
|
def execute_query_with_retry(db, query, max_retries=5, delay=2):
    """Run an AQL query, retrying on failure, and return the rows as a list.

    Args:
        db: Database handle exposing `aql.execute(query)`.
        query: AQL query string.
        max_retries: Maximum number of attempts; must be at least 1.
        delay: Seconds to wait between two attempts.

    Returns:
        The query result materialised as a list.

    Raises:
        ValueError: If `max_retries` is smaller than 1.
        Exception: The error of the last attempt once all attempts failed
            (a ValueError if the database returned a string instead of rows).
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            result = db.aql.execute(query)
            # A string result is treated as an error message, not as rows.
            if isinstance(result, str):
                raise ValueError(f"Unexpected result from database: {result}")
            return list(result)
        except Exception as e:
            # Keep the error: a bare `raise` after the loop has no active
            # exception to re-raise and would fail with RuntimeError.
            last_error = e
            print(f"Error executing query, attempt {attempt}: {e}")
            if attempt < max_retries:
                # No point in waiting after the final attempt.
                sleep(delay)

    # All attempts failed: surface the last error to the caller.
    raise last_error
|
|
|
|
|
|
|
|
# Person extraction: ask the LLM which persons a chunk of an interrogation mentions.
|
|
def extract_persons(interrogation, names_interrogation=None, text_chunk=None):
    """Ask the LLM which persons a piece of an interrogation mentions.

    Names that are part of an alias already resolved for this interrogation
    (e.g. a first name contained in a known full name) are mapped to the same
    person, and that person's `mentioned_as` entry is updated in the database.
    All other names are returned for identification.

    Relies on the module-level `db` handle.

    Args:
        interrogation: Interrogation document; `"person"` and `"_key"` are read.
        names_interrogation: Dict mapping names seen so far in this
            interrogation to the resolved person name. It is updated in place;
            a new dict is created when omitted.
        text_chunk: Text to search. When omitted, the module-level `chunk`
            variable set by the calling script is used (legacy behaviour).

    Returns:
        Tuple `(names, names_interrogation)`: the list of new, unresolved
        names and the (possibly extended) alias dict.
    """
    if names_interrogation is None:
        names_interrogation = {}
    # Legacy fallback: the script stores the current chunk in a global.
    text = chunk if text_chunk is None else text_chunk

    llm = LLM(
        chat=True,
        system_prompt="Du är en assistent som hjälper till att hitta personer i ett polisförhör. Svara bara när personen finns i den del du får, hitta inte på personer.",
    )

    # Find persons in the text
    prompt = (
        f'Det här är en text från ett polisförhör där {interrogation["person"]} förhörs:\n\n'
        f'"""{text}"""\n\n'
        'Vilka personer nämns i texten? Svara ENBART med en pythonformaterad lista av namn.\n'
        'Exempel på svar för att du ska förstå formen: "["namn1", "namn2", "namn3"]".\n'
        'Jag är inte intresserad av förhörsledaren eller personen som förhörs.'
    )
    response = llm.generate(prompt)
    # Keep only letters, hyphens, spaces and commas so the answer can be
    # split on commas regardless of brackets and quotes.
    response = re.sub(r"[^a-zA-ZåäöÅÄÖ\- ,]", "", response).replace(" namn ", "")

    # A whitespace-only fragment would become "" and, being a substring of
    # every name, would be registered as an alias of every known person.
    candidates = [
        part.strip()
        for part in response.split(",")
        if len(part) > 2 and part.strip()
    ]

    names = []
    for name in candidates:
        if name in names or name in names_interrogation:
            print_green('Name already in names_interrogation', name)
            continue

        same_name = False
        # Iterate over a snapshot: the dict is extended inside the loop.
        for known_alias in list(names_interrogation):
            if name not in known_alias:
                continue
            same_name = True
            resolved_name = names_interrogation[known_alias]
            names_interrogation[name] = resolved_name

            cursor = db.aql.execute(
                'for doc in persons filter doc.name == @name return doc',
                bind_vars={'name': resolved_name},
                count=True,
            )
            matches = list(cursor)
            if matches:
                person_arango = matches[0]
                # Tolerate documents that lack the field entirely.
                aliases = person_arango.setdefault("mentioned_as", {}).setdefault(
                    interrogation["_key"], []
                )
                if name not in aliases:
                    aliases.append(name)
                db.collection("persons").update(person_arango, check_rev=False)

        if not same_name:
            names.append(name)

    return names, names_interrogation
|
|
|
|
|
def _record_mention(doc, interrogation_key, name, info):
    """Note on a person document that it was mentioned as `name` in an interrogation.

    Missing `mentioned_as`, `mentioned_in_interrogation` and `info` fields are
    created; existing entries are never duplicated. The document is changed in
    place and not written to the database.
    """
    aliases = doc.setdefault("mentioned_as", {}).setdefault(interrogation_key, [])
    if name not in aliases:
        aliases.append(name)

    mentioned_in = doc.setdefault("mentioned_in_interrogation", [])
    if interrogation_key not in mentioned_in:
        mentioned_in.append(interrogation_key)

    infos = doc.setdefault("info", [])
    if info not in infos:
        infos.append(info)


def identify_persons(names, chunk, names_interrogation):
    """Match names found in a chunk against the confirmed persons and store the result.

    For every name the LLM is first asked what the chunk says about the person.
    The name is then resolved by exact match, by the hard-coded alias table,
    by reversed word order, or by letting the LLM pick from the closest
    candidates (verified with `check_name`). Identified persons are updated in
    the database; unidentified names are stored as unconfirmed persons.

    Relies on module-level state prepared by the calling script: `db`,
    `interrogation`, `persons`, `persons_dict`, `first_names` and
    `known_persons`.

    Args:
        names: Names to identify.
        chunk: The piece of interrogation text the names were found in.
        names_interrogation: Dict mapping names seen in this interrogation to
            the resolved person name; updated in place.

    Returns:
        None.
    """
    if not names:
        # Nothing to identify: do not create an LLM client for no work.
        return

    llm = LLM()
    interrogation_key = interrogation["_key"]

    for name in names:
        name_parts = name.split()
        if not name_parts:
            # An empty name cannot be matched and would break the lookups below.
            continue

        print_blue('New name:', name)

        # Collect what the text says about the person.
        prompt = (
            f'Jag vill veta vem {name} är. Kolla på förhöret nedan och svara om du hittar något om personen där.\n'
            f'"""{chunk}"""\n\n'
            f'Vem är {name}? Svara bara med sådant som finns i texten.'
        )
        info = llm.generate(prompt)

        person = None
        persons_string = ""  # candidates shown when identification fails
        # list.reverse() works in place and returns None, so build the
        # reversed name explicitly ("Last First" -> "First Last").
        reversed_name = " ".join(reversed(name_parts))

        if name in persons:
            person = persons_dict[name]

        elif name in known_persons:
            person = persons_dict[known_persons[name]]

        elif reversed_name in persons:
            print("Vände och hittade ✌️", reversed_name)
            person = persons_dict[reversed_name]

        else:
            # Let the LLM choose among the most similar confirmed persons.
            closest_matches = difflib.get_close_matches(name, persons, n=4, cutoff=0.3)
            same_first_name = first_names.get(name_parts[0])
            if same_first_name is not None and same_first_name not in closest_matches:
                closest_matches.append(same_first_name)
            persons_string = "\n".join(closest_matches)

            prompt = (
                f'Jag behöver identifiera {name}. Nedan är en lista på personer det kanske skulle kunna vara:\n\n'
                f'{persons_string}\n\n'
                f'Är {name} någon av dessa personer? I texten kan personen stå med bara sitt förnamn eller efternamn, så kolla speciellt efter namn i listan där förnamnet eller efternamnet stämmer. Namnet i förhöret kan också vara felstavat, exempelvis ett s istället för två eller så kan bokstäver ha bytt plats, men inte ett helt annat namn.\n'
                'Svara BARA med namnet på personen ur listan. Är det inte någon av personerna i listan så svara "None".'
            )
            # Stray whitespace or quotes must not defeat the exact lookup.
            answer_person = llm.generate(prompt).strip().strip('"\'')

            if answer_person in persons and check_name(
                name, answer_person, interrogation["text"]
            ):
                person = persons_dict[answer_person]
            else:
                print_red(f"""Answer "{answer_person}" not in persons""")

        if person:
            if name not in names_interrogation:
                names_interrogation[name] = person['name']

            print_green(f'{name} identified: {person["name"]}', "\n")

            _record_mention(person, interrogation_key, name, info)
            db.collection("persons").update(person, check_rev=False)

        # If the person was not identified as a confirmed person, add to the unconfirmed persons
        else:
            if name not in names_interrogation:
                names_interrogation[name] = name
            print(f"\033[91m{name} not identified\033[0m")
            print_yellow(
                "\n".join([f"- {i}" for i in persons_string.split("\n")]), "\n"
            )
            print()

            _key = arango.fix_key_name(name)  # TODO Are there multiple persons with the same name?

            # Extend an existing unconfirmed person or create a new one.
            doc = db.collection("persons").get(_key)
            if doc:
                _record_mention(doc, interrogation_key, name, info)
            else:
                doc = {
                    "_key": _key,
                    "name": name,
                    "info": [info],
                    "confirmed": False,
                    "mentioned_in_interrogation": [interrogation_key],
                    "mentioned_as": {interrogation_key: [name]},
                }

            db.collection("persons").insert(doc, merge=False, overwrite_mode='update')

        # NOTE(review): only confirmed persons are linked from the
        # interrogation; unconfirmed ones are not added here - confirm intent.
        if person:
            mentioned_persons = interrogation.setdefault("mentioned_persons", [])
            if person['_key'] not in mentioned_persons:
                mentioned_persons.append(person['_key'])
                db.collection("interrogations").update(interrogation, check_rev=False)
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: extract and identify the persons mentioned in every
    # interrogation, oldest first.
    #
    # NOTE: the variables below are deliberately module-level. The functions
    # in this file read `db`, `interrogation`, `chunk`, `persons`,
    # `persons_dict`, `first_names` and `known_persons` as globals.
    db = arango.db
    # Module-level LLM client for code in this file that refers to a global `llm`.
    llm = LLM()

    # Hard-coded first-name aliases for persons that are known in advance.
    known_persons = {
        "Douglas": "Douglas Bengtsson",
        "Rashid": "Rashid Sheiksaid",
        "Emanuel": "Emanuel Johansson",
        "Robert": "Robert Bengtsson",
        "Marlene": "Marlene Ahlqvist",
        "Jhonny": "Jhonny Backman",
    }

    # The splitter does not depend on the interrogation, so build it once.
    text_splitter = CharacterTextSplitter(
        separator="\n\n",
        chunk_size=8000,
        chunk_overlap=0,
        length_function=len,
        is_separator_regex=False,
    )

    q = 'for doc in interrogations return doc'
    interrogations = list(db.aql.execute(q))
    interrogations.sort(key=lambda x: x["date"])

    # Keys of interrogations that some person document already refers to.
    # Only needed by the (currently disabled) resume filter below.
    interrogations_done = {
        key
        for person_doc in db.collection("persons").all()
        for key in person_doc.get("mentioned_in_interrogation") or []
    }

    # Disabled: skip interrogations that have already been processed.
    # interrogations = [
    #     interrogation
    #     for interrogation in interrogations
    #     if interrogation["_key"] not in interrogations_done
    # ]
    # print("Number of interrogations to process:", len(interrogations))

    # Disabled: filter out interrogations that have their _key in the rumors collection.
    # q = 'for rumor in rumors return rumor._key'
    # rumors = list(db.aql.execute(q))
    # interrogations = [interrogation for interrogation in interrogations if interrogation['_key'] not in rumors]
    # print('Number of interrogations to process:', len(interrogations))

    print(len(interrogations))

    for interrogation in interrogations:
        # Aliases resolved so far; shared by all chunks of one interrogation.
        names_interrogation = {}

        sleep(random.uniform(0.05, 0.3))
        print("INTERROGATION:", interrogation["_key"])

        # Reload the confirmed persons for every interrogation.
        q = "for doc in persons filter doc.confirmed == true return doc"
        persons_docs = execute_query_with_retry(db, q)

        # All three lookups use the stripped name so that a name found in
        # `persons` can always be looked up in `persons_dict`.
        persons = [doc["name"].strip() for doc in persons_docs]
        first_names = {
            full_name.split()[0]: full_name for full_name in persons if full_name.split()
        }
        persons_dict = {doc["name"].strip(): doc for doc in persons_docs}

        for chunk in text_splitter.split_text(interrogation["text"]):
            # extract_persons requires the alias dict and returns it together
            # with the names that still need to be identified.
            names, names_interrogation = extract_persons(
                interrogation, names_interrogation
            )
            identify_persons(names, chunk, names_interrogation)
|
|
|
|