feature-admin-split
#4
Manually merged
tilde
merged 79 commits from dark/ssh-reg:feature-admin-split
into master
5 years ago
@ -0,0 +1,4 @@
|
|||||||
|
__pycache__/
|
||||||
|
.idea/
|
||||||
|
test*
|
||||||
|
|
@ -0,0 +1,129 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import configparser
|
||||||
|
import csv
|
||||||
|
import io
|
||||||
|
|
||||||
|
import ListUsers
|
||||||
|
import lib.uis.default as default_cmd # Follows -u, -a, -f flags
|
||||||
|
|
||||||
|
|
||||||
|
class Backup:
|
||||||
|
"""Backups a Tilde database to an CSV file
|
||||||
|
|
||||||
|
:Example:
|
||||||
|
>>> from Backup import Backup
|
||||||
|
>>> from ListUsers import ListUsers
|
||||||
|
>>> L = ListUsers.ListUsers("/path/to/sqlite").get_fetch()
|
||||||
|
>>> backup_db = Backup("stdout")
|
||||||
|
>>> backup_db.backup_to_file(L)
|
||||||
|
CSV-Separated list with headers in first row
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
filename: str
|
||||||
|
quoting: int
|
||||||
|
dialect: str
|
||||||
|
field_names: tuple
|
||||||
|
|
||||||
|
def __init__(self, output: str, quoting: int = csv.QUOTE_NONNUMERIC, dialect: str = "excel"):
|
||||||
|
""" Constructs the Backup object
|
||||||
|
|
||||||
|
:param output: File name to backup to(set to stdout for stdout)
|
||||||
|
:type output: str
|
||||||
|
:param quoting: Set quoting for CSV Module
|
||||||
|
:type quoting: int
|
||||||
|
:param dialect: Set the CSV-Dialect. Defaults to excel, which is the classic CSV
|
||||||
|
:type dialect: str
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.setFilename(output)
|
||||||
|
self.setQuoting(quoting)
|
||||||
|
self.setDialect(dialect)
|
||||||
|
self.setFieldnames(tuple(['id', 'username', 'email', 'name', 'pubkey', 'timestamp', 'status']))
|
||||||
|
|
||||||
|
def setDialect(self, dialect: str) -> None:
|
||||||
|
""" Set dialect for Object
|
||||||
|
|
||||||
|
:param dialect: Dialect to set for Object
|
||||||
|
:type dialect: str
|
||||||
|
:return: None
|
||||||
|
:rtype: None
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.dialect = dialect
|
||||||
|
|
||||||
|
def setQuoting(self, quoting: int) -> None:
|
||||||
|
""" Set quoting in the CSV(must be supported by the CSV Module!)
|
||||||
|
|
||||||
|
:param quoting: Quoting Integer given by csv.QUOTE_* constants
|
||||||
|
:type quoting: int
|
||||||
|
:return: None
|
||||||
|
:rtype: None
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.quoting = quoting
|
||||||
|
|
||||||
|
def setFilename(self, filename: str) -> None:
|
||||||
|
""" Sets Filename to output to
|
||||||
|
|
||||||
|
:param filename: Filename to output to(set stdout for stdout)
|
||||||
|
:type filename: str
|
||||||
|
:return: None
|
||||||
|
:rtype: None
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.filename = filename
|
||||||
|
|
||||||
|
def setFieldnames(self, f_names: tuple) -> None:
|
||||||
|
""" Set fieldname to process
|
||||||
|
|
||||||
|
:param f_names: Fieldnames-Tuple
|
||||||
|
:type f_names: tuple
|
||||||
|
:return: None
|
||||||
|
:rtype: None
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.field_names = f_names
|
||||||
|
|
||||||
|
def backup_to_file(self, fetched: list) -> bool:
|
||||||
|
"""Backup Userlist to File(or stdout)
|
||||||
|
|
||||||
|
:param fetched: List of values to write out CSV-formatted
|
||||||
|
:return: True, if success, None when not.
|
||||||
|
:rtype: bool
|
||||||
|
"""
|
||||||
|
|
||||||
|
returner = io.StringIO()
|
||||||
|
write_csv = csv.DictWriter(returner, fieldnames=self.field_names, quoting=self.quoting, dialect=self.dialect)
|
||||||
|
write_csv.writeheader()
|
||||||
|
for row in fetched:
|
||||||
|
write_csv.writerow(dict(row))
|
||||||
|
# sqlite3.Row doesn't "easily" convert to a dict itself sadly, so just a quick help from us here
|
||||||
|
# it actually even delivers a list(sqlite3.Row) also, which doesnt make the life a whole lot easier
|
||||||
|
|
||||||
|
if self.filename == "stdout":
|
||||||
|
print(returner.getvalue())
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
with open(self.filename, "w") as f:
|
||||||
|
print(returner.getvalue(), file=f)
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
default_cmd.argparser.description += " - Backups Tilde Users to stdout or a file."
|
||||||
|
args = default_cmd.argparser.parse_args()
|
||||||
|
config = configparser.ConfigParser()
|
||||||
|
config.read(args.config)
|
||||||
|
L = ListUsers.ListUsers(config['DEFAULT']['applications_db'],
|
||||||
|
unapproved=args.unapproved, approved=args.approved)
|
||||||
|
fetch = L.get_fetch()
|
||||||
|
if fetch:
|
||||||
|
B = Backup(args.file)
|
||||||
|
B.setFieldnames(fetch[0].keys()) # sqlite3.row delivers its keys for us! SO NICE!
|
||||||
|
B.backup_to_file(fetch)
|
||||||
|
else:
|
||||||
|
print("nothing to backup!")
|
||||||
|
exit(1)
|
||||||
|
exit(0)
|
@ -0,0 +1,87 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import configparser
|
||||||
|
import csv
|
||||||
|
import os
|
||||||
|
|
||||||
|
import lib.UserExceptions
|
||||||
|
import lib.uis.config_ui # dont go to default, just following -c flag
|
||||||
|
|
||||||
|
|
||||||
|
def import_from_file(file_path: str, db: str, user_ids: tuple = tuple([])) -> bool:
|
||||||
|
""" Imports Users from a given CSV-file to the system and DB
|
||||||
|
|
||||||
|
:param file_path:
|
||||||
|
:type file_path: str
|
||||||
|
:param db: Path to the sqlite db
|
||||||
|
:type db: str
|
||||||
|
:param user_ids: FIXME: Tuple which user_ids should we write
|
||||||
|
:type user_ids: tuple
|
||||||
|
:return: True on success, False when not
|
||||||
|
:rtype: bool
|
||||||
|
"""
|
||||||
|
if not os.path.isfile(file_path):
|
||||||
|
print(f"File {file_path} don't exist")
|
||||||
|
return False
|
||||||
|
if not os.path.isfile(db):
|
||||||
|
print(f"The database file {db} don't exist")
|
||||||
|
return False
|
||||||
|
if user_ids:
|
||||||
|
pass # empty tuple means everything
|
||||||
|
# noinspection PyBroadException
|
||||||
|
try:
|
||||||
|
with open(file_path, 'r', newline='') as f:
|
||||||
|
import lib.Validator
|
||||||
|
sql = lib.sqlitedb.SQLiteDB(db)
|
||||||
|
err = lib.Validator.checkImportFile(file_path, db)
|
||||||
|
if err is not True:
|
||||||
|
print(err)
|
||||||
|
exit(0)
|
||||||
|
import lib.sqlitedb
|
||||||
|
import lib.System
|
||||||
|
sys_ctl = lib.System.System("root")
|
||||||
|
reader = csv.DictReader(f) # @TODO csv.Sniffer to compare? When yes, give force-accept option
|
||||||
|
for row in reader:
|
||||||
|
if row["status"] == "1":
|
||||||
|
try:
|
||||||
|
sys_ctl.setUser(row["username"])
|
||||||
|
sys_ctl.aio_approve(row["pubkey"])
|
||||||
|
print(row['username'], "====> Registered.")
|
||||||
|
except lib.UserExceptions.General as GeneralExcept:
|
||||||
|
print(f"Something didnt work out! {GeneralExcept}")
|
||||||
|
elif row["status"] == "0":
|
||||||
|
print(row['username'] + " not approved, therefore not registered.")
|
||||||
|
try:
|
||||||
|
sql.safequery(
|
||||||
|
"INSERT INTO `applications` (username, name, timestamp, email, pubkey, status) "
|
||||||
|
"VALUES (?,?,?,?,?,?)", tuple([row["username"], row["name"], row["timestamp"],
|
||||||
|
row["email"], row["pubkey"], row["status"]]))
|
||||||
|
except OSError as E:
|
||||||
|
pass
|
||||||
|
print(f"UUFFF, something went WRONG with the file {file_path}: {E}")
|
||||||
|
except Exception as didntCatch:
|
||||||
|
print(f"Exception! UNCATCHED! {type(didntCatch)}: {didntCatch}")
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
|
||||||
|
ArgParser = lib.uis.config_ui.argparser
|
||||||
|
ArgParser.description += "- Imports a CSV file consisting of user specific details to the database"
|
||||||
|
ArgParser.add_argument('-f', '--file', default="stdout",
|
||||||
|
type=str, help='Import from CSV file', required=True)
|
||||||
|
ArgParser.add_argument('--Import', default=False, action="store_true",
|
||||||
|
help="Import Users.", required=True)
|
||||||
|
args = ArgParser.parse_args()
|
||||||
|
config = configparser.ConfigParser()
|
||||||
|
config.read(args.config)
|
||||||
|
|
||||||
|
if not args.Import:
|
||||||
|
print("Error, need the import flag")
|
||||||
|
if not args.file:
|
||||||
|
print("Error, need the import file")
|
||||||
|
if not args.file:
|
||||||
|
print("You MUST set a CSV-file with the -f/--file flag that already exist")
|
||||||
|
exit(1)
|
||||||
|
import_from_file(args.file, config['DEFAULT']['applications_db'])
|
||||||
|
exit(0)
|
@ -0,0 +1,120 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import configparser
|
||||||
|
import sqlite3 # sqlite3.Row-Object
|
||||||
|
from typing import List # Typing support!
|
||||||
|
|
||||||
|
import lib.uis.default as default_cmd # Follows -u, -a, -f flags
|
||||||
|
from lib.sqlitedb import SQLiteDB
|
||||||
|
|
||||||
|
|
||||||
|
class ListUsers:
|
||||||
|
db = None
|
||||||
|
usersFetch = None
|
||||||
|
|
||||||
|
def __init__(self, db: str, unapproved: bool = False, approved: bool = True, single_user: str = None):
|
||||||
|
"""Constructs ListUsers
|
||||||
|
|
||||||
|
:param db: Database to access
|
||||||
|
:type db: str
|
||||||
|
:param unapproved: only List unapproved users
|
||||||
|
:type unapproved: bool
|
||||||
|
:param approved: only list approved users
|
||||||
|
:type approved: bool
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.db = SQLiteDB(db)
|
||||||
|
if unapproved: # only unapproved users
|
||||||
|
query = "SELECT * FROM `applications` WHERE `status` = '0'"
|
||||||
|
elif approved: # Approved users
|
||||||
|
query = "SELECT * FROM `applications` WHERE `status` = '1'"
|
||||||
|
else: # All users
|
||||||
|
query = "SELECT * FROM `applications`"
|
||||||
|
self.usersFetch = self.db.query(query)
|
||||||
|
if single_user is not None:
|
||||||
|
query = "SELECT * FROM `applications` WHERE `username` = ?"
|
||||||
|
self.usersFetch = self.db.safequery(query, tuple([single_user]))
|
||||||
|
|
||||||
|
def output_as_list(self) -> str:
|
||||||
|
"""Generates a string with one (approved) user per line and one newline at the end
|
||||||
|
|
||||||
|
:rtype: str
|
||||||
|
:return: String consisting with one(activated) user per line
|
||||||
|
"""
|
||||||
|
|
||||||
|
list_str: str = ""
|
||||||
|
query = "SELECT `username` FROM `applications` WHERE `status` = '1' ORDER BY timestamp ASC"
|
||||||
|
self.usersFetch = self.db.query(query)
|
||||||
|
for users in self.usersFetch:
|
||||||
|
list_str += users["username"] + "\n"
|
||||||
|
return list_str
|
||||||
|
|
||||||
|
def prettyPrint(self) -> None:
|
||||||
|
pass # see below why not implemented yet, texttable...
|
||||||
|
|
||||||
|
def get_fetch(self) -> List[sqlite3.Row]:
|
||||||
|
""" Returns a complete fetch done by the lib.sqlitedb-class
|
||||||
|
|
||||||
|
:return: Complete fetchall(). A List[sqlite3.Row] with dict-emulation objects.
|
||||||
|
:rtype: List[sqlite3.Row]
|
||||||
|
"""
|
||||||
|
|
||||||
|
return self.usersFetch
|
||||||
|
|
||||||
|
|
||||||
|
# @TODO MAYBE best solution: https://pypi.org/project/texttable/
|
||||||
|
# examle:
|
||||||
|
"""
|
||||||
|
from texttable import Texttable
|
||||||
|
t = Texttable()
|
||||||
|
t.add_rows([['Name', 'Age'], ['Alice', 24], ['Bob', 19]])
|
||||||
|
print(t.draw())
|
||||||
|
---------------> Results in:
|
||||||
|
|
||||||
|
+-------+-----+
|
||||||
|
| Name | Age |
|
||||||
|
+=======+=====+
|
||||||
|
| Alice | 24 |
|
||||||
|
+-------+-----+
|
||||||
|
| Bob | 19 |
|
||||||
|
+-------+-----+
|
||||||
|
|
||||||
|
for user in fetch:
|
||||||
|
print("ID: {}; Username: \"{}\"; Mail: {}; Name: \"{}\"; Registered: {}; Status: {}".format(
|
||||||
|
user["id"], user["username"], user["email"], user["name"], user["timestamp"], user["status"]
|
||||||
|
))
|
||||||
|
"""
|
||||||
|
if __name__ == "__main__":
|
||||||
|
default_cmd.argparser.description += " - Lists Users from the Tilde database."
|
||||||
|
default_cmd.argparser.add_argument('--list-asc', default=False, action="store_true",
|
||||||
|
help='Output a newline seperated list of users', required=False, dest="args_asc")
|
||||||
|
default_cmd.argparser.add_argument('--user', default=None, type=str,
|
||||||
|
help="Just show a specific user by it's name", required=False)
|
||||||
|
args = default_cmd.argparser.parse_args()
|
||||||
|
config = configparser.ConfigParser()
|
||||||
|
config.read(args.config)
|
||||||
|
|
||||||
|
ret = ""
|
||||||
|
if args.user is not None:
|
||||||
|
L = ListUsers(config['DEFAULT']['applications_db'], unapproved=args.unapproved, approved=args.approved,
|
||||||
|
single_user=args.user)
|
||||||
|
else:
|
||||||
|
L = ListUsers(config['DEFAULT']['applications_db'], unapproved=args.unapproved, approved=args.approved)
|
||||||
|
if args.args_asc:
|
||||||
|
ret = L.output_as_list()
|
||||||
|
else:
|
||||||
|
fetch = L.get_fetch()
|
||||||
|
ret += "ID %-1s| Username %-5s| Mail %-20s| Name %-17s| Registered %-8s | State |\n" % (
|
||||||
|
" ", " ", " ", " ", " "
|
||||||
|
)
|
||||||
|
ret += 102 * "-" + "\n"
|
||||||
|
for user in fetch:
|
||||||
|
ret += "%-4i| %-14s| %-25s| %-22s| %-8s | %-5i |\n" % (
|
||||||
|
user["id"], user["username"], user["email"], user["name"], user["timestamp"], user["status"]
|
||||||
|
)
|
||||||
|
if args.file != "stdout":
|
||||||
|
with open(args.file, 'w') as f:
|
||||||
|
print(ret, file=f)
|
||||||
|
else:
|
||||||
|
print(ret)
|
||||||
|
exit(0)
|
@ -0,0 +1,147 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import configparser
|
||||||
|
import sqlite3
|
||||||
|
|
||||||
|
import lib.System
|
||||||
|
import lib.UserExceptions
|
||||||
|
import lib.Validator
|
||||||
|
import lib.sqlitedb
|
||||||
|
import lib.uis.config_ui # only follow -c flag
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
lib.uis.config_ui.argparser.description += " - Edit Tilde Users"
|
||||||
|
ArgParser = lib.uis.config_ui.argparser
|
||||||
|
ArgParser.add_argument('--user', type=str,
|
||||||
|
help='Tilde users name to edit', required=True)
|
||||||
|
|
||||||
|
Mutually = ArgParser.add_mutually_exclusive_group()
|
||||||
|
Mutually.add_argument('-r', '--remove', default=False, action="store_true",
|
||||||
|
help='Remove an approved/unapproved User from the system(and DB). Effectively purges him.',
|
||||||
|
required=False)
|
||||||
|
Mutually.add_argument("--verify", default=True, action="store_false",
|
||||||
|
help="Turns off value checks",
|
||||||
|
required=False)
|
||||||
|
|
||||||
|
ArgParser.add_argument('--sshpubkey', type=str, default=None,
|
||||||
|
help="Stores the new given SSH-Key in given user", required=False)
|
||||||
|
ArgParser.add_argument('--name', type=str, default=None,
|
||||||
|
help="Sets the stored name of the given user")
|
||||||
|
ArgParser.add_argument('--username', type=str, default=None,
|
||||||
|
help="Rename given User")
|
||||||
|
ArgParser.add_argument('--email', type=str, default=None,
|
||||||
|
help="Set new email address for given user")
|
||||||
|
ArgParser.add_argument('--status', type=int, default=None,
|
||||||
|
help="Set status of given user")
|
||||||
|
args = ArgParser.parse_args()
|
||||||
|
config = configparser.ConfigParser()
|
||||||
|
config.read(args.config)
|
||||||
|
db = config['DEFAULT']['applications_db']
|
||||||
|
if not args.sshpubkey and not args.name and not args.username and not args.email and args.status is None \
|
||||||
|
and not args.remove:
|
||||||
|
print(f"Well, SOMETHING must be done with {args.user} ;-)")
|
||||||
|
exit(1)
|
||||||
|
# --> --user
|
||||||
|
if not lib.Validator.checkUserInDB(args.user, db):
|
||||||
|
print(f"User {args.user} does not exist in the database.")
|
||||||
|
exit(1)
|
||||||
|
DB = lib.sqlitedb.SQLiteDB(db)
|
||||||
|
sys_ctl = lib.System.System(args.user)
|
||||||
|
if not DB:
|
||||||
|
print("Could not establish connection to database")
|
||||||
|
exit(1)
|
||||||
|
CurrentUser = DB.safequery("SELECT * FROM `applications` WHERE `username`=?", tuple([args.user]))[0]
|
||||||
|
|
||||||
|
# --> --remove
|
||||||
|
if args.remove:
|
||||||
|
print(f"Removing {args.user} from the system and the database...")
|
||||||
|
try:
|
||||||
|
DB.removeApplicantFromDBperUsername(args.user)
|
||||||
|
print(f"Purged from the DB")
|
||||||
|
if CurrentUser["status"] == 1:
|
||||||
|
sys_ctl.remove_user()
|
||||||
|
print(f"Purged from the system")
|
||||||
|
else:
|
||||||
|
print(f"'{args.user}' was not approved before, therefore not deleting from system itself.")
|
||||||
|
except lib.UserExceptions.General as e:
|
||||||
|
print(f"{e}")
|
||||||
|
exit(1)
|
||||||
|
print(f"Successfully removed '{args.user}'.")
|
||||||
|
exit(0)
|
||||||
|
|
||||||
|
# --> --sshpubkey
|
||||||
|
if args.sshpubkey:
|
||||||
|
if not lib.Validator.checkSSHKey(args.sshpubkey):
|
||||||
|
print(f"Pubkey '{args.sshpubkey}' isn't valid.")
|
||||||
|
exit(1)
|
||||||
|
try:
|
||||||
|
DB.safequery("UPDATE `applications` SET `pubkey`=? WHERE `username`=?",
|
||||||
|
tuple([args.sshpubkey, args.user]))
|
||||||
|
CurrentUser = DB.safequery("SELECT * FROM `applications` WHERE `username` = ? ", tuple([args.user]))[0]
|
||||||
|
if int(CurrentUser["status"]) == 1:
|
||||||
|
sys_ctl.make_ssh_usable(args.sshpubkey)
|
||||||
|
except sqlite3.Error as e:
|
||||||
|
print(f"Something unexpected happened! {e}")
|
||||||
|
exit(1)
|
||||||
|
except lib.UserExceptions.ModifyFilesystem as e:
|
||||||
|
print(f"One action failed during writing the ssh key back to the authorization file. {e}")
|
||||||
|
print(f"'{args.user}'s SSH-Key updated successfully.")
|
||||||
|
|
||||||
|
# --> --name
|
||||||
|
if args.name:
|
||||||
|
if not lib.Validator.checkName(args.name):
|
||||||
|
print(f"'{args.name}' is not a valid Name.")
|
||||||
|
exit(1)
|
||||||
|
try:
|
||||||
|
DB.safequery("UPDATE `applications` SET `name` =? WHERE `username` =?", tuple([args.name, args.user]))
|
||||||
|
except sqlite3.Error as e:
|
||||||
|
print(f"Could not write '{args.name}' to database: {e}")
|
||||||
|
print(f"'{args.user}'s Name changed to '{args.name}'.")
|
||||||
|
|
||||||
|
# --> --email
|
||||||
|
if args.email:
|
||||||
|
if not lib.Validator.checkEmail(args.email):
|
||||||
|
print(f"'{args.email}' is not a valid Mail address!")
|
||||||
|
exit(1)
|
||||||
|
try:
|
||||||
|
DB.safequery("UPDATE `applications` SET `email` =? WHERE `username` =?", tuple([args.email]))
|
||||||
|
except sqlite3.Error as e:
|
||||||
|
print(f"Could not write '{args.email}' to the database. {e}")
|
||||||
|
print(f"'{args.user}' Mail changed to '{args.email}'.")
|
||||||
|
|
||||||
|
# --> --status
|
||||||
|
if args.status is not None:
|
||||||
|
if args.status != 0 and args.status != 1:
|
||||||
|
print("Only 0 and 1 are valid status, where 1 is activated and 0 is unapproved.")
|
||||||
|
exit(0)
|
||||||
|
|
||||||
|
# just takes first result out of the dict
|
||||||
|
if args.status == int(CurrentUser["status"]):
|
||||||
|
print(f"New and old status are the same.")
|
||||||
|
|
||||||
|
if args.status == 0 and int(CurrentUser["status"]) == 1:
|
||||||
|
try:
|
||||||
|
DB.safequery("UPDATE `applications` SET `status` =? WHERE `id`=?",
|
||||||
|
tuple([args.status, CurrentUser["id"]]))
|
||||||
|
sys_ctl.remove_user()
|
||||||
|
except sqlite3.Error as e:
|
||||||
|
print(f"Could not update database entry for '{args.user}', did not touch the system")
|
||||||
|
exit(1)
|
||||||
|
except lib.UserExceptions.UnknownReturnCode as e:
|
||||||
|
print(f"Could not remove '{args.user}' from the system, unknown return code: {e}. DB is modified.")
|
||||||
|
exit(1)
|
||||||
|
print(f"Successfully changed '{args.user}'s status to 0 and cleared from the system.")
|
||||||
|
|
||||||
|
if args.status == 1 and int(CurrentUser["status"]) == 0:
|
||||||
|
try:
|
||||||
|
DB.safequery("UPDATE `applications` SET `status`=? WHERE `username`=?",
|
||||||
|
tuple([args.status, args.user]))
|
||||||
|
sys_ctl.aio_approve(CurrentUser["pubkey"])
|
||||||
|
except sqlite3.Error as e:
|
||||||
|
print(f"Could not update Users status in database")
|
||||||
|
exit(1)
|
||||||
|
except lib.UserExceptions.General as ChangeUser:
|
||||||
|
print(f"Some chain in the cattle just slipped away, my lord! {ChangeUser}")
|
||||||
|
exit(1)
|
||||||
|
print(f"Successfully changed '{args.user}'s status to 1 and created on the system.")
|
||||||
|
exit(0)
|
@ -0,0 +1,6 @@
|
|||||||
|
import configparser
|
||||||
|
import lib.uis.default as default_cmd
|
||||||
|
|
||||||
|
args = default_cmd.argparser.parse_args()
|
||||||
|
config = configparser.ConfigParser()
|
||||||
|
config.read(args.config)
|
@ -0,0 +1,260 @@
|
|||||||
|
import os
|
||||||
|
import pwd
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
import lib.UserExceptions
|
||||||
|
|
||||||
|
|
||||||
|
class System:
|
||||||
|
"""Class to interact with the system specifically to support our needs 0w0
|
||||||
|
:Example:
|
||||||
|
>>> from lib.System import System as System
|
||||||
|
>>> Sys_ctl = System("Test", dryrun=True)
|
||||||
|
>>> Sys_ctl.register()
|
||||||
|
>>> Sys_ctl.lock_user_pw()
|
||||||
|
>>> Sys_ctl.add_to_usergroup()
|
||||||
|
>>> Sys_ctl.make_ssh_usable("sshkey")
|
||||||
|
"""
|
||||||
|
|
||||||
|
dry: bool = False
|
||||||
|
create_command = []
|
||||||
|
home: str = ""
|
||||||
|
user: str
|
||||||
|
|
||||||
|
def setUser(self, username: str):
|
||||||
|
self.user = username
|
||||||
|
|
||||||
|
def __init__(self, username: str, dryrun: bool = False, home: str = "/home/"):
|
||||||
|
"""Creates an objects. Can set dry run.
|
||||||
|
|
||||||
|
:param username: Username to manipulate
|
||||||
|
:type username: str
|
||||||
|
:param dryrun: Run all command in a dry-run? When enabled, doesn't make any changes to the system (defaults to
|
||||||
|
false)
|
||||||
|
:type dryrun: bool
|
||||||
|
:param home: Standard directory to search for the home directories of your users(default is /home/)
|
||||||
|
:type home: str
|
||||||
|
:raises:
|
||||||
|
ValueError: if homedir can not be found
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.dry = dryrun
|
||||||
|
if not home.endswith("/"):
|
||||||
|
home += "/"
|
||||||
|
if not os.path.isdir(home):
|
||||||
|
raise ValueError("home should be an existent directory...")
|
||||||
|
self.home = home
|
||||||
|
self.user = username
|
||||||
|
|
||||||
|
def aio_approve(self, pubkey, group="tilde"):
|
||||||
|
""" Executes all neccessary steps to create a user from itself. Raises ALOT of possible exceptions
|
||||||
|
|
||||||
|
:Note: CAREFULL! You MUST except the exceptions!
|
||||||
|
:param pubkey: Users public ssh key
|
||||||
|
:type pubkey: str
|
||||||
|
:param group: User-group. Defaults to tilde
|
||||||
|
:type group: str
|
||||||
|
:return: None
|
||||||
|
:raises:
|
||||||
|
lib.UserExceptions.UserExistsAlready: User Exists already on system
|
||||||
|
lib.UserExceptions.UnknownReturnCode: Unknown Return Code from useradd
|
||||||
|
lib.UserExceptions.SSHDirUncreatable: Users SSH Dir couldnt be created
|
||||||
|
lib.UserExceptions.ModifyFilesystem: Something with CHMOD failed
|
||||||
|
"""
|
||||||
|
self.register()
|
||||||
|
self.lock_user_pw()
|
||||||
|
self.add_to_usergroup(group)
|
||||||
|
self.make_ssh_usable(pubkey)
|
||||||
|
|
||||||
|
def register(self, cc: tuple = tuple(["useradd", "-m"])) -> bool:
|
||||||
|
"""Creates an local account for the given username
|
||||||
|
|
||||||
|
:param cc: Tuple with commands separated to execute on the machine. (defaults to useradd -m)
|
||||||
|
:type cc: tuple
|
||||||
|
:return: True, if worked, raises lib.UserExceptions.UserExistsAlready when not
|
||||||
|
:rtype: bool
|
||||||
|
:raises:
|
||||||
|
lib.UserExceptions.UserExistsAlready: when username specified already exists on the system
|
||||||
|
"""
|
||||||
|
|
||||||
|
create_command = cc
|
||||||
|
cc = create_command + tuple([self.user])
|
||||||
|
if self.dry:
|
||||||
|
self.printTuple(cc)
|
||||||
|
return True
|
||||||
|
elif not self.dry:
|
||||||
|
rt = subprocess.call(cc)
|
||||||
|
if rt != 0:
|
||||||
|
raise lib.UserExceptions.UserExistsAlready(self.user)
|
||||||
|
return True
|
||||||
|
|
||||||
|
def unregister(self) -> bool:
|
||||||
|
""" Just an alias function for removeUser
|
||||||
|
|
||||||
|
:return: True, when success, False(or exception) when not
|
||||||
|
:rtype: bool
|
||||||
|
"""
|
||||||
|
|
||||||
|
return self.remove_user()
|
||||||
|
|
||||||
|
def make_ssh_usable(self, pubkey: str, sshdir: str = ".ssh/") -> bool:
|
||||||
|
""" Make SSH usable for our newly registered user
|
||||||
|
|
||||||
|
:param pubkey: Public SSH Key for the User you want accessible by SSH
|
||||||
|
:type pubkey: str
|
||||||
|
:param sshdir: Directory to write the authorized_keys File to. PWD is $HOME of said user. (defaults to ".ssh/")
|
||||||
|
:type sshdir: str
|
||||||
|
:return: True, if worked, raises lib.UserExceptions.UnknownReturnCode, lib.UserExceptions.HomeDirExistsAlready
|
||||||
|
or lib.UserExceptions.ModifyFilesystem when not
|
||||||
|
:rtype: bool
|
||||||
|
:raises:
|
||||||
|
lib.UserExceptions.SSHDirUncreatable: if the ssh-dir couldnt be created nor exist
|
||||||
|
lib.UserExceptions.ModifyFilesystem: When chmod to .ssh and authorized_keys failed
|
||||||
|
lib.UserExceptions.General: if PWD cant find the specified user
|
||||||
|
"""
|
||||||
|
|
||||||
|
if self.dry:
|
||||||
|
print("Nah, @TODO, but actually kinda too lazy for this lul. Just a lot happening here")
|
||||||
|
return True
|
||||||
|
if not sshdir.endswith("/"):
|
||||||
|
sshdir += "/"
|
||||||
|
ssh_dir = self.home + self.user + "/" + sshdir
|
||||||
|
try:
|
||||||
|
os.mkdir(ssh_dir)
|
||||||
|
except FileExistsError:
|
||||||
|
pass # thats actually a good one for us :D
|
||||||
|
except OSError as e:
|
||||||
|
raise lib.UserExceptions.SSHDirUncreatable(f"Could not create {ssh_dir}: Exception: {e}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.write_ssh(pubkey, ssh_dir)
|
||||||
|
except OSError as e:
|
||||||
|
raise lib.UserExceptions.ModifyFilesystem(
|
||||||
|
f"Could not write and/or chmod 0700 {ssh_dir} or {ssh_dir}/authorized_keys, Exception: {e}")
|
||||||
|
try:
|
||||||
|
pwdnam = pwd.getpwnam(self.user)
|
||||||
|
os.chown(ssh_dir, pwdnam[2], pwdnam[3]) # 2=>uid, 3=>gid
|
||||||
|
os.chown(ssh_dir + "authorized_keys", pwd.getpwnam(self.user)[2], pwd.getpwnam(self.user)[3])
|
||||||
|
except OSError as e: # by os.chown
|
||||||
|
raise lib.UserExceptions.ModifyFilesystem(
|
||||||
|
f"Could not chown {ssh_dir} and/or authorized_keys to {self.user} and their group, Exception: {e}", )
|
||||||
|
except KeyError as e: # by PWD
|
||||||
|
raise lib.UserExceptions.General(f"PWD can't find {self.user}: {e}")
|
||||||
|
return True
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def write_ssh(key: str, ssh_dir: str) -> None:
|
||||||
|
""" Write SSH key to a specified directory(appends authorized_keys itself!)
|
||||||
|
|
||||||
|
:param key: Key to write
|
||||||
|
:type key: str
|
||||||
|
:param ssh_dir: SSH Directory to write to
|
||||||
|
:type ssh_dir: str
|
||||||
|
:return: None
|
||||||
|
"""
|
||||||
|
|
||||||
|
with open(ssh_dir + "authorized_keys", "w") as f:
|
||||||
|
print(key, file=f)
|
||||||
|
f.close()
|
||||||
|
os.chmod(ssh_dir + "authorized_keys", 0o700) # we dont care about the directory here
|
||||||
|
|
||||||
|
def lock_user_pw(self, cc: tuple = tuple(["usermod", "--lock"])) -> bool:
|
||||||
|
"""Lock a users password so it stays empty
|
||||||
|
|
||||||
|
:param cc: Commands to run in the subprocess to lock it down(defaults to usermod --lock)
|
||||||
|
:type cc: tuple
|
||||||
|
:rtype: bool
|
||||||
|
:return: True, if worked, raises lib.UserExceptions.UnknownReturnCode when not
|
||||||
|
:raises:
|
||||||
|
lib.UserExceptions.UnknownReturnCode: When cc returns something else then 0
|
||||||
|
"""
|
||||||
|
|
||||||
|
lock_command = cc
|
||||||
|
cc = lock_command + tuple([self.user])
|
||||||
|
if self.dry:
|
||||||
|
self.printTuple(cc)
|
||||||
|
return True
|
||||||
|
elif not self.dry:
|
||||||
|
rt = subprocess.call(cc)
|
||||||
|
if rt != 0:
|
||||||
|
raise lib.UserExceptions.UnknownReturnCode(f"Could not lock user '{self.user}'; '{cc}' returned '{rt}'")
|
||||||
|
return True
|
||||||
|
|
||||||
|
def add_to_usergroup(self, group: str = "tilde", cc: tuple = tuple(["usermod", "-a", "-G"])) -> bool:
|
||||||
|
""" Adds a given user to a given group
|
||||||
|
|
||||||
|
:param group: Groupname where you want to add your user to
|
||||||
|
:type group: str
|
||||||
|
:param cc: Commands to execute that adds your user to said specific group(defaults to usermod -a -G")
|
||||||
|
:type cc: tuple
|
||||||
|
:return: True, if worked, raises lib.UserExceptions.UnknownReturnCode when not
|
||||||
|
:rtype bool
|
||||||
|
:raises:
|
||||||
|
lib.UserExceptions.UnknownReturnCode: if cc returned something else then 0
|
||||||
|
"""
|
||||||
|
|
||||||
|
add_command = cc
|
||||||
|
cc = add_command + tuple([group, self.user])
|
||||||
|
if self.dry:
|
||||||
|
self.printTuple(cc)
|
||||||
|
return True
|
||||||
|
elif not self.dry:
|
||||||
|
rt = subprocess.call(cc)
|
||||||
|
if rt != 0:
|
||||||
|
raise lib.UserExceptions.UnknownReturnCode(
|
||||||
|
f"Could not add user '{self.user}' to group '{group}' with command '{cc}', returned '{rt}'", )
|
||||||
|
return True
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def printTuple(tup: tuple) -> None:
|
||||||
|
"""Prints a tuple with spaces as separators
|
||||||
|
|
||||||
|
:param tup: Tuple you want to print
|
||||||
|
:type tup: tuple
|
||||||
|
:rtype: None
|
||||||
|
:returns: Nothing
|
||||||
|
"""
|
||||||
|
|
||||||
|
pp = ""
|
||||||
|
for i in tup:
|
||||||
|
pp += i + " "
|
||||||
|
print(pp)
|
||||||
|
|
||||||
|
def remove_user(self, cc: tuple = tuple(["userdel", "-r"])) -> bool:
|
||||||
|
"""Removes the specified user from the system
|
||||||
|
|
||||||
|
:param cc: Commands to execute to delete the user from the System(defaults to userdel -r)
|
||||||
|
:type cc: tuple
|
||||||
|
:return: True, if worked, raises lib.UserExceptions.UnknownReturnCode when not
|
||||||
|
:rtype: bool
|
||||||
|
:raises:
|
||||||
|
lib.UserExceptions.UnknownReturnCode: When cc returns something else then 0 or 6
|
||||||
|
"""
|
||||||
|
|
||||||
|
remove_command = cc
|
||||||
|
cc = remove_command + tuple([self.user])
|
||||||
|
if self.dry:
|
||||||
|
self.printTuple(cc)
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
ret = subprocess.Popen(cc, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||||
|
ret.wait() # wait for cc
|
||||||
|
stdio, err_io = ret.communicate() # get stdout as well as stderr
|
||||||
|
if ret.returncode != 0 and ret.returncode != 6: # userdel returns 6 when no mail dir was found but success
|
||||||
|
raise lib.UserExceptions.UnknownReturnCode(
|
||||||
|
f"Could not delete user with command {cc}. Return code: {ret.returncode},"
|
||||||
|
f" stdout/stderr: {stdio + err_io}")
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
try:
|
||||||
|
S = System("dar", dryrun=True)
|
||||||
|
S.register()
|
||||||
|
S.lock_user_pw()
|
||||||
|
S.add_to_usergroup()
|
||||||
|
# if not S.make_ssh_usable("dar", "SSHpub"):
|
||||||
|
# print("Huh, error :shrug:")
|
||||||
|
exit(0)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
@ -0,0 +1,48 @@
|
|||||||
|
class General(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class User(General):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class UnknownUser(User):
|
||||||
|
def __init__(self, name):
|
||||||
|
Exception.__init__(self, f"Tried to perform action on unknown user '{name}'")
|
||||||
|
|
||||||
|
|
||||||
|
class UserExistsAlready(User):
|
||||||
|
def __init__(self, name):
|
||||||
|
Exception.__init__(self, f"User '{name}' is already registered")
|
||||||
|
|
||||||
|
|
||||||
|
class UnknownReturnCode(General):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class ModifyFilesystem(General):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class SSHDirUncreatable(ModifyFilesystem):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class SQLiteDatabaseDoesntExistYet(General):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class UsernameLength(User):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class UsernameTooShort(User):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class UsernameTooLong(User):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class UsernameInvalidCharacters(User):
|
||||||
|
pass
|
@ -0,0 +1,219 @@
|
|||||||
|
import csv
|
||||||
|
import pwd
|
||||||
|
import re
|
||||||
|
|
||||||
|
import lib.sqlitedb
|
||||||
|
|
||||||
|
|
||||||
|
def checkUsernameCharacters(username: str) -> bool:
|
||||||
|
""" Checks the Username for invalid characters. Allow only alphanumerical characters, a lower alpha one first,
|
||||||
|
Followed by any sequence of digits and characters
|
||||||
|
|
||||||
|
:param username: String to check for validity
|
||||||
|
:type username: str
|
||||||
|
:return: True when valid, False when not
|
||||||
|
:rtype: bool
|
||||||
|
"""
|
||||||
|
|
||||||
|
if " " not in username and "_" not in username and username.isascii() and username[:1].islower() and \
|
||||||
|
not username[0].isnumeric():
|
||||||
|
if not re.search(r"\W+", username):
|
||||||
|
if not re.search("[^a-zA-Z0-9]", username):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def checkUsernameLength(username: str, upper_limit: int = 16, lower_limit: int = 3) -> bool:
|
||||||
|
""" Checks username for an upper and lower bounds limit character count
|
||||||
|
|
||||||
|
:param username: Username to check
|
||||||
|
:type username: str
|
||||||
|
:param upper_limit: Upper limit bounds to check for(default is 16)
|
||||||
|
:type upper_limit: int
|
||||||
|
:param lower_limit: Lower limit bounds to check for(default is 3)
|
||||||
|
:return: True, when all bounds are in, False when one or both aren't.
|
||||||
|
:rtype: bool
|
||||||
|
"""
|
||||||
|
|
||||||
|
if len(username) > upper_limit:
|
||||||
|
return False
|
||||||
|
if len(username) < lower_limit:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def checkUserExists(username: str) -> bool:
|
||||||
|
""" Checks if the User exists on the **SYSTEM** by calling PWD on it.
|
||||||
|
**Note**: You might want to use this in conjunction with checkUserInDB
|
||||||
|
|
||||||
|
:param username:
|
||||||
|
:type username: str
|
||||||
|
:return: True when exists, False when not
|
||||||
|
:rtype: bool
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
pwd.getpwnam(username)
|
||||||
|
except KeyError:
|
||||||
|
return False
|
||||||
|
return True # User already exists
|
||||||
|
|
||||||
|
|
||||||
|
def checkUserInDB(username: str, db: str) -> bool:
|
||||||
|
""" Checks users existence in the **DATABASE**.
|
||||||
|
:Note: You might want to use this in conjunction with `checkUserExists`
|
||||||
|
|
||||||
|
:param username: Username to check existence in database
|
||||||
|
:type username: str
|
||||||
|
:param db: Path to database to check in
|
||||||
|
:type db: str
|
||||||
|
:return: True, when User exists, False when not
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
ldb = lib.sqlitedb.SQLiteDB(db)
|
||||||
|
fetched = ldb.safequery("SELECT * FROM 'applications' WHERE username = ?", tuple([username]))
|
||||||
|
if fetched:
|
||||||
|
return True
|
||||||
|
except lib.sqlitedb.sqlite3.Error as e:
|
||||||
|
print(f"SQLite Exception: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def checkSSHKey(key: str) -> bool:
|
||||||
|
""" Checks SSH Key for meta-data that we accept.
|
||||||
|
:Note: We currently only allow ssh keys without options but with a mail address at the end in b64 encoded.
|
||||||
|
The currently supported algorithms are: ecdfsa-sha2-nistp256, 'ecdsa-sha2-nistp384', 'ecdsa-sha2-nistp521',
|
||||||
|
'ssh-rsa', 'ssh-dss' and 'ssh-ed25519'
|
||||||
|
|
||||||
|
:param key: Key to check
|
||||||
|
:return: True, when Key is valid, False when not
|
||||||
|
:rtype: bool
|
||||||
|
"""
|
||||||
|
|
||||||
|
# taken from https://github.com/hashbang/provisor/blob/master/provisor/utils.py, all belongs to them! ;)
|
||||||
|
import base64
|
||||||
|
if len(key) > 8192 or len(key) < 80:
|
||||||
|
return False
|
||||||
|
|
||||||
|
key = key.replace("\"", "").replace("'", "").replace("\\\"", "")
|
||||||
|
key = key.split(' ')
|
||||||
|
types = ['ecdsa-sha2-nistp256', 'ecdsa-sha2-nistp384',
|
||||||
|
'ecdsa-sha2-nistp521', 'ssh-rsa', 'ssh-dss', 'ssh-ed25519']
|
||||||
|
if key[0] not in types:
|
||||||
|
return False
|
||||||
|
try:
|
||||||
|
base64.decodebytes(bytes(key[1], "utf-8"))
|
||||||
|
except TypeError:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def checkEmail(mail: str) -> bool:
|
||||||
|
""" Checks Mail against a relatively simple REgex Pattern.
|
||||||
|
|
||||||
|
:param mail: Mail to check
|
||||||
|
:type mail: str
|
||||||
|
:return: False, when the Mail is invalid, True when valid.
|
||||||
|
:rtype: bool
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not re.match("(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)", mail):
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def checkDatetimeFormat(datetime_str: str) -> bool:
|
||||||
|
""" Checks a Strings format on date time.
|
||||||
|
|
||||||
|
:param datetime_str: String to check
|
||||||
|
:type datetime_str: str
|
||||||
|
:return: True when valid, False when not.
|
||||||
|
:rtype: bool
|
||||||
|
"""
|
||||||
|
|
||||||
|
import datetime
|
||||||
|
try:
|
||||||
|
datetime.datetime.strptime(datetime_str, "%Y-%m-%d %H:%M:%S")
|
||||||
|
except ValueError:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def checkName(name: str) -> bool:
|
||||||
|
""" Checks a users (real) Name against a real simple REgex Pattern.
|
||||||
|
|
||||||
|
:param name: Name/String to check
|
||||||
|
:type name: str
|
||||||
|
:return: True when valid, False when not.
|
||||||
|
:rtype: bool
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not re.match("\w+\s*\w", name):
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def checkImportFile(path: str, db: str):
|
||||||
|
""" Checks an CSV file against most of the validators and prints an Error message with the line number corresponding
|
||||||
|
to said failure.. Those includes: checkName, checkUsernameCharacters,
|
||||||
|
ckeckUsernameLength, duplicate usernames(in the CSV), checkSSHKey, checkEmail, checkUserExists, checkUserInDB,
|
||||||
|
checkDatetimeformat and if the status is 1 or 0.
|
||||||
|
|
||||||
|
:param path: Path to file to check
|
||||||
|
:type path: str
|
||||||
|
:param db: Path to database file(SQLite)
|
||||||
|
:type db: str
|
||||||
|
:return: Str when Failure, True when success(All tests passed)
|
||||||
|
:rtype: Str or None
|
||||||
|
"""
|
||||||
|
|
||||||
|
errstr = ""
|
||||||
|
valid = True
|
||||||
|
ln = 1 # line number
|
||||||
|
valid_names_list = []
|
||||||
|
with open(path, 'r', newline='') as f:
|
||||||
|
reader = csv.DictReader(f)
|
||||||
|
for row in reader:
|
||||||
|
# if any of this fails move on to the next user, just print a relatively helpful message lel
|
||||||
|
if not lib.Validator.checkName(row["name"]):
|
||||||
|
errstr += f"Line {ln}: Name: '{row['name']}' seems not legit. Character followed by character should" \
|
||||||
|
f" be correct.\n"
|
||||||
|
valid = False
|
||||||
|
if not lib.Validator.checkUsernameCharacters(row["username"]):
|
||||||
|
errstr += (f"Line {ln}: Username contains unsupported characters or starts with a number: '"
|
||||||
|
f"{row['username']}'.\n")
|
||||||
|
valid = False
|
||||||
|
if not lib.Validator.checkUsernameLength(row["username"]):
|
||||||
|
errstr += f"Line {ln}: Username '{row['username']}' is either too long(>16) or short(<3)\n"
|
||||||
|
valid = False
|
||||||
|
# dup checking
|
||||||
|
if row["username"] in valid_names_list:
|
||||||
|
errstr += f"Line {ln}: Duplicate Username {row['username']}!\n"
|
||||||
|
valid = False
|
||||||
|
else:
|
||||||
|
valid_names_list.append(row["username"])
|
||||||
|
# dup end
|
||||||
|
if not lib.Validator.checkSSHKey(row["pubkey"]):
|
||||||
|
errstr += f"Line {ln}: Following SSH-Key of user '{row['username']}' isn't valid: " \
|
||||||
|
f"'{row['pubkey']}'.\n"
|
||||||
|
valid = False
|
||||||
|
if not lib.Validator.checkEmail(row["email"]):
|
||||||
|
errstr += f"Line {ln}: E-Mail address of user '{row['username']}' '{row['email']}' is not valid.\n"
|
||||||
|
valid = False
|
||||||
|
if lib.Validator.checkUserExists(row["username"]) or checkUserInDB(row["username"], db):
|
||||||
|
errstr += f"Line {ln}: User '{row['username']}' already exists.\n"
|
||||||
|
valid = False
|
||||||
|
if not lib.Validator.checkDatetimeFormat(row["timestamp"]):
|
||||||
|
errstr += f"Line {ln}: Timestamp '{row['timestamp']}' from user '{row['username']}' is invalid.\n"
|
||||||
|
valid = False
|
||||||
|
if int(row["status"]) > 1 or int(row["status"]) < 0:
|
||||||
|
errstr += f"Line {ln}: Status '{row['status']}' MUST be either 0 or 1.\n"
|
||||||
|
valid = False
|
||||||
|
ln += 1
|
||||||
|
if valid:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return errstr
|
@ -0,0 +1,9 @@
|
|||||||
|
import os
|
||||||
|
|
||||||
|
cwd = os.environ.get('TILDE_CONF')
|
||||||
|
if cwd is None:
|
||||||
|
cwd = os.getcwd() + "/applicationsconfig.ini"
|
||||||
|
else:
|
||||||
|
if os.path.isfile(cwd) is False:
|
||||||
|
cwd = os.getcwd() + "/applicationsconfig.ini"
|
||||||
|
# cwd is now either cwd/applicationsconfig or $TILDE_CONF
|
@ -0,0 +1,164 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
import sqlite3
|
||||||
|
from sys import stderr as stderr
|
||||||
|
from typing import List # Typing support!
|
||||||
|
|
||||||
|
|
||||||
|
# create dictionary out of sqlite results
|
||||||
|
def dict_factory(cursor, row):
|
||||||
|
d: dict = {}
|
||||||
|
for idx, col in enumerate(cursor.description):
|
||||||
|
d[col[0]] = row[idx]
|
||||||
|
return d
|
||||||
|
|
||||||
|
|
||||||
|
class SQLiteDB:
|
||||||
|
"""SQLitedb handles EVERYTHING directly related to our Database."""
|
||||||
|
|
||||||
|
db = ""
|
||||||
|
cursor = None
|
||||||
|
connection = None
|
||||||
|
last_result = None
|
||||||
|
|
||||||
|
def __init__(self, dbpath: str):
|
||||||
|
"""
|
||||||
|
:param dbpath: Path to the database we want to open
|
||||||
|
:type dbpath: str
|
||||||
|
:returns: Object for the SQLitedb-Class.
|
||||||
|
:rtype: object
|
||||||
|
"""
|
||||||
|
|
||||||
|
db = dbpath
|
||||||
|
try:
|
||||||
|
self.connection = sqlite3.connect(db)
|
||||||
|
self.cursor = self.connection.cursor()
|
||||||
|
except sqlite3.Error as e:
|
||||||
|
print("Connection error: %s" % e, file=stderr)
|
||||||
|
|
||||||
|
self.cursor.row_factory = sqlite3.Row # every result will be a dict now
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
try:
|
||||||
|
self.connection.commit()
|
||||||
|
self.connection.close()
|
||||||
|
except sqlite3.Error as e:
|
||||||
|
print("Couldn't gracefully close db: %s" % e, file=stderr)
|
||||||
|
|
||||||
|
def query(self, qq: str) -> List[sqlite3.Row]:
|
||||||
|
"""Do a query and automagically get the fetched results in a list
|
||||||
|
:param qq: Query to execute
|
||||||
|
:type qq: str
|
||||||
|
:returns: A tuple(/list) consisting with any fetched results
|
||||||
|
:rtype: list
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.cursor.execute(qq)
|
||||||
|
self.last_result = self.cursor.fetchall()
|
||||||
|
self.connection.commit()
|
||||||
|
except sqlite3.OperationalError:
|
||||||
|
self._createTable()
|
||||||
|
return self.query(qq)
|
||||||
|
except sqlite3.Error as e:
|
||||||
|
print("Couldn't execute query %s, exception: %s" % (qq, e), file=stderr)
|
||||||
|
self.last_result = []
|
||||||
|
return self.last_result
|
||||||
|
|
||||||
|
# sometimes we need the cursor for safety reasons, for example does sqlite3 all the security related
|
||||||
|
# escaoing in supplied strings for us, when we deliver it to con.execute in the second argument as a tuple
|
||||||
|
def getCursor(self) -> sqlite3:
|
||||||
|
"""Returns SQLite3 Cursor. Use with **c a u t i o n**... """
|
||||||
|
return self.cursor
|
||||||
|
|
||||||
|
# we could try to utilise that ourselfs in a function. Be c a r e f u l, these values in the tuple MUST HAVE
|
||||||
|
# THE RIGHT TYPE
|
||||||
|
def safequery(self, qq: str, deliver: tuple) -> List[sqlite3.Row]:
|
||||||
|
""" Shall handle any query that has user input in it as an alternative to self.query
|
||||||
|
:param qq: Query to execute
|
||||||
|
:type qq: str
|
||||||
|
:param deliver: User inputs marked with the placeholder(`?`) in the str
|
||||||
|
:type deliver: tuple
|
||||||
|
:returns: A tuple(/list) consisting with any fetched results
|
||||||
|
:rtype: List[sqlite3.Row]
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.cursor.execute(qq, deliver)
|
||||||
|
self.last_result = self.cursor.fetchall()
|
||||||
|
self.connection.commit()
|
||||||
|
except TypeError as e:
|
||||||
|
print("Types in given tuple doesnt match to execute query \"%s\": %s" % (qq, e), file=stderr)
|
||||||
|
self.last_result = []
|
||||||
|
except sqlite3.OperationalError:
|
||||||
|
self._createTable()
|
||||||
|
return self.safequery(qq, deliver)
|
||||||
|
except sqlite3.Error as e:
|
||||||
|
print("Couldn't execute query %s, exception: %s" % (qq, e), file=stderr)
|
||||||
|
print(deliver)
|
||||||
|
print(type(e))
|
||||||
|
self.last_result = []
|
||||||
|
return self.last_result
|
||||||
|
|
||||||
|
def removeApplicantFromDB(self, userid: int) -> bool:
|
||||||
|
"""Removes Applicants from the DB by ID. Use along System.removeUser()
|
||||||
|
:param userid: User ID to remove from the Database
|
||||||
|
:type userid: int
|
||||||
|
:returns: True, if removal was successful(from the DB), False when not
|
||||||
|
:rtype: bool
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.last_result = self.cursor.execute("DELETE FROM `applications` WHERE id = ? ", [userid])
|
||||||
|
self.connection.commit()
|
||||||
|
except sqlite3.OperationalError:
|
||||||
|
print("The database has probably not yet seen any users, so it didnt create your table yet. Come back"
|
||||||
|
"when a user tried to register")
|
||||||
|
return False
|
||||||
|
except sqlite3.Error as e:
|
||||||
|
print(f"Could not delete user with id: {userid}, exception in DB: {e}") # @TODO LOGGING FFS
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
def removeApplicantFromDBperUsername(self, username: str) -> bool:
|
||||||
|
"""Removes Applicants from the DB by Username. Use along System.removeUser()
|
||||||
|
:param username: Username to remove from the database
|
||||||
|
:type username: str
|
||||||
|
:returns: True, if removal was successful(from the DB), False when not
|
||||||
|
:rtype: bool
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.last_result = self.cursor.execute("DELETE FROM `applications` WHERE username = ?", [username])
|
||||||
|
self.connection.commit()
|
||||||
|
except sqlite3.OperationalError:
|
||||||
|
print("The database has probably not yet seen any users, so it didnt create your table yet. Come back"
|
||||||
|
"when a user tried to register")
|
||||||
|
return False
|
||||||
|
except sqlite3.Error as e:
|
||||||
|
print(f"Could not delete user {username}, exception in DB: {e}") # @TODO LOGGING
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
def _createTable(self) -> None:
|
||||||
|
try:
|
||||||
|
self.cursor.execute(
|
||||||
|
"CREATE TABLE IF NOT EXISTS applications("
|
||||||
|
"id INTEGER PRIMARY KEY AUTOINCREMENT,"
|
||||||
|
"username TEXT NOT NULL, email TEXT NOT NULL,"
|
||||||
|
"name TEXT NOT NULL, pubkey TEXT NOT NULL,"
|
||||||
|
"timestamp DATETIME DEFAULT CURRENT_TIMESTAMP CONSTRAINT "
|
||||||
|
"timestamp_valid CHECK( timestamp IS strftime('%Y-%m-%d %H:%M:%S', timestamp))"
|
||||||
|
",status INTEGER NOT NULL DEFAULT 0);")
|
||||||
|
self.connection.commit()
|
||||||
|
except sqlite3.Error as e:
|
||||||
|
print(f"The database probably doesn't exist yet, but read the message: {e}")
|
||||||
|
print("The database table didn't exist yet; created it successfully!")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
try:
|
||||||
|
SQLiteDB("bla.db")
|
||||||
|
print("hi")
|
||||||
|
exit(0)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
@ -0,0 +1,6 @@
|
|||||||
|
import argparse
|
||||||
|
import lib.cwd
|
||||||
|
|
||||||
|
argparser = argparse.ArgumentParser(description='Tilde administration tools ', conflict_handler="resolve")
|
||||||
|
argparser.add_argument('-c', '--config', default=lib.cwd.cwd,
|
||||||
|
type=str, help='Path to configuration file', required=False)
|
@ -0,0 +1,11 @@
|
|||||||
|
import lib.uis.config_ui
|
||||||
|
|
||||||
|
argparser = lib.uis.config_ui.argparser
|
||||||
|
|
||||||
|
# store_true just stores true when the command is supplied, so it doesn't need choices nor types
|
||||||
|
argparser.add_argument('-u', '--unapproved', default=False, action="store_true",
|
||||||
|
help='only unapproved users. Default is only approved.', required=False)
|
||||||
|
argparser.add_argument('-a', '--approved', default=False, action="store_true",
|
||||||
|
help="Only approved Users.", required=False)
|
||||||
|
argparser.add_argument('-f', '--file', default="stdout",
|
||||||
|
type=str, help='write to file instead of stdout', required=False)
|
@ -0,0 +1,5 @@
|
|||||||
|
unalias dev_build
|
||||||
|
unalias dev_run
|
||||||
|
unalias dev_bash
|
||||||
|
unalias dev_stop
|
||||||
|
unalias dev_disable
|
@ -0,0 +1,5 @@
|
|||||||
|
alias dev_run="docker container run -l dev-ssh-reg --name dev-ssh-reg --rm -itd -v $PWD/private/:/app/admin ssh-reg"
|
||||||
|
alias dev_stop="docker container stop dev-ssh-reg"
|
||||||
|
alias dev_build="docker build -t ssh-reg:latest --force-rm ."
|
||||||
|
alias dev_bash="docker container exec -it dev-ssh-reg bash -c 'cd /app/admin; exec bash --login -i'"
|
||||||
|
alias dev_disable="source private/scripts/disable.sh"
|
Loading…
Reference in New Issue