forked from tilde/ssh-reg
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
96 lines
3.8 KiB
Python
96 lines
3.8 KiB
Python
5 years ago
|
#!/usr/bin/env python3
|
||
4 years ago
|
"""
|
||
|
Import users or sanitize backup files
|
||
|
"""
|
||
5 years ago
|
import configparser
|
||
5 years ago
|
import csv
|
||
|
import os
|
||
5 years ago
|
|
||
5 years ago
|
import lib.UserExceptions
|
||
5 years ago
|
import lib.uis.config_ui # dont go to default, just following -c flag
|
||
4 years ago
|
import lib.Validator
|
||
5 years ago
|
|
||
5 years ago
|
|
||
4 years ago
|
def import_from_file(file_path: str, database_file: str, user_ids: tuple = tuple([])) -> bool:
|
||
5 years ago
|
""" Imports Users from a given CSV-file to the system and DB
|
||
|
|
||
5 years ago
|
:param file_path:
|
||
|
:type file_path: str
|
||
5 years ago
|
:param db: Path to the sqlite db
|
||
|
:type db: str
|
||
5 years ago
|
:param user_ids: FIXME: Tuple which user_ids should we write
|
||
|
:type user_ids: tuple
|
||
5 years ago
|
:return: True on success, False when not
|
||
|
:rtype: bool
|
||
|
"""
|
||
5 years ago
|
if not os.path.isfile(file_path):
|
||
|
print(f"File {file_path} don't exist")
|
||
5 years ago
|
return False
|
||
4 years ago
|
if not os.path.isfile(database_file):
|
||
|
print(f"The database file {database_file} don't exist")
|
||
5 years ago
|
return False
|
||
5 years ago
|
if user_ids:
|
||
5 years ago
|
pass # empty tuple means everything
|
||
|
# noinspection PyBroadException
|
||
|
try:
|
||
5 years ago
|
with open(file_path, 'r', newline='') as f:
|
||
4 years ago
|
import lib.sqlitedb
|
||
|
import lib.System
|
||
|
sql = lib.sqlitedb.SQLiteDB(database_file)
|
||
|
err = lib.Validator.checkImportFile(file_path, database_file)
|
||
5 years ago
|
if err is not True:
|
||
|
print(err)
|
||
|
exit(0)
|
||
5 years ago
|
sys_ctl = lib.System.System("root")
|
||
5 years ago
|
reader = csv.DictReader(f) # @TODO csv.Sniffer to compare? When yes, give force-accept option
|
||
|
for row in reader:
|
||
|
if row["status"] == "1":
|
||
|
try:
|
||
5 years ago
|
sys_ctl.setUser(row["username"])
|
||
5 years ago
|
sys_ctl.aio_approve(row["pubkey"])
|
||
5 years ago
|
print(row['username'], "====> Registered.")
|
||
4 years ago
|
except lib.UserExceptions.General as general_except:
|
||
|
print(f"Something didnt work out! {general_except}")
|
||
5 years ago
|
elif row["status"] == "0":
|
||
|
print(row['username'] + " not approved, therefore not registered.")
|
||
|
try:
|
||
|
sql.safequery(
|
||
|
"INSERT INTO `applications` (username, name, timestamp, email, pubkey, status) "
|
||
|
"VALUES (?,?,?,?,?,?)", tuple([row["username"], row["name"], row["timestamp"],
|
||
|
row["email"], row["pubkey"], row["status"]]))
|
||
4 years ago
|
except OSError as os_exception:
|
||
|
print(f"UUFFF, something went WRONG with the file {file_path}: {os_exception}")
|
||
|
except Exception as didnt_catch:
|
||
|
print(f"Exception! UNCATCHED! {type(didnt_catch)}: {didnt_catch}")
|
||
5 years ago
|
return True
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
5 years ago
|
|
||
|
ArgParser = lib.uis.config_ui.argparser
|
||
|
ArgParser.description += "- Imports a CSV file consisting of user specific details to the database"
|
||
|
ArgParser.add_argument('-f', '--file', default="stdout",
|
||
|
type=str, help='Import from CSV file', required=True)
|
||
|
ArgParser.add_argument('--Import', default=False, action="store_true",
|
||
4 years ago
|
help="Import Users. If not set, just sanitize the supplied csv", required=False)
|
||
5 years ago
|
args = ArgParser.parse_args()
|
||
|
config = configparser.ConfigParser()
|
||
|
config.read(args.config)
|
||
5 years ago
|
|
||
|
if not args.file:
|
||
|
print("Error, need the import file")
|
||
4 years ago
|
if not args.Import:
|
||
|
# we assume that you just want to sanitize the backup
|
||
|
if not os.path.isfile(args.file):
|
||
|
print(f"File {args.file} doesnt exist")
|
||
5 years ago
|
exit(1)
|
||
4 years ago
|
sanitized = lib.Validator.checkImportFile(args.file, config['DEFAULT']['applications_db'])
|
||
|
if sanitized is not True:
|
||
|
print(sanitized)
|
||
|
else:
|
||
|
print(f"{args.file} is valid!")
|
||
|
exit(0)
|
||
|
elif args.Import:
|
||
|
import_from_file(args.file, config['DEFAULT']['applications_db'])
|
||
5 years ago
|
exit(0)
|