Compare commits

..

No commits in common. 'dev' and 'master' have entirely different histories.
dev ... master

@ -1,26 +1,20 @@
#!/usr/bin/env python3
""" This module is thought to be the main point to export and import users.
It's actually not really a module but a script ought to be run from the command line
@TODO: Wording of module header...
"""
import configparser
import csv
import io
from lib.ListUsers import ListUsers
import ListUsers
import lib.uis.default as default_cmd # Follows -u, -a, -f flags
class Backup:
"""Backups a Tilde database to an CSV file
@TODO: Move class into own file
:Example:
>>> from backup import Backup
>>> from Backup import Backup
>>> from ListUsers import ListUsers
>>> L = ListUsers.list_users("/path/to/sqlite").get_fetch()
>>> L = ListUsers.ListUsers("/path/to/sqlite").get_fetch()
>>> backup_db = Backup("stdout")
>>> backup_db.backup_to_file(L)
CSV-Separated list with headers in first row
@ -43,13 +37,12 @@ class Backup:
:type dialect: str
"""
self.set_filename(output)
self.set_quoting(quoting)
self.set_dialect(dialect)
self.set_field_names(tuple(['id', 'username', 'email', 'name',
'pubkey', 'timestamp', 'status']))
self.setFilename(output)
self.setQuoting(quoting)
self.setDialect(dialect)
self.setFieldnames(tuple(['id', 'username', 'email', 'name', 'pubkey', 'timestamp', 'status']))
def set_dialect(self, dialect: str) -> None:
def setDialect(self, dialect: str) -> None:
""" Set dialect for Object
:param dialect: Dialect to set for Object
@ -60,7 +53,7 @@ class Backup:
self.dialect = dialect
def set_quoting(self, quoting: int) -> None:
def setQuoting(self, quoting: int) -> None:
""" Set quoting in the CSV(must be supported by the CSV Module!)
:param quoting: Quoting Integer given by csv.QUOTE_* constants
@ -71,10 +64,10 @@ class Backup:
self.quoting = quoting
def set_filename(self, filename: str) -> None:
""" Sets Filename to OUTPUT to
def setFilename(self, filename: str) -> None:
""" Sets Filename to output to
:param filename: Filename to OUTPUT to(set stdout for stdout)
:param filename: Filename to output to(set stdout for stdout)
:type filename: str
:return: None
:rtype: None
@ -82,8 +75,8 @@ class Backup:
self.filename = filename
def set_field_names(self, f_names: tuple) -> None:
""" Set field name to process
def setFieldnames(self, f_names: tuple) -> None:
""" Set fieldname to process
:param f_names: Fieldnames-Tuple
:type f_names: tuple
@ -102,22 +95,20 @@ class Backup:
"""
returner = io.StringIO()
write_csv = csv.DictWriter(returner, fieldnames=self.field_names,
quoting=self.quoting, dialect=self.dialect)
write_csv = csv.DictWriter(returner, fieldnames=self.field_names, quoting=self.quoting, dialect=self.dialect)
write_csv.writeheader()
for row in fetched:
write_csv.writerow(dict(row))
# sqlite3.Row doesn't "easily" convert to a dict itself sadly,
# so just a quick help from us here
# it actually even delivers a list(sqlite3.Row) also,
# which doesnt make the life a whole lot easier
# sqlite3.Row doesn't "easily" convert to a dict itself sadly, so just a quick help from us here
# it actually even delivers a list(sqlite3.Row) also, which doesnt make the life a whole lot easier
if self.filename == "stdout":
print(returner.getvalue())
return True
else:
with open(self.filename, "w") as f:
print(returner.getvalue(), file=f)
return True
return True
if __name__ == "__main__":
@ -125,12 +116,12 @@ if __name__ == "__main__":
args = default_cmd.argparser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)
L = ListUsers(config['DEFAULT']['applications_db'],
unapproved=args.unapproved, approved=args.approved)
L = ListUsers.ListUsers(config['DEFAULT']['applications_db'],
unapproved=args.unapproved, approved=args.approved)
fetch = L.get_fetch()
if fetch:
B = Backup(args.file)
B.set_field_names(fetch[0].keys()) # sqlite3.row delivers its keys for us! SO NICE!
B.setFieldnames(fetch[0].keys()) # sqlite3.row delivers its keys for us! SO NICE!
B.backup_to_file(fetch)
else:
print("nothing to backup!")

@ -1,17 +1,14 @@
#!/usr/bin/env python3
"""
Import users or sanitize backup files
"""
import configparser
import csv
import os
import lib.UserExceptions
import lib.uis.config_ui # dont go to default, just following -c flag
import lib.Validator
def import_from_file(file_path: str, database_file: str, user_ids: tuple = tuple([])) -> bool:
def import_from_file(file_path: str, db: str, user_ids: tuple = tuple([])) -> bool:
""" Imports Users from a given CSV-file to the system and DB
:param file_path:
@ -26,21 +23,22 @@ def import_from_file(file_path: str, database_file: str, user_ids: tuple = tuple
if not os.path.isfile(file_path):
print(f"File {file_path} don't exist")
return False
if not os.path.isfile(database_file):
print(f"The database file {database_file} don't exist")
if not os.path.isfile(db):
print(f"The database file {db} don't exist")
return False
if user_ids:
pass # empty tuple means everything
# noinspection PyBroadException
try:
with open(file_path, 'r', newline='') as f:
import lib.sqlitedb
import lib.System
sql = lib.sqlitedb.SQLiteDB(database_file)
err = lib.Validator.checkImportFile(file_path, database_file)
import lib.Validator
sql = lib.sqlitedb.SQLiteDB(db)
err = lib.Validator.checkImportFile(file_path, db)
if err is not True:
print(err)
exit(0)
import lib.sqlitedb
import lib.System
sys_ctl = lib.System.System("root")
reader = csv.DictReader(f) # @TODO csv.Sniffer to compare? When yes, give force-accept option
for row in reader:
@ -49,19 +47,20 @@ def import_from_file(file_path: str, database_file: str, user_ids: tuple = tuple
sys_ctl.setUser(row["username"])
sys_ctl.aio_approve(row["pubkey"])
print(row['username'], "====> Registered.")
except lib.UserExceptions.General as general_except:
print(f"Something didnt work out! {general_except}")
except lib.UserExceptions.General as GeneralExcept:
print(f"Something didnt work out! {GeneralExcept}")
elif row["status"] == "0":
print(row['username'] + " not approved, therefore not registered.")
try:
sql.safe_query(
sql.safequery(
"INSERT INTO `applications` (username, name, timestamp, email, pubkey, status) "
"VALUES (?,?,?,?,?,?)", tuple([row["username"], row["name"], row["timestamp"],
row["email"], row["pubkey"], row["status"]]))
except OSError as os_exception:
print(f"UUFFF, something went WRONG with the file {file_path}: {os_exception}")
except Exception as didnt_catch:
print(f"Exception! UNCATCHED! {type(didnt_catch)}: {didnt_catch}")
except OSError as E:
pass
print(f"UUFFF, something went WRONG with the file {file_path}: {E}")
except Exception as didntCatch:
print(f"Exception! UNCATCHED! {type(didntCatch)}: {didntCatch}")
return True
@ -72,24 +71,17 @@ if __name__ == "__main__":
ArgParser.add_argument('-f', '--file', default="stdout",
type=str, help='Import from CSV file', required=True)
ArgParser.add_argument('--Import', default=False, action="store_true",
help="Import Users. If not set, just sanitize the supplied csv", required=False)
help="Import Users.", required=True)
args = ArgParser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)
if not args.Import:
print("Error, need the import flag")
if not args.file:
print("Error, need the import file")
if not args.Import:
# we assume that you just want to sanitize the backup
if not os.path.isfile(args.file):
print(f"File {args.file} doesnt exist")
if not args.file:
print("You MUST set a CSV-file with the -f/--file flag that already exist")
exit(1)
sanitized = lib.Validator.checkImportFile(args.file, config['DEFAULT']['applications_db'], False)
if sanitized is not True:
print(sanitized)
else:
print(f"{args.file} is valid!")
exit(0)
elif args.Import:
import_from_file(args.file, config['DEFAULT']['applications_db'])
import_from_file(args.file, config['DEFAULT']['applications_db'])
exit(0)

@ -1,41 +1,120 @@
#!/usr/bin/env python3
import configparser
import sqlite3 # sqlite3.Row-Object
from typing import List # Typing support!
import lib.uis.default as default_cmd # Follows -u, -a, -f flags
from lib.ListUsers import ListUsers
from lib.sqlitedb import SQLiteDB
class ListUsers:
db = None
usersFetch = None
def __init__(self, db: str, unapproved: bool = False, approved: bool = True, single_user: str = None):
"""Constructs ListUsers
:param db: Database to access
:type db: str
:param unapproved: only List unapproved users
:type unapproved: bool
:param approved: only list approved users
:type approved: bool
"""
self.db = SQLiteDB(db)
if unapproved: # only unapproved users
query = "SELECT * FROM `applications` WHERE `status` = '0'"
elif approved: # Approved users
query = "SELECT * FROM `applications` WHERE `status` = '1'"
else: # All users
query = "SELECT * FROM `applications`"
self.usersFetch = self.db.query(query)
if single_user is not None:
query = "SELECT * FROM `applications` WHERE `username` = ?"
self.usersFetch = self.db.safequery(query, tuple([single_user]))
def output_as_list(self) -> str:
"""Generates a string with one (approved) user per line and one newline at the end
:rtype: str
:return: String consisting with one(activated) user per line
"""
list_str: str = ""
query = "SELECT `username` FROM `applications` WHERE `status` = '1' ORDER BY timestamp ASC"
self.usersFetch = self.db.query(query)
for users in self.usersFetch:
list_str += users["username"] + "\n"
return list_str
def prettyPrint(self) -> None:
pass # see below why not implemented yet, texttable...
def get_fetch(self) -> List[sqlite3.Row]:
""" Returns a complete fetch done by the lib.sqlitedb-class
:return: Complete fetchall(). A List[sqlite3.Row] with dict-emulation objects.
:rtype: List[sqlite3.Row]
"""
return self.usersFetch
# @TODO MAYBE best solution: https://pypi.org/project/texttable/
# examle:
"""
from texttable import Texttable
t = Texttable()
t.add_rows([['Name', 'Age'], ['Alice', 24], ['Bob', 19]])
print(t.draw())
---------------> Results in:
+-------+-----+
| Name | Age |
+=======+=====+
| Alice | 24 |
+-------+-----+
| Bob | 19 |
+-------+-----+
for user in fetch:
print("ID: {}; Username: \"{}\"; Mail: {}; Name: \"{}\"; Registered: {}; Status: {}".format(
user["id"], user["username"], user["email"], user["name"], user["timestamp"], user["status"]
))
"""
if __name__ == "__main__":
default_cmd.argparser.description += " - Lists Users from the Tilde database."
default_cmd.argparser.add_argument('--list-asc', default=False, action="store_true",
help='Output a newline seperated list of users', required=False, dest="args_asc")
default_cmd.argparser.add_argument('--single_user', default=None, type=str,
help="Just show a specific single_user by it's name", required=False)
default_cmd.argparser.add_argument('--user', default=None, type=str,
help="Just show a specific user by it's name", required=False)
args = default_cmd.argparser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)
OUTPUT = ""
if args.single_user is not None:
ret = ""
if args.user is not None:
L = ListUsers(config['DEFAULT']['applications_db'], unapproved=args.unapproved, approved=args.approved,
single_user=args.single_user)
single_user=args.user)
else:
L = ListUsers(config['DEFAULT']['applications_db'], unapproved=args.unapproved, approved=args.approved)
if args.args_asc:
OUTPUT = L.output_as_list()
ret = L.output_as_list()
else:
users = L.get_fetch()
OUTPUT += "ID %-1s| Username %-5s| Mail %-20s| Name %-17s| Registered %-8s | State |\n" % (
fetch = L.get_fetch()
ret += "ID %-1s| Username %-5s| Mail %-20s| Name %-17s| Registered %-8s | State |\n" % (
" ", " ", " ", " ", " "
)
OUTPUT += 102 * "-" + "\n"
for single_user in users:
OUTPUT += "%-4i| %-14s| %-25s| %-22s| %-8s | %-5i |\n" % (
single_user["id"], single_user["username"], single_user["email"],
single_user["name"], single_user["timestamp"], single_user["status"]
ret += 102 * "-" + "\n"
for user in fetch:
ret += "%-4i| %-14s| %-25s| %-22s| %-8s | %-5i |\n" % (
user["id"], user["username"], user["email"], user["name"], user["timestamp"], user["status"]
)
if args.file != "stdout":
with open(args.file, 'w') as f:
print(OUTPUT, file=f)
print(ret, file=f)
else:
print(OUTPUT)
print(ret)
exit(0)

@ -50,7 +50,7 @@ if __name__ == "__main__":
if not DB:
print("Could not establish connection to database")
exit(1)
CurrentUser = DB.safe_query("SELECT * FROM `applications` WHERE `username`=?", tuple([args.user]))[0]
CurrentUser = DB.safequery("SELECT * FROM `applications` WHERE `username`=?", tuple([args.user]))[0]
# --> --remove
if args.remove:
@ -75,9 +75,9 @@ if __name__ == "__main__":
print(f"Pubkey '{args.sshpubkey}' isn't valid.")
exit(1)
try:
DB.safe_query("UPDATE `applications` SET `pubkey`=? WHERE `username`=?",
tuple([args.sshpubkey, args.user]))
CurrentUser = DB.safe_query("SELECT * FROM `applications` WHERE `username` = ? ", tuple([args.user]))[0]
DB.safequery("UPDATE `applications` SET `pubkey`=? WHERE `username`=?",
tuple([args.sshpubkey, args.user]))
CurrentUser = DB.safequery("SELECT * FROM `applications` WHERE `username` = ? ", tuple([args.user]))[0]
if int(CurrentUser["status"]) == 1:
sys_ctl.make_ssh_usable(args.sshpubkey)
except sqlite3.Error as e:
@ -93,7 +93,7 @@ if __name__ == "__main__":
print(f"'{args.name}' is not a valid Name.")
exit(1)
try:
DB.safe_query("UPDATE `applications` SET `name` =? WHERE `username` =?", tuple([args.name, args.user]))
DB.safequery("UPDATE `applications` SET `name` =? WHERE `username` =?", tuple([args.name, args.user]))
except sqlite3.Error as e:
print(f"Could not write '{args.name}' to database: {e}")
print(f"'{args.user}'s Name changed to '{args.name}'.")
@ -104,7 +104,7 @@ if __name__ == "__main__":
print(f"'{args.email}' is not a valid Mail address!")
exit(1)
try:
DB.safe_query("UPDATE `applications` SET `email` =? WHERE `username` =?", tuple([args.email]))
DB.safequery("UPDATE `applications` SET `email` =? WHERE `username` =?", tuple([args.email]))
except sqlite3.Error as e:
print(f"Could not write '{args.email}' to the database. {e}")
print(f"'{args.user}' Mail changed to '{args.email}'.")
@ -121,8 +121,8 @@ if __name__ == "__main__":
if args.status == 0 and int(CurrentUser["status"]) == 1:
try:
DB.safe_query("UPDATE `applications` SET `status` =? WHERE `id`=?",
tuple([args.status, CurrentUser["id"]]))
DB.safequery("UPDATE `applications` SET `status` =? WHERE `id`=?",
tuple([args.status, CurrentUser["id"]]))
sys_ctl.remove_user()
except sqlite3.Error as e:
print(f"Could not update database entry for '{args.user}', did not touch the system")
@ -134,8 +134,8 @@ if __name__ == "__main__":
if args.status == 1 and int(CurrentUser["status"]) == 0:
try:
DB.safe_query("UPDATE `applications` SET `status`=? WHERE `username`=?",
tuple([args.status, args.user]))
DB.safequery("UPDATE `applications` SET `status`=? WHERE `username`=?",
tuple([args.status, args.user]))
sys_ctl.aio_approve(CurrentUser["pubkey"])
except sqlite3.Error as e:
print(f"Could not update Users status in database")

@ -1,65 +0,0 @@
from lib.sqlitedb import SQLiteDB
import sqlite3 # sqlite3.Row-Object
from typing import List # Typing support!
class ListUsers:
"""
List tilde users
"""
db = None
users_fetch = None
def __init__(self, db: str, unapproved: bool = False, approved: bool = True, single_user: str = None):
"""Constructs list_users
:param db: Database to access
:type db: str
:param unapproved: only List unapproved users
:type unapproved: bool
:param approved: only list approved users
:type approved: bool
"""
self.db = SQLiteDB(db)
if unapproved: # only unapproved users
query = "SELECT * FROM `applications` WHERE `status` = '0'"
elif approved: # Approved users
query = "SELECT * FROM `applications` WHERE `status` = '1'"
else: # All users
query = "SELECT * FROM `applications`"
self.users_fetch = self.db.query(query)
if single_user is not None:
query = "SELECT * FROM `applications` WHERE `username` = ?"
self.users_fetch = self.db.safe_query(query, tuple([single_user]))
def output_as_list(self) -> str:
"""Generates a string with one (approved) single_user per line and one newline at the end
:rtype: str
:return: String consisting with one(activated) single_user per line
"""
list_str: str = ""
query = "SELECT `username` FROM `applications` WHERE `status` = '1' ORDER BY timestamp ASC"
self.users_fetch = self.db.query(query)
for user in self.users_fetch:
list_str += user["username"] + "\n"
return list_str
def pretty_print(self) -> None:
"""
pretty-print users
:return: None
"""
pass # see below why not implemented yet, texttable...
def get_fetch(self) -> List[sqlite3.Row]:
""" Returns a complete users done by the lib.sqlitedb-class
:return: Complete fetchall(). A List[sqlite3.Row] with dict-emulation objects.
:rtype: List[sqlite3.Row]
"""
return self.users_fetch

@ -72,7 +72,7 @@ def checkUserInDB(username: str, db: str) -> bool:
try:
ldb = lib.sqlitedb.SQLiteDB(db)
fetched = ldb.safe_query("SELECT * FROM 'applications' WHERE username = ?", tuple([username]))
fetched = ldb.safequery("SELECT * FROM 'applications' WHERE username = ?", tuple([username]))
if fetched:
return True
except lib.sqlitedb.sqlite3.Error as e:
@ -156,7 +156,7 @@ def checkName(name: str) -> bool:
return True
def checkImportFile(path: str, db: str, test_existence: bool = True):
def checkImportFile(path: str, db: str):
""" Checks an CSV file against most of the validators and prints an Error message with the line number corresponding
to said failure.. Those includes: checkName, checkUsernameCharacters,
ckeckUsernameLength, duplicate usernames(in the CSV), checkSSHKey, checkEmail, checkUserExists, checkUserInDB,
@ -166,64 +166,54 @@ def checkImportFile(path: str, db: str, test_existence: bool = True):
:type path: str
:param db: Path to database file(SQLite)
:type db: str
:param test_existence: Flag, checking users existence while true, won't when set to false. Default's to true.
:type test_existence: bool
:return: Str when Failure, True when success(All tests passed)
:rtype: Str or None
"""
err_str = ""
errstr = ""
valid = True
line = 1 # line number
ln = 1 # line number
valid_names_list = []
with open(path, 'r', newline='') as file_handle:
reader = csv.DictReader(file_handle)
with open(path, 'r', newline='') as f:
reader = csv.DictReader(f)
for row in reader:
# if any of this fails move on to the next user, just print a relatively helpful message lel
if not lib.Validator.checkName(row["name"]):
err_str += f"Line {line}: Name: '{row['name']}' seems not legit. " \
f"Character followed by character should be correct.\n"
errstr += f"Line {ln}: Name: '{row['name']}' seems not legit. Character followed by character should" \
f" be correct.\n"
valid = False
if not lib.Validator.checkUsernameCharacters(row["username"]):
err_str += (f"Line {line}: "
f"Username contains unsupported characters or starts with a number: '"
f"{row['username']}'.\n")
errstr += (f"Line {ln}: Username contains unsupported characters or starts with a number: '"
f"{row['username']}'.\n")
valid = False
if not lib.Validator.checkUsernameLength(row["username"]):
err_str += f"Line {line}: " \
f"Username '{row['username']}' is either too long(>16) or short(<3)\n"
errstr += f"Line {ln}: Username '{row['username']}' is either too long(>16) or short(<3)\n"
valid = False
# dup checking
if row["username"] in valid_names_list:
err_str += f"Line {line}: Duplicate Username {row['username']}!\n"
errstr += f"Line {ln}: Duplicate Username {row['username']}!\n"
valid = False
else:
valid_names_list.append(row["username"])
# dup end
if not lib.Validator.checkSSHKey(row["pubkey"]):
err_str += f"Line {line}: " \
f"Following SSH-Key of user '{row['username']}' isn't valid: " \
f"'{row['pubkey']}'.\n"
errstr += f"Line {ln}: Following SSH-Key of user '{row['username']}' isn't valid: " \
f"'{row['pubkey']}'.\n"
valid = False
if not lib.Validator.checkEmail(row["email"]):
err_str += \
f"Line {line}: " \
f"E-Mail address of user '{row['username']}' '{row['email']}' is not valid.\n"
errstr += f"Line {ln}: E-Mail address of user '{row['username']}' '{row['email']}' is not valid.\n"
valid = False
if lib.Validator.checkUserExists(row["username"]) or checkUserInDB(row["username"], db):
if test_existence:
err_str += f"Line {line}: User '{row['username']}' already exists.\n"
valid = False
else:
pass
errstr += f"Line {ln}: User '{row['username']}' already exists.\n"
valid = False
if not lib.Validator.checkDatetimeFormat(row["timestamp"]):
err_str += f"Line {line}: Timestamp '{row['timestamp']}' " \
f"from user '{row['username']}' is invalid.\n"
errstr += f"Line {ln}: Timestamp '{row['timestamp']}' from user '{row['username']}' is invalid.\n"
valid = False
if int(row["status"]) > 1 or int(row["status"]) < 0:
err_str += f"Line {line}: Status '{row['status']}' MUST be either 0 or 1.\n"
errstr += f"Line {ln}: Status '{row['status']}' MUST be either 0 or 1.\n"
valid = False
line += 1
ln += 1
if valid:
err_str = True
return err_str
return True
else:
return errstr

@ -1,8 +1,4 @@
#!/usr/bin/env python3
"""
SQLite wrapper which does just some simple wraps, to ease our experience a little.
"""
import sqlite3
from sys import stderr as stderr
from typing import List # Typing support!
@ -24,18 +20,20 @@ class SQLiteDB:
connection = None
last_result = None
def __init__(self, db_path: str):
def __init__(self, dbpath: str):
"""
:param db_path: Path to the database we want to open
:type db_path: str
:param dbpath: Path to the database we want to open
:type dbpath: str
:returns: Object for the SQLitedb-Class.
:rtype: object
"""
db = dbpath
try:
self.connection = sqlite3.connect(db_path)
self.connection = sqlite3.connect(db)
self.cursor = self.connection.cursor()
except sqlite3.Error as sql_con:
print("Connection error: %s" % sql_con, file=stderr)
except sqlite3.Error as e:
print("Connection error: %s" % e, file=stderr)
self.cursor.row_factory = sqlite3.Row # every result will be a dict now
@ -46,39 +44,38 @@ class SQLiteDB:
except sqlite3.Error as e:
print("Couldn't gracefully close db: %s" % e, file=stderr)
def query(self, q_str: str) -> List[sqlite3.Row]:
def query(self, qq: str) -> List[sqlite3.Row]:
"""Do a query and automagically get the fetched results in a list
:param q_str: Query to execute
:type q_str: str
:param qq: Query to execute
:type qq: str
:returns: A tuple(/list) consisting with any fetched results
:rtype: list
"""
try:
self.cursor.execute(q_str)
self.cursor.execute(qq)
self.last_result = self.cursor.fetchall()
self.connection.commit()
except sqlite3.OperationalError:
self._createTable()
return self.query(q_str)
except sqlite3.Error as sql_query_except:
print("Couldn't execute query %s, exception: %s" % (q_str, sql_query_except),
file=stderr)
return self.query(qq)
except sqlite3.Error as e:
print("Couldn't execute query %s, exception: %s" % (qq, e), file=stderr)
self.last_result = []
return self.last_result
# sometimes we need the cursor for safety reasons, for example does sqlite3 all the security related
# escaoing in supplied strings for us, when we deliver it to con.execute in the second argument as a tuple
def get_cursor(self) -> sqlite3:
def getCursor(self) -> sqlite3:
"""Returns SQLite3 Cursor. Use with **c a u t i o n**... """
return self.cursor
# we could try to utilise that ourselfs in a function. Be c a r e f u l, these values in the tuple MUST HAVE
# THE RIGHT TYPE
def safe_query(self, q_str: str, deliver: tuple) -> List[sqlite3.Row]:
def safequery(self, qq: str, deliver: tuple) -> List[sqlite3.Row]:
""" Shall handle any query that has user input in it as an alternative to self.query
:param q_str: Query to execute
:type q_str: str
:param qq: Query to execute
:type qq: str
:param deliver: User inputs marked with the placeholder(`?`) in the str
:type deliver: tuple
:returns: A tuple(/list) consisting with any fetched results
@ -86,40 +83,39 @@ class SQLiteDB:
"""
try:
self.cursor.execute(q_str, deliver)
self.cursor.execute(qq, deliver)
self.last_result = self.cursor.fetchall()
self.connection.commit()
except TypeError as type_err:
print("Types in given tuple doesnt match to execute query \"%s\": %s" % (q_str, type_err), file=stderr)
except TypeError as e:
print("Types in given tuple doesnt match to execute query \"%s\": %s" % (qq, e), file=stderr)
self.last_result = []
except sqlite3.OperationalError:
self._createTable()
return self.safe_query(q_str, deliver)
except sqlite3.Error as sql_query_error:
print("Couldn't execute query %s, exception: %s" % (q_str, sql_query_error), file=stderr)
return self.safequery(qq, deliver)
except sqlite3.Error as e:
print("Couldn't execute query %s, exception: %s" % (qq, e), file=stderr)
print(deliver)
print(type(sql_query_error))
print(type(e))
self.last_result = []
return self.last_result
def removeApplicantFromDB(self, user_id: int) -> bool:
def removeApplicantFromDB(self, userid: int) -> bool:
"""Removes Applicants from the DB by ID. Use along System.removeUser()
:param user_id: User ID to remove from the Database
:type user_id: int
:param userid: User ID to remove from the Database
:type userid: int
:returns: True, if removal was successful(from the DB), False when not
:rtype: bool
"""
try:
self.last_result = self.cursor.execute("DELETE FROM `applications` WHERE id = ? ",
[user_id])
self.last_result = self.cursor.execute("DELETE FROM `applications` WHERE id = ? ", [userid])
self.connection.commit()
except sqlite3.OperationalError:
print("The database has probably not yet seen any users, so it didnt create your table yet. Come back"
"when a user tried to register")
return False
except sqlite3.Error as query_error:
print(f"Could not delete user with id: {user_id}, exception in DB: {query_error}") # @TODO LOGGING FFS
except sqlite3.Error as e:
print(f"Could not delete user with id: {userid}, exception in DB: {e}") # @TODO LOGGING FFS
return False
return True
@ -138,8 +134,8 @@ class SQLiteDB:
print("The database has probably not yet seen any users, so it didnt create your table yet. Come back"
"when a user tried to register")
return False
except sqlite3.Error as sql_error:
print(f"Could not delete user {username}, exception in DB: {sql_error}") # @TODO LOGGING
except sqlite3.Error as e:
print(f"Could not delete user {username}, exception in DB: {e}") # @TODO LOGGING
return False
return True
@ -154,8 +150,8 @@ class SQLiteDB:
"timestamp_valid CHECK( timestamp IS strftime('%Y-%m-%d %H:%M:%S', timestamp))"
",status INTEGER NOT NULL DEFAULT 0);")
self.connection.commit()
except sqlite3.Error as sql_error:
print(f"The database probably doesn't exist yet, but read the message: {sql_error}")
except sqlite3.Error as e:
print(f"The database probably doesn't exist yet, but read the message: {e}")
print("The database table didn't exist yet; created it successfully!")

@ -1,9 +1,6 @@
import argparse
import lib.cwd
argparser = argparse.ArgumentParser(description='Tilde administration tools ',
conflict_handler="resolve")
argparser = argparse.ArgumentParser(description='Tilde administration tools ', conflict_handler="resolve")
argparser.add_argument('-c', '--config', default=lib.cwd.cwd,
type=str,
help='Path to configuration file. If not set, we look for it in $TILDE_CONF',
required=False)
type=str, help='Path to configuration file', required=False)

Loading…
Cancel
Save