2025-01-20 13:50:15 +07:00
import mysql . connector as mariadb
import datetime
import re
from bottle import request , abort , redirect
from config import database , globalvar
from scripts import loggorilla , tokenguard
class validation ( ) :
def __init__ ( self ) :
self . db_main = mariadb . connect ( * * database . db_main )
self . cursor = self . db_main . cursor ( dictionary = True )
def register ( self , APIADDR , captcha , score , roles , username , password , email ) :
response = { }
try :
loggorilla . prcss ( APIADDR , " Get the data for checking " )
self . cursor . execute ( " SELECT COUNT(*) AS `count` FROM auth_profile WHERE email = %s ; " , ( email , ) )
result_profile = self . cursor . fetchone ( )
self . cursor . execute ( " SELECT COUNT(*) AS `count` FROM auth_profile WHERE username = %s ; " , ( username , ) )
result_username = self . cursor . fetchone ( )
2025-01-20 17:54:23 +07:00
self . cursor . execute ( f " SELECT COUNT(*) AS `count` FROM auth_profile_verification INNER JOIN auth_profile ON auth_profile.id = auth_profile_verification.profile WHERE auth_profile.email = %s AND auth_profile_verification.type = ' email ' AND auth_profile_verification.verified = 0 ; " , ( email , ) )
2025-01-20 13:50:15 +07:00
result_unverified = self . cursor . fetchone ( )
loggorilla . prcss ( APIADDR , " Validating " )
if score < 0.2 :
response [ " status " ] = " failed "
response [ " desc " ] = " you are sus as a bot "
elif roles in globalvar . forbidden_registration :
response [ " status " ] = " failed "
response [ " desc " ] = f " Forbidden to become { roles } "
elif not re . match ( r ' ^ \ w+$ ' , username ) :
response [ " status " ] = " failed "
response [ " desc " ] = " username can only use letters, numbers, and the underscore symbol "
elif len ( username ) > 35 :
response [ " status " ] = " failed "
response [ " desc " ] = " username can not longer than 35 character "
elif len ( username ) < 3 :
response [ " status " ] = " failed "
response [ " desc " ] = " username too short "
elif len ( password ) < 6 :
response [ " status " ] = " failed "
response [ " desc " ] = " password too short "
elif result_unverified [ " count " ] > = 1 :
response [ " status " ] = " failed "
response [ " desc " ] = " check email for verification "
response [ " data " ] = {
" message " : " Check email for verification. Please contact us if you still had a problem " ,
" resend " : globalvar . resend_url ( email )
}
elif result_profile [ " count " ] > = 1 :
response [ " status " ] = " failed "
response [ " desc " ] = " email already taken "
elif result_username [ " count " ] > = 1 :
response [ " status " ] = " failed "
response [ " desc " ] = " username already taken "
elif not ( captcha and username and email and password ) :
response [ " status " ] = " failed "
response [ " desc " ] = " Form not complete. "
else :
response [ " status " ] = " valid "
response [ " desc " ] = " You can continue your register process "
loggorilla . accss ( APIADDR , f " Status : { response [ ' status ' ] } " )
loggorilla . accss ( APIADDR , f " Description : { response [ ' desc ' ] } " )
except Exception as e :
loggorilla . error ( APIADDR , str ( e ) )
response [ " status " ] = " failed "
response [ " desc " ] = " Internal Server Error. Please contact us if you still have an error. "
finally :
self . cursor . close ( )
self . db_main . close ( )
return response
def account ( self , APIADDR , allowed_roles , jwt = None ) :
response = { }
loggorilla . prcss ( APIADDR , " Get jwt " )
if jwt is None :
loggorilla . fyinf ( APIADDR , " jwt params is empty: Use beaker session " )
for_api = False
beaker_session = request . environ . get ( ' beaker.session ' )
jwt = beaker_session [ " token " ] if " token " in beaker_session else None
else :
loggorilla . fyinf ( APIADDR , " jwt params is available: Use jwt from params " )
for_api = True
loggorilla . prcss ( APIADDR , " Define parameters " )
if jwt is None :
loggorilla . fyinf ( APIADDR , " Guest " )
r_session = { }
r_profile = {
" username " : None ,
" email " : None ,
" phone " : None ,
" roles " : [ 0 ]
}
session_not_found = False
else :
loggorilla . fyinf ( APIADDR , " With JWT " )
loggorilla . prcss ( APIADDR , " Get JWT payload data " )
payload = tokenguard . decode ( jwt , globalvar . ssh [ ' key ' ] [ ' public ' ] )
loggorilla . prcss ( APIADDR , " Connect DB " )
db_main = mariadb . connect ( * * database . db_main )
cursor = db_main . cursor ( dictionary = True )
loggorilla . prcss ( APIADDR , " Get dependency data " )
cursor . execute ( f " SELECT * FROM auth_session WHERE id = %s ; " , ( payload [ " session " ] [ " id " ] , ) )
r_session = cursor . fetchone ( )
if r_session is None :
session_not_found = True
r_session = { }
r_profile = {
" username " : None ,
" email " : None ,
" phone " : None ,
" roles " : [ 0 ]
}
else :
session_not_found = False
2025-01-20 17:54:23 +07:00
cursor . execute ( f " SELECT COUNT(*) AS `count`, auth_profile.* FROM auth_profile_verification LEFT JOIN auth_profile ON auth_profile.id = auth_profile_verification.profile WHERE auth_profile.token = %s AND auth_profile_verification.type = ' email ' AND auth_profile_verification.verified = 1 ; " , ( r_session [ ' token ' ] , ) )
2025-01-20 13:50:15 +07:00
r_profile = cursor . fetchone ( )
cursor . execute ( f " SELECT auth_roles FROM auth_profile_roles WHERE auth_profile = %s ; " , ( r_profile [ ' id ' ] , ) )
r_roles = cursor . fetchall ( )
r_profile [ ' roles ' ] = [ item [ ' auth_roles ' ] for item in r_roles ]
loggorilla . prcss ( APIADDR , " Close DB " )
cursor . close ( )
db_main . close ( )
loggorilla . accss ( APIADDR , f " Session ID : { r_session [ ' id ' ] if ' id ' in r_session else None } " )
loggorilla . accss ( APIADDR , f " Session Start : { r_session [ ' start ' ] . strftime ( ' % Y- % m- %d % H: % M: % S ' ) if ' start ' in r_session else None } " )
loggorilla . accss ( APIADDR , f " Session End : { r_session [ ' end ' ] . strftime ( ' % Y- % m- %d % H: % M: % S ' ) if ' end ' in r_session else None } " )
loggorilla . accss ( APIADDR , f " Profile ID : { r_profile [ ' id ' ] if ' id ' in r_profile else None } " )
loggorilla . accss ( APIADDR , f " Profile Username : { r_profile [ ' username ' ] if ' username ' in r_profile else None } " )
loggorilla . accss ( APIADDR , f " Profile Email : { r_profile [ ' email ' ] if ' email ' in r_profile else None } " )
loggorilla . accss ( APIADDR , f " Profile Phone : { r_profile [ ' phone ' ] if ' phone ' in r_profile else None } " )
loggorilla . accss ( APIADDR , f " Profile Roles : { r_profile [ ' roles ' ] if ' roles ' in r_profile else None } " )
loggorilla . prcss ( APIADDR , " Validation " )
if session_not_found :
loggorilla . accss ( APIADDR , " Session not found " )
loggorilla . prcss ( APIADDR , " Giving response " )
response [ " status " ] = " failed "
response [ " desc " ] = " Your session not found. "
response [ " data " ] = {
" token " : jwt ,
" valid " : {
" status " : 0 ,
" desc " : " removed "
} ,
" session " : r_session ,
" profile " : r_profile
}
if for_api is True :
abort ( 401 , " Session not found " )
else :
redirect ( ' /logout?msg=removed ' )
elif 0 not in r_profile [ ' roles ' ] and datetime . datetime . now ( ) > r_session [ ' end ' ] :
loggorilla . accss ( APIADDR , " Session expired " )
loggorilla . prcss ( APIADDR , " Deleting session " )
self . cursor . execute ( " DELETE FROM auth_session WHERE id = %s ; " , ( r_session [ ' id ' ] , ) )
loggorilla . prcss ( APIADDR , " Giving response " )
response [ " status " ] = " failed "
response [ " desc " ] = " Expired. Your session removed. "
response [ " data " ] = {
" token " : jwt ,
" valid " : {
" status " : 0 ,
" desc " : " expired "
} ,
" session " : r_session ,
" profile " : r_profile
}
if for_api is True :
abort ( 401 , " Session expired " )
else :
redirect ( ' /logout?msg=expired ' )
elif 0 not in r_profile [ ' roles ' ] and r_profile [ " count " ] == 0 :
loggorilla . accss ( APIADDR , " No active account for this " )
loggorilla . prcss ( APIADDR , " Giving response " )
response [ " status " ] = " failed "
response [ " desc " ] = " No active account for this "
response [ " data " ] = {
" token " : jwt ,
" message " : " Please contact us if you still had a problem " ,
" valid " : {
" status " : 0 ,
" desc " : " fake "
} ,
" session " : r_session ,
" profile " : r_profile
}
abort ( 403 , " Please contact us if you still had a problem. " ) # 403 Forbidden
elif any ( role in allowed_roles for role in r_profile [ ' roles ' ] ) :
loggorilla . accss ( APIADDR , " User roles authorized " )
loggorilla . prcss ( APIADDR , " Giving response " )
response [ " status " ] = " success "
response [ " desc " ] = " User roles authorized "
response [ " data " ] = {
" token " : str ( jwt ) ,
" valid " : {
" status " : 1 ,
" desc " : " authorized "
} ,
" session " : r_session ,
" profile " : r_profile
}
return response
else :
loggorilla . accss ( APIADDR , " User roles unauthorized " )
loggorilla . prcss ( APIADDR , " Giving response " )
response [ " status " ] = " failed "
response [ " desc " ] = " User roles unauthorized "
response [ " data " ] = {
" token " : jwt ,
" valid " : {
" status " : 0 ,
" desc " : " unauthorized "
} ,
" session " : r_session ,
" profile " : r_profile
}
abort ( 401 , " User roles unauthorized " ) # 401 Unauthorized