from flask_login import current_user
from flask_restful import Resource, reqparse
from werkzeug.exceptions import Forbidden

from controllers.console import api
from controllers.console.auth.error import ApiKeyAuthFailedError
from libs.login import login_required
from services.auth.api_key_auth_service import ApiKeyAuthService

from ..setup import setup_required
from ..wraps import account_initialization_required


class ApiKeyAuthDataSource(Resource):
    """Read-only listing of the current tenant's API-key data source bindings."""

    @setup_required
    @login_required
    @account_initialization_required
    def get(self):
        """Return the bindings registered for the current user's tenant.

        Returns:
            A dict ``{'sources': [...]}``. Each entry carries the binding's
            ``id``, ``category``, ``provider``, ``disabled`` flag and its
            ``created_at`` / ``updated_at`` values as integer timestamps.
            The list is empty when the service returns nothing.
        """
        bindings = ApiKeyAuthService.get_provider_auth_list(current_user.current_tenant_id)
        # `or []` covers both an empty result and a None result, so the
        # response envelope is built in exactly one place.
        return {
            'sources': [
                {
                    'id': binding.id,
                    'category': binding.category,
                    'provider': binding.provider,
                    'disabled': binding.disabled,
                    'created_at': int(binding.created_at.timestamp()),
                    'updated_at': int(binding.updated_at.timestamp()),
                }
                for binding in bindings or []
            ]
        }


class ApiKeyAuthDataSourceBinding(Resource):
    """Creation of an API-key data source binding for the current tenant."""

    @setup_required
    @login_required
    @account_initialization_required
    def post(self):
        """Create a provider binding from the JSON request body.

        The body must contain ``category`` (str), ``provider`` (str) and
        ``credentials`` (dict); all three are required and non-null.

        Returns:
            ``({'result': 'success'}, 200)`` on success.

        Raises:
            Forbidden: the current user is neither admin nor owner.
            ApiKeyAuthFailedError: ``create_provider_auth`` raised; the
                original exception is attached as the cause.
        """
        # The role of the current user in the table must be admin or owner
        if not current_user.is_admin_or_owner:
            raise Forbidden()

        parser = reqparse.RequestParser()
        for field_name, field_type in (
            ('category', str),
            ('provider', str),
            ('credentials', dict),
        ):
            parser.add_argument(field_name, type=field_type, required=True, nullable=False, location='json')
        args = parser.parse_args()

        # Validation runs outside the try block so its own errors are not
        # rewrapped as ApiKeyAuthFailedError.
        ApiKeyAuthService.validate_api_key_auth_args(args)

        try:
            ApiKeyAuthService.create_provider_auth(current_user.current_tenant_id, args)
        except Exception as e:
            # Chain explicitly so the underlying failure stays visible in
            # tracebacks and logs.
            raise ApiKeyAuthFailedError(str(e)) from e
        return {'result': 'success'}, 200


class ApiKeyAuthDataSourceBindingDelete(Resource):
    """Deletion of a single API-key data source binding."""

    @setup_required
    @login_required
    @account_initialization_required
    def delete(self, binding_id):
        """Delete the binding identified by ``binding_id`` for the current tenant.

        Args:
            binding_id: Value captured by the ``<uuid:binding_id>`` segment
                of the route registered below.

        Returns:
            ``({'result': 'success'}, 200)``.

        Raises:
            Forbidden: the current user is neither admin nor owner.
        """
        # The role of the current user in the table must be admin or owner
        if not current_user.is_admin_or_owner:
            raise Forbidden()

        ApiKeyAuthService.delete_provider_auth(current_user.current_tenant_id, binding_id)

        return {'result': 'success'}, 200


api.add_resource(ApiKeyAuthDataSource, '/api-key-auth/data-source')
api.add_resource(ApiKeyAuthDataSourceBinding, '/api-key-auth/data-source/binding')
api.add_resource(ApiKeyAuthDataSourceBindingDelete, '/api-key-auth/data-source/<uuid:binding_id>')