101 lines
4.5 KiB
Python
101 lines
4.5 KiB
Python
|
import mysql.connector as mariadb
|
||
|
|
||
|
import datetime
|
||
|
|
||
|
import config.database as database
|
||
|
import config.globalvar as globalvar
|
||
|
|
||
|
import scripts.loggorilla as loggorilla
|
||
|
import scripts.tokenguard as tokenguard
|
||
|
|
||
|
class session:
    """Validate a JWT-backed session and check the user's roles.

    A database connection and a dictionary cursor are opened when the object
    is created. Both are closed at the end of :meth:`user`, so one instance
    serves a single ``user`` call.
    """

    def __init__(self):
        self.db_main = mariadb.connect(**database.db_main)
        self.cursor = self.db_main.cursor(dictionary=True)

    @staticmethod
    def _build_response(status, desc, valid_status, valid_desc,
                        r_session, r_profile, message=None):
        """Assemble the response dict shared by every validation outcome.

        ``message`` is optional; when given it is placed first in ``data`` so
        the key order matches the layout used for the "forbidden" outcome.
        """
        data = {}
        if message is not None:
            data["message"] = message
        data["valid"] = {
            "status": valid_status,
            "desc": valid_desc,
        }
        data["session"] = r_session
        data["profile"] = r_profile
        return {"status": status, "desc": desc, "data": data}

    def user(self, jwt, allowed_roles) -> dict:
        """Validate the session referenced by ``jwt`` and authorize its user.

        The token is decoded, the matching ``auth_session`` row and the
        profile owning that session's token are loaded, and the profile's
        roles are compared against ``allowed_roles``.

        Args:
            jwt: Encoded token; its decoded payload must contain
                ``["session"]["id"]``.
            allowed_roles: Container of role values; the user is authorized
                when at least one of their roles is in it.

        Returns:
            A dict with ``status`` ("success" or "failed") and ``desc``. When
            validation ran to completion it also holds ``data`` with
            ``valid`` (``status`` 1/0 and a ``desc`` of "authorized",
            "expired", "forbidden" or "unauthorized"), ``session`` and
            ``profile``. If any exception occurs, only ``status`` and
            ``desc`` are present.
        """
        APIADDR = "procedure.validation"
        response = {}
        try:
            loggorilla.prcss(APIADDR, "Define parameters")
            payload = tokenguard.decode(jwt, globalvar.ssh['key']['public'])

            loggorilla.prcss(APIADDR, "Get dependency data")
            self.cursor.execute(
                "SELECT * FROM auth_session WHERE id = %s ; ",
                (payload["session"]["id"],),
            )
            r_session = self.cursor.fetchone()
            if r_session is None:
                # Fail with a readable log entry instead of an opaque
                # "'NoneType' object is not subscriptable"; the generic
                # handler below still produces the same response.
                raise LookupError(
                    "No auth_session row matches the session id in the token"
                )

            self.cursor.execute(
                "SELECT COUNT(*) AS `count`, auth_profile.* "
                "FROM auth_profile_verification "
                "LEFT JOIN auth_profile ON auth_profile.id = auth_profile_verification.auth_profile "
                "WHERE auth_profile.token = %s "
                "AND auth_profile_verification.type = 'email' "
                "AND auth_profile_verification.verified = 1 ; ",
                (r_session['token'],),
            )
            r_profile = self.cursor.fetchone()

            self.cursor.execute(
                "SELECT auth_roles FROM auth_profile_roles WHERE auth_profile = %s ; ",
                (r_profile['id'],),
            )
            r_roles = self.cursor.fetchall()

            # A profile without any role rows gets the placeholder role 0.
            # Building the list in one step avoids removing the placeholder
            # once per row, which raised ValueError from the second row on.
            r_profile['roles'] = [row['auth_roles'] for row in r_roles] or [0]

            loggorilla.prcss(APIADDR, "Validation")
            if datetime.datetime.now() > r_session['end']:
                loggorilla.prcss(APIADDR, "Deleting")
                self.cursor.execute(
                    "DELETE FROM auth_session WHERE id = %s ; ",
                    (r_session['id'],),
                )
                # The connection is closed right after this call; without an
                # explicit commit the delete is discarded unless autocommit
                # is enabled in the connection settings.
                self.db_main.commit()
                loggorilla.prcss(APIADDR, "Giving response")
                loggorilla.accss(APIADDR, "Expired. Your session removed.")
                response = self._build_response(
                    "failed", "Expired. Your session removed.",
                    0, "expired", r_session, r_profile,
                )
            elif r_profile["count"] == 0:
                # No verified e-mail entry was found for this profile.
                loggorilla.prcss(APIADDR, "Giving response")
                loggorilla.accss(APIADDR, "No active account for this")
                response = self._build_response(
                    "failed", "No active account for this",
                    0, "forbidden", r_session, r_profile,
                    message="Please contact us if you still had a problem",
                )
            elif any(role in allowed_roles for role in r_profile['roles']):
                loggorilla.prcss(APIADDR, "Giving response")
                response = self._build_response(
                    "success", "User roles authorized",
                    1, "authorized", r_session, r_profile,
                )
            else:
                loggorilla.prcss(APIADDR, "Giving response")
                loggorilla.accss(APIADDR, "User roles unauthorized")
                response = self._build_response(
                    "failed", "User roles unauthorized",
                    0, "unauthorized", r_session, r_profile,
                )
        except Exception as e:
            # Boundary handler: log the cause and return a generic failure.
            loggorilla.error(APIADDR, str(e))
            response["status"] = "failed"
            response["desc"] = "Internal Server Error. Please contact us if you still have an error. for detail"
        finally:
            # Close the connection even if closing the cursor fails.
            try:
                self.cursor.close()
            finally:
                self.db_main.close()

        return response
|