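import mysql.connector as mariadb
import datetime
import config.database as database
import config.globalvar as globalvar
import scripts.loggorilla as loggorilla
import scripts.tokenguard as tokenguard


class session():
    """Validate a JWT-backed session and authorize it against a role list.

    ``__init__`` opens one connection (settings from ``database.db_main``)
    and one dictionary cursor; ``user`` closes both in its ``finally``
    block, so an instance serves exactly one ``user`` call.
    """

    def __init__(self):
        self.db_main = mariadb.connect(**database.db_main)
        self.cursor = self.db_main.cursor(dictionary=True)

    @staticmethod
    def _build(status, desc, valid_status, valid_desc, r_session, r_profile, message=None):
        """Assemble the response dict shared by every outcome branch.

        Args:
            status: top-level ``"success"`` / ``"failed"`` marker.
            desc: human-readable description, also used as the log line.
            valid_status: 1 when the caller is authorized, else 0.
            valid_desc: short reason (``"authorized"``, ``"expired"``, ...).
            r_session: the ``auth_session`` row (or None).
            r_profile: the profile row with its ``roles`` list (or None).
            message: optional extra text; it is placed before ``valid`` so
                the key order of the original "no active account" payload
                is preserved.
        """
        data = {}
        if message is not None:
            data["message"] = message
        data["valid"] = {"status": valid_status, "desc": valid_desc}
        data["session"] = r_session
        data["profile"] = r_profile
        return {"status": status, "desc": desc, "data": data}

    def _fetch_session(self, session_id):
        """Return the ``auth_session`` row for ``session_id``, or None."""
        # Parameters are bound by the driver; the SQL text itself is constant.
        self.cursor.execute("SELECT * FROM auth_session WHERE id = %s ; ", (session_id,))
        return self.cursor.fetchone()

    def _fetch_verified_profile(self, token):
        """Return one row with ``count`` of verified e-mail rows plus the profile columns.

        When ``count`` is 0 the ``auth_profile.*`` columns come back NULL.
        NOTE(review): the query mixes COUNT(*) with non-aggregated columns
        and no GROUP BY, so it relies on the server not enforcing
        ONLY_FULL_GROUP_BY; confirm against the deployed sql_mode.
        """
        self.cursor.execute(
            "SELECT COUNT(*) AS `count`, auth_profile.* FROM auth_profile_verification LEFT JOIN auth_profile ON auth_profile.id = auth_profile_verification.auth_profile WHERE auth_profile.token = %s AND auth_profile_verification.type = 'email' AND auth_profile_verification.verified = 1 ; ",
            (token,),
        )
        return self.cursor.fetchone()

    def _fetch_roles(self, profile_id):
        """Return the list of ``auth_roles`` values for ``profile_id``.

        A profile with no role rows yields the placeholder list ``[0]``,
        as the original code did. Because authorization is a plain
        membership test, callers must never include ``0`` in
        ``allowed_roles`` or a role-less user would be authorized.
        """
        self.cursor.execute("SELECT auth_roles FROM auth_profile_roles WHERE auth_profile = %s ; ", (profile_id,))
        # The previous remove(0)/append loop raised ValueError on the second
        # role (0 was already gone), turning every multi-role user into an
        # "Internal Server Error"; a comprehension has no such state.
        return [row['auth_roles'] for row in self.cursor.fetchall()] or [0]

    def user(self, jwt: str, allowed_roles) -> dict:
        """Decode ``jwt``, load its session/profile and check ``allowed_roles``.

        Args:
            jwt: token whose signature is verified with
                ``globalvar.ssh['key']['public']``.
            allowed_roles: container of role ids; any one match authorizes.

        Returns:
            ``{"status", "desc", "data"}`` where ``data["valid"]["status"]``
            is 1 only for an unexpired session of a verified profile holding
            an allowed role. Any exception (including a token that fails to
            decode) is logged and answered with a ``"failed"`` response that
            carries no ``data`` key. The cursor and connection are always
            closed before returning.
        """
        APIADDR = "procedure.validation"
        response = {}
        try:
            loggorilla.prcss(APIADDR, "Define parameters")
            payload = tokenguard.decode(jwt, globalvar.ssh['key']['public'])

            loggorilla.prcss(APIADDR, "Get dependency data")
            r_session = self._fetch_session(payload["session"]["id"])
            if r_session is None:
                # Previously a TypeError on r_session['token'] surfaced as an
                # internal error; an unknown id is an ordinary auth failure.
                loggorilla.prcss(APIADDR, "Giving response")
                loggorilla.accss(APIADDR, "Unknown session")
                response = self._build("failed", "Unknown session", 0, "invalid", None, None)
                return response

            r_profile = self._fetch_verified_profile(r_session['token'])
            r_profile['roles'] = self._fetch_roles(r_profile['id'])

            loggorilla.prcss(APIADDR, "Validation")
            if datetime.datetime.now() > r_session['end']:
                loggorilla.prcss(APIADDR, "Deleting")
                self.cursor.execute("DELETE FROM auth_session WHERE id = %s ; ", (r_session['id'],))
                # mysql.connector defaults to autocommit=False; without this
                # commit the DELETE is rolled back on close() and the expired
                # session survives. commit() is harmless when autocommit is on.
                self.db_main.commit()
                loggorilla.prcss(APIADDR, "Giving response")
                loggorilla.accss(APIADDR, "Expired. Your session removed.")
                response = self._build(
                    "failed", "Expired. Your session removed.", 0, "expired", r_session, r_profile
                )
            elif r_profile["count"] == 0:
                loggorilla.prcss(APIADDR, "Giving response")
                loggorilla.accss(APIADDR, "No active account for this")
                response = self._build(
                    "failed", "No active account for this", 0, "forbidden", r_session, r_profile,
                    message="Please contact us if you still had a problem",
                )
            elif any(role in allowed_roles for role in r_profile['roles']):
                loggorilla.prcss(APIADDR, "Giving response")
                response = self._build(
                    "success", "User roles authorized", 1, "authorized", r_session, r_profile
                )
            else:
                loggorilla.prcss(APIADDR, "Giving response")
                loggorilla.accss(APIADDR, "User roles unauthorized")
                response = self._build(
                    "failed", "User roles unauthorized", 0, "unauthorized", r_session, r_profile
                )
        except Exception as e:
            loggorilla.error(APIADDR, str(e))
            response["status"] = "failed"
            response["desc"] = "Internal Server Error. Please contact us if you still have an error. for detail"
        finally:
            # Resource teardown only: a `return` here would swallow
            # BaseException subclasses such as KeyboardInterrupt.
            self.cursor.close()
            self.db_main.close()
        return response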