@ -4,16 +4,20 @@ import configparser, logging, sqlite3, argparse, pwd
import os
import subprocess
# Clear shell
def clear ( ) :
os . system ( ' cls ' if os . name == ' nt ' else ' clear ' )
# create dictionary out of sqlite results
def dict_factory ( cursor , row ) :
d = { }
for idx , col in enumerate ( cursor . description ) :
d [ col [ 0 ] ] = row [ idx ]
return d
# prints command(but doesnt execute them)
# need this for work, just convenience
def debugExec ( commands ) :
@ -34,28 +38,28 @@ argparser.add_argument('-c', '--config', default = cwd,
type = str , help = ' Path to configuration file ' , required = False )
args = argparser . parse_args ( )
CONF_FILE = args . config
config = configparser . ConfigParser ( )
config . read ( CONF_FILE )
logging . basicConfig ( format = " %(asctime)s : %(message)s " ,
level = int ( config [ ' LOG_LEVEL ' ] [ ' log_level ' ] )
)
del ( cwd )
del cwd
REG_FILE = config [ ' DEFAULT ' ] [ ' applications_db ' ]
# Does everything related to applicants, i.e. creating, manipulations...
class applicants( ) :
class Applicants :
# User identifier
identifier = " username "
# SQLite DB Path
sourceDB = " "
# another sqlite to batch-recreate users
differentDB = " "
def __init__ ( self , lident , sourceDB = REG_FILE ) :
def __init__ ( self , lident , sourcedb = REG_FILE ) :
self . identifier = lident
self . sourceDB = source DB
self . sourceDB = source db
self . __connectToDB__ ( " source " )
# all results shall be done with dict_factory! Makes everything so much simpler
self . sdbCursor . row_factory = dict_factory
@ -75,16 +79,18 @@ class applicants():
self . ddbCursor = self . ddbConnection . cursor ( )
def __closeDB__ ( self , which ) :
if ( which == " source " ) :
if which == " source " :
try :
self . sdbConnection . close ( )
except sqlite3 . Error as e :
logging . exception ( " Couldn ' t close database! Error: %s " % e ) # @TODO: Dump full db with query or just the executed querys to file
logging . exception (
" Couldn ' t close database! Error: %s " % e )
# @TODO: Dump full db with query or just the executed querys to file
else :
self . ddbConnection . close ( ) # @TODO: Evaluate getting rid of ddb(differentDB)?
# get List of all applications(not accepted yet)
def getApplicationsL ist( self ) :
def getapplicationsl ist( self ) :
query = " SELECT * FROM `applications` WHERE `status` = ' 0 ' "
try :
self . sdbCursor . execute ( query )
@ -94,7 +100,7 @@ class applicants():
rows = [ ]
return rows
def getApprovedApplicantsL ist( self ) :
def getapprovedapplicantsl ist( self ) :
query = " SELECT * From `applications` WHERE `status` = ' 1 ' "
try :
self . sdbCursor . execute ( query )
@ -105,22 +111,23 @@ class applicants():
return rows
# edit aproved users
def editApprovedApplicant ( self , term , updaterow ) :
def editapprovedapplicants ( self , term ) :
try :
# the fuck did i try here?
self . sdbCursor . execute (
" UPDATE `applications` SET ? WHERE id=? " ,
( str ( term ) , )
" UPDATE `applications` WHERE id=? " , ( str ( term ) , )
)
self . sdbConnection . commit ( )
except sqlite3 . Error as e :
logging . exception ( " Database Error: %s " % e )
# set user to aproved
def setApprovedApplication ( self , selectterm ) :
query = " SELECT `username` FROM `applications` WHERE `username` = ` {0!s} ` " . format ( selectterm )
def setapprovedapplication ( self , selectterm ) :
# query = "SELECT `username` FROM `applications` WHERE `username` = `{0!s}`".format(selectterm)
pass
# get applicants data
def getApplicantsD ata( self , term ) :
def getapplicantsd ata( self , term ) :
# @TODO: Use shorthand if for the correct query, directly into sqlite
if self . identifier == " id " :
try :
@ -140,15 +147,15 @@ class applicants():
return result
# @TODO: migrade just approved users to some new/another sqlitedb
def migrateApprovedD ata( self , different_db ) :
def migrateapprovedd ata( self , different_db ) :
pass
# @TODO: delete migrated data
def deleteMigratedDataSet ( self , selectterm ) :
def deletemigrateddata ( self , selectterm ) :
pass
# Applicants whom doesnt got approved should get removed
def removeA pplicant( self , term ) :
def removea pplicant( self , term ) :
if self . identifier == " id " :
try :
self . sdbCursor . execute (
@ -161,17 +168,17 @@ class applicants():
else :
self . sdbCursor . execute (
" DELETE FROM `applications` WHERE username = ? " ,
' DELETE FROM `applications` WHERE username = ? ' ,
( str ( term ) , )
)
self . sdbConnection . commit ( )
# @TODO: Possibility to work without passing users manually
def selectedU ser( userid , username = False ) :
def selectedu ser( userid , username = False ) :
pass
# Print out a list of aprovable users
def printApprovableU sers( self , users ) :
def printapprovableu sers( self , users ) :
i = 0
for user in users :
print ( " ID: {0!s} , Status: {0!s} , Name: {0!s} " . format ( i , user [ " status " ] , user [ " username " ] ) )
@ -179,7 +186,8 @@ class applicants():
return i
# Get List of users
def userPrint ( self , fetched , userid ) :
@staticmethod
def userprint ( fetched , userid ) :
print ( " ID: {0!s} " . format ( fetched [ int ( userid ) ] [ " id " ] ) )
print ( " Username: {0!s} " . format ( fetched [ int ( userid ) ] [ " username " ] ) )
print ( " Mail: {0!s} " . format ( fetched [ int ( userid ) ] [ " email " ] ) )
@ -187,8 +195,8 @@ class applicants():
print ( " Registrated time: {0!s} " . format ( fetched [ int ( userid ) ] [ " timestamp " ] ) )
# Approve an applicant. Handles everything related, like create home dir, set flags blabla
def approveA pplicant( self , term ) :
user = self . get ApplicantsD ata( term )
def approvea pplicant( self , term ) :
user = self . get applicantsd ata( term )
ret = self . __execScript ( user )
if ret [ 0 ] != 0 : # @DEBUG: Change to == 0
print ( " Something went wrong in the user creation! Exiting without deleting users record in database! " )
@ -207,99 +215,96 @@ class applicants():
else :
self . sdbCursor . execute (
" UPDATE `applications` SET `status`=1 WHERE `username`=? "
" UPDATE `applications` SET `status`=1 WHERE `username`=? " ,
( str ( term ) , )
)
self . sdbConnection . commit ( )
# Script execution, handles everything done with the shell/commands themselves
def __execScript ( self , user ) :
@staticmethod
def __execScript ( user ) :
# @TODO: omfg just write some wrapper-class/lib... sucks hard!
username = user [ " username " ]
homeDir = " /home/ " + username + " / "
sshDir = homeDir + " .ssh/ "
home_dir = " /home/ " + username + " / "
ssh_dir = home_dir + " .ssh/ "
executed = [ ]
executed . append ( [ " useradd " , " -m " , username ] )
r code = subprocess . call ( executed [ 0 ] )
if r code != 0 :
return [ r code, executed , ]
r eturn code = subprocess . call ( executed [ 0 ] )
if r eturn code != 0 :
return [ r eturn code, executed , ]
executed . append ( [ " usermod " , " --lock " , username ] )
r code = subprocess . call ( executed [ 1 ] ) # empty pw
if r code != 0 :
return [ r code, executed , ]
r eturn code = subprocess . call ( executed [ 1 ] ) # empty pw
if r eturn code != 0 :
return [ r eturn code, executed , ]
executed . append ( [ " usermod " , " -a " , " -G " , " tilde " , username ] )
r code = subprocess . call ( executed [ 2 ] ) # add to usergroup
if r code != 0 :
return [ r code, executed , ]
r eturn code = subprocess . call ( executed [ 2 ] ) # add to usergroup
if r eturn code != 0 :
return [ r eturn code, executed , ]
executed . append ( [ " mkdir " , ssh D ir] )
executed . append ( [ " mkdir " , ssh _d ir] )
try :
# @TODO: use config variable(chmodPerms)
ret = os . mkdir ( sshDir , 0o777 ) # create sshdir
r code = 0
os . mkdir ( ssh_dir , 0o777 ) # create sshdir
r eturn code = 0
except OSError as e :
logging . exception ( e . strerror )
r code = e . errno # False, couldn't create.
return [ r code, executed , ]
r eturn code = e . errno # False, couldn't create.
return [ r eturn code, executed , ]
executed . append ( [ " write(sshkey) to " , ssh Dir+ " authorized_keys " ] )
with open ( sshDir + " authorized_keys " , " w " ) as f :
executed . append ( [ " write(sshkey) to " , ssh _dir + " authorized_keys " ] )
with open ( ssh_dir + " authorized_keys " , " w " ) as f :
f . write ( user [ " pubkey " ] )
if f . closed != True :
if not f . closed :
logging . exception ( " Could ' nt write to authorized_keys! " )
return [ r code, executed , ]
return [ r eturn code, executed , ]
executed . append ( [ " chmod " , " -Rv " , " 700 " , ssh D ir] )
executed . append ( [ " chmod " , " -Rv " , " 700 " , ssh _d ir] )
try :
os . chmod ( ssh Dir+ " authorized_keys " , 0o700 ) # directory is already 700
r code = 0
os . chmod ( ssh _dir + " authorized_keys " , 0o700 ) # directory is already 700
r eturn code = 0
except OSError as e :
logging . exception ( e . strerror )
rcode = e . errno
return [ rcode , executed , ]
returncode = e . errno
return [ returncode , executed , ]
try :
executed . append ( [ " chown " , " -Rv " , username + " : " + username , sshD ir] )
os . chown ( ssh D ir, pwd . getpwnam ( username ) [ 2 ] , pwd . getpwnam ( username ) [ 3 ] ) # 2=>uid, 3=>gid
executed . append ( [ " chown " , " -v " , username + " : " + username , sshDir + " authorized_keys " ] )
os . chown ( ssh Dir+ " authorized_keys " , pwd . getpwnam ( username ) [ 2 ] , pwd . getpwnam ( username ) [ 3 ] )
r code = 0
executed . append ( [ " chown " , " -Rv " , username + " : " + username , ssh_d ir] )
os . chown ( ssh _d ir, pwd . getpwnam ( username ) [ 2 ] , pwd . getpwnam ( username ) [ 3 ] ) # 2=>uid, 3=>gid
executed . append ( [ " chown " , " -v " , username + " : " + username , ssh_dir + " authorized_keys " ] )
os . chown ( ssh _dir + " authorized_keys " , pwd . getpwnam ( username ) [ 2 ] , pwd . getpwnam ( username ) [ 3 ] )
r eturn code = 0
except OSError as e :
logging . exception ( e . strerror ) # @TODO: maybe append strerror to executed instead of printing it
rcode = e . errno
return [ rcode , executed , ]
return [ rcode , executed , ]
"""
{ ' id ' : 7 , ' username ' : ' testuser47 ' , ' email ' : ' 47test@testmail.com ' , ' name ' :
' test Name ' , ' pubkey ' : ' ssh-rsa [...] ' , ' timestamp ' : ' 2018-08-22 13:31:16 ' , ' status ' : 0 }
returncode = e . errno
return [ returncode , executed , ]
"""
return [ returncode , executed , ]
# {'id': 7, 'username': 'testuser47', 'email': '47test@testmail.com', 'name':
# 'test Name', 'pubkey': 'ssh-rsa [...]', 'timestamp': '2018-08-22 13:31:16', 'status': 0}
def main ( ) :
# how many times the Sepe rator/Delimiter?
# how many times the Sepa rator/Delimiter?
delcount = 40
# The sepe rator for the menu
Sepe rator = " = " * delcount
Menu = Seperator + " \n \t \t Main-Menu: \n \n " \
# The sepa rator for the menu
sepa rator = " = " * delcount
menu = separator + " \n \t \t Main-Menu: \n \n " \
" \t 1) list and edit pending users \n " \
" \t 2) list applicants \n " \
" \t 3) edit applicant \n " \
" \t 4) quit \n " + Seperator + " \n "
" \t 4) quit \n " + separator + " \n "
# Identify by ID
applications = applicants ( lident = " id " )
applications = Applicants ( lident = " id " )
while 1 != 0 :
print ( M enu)
print ( m enu)
command = input ( " Please select, what you want to do: \n -> " )
# User shouldn t be able to type something in that isnt a number
# User shouldn ' t be able to type something in that isnt a number
if command . isalpha ( ) or command == ' ' :
clear ( )
print ( " !!! invalid input, please try again. !!! " )
@ -312,65 +317,66 @@ def main():
exit ( 0 )
# Edit and list pending users/applicants @TODO Wording: Users or applicants?
elif command == 1 :
users = applications . get ApplicationsL ist( )
i = applications . printApprovableU sers( users )
users = applications . get applicationsl ist( )
i = applications . printapprovableu sers( users )
if i == 0 :
print ( " No pending users " )
# giving some time to a knowledge that something WRONG happened
# giving some time to a c knowledge that something WRONG happened
input ( " Continue with Keypress... " )
clear ( )
continue
user sel = 0
UserM ax = i
user _ selection = 0
user_m ax = i
print ( " Menu: \n r=>return to main " )
# Edit Menu e
while 1 != 0 or user sel != " r " :
i = applications . print ApprovableU sers( users )
if user sel == " r " :
# Edit Menu
while 1 != 0 or user _ selection != " r " :
i = applications . print approvableu sers( users )
if user _ selection == " r " :
break # break when user presses r
user sel = input ( " Which user( ID ) do you want to change? -> " )
if len ( user sel) > 1 or user sel. isalpha ( ) :
user sel = " "
user _ selection = input ( " Which user( ID ) do you want to change? -> " )
if len ( user _ selection ) > 1 or user _ selection . isalpha ( ) :
user _ selection = " "
# convert to int if input isnt an r
user sel = int ( user sel) if user sel != ' ' and user sel != ' r ' else 0
if usersel > UserM ax - 1 :
print ( " User {0!s} doesn ' t exist! " . format ( user sel) )
user _ selection = int ( user _ selection ) if user _ selection != ' ' and user _ selection != ' r ' else 0
if user_selection > user_m ax - 1 :
print ( " User {0!s} doesn ' t exist! " . format ( user _ selection ) )
continue
# Show the user his chosen user and ask what to do
applications . userPrint ( users , usersel )
print ( " You chosed ID No. {0!s} , what do you like to do? " . format ( usersel ) )
chosenUser = usersel
usersel = " "
# Finally down the edit menue!
while usersel != " e " :
usersel = input ( " User: {0!s} \n \t \t (A)ctivate \n \t \t (R)emove \n \t \t R(e)turn \n -> " . format ( chosenUser ) )
if usersel == " A " :
applications . approveApplicant ( users [ chosenUser ] [ ' id ' ] )
print ( " User {0!s} has been successfully approved! " . format ( users [ chosenUser ] [ ' username ' ] ) )
applications . userprint ( users , user_selection )
print ( " You chosed ID No. {0!s} , what do you like to do? " . format ( user_selection ) )
chosen_user = user_selection
user_selection = " "
# Finally down the edit menu!
while user_selection != " e " :
user_selection = input (
" User: {0!s} \n \t \t (A)ctivate \n \t \t (R)emove \n \t \t R(e)turn \n -> " . format ( chosen_user ) )
if user_selection == " A " :
applications . approveapplicant ( users [ chosen_user ] [ ' id ' ] )
print ( " User {0!s} has been successfully approved! " . format ( users [ chosen_user ] [ ' username ' ] ) )
input ( " waiting for input... " )
clear ( )
user sel= " e " # remove for being able to continue editing?
user _ selection = " e " # remove for being able to continue editing?
continue
elif user sel == " R " :
applications . remove Applicant( users [ chosenU ser] [ ' id ' ] )
print ( " User {0!s} successfully deleted! " . format ( user [ chosen U ser] [ ' username ' ] ) )
elif user _ selection == " R " :
applications . remove applicant( users [ chosen_u ser] [ ' id ' ] )
print ( " User {0!s} successfully deleted! " . format ( user [ chosen _u ser] [ ' username ' ] ) )
input ( " waiting for input... " )
clear ( )
continue
elif user sel == " e " :
elif user _ selection == " e " :
clear ( )
continue
elif int ( command ) == 2 :
users = applications . get ApprovedApplicantsL ist( )
users = applications . get approvedapplicantsl ist( )
if users == [ ] :
if not users :
print ( " no activate users yet! " )
i = 0
for user in users :
@ -381,6 +387,7 @@ def main():
else :
exit ( 0 )
if __name__ == " __main__ " :
try :
main ( )