Documentation work all over the place

Renamed also some paramters
This commit is contained in:
Darksider3 2019-10-23 14:27:48 +02:00
parent 65c7bb6b3f
commit 0eeafa626e
6 changed files with 273 additions and 51 deletions

View file

@ -8,30 +8,91 @@ import lib.uis.default as default_cmd # Follows -u, -a, -f flags
class Backup:
"""Backups a Tilde database to an CSV file
:Example:
>>> from Backup import Backup
>>> from ListUsers import ListUsers
>>> L = ListUsers.ListUsers("/path/to/sqlite").get_fetch()
>>> backup_db = Backup("stdout")
>>> backup_db.backup_to_file(L)
CSV-Separated list with headers in first row
"""
filename: str
quoting: int
dialect: str
field_names: tuple
def __init__(self, fname: str, quoting: int = csv.QUOTE_NONNUMERIC, dialect: str = "excel"):
self.setFilename(fname)
def __init__(self, output: str, quoting: int = csv.QUOTE_NONNUMERIC, dialect: str = "excel"):
""" Constructs the Backup object
:param output: File name to backup to(set to stdout for stdout)
:type output: str
:param quoting: Set quoting for CSV Module
:type quoting: int
:param dialect: Set the CSV-Dialect. Defaults to excel, which is the classic CSV
:type dialect: str
"""
self.setFilename(output)
self.setQuoting(quoting)
self.setDialect(dialect)
self.setFieldnames(tuple(['id', 'username', 'email', 'name', 'pubkey', 'timestamp', 'status']))
def setDialect(self, dialect: str):
def setDialect(self, dialect: str) -> None:
""" Set dialect for Object
:param dialect: Dialect to set for Object
:type dialect: str
:return: None
:rtype: None
"""
self.dialect = dialect
def setQuoting(self, quoting: int):
def setQuoting(self, quoting: int) -> None:
""" Set quoting in the CSV(must be supported by the CSV Module!)
:param quoting: Quoting Integer given by csv.QUOTE_* constants
:type quoting: int
:return: None
:rtype: None
"""
self.quoting = quoting
def setFilename(self, filename: str):
def setFilename(self, filename: str) -> None:
""" Sets Filename to output to
:param filename: Filename to output to(set stdout for stdout)
:type filename: str
:return: None
:rtype: None
"""
self.filename = filename
def setFieldnames(self, f_names: tuple):
def setFieldnames(self, f_names: tuple) -> None:
""" Set fieldname to process
:param f_names: Fieldnames-Tuple
:type f_names: tuple
:return: None
:rtype: None
"""
self.field_names = f_names
def BackupToFile(self, fetched: list):
def backup_to_file(self, fetched: list) -> bool:
"""Backup Userlist to File(or stdout)
:param fetched: List of values to write out CSV-formatted
:return: True, if success, None when not.
:rtype: bool
"""
returner = io.StringIO()
write_csv = csv.DictWriter(returner, fieldnames=self.field_names, quoting=self.quoting, dialect=self.dialect)
write_csv.writeheader()
@ -39,6 +100,7 @@ class Backup:
if self.filename == "stdout":
print(returner.getvalue())
return True
else:
with open(self.filename, "w") as f:
print(returner.getvalue(), file=f)
@ -53,9 +115,9 @@ if __name__ == "__main__":
try:
L = ListUsers.ListUsers(config['DEFAULT']['applications_db'],
unapproved=args.unapproved, approved=args.approved)
fetch = L.getFetch()
fetch = L.get_fetch()
B = Backup(args.file)
B.BackupToFile(fetch)
B.backup_to_file(fetch)
exit(0)
except KeyboardInterrupt as e:
pass

View file

@ -7,13 +7,24 @@ import lib.UserExceptions
import lib.uis.config_ui # dont go to default, just following -c flag
def ImportFromFile(fname: str, db: str, userids: tuple = tuple([])):
def import_from_file(fname: str, db: str, userids: tuple = tuple([])) -> bool:
""" Imports Users from a given CSV-file to the system and DB
:param fname:
:type fname: str
:param db: Path to the sqlite db
:type db: str
:param userids: FIXME: Tuple which userids should we write
:type userids: tuple
:return: True on success, False when not
:rtype: bool
"""
if not os.path.isfile(fname):
print(f"File {fname} don't exist")
return None
return False
if not os.path.isfile(db):
print(f"The database file {db} don't exist")
return None
return False
if userids:
pass # empty tuple means everything
# noinspection PyBroadException
@ -83,7 +94,7 @@ if __name__ == "__main__":
if not args.file:
print("You MUST set a CSV-file with the -f/--file flag that already exist")
exit(1)
ImportFromFile(args.file, config['DEFAULT']['applications_db'])
import_from_file(args.file, config['DEFAULT']['applications_db'])
exit(0)
except KeyboardInterrupt as e:
pass

View file

@ -10,6 +10,16 @@ class ListUsers:
usersFetch = None
def __init__(self, db: str, unapproved: bool = False, approved: bool = True):
"""Constructs ListUsers
:param db: Database to access
:type db: str
:param unapproved: only List unapproved users
:type unapproved: bool
:param approved: only list approved users
:type approved: bool
"""
self.db = SQLitedb(db)
if unapproved: # only unapproved users
query = "SELECT * FROM `applications` WHERE `status` = '0'"
@ -20,6 +30,12 @@ class ListUsers:
self.usersFetch = self.db.query(query)
def output_as_list(self) -> str:
"""Generates a string with one (approved) user per line and one newline at the end
:rtype: str
:return: String consisting with one(activated) user per line
"""
list_str: str = ""
query = "SELECT `username` FROM `applications` WHERE `status` = '1' ORDER BY timestamp ASC"
self.usersFetch = self.db.query(query)
@ -30,12 +46,13 @@ class ListUsers:
def prettyPrint(self) -> None:
pass # see below why not implemented yet, texttable...
def getFetch(self) -> list:
def get_fetch(self) -> list:
""" Returns a complete fetch done by the sqlitedb-class
:return: Complete fetchall() in a dict-factory
:rtype: list
"""
return self.usersFetch
@ -75,7 +92,7 @@ if __name__ == "__main__":
if args.list:
ret = L.output_as_list()
else:
fetch = L.getFetch()
fetch = L.get_fetch()
ret += "ID %-1s| Username %-5s| Mail %-20s| Name %-17s| Registered %-8s | State |\n" % (
" ", " ", " ", " ", " "
)

View file

@ -101,7 +101,7 @@ if __name__ == "__main__":
print(f"Couldn't update database entry for {args.user}, didn't touch the system")
exit(1)
try:
Sysctl.removeUser(args.user)
Sysctl.remove_user(args.user)
except lib.UserExceptions.UnknownReturnCode as e:
print(f"Couldn't remove {args.user} from the system, unknown return code: {e}. DB is modified.")
exit(1)

View file

@ -5,11 +5,19 @@ import lib.UserExceptions
class System:
"""Class to interact with the system specifically to support our needs 0w0"""
"""Class to interact with the system specifically to support our needs 0w0
:Example:
>>> from lib.System import System as System
>>> Sys_ctl = System(dryrun=True)
>>> Sys_ctl.register("Bob")
>>> Sys_ctl.lock_user_pw("Bob")
>>> Sys_ctl.add_to_usergroup("Bob")
>>> Sys_ctl.make_ssh_usable("Bob", "sshkey")
"""
dry = False
dry: bool = False
create_command = []
home = ""
home: str = ""
def __init__(self, dryrun: bool = False, home: str = "/home/"):
"""Creates an objects. Can set dry run.
@ -19,6 +27,8 @@ class System:
:type dryrun: bool
:param home: Standard directory to search for the home directories of your users(default is /home/)
:type home: str
:raises:
ValueError: if homedir can not be found
"""
self.dry = dryrun
@ -37,6 +47,8 @@ class System:
:type cc: tuple
:return: True, if worked, raises lib.UserExceptions.UserExistsAlready when not
:rtype: bool
:raises:
lib.UserExceptions.UserExistsAlready: when username specified already exists on the system
"""
create_command = cc
cc = create_command + tuple([username])
@ -49,8 +61,15 @@ class System:
raise lib.UserExceptions.UserExistsAlready(f"User {username} exists already")
return True
def unregister(self, username: str):
self.removeUser(username)
def unregister(self, username: str) -> bool:
""" Just an alias function for removeUser
:param username: username to remove
:type username: str
:return: True, when success, False(or exception) when not
:rtype: bool
"""
return self.remove_user(username)
def make_ssh_usable(self, username: str, pubkey: str, sshdir: str = ".ssh/") -> bool:
""" Make SSH usable for our newly registered user
@ -64,6 +83,10 @@ class System:
:return: True, if worked, raises lib.UserExceptions.UnknownReturnCode, lib.UserExceptions.HomeDirExistsAlready
or lib.UserExceptions.ModifyFilesystem when not
:rtype: bool
:raises:
lib.UserExceptions.SSHDirUncreatable: if the ssh-dir couldnt be created nor exist
lib.UserExceptions.ModifyFilesystem: When chmod to .ssh and authorized_keys failed
lib.UserExceptions.General: if PWD cant find the specified user
"""
if self.dry:
@ -95,7 +118,16 @@ class System:
raise lib.UserExceptions.General(f"PWD can't find {username}: {e}")
return True
def write_ssh(self, key: str, ssh_dir: str):
@staticmethod
def write_ssh(key: str, ssh_dir: str) -> None:
""" Write SSH key to a specified directory(appends authorized_keys itself!)
:param key: Key to write
:type key: str
:param ssh_dir: SSH Directory to write to
:type ssh_dir: str
:return: None
"""
with open(ssh_dir + "authorized_keys", "w") as f:
print(key, file=f)
f.close()
@ -110,6 +142,8 @@ class System:
:type cc: tuple
:rtype: bool
:return: True, if worked, raises lib.UserExceptions.UnknownReturnCode when not
:raises:
lib.UserExceptions.UnknownReturnCode: When cc returns something else then 0
"""
lock_command = cc
@ -134,6 +168,8 @@ class System:
:type cc: tuple
:return: True, if worked, raises lib.UserExceptions.UnknownReturnCode when not
:rtype bool
:raises:
lib.UserExceptions.UnknownReturnCode: if cc returned something else then 0
"""
add_command = cc
@ -163,7 +199,7 @@ class System:
pp += i + " "
print(pp)
def removeUser(self, username: str, cc: tuple = tuple(["userdel", "-r"])) -> bool:
def remove_user(self, username: str, cc: tuple = tuple(["userdel", "-r"])) -> bool:
"""Removes the specified user from the system
:param username: The username you want to delete from the system.
@ -172,6 +208,8 @@ class System:
:type cc: tuple
:return: True, if worked, raises lib.UserExceptions.UnknownReturnCode when not
:rtype: bool
:raises:
lib.UserExceptions.UnknownReturnCode: When cc returns something else then 0 or 6
"""
remove_command = cc
@ -188,11 +226,11 @@ class System:
def AIO(username, pubkey, group="tilde"):
syst = System(dryrun=False)
syst.register(username)
syst.lock_user_pw(username)
syst.add_to_usergroup(username, group)
syst.make_ssh_usable(username, pubkey)
sys_ctl = System(dryrun=False)
sys_ctl.register(username)
sys_ctl.lock_user_pw(username)
sys_ctl.add_to_usergroup(username, group)
sys_ctl.make_ssh_usable(username, pubkey)
if __name__ == "__main__":

View file

@ -1,9 +1,19 @@
import re
import pwd
import lib.sqlitedb
import csv
def checkUsernameCharacters(username: str) -> bool:
""" Checks the Username for invalid characters. Allow only alphanumerical characters, a lower alpha one first,
Followed by any sequence of digits and characters
:param username: String to check for validity
:type username: str
:return: True when valid, False when not
:rtype: bool
"""
if " " not in username and "_" not in username and username.isascii() and username[:1].islower() and \
not username[0].isnumeric():
if not re.search(r"\W+", username):
@ -12,15 +22,35 @@ def checkUsernameCharacters(username: str) -> bool:
return False
def checkUsernameLength(username: str) -> bool:
if len(username) > 16:
def checkUsernameLength(username: str, upper_limit: int = 16, lower_limit: int = 3) -> bool:
""" Checks username for an upper and lower bounds limit character count
:param username: Username to check
:type username: str
:param upper_limit: Upper limit bounds to check for(default is 16)
:type upper_limit: int
:param lower_limit: Lower limit bounds to check for(default is 3)
:return: True, when all bounds are in, False when one or both aren't.
:rtype: bool
"""
if len(username) > upper_limit:
return False
if len(username) < 3:
if len(username) < lower_limit:
return False
return True
def checkUserExists(username: str) -> bool:
""" Checks if the User exists on the **SYSTEM** by calling PWD on it.
**Note**: You might want to use this in conjunction with checkUserInDB
:param username:
:type username: str
:return: True when exists, False when not
:rtype: bool
"""
try:
pwd.getpwnam(username)
except KeyError:
@ -30,6 +60,16 @@ def checkUserExists(username: str) -> bool:
def checkUserInDB(username: str, db: str) -> bool:
""" Checks users existence in the **DATABASE**.
:Note: You might want to use this in conjunction with `checkUserExists`
:param username: Username to check existence in database
:type username: str
:param db: Path to database to check in
:type db: str
:return: True, when User exists, False when not
"""
try:
ldb = lib.sqlitedb.SQLitedb(db)
fetched = ldb.safequery("SELECT * FROM 'applications' WHERE username = ?", tuple([username]))
@ -41,6 +81,16 @@ def checkUserInDB(username: str, db: str) -> bool:
def checkSSHKey(key: str) -> bool:
""" Checks SSH Key for meta-data that we accept.
:Note: We currently only allow ssh keys without options but with a mail address at the end in b64 encoded.
The currently supported algorithms are: ecdfsa-sha2-nistp256, 'ecdsa-sha2-nistp384', 'ecdsa-sha2-nistp521',
'ssh-rsa', 'ssh-dss' and 'ssh-ed25519'
:param key: Key to check
:return: True, when Key is valid, False when not
:rtype: bool
"""
# taken from https://github.com/hashbang/provisor/blob/master/provisor/utils.py, all belongs to them! ;)
import base64
if len(key) > 8192 or len(key) < 80:
@ -60,66 +110,110 @@ def checkSSHKey(key: str) -> bool:
def checkEmail(mail: str) -> bool:
""" Checks Mail against a relatively simple REgex Pattern.
:param mail: Mail to check
:type mail: str
:return: False, when the Mail is invalid, True when valid.
:rtype: bool
"""
if not re.match("(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)", mail):
return False
else:
return True
def checkDatetimeFormat(form: str) -> bool :
def checkDatetimeFormat(datetime_str: str) -> bool:
""" Checks a Strings format on date time.
:param datetime_str: String to check
:type datetime_str: str
:return: True when valid, False when not.
:rtype: bool
"""
import datetime
try:
datetime.datetime.strptime(form, "%Y-%m-%d %H:%M:%S")
datetime.datetime.strptime(datetime_str, "%Y-%m-%d %H:%M:%S")
except ValueError:
return False
return True
def checkName(name: str) -> bool:
""" Checks a users (real) Name against a real simple REgex Pattern.
:param name: Name/String to check
:type name: str
:return: True when valid, False when not.
:rtype: bool
"""
if not re.match("\w+\s*\w", name):
return False
else:
return True
def checkImportFile(fname: str, db: str) -> bool:
error_list = str()
def checkImportFile(path: str, db: str):
""" Checks an CSV file against most of the validators and prints an Error message with the line number corresponding
to said failure.. Those includes: checkName, checkUsernameCharacters,
ckeckUsernameLength, duplicate usernames(in the CSV), checkSSHKey, checkEmail, checkUserExists, checkUserInDB,
checkDatetimeformat and if the status is 1 or 0.
:param path: Path to file to check
:type path: str
:param db: Path to database file(SQLite)
:type db: str
:return: Str when Failure, True when success(All tests passed)
:rtype: Str or None
"""
errstr = ""
valid = True
ln = 1 # line number
with open(fname, 'r', newline='') as f:
import csv
reador = csv.DictReader(f)
for row in reador:
valid_names_list = []
with open(path, 'r', newline='') as f:
reader = csv.DictReader(f)
for row in reader:
# if any of this fails move on to the next user, just print a relatively helpful message lel
if not lib.validator.checkName(row["name"]):
error_list += f"Line{ln}: {row['name']} seems not legit. Character followed by character should be " \
f"correct.\n"
errstr += f"Line {ln}: Name: '{row['name']}' seems not legit. Character followed by character should" \
f" be correct.\n"
valid = False
if not lib.validator.checkUsernameCharacters(row["username"]):
error_list += (f"Line {ln}: Username contains unsupported characters or starts with a number: '"
f"{row['username']}'.\n")
errstr += (f"Line {ln}: Username contains unsupported characters or starts with a number: '"
f"{row['username']}'.\n")
valid = False
if not lib.validator.checkUsernameLength(row["username"]):
error_list += f"Line {ln}: Username '{row['username']}' is either too long(>16) or short(<3)\n"
errstr += f"Line {ln}: Username '{row['username']}' is either too long(>16) or short(<3)\n"
valid = False
# dup checking
if row["username"] in valid_names_list:
errstr += f"Line {ln}: Duplicate Username {row['username']}!\n"
valid = False
else:
valid_names_list.append(row["username"])
# dup end
if not lib.validator.checkSSHKey(row["pubkey"]):
error_list += f"Line {ln}: Following SSH-Key of user '{row['username']}' isn't valid: '{row['pubkey']}'."\
f"\n"
errstr += f"Line {ln}: Following SSH-Key of user '{row['username']}' isn't valid: " \
f"'{row['pubkey']}'.\n"
valid = False
if not lib.validator.checkEmail(row["email"]):
error_list += f"Line {ln}: E-Mail address of user '{row['username']}' '{row['email']}' is not valid.\n"
errstr += f"Line {ln}: E-Mail address of user '{row['username']}' '{row['email']}' is not valid.\n"
valid = False
if not lib.validator.checkUserExists(row["username"]) or checkUserInDB(row["username"], db):
error_list += f"Line {ln}: User '{row['username']}' already exists.\n"
errstr += f"Line {ln}: User '{row['username']}' already exists.\n"
valid = False
if not lib.validator.checkDatetimeFormat(row["timestamp"]):
error_list += f"Line {ln}: Timestamp '{row['timestamp']}' from user '{row['username']}' is invalid.\n"
errstr += f"Line {ln}: Timestamp '{row['timestamp']}' from user '{row['username']}' is invalid.\n"
valid = False
if int(row["status"]) > 1 or int(row["status"]) < 0:
error_list += f"Line {ln}: Status '{row['status']}' MUST be either 0 or 1.\n"
errstr += f"Line {ln}: Status '{row['status']}' MUST be either 0 or 1.\n"
valid = False
ln += 1
if valid:
return True
else:
return error_list
return errstr