You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
206 lines
6.6 KiB
206 lines
6.6 KiB
import yaml |
|
import sys |
|
import bcrypt |
|
from _arango import ArangoDB |
|
import os |
|
import dotenv |
|
import getpass |
|
import argparse |
|
import string |
|
import secrets |
|
from utils import fix_key |
|
from colorprinter.print_color import * |
|
|
|
# Load variables from a .env file into the process environment at import time,
# so the os.getenv("ARANGO_*") lookups further down can see them.
dotenv.load_dotenv()
|
|
|
|
|
def read_yaml(file_path):
    """Load a YAML document from ``file_path`` and return the parsed object.

    The file is decoded as UTF-8 explicitly instead of relying on the
    platform's default encoding, so non-ASCII content (e.g. names) is read
    the same way on every system.

    Args:
        file_path: Path of the YAML file to read.

    Returns:
        Whatever ``yaml.safe_load`` produces for the document.
    """
    with open(file_path, "r", encoding="utf-8") as file:
        return yaml.safe_load(file)
|
|
|
|
|
def write_yaml(file_path, data):
    """Serialise ``data`` as YAML and write it to ``file_path``.

    The document is rendered to a string *before* the target file is opened.
    Opening with mode ``"w"`` truncates the file immediately, so dumping
    straight into the handle would leave an empty or half-written file if
    serialisation failed part-way (for example on a value ``safe_dump``
    cannot represent).

    Args:
        file_path: Path of the YAML file to (over)write.
        data: Object to serialise with ``yaml.safe_dump``.
    """
    # With no stream argument safe_dump returns the document as a string.
    rendered = yaml.safe_dump(data)
    with open(file_path, "w", encoding="utf-8") as file:
        file.write(rendered)
|
|
|
|
|
def add_user(data, username, email, name, password):
    """Add a new user entry to the in-memory credentials mapping.

    The entry is stored under ``data["credentials"]["usernames"][username]``
    with the keys ``email``, ``name`` and ``password`` (a bcrypt hash of the
    given plain-text password). ``data`` is modified in place; nothing is
    written to disk here.

    Args:
        data: Parsed credentials document; must contain
            ``data["credentials"]["usernames"]`` as a mapping.
        username: Key for the new user.
        email: Email address of the new user.
        name: Display name of the new user.
        password: Plain-text password to hash and store.

    Raises:
        SystemExit: With exit code 1 (after printing an error) when the
            username or the email address is already registered.
    """
    users = data["credentials"]["usernames"]

    # Check for existing username
    if username in users:
        print(f"Error: Username '{username}' already exists.")
        sys.exit(1)

    # Check for existing email. Entries that are not mappings or that have no
    # "email" field (e.g. hand-edited records) are skipped instead of
    # aborting the whole check with a KeyError/TypeError.
    for existing in users.values():
        if not isinstance(existing, dict) or "email" not in existing:
            continue
        if existing["email"] == email:
            print(f"Error: Email '{email}' already exists.")
            sys.exit(1)

    # Hash the password using bcrypt; stored as text so it can live in YAML.
    hashed_password = bcrypt.hashpw(
        password.encode("utf-8"), bcrypt.gensalt()
    ).decode("utf-8")

    # Add the new user
    users[username] = {
        "email": email,
        "name": name,
        "password": hashed_password,
    }
|
|
|
|
|
def make_arango(username):
    """Set up the ArangoDB account, database and collections for a user.

    Steps, all performed with the root credentials taken from the
    ``ARANGO_ROOT_USER`` / ``ARANGO_ROOT_PASSWORD`` environment variables:

    1. Create the ArangoDB user ``username`` if it does not exist, otherwise
       fetch the existing user record.
    2. Create a database named after the user (if missing), granting access
       to that user and to the account named by ``ARANGO_USER``.
    3. Create the per-user collections that are missing.
    4. Insert an initial document into the ``settings`` collection.

    Args:
        username: Name used for both the ArangoDB user and its database.
    """
    root_user = os.getenv("ARANGO_ROOT_USER")
    root_password = os.getenv("ARANGO_ROOT_PASSWORD")
    system = ArangoDB(user=root_user, password=root_password, db_name="_system")

    # Fetch the user record, creating the account first when necessary.
    if system.db.has_user(username):
        user = system.db.user(username)
    else:
        user = system.db.create_user(
            username,
            password=os.getenv("ARANGO_PASSWORD"),
            active=True,
            extra={},
        )
    # The record is reused below as a create_database() user spec, which
    # carries a password field.
    user["password"] = os.getenv("ARANGO_PASSWORD")
    # NOTE(review): this prints the record including the password just set.
    print_rainbow(user)

    if not system.db.has_database(username):
        shared_account = {
            "username": os.getenv("ARANGO_USER"),
            "password": os.getenv("ARANGO_PASSWORD"),
            "active": True,
            "extra": {},
        }
        system.db.create_database(
            name=user["username"],
            users=[user, shared_account],
        )

    # Reconnect as root, this time against the user's own database.
    user_db = ArangoDB(user=root_user, password=root_password, db_name=username)
    collection_names = (
        "projects",
        "favorite_articles",
        "article_collections",
        "settings",
        "chats",
        "notes",
        "other_documents",
        "rss_feeds",
        "interviews",
    )
    for collection_name in collection_names:
        if not user_db.db.has_collection(collection_name):
            user_db.db.create_collection(collection_name)

    # Connection without explicit credentials; presumably the wrapper falls
    # back to its own defaults here - TODO confirm against _arango.ArangoDB.
    # NOTE(review): the insert is unconditional, so calling this again for an
    # existing user adds another settings document.
    default_settings = {"current_page": "Bot Chat", "current_project": None}
    ArangoDB(db_name=username).db.collection("settings").insert(default_settings)
|
|
|
|
|
def generate_random_password(length=16):
    """Return a random password of the form ``xxxxxx-xxxxxx-xxxxxx``.

    The password consists of three dash-separated groups of six characters,
    each drawn from ASCII letters and digits with the ``secrets`` module.

    Args:
        length: Currently unused; the result is always 20 characters long
            (3 groups of 6 plus 2 dashes). Kept for interface compatibility.

    Returns:
        The generated password string.
    """
    alphabet = string.ascii_letters + string.digits
    groups = []
    for _ in range(3):
        groups.append("".join(secrets.choice(alphabet) for _ in range(6)))
    return "-".join(groups)
|
|
|
def delete_user(data, username):
    """Remove a user from the credentials mapping and from ArangoDB.

    The user entry is deleted from ``data["credentials"]["usernames"]`` in
    place (nothing is written to disk here), the database named after the
    user is dropped if it exists, and the username is removed from the
    ``user_access`` list of every matching document in the ``sci_articles``
    and ``other_documents`` collections of the ``base`` database.

    Args:
        data: Parsed credentials document.
        username: User to delete.

    Raises:
        SystemExit: With exit code 1 (after printing an error) when the
            username is not present in ``data``.
    """
    users = data["credentials"]["usernames"]

    # Bail out early for unknown users.
    if username not in users:
        print(f"Error: Username '{username}' does not exist.")
        sys.exit(1)

    # Drop the entry from the YAML data.
    del users[username]

    # Both connections use the root credentials from the environment.
    root_user = os.getenv("ARANGO_ROOT_USER")
    root_password = os.getenv("ARANGO_ROOT_PASSWORD")
    base_arango = ArangoDB(user=root_user, password=root_password, db_name="base")
    system_arango = ArangoDB(
        user=root_user, password=root_password, db_name="_system"
    )

    # Drop the user's own database, if there is one.
    if system_arango.db.has_database(username):
        system_arango.db.delete_database(username)

    # Revoke the user's access on documents in the shared collections.
    access_query = """
        FOR doc IN @@collection_name
            FILTER @username IN doc.user_access
            RETURN {'_id': doc._id, 'user_access': doc.user_access}
        """
    for collection_name in ("sci_articles", "other_documents"):
        matches = base_arango.db.aql.execute(
            access_query,
            bind_vars={"username": username, "@collection_name": collection_name},
        )
        for document in matches:
            if "user_access" not in document:
                continue
            # list.remove drops the first occurrence of the username only.
            document["user_access"].remove(username)
            base_arango.db.collection(collection_name).update(document)

    print_green(f"User {username} deleted successfully.")
|
|
|
|
|
def main():
    """Command-line entry point: add or delete a user.

    With ``--delete`` the user named by ``--user`` is removed. Otherwise a
    user is added, either from ``--user``/``--email``/``--name`` (plus an
    optional ``--password``) or, when any of those three is missing, from
    interactive prompts. A password is generated and printed when none is
    given interactively, or when ``--password`` is absent or shorter than
    eight characters.

    The credentials file ``streamlit_users.yaml`` is read at start and only
    written back after the requested operation succeeded.

    Raises:
        SystemExit: With exit code 1 when ``--delete`` is given without
            ``--user``, or when a called helper rejects the request.
    """
    parser = argparse.ArgumentParser(description="Add or delete a user.")
    parser.add_argument("--user", help="Username")
    parser.add_argument("--email", help="Email address")
    parser.add_argument("--name", help="Full name")
    parser.add_argument("--password", help="Password")
    parser.add_argument("--delete", action="store_true", help="Delete user")

    args = parser.parse_args()

    yaml_file = "streamlit_users.yaml"
    data = read_yaml(yaml_file)

    if args.delete:
        if not args.user:
            print("Error: Username is required to delete a user.")
            sys.exit(1)
        delete_user(data, args.user)
        write_yaml(yaml_file, data)
        return

    if args.user and args.email and args.name:
        username = args.user
        email = args.email
        name = args.name
        if args.password and len(args.password) >= 8:
            password = args.password
        else:
            # Missing or too-short password: fall back to a generated one.
            password = generate_random_password()
            print_yellow("Generated password:", password)
    else:
        username = input("Enter username: ")
        email = input("Enter email: ")
        name = input("Enter name: ")
        password = getpass.getpass("Enter password: ")
        if not password:
            password = generate_random_password()
            print_yellow("Generated password:", password)

    # The "test" account is recreated from scratch on every run. Only delete
    # it when it actually exists: delete_user() exits with an error for an
    # unknown user, which would make the first creation of "test" impossible.
    if username == "test" and username in data["credentials"]["usernames"]:
        delete_user(data, username)

    email = email.lower().strip()
    checked_username = fix_key(username)
    if checked_username != username:
        # Report the rejected name *before* replacing it, so the message
        # shows what the user typed rather than the corrected value.
        print_red(f"Username '{username}' contains invalid characters.")
        print_yellow(f"Using '{checked_username}' instead.")
        username = checked_username

    add_user(data, username, email, name, password)
    make_arango(username)
    # Persist only after the database setup went through.
    write_yaml(yaml_file, data)
    print_green(f"User {username} added successfully.")
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|
|