Compare commits

..

88 Commits

Author SHA1 Message Date
n1trux 633bfbe5c5 fix that fucking Dockerfile for good, thanks @dark 5 years ago
n1trux dd5b4043be Merge remote-tracking branch 'origin/pr/4' 5 years ago
n1trux 43a3dd81ac Merge remote-tracking branch 'origin/pr/3' 5 years ago
Darksider3 df1d09a155 Forgot that message. x_X 5 years ago
Darksider3 8008fa36b9 General overhaul, better messages, ordered imports etc 5 years ago
Darksider3 32c6ea7e07 System: Rename aio_register to aio_approve. 5 years ago
Darksider3 8d4d2038d3 Typing support and sqlite3.Row!
Typing will be handy in further development, as well will i settle down
to sqlite3.Row. Fetchall() returns now a list(grr) with
dict-imitating/sumating objects, which you can call keys() on and will
return them, what we now already use for Backups, which comes handy.

Also Typing gives the ability to let the code even more documentate
itself. It's planned to use all over the place but i've to read myself
into it yet more to get started on that one.
It's a step in the right direction!
5 years ago
Darksider3 84508820a6 ListUsers: Implement --user flag, rename --list to --list-asc 5 years ago
Darksider3 b7ffedfba2 Rename SQLitedb to SQLiteDB 5 years ago
Darksider3 fb4b577eb8 minor adjustemenets 5 years ago
Darksider3 f31117e940 System: aio_register, performs all neccessary steps to create user...
BUT will NOT catch ANY exceptions. Therefore the caller script itself is
REQUIRED to catch.
5 years ago
Darksider3 dd01acf801 editUsers: Refactor from System taken 5 years ago
Darksider3 05f27078f6 System: Refactors username given in constructor
deleted all references and added one class which accepts to set a
username
5 years ago
Darksider3 f204a74f98 Refactor some variable names 5 years ago
Darksider3 7ebabbb5f6 EditUsers: Implement --remove flag 5 years ago
Darksider3 b4d4fd9ad4 System: Improved return calls at remove_user 5 years ago
Darksider3 7d5ef3b493 Wording fix in docstring 5 years ago
Darksider3 0eeafa626e Documentation work all over the place
Renamed also some paramters
5 years ago
Darksider3 65c7bb6b3f Move argparser into mains function body 5 years ago
Darksider3 a17d0ed30f ListUsers: UAP and APP => Unapproved and Approved replaced 5 years ago
Darksider3 389b614de9 Revert "ListUsers: UAP and APP renamed, function updates /home/users.txt"
This reverts commit 84cc83db3c.
5 years ago
Darksider3 84cc83db3c ListUsers: UAP and APP renamed, function updates /home/users.txt
On every register() call the System rewrites the /home/users.txt to
reflect currently active users. Will fall apart when something
unexpected happened, but that's @helix responsibility.
5 years ago
Darksider3 452204a5e9 ListUsers: --list sorts all approved users ascending 5 years ago
Darksider3 c46adc2849 editUsers: Description, remove username change and so on 5 years ago
Darksider3 24eee6e84e Shebang/env for script execution 5 years ago
Darksider3 6ea679cb60 System: userdel returns 6 when no maildir is deleted but homedir is
So we agree on that one!
5 years ago
Darksider3 fb9a98fb81 Edit: Did delete, but here it is back!
It can now create users on disk, delete from disk(by changing the
status), change everything that really matters, except the username yet
5 years ago
Darksider3 eb6f3da3d7 System: Account for userdel returning 6 on success...
just because it couldnt delete the users mail.
5 years ago
Darksider3 fb7544bd8c edit: Added Name, email and first steps for status changes 5 years ago
Darksider3 7091cbcbd2 Validator: checkName taken from registration form 5 years ago
Darksider3 786b652b21 update dev environment with disable possiblities 5 years ago
Darksider3 84431bda1d Edit: Sceleton, Interface and first argument
The sceleton for the interface and it's first possible argument are
finished, so a users pbukey can be changed now without touching the DB
itself directly.

Checking for users existence and the presence of options/arguments is
done beforehand and will throw out when nothing useful got delivered to
the script.
5 years ago
Darksider3 7786df3761 System: Introduce function to write just write the auth file 5 years ago
Darksider3 96097e26b1 Catched that little error and killed it! 5 years ago
Darksider3 27e5f0445a Shortcuts to source when working on ssh-reg
Just source the file and you get the aliases for dev_run,
dev_stop(affects containers) and dev_build which rebuilds the ssh-reg
image
5 years ago
Darksider3 710ceacd7c Breaking up the code smell regarding the CFG.py!
It began smelling already but having some duplicate code across the
interfaces is still better than having all of it all over the place.

It enables to write specific flags which are nice to have. For example,
Import.py requires the --Import flag because it WANTS the user to read
the whole Help before it acts actually as an importer. When the user
supplies something they should know what's currently happening.

Also removes the hardcoded dependency on lib.CFG-Calls from most calls
which was already embarassingly present. Introduced some db and
cfg-variables which doesnt clutter anything but suck much less.

In future we provide a set of default arguments and a bare minimum -
config_ui as the bare minimum, default as the full blown storm.

This is rather big because it also patches several other smells
including a bug where a user from the db wouldnt be reported as existent
5 years ago
Darksider3 934b6bf75a Factor out 'as CFG' 5 years ago
Darksider3 670aa3d9c3 Factor out CONF_FILE 5 years ago
Darksider3 cdc72a30f4 Factor out REG_FILE 5 years ago
Darksider3 77efb4b339 delete cfgparse again and get it going as it where before 5 years ago
Darksider3 8c3ac4060f Split CFG.py to private/lib/cfgparse.py 5 years ago
Darksider3 192e70e4a2 Seperate out even more, own dir for UIs
default UI is now in private/lib/uis/default.py
5 years ago
Darksider3 5c6ecaf627 Modularize CFG.py into CWD, default_cmd and cmd ui 5 years ago
Darksider3 5a57a62780 Allow uppercase Chars afters first character in usernames 5 years ago
Darksider3 91b5e6ae7b Positional fix: OperationalError before general 5 years ago
Darksider3 77a31a44d1 Abort on errors in import file before even trying to activate
Moves every check regarding the imported file outside of import.py into
the validator. Also removes every follow-up checks regarding it out of
import.py.

Looks a whole lot cleaner now!
5 years ago
Darksider3 2ab27aa019 Validate Timestamps and don't not insert into DB when error
Validates now the timestamp in the import.py, doesn't insert users into
the sqlite-database before creating the systems account and checks now
for existence in the database too(unapproved users, who comes first,
gets his name first..)
5 years ago
Darksider3 3ae497e2eb Let the container stop gracefully with exec!
Right now the container can't stop gracefully because the sshd-server
runs on the server as PID 0. This results in the docker daemon not
killing it but waiting for it to die, which never happens, and results
in the default timeout-wait before it KILLS the process. With exec, the
sshd becomes PID 1 and can receive and process signals(probably SIGTERM
here) and handles them as well. The container stops now nearly
instantly..
5 years ago
Darksider3 b0856b558e Move import related stuff to the Import.py script.
and delete occurences in Import It is much tidier to look at and
doesn't clutter the Script. Is even a whole seperate task
so it does make no sense to call it in Backup when i think about it.
5 years ago
Darksider3 283143104d Actually we allow currently strange things, this fixes it on the usernames 5 years ago
Darksider3 d517db3b2a On Operational Exceptions we create the table and try again
this could potentially result in a endless recursion when the
table isnt writeable and a lot other funny things happen
but normally and in 99,999999999% of our cases this
is totally fine
5 years ago
Darksider3 a1563116f6 Renamed SSH-Exception 5 years ago
Darksider3 0c1f7ea252 Validate even imported files and users
Created a new file for the validation functions, should be soon(TM) used too
in the userapplication-script but dont hurry
5 years ago
Darksider3 8274cbb27e unregister() is just a different word for removing 5 years ago
Darksider3 4760e35fda it's freakin stupid to import IDs. Nothing depends on them. Ignore them. 5 years ago
Darksider3 96c403a3d9 Introducing own Exception classes to distinguish between possible System errors 5 years ago
Darksider3 c1488f6164 Return True on success 5 years ago
Darksider3 cd8fe5fed0 commit on querys 5 years ago
Darksider3 0718de20fb Let SQLite check for incorrect dates on timestamp row 5 years ago
Darksider3 2202af7826 Introduce imports, which comes with a new flag(--Import)
--Import depends on --file being present pointing to a valid CSV-file
that contains unique users with id's. When a user already exists
with his id, the script will fail.
It is also Backup.py-only which is reflected in the help message
5 years ago
Darksider3 c28e616ea5 Faster rebuildung by moving system relevant stuff up 5 years ago
Darksider3 e0efeee67d Better error handling, just hang a freakin' / to ssh_dir if it got screwed up 5 years ago
Darksider3 04d95c2d08 realign layout 5 years ago
Darksider3 43c7636920 Make PEP8 finally happy 5 years ago
Darksider3 290e72f159 Add various documentation to the System class and sqlitedb-Class...
... which are all written in rst-format, which sphynx understands and has
an own formal specification in  http://docutils.sourceforge.net/rst.html
5 years ago
Darksider3 d413662b36 Adds ability to delete users in db as well as on the system + type hints..
.... in function names for further documentation.
5 years ago
Darksider3 a6d63fee42 PEP8: One Import per line... -.-
Add TODO statements and foremost return False on error. Missed that
5 years ago
Darksider3 c43cad73fa Print Error messages to stderr instead of standard-out. Makes very much sense indeed! 5 years ago
Darksider3 89faf57b58 System: Still a lot @TODO here, but add it. Should(TM) work, but didn't
test it yet and also have to do a whole lot more. e.g. write out in dry-mode
which commands are getting to run in serious-mode but build up together
correctly in the correct order
5 years ago
Darksider3 2afb4c79a2 Implements a backup to csv. Uses StringIO because it has an own writer()
method, which is pretty nice to have when csv.writer() want's that on its
passed variable.

Also respects every flag yet introduced(-c, -f, -a, -u) and reuses the code
already written in ListUsers.py. It could be very nice to bring that code
into lib/ because it is probably needed way more often
5 years ago
Darksider3 90b2ee5236 add gitignore 5 years ago
Darksider3 a984b14c37 Implement -f/--file option. Defaults to stdout 5 years ago
Darksider3 4134e3cc2e introduces -f/--file, which outputs or takes from a file(conditional to the current program) 5 years ago
Darksider3 6e86bf11bb -a Flag: default false, do action on only approved users 5 years ago
Darksider3 c40ef4d40a Max username length is 16 now 5 years ago
Darksider3 cab4cd25a2 Well, that alternative view rocks hard... 5 years ago
Darksider3 b368ea058c huh, that already looks close to good... o.o 5 years ago
Darksider3 f06d4fa945 first steps towards real listings! 5 years ago
Darksider3 3915843943 introducing the -u flag, which turns on the 'unapproved' mode.
Oh, and just testing it in ListUsers.py :)
5 years ago
Darksider3 2eeeebdacb Add sceleton for listusers.py 5 years ago
Darksider3 96837cebe3 Externalize CFG-Handling into lib/CFG.py.
Never write this code again!
5 years ago
Darksider3 8ed215e89b Create sqlite3 connector class to handle all the errors and quirks for us 5 years ago
Darksider3 c4912502d7 Now with dynamic TILDE_CONF support! 5 years ago
Darksider3 79fb448f30 pep8-changes like before mostly 5 years ago
Darksider3 4d353e6804 Some PEP8 conform changes, noteably line breaks and too long lines and indendantion 5 years ago
phre4k d79e06a70e Merge branch 'master' of dark/ssh-reg into master 5 years ago
Darksider3 a772fa9d40 Create /app/user/.ssh before trying to write to it! 5 years ago
phre4k 05bb0e77c6 Merge branch 'master' of dark/ssh-reg into master 5 years ago

4
.gitignore vendored

@ -0,0 +1,4 @@
__pycache__/
.idea/
test*

@ -8,21 +8,6 @@ RUN apt-get update &&\
# Clean up APT when done. # Clean up APT when done.
RUN apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* RUN apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
# private/{scripts, administrate.py}, public/{scripts, userapplications.py}, config/userapplicatonsconfig.ini
#configs, logs, db
COPY config/applicationsconfig.ini /app/data/applicationsconfig.ini
# admin scripts
COPY private/ /app/admin/
# user accessible scripts
# Make TILDE_ENV
COPY public/ /app/user/
#SSH config into /etc :)
COPY config/etc /etc
# create user for applications # create user for applications
RUN useradd -Md /app/user/ -s /app/user/userapplication.py tilde RUN useradd -Md /app/user/ -s /app/user/userapplication.py tilde
@ -34,15 +19,27 @@ RUN usermod -U tilde
RUN useradd -Md /app/admin -s /app/admin/administrate.py admin RUN useradd -Md /app/admin -s /app/admin/administrate.py admin
# privilege separation directory # privilege separation directory
RUN mkdir -p /var/run/sshd RUN mkdir -p /var/run/sshd
# expose SSH port # expose SSH port
EXPOSE 22 EXPOSE 22
ENV TILDE_CONF="/app/data/applicationsconfig.ini" ENV TILDE_CONF="/app/data/applicationsconfig.ini"
#COPY config/environment /app/user/.ssh/environment RUN mkdir -p /app/data
RUN echo TILDE_CONF=$TILDE_CONF > /app/user/.ssh/environment
# private/{scripts, administrate.py}, public/{scripts, userapplications.py}, config/userapplicatonsconfig.ini
#configs, logs, db
COPY config/applicationsconfig.ini /app/data/applicationsconfig.ini
#SSH config into /etc :)
COPY config/etc /etc
RUN touch /app/data/applications.sqlite RUN touch /app/data/applications.sqlite
RUN touch /app/data/applications.log RUN touch /app/data/applications.log
# Doesnt work, @TODO why # Doesnt work, @TODO why
#RUN setfacl -R -m u:tilde:rwx /app/data/ #RUN setfacl -R -m u:tilde:rwx /app/data/
RUN chown -R tilde /app/data RUN chown -R tilde /app/data
CMD ["/usr/sbin/sshd", "-D"] # admin scripts
COPY private/ /app/admin/
# user accessible scripts
# Make TILDE_ENV
COPY public/ /app/user/
RUN mkdir /app/user/.ssh
CMD ["sh", "-c", " echo TILDE_CONF=$TILDE_CONF > /app/user/.ssh/environment && exec /usr/sbin/sshd -D"]

@ -0,0 +1,129 @@
#!/usr/bin/env python3
import configparser
import csv
import io
import ListUsers
import lib.uis.default as default_cmd # Follows -u, -a, -f flags
class Backup:
"""Backups a Tilde database to an CSV file
:Example:
>>> from Backup import Backup
>>> from ListUsers import ListUsers
>>> L = ListUsers.ListUsers("/path/to/sqlite").get_fetch()
>>> backup_db = Backup("stdout")
>>> backup_db.backup_to_file(L)
CSV-Separated list with headers in first row
"""
filename: str
quoting: int
dialect: str
field_names: tuple
def __init__(self, output: str, quoting: int = csv.QUOTE_NONNUMERIC, dialect: str = "excel"):
""" Constructs the Backup object
:param output: File name to backup to(set to stdout for stdout)
:type output: str
:param quoting: Set quoting for CSV Module
:type quoting: int
:param dialect: Set the CSV-Dialect. Defaults to excel, which is the classic CSV
:type dialect: str
"""
self.setFilename(output)
self.setQuoting(quoting)
self.setDialect(dialect)
self.setFieldnames(tuple(['id', 'username', 'email', 'name', 'pubkey', 'timestamp', 'status']))
def setDialect(self, dialect: str) -> None:
""" Set dialect for Object
:param dialect: Dialect to set for Object
:type dialect: str
:return: None
:rtype: None
"""
self.dialect = dialect
def setQuoting(self, quoting: int) -> None:
""" Set quoting in the CSV(must be supported by the CSV Module!)
:param quoting: Quoting Integer given by csv.QUOTE_* constants
:type quoting: int
:return: None
:rtype: None
"""
self.quoting = quoting
def setFilename(self, filename: str) -> None:
""" Sets Filename to output to
:param filename: Filename to output to(set stdout for stdout)
:type filename: str
:return: None
:rtype: None
"""
self.filename = filename
def setFieldnames(self, f_names: tuple) -> None:
""" Set fieldname to process
:param f_names: Fieldnames-Tuple
:type f_names: tuple
:return: None
:rtype: None
"""
self.field_names = f_names
def backup_to_file(self, fetched: list) -> bool:
"""Backup Userlist to File(or stdout)
:param fetched: List of values to write out CSV-formatted
:return: True, if success, None when not.
:rtype: bool
"""
returner = io.StringIO()
write_csv = csv.DictWriter(returner, fieldnames=self.field_names, quoting=self.quoting, dialect=self.dialect)
write_csv.writeheader()
for row in fetched:
write_csv.writerow(dict(row))
# sqlite3.Row doesn't "easily" convert to a dict itself sadly, so just a quick help from us here
# it actually even delivers a list(sqlite3.Row) also, which doesnt make the life a whole lot easier
if self.filename == "stdout":
print(returner.getvalue())
return True
else:
with open(self.filename, "w") as f:
print(returner.getvalue(), file=f)
return True
if __name__ == "__main__":
default_cmd.argparser.description += " - Backups Tilde Users to stdout or a file."
args = default_cmd.argparser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)
L = ListUsers.ListUsers(config['DEFAULT']['applications_db'],
unapproved=args.unapproved, approved=args.approved)
fetch = L.get_fetch()
if fetch:
B = Backup(args.file)
B.setFieldnames(fetch[0].keys()) # sqlite3.row delivers its keys for us! SO NICE!
B.backup_to_file(fetch)
else:
print("nothing to backup!")
exit(1)
exit(0)

@ -0,0 +1,87 @@
#!/usr/bin/env python3
import configparser
import csv
import os
import lib.UserExceptions
import lib.uis.config_ui # dont go to default, just following -c flag
def import_from_file(file_path: str, db: str, user_ids: tuple = tuple([])) -> bool:
""" Imports Users from a given CSV-file to the system and DB
:param file_path:
:type file_path: str
:param db: Path to the sqlite db
:type db: str
:param user_ids: FIXME: Tuple which user_ids should we write
:type user_ids: tuple
:return: True on success, False when not
:rtype: bool
"""
if not os.path.isfile(file_path):
print(f"File {file_path} don't exist")
return False
if not os.path.isfile(db):
print(f"The database file {db} don't exist")
return False
if user_ids:
pass # empty tuple means everything
# noinspection PyBroadException
try:
with open(file_path, 'r', newline='') as f:
import lib.Validator
sql = lib.sqlitedb.SQLiteDB(db)
err = lib.Validator.checkImportFile(file_path, db)
if err is not True:
print(err)
exit(0)
import lib.sqlitedb
import lib.System
sys_ctl = lib.System.System("root")
reader = csv.DictReader(f) # @TODO csv.Sniffer to compare? When yes, give force-accept option
for row in reader:
if row["status"] == "1":
try:
sys_ctl.setUser(row["username"])
sys_ctl.aio_approve(row["pubkey"])
print(row['username'], "====> Registered.")
except lib.UserExceptions.General as GeneralExcept:
print(f"Something didnt work out! {GeneralExcept}")
elif row["status"] == "0":
print(row['username'] + " not approved, therefore not registered.")
try:
sql.safequery(
"INSERT INTO `applications` (username, name, timestamp, email, pubkey, status) "
"VALUES (?,?,?,?,?,?)", tuple([row["username"], row["name"], row["timestamp"],
row["email"], row["pubkey"], row["status"]]))
except OSError as E:
pass
print(f"UUFFF, something went WRONG with the file {file_path}: {E}")
except Exception as didntCatch:
print(f"Exception! UNCATCHED! {type(didntCatch)}: {didntCatch}")
return True
if __name__ == "__main__":
ArgParser = lib.uis.config_ui.argparser
ArgParser.description += "- Imports a CSV file consisting of user specific details to the database"
ArgParser.add_argument('-f', '--file', default="stdout",
type=str, help='Import from CSV file', required=True)
ArgParser.add_argument('--Import', default=False, action="store_true",
help="Import Users.", required=True)
args = ArgParser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)
if not args.Import:
print("Error, need the import flag")
if not args.file:
print("Error, need the import file")
if not args.file:
print("You MUST set a CSV-file with the -f/--file flag that already exist")
exit(1)
import_from_file(args.file, config['DEFAULT']['applications_db'])
exit(0)

@ -0,0 +1,120 @@
#!/usr/bin/env python3
import configparser
import sqlite3 # sqlite3.Row-Object
from typing import List # Typing support!
import lib.uis.default as default_cmd # Follows -u, -a, -f flags
from lib.sqlitedb import SQLiteDB
class ListUsers:
db = None
usersFetch = None
def __init__(self, db: str, unapproved: bool = False, approved: bool = True, single_user: str = None):
"""Constructs ListUsers
:param db: Database to access
:type db: str
:param unapproved: only List unapproved users
:type unapproved: bool
:param approved: only list approved users
:type approved: bool
"""
self.db = SQLiteDB(db)
if unapproved: # only unapproved users
query = "SELECT * FROM `applications` WHERE `status` = '0'"
elif approved: # Approved users
query = "SELECT * FROM `applications` WHERE `status` = '1'"
else: # All users
query = "SELECT * FROM `applications`"
self.usersFetch = self.db.query(query)
if single_user is not None:
query = "SELECT * FROM `applications` WHERE `username` = ?"
self.usersFetch = self.db.safequery(query, tuple([single_user]))
def output_as_list(self) -> str:
"""Generates a string with one (approved) user per line and one newline at the end
:rtype: str
:return: String consisting with one(activated) user per line
"""
list_str: str = ""
query = "SELECT `username` FROM `applications` WHERE `status` = '1' ORDER BY timestamp ASC"
self.usersFetch = self.db.query(query)
for users in self.usersFetch:
list_str += users["username"] + "\n"
return list_str
def prettyPrint(self) -> None:
pass # see below why not implemented yet, texttable...
def get_fetch(self) -> List[sqlite3.Row]:
""" Returns a complete fetch done by the lib.sqlitedb-class
:return: Complete fetchall(). A List[sqlite3.Row] with dict-emulation objects.
:rtype: List[sqlite3.Row]
"""
return self.usersFetch
# @TODO MAYBE best solution: https://pypi.org/project/texttable/
# examle:
"""
from texttable import Texttable
t = Texttable()
t.add_rows([['Name', 'Age'], ['Alice', 24], ['Bob', 19]])
print(t.draw())
---------------> Results in:
+-------+-----+
| Name | Age |
+=======+=====+
| Alice | 24 |
+-------+-----+
| Bob | 19 |
+-------+-----+
for user in fetch:
print("ID: {}; Username: \"{}\"; Mail: {}; Name: \"{}\"; Registered: {}; Status: {}".format(
user["id"], user["username"], user["email"], user["name"], user["timestamp"], user["status"]
))
"""
if __name__ == "__main__":
default_cmd.argparser.description += " - Lists Users from the Tilde database."
default_cmd.argparser.add_argument('--list-asc', default=False, action="store_true",
help='Output a newline seperated list of users', required=False, dest="args_asc")
default_cmd.argparser.add_argument('--user', default=None, type=str,
help="Just show a specific user by it's name", required=False)
args = default_cmd.argparser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)
ret = ""
if args.user is not None:
L = ListUsers(config['DEFAULT']['applications_db'], unapproved=args.unapproved, approved=args.approved,
single_user=args.user)
else:
L = ListUsers(config['DEFAULT']['applications_db'], unapproved=args.unapproved, approved=args.approved)
if args.args_asc:
ret = L.output_as_list()
else:
fetch = L.get_fetch()
ret += "ID %-1s| Username %-5s| Mail %-20s| Name %-17s| Registered %-8s | State |\n" % (
" ", " ", " ", " ", " "
)
ret += 102 * "-" + "\n"
for user in fetch:
ret += "%-4i| %-14s| %-25s| %-22s| %-8s | %-5i |\n" % (
user["id"], user["username"], user["email"], user["name"], user["timestamp"], user["status"]
)
if args.file != "stdout":
with open(args.file, 'w') as f:
print(ret, file=f)
else:
print(ret)
exit(0)

@ -4,388 +4,395 @@ import configparser, logging, sqlite3, argparse, pwd
import os import os
import subprocess import subprocess
# Clear shell # Clear shell
def clear(): def clear():
os.system('cls' if os.name == 'nt' else 'clear') os.system('cls' if os.name == 'nt' else 'clear')
# create dictionary out of sqlite results # create dictionary out of sqlite results
def dict_factory(cursor, row): def dict_factory(cursor, row):
d = {} d = {}
for idx, col in enumerate(cursor.description): for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx] d[col[0]] = row[idx]
return d return d
# prints command(but doesnt execute them) # prints command(but doesnt execute them)
# need this for work, just convenience # need this for work, just convenience
def debugExec(commands): def debugExec(commands):
print("Commands: {!s} -> Returns 0".format(commands)) print("Commands: {!s} -> Returns 0".format(commands))
return 0 return 0
# @TODO hardcoded config? # @TODO hardcoded config?
cwd = os.environ.get('TILDE_CONF') cwd = os.environ.get('TILDE_CONF')
if cwd is None: if cwd is None:
cwd=os.getcwd()+"/applicationsconfig.ini" cwd = os.getcwd() + "/applicationsconfig.ini"
else: else:
if os.path.isfile(cwd) is False: if os.path.isfile(cwd) is False:
cwd=os.getcwd()+"/applicationsconfig.ini" cwd = os.getcwd() + "/applicationsconfig.ini"
# cwd is now either cwd/applicationsconfig or $TILDE_CONF # cwd is now either cwd/applicationsconfig or $TILDE_CONF
argparser = argparse.ArgumentParser(description = 'interactive registration formular for tilde platforms') argparser = argparse.ArgumentParser(description='interactive registration formular for tilde platforms')
argparser.add_argument('-c', '--config', default = cwd, argparser.add_argument('-c', '--config', default=cwd,
type = str, help = 'Path to configuration file', required = False) type=str, help='Path to configuration file', required=False)
args = argparser.parse_args() args = argparser.parse_args()
CONF_FILE = args.config CONF_FILE = args.config
config = configparser.ConfigParser() config = configparser.ConfigParser()
config.read(CONF_FILE) config.read(CONF_FILE)
logging.basicConfig(format="%(asctime)s: %(message)s", logging.basicConfig(format="%(asctime)s: %(message)s",
level = int(config['LOG_LEVEL']['log_level']) level=int(config['LOG_LEVEL']['log_level'])
) )
del(cwd) del cwd
REG_FILE = config['DEFAULT']['applications_db'] REG_FILE = config['DEFAULT']['applications_db']
# Does everything related to applicants, i.e. creating, manipulations... # Does everything related to applicants, i.e. creating, manipulations...
class applicants(): class Applicants:
# User identifier # User identifier
identifier = "username" identifier = "username"
# SQLite DB Path # SQLite DB Path
sourceDB = "" sourceDB = ""
# another sqlite to batch-recreate users # another sqlite to batch-recreate users
differentDB = "" differentDB = ""
def __init__(self, lident, sourceDB=REG_FILE):
self.identifier = lident def __init__(self, lident, sourcedb=REG_FILE):
self.sourceDB = sourceDB self.identifier = lident
self.__connectToDB__("source") self.sourceDB = sourcedb
# all results shall be done with dict_factory! Makes everything so much simpler self.__connectToDB__("source")
self.sdbCursor.row_factory = dict_factory # all results shall be done with dict_factory! Makes everything so much simpler
self.sdbCursor.row_factory = dict_factory
def __del__(self):
self.__closeDB__("source") def __del__(self):
self.__closeDB__("source")
def __connectToDB__(self, which):
if which == "source": def __connectToDB__(self, which):
try: if which == "source":
self.sdbConnection = sqlite3.connect(self.sourceDB) try:
self.sdbCursor = self.sdbConnection.cursor() self.sdbConnection = sqlite3.connect(self.sourceDB)
except sqlite3.Error as e: self.sdbCursor = self.sdbConnection.cursor()
logging.exception("Database: Couldn't open database and get cursor: %s" % e) except sqlite3.Error as e:
else: logging.exception("Database: Couldn't open database and get cursor: %s" % e)
self.ddbConnection = sqlite3.connect(self.differentDB) else:
self.ddbCursor = self.ddbConnection.cursor() self.ddbConnection = sqlite3.connect(self.differentDB)
self.ddbCursor = self.ddbConnection.cursor()
def __closeDB__(self, which):
if(which == "source"): def __closeDB__(self, which):
try: if which == "source":
self.sdbConnection.close() try:
except sqlite3.Error as e: self.sdbConnection.close()
logging.exception("Couldn't close database! Error: %s" % e) # @TODO: Dump full db with query or just the executed querys to file except sqlite3.Error as e:
else: logging.exception(
self.ddbConnection.close() # @TODO: Evaluate getting rid of ddb(differentDB)? "Couldn't close database! Error: %s" % e)
# @TODO: Dump full db with query or just the executed querys to file
# get List of all applications(not accepted yet) else:
def getApplicationsList(self): self.ddbConnection.close() # @TODO: Evaluate getting rid of ddb(differentDB)?
query = "SELECT * FROM `applications` WHERE `status` = '0'"
try: # get List of all applications(not accepted yet)
self.sdbCursor.execute(query) def getapplicationslist(self):
rows = self.sdbCursor.fetchall() query = "SELECT * FROM `applications` WHERE `status` = '0'"
except sqlite3.Error as e: try:
logging.exception("Database Error: %s" % e) self.sdbCursor.execute(query)
rows=[] rows = self.sdbCursor.fetchall()
return rows except sqlite3.Error as e:
logging.exception("Database Error: %s" % e)
def getApprovedApplicantsList(self): rows = []
query = "SELECT * From `applications` WHERE `status` = '1'" return rows
try:
self.sdbCursor.execute(query) def getapprovedapplicantslist(self):
rows = self.sdbCursor.fetchall() query = "SELECT * From `applications` WHERE `status` = '1'"
except sqlite3.Error as e: try:
logging.exception("Database Error: %s" % e) self.sdbCursor.execute(query)
rows=[] rows = self.sdbCursor.fetchall()
return rows except sqlite3.Error as e:
logging.exception("Database Error: %s" % e)
# edit aproved users rows = []
def editApprovedApplicant(self, term, updaterow): return rows
try:
self.sdbCursor.execute( # edit aproved users
"UPDATE `applications` SET ? WHERE id=?", def editapprovedapplicants(self, term):
( str(term), ) try:
) # the fuck did i try here?
self.sdbConnection.commit() self.sdbCursor.execute(
except sqlite3.Error as e: "UPDATE `applications` WHERE id=?", (str(term),)
logging.exception("Database Error: %s" % e) )
self.sdbConnection.commit()
# set user to aproved except sqlite3.Error as e:
def setApprovedApplication(self, selectterm): logging.exception("Database Error: %s" % e)
query = "SELECT `username` FROM `applications` WHERE `username` = `{0!s}`".format(selectterm)
# set user to aproved
# get applicants data def setapprovedapplication(self, selectterm):
def getApplicantsData(self, term): # query = "SELECT `username` FROM `applications` WHERE `username` = `{0!s}`".format(selectterm)
# @TODO: Use shorthand if for the correct query, directly into sqlite pass
if self.identifier == "id":
try: # get applicants data
self.sdbCursor.execute( def getapplicantsdata(self, term):
"SELECT * FROM `applications` WHERE id = ?", # @TODO: Use shorthand if for the correct query, directly into sqlite
( str(term), ) if self.identifier == "id":
) try:
except sqlite3.Error as e: self.sdbCursor.execute(
logging.exception("Database Error: %s" % e) "SELECT * FROM `applications` WHERE id = ?",
(str(term),)
else: )
self.sdbCursor.execute( except sqlite3.Error as e:
"SELECT * FROM `applications` WHERE username = ?", logging.exception("Database Error: %s" % e)
( str(term), )
) else:
result = self.sdbCursor.fetchone() self.sdbCursor.execute(
return result "SELECT * FROM `applications` WHERE username = ?",
(str(term),)
# @TODO: migrade just approved users to some new/another sqlitedb )
def migrateApprovedData(self, different_db): result = self.sdbCursor.fetchone()
pass return result
# @TODO: delete migrated data # @TODO: migrade just approved users to some new/another sqlitedb
def deleteMigratedDataSet(self, selectterm): def migrateapproveddata(self, different_db):
pass pass
# Applicants whom doesnt got approved should get removed # @TODO: delete migrated data
def removeApplicant(self, term): def deletemigrateddata(self, selectterm):
if self.identifier == "id": pass
try:
self.sdbCursor.execute( # Applicants whom doesnt got approved should get removed
"DELETE FROM `applications` WHERE id = ?", def removeapplicant(self, term):
( str(term), ) if self.identifier == "id":
) try:
self.sdbConnection.commit() self.sdbCursor.execute(
except sqlite3.Error as e: "DELETE FROM `applications` WHERE id = ?",
logging.exception("Database Error: %s" % e) (str(term),)
)
else: self.sdbConnection.commit()
self.sdbCursor.execute( except sqlite3.Error as e:
"DELETE FROM `applications` WHERE username = ?", logging.exception("Database Error: %s" % e)
( str(term), )
) else:
self.sdbConnection.commit() self.sdbCursor.execute(
'DELETE FROM `applications` WHERE username = ?',
#@TODO: Possibility to work without passing users manually (str(term),)
def selectedUser(userid, username = False): )
pass self.sdbConnection.commit()
# Print out a list of aprovable users # @TODO: Possibility to work without passing users manually
def printApprovableUsers(self, users): def selecteduser(userid, username=False):
i=0 pass
for user in users:
print("ID: {0!s}, Status: {0!s}, Name: {0!s}".format(i, user["status"], user["username"])) # Print out a list of aprovable users
i += 1 def printapprovableusers(self, users):
return i i = 0
for user in users:
# Get List of users print("ID: {0!s}, Status: {0!s}, Name: {0!s}".format(i, user["status"], user["username"]))
def userPrint(self, fetched, userid): i += 1
print("ID: {0!s}".format(fetched[int(userid)]["id"])) return i
print("Username: {0!s}".format(fetched[int(userid)]["username"]))
print("Mail: {0!s}".format(fetched[int(userid)]["email"])) # Get List of users
print("SSH: {0!s}".format(fetched[int(userid)]["pubkey"])) @staticmethod
print("Registrated time: {0!s}".format(fetched[int(userid)]["timestamp"])) def userprint(fetched, userid):
print("ID: {0!s}".format(fetched[int(userid)]["id"]))
# Approve an applicant. Handles everything related, like create home dir, set flags blabla print("Username: {0!s}".format(fetched[int(userid)]["username"]))
def approveApplicant(self, term): print("Mail: {0!s}".format(fetched[int(userid)]["email"]))
user = self.getApplicantsData(term) print("SSH: {0!s}".format(fetched[int(userid)]["pubkey"]))
ret = self.__execScript(user) print("Registrated time: {0!s}".format(fetched[int(userid)]["timestamp"]))
if ret[0] != 0: # @DEBUG: Change to == 0
print("Something went wrong in the user creation! Exiting without deleting users record in database!") # Approve an applicant. Handles everything related, like create home dir, set flags blabla
print("Last executed commands: {0!s}\nreturn code: {1!s}".format(ret[-1][1], ret[-1][0])) def approveapplicant(self, term):
exit(0) user = self.getapplicantsdata(term)
ret = self.__execScript(user)
if self.identifier == "id": if ret[0] != 0: # @DEBUG: Change to == 0
try: print("Something went wrong in the user creation! Exiting without deleting users record in database!")
self.sdbCursor.execute( print("Last executed commands: {0!s}\nreturn code: {1!s}".format(ret[-1][1], ret[-1][0]))
"UPDATE `applications` SET `status`=1 WHERE `id`=?", exit(0)
( str(term), )
) if self.identifier == "id":
self.sdbConnection.commit() try:
except sqlite3.Error as e: self.sdbCursor.execute(
logging.exception("Database Error: %s" % e) "UPDATE `applications` SET `status`=1 WHERE `id`=?",
(str(term),)
else: )
self.sdbCursor.execute( self.sdbConnection.commit()
"UPDATE `applications` SET `status`=1 WHERE `username`=?" except sqlite3.Error as e:
( str(term), ) logging.exception("Database Error: %s" % e)
)
self.sdbConnection.commit() else:
self.sdbCursor.execute(
# Script execution, handles everything done with the shell/commands themselves "UPDATE `applications` SET `status`=1 WHERE `username`=?",
def __execScript(self, user): (str(term), )
# @TODO: omfg just write some wrapper-class/lib... sucks hard! )
username=user["username"] self.sdbConnection.commit()
homeDir="/home/"+username+"/"
sshDir=homeDir+".ssh/" # Script execution, handles everything done with the shell/commands themselves
executed=[] @staticmethod
def __execScript(user):
executed.append(["useradd", "-m", username]) # @TODO: omfg just write some wrapper-class/lib... sucks hard!
rcode = subprocess.call(executed[0]) username = user["username"]
if rcode != 0: home_dir = "/home/" + username + "/"
return [rcode,executed,] ssh_dir = home_dir + ".ssh/"
executed = []
executed.append(["usermod", "--lock", username])
rcode = subprocess.call(executed[1]) #empty pw executed.append(["useradd", "-m", username])
if rcode != 0: returncode = subprocess.call(executed[0])
return [rcode,executed,] if returncode != 0:
return [returncode, executed, ]
executed.append(["usermod", "-a", "-G", "tilde", username])
rcode = subprocess.call(executed[2]) # add to usergroup executed.append(["usermod", "--lock", username])
if rcode != 0: returncode = subprocess.call(executed[1]) # empty pw
return [rcode,executed,] if returncode != 0:
return [returncode, executed, ]
executed.append(["mkdir", sshDir])
try: executed.append(["usermod", "-a", "-G", "tilde", username])
# @TODO: use config variable(chmodPerms) returncode = subprocess.call(executed[2]) # add to usergroup
ret = os.mkdir(sshDir, 0o777) #create sshdir if returncode != 0:
rcode = 0 return [returncode, executed, ]
except OSError as e:
logging.exception(e.strerror) executed.append(["mkdir", ssh_dir])
rcode = e.errno # False, couldn't create. try:
return [rcode,executed,] # @TODO: use config variable(chmodPerms)
os.mkdir(ssh_dir, 0o777) # create sshdir
executed.append(["write(sshkey) to", sshDir+"authorized_keys"]) returncode = 0
with open(sshDir+"authorized_keys", "w") as f: except OSError as e:
f.write(user["pubkey"]) logging.exception(e.strerror)
if f.closed != True: returncode = e.errno # False, couldn't create.
logging.exception("Could'nt write to authorized_keys!") return [returncode, executed, ]
return [rcode,executed,]
executed.append(["write(sshkey) to", ssh_dir + "authorized_keys"])
executed.append(["chmod", "-Rv", "700", sshDir]) with open(ssh_dir + "authorized_keys", "w") as f:
f.write(user["pubkey"])
if not f.closed:
logging.exception("Could'nt write to authorized_keys!")
return [returncode, executed, ]
executed.append(["chmod", "-Rv", "700", ssh_dir])
try:
os.chmod(ssh_dir + "authorized_keys", 0o700) # directory is already 700
returncode = 0
except OSError as e:
logging.exception(e.strerror)
returncode = e.errno
return [returncode, executed, ]
try:
executed.append(["chown", "-Rv", username + ":" + username, ssh_dir])
os.chown(ssh_dir, pwd.getpwnam(username)[2], pwd.getpwnam(username)[3]) # 2=>uid, 3=>gid
executed.append(["chown", "-v", username + ":" + username, ssh_dir + "authorized_keys"])
os.chown(ssh_dir + "authorized_keys", pwd.getpwnam(username)[2], pwd.getpwnam(username)[3])
returncode = 0
except OSError as e:
logging.exception(e.strerror) # @TODO: maybe append strerror to executed instead of printing it
returncode = e.errno
return [returncode, executed, ]
return [returncode, executed, ]
# {'id': 7, 'username': 'testuser47', 'email': '47test@testmail.com', 'name':
# 'test Name', 'pubkey': 'ssh-rsa [...]', 'timestamp': '2018-08-22 13:31:16', 'status': 0}
try:
os.chmod(sshDir+"authorized_keys", 0o700) # directory is already 700
rcode = 0
except OSError as e:
logging.exception(e.strerror)
rcode = e.errno
return [rcode, executed,]
try:
executed.append(["chown", "-Rv", username+":"+username, sshDir])
os.chown(sshDir, pwd.getpwnam(username)[2], pwd.getpwnam(username)[3]) #2=>uid, 3=>gid
executed.append(["chown", "-v", username+":"+username, sshDir+"authorized_keys"])
os.chown(sshDir+"authorized_keys", pwd.getpwnam(username)[2], pwd.getpwnam(username)[3])
rcode = 0
except OSError as e:
logging.exception(e.strerror) # @TODO: maybe append strerror to executed instead of printing it
rcode = e.errno
return [rcode, executed,]
return [rcode,executed,]
"""
{'id': 7, 'username': 'testuser47', 'email': '47test@testmail.com', 'name':
'test Name', 'pubkey': 'ssh-rsa [...]', 'timestamp': '2018-08-22 13:31:16', 'status': 0}
"""
def main(): def main():
# how many times the Seperator/Delimiter? # how many times the Separator/Delimiter?
delcount = 40 delcount = 40
# The seperator for the menu # The separator for the menu
Seperator = "="*delcount separator = "=" * delcount
Menu = Seperator+"\n\t\t Main-Menu:\n\n" \ menu = separator + "\n\t\t Main-Menu:\n\n" \
"\t 1) list and edit pending users\n"\ "\t 1) list and edit pending users\n" \
"\t 2) list applicants\n"\ "\t 2) list applicants\n" \
"\t 3) edit applicant\n"\ "\t 3) edit applicant\n" \
"\t 4) quit\n"+Seperator+"\n" "\t 4) quit\n" + separator + "\n"
# Identify by ID # Identify by ID
applications = applicants(lident = "id") applications = Applicants(lident="id")
while 1 != 0: while 1 != 0:
print(Menu) print(menu)
command = input("Please select, what you want to do: \n -> ") command = input("Please select, what you want to do: \n -> ")
# User shouldnt be able to type something in that isnt a number # User shouldn't be able to type something in that isnt a number
if command.isalpha() or command == '': if command.isalpha() or command == '':
clear()
print("!!! invalid input, please try again. !!!")
continue
# convert
command=int(command)
if command == 4 or command == "q":
exit(0)
# Edit and list pending users/applicants @TODO Wording: Users or applicants?
elif command == 1:
users = applications.getApplicationsList()
i=applications.printApprovableUsers(users)
if i == 0 :
print("No pending users")
# giving some time to aknowledge that something WRONG happened
input("Continue with Keypress...")
clear()
continue
usersel = 0
UserMax = i
print("Menu:\n r=>return to main")
# Edit Menue
while 1 != 0 or usersel != "r":
i = applications.printApprovableUsers(users)
if usersel == "r":
break # break when user presses r
usersel = input("Which user( ID ) do you want to change? ->")
if len(usersel) > 1 or usersel.isalpha():
usersel = ""
# convert to int if input isnt an r
usersel = int(usersel) if usersel != '' and usersel != 'r' else 0
if usersel > UserMax - 1:
print("User {0!s} doesn't exist!".format(usersel))
continue
# Show the user his chosen user and ask what to do
applications.userPrint(users, usersel)
print("You chosed ID No. {0!s}, what do you like to do?".format(usersel))
chosenUser = usersel
usersel = ""
# Finally down the edit menue!
while usersel != "e":
usersel = input("User: {0!s}\n \t\t(A)ctivate \n\t\t(R)emove \n\t\tR(e)turn\n -> ".format(chosenUser))
if usersel == "A":
applications.approveApplicant(users[chosenUser]['id'])
print("User {0!s} has been successfully approved!".format(users[chosenUser]['username']))
input("waiting for input...")
clear()
usersel="e" # remove for being able to continue editing?
continue
elif usersel == "R":
applications.removeApplicant(users[chosenUser]['id'])
print("User {0!s} successfully deleted!".format(user[chosenUser]['username']))
input("waiting for input...")
clear() clear()
print("!!! invalid input, please try again. !!!")
continue continue
elif usersel == "e":
clear() # convert
command = int(command)
if command == 4 or command == "q":
exit(0)
# Edit and list pending users/applicants @TODO Wording: Users or applicants?
elif command == 1:
users = applications.getapplicationslist()
i = applications.printapprovableusers(users)
if i == 0:
print("No pending users")
# giving some time to acknowledge that something WRONG happened
input("Continue with Keypress...")
clear()
continue
user_selection = 0
user_max = i
print("Menu:\n r=>return to main")
# Edit Menu
while 1 != 0 or user_selection != "r":
i = applications.printapprovableusers(users)
if user_selection == "r":
break # break when user presses r
user_selection = input("Which user( ID ) do you want to change? ->")
if len(user_selection) > 1 or user_selection.isalpha():
user_selection = ""
# convert to int if input isnt an r
user_selection = int(user_selection) if user_selection != '' and user_selection != 'r' else 0
if user_selection > user_max - 1:
print("User {0!s} doesn't exist!".format(user_selection))
continue
# Show the user his chosen user and ask what to do
applications.userprint(users, user_selection)
print("You chosed ID No. {0!s}, what do you like to do?".format(user_selection))
chosen_user = user_selection
user_selection = ""
# Finally down the edit menu!
while user_selection != "e":
user_selection = input(
"User: {0!s}\n \t\t(A)ctivate \n\t\t(R)emove \n\t\tR(e)turn\n -> ".format(chosen_user))
if user_selection == "A":
applications.approveapplicant(users[chosen_user]['id'])
print("User {0!s} has been successfully approved!".format(users[chosen_user]['username']))
input("waiting for input...")
clear()
user_selection = "e" # remove for being able to continue editing?
continue
elif user_selection == "R":
applications.removeapplicant(users[chosen_user]['id'])
print("User {0!s} successfully deleted!".format(user[chosen_user]['username']))
input("waiting for input...")
clear()
continue
elif user_selection == "e":
clear()
continue
elif int(command) == 2:
users = applications.getapprovedapplicantslist()
if not users:
print("no activate users yet!")
i = 0
for user in users:
print("ID: {0!s}, Status: {1!s}, Name: {2!s}".format(user["id"], user["status"], user["username"]))
continue continue
elif command == str(3):
elif int(command) == 2: pass
users = applications.getApprovedApplicantsList() else:
exit(0)
if users == []:
print("no activate users yet!")
i=0
for user in users:
print("ID: {0!s}, Status: {1!s}, Name: {2!s}".format(user["id"], user["status"], user["username"]))
continue
elif command == str(3):
pass
else:
exit(0)
if __name__ == "__main__": if __name__ == "__main__":
try: try:
main() main()
exit(0) exit(0)
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
#print("Exception occured. View log file for details.") # print("Exception occured. View log file for details.")
#logging.exception("Some exception occured") # logging.exception("Some exception occured")

@ -0,0 +1,147 @@
#!/usr/bin/env python3
import configparser
import sqlite3
import lib.System
import lib.UserExceptions
import lib.Validator
import lib.sqlitedb
import lib.uis.config_ui # only follow -c flag
if __name__ == "__main__":
lib.uis.config_ui.argparser.description += " - Edit Tilde Users"
ArgParser = lib.uis.config_ui.argparser
ArgParser.add_argument('--user', type=str,
help='Tilde users name to edit', required=True)
Mutually = ArgParser.add_mutually_exclusive_group()
Mutually.add_argument('-r', '--remove', default=False, action="store_true",
help='Remove an approved/unapproved User from the system(and DB). Effectively purges him.',
required=False)
Mutually.add_argument("--verify", default=True, action="store_false",
help="Turns off value checks",
required=False)
ArgParser.add_argument('--sshpubkey', type=str, default=None,
help="Stores the new given SSH-Key in given user", required=False)
ArgParser.add_argument('--name', type=str, default=None,
help="Sets the stored name of the given user")
ArgParser.add_argument('--username', type=str, default=None,
help="Rename given User")
ArgParser.add_argument('--email', type=str, default=None,
help="Set new email address for given user")
ArgParser.add_argument('--status', type=int, default=None,
help="Set status of given user")
args = ArgParser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)
db = config['DEFAULT']['applications_db']
if not args.sshpubkey and not args.name and not args.username and not args.email and args.status is None \
and not args.remove:
print(f"Well, SOMETHING must be done with {args.user} ;-)")
exit(1)
# --> --user
if not lib.Validator.checkUserInDB(args.user, db):
print(f"User {args.user} does not exist in the database.")
exit(1)
DB = lib.sqlitedb.SQLiteDB(db)
sys_ctl = lib.System.System(args.user)
if not DB:
print("Could not establish connection to database")
exit(1)
CurrentUser = DB.safequery("SELECT * FROM `applications` WHERE `username`=?", tuple([args.user]))[0]
# --> --remove
if args.remove:
print(f"Removing {args.user} from the system and the database...")
try:
DB.removeApplicantFromDBperUsername(args.user)
print(f"Purged from the DB")
if CurrentUser["status"] == 1:
sys_ctl.remove_user()
print(f"Purged from the system")
else:
print(f"'{args.user}' was not approved before, therefore not deleting from system itself.")
except lib.UserExceptions.General as e:
print(f"{e}")
exit(1)
print(f"Successfully removed '{args.user}'.")
exit(0)
# --> --sshpubkey
if args.sshpubkey:
if not lib.Validator.checkSSHKey(args.sshpubkey):
print(f"Pubkey '{args.sshpubkey}' isn't valid.")
exit(1)
try:
DB.safequery("UPDATE `applications` SET `pubkey`=? WHERE `username`=?",
tuple([args.sshpubkey, args.user]))
CurrentUser = DB.safequery("SELECT * FROM `applications` WHERE `username` = ? ", tuple([args.user]))[0]
if int(CurrentUser["status"]) == 1:
sys_ctl.make_ssh_usable(args.sshpubkey)
except sqlite3.Error as e:
print(f"Something unexpected happened! {e}")
exit(1)
except lib.UserExceptions.ModifyFilesystem as e:
print(f"One action failed during writing the ssh key back to the authorization file. {e}")
print(f"'{args.user}'s SSH-Key updated successfully.")
# --> --name
if args.name:
if not lib.Validator.checkName(args.name):
print(f"'{args.name}' is not a valid Name.")
exit(1)
try:
DB.safequery("UPDATE `applications` SET `name` =? WHERE `username` =?", tuple([args.name, args.user]))
except sqlite3.Error as e:
print(f"Could not write '{args.name}' to database: {e}")
print(f"'{args.user}'s Name changed to '{args.name}'.")
# --> --email
if args.email:
if not lib.Validator.checkEmail(args.email):
print(f"'{args.email}' is not a valid Mail address!")
exit(1)
try:
DB.safequery("UPDATE `applications` SET `email` =? WHERE `username` =?", tuple([args.email]))
except sqlite3.Error as e:
print(f"Could not write '{args.email}' to the database. {e}")
print(f"'{args.user}' Mail changed to '{args.email}'.")
# --> --status
if args.status is not None:
if args.status != 0 and args.status != 1:
print("Only 0 and 1 are valid status, where 1 is activated and 0 is unapproved.")
exit(0)
# just takes first result out of the dict
if args.status == int(CurrentUser["status"]):
print(f"New and old status are the same.")
if args.status == 0 and int(CurrentUser["status"]) == 1:
try:
DB.safequery("UPDATE `applications` SET `status` =? WHERE `id`=?",
tuple([args.status, CurrentUser["id"]]))
sys_ctl.remove_user()
except sqlite3.Error as e:
print(f"Could not update database entry for '{args.user}', did not touch the system")
exit(1)
except lib.UserExceptions.UnknownReturnCode as e:
print(f"Could not remove '{args.user}' from the system, unknown return code: {e}. DB is modified.")
exit(1)
print(f"Successfully changed '{args.user}'s status to 0 and cleared from the system.")
if args.status == 1 and int(CurrentUser["status"]) == 0:
try:
DB.safequery("UPDATE `applications` SET `status`=? WHERE `username`=?",
tuple([args.status, args.user]))
sys_ctl.aio_approve(CurrentUser["pubkey"])
except sqlite3.Error as e:
print(f"Could not update Users status in database")
exit(1)
except lib.UserExceptions.General as ChangeUser:
print(f"Some chain in the cattle just slipped away, my lord! {ChangeUser}")
exit(1)
print(f"Successfully changed '{args.user}'s status to 1 and created on the system.")
exit(0)

@ -0,0 +1,6 @@
import configparser
import lib.uis.default as default_cmd
args = default_cmd.argparser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)

@ -0,0 +1,260 @@
import os
import pwd
import subprocess
import lib.UserExceptions
class System:
"""Class to interact with the system specifically to support our needs 0w0
:Example:
>>> from lib.System import System as System
>>> Sys_ctl = System("Test", dryrun=True)
>>> Sys_ctl.register()
>>> Sys_ctl.lock_user_pw()
>>> Sys_ctl.add_to_usergroup()
>>> Sys_ctl.make_ssh_usable("sshkey")
"""
dry: bool = False
create_command = []
home: str = ""
user: str
def setUser(self, username: str):
self.user = username
def __init__(self, username: str, dryrun: bool = False, home: str = "/home/"):
"""Creates an objects. Can set dry run.
:param username: Username to manipulate
:type username: str
:param dryrun: Run all command in a dry-run? When enabled, doesn't make any changes to the system (defaults to
false)
:type dryrun: bool
:param home: Standard directory to search for the home directories of your users(default is /home/)
:type home: str
:raises:
ValueError: if homedir can not be found
"""
self.dry = dryrun
if not home.endswith("/"):
home += "/"
if not os.path.isdir(home):
raise ValueError("home should be an existent directory...")
self.home = home
self.user = username
def aio_approve(self, pubkey, group="tilde"):
""" Executes all neccessary steps to create a user from itself. Raises ALOT of possible exceptions
:Note: CAREFULL! You MUST except the exceptions!
:param pubkey: Users public ssh key
:type pubkey: str
:param group: User-group. Defaults to tilde
:type group: str
:return: None
:raises:
lib.UserExceptions.UserExistsAlready: User Exists already on system
lib.UserExceptions.UnknownReturnCode: Unknown Return Code from useradd
lib.UserExceptions.SSHDirUncreatable: Users SSH Dir couldnt be created
lib.UserExceptions.ModifyFilesystem: Something with CHMOD failed
"""
self.register()
self.lock_user_pw()
self.add_to_usergroup(group)
self.make_ssh_usable(pubkey)
def register(self, cc: tuple = tuple(["useradd", "-m"])) -> bool:
"""Creates an local account for the given username
:param cc: Tuple with commands separated to execute on the machine. (defaults to useradd -m)
:type cc: tuple
:return: True, if worked, raises lib.UserExceptions.UserExistsAlready when not
:rtype: bool
:raises:
lib.UserExceptions.UserExistsAlready: when username specified already exists on the system
"""
create_command = cc
cc = create_command + tuple([self.user])
if self.dry:
self.printTuple(cc)
return True
elif not self.dry:
rt = subprocess.call(cc)
if rt != 0:
raise lib.UserExceptions.UserExistsAlready(self.user)
return True
def unregister(self) -> bool:
""" Just an alias function for removeUser
:return: True, when success, False(or exception) when not
:rtype: bool
"""
return self.remove_user()
def make_ssh_usable(self, pubkey: str, sshdir: str = ".ssh/") -> bool:
""" Make SSH usable for our newly registered user
:param pubkey: Public SSH Key for the User you want accessible by SSH
:type pubkey: str
:param sshdir: Directory to write the authorized_keys File to. PWD is $HOME of said user. (defaults to ".ssh/")
:type sshdir: str
:return: True, if worked, raises lib.UserExceptions.UnknownReturnCode, lib.UserExceptions.HomeDirExistsAlready
or lib.UserExceptions.ModifyFilesystem when not
:rtype: bool
:raises:
lib.UserExceptions.SSHDirUncreatable: if the ssh-dir couldnt be created nor exist
lib.UserExceptions.ModifyFilesystem: When chmod to .ssh and authorized_keys failed
lib.UserExceptions.General: if PWD cant find the specified user
"""
if self.dry:
print("Nah, @TODO, but actually kinda too lazy for this lul. Just a lot happening here")
return True
if not sshdir.endswith("/"):
sshdir += "/"
ssh_dir = self.home + self.user + "/" + sshdir
try:
os.mkdir(ssh_dir)
except FileExistsError:
pass # thats actually a good one for us :D
except OSError as e:
raise lib.UserExceptions.SSHDirUncreatable(f"Could not create {ssh_dir}: Exception: {e}")
try:
self.write_ssh(pubkey, ssh_dir)
except OSError as e:
raise lib.UserExceptions.ModifyFilesystem(
f"Could not write and/or chmod 0700 {ssh_dir} or {ssh_dir}/authorized_keys, Exception: {e}")
try:
pwdnam = pwd.getpwnam(self.user)
os.chown(ssh_dir, pwdnam[2], pwdnam[3]) # 2=>uid, 3=>gid
os.chown(ssh_dir + "authorized_keys", pwd.getpwnam(self.user)[2], pwd.getpwnam(self.user)[3])
except OSError as e: # by os.chown
raise lib.UserExceptions.ModifyFilesystem(
f"Could not chown {ssh_dir} and/or authorized_keys to {self.user} and their group, Exception: {e}", )
except KeyError as e: # by PWD
raise lib.UserExceptions.General(f"PWD can't find {self.user}: {e}")
return True
@staticmethod
def write_ssh(key: str, ssh_dir: str) -> None:
""" Write SSH key to a specified directory(appends authorized_keys itself!)
:param key: Key to write
:type key: str
:param ssh_dir: SSH Directory to write to
:type ssh_dir: str
:return: None
"""
with open(ssh_dir + "authorized_keys", "w") as f:
print(key, file=f)
f.close()
os.chmod(ssh_dir + "authorized_keys", 0o700) # we dont care about the directory here
def lock_user_pw(self, cc: tuple = tuple(["usermod", "--lock"])) -> bool:
"""Lock a users password so it stays empty
:param cc: Commands to run in the subprocess to lock it down(defaults to usermod --lock)
:type cc: tuple
:rtype: bool
:return: True, if worked, raises lib.UserExceptions.UnknownReturnCode when not
:raises:
lib.UserExceptions.UnknownReturnCode: When cc returns something else then 0
"""
lock_command = cc
cc = lock_command + tuple([self.user])
if self.dry:
self.printTuple(cc)
return True
elif not self.dry:
rt = subprocess.call(cc)
if rt != 0:
raise lib.UserExceptions.UnknownReturnCode(f"Could not lock user '{self.user}'; '{cc}' returned '{rt}'")
return True
def add_to_usergroup(self, group: str = "tilde", cc: tuple = tuple(["usermod", "-a", "-G"])) -> bool:
""" Adds a given user to a given group
:param group: Groupname where you want to add your user to
:type group: str
:param cc: Commands to execute that adds your user to said specific group(defaults to usermod -a -G")
:type cc: tuple
:return: True, if worked, raises lib.UserExceptions.UnknownReturnCode when not
:rtype bool
:raises:
lib.UserExceptions.UnknownReturnCode: if cc returned something else then 0
"""
add_command = cc
cc = add_command + tuple([group, self.user])
if self.dry:
self.printTuple(cc)
return True
elif not self.dry:
rt = subprocess.call(cc)
if rt != 0:
raise lib.UserExceptions.UnknownReturnCode(
f"Could not add user '{self.user}' to group '{group}' with command '{cc}', returned '{rt}'", )
return True
@staticmethod
def printTuple(tup: tuple) -> None:
"""Prints a tuple with spaces as separators
:param tup: Tuple you want to print
:type tup: tuple
:rtype: None
:returns: Nothing
"""
pp = ""
for i in tup:
pp += i + " "
print(pp)
def remove_user(self, cc: tuple = tuple(["userdel", "-r"])) -> bool:
"""Removes the specified user from the system
:param cc: Commands to execute to delete the user from the System(defaults to userdel -r)
:type cc: tuple
:return: True, if worked, raises lib.UserExceptions.UnknownReturnCode when not
:rtype: bool
:raises:
lib.UserExceptions.UnknownReturnCode: When cc returns something else then 0 or 6
"""
remove_command = cc
cc = remove_command + tuple([self.user])
if self.dry:
self.printTuple(cc)
return True
else:
ret = subprocess.Popen(cc, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
ret.wait() # wait for cc
stdio, err_io = ret.communicate() # get stdout as well as stderr
if ret.returncode != 0 and ret.returncode != 6: # userdel returns 6 when no mail dir was found but success
raise lib.UserExceptions.UnknownReturnCode(
f"Could not delete user with command {cc}. Return code: {ret.returncode},"
f" stdout/stderr: {stdio + err_io}")
return True
if __name__ == "__main__":
try:
S = System("dar", dryrun=True)
S.register()
S.lock_user_pw()
S.add_to_usergroup()
# if not S.make_ssh_usable("dar", "SSHpub"):
# print("Huh, error :shrug:")
exit(0)
except KeyboardInterrupt:
pass

@ -0,0 +1,48 @@
class General(Exception):
pass
class User(General):
pass
class UnknownUser(User):
def __init__(self, name):
Exception.__init__(self, f"Tried to perform action on unknown user '{name}'")
class UserExistsAlready(User):
def __init__(self, name):
Exception.__init__(self, f"User '{name}' is already registered")
class UnknownReturnCode(General):
pass
class ModifyFilesystem(General):
pass
class SSHDirUncreatable(ModifyFilesystem):
pass
class SQLiteDatabaseDoesntExistYet(General):
pass
class UsernameLength(User):
pass
class UsernameTooShort(User):
pass
class UsernameTooLong(User):
pass
class UsernameInvalidCharacters(User):
pass

@ -0,0 +1,219 @@
import csv
import pwd
import re
import lib.sqlitedb
def checkUsernameCharacters(username: str) -> bool:
""" Checks the Username for invalid characters. Allow only alphanumerical characters, a lower alpha one first,
Followed by any sequence of digits and characters
:param username: String to check for validity
:type username: str
:return: True when valid, False when not
:rtype: bool
"""
if " " not in username and "_" not in username and username.isascii() and username[:1].islower() and \
not username[0].isnumeric():
if not re.search(r"\W+", username):
if not re.search("[^a-zA-Z0-9]", username):
return True
return False
def checkUsernameLength(username: str, upper_limit: int = 16, lower_limit: int = 3) -> bool:
""" Checks username for an upper and lower bounds limit character count
:param username: Username to check
:type username: str
:param upper_limit: Upper limit bounds to check for(default is 16)
:type upper_limit: int
:param lower_limit: Lower limit bounds to check for(default is 3)
:return: True, when all bounds are in, False when one or both aren't.
:rtype: bool
"""
if len(username) > upper_limit:
return False
if len(username) < lower_limit:
return False
return True
def checkUserExists(username: str) -> bool:
""" Checks if the User exists on the **SYSTEM** by calling PWD on it.
**Note**: You might want to use this in conjunction with checkUserInDB
:param username:
:type username: str
:return: True when exists, False when not
:rtype: bool
"""
try:
pwd.getpwnam(username)
except KeyError:
return False
return True # User already exists
def checkUserInDB(username: str, db: str) -> bool:
""" Checks users existence in the **DATABASE**.
:Note: You might want to use this in conjunction with `checkUserExists`
:param username: Username to check existence in database
:type username: str
:param db: Path to database to check in
:type db: str
:return: True, when User exists, False when not
"""
try:
ldb = lib.sqlitedb.SQLiteDB(db)
fetched = ldb.safequery("SELECT * FROM 'applications' WHERE username = ?", tuple([username]))
if fetched:
return True
except lib.sqlitedb.sqlite3.Error as e:
print(f"SQLite Exception: {e}")
return False
def checkSSHKey(key: str) -> bool:
""" Checks SSH Key for meta-data that we accept.
:Note: We currently only allow ssh keys without options but with a mail address at the end in b64 encoded.
The currently supported algorithms are: ecdfsa-sha2-nistp256, 'ecdsa-sha2-nistp384', 'ecdsa-sha2-nistp521',
'ssh-rsa', 'ssh-dss' and 'ssh-ed25519'
:param key: Key to check
:return: True, when Key is valid, False when not
:rtype: bool
"""
# taken from https://github.com/hashbang/provisor/blob/master/provisor/utils.py, all belongs to them! ;)
import base64
if len(key) > 8192 or len(key) < 80:
return False
key = key.replace("\"", "").replace("'", "").replace("\\\"", "")
key = key.split(' ')
types = ['ecdsa-sha2-nistp256', 'ecdsa-sha2-nistp384',
'ecdsa-sha2-nistp521', 'ssh-rsa', 'ssh-dss', 'ssh-ed25519']
if key[0] not in types:
return False
try:
base64.decodebytes(bytes(key[1], "utf-8"))
except TypeError:
return False
return True
def checkEmail(mail: str) -> bool:
""" Checks Mail against a relatively simple REgex Pattern.
:param mail: Mail to check
:type mail: str
:return: False, when the Mail is invalid, True when valid.
:rtype: bool
"""
if not re.match("(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)", mail):
return False
else:
return True
def checkDatetimeFormat(datetime_str: str) -> bool:
""" Checks a Strings format on date time.
:param datetime_str: String to check
:type datetime_str: str
:return: True when valid, False when not.
:rtype: bool
"""
import datetime
try:
datetime.datetime.strptime(datetime_str, "%Y-%m-%d %H:%M:%S")
except ValueError:
return False
return True
def checkName(name: str) -> bool:
""" Checks a users (real) Name against a real simple REgex Pattern.
:param name: Name/String to check
:type name: str
:return: True when valid, False when not.
:rtype: bool
"""
if not re.match("\w+\s*\w", name):
return False
else:
return True
def checkImportFile(path: str, db: str):
""" Checks an CSV file against most of the validators and prints an Error message with the line number corresponding
to said failure.. Those includes: checkName, checkUsernameCharacters,
ckeckUsernameLength, duplicate usernames(in the CSV), checkSSHKey, checkEmail, checkUserExists, checkUserInDB,
checkDatetimeformat and if the status is 1 or 0.
:param path: Path to file to check
:type path: str
:param db: Path to database file(SQLite)
:type db: str
:return: Str when Failure, True when success(All tests passed)
:rtype: Str or None
"""
errstr = ""
valid = True
ln = 1 # line number
valid_names_list = []
with open(path, 'r', newline='') as f:
reader = csv.DictReader(f)
for row in reader:
# if any of this fails move on to the next user, just print a relatively helpful message lel
if not lib.Validator.checkName(row["name"]):
errstr += f"Line {ln}: Name: '{row['name']}' seems not legit. Character followed by character should" \
f" be correct.\n"
valid = False
if not lib.Validator.checkUsernameCharacters(row["username"]):
errstr += (f"Line {ln}: Username contains unsupported characters or starts with a number: '"
f"{row['username']}'.\n")
valid = False
if not lib.Validator.checkUsernameLength(row["username"]):
errstr += f"Line {ln}: Username '{row['username']}' is either too long(>16) or short(<3)\n"
valid = False
# dup checking
if row["username"] in valid_names_list:
errstr += f"Line {ln}: Duplicate Username {row['username']}!\n"
valid = False
else:
valid_names_list.append(row["username"])
# dup end
if not lib.Validator.checkSSHKey(row["pubkey"]):
errstr += f"Line {ln}: Following SSH-Key of user '{row['username']}' isn't valid: " \
f"'{row['pubkey']}'.\n"
valid = False
if not lib.Validator.checkEmail(row["email"]):
errstr += f"Line {ln}: E-Mail address of user '{row['username']}' '{row['email']}' is not valid.\n"
valid = False
if lib.Validator.checkUserExists(row["username"]) or checkUserInDB(row["username"], db):
errstr += f"Line {ln}: User '{row['username']}' already exists.\n"
valid = False
if not lib.Validator.checkDatetimeFormat(row["timestamp"]):
errstr += f"Line {ln}: Timestamp '{row['timestamp']}' from user '{row['username']}' is invalid.\n"
valid = False
if int(row["status"]) > 1 or int(row["status"]) < 0:
errstr += f"Line {ln}: Status '{row['status']}' MUST be either 0 or 1.\n"
valid = False
ln += 1
if valid:
return True
else:
return errstr

@ -0,0 +1,9 @@
import os
cwd = os.environ.get('TILDE_CONF')
if cwd is None:
cwd = os.getcwd() + "/applicationsconfig.ini"
else:
if os.path.isfile(cwd) is False:
cwd = os.getcwd() + "/applicationsconfig.ini"
# cwd is now either cwd/applicationsconfig or $TILDE_CONF

@ -0,0 +1,164 @@
#!/usr/bin/env python3
import sqlite3
from sys import stderr as stderr
from typing import List # Typing support!
# create dictionary out of sqlite results
def dict_factory(cursor, row):
d: dict = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
class SQLiteDB:
"""SQLitedb handles EVERYTHING directly related to our Database."""
db = ""
cursor = None
connection = None
last_result = None
def __init__(self, dbpath: str):
"""
:param dbpath: Path to the database we want to open
:type dbpath: str
:returns: Object for the SQLitedb-Class.
:rtype: object
"""
db = dbpath
try:
self.connection = sqlite3.connect(db)
self.cursor = self.connection.cursor()
except sqlite3.Error as e:
print("Connection error: %s" % e, file=stderr)
self.cursor.row_factory = sqlite3.Row # every result will be a dict now
def __del__(self):
try:
self.connection.commit()
self.connection.close()
except sqlite3.Error as e:
print("Couldn't gracefully close db: %s" % e, file=stderr)
def query(self, qq: str) -> List[sqlite3.Row]:
"""Do a query and automagically get the fetched results in a list
:param qq: Query to execute
:type qq: str
:returns: A tuple(/list) consisting with any fetched results
:rtype: list
"""
try:
self.cursor.execute(qq)
self.last_result = self.cursor.fetchall()
self.connection.commit()
except sqlite3.OperationalError:
self._createTable()
return self.query(qq)
except sqlite3.Error as e:
print("Couldn't execute query %s, exception: %s" % (qq, e), file=stderr)
self.last_result = []
return self.last_result
# sometimes we need the cursor for safety reasons, for example does sqlite3 all the security related
# escaoing in supplied strings for us, when we deliver it to con.execute in the second argument as a tuple
def getCursor(self) -> sqlite3:
"""Returns SQLite3 Cursor. Use with **c a u t i o n**... """
return self.cursor
# we could try to utilise that ourselfs in a function. Be c a r e f u l, these values in the tuple MUST HAVE
# THE RIGHT TYPE
def safequery(self, qq: str, deliver: tuple) -> List[sqlite3.Row]:
""" Shall handle any query that has user input in it as an alternative to self.query
:param qq: Query to execute
:type qq: str
:param deliver: User inputs marked with the placeholder(`?`) in the str
:type deliver: tuple
:returns: A tuple(/list) consisting with any fetched results
:rtype: List[sqlite3.Row]
"""
try:
self.cursor.execute(qq, deliver)
self.last_result = self.cursor.fetchall()
self.connection.commit()
except TypeError as e:
print("Types in given tuple doesnt match to execute query \"%s\": %s" % (qq, e), file=stderr)
self.last_result = []
except sqlite3.OperationalError:
self._createTable()
return self.safequery(qq, deliver)
except sqlite3.Error as e:
print("Couldn't execute query %s, exception: %s" % (qq, e), file=stderr)
print(deliver)
print(type(e))
self.last_result = []
return self.last_result
def removeApplicantFromDB(self, userid: int) -> bool:
"""Removes Applicants from the DB by ID. Use along System.removeUser()
:param userid: User ID to remove from the Database
:type userid: int
:returns: True, if removal was successful(from the DB), False when not
:rtype: bool
"""
try:
self.last_result = self.cursor.execute("DELETE FROM `applications` WHERE id = ? ", [userid])
self.connection.commit()
except sqlite3.OperationalError:
print("The database has probably not yet seen any users, so it didnt create your table yet. Come back"
"when a user tried to register")
return False
except sqlite3.Error as e:
print(f"Could not delete user with id: {userid}, exception in DB: {e}") # @TODO LOGGING FFS
return False
return True
def removeApplicantFromDBperUsername(self, username: str) -> bool:
"""Removes Applicants from the DB by Username. Use along System.removeUser()
:param username: Username to remove from the database
:type username: str
:returns: True, if removal was successful(from the DB), False when not
:rtype: bool
"""
try:
self.last_result = self.cursor.execute("DELETE FROM `applications` WHERE username = ?", [username])
self.connection.commit()
except sqlite3.OperationalError:
print("The database has probably not yet seen any users, so it didnt create your table yet. Come back"
"when a user tried to register")
return False
except sqlite3.Error as e:
print(f"Could not delete user {username}, exception in DB: {e}") # @TODO LOGGING
return False
return True
def _createTable(self) -> None:
try:
self.cursor.execute(
"CREATE TABLE IF NOT EXISTS applications("
"id INTEGER PRIMARY KEY AUTOINCREMENT,"
"username TEXT NOT NULL, email TEXT NOT NULL,"
"name TEXT NOT NULL, pubkey TEXT NOT NULL,"
"timestamp DATETIME DEFAULT CURRENT_TIMESTAMP CONSTRAINT "
"timestamp_valid CHECK( timestamp IS strftime('%Y-%m-%d %H:%M:%S', timestamp))"
",status INTEGER NOT NULL DEFAULT 0);")
self.connection.commit()
except sqlite3.Error as e:
print(f"The database probably doesn't exist yet, but read the message: {e}")
print("The database table didn't exist yet; created it successfully!")
if __name__ == "__main__":
try:
SQLiteDB("bla.db")
print("hi")
exit(0)
except KeyboardInterrupt:
pass

@ -0,0 +1,6 @@
import argparse
import lib.cwd
argparser = argparse.ArgumentParser(description='Tilde administration tools ', conflict_handler="resolve")
argparser.add_argument('-c', '--config', default=lib.cwd.cwd,
type=str, help='Path to configuration file', required=False)

@ -0,0 +1,11 @@
import lib.uis.config_ui
argparser = lib.uis.config_ui.argparser
# store_true just stores true when the command is supplied, so it doesn't need choices nor types
argparser.add_argument('-u', '--unapproved', default=False, action="store_true",
help='only unapproved users. Default is only approved.', required=False)
argparser.add_argument('-a', '--approved', default=False, action="store_true",
help="Only approved Users.", required=False)
argparser.add_argument('-f', '--file', default="stdout",
type=str, help='write to file instead of stdout', required=False)

@ -0,0 +1,5 @@
unalias dev_build
unalias dev_run
unalias dev_bash
unalias dev_stop
unalias dev_disable

@ -0,0 +1,5 @@
alias dev_run="docker container run -l dev-ssh-reg --name dev-ssh-reg --rm -itd -v $PWD/private/:/app/admin ssh-reg"
alias dev_stop="docker container stop dev-ssh-reg"
alias dev_build="docker build -t ssh-reg:latest --force-rm ."
alias dev_bash="docker container exec -it dev-ssh-reg bash -c 'cd /app/admin; exec bash --login -i'"
alias dev_disable="source private/scripts/disable.sh"

@ -1,171 +1,187 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import re, configparser, logging, sqlite3, argparse import argparse
from os import getcwd import configparser
import logging
import re
import sqlite3
from os import environ from os import environ
from os import getcwd
from os import path as ospath from os import path as ospath
import re, configparser, logging, sqlite3
try: try:
cwd = environ.get('TILDE_CONF') cwd = environ.get('TILDE_CONF')
if cwd is None: if cwd is None:
cwd=getcwd()+"/applicationsconfig.ini" cwd = getcwd()+"/applicationsconfig.ini"
else: else:
if ospath.isfile(cwd) is False: if ospath.isfile(cwd) is False:
cwd=getcwd()+"/applicationsconfig.ini" cwd = getcwd() + "/applicationsconfig.ini"
# cwd is now either cwd/applicationsconfig or $TILDE_CONF # cwd is now either cwd/applicationsconfig or $TILDE_CONF
argparser = argparse.ArgumentParser(description='interactive registration formular for tilde platforms') argparser = argparse.ArgumentParser(description='interactive registration formular for tilde platforms')
argparser.add_argument('-c', '--config', default=cwd, type=str, help='Config file', required=False) argparser.add_argument('-c', '--config', default=cwd, type=str, help='Config file', required=False)
args = argparser.parse_args() args = argparser.parse_args()
CONF_FILE=args.config CONF_FILE = args.config
config = configparser.ConfigParser()
config.read(CONF_FILE)
logging.basicConfig(format="%(asctime)s: %(message)s", filename=config['DEFAULT']['log_file'],
level=int(config['LOG_LEVEL']['log_level']))
REG_FILE = config['DEFAULT']['applications_db']
except: except:
logging.exception("Argumentparser-Exception: ") # intended broad, @TODO check them all for errors instead of everything in one
logging.exception("logging or configparser-Exception: ")
try: exit(0)
config = configparser.ConfigParser()
config.read(CONF_FILE)
logging.basicConfig(format="%(asctime)s: %(message)s", filename=config['DEFAULT']['log_file'],level=int(config['LOG_LEVEL']['log_level']))
del(cwd)
REG_FILE=config['DEFAULT']['applications_db']
except:
logging.exception("logging or configparser-Exception: ")
VALID_SSH=False VALID_SSH = False
VALID_USER=False VALID_USER = False
def __createTable(cursor, connection): def __createTable(cursor, connection):
try: try:
cursor.execute( cursor.execute(
"CREATE TABLE IF NOT EXISTS applications(" \ "CREATE TABLE IF NOT EXISTS applications("
"id INTEGER PRIMARY KEY AUTOINCREMENT,"\ "id INTEGER PRIMARY KEY AUTOINCREMENT,"
"username TEXT NOT NULL, email TEXT NOT NULL,"\ "username TEXT NOT NULL, email TEXT NOT NULL,"
"name TEXT NOT NULL, pubkey TEXT NOT NULL,"\ "name TEXT NOT NULL, pubkey TEXT NOT NULL,"
"timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, status INTEGER NOT NULL DEFAULT 0);") "timestamp DATETIME DEFAULT CURRENT_TIMESTAMP CONSTRAINT "
connection.commit() "timestamp_valid CHECK( timestamp IS strftime('%Y-%m-%d %H:%M:%S', timestamp))"
except: ",status INTEGER NOT NULL DEFAULT 0);")
logging.exception("Couldn't create needed SQLite Table!") connection.commit()
except sqlite3.Error as e:
logging.exception("Couldn't create needed SQLite Table! Exception: %s" % e)
def addtotable(cursor, connection, username, name, email, pubkey): def addtotable(cursor, connection, username, name, email, pubkey):
try: try:
cursor.execute("INSERT INTO 'applications'(username, name, email, pubkey)VALUES("\ cursor.execute("INSERT INTO 'applications'(username, name, email, pubkey)VALUES("
"?,?,?,?)", [username, name, email, pubkey]) "?,?,?,?)", [username, name, email, pubkey])
connection.commit() connection.commit()
except: except sqlite3.Error as e:
logging.exception("Couldn't insert user into the db") logging.exception("Couldn't insert user into the db: %s" % e)
# check if sqlite file does exists or already and has our structure # check if sqlite file does exists or already and has our structure
def __checkSQLite(cursor, connection): def __checkSQLite(cursor, connection):
#SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}'; # SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}';
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='applications'") cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='applications'")
res=cursor.fetchall() res = cursor.fetchall()
if res== []: if not res:
try: try:
__createTable(cursor, connection) __createTable(cursor, connection)
except: except sqlite3.Error as e:
logging.exception("couldn't create table on given database. Exception: ") logging.exception("couldn't create table on given database. Exception: %s" % e)
else: else:
pass pass
return True return True
def check_username(value): def check_username(value):
global VALID_USER global VALID_USER
if len(value) < 3: if " " in value or "_ " in value or not value.isascii() or not value[:1].islower() or value[0].isnumeric():
VALID_USER=False VALID_USER = False
return False
if re.search(r"\W+", value):
VALID_USER = False
return False
if len(value) < 3:
VALID_USER = False
return False
if len(value) > 16:
VALID_USER = False
return False
try:
from pwd import getpwnam
getpwnam(value)
VALID_USER = False
# everything from pwd throws an KeyError when the given user cannot be found
except KeyError:
VALID_USER = True
return True
return False return False
try:
from pwd import getpwnam
getpwnam(value)
VALID_USER=False
except:
VALID_USER=True
return True
return False
# taken from https://github.com/hashbang/provisor/blob/master/provisor/utils.py, all belongs to them! ;) # taken from https://github.com/hashbang/provisor/blob/master/provisor/utils.py, all belongs to them! ;)
def validate_pubkey(value): def validate_pubkey(value):
global VALID_SSH global VALID_SSH
import base64 import base64
if len(value) > 8192 or len(value) < 80: if len(value) > 8192 or len(value) < 80:
VALID_SSH=False VALID_SSH = False
return False return False
value = value.replace("\"", "").replace("'", "").replace("\\\"", "") value = value.replace("\"", "").replace("'", "").replace("\\\"", "")
value = value.split(' ') value = value.split(' ')
types = [ 'ecdsa-sha2-nistp256', 'ecdsa-sha2-nistp384', types = ['ecdsa-sha2-nistp256', 'ecdsa-sha2-nistp384',
'ecdsa-sha2-nistp521', 'ssh-rsa', 'ssh-dss', 'ssh-ed25519' ] 'ecdsa-sha2-nistp521', 'ssh-rsa', 'ssh-dss', 'ssh-ed25519']
if value[0] not in types: if value[0] not in types:
VALID_SSH=False VALID_SSH = False
return False return False
try: try:
base64.decodebytes(bytes(value[1], "utf-8")) base64.decodebytes(bytes(value[1], "utf-8"))
except TypeError: except TypeError:
VALID_SSH=False VALID_SSH = False
return False return False
VALID_SSH=True
return True
VALID_SSH = True
return True
def main(): def main():
print(" ▗▀▖ \n▗▖▖ ▐ ▌ ▌▛▀▖\n▘▝▗▖▜▀ ▌ ▌▌ ▌\n ▝▘▐ ▝▀▘▘ ▘") print(" ▗▀▖ \n▗▖▖ ▐ ▌ ▌▛▀▖\n▘▝▗▖▜▀ ▌ ▌▌ ▌\n ▝▘▐ ▝▀▘▘ ▘")
username = input("Welcome to the ~.fun user application form!\n\nWhat is your desired username? [a-z0-9] allowed:\n") username = input("Welcome to the ~.fun user application form!\n\n"
while (not re.match("[a-z]+[a-z0-9]", username)) or (not check_username(username)): "What is your desired username? [a-z0-9] allowed:\n")
username = input("Invalid Username, maybe it exists already?\nValid characters are only a-z and 0-9.\nMake sure your username starts with a character and not a number." \ while (not re.match("[a-z]+[a-z0-9]", username)) or (not check_username(username)):
"\nWhat is your desired username? [a-z0-9] allowed:\n") username = input("Invalid Username, maybe it exists already?\nValid characters are only a-z and 0-9."
"\nMake sure your username starts with a character and not a number"
" and is not larger than 16 characters."
fullname = input("\nPlease enter your full name:\n") "\nWhat is your desired username? [a-z0-9] allowed:\n")
while not re.match("\w+\s*\w*", username):
fullname = input("\nThat is not your real name.\nPlease enter your full name:\n") fullname = input("\nPlease enter your full name:\n")
while not re.match("\w+\s*\w*", username):
fullname = input("\nThat is not your real name.\nPlease enter your full name:\n")
email = input("\nPlease enter your email address:\n")
while not re.match("(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)", email): email = input("\nPlease enter your email address:\n")
email = input("\nThat is not a valid mail address.\nPlease enter your email address:\n") while not re.match("(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)", email):
email = input("\nThat is not a valid mail address.\nPlease enter your email address:\n")
pubkey = input("\nPlease paste your ssh public key:\n") pubkey = input("\nPlease paste your ssh public key:\n")
while (not re.match("ssh-(\w)+\s(\w+)(\s*)([a-zA-Z0-9@]*)", pubkey)) or (not validate_pubkey(pubkey)): while (not re.match("ssh-(\w)+\s(\w+)(\s*)([a-zA-Z0-9@]*)", pubkey)) or (not validate_pubkey(pubkey)):
pubkey = input("\nPlease enter a valid public key. You can show it with ssh-keygen -f .ssh/id_rsa -y on your local machine.\nPlease enter your pubkey:\n") pubkey = input("\nPlease enter a valid public key. You can show it with ssh-keygen -f .ssh/id_rsa -y on your "
validate_pubkey(pubkey) "local machine.\nPlease enter your pubkey:\n")
validate_pubkey(pubkey)
print("\nUsername: {0!s}".format(username)) print("\nUsername: {0!s}".format(username))
print("Full Name: {0!s}".format(fullname)) print("Full Name: {0!s}".format(fullname))
print("Email: {0!s}".format(email)) print("Email: {0!s}".format(email))
print("Public {0!s}".format(pubkey)) print("Public {0!s}".format(pubkey))
validation = input("\nIs this information correct? [y/N]")
validation = input("\nIs this information correct? [y/N]") while not re.match("[yYnN\n]", validation):
while not re.match("[yYnN\n]", validation): print("Please answer y for yes or n for no")
print("Please answer y for yes or n for no") validation = input("Is this information correct? [y/N]")
validation = input("Is this information correct? [y/N]") if re.match("[yY]", validation):
if re.match("[yY]", validation): print("Thank you for your application! We'll get in touch shortly. 🐧")
print("Thank you for your application! We'll get in touch shortly. 🐧") try:
try: connection = sqlite3.connect(REG_FILE)
connection=sqlite3.connect(REG_FILE) cursor = connection.cursor()
cursor=connection.cursor() __checkSQLite(cursor, connection)
__checkSQLite(cursor, connection) addtotable(cursor, connection, username, fullname, email, pubkey)
addtotable(cursor, connection, username, fullname, email, pubkey) connection.commit()
connection.commit() connection.close()
connection.close() except sqlite3.Error as e:
except: logging.exception("Database {0!s} couldnt be accessed or created. Exception: {0!s}".
logging.exception("Database {0!s} couldnt be accessed or created. Exception:".format(config['DEFAULT']['applications_db'])) format(config['DEFAULT']['applications_db'], e))
connection.close() if connection:
exit(1) connection.close()
pass exit(1)
return 0
return 0
if __name__ == "__main__": if __name__ == "__main__":
try: try:
main() main()
exit(0) exit(0)
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass

Loading…
Cancel
Save