Loading app/controller/auth.py +14 −0 Original line number Diff line number Diff line import datetime from flask import Blueprint, request, jsonify from helpers.security import check_auth from model.security import revoke_jwt from model.user.user import updateLastLogin, getUserByLogin, updateUser from helpers.limiter import limiter from flask_jwt_extended import ( create_access_token, Loading @@ -27,6 +29,18 @@ def login(): ): return jsonify({"error": "Invalid credentials"}), 401 updateLastLogin(username) user = getUserByLogin(username) if(user.get('force_jwt_reconnect')): updateUser(id=user['id'], userLogin=None, password=None, is_active=None, force_jwt_reconnect=False, last_login=None ) # Create tokens # access_token = create_access_token(identity={"username": username, "role": user["role"]}) # refresh_token = create_refresh_token(identity={"username": username, "role": user["role"]}) Loading app/helpers/security.py +45 −2 Original line number Diff line number Diff line from flask import request, jsonify from model.security import check_api_key, check_token, check_auth from flask.cli import F from model.security import check_api_key, check_token, check_auth, revoke_jwt from model.user.user import updateLastLogin, getUserByLogin import datetime from flask_jwt_extended import ( get_jwt_identity, get_jwt, jwt_required, verify_jwt_in_request ) from jwt.exceptions import DecodeError def require_auth(f): Loading @@ -13,11 +23,44 @@ def require_auth(f): print("check access by API KEY") successAuth = check_api_key(key) elif auth_header and auth_header.startswith("Bearer "): print("check access by Bearer Token") print("check access by regular Bearer Token") successAuth = check_token(auth_header) if(not successAuth): print("check access by JWT Bearer Token") try: if(verify_jwt_in_request(optional=True)): jwtIdentity = get_jwt_identity() # This gets the 'sub' claim by default print(f"JWT is valid, username: {jwtIdentity.get('username')}") updateLastLogin(jwtIdentity.get('username')) successAuth = True user = getUserByLogin(jwtIdentity.get('username')) if(user.get('force_jwt_reconnect')): print("Revoking JWT due to force_jwt_reconnect") jwt = get_jwt() revoke_jwt(jwt["jti"]) # Revoke the JWT try: successAuth = verify_jwt_in_request() # Re-verify the JWT after revocation to trigger error except Exception as e: raise except DecodeError as e: print("JWT decode error or not authorized regular token") pass except Exception as e: print("JWT is invalid") print(type(e).__name__) print(type(e)) raise elif auth and auth.username and auth.password: print("check access by Basic Auth") successAuth = check_auth(auth.username, auth.password) updateLastLogin(auth.username) if successAuth: return f(*args, **kwargs) Loading app/model/security.py +17 −16 Original line number Diff line number Diff line Loading @@ -31,22 +31,23 @@ def check_api_key(key: str) -> bool: def check_token(auth_header: str) -> bool: token = auth_header.split(" ")[1] # Extract token query = """SELECT 1 FROM api_key WHERE api_key = %s AND is_valid = TRUE AND valid_until >= NOW() LIMIT 1;""" try: with db_cursor() as (conn, cur): cur.execute(query, (token,)) return ( cur.fetchone() is not None ) # Returns True if key exists, False otherwise except Exception as e: current_app.logger.error(f"Database error while checking API key: {e}") return False # Return False in case of any DB error return check_api_key(token) # Reuse the same function to check the token # query = """SELECT 1 # FROM api_key # WHERE api_key = %s # AND is_valid = TRUE # AND valid_until >= NOW() # LIMIT 1;""" # try: # with db_cursor() as (conn, cur): # cur.execute(query, (token,)) # return ( # cur.fetchone() is not None # ) # Returns True if key exists, False otherwise # except Exception as e: # current_app.logger.error(f"Database error while checking API key: {e}") # return False # Return False in case of any DB error def check_auth(username, password): Loading Loading
app/controller/auth.py +14 −0 Original line number Diff line number Diff line import datetime from flask import Blueprint, request, jsonify from helpers.security import check_auth from model.security import revoke_jwt from model.user.user import updateLastLogin, getUserByLogin, updateUser from helpers.limiter import limiter from flask_jwt_extended import ( create_access_token, Loading @@ -27,6 +29,18 @@ def login(): ): return jsonify({"error": "Invalid credentials"}), 401 updateLastLogin(username) user = getUserByLogin(username) if(user.get('force_jwt_reconnect')): updateUser(id=user['id'], userLogin=None, password=None, is_active=None, force_jwt_reconnect=False, last_login=None ) # Create tokens # access_token = create_access_token(identity={"username": username, "role": user["role"]}) # refresh_token = create_refresh_token(identity={"username": username, "role": user["role"]}) Loading
app/helpers/security.py +45 −2 Original line number Diff line number Diff line from flask import request, jsonify from model.security import check_api_key, check_token, check_auth from flask.cli import F from model.security import check_api_key, check_token, check_auth, revoke_jwt from model.user.user import updateLastLogin, getUserByLogin import datetime from flask_jwt_extended import ( get_jwt_identity, get_jwt, jwt_required, verify_jwt_in_request ) from jwt.exceptions import DecodeError def require_auth(f): Loading @@ -13,11 +23,44 @@ def require_auth(f): print("check access by API KEY") successAuth = check_api_key(key) elif auth_header and auth_header.startswith("Bearer "): print("check access by Bearer Token") print("check access by regular Bearer Token") successAuth = check_token(auth_header) if(not successAuth): print("check access by JWT Bearer Token") try: if(verify_jwt_in_request(optional=True)): jwtIdentity = get_jwt_identity() # This gets the 'sub' claim by default print(f"JWT is valid, username: {jwtIdentity.get('username')}") updateLastLogin(jwtIdentity.get('username')) successAuth = True user = getUserByLogin(jwtIdentity.get('username')) if(user.get('force_jwt_reconnect')): print("Revoking JWT due to force_jwt_reconnect") jwt = get_jwt() revoke_jwt(jwt["jti"]) # Revoke the JWT try: successAuth = verify_jwt_in_request() # Re-verify the JWT after revocation to trigger error except Exception as e: raise except DecodeError as e: print("JWT decode error or not authorized regular token") pass except Exception as e: print("JWT is invalid") print(type(e).__name__) print(type(e)) raise elif auth and auth.username and auth.password: print("check access by Basic Auth") successAuth = check_auth(auth.username, auth.password) updateLastLogin(auth.username) if successAuth: return f(*args, **kwargs) Loading
app/model/security.py +17 −16 Original line number Diff line number Diff line Loading @@ -31,22 +31,23 @@ def check_api_key(key: str) -> bool: def check_token(auth_header: str) -> bool: token = auth_header.split(" ")[1] # Extract token query = """SELECT 1 FROM api_key WHERE api_key = %s AND is_valid = TRUE AND valid_until >= NOW() LIMIT 1;""" try: with db_cursor() as (conn, cur): cur.execute(query, (token,)) return ( cur.fetchone() is not None ) # Returns True if key exists, False otherwise except Exception as e: current_app.logger.error(f"Database error while checking API key: {e}") return False # Return False in case of any DB error return check_api_key(token) # Reuse the same function to check the token # query = """SELECT 1 # FROM api_key # WHERE api_key = %s # AND is_valid = TRUE # AND valid_until >= NOW() # LIMIT 1;""" # try: # with db_cursor() as (conn, cur): # cur.execute(query, (token,)) # return ( # cur.fetchone() is not None # ) # Returns True if key exists, False otherwise # except Exception as e: # current_app.logger.error(f"Database error while checking API key: {e}") # return False # Return False in case of any DB error def check_auth(username, password): Loading