Compare commits

...

80 Commits

Author SHA1 Message Date
n1trux dd5b4043be Merge remote-tracking branch 'origin/pr/4' 4 years ago
Darksider3 df1d09a155 Forgot that message. x_X 5 years ago
Darksider3 8008fa36b9 General overhaul, better messages, ordered imports etc 5 years ago
Darksider3 32c6ea7e07 System: Rename aio_register to aio_approve. 5 years ago
Darksider3 8d4d2038d3 Typing support and sqlite3.Row!
Typing will be handy in further development, as well will i settle down
to sqlite3.Row. Fetchall() returns now a list(grr) with
dict-imitating/sumating objects, which you can call keys() on and will
return them, what we now already use for Backups, which comes handy.

Also Typing gives the ability to let the code even more documentate
itself. It's planned to use all over the place but i've to read myself
into it yet more to get started on that one.
It's a step in the right direction!
5 years ago
Darksider3 84508820a6 ListUsers: Implement --user flag, rename --list to --list-asc 5 years ago
Darksider3 b7ffedfba2 Rename SQLitedb to SQLiteDB 5 years ago
Darksider3 fb4b577eb8 minor adjustemenets 5 years ago
Darksider3 f31117e940 System: aio_register, performs all neccessary steps to create user...
BUT will NOT catch ANY exceptions. Therefore the caller script itself is
REQUIRED to catch.
5 years ago
Darksider3 dd01acf801 editUsers: Refactor from System taken 5 years ago
Darksider3 05f27078f6 System: Refactors username given in constructor
deleted all references and added one class which accepts to set a
username
5 years ago
Darksider3 f204a74f98 Refactor some variable names 5 years ago
Darksider3 7ebabbb5f6 EditUsers: Implement --remove flag 5 years ago
Darksider3 b4d4fd9ad4 System: Improved return calls at remove_user 5 years ago
Darksider3 7d5ef3b493 Wording fix in docstring 5 years ago
Darksider3 0eeafa626e Documentation work all over the place
Renamed also some paramters
5 years ago
Darksider3 65c7bb6b3f Move argparser into mains function body 5 years ago
Darksider3 a17d0ed30f ListUsers: UAP and APP => Unapproved and Approved replaced 5 years ago
Darksider3 389b614de9 Revert "ListUsers: UAP and APP renamed, function updates /home/users.txt"
This reverts commit 84cc83db3c.
5 years ago
Darksider3 84cc83db3c ListUsers: UAP and APP renamed, function updates /home/users.txt
On every register() call the System rewrites the /home/users.txt to
reflect currently active users. Will fall apart when something
unexpected happened, but that's @helix responsibility.
5 years ago
Darksider3 452204a5e9 ListUsers: --list sorts all approved users ascending 5 years ago
Darksider3 c46adc2849 editUsers: Description, remove username change and so on 5 years ago
Darksider3 24eee6e84e Shebang/env for script execution 5 years ago
Darksider3 6ea679cb60 System: userdel returns 6 when no maildir is deleted but homedir is
So we agree on that one!
5 years ago
Darksider3 fb9a98fb81 Edit: Did delete, but here it is back!
It can now create users on disk, delete from disk(by changing the
status), change everything that really matters, except the username yet
5 years ago
Darksider3 eb6f3da3d7 System: Account for userdel returning 6 on success...
just because it couldnt delete the users mail.
5 years ago
Darksider3 fb7544bd8c edit: Added Name, email and first steps for status changes 5 years ago
Darksider3 7091cbcbd2 Validator: checkName taken from registration form 5 years ago
Darksider3 786b652b21 update dev environment with disable possiblities 5 years ago
Darksider3 84431bda1d Edit: Sceleton, Interface and first argument
The sceleton for the interface and it's first possible argument are
finished, so a users pbukey can be changed now without touching the DB
itself directly.

Checking for users existence and the presence of options/arguments is
done beforehand and will throw out when nothing useful got delivered to
the script.
5 years ago
Darksider3 7786df3761 System: Introduce function to write just write the auth file 5 years ago
Darksider3 96097e26b1 Catched that little error and killed it! 5 years ago
Darksider3 27e5f0445a Shortcuts to source when working on ssh-reg
Just source the file and you get the aliases for dev_run,
dev_stop(affects containers) and dev_build which rebuilds the ssh-reg
image
5 years ago
Darksider3 710ceacd7c Breaking up the code smell regarding the CFG.py!
It began smelling already but having some duplicate code across the
interfaces is still better than having all of it all over the place.

It enables to write specific flags which are nice to have. For example,
Import.py requires the --Import flag because it WANTS the user to read
the whole Help before it acts actually as an importer. When the user
supplies something they should know what's currently happening.

Also removes the hardcoded dependency on lib.CFG-Calls from most calls
which was already embarassingly present. Introduced some db and
cfg-variables which doesnt clutter anything but suck much less.

In future we provide a set of default arguments and a bare minimum -
config_ui as the bare minimum, default as the full blown storm.

This is rather big because it also patches several other smells
including a bug where a user from the db wouldnt be reported as existent
5 years ago
Darksider3 934b6bf75a Factor out 'as CFG' 5 years ago
Darksider3 670aa3d9c3 Factor out CONF_FILE 5 years ago
Darksider3 cdc72a30f4 Factor out REG_FILE 5 years ago
Darksider3 77efb4b339 delete cfgparse again and get it going as it where before 5 years ago
Darksider3 8c3ac4060f Split CFG.py to private/lib/cfgparse.py 5 years ago
Darksider3 192e70e4a2 Seperate out even more, own dir for UIs
default UI is now in private/lib/uis/default.py
5 years ago
Darksider3 5c6ecaf627 Modularize CFG.py into CWD, default_cmd and cmd ui 5 years ago
Darksider3 5a57a62780 Allow uppercase Chars afters first character in usernames 5 years ago
Darksider3 91b5e6ae7b Positional fix: OperationalError before general 5 years ago
Darksider3 77a31a44d1 Abort on errors in import file before even trying to activate
Moves every check regarding the imported file outside of import.py into
the validator. Also removes every follow-up checks regarding it out of
import.py.

Looks a whole lot cleaner now!
5 years ago
Darksider3 2ab27aa019 Validate Timestamps and don't not insert into DB when error
Validates now the timestamp in the import.py, doesn't insert users into
the sqlite-database before creating the systems account and checks now
for existence in the database too(unapproved users, who comes first,
gets his name first..)
5 years ago
Darksider3 3ae497e2eb Let the container stop gracefully with exec!
Right now the container can't stop gracefully because the sshd-server
runs on the server as PID 0. This results in the docker daemon not
killing it but waiting for it to die, which never happens, and results
in the default timeout-wait before it KILLS the process. With exec, the
sshd becomes PID 1 and can receive and process signals(probably SIGTERM
here) and handles them as well. The container stops now nearly
instantly..
5 years ago
Darksider3 b0856b558e Move import related stuff to the Import.py script.
and delete occurences in Import It is much tidier to look at and
doesn't clutter the Script. Is even a whole seperate task
so it does make no sense to call it in Backup when i think about it.
5 years ago
Darksider3 283143104d Actually we allow currently strange things, this fixes it on the usernames 5 years ago
Darksider3 d517db3b2a On Operational Exceptions we create the table and try again
this could potentially result in a endless recursion when the
table isnt writeable and a lot other funny things happen
but normally and in 99,999999999% of our cases this
is totally fine
5 years ago
Darksider3 a1563116f6 Renamed SSH-Exception 5 years ago
Darksider3 0c1f7ea252 Validate even imported files and users
Created a new file for the validation functions, should be soon(TM) used too
in the userapplication-script but dont hurry
5 years ago
Darksider3 8274cbb27e unregister() is just a different word for removing 5 years ago
Darksider3 4760e35fda it's freakin stupid to import IDs. Nothing depends on them. Ignore them. 5 years ago
Darksider3 96c403a3d9 Introducing own Exception classes to distinguish between possible System errors 5 years ago
Darksider3 c1488f6164 Return True on success 5 years ago
Darksider3 cd8fe5fed0 commit on querys 5 years ago
Darksider3 0718de20fb Let SQLite check for incorrect dates on timestamp row 5 years ago
Darksider3 2202af7826 Introduce imports, which comes with a new flag(--Import)
--Import depends on --file being present pointing to a valid CSV-file
that contains unique users with id's. When a user already exists
with his id, the script will fail.
It is also Backup.py-only which is reflected in the help message
5 years ago
Darksider3 c28e616ea5 Faster rebuildung by moving system relevant stuff up 5 years ago
Darksider3 e0efeee67d Better error handling, just hang a freakin' / to ssh_dir if it got screwed up 5 years ago
Darksider3 04d95c2d08 realign layout 5 years ago
Darksider3 43c7636920 Make PEP8 finally happy 5 years ago
Darksider3 290e72f159 Add various documentation to the System class and sqlitedb-Class...
... which are all written in rst-format, which sphynx understands and has
an own formal specification in  http://docutils.sourceforge.net/rst.html
5 years ago
Darksider3 d413662b36 Adds ability to delete users in db as well as on the system + type hints..
.... in function names for further documentation.
5 years ago
Darksider3 a6d63fee42 PEP8: One Import per line... -.-
Add TODO statements and foremost return False on error. Missed that
5 years ago
Darksider3 c43cad73fa Print Error messages to stderr instead of standard-out. Makes very much sense indeed! 5 years ago
Darksider3 89faf57b58 System: Still a lot @TODO here, but add it. Should(TM) work, but didn't
test it yet and also have to do a whole lot more. e.g. write out in dry-mode
which commands are getting to run in serious-mode but build up together
correctly in the correct order
5 years ago
Darksider3 2afb4c79a2 Implements a backup to csv. Uses StringIO because it has an own writer()
method, which is pretty nice to have when csv.writer() want's that on its
passed variable.

Also respects every flag yet introduced(-c, -f, -a, -u) and reuses the code
already written in ListUsers.py. It could be very nice to bring that code
into lib/ because it is probably needed way more often
5 years ago
Darksider3 90b2ee5236 add gitignore 5 years ago
Darksider3 a984b14c37 Implement -f/--file option. Defaults to stdout 5 years ago
Darksider3 4134e3cc2e introduces -f/--file, which outputs or takes from a file(conditional to the current program) 5 years ago
Darksider3 6e86bf11bb -a Flag: default false, do action on only approved users 5 years ago
Darksider3 c40ef4d40a Max username length is 16 now 5 years ago
Darksider3 cab4cd25a2 Well, that alternative view rocks hard... 5 years ago
Darksider3 b368ea058c huh, that already looks close to good... o.o 5 years ago
Darksider3 f06d4fa945 first steps towards real listings! 5 years ago
Darksider3 3915843943 introducing the -u flag, which turns on the 'unapproved' mode.
Oh, and just testing it in ListUsers.py :)
5 years ago
Darksider3 2eeeebdacb Add sceleton for listusers.py 5 years ago
Darksider3 96837cebe3 Externalize CFG-Handling into lib/CFG.py.
Never write this code again!
5 years ago
Darksider3 8ed215e89b Create sqlite3 connector class to handle all the errors and quirks for us 5 years ago

4
.gitignore vendored

@ -0,0 +1,4 @@
__pycache__/
.idea/
test*

@ -8,21 +8,6 @@ RUN apt-get update &&\
# Clean up APT when done.
RUN apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# private/{scripts, administrate.py}, public/{scripts, userapplications.py}, config/userapplicatonsconfig.ini
#configs, logs, db
COPY config/applicationsconfig.ini /app/data/applicationsconfig.ini
# admin scripts
COPY private/ /app/admin/
# user accessible scripts
# Make TILDE_ENV
COPY public/ /app/user/
#SSH config into /etc :)
COPY config/etc /etc
# create user for applications
RUN useradd -Md /app/user/ -s /app/user/userapplication.py tilde
@ -34,14 +19,20 @@ RUN usermod -U tilde
RUN useradd -Md /app/admin -s /app/admin/administrate.py admin
# privilege separation directory
RUN mkdir -p /var/run/sshd
# expose SSH port
EXPOSE 22
ENV TILDE_CONF="/app/data/applicationsconfig.ini"
RUN touch /app/data/applications.sqlite
RUN touch /app/data/applications.log
# Doesnt work, @TODO why
#RUN setfacl -R -m u:tilde:rwx /app/data/
RUN chown -R tilde /app/data
# admin scripts
COPY private/ /app/admin/
# user accessible scripts
# Make TILDE_ENV
COPY public/ /app/user/
RUN mkdir /app/user/.ssh
CMD ["sh", "-c", " echo TILDE_CONF=$TILDE_CONF > /app/user/.ssh/environment && /usr/sbin/sshd -D"]
CMD ["sh", "-c", " echo TILDE_CONF=$TILDE_CONF > /app/user/.ssh/environment && exec /usr/sbin/sshd -D"]

@ -0,0 +1,129 @@
#!/usr/bin/env python3
import configparser
import csv
import io
import ListUsers
import lib.uis.default as default_cmd # Follows -u, -a, -f flags
class Backup:
"""Backups a Tilde database to an CSV file
:Example:
>>> from Backup import Backup
>>> from ListUsers import ListUsers
>>> L = ListUsers.ListUsers("/path/to/sqlite").get_fetch()
>>> backup_db = Backup("stdout")
>>> backup_db.backup_to_file(L)
CSV-Separated list with headers in first row
"""
filename: str
quoting: int
dialect: str
field_names: tuple
def __init__(self, output: str, quoting: int = csv.QUOTE_NONNUMERIC, dialect: str = "excel"):
""" Constructs the Backup object
:param output: File name to backup to(set to stdout for stdout)
:type output: str
:param quoting: Set quoting for CSV Module
:type quoting: int
:param dialect: Set the CSV-Dialect. Defaults to excel, which is the classic CSV
:type dialect: str
"""
self.setFilename(output)
self.setQuoting(quoting)
self.setDialect(dialect)
self.setFieldnames(tuple(['id', 'username', 'email', 'name', 'pubkey', 'timestamp', 'status']))
def setDialect(self, dialect: str) -> None:
""" Set dialect for Object
:param dialect: Dialect to set for Object
:type dialect: str
:return: None
:rtype: None
"""
self.dialect = dialect
def setQuoting(self, quoting: int) -> None:
""" Set quoting in the CSV(must be supported by the CSV Module!)
:param quoting: Quoting Integer given by csv.QUOTE_* constants
:type quoting: int
:return: None
:rtype: None
"""
self.quoting = quoting
def setFilename(self, filename: str) -> None:
""" Sets Filename to output to
:param filename: Filename to output to(set stdout for stdout)
:type filename: str
:return: None
:rtype: None
"""
self.filename = filename
def setFieldnames(self, f_names: tuple) -> None:
""" Set fieldname to process
:param f_names: Fieldnames-Tuple
:type f_names: tuple
:return: None
:rtype: None
"""
self.field_names = f_names
def backup_to_file(self, fetched: list) -> bool:
"""Backup Userlist to File(or stdout)
:param fetched: List of values to write out CSV-formatted
:return: True, if success, None when not.
:rtype: bool
"""
returner = io.StringIO()
write_csv = csv.DictWriter(returner, fieldnames=self.field_names, quoting=self.quoting, dialect=self.dialect)
write_csv.writeheader()
for row in fetched:
write_csv.writerow(dict(row))
# sqlite3.Row doesn't "easily" convert to a dict itself sadly, so just a quick help from us here
# it actually even delivers a list(sqlite3.Row) also, which doesnt make the life a whole lot easier
if self.filename == "stdout":
print(returner.getvalue())
return True
else:
with open(self.filename, "w") as f:
print(returner.getvalue(), file=f)
return True
if __name__ == "__main__":
default_cmd.argparser.description += " - Backups Tilde Users to stdout or a file."
args = default_cmd.argparser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)
L = ListUsers.ListUsers(config['DEFAULT']['applications_db'],
unapproved=args.unapproved, approved=args.approved)
fetch = L.get_fetch()
if fetch:
B = Backup(args.file)
B.setFieldnames(fetch[0].keys()) # sqlite3.row delivers its keys for us! SO NICE!
B.backup_to_file(fetch)
else:
print("nothing to backup!")
exit(1)
exit(0)

@ -0,0 +1,87 @@
#!/usr/bin/env python3
import configparser
import csv
import os
import lib.UserExceptions
import lib.uis.config_ui # dont go to default, just following -c flag
def import_from_file(file_path: str, db: str, user_ids: tuple = tuple([])) -> bool:
""" Imports Users from a given CSV-file to the system and DB
:param file_path:
:type file_path: str
:param db: Path to the sqlite db
:type db: str
:param user_ids: FIXME: Tuple which user_ids should we write
:type user_ids: tuple
:return: True on success, False when not
:rtype: bool
"""
if not os.path.isfile(file_path):
print(f"File {file_path} don't exist")
return False
if not os.path.isfile(db):
print(f"The database file {db} don't exist")
return False
if user_ids:
pass # empty tuple means everything
# noinspection PyBroadException
try:
with open(file_path, 'r', newline='') as f:
import lib.Validator
sql = lib.sqlitedb.SQLiteDB(db)
err = lib.Validator.checkImportFile(file_path, db)
if err is not True:
print(err)
exit(0)
import lib.sqlitedb
import lib.System
sys_ctl = lib.System.System("root")
reader = csv.DictReader(f) # @TODO csv.Sniffer to compare? When yes, give force-accept option
for row in reader:
if row["status"] == "1":
try:
sys_ctl.setUser(row["username"])
sys_ctl.aio_approve(row["pubkey"])
print(row['username'], "====> Registered.")
except lib.UserExceptions.General as GeneralExcept:
print(f"Something didnt work out! {GeneralExcept}")
elif row["status"] == "0":
print(row['username'] + " not approved, therefore not registered.")
try:
sql.safequery(
"INSERT INTO `applications` (username, name, timestamp, email, pubkey, status) "
"VALUES (?,?,?,?,?,?)", tuple([row["username"], row["name"], row["timestamp"],
row["email"], row["pubkey"], row["status"]]))
except OSError as E:
pass
print(f"UUFFF, something went WRONG with the file {file_path}: {E}")
except Exception as didntCatch:
print(f"Exception! UNCATCHED! {type(didntCatch)}: {didntCatch}")
return True
if __name__ == "__main__":
ArgParser = lib.uis.config_ui.argparser
ArgParser.description += "- Imports a CSV file consisting of user specific details to the database"
ArgParser.add_argument('-f', '--file', default="stdout",
type=str, help='Import from CSV file', required=True)
ArgParser.add_argument('--Import', default=False, action="store_true",
help="Import Users.", required=True)
args = ArgParser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)
if not args.Import:
print("Error, need the import flag")
if not args.file:
print("Error, need the import file")
if not args.file:
print("You MUST set a CSV-file with the -f/--file flag that already exist")
exit(1)
import_from_file(args.file, config['DEFAULT']['applications_db'])
exit(0)

@ -0,0 +1,120 @@
#!/usr/bin/env python3
import configparser
import sqlite3 # sqlite3.Row-Object
from typing import List # Typing support!
import lib.uis.default as default_cmd # Follows -u, -a, -f flags
from lib.sqlitedb import SQLiteDB
class ListUsers:
db = None
usersFetch = None
def __init__(self, db: str, unapproved: bool = False, approved: bool = True, single_user: str = None):
"""Constructs ListUsers
:param db: Database to access
:type db: str
:param unapproved: only List unapproved users
:type unapproved: bool
:param approved: only list approved users
:type approved: bool
"""
self.db = SQLiteDB(db)
if unapproved: # only unapproved users
query = "SELECT * FROM `applications` WHERE `status` = '0'"
elif approved: # Approved users
query = "SELECT * FROM `applications` WHERE `status` = '1'"
else: # All users
query = "SELECT * FROM `applications`"
self.usersFetch = self.db.query(query)
if single_user is not None:
query = "SELECT * FROM `applications` WHERE `username` = ?"
self.usersFetch = self.db.safequery(query, tuple([single_user]))
def output_as_list(self) -> str:
"""Generates a string with one (approved) user per line and one newline at the end
:rtype: str
:return: String consisting with one(activated) user per line
"""
list_str: str = ""
query = "SELECT `username` FROM `applications` WHERE `status` = '1' ORDER BY timestamp ASC"
self.usersFetch = self.db.query(query)
for users in self.usersFetch:
list_str += users["username"] + "\n"
return list_str
def prettyPrint(self) -> None:
pass # see below why not implemented yet, texttable...
def get_fetch(self) -> List[sqlite3.Row]:
""" Returns a complete fetch done by the lib.sqlitedb-class
:return: Complete fetchall(). A List[sqlite3.Row] with dict-emulation objects.
:rtype: List[sqlite3.Row]
"""
return self.usersFetch
# @TODO MAYBE best solution: https://pypi.org/project/texttable/
# examle:
"""
from texttable import Texttable
t = Texttable()
t.add_rows([['Name', 'Age'], ['Alice', 24], ['Bob', 19]])
print(t.draw())
---------------> Results in:
+-------+-----+
| Name | Age |
+=======+=====+
| Alice | 24 |
+-------+-----+
| Bob | 19 |
+-------+-----+
for user in fetch:
print("ID: {}; Username: \"{}\"; Mail: {}; Name: \"{}\"; Registered: {}; Status: {}".format(
user["id"], user["username"], user["email"], user["name"], user["timestamp"], user["status"]
))
"""
if __name__ == "__main__":
default_cmd.argparser.description += " - Lists Users from the Tilde database."
default_cmd.argparser.add_argument('--list-asc', default=False, action="store_true",
help='Output a newline seperated list of users', required=False, dest="args_asc")
default_cmd.argparser.add_argument('--user', default=None, type=str,
help="Just show a specific user by it's name", required=False)
args = default_cmd.argparser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)
ret = ""
if args.user is not None:
L = ListUsers(config['DEFAULT']['applications_db'], unapproved=args.unapproved, approved=args.approved,
single_user=args.user)
else:
L = ListUsers(config['DEFAULT']['applications_db'], unapproved=args.unapproved, approved=args.approved)
if args.args_asc:
ret = L.output_as_list()
else:
fetch = L.get_fetch()
ret += "ID %-1s| Username %-5s| Mail %-20s| Name %-17s| Registered %-8s | State |\n" % (
" ", " ", " ", " ", " "
)
ret += 102 * "-" + "\n"
for user in fetch:
ret += "%-4i| %-14s| %-25s| %-22s| %-8s | %-5i |\n" % (
user["id"], user["username"], user["email"], user["name"], user["timestamp"], user["status"]
)
if args.file != "stdout":
with open(args.file, 'w') as f:
print(ret, file=f)
else:
print(ret)
exit(0)

@ -0,0 +1,147 @@
#!/usr/bin/env python3
import configparser
import sqlite3
import lib.System
import lib.UserExceptions
import lib.Validator
import lib.sqlitedb
import lib.uis.config_ui # only follow -c flag
if __name__ == "__main__":
lib.uis.config_ui.argparser.description += " - Edit Tilde Users"
ArgParser = lib.uis.config_ui.argparser
ArgParser.add_argument('--user', type=str,
help='Tilde users name to edit', required=True)
Mutually = ArgParser.add_mutually_exclusive_group()
Mutually.add_argument('-r', '--remove', default=False, action="store_true",
help='Remove an approved/unapproved User from the system(and DB). Effectively purges him.',
required=False)
Mutually.add_argument("--verify", default=True, action="store_false",
help="Turns off value checks",
required=False)
ArgParser.add_argument('--sshpubkey', type=str, default=None,
help="Stores the new given SSH-Key in given user", required=False)
ArgParser.add_argument('--name', type=str, default=None,
help="Sets the stored name of the given user")
ArgParser.add_argument('--username', type=str, default=None,
help="Rename given User")
ArgParser.add_argument('--email', type=str, default=None,
help="Set new email address for given user")
ArgParser.add_argument('--status', type=int, default=None,
help="Set status of given user")
args = ArgParser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)
db = config['DEFAULT']['applications_db']
if not args.sshpubkey and not args.name and not args.username and not args.email and args.status is None \
and not args.remove:
print(f"Well, SOMETHING must be done with {args.user} ;-)")
exit(1)
# --> --user
if not lib.Validator.checkUserInDB(args.user, db):
print(f"User {args.user} does not exist in the database.")
exit(1)
DB = lib.sqlitedb.SQLiteDB(db)
sys_ctl = lib.System.System(args.user)
if not DB:
print("Could not establish connection to database")
exit(1)
CurrentUser = DB.safequery("SELECT * FROM `applications` WHERE `username`=?", tuple([args.user]))[0]
# --> --remove
if args.remove:
print(f"Removing {args.user} from the system and the database...")
try:
DB.removeApplicantFromDBperUsername(args.user)
print(f"Purged from the DB")
if CurrentUser["status"] == 1:
sys_ctl.remove_user()
print(f"Purged from the system")
else:
print(f"'{args.user}' was not approved before, therefore not deleting from system itself.")
except lib.UserExceptions.General as e:
print(f"{e}")
exit(1)
print(f"Successfully removed '{args.user}'.")
exit(0)
# --> --sshpubkey
if args.sshpubkey:
if not lib.Validator.checkSSHKey(args.sshpubkey):
print(f"Pubkey '{args.sshpubkey}' isn't valid.")
exit(1)
try:
DB.safequery("UPDATE `applications` SET `pubkey`=? WHERE `username`=?",
tuple([args.sshpubkey, args.user]))
CurrentUser = DB.safequery("SELECT * FROM `applications` WHERE `username` = ? ", tuple([args.user]))[0]
if int(CurrentUser["status"]) == 1:
sys_ctl.make_ssh_usable(args.sshpubkey)
except sqlite3.Error as e:
print(f"Something unexpected happened! {e}")
exit(1)
except lib.UserExceptions.ModifyFilesystem as e:
print(f"One action failed during writing the ssh key back to the authorization file. {e}")
print(f"'{args.user}'s SSH-Key updated successfully.")
# --> --name
if args.name:
if not lib.Validator.checkName(args.name):
print(f"'{args.name}' is not a valid Name.")
exit(1)
try:
DB.safequery("UPDATE `applications` SET `name` =? WHERE `username` =?", tuple([args.name, args.user]))
except sqlite3.Error as e:
print(f"Could not write '{args.name}' to database: {e}")
print(f"'{args.user}'s Name changed to '{args.name}'.")
# --> --email
if args.email:
if not lib.Validator.checkEmail(args.email):
print(f"'{args.email}' is not a valid Mail address!")
exit(1)
try:
DB.safequery("UPDATE `applications` SET `email` =? WHERE `username` =?", tuple([args.email]))
except sqlite3.Error as e:
print(f"Could not write '{args.email}' to the database. {e}")
print(f"'{args.user}' Mail changed to '{args.email}'.")
# --> --status
if args.status is not None:
if args.status != 0 and args.status != 1:
print("Only 0 and 1 are valid status, where 1 is activated and 0 is unapproved.")
exit(0)
# just takes first result out of the dict
if args.status == int(CurrentUser["status"]):
print(f"New and old status are the same.")
if args.status == 0 and int(CurrentUser["status"]) == 1:
try:
DB.safequery("UPDATE `applications` SET `status` =? WHERE `id`=?",
tuple([args.status, CurrentUser["id"]]))
sys_ctl.remove_user()
except sqlite3.Error as e:
print(f"Could not update database entry for '{args.user}', did not touch the system")
exit(1)
except lib.UserExceptions.UnknownReturnCode as e:
print(f"Could not remove '{args.user}' from the system, unknown return code: {e}. DB is modified.")
exit(1)
print(f"Successfully changed '{args.user}'s status to 0 and cleared from the system.")
if args.status == 1 and int(CurrentUser["status"]) == 0:
try:
DB.safequery("UPDATE `applications` SET `status`=? WHERE `username`=?",
tuple([args.status, args.user]))
sys_ctl.aio_approve(CurrentUser["pubkey"])
except sqlite3.Error as e:
print(f"Could not update Users status in database")
exit(1)
except lib.UserExceptions.General as ChangeUser:
print(f"Some chain in the cattle just slipped away, my lord! {ChangeUser}")
exit(1)
print(f"Successfully changed '{args.user}'s status to 1 and created on the system.")
exit(0)

@ -0,0 +1,6 @@
import configparser
import lib.uis.default as default_cmd
args = default_cmd.argparser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)

@ -0,0 +1,260 @@
import os
import pwd
import subprocess
import lib.UserExceptions
class System:
"""Class to interact with the system specifically to support our needs 0w0
:Example:
>>> from lib.System import System as System
>>> Sys_ctl = System("Test", dryrun=True)
>>> Sys_ctl.register()
>>> Sys_ctl.lock_user_pw()
>>> Sys_ctl.add_to_usergroup()
>>> Sys_ctl.make_ssh_usable("sshkey")
"""
dry: bool = False
create_command = []
home: str = ""
user: str
def setUser(self, username: str):
self.user = username
def __init__(self, username: str, dryrun: bool = False, home: str = "/home/"):
"""Creates an objects. Can set dry run.
:param username: Username to manipulate
:type username: str
:param dryrun: Run all command in a dry-run? When enabled, doesn't make any changes to the system (defaults to
false)
:type dryrun: bool
:param home: Standard directory to search for the home directories of your users(default is /home/)
:type home: str
:raises:
ValueError: if homedir can not be found
"""
self.dry = dryrun
if not home.endswith("/"):
home += "/"
if not os.path.isdir(home):
raise ValueError("home should be an existent directory...")
self.home = home
self.user = username
def aio_approve(self, pubkey, group="tilde"):
""" Executes all neccessary steps to create a user from itself. Raises ALOT of possible exceptions
:Note: CAREFULL! You MUST except the exceptions!
:param pubkey: Users public ssh key
:type pubkey: str
:param group: User-group. Defaults to tilde
:type group: str
:return: None
:raises:
lib.UserExceptions.UserExistsAlready: User Exists already on system
lib.UserExceptions.UnknownReturnCode: Unknown Return Code from useradd
lib.UserExceptions.SSHDirUncreatable: Users SSH Dir couldnt be created
lib.UserExceptions.ModifyFilesystem: Something with CHMOD failed
"""
self.register()
self.lock_user_pw()
self.add_to_usergroup(group)
self.make_ssh_usable(pubkey)
def register(self, cc: tuple = tuple(["useradd", "-m"])) -> bool:
"""Creates an local account for the given username
:param cc: Tuple with commands separated to execute on the machine. (defaults to useradd -m)
:type cc: tuple
:return: True, if worked, raises lib.UserExceptions.UserExistsAlready when not
:rtype: bool
:raises:
lib.UserExceptions.UserExistsAlready: when username specified already exists on the system
"""
create_command = cc
cc = create_command + tuple([self.user])
if self.dry:
self.printTuple(cc)
return True
elif not self.dry:
rt = subprocess.call(cc)
if rt != 0:
raise lib.UserExceptions.UserExistsAlready(self.user)
return True
def unregister(self) -> bool:
""" Just an alias function for removeUser
:return: True, when success, False(or exception) when not
:rtype: bool
"""
return self.remove_user()
def make_ssh_usable(self, pubkey: str, sshdir: str = ".ssh/") -> bool:
""" Make SSH usable for our newly registered user
:param pubkey: Public SSH Key for the User you want accessible by SSH
:type pubkey: str
:param sshdir: Directory to write the authorized_keys File to. PWD is $HOME of said user. (defaults to ".ssh/")
:type sshdir: str
:return: True, if worked, raises lib.UserExceptions.UnknownReturnCode, lib.UserExceptions.HomeDirExistsAlready
or lib.UserExceptions.ModifyFilesystem when not
:rtype: bool
:raises:
lib.UserExceptions.SSHDirUncreatable: if the ssh-dir couldnt be created nor exist
lib.UserExceptions.ModifyFilesystem: When chmod to .ssh and authorized_keys failed
lib.UserExceptions.General: if PWD cant find the specified user
"""
if self.dry:
print("Nah, @TODO, but actually kinda too lazy for this lul. Just a lot happening here")
return True
if not sshdir.endswith("/"):
sshdir += "/"
ssh_dir = self.home + self.user + "/" + sshdir
try:
os.mkdir(ssh_dir)
except FileExistsError:
pass # thats actually a good one for us :D
except OSError as e:
raise lib.UserExceptions.SSHDirUncreatable(f"Could not create {ssh_dir}: Exception: {e}")
try:
self.write_ssh(pubkey, ssh_dir)
except OSError as e:
raise lib.UserExceptions.ModifyFilesystem(
f"Could not write and/or chmod 0700 {ssh_dir} or {ssh_dir}/authorized_keys, Exception: {e}")
try:
pwdnam = pwd.getpwnam(self.user)
os.chown(ssh_dir, pwdnam[2], pwdnam[3]) # 2=>uid, 3=>gid
os.chown(ssh_dir + "authorized_keys", pwd.getpwnam(self.user)[2], pwd.getpwnam(self.user)[3])
except OSError as e: # by os.chown
raise lib.UserExceptions.ModifyFilesystem(
f"Could not chown {ssh_dir} and/or authorized_keys to {self.user} and their group, Exception: {e}", )
except KeyError as e: # by PWD
raise lib.UserExceptions.General(f"PWD can't find {self.user}: {e}")
return True
@staticmethod
def write_ssh(key: str, ssh_dir: str) -> None:
""" Write SSH key to a specified directory(appends authorized_keys itself!)
:param key: Key to write
:type key: str
:param ssh_dir: SSH Directory to write to
:type ssh_dir: str
:return: None
"""
with open(ssh_dir + "authorized_keys", "w") as f:
print(key, file=f)
f.close()
os.chmod(ssh_dir + "authorized_keys", 0o700) # we dont care about the directory here
def lock_user_pw(self, cc: tuple = tuple(["usermod", "--lock"])) -> bool:
"""Lock a users password so it stays empty
:param cc: Commands to run in the subprocess to lock it down(defaults to usermod --lock)
:type cc: tuple
:rtype: bool
:return: True, if worked, raises lib.UserExceptions.UnknownReturnCode when not
:raises:
lib.UserExceptions.UnknownReturnCode: When cc returns something else then 0
"""
lock_command = cc
cc = lock_command + tuple([self.user])
if self.dry:
self.printTuple(cc)
return True
elif not self.dry:
rt = subprocess.call(cc)
if rt != 0:
raise lib.UserExceptions.UnknownReturnCode(f"Could not lock user '{self.user}'; '{cc}' returned '{rt}'")
return True
def add_to_usergroup(self, group: str = "tilde", cc: tuple = tuple(["usermod", "-a", "-G"])) -> bool:
""" Adds a given user to a given group
:param group: Groupname where you want to add your user to
:type group: str
:param cc: Commands to execute that adds your user to said specific group(defaults to usermod -a -G")
:type cc: tuple
:return: True, if worked, raises lib.UserExceptions.UnknownReturnCode when not
:rtype bool
:raises:
lib.UserExceptions.UnknownReturnCode: if cc returned something else then 0
"""
add_command = cc
cc = add_command + tuple([group, self.user])
if self.dry:
self.printTuple(cc)
return True
elif not self.dry:
rt = subprocess.call(cc)
if rt != 0:
raise lib.UserExceptions.UnknownReturnCode(
f"Could not add user '{self.user}' to group '{group}' with command '{cc}', returned '{rt}'", )
return True
@staticmethod
def printTuple(tup: tuple) -> None:
"""Prints a tuple with spaces as separators
:param tup: Tuple you want to print
:type tup: tuple
:rtype: None
:returns: Nothing
"""
pp = ""
for i in tup:
pp += i + " "
print(pp)
def remove_user(self, cc: tuple = tuple(["userdel", "-r"])) -> bool:
"""Removes the specified user from the system
:param cc: Commands to execute to delete the user from the System(defaults to userdel -r)
:type cc: tuple
:return: True, if worked, raises lib.UserExceptions.UnknownReturnCode when not
:rtype: bool
:raises:
lib.UserExceptions.UnknownReturnCode: When cc returns something else then 0 or 6
"""
remove_command = cc
cc = remove_command + tuple([self.user])
if self.dry:
self.printTuple(cc)
return True
else:
ret = subprocess.Popen(cc, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
ret.wait() # wait for cc
stdio, err_io = ret.communicate() # get stdout as well as stderr
if ret.returncode != 0 and ret.returncode != 6: # userdel returns 6 when no mail dir was found but success
raise lib.UserExceptions.UnknownReturnCode(
f"Could not delete user with command {cc}. Return code: {ret.returncode},"
f" stdout/stderr: {stdio + err_io}")
return True
if __name__ == "__main__":
try:
S = System("dar", dryrun=True)
S.register()
S.lock_user_pw()
S.add_to_usergroup()
# if not S.make_ssh_usable("dar", "SSHpub"):
# print("Huh, error :shrug:")
exit(0)
except KeyboardInterrupt:
pass

@ -0,0 +1,48 @@
class General(Exception):
pass
class User(General):
pass
class UnknownUser(User):
def __init__(self, name):
Exception.__init__(self, f"Tried to perform action on unknown user '{name}'")
class UserExistsAlready(User):
def __init__(self, name):
Exception.__init__(self, f"User '{name}' is already registered")
class UnknownReturnCode(General):
pass
class ModifyFilesystem(General):
pass
class SSHDirUncreatable(ModifyFilesystem):
pass
class SQLiteDatabaseDoesntExistYet(General):
pass
class UsernameLength(User):
pass
class UsernameTooShort(User):
pass
class UsernameTooLong(User):
pass
class UsernameInvalidCharacters(User):
pass

@ -0,0 +1,219 @@
import csv
import pwd
import re
import lib.sqlitedb
def checkUsernameCharacters(username: str) -> bool:
""" Checks the Username for invalid characters. Allow only alphanumerical characters, a lower alpha one first,
Followed by any sequence of digits and characters
:param username: String to check for validity
:type username: str
:return: True when valid, False when not
:rtype: bool
"""
if " " not in username and "_" not in username and username.isascii() and username[:1].islower() and \
not username[0].isnumeric():
if not re.search(r"\W+", username):
if not re.search("[^a-zA-Z0-9]", username):
return True
return False
def checkUsernameLength(username: str, upper_limit: int = 16, lower_limit: int = 3) -> bool:
""" Checks username for an upper and lower bounds limit character count
:param username: Username to check
:type username: str
:param upper_limit: Upper limit bounds to check for(default is 16)
:type upper_limit: int
:param lower_limit: Lower limit bounds to check for(default is 3)
:return: True, when all bounds are in, False when one or both aren't.
:rtype: bool
"""
if len(username) > upper_limit:
return False
if len(username) < lower_limit:
return False
return True
def checkUserExists(username: str) -> bool:
""" Checks if the User exists on the **SYSTEM** by calling PWD on it.
**Note**: You might want to use this in conjunction with checkUserInDB
:param username:
:type username: str
:return: True when exists, False when not
:rtype: bool
"""
try:
pwd.getpwnam(username)
except KeyError:
return False
return True # User already exists
def checkUserInDB(username: str, db: str) -> bool:
""" Checks users existence in the **DATABASE**.
:Note: You might want to use this in conjunction with `checkUserExists`
:param username: Username to check existence in database
:type username: str
:param db: Path to database to check in
:type db: str
:return: True, when User exists, False when not
"""
try:
ldb = lib.sqlitedb.SQLiteDB(db)
fetched = ldb.safequery("SELECT * FROM 'applications' WHERE username = ?", tuple([username]))
if fetched:
return True
except lib.sqlitedb.sqlite3.Error as e:
print(f"SQLite Exception: {e}")
return False
def checkSSHKey(key: str) -> bool:
""" Checks SSH Key for meta-data that we accept.
:Note: We currently only allow ssh keys without options but with a mail address at the end in b64 encoded.
The currently supported algorithms are: ecdfsa-sha2-nistp256, 'ecdsa-sha2-nistp384', 'ecdsa-sha2-nistp521',
'ssh-rsa', 'ssh-dss' and 'ssh-ed25519'
:param key: Key to check
:return: True, when Key is valid, False when not
:rtype: bool
"""
# taken from https://github.com/hashbang/provisor/blob/master/provisor/utils.py, all belongs to them! ;)
import base64
if len(key) > 8192 or len(key) < 80:
return False
key = key.replace("\"", "").replace("'", "").replace("\\\"", "")
key = key.split(' ')
types = ['ecdsa-sha2-nistp256', 'ecdsa-sha2-nistp384',
'ecdsa-sha2-nistp521', 'ssh-rsa', 'ssh-dss', 'ssh-ed25519']
if key[0] not in types:
return False
try:
base64.decodebytes(bytes(key[1], "utf-8"))
except TypeError:
return False
return True
def checkEmail(mail: str) -> bool:
""" Checks Mail against a relatively simple REgex Pattern.
:param mail: Mail to check
:type mail: str
:return: False, when the Mail is invalid, True when valid.
:rtype: bool
"""
if not re.match("(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)", mail):
return False
else:
return True
def checkDatetimeFormat(datetime_str: str) -> bool:
""" Checks a Strings format on date time.
:param datetime_str: String to check
:type datetime_str: str
:return: True when valid, False when not.
:rtype: bool
"""
import datetime
try:
datetime.datetime.strptime(datetime_str, "%Y-%m-%d %H:%M:%S")
except ValueError:
return False
return True
def checkName(name: str) -> bool:
""" Checks a users (real) Name against a real simple REgex Pattern.
:param name: Name/String to check
:type name: str
:return: True when valid, False when not.
:rtype: bool
"""
if not re.match("\w+\s*\w", name):
return False
else:
return True
def checkImportFile(path: str, db: str):
""" Checks an CSV file against most of the validators and prints an Error message with the line number corresponding
to said failure.. Those includes: checkName, checkUsernameCharacters,
ckeckUsernameLength, duplicate usernames(in the CSV), checkSSHKey, checkEmail, checkUserExists, checkUserInDB,
checkDatetimeformat and if the status is 1 or 0.
:param path: Path to file to check
:type path: str
:param db: Path to database file(SQLite)
:type db: str
:return: Str when Failure, True when success(All tests passed)
:rtype: Str or None
"""
errstr = ""
valid = True
ln = 1 # line number
valid_names_list = []
with open(path, 'r', newline='') as f:
reader = csv.DictReader(f)
for row in reader:
# if any of this fails move on to the next user, just print a relatively helpful message lel
if not lib.Validator.checkName(row["name"]):
errstr += f"Line {ln}: Name: '{row['name']}' seems not legit. Character followed by character should" \
f" be correct.\n"
valid = False
if not lib.Validator.checkUsernameCharacters(row["username"]):
errstr += (f"Line {ln}: Username contains unsupported characters or starts with a number: '"
f"{row['username']}'.\n")
valid = False
if not lib.Validator.checkUsernameLength(row["username"]):
errstr += f"Line {ln}: Username '{row['username']}' is either too long(>16) or short(<3)\n"
valid = False
# dup checking
if row["username"] in valid_names_list:
errstr += f"Line {ln}: Duplicate Username {row['username']}!\n"
valid = False
else:
valid_names_list.append(row["username"])
# dup end
if not lib.Validator.checkSSHKey(row["pubkey"]):
errstr += f"Line {ln}: Following SSH-Key of user '{row['username']}' isn't valid: " \
f"'{row['pubkey']}'.\n"
valid = False
if not lib.Validator.checkEmail(row["email"]):
errstr += f"Line {ln}: E-Mail address of user '{row['username']}' '{row['email']}' is not valid.\n"
valid = False
if lib.Validator.checkUserExists(row["username"]) or checkUserInDB(row["username"], db):
errstr += f"Line {ln}: User '{row['username']}' already exists.\n"
valid = False
if not lib.Validator.checkDatetimeFormat(row["timestamp"]):
errstr += f"Line {ln}: Timestamp '{row['timestamp']}' from user '{row['username']}' is invalid.\n"
valid = False
if int(row["status"]) > 1 or int(row["status"]) < 0:
errstr += f"Line {ln}: Status '{row['status']}' MUST be either 0 or 1.\n"
valid = False
ln += 1
if valid:
return True
else:
return errstr

@ -0,0 +1,9 @@
import os
cwd = os.environ.get('TILDE_CONF')
if cwd is None:
cwd = os.getcwd() + "/applicationsconfig.ini"
else:
if os.path.isfile(cwd) is False:
cwd = os.getcwd() + "/applicationsconfig.ini"
# cwd is now either cwd/applicationsconfig or $TILDE_CONF

@ -0,0 +1,164 @@
#!/usr/bin/env python3
import sqlite3
from sys import stderr as stderr
from typing import List # Typing support!
# create dictionary out of sqlite results
def dict_factory(cursor, row):
d: dict = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
class SQLiteDB:
"""SQLitedb handles EVERYTHING directly related to our Database."""
db = ""
cursor = None
connection = None
last_result = None
def __init__(self, dbpath: str):
"""
:param dbpath: Path to the database we want to open
:type dbpath: str
:returns: Object for the SQLitedb-Class.
:rtype: object
"""
db = dbpath
try:
self.connection = sqlite3.connect(db)
self.cursor = self.connection.cursor()
except sqlite3.Error as e:
print("Connection error: %s" % e, file=stderr)
self.cursor.row_factory = sqlite3.Row # every result will be a dict now
def __del__(self):
try:
self.connection.commit()
self.connection.close()
except sqlite3.Error as e:
print("Couldn't gracefully close db: %s" % e, file=stderr)
def query(self, qq: str) -> List[sqlite3.Row]:
"""Do a query and automagically get the fetched results in a list
:param qq: Query to execute
:type qq: str
:returns: A tuple(/list) consisting with any fetched results
:rtype: list
"""
try:
self.cursor.execute(qq)
self.last_result = self.cursor.fetchall()
self.connection.commit()
except sqlite3.OperationalError:
self._createTable()
return self.query(qq)
except sqlite3.Error as e:
print("Couldn't execute query %s, exception: %s" % (qq, e), file=stderr)
self.last_result = []
return self.last_result
# sometimes we need the cursor for safety reasons, for example does sqlite3 all the security related
# escaoing in supplied strings for us, when we deliver it to con.execute in the second argument as a tuple
def getCursor(self) -> sqlite3:
"""Returns SQLite3 Cursor. Use with **c a u t i o n**... """
return self.cursor
# we could try to utilise that ourselfs in a function. Be c a r e f u l, these values in the tuple MUST HAVE
# THE RIGHT TYPE
def safequery(self, qq: str, deliver: tuple) -> List[sqlite3.Row]:
""" Shall handle any query that has user input in it as an alternative to self.query
:param qq: Query to execute
:type qq: str
:param deliver: User inputs marked with the placeholder(`?`) in the str
:type deliver: tuple
:returns: A tuple(/list) consisting with any fetched results
:rtype: List[sqlite3.Row]
"""
try:
self.cursor.execute(qq, deliver)
self.last_result = self.cursor.fetchall()
self.connection.commit()
except TypeError as e:
print("Types in given tuple doesnt match to execute query \"%s\": %s" % (qq, e), file=stderr)
self.last_result = []
except sqlite3.OperationalError:
self._createTable()
return self.safequery(qq, deliver)
except sqlite3.Error as e:
print("Couldn't execute query %s, exception: %s" % (qq, e), file=stderr)
print(deliver)
print(type(e))
self.last_result = []
return self.last_result
def removeApplicantFromDB(self, userid: int) -> bool:
"""Removes Applicants from the DB by ID. Use along System.removeUser()
:param userid: User ID to remove from the Database
:type userid: int
:returns: True, if removal was successful(from the DB), False when not
:rtype: bool
"""
try:
self.last_result = self.cursor.execute("DELETE FROM `applications` WHERE id = ? ", [userid])
self.connection.commit()
except sqlite3.OperationalError:
print("The database has probably not yet seen any users, so it didnt create your table yet. Come back"
"when a user tried to register")
return False
except sqlite3.Error as e:
print(f"Could not delete user with id: {userid}, exception in DB: {e}") # @TODO LOGGING FFS
return False
return True
def removeApplicantFromDBperUsername(self, username: str) -> bool:
"""Removes Applicants from the DB by Username. Use along System.removeUser()
:param username: Username to remove from the database
:type username: str
:returns: True, if removal was successful(from the DB), False when not
:rtype: bool
"""
try:
self.last_result = self.cursor.execute("DELETE FROM `applications` WHERE username = ?", [username])
self.connection.commit()
except sqlite3.OperationalError:
print("The database has probably not yet seen any users, so it didnt create your table yet. Come back"
"when a user tried to register")
return False
except sqlite3.Error as e:
print(f"Could not delete user {username}, exception in DB: {e}") # @TODO LOGGING
return False
return True
def _createTable(self) -> None:
try:
self.cursor.execute(
"CREATE TABLE IF NOT EXISTS applications("
"id INTEGER PRIMARY KEY AUTOINCREMENT,"
"username TEXT NOT NULL, email TEXT NOT NULL,"
"name TEXT NOT NULL, pubkey TEXT NOT NULL,"
"timestamp DATETIME DEFAULT CURRENT_TIMESTAMP CONSTRAINT "
"timestamp_valid CHECK( timestamp IS strftime('%Y-%m-%d %H:%M:%S', timestamp))"
",status INTEGER NOT NULL DEFAULT 0);")
self.connection.commit()
except sqlite3.Error as e:
print(f"The database probably doesn't exist yet, but read the message: {e}")
print("The database table didn't exist yet; created it successfully!")
if __name__ == "__main__":
try:
SQLiteDB("bla.db")
print("hi")
exit(0)
except KeyboardInterrupt:
pass

@ -0,0 +1,6 @@
import argparse
import lib.cwd
argparser = argparse.ArgumentParser(description='Tilde administration tools ', conflict_handler="resolve")
argparser.add_argument('-c', '--config', default=lib.cwd.cwd,
type=str, help='Path to configuration file', required=False)

@ -0,0 +1,11 @@
import lib.uis.config_ui
argparser = lib.uis.config_ui.argparser
# store_true just stores true when the command is supplied, so it doesn't need choices nor types
argparser.add_argument('-u', '--unapproved', default=False, action="store_true",
help='only unapproved users. Default is only approved.', required=False)
argparser.add_argument('-a', '--approved', default=False, action="store_true",
help="Only approved Users.", required=False)
argparser.add_argument('-f', '--file', default="stdout",
type=str, help='write to file instead of stdout', required=False)

@ -0,0 +1,5 @@
unalias dev_build
unalias dev_run
unalias dev_bash
unalias dev_stop
unalias dev_disable

@ -0,0 +1,5 @@
alias dev_run="docker container run -l dev-ssh-reg --name dev-ssh-reg --rm -itd -v $PWD/private/:/app/admin ssh-reg"
alias dev_stop="docker container stop dev-ssh-reg"
alias dev_build="docker build -t ssh-reg:latest --force-rm ."
alias dev_bash="docker container exec -it dev-ssh-reg bash -c 'cd /app/admin; exec bash --login -i'"
alias dev_disable="source private/scripts/disable.sh"

@ -1,30 +1,26 @@
#!/usr/bin/env python3
import re, configparser, logging, sqlite3, argparse
from os import getcwd
import argparse
import configparser
import logging
import re
import sqlite3
from os import environ
from os import getcwd
from os import path as ospath
import re, configparser, logging, sqlite3
try:
cwd = environ.get('TILDE_CONF')
if cwd is None:
cwd=getcwd()+"/applicationsconfig.ini"
cwd = getcwd()+"/applicationsconfig.ini"
else:
if ospath.isfile(cwd) is False:
cwd=getcwd()+"/applicationsconfig.ini"
cwd = getcwd() + "/applicationsconfig.ini"
# cwd is now either cwd/applicationsconfig or $TILDE_CONF
argparser = argparse.ArgumentParser(description='interactive registration formular for tilde platforms')
argparser.add_argument('-c', '--config', default=cwd, type=str, help='Config file', required=False)
args = argparser.parse_args()
CONF_FILE = args.config
except:
# intended broad, @TODO check them all for errors instead of everything in one
logging.exception("Argumentparser-Exception: ")
exit(0)
try:
config = configparser.ConfigParser()
config.read(CONF_FILE)
logging.basicConfig(format="%(asctime)s: %(message)s", filename=config['DEFAULT']['log_file'],
@ -46,7 +42,9 @@ def __createTable(cursor, connection):
"id INTEGER PRIMARY KEY AUTOINCREMENT,"
"username TEXT NOT NULL, email TEXT NOT NULL,"
"name TEXT NOT NULL, pubkey TEXT NOT NULL,"
"timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, status INTEGER NOT NULL DEFAULT 0);")
"timestamp DATETIME DEFAULT CURRENT_TIMESTAMP CONSTRAINT "
"timestamp_valid CHECK( timestamp IS strftime('%Y-%m-%d %H:%M:%S', timestamp))"
",status INTEGER NOT NULL DEFAULT 0);")
connection.commit()
except sqlite3.Error as e:
logging.exception("Couldn't create needed SQLite Table! Exception: %s" % e)
@ -79,15 +77,24 @@ def __checkSQLite(cursor, connection):
def check_username(value):
global VALID_USER
if " " in value or "_ " in value or not value.isascii() or not value[:1].islower() or value[0].isnumeric():
VALID_USER = False
return False
if re.search(r"\W+", value):
VALID_USER = False
return False
if len(value) < 3:
VALID_USER=False
VALID_USER = False
return False
if len(value) > 16:
VALID_USER = False
return False
try:
from pwd import getpwnam
getpwnam(value)
VALID_USER = False
# intended broad
except Exception:
# everything from pwd throws an KeyError when the given user cannot be found
except KeyError:
VALID_USER = True
return True
return False
@ -98,34 +105,36 @@ def validate_pubkey(value):
global VALID_SSH
import base64
if len(value) > 8192 or len(value) < 80:
VALID_SSH=False
VALID_SSH = False
return False
value = value.replace("\"", "").replace("'", "").replace("\\\"", "")
value = value.split(' ')
types = [ 'ecdsa-sha2-nistp256', 'ecdsa-sha2-nistp384',
'ecdsa-sha2-nistp521', 'ssh-rsa', 'ssh-dss', 'ssh-ed25519' ]
types = ['ecdsa-sha2-nistp256', 'ecdsa-sha2-nistp384',
'ecdsa-sha2-nistp521', 'ssh-rsa', 'ssh-dss', 'ssh-ed25519']
if value[0] not in types:
VALID_SSH=False
VALID_SSH = False
return False
try:
base64.decodebytes(bytes(value[1], "utf-8"))
except TypeError:
VALID_SSH=False
VALID_SSH = False
return False
VALID_SSH=True
VALID_SSH = True
return True
def main():
print(" ▗▀▖ \n▗▖▖ ▐ ▌ ▌▛▀▖\n▘▝▗▖▜▀ ▌ ▌▌ ▌\n ▝▘▐ ▝▀▘▘ ▘")
username = input("Welcome to the ~.fun user application form!\n\nWhat is your desired username? [a-z0-9] allowed:\n")
username = input("Welcome to the ~.fun user application form!\n\n"
"What is your desired username? [a-z0-9] allowed:\n")
while (not re.match("[a-z]+[a-z0-9]", username)) or (not check_username(username)):
username = input("Invalid Username, maybe it exists already?\nValid characters are only a-z and 0-9."
"\nMake sure your username starts with a character and not a number."
"\nMake sure your username starts with a character and not a number"
" and is not larger than 16 characters."
"\nWhat is your desired username? [a-z0-9] allowed:\n")
fullname = input("\nPlease enter your full name:\n")
@ -154,8 +163,8 @@ def main():
if re.match("[yY]", validation):
print("Thank you for your application! We'll get in touch shortly. 🐧")
try:
connection=sqlite3.connect(REG_FILE)
cursor=connection.cursor()
connection = sqlite3.connect(REG_FILE)
cursor = connection.cursor()
__checkSQLite(cursor, connection)
addtotable(cursor, connection, username, fullname, email, pubkey)
connection.commit()

Loading…
Cancel
Save