diff --git a/app/modules/api/auth.py b/app/modules/api/auth.py new file mode 100644 index 0000000..6afcf04 --- /dev/null +++ b/app/modules/api/auth.py @@ -0,0 +1,702 @@ +import mysql.connector as mariadb +from mako.template import Template +from bottle import request + +import config.database as database +import config.globalvar as globalvar + +import bcrypt +import re +import datetime + +import scripts.googly as googly +import scripts.saltedkey as saltedkey +import scripts.loggorilla as loggorilla +import scripts.sendwave as sendwave +import scripts.tokenguard as tokenguard +import scripts.paperplease as paperplease + +class auth: + + def __init__(self): + # TODO: set database + self.db_main = mariadb.connect(**database.db_main) + self.cursor = self.db_main.cursor(dictionary=True) + # TODO: Config your SMTP + self.smtpconfig = globalvar.smtpconfig + + def register(self, params): + APIADDR = "/api/auth/registration/register/:roles" + loggorilla.prcss(APIADDR, "Define parameters") + response = {} + captcha = params["captcha" ] + username = params["username" ].lower() + email = params["email" ].lower() + password = params["password" ] + roles = params["roles" ] + self.cursor.execute("BEGIN;") + try: + loggorilla.prcss(APIADDR, "Process parameters") + hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode() + token = saltedkey.token(username, hashed) + # TODO: set production/development version + if globalvar.production == True: + # TODO: set reCAPTCHA['server'] + captcha_r = googly.recaptcha(captcha, globalvar.reCAPTCHA['server']) + score = captcha_r["score"] + else: + # For localhost reCAPTCHA['server'] + captcha_r = 'dev mode' + score = 0.9 + loggorilla.prcss(APIADDR, "Get dependency data") + self.cursor.execute("SELECT COUNT(*) AS `count` FROM auth_profile WHERE email = %s ; ", (email,) ) + result_profile = self.cursor.fetchone() + self.cursor.execute("SELECT COUNT(*) AS `count` FROM auth_profile WHERE username = %s ; ", (username,) ) + result_username = self.cursor.fetchone() + self.cursor.execute(f"SELECT COUNT(*) AS `count` FROM auth_profile_verification INNER JOIN auth_profile ON auth_profile.id = auth_profile_verification.auth_profile WHERE auth_profile.email = %s AND auth_profile_verification.type = 'email' AND auth_profile_verification.verified = 0 ; ", (email,) ) + result_unverified = self.cursor.fetchone() + # TODO: set roles + self.cursor.execute(f"SELECT id, name FROM `auth_roles` WHERE auth_roles.name = %s ; ", (roles,) ) + result_roles = self.cursor.fetchone() + loggorilla.prcss(APIADDR, "Validation") + if score < 0.2: + response["status" ] = "failed" + response["desc" ] = "you are sus as a bot" + response["data" ] = { + "recaptcha":captcha_r + } + elif globalvar.su_mode == False and roles == 'su': + response["status" ] = "failed" + response["desc" ] = "Forbidden to become super user" + response["data" ] = { + "recaptcha":captcha_r + } + elif not re.match(r'^\w+$', username): + response["status" ] = "failed" + response["desc" ] = "username can only use letters, numbers, and the underscore symbol" + response["data" ] = { + "recaptcha":captcha_r + } + elif len(username) > 35: + response["status" ] = "failed" + response["desc" ] = "username can not longer than 35 character" + response["data" ] = { + "recaptcha":captcha_r + } + elif len(username) < 3: + response["status" ] = "failed" + response["desc" ] = "username too short" + response["data" ] = { + "recaptcha":captcha_r + } + elif len(password) < 6: + response["status" ] = "failed" + response["desc" ] = "password too short" + response["data" ] = { + "recaptcha":captcha_r + } + elif result_unverified["count"] >= 1: + response["status" ] = "failed" + response["desc" ] = "check email for verification" + response["data" ] = { + "message": "Check email for verification. Please contact us if you still had a problem", + "resend": globalvar.resend_url(email), + "recaptcha":captcha_r + } + elif result_profile["count"] >= 1: + response["status" ] = "failed" + response["desc" ] = "email already taken" + response["data" ] = { + "recaptcha":captcha_r + } + elif result_username["count"] >= 1: + response["status" ] = "failed" + response["desc" ] = "username already taken" + response["data" ] = { + "recaptcha":captcha_r + } + elif captcha and username and email and password: + loggorilla.prcss(APIADDR, "Insering") + self.cursor.execute("INSERT INTO `auth` VALUES (%s, %s, NOW(), NULL);", (token, hashed) ) + self.cursor.execute("INSERT INTO `auth_profile` VALUES (DEFAULT, %s, %s, %s, NULL, NOW(), NULL);", (token, username, email) ) + auth_profile_lastrowid = self.cursor.lastrowid + self.cursor.execute("INSERT INTO `auth_profile_verification` VALUES (DEFAULT, %s, 'email', 0, NOW(), NULL);", (auth_profile_lastrowid,) ) + self.cursor.execute("INSERT INTO `auth_profile_roles` VALUES (DEFAULT, %s, %s, NOW(), NULL);", (auth_profile_lastrowid, result_roles['id']) ) + loggorilla.prcss(APIADDR, "Generate URL") + # TODO: set expired time + expired = datetime.datetime.now() + datetime.timedelta(minutes=30) # Can be hours or minutes + expired_isoformat = expired.isoformat() + payload = { + "token" : token, + "expired": expired_isoformat + } + # TODO: Config SSH key for tokenguard and set verification URL + token_encrypt = tokenguard.encode(payload, globalvar.ssh['key']['private'], globalvar.ssh['passphrase']) + verification_url = globalvar.verification_url(token_encrypt) + notme_url = globalvar.notme_url(token_encrypt) + loggorilla.prcss(APIADDR, "Sending email") + self.smtpconfig['subject' ] = f"{globalvar.title} email verification" + self.smtpconfig['to' ] = email + self.smtpconfig['text' ] = f"Please visit this link to complete the registration: {verification_url}. You are not registering this? report on this: {notme_url}." + self.smtpconfig['html' ] = Template(params["mako"]["email"]['template']).render( + title = globalvar.title, + heading = self.smtpconfig['subject'], + image = "https://colorlib.com/etc/email-template/10/images/email.png", + unsubscribe = "#", + container = Template(params["mako"]["email"]['container']).render( + verify = verification_url, + notme = notme_url + ) + ) + sendwave.smtp(self.smtpconfig) + loggorilla.prcss(APIADDR, "Giving response") + response["status" ] = "success" + response["desc" ] = "Register success. Check email for verification." + response["data" ] = { + "recaptcha":captcha_r + } + else: + response["status" ] = "failed" + response["desc" ] = "Form not complete." + response["data" ] = { + "recaptcha":captcha_r + } + except Exception as e: + self.cursor.execute("ROLLBACK;") + loggorilla.error(APIADDR, str(e) ) + response["status" ] = "failed" + response["desc" ] = "Internal Server Error. Please contact us if you still have an error." + finally: + self.cursor.execute("COMMIT;") + self.cursor.close() + self.db_main.close() + return response + + def resend(self, params): + APIADDR = "/api/auth/registration/resend" + loggorilla.prcss(APIADDR, "Define parameters") + response = {} + email = params["email"].lower() + try: + loggorilla.prcss(APIADDR, "Get dependency data") + self.cursor.execute(f"SELECT COUNT(*) AS `count`, auth_profile.token, auth_profile.email FROM auth_profile_verification INNER JOIN auth_profile ON auth_profile.id = auth_profile_verification.auth_profile WHERE auth_profile.email = %s AND auth_profile_verification.type = 'email' AND auth_profile_verification.verified = 0 ; ", (email,) ) + result_unverified = self.cursor.fetchone() + token = result_unverified["token"].decode() + if result_unverified["count"] >= 1: + loggorilla.prcss(APIADDR, "Generate URL") + # TODO: set expired time + expired = datetime.datetime.now() + datetime.timedelta(minutes=30) # Can be hours or minutes + expired_isoformat = expired.isoformat() + payload = { + "token" : token, + "expired": expired_isoformat + } + # TODO: Config SSH key for tokenguard and set verification URL + token_encrypt = tokenguard.encode(payload, globalvar.ssh['key']['private'], globalvar.ssh['passphrase']) + verification_url = globalvar.verification_url(token_encrypt) + notme_url = globalvar.notme_url(token_encrypt) + loggorilla.prcss(APIADDR, "Sending email") + self.smtpconfig['subject' ] = f"{globalvar.title} email verification" + self.smtpconfig['to' ] = email + self.smtpconfig['text' ] = f"Please visit this link to complete the registration: {verification_url}. You are not registering this? report on this: {notme_url}." + self.smtpconfig['html' ] = Template(params["mako"]["email"]['template']).render( + title = globalvar.title, + heading = self.smtpconfig['subject'], + image = "https://colorlib.com/etc/email-template/10/images/email.png", + unsubscribe = "#", + container = Template(params["mako"]["email"]['container']).render( + verify = verification_url, + notme = notme_url + ) + ) + sendwave.smtp(self.smtpconfig) + loggorilla.prcss(APIADDR, "Giving response") + response["status" ] = "success" + response["desc" ] = "Resend success. Check email for verification." + else: + response["status" ] = "failed" + response["desc" ] = "The parameters seems suspicious and you are not authorized for that" + except Exception as e: + loggorilla.error(APIADDR, str(e) ) + response["status" ] = "failed" + response["desc" ] = "Internal Server Error. Please contact us if you still have an error. for detail" + finally: + self.cursor.close() + self.db_main.close() + return response + + def purge(self, params): + APIADDR = "/api/auth/purge" + response = {} + loggorilla.prcss(APIADDR, "Define parameters") + key = params["key" ] + token = params["token" ] + self.cursor.execute("BEGIN;") + try: + # TODO: set auth_key + if 'key' in params and params["key"] == globalvar.auth_key: + loggorilla.prcss(APIADDR, "Get dependency data") + self.cursor.execute("SELECT COUNT(*) AS `count`, token, id FROM auth_profile WHERE token = %s ; ", (token,) ) + result_profile = self.cursor.fetchone() + loggorilla.prcss(APIADDR, "Deleting") + self.cursor.execute("DELETE FROM auth_profile_roles WHERE auth_profile = %s ; ", (result_profile['id'],) ) + self.cursor.execute("DELETE FROM auth_profile_verification WHERE auth_profile = %s ; ", (result_profile['id'],) ) + self.cursor.execute("DELETE FROM auth_profile WHERE id = %s ; ", (result_profile['id'],) ) + self.cursor.execute("DELETE FROM auth_session WHERE token = %s ; ", (result_profile['token'],) ) + self.cursor.execute("DELETE FROM auth WHERE token = %s ; ", (result_profile['token'],) ) + loggorilla.prcss(APIADDR, "Giving response") + response["status" ] = "success" + response["desc" ] = "Purge success" + else: + response["status" ] = "failed" + response["desc" ] = "Forbidden" + except Exception as e: + self.cursor.execute("ROLLBACK;") + loggorilla.error(APIADDR, str(e) ) + response["status" ] = "failed" + response["desc" ] = "Internal Server Error. Please contact us if you still have an error." + finally: + self.cursor.execute("COMMIT;") + self.cursor.close() + self.db_main.close() + return response + + def notme(self, params): + APIADDR = "/api/auth/registration/notme" + response = {} + loggorilla.prcss(APIADDR, "Define parameters") + token_encrypt = params["token"] + self.cursor.execute("BEGIN;") + try: + loggorilla.prcss(APIADDR, "Decrypt token") + # TODO: Config SSH key for tokenguard + payload = tokenguard.decode(token_encrypt, globalvar.ssh['key']['public']) + token = payload['token'] + + loggorilla.prcss(APIADDR, "Get dependency data") + self.cursor.execute(f"SELECT COUNT(*) AS `count`, auth_profile_verification.verified FROM auth_profile_verification INNER JOIN auth_profile ON auth_profile.id = auth_profile_verification.auth_profile WHERE auth_profile.token = %s AND auth_profile_verification.type = 'email' ; ", (token,) ) + result_verification = self.cursor.fetchone() + self.cursor.execute("SELECT COUNT(*) AS `count`, token, id, email FROM auth_profile WHERE token = %s ; ", (token,) ) + result_profile = self.cursor.fetchone() + + loggorilla.prcss(APIADDR, "Validation") + if result_verification['verified'] == 1: + response["status" ] = "failed" + response["desc" ] = "Your account already verified" + else: + loggorilla.prcss(APIADDR, "Deleting") + self.cursor.execute("DELETE FROM auth_profile_roles WHERE auth_profile = %s ; ", (result_profile['id'],) ) + self.cursor.execute("DELETE FROM auth_profile_verification WHERE auth_profile = %s ; ", (result_profile['id'],) ) + self.cursor.execute("DELETE FROM auth_profile WHERE id = %s ; ", (result_profile['id'],) ) + self.cursor.execute("DELETE FROM auth_session WHERE token = %s ; ", (result_profile['token'],) ) + self.cursor.execute("DELETE FROM auth WHERE token = %s ; ", (result_profile['token'],) ) + loggorilla.prcss(APIADDR, "Sending email") + self.smtpconfig['subject' ] = f"{globalvar.title} - Thanks for the reporting" + self.smtpconfig['to' ] = result_profile['email'] + self.smtpconfig['text' ] = "Thanks for your report. Now your data will be deleted from our system." + self.smtpconfig['html' ] = Template(params["mako"]["email"]['template']).render( + title = globalvar.title, + heading = self.smtpconfig['subject'], + image = "https://colorlib.com/etc/email-template/10/images/email.png", + unsubscribe = "#", + container = Template(params["mako"]["email"]['container']).render( + message = "Thanks for your report. Now your data will be deleted from our system." + ) + ) + sendwave.smtp(self.smtpconfig) + loggorilla.prcss(APIADDR, "Giving response") + response["status" ] = "success" + response["desc" ] = "Thanks for your report. Now your data will be deleted from our system." + except Exception as e: + self.cursor.execute("ROLLBACK;") + loggorilla.error(APIADDR, str(e) ) + response["status" ] = "failed" + response["desc" ] = "Internal Server Error. Please contact us if you still have an error. for detail" + finally: + self.cursor.execute("COMMIT;") + self.cursor.close() + self.db_main.close() + return response + + def verify(self, params): + APIADDR = "/api/auth/registration/verify" + response = {} + loggorilla.prcss(APIADDR, "Define parameters") + token_encrypt = params["token"] + self.cursor.execute("BEGIN;") + try: + loggorilla.prcss(APIADDR, "Decrypt token") + # TODO: Config SSH key for tokenguard + payload = tokenguard.decode(token_encrypt, globalvar.ssh['key']['public']) + token = payload['token'] + expired = datetime.datetime.fromisoformat(payload['expired']) + loggorilla.prcss(APIADDR, "Get dependency data") + self.cursor.execute(f"SELECT COUNT(*) AS `count`, auth_profile_verification.verified FROM auth_profile_verification INNER JOIN auth_profile ON auth_profile.id = auth_profile_verification.auth_profile WHERE auth_profile.token = %s AND auth_profile_verification.type = 'email' ; ", (token,) ) + result_verification = self.cursor.fetchone() + self.cursor.execute("SELECT COUNT(*) AS `count`, token, id, email FROM auth_profile WHERE token = %s ; ", (token,) ) + result_profile = self.cursor.fetchone() + loggorilla.prcss(APIADDR, "Validation") + if result_verification['verified'] == 1: + loggorilla.prcss(APIADDR, "Giving response") + response["status" ] = "failed" + response["desc" ] = "Your account already verified" + elif datetime.datetime.now() > expired: + loggorilla.prcss(APIADDR, "Deleting") + self.cursor.execute("DELETE FROM auth_profile_roles WHERE auth_profile = %s ; ", (result_profile['id'],) ) + self.cursor.execute("DELETE FROM auth_profile_verification WHERE auth_profile = %s ; ", (result_profile['id'],) ) + self.cursor.execute("DELETE FROM auth_profile WHERE id = %s ; ", (result_profile['id'],) ) + self.cursor.execute("DELETE FROM auth_session WHERE token = %s ; ", (result_profile['token'],) ) + self.cursor.execute("DELETE FROM auth WHERE token = %s ; ", (result_profile['token'],) ) + loggorilla.prcss(APIADDR, "Giving response") + response["status" ] = "failed" + response["desc" ] = "Expired. Your data removed." + else: + loggorilla.prcss(APIADDR, "Updating") + self.cursor.execute("UPDATE `auth_profile_verification` SET `verified` = 1 WHERE `type` = 'email' AND `auth_profile` = %s ; ", (result_profile['id'],) ) + loggorilla.prcss(APIADDR, "Sending email") + loggorilla.fyinf(APIADDR, "1") + self.smtpconfig['subject' ] = f"Welcome to {globalvar.title}" + self.smtpconfig['to' ] = result_profile['email'] + self.smtpconfig['text' ] = f"Welcome. Now your account is verified." + loggorilla.fyinf(APIADDR, "2") + self.smtpconfig['html' ] = Template(params["mako"]["email"]['template']).render( + title = globalvar.title, + heading = self.smtpconfig['subject'], + image = "https://colorlib.com/etc/email-template/10/images/email.png", + unsubscribe = "#", + container = Template(params["mako"]["email"]['container']).render( + message = "Welcome. Now your account is verified." + ) + ) + loggorilla.fyinf(APIADDR, "3") + sendwave.smtp(self.smtpconfig) + loggorilla.prcss(APIADDR, "Giving response") + response["status" ] = "success" + response["desc" ] = "Congratulation. Your account is verified." + except Exception as e: + self.cursor.execute("ROLLBACK;") + loggorilla.error(APIADDR, str(e) ) + response["status" ] = "failed" + response["desc" ] = "Internal Server Error. Please contact us if you still have an error. for detail" + finally: + self.cursor.execute("COMMIT;") + self.cursor.close() + self.db_main.close() + return response + + def login(self, params): + APIADDR = "/api/auth/login" + response = {} + loggorilla.prcss(APIADDR, "Define parameters") + username = params["username"].lower() + password = params["password"] + self.cursor.execute("BEGIN;") + try: + loggorilla.prcss(APIADDR, "Get dependency data") + self.cursor.execute("SELECT COUNT(*) AS `count`, auth.token, auth_profile.id, auth_profile.username, auth.password FROM auth_profile INNER JOIN auth ON auth.token = auth_profile.token WHERE auth_profile.username = %s ; ", (username,) ) + result_login = self.cursor.fetchone() + self.cursor.execute("SELECT `auth_profile`, `type`, `verified` FROM auth_profile_verification WHERE `type` = 'email' AND `auth_profile` = %s ; ", (result_login['id'],) ) + result_verification = self.cursor.fetchone() + loggorilla.prcss(APIADDR, "Validation") + if result_login['count'] == 1 and result_verification['verified'] == 1 and bcrypt.checkpw(password.encode(), result_login['password'].decode().encode() ) : + loggorilla.prcss(APIADDR, "Add session") + self.cursor.execute(f"INSERT INTO `auth_session` VALUES (DEFAULT, %s, NOW(), NOW() + INTERVAL 30 DAY, NOW(), NULL)", ( result_login['token'], ) ) + session_last_id = self.cursor.lastrowid + self.cursor.execute(f"SELECT `id`, `start`, `end` FROM `auth_session` WHERE id = %s ; ", ( session_last_id, ) ) + session = self.cursor.fetchone() + loggorilla.prcss(APIADDR, "Generate JWT token") + payload = { + "session" : { + "id" : session['id' ], + "start" : session['start' ].isoformat(), + "end" : session['end' ].isoformat() + } + } + jwt_token = tokenguard.encode(payload, globalvar.ssh['key']['private'], globalvar.ssh['passphrase']) + loggorilla.prcss(APIADDR, "Giving response") + response["status" ] = "success" + response["desc" ] = "Login success. Session added." + response["data" ] = { + "jwt" : jwt_token, + "username" : username + } + else: + response["status" ] = "failed" + response["desc" ] = "Username or password is incorrect" + except Exception as e: + self.cursor.execute("ROLLBACK;") + loggorilla.error(APIADDR, str(e) ) + response["status" ] = "failed" + response["desc" ] = "Internal Server Error. Please contact us if you still have an error. for detail" + finally: + self.cursor.execute("COMMIT;") + self.cursor.close() + self.db_main.close() + return response + + # Special API for Bottle/Beaker web session + def session(self, params): + APIADDR = "/api/auth/session/:type" + loggorilla.prcss(APIADDR, "Define parameters") + response = {} + type = params["type" ] # set / check / out + jwt = params["jwt" ] + + payload = tokenguard.decode(jwt, globalvar.ssh['key']['public']) + session_id = payload["session"]["id"] + + try: + session_beaker = request.environ.get('beaker.session') + if type == 'set': + loggorilla.prcss(APIADDR, "Set session") + session_beaker["token"] = jwt + session_beaker.save() + response["status" ] = "success" + response["desc" ] = "Session set" + elif type == 'check': + loggorilla.prcss(APIADDR, "Check session") + self.cursor.execute(f"SELECT COUNT(*) AS `count` FROM auth_session WHERE id = %s ; ", (session_id,) ) + result_session = self.cursor.fetchone() + if result_session == 0: + session_beaker.delete() + response["status" ] = "success" + response["desc" ] = "session out" + response["data" ] = { + "status":"lost" + } + else: + response["status" ] = "success" + response["desc" ] = "session active" + response["data" ] = { + "status":"active" + } + elif type == 'out': + loggorilla.prcss(APIADDR, "Out session") + session_beaker.delete() + response["status" ] = "success" + response["desc" ] = "Session out" + else: + response["status" ] = "failed" + response["desc" ] = "False parameters" + + except Exception as e: + loggorilla.error(APIADDR, str(e) ) + response["status" ] = "failed" + response["desc" ] = "Internal Server Error. Please contact us if you still have an error. for detail" + finally: + self.cursor.close() + self.db_main.close() + return response + + def forgot(self, params): + APIADDR = "/api/auth/password/forgot/:type" + loggorilla.prcss(APIADDR, "Define parameters") + response = {} + type = params["type" ] # POST: send / change + self.cursor.execute("BEGIN;") + try: + if type == "send": + loggorilla.prcss(APIADDR, "Define parameters inside decision") + email = params["email"].lower() + loggorilla.prcss(APIADDR, "Get dependency data") + self.cursor.execute(f"SELECT COUNT(*) AS `count`, auth_profile.token, auth_profile.email FROM auth_profile_verification INNER JOIN auth_profile ON auth_profile.id = auth_profile_verification.auth_profile WHERE auth_profile.email = %s AND auth_profile_verification.type = 'email' AND auth_profile_verification.verified = 1 ; ", (email,) ) + result_verified = self.cursor.fetchone() + token = result_verified["token"].decode() + if result_verified["count"] >= 1: + loggorilla.prcss(APIADDR, "Generate URL") + # TODO: set expired time + expired = datetime.datetime.now() + datetime.timedelta(minutes=30) # Can be hours or minutes + expired_isoformat = expired.isoformat() + payload = { + "token" : token, + "expired": expired_isoformat + } + # TODO: Config SSH key for tokenguard and set forgot URL + token_encrypt = tokenguard.encode(payload, globalvar.ssh['key']['private'], globalvar.ssh['passphrase']) + change_forgot_url = globalvar.change_forgot_url(token_encrypt) + loggorilla.prcss(APIADDR, "Sending email") + self.smtpconfig['subject' ] = f"{globalvar.title} forgot password" + self.smtpconfig['to' ] = email + self.smtpconfig['text' ] = f"Please visit this link to change password: {change_forgot_url}. Avoid the link if you are not request this." + self.smtpconfig['html' ] = Template(params["mako"]["email"]['template']).render( + title = globalvar.title, + heading = self.smtpconfig['subject'], + image = "https://colorlib.com/etc/email-template/10/images/email.png", + unsubscribe = "#", + container = Template(params["mako"]["email"]['container']).render( + change = change_forgot_url + ) + ) + sendwave.smtp(self.smtpconfig) + loggorilla.prcss(APIADDR, "Giving response") + response["status" ] = "success" + response["desc" ] = "Check email for password change." + else: + response["status" ] = "failed" + response["desc" ] = "The parameters seems suspicious and you are not authorized for that" + elif type == "change": + loggorilla.prcss(APIADDR, "Define parameters inside decision") + token_encrypt = params["token" ] + password = params["password" ] + loggorilla.prcss(APIADDR, "Decrypt token") + payload = tokenguard.decode(token_encrypt, globalvar.ssh['key']['public']) + token = payload['token'] + expired = datetime.datetime.fromisoformat(payload['expired']) + loggorilla.prcss(APIADDR, "Process parameters") + hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode() + loggorilla.prcss(APIADDR, "Get dependency data") + self.cursor.execute(f"SELECT COUNT(*) AS `count`, auth_profile.email FROM auth_profile_verification INNER JOIN auth_profile ON auth_profile.id = auth_profile_verification.auth_profile WHERE auth_profile.token = %s AND auth_profile_verification.type = 'email' AND auth_profile_verification.verified = 1 ; ", (token,) ) + result_verified = self.cursor.fetchone() + email = result_verified['email'] + loggorilla.prcss(APIADDR, "Validation") + if datetime.datetime.now() > expired: + response["status" ] = "failed" + response["desc" ] = "Expired" + elif len(password) < 6: + response["status" ] = "failed" + response["desc" ] = "password too short" + elif result_verified["count"] == 0: + response["status" ] = "failed" + response["desc" ] = "Forbidden: No active account for this" + response["data" ] = { + "message": "Please contact us if you still had a problem" + } + else: + loggorilla.prcss(APIADDR, "Updating") + self.cursor.execute("UPDATE `auth` SET `password` = %s, `when_update` = NOW() WHERE `token` = %s", (hashed, token) ) + loggorilla.prcss(APIADDR, "Sending email") + self.smtpconfig['subject' ] = f"{globalvar.title} password change success" + self.smtpconfig['to' ] = email + self.smtpconfig['text' ] = f"You had change your password." + self.smtpconfig['html' ] = Template(params["mako"]["email"]['template']).render( + title = globalvar.title, + heading = self.smtpconfig['subject'], + image = "https://colorlib.com/etc/email-template/10/images/email.png", + unsubscribe = "#", + container = Template(params["mako"]["email"]['container']).render( + message = f"You had change your password." + ) + ) + sendwave.smtp(self.smtpconfig) + loggorilla.prcss(APIADDR, "Giving response") + response["status" ] = "success" + response["desc" ] = "password change success" + else: + response["status" ] = "failed" + response["desc" ] = "forbidden" + except Exception as e: + self.cursor.execute("ROLLBACK;") + loggorilla.error(APIADDR, str(e) ) + response["status" ] = "failed" + response["desc" ] = "Internal Server Error. Please contact us if you still have an error. for detail" + finally: + self.cursor.execute("COMMIT;") + self.cursor.close() + self.db_main.close() + return response + + def change(self, params): + APIADDR = "/api/auth/password/change" + loggorilla.prcss(APIADDR, "Define parameters") + response = {} + + jwt = params["jwt" ] + old = params["old" ] + new = params["new" ] + + hashed = bcrypt.hashpw(new.encode(), bcrypt.gensalt()).decode() + + payload = tokenguard.decode(jwt, globalvar.ssh['key']['public']) + session_id = payload["session"]["id"] + self.cursor.execute("BEGIN;") + try: + loggorilla.prcss(APIADDR, "Get dependency data") + + self.cursor.execute(f"SELECT * FROM auth_session WHERE id = %s ; ", (session_id,) ) + result_session = self.cursor.fetchone() + token = result_session['token' ] + expired = result_session['end' ] + + self.cursor.execute(f"SELECT COUNT(*) AS `count`, auth_profile.id, auth_profile.email FROM auth_profile_verification INNER JOIN auth_profile ON auth_profile.id = auth_profile_verification.auth_profile WHERE auth_profile.token = %s AND auth_profile_verification.type = 'email' AND auth_profile_verification.verified = 1 ; ", (token,) ) + result_verified = self.cursor.fetchone() + profile = result_verified['id' ] + email = result_verified['email' ] + + self.cursor.execute(f"SELECT auth_roles FROM auth_profile_roles WHERE auth_profile = %s ; ", (profile,) ) + result_roles = self.cursor.fetchall() + roles = [] + for row in result_roles: + roles.append(row['auth_roles']) + + loggorilla.prcss(APIADDR, "Validation") + if datetime.datetime.now() > expired: + loggorilla.prcss(APIADDR, "Deleting") + self.cursor.execute("DELETE FROM auth_session WHERE id = %s ; ", (session_id,) ) + loggorilla.prcss(APIADDR, "Giving response") + response["status" ] = "failed" + response["desc" ] = "Expired. Your session removed." + elif len(new) < 6: + response["status" ] = "failed" + response["desc" ] = "password too short" + elif result_verified["count"] == 0: + response["status" ] = "failed" + response["desc" ] = "Forbidden: No active account for this" + response["data" ] = { + "message": "Please contact us if you still had a problem" + } + else: + loggorilla.prcss(APIADDR, "Updating") + self.cursor.execute("UPDATE `auth` SET `password` = %s, `when_update` = NOW() WHERE `token` = %s", (hashed, token) ) + loggorilla.prcss(APIADDR, "Sending email") + self.smtpconfig['subject' ] = f"{globalvar.title} password change success" + self.smtpconfig['to' ] = email + self.smtpconfig['text' ] = f"You had change your password." + self.smtpconfig['html' ] = Template(params["mako"]["email"]['template']).render( + title = globalvar.title, + heading = self.smtpconfig['subject'], + image = "https://colorlib.com/etc/email-template/10/images/email.png", + unsubscribe = "#", + container = Template(params["mako"]["email"]['container']).render( + message = f"You had change your password." + ) + ) + sendwave.smtp(self.smtpconfig) + loggorilla.prcss(APIADDR, "Giving response") + response["status" ] = "success" + response["desc" ] = "password change success" + + except Exception as e: + self.cursor.execute("ROLLBACK;") + loggorilla.error(APIADDR, str(e) ) + response["status" ] = "failed" + response["desc" ] = "Internal Server Error. Please contact us if you still have an error. for detail" + finally: + self.cursor.execute("COMMIT;") + self.cursor.close() + self.db_main.close() + return response + + def logout(self, params): + APIADDR = "/logout" + loggorilla.prcss(APIADDR, "Define parameters") + response = {} + jwt = params["jwt" ] + payload = tokenguard.decode(jwt, globalvar.ssh['key']['public']) + session_id = payload["session"]["id"] + self.cursor.execute("BEGIN;") + try: + loggorilla.prcss(APIADDR, "Deleting") + self.cursor.execute("DELETE FROM auth_session WHERE id = %s ; ", (session_id,) ) + loggorilla.prcss(APIADDR, "Giving response") + response["status" ] = "success" + response["desc" ] = f"Your session ({session_id}) removed." + except Exception as e: + self.cursor.execute("ROLLBACK;") + loggorilla.error(APIADDR, str(e) ) + response["status" ] = "failed" + response["desc" ] = "Internal Server Error. Please contact us if you still have an error. for detail" + finally: + self.cursor.execute("COMMIT;") + self.cursor.close() + self.db_main.close() + return response