| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 | from functools import wrapsfrom flask import requestfrom flask_restful import Resourcefrom werkzeug.exceptions import BadRequest, NotFound, Unauthorizedfrom controllers.web.error import WebSSOAuthRequiredErrorfrom extensions.ext_database import dbfrom libs.passport import PassportServicefrom models.model import App, EndUser, Sitefrom services.feature_service import FeatureServicedef validate_jwt_token(view=None):    def decorator(view):        @wraps(view)        def decorated(*args, **kwargs):            app_model, end_user = decode_jwt_token()            return view(app_model, end_user, *args, **kwargs)        return decorated    if view:        return decorator(view)    return decoratordef decode_jwt_token():    system_features = FeatureService.get_system_features()    try:        auth_header = request.headers.get('Authorization')        if auth_header is None:            raise Unauthorized('Authorization header is missing.')        if ' ' not in auth_header:            raise Unauthorized('Invalid Authorization header format. Expected \'Bearer <api-key>\' format.')        auth_scheme, tk = auth_header.split(None, 1)        auth_scheme = auth_scheme.lower()        if auth_scheme != 'bearer':            raise Unauthorized('Invalid Authorization header format. Expected \'Bearer <api-key>\' format.')        decoded = PassportService().verify(tk)        app_code = decoded.get('app_code')        app_model = db.session.query(App).filter(App.id == decoded['app_id']).first()        site = db.session.query(Site).filter(Site.code == app_code).first()        if not app_model:            raise NotFound()        if not app_code or not site:            raise BadRequest('Site URL is no longer valid.')        if app_model.enable_site is False:            raise BadRequest('Site is disabled.')        end_user = db.session.query(EndUser).filter(EndUser.id == decoded['end_user_id']).first()        if not end_user:            raise NotFound()        _validate_web_sso_token(decoded, system_features)        return app_model, end_user    except Unauthorized as e:        if system_features.sso_enforced_for_web:            raise WebSSOAuthRequiredError()        raise Unauthorized(e.description)def _validate_web_sso_token(decoded, system_features):    # Check if SSO is enforced for web, and if the token source is not SSO, raise an error and redirect to SSO login    if system_features.sso_enforced_for_web:        source = decoded.get('token_source')        if not source or source != 'sso':            raise WebSSOAuthRequiredError()    # Check if SSO is not enforced for web, and if the token source is SSO, raise an error and redirect to normal passport login    if not system_features.sso_enforced_for_web:        source = decoded.get('token_source')        if source and source == 'sso':            raise Unauthorized('sso token expired.')class WebApiResource(Resource):    method_decorators = [validate_jwt_token]
 |