Main API module: auth.py

This commit is contained in:
Dita Aji Pratama 2024-06-17 12:34:50 +07:00
parent d4d12d920d
commit 1b906999e2

702
app/modules/api/auth.py Normal file
View File

@ -0,0 +1,702 @@
import mysql.connector as mariadb
from mako.template import Template
from bottle import request
import config.database as database
import config.globalvar as globalvar
import bcrypt
import re
import datetime
import scripts.googly as googly
import scripts.saltedkey as saltedkey
import scripts.loggorilla as loggorilla
import scripts.sendwave as sendwave
import scripts.tokenguard as tokenguard
import scripts.paperplease as paperplease
class auth:
def __init__(self):
# TODO: set database
self.db_main = mariadb.connect(**database.db_main)
self.cursor = self.db_main.cursor(dictionary=True)
# TODO: Config your SMTP
self.smtpconfig = globalvar.smtpconfig
def register(self, params):
APIADDR = "/api/auth/registration/register/:roles"
loggorilla.prcss(APIADDR, "Define parameters")
response = {}
captcha = params["captcha" ]
username = params["username" ].lower()
email = params["email" ].lower()
password = params["password" ]
roles = params["roles" ]
self.cursor.execute("BEGIN;")
try:
loggorilla.prcss(APIADDR, "Process parameters")
hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
token = saltedkey.token(username, hashed)
# TODO: set production/development version
if globalvar.production == True:
# TODO: set reCAPTCHA['server']
captcha_r = googly.recaptcha(captcha, globalvar.reCAPTCHA['server'])
score = captcha_r["score"]
else:
# For localhost reCAPTCHA['server']
captcha_r = 'dev mode'
score = 0.9
loggorilla.prcss(APIADDR, "Get dependency data")
self.cursor.execute("SELECT COUNT(*) AS `count` FROM auth_profile WHERE email = %s ; ", (email,) )
result_profile = self.cursor.fetchone()
self.cursor.execute("SELECT COUNT(*) AS `count` FROM auth_profile WHERE username = %s ; ", (username,) )
result_username = self.cursor.fetchone()
self.cursor.execute(f"SELECT COUNT(*) AS `count` FROM auth_profile_verification INNER JOIN auth_profile ON auth_profile.id = auth_profile_verification.auth_profile WHERE auth_profile.email = %s AND auth_profile_verification.type = 'email' AND auth_profile_verification.verified = 0 ; ", (email,) )
result_unverified = self.cursor.fetchone()
# TODO: set roles
self.cursor.execute(f"SELECT id, name FROM `auth_roles` WHERE auth_roles.name = %s ; ", (roles,) )
result_roles = self.cursor.fetchone()
loggorilla.prcss(APIADDR, "Validation")
if score < 0.2:
response["status" ] = "failed"
response["desc" ] = "you are sus as a bot"
response["data" ] = {
"recaptcha":captcha_r
}
elif globalvar.su_mode == False and roles == 'su':
response["status" ] = "failed"
response["desc" ] = "Forbidden to become super user"
response["data" ] = {
"recaptcha":captcha_r
}
elif not re.match(r'^\w+$', username):
response["status" ] = "failed"
response["desc" ] = "username can only use letters, numbers, and the underscore symbol"
response["data" ] = {
"recaptcha":captcha_r
}
elif len(username) > 35:
response["status" ] = "failed"
response["desc" ] = "username can not longer than 35 character"
response["data" ] = {
"recaptcha":captcha_r
}
elif len(username) < 3:
response["status" ] = "failed"
response["desc" ] = "username too short"
response["data" ] = {
"recaptcha":captcha_r
}
elif len(password) < 6:
response["status" ] = "failed"
response["desc" ] = "password too short"
response["data" ] = {
"recaptcha":captcha_r
}
elif result_unverified["count"] >= 1:
response["status" ] = "failed"
response["desc" ] = "check email for verification"
response["data" ] = {
"message": "Check email for verification. Please contact us if you still had a problem",
"resend": globalvar.resend_url(email),
"recaptcha":captcha_r
}
elif result_profile["count"] >= 1:
response["status" ] = "failed"
response["desc" ] = "email already taken"
response["data" ] = {
"recaptcha":captcha_r
}
elif result_username["count"] >= 1:
response["status" ] = "failed"
response["desc" ] = "username already taken"
response["data" ] = {
"recaptcha":captcha_r
}
elif captcha and username and email and password:
loggorilla.prcss(APIADDR, "Insering")
self.cursor.execute("INSERT INTO `auth` VALUES (%s, %s, NOW(), NULL);", (token, hashed) )
self.cursor.execute("INSERT INTO `auth_profile` VALUES (DEFAULT, %s, %s, %s, NULL, NOW(), NULL);", (token, username, email) )
auth_profile_lastrowid = self.cursor.lastrowid
self.cursor.execute("INSERT INTO `auth_profile_verification` VALUES (DEFAULT, %s, 'email', 0, NOW(), NULL);", (auth_profile_lastrowid,) )
self.cursor.execute("INSERT INTO `auth_profile_roles` VALUES (DEFAULT, %s, %s, NOW(), NULL);", (auth_profile_lastrowid, result_roles['id']) )
loggorilla.prcss(APIADDR, "Generate URL")
# TODO: set expired time
expired = datetime.datetime.now() + datetime.timedelta(minutes=30) # Can be hours or minutes
expired_isoformat = expired.isoformat()
payload = {
"token" : token,
"expired": expired_isoformat
}
# TODO: Config SSH key for tokenguard and set verification URL
token_encrypt = tokenguard.encode(payload, globalvar.ssh['key']['private'], globalvar.ssh['passphrase'])
verification_url = globalvar.verification_url(token_encrypt)
notme_url = globalvar.notme_url(token_encrypt)
loggorilla.prcss(APIADDR, "Sending email")
self.smtpconfig['subject' ] = f"{globalvar.title} email verification"
self.smtpconfig['to' ] = email
self.smtpconfig['text' ] = f"Please visit this link to complete the registration: {verification_url}. You are not registering this? report on this: {notme_url}."
self.smtpconfig['html' ] = Template(params["mako"]["email"]['template']).render(
title = globalvar.title,
heading = self.smtpconfig['subject'],
image = "https://colorlib.com/etc/email-template/10/images/email.png",
unsubscribe = "#",
container = Template(params["mako"]["email"]['container']).render(
verify = verification_url,
notme = notme_url
)
)
sendwave.smtp(self.smtpconfig)
loggorilla.prcss(APIADDR, "Giving response")
response["status" ] = "success"
response["desc" ] = "Register success. Check email for verification."
response["data" ] = {
"recaptcha":captcha_r
}
else:
response["status" ] = "failed"
response["desc" ] = "Form not complete."
response["data" ] = {
"recaptcha":captcha_r
}
except Exception as e:
self.cursor.execute("ROLLBACK;")
loggorilla.error(APIADDR, str(e) )
response["status" ] = "failed"
response["desc" ] = "Internal Server Error. Please contact us if you still have an error."
finally:
self.cursor.execute("COMMIT;")
self.cursor.close()
self.db_main.close()
return response
def resend(self, params):
APIADDR = "/api/auth/registration/resend"
loggorilla.prcss(APIADDR, "Define parameters")
response = {}
email = params["email"].lower()
try:
loggorilla.prcss(APIADDR, "Get dependency data")
self.cursor.execute(f"SELECT COUNT(*) AS `count`, auth_profile.token, auth_profile.email FROM auth_profile_verification INNER JOIN auth_profile ON auth_profile.id = auth_profile_verification.auth_profile WHERE auth_profile.email = %s AND auth_profile_verification.type = 'email' AND auth_profile_verification.verified = 0 ; ", (email,) )
result_unverified = self.cursor.fetchone()
token = result_unverified["token"].decode()
if result_unverified["count"] >= 1:
loggorilla.prcss(APIADDR, "Generate URL")
# TODO: set expired time
expired = datetime.datetime.now() + datetime.timedelta(minutes=30) # Can be hours or minutes
expired_isoformat = expired.isoformat()
payload = {
"token" : token,
"expired": expired_isoformat
}
# TODO: Config SSH key for tokenguard and set verification URL
token_encrypt = tokenguard.encode(payload, globalvar.ssh['key']['private'], globalvar.ssh['passphrase'])
verification_url = globalvar.verification_url(token_encrypt)
notme_url = globalvar.notme_url(token_encrypt)
loggorilla.prcss(APIADDR, "Sending email")
self.smtpconfig['subject' ] = f"{globalvar.title} email verification"
self.smtpconfig['to' ] = email
self.smtpconfig['text' ] = f"Please visit this link to complete the registration: {verification_url}. You are not registering this? report on this: {notme_url}."
self.smtpconfig['html' ] = Template(params["mako"]["email"]['template']).render(
title = globalvar.title,
heading = self.smtpconfig['subject'],
image = "https://colorlib.com/etc/email-template/10/images/email.png",
unsubscribe = "#",
container = Template(params["mako"]["email"]['container']).render(
verify = verification_url,
notme = notme_url
)
)
sendwave.smtp(self.smtpconfig)
loggorilla.prcss(APIADDR, "Giving response")
response["status" ] = "success"
response["desc" ] = "Resend success. Check email for verification."
else:
response["status" ] = "failed"
response["desc" ] = "The parameters seems suspicious and you are not authorized for that"
except Exception as e:
loggorilla.error(APIADDR, str(e) )
response["status" ] = "failed"
response["desc" ] = "Internal Server Error. Please contact us if you still have an error. for detail"
finally:
self.cursor.close()
self.db_main.close()
return response
def purge(self, params):
APIADDR = "/api/auth/purge"
response = {}
loggorilla.prcss(APIADDR, "Define parameters")
key = params["key" ]
token = params["token" ]
self.cursor.execute("BEGIN;")
try:
# TODO: set auth_key
if 'key' in params and params["key"] == globalvar.auth_key:
loggorilla.prcss(APIADDR, "Get dependency data")
self.cursor.execute("SELECT COUNT(*) AS `count`, token, id FROM auth_profile WHERE token = %s ; ", (token,) )
result_profile = self.cursor.fetchone()
loggorilla.prcss(APIADDR, "Deleting")
self.cursor.execute("DELETE FROM auth_profile_roles WHERE auth_profile = %s ; ", (result_profile['id'],) )
self.cursor.execute("DELETE FROM auth_profile_verification WHERE auth_profile = %s ; ", (result_profile['id'],) )
self.cursor.execute("DELETE FROM auth_profile WHERE id = %s ; ", (result_profile['id'],) )
self.cursor.execute("DELETE FROM auth_session WHERE token = %s ; ", (result_profile['token'],) )
self.cursor.execute("DELETE FROM auth WHERE token = %s ; ", (result_profile['token'],) )
loggorilla.prcss(APIADDR, "Giving response")
response["status" ] = "success"
response["desc" ] = "Purge success"
else:
response["status" ] = "failed"
response["desc" ] = "Forbidden"
except Exception as e:
self.cursor.execute("ROLLBACK;")
loggorilla.error(APIADDR, str(e) )
response["status" ] = "failed"
response["desc" ] = "Internal Server Error. Please contact us if you still have an error."
finally:
self.cursor.execute("COMMIT;")
self.cursor.close()
self.db_main.close()
return response
def notme(self, params):
APIADDR = "/api/auth/registration/notme"
response = {}
loggorilla.prcss(APIADDR, "Define parameters")
token_encrypt = params["token"]
self.cursor.execute("BEGIN;")
try:
loggorilla.prcss(APIADDR, "Decrypt token")
# TODO: Config SSH key for tokenguard
payload = tokenguard.decode(token_encrypt, globalvar.ssh['key']['public'])
token = payload['token']
loggorilla.prcss(APIADDR, "Get dependency data")
self.cursor.execute(f"SELECT COUNT(*) AS `count`, auth_profile_verification.verified FROM auth_profile_verification INNER JOIN auth_profile ON auth_profile.id = auth_profile_verification.auth_profile WHERE auth_profile.token = %s AND auth_profile_verification.type = 'email' ; ", (token,) )
result_verification = self.cursor.fetchone()
self.cursor.execute("SELECT COUNT(*) AS `count`, token, id, email FROM auth_profile WHERE token = %s ; ", (token,) )
result_profile = self.cursor.fetchone()
loggorilla.prcss(APIADDR, "Validation")
if result_verification['verified'] == 1:
response["status" ] = "failed"
response["desc" ] = "Your account already verified"
else:
loggorilla.prcss(APIADDR, "Deleting")
self.cursor.execute("DELETE FROM auth_profile_roles WHERE auth_profile = %s ; ", (result_profile['id'],) )
self.cursor.execute("DELETE FROM auth_profile_verification WHERE auth_profile = %s ; ", (result_profile['id'],) )
self.cursor.execute("DELETE FROM auth_profile WHERE id = %s ; ", (result_profile['id'],) )
self.cursor.execute("DELETE FROM auth_session WHERE token = %s ; ", (result_profile['token'],) )
self.cursor.execute("DELETE FROM auth WHERE token = %s ; ", (result_profile['token'],) )
loggorilla.prcss(APIADDR, "Sending email")
self.smtpconfig['subject' ] = f"{globalvar.title} - Thanks for the reporting"
self.smtpconfig['to' ] = result_profile['email']
self.smtpconfig['text' ] = "Thanks for your report. Now your data will be deleted from our system."
self.smtpconfig['html' ] = Template(params["mako"]["email"]['template']).render(
title = globalvar.title,
heading = self.smtpconfig['subject'],
image = "https://colorlib.com/etc/email-template/10/images/email.png",
unsubscribe = "#",
container = Template(params["mako"]["email"]['container']).render(
message = "Thanks for your report. Now your data will be deleted from our system."
)
)
sendwave.smtp(self.smtpconfig)
loggorilla.prcss(APIADDR, "Giving response")
response["status" ] = "success"
response["desc" ] = "Thanks for your report. Now your data will be deleted from our system."
except Exception as e:
self.cursor.execute("ROLLBACK;")
loggorilla.error(APIADDR, str(e) )
response["status" ] = "failed"
response["desc" ] = "Internal Server Error. Please contact us if you still have an error. for detail"
finally:
self.cursor.execute("COMMIT;")
self.cursor.close()
self.db_main.close()
return response
def verify(self, params):
APIADDR = "/api/auth/registration/verify"
response = {}
loggorilla.prcss(APIADDR, "Define parameters")
token_encrypt = params["token"]
self.cursor.execute("BEGIN;")
try:
loggorilla.prcss(APIADDR, "Decrypt token")
# TODO: Config SSH key for tokenguard
payload = tokenguard.decode(token_encrypt, globalvar.ssh['key']['public'])
token = payload['token']
expired = datetime.datetime.fromisoformat(payload['expired'])
loggorilla.prcss(APIADDR, "Get dependency data")
self.cursor.execute(f"SELECT COUNT(*) AS `count`, auth_profile_verification.verified FROM auth_profile_verification INNER JOIN auth_profile ON auth_profile.id = auth_profile_verification.auth_profile WHERE auth_profile.token = %s AND auth_profile_verification.type = 'email' ; ", (token,) )
result_verification = self.cursor.fetchone()
self.cursor.execute("SELECT COUNT(*) AS `count`, token, id, email FROM auth_profile WHERE token = %s ; ", (token,) )
result_profile = self.cursor.fetchone()
loggorilla.prcss(APIADDR, "Validation")
if result_verification['verified'] == 1:
loggorilla.prcss(APIADDR, "Giving response")
response["status" ] = "failed"
response["desc" ] = "Your account already verified"
elif datetime.datetime.now() > expired:
loggorilla.prcss(APIADDR, "Deleting")
self.cursor.execute("DELETE FROM auth_profile_roles WHERE auth_profile = %s ; ", (result_profile['id'],) )
self.cursor.execute("DELETE FROM auth_profile_verification WHERE auth_profile = %s ; ", (result_profile['id'],) )
self.cursor.execute("DELETE FROM auth_profile WHERE id = %s ; ", (result_profile['id'],) )
self.cursor.execute("DELETE FROM auth_session WHERE token = %s ; ", (result_profile['token'],) )
self.cursor.execute("DELETE FROM auth WHERE token = %s ; ", (result_profile['token'],) )
loggorilla.prcss(APIADDR, "Giving response")
response["status" ] = "failed"
response["desc" ] = "Expired. Your data removed."
else:
loggorilla.prcss(APIADDR, "Updating")
self.cursor.execute("UPDATE `auth_profile_verification` SET `verified` = 1 WHERE `type` = 'email' AND `auth_profile` = %s ; ", (result_profile['id'],) )
loggorilla.prcss(APIADDR, "Sending email")
loggorilla.fyinf(APIADDR, "1")
self.smtpconfig['subject' ] = f"Welcome to {globalvar.title}"
self.smtpconfig['to' ] = result_profile['email']
self.smtpconfig['text' ] = f"Welcome. Now your account is verified."
loggorilla.fyinf(APIADDR, "2")
self.smtpconfig['html' ] = Template(params["mako"]["email"]['template']).render(
title = globalvar.title,
heading = self.smtpconfig['subject'],
image = "https://colorlib.com/etc/email-template/10/images/email.png",
unsubscribe = "#",
container = Template(params["mako"]["email"]['container']).render(
message = "Welcome. Now your account is verified."
)
)
loggorilla.fyinf(APIADDR, "3")
sendwave.smtp(self.smtpconfig)
loggorilla.prcss(APIADDR, "Giving response")
response["status" ] = "success"
response["desc" ] = "Congratulation. Your account is verified."
except Exception as e:
self.cursor.execute("ROLLBACK;")
loggorilla.error(APIADDR, str(e) )
response["status" ] = "failed"
response["desc" ] = "Internal Server Error. Please contact us if you still have an error. for detail"
finally:
self.cursor.execute("COMMIT;")
self.cursor.close()
self.db_main.close()
return response
def login(self, params):
APIADDR = "/api/auth/login"
response = {}
loggorilla.prcss(APIADDR, "Define parameters")
username = params["username"].lower()
password = params["password"]
self.cursor.execute("BEGIN;")
try:
loggorilla.prcss(APIADDR, "Get dependency data")
self.cursor.execute("SELECT COUNT(*) AS `count`, auth.token, auth_profile.id, auth_profile.username, auth.password FROM auth_profile INNER JOIN auth ON auth.token = auth_profile.token WHERE auth_profile.username = %s ; ", (username,) )
result_login = self.cursor.fetchone()
self.cursor.execute("SELECT `auth_profile`, `type`, `verified` FROM auth_profile_verification WHERE `type` = 'email' AND `auth_profile` = %s ; ", (result_login['id'],) )
result_verification = self.cursor.fetchone()
loggorilla.prcss(APIADDR, "Validation")
if result_login['count'] == 1 and result_verification['verified'] == 1 and bcrypt.checkpw(password.encode(), result_login['password'].decode().encode() ) :
loggorilla.prcss(APIADDR, "Add session")
self.cursor.execute(f"INSERT INTO `auth_session` VALUES (DEFAULT, %s, NOW(), NOW() + INTERVAL 30 DAY, NOW(), NULL)", ( result_login['token'], ) )
session_last_id = self.cursor.lastrowid
self.cursor.execute(f"SELECT `id`, `start`, `end` FROM `auth_session` WHERE id = %s ; ", ( session_last_id, ) )
session = self.cursor.fetchone()
loggorilla.prcss(APIADDR, "Generate JWT token")
payload = {
"session" : {
"id" : session['id' ],
"start" : session['start' ].isoformat(),
"end" : session['end' ].isoformat()
}
}
jwt_token = tokenguard.encode(payload, globalvar.ssh['key']['private'], globalvar.ssh['passphrase'])
loggorilla.prcss(APIADDR, "Giving response")
response["status" ] = "success"
response["desc" ] = "Login success. Session added."
response["data" ] = {
"jwt" : jwt_token,
"username" : username
}
else:
response["status" ] = "failed"
response["desc" ] = "Username or password is incorrect"
except Exception as e:
self.cursor.execute("ROLLBACK;")
loggorilla.error(APIADDR, str(e) )
response["status" ] = "failed"
response["desc" ] = "Internal Server Error. Please contact us if you still have an error. for detail"
finally:
self.cursor.execute("COMMIT;")
self.cursor.close()
self.db_main.close()
return response
# Special API for Bottle/Beaker web session
def session(self, params):
APIADDR = "/api/auth/session/:type"
loggorilla.prcss(APIADDR, "Define parameters")
response = {}
type = params["type" ] # set / check / out
jwt = params["jwt" ]
payload = tokenguard.decode(jwt, globalvar.ssh['key']['public'])
session_id = payload["session"]["id"]
try:
session_beaker = request.environ.get('beaker.session')
if type == 'set':
loggorilla.prcss(APIADDR, "Set session")
session_beaker["token"] = jwt
session_beaker.save()
response["status" ] = "success"
response["desc" ] = "Session set"
elif type == 'check':
loggorilla.prcss(APIADDR, "Check session")
self.cursor.execute(f"SELECT COUNT(*) AS `count` FROM auth_session WHERE id = %s ; ", (session_id,) )
result_session = self.cursor.fetchone()
if result_session == 0:
session_beaker.delete()
response["status" ] = "success"
response["desc" ] = "session out"
response["data" ] = {
"status":"lost"
}
else:
response["status" ] = "success"
response["desc" ] = "session active"
response["data" ] = {
"status":"active"
}
elif type == 'out':
loggorilla.prcss(APIADDR, "Out session")
session_beaker.delete()
response["status" ] = "success"
response["desc" ] = "Session out"
else:
response["status" ] = "failed"
response["desc" ] = "False parameters"
except Exception as e:
loggorilla.error(APIADDR, str(e) )
response["status" ] = "failed"
response["desc" ] = "Internal Server Error. Please contact us if you still have an error. for detail"
finally:
self.cursor.close()
self.db_main.close()
return response
def forgot(self, params):
APIADDR = "/api/auth/password/forgot/:type"
loggorilla.prcss(APIADDR, "Define parameters")
response = {}
type = params["type" ] # POST: send / change
self.cursor.execute("BEGIN;")
try:
if type == "send":
loggorilla.prcss(APIADDR, "Define parameters inside decision")
email = params["email"].lower()
loggorilla.prcss(APIADDR, "Get dependency data")
self.cursor.execute(f"SELECT COUNT(*) AS `count`, auth_profile.token, auth_profile.email FROM auth_profile_verification INNER JOIN auth_profile ON auth_profile.id = auth_profile_verification.auth_profile WHERE auth_profile.email = %s AND auth_profile_verification.type = 'email' AND auth_profile_verification.verified = 1 ; ", (email,) )
result_verified = self.cursor.fetchone()
token = result_verified["token"].decode()
if result_verified["count"] >= 1:
loggorilla.prcss(APIADDR, "Generate URL")
# TODO: set expired time
expired = datetime.datetime.now() + datetime.timedelta(minutes=30) # Can be hours or minutes
expired_isoformat = expired.isoformat()
payload = {
"token" : token,
"expired": expired_isoformat
}
# TODO: Config SSH key for tokenguard and set forgot URL
token_encrypt = tokenguard.encode(payload, globalvar.ssh['key']['private'], globalvar.ssh['passphrase'])
change_forgot_url = globalvar.change_forgot_url(token_encrypt)
loggorilla.prcss(APIADDR, "Sending email")
self.smtpconfig['subject' ] = f"{globalvar.title} forgot password"
self.smtpconfig['to' ] = email
self.smtpconfig['text' ] = f"Please visit this link to change password: {change_forgot_url}. Avoid the link if you are not request this."
self.smtpconfig['html' ] = Template(params["mako"]["email"]['template']).render(
title = globalvar.title,
heading = self.smtpconfig['subject'],
image = "https://colorlib.com/etc/email-template/10/images/email.png",
unsubscribe = "#",
container = Template(params["mako"]["email"]['container']).render(
change = change_forgot_url
)
)
sendwave.smtp(self.smtpconfig)
loggorilla.prcss(APIADDR, "Giving response")
response["status" ] = "success"
response["desc" ] = "Check email for password change."
else:
response["status" ] = "failed"
response["desc" ] = "The parameters seems suspicious and you are not authorized for that"
elif type == "change":
loggorilla.prcss(APIADDR, "Define parameters inside decision")
token_encrypt = params["token" ]
password = params["password" ]
loggorilla.prcss(APIADDR, "Decrypt token")
payload = tokenguard.decode(token_encrypt, globalvar.ssh['key']['public'])
token = payload['token']
expired = datetime.datetime.fromisoformat(payload['expired'])
loggorilla.prcss(APIADDR, "Process parameters")
hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
loggorilla.prcss(APIADDR, "Get dependency data")
self.cursor.execute(f"SELECT COUNT(*) AS `count`, auth_profile.email FROM auth_profile_verification INNER JOIN auth_profile ON auth_profile.id = auth_profile_verification.auth_profile WHERE auth_profile.token = %s AND auth_profile_verification.type = 'email' AND auth_profile_verification.verified = 1 ; ", (token,) )
result_verified = self.cursor.fetchone()
email = result_verified['email']
loggorilla.prcss(APIADDR, "Validation")
if datetime.datetime.now() > expired:
response["status" ] = "failed"
response["desc" ] = "Expired"
elif len(password) < 6:
response["status" ] = "failed"
response["desc" ] = "password too short"
elif result_verified["count"] == 0:
response["status" ] = "failed"
response["desc" ] = "Forbidden: No active account for this"
response["data" ] = {
"message": "Please contact us if you still had a problem"
}
else:
loggorilla.prcss(APIADDR, "Updating")
self.cursor.execute("UPDATE `auth` SET `password` = %s, `when_update` = NOW() WHERE `token` = %s", (hashed, token) )
loggorilla.prcss(APIADDR, "Sending email")
self.smtpconfig['subject' ] = f"{globalvar.title} password change success"
self.smtpconfig['to' ] = email
self.smtpconfig['text' ] = f"You had change your password."
self.smtpconfig['html' ] = Template(params["mako"]["email"]['template']).render(
title = globalvar.title,
heading = self.smtpconfig['subject'],
image = "https://colorlib.com/etc/email-template/10/images/email.png",
unsubscribe = "#",
container = Template(params["mako"]["email"]['container']).render(
message = f"You had change your password."
)
)
sendwave.smtp(self.smtpconfig)
loggorilla.prcss(APIADDR, "Giving response")
response["status" ] = "success"
response["desc" ] = "password change success"
else:
response["status" ] = "failed"
response["desc" ] = "forbidden"
except Exception as e:
self.cursor.execute("ROLLBACK;")
loggorilla.error(APIADDR, str(e) )
response["status" ] = "failed"
response["desc" ] = "Internal Server Error. Please contact us if you still have an error. for detail"
finally:
self.cursor.execute("COMMIT;")
self.cursor.close()
self.db_main.close()
return response
def change(self, params):
APIADDR = "/api/auth/password/change"
loggorilla.prcss(APIADDR, "Define parameters")
response = {}
jwt = params["jwt" ]
old = params["old" ]
new = params["new" ]
hashed = bcrypt.hashpw(new.encode(), bcrypt.gensalt()).decode()
payload = tokenguard.decode(jwt, globalvar.ssh['key']['public'])
session_id = payload["session"]["id"]
self.cursor.execute("BEGIN;")
try:
loggorilla.prcss(APIADDR, "Get dependency data")
self.cursor.execute(f"SELECT * FROM auth_session WHERE id = %s ; ", (session_id,) )
result_session = self.cursor.fetchone()
token = result_session['token' ]
expired = result_session['end' ]
self.cursor.execute(f"SELECT COUNT(*) AS `count`, auth_profile.id, auth_profile.email FROM auth_profile_verification INNER JOIN auth_profile ON auth_profile.id = auth_profile_verification.auth_profile WHERE auth_profile.token = %s AND auth_profile_verification.type = 'email' AND auth_profile_verification.verified = 1 ; ", (token,) )
result_verified = self.cursor.fetchone()
profile = result_verified['id' ]
email = result_verified['email' ]
self.cursor.execute(f"SELECT auth_roles FROM auth_profile_roles WHERE auth_profile = %s ; ", (profile,) )
result_roles = self.cursor.fetchall()
roles = []
for row in result_roles:
roles.append(row['auth_roles'])
loggorilla.prcss(APIADDR, "Validation")
if datetime.datetime.now() > expired:
loggorilla.prcss(APIADDR, "Deleting")
self.cursor.execute("DELETE FROM auth_session WHERE id = %s ; ", (session_id,) )
loggorilla.prcss(APIADDR, "Giving response")
response["status" ] = "failed"
response["desc" ] = "Expired. Your session removed."
elif len(new) < 6:
response["status" ] = "failed"
response["desc" ] = "password too short"
elif result_verified["count"] == 0:
response["status" ] = "failed"
response["desc" ] = "Forbidden: No active account for this"
response["data" ] = {
"message": "Please contact us if you still had a problem"
}
else:
loggorilla.prcss(APIADDR, "Updating")
self.cursor.execute("UPDATE `auth` SET `password` = %s, `when_update` = NOW() WHERE `token` = %s", (hashed, token) )
loggorilla.prcss(APIADDR, "Sending email")
self.smtpconfig['subject' ] = f"{globalvar.title} password change success"
self.smtpconfig['to' ] = email
self.smtpconfig['text' ] = f"You had change your password."
self.smtpconfig['html' ] = Template(params["mako"]["email"]['template']).render(
title = globalvar.title,
heading = self.smtpconfig['subject'],
image = "https://colorlib.com/etc/email-template/10/images/email.png",
unsubscribe = "#",
container = Template(params["mako"]["email"]['container']).render(
message = f"You had change your password."
)
)
sendwave.smtp(self.smtpconfig)
loggorilla.prcss(APIADDR, "Giving response")
response["status" ] = "success"
response["desc" ] = "password change success"
except Exception as e:
self.cursor.execute("ROLLBACK;")
loggorilla.error(APIADDR, str(e) )
response["status" ] = "failed"
response["desc" ] = "Internal Server Error. Please contact us if you still have an error. for detail"
finally:
self.cursor.execute("COMMIT;")
self.cursor.close()
self.db_main.close()
return response
def logout(self, params):
APIADDR = "/logout"
loggorilla.prcss(APIADDR, "Define parameters")
response = {}
jwt = params["jwt" ]
payload = tokenguard.decode(jwt, globalvar.ssh['key']['public'])
session_id = payload["session"]["id"]
self.cursor.execute("BEGIN;")
try:
loggorilla.prcss(APIADDR, "Deleting")
self.cursor.execute("DELETE FROM auth_session WHERE id = %s ; ", (session_id,) )
loggorilla.prcss(APIADDR, "Giving response")
response["status" ] = "success"
response["desc" ] = f"Your session ({session_id}) removed."
except Exception as e:
self.cursor.execute("ROLLBACK;")
loggorilla.error(APIADDR, str(e) )
response["status" ] = "failed"
response["desc" ] = "Internal Server Error. Please contact us if you still have an error. for detail"
finally:
self.cursor.execute("COMMIT;")
self.cursor.close()
self.db_main.close()
return response