diff --git a/private/Backup.py b/private/Backup.py index 9b69299..ef5e13f 100755 --- a/private/Backup.py +++ b/private/Backup.py @@ -8,30 +8,91 @@ import lib.uis.default as default_cmd # Follows -u, -a, -f flags class Backup: + """Backups a Tilde database to an CSV file + + :Example: + >>> from Backup import Backup + >>> from ListUsers import ListUsers + >>> L = ListUsers.ListUsers("/path/to/sqlite").get_fetch() + >>> backup_db = Backup("stdout") + >>> backup_db.backup_to_file(L) + CSV-Separated list with headers in first row + + """ + filename: str quoting: int dialect: str field_names: tuple - def __init__(self, fname: str, quoting: int = csv.QUOTE_NONNUMERIC, dialect: str = "excel"): - self.setFilename(fname) + def __init__(self, output: str, quoting: int = csv.QUOTE_NONNUMERIC, dialect: str = "excel"): + """ Constructs the Backup object + + :param output: File name to backup to(set to stdout for stdout) + :type output: str + :param quoting: Set quoting for CSV Module + :type quoting: int + :param dialect: Set the CSV-Dialect. Defaults to excel, which is the classic CSV + :type dialect: str + """ + + self.setFilename(output) self.setQuoting(quoting) self.setDialect(dialect) self.setFieldnames(tuple(['id', 'username', 'email', 'name', 'pubkey', 'timestamp', 'status'])) - def setDialect(self, dialect: str): + def setDialect(self, dialect: str) -> None: + """ Set dialect for Object + + :param dialect: Dialect to set for Object + :type dialect: str + :return: None + :rtype: None + """ + self.dialect = dialect - def setQuoting(self, quoting: int): + def setQuoting(self, quoting: int) -> None: + """ Set quoting in the CSV(must be supported by the CSV Module!) + + :param quoting: Quoting Integer given by csv.QUOTE_* constants + :type quoting: int + :return: None + :rtype: None + """ + self.quoting = quoting - def setFilename(self, filename: str): + def setFilename(self, filename: str) -> None: + """ Sets Filename to output to + + :param filename: Filename to output to(set stdout for stdout) + :type filename: str + :return: None + :rtype: None + """ + self.filename = filename - def setFieldnames(self, f_names: tuple): + def setFieldnames(self, f_names: tuple) -> None: + """ Set fieldname to process + + :param f_names: Fieldnames-Tuple + :type f_names: tuple + :return: None + :rtype: None + """ + self.field_names = f_names - def BackupToFile(self, fetched: list): + def backup_to_file(self, fetched: list) -> bool: + """Backup Userlist to File(or stdout) + + :param fetched: List of values to write out CSV-formatted + :return: True, if success, None when not. + :rtype: bool + """ + returner = io.StringIO() write_csv = csv.DictWriter(returner, fieldnames=self.field_names, quoting=self.quoting, dialect=self.dialect) write_csv.writeheader() @@ -39,6 +100,7 @@ class Backup: if self.filename == "stdout": print(returner.getvalue()) + return True else: with open(self.filename, "w") as f: print(returner.getvalue(), file=f) @@ -53,9 +115,9 @@ if __name__ == "__main__": try: L = ListUsers.ListUsers(config['DEFAULT']['applications_db'], unapproved=args.unapproved, approved=args.approved) - fetch = L.getFetch() + fetch = L.get_fetch() B = Backup(args.file) - B.BackupToFile(fetch) + B.backup_to_file(fetch) exit(0) except KeyboardInterrupt as e: pass diff --git a/private/Import.py b/private/Import.py index 39e0c2e..3d4d88e 100755 --- a/private/Import.py +++ b/private/Import.py @@ -7,13 +7,24 @@ import lib.UserExceptions import lib.uis.config_ui # dont go to default, just following -c flag -def ImportFromFile(fname: str, db: str, userids: tuple = tuple([])): +def import_from_file(fname: str, db: str, userids: tuple = tuple([])) -> bool: + """ Imports Users from a given CSV-file to the system and DB + + :param fname: + :type fname: str + :param db: Path to the sqlite db + :type db: str + :param userids: FIXME: Tuple which userids should we write + :type userids: tuple + :return: True on success, False when not + :rtype: bool + """ if not os.path.isfile(fname): print(f"File {fname} don't exist") - return None + return False if not os.path.isfile(db): print(f"The database file {db} don't exist") - return None + return False if userids: pass # empty tuple means everything # noinspection PyBroadException @@ -83,7 +94,7 @@ if __name__ == "__main__": if not args.file: print("You MUST set a CSV-file with the -f/--file flag that already exist") exit(1) - ImportFromFile(args.file, config['DEFAULT']['applications_db']) + import_from_file(args.file, config['DEFAULT']['applications_db']) exit(0) except KeyboardInterrupt as e: pass diff --git a/private/ListUsers.py b/private/ListUsers.py index d59afca..e1d3c7a 100755 --- a/private/ListUsers.py +++ b/private/ListUsers.py @@ -10,6 +10,16 @@ class ListUsers: usersFetch = None def __init__(self, db: str, unapproved: bool = False, approved: bool = True): + """Constructs ListUsers + + :param db: Database to access + :type db: str + :param unapproved: only List unapproved users + :type unapproved: bool + :param approved: only list approved users + :type approved: bool + """ + self.db = SQLitedb(db) if unapproved: # only unapproved users query = "SELECT * FROM `applications` WHERE `status` = '0'" @@ -20,6 +30,12 @@ class ListUsers: self.usersFetch = self.db.query(query) def output_as_list(self) -> str: + """Generates a string with one (approved) user per line and one newline at the end + + :rtype: str + :return: String consisting with one(activated) user per line + """ + list_str: str = "" query = "SELECT `username` FROM `applications` WHERE `status` = '1' ORDER BY timestamp ASC" self.usersFetch = self.db.query(query) @@ -30,12 +46,13 @@ class ListUsers: def prettyPrint(self) -> None: pass # see below why not implemented yet, texttable... - def getFetch(self) -> list: + def get_fetch(self) -> list: """ Returns a complete fetch done by the sqlitedb-class :return: Complete fetchall() in a dict-factory :rtype: list """ + return self.usersFetch @@ -75,7 +92,7 @@ if __name__ == "__main__": if args.list: ret = L.output_as_list() else: - fetch = L.getFetch() + fetch = L.get_fetch() ret += "ID %-1s| Username %-5s| Mail %-20s| Name %-17s| Registered %-8s | State |\n" % ( " ", " ", " ", " ", " " ) diff --git a/private/editUsers.py b/private/editUsers.py index 9d7f6a6..eaa6ddc 100755 --- a/private/editUsers.py +++ b/private/editUsers.py @@ -101,7 +101,7 @@ if __name__ == "__main__": print(f"Couldn't update database entry for {args.user}, didn't touch the system") exit(1) try: - Sysctl.removeUser(args.user) + Sysctl.remove_user(args.user) except lib.UserExceptions.UnknownReturnCode as e: print(f"Couldn't remove {args.user} from the system, unknown return code: {e}. DB is modified.") exit(1) diff --git a/private/lib/System.py b/private/lib/System.py index 17de845..311e693 100644 --- a/private/lib/System.py +++ b/private/lib/System.py @@ -5,11 +5,19 @@ import lib.UserExceptions class System: - """Class to interact with the system specifically to support our needs 0w0""" - - dry = False + """Class to interact with the system specifically to support our needs 0w0 + :Example: + >>> from lib.System import System as System + >>> Sys_ctl = System(dryrun=True) + >>> Sys_ctl.register("Bob") + >>> Sys_ctl.lock_user_pw("Bob") + >>> Sys_ctl.add_to_usergroup("Bob") + >>> Sys_ctl.make_ssh_usable("Bob", "sshkey") + """ + + dry: bool = False create_command = [] - home = "" + home: str = "" def __init__(self, dryrun: bool = False, home: str = "/home/"): """Creates an objects. Can set dry run. @@ -19,6 +27,8 @@ class System: :type dryrun: bool :param home: Standard directory to search for the home directories of your users(default is /home/) :type home: str + :raises: + ValueError: if homedir can not be found """ self.dry = dryrun @@ -37,6 +47,8 @@ class System: :type cc: tuple :return: True, if worked, raises lib.UserExceptions.UserExistsAlready when not :rtype: bool + :raises: + lib.UserExceptions.UserExistsAlready: when username specified already exists on the system """ create_command = cc cc = create_command + tuple([username]) @@ -49,8 +61,15 @@ class System: raise lib.UserExceptions.UserExistsAlready(f"User {username} exists already") return True - def unregister(self, username: str): - self.removeUser(username) + def unregister(self, username: str) -> bool: + """ Just an alias function for removeUser + + :param username: username to remove + :type username: str + :return: True, when success, False(or exception) when not + :rtype: bool + """ + return self.remove_user(username) def make_ssh_usable(self, username: str, pubkey: str, sshdir: str = ".ssh/") -> bool: """ Make SSH usable for our newly registered user @@ -64,6 +83,10 @@ class System: :return: True, if worked, raises lib.UserExceptions.UnknownReturnCode, lib.UserExceptions.HomeDirExistsAlready or lib.UserExceptions.ModifyFilesystem when not :rtype: bool + :raises: + lib.UserExceptions.SSHDirUncreatable: if the ssh-dir couldnt be created nor exist + lib.UserExceptions.ModifyFilesystem: When chmod to .ssh and authorized_keys failed + lib.UserExceptions.General: if PWD cant find the specified user """ if self.dry: @@ -95,7 +118,16 @@ class System: raise lib.UserExceptions.General(f"PWD can't find {username}: {e}") return True - def write_ssh(self, key: str, ssh_dir: str): + @staticmethod + def write_ssh(key: str, ssh_dir: str) -> None: + """ Write SSH key to a specified directory(appends authorized_keys itself!) + + :param key: Key to write + :type key: str + :param ssh_dir: SSH Directory to write to + :type ssh_dir: str + :return: None + """ with open(ssh_dir + "authorized_keys", "w") as f: print(key, file=f) f.close() @@ -110,6 +142,8 @@ class System: :type cc: tuple :rtype: bool :return: True, if worked, raises lib.UserExceptions.UnknownReturnCode when not + :raises: + lib.UserExceptions.UnknownReturnCode: When cc returns something else then 0 """ lock_command = cc @@ -134,6 +168,8 @@ class System: :type cc: tuple :return: True, if worked, raises lib.UserExceptions.UnknownReturnCode when not :rtype bool + :raises: + lib.UserExceptions.UnknownReturnCode: if cc returned something else then 0 """ add_command = cc @@ -163,7 +199,7 @@ class System: pp += i + " " print(pp) - def removeUser(self, username: str, cc: tuple = tuple(["userdel", "-r"])) -> bool: + def remove_user(self, username: str, cc: tuple = tuple(["userdel", "-r"])) -> bool: """Removes the specified user from the system :param username: The username you want to delete from the system. @@ -172,6 +208,8 @@ class System: :type cc: tuple :return: True, if worked, raises lib.UserExceptions.UnknownReturnCode when not :rtype: bool + :raises: + lib.UserExceptions.UnknownReturnCode: When cc returns something else then 0 or 6 """ remove_command = cc @@ -188,11 +226,11 @@ class System: def AIO(username, pubkey, group="tilde"): - syst = System(dryrun=False) - syst.register(username) - syst.lock_user_pw(username) - syst.add_to_usergroup(username, group) - syst.make_ssh_usable(username, pubkey) + sys_ctl = System(dryrun=False) + sys_ctl.register(username) + sys_ctl.lock_user_pw(username) + sys_ctl.add_to_usergroup(username, group) + sys_ctl.make_ssh_usable(username, pubkey) if __name__ == "__main__": diff --git a/private/lib/validator.py b/private/lib/validator.py index 5678416..ede8c13 100644 --- a/private/lib/validator.py +++ b/private/lib/validator.py @@ -1,9 +1,19 @@ import re import pwd import lib.sqlitedb +import csv def checkUsernameCharacters(username: str) -> bool: + """ Checks the Username for invalid characters. Allow only alphanumerical characters, a lower alpha one first, + Followed by any sequence of digits and characters + + :param username: String to check for validity + :type username: str + :return: True when valid, False when not + :rtype: bool + """ + if " " not in username and "_" not in username and username.isascii() and username[:1].islower() and \ not username[0].isnumeric(): if not re.search(r"\W+", username): @@ -12,15 +22,35 @@ def checkUsernameCharacters(username: str) -> bool: return False -def checkUsernameLength(username: str) -> bool: - if len(username) > 16: +def checkUsernameLength(username: str, upper_limit: int = 16, lower_limit: int = 3) -> bool: + """ Checks username for an upper and lower bounds limit character count + + :param username: Username to check + :type username: str + :param upper_limit: Upper limit bounds to check for(default is 16) + :type upper_limit: int + :param lower_limit: Lower limit bounds to check for(default is 3) + :return: True, when all bounds are in, False when one or both aren't. + :rtype: bool + """ + + if len(username) > upper_limit: return False - if len(username) < 3: + if len(username) < lower_limit: return False return True def checkUserExists(username: str) -> bool: + """ Checks if the User exists on the **SYSTEM** by calling PWD on it. + **Note**: You might want to use this in conjunction with checkUserInDB + + :param username: + :type username: str + :return: True when exists, False when not + :rtype: bool + """ + try: pwd.getpwnam(username) except KeyError: @@ -30,6 +60,16 @@ def checkUserExists(username: str) -> bool: def checkUserInDB(username: str, db: str) -> bool: + """ Checks users existence in the **DATABASE**. + :Note: You might want to use this in conjunction with `checkUserExists` + + :param username: Username to check existence in database + :type username: str + :param db: Path to database to check in + :type db: str + :return: True, when User exists, False when not + """ + try: ldb = lib.sqlitedb.SQLitedb(db) fetched = ldb.safequery("SELECT * FROM 'applications' WHERE username = ?", tuple([username])) @@ -41,6 +81,16 @@ def checkUserInDB(username: str, db: str) -> bool: def checkSSHKey(key: str) -> bool: + """ Checks SSH Key for meta-data that we accept. + :Note: We currently only allow ssh keys without options but with a mail address at the end in b64 encoded. + The currently supported algorithms are: ecdfsa-sha2-nistp256, 'ecdsa-sha2-nistp384', 'ecdsa-sha2-nistp521', + 'ssh-rsa', 'ssh-dss' and 'ssh-ed25519' + + :param key: Key to check + :return: True, when Key is valid, False when not + :rtype: bool + """ + # taken from https://github.com/hashbang/provisor/blob/master/provisor/utils.py, all belongs to them! ;) import base64 if len(key) > 8192 or len(key) < 80: @@ -60,66 +110,110 @@ def checkSSHKey(key: str) -> bool: def checkEmail(mail: str) -> bool: + """ Checks Mail against a relatively simple REgex Pattern. + + :param mail: Mail to check + :type mail: str + :return: False, when the Mail is invalid, True when valid. + :rtype: bool + """ + if not re.match("(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)", mail): return False else: return True -def checkDatetimeFormat(form: str) -> bool : +def checkDatetimeFormat(datetime_str: str) -> bool: + """ Checks a Strings format on date time. + + :param datetime_str: String to check + :type datetime_str: str + :return: True when valid, False when not. + :rtype: bool + """ + import datetime try: - datetime.datetime.strptime(form, "%Y-%m-%d %H:%M:%S") + datetime.datetime.strptime(datetime_str, "%Y-%m-%d %H:%M:%S") except ValueError: return False return True def checkName(name: str) -> bool: + """ Checks a users (real) Name against a real simple REgex Pattern. + + :param name: Name/String to check + :type name: str + :return: True when valid, False when not. + :rtype: bool + """ + if not re.match("\w+\s*\w", name): return False else: return True -def checkImportFile(fname: str, db: str) -> bool: - error_list = str() +def checkImportFile(path: str, db: str): + """ Checks an CSV file against most of the validators and prints an Error message with the line number corresponding + to said failure.. Those includes: checkName, checkUsernameCharacters, + ckeckUsernameLength, duplicate usernames(in the CSV), checkSSHKey, checkEmail, checkUserExists, checkUserInDB, + checkDatetimeformat and if the status is 1 or 0. + + :param path: Path to file to check + :type path: str + :param db: Path to database file(SQLite) + :type db: str + :return: Str when Failure, True when success(All tests passed) + :rtype: Str or None + """ + + errstr = "" valid = True ln = 1 # line number - with open(fname, 'r', newline='') as f: - import csv - reador = csv.DictReader(f) - for row in reador: + valid_names_list = [] + with open(path, 'r', newline='') as f: + reader = csv.DictReader(f) + for row in reader: # if any of this fails move on to the next user, just print a relatively helpful message lel if not lib.validator.checkName(row["name"]): - error_list += f"Line{ln}: {row['name']} seems not legit. Character followed by character should be " \ - f"correct.\n" + errstr += f"Line {ln}: Name: '{row['name']}' seems not legit. Character followed by character should" \ + f" be correct.\n" valid = False if not lib.validator.checkUsernameCharacters(row["username"]): - error_list += (f"Line {ln}: Username contains unsupported characters or starts with a number: '" - f"{row['username']}'.\n") + errstr += (f"Line {ln}: Username contains unsupported characters or starts with a number: '" + f"{row['username']}'.\n") valid = False if not lib.validator.checkUsernameLength(row["username"]): - error_list += f"Line {ln}: Username '{row['username']}' is either too long(>16) or short(<3)\n" + errstr += f"Line {ln}: Username '{row['username']}' is either too long(>16) or short(<3)\n" + valid = False + # dup checking + if row["username"] in valid_names_list: + errstr += f"Line {ln}: Duplicate Username {row['username']}!\n" valid = False + else: + valid_names_list.append(row["username"]) + # dup end if not lib.validator.checkSSHKey(row["pubkey"]): - error_list += f"Line {ln}: Following SSH-Key of user '{row['username']}' isn't valid: '{row['pubkey']}'."\ - f"\n" + errstr += f"Line {ln}: Following SSH-Key of user '{row['username']}' isn't valid: " \ + f"'{row['pubkey']}'.\n" valid = False if not lib.validator.checkEmail(row["email"]): - error_list += f"Line {ln}: E-Mail address of user '{row['username']}' '{row['email']}' is not valid.\n" + errstr += f"Line {ln}: E-Mail address of user '{row['username']}' '{row['email']}' is not valid.\n" valid = False if not lib.validator.checkUserExists(row["username"]) or checkUserInDB(row["username"], db): - error_list += f"Line {ln}: User '{row['username']}' already exists.\n" + errstr += f"Line {ln}: User '{row['username']}' already exists.\n" valid = False if not lib.validator.checkDatetimeFormat(row["timestamp"]): - error_list += f"Line {ln}: Timestamp '{row['timestamp']}' from user '{row['username']}' is invalid.\n" + errstr += f"Line {ln}: Timestamp '{row['timestamp']}' from user '{row['username']}' is invalid.\n" valid = False if int(row["status"]) > 1 or int(row["status"]) < 0: - error_list += f"Line {ln}: Status '{row['status']}' MUST be either 0 or 1.\n" + errstr += f"Line {ln}: Status '{row['status']}' MUST be either 0 or 1.\n" valid = False ln += 1 if valid: return True else: - return error_list + return errstr