| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374 | from flask_login import current_user  # type: ignorefrom flask_restful import Resource, reqparse  # type: ignorefrom werkzeug.exceptions import Forbiddenfrom controllers.console import apifrom controllers.console.auth.error import ApiKeyAuthFailedErrorfrom libs.login import login_requiredfrom services.auth.api_key_auth_service import ApiKeyAuthServicefrom ..wraps import account_initialization_required, setup_requiredclass ApiKeyAuthDataSource(Resource):    @setup_required    @login_required    @account_initialization_required    def get(self):        data_source_api_key_bindings = ApiKeyAuthService.get_provider_auth_list(current_user.current_tenant_id)        if data_source_api_key_bindings:            return {                "sources": [                    {                        "id": data_source_api_key_binding.id,                        "category": data_source_api_key_binding.category,                        "provider": data_source_api_key_binding.provider,                        "disabled": data_source_api_key_binding.disabled,                        "created_at": int(data_source_api_key_binding.created_at.timestamp()),                        "updated_at": int(data_source_api_key_binding.updated_at.timestamp()),                    }                    for data_source_api_key_binding in data_source_api_key_bindings                ]            }        return {"sources": []}class ApiKeyAuthDataSourceBinding(Resource):    @setup_required    @login_required    @account_initialization_required    def post(self):        # The role of the current user in the table must be admin or owner        if not current_user.is_admin_or_owner:            raise Forbidden()        parser = reqparse.RequestParser()        parser.add_argument("category", type=str, required=True, nullable=False, location="json")        parser.add_argument("provider", type=str, required=True, nullable=False, location="json")        parser.add_argument("credentials", type=dict, required=True, nullable=False, location="json")        args = parser.parse_args()        ApiKeyAuthService.validate_api_key_auth_args(args)        try:            ApiKeyAuthService.create_provider_auth(current_user.current_tenant_id, args)        except Exception as e:            raise ApiKeyAuthFailedError(str(e))        return {"result": "success"}, 200class ApiKeyAuthDataSourceBindingDelete(Resource):    @setup_required    @login_required    @account_initialization_required    def delete(self, binding_id):        # The role of the current user in the table must be admin or owner        if not current_user.is_admin_or_owner:            raise Forbidden()        ApiKeyAuthService.delete_provider_auth(current_user.current_tenant_id, binding_id)        return {"result": "success"}, 200api.add_resource(ApiKeyAuthDataSource, "/api-key-auth/data-source")api.add_resource(ApiKeyAuthDataSourceBinding, "/api-key-auth/data-source/binding")api.add_resource(ApiKeyAuthDataSourceBindingDelete, "/api-key-auth/data-source/<uuid:binding_id>")
 |