Validate Timestamps and don't not insert into DB when error
Validates now the timestamp in the import.py, doesn't insert users into the sqlite-database before creating the systems account and checks now for existence in the database too(unapproved users, who comes first, gets his name first..)
This commit is contained in:
parent
3ae497e2eb
commit
2ab27aa019
2 changed files with 42 additions and 9 deletions
|
@ -23,22 +23,26 @@ def ImportFromFile(fname: str = CFG.args.file, db: str = CFG.REG_FILE, userids:
|
|||
sql = lib.sqlitedb.SQLitedb(CFG.REG_FILE)
|
||||
reader = csv.DictReader(f) # @TODO csv.Sniffer to compare? When yes, give force-accept option
|
||||
for row in reader:
|
||||
db_insert = False
|
||||
# if any of this fails move on to the next user, just print a relatively helpful message lel
|
||||
if not lib.validator.checkUsernameCharacters(row["username"]):
|
||||
print(f"The username contains unsupported characters or starts with a number: "
|
||||
f"{row['username']}")
|
||||
f"{row['username']}. Skipping.")
|
||||
continue
|
||||
if not lib.validator.checkUsernameLength(row["username"]):
|
||||
print(f"The username {row['username']} is either too long(>16) or short(<3).")
|
||||
print(f"The username {row['username']} is either too long(>16) or short(<3). Skipping.")
|
||||
continue
|
||||
if not lib.validator.checkSSHKey(row["pubkey"]):
|
||||
print(f"Following SSH-Key isn't valid: {row['pubkey']}")
|
||||
print(f"Following SSH-Key isn't valid: {row['pubkey']}. Skipping.")
|
||||
continue
|
||||
if not lib.validator.checkEmail(row["email"]):
|
||||
print(f"The E-Mail address {row['email']} is not valid.")
|
||||
print(f"The E-Mail address {row['email']} is not valid. Skipping")
|
||||
continue
|
||||
if lib.validator.checkUserExists(row["username"]):
|
||||
print(f"The user '{row['username']}' already exists.")
|
||||
if not lib.validator.checkUserExists(row["username"]):
|
||||
print(f"The user '{row['username']}' already exists. Skipping.")
|
||||
continue
|
||||
if not lib.validator.checkDatetimeFormat(row["timestamp"]):
|
||||
print(f"The timestamp '{row['timestamp']}' from user '{row['username']}' is invalid. Skipping.")
|
||||
continue
|
||||
if row["status"] == "1":
|
||||
try:
|
||||
|
@ -59,9 +63,13 @@ def ImportFromFile(fname: str = CFG.args.file, db: str = CFG.REG_FILE, userids:
|
|||
except Exception as E: # @TODO well less broad is hard to achieve Kappa
|
||||
print(E)
|
||||
continue
|
||||
db_insert = True
|
||||
elif row["status"] == "0":
|
||||
print(row['username'] + " not approved, therefore not registered.")
|
||||
db_insert = True
|
||||
try:
|
||||
if not db_insert:
|
||||
continue
|
||||
sql.safequery(
|
||||
"INSERT INTO `applications` (username, name, timestamp, email, pubkey, status) "
|
||||
"VALUES (?,?,?,?,?,?)", tuple([row["username"], row["name"], row["timestamp"],
|
||||
|
@ -70,7 +78,7 @@ def ImportFromFile(fname: str = CFG.args.file, db: str = CFG.REG_FILE, userids:
|
|||
pass
|
||||
print(f"UUFFF, something went WRONG with the file {fname}: {E}")
|
||||
except Exception as didntCatch:
|
||||
print(f"Exception! UNCATCHED! {type(didntCatch)}")
|
||||
print(f"Exception! UNCATCHED! {type(didntCatch)}: {didntCatch}")
|
||||
return True
|
||||
|
||||
|
||||
|
@ -83,6 +91,7 @@ if __name__ == "__main__":
|
|||
if not CFG.args.file:
|
||||
print("You MUST set a CSV-file with the -f/--file flag that already exist")
|
||||
exit(1)
|
||||
ImportFromFile()
|
||||
exit(0)
|
||||
except KeyboardInterrupt as e:
|
||||
pass
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
import re
|
||||
import pwd
|
||||
import lib.sqlitedb
|
||||
import lib.CFG as CFG
|
||||
|
||||
|
||||
def checkUsernameCharacters(username: str):
|
||||
|
@ -23,9 +25,22 @@ def checkUserExists(username: str):
|
|||
try:
|
||||
pwd.getpwnam(username)
|
||||
except KeyError:
|
||||
return False # User already exists
|
||||
return True # User already exists
|
||||
else:
|
||||
return True # User doesnt exist
|
||||
if checkUserInDB(username):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def checkUserInDB(username: str):
|
||||
try:
|
||||
L = lib.sqlitedb.SQLitedb(CFG.REG_FILE)
|
||||
fetched = L.safequery("SELECT * FROM 'applications' WHERE username = ?", tuple([username]))
|
||||
if fetched:
|
||||
return True
|
||||
except lib.sqlitedb.sqlite3.Error as e:
|
||||
print(f"SQLite Exception: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def checkSSHKey(key: str):
|
||||
|
@ -52,3 +67,12 @@ def checkEmail(mail: str):
|
|||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
def checkDatetimeFormat(form: str):
|
||||
import datetime
|
||||
try:
|
||||
datetime.datetime.strptime(form, "%Y-%m-%d %H:%M:%S")
|
||||
except ValueError:
|
||||
return False
|
||||
return True
|
||||
|
|
Loading…
Reference in a new issue