base.py 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637
  1. import json
  2. from collections.abc import Generator
  3. from typing import Generic, Optional, TypeVar
  4. from pydantic import BaseModel
  5. class BaseBackwardsInvocation:
  6. @classmethod
  7. def convert_to_event_stream(cls, response: Generator[BaseModel | dict | str, None, None] | BaseModel | dict):
  8. if isinstance(response, Generator):
  9. try:
  10. for chunk in response:
  11. if isinstance(chunk, BaseModel):
  12. yield BaseBackwardsInvocationResponse(data=chunk).model_dump_json().encode() + b"\n\n"
  13. elif isinstance(chunk, str):
  14. yield f"event: {chunk}\n\n".encode()
  15. else:
  16. yield json.dumps(chunk).encode() + b"\n\n"
  17. except Exception as e:
  18. error_message = BaseBackwardsInvocationResponse(error=str(e)).model_dump_json()
  19. yield f"{error_message}\n\n".encode()
  20. else:
  21. if isinstance(response, BaseModel):
  22. yield response.model_dump_json().encode() + b"\n\n"
  23. else:
  24. yield json.dumps(response).encode() + b"\n\n"
  25. T = TypeVar("T", bound=BaseModel | dict | str | bool | int)
  26. class BaseBackwardsInvocationResponse(BaseModel, Generic[T]):
  27. data: Optional[T] = None
  28. error: str = ""