Update validation.py

This commit is contained in:
Dita Aji Pratama 2024-08-26 19:19:33 +07:00
parent 2e44c2b0b1
commit 9ca7ba6872

View File

@ -9,11 +9,18 @@ class validation():
def __init__(self): def __init__(self):
pass pass
def account(self, APIADDR, allowed_roles): def account(self, APIADDR, allowed_roles, jwt=None):
response = {} response = {}
loggorilla.prcss(APIADDR, "Get jwt")
if jwt is None:
loggorilla.fyinf(APIADDR, "jwt params is empty: Use beaker session")
for_api = False
beaker_session = request.environ.get('beaker.session')
jwt = beaker_session["token"] if "token" in beaker_session else None
else:
loggorilla.fyinf(APIADDR, "jwt params is available: Use jwt from params")
for_api = True
loggorilla.prcss(APIADDR, "Define parameters") loggorilla.prcss(APIADDR, "Define parameters")
beaker_session = request.environ.get('beaker.session')
jwt = beaker_session["token"] if "token" in beaker_session else None
if jwt is None: if jwt is None:
loggorilla.fyinf(APIADDR, "Guest") loggorilla.fyinf(APIADDR, "Guest")
r_session = {} r_session = {}
@ -23,33 +30,40 @@ class validation():
"phone" :None, "phone" :None,
"roles" :[0] "roles" :[0]
} }
session_not_found = False
else: else:
loggorilla.fyinf(APIADDR, "With JWT") loggorilla.fyinf(APIADDR, "With JWT")
loggorilla.prcss(APIADDR, "Get JWT payload data") loggorilla.prcss(APIADDR, "Get JWT payload data")
payload = tokenguard.decode(jwt, globalvar.ssh['key']['public']) payload = tokenguard.decode(jwt, globalvar.ssh['key']['public'])
loggorilla.prcss(APIADDR, "Get dependency data") loggorilla.prcss(APIADDR, "Connect DB")
db_main = mariadb.connect(**database.db_main) db_main = mariadb.connect(**database.db_main)
cursor = db_main.cursor(dictionary=True) cursor = db_main.cursor(dictionary=True)
loggorilla.prcss(APIADDR, "Get dependency data: Session") loggorilla.prcss(APIADDR, "Get dependency data")
cursor.execute(f"SELECT * FROM auth_session WHERE id = %s ; ", (payload["session"]["id"],) ) cursor.execute(f"SELECT * FROM auth_session WHERE id = %s ; ", (payload["session"]["id"],) )
r_session = cursor.fetchone() r_session = cursor.fetchone()
loggorilla.prcss(APIADDR, "Get dependency data: Profile") if r_session is None:
cursor.execute(f"SELECT COUNT(*) AS `count`, auth_profile.* FROM auth_profile_verification LEFT JOIN auth_profile ON auth_profile.id = auth_profile_verification.auth_profile WHERE auth_profile.token = %s AND auth_profile_verification.type = 'email' AND auth_profile_verification.verified = 1 ; ", (r_session['token'],) ) session_not_found = True
r_profile = cursor.fetchone() r_session = {}
r_profile = {
"username" :None,
"email" :None,
"phone" :None,
"roles" :[0]
}
else:
session_not_found = False
cursor.execute(f"SELECT COUNT(*) AS `count`, auth_profile.* FROM auth_profile_verification LEFT JOIN auth_profile ON auth_profile.id = auth_profile_verification.auth_profile WHERE auth_profile.token = %s AND auth_profile_verification.type = 'email' AND auth_profile_verification.verified = 1 ; ", (r_session['token'],) )
r_profile = cursor.fetchone()
cursor.execute(f"SELECT auth_roles FROM auth_profile_roles WHERE auth_profile = %s ; ", (r_profile['id'],) )
r_roles = cursor.fetchall()
r_profile['roles'] = [item['auth_roles'] for item in r_roles]
loggorilla.prcss(APIADDR, "Get dependency data: Roles: execute") loggorilla.prcss(APIADDR, "Close DB")
cursor.execute(f"SELECT auth_roles FROM auth_profile_roles WHERE auth_profile = %s ; ", (r_profile['id'],) )
loggorilla.prcss(APIADDR, "Get dependency data: Roles: fetchall")
r_roles = cursor.fetchall()
loggorilla.fyinf(APIADDR, f"r_roles: {r_roles}")
loggorilla.prcss(APIADDR, "Get dependency data: Roles: variable replace")
r_profile['roles'] = [item['auth_roles'] for item in r_roles]
loggorilla.prcss(APIADDR, "Get dependency data: Close DB")
cursor.close() cursor.close()
db_main.close() db_main.close()
@ -60,11 +74,29 @@ class validation():
loggorilla.accss(APIADDR, f"Profile Username : {r_profile['username' ] if 'username' in r_profile else None}" ) loggorilla.accss(APIADDR, f"Profile Username : {r_profile['username' ] if 'username' in r_profile else None}" )
loggorilla.accss(APIADDR, f"Profile Email : {r_profile['email' ] if 'email' in r_profile else None}" ) loggorilla.accss(APIADDR, f"Profile Email : {r_profile['email' ] if 'email' in r_profile else None}" )
loggorilla.accss(APIADDR, f"Profile Phone : {r_profile['phone' ] if 'phone' in r_profile else None}" ) loggorilla.accss(APIADDR, f"Profile Phone : {r_profile['phone' ] if 'phone' in r_profile else None}" )
loggorilla.accss(APIADDR, f"Profile Roles : {r_profile['roles' ] if 'roles' in r_profile else None}" )
loggorilla.prcss(APIADDR, "Validation") loggorilla.prcss(APIADDR, "Validation")
if 0 not in r_profile['roles'] and datetime.datetime.now() > r_session['end']: if session_not_found:
loggorilla.accss(APIADDR, "Expired. Your session removed." ) loggorilla.accss(APIADDR, "Session not found" )
loggorilla.prcss(APIADDR, "Deleting") loggorilla.prcss(APIADDR, "Giving response")
response["status" ] = "failed"
response["desc" ] = "Your session not found."
response["data" ] = {
"valid" :{
"status" : 0,
"desc" : "removed"
},
"session" : r_session,
"profile" : r_profile
}
if for_api is True:
abort(401, "Session not found")
else:
redirect('/logout?msg=removed')
elif 0 not in r_profile['roles'] and datetime.datetime.now() > r_session['end']:
loggorilla.accss(APIADDR, "Session expired" )
loggorilla.prcss(APIADDR, "Deleting session")
self.cursor.execute("DELETE FROM auth_session WHERE id = %s ; ", (r_session['id'],) ) self.cursor.execute("DELETE FROM auth_session WHERE id = %s ; ", (r_session['id'],) )
loggorilla.prcss(APIADDR, "Giving response") loggorilla.prcss(APIADDR, "Giving response")
response["status" ] = "failed" response["status" ] = "failed"
@ -77,7 +109,10 @@ class validation():
"session" : r_session, "session" : r_session,
"profile" : r_profile "profile" : r_profile
} }
redirect('/logout?msg=expired') if for_api is True:
abort(401, "Session expired")
else:
redirect('/logout?msg=expired')
elif 0 not in r_profile['roles'] and r_profile["count"] == 0: elif 0 not in r_profile['roles'] and r_profile["count"] == 0:
loggorilla.accss(APIADDR, "No active account for this" ) loggorilla.accss(APIADDR, "No active account for this" )
loggorilla.prcss(APIADDR, "Giving response") loggorilla.prcss(APIADDR, "Giving response")