base.py 1.2 KB

12345678910111213141516171819202122232425262728293031323334
  1. import json
  2. from collections.abc import Generator
  3. from typing import Generic, Optional, TypeVar
  4. from pydantic import BaseModel
  5. class BaseBackwardsInvocation:
  6. @classmethod
  7. def convert_to_event_stream(cls, response: Generator[BaseModel | dict | str, None, None] | BaseModel | dict):
  8. if isinstance(response, Generator):
  9. try:
  10. for chunk in response:
  11. if isinstance(chunk, BaseModel | dict):
  12. yield BaseBackwardsInvocationResponse(data=chunk).model_dump_json().encode() + b"\n\n"
  13. elif isinstance(chunk, str):
  14. yield f"event: {chunk}\n\n".encode()
  15. except Exception as e:
  16. error_message = BaseBackwardsInvocationResponse(error=str(e)).model_dump_json()
  17. yield f"{error_message}\n\n".encode()
  18. else:
  19. if isinstance(response, BaseModel):
  20. yield response.model_dump_json().encode() + b"\n\n"
  21. else:
  22. yield json.dumps(response).encode() + b"\n\n"
  23. T = TypeVar("T", bound=dict | str | bool | int | BaseModel)
  24. class BaseBackwardsInvocationResponse(BaseModel, Generic[T]):
  25. data: Optional[T] = None
  26. error: str = ""