"""Passport (JWT) authentication wrappers for web API resources.

Provides the ``validate_jwt_token`` decorator, the ``decode_jwt_token``
routine it relies on, and ``WebApiResource``, a ``Resource`` base class that
applies the decorator to every handler method.
"""

from functools import wraps

from flask import request
from flask_restful import Resource  # type: ignore
from werkzeug.exceptions import BadRequest, NotFound, Unauthorized

from controllers.web.error import WebSSOAuthRequiredError
from extensions.ext_database import db
from libs.passport import PassportService
from models.model import App, EndUser, Site
from services.enterprise.enterprise_service import EnterpriseService
from services.feature_service import FeatureService

# Single source for the message used by every malformed-header rejection.
_INVALID_AUTH_HEADER_MESSAGE = "Invalid Authorization header format. Expected 'Bearer <api-key>' format."


def validate_jwt_token(view=None):
    """Decorate a view so it receives the authenticated app and end user.

    Works both as ``@validate_jwt_token`` and ``@validate_jwt_token()``.
    The wrapped view is called as ``view(app_model, end_user, *args, **kwargs)``
    where ``app_model`` and ``end_user`` come from :func:`decode_jwt_token`.
    Any exception raised by :func:`decode_jwt_token` propagates to the caller.
    """

    def decorator(view):
        @wraps(view)
        def decorated(*args, **kwargs):
            app_model, end_user = decode_jwt_token()

            return view(app_model, end_user, *args, **kwargs)

        return decorated

    # Called without parentheses: ``view`` is the function being decorated.
    if view:
        return decorator(view)
    return decorator


def _extract_bearer_token(auth_header):
    """Return the token part of a ``Bearer <token>`` Authorization header.

    Raises:
        Unauthorized: if the header is missing, does not consist of a scheme
            and a token separated by whitespace, or the scheme is not
            ``bearer`` (case-insensitive).
    """
    if auth_header is None:
        raise Unauthorized("Authorization header is missing.")

    # Validate the split result itself: a header such as "Bearer " contains a
    # space but yields a single element, which would otherwise fail tuple
    # unpacking with ValueError instead of a 401.
    parts = auth_header.split(None, 1)
    if len(parts) != 2:
        raise Unauthorized(_INVALID_AUTH_HEADER_MESSAGE)

    auth_scheme, token = parts
    if auth_scheme.lower() != "bearer":
        raise Unauthorized(_INVALID_AUTH_HEADER_MESSAGE)

    return token


def decode_jwt_token():
    """Authenticate the current request and resolve its app and end user.

    Reads the ``Authorization`` header of the current Flask request, decodes
    the bearer token with ``PassportService().verify`` and loads the ``App``,
    ``Site`` and ``EndUser`` rows referenced by the decoded payload
    (``app_id``, ``app_code``, ``end_user_id``).

    Returns:
        A ``(app_model, end_user)`` tuple.

    Raises:
        Unauthorized: the header is missing/malformed, or the token is an SSO
            token while SSO is not in effect for the app.
        WebSSOAuthRequiredError: SSO is in effect for the app and the request
            is unauthorized or carries a non-SSO token.
        NotFound: the app or the end user does not exist.
        BadRequest: the site cannot be found for the token's app code, or the
            app has ``enable_site`` set to ``False``.
    """
    system_features = FeatureService.get_system_features()
    # Fallback app code, used by the SSO check in the ``except`` branch when
    # authentication fails before the token has been decoded.
    app_code = request.headers.get("X-App-Code")
    try:
        tk = _extract_bearer_token(request.headers.get("Authorization"))

        decoded = PassportService().verify(tk)
        # From here on the app code carried by the token takes precedence.
        app_code = decoded.get("app_code")
        app_model = db.session.query(App).filter(App.id == decoded["app_id"]).first()
        site = db.session.query(Site).filter(Site.code == app_code).first()
        if not app_model:
            raise NotFound()
        if not app_code or not site:
            raise BadRequest("Site URL is no longer valid.")
        if app_model.enable_site is False:
            raise BadRequest("Site is disabled.")
        end_user = db.session.query(EndUser).filter(EndUser.id == decoded["end_user_id"]).first()
        if not end_user:
            raise NotFound()
        _validate_web_sso_token(decoded, system_features, app_code)

        return app_model, end_user
    except Unauthorized as e:
        # When SSO is in effect for this app, tell the client to go through
        # the SSO login instead of returning a plain 401.
        if system_features.sso_enforced_for_web:
            app_web_sso_enabled = EnterpriseService.get_app_web_sso_enabled(app_code).get("enabled", False)
            if app_web_sso_enabled:
                raise WebSSOAuthRequiredError()

        raise Unauthorized(e.description)


def _validate_web_sso_token(decoded, system_features, app_code):
    """Check that the token's source matches the app's SSO requirement.

    Args:
        decoded: decoded token payload; its ``token_source`` entry is read.
        system_features: object exposing ``sso_enforced_for_web``.
        app_code: app code passed to
            ``EnterpriseService.get_app_web_sso_enabled``.

    Raises:
        WebSSOAuthRequiredError: SSO is enforced for web and enabled for the
            app, but the token was not issued through SSO.
        Unauthorized: SSO is not in effect for the app, but the token was
            issued through SSO.
    """
    source = decoded.get("token_source")

    # The per-app lookup is only performed when SSO is enforced for web.
    app_web_sso_enabled = False
    if system_features.sso_enforced_for_web:
        app_web_sso_enabled = EnterpriseService.get_app_web_sso_enabled(app_code).get("enabled", False)

    if app_web_sso_enabled:
        # SSO is in effect: a token from any other source must be replaced
        # via the SSO login.
        if source != "sso":
            raise WebSSOAuthRequiredError()
    elif source == "sso":
        # SSO is not in effect: an SSO token is no longer accepted and the
        # client must go through the normal passport login.
        raise Unauthorized("sso token expired.")


class WebApiResource(Resource):
    """Base resource whose handler methods are wrapped by ``validate_jwt_token``."""

    method_decorators = [validate_jwt_token]