Compare commits

..

No commits in common. 'master' and 'feature-restruct' have entirely different histories.

4
.gitignore vendored

@ -1,4 +0,0 @@
__pycache__/
.idea/
test*

@ -8,6 +8,21 @@ RUN apt-get update &&\
# Clean up APT when done. # Clean up APT when done.
RUN apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* RUN apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# private/{scripts, administrate.py}, public/{scripts, userapplications.py}, config/userapplicatonsconfig.ini
#configs, logs, db
COPY config/applicationsconfig.ini /app/data/applicationsconfig.ini
# admin scripts
COPY private/ /app/admin/
# user accessible scripts
# Make TILDE_ENV
COPY public/ /app/user/
#SSH config into /etc :)
COPY config/etc /etc
# create user for applications # create user for applications
RUN useradd -Md /app/user/ -s /app/user/userapplication.py tilde RUN useradd -Md /app/user/ -s /app/user/userapplication.py tilde
@ -19,27 +34,15 @@ RUN usermod -U tilde
RUN useradd -Md /app/admin -s /app/admin/administrate.py admin RUN useradd -Md /app/admin -s /app/admin/administrate.py admin
# privilege separation directory # privilege separation directory
RUN mkdir -p /var/run/sshd RUN mkdir -p /var/run/sshd
# expose SSH port # expose SSH port
EXPOSE 22 EXPOSE 22
ENV TILDE_CONF="/app/data/applicationsconfig.ini" ENV TILDE_CONF="/app/data/applicationsconfig.ini"
RUN mkdir -p /app/data #COPY config/environment /app/user/.ssh/environment
RUN echo TILDE_CONF=$TILDE_CONF > /app/user/.ssh/environment
# private/{scripts, administrate.py}, public/{scripts, userapplications.py}, config/userapplicatonsconfig.ini
#configs, logs, db
COPY config/applicationsconfig.ini /app/data/applicationsconfig.ini
#SSH config into /etc :)
COPY config/etc /etc
RUN touch /app/data/applications.sqlite RUN touch /app/data/applications.sqlite
RUN touch /app/data/applications.log RUN touch /app/data/applications.log
# Doesnt work, @TODO why # Doesnt work, @TODO why
#RUN setfacl -R -m u:tilde:rwx /app/data/ #RUN setfacl -R -m u:tilde:rwx /app/data/
RUN chown -R tilde /app/data RUN chown -R tilde /app/data
# admin scripts CMD ["/usr/sbin/sshd", "-D"]
COPY private/ /app/admin/
# user accessible scripts
# Make TILDE_ENV
COPY public/ /app/user/
RUN mkdir /app/user/.ssh
CMD ["sh", "-c", " echo TILDE_CONF=$TILDE_CONF > /app/user/.ssh/environment && exec /usr/sbin/sshd -D"]

@ -1,129 +0,0 @@
#!/usr/bin/env python3
import configparser
import csv
import io
import ListUsers
import lib.uis.default as default_cmd # Follows -u, -a, -f flags
class Backup:
"""Backups a Tilde database to an CSV file
:Example:
>>> from Backup import Backup
>>> from ListUsers import ListUsers
>>> L = ListUsers.ListUsers("/path/to/sqlite").get_fetch()
>>> backup_db = Backup("stdout")
>>> backup_db.backup_to_file(L)
CSV-Separated list with headers in first row
"""
filename: str
quoting: int
dialect: str
field_names: tuple
def __init__(self, output: str, quoting: int = csv.QUOTE_NONNUMERIC, dialect: str = "excel"):
""" Constructs the Backup object
:param output: File name to backup to(set to stdout for stdout)
:type output: str
:param quoting: Set quoting for CSV Module
:type quoting: int
:param dialect: Set the CSV-Dialect. Defaults to excel, which is the classic CSV
:type dialect: str
"""
self.setFilename(output)
self.setQuoting(quoting)
self.setDialect(dialect)
self.setFieldnames(tuple(['id', 'username', 'email', 'name', 'pubkey', 'timestamp', 'status']))
def setDialect(self, dialect: str) -> None:
""" Set dialect for Object
:param dialect: Dialect to set for Object
:type dialect: str
:return: None
:rtype: None
"""
self.dialect = dialect
def setQuoting(self, quoting: int) -> None:
""" Set quoting in the CSV(must be supported by the CSV Module!)
:param quoting: Quoting Integer given by csv.QUOTE_* constants
:type quoting: int
:return: None
:rtype: None
"""
self.quoting = quoting
def setFilename(self, filename: str) -> None:
""" Sets Filename to output to
:param filename: Filename to output to(set stdout for stdout)
:type filename: str
:return: None
:rtype: None
"""
self.filename = filename
def setFieldnames(self, f_names: tuple) -> None:
""" Set fieldname to process
:param f_names: Fieldnames-Tuple
:type f_names: tuple
:return: None
:rtype: None
"""
self.field_names = f_names
def backup_to_file(self, fetched: list) -> bool:
"""Backup Userlist to File(or stdout)
:param fetched: List of values to write out CSV-formatted
:return: True, if success, None when not.
:rtype: bool
"""
returner = io.StringIO()
write_csv = csv.DictWriter(returner, fieldnames=self.field_names, quoting=self.quoting, dialect=self.dialect)
write_csv.writeheader()
for row in fetched:
write_csv.writerow(dict(row))
# sqlite3.Row doesn't "easily" convert to a dict itself sadly, so just a quick help from us here
# it actually even delivers a list(sqlite3.Row) also, which doesnt make the life a whole lot easier
if self.filename == "stdout":
print(returner.getvalue())
return True
else:
with open(self.filename, "w") as f:
print(returner.getvalue(), file=f)
return True
if __name__ == "__main__":
default_cmd.argparser.description += " - Backups Tilde Users to stdout or a file."
args = default_cmd.argparser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)
L = ListUsers.ListUsers(config['DEFAULT']['applications_db'],
unapproved=args.unapproved, approved=args.approved)
fetch = L.get_fetch()
if fetch:
B = Backup(args.file)
B.setFieldnames(fetch[0].keys()) # sqlite3.row delivers its keys for us! SO NICE!
B.backup_to_file(fetch)
else:
print("nothing to backup!")
exit(1)
exit(0)

@ -1,87 +0,0 @@
#!/usr/bin/env python3
import configparser
import csv
import os
import lib.UserExceptions
import lib.uis.config_ui # dont go to default, just following -c flag
def import_from_file(file_path: str, db: str, user_ids: tuple = tuple([])) -> bool:
""" Imports Users from a given CSV-file to the system and DB
:param file_path:
:type file_path: str
:param db: Path to the sqlite db
:type db: str
:param user_ids: FIXME: Tuple which user_ids should we write
:type user_ids: tuple
:return: True on success, False when not
:rtype: bool
"""
if not os.path.isfile(file_path):
print(f"File {file_path} don't exist")
return False
if not os.path.isfile(db):
print(f"The database file {db} don't exist")
return False
if user_ids:
pass # empty tuple means everything
# noinspection PyBroadException
try:
with open(file_path, 'r', newline='') as f:
import lib.Validator
sql = lib.sqlitedb.SQLiteDB(db)
err = lib.Validator.checkImportFile(file_path, db)
if err is not True:
print(err)
exit(0)
import lib.sqlitedb
import lib.System
sys_ctl = lib.System.System("root")
reader = csv.DictReader(f) # @TODO csv.Sniffer to compare? When yes, give force-accept option
for row in reader:
if row["status"] == "1":
try:
sys_ctl.setUser(row["username"])
sys_ctl.aio_approve(row["pubkey"])
print(row['username'], "====> Registered.")
except lib.UserExceptions.General as GeneralExcept:
print(f"Something didnt work out! {GeneralExcept}")
elif row["status"] == "0":
print(row['username'] + " not approved, therefore not registered.")
try:
sql.safequery(
"INSERT INTO `applications` (username, name, timestamp, email, pubkey, status) "
"VALUES (?,?,?,?,?,?)", tuple([row["username"], row["name"], row["timestamp"],
row["email"], row["pubkey"], row["status"]]))
except OSError as E:
pass
print(f"UUFFF, something went WRONG with the file {file_path}: {E}")
except Exception as didntCatch:
print(f"Exception! UNCATCHED! {type(didntCatch)}: {didntCatch}")
return True
if __name__ == "__main__":
ArgParser = lib.uis.config_ui.argparser
ArgParser.description += "- Imports a CSV file consisting of user specific details to the database"
ArgParser.add_argument('-f', '--file', default="stdout",
type=str, help='Import from CSV file', required=True)
ArgParser.add_argument('--Import', default=False, action="store_true",
help="Import Users.", required=True)
args = ArgParser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)
if not args.Import:
print("Error, need the import flag")
if not args.file:
print("Error, need the import file")
if not args.file:
print("You MUST set a CSV-file with the -f/--file flag that already exist")
exit(1)
import_from_file(args.file, config['DEFAULT']['applications_db'])
exit(0)

@ -1,120 +0,0 @@
#!/usr/bin/env python3
import configparser
import sqlite3 # sqlite3.Row-Object
from typing import List # Typing support!
import lib.uis.default as default_cmd # Follows -u, -a, -f flags
from lib.sqlitedb import SQLiteDB
class ListUsers:
db = None
usersFetch = None
def __init__(self, db: str, unapproved: bool = False, approved: bool = True, single_user: str = None):
"""Constructs ListUsers
:param db: Database to access
:type db: str
:param unapproved: only List unapproved users
:type unapproved: bool
:param approved: only list approved users
:type approved: bool
"""
self.db = SQLiteDB(db)
if unapproved: # only unapproved users
query = "SELECT * FROM `applications` WHERE `status` = '0'"
elif approved: # Approved users
query = "SELECT * FROM `applications` WHERE `status` = '1'"
else: # All users
query = "SELECT * FROM `applications`"
self.usersFetch = self.db.query(query)
if single_user is not None:
query = "SELECT * FROM `applications` WHERE `username` = ?"
self.usersFetch = self.db.safequery(query, tuple([single_user]))
def output_as_list(self) -> str:
"""Generates a string with one (approved) user per line and one newline at the end
:rtype: str
:return: String consisting with one(activated) user per line
"""
list_str: str = ""
query = "SELECT `username` FROM `applications` WHERE `status` = '1' ORDER BY timestamp ASC"
self.usersFetch = self.db.query(query)
for users in self.usersFetch:
list_str += users["username"] + "\n"
return list_str
def prettyPrint(self) -> None:
pass # see below why not implemented yet, texttable...
def get_fetch(self) -> List[sqlite3.Row]:
""" Returns a complete fetch done by the lib.sqlitedb-class
:return: Complete fetchall(). A List[sqlite3.Row] with dict-emulation objects.
:rtype: List[sqlite3.Row]
"""
return self.usersFetch
# @TODO MAYBE best solution: https://pypi.org/project/texttable/
# examle:
"""
from texttable import Texttable
t = Texttable()
t.add_rows([['Name', 'Age'], ['Alice', 24], ['Bob', 19]])
print(t.draw())
---------------> Results in:
+-------+-----+
| Name | Age |
+=======+=====+
| Alice | 24 |
+-------+-----+
| Bob | 19 |
+-------+-----+
for user in fetch:
print("ID: {}; Username: \"{}\"; Mail: {}; Name: \"{}\"; Registered: {}; Status: {}".format(
user["id"], user["username"], user["email"], user["name"], user["timestamp"], user["status"]
))
"""
if __name__ == "__main__":
default_cmd.argparser.description += " - Lists Users from the Tilde database."
default_cmd.argparser.add_argument('--list-asc', default=False, action="store_true",
help='Output a newline seperated list of users', required=False, dest="args_asc")
default_cmd.argparser.add_argument('--user', default=None, type=str,
help="Just show a specific user by it's name", required=False)
args = default_cmd.argparser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)
ret = ""
if args.user is not None:
L = ListUsers(config['DEFAULT']['applications_db'], unapproved=args.unapproved, approved=args.approved,
single_user=args.user)
else:
L = ListUsers(config['DEFAULT']['applications_db'], unapproved=args.unapproved, approved=args.approved)
if args.args_asc:
ret = L.output_as_list()
else:
fetch = L.get_fetch()
ret += "ID %-1s| Username %-5s| Mail %-20s| Name %-17s| Registered %-8s | State |\n" % (
" ", " ", " ", " ", " "
)
ret += 102 * "-" + "\n"
for user in fetch:
ret += "%-4i| %-14s| %-25s| %-22s| %-8s | %-5i |\n" % (
user["id"], user["username"], user["email"], user["name"], user["timestamp"], user["status"]
)
if args.file != "stdout":
with open(args.file, 'w') as f:
print(ret, file=f)
else:
print(ret)
exit(0)

@ -4,20 +4,16 @@ import configparser, logging, sqlite3, argparse, pwd
import os import os
import subprocess import subprocess
# Clear shell # Clear shell
def clear(): def clear():
os.system('cls' if os.name == 'nt' else 'clear') os.system('cls' if os.name == 'nt' else 'clear')
# create dictionary out of sqlite results # create dictionary out of sqlite results
def dict_factory(cursor, row): def dict_factory(cursor, row):
d = {} d = {}
for idx, col in enumerate(cursor.description): for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx] d[col[0]] = row[idx]
return d return d
# prints command(but doesnt execute them) # prints command(but doesnt execute them)
# need this for work, just convenience # need this for work, just convenience
def debugExec(commands): def debugExec(commands):
@ -38,28 +34,28 @@ argparser.add_argument('-c', '--config', default=cwd,
type = str, help = 'Path to configuration file', required = False) type = str, help = 'Path to configuration file', required = False)
args = argparser.parse_args() args = argparser.parse_args()
CONF_FILE = args.config CONF_FILE = args.config
config = configparser.ConfigParser() config = configparser.ConfigParser()
config.read(CONF_FILE) config.read(CONF_FILE)
logging.basicConfig(format="%(asctime)s: %(message)s", logging.basicConfig(format="%(asctime)s: %(message)s",
level = int(config['LOG_LEVEL']['log_level']) level = int(config['LOG_LEVEL']['log_level'])
) )
del cwd del(cwd)
REG_FILE = config['DEFAULT']['applications_db'] REG_FILE = config['DEFAULT']['applications_db']
# Does everything related to applicants, i.e. creating, manipulations... # Does everything related to applicants, i.e. creating, manipulations...
class Applicants: class applicants():
# User identifier # User identifier
identifier = "username" identifier = "username"
# SQLite DB Path # SQLite DB Path
sourceDB = "" sourceDB = ""
# another sqlite to batch-recreate users # another sqlite to batch-recreate users
differentDB = "" differentDB = ""
def __init__(self, lident, sourceDB=REG_FILE):
def __init__(self, lident, sourcedb=REG_FILE):
self.identifier = lident self.identifier = lident
self.sourceDB = sourcedb self.sourceDB = sourceDB
self.__connectToDB__("source") self.__connectToDB__("source")
# all results shall be done with dict_factory! Makes everything so much simpler # all results shall be done with dict_factory! Makes everything so much simpler
self.sdbCursor.row_factory = dict_factory self.sdbCursor.row_factory = dict_factory
@ -79,18 +75,16 @@ class Applicants:
self.ddbCursor = self.ddbConnection.cursor() self.ddbCursor = self.ddbConnection.cursor()
def __closeDB__(self, which): def __closeDB__(self, which):
if which == "source": if(which == "source"):
try: try:
self.sdbConnection.close() self.sdbConnection.close()
except sqlite3.Error as e: except sqlite3.Error as e:
logging.exception( logging.exception("Couldn't close database! Error: %s" % e) # @TODO: Dump full db with query or just the executed querys to file
"Couldn't close database! Error: %s" % e)
# @TODO: Dump full db with query or just the executed querys to file
else: else:
self.ddbConnection.close() # @TODO: Evaluate getting rid of ddb(differentDB)? self.ddbConnection.close() # @TODO: Evaluate getting rid of ddb(differentDB)?
# get List of all applications(not accepted yet) # get List of all applications(not accepted yet)
def getapplicationslist(self): def getApplicationsList(self):
query = "SELECT * FROM `applications` WHERE `status` = '0'" query = "SELECT * FROM `applications` WHERE `status` = '0'"
try: try:
self.sdbCursor.execute(query) self.sdbCursor.execute(query)
@ -100,7 +94,7 @@ class Applicants:
rows=[] rows=[]
return rows return rows
def getapprovedapplicantslist(self): def getApprovedApplicantsList(self):
query = "SELECT * From `applications` WHERE `status` = '1'" query = "SELECT * From `applications` WHERE `status` = '1'"
try: try:
self.sdbCursor.execute(query) self.sdbCursor.execute(query)
@ -111,23 +105,22 @@ class Applicants:
return rows return rows
# edit aproved users # edit aproved users
def editapprovedapplicants(self, term): def editApprovedApplicant(self, term, updaterow):
try: try:
# the fuck did i try here?
self.sdbCursor.execute( self.sdbCursor.execute(
"UPDATE `applications` WHERE id=?", (str(term),) "UPDATE `applications` SET ? WHERE id=?",
( str(term), )
) )
self.sdbConnection.commit() self.sdbConnection.commit()
except sqlite3.Error as e: except sqlite3.Error as e:
logging.exception("Database Error: %s" % e) logging.exception("Database Error: %s" % e)
# set user to aproved # set user to aproved
def setapprovedapplication(self, selectterm): def setApprovedApplication(self, selectterm):
# query = "SELECT `username` FROM `applications` WHERE `username` = `{0!s}`".format(selectterm) query = "SELECT `username` FROM `applications` WHERE `username` = `{0!s}`".format(selectterm)
pass
# get applicants data # get applicants data
def getapplicantsdata(self, term): def getApplicantsData(self, term):
# @TODO: Use shorthand if for the correct query, directly into sqlite # @TODO: Use shorthand if for the correct query, directly into sqlite
if self.identifier == "id": if self.identifier == "id":
try: try:
@ -147,15 +140,15 @@ class Applicants:
return result return result
# @TODO: migrade just approved users to some new/another sqlitedb # @TODO: migrade just approved users to some new/another sqlitedb
def migrateapproveddata(self, different_db): def migrateApprovedData(self, different_db):
pass pass
# @TODO: delete migrated data # @TODO: delete migrated data
def deletemigrateddata(self, selectterm): def deleteMigratedDataSet(self, selectterm):
pass pass
# Applicants whom doesnt got approved should get removed # Applicants whom doesnt got approved should get removed
def removeapplicant(self, term): def removeApplicant(self, term):
if self.identifier == "id": if self.identifier == "id":
try: try:
self.sdbCursor.execute( self.sdbCursor.execute(
@ -168,17 +161,17 @@ class Applicants:
else: else:
self.sdbCursor.execute( self.sdbCursor.execute(
'DELETE FROM `applications` WHERE username = ?', "DELETE FROM `applications` WHERE username = ?",
( str(term), ) ( str(term), )
) )
self.sdbConnection.commit() self.sdbConnection.commit()
#@TODO: Possibility to work without passing users manually #@TODO: Possibility to work without passing users manually
def selecteduser(userid, username=False): def selectedUser(userid, username = False):
pass pass
# Print out a list of aprovable users # Print out a list of aprovable users
def printapprovableusers(self, users): def printApprovableUsers(self, users):
i=0 i=0
for user in users: for user in users:
print("ID: {0!s}, Status: {0!s}, Name: {0!s}".format(i, user["status"], user["username"])) print("ID: {0!s}, Status: {0!s}, Name: {0!s}".format(i, user["status"], user["username"]))
@ -186,8 +179,7 @@ class Applicants:
return i return i
# Get List of users # Get List of users
@staticmethod def userPrint(self, fetched, userid):
def userprint(fetched, userid):
print("ID: {0!s}".format(fetched[int(userid)]["id"])) print("ID: {0!s}".format(fetched[int(userid)]["id"]))
print("Username: {0!s}".format(fetched[int(userid)]["username"])) print("Username: {0!s}".format(fetched[int(userid)]["username"]))
print("Mail: {0!s}".format(fetched[int(userid)]["email"])) print("Mail: {0!s}".format(fetched[int(userid)]["email"]))
@ -195,8 +187,8 @@ class Applicants:
print("Registrated time: {0!s}".format(fetched[int(userid)]["timestamp"])) print("Registrated time: {0!s}".format(fetched[int(userid)]["timestamp"]))
# Approve an applicant. Handles everything related, like create home dir, set flags blabla # Approve an applicant. Handles everything related, like create home dir, set flags blabla
def approveapplicant(self, term): def approveApplicant(self, term):
user = self.getapplicantsdata(term) user = self.getApplicantsData(term)
ret = self.__execScript(user) ret = self.__execScript(user)
if ret[0] != 0: # @DEBUG: Change to == 0 if ret[0] != 0: # @DEBUG: Change to == 0
print("Something went wrong in the user creation! Exiting without deleting users record in database!") print("Something went wrong in the user creation! Exiting without deleting users record in database!")
@ -215,96 +207,99 @@ class Applicants:
else: else:
self.sdbCursor.execute( self.sdbCursor.execute(
"UPDATE `applications` SET `status`=1 WHERE `username`=?", "UPDATE `applications` SET `status`=1 WHERE `username`=?"
( str(term), ) ( str(term), )
) )
self.sdbConnection.commit() self.sdbConnection.commit()
# Script execution, handles everything done with the shell/commands themselves # Script execution, handles everything done with the shell/commands themselves
@staticmethod def __execScript(self, user):
def __execScript(user):
# @TODO: omfg just write some wrapper-class/lib... sucks hard! # @TODO: omfg just write some wrapper-class/lib... sucks hard!
username=user["username"] username=user["username"]
home_dir = "/home/" + username + "/" homeDir="/home/"+username+"/"
ssh_dir = home_dir + ".ssh/" sshDir=homeDir+".ssh/"
executed=[] executed=[]
executed.append(["useradd", "-m", username]) executed.append(["useradd", "-m", username])
returncode = subprocess.call(executed[0]) rcode = subprocess.call(executed[0])
if returncode != 0: if rcode != 0:
return [returncode, executed, ] return [rcode,executed,]
executed.append(["usermod", "--lock", username]) executed.append(["usermod", "--lock", username])
returncode = subprocess.call(executed[1]) # empty pw rcode = subprocess.call(executed[1]) #empty pw
if returncode != 0: if rcode != 0:
return [returncode, executed, ] return [rcode,executed,]
executed.append(["usermod", "-a", "-G", "tilde", username]) executed.append(["usermod", "-a", "-G", "tilde", username])
returncode = subprocess.call(executed[2]) # add to usergroup rcode = subprocess.call(executed[2]) # add to usergroup
if returncode != 0: if rcode != 0:
return [returncode, executed, ] return [rcode,executed,]
executed.append(["mkdir", ssh_dir]) executed.append(["mkdir", sshDir])
try: try:
# @TODO: use config variable(chmodPerms) # @TODO: use config variable(chmodPerms)
os.mkdir(ssh_dir, 0o777) # create sshdir ret = os.mkdir(sshDir, 0o777) #create sshdir
returncode = 0 rcode = 0
except OSError as e: except OSError as e:
logging.exception(e.strerror) logging.exception(e.strerror)
returncode = e.errno # False, couldn't create. rcode = e.errno # False, couldn't create.
return [returncode, executed, ] return [rcode,executed,]
executed.append(["write(sshkey) to", ssh_dir + "authorized_keys"]) executed.append(["write(sshkey) to", sshDir+"authorized_keys"])
with open(ssh_dir + "authorized_keys", "w") as f: with open(sshDir+"authorized_keys", "w") as f:
f.write(user["pubkey"]) f.write(user["pubkey"])
if not f.closed: if f.closed != True:
logging.exception("Could'nt write to authorized_keys!") logging.exception("Could'nt write to authorized_keys!")
return [returncode, executed, ] return [rcode,executed,]
executed.append(["chmod", "-Rv", "700", ssh_dir]) executed.append(["chmod", "-Rv", "700", sshDir])
try: try:
os.chmod(ssh_dir + "authorized_keys", 0o700) # directory is already 700 os.chmod(sshDir+"authorized_keys", 0o700) # directory is already 700
returncode = 0 rcode = 0
except OSError as e: except OSError as e:
logging.exception(e.strerror) logging.exception(e.strerror)
returncode = e.errno rcode = e.errno
return [returncode, executed, ] return [rcode, executed,]
try: try:
executed.append(["chown", "-Rv", username + ":" + username, ssh_dir]) executed.append(["chown", "-Rv", username+":"+username, sshDir])
os.chown(ssh_dir, pwd.getpwnam(username)[2], pwd.getpwnam(username)[3]) # 2=>uid, 3=>gid os.chown(sshDir, pwd.getpwnam(username)[2], pwd.getpwnam(username)[3]) #2=>uid, 3=>gid
executed.append(["chown", "-v", username + ":" + username, ssh_dir + "authorized_keys"]) executed.append(["chown", "-v", username+":"+username, sshDir+"authorized_keys"])
os.chown(ssh_dir + "authorized_keys", pwd.getpwnam(username)[2], pwd.getpwnam(username)[3]) os.chown(sshDir+"authorized_keys", pwd.getpwnam(username)[2], pwd.getpwnam(username)[3])
returncode = 0 rcode = 0
except OSError as e: except OSError as e:
logging.exception(e.strerror) # @TODO: maybe append strerror to executed instead of printing it logging.exception(e.strerror) # @TODO: maybe append strerror to executed instead of printing it
returncode = e.errno rcode = e.errno
return [returncode, executed, ] return [rcode, executed,]
return [rcode,executed,]
"""
{'id': 7, 'username': 'testuser47', 'email': '47test@testmail.com', 'name':
'test Name', 'pubkey': 'ssh-rsa [...]', 'timestamp': '2018-08-22 13:31:16', 'status': 0}
return [returncode, executed, ] """
# {'id': 7, 'username': 'testuser47', 'email': '47test@testmail.com', 'name':
# 'test Name', 'pubkey': 'ssh-rsa [...]', 'timestamp': '2018-08-22 13:31:16', 'status': 0}
def main(): def main():
# how many times the Separator/Delimiter? # how many times the Seperator/Delimiter?
delcount = 40 delcount = 40
# The separator for the menu # The seperator for the menu
separator = "=" * delcount Seperator = "="*delcount
menu = separator + "\n\t\t Main-Menu:\n\n" \ Menu = Seperator+"\n\t\t Main-Menu:\n\n" \
"\t 1) list and edit pending users\n"\ "\t 1) list and edit pending users\n"\
"\t 2) list applicants\n"\ "\t 2) list applicants\n"\
"\t 3) edit applicant\n"\ "\t 3) edit applicant\n"\
"\t 4) quit\n" + separator + "\n" "\t 4) quit\n"+Seperator+"\n"
# Identify by ID # Identify by ID
applications = Applicants(lident="id") applications = applicants(lident = "id")
while 1 != 0: while 1 != 0:
print(menu) print(Menu)
command = input("Please select, what you want to do: \n -> ") command = input("Please select, what you want to do: \n -> ")
# User shouldn't be able to type something in that isnt a number # User shouldnt be able to type something in that isnt a number
if command.isalpha() or command == '': if command.isalpha() or command == '':
clear() clear()
print("!!! invalid input, please try again. !!!") print("!!! invalid input, please try again. !!!")
@ -317,66 +312,65 @@ def main():
exit(0) exit(0)
# Edit and list pending users/applicants @TODO Wording: Users or applicants? # Edit and list pending users/applicants @TODO Wording: Users or applicants?
elif command == 1: elif command == 1:
users = applications.getapplicationslist() users = applications.getApplicationsList()
i = applications.printapprovableusers(users) i=applications.printApprovableUsers(users)
if i == 0 : if i == 0 :
print("No pending users") print("No pending users")
# giving some time to acknowledge that something WRONG happened # giving some time to aknowledge that something WRONG happened
input("Continue with Keypress...") input("Continue with Keypress...")
clear() clear()
continue continue
user_selection = 0 usersel = 0
user_max = i UserMax = i
print("Menu:\n r=>return to main") print("Menu:\n r=>return to main")
# Edit Menu # Edit Menue
while 1 != 0 or user_selection != "r": while 1 != 0 or usersel != "r":
i = applications.printapprovableusers(users) i = applications.printApprovableUsers(users)
if user_selection == "r": if usersel == "r":
break # break when user presses r break # break when user presses r
user_selection = input("Which user( ID ) do you want to change? ->") usersel = input("Which user( ID ) do you want to change? ->")
if len(user_selection) > 1 or user_selection.isalpha(): if len(usersel) > 1 or usersel.isalpha():
user_selection = "" usersel = ""
# convert to int if input isnt an r # convert to int if input isnt an r
user_selection = int(user_selection) if user_selection != '' and user_selection != 'r' else 0 usersel = int(usersel) if usersel != '' and usersel != 'r' else 0
if user_selection > user_max - 1: if usersel > UserMax - 1:
print("User {0!s} doesn't exist!".format(user_selection)) print("User {0!s} doesn't exist!".format(usersel))
continue continue
# Show the user his chosen user and ask what to do # Show the user his chosen user and ask what to do
applications.userprint(users, user_selection) applications.userPrint(users, usersel)
print("You chosed ID No. {0!s}, what do you like to do?".format(user_selection)) print("You chosed ID No. {0!s}, what do you like to do?".format(usersel))
chosen_user = user_selection chosenUser = usersel
user_selection = "" usersel = ""
# Finally down the edit menu! # Finally down the edit menue!
while user_selection != "e": while usersel != "e":
user_selection = input( usersel = input("User: {0!s}\n \t\t(A)ctivate \n\t\t(R)emove \n\t\tR(e)turn\n -> ".format(chosenUser))
"User: {0!s}\n \t\t(A)ctivate \n\t\t(R)emove \n\t\tR(e)turn\n -> ".format(chosen_user)) if usersel == "A":
if user_selection == "A": applications.approveApplicant(users[chosenUser]['id'])
applications.approveapplicant(users[chosen_user]['id']) print("User {0!s} has been successfully approved!".format(users[chosenUser]['username']))
print("User {0!s} has been successfully approved!".format(users[chosen_user]['username']))
input("waiting for input...") input("waiting for input...")
clear() clear()
user_selection = "e" # remove for being able to continue editing? usersel="e" # remove for being able to continue editing?
continue continue
elif user_selection == "R": elif usersel == "R":
applications.removeapplicant(users[chosen_user]['id']) applications.removeApplicant(users[chosenUser]['id'])
print("User {0!s} successfully deleted!".format(user[chosen_user]['username'])) print("User {0!s} successfully deleted!".format(user[chosenUser]['username']))
input("waiting for input...") input("waiting for input...")
clear() clear()
continue continue
elif user_selection == "e": elif usersel == "e":
clear() clear()
continue continue
elif int(command) == 2: elif int(command) == 2:
users = applications.getapprovedapplicantslist() users = applications.getApprovedApplicantsList()
if not users: if users == []:
print("no activate users yet!") print("no activate users yet!")
i=0 i=0
for user in users: for user in users:
@ -387,7 +381,6 @@ def main():
else: else:
exit(0) exit(0)
if __name__ == "__main__": if __name__ == "__main__":
try: try:
main() main()

@ -1,147 +0,0 @@
#!/usr/bin/env python3
import configparser
import sqlite3
import lib.System
import lib.UserExceptions
import lib.Validator
import lib.sqlitedb
import lib.uis.config_ui # only follow -c flag
if __name__ == "__main__":
lib.uis.config_ui.argparser.description += " - Edit Tilde Users"
ArgParser = lib.uis.config_ui.argparser
ArgParser.add_argument('--user', type=str,
help='Tilde users name to edit', required=True)
Mutually = ArgParser.add_mutually_exclusive_group()
Mutually.add_argument('-r', '--remove', default=False, action="store_true",
help='Remove an approved/unapproved User from the system(and DB). Effectively purges him.',
required=False)
Mutually.add_argument("--verify", default=True, action="store_false",
help="Turns off value checks",
required=False)
ArgParser.add_argument('--sshpubkey', type=str, default=None,
help="Stores the new given SSH-Key in given user", required=False)
ArgParser.add_argument('--name', type=str, default=None,
help="Sets the stored name of the given user")
ArgParser.add_argument('--username', type=str, default=None,
help="Rename given User")
ArgParser.add_argument('--email', type=str, default=None,
help="Set new email address for given user")
ArgParser.add_argument('--status', type=int, default=None,
help="Set status of given user")
args = ArgParser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)
db = config['DEFAULT']['applications_db']
if not args.sshpubkey and not args.name and not args.username and not args.email and args.status is None \
and not args.remove:
print(f"Well, SOMETHING must be done with {args.user} ;-)")
exit(1)
# --> --user
if not lib.Validator.checkUserInDB(args.user, db):
print(f"User {args.user} does not exist in the database.")
exit(1)
DB = lib.sqlitedb.SQLiteDB(db)
sys_ctl = lib.System.System(args.user)
if not DB:
print("Could not establish connection to database")
exit(1)
CurrentUser = DB.safequery("SELECT * FROM `applications` WHERE `username`=?", tuple([args.user]))[0]
# --> --remove
if args.remove:
print(f"Removing {args.user} from the system and the database...")
try:
DB.removeApplicantFromDBperUsername(args.user)
print(f"Purged from the DB")
if CurrentUser["status"] == 1:
sys_ctl.remove_user()
print(f"Purged from the system")
else:
print(f"'{args.user}' was not approved before, therefore not deleting from system itself.")
except lib.UserExceptions.General as e:
print(f"{e}")
exit(1)
print(f"Successfully removed '{args.user}'.")
exit(0)
# --> --sshpubkey
if args.sshpubkey:
if not lib.Validator.checkSSHKey(args.sshpubkey):
print(f"Pubkey '{args.sshpubkey}' isn't valid.")
exit(1)
try:
DB.safequery("UPDATE `applications` SET `pubkey`=? WHERE `username`=?",
tuple([args.sshpubkey, args.user]))
CurrentUser = DB.safequery("SELECT * FROM `applications` WHERE `username` = ? ", tuple([args.user]))[0]
if int(CurrentUser["status"]) == 1:
sys_ctl.make_ssh_usable(args.sshpubkey)
except sqlite3.Error as e:
print(f"Something unexpected happened! {e}")
exit(1)
except lib.UserExceptions.ModifyFilesystem as e:
print(f"One action failed during writing the ssh key back to the authorization file. {e}")
print(f"'{args.user}'s SSH-Key updated successfully.")
# --> --name
if args.name:
if not lib.Validator.checkName(args.name):
print(f"'{args.name}' is not a valid Name.")
exit(1)
try:
DB.safequery("UPDATE `applications` SET `name` =? WHERE `username` =?", tuple([args.name, args.user]))
except sqlite3.Error as e:
print(f"Could not write '{args.name}' to database: {e}")
print(f"'{args.user}'s Name changed to '{args.name}'.")
# --> --email
if args.email:
if not lib.Validator.checkEmail(args.email):
print(f"'{args.email}' is not a valid Mail address!")
exit(1)
try:
DB.safequery("UPDATE `applications` SET `email` =? WHERE `username` =?", tuple([args.email]))
except sqlite3.Error as e:
print(f"Could not write '{args.email}' to the database. {e}")
print(f"'{args.user}' Mail changed to '{args.email}'.")
# --> --status
if args.status is not None:
if args.status != 0 and args.status != 1:
print("Only 0 and 1 are valid status, where 1 is activated and 0 is unapproved.")
exit(0)
# just takes first result out of the dict
if args.status == int(CurrentUser["status"]):
print(f"New and old status are the same.")
if args.status == 0 and int(CurrentUser["status"]) == 1:
try:
DB.safequery("UPDATE `applications` SET `status` =? WHERE `id`=?",
tuple([args.status, CurrentUser["id"]]))
sys_ctl.remove_user()
except sqlite3.Error as e:
print(f"Could not update database entry for '{args.user}', did not touch the system")
exit(1)
except lib.UserExceptions.UnknownReturnCode as e:
print(f"Could not remove '{args.user}' from the system, unknown return code: {e}. DB is modified.")
exit(1)
print(f"Successfully changed '{args.user}'s status to 0 and cleared from the system.")
if args.status == 1 and int(CurrentUser["status"]) == 0:
try:
DB.safequery("UPDATE `applications` SET `status`=? WHERE `username`=?",
tuple([args.status, args.user]))
sys_ctl.aio_approve(CurrentUser["pubkey"])
except sqlite3.Error as e:
print(f"Could not update Users status in database")
exit(1)
except lib.UserExceptions.General as ChangeUser:
print(f"Some chain in the cattle just slipped away, my lord! {ChangeUser}")
exit(1)
print(f"Successfully changed '{args.user}'s status to 1 and created on the system.")
exit(0)

@ -1,6 +0,0 @@
import configparser
import lib.uis.default as default_cmd
args = default_cmd.argparser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)

@ -1,260 +0,0 @@
import os
import pwd
import subprocess
import lib.UserExceptions
class System:
"""Class to interact with the system specifically to support our needs 0w0
:Example:
>>> from lib.System import System as System
>>> Sys_ctl = System("Test", dryrun=True)
>>> Sys_ctl.register()
>>> Sys_ctl.lock_user_pw()
>>> Sys_ctl.add_to_usergroup()
>>> Sys_ctl.make_ssh_usable("sshkey")
"""
dry: bool = False
create_command = []
home: str = ""
user: str
def setUser(self, username: str):
self.user = username
def __init__(self, username: str, dryrun: bool = False, home: str = "/home/"):
"""Creates an objects. Can set dry run.
:param username: Username to manipulate
:type username: str
:param dryrun: Run all command in a dry-run? When enabled, doesn't make any changes to the system (defaults to
false)
:type dryrun: bool
:param home: Standard directory to search for the home directories of your users(default is /home/)
:type home: str
:raises:
ValueError: if homedir can not be found
"""
self.dry = dryrun
if not home.endswith("/"):
home += "/"
if not os.path.isdir(home):
raise ValueError("home should be an existent directory...")
self.home = home
self.user = username
def aio_approve(self, pubkey, group="tilde"):
""" Executes all neccessary steps to create a user from itself. Raises ALOT of possible exceptions
:Note: CAREFULL! You MUST except the exceptions!
:param pubkey: Users public ssh key
:type pubkey: str
:param group: User-group. Defaults to tilde
:type group: str
:return: None
:raises:
lib.UserExceptions.UserExistsAlready: User Exists already on system
lib.UserExceptions.UnknownReturnCode: Unknown Return Code from useradd
lib.UserExceptions.SSHDirUncreatable: Users SSH Dir couldnt be created
lib.UserExceptions.ModifyFilesystem: Something with CHMOD failed
"""
self.register()
self.lock_user_pw()
self.add_to_usergroup(group)
self.make_ssh_usable(pubkey)
def register(self, cc: tuple = tuple(["useradd", "-m"])) -> bool:
"""Creates an local account for the given username
:param cc: Tuple with commands separated to execute on the machine. (defaults to useradd -m)
:type cc: tuple
:return: True, if worked, raises lib.UserExceptions.UserExistsAlready when not
:rtype: bool
:raises:
lib.UserExceptions.UserExistsAlready: when username specified already exists on the system
"""
create_command = cc
cc = create_command + tuple([self.user])
if self.dry:
self.printTuple(cc)
return True
elif not self.dry:
rt = subprocess.call(cc)
if rt != 0:
raise lib.UserExceptions.UserExistsAlready(self.user)
return True
def unregister(self) -> bool:
""" Just an alias function for removeUser
:return: True, when success, False(or exception) when not
:rtype: bool
"""
return self.remove_user()
def make_ssh_usable(self, pubkey: str, sshdir: str = ".ssh/") -> bool:
""" Make SSH usable for our newly registered user
:param pubkey: Public SSH Key for the User you want accessible by SSH
:type pubkey: str
:param sshdir: Directory to write the authorized_keys File to. PWD is $HOME of said user. (defaults to ".ssh/")
:type sshdir: str
:return: True, if worked, raises lib.UserExceptions.UnknownReturnCode, lib.UserExceptions.HomeDirExistsAlready
or lib.UserExceptions.ModifyFilesystem when not
:rtype: bool
:raises:
lib.UserExceptions.SSHDirUncreatable: if the ssh-dir couldnt be created nor exist
lib.UserExceptions.ModifyFilesystem: When chmod to .ssh and authorized_keys failed
lib.UserExceptions.General: if PWD cant find the specified user
"""
if self.dry:
print("Nah, @TODO, but actually kinda too lazy for this lul. Just a lot happening here")
return True
if not sshdir.endswith("/"):
sshdir += "/"
ssh_dir = self.home + self.user + "/" + sshdir
try:
os.mkdir(ssh_dir)
except FileExistsError:
pass # thats actually a good one for us :D
except OSError as e:
raise lib.UserExceptions.SSHDirUncreatable(f"Could not create {ssh_dir}: Exception: {e}")
try:
self.write_ssh(pubkey, ssh_dir)
except OSError as e:
raise lib.UserExceptions.ModifyFilesystem(
f"Could not write and/or chmod 0700 {ssh_dir} or {ssh_dir}/authorized_keys, Exception: {e}")
try:
pwdnam = pwd.getpwnam(self.user)
os.chown(ssh_dir, pwdnam[2], pwdnam[3]) # 2=>uid, 3=>gid
os.chown(ssh_dir + "authorized_keys", pwd.getpwnam(self.user)[2], pwd.getpwnam(self.user)[3])
except OSError as e: # by os.chown
raise lib.UserExceptions.ModifyFilesystem(
f"Could not chown {ssh_dir} and/or authorized_keys to {self.user} and their group, Exception: {e}", )
except KeyError as e: # by PWD
raise lib.UserExceptions.General(f"PWD can't find {self.user}: {e}")
return True
@staticmethod
def write_ssh(key: str, ssh_dir: str) -> None:
""" Write SSH key to a specified directory(appends authorized_keys itself!)
:param key: Key to write
:type key: str
:param ssh_dir: SSH Directory to write to
:type ssh_dir: str
:return: None
"""
with open(ssh_dir + "authorized_keys", "w") as f:
print(key, file=f)
f.close()
os.chmod(ssh_dir + "authorized_keys", 0o700) # we dont care about the directory here
def lock_user_pw(self, cc: tuple = tuple(["usermod", "--lock"])) -> bool:
"""Lock a users password so it stays empty
:param cc: Commands to run in the subprocess to lock it down(defaults to usermod --lock)
:type cc: tuple
:rtype: bool
:return: True, if worked, raises lib.UserExceptions.UnknownReturnCode when not
:raises:
lib.UserExceptions.UnknownReturnCode: When cc returns something else then 0
"""
lock_command = cc
cc = lock_command + tuple([self.user])
if self.dry:
self.printTuple(cc)
return True
elif not self.dry:
rt = subprocess.call(cc)
if rt != 0:
raise lib.UserExceptions.UnknownReturnCode(f"Could not lock user '{self.user}'; '{cc}' returned '{rt}'")
return True
def add_to_usergroup(self, group: str = "tilde", cc: tuple = tuple(["usermod", "-a", "-G"])) -> bool:
""" Adds a given user to a given group
:param group: Groupname where you want to add your user to
:type group: str
:param cc: Commands to execute that adds your user to said specific group(defaults to usermod -a -G")
:type cc: tuple
:return: True, if worked, raises lib.UserExceptions.UnknownReturnCode when not
:rtype bool
:raises:
lib.UserExceptions.UnknownReturnCode: if cc returned something else then 0
"""
add_command = cc
cc = add_command + tuple([group, self.user])
if self.dry:
self.printTuple(cc)
return True
elif not self.dry:
rt = subprocess.call(cc)
if rt != 0:
raise lib.UserExceptions.UnknownReturnCode(
f"Could not add user '{self.user}' to group '{group}' with command '{cc}', returned '{rt}'", )
return True
@staticmethod
def printTuple(tup: tuple) -> None:
"""Prints a tuple with spaces as separators
:param tup: Tuple you want to print
:type tup: tuple
:rtype: None
:returns: Nothing
"""
pp = ""
for i in tup:
pp += i + " "
print(pp)
def remove_user(self, cc: tuple = tuple(["userdel", "-r"])) -> bool:
"""Removes the specified user from the system
:param cc: Commands to execute to delete the user from the System(defaults to userdel -r)
:type cc: tuple
:return: True, if worked, raises lib.UserExceptions.UnknownReturnCode when not
:rtype: bool
:raises:
lib.UserExceptions.UnknownReturnCode: When cc returns something else then 0 or 6
"""
remove_command = cc
cc = remove_command + tuple([self.user])
if self.dry:
self.printTuple(cc)
return True
else:
ret = subprocess.Popen(cc, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
ret.wait() # wait for cc
stdio, err_io = ret.communicate() # get stdout as well as stderr
if ret.returncode != 0 and ret.returncode != 6: # userdel returns 6 when no mail dir was found but success
raise lib.UserExceptions.UnknownReturnCode(
f"Could not delete user with command {cc}. Return code: {ret.returncode},"
f" stdout/stderr: {stdio + err_io}")
return True
if __name__ == "__main__":
try:
S = System("dar", dryrun=True)
S.register()
S.lock_user_pw()
S.add_to_usergroup()
# if not S.make_ssh_usable("dar", "SSHpub"):
# print("Huh, error :shrug:")
exit(0)
except KeyboardInterrupt:
pass

@ -1,48 +0,0 @@
class General(Exception):
pass
class User(General):
pass
class UnknownUser(User):
def __init__(self, name):
Exception.__init__(self, f"Tried to perform action on unknown user '{name}'")
class UserExistsAlready(User):
def __init__(self, name):
Exception.__init__(self, f"User '{name}' is already registered")
class UnknownReturnCode(General):
pass
class ModifyFilesystem(General):
pass
class SSHDirUncreatable(ModifyFilesystem):
pass
class SQLiteDatabaseDoesntExistYet(General):
pass
class UsernameLength(User):
pass
class UsernameTooShort(User):
pass
class UsernameTooLong(User):
pass
class UsernameInvalidCharacters(User):
pass

@ -1,219 +0,0 @@
import csv
import pwd
import re
import lib.sqlitedb
def checkUsernameCharacters(username: str) -> bool:
""" Checks the Username for invalid characters. Allow only alphanumerical characters, a lower alpha one first,
Followed by any sequence of digits and characters
:param username: String to check for validity
:type username: str
:return: True when valid, False when not
:rtype: bool
"""
if " " not in username and "_" not in username and username.isascii() and username[:1].islower() and \
not username[0].isnumeric():
if not re.search(r"\W+", username):
if not re.search("[^a-zA-Z0-9]", username):
return True
return False
def checkUsernameLength(username: str, upper_limit: int = 16, lower_limit: int = 3) -> bool:
""" Checks username for an upper and lower bounds limit character count
:param username: Username to check
:type username: str
:param upper_limit: Upper limit bounds to check for(default is 16)
:type upper_limit: int
:param lower_limit: Lower limit bounds to check for(default is 3)
:return: True, when all bounds are in, False when one or both aren't.
:rtype: bool
"""
if len(username) > upper_limit:
return False
if len(username) < lower_limit:
return False
return True
def checkUserExists(username: str) -> bool:
""" Checks if the User exists on the **SYSTEM** by calling PWD on it.
**Note**: You might want to use this in conjunction with checkUserInDB
:param username:
:type username: str
:return: True when exists, False when not
:rtype: bool
"""
try:
pwd.getpwnam(username)
except KeyError:
return False
return True # User already exists
def checkUserInDB(username: str, db: str) -> bool:
""" Checks users existence in the **DATABASE**.
:Note: You might want to use this in conjunction with `checkUserExists`
:param username: Username to check existence in database
:type username: str
:param db: Path to database to check in
:type db: str
:return: True, when User exists, False when not
"""
try:
ldb = lib.sqlitedb.SQLiteDB(db)
fetched = ldb.safequery("SELECT * FROM 'applications' WHERE username = ?", tuple([username]))
if fetched:
return True
except lib.sqlitedb.sqlite3.Error as e:
print(f"SQLite Exception: {e}")
return False
def checkSSHKey(key: str) -> bool:
""" Checks SSH Key for meta-data that we accept.
:Note: We currently only allow ssh keys without options but with a mail address at the end in b64 encoded.
The currently supported algorithms are: ecdfsa-sha2-nistp256, 'ecdsa-sha2-nistp384', 'ecdsa-sha2-nistp521',
'ssh-rsa', 'ssh-dss' and 'ssh-ed25519'
:param key: Key to check
:return: True, when Key is valid, False when not
:rtype: bool
"""
# taken from https://github.com/hashbang/provisor/blob/master/provisor/utils.py, all belongs to them! ;)
import base64
if len(key) > 8192 or len(key) < 80:
return False
key = key.replace("\"", "").replace("'", "").replace("\\\"", "")
key = key.split(' ')
types = ['ecdsa-sha2-nistp256', 'ecdsa-sha2-nistp384',
'ecdsa-sha2-nistp521', 'ssh-rsa', 'ssh-dss', 'ssh-ed25519']
if key[0] not in types:
return False
try:
base64.decodebytes(bytes(key[1], "utf-8"))
except TypeError:
return False
return True
def checkEmail(mail: str) -> bool:
""" Checks Mail against a relatively simple REgex Pattern.
:param mail: Mail to check
:type mail: str
:return: False, when the Mail is invalid, True when valid.
:rtype: bool
"""
if not re.match("(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)", mail):
return False
else:
return True
def checkDatetimeFormat(datetime_str: str) -> bool:
""" Checks a Strings format on date time.
:param datetime_str: String to check
:type datetime_str: str
:return: True when valid, False when not.
:rtype: bool
"""
import datetime
try:
datetime.datetime.strptime(datetime_str, "%Y-%m-%d %H:%M:%S")
except ValueError:
return False
return True
def checkName(name: str) -> bool:
""" Checks a users (real) Name against a real simple REgex Pattern.
:param name: Name/String to check
:type name: str
:return: True when valid, False when not.
:rtype: bool
"""
if not re.match("\w+\s*\w", name):
return False
else:
return True
def checkImportFile(path: str, db: str):
""" Checks an CSV file against most of the validators and prints an Error message with the line number corresponding
to said failure.. Those includes: checkName, checkUsernameCharacters,
ckeckUsernameLength, duplicate usernames(in the CSV), checkSSHKey, checkEmail, checkUserExists, checkUserInDB,
checkDatetimeformat and if the status is 1 or 0.
:param path: Path to file to check
:type path: str
:param db: Path to database file(SQLite)
:type db: str
:return: Str when Failure, True when success(All tests passed)
:rtype: Str or None
"""
errstr = ""
valid = True
ln = 1 # line number
valid_names_list = []
with open(path, 'r', newline='') as f:
reader = csv.DictReader(f)
for row in reader:
# if any of this fails move on to the next user, just print a relatively helpful message lel
if not lib.Validator.checkName(row["name"]):
errstr += f"Line {ln}: Name: '{row['name']}' seems not legit. Character followed by character should" \
f" be correct.\n"
valid = False
if not lib.Validator.checkUsernameCharacters(row["username"]):
errstr += (f"Line {ln}: Username contains unsupported characters or starts with a number: '"
f"{row['username']}'.\n")
valid = False
if not lib.Validator.checkUsernameLength(row["username"]):
errstr += f"Line {ln}: Username '{row['username']}' is either too long(>16) or short(<3)\n"
valid = False
# dup checking
if row["username"] in valid_names_list:
errstr += f"Line {ln}: Duplicate Username {row['username']}!\n"
valid = False
else:
valid_names_list.append(row["username"])
# dup end
if not lib.Validator.checkSSHKey(row["pubkey"]):
errstr += f"Line {ln}: Following SSH-Key of user '{row['username']}' isn't valid: " \
f"'{row['pubkey']}'.\n"
valid = False
if not lib.Validator.checkEmail(row["email"]):
errstr += f"Line {ln}: E-Mail address of user '{row['username']}' '{row['email']}' is not valid.\n"
valid = False
if lib.Validator.checkUserExists(row["username"]) or checkUserInDB(row["username"], db):
errstr += f"Line {ln}: User '{row['username']}' already exists.\n"
valid = False
if not lib.Validator.checkDatetimeFormat(row["timestamp"]):
errstr += f"Line {ln}: Timestamp '{row['timestamp']}' from user '{row['username']}' is invalid.\n"
valid = False
if int(row["status"]) > 1 or int(row["status"]) < 0:
errstr += f"Line {ln}: Status '{row['status']}' MUST be either 0 or 1.\n"
valid = False
ln += 1
if valid:
return True
else:
return errstr

@ -1,9 +0,0 @@
import os
cwd = os.environ.get('TILDE_CONF')
if cwd is None:
cwd = os.getcwd() + "/applicationsconfig.ini"
else:
if os.path.isfile(cwd) is False:
cwd = os.getcwd() + "/applicationsconfig.ini"
# cwd is now either cwd/applicationsconfig or $TILDE_CONF

@ -1,164 +0,0 @@
#!/usr/bin/env python3
import sqlite3
from sys import stderr as stderr
from typing import List # Typing support!
# create dictionary out of sqlite results
def dict_factory(cursor, row):
d: dict = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
class SQLiteDB:
"""SQLitedb handles EVERYTHING directly related to our Database."""
db = ""
cursor = None
connection = None
last_result = None
def __init__(self, dbpath: str):
"""
:param dbpath: Path to the database we want to open
:type dbpath: str
:returns: Object for the SQLitedb-Class.
:rtype: object
"""
db = dbpath
try:
self.connection = sqlite3.connect(db)
self.cursor = self.connection.cursor()
except sqlite3.Error as e:
print("Connection error: %s" % e, file=stderr)
self.cursor.row_factory = sqlite3.Row # every result will be a dict now
def __del__(self):
try:
self.connection.commit()
self.connection.close()
except sqlite3.Error as e:
print("Couldn't gracefully close db: %s" % e, file=stderr)
def query(self, qq: str) -> List[sqlite3.Row]:
"""Do a query and automagically get the fetched results in a list
:param qq: Query to execute
:type qq: str
:returns: A tuple(/list) consisting with any fetched results
:rtype: list
"""
try:
self.cursor.execute(qq)
self.last_result = self.cursor.fetchall()
self.connection.commit()
except sqlite3.OperationalError:
self._createTable()
return self.query(qq)
except sqlite3.Error as e:
print("Couldn't execute query %s, exception: %s" % (qq, e), file=stderr)
self.last_result = []
return self.last_result
# sometimes we need the cursor for safety reasons, for example does sqlite3 all the security related
# escaoing in supplied strings for us, when we deliver it to con.execute in the second argument as a tuple
def getCursor(self) -> sqlite3:
"""Returns SQLite3 Cursor. Use with **c a u t i o n**... """
return self.cursor
# we could try to utilise that ourselfs in a function. Be c a r e f u l, these values in the tuple MUST HAVE
# THE RIGHT TYPE
def safequery(self, qq: str, deliver: tuple) -> List[sqlite3.Row]:
""" Shall handle any query that has user input in it as an alternative to self.query
:param qq: Query to execute
:type qq: str
:param deliver: User inputs marked with the placeholder(`?`) in the str
:type deliver: tuple
:returns: A tuple(/list) consisting with any fetched results
:rtype: List[sqlite3.Row]
"""
try:
self.cursor.execute(qq, deliver)
self.last_result = self.cursor.fetchall()
self.connection.commit()
except TypeError as e:
print("Types in given tuple doesnt match to execute query \"%s\": %s" % (qq, e), file=stderr)
self.last_result = []
except sqlite3.OperationalError:
self._createTable()
return self.safequery(qq, deliver)
except sqlite3.Error as e:
print("Couldn't execute query %s, exception: %s" % (qq, e), file=stderr)
print(deliver)
print(type(e))
self.last_result = []
return self.last_result
def removeApplicantFromDB(self, userid: int) -> bool:
"""Removes Applicants from the DB by ID. Use along System.removeUser()
:param userid: User ID to remove from the Database
:type userid: int
:returns: True, if removal was successful(from the DB), False when not
:rtype: bool
"""
try:
self.last_result = self.cursor.execute("DELETE FROM `applications` WHERE id = ? ", [userid])
self.connection.commit()
except sqlite3.OperationalError:
print("The database has probably not yet seen any users, so it didnt create your table yet. Come back"
"when a user tried to register")
return False
except sqlite3.Error as e:
print(f"Could not delete user with id: {userid}, exception in DB: {e}") # @TODO LOGGING FFS
return False
return True
def removeApplicantFromDBperUsername(self, username: str) -> bool:
"""Removes Applicants from the DB by Username. Use along System.removeUser()
:param username: Username to remove from the database
:type username: str
:returns: True, if removal was successful(from the DB), False when not
:rtype: bool
"""
try:
self.last_result = self.cursor.execute("DELETE FROM `applications` WHERE username = ?", [username])
self.connection.commit()
except sqlite3.OperationalError:
print("The database has probably not yet seen any users, so it didnt create your table yet. Come back"
"when a user tried to register")
return False
except sqlite3.Error as e:
print(f"Could not delete user {username}, exception in DB: {e}") # @TODO LOGGING
return False
return True
def _createTable(self) -> None:
try:
self.cursor.execute(
"CREATE TABLE IF NOT EXISTS applications("
"id INTEGER PRIMARY KEY AUTOINCREMENT,"
"username TEXT NOT NULL, email TEXT NOT NULL,"
"name TEXT NOT NULL, pubkey TEXT NOT NULL,"
"timestamp DATETIME DEFAULT CURRENT_TIMESTAMP CONSTRAINT "
"timestamp_valid CHECK( timestamp IS strftime('%Y-%m-%d %H:%M:%S', timestamp))"
",status INTEGER NOT NULL DEFAULT 0);")
self.connection.commit()
except sqlite3.Error as e:
print(f"The database probably doesn't exist yet, but read the message: {e}")
print("The database table didn't exist yet; created it successfully!")
if __name__ == "__main__":
try:
SQLiteDB("bla.db")
print("hi")
exit(0)
except KeyboardInterrupt:
pass

@ -1,6 +0,0 @@
import argparse
import lib.cwd
argparser = argparse.ArgumentParser(description='Tilde administration tools ', conflict_handler="resolve")
argparser.add_argument('-c', '--config', default=lib.cwd.cwd,
type=str, help='Path to configuration file', required=False)

@ -1,11 +0,0 @@
import lib.uis.config_ui
argparser = lib.uis.config_ui.argparser
# store_true just stores true when the command is supplied, so it doesn't need choices nor types
argparser.add_argument('-u', '--unapproved', default=False, action="store_true",
help='only unapproved users. Default is only approved.', required=False)
argparser.add_argument('-a', '--approved', default=False, action="store_true",
help="Only approved Users.", required=False)
argparser.add_argument('-f', '--file', default="stdout",
type=str, help='write to file instead of stdout', required=False)

@ -1,5 +0,0 @@
unalias dev_build
unalias dev_run
unalias dev_bash
unalias dev_stop
unalias dev_disable

@ -1,5 +0,0 @@
alias dev_run="docker container run -l dev-ssh-reg --name dev-ssh-reg --rm -itd -v $PWD/private/:/app/admin ssh-reg"
alias dev_stop="docker container stop dev-ssh-reg"
alias dev_build="docker build -t ssh-reg:latest --force-rm ."
alias dev_bash="docker container exec -it dev-ssh-reg bash -c 'cd /app/admin; exec bash --login -i'"
alias dev_disable="source private/scripts/disable.sh"

@ -1,13 +1,11 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import argparse import re, configparser, logging, sqlite3, argparse
import configparser
import logging
import re
import sqlite3
from os import environ
from os import getcwd from os import getcwd
from os import environ
from os import path as ospath from os import path as ospath
import re, configparser, logging, sqlite3
try: try:
cwd = environ.get('TILDE_CONF') cwd = environ.get('TILDE_CONF')
@ -21,15 +19,17 @@ try:
argparser.add_argument('-c', '--config', default=cwd, type=str, help='Config file', required=False) argparser.add_argument('-c', '--config', default=cwd, type=str, help='Config file', required=False)
args = argparser.parse_args() args = argparser.parse_args()
CONF_FILE=args.config CONF_FILE=args.config
except:
logging.exception("Argumentparser-Exception: ")
try:
config = configparser.ConfigParser() config = configparser.ConfigParser()
config.read(CONF_FILE) config.read(CONF_FILE)
logging.basicConfig(format="%(asctime)s: %(message)s", filename=config['DEFAULT']['log_file'], logging.basicConfig(format="%(asctime)s: %(message)s", filename=config['DEFAULT']['log_file'],level=int(config['LOG_LEVEL']['log_level']))
level=int(config['LOG_LEVEL']['log_level'])) del(cwd)
REG_FILE=config['DEFAULT']['applications_db'] REG_FILE=config['DEFAULT']['applications_db']
except: except:
# intended broad, @TODO check them all for errors instead of everything in one
logging.exception("logging or configparser-Exception: ") logging.exception("logging or configparser-Exception: ")
exit(0)
VALID_SSH=False VALID_SSH=False
VALID_USER=False VALID_USER=False
@ -38,26 +38,22 @@ VALID_USER = False
def __createTable(cursor, connection): def __createTable(cursor, connection):
try: try:
cursor.execute( cursor.execute(
"CREATE TABLE IF NOT EXISTS applications(" "CREATE TABLE IF NOT EXISTS applications(" \
"id INTEGER PRIMARY KEY AUTOINCREMENT," "id INTEGER PRIMARY KEY AUTOINCREMENT,"\
"username TEXT NOT NULL, email TEXT NOT NULL," "username TEXT NOT NULL, email TEXT NOT NULL,"\
"name TEXT NOT NULL, pubkey TEXT NOT NULL," "name TEXT NOT NULL, pubkey TEXT NOT NULL,"\
"timestamp DATETIME DEFAULT CURRENT_TIMESTAMP CONSTRAINT " "timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, status INTEGER NOT NULL DEFAULT 0);")
"timestamp_valid CHECK( timestamp IS strftime('%Y-%m-%d %H:%M:%S', timestamp))"
",status INTEGER NOT NULL DEFAULT 0);")
connection.commit() connection.commit()
except sqlite3.Error as e: except:
logging.exception("Couldn't create needed SQLite Table! Exception: %s" % e) logging.exception("Couldn't create needed SQLite Table!")
def addtotable(cursor, connection, username, name, email, pubkey): def addtotable(cursor, connection, username, name, email, pubkey):
try: try:
cursor.execute("INSERT INTO 'applications'(username, name, email, pubkey)VALUES(" cursor.execute("INSERT INTO 'applications'(username, name, email, pubkey)VALUES("\
"?,?,?,?)", [username, name, email, pubkey]) "?,?,?,?)", [username, name, email, pubkey])
connection.commit() connection.commit()
except sqlite3.Error as e: except:
logging.exception("Couldn't insert user into the db: %s" % e) logging.exception("Couldn't insert user into the db")
# check if sqlite file does exists or already and has our structure # check if sqlite file does exists or already and has our structure
def __checkSQLite(cursor, connection): def __checkSQLite(cursor, connection):
@ -65,36 +61,25 @@ def __checkSQLite(cursor, connection):
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='applications'") cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='applications'")
res=cursor.fetchall() res=cursor.fetchall()
if not res: if res== []:
try: try:
__createTable(cursor, connection) __createTable(cursor, connection)
except sqlite3.Error as e: except:
logging.exception("couldn't create table on given database. Exception: %s" % e) logging.exception("couldn't create table on given database. Exception: ")
else: else:
pass pass
return True return True
def check_username(value): def check_username(value):
global VALID_USER global VALID_USER
if " " in value or "_ " in value or not value.isascii() or not value[:1].islower() or value[0].isnumeric():
VALID_USER = False
return False
if re.search(r"\W+", value):
VALID_USER = False
return False
if len(value) < 3: if len(value) < 3:
VALID_USER=False VALID_USER=False
return False return False
if len(value) > 16:
VALID_USER = False
return False
try: try:
from pwd import getpwnam from pwd import getpwnam
getpwnam(value) getpwnam(value)
VALID_USER=False VALID_USER=False
# everything from pwd throws an KeyError when the given user cannot be found except:
except KeyError:
VALID_USER=True VALID_USER=True
return True return True
return False return False
@ -126,36 +111,38 @@ def validate_pubkey(value):
return True return True
def main(): def main():
print(" ▗▀▖ \n▗▖▖ ▐ ▌ ▌▛▀▖\n▘▝▗▖▜▀ ▌ ▌▌ ▌\n ▝▘▐ ▝▀▘▘ ▘") print(" ▗▀▖ \n▗▖▖ ▐ ▌ ▌▛▀▖\n▘▝▗▖▜▀ ▌ ▌▌ ▌\n ▝▘▐ ▝▀▘▘ ▘")
username = input("Welcome to the ~.fun user application form!\n\n" username = input("Welcome to the ~.fun user application form!\n\nWhat is your desired username? [a-z0-9] allowed:\n")
"What is your desired username? [a-z0-9] allowed:\n")
while (not re.match("[a-z]+[a-z0-9]", username)) or (not check_username(username)): while (not re.match("[a-z]+[a-z0-9]", username)) or (not check_username(username)):
username = input("Invalid Username, maybe it exists already?\nValid characters are only a-z and 0-9." username = input("Invalid Username, maybe it exists already?\nValid characters are only a-z and 0-9.\nMake sure your username starts with a character and not a number." \
"\nMake sure your username starts with a character and not a number"
" and is not larger than 16 characters."
"\nWhat is your desired username? [a-z0-9] allowed:\n") "\nWhat is your desired username? [a-z0-9] allowed:\n")
fullname = input("\nPlease enter your full name:\n") fullname = input("\nPlease enter your full name:\n")
while not re.match("\w+\s*\w*", username): while not re.match("\w+\s*\w*", username):
fullname = input("\nThat is not your real name.\nPlease enter your full name:\n") fullname = input("\nThat is not your real name.\nPlease enter your full name:\n")
email = input("\nPlease enter your email address:\n") email = input("\nPlease enter your email address:\n")
while not re.match("(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)", email): while not re.match("(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)", email):
email = input("\nThat is not a valid mail address.\nPlease enter your email address:\n") email = input("\nThat is not a valid mail address.\nPlease enter your email address:\n")
pubkey = input("\nPlease paste your ssh public key:\n") pubkey = input("\nPlease paste your ssh public key:\n")
while (not re.match("ssh-(\w)+\s(\w+)(\s*)([a-zA-Z0-9@]*)", pubkey)) or (not validate_pubkey(pubkey)): while (not re.match("ssh-(\w)+\s(\w+)(\s*)([a-zA-Z0-9@]*)", pubkey)) or (not validate_pubkey(pubkey)):
pubkey = input("\nPlease enter a valid public key. You can show it with ssh-keygen -f .ssh/id_rsa -y on your " pubkey = input("\nPlease enter a valid public key. You can show it with ssh-keygen -f .ssh/id_rsa -y on your local machine.\nPlease enter your pubkey:\n")
"local machine.\nPlease enter your pubkey:\n")
validate_pubkey(pubkey) validate_pubkey(pubkey)
print("\nUsername: {0!s}".format(username)) print("\nUsername: {0!s}".format(username))
print("Full Name: {0!s}".format(fullname)) print("Full Name: {0!s}".format(fullname))
print("Email: {0!s}".format(email)) print("Email: {0!s}".format(email))
print("Public {0!s}".format(pubkey)) print("Public {0!s}".format(pubkey))
validation = input("\nIs this information correct? [y/N]") validation = input("\nIs this information correct? [y/N]")
while not re.match("[yYnN\n]", validation): while not re.match("[yYnN\n]", validation):
print("Please answer y for yes or n for no") print("Please answer y for yes or n for no")
@ -169,16 +156,13 @@ def main():
addtotable(cursor, connection, username, fullname, email, pubkey) addtotable(cursor, connection, username, fullname, email, pubkey)
connection.commit() connection.commit()
connection.close() connection.close()
except sqlite3.Error as e: except:
logging.exception("Database {0!s} couldnt be accessed or created. Exception: {0!s}". logging.exception("Database {0!s} couldnt be accessed or created. Exception:".format(config['DEFAULT']['applications_db']))
format(config['DEFAULT']['applications_db'], e))
if connection:
connection.close() connection.close()
exit(1) exit(1)
pass
return 0 return 0
if __name__ == "__main__": if __name__ == "__main__":
try: try:
main() main()

Loading…
Cancel
Save