Loading app/controller/apikey.py 0 → 100644 +249 −0 Original line number Diff line number Diff line from flask import Blueprint, request, jsonify, current_app from helpers.security import require_auth from helpers.limiter import limiter from model.apikey.apikey import ( insertAPIKey, getAPIKeys, getAPIKeysFull, getAPIKey, deleteAPIKey, updateAPIKey, ) from datetime import date, datetime from werkzeug.security import generate_password_hash, check_password_hash api_key_bp = Blueprint("apikey", __name__) @api_key_bp.route("/apikeys", methods=["GET"]) @require_auth @limiter.limit("10/second", override_defaults=False) def get_apikeys(): api_key_ids = getAPIKeys() if api_key_ids is False: return jsonify({"error": "Database error"}), 500 return jsonify({"apikeys": api_key_ids}) @api_key_bp.route("/apikeys/full", methods=["GET"]) @require_auth @limiter.limit("10/second", override_defaults=False) def get_apikeys_full(): apikeys = getAPIKeysFull() if apikeys is False: return jsonify({"error": "Database error"}), 500 return jsonify({"apikeys": apikeys}) @api_key_bp.route("/apikeys/<int:id>", methods=["GET"]) @require_auth @limiter.limit("10/second", override_defaults=False) def get_apikey(id): try: if not id: return jsonify({"error": "Missing APIKey id"}), 400 else: result = getAPIKey(id) if result == -1: return jsonify({"message": "apikey not found", "id": id}), 404 elif result: return jsonify(result) else: return jsonify({"error": "Failed to get apikey"}), 500 except Exception as e: return jsonify({"error": str(e)}), 500 @api_key_bp.route("/apikeys", methods=["PUT"]) @require_auth @limiter.limit("10/second", override_defaults=False) def create_apikey(): try: data = request.get_json() label = data.get("label") api_key = data.get("api_key") is_valid = data.get("is_valid") valid_until = data.get("valid_until") last_usage = data.get("last_usage") if "api_key" not in data: return jsonify({"error": "Missing api_key"}), 400 if "valid_until" not in data: return jsonify({"error": "Missing valid_until"}), 400 else: if data.get("valid_until") is None: return jsonify({"error": "Missing valid_until"}), 400 else: try: valid_until = datetime.strptime( data.get("valid_until"), "%Y-%m-%d %H:%M:%S" ) except ValueError: return ( jsonify( { "error": "valid_until must be in YYYY-MM-DD HH:MM:SS format" } ), 400, ) if "is_valid" in data and isinstance(data.get("is_valid"), bool): is_valid = data.get("is_valid") elif "is_valid" in data: return ( jsonify( { "error": "if you provide is_valid, it must be a boolean true or false" } ), 400, ) if "last_usage" in data: if data.get("last_usage") is not None: try: last_usage = datetime.strptime( data.get("last_usage"), "%Y-%m-%d %H:%M:%S" ) except ValueError: return ( jsonify( { "error": "last_usage must be in YYYY-MM-DD HH:MM:SS format" } ), 400, ) api_key_id = insertAPIKey(api_key, label, is_valid, valid_until, last_usage) if api_key_id > 0: return ( jsonify({"message": "APIKey inserted", "api_key_id": api_key_id}), 201, ) elif api_key_id == -1: return jsonify({"error": "apikey already exist"}), 409 else: return jsonify({"error": "Failed to insert apikey"}), 500 except Exception as e: return jsonify({"error": str(e)}), 500 @api_key_bp.route("/apikeys/<int:id>", methods=["POST"]) @require_auth @limiter.limit("10/second", override_defaults=False) def update_apikey(id): try: data = request.get_json() label = data.get("label") api_key = data.get("api_key") is_valid = data.get("is_valid") valid_until = data.get("valid_until") last_usage = data.get("last_usage") if "is_valid" in data and isinstance(data.get("is_valid"), bool): is_valid = data.get("is_valid") elif "is_valid" in data: return ( jsonify( { "error": "if you provide is_valid, it must be a boolean true or false" } ), 400, ) if "valid_until" in data: if data.get("valid_until") is None: return jsonify({"error": "if you provide valid_until, it cant be null"}), 400 else: try: valid_until = datetime.strptime( data.get("valid_until"), "%Y-%m-%d %H:%M:%S" ) except ValueError: return ( jsonify( { "error": "valid_until must be in YYYY-MM-DD HH:MM:SS format" } ), 400, ) if "last_usage" in data: if data.get("last_usage") is None: last_usage = "NULL" else: try: last_usage = datetime.strptime( data.get("last_usage"), "%Y-%m-%d %H:%M:%S" ) except ValueError: return ( jsonify( { "error": "last_usage must be in YYYY-MM-DD HH:MM:SS format" } ), 400, ) updatedAPIKey = updateAPIKey(id, label, is_valid, valid_until, last_usage) print(f"updatedAPIKey: {updatedAPIKey}") if updatedAPIKey == -1: return jsonify({"error": "the apikey is not existing"}), 500 elif updatedAPIKey == -2: return ( jsonify( {"error": "apikey is already used by another api key "} ), 500, ) elif updatedAPIKey: return ( jsonify({"message": "APIKey updated", "apikey": updatedAPIKey}), 200, ) else: return jsonify({"error": "Failed to update apikey"}), 500 except Exception as e: return jsonify({"error": str(e)}), 500 @api_key_bp.route("/apikeys/<int:id>", methods=["DELETE"]) @require_auth @limiter.limit("10/second", override_defaults=False) def delete_apikey(id): try: if not id: return jsonify({"error": "Missing APIKey id"}), 400 else: apikey = getAPIKey(id) if apikey == -1: return jsonify({"message": "apikey not found", "id": id}), 404 elif apikey: if deleteAPIKey(id): return jsonify({"message": "apikey deleted", "id": id}) else: return jsonify({"error": "Failed to delete apikey"}), 500 else: return jsonify({"error": "Failed to retrieve apikey"}), 500 except Exception as e: return jsonify({"error": str(e)}), 500 app/main.py +3 −1 Original line number Diff line number Diff line Loading @@ -5,7 +5,7 @@ from redis import Redis from flask_jwt_extended import JWTManager from app.helpers.db import db_cursor from config import Config from controller import diploma, healthcheck, auth, language, keyword, experience, about, service, email, user from controller import diploma, healthcheck, auth, language, keyword, experience, about, service, email, user, apikey from helpers.limiter import limiter from model.security import is_jwt_revoked Loading Loading @@ -82,6 +82,8 @@ app.register_blueprint(email.email_bp) app.register_blueprint(user.user_bp) app.register_blueprint(apikey.api_key_bp) if __name__ == "__main__": app.run(debug=True) app/model/apikey/apikey.py 0 → 100644 +207 −0 Original line number Diff line number Diff line from datetime import datetime from flask import current_app from typing import Any, Literal from helpers.db import db_cursor def getAPIKeys(): getAPIKeyQuery = """SELECT id FROM api_key """ print("getAPIKeyQuery", getAPIKeyQuery) try: with db_cursor(dict_results=True) as (conn, cur): cur.execute(getAPIKeyQuery) rows = cur.fetchall() return [row["id"] for row in rows] if rows else [] except Exception as e: current_app.logger.error(f"Database error while retrieving api_key: {e}") return False def getAPIKeysFull(): getAPIKeyQuery = """ SELECT id, CONCAT( LEFT(api_key, 4), REPEAT('*', GREATEST(LENGTH(api_key) - 8, 0)), RIGHT(api_key, 4) ) AS api_key, valid_until, is_valid, label, last_usage FROM api_key ORDER BY id """ try: with db_cursor(dict_results=True) as (conn, cur): cur.execute(getAPIKeyQuery) rows = cur.fetchall() return rows if rows else [] except Exception as e: current_app.logger.error(f"Database error while retrieving api_key: {e}") return False def getAPIKey(APIKeyId: int) -> Any | Literal[-1] | Literal[False]: getAPIKeyQuery = """SELECT id, CONCAT( LEFT(api_key, 4), REPEAT('*', GREATEST(LENGTH(api_key) - 8, 0)), RIGHT(api_key, 4) ) AS api_key, valid_until, is_valid, label, last_usage FROM api_key WHERE id = %s""" try: with db_cursor(dict_results=True) as (conn, cur): cur.execute(getAPIKeyQuery, (APIKeyId,)) row = cur.fetchone() return row if row else -1 except Exception as e: current_app.logger.error(f"Database error while retrieving api_key: {e}") return False def getAPIKeyByKey(APIKey: str) -> Any | Literal[-1] | Literal[False]: getAPIKeyQuery = ( """SELECT id, api_key, valid_until, is_valid, label, last_usage FROM api_key WHERE api_key = %s""" ) try: with db_cursor(dict_results=True) as (conn, cur): cur.execute(getAPIKeyQuery, (APIKey,)) row = cur.fetchone() return row if row else -1 except Exception as e: current_app.logger.error(f"Database error while retrieving api_key: {e}") return False def insertAPIKey(APIKey: str, label:str, is_valid: bool, valid_until: datetime , last_usage: datetime ) -> bool | int: ApiKeyExists = getAPIKeyByKey(APIKey) if(ApiKeyExists and ApiKeyExists != -1): return -1 InsertAPIKeyQuery = """INSERT INTO api_key (api_key, label, is_valid, valid_until, last_usage) VALUES (%s, %s, %s, %s, %s) RETURNING id;""" try: with db_cursor() as (conn, cur): if is_valid is None: is_valid = True cur.execute(InsertAPIKeyQuery, (APIKey, label, is_valid, valid_until, last_usage)) api_key_id = cur.fetchone()[0] conn.commit() return api_key_id except Exception as e: current_app.logger.error(f"Database error while inserting api_key: {e}") return False def deleteAPIKey(APIKeyId: int) -> bool: deleteAPIKeyQuery = """DELETE FROM api_key WHERE id = %s""" try: with db_cursor() as (conn, cur): cur.execute(deleteAPIKeyQuery, (APIKeyId,)) conn.commit() return True except Exception as e: current_app.logger.error(f"Database error while deleting api_key: {e}") return False def updateAPIKey(APIKeyId: int, label: str, is_valid: bool, valid_until: datetime, last_usage: datetime): getAPIKeyIdQuery = """SELECT id FROM api_key WHERE id = %s""" try: with db_cursor() as (conn, cur): cur.execute(getAPIKeyIdQuery, (APIKeyId,)) APIKeyIDToUpdate = cur.fetchone() if not APIKeyIDToUpdate: return -1 update_fields = [] update_values = [] if label: update_fields.append("label = %s") update_values.append(label) if is_valid is not None: # Explicitly checking for None update_fields.append("is_valid = %s") update_values.append(is_valid) if valid_until is not None: update_fields.append("valid_until = %s") update_values.append(valid_until) if last_usage is not None: if last_usage == "NULL": update_fields.append("last_usage = NULL") else: update_fields.append("last_usage = %s") update_values.append(last_usage) print("update_fields", update_fields , "update_values", update_values) if update_fields: update_query = f"""UPDATE api_key SET {', '.join(update_fields)} WHERE id = %s""" update_values.append(APIKeyId) cur.execute(update_query, tuple(update_values)) conn.commit() return APIKeyId # Returning the updated api_key ID return False # No update performed except Exception as e: current_app.logger.error(f"Database error while updating api_key: {e}") return False def updateLastUsage(APIKey: str, last_usage: datetime|None = None) -> bool: # when a default value is set in python function definition, # it is evaluated only once when the function is defined, # not each time the function is called. if last_usage is None: last_usage = datetime.now() api_key = getAPIKeyByKey(APIKey) print("api_key", api_key) return updateAPIKey( APIKeyId=api_key["id"], label=None, is_valid=None, valid_until=None, last_usage=last_usage ) Loading
app/controller/apikey.py 0 → 100644 +249 −0 Original line number Diff line number Diff line from flask import Blueprint, request, jsonify, current_app from helpers.security import require_auth from helpers.limiter import limiter from model.apikey.apikey import ( insertAPIKey, getAPIKeys, getAPIKeysFull, getAPIKey, deleteAPIKey, updateAPIKey, ) from datetime import date, datetime from werkzeug.security import generate_password_hash, check_password_hash api_key_bp = Blueprint("apikey", __name__) @api_key_bp.route("/apikeys", methods=["GET"]) @require_auth @limiter.limit("10/second", override_defaults=False) def get_apikeys(): api_key_ids = getAPIKeys() if api_key_ids is False: return jsonify({"error": "Database error"}), 500 return jsonify({"apikeys": api_key_ids}) @api_key_bp.route("/apikeys/full", methods=["GET"]) @require_auth @limiter.limit("10/second", override_defaults=False) def get_apikeys_full(): apikeys = getAPIKeysFull() if apikeys is False: return jsonify({"error": "Database error"}), 500 return jsonify({"apikeys": apikeys}) @api_key_bp.route("/apikeys/<int:id>", methods=["GET"]) @require_auth @limiter.limit("10/second", override_defaults=False) def get_apikey(id): try: if not id: return jsonify({"error": "Missing APIKey id"}), 400 else: result = getAPIKey(id) if result == -1: return jsonify({"message": "apikey not found", "id": id}), 404 elif result: return jsonify(result) else: return jsonify({"error": "Failed to get apikey"}), 500 except Exception as e: return jsonify({"error": str(e)}), 500 @api_key_bp.route("/apikeys", methods=["PUT"]) @require_auth @limiter.limit("10/second", override_defaults=False) def create_apikey(): try: data = request.get_json() label = data.get("label") api_key = data.get("api_key") is_valid = data.get("is_valid") valid_until = data.get("valid_until") last_usage = data.get("last_usage") if "api_key" not in data: return jsonify({"error": "Missing api_key"}), 400 if "valid_until" not in data: return jsonify({"error": "Missing valid_until"}), 400 else: if data.get("valid_until") is None: return jsonify({"error": "Missing valid_until"}), 400 else: try: valid_until = datetime.strptime( data.get("valid_until"), "%Y-%m-%d %H:%M:%S" ) except ValueError: return ( jsonify( { "error": "valid_until must be in YYYY-MM-DD HH:MM:SS format" } ), 400, ) if "is_valid" in data and isinstance(data.get("is_valid"), bool): is_valid = data.get("is_valid") elif "is_valid" in data: return ( jsonify( { "error": "if you provide is_valid, it must be a boolean true or false" } ), 400, ) if "last_usage" in data: if data.get("last_usage") is not None: try: last_usage = datetime.strptime( data.get("last_usage"), "%Y-%m-%d %H:%M:%S" ) except ValueError: return ( jsonify( { "error": "last_usage must be in YYYY-MM-DD HH:MM:SS format" } ), 400, ) api_key_id = insertAPIKey(api_key, label, is_valid, valid_until, last_usage) if api_key_id > 0: return ( jsonify({"message": "APIKey inserted", "api_key_id": api_key_id}), 201, ) elif api_key_id == -1: return jsonify({"error": "apikey already exist"}), 409 else: return jsonify({"error": "Failed to insert apikey"}), 500 except Exception as e: return jsonify({"error": str(e)}), 500 @api_key_bp.route("/apikeys/<int:id>", methods=["POST"]) @require_auth @limiter.limit("10/second", override_defaults=False) def update_apikey(id): try: data = request.get_json() label = data.get("label") api_key = data.get("api_key") is_valid = data.get("is_valid") valid_until = data.get("valid_until") last_usage = data.get("last_usage") if "is_valid" in data and isinstance(data.get("is_valid"), bool): is_valid = data.get("is_valid") elif "is_valid" in data: return ( jsonify( { "error": "if you provide is_valid, it must be a boolean true or false" } ), 400, ) if "valid_until" in data: if data.get("valid_until") is None: return jsonify({"error": "if you provide valid_until, it cant be null"}), 400 else: try: valid_until = datetime.strptime( data.get("valid_until"), "%Y-%m-%d %H:%M:%S" ) except ValueError: return ( jsonify( { "error": "valid_until must be in YYYY-MM-DD HH:MM:SS format" } ), 400, ) if "last_usage" in data: if data.get("last_usage") is None: last_usage = "NULL" else: try: last_usage = datetime.strptime( data.get("last_usage"), "%Y-%m-%d %H:%M:%S" ) except ValueError: return ( jsonify( { "error": "last_usage must be in YYYY-MM-DD HH:MM:SS format" } ), 400, ) updatedAPIKey = updateAPIKey(id, label, is_valid, valid_until, last_usage) print(f"updatedAPIKey: {updatedAPIKey}") if updatedAPIKey == -1: return jsonify({"error": "the apikey is not existing"}), 500 elif updatedAPIKey == -2: return ( jsonify( {"error": "apikey is already used by another api key "} ), 500, ) elif updatedAPIKey: return ( jsonify({"message": "APIKey updated", "apikey": updatedAPIKey}), 200, ) else: return jsonify({"error": "Failed to update apikey"}), 500 except Exception as e: return jsonify({"error": str(e)}), 500 @api_key_bp.route("/apikeys/<int:id>", methods=["DELETE"]) @require_auth @limiter.limit("10/second", override_defaults=False) def delete_apikey(id): try: if not id: return jsonify({"error": "Missing APIKey id"}), 400 else: apikey = getAPIKey(id) if apikey == -1: return jsonify({"message": "apikey not found", "id": id}), 404 elif apikey: if deleteAPIKey(id): return jsonify({"message": "apikey deleted", "id": id}) else: return jsonify({"error": "Failed to delete apikey"}), 500 else: return jsonify({"error": "Failed to retrieve apikey"}), 500 except Exception as e: return jsonify({"error": str(e)}), 500
app/main.py +3 −1 Original line number Diff line number Diff line Loading @@ -5,7 +5,7 @@ from redis import Redis from flask_jwt_extended import JWTManager from app.helpers.db import db_cursor from config import Config from controller import diploma, healthcheck, auth, language, keyword, experience, about, service, email, user from controller import diploma, healthcheck, auth, language, keyword, experience, about, service, email, user, apikey from helpers.limiter import limiter from model.security import is_jwt_revoked Loading Loading @@ -82,6 +82,8 @@ app.register_blueprint(email.email_bp) app.register_blueprint(user.user_bp) app.register_blueprint(apikey.api_key_bp) if __name__ == "__main__": app.run(debug=True)
app/model/apikey/apikey.py 0 → 100644 +207 −0 Original line number Diff line number Diff line from datetime import datetime from flask import current_app from typing import Any, Literal from helpers.db import db_cursor def getAPIKeys(): getAPIKeyQuery = """SELECT id FROM api_key """ print("getAPIKeyQuery", getAPIKeyQuery) try: with db_cursor(dict_results=True) as (conn, cur): cur.execute(getAPIKeyQuery) rows = cur.fetchall() return [row["id"] for row in rows] if rows else [] except Exception as e: current_app.logger.error(f"Database error while retrieving api_key: {e}") return False def getAPIKeysFull(): getAPIKeyQuery = """ SELECT id, CONCAT( LEFT(api_key, 4), REPEAT('*', GREATEST(LENGTH(api_key) - 8, 0)), RIGHT(api_key, 4) ) AS api_key, valid_until, is_valid, label, last_usage FROM api_key ORDER BY id """ try: with db_cursor(dict_results=True) as (conn, cur): cur.execute(getAPIKeyQuery) rows = cur.fetchall() return rows if rows else [] except Exception as e: current_app.logger.error(f"Database error while retrieving api_key: {e}") return False def getAPIKey(APIKeyId: int) -> Any | Literal[-1] | Literal[False]: getAPIKeyQuery = """SELECT id, CONCAT( LEFT(api_key, 4), REPEAT('*', GREATEST(LENGTH(api_key) - 8, 0)), RIGHT(api_key, 4) ) AS api_key, valid_until, is_valid, label, last_usage FROM api_key WHERE id = %s""" try: with db_cursor(dict_results=True) as (conn, cur): cur.execute(getAPIKeyQuery, (APIKeyId,)) row = cur.fetchone() return row if row else -1 except Exception as e: current_app.logger.error(f"Database error while retrieving api_key: {e}") return False def getAPIKeyByKey(APIKey: str) -> Any | Literal[-1] | Literal[False]: getAPIKeyQuery = ( """SELECT id, api_key, valid_until, is_valid, label, last_usage FROM api_key WHERE api_key = %s""" ) try: with db_cursor(dict_results=True) as (conn, cur): cur.execute(getAPIKeyQuery, (APIKey,)) row = cur.fetchone() return row if row else -1 except Exception as e: current_app.logger.error(f"Database error while retrieving api_key: {e}") return False def insertAPIKey(APIKey: str, label:str, is_valid: bool, valid_until: datetime , last_usage: datetime ) -> bool | int: ApiKeyExists = getAPIKeyByKey(APIKey) if(ApiKeyExists and ApiKeyExists != -1): return -1 InsertAPIKeyQuery = """INSERT INTO api_key (api_key, label, is_valid, valid_until, last_usage) VALUES (%s, %s, %s, %s, %s) RETURNING id;""" try: with db_cursor() as (conn, cur): if is_valid is None: is_valid = True cur.execute(InsertAPIKeyQuery, (APIKey, label, is_valid, valid_until, last_usage)) api_key_id = cur.fetchone()[0] conn.commit() return api_key_id except Exception as e: current_app.logger.error(f"Database error while inserting api_key: {e}") return False def deleteAPIKey(APIKeyId: int) -> bool: deleteAPIKeyQuery = """DELETE FROM api_key WHERE id = %s""" try: with db_cursor() as (conn, cur): cur.execute(deleteAPIKeyQuery, (APIKeyId,)) conn.commit() return True except Exception as e: current_app.logger.error(f"Database error while deleting api_key: {e}") return False def updateAPIKey(APIKeyId: int, label: str, is_valid: bool, valid_until: datetime, last_usage: datetime): getAPIKeyIdQuery = """SELECT id FROM api_key WHERE id = %s""" try: with db_cursor() as (conn, cur): cur.execute(getAPIKeyIdQuery, (APIKeyId,)) APIKeyIDToUpdate = cur.fetchone() if not APIKeyIDToUpdate: return -1 update_fields = [] update_values = [] if label: update_fields.append("label = %s") update_values.append(label) if is_valid is not None: # Explicitly checking for None update_fields.append("is_valid = %s") update_values.append(is_valid) if valid_until is not None: update_fields.append("valid_until = %s") update_values.append(valid_until) if last_usage is not None: if last_usage == "NULL": update_fields.append("last_usage = NULL") else: update_fields.append("last_usage = %s") update_values.append(last_usage) print("update_fields", update_fields , "update_values", update_values) if update_fields: update_query = f"""UPDATE api_key SET {', '.join(update_fields)} WHERE id = %s""" update_values.append(APIKeyId) cur.execute(update_query, tuple(update_values)) conn.commit() return APIKeyId # Returning the updated api_key ID return False # No update performed except Exception as e: current_app.logger.error(f"Database error while updating api_key: {e}") return False def updateLastUsage(APIKey: str, last_usage: datetime|None = None) -> bool: # when a default value is set in python function definition, # it is evaluated only once when the function is defined, # not each time the function is called. if last_usage is None: last_usage = datetime.now() api_key = getAPIKeyByKey(APIKey) print("api_key", api_key) return updateAPIKey( APIKeyId=api_key["id"], label=None, is_valid=None, valid_until=None, last_usage=last_usage )