From 3614d7e012cd06ae953fe33fbb0d5da83366af5b Mon Sep 17 00:00:00 2001 From: Daniel Schadt Date: Sat, 2 Jul 2022 20:02:31 +0200 Subject: add a small script for maintenance tasks --- fietsboek/scripts/fietsctl.py | 243 +++++++++++++++++++++++++++++++++++++ fietsboek/scripts/initialize_db.py | 33 ----- setup.py | 2 +- 3 files changed, 244 insertions(+), 34 deletions(-) create mode 100644 fietsboek/scripts/fietsctl.py delete mode 100644 fietsboek/scripts/initialize_db.py diff --git a/fietsboek/scripts/fietsctl.py b/fietsboek/scripts/fietsctl.py new file mode 100644 index 0000000..1c6a8c0 --- /dev/null +++ b/fietsboek/scripts/fietsctl.py @@ -0,0 +1,243 @@ +"""Script to do maintenance work on a Fietsboek instance.""" +# pylint: disable=consider-using-f-string,unused-argument +import argparse +import getpass +import sys + +from pyramid.paster import bootstrap, setup_logging +from sqlalchemy import select + +from .. import models + + +EXIT_OKAY = 0 +EXIT_FAILURE = 1 + + +def cmd_useradd(env, args): + """Create a new user. + + This user creation bypasses the "enable_account_registration" setting. It + also immediately sets the new user's account to being verified. + + If email, name or password are not given as command line arguments, they + will be asked for interactively. + + On success, the created user's unique ID will be printed. + + Note that this function does less input validation and should therefore be used with care! + """ + email = args.email + if not email: + email = input("Email address: ") + name = args.name + if not name: + name = input("Name: ") + password = args.password + if not password: + password = getpass.getpass() + + # The UNIQUE constraint only prevents identical emails from being inserted, + # but does not take into account the case insensitivity. The least we + # should do here to not brick log ins for the user is to check if the email + # already exists. + query = models.User.query_by_email(email) + with env["request"].tm: + result = env["request"].dbsession.execute(query).scalar_one_or_none() + if result is not None: + print("Error: The given email already exists!", file=sys.stderr) + return EXIT_FAILURE + + user = models.User(name=name, email=email, is_verified=True, is_admin=args.admin) + user.set_password(password) + + with env["request"].tm: + dbsession = env["request"].dbsession + dbsession.add(user) + dbsession.flush() + user_id = user.id + + print(user_id) + return EXIT_OKAY + + +def cmd_userdel(env, args): + """Delete a user. + + This command deletes the user's account as well as any tracks associated + with it. + + This command is destructive and irreversibly deletes data. + """ + if args.id: + query = select(models.User).filter_by(id=args.id) + else: + query = models.User.query_by_email(args.email) + with env["request"].tm: + dbsession = env["request"].dbsession + user = dbsession.execute(query).scalar_one_or_none() + if user is None: + print("Error: No such user found.", file=sys.stderr) + return EXIT_FAILURE + print(user.name) + print(user.email) + if not args.force: + query = input("Really delete this user? [y/N] ") + if query not in {'Y', 'y'}: + print("Aborted by user.") + return EXIT_FAILURE + dbsession.delete(user) + print("User deleted") + return EXIT_OKAY + + +def cmd_userlist(env, args): + """Prints a listing of all user accounts. + + The format is + [av] {ID} - {email} - {Name} + one line per user. The 'a' is added for admin accounts, the 'v' is added + for verified users. + """ + with env["request"].tm: + dbsession = env["request"].dbsession + users = dbsession.execute(select(models.User).order_by(models.User.id)).scalars() + for user in users: + tag = '[{}{}]'.format( + 'a' if user.is_admin else '-', + 'v' if user.is_verified else '-', + ) + print(f"{tag} {user.id} - {user.email} - {user.name}") + return EXIT_OKAY + + +def cmd_passwd(env, args): + """Change the password of a user.""" + if args.id: + query = select(models.User).filter_by(id=args.id) + else: + query = models.User.query_by_email(args.email) + with env["request"].tm: + dbsession = env["request"].dbsession + user = dbsession.execute(query).scalar_one_or_none() + if user is None: + print("Error: No such user found.", file=sys.stderr) + return EXIT_FAILURE + password = args.password + if not password: + password = getpass.getpass() + repeat = getpass.getpass("Repeat password: ") + if password != repeat: + print("Error: Mismatched passwords.") + return EXIT_FAILURE + + user.set_password(password) + print(f"Changed password of {user.name} ({user.email})") + return EXIT_OKAY + + + +def parse_args(argv): + """Parse the given args. + + :param argv: List of arguments. + :type argv: list[str] + :return: The parsed arguments. + :rtype: argparse.Namespace + """ + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + '-c', '--config', + dest='config_uri', + help='configuration file, e.g., development.ini', + required=True, + ) + + subparsers = parser.add_subparsers(help='available subcommands', required=True) + + p_useradd = subparsers.add_parser( + "useradd", + help="create a new user", + description=cmd_useradd.__doc__, + ) + p_useradd.add_argument( + '--email', + help="email address of the user", + ) + p_useradd.add_argument( + '--name', + help="name of the user", + ) + p_useradd.add_argument( + '--password', + help="password of the user", + ) + p_useradd.add_argument( + '--admin', + action='store_true', + help="make the new user an admin", + ) + p_useradd.set_defaults(func=cmd_useradd) + + p_userdel = subparsers.add_parser( + "userdel", + help="delete a user account", + description=cmd_userdel.__doc__, + ) + p_userdel.add_argument( + '--force', '-f', + action='store_true', + help="override the safety check", + ) + group = p_userdel.add_mutually_exclusive_group(required=True) + group.add_argument( + '--id', '-i', + type=int, + help="database ID of the user", + ) + group.add_argument( + '--email', '-e', + help="email of the user", + ) + p_userdel.set_defaults(func=cmd_userdel) + + p_userlist = subparsers.add_parser( + "userlist", + help="list user accounts", + description=cmd_userlist.__doc__, + ) + p_userlist.set_defaults(func=cmd_userlist) + + p_passwd = subparsers.add_parser( + "passwd", + help="change user password", + description=cmd_userdel.__doc__, + ) + p_passwd.add_argument( + '--password', + help="password of the user", + ) + group = p_passwd.add_mutually_exclusive_group(required=True) + group.add_argument( + '--id', '-i', + type=int, + help="database ID of the user", + ) + group.add_argument( + '--email', '-e', + help="email of the user", + ) + p_passwd.set_defaults(func=cmd_passwd) + + return parser.parse_args(argv[1:]) + + +def main(argv=None): + """Main entry point.""" + if argv is None: + argv = sys.argv + args = parse_args(argv) + setup_logging(args.config_uri) + env = bootstrap(args.config_uri) + + sys.exit(args.func(env, args)) diff --git a/fietsboek/scripts/initialize_db.py b/fietsboek/scripts/initialize_db.py deleted file mode 100644 index d3de7ce..0000000 --- a/fietsboek/scripts/initialize_db.py +++ /dev/null @@ -1,33 +0,0 @@ -"""Script to initialize the tables in the database.""" -# pylint: disable=dangerous-default-value -import argparse -import sys - -from pyramid.paster import bootstrap, setup_logging - -from .. import models - - -def parse_args(argv): - """Parse the given args. - - :param argv: List of arguments. - :type argv: list[str] - :return: The parsed arguments. - :rtype: argparse.Namespace - """ - parser = argparse.ArgumentParser() - parser.add_argument( - 'config_uri', - help='Configuration file, e.g., development.ini', - ) - return parser.parse_args(argv[1:]) - - -def main(argv=sys.argv): - """Main entry point.""" - args = parse_args(argv) - setup_logging(args.config_uri) - env = bootstrap(args.config_uri) - - models.meta.metadata.create_all(env['request'].dbsession.get_bind()) diff --git a/setup.py b/setup.py index 58b69b9..d6267a6 100644 --- a/setup.py +++ b/setup.py @@ -60,7 +60,7 @@ setup( 'main = fietsboek:main', ], 'console_scripts': [ - 'initialize_fietsboek_db=fietsboek.scripts.initialize_db:main', + 'fietsctl=fietsboek.scripts.fietsctl:main', ], }, ) -- cgit v1.2.3