web_conversation_service.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. from typing import Optional, Union
  2. from sqlalchemy import select
  3. from sqlalchemy.orm import Session
  4. from core.app.entities.app_invoke_entities import InvokeFrom
  5. from extensions.ext_database import db
  6. from libs.infinite_scroll_pagination import InfiniteScrollPagination
  7. from models.account import Account
  8. from models.model import App, EndUser
  9. from models.web import PinnedConversation
  10. from services.conversation_service import ConversationService
  11. class WebConversationService:
  12. @classmethod
  13. def pagination_by_last_id(
  14. cls,
  15. *,
  16. session: Session,
  17. app_model: App,
  18. user: Optional[Union[Account, EndUser]],
  19. last_id: Optional[str],
  20. limit: int,
  21. invoke_from: InvokeFrom,
  22. pinned: Optional[bool] = None,
  23. sort_by="-updated_at",
  24. ) -> InfiniteScrollPagination:
  25. include_ids = None
  26. exclude_ids = None
  27. if pinned is not None and user:
  28. stmt = (
  29. select(PinnedConversation.conversation_id)
  30. .where(
  31. PinnedConversation.app_id == app_model.id,
  32. PinnedConversation.created_by_role == ("account" if isinstance(user, Account) else "end_user"),
  33. PinnedConversation.created_by == user.id,
  34. )
  35. .order_by(PinnedConversation.created_at.desc())
  36. )
  37. pinned_conversation_ids = session.scalars(stmt).all()
  38. if pinned:
  39. include_ids = pinned_conversation_ids
  40. else:
  41. exclude_ids = pinned_conversation_ids
  42. return ConversationService.pagination_by_last_id(
  43. session=session,
  44. app_model=app_model,
  45. user=user,
  46. last_id=last_id,
  47. limit=limit,
  48. invoke_from=invoke_from,
  49. include_ids=include_ids,
  50. exclude_ids=exclude_ids,
  51. sort_by=sort_by,
  52. )
  53. @classmethod
  54. def pin(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]):
  55. pinned_conversation = (
  56. db.session.query(PinnedConversation)
  57. .filter(
  58. PinnedConversation.app_id == app_model.id,
  59. PinnedConversation.conversation_id == conversation_id,
  60. PinnedConversation.created_by_role == ("account" if isinstance(user, Account) else "end_user"),
  61. PinnedConversation.created_by == user.id,
  62. )
  63. .first()
  64. )
  65. if pinned_conversation:
  66. return
  67. conversation = ConversationService.get_conversation(
  68. app_model=app_model, conversation_id=conversation_id, user=user
  69. )
  70. pinned_conversation = PinnedConversation(
  71. app_id=app_model.id,
  72. conversation_id=conversation.id,
  73. created_by_role="account" if isinstance(user, Account) else "end_user",
  74. created_by=user.id,
  75. )
  76. db.session.add(pinned_conversation)
  77. db.session.commit()
  78. @classmethod
  79. def unpin(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]):
  80. pinned_conversation = (
  81. db.session.query(PinnedConversation)
  82. .filter(
  83. PinnedConversation.app_id == app_model.id,
  84. PinnedConversation.conversation_id == conversation_id,
  85. PinnedConversation.created_by_role == ("account" if isinstance(user, Account) else "end_user"),
  86. PinnedConversation.created_by == user.id,
  87. )
  88. .first()
  89. )
  90. if not pinned_conversation:
  91. return
  92. db.session.delete(pinned_conversation)
  93. db.session.commit()