Procedure for a validation and webmail

This commit is contained in:
Dita Aji Pratama 2025-01-20 13:50:15 +07:00
parent b39f91ad6c
commit 32d2fa6059
2 changed files with 249 additions and 0 deletions

226
procedure/validation.py Normal file
View File

@ -0,0 +1,226 @@
import mysql.connector as mariadb
import datetime
import re
from bottle import request, abort, redirect
from config import database, globalvar
from scripts import loggorilla, tokenguard
class validation():
def __init__(self):
self.db_main = mariadb.connect(**database.db_main)
self.cursor = self.db_main.cursor(dictionary=True)
def register(self, APIADDR, captcha, score, roles, username, password, email):
response={}
try:
loggorilla.prcss(APIADDR, "Get the data for checking")
self.cursor.execute("SELECT COUNT(*) AS `count` FROM auth_profile WHERE email = %s ; ", (email,) )
result_profile = self.cursor.fetchone()
self.cursor.execute("SELECT COUNT(*) AS `count` FROM auth_profile WHERE username = %s ; ", (username,) )
result_username = self.cursor.fetchone()
self.cursor.execute(f"SELECT COUNT(*) AS `count` FROM auth_profile_verification INNER JOIN auth_profile ON auth_profile.id = auth_profile_verification.auth_profile WHERE auth_profile.email = %s AND auth_profile_verification.type = 'email' AND auth_profile_verification.verified = 0 ; ", (email,) )
result_unverified = self.cursor.fetchone()
self.cursor.execute(f"SELECT id, name FROM `auth_roles` WHERE auth_roles.name = %s ; ", (roles,) )
result_roles = self.cursor.fetchone()
loggorilla.prcss(APIADDR, "Validating")
if score < 0.2:
response["status" ] = "failed"
response["desc" ] = "you are sus as a bot"
elif roles in globalvar.forbidden_registration:
response["status" ] = "failed"
response["desc" ] = f"Forbidden to become {roles}"
elif not re.match(r'^\w+$', username):
response["status" ] = "failed"
response["desc" ] = "username can only use letters, numbers, and the underscore symbol"
elif len(username) > 35:
response["status" ] = "failed"
response["desc" ] = "username can not longer than 35 character"
elif len(username) < 3:
response["status" ] = "failed"
response["desc" ] = "username too short"
elif len(password) < 6:
response["status" ] = "failed"
response["desc" ] = "password too short"
elif result_unverified["count"] >= 1:
response["status" ] = "failed"
response["desc" ] = "check email for verification"
response["data" ] = {
"message": "Check email for verification. Please contact us if you still had a problem",
"resend": globalvar.resend_url(email)
}
elif result_profile["count"] >= 1:
response["status" ] = "failed"
response["desc" ] = "email already taken"
elif result_username["count"] >= 1:
response["status" ] = "failed"
response["desc" ] = "username already taken"
elif not (captcha and username and email and password):
response["status" ] = "failed"
response["desc" ] = "Form not complete."
else:
response["status" ] = "valid"
response["desc" ] = "You can continue your register process"
loggorilla.accss(APIADDR, f"Status : {response['status']}")
loggorilla.accss(APIADDR, f"Description : {response['desc' ]}")
except Exception as e:
loggorilla.error(APIADDR, str(e) )
response["status" ] = "failed"
response["desc" ] = "Internal Server Error. Please contact us if you still have an error."
finally:
self.cursor.close()
self.db_main.close()
return response
def account(self, APIADDR, allowed_roles, jwt=None):
response = {}
loggorilla.prcss(APIADDR, "Get jwt")
if jwt is None:
loggorilla.fyinf(APIADDR, "jwt params is empty: Use beaker session")
for_api = False
beaker_session = request.environ.get('beaker.session')
jwt = beaker_session["token"] if "token" in beaker_session else None
else:
loggorilla.fyinf(APIADDR, "jwt params is available: Use jwt from params")
for_api = True
loggorilla.prcss(APIADDR, "Define parameters")
if jwt is None:
loggorilla.fyinf(APIADDR, "Guest")
r_session = {}
r_profile = {
"username" :None,
"email" :None,
"phone" :None,
"roles" :[0]
}
session_not_found = False
else:
loggorilla.fyinf(APIADDR, "With JWT")
loggorilla.prcss(APIADDR, "Get JWT payload data")
payload = tokenguard.decode(jwt, globalvar.ssh['key']['public'])
loggorilla.prcss(APIADDR, "Connect DB")
db_main = mariadb.connect(**database.db_main)
cursor = db_main.cursor(dictionary=True)
loggorilla.prcss(APIADDR, "Get dependency data")
cursor.execute(f"SELECT * FROM auth_session WHERE id = %s ; ", (payload["session"]["id"],) )
r_session = cursor.fetchone()
if r_session is None:
session_not_found = True
r_session = {}
r_profile = {
"username" :None,
"email" :None,
"phone" :None,
"roles" :[0]
}
else:
session_not_found = False
cursor.execute(f"SELECT COUNT(*) AS `count`, auth_profile.* FROM auth_profile_verification LEFT JOIN auth_profile ON auth_profile.id = auth_profile_verification.auth_profile WHERE auth_profile.token = %s AND auth_profile_verification.type = 'email' AND auth_profile_verification.verified = 1 ; ", (r_session['token'],) )
r_profile = cursor.fetchone()
cursor.execute(f"SELECT auth_roles FROM auth_profile_roles WHERE auth_profile = %s ; ", (r_profile['id'],) )
r_roles = cursor.fetchall()
r_profile['roles'] = [item['auth_roles'] for item in r_roles]
loggorilla.prcss(APIADDR, "Close DB")
cursor.close()
db_main.close()
loggorilla.accss(APIADDR, f"Session ID : {r_session['id' ] if 'id' in r_session else None}" )
loggorilla.accss(APIADDR, f"Session Start : {r_session['start' ].strftime('%Y-%m-%d %H:%M:%S') if 'start' in r_session else None}" )
loggorilla.accss(APIADDR, f"Session End : {r_session['end' ].strftime('%Y-%m-%d %H:%M:%S') if 'end' in r_session else None}" )
loggorilla.accss(APIADDR, f"Profile ID : {r_profile['id' ] if 'id' in r_profile else None}" )
loggorilla.accss(APIADDR, f"Profile Username : {r_profile['username' ] if 'username' in r_profile else None}" )
loggorilla.accss(APIADDR, f"Profile Email : {r_profile['email' ] if 'email' in r_profile else None}" )
loggorilla.accss(APIADDR, f"Profile Phone : {r_profile['phone' ] if 'phone' in r_profile else None}" )
loggorilla.accss(APIADDR, f"Profile Roles : {r_profile['roles' ] if 'roles' in r_profile else None}" )
loggorilla.prcss(APIADDR, "Validation")
if session_not_found:
loggorilla.accss(APIADDR, "Session not found" )
loggorilla.prcss(APIADDR, "Giving response")
response["status" ] = "failed"
response["desc" ] = "Your session not found."
response["data" ] = {
"token" : jwt,
"valid" :{
"status" : 0,
"desc" : "removed"
},
"session" : r_session,
"profile" : r_profile
}
if for_api is True:
abort(401, "Session not found")
else:
redirect('/logout?msg=removed')
elif 0 not in r_profile['roles'] and datetime.datetime.now() > r_session['end']:
loggorilla.accss(APIADDR, "Session expired" )
loggorilla.prcss(APIADDR, "Deleting session")
self.cursor.execute("DELETE FROM auth_session WHERE id = %s ; ", (r_session['id'],) )
loggorilla.prcss(APIADDR, "Giving response")
response["status" ] = "failed"
response["desc" ] = "Expired. Your session removed."
response["data" ] = {
"token" : jwt,
"valid" :{
"status" : 0,
"desc" : "expired"
},
"session" : r_session,
"profile" : r_profile
}
if for_api is True:
abort(401, "Session expired")
else:
redirect('/logout?msg=expired')
elif 0 not in r_profile['roles'] and r_profile["count"] == 0:
loggorilla.accss(APIADDR, "No active account for this" )
loggorilla.prcss(APIADDR, "Giving response")
response["status" ] = "failed"
response["desc" ] = "No active account for this"
response["data" ] = {
"token" : jwt,
"message" : "Please contact us if you still had a problem",
"valid" :{
"status" : 0,
"desc" : "fake"
},
"session" : r_session,
"profile" : r_profile
}
abort(403, "Please contact us if you still had a problem.") # 403 Forbidden
elif any(role in allowed_roles for role in r_profile['roles']):
loggorilla.accss(APIADDR, "User roles authorized" )
loggorilla.prcss(APIADDR, "Giving response")
response["status" ] = "success"
response["desc" ] = "User roles authorized"
response["data" ] = {
"token" : str(jwt),
"valid" :{
"status" : 1,
"desc" : "authorized"
},
"session" : r_session,
"profile" : r_profile
}
return response
else:
loggorilla.accss(APIADDR, "User roles unauthorized" )
loggorilla.prcss(APIADDR, "Giving response")
response["status" ] = "failed"
response["desc" ] = "User roles unauthorized"
response["data" ] = {
"token" : jwt,
"valid" :{
"status" : 0,
"desc" : "unauthorized"
},
"session" : r_session,
"profile" : r_profile
}
abort(401, "User roles unauthorized") # 401 Unauthorized

23
procedure/webmail.py Normal file
View File

@ -0,0 +1,23 @@
from mako.template import Template
from config import globalvar
class webmail():
def __init__(self):
pass
def verification(self, APIADDR, params, data):
return {
"subject" : f"{globalvar.title} email verification",
"text" : f"Please visit this link to complete the registration: {data['verify']}. You are not registering this? report on this: {data['notme'].",
"html" : Template(params["mako"]["email"]['index']).render(
title = globalvar.title,
header = globalvar.title,
copyright = globalvar.copyright,
container = Template(params["mako"]["email"]['container']).render(
header = "One more step to complete your registration!",
verify = data['verify' ],
notme = data['notme' ]
)
)
}