| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475 | from flask_login import current_userfrom flask_restful import Resource, reqparsefrom werkzeug.exceptions import Forbiddenfrom controllers.console import apifrom controllers.console.auth.error import ApiKeyAuthFailedErrorfrom libs.login import login_requiredfrom services.auth.api_key_auth_service import ApiKeyAuthServicefrom ..setup import setup_requiredfrom ..wraps import account_initialization_requiredclass ApiKeyAuthDataSource(Resource):    @setup_required    @login_required    @account_initialization_required    def get(self):        data_source_api_key_bindings = ApiKeyAuthService.get_provider_auth_list(current_user.current_tenant_id)        if data_source_api_key_bindings:            return {                "sources": [                    {                        "id": data_source_api_key_binding.id,                        "category": data_source_api_key_binding.category,                        "provider": data_source_api_key_binding.provider,                        "disabled": data_source_api_key_binding.disabled,                        "created_at": int(data_source_api_key_binding.created_at.timestamp()),                        "updated_at": int(data_source_api_key_binding.updated_at.timestamp()),                    }                    for data_source_api_key_binding in data_source_api_key_bindings                ]            }        return {"sources": []}class ApiKeyAuthDataSourceBinding(Resource):    @setup_required    @login_required    @account_initialization_required    def post(self):        # The role of the current user in the table must be admin or owner        if not current_user.is_admin_or_owner:            raise Forbidden()        parser = reqparse.RequestParser()        parser.add_argument("category", type=str, required=True, nullable=False, location="json")        parser.add_argument("provider", type=str, required=True, nullable=False, location="json")        parser.add_argument("credentials", type=dict, required=True, nullable=False, location="json")        args = parser.parse_args()        ApiKeyAuthService.validate_api_key_auth_args(args)        try:            ApiKeyAuthService.create_provider_auth(current_user.current_tenant_id, args)        except Exception as e:            raise ApiKeyAuthFailedError(str(e))        return {"result": "success"}, 200class ApiKeyAuthDataSourceBindingDelete(Resource):    @setup_required    @login_required    @account_initialization_required    def delete(self, binding_id):        # The role of the current user in the table must be admin or owner        if not current_user.is_admin_or_owner:            raise Forbidden()        ApiKeyAuthService.delete_provider_auth(current_user.current_tenant_id, binding_id)        return {"result": "success"}, 200api.add_resource(ApiKeyAuthDataSource, "/api-key-auth/data-source")api.add_resource(ApiKeyAuthDataSourceBinding, "/api-key-auth/data-source/binding")api.add_resource(ApiKeyAuthDataSourceBindingDelete, "/api-key-auth/data-source/<uuid:binding_id>")
 |