Compare commits

...

7 Commits
master ... dev

Author SHA1 Message Date
Darksider3 e2c2bcb0c2 import.py: Dont check for users existence while just sanitizing.
This ends the context of commit b4bb45ef85
and 108492db3f with an awesome little
detail, which provides SO MUCH value.
4 years ago
Darksider3 108492db3f private/lib/Validator.py: Further code quality enhancements
Namely:
* f => file_handle
* errstr => err_str
* intendantion adjustements.
* factor out uneccessary if-return-else-return-clause.

Also a new functionality to NOT check for users existence but everything
else. Needed to sanitize in case it's a backup of the current running
instance already. This further adds to commit
b4bb45ef85.
4 years ago
Darksider3 d2f99d95d0 sqlitedb.py: Refactor naming
safequery->safe_query, more snake_case all over the place enforced.
4 years ago
Darksider3 fb620447c7 config_ui: Always point out that the script can operate on the ENV 4 years ago
Darksider3 b4bb45ef85 Import.py: Sanitize given csv in case --Import isnt given and...
... fixup some variable names, also a `pass` that i forget to delete
beforehand.

Also, as you can see in the title, i added the ability to sanitize a
given csv file in case the user didnt supply the --Import flag. Thus, it
isnt required anymore.
4 years ago
Darksider3 aae7504e13 ListUsers.py: Externalize class & rename some variables
This actually makes the whole file easier to read and maintain. I've
still to refactor the output to something.
4 years ago
Darksider3 b5c11455d0 Backup.py -> backup.py, also some PEP conformity-changes
Which also includes two TODO-Tasks.
We had not proven ourselfs worthy of the holy-PEP-construct yet, so we
have to rename the whole file to backu.py, which follows snake_case, and
it's classes members.
They now embrace it as they should've done in the first place!

The dragon which we just encountered isnt as big as we thought and we
can still fight it with our as-easy-as-it-gets task to just snip out the
class and give it it's own residence under lib/backup.py-File!
4 years ago

@ -1,120 +1,41 @@
#!/usr/bin/env python3
import configparser
import sqlite3 # sqlite3.Row-Object
from typing import List # Typing support!
import lib.uis.default as default_cmd # Follows -u, -a, -f flags
from lib.sqlitedb import SQLiteDB
class ListUsers:
db = None
usersFetch = None
def __init__(self, db: str, unapproved: bool = False, approved: bool = True, single_user: str = None):
"""Constructs ListUsers
:param db: Database to access
:type db: str
:param unapproved: only List unapproved users
:type unapproved: bool
:param approved: only list approved users
:type approved: bool
"""
self.db = SQLiteDB(db)
if unapproved: # only unapproved users
query = "SELECT * FROM `applications` WHERE `status` = '0'"
elif approved: # Approved users
query = "SELECT * FROM `applications` WHERE `status` = '1'"
else: # All users
query = "SELECT * FROM `applications`"
self.usersFetch = self.db.query(query)
if single_user is not None:
query = "SELECT * FROM `applications` WHERE `username` = ?"
self.usersFetch = self.db.safequery(query, tuple([single_user]))
def output_as_list(self) -> str:
"""Generates a string with one (approved) user per line and one newline at the end
:rtype: str
:return: String consisting with one(activated) user per line
"""
list_str: str = ""
query = "SELECT `username` FROM `applications` WHERE `status` = '1' ORDER BY timestamp ASC"
self.usersFetch = self.db.query(query)
for users in self.usersFetch:
list_str += users["username"] + "\n"
return list_str
def prettyPrint(self) -> None:
pass # see below why not implemented yet, texttable...
def get_fetch(self) -> List[sqlite3.Row]:
""" Returns a complete fetch done by the lib.sqlitedb-class
:return: Complete fetchall(). A List[sqlite3.Row] with dict-emulation objects.
:rtype: List[sqlite3.Row]
"""
return self.usersFetch
# @TODO MAYBE best solution: https://pypi.org/project/texttable/
# examle:
"""
from texttable import Texttable
t = Texttable()
t.add_rows([['Name', 'Age'], ['Alice', 24], ['Bob', 19]])
print(t.draw())
---------------> Results in:
+-------+-----+
| Name | Age |
+=======+=====+
| Alice | 24 |
+-------+-----+
| Bob | 19 |
+-------+-----+
from lib.ListUsers import ListUsers
for user in fetch:
print("ID: {}; Username: \"{}\"; Mail: {}; Name: \"{}\"; Registered: {}; Status: {}".format(
user["id"], user["username"], user["email"], user["name"], user["timestamp"], user["status"]
))
"""
if __name__ == "__main__":
default_cmd.argparser.description += " - Lists Users from the Tilde database."
default_cmd.argparser.add_argument('--list-asc', default=False, action="store_true",
help='Output a newline seperated list of users', required=False, dest="args_asc")
default_cmd.argparser.add_argument('--user', default=None, type=str,
help="Just show a specific user by it's name", required=False)
default_cmd.argparser.add_argument('--single_user', default=None, type=str,
help="Just show a specific single_user by it's name", required=False)
args = default_cmd.argparser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)
ret = ""
if args.user is not None:
OUTPUT = ""
if args.single_user is not None:
L = ListUsers(config['DEFAULT']['applications_db'], unapproved=args.unapproved, approved=args.approved,
single_user=args.user)
single_user=args.single_user)
else:
L = ListUsers(config['DEFAULT']['applications_db'], unapproved=args.unapproved, approved=args.approved)
if args.args_asc:
ret = L.output_as_list()
OUTPUT = L.output_as_list()
else:
fetch = L.get_fetch()
ret += "ID %-1s| Username %-5s| Mail %-20s| Name %-17s| Registered %-8s | State |\n" % (
users = L.get_fetch()
OUTPUT += "ID %-1s| Username %-5s| Mail %-20s| Name %-17s| Registered %-8s | State |\n" % (
" ", " ", " ", " ", " "
)
ret += 102 * "-" + "\n"
for user in fetch:
ret += "%-4i| %-14s| %-25s| %-22s| %-8s | %-5i |\n" % (
user["id"], user["username"], user["email"], user["name"], user["timestamp"], user["status"]
OUTPUT += 102 * "-" + "\n"
for single_user in users:
OUTPUT += "%-4i| %-14s| %-25s| %-22s| %-8s | %-5i |\n" % (
single_user["id"], single_user["username"], single_user["email"],
single_user["name"], single_user["timestamp"], single_user["status"]
)
if args.file != "stdout":
with open(args.file, 'w') as f:
print(ret, file=f)
print(OUTPUT, file=f)
else:
print(ret)
print(OUTPUT)
exit(0)

@ -1,20 +1,26 @@
#!/usr/bin/env python3
""" This module is thought to be the main point to export and import users.
It's actually not really a module but a script ought to be run from the command line
@TODO: Wording of module header...
"""
import configparser
import csv
import io
import ListUsers
from lib.ListUsers import ListUsers
import lib.uis.default as default_cmd # Follows -u, -a, -f flags
class Backup:
"""Backups a Tilde database to an CSV file
@TODO: Move class into own file
:Example:
>>> from Backup import Backup
>>> from backup import Backup
>>> from ListUsers import ListUsers
>>> L = ListUsers.ListUsers("/path/to/sqlite").get_fetch()
>>> L = ListUsers.list_users("/path/to/sqlite").get_fetch()
>>> backup_db = Backup("stdout")
>>> backup_db.backup_to_file(L)
CSV-Separated list with headers in first row
@ -37,12 +43,13 @@ class Backup:
:type dialect: str
"""
self.setFilename(output)
self.setQuoting(quoting)
self.setDialect(dialect)
self.setFieldnames(tuple(['id', 'username', 'email', 'name', 'pubkey', 'timestamp', 'status']))
self.set_filename(output)
self.set_quoting(quoting)
self.set_dialect(dialect)
self.set_field_names(tuple(['id', 'username', 'email', 'name',
'pubkey', 'timestamp', 'status']))
def setDialect(self, dialect: str) -> None:
def set_dialect(self, dialect: str) -> None:
""" Set dialect for Object
:param dialect: Dialect to set for Object
@ -53,7 +60,7 @@ class Backup:
self.dialect = dialect
def setQuoting(self, quoting: int) -> None:
def set_quoting(self, quoting: int) -> None:
""" Set quoting in the CSV(must be supported by the CSV Module!)
:param quoting: Quoting Integer given by csv.QUOTE_* constants
@ -64,10 +71,10 @@ class Backup:
self.quoting = quoting
def setFilename(self, filename: str) -> None:
""" Sets Filename to output to
def set_filename(self, filename: str) -> None:
""" Sets Filename to OUTPUT to
:param filename: Filename to output to(set stdout for stdout)
:param filename: Filename to OUTPUT to(set stdout for stdout)
:type filename: str
:return: None
:rtype: None
@ -75,8 +82,8 @@ class Backup:
self.filename = filename
def setFieldnames(self, f_names: tuple) -> None:
""" Set fieldname to process
def set_field_names(self, f_names: tuple) -> None:
""" Set field name to process
:param f_names: Fieldnames-Tuple
:type f_names: tuple
@ -95,20 +102,22 @@ class Backup:
"""
returner = io.StringIO()
write_csv = csv.DictWriter(returner, fieldnames=self.field_names, quoting=self.quoting, dialect=self.dialect)
write_csv = csv.DictWriter(returner, fieldnames=self.field_names,
quoting=self.quoting, dialect=self.dialect)
write_csv.writeheader()
for row in fetched:
write_csv.writerow(dict(row))
# sqlite3.Row doesn't "easily" convert to a dict itself sadly, so just a quick help from us here
# it actually even delivers a list(sqlite3.Row) also, which doesnt make the life a whole lot easier
# sqlite3.Row doesn't "easily" convert to a dict itself sadly,
# so just a quick help from us here
# it actually even delivers a list(sqlite3.Row) also,
# which doesnt make the life a whole lot easier
if self.filename == "stdout":
print(returner.getvalue())
return True
else:
with open(self.filename, "w") as f:
print(returner.getvalue(), file=f)
return True
return True
if __name__ == "__main__":
@ -116,12 +125,12 @@ if __name__ == "__main__":
args = default_cmd.argparser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)
L = ListUsers.ListUsers(config['DEFAULT']['applications_db'],
unapproved=args.unapproved, approved=args.approved)
L = ListUsers(config['DEFAULT']['applications_db'],
unapproved=args.unapproved, approved=args.approved)
fetch = L.get_fetch()
if fetch:
B = Backup(args.file)
B.setFieldnames(fetch[0].keys()) # sqlite3.row delivers its keys for us! SO NICE!
B.set_field_names(fetch[0].keys()) # sqlite3.row delivers its keys for us! SO NICE!
B.backup_to_file(fetch)
else:
print("nothing to backup!")

@ -50,7 +50,7 @@ if __name__ == "__main__":
if not DB:
print("Could not establish connection to database")
exit(1)
CurrentUser = DB.safequery("SELECT * FROM `applications` WHERE `username`=?", tuple([args.user]))[0]
CurrentUser = DB.safe_query("SELECT * FROM `applications` WHERE `username`=?", tuple([args.user]))[0]
# --> --remove
if args.remove:
@ -75,9 +75,9 @@ if __name__ == "__main__":
print(f"Pubkey '{args.sshpubkey}' isn't valid.")
exit(1)
try:
DB.safequery("UPDATE `applications` SET `pubkey`=? WHERE `username`=?",
tuple([args.sshpubkey, args.user]))
CurrentUser = DB.safequery("SELECT * FROM `applications` WHERE `username` = ? ", tuple([args.user]))[0]
DB.safe_query("UPDATE `applications` SET `pubkey`=? WHERE `username`=?",
tuple([args.sshpubkey, args.user]))
CurrentUser = DB.safe_query("SELECT * FROM `applications` WHERE `username` = ? ", tuple([args.user]))[0]
if int(CurrentUser["status"]) == 1:
sys_ctl.make_ssh_usable(args.sshpubkey)
except sqlite3.Error as e:
@ -93,7 +93,7 @@ if __name__ == "__main__":
print(f"'{args.name}' is not a valid Name.")
exit(1)
try:
DB.safequery("UPDATE `applications` SET `name` =? WHERE `username` =?", tuple([args.name, args.user]))
DB.safe_query("UPDATE `applications` SET `name` =? WHERE `username` =?", tuple([args.name, args.user]))
except sqlite3.Error as e:
print(f"Could not write '{args.name}' to database: {e}")
print(f"'{args.user}'s Name changed to '{args.name}'.")
@ -104,7 +104,7 @@ if __name__ == "__main__":
print(f"'{args.email}' is not a valid Mail address!")
exit(1)
try:
DB.safequery("UPDATE `applications` SET `email` =? WHERE `username` =?", tuple([args.email]))
DB.safe_query("UPDATE `applications` SET `email` =? WHERE `username` =?", tuple([args.email]))
except sqlite3.Error as e:
print(f"Could not write '{args.email}' to the database. {e}")
print(f"'{args.user}' Mail changed to '{args.email}'.")
@ -121,8 +121,8 @@ if __name__ == "__main__":
if args.status == 0 and int(CurrentUser["status"]) == 1:
try:
DB.safequery("UPDATE `applications` SET `status` =? WHERE `id`=?",
tuple([args.status, CurrentUser["id"]]))
DB.safe_query("UPDATE `applications` SET `status` =? WHERE `id`=?",
tuple([args.status, CurrentUser["id"]]))
sys_ctl.remove_user()
except sqlite3.Error as e:
print(f"Could not update database entry for '{args.user}', did not touch the system")
@ -134,8 +134,8 @@ if __name__ == "__main__":
if args.status == 1 and int(CurrentUser["status"]) == 0:
try:
DB.safequery("UPDATE `applications` SET `status`=? WHERE `username`=?",
tuple([args.status, args.user]))
DB.safe_query("UPDATE `applications` SET `status`=? WHERE `username`=?",
tuple([args.status, args.user]))
sys_ctl.aio_approve(CurrentUser["pubkey"])
except sqlite3.Error as e:
print(f"Could not update Users status in database")

@ -1,14 +1,17 @@
#!/usr/bin/env python3
"""
Import users or sanitize backup files
"""
import configparser
import csv
import os
import lib.UserExceptions
import lib.uis.config_ui # dont go to default, just following -c flag
import lib.Validator
def import_from_file(file_path: str, db: str, user_ids: tuple = tuple([])) -> bool:
def import_from_file(file_path: str, database_file: str, user_ids: tuple = tuple([])) -> bool:
""" Imports Users from a given CSV-file to the system and DB
:param file_path:
@ -23,22 +26,21 @@ def import_from_file(file_path: str, db: str, user_ids: tuple = tuple([])) -> bo
if not os.path.isfile(file_path):
print(f"File {file_path} don't exist")
return False
if not os.path.isfile(db):
print(f"The database file {db} don't exist")
if not os.path.isfile(database_file):
print(f"The database file {database_file} don't exist")
return False
if user_ids:
pass # empty tuple means everything
# noinspection PyBroadException
try:
with open(file_path, 'r', newline='') as f:
import lib.Validator
sql = lib.sqlitedb.SQLiteDB(db)
err = lib.Validator.checkImportFile(file_path, db)
import lib.sqlitedb
import lib.System
sql = lib.sqlitedb.SQLiteDB(database_file)
err = lib.Validator.checkImportFile(file_path, database_file)
if err is not True:
print(err)
exit(0)
import lib.sqlitedb
import lib.System
sys_ctl = lib.System.System("root")
reader = csv.DictReader(f) # @TODO csv.Sniffer to compare? When yes, give force-accept option
for row in reader:
@ -47,20 +49,19 @@ def import_from_file(file_path: str, db: str, user_ids: tuple = tuple([])) -> bo
sys_ctl.setUser(row["username"])
sys_ctl.aio_approve(row["pubkey"])
print(row['username'], "====> Registered.")
except lib.UserExceptions.General as GeneralExcept:
print(f"Something didnt work out! {GeneralExcept}")
except lib.UserExceptions.General as general_except:
print(f"Something didnt work out! {general_except}")
elif row["status"] == "0":
print(row['username'] + " not approved, therefore not registered.")
try:
sql.safequery(
sql.safe_query(
"INSERT INTO `applications` (username, name, timestamp, email, pubkey, status) "
"VALUES (?,?,?,?,?,?)", tuple([row["username"], row["name"], row["timestamp"],
row["email"], row["pubkey"], row["status"]]))
except OSError as E:
pass
print(f"UUFFF, something went WRONG with the file {file_path}: {E}")
except Exception as didntCatch:
print(f"Exception! UNCATCHED! {type(didntCatch)}: {didntCatch}")
except OSError as os_exception:
print(f"UUFFF, something went WRONG with the file {file_path}: {os_exception}")
except Exception as didnt_catch:
print(f"Exception! UNCATCHED! {type(didnt_catch)}: {didnt_catch}")
return True
@ -71,17 +72,24 @@ if __name__ == "__main__":
ArgParser.add_argument('-f', '--file', default="stdout",
type=str, help='Import from CSV file', required=True)
ArgParser.add_argument('--Import', default=False, action="store_true",
help="Import Users.", required=True)
help="Import Users. If not set, just sanitize the supplied csv", required=False)
args = ArgParser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)
if not args.Import:
print("Error, need the import flag")
if not args.file:
print("Error, need the import file")
if not args.file:
print("You MUST set a CSV-file with the -f/--file flag that already exist")
if not args.Import:
# we assume that you just want to sanitize the backup
if not os.path.isfile(args.file):
print(f"File {args.file} doesnt exist")
exit(1)
import_from_file(args.file, config['DEFAULT']['applications_db'])
sanitized = lib.Validator.checkImportFile(args.file, config['DEFAULT']['applications_db'], False)
if sanitized is not True:
print(sanitized)
else:
print(f"{args.file} is valid!")
exit(0)
elif args.Import:
import_from_file(args.file, config['DEFAULT']['applications_db'])
exit(0)

@ -0,0 +1,65 @@
from lib.sqlitedb import SQLiteDB
import sqlite3 # sqlite3.Row-Object
from typing import List # Typing support!
class ListUsers:
"""
List tilde users
"""
db = None
users_fetch = None
def __init__(self, db: str, unapproved: bool = False, approved: bool = True, single_user: str = None):
"""Constructs list_users
:param db: Database to access
:type db: str
:param unapproved: only List unapproved users
:type unapproved: bool
:param approved: only list approved users
:type approved: bool
"""
self.db = SQLiteDB(db)
if unapproved: # only unapproved users
query = "SELECT * FROM `applications` WHERE `status` = '0'"
elif approved: # Approved users
query = "SELECT * FROM `applications` WHERE `status` = '1'"
else: # All users
query = "SELECT * FROM `applications`"
self.users_fetch = self.db.query(query)
if single_user is not None:
query = "SELECT * FROM `applications` WHERE `username` = ?"
self.users_fetch = self.db.safe_query(query, tuple([single_user]))
def output_as_list(self) -> str:
"""Generates a string with one (approved) single_user per line and one newline at the end
:rtype: str
:return: String consisting with one(activated) single_user per line
"""
list_str: str = ""
query = "SELECT `username` FROM `applications` WHERE `status` = '1' ORDER BY timestamp ASC"
self.users_fetch = self.db.query(query)
for user in self.users_fetch:
list_str += user["username"] + "\n"
return list_str
def pretty_print(self) -> None:
"""
pretty-print users
:return: None
"""
pass # see below why not implemented yet, texttable...
def get_fetch(self) -> List[sqlite3.Row]:
""" Returns a complete users done by the lib.sqlitedb-class
:return: Complete fetchall(). A List[sqlite3.Row] with dict-emulation objects.
:rtype: List[sqlite3.Row]
"""
return self.users_fetch

@ -72,7 +72,7 @@ def checkUserInDB(username: str, db: str) -> bool:
try:
ldb = lib.sqlitedb.SQLiteDB(db)
fetched = ldb.safequery("SELECT * FROM 'applications' WHERE username = ?", tuple([username]))
fetched = ldb.safe_query("SELECT * FROM 'applications' WHERE username = ?", tuple([username]))
if fetched:
return True
except lib.sqlitedb.sqlite3.Error as e:
@ -156,7 +156,7 @@ def checkName(name: str) -> bool:
return True
def checkImportFile(path: str, db: str):
def checkImportFile(path: str, db: str, test_existence: bool = True):
""" Checks an CSV file against most of the validators and prints an Error message with the line number corresponding
to said failure.. Those includes: checkName, checkUsernameCharacters,
ckeckUsernameLength, duplicate usernames(in the CSV), checkSSHKey, checkEmail, checkUserExists, checkUserInDB,
@ -166,54 +166,64 @@ def checkImportFile(path: str, db: str):
:type path: str
:param db: Path to database file(SQLite)
:type db: str
:param test_existence: Flag, checking users existence while true, won't when set to false. Default's to true.
:type test_existence: bool
:return: Str when Failure, True when success(All tests passed)
:rtype: Str or None
"""
errstr = ""
err_str = ""
valid = True
ln = 1 # line number
line = 1 # line number
valid_names_list = []
with open(path, 'r', newline='') as f:
reader = csv.DictReader(f)
with open(path, 'r', newline='') as file_handle:
reader = csv.DictReader(file_handle)
for row in reader:
# if any of this fails move on to the next user, just print a relatively helpful message lel
if not lib.Validator.checkName(row["name"]):
errstr += f"Line {ln}: Name: '{row['name']}' seems not legit. Character followed by character should" \
f" be correct.\n"
err_str += f"Line {line}: Name: '{row['name']}' seems not legit. " \
f"Character followed by character should be correct.\n"
valid = False
if not lib.Validator.checkUsernameCharacters(row["username"]):
errstr += (f"Line {ln}: Username contains unsupported characters or starts with a number: '"
f"{row['username']}'.\n")
err_str += (f"Line {line}: "
f"Username contains unsupported characters or starts with a number: '"
f"{row['username']}'.\n")
valid = False
if not lib.Validator.checkUsernameLength(row["username"]):
errstr += f"Line {ln}: Username '{row['username']}' is either too long(>16) or short(<3)\n"
err_str += f"Line {line}: " \
f"Username '{row['username']}' is either too long(>16) or short(<3)\n"
valid = False
# dup checking
if row["username"] in valid_names_list:
errstr += f"Line {ln}: Duplicate Username {row['username']}!\n"
err_str += f"Line {line}: Duplicate Username {row['username']}!\n"
valid = False
else:
valid_names_list.append(row["username"])
# dup end
if not lib.Validator.checkSSHKey(row["pubkey"]):
errstr += f"Line {ln}: Following SSH-Key of user '{row['username']}' isn't valid: " \
f"'{row['pubkey']}'.\n"
err_str += f"Line {line}: " \
f"Following SSH-Key of user '{row['username']}' isn't valid: " \
f"'{row['pubkey']}'.\n"
valid = False
if not lib.Validator.checkEmail(row["email"]):
errstr += f"Line {ln}: E-Mail address of user '{row['username']}' '{row['email']}' is not valid.\n"
err_str += \
f"Line {line}: " \
f"E-Mail address of user '{row['username']}' '{row['email']}' is not valid.\n"
valid = False
if lib.Validator.checkUserExists(row["username"]) or checkUserInDB(row["username"], db):
errstr += f"Line {ln}: User '{row['username']}' already exists.\n"
valid = False
if test_existence:
err_str += f"Line {line}: User '{row['username']}' already exists.\n"
valid = False
else:
pass
if not lib.Validator.checkDatetimeFormat(row["timestamp"]):
errstr += f"Line {ln}: Timestamp '{row['timestamp']}' from user '{row['username']}' is invalid.\n"
err_str += f"Line {line}: Timestamp '{row['timestamp']}' " \
f"from user '{row['username']}' is invalid.\n"
valid = False
if int(row["status"]) > 1 or int(row["status"]) < 0:
errstr += f"Line {ln}: Status '{row['status']}' MUST be either 0 or 1.\n"
err_str += f"Line {line}: Status '{row['status']}' MUST be either 0 or 1.\n"
valid = False
ln += 1
line += 1
if valid:
return True
else:
return errstr
err_str = True
return err_str

@ -1,4 +1,8 @@
#!/usr/bin/env python3
"""
SQLite wrapper which does just some simple wraps, to ease our experience a little.
"""
import sqlite3
from sys import stderr as stderr
from typing import List # Typing support!
@ -20,20 +24,18 @@ class SQLiteDB:
connection = None
last_result = None
def __init__(self, dbpath: str):
def __init__(self, db_path: str):
"""
:param dbpath: Path to the database we want to open
:type dbpath: str
:param db_path: Path to the database we want to open
:type db_path: str
:returns: Object for the SQLitedb-Class.
:rtype: object
"""
db = dbpath
try:
self.connection = sqlite3.connect(db)
self.connection = sqlite3.connect(db_path)
self.cursor = self.connection.cursor()
except sqlite3.Error as e:
print("Connection error: %s" % e, file=stderr)
except sqlite3.Error as sql_con:
print("Connection error: %s" % sql_con, file=stderr)
self.cursor.row_factory = sqlite3.Row # every result will be a dict now
@ -44,38 +46,39 @@ class SQLiteDB:
except sqlite3.Error as e:
print("Couldn't gracefully close db: %s" % e, file=stderr)
def query(self, qq: str) -> List[sqlite3.Row]:
def query(self, q_str: str) -> List[sqlite3.Row]:
"""Do a query and automagically get the fetched results in a list
:param qq: Query to execute
:type qq: str
:param q_str: Query to execute
:type q_str: str
:returns: A tuple(/list) consisting with any fetched results
:rtype: list
"""
try:
self.cursor.execute(qq)
self.cursor.execute(q_str)
self.last_result = self.cursor.fetchall()
self.connection.commit()
except sqlite3.OperationalError:
self._createTable()
return self.query(qq)
except sqlite3.Error as e:
print("Couldn't execute query %s, exception: %s" % (qq, e), file=stderr)
return self.query(q_str)
except sqlite3.Error as sql_query_except:
print("Couldn't execute query %s, exception: %s" % (q_str, sql_query_except),
file=stderr)
self.last_result = []
return self.last_result
# sometimes we need the cursor for safety reasons, for example does sqlite3 all the security related
# escaoing in supplied strings for us, when we deliver it to con.execute in the second argument as a tuple
def getCursor(self) -> sqlite3:
def get_cursor(self) -> sqlite3:
"""Returns SQLite3 Cursor. Use with **c a u t i o n**... """
return self.cursor
# we could try to utilise that ourselfs in a function. Be c a r e f u l, these values in the tuple MUST HAVE
# THE RIGHT TYPE
def safequery(self, qq: str, deliver: tuple) -> List[sqlite3.Row]:
def safe_query(self, q_str: str, deliver: tuple) -> List[sqlite3.Row]:
""" Shall handle any query that has user input in it as an alternative to self.query
:param qq: Query to execute
:type qq: str
:param q_str: Query to execute
:type q_str: str
:param deliver: User inputs marked with the placeholder(`?`) in the str
:type deliver: tuple
:returns: A tuple(/list) consisting with any fetched results
@ -83,39 +86,40 @@ class SQLiteDB:
"""
try:
self.cursor.execute(qq, deliver)
self.cursor.execute(q_str, deliver)
self.last_result = self.cursor.fetchall()
self.connection.commit()
except TypeError as e:
print("Types in given tuple doesnt match to execute query \"%s\": %s" % (qq, e), file=stderr)
except TypeError as type_err:
print("Types in given tuple doesnt match to execute query \"%s\": %s" % (q_str, type_err), file=stderr)
self.last_result = []
except sqlite3.OperationalError:
self._createTable()
return self.safequery(qq, deliver)
except sqlite3.Error as e:
print("Couldn't execute query %s, exception: %s" % (qq, e), file=stderr)
return self.safe_query(q_str, deliver)
except sqlite3.Error as sql_query_error:
print("Couldn't execute query %s, exception: %s" % (q_str, sql_query_error), file=stderr)
print(deliver)
print(type(e))
print(type(sql_query_error))
self.last_result = []
return self.last_result
def removeApplicantFromDB(self, userid: int) -> bool:
def removeApplicantFromDB(self, user_id: int) -> bool:
"""Removes Applicants from the DB by ID. Use along System.removeUser()
:param userid: User ID to remove from the Database
:type userid: int
:param user_id: User ID to remove from the Database
:type user_id: int
:returns: True, if removal was successful(from the DB), False when not
:rtype: bool
"""
try:
self.last_result = self.cursor.execute("DELETE FROM `applications` WHERE id = ? ", [userid])
self.last_result = self.cursor.execute("DELETE FROM `applications` WHERE id = ? ",
[user_id])
self.connection.commit()
except sqlite3.OperationalError:
print("The database has probably not yet seen any users, so it didnt create your table yet. Come back"
"when a user tried to register")
return False
except sqlite3.Error as e:
print(f"Could not delete user with id: {userid}, exception in DB: {e}") # @TODO LOGGING FFS
except sqlite3.Error as query_error:
print(f"Could not delete user with id: {user_id}, exception in DB: {query_error}") # @TODO LOGGING FFS
return False
return True
@ -134,8 +138,8 @@ class SQLiteDB:
print("The database has probably not yet seen any users, so it didnt create your table yet. Come back"
"when a user tried to register")
return False
except sqlite3.Error as e:
print(f"Could not delete user {username}, exception in DB: {e}") # @TODO LOGGING
except sqlite3.Error as sql_error:
print(f"Could not delete user {username}, exception in DB: {sql_error}") # @TODO LOGGING
return False
return True
@ -150,8 +154,8 @@ class SQLiteDB:
"timestamp_valid CHECK( timestamp IS strftime('%Y-%m-%d %H:%M:%S', timestamp))"
",status INTEGER NOT NULL DEFAULT 0);")
self.connection.commit()
except sqlite3.Error as e:
print(f"The database probably doesn't exist yet, but read the message: {e}")
except sqlite3.Error as sql_error:
print(f"The database probably doesn't exist yet, but read the message: {sql_error}")
print("The database table didn't exist yet; created it successfully!")

@ -1,6 +1,9 @@
import argparse
import lib.cwd
argparser = argparse.ArgumentParser(description='Tilde administration tools ', conflict_handler="resolve")
argparser = argparse.ArgumentParser(description='Tilde administration tools ',
conflict_handler="resolve")
argparser.add_argument('-c', '--config', default=lib.cwd.cwd,
type=str, help='Path to configuration file', required=False)
type=str,
help='Path to configuration file. If not set, we look for it in $TILDE_CONF',
required=False)

Loading…
Cancel
Save