Refactor code structure for improved readability and maintainability

main
lasseedfast 7 months ago
parent 62b68c3717
commit 8229d98b68
  1. 1
      _arango.py
  2. 21
      article2db.py
  3. 21
      collections_page.py
  4. 37
      llm_server.py
  5. 405
      migrate_repos.py
  6. 57
      streamlit_chatbot.py
  7. 1527
      streamlit_chatbot_streaming_tools.py

@ -223,7 +223,6 @@ class ArangoDB:
assert '_id' in document or '_key' in document, "Document must have either _id or _key"
if '_id' not in document:
document['_id'] = f"{collection_name}/{document['_key']}"
return self.db.collection(collection_name).insert(
document,
overwrite=overwrite,

@ -643,14 +643,9 @@ class Processor:
).get(self.document._key)
self.document._id = self.document.arango_doc["_id"]
if "summary" not in self.document.arango_doc:
# Make a summary in the background
print_yellow("No summary found in the document, generating in background...")
print_rainbow(self.document.arango_doc['chunks'])
self.document.make_summary_in_background()
else:
print_green("Summary already exists in the document.")
print(self.document.arango_doc['summary'])
# Send the document to llm server for background processing
self.document.make_summary_in_background()
return self.document.arango_doc
def llm2metadata(self):
@ -1090,6 +1085,11 @@ class Processor:
)
self.document.title = self.document.get_title()
if self.document.is_sci:
self.document.arango_collection = "sci_articles"
self.document.arango_db_name = "base"
# Try to get DOI from filename or text
if not self.document.doi and self.document.filename:
self.document.doi = self.extract_doi(self.document.filename)
@ -1221,6 +1221,11 @@ class Processor:
self.document.arango_db_name = self.username
print_purple("Not a scientific article, using 'other_articles' collection.")
else:
self.document.arango_collection = "sci_articles"
self.document.arango_db_name = "base"
print_purple("Scientific article, using 'sci_articles' collection.")
arango_doc = self.chunks2arango()
_id = arango_doc["_id"]

@ -176,8 +176,12 @@ class ArticleCollectionsPage(StreamlitBaseClass):
expander_title = f"**{title}** *{journal}* ({published_year}) {icon}"
with st.expander(expander_title):
if not title == "No Title":
st.markdown(f"**Title:** \n{title}")
# if not title == "No Title":
# st.markdown(f"**Title:** \n{title}")
if 'summary' in article and 'short_summary' in article['summary']:
st.markdown(
f"{article['summary']['short_summary']}"
)
if not journal == "No Journal":
st.markdown(f"**Journal:** \n{journal}")
@ -200,6 +204,8 @@ class ArticleCollectionsPage(StreamlitBaseClass):
continue
if isinstance(value, list):
value = ", ".join(value)
if key == "summary":
st.markdown(f"**Summary:** \n{value['text_sum']}")
st.markdown(f"**{key.capitalize()}**: \n{value} ")
if "doi" in article:
if article["doi"]:
@ -370,7 +376,16 @@ class ArticleCollectionsPage(StreamlitBaseClass):
info_cursor = arango.db.aql.execute(query, bind_vars={"doc_id": _id})
elif doi:
info_cursor = arango.db.aql.execute(
f'FOR doc IN sci_articles FILTER doc["doi"] == "{doi}" LIMIT 1 RETURN {{"_id": doc["_id"], "doi": doc["doi"], "title": doc["title"], "metadata": doc["metadata"], "summary": doc["summary"]}}'
f'''FOR doc IN sci_articles
FILTER doc["doi"] == "{doi}"
LIMIT 1
RETURN {{
"_id": doc["_id"],
"doi": doc["doi"],
"title": doc["title"],
"metadata": doc["metadata"],
"summary": doc["summary"]
}}'''
)
return next(info_cursor, None)

@ -254,12 +254,36 @@ def summarise_document_task(doc_data: dict):
system_message = "You are summarising a document. It is very important that you keep to what is written and do not add any of your own opinions or interpretations. Always answer in English."
# Initialize LLM and generate summary
llm = LLM(system_message=system_message)
llm = LLM(system_message=system_message, chat=True)
#if 'abstract'
prompt = get_summary_prompt(text, is_sci)
response = llm.generate(query=prompt)
summary = response.content
if 'summary' in doc_data and 'text_sum' in doc_data['summary']:
# If a summary already exists, use it instead of generating a new one
summary = doc_data['summary']['text_sum']
llm.messages.append({"role": "user", "content": 'Make a summary of this text:\n[text removed in chat history]'},
{"role": "assistant", "content": summary})
else:
prompt = get_summary_prompt(text, is_sci)
response = llm.generate(query=prompt)
summary = response.content
prompt = """Thanks! Now make a very short summary of the text, that is no longer than 50 words.
The summary should give an idea of what sort of text it is, and what the main points are.
Below are some examples of how to write the short summary:
Example 1: "This is a *scientific article* about the effects of climate change on polar bears. It discusses the impact of melting ice caps on their habitat and food sources."
Example 2: "This is an *interview* with a climate scientist discussing the challenges of communicating climate change to the public. It highlights the importance of clear messaging and public engagement."
Example 3: "This is a *news article* about the latest developments in renewable energy technology. It covers advancements in solar and wind power, and their potential impact on reducing carbon emissions."
Example 4: "This is a *blog post* about the benefits of meditation for mental health. It explores how mindfulness practices can reduce stress and improve overall well-being."
Example 5: "This is a *report* on the economic impact of the COVID-19 pandemic. It analyzes job losses, government responses, and the path to recovery."
Example 6: "This is a *research paper* on the effects of social media on youth mental health. It examines the correlation between social media use and anxiety, depression, and self-esteem issues."
Example 7: "This is an *opinion piece* on the importance of biodiversity conservation. It argues for stronger environmental policies to protect endangered species and ecosystems."
"""
short_summary_response = llm.generate(query=prompt)
short_summary = short_summary_response.content.strip()
# Create summary document
@ -267,8 +291,9 @@ def summarise_document_task(doc_data: dict):
"text_sum": summary,
"meta": {
"model": llm.model,
"temperature": llm.options["temperature"] if text else 0,
"date": datetime.now().strftime("%Y-%m-%d"),
},
"short_summary": short_summary,
}
# Process chunks if they exist
@ -296,7 +321,7 @@ def summarise_document_task(doc_data: dict):
if db_name == "sci_articles":
chroma.add_document(
collection="sci_articles_article_summaries",
document_id= doc_data["_key"]
document_id= doc_data["_key"],
text=summary_doc["text_sum"],
metadata={
"model": summary_doc["meta"]["model"],

@ -0,0 +1,405 @@
#!/usr/bin/env python3
"""
Automated GitHub to Gitea Repository Migration Tool
This script automatically discovers all repositories from a GitHub user
and migrates them to a Gitea instance, preserving all branches, tags, and history.
"""
import subprocess
import requests
import json
import os
import shutil
from pathlib import Path
from typing import List, Dict, Optional
import time
class RepoMigrator:
    """Migrate every repository of a GitHub user to a Gitea instance.

    Repositories are cloned with ``git clone --mirror`` and pushed with
    ``git push --mirror`` so all branches, tags and full history are
    preserved.
    """

    def __init__(self, github_username: str, gitea_base_url: str, gitea_username: str):
        """
        Initialize the repository migrator.

        Args:
            github_username: Your GitHub username (source)
            gitea_base_url: Base URL of your Gitea instance (e.g., 'https://git.edfast.se')
            gitea_username: Your username on Gitea (destination)
        """
        self.github_username = github_username
        # Normalize so URL joins below never produce a double slash.
        self.gitea_base_url = gitea_base_url.rstrip('/')
        self.gitea_username = gitea_username
        self.temp_dir = Path('/tmp/repo_migration')
        # Create temp directory for cloning repos.
        # FIX: parents=True so a missing parent directory does not abort the run.
        self.temp_dir.mkdir(parents=True, exist_ok=True)

    def get_github_repos(self, github_token: Optional[str] = None) -> List[Dict]:
        """
        Fetch all repositories of the GitHub user, paging through the API.

        Args:
            github_token: Optional GitHub personal access token for private repos

        Returns:
            List of repository dictionaries (name, clone_url, private, ...)

        Raises:
            Exception: If the GitHub API returns a non-200 status code.
        """
        headers = {}
        if github_token:
            headers['Authorization'] = f'token {github_token}'
            # FIX: the public /users/<name>/repos endpoint never includes
            # private repositories, even when authenticated.  Use the
            # authenticated /user/repos endpoint, restricted to owned repos,
            # whenever a token is available.
            url = 'https://api.github.com/user/repos'
            base_params = {'per_page': 100, 'affiliation': 'owner'}
        else:
            url = f'https://api.github.com/users/{self.github_username}/repos'
            base_params = {'per_page': 100, 'type': 'all'}

        repos: List[Dict] = []
        page = 1
        print(f"🔍 Discovering repositories for {self.github_username}...")
        while True:
            params = dict(base_params, page=page)
            # FIX: explicit timeout so a stalled API call cannot hang forever.
            response = requests.get(url, headers=headers, params=params, timeout=30)
            if response.status_code != 200:
                raise Exception(f"Failed to fetch GitHub repos: {response.status_code} - {response.text}")
            page_repos = response.json()
            if not page_repos:
                break  # An empty page means we have consumed all pages.
            repos.extend(page_repos)
            print(f"   Found {len(page_repos)} repos on page {page}")
            page += 1
        print(f"✅ Total repositories found: {len(repos)}")
        return repos

    def check_gitea_repo_exists(self, repo_name: str, gitea_token: Optional[str] = None) -> bool:
        """
        Check if a repository already exists on Gitea.

        Args:
            repo_name: Name of the repository
            gitea_token: Optional Gitea access token

        Returns:
            True if the repository exists, False otherwise
        """
        headers = {}
        if gitea_token:
            headers['Authorization'] = f'token {gitea_token}'
        url = f'{self.gitea_base_url}/api/v1/repos/{self.gitea_username}/{repo_name}'
        response = requests.get(url, headers=headers, timeout=30)
        return response.status_code == 200

    def create_gitea_repo(self, repo_name: str, description: str, private: bool, gitea_token: str) -> bool:
        """
        Create a new repository on Gitea.

        Args:
            repo_name: Name for the new repository
            description: Repository description
            private: Whether the repository should be private
            gitea_token: Gitea access token

        Returns:
            True if successful, False otherwise
        """
        headers = {
            'Authorization': f'token {gitea_token}',
            'Content-Type': 'application/json'
        }
        data = {
            'name': repo_name,
            'description': description,
            'private': private,
            'auto_init': False  # Don't initialize with README since we're migrating
        }
        url = f'{self.gitea_base_url}/api/v1/user/repos'
        response = requests.post(url, headers=headers, json=data, timeout=30)
        if response.status_code == 201:
            print(f"   ✅ Created repository '{repo_name}' on Gitea")
            return True
        print(f"   ❌ Failed to create repository '{repo_name}': {response.status_code} - {response.text}")
        return False

    def run_command(self, command: List[str], cwd: Optional[Path] = None) -> tuple[bool, str]:
        """
        Execute a shell command and return success status and output.

        Args:
            command: List of command parts (run with shell=False semantics)
            cwd: Working directory for the command

        Returns:
            Tuple of (success: bool, output: str); on failure the output
            carries the command's stderr.
        """
        try:
            result = subprocess.run(
                command,
                cwd=cwd,
                capture_output=True,
                text=True,
                check=True
            )
            return True, result.stdout
        except subprocess.CalledProcessError as e:
            return False, f"Command failed: {e.stderr}"

    def get_authenticated_clone_url(self, repo: Dict, github_token: Optional[str] = None) -> str:
        """
        Get the appropriate clone URL based on available authentication.

        Args:
            repo: Repository dictionary from GitHub API
            github_token: Optional GitHub personal access token

        Returns:
            Clone URL with authentication if available
        """
        if github_token:
            # HTTPS with token authentication works for private repos.
            return f"https://{github_token}@github.com/{self.github_username}/{repo['name']}.git"
        elif 'ssh_url' in repo and repo['ssh_url']:
            # SSH URL relies on the caller's existing SSH agent/keys.
            return repo['ssh_url']
        else:
            # Fall back to the plain HTTPS clone URL (public repos).
            return repo['clone_url']

    def _report_clone_stats(self, repo_path: Path) -> None:
        """Print refs, commit count and file listing of a freshly mirrored clone.

        Purely informational; never fails the migration.
        """
        success, refs_output = self.run_command(['git', 'show-ref'], cwd=repo_path)
        if not (success and refs_output.strip()):
            print(f"   No refs found - this might be an empty repository")
            return
        ref_lines = refs_output.strip().split('\n')
        print(f"   📋 Found {len(ref_lines)} references (branches/tags)")

        success, log_output = self.run_command(['git', 'rev-list', '--count', '--all'], cwd=repo_path)
        if not (success and log_output.strip()):
            print(f"   No commits found - this is likely an empty repository")
            return
        print(f"   📊 Total commits across all branches: {int(log_output.strip())}")

        success, ls_output = self.run_command(['git', 'ls-tree', '-r', '--name-only', 'HEAD'], cwd=repo_path)
        if not (success and ls_output.strip()):
            print(f"   No files found in HEAD - repository might be empty or have issues")
            return
        files = ls_output.strip().split('\n')
        print(f"   📁 Files in repository: {len(files)} total")
        # Show only the first few files to keep the log readable.
        for file in files[:10]:
            print(f"      - {file}")
        if len(files) > 10:
            print(f"      ... and {len(files) - 10} more files")

    def _verify_gitea_push(self, gitea_url: str) -> None:
        """List remote refs on Gitea after the push and report the default branch.

        Purely informational; never fails the migration.
        """
        print(f"   🔍 Verifying push to Gitea...")
        success, branch_output = self.run_command(['git', 'ls-remote', gitea_url])
        if not (success and branch_output.strip()):
            print(f"   Could not verify remote refs, but push appeared successful")
            return
        remote_refs = branch_output.strip().split('\n')
        print(f"   📋 Confirmed {len(remote_refs)} references pushed to Gitea")
        if any('refs/heads/main' in ref for ref in remote_refs):
            print(f"   🌿 Default branch 'main' is available")
        elif any('refs/heads/master' in ref for ref in remote_refs):
            print(f"   🌿 Default branch 'master' is available")
        else:
            branches = [ref.split('\t')[1] for ref in remote_refs if 'refs/heads/' in ref]
            if branches:
                print(f"   🌿 Available branches: {', '.join([b.replace('refs/heads/', '') for b in branches])}")

    def migrate_repository(self, repo: Dict, gitea_token: str, github_token: Optional[str] = None) -> bool:
        """
        Migrate a single repository from GitHub to Gitea.

        Args:
            repo: Repository dictionary from GitHub API
            gitea_token: Gitea access token for authentication
            github_token: Optional GitHub personal access token

        Returns:
            True if migration successful (or repo already present), False otherwise
        """
        repo_name = repo['name']
        clone_url = self.get_authenticated_clone_url(repo, github_token)
        # FIX: GitHub serializes a missing description as null, so
        # repo.get('description', '') can still return None; coerce to ''.
        description = repo.get('description') or ''
        private = repo['private']

        print(f"\n📦 Migrating repository: {repo_name}")
        if self.check_gitea_repo_exists(repo_name, gitea_token):
            print(f"   Repository '{repo_name}' already exists on Gitea. Skipping...")
            return True
        # Create the destination repository before pushing into it.
        if not self.create_gitea_repo(repo_name, description, private, gitea_token):
            return False

        repo_path = self.temp_dir / f"{repo_name}.git"
        if repo_path.exists():
            shutil.rmtree(repo_path)
        try:
            print(f"   📥 Cloning {repo_name} from GitHub with full history...")
            # --mirror fetches every ref: all branches, tags and history.
            success, output = self.run_command(['git', 'clone', '--mirror', clone_url, str(repo_path)])
            if not success:
                print(f"   ❌ Failed to clone repository: {output}")
                return False
            self._report_clone_stats(repo_path)

            # FIX: embed Gitea credentials in the push URL so the push works in
            # headless environments without a configured git credential helper.
            # NOTE(review): assumes the token contains no URL-reserved
            # characters — confirm against the Gitea token format.
            auth_base = self.gitea_base_url.replace('://', f'://{self.gitea_username}:{gitea_token}@', 1)
            gitea_url = f'{auth_base}/{self.gitea_username}/{repo_name}.git'

            print(f"   📤 Pushing complete repository to Gitea...")
            success, output = self.run_command(
                ['git', 'remote', 'set-url', '--push', 'origin', gitea_url], cwd=repo_path)
            if not success:
                print(f"   ❌ Failed to set Gitea remote URL: {output}")
                return False
            # --mirror pushes all refs including branches and tags.
            success, output = self.run_command(['git', 'push', '--mirror'], cwd=repo_path)
            if not success:
                print(f"   ❌ Failed to push to Gitea: {output}")
                return False
            self._verify_gitea_push(gitea_url)
            print(f"   ✅ Successfully migrated {repo_name}")
            return True
        finally:
            # FIX: always remove the temporary clone, also on failure paths,
            # so aborted migrations do not leak disk space.
            if repo_path.exists():
                shutil.rmtree(repo_path)

    def migrate_all_repositories(self, github_token: Optional[str] = None, gitea_token: Optional[str] = None):
        """
        Migrate all repositories from GitHub to Gitea and print a summary.

        Args:
            github_token: Optional GitHub personal access token
            gitea_token: Gitea access token (required for creating repos)

        Raises:
            ValueError: If no Gitea token is provided.
        """
        if not gitea_token:
            raise ValueError("Gitea token is required for creating repositories")
        print(f"🚀 Starting migration from GitHub ({self.github_username}) to Gitea ({self.gitea_base_url}/{self.gitea_username})")
        print("=" * 80)

        try:
            repos = self.get_github_repos(github_token)
        except Exception as e:
            print(f"❌ Failed to fetch repositories from GitHub: {e}")
            return
        if not repos:
            print("No repositories found to migrate.")
            return

        successful_migrations = 0
        failed_migrations = 0
        for repo in repos:
            try:
                if self.migrate_repository(repo, gitea_token, github_token):
                    successful_migrations += 1
                else:
                    failed_migrations += 1
                # Small delay between migrations to be respectful to both APIs.
                time.sleep(1)
            except Exception as e:
                print(f"   ❌ Unexpected error migrating {repo['name']}: {e}")
                failed_migrations += 1

        print("\n" + "=" * 80)
        print(f"🎉 Migration completed!")
        print(f"✅ Successfully migrated: {successful_migrations} repositories")
        if failed_migrations > 0:
            print(f"❌ Failed migrations: {failed_migrations} repositories")
        # Remove the whole scratch directory when done.
        if self.temp_dir.exists():
            shutil.rmtree(self.temp_dir)
def main():
    """Entry point: configure the migration and run it.

    Edit the constants below for your accounts; tokens are read from the
    environment (GITEA_TOKEN is mandatory, GITHUB_TOKEN optional).
    """
    # --- Configuration - Update these values ---------------------------------
    GITHUB_USERNAME = 'lasseedfast'
    GITEA_BASE_URL = 'https://git.edfast.se'
    GITEA_USERNAME = 'lasse'

    # Tokens come from the environment rather than being hardcoded,
    # so they never end up in version control.
    GITHUB_TOKEN = os.getenv('GITHUB_TOKEN')  # Optional, for private repos
    GITEA_TOKEN = os.getenv('GITEA_TOKEN')  # Required for creating repos

    # Bail out early with instructions when the mandatory token is missing.
    if not GITEA_TOKEN:
        print("❌ GITEA_TOKEN environment variable is required!")
        print(f" You can get a token from: {GITEA_BASE_URL}/user/settings/applications")
        print(" Then run: export GITEA_TOKEN='your_token_here'")
        return

    RepoMigrator(GITHUB_USERNAME, GITEA_BASE_URL, GITEA_USERNAME).migrate_all_repositories(
        GITHUB_TOKEN, GITEA_TOKEN
    )
# Run the migration only when executed as a script, not on import.
if __name__ == '__main__':
    main()

@ -955,7 +955,6 @@ class Bot(BaseClass):
# Use a small model for efficient summarization
summary: OllamaMessage = self.generate(query=summary_prompt, model="small", stream=False)
summary_text = summary.content.strip('"')
summary_text = self.remove_thinking(summary_text)
# Format with source information
formatted_summary = f"{metadata_string}\n\nSUMMARY:\n{summary_text}"
@ -1228,42 +1227,46 @@ class StreamlitBot(Bot):
return "\n\n".join(bot_responses)
def write_reasoning(self, response):
"""Handle streaming responses that may contain thinking chunks"""
"""Handle streaming responses that may contain thinking chunks with native v0.9.0+ support"""
if isinstance(response, str):
# If the response is a string, just display it
return st.write(response)
chunks_iter = iter(response) # Convert generator to iterator
thinking_content = []
content_chunks = []
try:
first_mode, first_text = next(chunks_iter) # Get first chunk
except StopIteration:
return ""
# If it's a thinking chunk, show it in an expander
if first_mode == "thinking":
thinking_text = first_text.replace("<think>", "").replace("</think>", "")
if len(thinking_text) > 10:
st.write(thinking_text)
with st.expander("How the bot has been reasoning"):
st.write(thinking_text)
# Define a generator for the remaining normal content
def rest_gen():
for mode, text in chunks_iter:
if mode == "normal":
yield text
# Collect all chunks first to properly separate thinking from content
all_chunks = [(first_mode, first_text)]
for chunk in chunks_iter:
all_chunks.append(chunk)
# Separate thinking and content chunks
for mode, text in all_chunks:
if mode == "thinking":
thinking_content.append(text)
elif mode == "content":
content_chunks.append(text)
# Show thinking content in an expander if present
if thinking_content and len("".join(thinking_content).strip()) > 10:
with st.expander("🤔 How the bot has been reasoning"):
st.write("".join(thinking_content))
# Stream the main content
if content_chunks:
def content_gen():
for text in content_chunks:
yield text
return st.write_stream(rest_gen())
return st.write_stream(content_gen())
else:
# If the first chunk isn't thinking, include it in the stream
def full_gen():
yield first_text
for mode, text in chunks_iter:
if mode == "normal":
yield text
return st.write_stream(full_gen())
return ""
def write_normal(self, response):
"""Handle regular streaming responses without thinking chunks"""
@ -1274,8 +1277,10 @@ class StreamlitBot(Bot):
def text_only_gen():
for chunk in response:
if isinstance(chunk, tuple) and len(chunk) == 2:
_, text = chunk
yield text
chunk_type, text = chunk
# Only yield content chunks, skip thinking chunks
if chunk_type == "content":
yield text
else:
yield chunk

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save