General overhaul, better messages, ordered imports etc

This commit is contained in:
Darksider3 2019-10-25 10:17:39 +02:00
parent 32c6ea7e07
commit 8008fa36b9
7 changed files with 180 additions and 185 deletions

View file

@ -1,9 +1,10 @@
#!/usr/bin/env python3
import ListUsers
import configparser
import csv
import io
import configparser
import ListUsers
import lib.uis.default as default_cmd # Follows -u, -a, -f flags
@ -115,17 +116,14 @@ if __name__ == "__main__":
args = default_cmd.argparser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)
try:
L = ListUsers.ListUsers(config['DEFAULT']['applications_db'],
unapproved=args.unapproved, approved=args.approved)
fetch = L.get_fetch()
if fetch:
B = Backup(args.file)
B.setFieldnames(fetch[0].keys()) # sqlite3.row delivers its keys for us! SO NICE!
B.backup_to_file(fetch)
else:
print("nothing to backup!")
exit(1)
exit(0)
except KeyboardInterrupt as e:
pass
L = ListUsers.ListUsers(config['DEFAULT']['applications_db'],
unapproved=args.unapproved, approved=args.approved)
fetch = L.get_fetch()
if fetch:
B = Backup(args.file)
B.setFieldnames(fetch[0].keys()) # sqlite3.row delivers its keys for us! SO NICE!
B.backup_to_file(fetch)
else:
print("nothing to backup!")
exit(1)
exit(0)

View file

@ -1,8 +1,9 @@
#!/usr/bin/env python3
import configparser
import csv
import os
import configparser
import lib.UserExceptions
import lib.uis.config_ui # dont go to default, just following -c flag
@ -74,15 +75,13 @@ if __name__ == "__main__":
args = ArgParser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)
try:
if not args.Import:
print("Error, need the import flag")
if not args.Import:
print("Error, need the import flag")
if not args.file:
print("Error, need the import file")
if not args.file:
print("Error, need the import file")
if not args.file:
print("You MUST set a CSV-file with the -f/--file flag that already exist")
exit(1)
import_from_file(args.file, config['DEFAULT']['applications_db'])
exit(0)
except KeyboardInterrupt as e:
pass
print("You MUST set a CSV-file with the -f/--file flag that already exist")
exit(1)
import_from_file(args.file, config['DEFAULT']['applications_db'])
exit(0)

View file

@ -1,11 +1,11 @@
#!/usr/bin/env python3
from lib.sqlitedb import SQLiteDB
import lib.uis.default as default_cmd # Follows -u, -a, -f flags
from typing import List # Typing support!
import sqlite3 # sqlite3.Row-Object
import configparser
import sqlite3 # sqlite3.Row-Object
from typing import List # Typing support!
import lib.uis.default as default_cmd # Follows -u, -a, -f flags
from lib.sqlitedb import SQLiteDB
class ListUsers:
@ -46,7 +46,7 @@ class ListUsers:
query = "SELECT `username` FROM `applications` WHERE `status` = '1' ORDER BY timestamp ASC"
self.usersFetch = self.db.query(query)
for users in self.usersFetch:
list_str += users["username"]+"\n"
list_str += users["username"] + "\n"
return list_str
def prettyPrint(self) -> None:
@ -94,30 +94,27 @@ if __name__ == "__main__":
config = configparser.ConfigParser()
config.read(args.config)
try:
ret = ""
if args.user is not None:
L = ListUsers(config['DEFAULT']['applications_db'], unapproved=args.unapproved, approved=args.approved,
single_user=args.user)
else:
L = ListUsers(config['DEFAULT']['applications_db'], unapproved=args.unapproved, approved=args.approved)
if args.args_asc:
ret = L.output_as_list()
else:
fetch = L.get_fetch()
ret += "ID %-1s| Username %-5s| Mail %-20s| Name %-17s| Registered %-8s | State |\n" % (
" ", " ", " ", " ", " "
ret = ""
if args.user is not None:
L = ListUsers(config['DEFAULT']['applications_db'], unapproved=args.unapproved, approved=args.approved,
single_user=args.user)
else:
L = ListUsers(config['DEFAULT']['applications_db'], unapproved=args.unapproved, approved=args.approved)
if args.args_asc:
ret = L.output_as_list()
else:
fetch = L.get_fetch()
ret += "ID %-1s| Username %-5s| Mail %-20s| Name %-17s| Registered %-8s | State |\n" % (
" ", " ", " ", " ", " "
)
ret += 102 * "-" + "\n"
for user in fetch:
ret += "%-4i| %-14s| %-25s| %-22s| %-8s | %-5i |\n" % (
user["id"], user["username"], user["email"], user["name"], user["timestamp"], user["status"]
)
ret += 102 * "-" + "\n"
for user in fetch:
ret += "%-4i| %-14s| %-25s| %-22s| %-8s | %-5i |\n" % (
user["id"], user["username"], user["email"], user["name"], user["timestamp"], user["status"]
)
if args.file != "stdout":
with open(args.file, 'w') as f:
print(ret, file=f)
else:
print(ret)
exit(0)
except KeyboardInterrupt:
pass
if args.file != "stdout":
with open(args.file, 'w') as f:
print(ret, file=f)
else:
print(ret)
exit(0)

View file

@ -1,12 +1,13 @@
#!/usr/bin/env python3
import configparser
import lib.uis.config_ui # only follow -c flag
import lib.Validator
import lib.sqlitedb
import sqlite3
import lib.System
import lib.UserExceptions
import sqlite3
import lib.Validator
import lib.sqlitedb
import lib.uis.config_ui # only follow -c flag
if __name__ == "__main__":
lib.uis.config_ui.argparser.description += " - Edit Tilde Users"
@ -35,115 +36,112 @@ if __name__ == "__main__":
args = ArgParser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)
try:
db = config['DEFAULT']['applications_db']
if not args.sshpubkey and not args.name and not args.username and not args.email and args.status is None \
and not args.remove:
print(f"Well, SOMETHING must be done with {args.user} ;-)")
exit(1)
# --> --user
if not lib.Validator.checkUserInDB(args.user, db):
print(f"User {args.user} does not exist in the database.")
exit(1)
DB = lib.sqlitedb.SQLiteDB(db)
sys_ctl = lib.System.System(args.user)
if not DB:
print("Could not establish connection to database")
exit(1)
CurrentUser = DB.safequery("SELECT * FROM `applications` WHERE `username`=?", tuple([args.user]))[0]
db = config['DEFAULT']['applications_db']
if not args.sshpubkey and not args.name and not args.username and not args.email and args.status is None \
and not args.remove:
print(f"Well, SOMETHING must be done with {args.user} ;-)")
exit(1)
# --> --user
if not lib.Validator.checkUserInDB(args.user, db):
print(f"User {args.user} does not exist in the database.")
exit(1)
DB = lib.sqlitedb.SQLiteDB(db)
sys_ctl = lib.System.System(args.user)
if not DB:
print("Could not establish connection to database")
exit(1)
CurrentUser = DB.safequery("SELECT * FROM `applications` WHERE `username`=?", tuple([args.user]))[0]
# --> --remove
if args.remove:
print(f"Removing {args.user} from the system and the database...")
try:
DB.removeApplicantFromDBperUsername(args.user)
print(f"Purged from the DB")
if CurrentUser["status"] == 1:
sys_ctl.remove_user()
print(f"Purged from the system")
else:
print(f"'{args.user}' was not approved before, therefore not deleting from system itself.")
except lib.UserExceptions.General as e:
print(f"{e}")
exit(1)
print(f"Successfully removed '{args.user}'.")
# --> --remove
if args.remove:
print(f"Removing {args.user} from the system and the database...")
try:
DB.removeApplicantFromDBperUsername(args.user)
print(f"Purged from the DB")
if CurrentUser["status"] == 1:
sys_ctl.remove_user()
print(f"Purged from the system")
else:
print(f"'{args.user}' was not approved before, therefore not deleting from system itself.")
except lib.UserExceptions.General as e:
print(f"{e}")
exit(1)
print(f"Successfully removed '{args.user}'.")
exit(0)
# --> --sshpubkey
if args.sshpubkey:
if not lib.Validator.checkSSHKey(args.sshpubkey):
print(f"Pubkey '{args.sshpubkey}' isn't valid.")
exit(1)
try:
DB.safequery("UPDATE `applications` SET `pubkey`=? WHERE `username`=?",
tuple([args.sshpubkey, args.user]))
CurrentUser = DB.safequery("SELECT * FROM `applications` WHERE `username` = ? ", tuple([args.user]))[0]
if int(CurrentUser["status"]) == 1:
sys_ctl.make_ssh_usable(args.sshpubkey)
except sqlite3.Error as e:
print(f"Something unexpected happened! {e}")
exit(1)
except lib.UserExceptions.ModifyFilesystem as e:
print(f"One action failed during writing the ssh key back to the authorization file. {e}")
print(f"'{args.user}'s SSH-Key updated successfully.")
# --> --name
if args.name:
if not lib.Validator.checkName(args.name):
print(f"'{args.name}' is not a valid Name.")
exit(1)
try:
DB.safequery("UPDATE `applications` SET `name` =? WHERE `username` =?", tuple([args.name, args.user]))
except sqlite3.Error as e:
print(f"Could not write '{args.name}' to database: {e}")
print(f"'{args.user}'s Name changed to '{args.name}'.")
# --> --email
if args.email:
if not lib.Validator.checkEmail(args.email):
print(f"'{args.email}' is not a valid Mail address!")
exit(1)
try:
DB.safequery("UPDATE `applications` SET `email` =? WHERE `username` =?", tuple([args.email]))
except sqlite3.Error as e:
print(f"Could not write '{args.email}' to the database. {e}")
print(f"'{args.user}' Mail changed to '{args.email}'.")
# --> --status
if args.status is not None:
if args.status != 0 and args.status != 1:
print("Only 0 and 1 are valid status, where 1 is activated and 0 is unapproved.")
exit(0)
# --> --sshpubkey
if args.sshpubkey:
if not lib.Validator.checkSSHKey(args.sshpubkey):
print(f"Pubkey '{args.sshpubkey}' isn't valid.")
exit(1)
# just takes first result out of the dict
if args.status == int(CurrentUser["status"]):
print(f"New and old status are the same.")
if args.status == 0 and int(CurrentUser["status"]) == 1:
try:
DB.safequery("UPDATE `applications` SET `pubkey`=? WHERE `username`=?",
tuple([args.sshpubkey, args.user]))
CurrentUser = DB.safequery("SELECT * FROM `applications` WHERE `username` = ? ", tuple([args.user]))[0]
if int(CurrentUser["status"]) == 1:
sys_ctl.make_ssh_usable(args.sshpubkey)
DB.safequery("UPDATE `applications` SET `status` =? WHERE `id`=?",
tuple([args.status, CurrentUser["id"]]))
sys_ctl.remove_user()
except sqlite3.Error as e:
print(f"Something unexpected happened! {e}")
print(f"Could not update database entry for '{args.user}', did not touch the system")
exit(1)
except lib.UserExceptions.ModifyFilesystem as e:
print(f"One action failed during writing the ssh key back to the authorization file. {e}")
print(f"'{args.user}'s SSH-Key updated successfully.")
except lib.UserExceptions.UnknownReturnCode as e:
print(f"Could not remove '{args.user}' from the system, unknown return code: {e}. DB is modified.")
exit(1)
print(f"Successfully changed '{args.user}'s status to 0 and cleared from the system.")
# --> --name
if args.name:
if not lib.Validator.checkName(args.name):
print(f"'{args.name}' is not a valid Name.")
exit(1)
if args.status == 1 and int(CurrentUser["status"]) == 0:
try:
DB.safequery("UPDATE `applications` SET `name` =? WHERE `username` =?", tuple([args.name, args.user]))
DB.safequery("UPDATE `applications` SET `status`=? WHERE `username`=?",
tuple([args.status, args.user]))
sys_ctl.aio_approve(CurrentUser["pubkey"])
except sqlite3.Error as e:
print(f"Could not write '{args.name}' to database: {e}")
print(f"'{args.user}'s Name changed to '{args.name}'.")
# --> --email
if args.email:
if not lib.Validator.checkEmail(args.email):
print(f"'{args.email}' is not a valid Mail address!")
print(f"Could not update Users status in database")
exit(1)
try:
DB.safequery("UPDATE `applications` SET `email` =? WHERE `username` =?", tuple([args.email]))
except sqlite3.Error as e:
print(f"Could not write '{args.email}' to the database. {e}")
print(f"'{args.user}' Mail changed to '{args.email}'.")
# --> --status
if args.status is not None:
if args.status != 0 and args.status != 1:
print("Only 0 and 1 are valid status, where 1 is activated and 0 is unapproved.")
exit(0)
# just takes first result out of the dict
if args.status == int(CurrentUser["status"]):
print(f"New and old status are the same.")
if args.status == 0 and int(CurrentUser["status"]) == 1:
try:
DB.safequery("UPDATE `applications` SET `status` =? WHERE `id`=?",
tuple([args.status, CurrentUser["id"]]))
sys_ctl.remove_user()
except sqlite3.Error as e:
print(f"Could not update database entry for '{args.user}', did not touch the system")
exit(1)
except lib.UserExceptions.UnknownReturnCode as e:
print(f"Could not remove '{args.user}' from the system, unknown return code: {e}. DB is modified.")
exit(1)
print(f"Successfully changed '{args.user}'s status to 0 and cleared from the system.")
if args.status == 1 and int(CurrentUser["status"]) == 0:
try:
DB.safequery("UPDATE `applications` SET `status`=? WHERE `username`=?",
tuple([args.status, args.user]))
sys_ctl.aio_approve(CurrentUser["pubkey"])
except sqlite3.Error as e:
print(f"Could not update Users status in database")
exit(1)
except lib.UserExceptions.General as ChangeUser:
print(f"Some chain in the cattle just slipped away, my lord! {ChangeUser}")
exit(1)
print(f"Successfully changed '{args.user}'s status to 1 and created on the system.")
exit(0)
except KeyboardInterrupt as e:
pass
except lib.UserExceptions.General as ChangeUser:
print(f"Some chain in the cattle just slipped away, my lord! {ChangeUser}")
exit(1)
print(f"Successfully changed '{args.user}'s status to 1 and created on the system.")
exit(0)

View file

@ -1,6 +1,7 @@
import os
import subprocess
import pwd
import subprocess
import lib.UserExceptions
@ -136,7 +137,7 @@ class System:
os.chown(ssh_dir + "authorized_keys", pwd.getpwnam(self.user)[2], pwd.getpwnam(self.user)[3])
except OSError as e: # by os.chown
raise lib.UserExceptions.ModifyFilesystem(
f"Could not chown {ssh_dir} and/or authorized_keys to {self.user} and their group, Exception: {e}",)
f"Could not chown {ssh_dir} and/or authorized_keys to {self.user} and their group, Exception: {e}", )
except KeyError as e: # by PWD
raise lib.UserExceptions.General(f"PWD can't find {self.user}: {e}")
return True
@ -201,7 +202,7 @@ class System:
rt = subprocess.call(cc)
if rt != 0:
raise lib.UserExceptions.UnknownReturnCode(
f"Could not add user '{self.user}' to group '{group}' with command '{cc}', returned '{rt}'",)
f"Could not add user '{self.user}' to group '{group}' with command '{cc}', returned '{rt}'", )
return True
@staticmethod
@ -242,7 +243,7 @@ class System:
if ret.returncode != 0 and ret.returncode != 6: # userdel returns 6 when no mail dir was found but success
raise lib.UserExceptions.UnknownReturnCode(
f"Could not delete user with command {cc}. Return code: {ret.returncode},"
f" stdout/stderr: {stdio+err_io}")
f" stdout/stderr: {stdio + err_io}")
return True

View file

@ -2,18 +2,24 @@ class General(Exception):
pass
class UnknownUser(General):
class User(General):
pass
class UnknownUser(User):
def __init__(self, name):
Exception.__init__(self, f"Tried to perform action on unknown user '{name}'")
class UserExistsAlready(User):
def __init__(self, name):
Exception.__init__(self, f"User '{name}' is already registered")
class UnknownReturnCode(General):
pass
class UserExistsAlready(UnknownReturnCode):
pass
class ModifyFilesystem(General):
pass
@ -26,10 +32,6 @@ class SQLiteDatabaseDoesntExistYet(General):
pass
class User(Exception):
pass
class UsernameLength(User):
pass

View file

@ -1,7 +1,8 @@
import re
import pwd
import lib.sqlitedb
import csv
import pwd
import re
import lib.sqlitedb
def checkUsernameCharacters(username: str) -> bool:
@ -54,9 +55,8 @@ def checkUserExists(username: str) -> bool:
try:
pwd.getpwnam(username)
except KeyError:
return True # User already exists
else:
return False
return True # User already exists
def checkUserInDB(username: str, db: str) -> bool:
@ -203,7 +203,7 @@ def checkImportFile(path: str, db: str):
if not lib.Validator.checkEmail(row["email"]):
errstr += f"Line {ln}: E-Mail address of user '{row['username']}' '{row['email']}' is not valid.\n"
valid = False
if not lib.Validator.checkUserExists(row["username"]) or checkUserInDB(row["username"], db):
if lib.Validator.checkUserExists(row["username"]) or checkUserInDB(row["username"], db):
errstr += f"Line {ln}: User '{row['username']}' already exists.\n"
valid = False
if not lib.Validator.checkDatetimeFormat(row["timestamp"]):