|
|
|
@ -7,46 +7,47 @@ import lib.UserExceptions
|
|
|
|
|
import lib.uis.config_ui # dont go to default, just following -c flag
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def import_from_file(fname: str, db: str, userids: tuple = tuple([])) -> bool:
|
|
|
|
|
def import_from_file(file_path: str, db: str, user_ids: tuple = tuple([])) -> bool:
|
|
|
|
|
""" Imports Users from a given CSV-file to the system and DB
|
|
|
|
|
|
|
|
|
|
:param fname:
|
|
|
|
|
:type fname: str
|
|
|
|
|
:param file_path:
|
|
|
|
|
:type file_path: str
|
|
|
|
|
:param db: Path to the sqlite db
|
|
|
|
|
:type db: str
|
|
|
|
|
:param userids: FIXME: Tuple which userids should we write
|
|
|
|
|
:type userids: tuple
|
|
|
|
|
:param user_ids: FIXME: Tuple which user_ids should we write
|
|
|
|
|
:type user_ids: tuple
|
|
|
|
|
:return: True on success, False when not
|
|
|
|
|
:rtype: bool
|
|
|
|
|
"""
|
|
|
|
|
if not os.path.isfile(fname):
|
|
|
|
|
print(f"File {fname} don't exist")
|
|
|
|
|
if not os.path.isfile(file_path):
|
|
|
|
|
print(f"File {file_path} don't exist")
|
|
|
|
|
return False
|
|
|
|
|
if not os.path.isfile(db):
|
|
|
|
|
print(f"The database file {db} don't exist")
|
|
|
|
|
return False
|
|
|
|
|
if userids:
|
|
|
|
|
if user_ids:
|
|
|
|
|
pass # empty tuple means everything
|
|
|
|
|
# noinspection PyBroadException
|
|
|
|
|
try:
|
|
|
|
|
with open(fname, 'r', newline='') as f:
|
|
|
|
|
with open(file_path, 'r', newline='') as f:
|
|
|
|
|
import lib.validator
|
|
|
|
|
sql = lib.sqlitedb.SQLitedb(db)
|
|
|
|
|
err = lib.validator.checkImportFile(fname, db)
|
|
|
|
|
err = lib.validator.checkImportFile(file_path, db)
|
|
|
|
|
if err is not True:
|
|
|
|
|
print(err)
|
|
|
|
|
exit(0)
|
|
|
|
|
import lib.sqlitedb
|
|
|
|
|
import lib.System
|
|
|
|
|
sysctl = lib.System.System()
|
|
|
|
|
sys_ctl = lib.System.System("root")
|
|
|
|
|
reader = csv.DictReader(f) # @TODO csv.Sniffer to compare? When yes, give force-accept option
|
|
|
|
|
for row in reader:
|
|
|
|
|
if row["status"] == "1":
|
|
|
|
|
try:
|
|
|
|
|
sysctl.register(row["username"])
|
|
|
|
|
sysctl.lock_user_pw(row["username"])
|
|
|
|
|
sysctl.add_to_usergroup(row["username"])
|
|
|
|
|
sysctl.make_ssh_usable(row["username"], row["pubkey"])
|
|
|
|
|
sys_ctl.setUser(row["username"])
|
|
|
|
|
sys_ctl.register()
|
|
|
|
|
sys_ctl.lock_user_pw()
|
|
|
|
|
sys_ctl.add_to_usergroup()
|
|
|
|
|
sys_ctl.make_ssh_usable(row["pubkey"])
|
|
|
|
|
print(row['username'], "====> Registered.")
|
|
|
|
|
except lib.UserExceptions.UserExistsAlready as UEA:
|
|
|
|
|
pass # @TODO User was determined to exists already, shouldn't happen but is possible
|
|
|
|
@ -69,7 +70,7 @@ def import_from_file(fname: str, db: str, userids: tuple = tuple([])) -> bool:
|
|
|
|
|
row["email"], row["pubkey"], row["status"]]))
|
|
|
|
|
except OSError as E:
|
|
|
|
|
pass
|
|
|
|
|
print(f"UUFFF, something went WRONG with the file {fname}: {E}")
|
|
|
|
|
print(f"UUFFF, something went WRONG with the file {file_path}: {E}")
|
|
|
|
|
except Exception as didntCatch:
|
|
|
|
|
print(f"Exception! UNCATCHED! {type(didntCatch)}: {didntCatch}")
|
|
|
|
|
return True
|
|
|
|
|