You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

206 lines
6.6 KiB

import yaml
import sys
import bcrypt
from _arango import ArangoDB
import os
import dotenv
import getpass
import argparse
import string
import secrets
from utils import fix_key
from colorprinter.print_color import *
# Load variables from a .env file into the process environment so that the
# os.getenv() lookups for the ARANGO_* credentials further down can find them.
dotenv.load_dotenv()
def read_yaml(file_path):
    """Parse the YAML file at ``file_path`` and return its content.

    The file is parsed with ``yaml.safe_load``, so only plain YAML data
    types are constructed.
    """
    with open(file_path, "r") as handle:
        parsed = yaml.safe_load(handle)
    return parsed
def write_yaml(file_path, data):
    """Serialize ``data`` with ``yaml.safe_dump`` into ``file_path``.

    The file is opened in write mode, so any previous content is replaced.
    """
    with open(file_path, mode="w") as handle:
        yaml.safe_dump(data, handle)
def add_user(data, username, email, name, password):
    """Add a new user entry to the in-memory credentials mapping.

    Args:
        data: Parsed configuration; ``data["credentials"]["usernames"]`` must
            be a mapping of username -> user dict. It is modified in place.
        username: Key under which the new user is stored.
        email: Email address of the new user (stored as given).
        name: Display name of the new user.
        password: Plain-text password; only its bcrypt hash is stored.

    Exits:
        Prints an error and calls ``sys.exit(1)`` when the username or the
        email address is already taken.
    """
    users = data["credentials"]["usernames"]

    # Check for existing username
    if username in users:
        print(f"Error: Username '{username}' already exists.")
        sys.exit(1)

    # Check for existing email. Compare normalized addresses so that
    # "Bob@Example.com" and "bob@example.com" count as the same mailbox, and
    # tolerate stored entries that have no (or a non-string) email instead of
    # crashing with a KeyError/AttributeError.
    wanted_email = email.strip().lower()
    for user in users.values():
        existing_email = user.get("email") if isinstance(user, dict) else None
        if not isinstance(existing_email, str):
            continue
        if existing_email.strip().lower() == wanted_email:
            print(f"Error: Email '{email}' already exists.")
            sys.exit(1)

    # Hash the password using bcrypt (a fresh salt is generated per user)
    hashed_password = bcrypt.hashpw(
        password.encode("utf-8"), bcrypt.gensalt()
    ).decode("utf-8")

    # Add the new user
    users[username] = {
        "email": email,
        "name": name,
        "password": hashed_password,
    }
def make_arango(username):
    """Provision the ArangoDB user, database and collections for ``username``.

    Connects as the root account (``ARANGO_ROOT_USER`` /
    ``ARANGO_ROOT_PASSWORD``), makes sure a database user and a database named
    ``username`` exist, creates the per-user collections that are missing and
    inserts an initial document into ``settings``.

    Args:
        username: Name used both for the ArangoDB user and for its database.
    """
    root_user = os.getenv("ARANGO_ROOT_USER")
    root_password = os.getenv("ARANGO_ROOT_PASSWORD")
    user_password = os.getenv("ARANGO_PASSWORD")
    arango = ArangoDB(user=root_user, password=root_password, db_name="_system")

    # Create the user, or fetch the existing record
    if not arango.db.has_user(username):
        user = arango.db.create_user(
            username,
            password=user_password,
            active=True,
            extra={},
        )
    else:
        user = arango.db.user(username)

    # Show the user record before the password is attached so the secret is
    # not echoed to the terminal.
    print_rainbow(user)

    # create_database() expects every entry of `users` to carry a "password"
    # field, so attach it on both paths (newly created and pre-existing user).
    user["password"] = user_password

    if not arango.db.has_database(username):
        arango.db.create_database(
            name=user["username"],
            users=[
                user,
                {
                    "username": os.getenv("ARANGO_USER"),
                    "password": user_password,
                    "active": True,
                    "extra": {},
                },
            ],
        )

    # Reconnect as root, this time to the user's own database
    arango = ArangoDB(user=root_user, password=root_password, db_name=username)
    for collection in [
        "projects",
        "favorite_articles",
        "article_collections",
        "settings",
        "chats",
        "notes",
        "other_documents",
        "rss_feeds",
        "interviews",
    ]:
        if not arango.db.has_collection(collection):
            arango.db.create_collection(collection)

    # Connection without explicit credentials -- presumably the wrapper falls
    # back to its default (non-root) account; confirm in _arango.ArangoDB.
    user_arango = ArangoDB(db_name=username)
    # NOTE(review): this insert is unconditional, so calling make_arango()
    # again for an existing database adds another settings document.
    user_arango.db.collection("settings").insert(
        {"current_page": "Bot Chat", "current_project": None}
    )
def generate_random_password(length=16):
    """Return a random password of three dash-separated six-character groups.

    Each character is drawn with ``secrets.choice`` from ASCII letters and
    digits, giving a 20-character string such as ``"a1B2c3-D4e5F6-g7H8i9"``.

    NOTE: ``length`` is accepted but not used; the result always has the
    shape described above.
    """
    alphabet = string.ascii_letters + string.digits
    groups = []
    for _group in range(3):
        picked = [secrets.choice(alphabet) for _char in range(6)]
        groups.append("".join(picked))
    return "-".join(groups)
def delete_user(data, username):
    """Remove ``username`` from the credentials data and from ArangoDB.

    Steps, in order:
      1. Drop the entry from ``data["credentials"]["usernames"]`` (in place).
      2. Delete the ArangoDB database named ``username`` if it exists.
      3. Strip ``username`` from the ``user_access`` list of matching
         documents in the ``sci_articles`` and ``other_documents``
         collections of the ``base`` database.

    The ArangoDB user account itself is not removed by this function.

    Exits:
        Prints an error and calls ``sys.exit(1)`` when the username is not
        present in ``data``.
    """
    registered = data["credentials"]["usernames"]

    # Guard: nothing to do for an unknown user
    if username not in registered:
        print(f"Error: Username '{username}' does not exist.")
        sys.exit(1)

    # Remove the user from the YAML data
    del registered[username]

    # Both connections below use the root credentials
    admin_name = os.getenv("ARANGO_ROOT_USER")
    admin_secret = os.getenv("ARANGO_ROOT_PASSWORD")
    base_arango = ArangoDB(user=admin_name, password=admin_secret, db_name="base")
    system_arango = ArangoDB(
        user=admin_name, password=admin_secret, db_name="_system"
    )

    # Remove the user's database in ArangoDB
    if system_arango.db.has_database(username):
        system_arango.db.delete_database(username)

    # Remove user access from documents in relevant collections
    access_query = """
        FOR doc IN @@collection_name
        FILTER @username IN doc.user_access
        RETURN {'_id': doc._id, 'user_access': doc.user_access}
        """
    for collection_name in ("sci_articles", "other_documents"):
        matches = base_arango.db.aql.execute(
            access_query,
            bind_vars={"username": username, "@collection_name": collection_name},
        )
        for document in matches:
            if 'user_access' not in document:
                continue
            # Remove username from the list user_access and persist the change
            document['user_access'].remove(username)
            base_arango.db.collection(collection_name).update(document)

    print_green(f"User {username} deleted successfully.")
def main():
    """Command-line entry point: add or delete a user.

    With ``--delete`` the user given by ``--user`` is removed. Otherwise a
    user is added; when ``--user``, ``--email`` and ``--name`` are not all
    supplied, the details are prompted for interactively. A missing password
    (or a ``--password`` shorter than 8 characters) is replaced by a generated
    one, which is printed. Changes are persisted to ``streamlit_users.yaml``.
    """
    parser = argparse.ArgumentParser(description="Add or delete a user.")
    parser.add_argument("--user", help="Username")
    parser.add_argument("--email", help="Email address")
    parser.add_argument("--name", help="Full name")
    parser.add_argument("--password", help="Password")
    parser.add_argument("--delete", action="store_true", help="Delete user")
    args = parser.parse_args()

    yaml_file = "streamlit_users.yaml"
    data = read_yaml(yaml_file)

    if args.delete:
        if not args.user:
            print("Error: Username is required to delete a user.")
            sys.exit(1)
        delete_user(data, args.user)
        write_yaml(yaml_file, data)
        return

    if args.user and args.email and args.name:
        username = args.user
        email = args.email
        name = args.name
        if args.password and len(args.password) >= 8:
            password = args.password
        else:
            # Missing or too-short password: fall back to a generated one
            password = generate_random_password()
            print_yellow("Generated password:", password)
    else:
        username = input("Enter username: ")
        email = input("Enter email: ")
        name = input("Enter name: ")
        password = getpass.getpass("Enter password: ")
        if not password:
            password = generate_random_password()
            print_yellow("Generated password:", password)

    # The 'test' user is recreated from scratch on every run. Only delete it
    # when it actually exists: delete_user() exits with an error for unknown
    # users, which would otherwise make the very first creation impossible.
    if username == 'test' and username in data["credentials"]["usernames"]:
        delete_user(data, username)

    email = email.lower().strip()

    checked_username = fix_key(username)
    if checked_username != username:
        # Report the name as entered *before* replacing it, so the message
        # shows the offending value rather than the already-corrected one.
        print_red(f"Username '{username}' contains invalid characters.")
        print_yellow(f"Using '{checked_username}' instead.")
        username = checked_username

    add_user(data, username, email, name, password)
    make_arango(username)
    write_yaml(yaml_file, data)
    print_green(f"User {username} added successfully.")
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()