import re import warnings import pymupdf import nltk from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics.pairwise import linear_kernel import io import dotenv import os import asyncio import aiofiles import yaml # Check if 'punkt_tab' tokenizer data is available try: nltk.data.find("tokenizers/punkt_tab") except LookupError: import logging logging.info("Downloading 'punkt_tab' tokenizer data for NLTK.") nltk.download("punkt_tab") # Get the directory of the current script script_dir = os.path.dirname(os.path.abspath(__file__)) # Construct the absolute path to the prompts.yaml file prompts_path = os.path.join(script_dir, "prompts.yaml") # Load prompts from configuration file with open(prompts_path, "r") as file: prompts = yaml.safe_load(file) CUSTOM_SYSTEM_PROMPT = prompts["CUSTOM_SYSTEM_PROMPT"] GET_SENTENCES_PROMPT = prompts["GET_SENTENCES_PROMPT"] EXPLANATION_PROMPT = prompts["EXPLANATION_PROMPT"] class LLM: """ LLM class for interacting with language models from OpenAI or Ollama. Attributes: model (str): The model to be used for generating responses. num_ctx (int): The number of context tokens to be used. Defaults to 20000. temperature (float): The temperature setting for the model's response generation. keep_alive (int): The keep-alive duration for the connection. options (dict): Options for the model's response generation. memory (bool): Whether to retain conversation history. messages (list): List of messages in the conversation. openai (bool): Flag indicating if OpenAI is being used. ollama (bool): Flag indicating if Ollama is being used. client (object): The client object for OpenAI. llm (object): The client object for the language model. Methods: __init__(openai_key=False, model=None, temperature=0, system_prompt=None, num_ctx=None, memory=True, keep_alive=3600): Initializes the LLM class with the provided parameters. use_openai(key, model): Configures the class to use OpenAI for generating responses. use_ollama(model): Configures the class to use Ollama for generating responses. generate(prompt): Asynchronously generates a response based on the provided prompt. """ def __init__( self, num_ctx=20000, openai_key=False, model=None, temperature=0, system_prompt=None, memory=True, keep_alive=3600, ): """ Initialize the highlight_pdf class. Parameters: openai_key (str or bool): API key for OpenAI. If False, Ollama will be used. model (str, optional): The model to be used. Defaults to None. temperature (float, optional): Sampling temperature for the model. Defaults to 0. system_prompt (str, optional): Initial system prompt for the model. Defaults to None. context_window (int, optional): Number of context tokens. Defaults to None. memory (bool, optional): Whether to use memory. Defaults to True. keep_alive (int, optional): Keep-alive duration in seconds. Defaults to 3600. """ if model: self.model = model else: self.model = os.getenv("LLM_MODEL") self.temperature = temperature self.keep_alive = keep_alive self.options = {"temperature": self.temperature, num_ctx: num_ctx} self.memory = memory if system_prompt: self.messages = [{"role": "system", "content": system_prompt}] else: self.messages = [{"role": "system", "content": CUSTOM_SYSTEM_PROMPT}] # Check if OpenAI key is provided if openai_key: # Use OpenAI self.use_openai(openai_key, model) elif os.getenv("OPENAI_API_KEY") != '': # Use OpenAI self.use_openai(os.getenv("OPENAI_API_KEY"), model) else: # Use Ollama self.use_ollama(model) def use_openai(self, key, model): """ Configures the instance to use OpenAI's API for language model operations. Args: key (str): The API key for authenticating with OpenAI. model (str): The specific model to use. If not provided, it will default to the value of the "OPENAI_MODEL" environment variable. Attributes: llm (module): The OpenAI module. client (openai.AsyncOpenAI): The OpenAI client initialized with the provided API key. openai (bool): Flag indicating that OpenAI is being used. ollama (bool): Flag indicating that Ollama is not being used. model (str): The model to be used for OpenAI operations. """ import openai self.llm = openai self.client = openai.AsyncOpenAI(api_key=key) self.openai = True self.ollama = False if model: self.model = model else: self.model = os.getenv("LLM_MODEL") def use_ollama(self, model): """ Configures the instance to use the Ollama LLM (Language Learning Model) service. This method initializes an asynchronous Ollama client and sets the appropriate flags to indicate that Ollama is being used instead of OpenAI. It also sets the model to be used for the LLM, either from the provided argument or from an environment variable. Args: model (str): The name of the model to be used. If not provided, the model name will be fetched from the environment variable 'LLM_MODEL'. """ import ollama self.llm = ollama.AsyncClient() self.ollama = True self.openai = False if model: self.model = model else: self.model = os.getenv("LLM_MODEL") async def generate(self, prompt): """ Generates a response based on the provided prompt using either OpenAI or Ollama. Args: prompt (str): The input prompt to generate a response for. Returns: str: The generated response. Notes: - The prompt is stripped of leading whitespace on each line. """ prompt = re.sub(r"^\s+", "", prompt, flags=re.MULTILINE) self.messages.append({"role": "user", "content": prompt}) if self.openai: chat_completion = await self.client.chat.completions.create( messages=self.messages, model=self.model, temperature=0 ) answer = chat_completion.choices[0].message.content return answer elif self.ollama: response = await self.llm.chat( messages=self.messages, model=self.model, options=self.options, keep_alive=self.keep_alive, ) answer = response["message"]["content"] self.messages.append({"role": "assistant", "content": answer}) if not self.memory: self.messages = self.messages[0] return answer class Highlighter: """ Highlighter class for annotating and highlighting sentences in PDF documents using an LLM (Large Language Model). Attributes: silent (bool): Flag to suppress warnings. comment (bool): Flag to add comments to highlighted sentences. llm_params (dict): Parameters for the LLM. Methods: __init__(self, silent=False, openai_key=None, comments=False, llm_model=None, llm_temperature=0, llm_system_prompt=None, llm_num_ctx=None, llm_memory=True, llm_keep_alive=3600): Initializes the Highlighter class with the given parameters. async highlight(self, user_input, docs=None, data=None, pdf_filename=None): Highlights sentences in the provided PDF documents based on the user input. async get_sentences_with_llm(self, text, user_input): Uses the LLM to generate sentences from the text that should be highlighted based on the user input. async annotate_pdf(self, user_input: str, filename: str, pages: list = None, extend_pages: bool = False): Annotates the PDF with highlighted sentences and optional comments. Fixes the filename by replacing special characters with their ASCII equivalents. """ def __init__( self, silent=False, openai_key=None, comment=False, llm_model=None, llm_temperature=0, llm_system_prompt=None, llm_num_ctx=None, llm_memory=True, llm_keep_alive=3600, ): """ Initialize the class with the given parameters. Parameters: silent (bool): Flag to suppress output. openai_key (str or None): API key for OpenAI. comment (bool): Flag to enable or disable comments. llm_model (str or None): The model name for the language model. llm_temperature (float): The temperature setting for the language model. llm_system_prompt (str or None): The system prompt for the language model. llm_num_ctx (int or None): The number of context tokens for the language model. llm_memory (bool): Flag to enable or disable memory for the language model. llm_keep_alive (int): The keep-alive duration for the language model in seconds. """ dotenv.load_dotenv() # Ensure both model are provided or set in the environment assert llm_model or os.getenv("LLM_MODEL"), "LLM_MODEL must be provided as argument or set in the environment." self.silent = silent self.comment = comment self.llm_params = { "openai_key": openai_key, "model": llm_model, "temperature": llm_temperature, "system_prompt": llm_system_prompt, "num_ctx": llm_num_ctx, "memory": llm_memory, "keep_alive": llm_keep_alive, } async def highlight( self, user_input=None, docs=None, data=None, pdf_filename=None, pages=None, zero_indexed_pages=False, ): """ Highlights text in one or more PDF documents based on user input. Args: user_input (str): The text input from the user to highlight in the PDFs. Defaults to None. docs (list, optional): A list of PDF filenames to process. Defaults to None. data (list, optional): Data in JSON format to process. Should be on the format: [{"user_input": "text", "pdf_filename": "filename", "pages": [1, 2, 3]}]. Defaults to None. pdf_filename (str, optional): A single PDF filename to process. Defaults to None. pages (list, optional): A list of page numbers to process. Defaults to None. zero_indexed_pages (bool, optional): Flag to indicate if the page numbers are zero-indexed. Defaults to False. Returns: io.BytesIO: A buffer containing the combined PDF with highlights. Raises: AssertionError: If none of `data`, `pdf_filename`, or `docs` are provided. """ pdf_buffers = [] assert any( [data, pdf_filename, docs] ), "You need to provide either a PDF filename, a list of filenames or data in JSON format." if data: user_input = [item["user_input"] for item in data] docs = [item["pdf_filename"] for item in data] pages = [item.get("pages") for item in data] if not zero_indexed_pages: pages = [[p - 1 for p in page] for page in pages] if not docs: user_input = [user_input] docs = [pdf_filename] pages = [pages] tasks = [ self.annotate_pdf(ui, doc, pages=pg) for ui, doc, pg in zip(user_input, docs, pages or [pages] * len(docs)) ] pdf_buffers = await asyncio.gather(*tasks) combined_pdf = pymupdf.open() new_toc = [] for buffer in pdf_buffers: if not buffer: continue pdf = pymupdf.open(stream=buffer, filetype="pdf") length = len(combined_pdf) combined_pdf.insert_pdf(pdf) new_toc.append([1, f"Document {length + 1}", length + 1]) combined_pdf.set_toc(new_toc) pdf_buffer = io.BytesIO() combined_pdf.save(pdf_buffer) pdf_buffer.seek(0) return pdf_buffer async def get_sentences_with_llm(self, text, user_input): print(text) prompt = GET_SENTENCES_PROMPT.format(text=text, user_input=user_input) answer = await self.llm.generate(prompt) return answer.split("\n") async def annotate_pdf( self, user_input: str, filename: str, pages: list = None, extend_pages: bool = False, ): self.llm = LLM(**self.llm_params) pdf = pymupdf.open(filename) output_pdf = pymupdf.open() vectorizer = TfidfVectorizer() if pages is not None: new_pdf = pymupdf.open() pdf_pages = pdf.pages(pages[0], pages[-1] + 1) pdf_text = "" for page in pdf_pages: pdf_text += f'\n{page.get_text("text")}' new_pdf.insert_pdf(pdf, from_page=page.number, to_page=page.number) else: pdf_text = "\n".join([page.get_text("text") for page in pdf]) new_pdf = pymupdf.open() new_pdf.insert_pdf(pdf) pdf_sentences = nltk.sent_tokenize(pdf_text) tfidf_text = vectorizer.fit_transform(pdf_sentences) sentences = await self.get_sentences_with_llm(pdf_text, user_input) highlight_sentences = [] for sentence in sentences: if sentence == "None" or len(sentence) < 5: continue sentence = sentence.replace('"', "").strip() if sentence in pdf_text: highlight_sentences.append(sentence) else: tfidf_sentence = vectorizer.transform([sentence]) cosine_similarities = linear_kernel( tfidf_sentence, tfidf_text ).flatten() most_similar_index = cosine_similarities.argmax() most_similar_sentence = pdf_sentences[most_similar_index] highlight_sentences.append(most_similar_sentence) relevant_pages = set() for sentence in highlight_sentences: found = False if self.comment: explanation = await self.llm.generate( EXPLANATION_PROMPT.format(sentence=sentence, user_input=user_input) ) for page in new_pdf: rects = page.search_for(sentence) if not rects: continue found = True p1 = rects[0].tl p2 = rects[-1].br highlight = page.add_highlight_annot(start=p1, stop=p2) if self.comment: highlight.set_info(content=explanation) relevant_pages.add(page.number) new_pdf.reload_page(page) if not found and not self.silent: warnings.warn(f"Sentence not found: {sentence}", category=UserWarning) extended_pages = [] if extend_pages: for p in relevant_pages: extended_pages.append(p) if p - 1 not in extended_pages and p - 1 != -1: extended_pages.append(p - 1) if p + 1 not in extended_pages: extended_pages.append(p + 1) relevant_pages = extended_pages for p in relevant_pages: output_pdf.insert_pdf(new_pdf, from_page=p, to_page=p) if len(output_pdf) != 0: buffer = io.BytesIO() new_pdf.save(buffer) buffer.seek(0) return buffer else: if not self.silent: warnings.warn("No relevant sentences found", category=UserWarning) return None async def save_pdf_to_file(pdf_buffer, filename): async with aiofiles.open(filename, "wb") as f: await f.write(pdf_buffer.getbuffer()) if __name__ == "__main__": import argparse import json # Set up argument parser for command-line interface parser = argparse.ArgumentParser( description=( "Highlight sentences in PDF documents using an LLM.\n\n" "For more information, visit: https://github.com/lasseedfast/pdf-highlighter/blob/main/README.md" ) ) parser.add_argument( "--user_input", type=str, help="The text input from the user to highlight in the PDFs.", ) parser.add_argument("--pdf_filename", type=str, help="The PDF filename to process.") parser.add_argument("--silent", action="store_true", help="Suppress warnings.") parser.add_argument("--openai_key", type=str, help="API key for OpenAI.") parser.add_argument("--llm_model", type=str, help="The model name for the language model.") parser.add_argument( "--comment", action="store_true", help="Include comments in the highlighted PDF.", ) parser.add_argument( "--data", type=json.loads, help="Data in JSON format (fields: user_input, pdf_filename, list_of_pages).", ) args = parser.parse_args() # Initialize the Highlighter class with the provided arguments highlighter = Highlighter( silent=args.silent, openai_key=args.openai_key, comment=args.comment, llm_model=args.llm_model, ) # Define the main asynchronous function to highlight the PDF async def main(): highlighted_pdf = await highlighter.highlight( user_input=args.user_input, pdf_filename=args.pdf_filename, data=args.data, ) # Save the highlighted PDF to a new file if not args.pdf_filename: # If no specific PDF filename is provided if args.data and len(args.data) == 1: # If data is provided and contains exactly one item, use its filename filename = args.data[0]["pdf_filename"].replace(".pdf", "_highlighted.pdf") else: # If no specific filename and data contains multiple items, generate a timestamped filename from datetime import datetime filename = f"highlighted_pdf_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf" else: # If a specific PDF filename is provided, append '_highlighted' to its name filename = args.pdf_filename.replace(".pdf", "_highlighted.pdf") await save_pdf_to_file( highlighted_pdf, filename ) # Print the clickable file path print(f'''Highlighted PDF saved to "file://{filename.replace(' ', '%20')}"''') # Run the main function using asyncio asyncio.run(main())