from collections.abc import Callable
from functools import wraps
from typing import Optional

from flask import request
from flask_restful import reqparse  # type: ignore
from pydantic import BaseModel
from sqlalchemy.orm import Session

from extensions.ext_database import db
from models.account import Account, Tenant
from models.model import EndUser
from services.account_service import AccountService

# Identifier used (both as the requested user id and as the EndUser
# ``session_id``) when a request does not name a specific user.
_DEFAULT_USER_ID = "DEFAULT-USER"


def get_user(tenant_id: str, user_id: str | None) -> Account | EndUser:
    """Resolve the user a request acts as.

    An empty ``user_id`` is treated as ``"DEFAULT-USER"``. For the default
    user, the tenant's anonymous ``EndUser`` (``session_id == "DEFAULT-USER"``)
    is looked up and created on first use. For any other id, an account is
    loaded through ``AccountService.load_user`` and, failing that, an
    ``EndUser`` with that primary key is looked up.

    Args:
        tenant_id: Tenant the default end user belongs to / is created for.
        user_id: Requested user id; ``None`` or ``""`` selects the default user.

    Returns:
        The matching ``Account`` or ``EndUser``.

    Raises:
        ValueError: ``"user not found"`` when no user matches or when any
            error occurs during the lookup (the original error is chained).
    """
    try:
        with Session(db.engine) as session:
            if not user_id:
                user_id = _DEFAULT_USER_ID

            if user_id == _DEFAULT_USER_ID:
                # Scope the lookup to the tenant: every tenant shares the same
                # literal session id, so filtering on session_id alone would
                # hand one tenant's default user to all the others.
                user_model = (
                    session.query(EndUser)
                    .filter(
                        EndUser.tenant_id == tenant_id,
                        EndUser.session_id == _DEFAULT_USER_ID,
                    )
                    .first()
                )
                if not user_model:
                    user_model = EndUser(
                        tenant_id=tenant_id,
                        type="service_api",
                        is_anonymous=True,
                        session_id=_DEFAULT_USER_ID,
                    )
                    session.add(user_model)
                    session.commit()
                    # commit() expires loaded attributes by default; reload
                    # them while the session is still open so the instance
                    # stays readable after it is detached on exit.
                    session.refresh(user_model)
            else:
                # presumably load_user returns a falsy value when no account
                # has this id -- verify against AccountService.
                user_model = AccountService.load_user(user_id)
                if not user_model:
                    # NOTE(review): this lookup is not restricted to
                    # tenant_id; confirm whether cross-tenant end users
                    # should be resolvable here.
                    user_model = session.query(EndUser).filter(EndUser.id == user_id).first()
                if not user_model:
                    raise ValueError("user not found")
    except Exception as exc:
        # Every failure (including the "not found" raised above) is reported
        # uniformly; the cause is kept for debugging.
        raise ValueError("user not found") from exc

    return user_model


def get_user_tenant(view: Optional[Callable] = None):
    """Decorator that injects ``tenant_model`` and ``user_model`` into a view.

    ``tenant_id`` and ``user_id`` are read from the JSON request body. The
    tenant is loaded from the database and the user is resolved with
    ``get_user``; both are passed to the view as keyword arguments, in
    addition to whatever keyword arguments the view was called with.

    Usable both bare (``@get_user_tenant``) and called (``@get_user_tenant()``).

    Raises (from the decorated view):
        ValueError: ``"tenant_id is required"`` when the id is empty,
            ``"tenant not found"`` when the tenant query fails or matches
            nothing, or ``"user not found"`` propagated from ``get_user``.
    """

    def decorator(view_func):
        @wraps(view_func)
        def decorated_view(*args, **kwargs):
            # fetch json body
            parser = reqparse.RequestParser()
            parser.add_argument("tenant_id", type=str, required=True, location="json")
            parser.add_argument("user_id", type=str, required=True, location="json")

            # Keep the parsed body separate from ``kwargs`` so keyword
            # arguments supplied by the route or by outer decorators are
            # forwarded to the view instead of being discarded.
            parsed_args = parser.parse_args()

            user_id = parsed_args.get("user_id")
            tenant_id = parsed_args.get("tenant_id")

            if not tenant_id:
                raise ValueError("tenant_id is required")

            if not user_id:
                user_id = _DEFAULT_USER_ID

            try:
                tenant_model = db.session.query(Tenant).filter(Tenant.id == tenant_id).first()
            except Exception as exc:
                raise ValueError("tenant not found") from exc

            if not tenant_model:
                raise ValueError("tenant not found")

            kwargs["tenant_model"] = tenant_model
            kwargs["user_model"] = get_user(tenant_id, user_id)

            return view_func(*args, **kwargs)

        return decorated_view

    if view is None:
        return decorator
    return decorator(view)


def plugin_data(view: Optional[Callable] = None, *, payload_type: type[BaseModel]):
    """Decorator that validates the JSON body and injects it as ``payload``.

    The request's JSON body is unpacked as keyword arguments into
    ``payload_type`` and the resulting model instance is passed to the view
    as the ``payload`` keyword argument.

    Args:
        view: The view when used without parentheses; ``None`` otherwise.
        payload_type: Pydantic model class used to build the payload.

    Raises (from the decorated view):
        ValueError: ``"invalid json"`` when the body cannot be read as JSON,
            or ``"invalid payload: ..."`` when constructing ``payload_type``
            from it fails.
    """

    def decorator(view_func):
        # Preserve the view's name/docstring, as get_user_tenant does.
        @wraps(view_func)
        def decorated_view(*args, **kwargs):
            try:
                data = request.get_json()
            except Exception as exc:
                raise ValueError("invalid json") from exc

            try:
                payload = payload_type(**data)
            except Exception as e:
                # Also reached when ``data`` is not a mapping (e.g. None or a
                # list), since ``**data`` then raises TypeError.
                raise ValueError(f"invalid payload: {str(e)}") from e

            kwargs["payload"] = payload
            return view_func(*args, **kwargs)

        return decorated_view

    if view is None:
        return decorator
    return decorator(view)