pep8-changes like before mostly

feature-admin-split
Darksider3 5 years ago
parent 4d353e6804
commit 79fb448f30

@ -4,388 +4,395 @@ import configparser, logging, sqlite3, argparse, pwd
import os import os
import subprocess import subprocess
# Clear shell # Clear shell
def clear(): def clear():
os.system('cls' if os.name == 'nt' else 'clear') os.system('cls' if os.name == 'nt' else 'clear')
# create dictionary out of sqlite results # create dictionary out of sqlite results
def dict_factory(cursor, row): def dict_factory(cursor, row):
d = {} d = {}
for idx, col in enumerate(cursor.description): for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx] d[col[0]] = row[idx]
return d return d
# prints command(but doesnt execute them) # prints command(but doesnt execute them)
# need this for work, just convenience # need this for work, just convenience
def debugExec(commands): def debugExec(commands):
print("Commands: {!s} -> Returns 0".format(commands)) print("Commands: {!s} -> Returns 0".format(commands))
return 0 return 0
# @TODO hardcoded config? # @TODO hardcoded config?
cwd = os.environ.get('TILDE_CONF') cwd = os.environ.get('TILDE_CONF')
if cwd is None: if cwd is None:
cwd=os.getcwd()+"/applicationsconfig.ini" cwd = os.getcwd() + "/applicationsconfig.ini"
else: else:
if os.path.isfile(cwd) is False: if os.path.isfile(cwd) is False:
cwd=os.getcwd()+"/applicationsconfig.ini" cwd = os.getcwd() + "/applicationsconfig.ini"
# cwd is now either cwd/applicationsconfig or $TILDE_CONF # cwd is now either cwd/applicationsconfig or $TILDE_CONF
argparser = argparse.ArgumentParser(description = 'interactive registration formular for tilde platforms') argparser = argparse.ArgumentParser(description='interactive registration formular for tilde platforms')
argparser.add_argument('-c', '--config', default = cwd, argparser.add_argument('-c', '--config', default=cwd,
type = str, help = 'Path to configuration file', required = False) type=str, help='Path to configuration file', required=False)
args = argparser.parse_args() args = argparser.parse_args()
CONF_FILE = args.config CONF_FILE = args.config
config = configparser.ConfigParser() config = configparser.ConfigParser()
config.read(CONF_FILE) config.read(CONF_FILE)
logging.basicConfig(format="%(asctime)s: %(message)s", logging.basicConfig(format="%(asctime)s: %(message)s",
level = int(config['LOG_LEVEL']['log_level']) level=int(config['LOG_LEVEL']['log_level'])
) )
del(cwd) del cwd
REG_FILE = config['DEFAULT']['applications_db'] REG_FILE = config['DEFAULT']['applications_db']
# Does everything related to applicants, i.e. creating, manipulations... # Does everything related to applicants, i.e. creating, manipulations...
class applicants(): class Applicants:
# User identifier # User identifier
identifier = "username" identifier = "username"
# SQLite DB Path # SQLite DB Path
sourceDB = "" sourceDB = ""
# another sqlite to batch-recreate users # another sqlite to batch-recreate users
differentDB = "" differentDB = ""
def __init__(self, lident, sourceDB=REG_FILE):
self.identifier = lident def __init__(self, lident, sourcedb=REG_FILE):
self.sourceDB = sourceDB self.identifier = lident
self.__connectToDB__("source") self.sourceDB = sourcedb
# all results shall be done with dict_factory! Makes everything so much simpler self.__connectToDB__("source")
self.sdbCursor.row_factory = dict_factory # all results shall be done with dict_factory! Makes everything so much simpler
self.sdbCursor.row_factory = dict_factory
def __del__(self):
self.__closeDB__("source") def __del__(self):
self.__closeDB__("source")
def __connectToDB__(self, which):
if which == "source": def __connectToDB__(self, which):
try: if which == "source":
self.sdbConnection = sqlite3.connect(self.sourceDB) try:
self.sdbCursor = self.sdbConnection.cursor() self.sdbConnection = sqlite3.connect(self.sourceDB)
except sqlite3.Error as e: self.sdbCursor = self.sdbConnection.cursor()
logging.exception("Database: Couldn't open database and get cursor: %s" % e) except sqlite3.Error as e:
else: logging.exception("Database: Couldn't open database and get cursor: %s" % e)
self.ddbConnection = sqlite3.connect(self.differentDB) else:
self.ddbCursor = self.ddbConnection.cursor() self.ddbConnection = sqlite3.connect(self.differentDB)
self.ddbCursor = self.ddbConnection.cursor()
def __closeDB__(self, which):
if(which == "source"): def __closeDB__(self, which):
try: if which == "source":
self.sdbConnection.close() try:
except sqlite3.Error as e: self.sdbConnection.close()
logging.exception("Couldn't close database! Error: %s" % e) # @TODO: Dump full db with query or just the executed querys to file except sqlite3.Error as e:
else: logging.exception(
self.ddbConnection.close() # @TODO: Evaluate getting rid of ddb(differentDB)? "Couldn't close database! Error: %s" % e)
# @TODO: Dump full db with query or just the executed querys to file
# get List of all applications(not accepted yet) else:
def getApplicationsList(self): self.ddbConnection.close() # @TODO: Evaluate getting rid of ddb(differentDB)?
query = "SELECT * FROM `applications` WHERE `status` = '0'"
try: # get List of all applications(not accepted yet)
self.sdbCursor.execute(query) def getapplicationslist(self):
rows = self.sdbCursor.fetchall() query = "SELECT * FROM `applications` WHERE `status` = '0'"
except sqlite3.Error as e: try:
logging.exception("Database Error: %s" % e) self.sdbCursor.execute(query)
rows=[] rows = self.sdbCursor.fetchall()
return rows except sqlite3.Error as e:
logging.exception("Database Error: %s" % e)
def getApprovedApplicantsList(self): rows = []
query = "SELECT * From `applications` WHERE `status` = '1'" return rows
try:
self.sdbCursor.execute(query) def getapprovedapplicantslist(self):
rows = self.sdbCursor.fetchall() query = "SELECT * From `applications` WHERE `status` = '1'"
except sqlite3.Error as e: try:
logging.exception("Database Error: %s" % e) self.sdbCursor.execute(query)
rows=[] rows = self.sdbCursor.fetchall()
return rows except sqlite3.Error as e:
logging.exception("Database Error: %s" % e)
# edit aproved users rows = []
def editApprovedApplicant(self, term, updaterow): return rows
try:
self.sdbCursor.execute( # edit aproved users
"UPDATE `applications` SET ? WHERE id=?", def editapprovedapplicants(self, term):
( str(term), ) try:
) # the fuck did i try here?
self.sdbConnection.commit() self.sdbCursor.execute(
except sqlite3.Error as e: "UPDATE `applications` WHERE id=?", (str(term),)
logging.exception("Database Error: %s" % e) )
self.sdbConnection.commit()
# set user to aproved except sqlite3.Error as e:
def setApprovedApplication(self, selectterm): logging.exception("Database Error: %s" % e)
query = "SELECT `username` FROM `applications` WHERE `username` = `{0!s}`".format(selectterm)
# set user to aproved
# get applicants data def setapprovedapplication(self, selectterm):
def getApplicantsData(self, term): # query = "SELECT `username` FROM `applications` WHERE `username` = `{0!s}`".format(selectterm)
# @TODO: Use shorthand if for the correct query, directly into sqlite pass
if self.identifier == "id":
try: # get applicants data
self.sdbCursor.execute( def getapplicantsdata(self, term):
"SELECT * FROM `applications` WHERE id = ?", # @TODO: Use shorthand if for the correct query, directly into sqlite
( str(term), ) if self.identifier == "id":
) try:
except sqlite3.Error as e: self.sdbCursor.execute(
logging.exception("Database Error: %s" % e) "SELECT * FROM `applications` WHERE id = ?",
(str(term),)
else: )
self.sdbCursor.execute( except sqlite3.Error as e:
"SELECT * FROM `applications` WHERE username = ?", logging.exception("Database Error: %s" % e)
( str(term), )
) else:
result = self.sdbCursor.fetchone() self.sdbCursor.execute(
return result "SELECT * FROM `applications` WHERE username = ?",
(str(term),)
# @TODO: migrade just approved users to some new/another sqlitedb )
def migrateApprovedData(self, different_db): result = self.sdbCursor.fetchone()
pass return result
# @TODO: delete migrated data # @TODO: migrade just approved users to some new/another sqlitedb
def deleteMigratedDataSet(self, selectterm): def migrateapproveddata(self, different_db):
pass pass
# Applicants whom doesnt got approved should get removed # @TODO: delete migrated data
def removeApplicant(self, term): def deletemigrateddata(self, selectterm):
if self.identifier == "id": pass
try:
self.sdbCursor.execute( # Applicants whom doesnt got approved should get removed
"DELETE FROM `applications` WHERE id = ?", def removeapplicant(self, term):
( str(term), ) if self.identifier == "id":
) try:
self.sdbConnection.commit() self.sdbCursor.execute(
except sqlite3.Error as e: "DELETE FROM `applications` WHERE id = ?",
logging.exception("Database Error: %s" % e) (str(term),)
)
else: self.sdbConnection.commit()
self.sdbCursor.execute( except sqlite3.Error as e:
"DELETE FROM `applications` WHERE username = ?", logging.exception("Database Error: %s" % e)
( str(term), )
) else:
self.sdbConnection.commit() self.sdbCursor.execute(
'DELETE FROM `applications` WHERE username = ?',
#@TODO: Possibility to work without passing users manually (str(term),)
def selectedUser(userid, username = False): )
pass self.sdbConnection.commit()
# Print out a list of aprovable users # @TODO: Possibility to work without passing users manually
def printApprovableUsers(self, users): def selecteduser(userid, username=False):
i=0 pass
for user in users:
print("ID: {0!s}, Status: {0!s}, Name: {0!s}".format(i, user["status"], user["username"])) # Print out a list of aprovable users
i += 1 def printapprovableusers(self, users):
return i i = 0
for user in users:
# Get List of users print("ID: {0!s}, Status: {0!s}, Name: {0!s}".format(i, user["status"], user["username"]))
def userPrint(self, fetched, userid): i += 1
print("ID: {0!s}".format(fetched[int(userid)]["id"])) return i
print("Username: {0!s}".format(fetched[int(userid)]["username"]))
print("Mail: {0!s}".format(fetched[int(userid)]["email"])) # Get List of users
print("SSH: {0!s}".format(fetched[int(userid)]["pubkey"])) @staticmethod
print("Registrated time: {0!s}".format(fetched[int(userid)]["timestamp"])) def userprint(fetched, userid):
print("ID: {0!s}".format(fetched[int(userid)]["id"]))
# Approve an applicant. Handles everything related, like create home dir, set flags blabla print("Username: {0!s}".format(fetched[int(userid)]["username"]))
def approveApplicant(self, term): print("Mail: {0!s}".format(fetched[int(userid)]["email"]))
user = self.getApplicantsData(term) print("SSH: {0!s}".format(fetched[int(userid)]["pubkey"]))
ret = self.__execScript(user) print("Registrated time: {0!s}".format(fetched[int(userid)]["timestamp"]))
if ret[0] != 0: # @DEBUG: Change to == 0
print("Something went wrong in the user creation! Exiting without deleting users record in database!") # Approve an applicant. Handles everything related, like create home dir, set flags blabla
print("Last executed commands: {0!s}\nreturn code: {1!s}".format(ret[-1][1], ret[-1][0])) def approveapplicant(self, term):
exit(0) user = self.getapplicantsdata(term)
ret = self.__execScript(user)
if self.identifier == "id": if ret[0] != 0: # @DEBUG: Change to == 0
try: print("Something went wrong in the user creation! Exiting without deleting users record in database!")
self.sdbCursor.execute( print("Last executed commands: {0!s}\nreturn code: {1!s}".format(ret[-1][1], ret[-1][0]))
"UPDATE `applications` SET `status`=1 WHERE `id`=?", exit(0)
( str(term), )
) if self.identifier == "id":
self.sdbConnection.commit() try:
except sqlite3.Error as e: self.sdbCursor.execute(
logging.exception("Database Error: %s" % e) "UPDATE `applications` SET `status`=1 WHERE `id`=?",
(str(term),)
else: )
self.sdbCursor.execute( self.sdbConnection.commit()
"UPDATE `applications` SET `status`=1 WHERE `username`=?" except sqlite3.Error as e:
( str(term), ) logging.exception("Database Error: %s" % e)
)
self.sdbConnection.commit() else:
self.sdbCursor.execute(
# Script execution, handles everything done with the shell/commands themselves "UPDATE `applications` SET `status`=1 WHERE `username`=?",
def __execScript(self, user): (str(term), )
# @TODO: omfg just write some wrapper-class/lib... sucks hard! )
username=user["username"] self.sdbConnection.commit()
homeDir="/home/"+username+"/"
sshDir=homeDir+".ssh/" # Script execution, handles everything done with the shell/commands themselves
executed=[] @staticmethod
def __execScript(user):
executed.append(["useradd", "-m", username]) # @TODO: omfg just write some wrapper-class/lib... sucks hard!
rcode = subprocess.call(executed[0]) username = user["username"]
if rcode != 0: home_dir = "/home/" + username + "/"
return [rcode,executed,] ssh_dir = home_dir + ".ssh/"
executed = []
executed.append(["usermod", "--lock", username])
rcode = subprocess.call(executed[1]) #empty pw executed.append(["useradd", "-m", username])
if rcode != 0: returncode = subprocess.call(executed[0])
return [rcode,executed,] if returncode != 0:
return [returncode, executed, ]
executed.append(["usermod", "-a", "-G", "tilde", username])
rcode = subprocess.call(executed[2]) # add to usergroup executed.append(["usermod", "--lock", username])
if rcode != 0: returncode = subprocess.call(executed[1]) # empty pw
return [rcode,executed,] if returncode != 0:
return [returncode, executed, ]
executed.append(["mkdir", sshDir])
try: executed.append(["usermod", "-a", "-G", "tilde", username])
# @TODO: use config variable(chmodPerms) returncode = subprocess.call(executed[2]) # add to usergroup
ret = os.mkdir(sshDir, 0o777) #create sshdir if returncode != 0:
rcode = 0 return [returncode, executed, ]
except OSError as e:
logging.exception(e.strerror) executed.append(["mkdir", ssh_dir])
rcode = e.errno # False, couldn't create. try:
return [rcode,executed,] # @TODO: use config variable(chmodPerms)
os.mkdir(ssh_dir, 0o777) # create sshdir
executed.append(["write(sshkey) to", sshDir+"authorized_keys"]) returncode = 0
with open(sshDir+"authorized_keys", "w") as f: except OSError as e:
f.write(user["pubkey"]) logging.exception(e.strerror)
if f.closed != True: returncode = e.errno # False, couldn't create.
logging.exception("Could'nt write to authorized_keys!") return [returncode, executed, ]
return [rcode,executed,]
executed.append(["write(sshkey) to", ssh_dir + "authorized_keys"])
executed.append(["chmod", "-Rv", "700", sshDir]) with open(ssh_dir + "authorized_keys", "w") as f:
f.write(user["pubkey"])
if not f.closed:
logging.exception("Could'nt write to authorized_keys!")
return [returncode, executed, ]
executed.append(["chmod", "-Rv", "700", ssh_dir])
try:
os.chmod(ssh_dir + "authorized_keys", 0o700) # directory is already 700
returncode = 0
except OSError as e:
logging.exception(e.strerror)
returncode = e.errno
return [returncode, executed, ]
try:
executed.append(["chown", "-Rv", username + ":" + username, ssh_dir])
os.chown(ssh_dir, pwd.getpwnam(username)[2], pwd.getpwnam(username)[3]) # 2=>uid, 3=>gid
executed.append(["chown", "-v", username + ":" + username, ssh_dir + "authorized_keys"])
os.chown(ssh_dir + "authorized_keys", pwd.getpwnam(username)[2], pwd.getpwnam(username)[3])
returncode = 0
except OSError as e:
logging.exception(e.strerror) # @TODO: maybe append strerror to executed instead of printing it
returncode = e.errno
return [returncode, executed, ]
return [returncode, executed, ]
# {'id': 7, 'username': 'testuser47', 'email': '47test@testmail.com', 'name':
# 'test Name', 'pubkey': 'ssh-rsa [...]', 'timestamp': '2018-08-22 13:31:16', 'status': 0}
try:
os.chmod(sshDir+"authorized_keys", 0o700) # directory is already 700
rcode = 0
except OSError as e:
logging.exception(e.strerror)
rcode = e.errno
return [rcode, executed,]
try:
executed.append(["chown", "-Rv", username+":"+username, sshDir])
os.chown(sshDir, pwd.getpwnam(username)[2], pwd.getpwnam(username)[3]) #2=>uid, 3=>gid
executed.append(["chown", "-v", username+":"+username, sshDir+"authorized_keys"])
os.chown(sshDir+"authorized_keys", pwd.getpwnam(username)[2], pwd.getpwnam(username)[3])
rcode = 0
except OSError as e:
logging.exception(e.strerror) # @TODO: maybe append strerror to executed instead of printing it
rcode = e.errno
return [rcode, executed,]
return [rcode,executed,]
"""
{'id': 7, 'username': 'testuser47', 'email': '47test@testmail.com', 'name':
'test Name', 'pubkey': 'ssh-rsa [...]', 'timestamp': '2018-08-22 13:31:16', 'status': 0}
"""
def main(): def main():
# how many times the Seperator/Delimiter? # how many times the Separator/Delimiter?
delcount = 40 delcount = 40
# The seperator for the menu # The separator for the menu
Seperator = "="*delcount separator = "=" * delcount
Menu = Seperator+"\n\t\t Main-Menu:\n\n" \ menu = separator + "\n\t\t Main-Menu:\n\n" \
"\t 1) list and edit pending users\n"\ "\t 1) list and edit pending users\n" \
"\t 2) list applicants\n"\ "\t 2) list applicants\n" \
"\t 3) edit applicant\n"\ "\t 3) edit applicant\n" \
"\t 4) quit\n"+Seperator+"\n" "\t 4) quit\n" + separator + "\n"
# Identify by ID # Identify by ID
applications = applicants(lident = "id") applications = Applicants(lident="id")
while 1 != 0: while 1 != 0:
print(Menu) print(menu)
command = input("Please select, what you want to do: \n -> ") command = input("Please select, what you want to do: \n -> ")
# User shouldnt be able to type something in that isnt a number # User shouldn't be able to type something in that isnt a number
if command.isalpha() or command == '': if command.isalpha() or command == '':
clear()
print("!!! invalid input, please try again. !!!")
continue
# convert
command=int(command)
if command == 4 or command == "q":
exit(0)
# Edit and list pending users/applicants @TODO Wording: Users or applicants?
elif command == 1:
users = applications.getApplicationsList()
i=applications.printApprovableUsers(users)
if i == 0 :
print("No pending users")
# giving some time to aknowledge that something WRONG happened
input("Continue with Keypress...")
clear()
continue
usersel = 0
UserMax = i
print("Menu:\n r=>return to main")
# Edit Menue
while 1 != 0 or usersel != "r":
i = applications.printApprovableUsers(users)
if usersel == "r":
break # break when user presses r
usersel = input("Which user( ID ) do you want to change? ->")
if len(usersel) > 1 or usersel.isalpha():
usersel = ""
# convert to int if input isnt an r
usersel = int(usersel) if usersel != '' and usersel != 'r' else 0
if usersel > UserMax - 1:
print("User {0!s} doesn't exist!".format(usersel))
continue
# Show the user his chosen user and ask what to do
applications.userPrint(users, usersel)
print("You chosed ID No. {0!s}, what do you like to do?".format(usersel))
chosenUser = usersel
usersel = ""
# Finally down the edit menue!
while usersel != "e":
usersel = input("User: {0!s}\n \t\t(A)ctivate \n\t\t(R)emove \n\t\tR(e)turn\n -> ".format(chosenUser))
if usersel == "A":
applications.approveApplicant(users[chosenUser]['id'])
print("User {0!s} has been successfully approved!".format(users[chosenUser]['username']))
input("waiting for input...")
clear()
usersel="e" # remove for being able to continue editing?
continue
elif usersel == "R":
applications.removeApplicant(users[chosenUser]['id'])
print("User {0!s} successfully deleted!".format(user[chosenUser]['username']))
input("waiting for input...")
clear() clear()
print("!!! invalid input, please try again. !!!")
continue continue
elif usersel == "e":
clear() # convert
command = int(command)
if command == 4 or command == "q":
exit(0)
# Edit and list pending users/applicants @TODO Wording: Users or applicants?
elif command == 1:
users = applications.getapplicationslist()
i = applications.printapprovableusers(users)
if i == 0:
print("No pending users")
# giving some time to acknowledge that something WRONG happened
input("Continue with Keypress...")
clear()
continue
user_selection = 0
user_max = i
print("Menu:\n r=>return to main")
# Edit Menu
while 1 != 0 or user_selection != "r":
i = applications.printapprovableusers(users)
if user_selection == "r":
break # break when user presses r
user_selection = input("Which user( ID ) do you want to change? ->")
if len(user_selection) > 1 or user_selection.isalpha():
user_selection = ""
# convert to int if input isnt an r
user_selection = int(user_selection) if user_selection != '' and user_selection != 'r' else 0
if user_selection > user_max - 1:
print("User {0!s} doesn't exist!".format(user_selection))
continue
# Show the user his chosen user and ask what to do
applications.userprint(users, user_selection)
print("You chosed ID No. {0!s}, what do you like to do?".format(user_selection))
chosen_user = user_selection
user_selection = ""
# Finally down the edit menu!
while user_selection != "e":
user_selection = input(
"User: {0!s}\n \t\t(A)ctivate \n\t\t(R)emove \n\t\tR(e)turn\n -> ".format(chosen_user))
if user_selection == "A":
applications.approveapplicant(users[chosen_user]['id'])
print("User {0!s} has been successfully approved!".format(users[chosen_user]['username']))
input("waiting for input...")
clear()
user_selection = "e" # remove for being able to continue editing?
continue
elif user_selection == "R":
applications.removeapplicant(users[chosen_user]['id'])
print("User {0!s} successfully deleted!".format(user[chosen_user]['username']))
input("waiting for input...")
clear()
continue
elif user_selection == "e":
clear()
continue
elif int(command) == 2:
users = applications.getapprovedapplicantslist()
if not users:
print("no activate users yet!")
i = 0
for user in users:
print("ID: {0!s}, Status: {1!s}, Name: {2!s}".format(user["id"], user["status"], user["username"]))
continue continue
elif command == str(3):
elif int(command) == 2: pass
users = applications.getApprovedApplicantsList() else:
exit(0)
if users == []:
print("no activate users yet!")
i=0
for user in users:
print("ID: {0!s}, Status: {1!s}, Name: {2!s}".format(user["id"], user["status"], user["username"]))
continue
elif command == str(3):
pass
else:
exit(0)
if __name__ == "__main__": if __name__ == "__main__":
try: try:
main() main()
exit(0) exit(0)
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
#print("Exception occured. View log file for details.") # print("Exception occured. View log file for details.")
#logging.exception("Some exception occured") # logging.exception("Some exception occured")

Loading…
Cancel
Save