Merge remote-tracking branch 'origin/pr/4'
This commit is contained in:
commit
dd5b4043be
17 changed files with 1263 additions and 43 deletions
4
.gitignore
vendored
Normal file
4
.gitignore
vendored
Normal file
|
@ -0,0 +1,4 @@
|
|||
__pycache__/
|
||||
.idea/
|
||||
test*
|
||||
|
25
Dockerfile
25
Dockerfile
|
@ -8,21 +8,6 @@ RUN apt-get update &&\
|
|||
|
||||
# Clean up APT when done.
|
||||
RUN apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
|
||||
|
||||
|
||||
# private/{scripts, administrate.py}, public/{scripts, userapplications.py}, config/userapplicatonsconfig.ini
|
||||
#configs, logs, db
|
||||
COPY config/applicationsconfig.ini /app/data/applicationsconfig.ini
|
||||
|
||||
# admin scripts
|
||||
COPY private/ /app/admin/
|
||||
|
||||
# user accessible scripts
|
||||
# Make TILDE_ENV
|
||||
COPY public/ /app/user/
|
||||
#SSH config into /etc :)
|
||||
COPY config/etc /etc
|
||||
|
||||
# create user for applications
|
||||
RUN useradd -Md /app/user/ -s /app/user/userapplication.py tilde
|
||||
|
||||
|
@ -34,14 +19,20 @@ RUN usermod -U tilde
|
|||
RUN useradd -Md /app/admin -s /app/admin/administrate.py admin
|
||||
# privilege separation directory
|
||||
RUN mkdir -p /var/run/sshd
|
||||
|
||||
# expose SSH port
|
||||
EXPOSE 22
|
||||
|
||||
ENV TILDE_CONF="/app/data/applicationsconfig.ini"
|
||||
RUN touch /app/data/applications.sqlite
|
||||
RUN touch /app/data/applications.log
|
||||
# Doesnt work, @TODO why
|
||||
#RUN setfacl -R -m u:tilde:rwx /app/data/
|
||||
RUN chown -R tilde /app/data
|
||||
# admin scripts
|
||||
COPY private/ /app/admin/
|
||||
|
||||
# user accessible scripts
|
||||
# Make TILDE_ENV
|
||||
COPY public/ /app/user/
|
||||
RUN mkdir /app/user/.ssh
|
||||
CMD ["sh", "-c", " echo TILDE_CONF=$TILDE_CONF > /app/user/.ssh/environment && /usr/sbin/sshd -D"]
|
||||
CMD ["sh", "-c", " echo TILDE_CONF=$TILDE_CONF > /app/user/.ssh/environment && exec /usr/sbin/sshd -D"]
|
||||
|
|
129
private/Backup.py
Executable file
129
private/Backup.py
Executable file
|
@ -0,0 +1,129 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import configparser
|
||||
import csv
|
||||
import io
|
||||
|
||||
import ListUsers
|
||||
import lib.uis.default as default_cmd # Follows -u, -a, -f flags
|
||||
|
||||
|
||||
class Backup:
|
||||
"""Backups a Tilde database to an CSV file
|
||||
|
||||
:Example:
|
||||
>>> from Backup import Backup
|
||||
>>> from ListUsers import ListUsers
|
||||
>>> L = ListUsers.ListUsers("/path/to/sqlite").get_fetch()
|
||||
>>> backup_db = Backup("stdout")
|
||||
>>> backup_db.backup_to_file(L)
|
||||
CSV-Separated list with headers in first row
|
||||
|
||||
"""
|
||||
|
||||
filename: str
|
||||
quoting: int
|
||||
dialect: str
|
||||
field_names: tuple
|
||||
|
||||
def __init__(self, output: str, quoting: int = csv.QUOTE_NONNUMERIC, dialect: str = "excel"):
|
||||
""" Constructs the Backup object
|
||||
|
||||
:param output: File name to backup to(set to stdout for stdout)
|
||||
:type output: str
|
||||
:param quoting: Set quoting for CSV Module
|
||||
:type quoting: int
|
||||
:param dialect: Set the CSV-Dialect. Defaults to excel, which is the classic CSV
|
||||
:type dialect: str
|
||||
"""
|
||||
|
||||
self.setFilename(output)
|
||||
self.setQuoting(quoting)
|
||||
self.setDialect(dialect)
|
||||
self.setFieldnames(tuple(['id', 'username', 'email', 'name', 'pubkey', 'timestamp', 'status']))
|
||||
|
||||
def setDialect(self, dialect: str) -> None:
|
||||
""" Set dialect for Object
|
||||
|
||||
:param dialect: Dialect to set for Object
|
||||
:type dialect: str
|
||||
:return: None
|
||||
:rtype: None
|
||||
"""
|
||||
|
||||
self.dialect = dialect
|
||||
|
||||
def setQuoting(self, quoting: int) -> None:
|
||||
""" Set quoting in the CSV(must be supported by the CSV Module!)
|
||||
|
||||
:param quoting: Quoting Integer given by csv.QUOTE_* constants
|
||||
:type quoting: int
|
||||
:return: None
|
||||
:rtype: None
|
||||
"""
|
||||
|
||||
self.quoting = quoting
|
||||
|
||||
def setFilename(self, filename: str) -> None:
|
||||
""" Sets Filename to output to
|
||||
|
||||
:param filename: Filename to output to(set stdout for stdout)
|
||||
:type filename: str
|
||||
:return: None
|
||||
:rtype: None
|
||||
"""
|
||||
|
||||
self.filename = filename
|
||||
|
||||
def setFieldnames(self, f_names: tuple) -> None:
|
||||
""" Set fieldname to process
|
||||
|
||||
:param f_names: Fieldnames-Tuple
|
||||
:type f_names: tuple
|
||||
:return: None
|
||||
:rtype: None
|
||||
"""
|
||||
|
||||
self.field_names = f_names
|
||||
|
||||
def backup_to_file(self, fetched: list) -> bool:
|
||||
"""Backup Userlist to File(or stdout)
|
||||
|
||||
:param fetched: List of values to write out CSV-formatted
|
||||
:return: True, if success, None when not.
|
||||
:rtype: bool
|
||||
"""
|
||||
|
||||
returner = io.StringIO()
|
||||
write_csv = csv.DictWriter(returner, fieldnames=self.field_names, quoting=self.quoting, dialect=self.dialect)
|
||||
write_csv.writeheader()
|
||||
for row in fetched:
|
||||
write_csv.writerow(dict(row))
|
||||
# sqlite3.Row doesn't "easily" convert to a dict itself sadly, so just a quick help from us here
|
||||
# it actually even delivers a list(sqlite3.Row) also, which doesnt make the life a whole lot easier
|
||||
|
||||
if self.filename == "stdout":
|
||||
print(returner.getvalue())
|
||||
return True
|
||||
else:
|
||||
with open(self.filename, "w") as f:
|
||||
print(returner.getvalue(), file=f)
|
||||
return True
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
default_cmd.argparser.description += " - Backups Tilde Users to stdout or a file."
|
||||
args = default_cmd.argparser.parse_args()
|
||||
config = configparser.ConfigParser()
|
||||
config.read(args.config)
|
||||
L = ListUsers.ListUsers(config['DEFAULT']['applications_db'],
|
||||
unapproved=args.unapproved, approved=args.approved)
|
||||
fetch = L.get_fetch()
|
||||
if fetch:
|
||||
B = Backup(args.file)
|
||||
B.setFieldnames(fetch[0].keys()) # sqlite3.row delivers its keys for us! SO NICE!
|
||||
B.backup_to_file(fetch)
|
||||
else:
|
||||
print("nothing to backup!")
|
||||
exit(1)
|
||||
exit(0)
|
87
private/Import.py
Executable file
87
private/Import.py
Executable file
|
@ -0,0 +1,87 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import configparser
|
||||
import csv
|
||||
import os
|
||||
|
||||
import lib.UserExceptions
|
||||
import lib.uis.config_ui # dont go to default, just following -c flag
|
||||
|
||||
|
||||
def import_from_file(file_path: str, db: str, user_ids: tuple = tuple([])) -> bool:
|
||||
""" Imports Users from a given CSV-file to the system and DB
|
||||
|
||||
:param file_path:
|
||||
:type file_path: str
|
||||
:param db: Path to the sqlite db
|
||||
:type db: str
|
||||
:param user_ids: FIXME: Tuple which user_ids should we write
|
||||
:type user_ids: tuple
|
||||
:return: True on success, False when not
|
||||
:rtype: bool
|
||||
"""
|
||||
if not os.path.isfile(file_path):
|
||||
print(f"File {file_path} don't exist")
|
||||
return False
|
||||
if not os.path.isfile(db):
|
||||
print(f"The database file {db} don't exist")
|
||||
return False
|
||||
if user_ids:
|
||||
pass # empty tuple means everything
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
with open(file_path, 'r', newline='') as f:
|
||||
import lib.Validator
|
||||
sql = lib.sqlitedb.SQLiteDB(db)
|
||||
err = lib.Validator.checkImportFile(file_path, db)
|
||||
if err is not True:
|
||||
print(err)
|
||||
exit(0)
|
||||
import lib.sqlitedb
|
||||
import lib.System
|
||||
sys_ctl = lib.System.System("root")
|
||||
reader = csv.DictReader(f) # @TODO csv.Sniffer to compare? When yes, give force-accept option
|
||||
for row in reader:
|
||||
if row["status"] == "1":
|
||||
try:
|
||||
sys_ctl.setUser(row["username"])
|
||||
sys_ctl.aio_approve(row["pubkey"])
|
||||
print(row['username'], "====> Registered.")
|
||||
except lib.UserExceptions.General as GeneralExcept:
|
||||
print(f"Something didnt work out! {GeneralExcept}")
|
||||
elif row["status"] == "0":
|
||||
print(row['username'] + " not approved, therefore not registered.")
|
||||
try:
|
||||
sql.safequery(
|
||||
"INSERT INTO `applications` (username, name, timestamp, email, pubkey, status) "
|
||||
"VALUES (?,?,?,?,?,?)", tuple([row["username"], row["name"], row["timestamp"],
|
||||
row["email"], row["pubkey"], row["status"]]))
|
||||
except OSError as E:
|
||||
pass
|
||||
print(f"UUFFF, something went WRONG with the file {file_path}: {E}")
|
||||
except Exception as didntCatch:
|
||||
print(f"Exception! UNCATCHED! {type(didntCatch)}: {didntCatch}")
|
||||
return True
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
ArgParser = lib.uis.config_ui.argparser
|
||||
ArgParser.description += "- Imports a CSV file consisting of user specific details to the database"
|
||||
ArgParser.add_argument('-f', '--file', default="stdout",
|
||||
type=str, help='Import from CSV file', required=True)
|
||||
ArgParser.add_argument('--Import', default=False, action="store_true",
|
||||
help="Import Users.", required=True)
|
||||
args = ArgParser.parse_args()
|
||||
config = configparser.ConfigParser()
|
||||
config.read(args.config)
|
||||
|
||||
if not args.Import:
|
||||
print("Error, need the import flag")
|
||||
if not args.file:
|
||||
print("Error, need the import file")
|
||||
if not args.file:
|
||||
print("You MUST set a CSV-file with the -f/--file flag that already exist")
|
||||
exit(1)
|
||||
import_from_file(args.file, config['DEFAULT']['applications_db'])
|
||||
exit(0)
|
120
private/ListUsers.py
Executable file
120
private/ListUsers.py
Executable file
|
@ -0,0 +1,120 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import configparser
|
||||
import sqlite3 # sqlite3.Row-Object
|
||||
from typing import List # Typing support!
|
||||
|
||||
import lib.uis.default as default_cmd # Follows -u, -a, -f flags
|
||||
from lib.sqlitedb import SQLiteDB
|
||||
|
||||
|
||||
class ListUsers:
|
||||
db = None
|
||||
usersFetch = None
|
||||
|
||||
def __init__(self, db: str, unapproved: bool = False, approved: bool = True, single_user: str = None):
|
||||
"""Constructs ListUsers
|
||||
|
||||
:param db: Database to access
|
||||
:type db: str
|
||||
:param unapproved: only List unapproved users
|
||||
:type unapproved: bool
|
||||
:param approved: only list approved users
|
||||
:type approved: bool
|
||||
"""
|
||||
|
||||
self.db = SQLiteDB(db)
|
||||
if unapproved: # only unapproved users
|
||||
query = "SELECT * FROM `applications` WHERE `status` = '0'"
|
||||
elif approved: # Approved users
|
||||
query = "SELECT * FROM `applications` WHERE `status` = '1'"
|
||||
else: # All users
|
||||
query = "SELECT * FROM `applications`"
|
||||
self.usersFetch = self.db.query(query)
|
||||
if single_user is not None:
|
||||
query = "SELECT * FROM `applications` WHERE `username` = ?"
|
||||
self.usersFetch = self.db.safequery(query, tuple([single_user]))
|
||||
|
||||
def output_as_list(self) -> str:
|
||||
"""Generates a string with one (approved) user per line and one newline at the end
|
||||
|
||||
:rtype: str
|
||||
:return: String consisting with one(activated) user per line
|
||||
"""
|
||||
|
||||
list_str: str = ""
|
||||
query = "SELECT `username` FROM `applications` WHERE `status` = '1' ORDER BY timestamp ASC"
|
||||
self.usersFetch = self.db.query(query)
|
||||
for users in self.usersFetch:
|
||||
list_str += users["username"] + "\n"
|
||||
return list_str
|
||||
|
||||
def prettyPrint(self) -> None:
|
||||
pass # see below why not implemented yet, texttable...
|
||||
|
||||
def get_fetch(self) -> List[sqlite3.Row]:
|
||||
""" Returns a complete fetch done by the lib.sqlitedb-class
|
||||
|
||||
:return: Complete fetchall(). A List[sqlite3.Row] with dict-emulation objects.
|
||||
:rtype: List[sqlite3.Row]
|
||||
"""
|
||||
|
||||
return self.usersFetch
|
||||
|
||||
|
||||
# @TODO MAYBE best solution: https://pypi.org/project/texttable/
|
||||
# examle:
|
||||
"""
|
||||
from texttable import Texttable
|
||||
t = Texttable()
|
||||
t.add_rows([['Name', 'Age'], ['Alice', 24], ['Bob', 19]])
|
||||
print(t.draw())
|
||||
---------------> Results in:
|
||||
|
||||
+-------+-----+
|
||||
| Name | Age |
|
||||
+=======+=====+
|
||||
| Alice | 24 |
|
||||
+-------+-----+
|
||||
| Bob | 19 |
|
||||
+-------+-----+
|
||||
|
||||
for user in fetch:
|
||||
print("ID: {}; Username: \"{}\"; Mail: {}; Name: \"{}\"; Registered: {}; Status: {}".format(
|
||||
user["id"], user["username"], user["email"], user["name"], user["timestamp"], user["status"]
|
||||
))
|
||||
"""
|
||||
if __name__ == "__main__":
|
||||
default_cmd.argparser.description += " - Lists Users from the Tilde database."
|
||||
default_cmd.argparser.add_argument('--list-asc', default=False, action="store_true",
|
||||
help='Output a newline seperated list of users', required=False, dest="args_asc")
|
||||
default_cmd.argparser.add_argument('--user', default=None, type=str,
|
||||
help="Just show a specific user by it's name", required=False)
|
||||
args = default_cmd.argparser.parse_args()
|
||||
config = configparser.ConfigParser()
|
||||
config.read(args.config)
|
||||
|
||||
ret = ""
|
||||
if args.user is not None:
|
||||
L = ListUsers(config['DEFAULT']['applications_db'], unapproved=args.unapproved, approved=args.approved,
|
||||
single_user=args.user)
|
||||
else:
|
||||
L = ListUsers(config['DEFAULT']['applications_db'], unapproved=args.unapproved, approved=args.approved)
|
||||
if args.args_asc:
|
||||
ret = L.output_as_list()
|
||||
else:
|
||||
fetch = L.get_fetch()
|
||||
ret += "ID %-1s| Username %-5s| Mail %-20s| Name %-17s| Registered %-8s | State |\n" % (
|
||||
" ", " ", " ", " ", " "
|
||||
)
|
||||
ret += 102 * "-" + "\n"
|
||||
for user in fetch:
|
||||
ret += "%-4i| %-14s| %-25s| %-22s| %-8s | %-5i |\n" % (
|
||||
user["id"], user["username"], user["email"], user["name"], user["timestamp"], user["status"]
|
||||
)
|
||||
if args.file != "stdout":
|
||||
with open(args.file, 'w') as f:
|
||||
print(ret, file=f)
|
||||
else:
|
||||
print(ret)
|
||||
exit(0)
|
147
private/editUsers.py
Executable file
147
private/editUsers.py
Executable file
|
@ -0,0 +1,147 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import configparser
|
||||
import sqlite3
|
||||
|
||||
import lib.System
|
||||
import lib.UserExceptions
|
||||
import lib.Validator
|
||||
import lib.sqlitedb
|
||||
import lib.uis.config_ui # only follow -c flag
|
||||
|
||||
if __name__ == "__main__":
|
||||
lib.uis.config_ui.argparser.description += " - Edit Tilde Users"
|
||||
ArgParser = lib.uis.config_ui.argparser
|
||||
ArgParser.add_argument('--user', type=str,
|
||||
help='Tilde users name to edit', required=True)
|
||||
|
||||
Mutually = ArgParser.add_mutually_exclusive_group()
|
||||
Mutually.add_argument('-r', '--remove', default=False, action="store_true",
|
||||
help='Remove an approved/unapproved User from the system(and DB). Effectively purges him.',
|
||||
required=False)
|
||||
Mutually.add_argument("--verify", default=True, action="store_false",
|
||||
help="Turns off value checks",
|
||||
required=False)
|
||||
|
||||
ArgParser.add_argument('--sshpubkey', type=str, default=None,
|
||||
help="Stores the new given SSH-Key in given user", required=False)
|
||||
ArgParser.add_argument('--name', type=str, default=None,
|
||||
help="Sets the stored name of the given user")
|
||||
ArgParser.add_argument('--username', type=str, default=None,
|
||||
help="Rename given User")
|
||||
ArgParser.add_argument('--email', type=str, default=None,
|
||||
help="Set new email address for given user")
|
||||
ArgParser.add_argument('--status', type=int, default=None,
|
||||
help="Set status of given user")
|
||||
args = ArgParser.parse_args()
|
||||
config = configparser.ConfigParser()
|
||||
config.read(args.config)
|
||||
db = config['DEFAULT']['applications_db']
|
||||
if not args.sshpubkey and not args.name and not args.username and not args.email and args.status is None \
|
||||
and not args.remove:
|
||||
print(f"Well, SOMETHING must be done with {args.user} ;-)")
|
||||
exit(1)
|
||||
# --> --user
|
||||
if not lib.Validator.checkUserInDB(args.user, db):
|
||||
print(f"User {args.user} does not exist in the database.")
|
||||
exit(1)
|
||||
DB = lib.sqlitedb.SQLiteDB(db)
|
||||
sys_ctl = lib.System.System(args.user)
|
||||
if not DB:
|
||||
print("Could not establish connection to database")
|
||||
exit(1)
|
||||
CurrentUser = DB.safequery("SELECT * FROM `applications` WHERE `username`=?", tuple([args.user]))[0]
|
||||
|
||||
# --> --remove
|
||||
if args.remove:
|
||||
print(f"Removing {args.user} from the system and the database...")
|
||||
try:
|
||||
DB.removeApplicantFromDBperUsername(args.user)
|
||||
print(f"Purged from the DB")
|
||||
if CurrentUser["status"] == 1:
|
||||
sys_ctl.remove_user()
|
||||
print(f"Purged from the system")
|
||||
else:
|
||||
print(f"'{args.user}' was not approved before, therefore not deleting from system itself.")
|
||||
except lib.UserExceptions.General as e:
|
||||
print(f"{e}")
|
||||
exit(1)
|
||||
print(f"Successfully removed '{args.user}'.")
|
||||
exit(0)
|
||||
|
||||
# --> --sshpubkey
|
||||
if args.sshpubkey:
|
||||
if not lib.Validator.checkSSHKey(args.sshpubkey):
|
||||
print(f"Pubkey '{args.sshpubkey}' isn't valid.")
|
||||
exit(1)
|
||||
try:
|
||||
DB.safequery("UPDATE `applications` SET `pubkey`=? WHERE `username`=?",
|
||||
tuple([args.sshpubkey, args.user]))
|
||||
CurrentUser = DB.safequery("SELECT * FROM `applications` WHERE `username` = ? ", tuple([args.user]))[0]
|
||||
if int(CurrentUser["status"]) == 1:
|
||||
sys_ctl.make_ssh_usable(args.sshpubkey)
|
||||
except sqlite3.Error as e:
|
||||
print(f"Something unexpected happened! {e}")
|
||||
exit(1)
|
||||
except lib.UserExceptions.ModifyFilesystem as e:
|
||||
print(f"One action failed during writing the ssh key back to the authorization file. {e}")
|
||||
print(f"'{args.user}'s SSH-Key updated successfully.")
|
||||
|
||||
# --> --name
|
||||
if args.name:
|
||||
if not lib.Validator.checkName(args.name):
|
||||
print(f"'{args.name}' is not a valid Name.")
|
||||
exit(1)
|
||||
try:
|
||||
DB.safequery("UPDATE `applications` SET `name` =? WHERE `username` =?", tuple([args.name, args.user]))
|
||||
except sqlite3.Error as e:
|
||||
print(f"Could not write '{args.name}' to database: {e}")
|
||||
print(f"'{args.user}'s Name changed to '{args.name}'.")
|
||||
|
||||
# --> --email
|
||||
if args.email:
|
||||
if not lib.Validator.checkEmail(args.email):
|
||||
print(f"'{args.email}' is not a valid Mail address!")
|
||||
exit(1)
|
||||
try:
|
||||
DB.safequery("UPDATE `applications` SET `email` =? WHERE `username` =?", tuple([args.email]))
|
||||
except sqlite3.Error as e:
|
||||
print(f"Could not write '{args.email}' to the database. {e}")
|
||||
print(f"'{args.user}' Mail changed to '{args.email}'.")
|
||||
|
||||
# --> --status
|
||||
if args.status is not None:
|
||||
if args.status != 0 and args.status != 1:
|
||||
print("Only 0 and 1 are valid status, where 1 is activated and 0 is unapproved.")
|
||||
exit(0)
|
||||
|
||||
# just takes first result out of the dict
|
||||
if args.status == int(CurrentUser["status"]):
|
||||
print(f"New and old status are the same.")
|
||||
|
||||
if args.status == 0 and int(CurrentUser["status"]) == 1:
|
||||
try:
|
||||
DB.safequery("UPDATE `applications` SET `status` =? WHERE `id`=?",
|
||||
tuple([args.status, CurrentUser["id"]]))
|
||||
sys_ctl.remove_user()
|
||||
except sqlite3.Error as e:
|
||||
print(f"Could not update database entry for '{args.user}', did not touch the system")
|
||||
exit(1)
|
||||
except lib.UserExceptions.UnknownReturnCode as e:
|
||||
print(f"Could not remove '{args.user}' from the system, unknown return code: {e}. DB is modified.")
|
||||
exit(1)
|
||||
print(f"Successfully changed '{args.user}'s status to 0 and cleared from the system.")
|
||||
|
||||
if args.status == 1 and int(CurrentUser["status"]) == 0:
|
||||
try:
|
||||
DB.safequery("UPDATE `applications` SET `status`=? WHERE `username`=?",
|
||||
tuple([args.status, args.user]))
|
||||
sys_ctl.aio_approve(CurrentUser["pubkey"])
|
||||
except sqlite3.Error as e:
|
||||
print(f"Could not update Users status in database")
|
||||
exit(1)
|
||||
except lib.UserExceptions.General as ChangeUser:
|
||||
print(f"Some chain in the cattle just slipped away, my lord! {ChangeUser}")
|
||||
exit(1)
|
||||
print(f"Successfully changed '{args.user}'s status to 1 and created on the system.")
|
||||
exit(0)
|
6
private/lib/CFG.py
Normal file
6
private/lib/CFG.py
Normal file
|
@ -0,0 +1,6 @@
|
|||
import configparser
|
||||
import lib.uis.default as default_cmd
|
||||
|
||||
args = default_cmd.argparser.parse_args()
|
||||
config = configparser.ConfigParser()
|
||||
config.read(args.config)
|
260
private/lib/System.py
Normal file
260
private/lib/System.py
Normal file
|
@ -0,0 +1,260 @@
|
|||
import os
|
||||
import pwd
|
||||
import subprocess
|
||||
|
||||
import lib.UserExceptions
|
||||
|
||||
|
||||
class System:
|
||||
"""Class to interact with the system specifically to support our needs 0w0
|
||||
:Example:
|
||||
>>> from lib.System import System as System
|
||||
>>> Sys_ctl = System("Test", dryrun=True)
|
||||
>>> Sys_ctl.register()
|
||||
>>> Sys_ctl.lock_user_pw()
|
||||
>>> Sys_ctl.add_to_usergroup()
|
||||
>>> Sys_ctl.make_ssh_usable("sshkey")
|
||||
"""
|
||||
|
||||
dry: bool = False
|
||||
create_command = []
|
||||
home: str = ""
|
||||
user: str
|
||||
|
||||
def setUser(self, username: str):
|
||||
self.user = username
|
||||
|
||||
def __init__(self, username: str, dryrun: bool = False, home: str = "/home/"):
|
||||
"""Creates an objects. Can set dry run.
|
||||
|
||||
:param username: Username to manipulate
|
||||
:type username: str
|
||||
:param dryrun: Run all command in a dry-run? When enabled, doesn't make any changes to the system (defaults to
|
||||
false)
|
||||
:type dryrun: bool
|
||||
:param home: Standard directory to search for the home directories of your users(default is /home/)
|
||||
:type home: str
|
||||
:raises:
|
||||
ValueError: if homedir can not be found
|
||||
"""
|
||||
|
||||
self.dry = dryrun
|
||||
if not home.endswith("/"):
|
||||
home += "/"
|
||||
if not os.path.isdir(home):
|
||||
raise ValueError("home should be an existent directory...")
|
||||
self.home = home
|
||||
self.user = username
|
||||
|
||||
def aio_approve(self, pubkey, group="tilde"):
|
||||
""" Executes all neccessary steps to create a user from itself. Raises ALOT of possible exceptions
|
||||
|
||||
:Note: CAREFULL! You MUST except the exceptions!
|
||||
:param pubkey: Users public ssh key
|
||||
:type pubkey: str
|
||||
:param group: User-group. Defaults to tilde
|
||||
:type group: str
|
||||
:return: None
|
||||
:raises:
|
||||
lib.UserExceptions.UserExistsAlready: User Exists already on system
|
||||
lib.UserExceptions.UnknownReturnCode: Unknown Return Code from useradd
|
||||
lib.UserExceptions.SSHDirUncreatable: Users SSH Dir couldnt be created
|
||||
lib.UserExceptions.ModifyFilesystem: Something with CHMOD failed
|
||||
"""
|
||||
self.register()
|
||||
self.lock_user_pw()
|
||||
self.add_to_usergroup(group)
|
||||
self.make_ssh_usable(pubkey)
|
||||
|
||||
def register(self, cc: tuple = tuple(["useradd", "-m"])) -> bool:
|
||||
"""Creates an local account for the given username
|
||||
|
||||
:param cc: Tuple with commands separated to execute on the machine. (defaults to useradd -m)
|
||||
:type cc: tuple
|
||||
:return: True, if worked, raises lib.UserExceptions.UserExistsAlready when not
|
||||
:rtype: bool
|
||||
:raises:
|
||||
lib.UserExceptions.UserExistsAlready: when username specified already exists on the system
|
||||
"""
|
||||
|
||||
create_command = cc
|
||||
cc = create_command + tuple([self.user])
|
||||
if self.dry:
|
||||
self.printTuple(cc)
|
||||
return True
|
||||
elif not self.dry:
|
||||
rt = subprocess.call(cc)
|
||||
if rt != 0:
|
||||
raise lib.UserExceptions.UserExistsAlready(self.user)
|
||||
return True
|
||||
|
||||
def unregister(self) -> bool:
|
||||
""" Just an alias function for removeUser
|
||||
|
||||
:return: True, when success, False(or exception) when not
|
||||
:rtype: bool
|
||||
"""
|
||||
|
||||
return self.remove_user()
|
||||
|
||||
def make_ssh_usable(self, pubkey: str, sshdir: str = ".ssh/") -> bool:
|
||||
""" Make SSH usable for our newly registered user
|
||||
|
||||
:param pubkey: Public SSH Key for the User you want accessible by SSH
|
||||
:type pubkey: str
|
||||
:param sshdir: Directory to write the authorized_keys File to. PWD is $HOME of said user. (defaults to ".ssh/")
|
||||
:type sshdir: str
|
||||
:return: True, if worked, raises lib.UserExceptions.UnknownReturnCode, lib.UserExceptions.HomeDirExistsAlready
|
||||
or lib.UserExceptions.ModifyFilesystem when not
|
||||
:rtype: bool
|
||||
:raises:
|
||||
lib.UserExceptions.SSHDirUncreatable: if the ssh-dir couldnt be created nor exist
|
||||
lib.UserExceptions.ModifyFilesystem: When chmod to .ssh and authorized_keys failed
|
||||
lib.UserExceptions.General: if PWD cant find the specified user
|
||||
"""
|
||||
|
||||
if self.dry:
|
||||
print("Nah, @TODO, but actually kinda too lazy for this lul. Just a lot happening here")
|
||||
return True
|
||||
if not sshdir.endswith("/"):
|
||||
sshdir += "/"
|
||||
ssh_dir = self.home + self.user + "/" + sshdir
|
||||
try:
|
||||
os.mkdir(ssh_dir)
|
||||
except FileExistsError:
|
||||
pass # thats actually a good one for us :D
|
||||
except OSError as e:
|
||||
raise lib.UserExceptions.SSHDirUncreatable(f"Could not create {ssh_dir}: Exception: {e}")
|
||||
|
||||
try:
|
||||
self.write_ssh(pubkey, ssh_dir)
|
||||
except OSError as e:
|
||||
raise lib.UserExceptions.ModifyFilesystem(
|
||||
f"Could not write and/or chmod 0700 {ssh_dir} or {ssh_dir}/authorized_keys, Exception: {e}")
|
||||
try:
|
||||
pwdnam = pwd.getpwnam(self.user)
|
||||
os.chown(ssh_dir, pwdnam[2], pwdnam[3]) # 2=>uid, 3=>gid
|
||||
os.chown(ssh_dir + "authorized_keys", pwd.getpwnam(self.user)[2], pwd.getpwnam(self.user)[3])
|
||||
except OSError as e: # by os.chown
|
||||
raise lib.UserExceptions.ModifyFilesystem(
|
||||
f"Could not chown {ssh_dir} and/or authorized_keys to {self.user} and their group, Exception: {e}", )
|
||||
except KeyError as e: # by PWD
|
||||
raise lib.UserExceptions.General(f"PWD can't find {self.user}: {e}")
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def write_ssh(key: str, ssh_dir: str) -> None:
|
||||
""" Write SSH key to a specified directory(appends authorized_keys itself!)
|
||||
|
||||
:param key: Key to write
|
||||
:type key: str
|
||||
:param ssh_dir: SSH Directory to write to
|
||||
:type ssh_dir: str
|
||||
:return: None
|
||||
"""
|
||||
|
||||
with open(ssh_dir + "authorized_keys", "w") as f:
|
||||
print(key, file=f)
|
||||
f.close()
|
||||
os.chmod(ssh_dir + "authorized_keys", 0o700) # we dont care about the directory here
|
||||
|
||||
def lock_user_pw(self, cc: tuple = tuple(["usermod", "--lock"])) -> bool:
|
||||
"""Lock a users password so it stays empty
|
||||
|
||||
:param cc: Commands to run in the subprocess to lock it down(defaults to usermod --lock)
|
||||
:type cc: tuple
|
||||
:rtype: bool
|
||||
:return: True, if worked, raises lib.UserExceptions.UnknownReturnCode when not
|
||||
:raises:
|
||||
lib.UserExceptions.UnknownReturnCode: When cc returns something else then 0
|
||||
"""
|
||||
|
||||
lock_command = cc
|
||||
cc = lock_command + tuple([self.user])
|
||||
if self.dry:
|
||||
self.printTuple(cc)
|
||||
return True
|
||||
elif not self.dry:
|
||||
rt = subprocess.call(cc)
|
||||
if rt != 0:
|
||||
raise lib.UserExceptions.UnknownReturnCode(f"Could not lock user '{self.user}'; '{cc}' returned '{rt}'")
|
||||
return True
|
||||
|
||||
def add_to_usergroup(self, group: str = "tilde", cc: tuple = tuple(["usermod", "-a", "-G"])) -> bool:
|
||||
""" Adds a given user to a given group
|
||||
|
||||
:param group: Groupname where you want to add your user to
|
||||
:type group: str
|
||||
:param cc: Commands to execute that adds your user to said specific group(defaults to usermod -a -G")
|
||||
:type cc: tuple
|
||||
:return: True, if worked, raises lib.UserExceptions.UnknownReturnCode when not
|
||||
:rtype bool
|
||||
:raises:
|
||||
lib.UserExceptions.UnknownReturnCode: if cc returned something else then 0
|
||||
"""
|
||||
|
||||
add_command = cc
|
||||
cc = add_command + tuple([group, self.user])
|
||||
if self.dry:
|
||||
self.printTuple(cc)
|
||||
return True
|
||||
elif not self.dry:
|
||||
rt = subprocess.call(cc)
|
||||
if rt != 0:
|
||||
raise lib.UserExceptions.UnknownReturnCode(
|
||||
f"Could not add user '{self.user}' to group '{group}' with command '{cc}', returned '{rt}'", )
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def printTuple(tup: tuple) -> None:
|
||||
"""Prints a tuple with spaces as separators
|
||||
|
||||
:param tup: Tuple you want to print
|
||||
:type tup: tuple
|
||||
:rtype: None
|
||||
:returns: Nothing
|
||||
"""
|
||||
|
||||
pp = ""
|
||||
for i in tup:
|
||||
pp += i + " "
|
||||
print(pp)
|
||||
|
||||
def remove_user(self, cc: tuple = tuple(["userdel", "-r"])) -> bool:
|
||||
"""Removes the specified user from the system
|
||||
|
||||
:param cc: Commands to execute to delete the user from the System(defaults to userdel -r)
|
||||
:type cc: tuple
|
||||
:return: True, if worked, raises lib.UserExceptions.UnknownReturnCode when not
|
||||
:rtype: bool
|
||||
:raises:
|
||||
lib.UserExceptions.UnknownReturnCode: When cc returns something else then 0 or 6
|
||||
"""
|
||||
|
||||
remove_command = cc
|
||||
cc = remove_command + tuple([self.user])
|
||||
if self.dry:
|
||||
self.printTuple(cc)
|
||||
return True
|
||||
else:
|
||||
ret = subprocess.Popen(cc, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
ret.wait() # wait for cc
|
||||
stdio, err_io = ret.communicate() # get stdout as well as stderr
|
||||
if ret.returncode != 0 and ret.returncode != 6: # userdel returns 6 when no mail dir was found but success
|
||||
raise lib.UserExceptions.UnknownReturnCode(
|
||||
f"Could not delete user with command {cc}. Return code: {ret.returncode},"
|
||||
f" stdout/stderr: {stdio + err_io}")
|
||||
return True
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
S = System("dar", dryrun=True)
|
||||
S.register()
|
||||
S.lock_user_pw()
|
||||
S.add_to_usergroup()
|
||||
# if not S.make_ssh_usable("dar", "SSHpub"):
|
||||
# print("Huh, error :shrug:")
|
||||
exit(0)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
48
private/lib/UserExceptions.py
Normal file
48
private/lib/UserExceptions.py
Normal file
|
@ -0,0 +1,48 @@
|
|||
class General(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class User(General):
|
||||
pass
|
||||
|
||||
|
||||
class UnknownUser(User):
|
||||
def __init__(self, name):
|
||||
Exception.__init__(self, f"Tried to perform action on unknown user '{name}'")
|
||||
|
||||
|
||||
class UserExistsAlready(User):
|
||||
def __init__(self, name):
|
||||
Exception.__init__(self, f"User '{name}' is already registered")
|
||||
|
||||
|
||||
class UnknownReturnCode(General):
|
||||
pass
|
||||
|
||||
|
||||
class ModifyFilesystem(General):
|
||||
pass
|
||||
|
||||
|
||||
class SSHDirUncreatable(ModifyFilesystem):
|
||||
pass
|
||||
|
||||
|
||||
class SQLiteDatabaseDoesntExistYet(General):
|
||||
pass
|
||||
|
||||
|
||||
class UsernameLength(User):
|
||||
pass
|
||||
|
||||
|
||||
class UsernameTooShort(User):
|
||||
pass
|
||||
|
||||
|
||||
class UsernameTooLong(User):
|
||||
pass
|
||||
|
||||
|
||||
class UsernameInvalidCharacters(User):
|
||||
pass
|
219
private/lib/Validator.py
Normal file
219
private/lib/Validator.py
Normal file
|
@ -0,0 +1,219 @@
|
|||
import csv
|
||||
import pwd
|
||||
import re
|
||||
|
||||
import lib.sqlitedb
|
||||
|
||||
|
||||
def checkUsernameCharacters(username: str) -> bool:
|
||||
""" Checks the Username for invalid characters. Allow only alphanumerical characters, a lower alpha one first,
|
||||
Followed by any sequence of digits and characters
|
||||
|
||||
:param username: String to check for validity
|
||||
:type username: str
|
||||
:return: True when valid, False when not
|
||||
:rtype: bool
|
||||
"""
|
||||
|
||||
if " " not in username and "_" not in username and username.isascii() and username[:1].islower() and \
|
||||
not username[0].isnumeric():
|
||||
if not re.search(r"\W+", username):
|
||||
if not re.search("[^a-zA-Z0-9]", username):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def checkUsernameLength(username: str, upper_limit: int = 16, lower_limit: int = 3) -> bool:
|
||||
""" Checks username for an upper and lower bounds limit character count
|
||||
|
||||
:param username: Username to check
|
||||
:type username: str
|
||||
:param upper_limit: Upper limit bounds to check for(default is 16)
|
||||
:type upper_limit: int
|
||||
:param lower_limit: Lower limit bounds to check for(default is 3)
|
||||
:return: True, when all bounds are in, False when one or both aren't.
|
||||
:rtype: bool
|
||||
"""
|
||||
|
||||
if len(username) > upper_limit:
|
||||
return False
|
||||
if len(username) < lower_limit:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def checkUserExists(username: str) -> bool:
|
||||
""" Checks if the User exists on the **SYSTEM** by calling PWD on it.
|
||||
**Note**: You might want to use this in conjunction with checkUserInDB
|
||||
|
||||
:param username:
|
||||
:type username: str
|
||||
:return: True when exists, False when not
|
||||
:rtype: bool
|
||||
"""
|
||||
|
||||
try:
|
||||
pwd.getpwnam(username)
|
||||
except KeyError:
|
||||
return False
|
||||
return True # User already exists
|
||||
|
||||
|
||||
def checkUserInDB(username: str, db: str) -> bool:
|
||||
""" Checks users existence in the **DATABASE**.
|
||||
:Note: You might want to use this in conjunction with `checkUserExists`
|
||||
|
||||
:param username: Username to check existence in database
|
||||
:type username: str
|
||||
:param db: Path to database to check in
|
||||
:type db: str
|
||||
:return: True, when User exists, False when not
|
||||
"""
|
||||
|
||||
try:
|
||||
ldb = lib.sqlitedb.SQLiteDB(db)
|
||||
fetched = ldb.safequery("SELECT * FROM 'applications' WHERE username = ?", tuple([username]))
|
||||
if fetched:
|
||||
return True
|
||||
except lib.sqlitedb.sqlite3.Error as e:
|
||||
print(f"SQLite Exception: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def checkSSHKey(key: str) -> bool:
|
||||
""" Checks SSH Key for meta-data that we accept.
|
||||
:Note: We currently only allow ssh keys without options but with a mail address at the end in b64 encoded.
|
||||
The currently supported algorithms are: ecdfsa-sha2-nistp256, 'ecdsa-sha2-nistp384', 'ecdsa-sha2-nistp521',
|
||||
'ssh-rsa', 'ssh-dss' and 'ssh-ed25519'
|
||||
|
||||
:param key: Key to check
|
||||
:return: True, when Key is valid, False when not
|
||||
:rtype: bool
|
||||
"""
|
||||
|
||||
# taken from https://github.com/hashbang/provisor/blob/master/provisor/utils.py, all belongs to them! ;)
|
||||
import base64
|
||||
if len(key) > 8192 or len(key) < 80:
|
||||
return False
|
||||
|
||||
key = key.replace("\"", "").replace("'", "").replace("\\\"", "")
|
||||
key = key.split(' ')
|
||||
types = ['ecdsa-sha2-nistp256', 'ecdsa-sha2-nistp384',
|
||||
'ecdsa-sha2-nistp521', 'ssh-rsa', 'ssh-dss', 'ssh-ed25519']
|
||||
if key[0] not in types:
|
||||
return False
|
||||
try:
|
||||
base64.decodebytes(bytes(key[1], "utf-8"))
|
||||
except TypeError:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def checkEmail(mail: str) -> bool:
|
||||
""" Checks Mail against a relatively simple REgex Pattern.
|
||||
|
||||
:param mail: Mail to check
|
||||
:type mail: str
|
||||
:return: False, when the Mail is invalid, True when valid.
|
||||
:rtype: bool
|
||||
"""
|
||||
|
||||
if not re.match("(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)", mail):
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
def checkDatetimeFormat(datetime_str: str) -> bool:
|
||||
""" Checks a Strings format on date time.
|
||||
|
||||
:param datetime_str: String to check
|
||||
:type datetime_str: str
|
||||
:return: True when valid, False when not.
|
||||
:rtype: bool
|
||||
"""
|
||||
|
||||
import datetime
|
||||
try:
|
||||
datetime.datetime.strptime(datetime_str, "%Y-%m-%d %H:%M:%S")
|
||||
except ValueError:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def checkName(name: str) -> bool:
|
||||
""" Checks a users (real) Name against a real simple REgex Pattern.
|
||||
|
||||
:param name: Name/String to check
|
||||
:type name: str
|
||||
:return: True when valid, False when not.
|
||||
:rtype: bool
|
||||
"""
|
||||
|
||||
if not re.match("\w+\s*\w", name):
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
def checkImportFile(path: str, db: str):
|
||||
""" Checks an CSV file against most of the validators and prints an Error message with the line number corresponding
|
||||
to said failure.. Those includes: checkName, checkUsernameCharacters,
|
||||
ckeckUsernameLength, duplicate usernames(in the CSV), checkSSHKey, checkEmail, checkUserExists, checkUserInDB,
|
||||
checkDatetimeformat and if the status is 1 or 0.
|
||||
|
||||
:param path: Path to file to check
|
||||
:type path: str
|
||||
:param db: Path to database file(SQLite)
|
||||
:type db: str
|
||||
:return: Str when Failure, True when success(All tests passed)
|
||||
:rtype: Str or None
|
||||
"""
|
||||
|
||||
errstr = ""
|
||||
valid = True
|
||||
ln = 1 # line number
|
||||
valid_names_list = []
|
||||
with open(path, 'r', newline='') as f:
|
||||
reader = csv.DictReader(f)
|
||||
for row in reader:
|
||||
# if any of this fails move on to the next user, just print a relatively helpful message lel
|
||||
if not lib.Validator.checkName(row["name"]):
|
||||
errstr += f"Line {ln}: Name: '{row['name']}' seems not legit. Character followed by character should" \
|
||||
f" be correct.\n"
|
||||
valid = False
|
||||
if not lib.Validator.checkUsernameCharacters(row["username"]):
|
||||
errstr += (f"Line {ln}: Username contains unsupported characters or starts with a number: '"
|
||||
f"{row['username']}'.\n")
|
||||
valid = False
|
||||
if not lib.Validator.checkUsernameLength(row["username"]):
|
||||
errstr += f"Line {ln}: Username '{row['username']}' is either too long(>16) or short(<3)\n"
|
||||
valid = False
|
||||
# dup checking
|
||||
if row["username"] in valid_names_list:
|
||||
errstr += f"Line {ln}: Duplicate Username {row['username']}!\n"
|
||||
valid = False
|
||||
else:
|
||||
valid_names_list.append(row["username"])
|
||||
# dup end
|
||||
if not lib.Validator.checkSSHKey(row["pubkey"]):
|
||||
errstr += f"Line {ln}: Following SSH-Key of user '{row['username']}' isn't valid: " \
|
||||
f"'{row['pubkey']}'.\n"
|
||||
valid = False
|
||||
if not lib.Validator.checkEmail(row["email"]):
|
||||
errstr += f"Line {ln}: E-Mail address of user '{row['username']}' '{row['email']}' is not valid.\n"
|
||||
valid = False
|
||||
if lib.Validator.checkUserExists(row["username"]) or checkUserInDB(row["username"], db):
|
||||
errstr += f"Line {ln}: User '{row['username']}' already exists.\n"
|
||||
valid = False
|
||||
if not lib.Validator.checkDatetimeFormat(row["timestamp"]):
|
||||
errstr += f"Line {ln}: Timestamp '{row['timestamp']}' from user '{row['username']}' is invalid.\n"
|
||||
valid = False
|
||||
if int(row["status"]) > 1 or int(row["status"]) < 0:
|
||||
errstr += f"Line {ln}: Status '{row['status']}' MUST be either 0 or 1.\n"
|
||||
valid = False
|
||||
ln += 1
|
||||
if valid:
|
||||
return True
|
||||
else:
|
||||
return errstr
|
9
private/lib/cwd.py
Normal file
9
private/lib/cwd.py
Normal file
|
@ -0,0 +1,9 @@
|
|||
import os
|
||||
|
||||
cwd = os.environ.get('TILDE_CONF')
|
||||
if cwd is None:
|
||||
cwd = os.getcwd() + "/applicationsconfig.ini"
|
||||
else:
|
||||
if os.path.isfile(cwd) is False:
|
||||
cwd = os.getcwd() + "/applicationsconfig.ini"
|
||||
# cwd is now either cwd/applicationsconfig or $TILDE_CONF
|
164
private/lib/sqlitedb.py
Normal file
164
private/lib/sqlitedb.py
Normal file
|
@ -0,0 +1,164 @@
|
|||
#!/usr/bin/env python3
|
||||
import sqlite3
|
||||
from sys import stderr as stderr
|
||||
from typing import List # Typing support!
|
||||
|
||||
|
||||
# create dictionary out of sqlite results
|
||||
def dict_factory(cursor, row):
|
||||
d: dict = {}
|
||||
for idx, col in enumerate(cursor.description):
|
||||
d[col[0]] = row[idx]
|
||||
return d
|
||||
|
||||
|
||||
class SQLiteDB:
|
||||
"""SQLitedb handles EVERYTHING directly related to our Database."""
|
||||
|
||||
db = ""
|
||||
cursor = None
|
||||
connection = None
|
||||
last_result = None
|
||||
|
||||
def __init__(self, dbpath: str):
|
||||
"""
|
||||
:param dbpath: Path to the database we want to open
|
||||
:type dbpath: str
|
||||
:returns: Object for the SQLitedb-Class.
|
||||
:rtype: object
|
||||
"""
|
||||
|
||||
db = dbpath
|
||||
try:
|
||||
self.connection = sqlite3.connect(db)
|
||||
self.cursor = self.connection.cursor()
|
||||
except sqlite3.Error as e:
|
||||
print("Connection error: %s" % e, file=stderr)
|
||||
|
||||
self.cursor.row_factory = sqlite3.Row # every result will be a dict now
|
||||
|
||||
def __del__(self):
|
||||
try:
|
||||
self.connection.commit()
|
||||
self.connection.close()
|
||||
except sqlite3.Error as e:
|
||||
print("Couldn't gracefully close db: %s" % e, file=stderr)
|
||||
|
||||
def query(self, qq: str) -> List[sqlite3.Row]:
|
||||
"""Do a query and automagically get the fetched results in a list
|
||||
:param qq: Query to execute
|
||||
:type qq: str
|
||||
:returns: A tuple(/list) consisting with any fetched results
|
||||
:rtype: list
|
||||
"""
|
||||
|
||||
try:
|
||||
self.cursor.execute(qq)
|
||||
self.last_result = self.cursor.fetchall()
|
||||
self.connection.commit()
|
||||
except sqlite3.OperationalError:
|
||||
self._createTable()
|
||||
return self.query(qq)
|
||||
except sqlite3.Error as e:
|
||||
print("Couldn't execute query %s, exception: %s" % (qq, e), file=stderr)
|
||||
self.last_result = []
|
||||
return self.last_result
|
||||
|
||||
# sometimes we need the cursor for safety reasons, for example does sqlite3 all the security related
|
||||
# escaoing in supplied strings for us, when we deliver it to con.execute in the second argument as a tuple
|
||||
def getCursor(self) -> sqlite3:
|
||||
"""Returns SQLite3 Cursor. Use with **c a u t i o n**... """
|
||||
return self.cursor
|
||||
|
||||
# we could try to utilise that ourselfs in a function. Be c a r e f u l, these values in the tuple MUST HAVE
|
||||
# THE RIGHT TYPE
|
||||
def safequery(self, qq: str, deliver: tuple) -> List[sqlite3.Row]:
|
||||
""" Shall handle any query that has user input in it as an alternative to self.query
|
||||
:param qq: Query to execute
|
||||
:type qq: str
|
||||
:param deliver: User inputs marked with the placeholder(`?`) in the str
|
||||
:type deliver: tuple
|
||||
:returns: A tuple(/list) consisting with any fetched results
|
||||
:rtype: List[sqlite3.Row]
|
||||
"""
|
||||
|
||||
try:
|
||||
self.cursor.execute(qq, deliver)
|
||||
self.last_result = self.cursor.fetchall()
|
||||
self.connection.commit()
|
||||
except TypeError as e:
|
||||
print("Types in given tuple doesnt match to execute query \"%s\": %s" % (qq, e), file=stderr)
|
||||
self.last_result = []
|
||||
except sqlite3.OperationalError:
|
||||
self._createTable()
|
||||
return self.safequery(qq, deliver)
|
||||
except sqlite3.Error as e:
|
||||
print("Couldn't execute query %s, exception: %s" % (qq, e), file=stderr)
|
||||
print(deliver)
|
||||
print(type(e))
|
||||
self.last_result = []
|
||||
return self.last_result
|
||||
|
||||
def removeApplicantFromDB(self, userid: int) -> bool:
|
||||
"""Removes Applicants from the DB by ID. Use along System.removeUser()
|
||||
:param userid: User ID to remove from the Database
|
||||
:type userid: int
|
||||
:returns: True, if removal was successful(from the DB), False when not
|
||||
:rtype: bool
|
||||
"""
|
||||
|
||||
try:
|
||||
self.last_result = self.cursor.execute("DELETE FROM `applications` WHERE id = ? ", [userid])
|
||||
self.connection.commit()
|
||||
except sqlite3.OperationalError:
|
||||
print("The database has probably not yet seen any users, so it didnt create your table yet. Come back"
|
||||
"when a user tried to register")
|
||||
return False
|
||||
except sqlite3.Error as e:
|
||||
print(f"Could not delete user with id: {userid}, exception in DB: {e}") # @TODO LOGGING FFS
|
||||
return False
|
||||
return True
|
||||
|
||||
def removeApplicantFromDBperUsername(self, username: str) -> bool:
|
||||
"""Removes Applicants from the DB by Username. Use along System.removeUser()
|
||||
:param username: Username to remove from the database
|
||||
:type username: str
|
||||
:returns: True, if removal was successful(from the DB), False when not
|
||||
:rtype: bool
|
||||
"""
|
||||
|
||||
try:
|
||||
self.last_result = self.cursor.execute("DELETE FROM `applications` WHERE username = ?", [username])
|
||||
self.connection.commit()
|
||||
except sqlite3.OperationalError:
|
||||
print("The database has probably not yet seen any users, so it didnt create your table yet. Come back"
|
||||
"when a user tried to register")
|
||||
return False
|
||||
except sqlite3.Error as e:
|
||||
print(f"Could not delete user {username}, exception in DB: {e}") # @TODO LOGGING
|
||||
return False
|
||||
return True
|
||||
|
||||
def _createTable(self) -> None:
|
||||
try:
|
||||
self.cursor.execute(
|
||||
"CREATE TABLE IF NOT EXISTS applications("
|
||||
"id INTEGER PRIMARY KEY AUTOINCREMENT,"
|
||||
"username TEXT NOT NULL, email TEXT NOT NULL,"
|
||||
"name TEXT NOT NULL, pubkey TEXT NOT NULL,"
|
||||
"timestamp DATETIME DEFAULT CURRENT_TIMESTAMP CONSTRAINT "
|
||||
"timestamp_valid CHECK( timestamp IS strftime('%Y-%m-%d %H:%M:%S', timestamp))"
|
||||
",status INTEGER NOT NULL DEFAULT 0);")
|
||||
self.connection.commit()
|
||||
except sqlite3.Error as e:
|
||||
print(f"The database probably doesn't exist yet, but read the message: {e}")
|
||||
print("The database table didn't exist yet; created it successfully!")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
SQLiteDB("bla.db")
|
||||
print("hi")
|
||||
exit(0)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
6
private/lib/uis/config_ui.py
Normal file
6
private/lib/uis/config_ui.py
Normal file
|
@ -0,0 +1,6 @@
|
|||
import argparse
|
||||
import lib.cwd
|
||||
|
||||
argparser = argparse.ArgumentParser(description='Tilde administration tools ', conflict_handler="resolve")
|
||||
argparser.add_argument('-c', '--config', default=lib.cwd.cwd,
|
||||
type=str, help='Path to configuration file', required=False)
|
11
private/lib/uis/default.py
Normal file
11
private/lib/uis/default.py
Normal file
|
@ -0,0 +1,11 @@
|
|||
import lib.uis.config_ui
|
||||
|
||||
argparser = lib.uis.config_ui.argparser
|
||||
|
||||
# store_true just stores true when the command is supplied, so it doesn't need choices nor types
|
||||
argparser.add_argument('-u', '--unapproved', default=False, action="store_true",
|
||||
help='only unapproved users. Default is only approved.', required=False)
|
||||
argparser.add_argument('-a', '--approved', default=False, action="store_true",
|
||||
help="Only approved Users.", required=False)
|
||||
argparser.add_argument('-f', '--file', default="stdout",
|
||||
type=str, help='write to file instead of stdout', required=False)
|
5
private/scripts/disable.sh
Normal file
5
private/scripts/disable.sh
Normal file
|
@ -0,0 +1,5 @@
|
|||
unalias dev_build
|
||||
unalias dev_run
|
||||
unalias dev_bash
|
||||
unalias dev_stop
|
||||
unalias dev_disable
|
5
private/scripts/shortcuts.sh
Normal file
5
private/scripts/shortcuts.sh
Normal file
|
@ -0,0 +1,5 @@
|
|||
alias dev_run="docker container run -l dev-ssh-reg --name dev-ssh-reg --rm -itd -v $PWD/private/:/app/admin ssh-reg"
|
||||
alias dev_stop="docker container stop dev-ssh-reg"
|
||||
alias dev_build="docker build -t ssh-reg:latest --force-rm ."
|
||||
alias dev_bash="docker container exec -it dev-ssh-reg bash -c 'cd /app/admin; exec bash --login -i'"
|
||||
alias dev_disable="source private/scripts/disable.sh"
|
|
@ -1,30 +1,26 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import re, configparser, logging, sqlite3, argparse
|
||||
from os import getcwd
|
||||
import argparse
|
||||
import configparser
|
||||
import logging
|
||||
import re
|
||||
import sqlite3
|
||||
from os import environ
|
||||
from os import getcwd
|
||||
from os import path as ospath
|
||||
import re, configparser, logging, sqlite3
|
||||
|
||||
|
||||
try:
|
||||
cwd = environ.get('TILDE_CONF')
|
||||
if cwd is None:
|
||||
cwd=getcwd()+"/applicationsconfig.ini"
|
||||
cwd = getcwd()+"/applicationsconfig.ini"
|
||||
else:
|
||||
if ospath.isfile(cwd) is False:
|
||||
cwd=getcwd()+"/applicationsconfig.ini"
|
||||
cwd = getcwd() + "/applicationsconfig.ini"
|
||||
# cwd is now either cwd/applicationsconfig or $TILDE_CONF
|
||||
argparser = argparse.ArgumentParser(description='interactive registration formular for tilde platforms')
|
||||
argparser.add_argument('-c', '--config', default=cwd, type=str, help='Config file', required=False)
|
||||
args = argparser.parse_args()
|
||||
CONF_FILE = args.config
|
||||
except:
|
||||
# intended broad, @TODO check them all for errors instead of everything in one
|
||||
logging.exception("Argumentparser-Exception: ")
|
||||
exit(0)
|
||||
|
||||
try:
|
||||
config = configparser.ConfigParser()
|
||||
config.read(CONF_FILE)
|
||||
logging.basicConfig(format="%(asctime)s: %(message)s", filename=config['DEFAULT']['log_file'],
|
||||
|
@ -46,7 +42,9 @@ def __createTable(cursor, connection):
|
|||
"id INTEGER PRIMARY KEY AUTOINCREMENT,"
|
||||
"username TEXT NOT NULL, email TEXT NOT NULL,"
|
||||
"name TEXT NOT NULL, pubkey TEXT NOT NULL,"
|
||||
"timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, status INTEGER NOT NULL DEFAULT 0);")
|
||||
"timestamp DATETIME DEFAULT CURRENT_TIMESTAMP CONSTRAINT "
|
||||
"timestamp_valid CHECK( timestamp IS strftime('%Y-%m-%d %H:%M:%S', timestamp))"
|
||||
",status INTEGER NOT NULL DEFAULT 0);")
|
||||
connection.commit()
|
||||
except sqlite3.Error as e:
|
||||
logging.exception("Couldn't create needed SQLite Table! Exception: %s" % e)
|
||||
|
@ -79,15 +77,24 @@ def __checkSQLite(cursor, connection):
|
|||
|
||||
def check_username(value):
|
||||
global VALID_USER
|
||||
if " " in value or "_ " in value or not value.isascii() or not value[:1].islower() or value[0].isnumeric():
|
||||
VALID_USER = False
|
||||
return False
|
||||
if re.search(r"\W+", value):
|
||||
VALID_USER = False
|
||||
return False
|
||||
if len(value) < 3:
|
||||
VALID_USER=False
|
||||
VALID_USER = False
|
||||
return False
|
||||
if len(value) > 16:
|
||||
VALID_USER = False
|
||||
return False
|
||||
try:
|
||||
from pwd import getpwnam
|
||||
getpwnam(value)
|
||||
VALID_USER = False
|
||||
# intended broad
|
||||
except Exception:
|
||||
# everything from pwd throws an KeyError when the given user cannot be found
|
||||
except KeyError:
|
||||
VALID_USER = True
|
||||
return True
|
||||
return False
|
||||
|
@ -98,34 +105,36 @@ def validate_pubkey(value):
|
|||
global VALID_SSH
|
||||
import base64
|
||||
if len(value) > 8192 or len(value) < 80:
|
||||
VALID_SSH=False
|
||||
VALID_SSH = False
|
||||
return False
|
||||
|
||||
value = value.replace("\"", "").replace("'", "").replace("\\\"", "")
|
||||
value = value.split(' ')
|
||||
types = [ 'ecdsa-sha2-nistp256', 'ecdsa-sha2-nistp384',
|
||||
'ecdsa-sha2-nistp521', 'ssh-rsa', 'ssh-dss', 'ssh-ed25519' ]
|
||||
types = ['ecdsa-sha2-nistp256', 'ecdsa-sha2-nistp384',
|
||||
'ecdsa-sha2-nistp521', 'ssh-rsa', 'ssh-dss', 'ssh-ed25519']
|
||||
if value[0] not in types:
|
||||
VALID_SSH=False
|
||||
VALID_SSH = False
|
||||
return False
|
||||
|
||||
try:
|
||||
base64.decodebytes(bytes(value[1], "utf-8"))
|
||||
except TypeError:
|
||||
VALID_SSH=False
|
||||
VALID_SSH = False
|
||||
return False
|
||||
|
||||
VALID_SSH=True
|
||||
VALID_SSH = True
|
||||
return True
|
||||
|
||||
|
||||
def main():
|
||||
print(" ▗▀▖ \n▗▖▖ ▐ ▌ ▌▛▀▖\n▘▝▗▖▜▀ ▌ ▌▌ ▌\n ▝▘▐ ▝▀▘▘ ▘")
|
||||
|
||||
username = input("Welcome to the ~.fun user application form!\n\nWhat is your desired username? [a-z0-9] allowed:\n")
|
||||
username = input("Welcome to the ~.fun user application form!\n\n"
|
||||
"What is your desired username? [a-z0-9] allowed:\n")
|
||||
while (not re.match("[a-z]+[a-z0-9]", username)) or (not check_username(username)):
|
||||
username = input("Invalid Username, maybe it exists already?\nValid characters are only a-z and 0-9."
|
||||
"\nMake sure your username starts with a character and not a number."
|
||||
"\nMake sure your username starts with a character and not a number"
|
||||
" and is not larger than 16 characters."
|
||||
"\nWhat is your desired username? [a-z0-9] allowed:\n")
|
||||
|
||||
fullname = input("\nPlease enter your full name:\n")
|
||||
|
@ -154,8 +163,8 @@ def main():
|
|||
if re.match("[yY]", validation):
|
||||
print("Thank you for your application! We'll get in touch shortly. 🐧")
|
||||
try:
|
||||
connection=sqlite3.connect(REG_FILE)
|
||||
cursor=connection.cursor()
|
||||
connection = sqlite3.connect(REG_FILE)
|
||||
cursor = connection.cursor()
|
||||
__checkSQLite(cursor, connection)
|
||||
addtotable(cursor, connection, username, fullname, email, pubkey)
|
||||
connection.commit()
|
||||
|
|
Loading…
Reference in a new issue