web_conversation_service.py 3.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. from typing import Optional, Union
  2. from core.app.entities.app_invoke_entities import InvokeFrom
  3. from extensions.ext_database import db
  4. from libs.infinite_scroll_pagination import InfiniteScrollPagination
  5. from models.account import Account
  6. from models.model import App, EndUser
  7. from models.web import PinnedConversation
  8. from services.conversation_service import ConversationService
  9. class WebConversationService:
  10. @classmethod
  11. def pagination_by_last_id(cls, app_model: App, user: Optional[Union[Account, EndUser]],
  12. last_id: Optional[str], limit: int, invoke_from: InvokeFrom,
  13. pinned: Optional[bool] = None,
  14. sort_by='-updated_at') -> InfiniteScrollPagination:
  15. include_ids = None
  16. exclude_ids = None
  17. if pinned is not None:
  18. pinned_conversations = db.session.query(PinnedConversation).filter(
  19. PinnedConversation.app_id == app_model.id,
  20. PinnedConversation.created_by_role == ('account' if isinstance(user, Account) else 'end_user'),
  21. PinnedConversation.created_by == user.id
  22. ).order_by(PinnedConversation.created_at.desc()).all()
  23. pinned_conversation_ids = [pc.conversation_id for pc in pinned_conversations]
  24. if pinned:
  25. include_ids = pinned_conversation_ids
  26. else:
  27. exclude_ids = pinned_conversation_ids
  28. return ConversationService.pagination_by_last_id(
  29. app_model=app_model,
  30. user=user,
  31. last_id=last_id,
  32. limit=limit,
  33. invoke_from=invoke_from,
  34. include_ids=include_ids,
  35. exclude_ids=exclude_ids,
  36. sort_by=sort_by
  37. )
  38. @classmethod
  39. def pin(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]):
  40. pinned_conversation = db.session.query(PinnedConversation).filter(
  41. PinnedConversation.app_id == app_model.id,
  42. PinnedConversation.conversation_id == conversation_id,
  43. PinnedConversation.created_by_role == ('account' if isinstance(user, Account) else 'end_user'),
  44. PinnedConversation.created_by == user.id
  45. ).first()
  46. if pinned_conversation:
  47. return
  48. conversation = ConversationService.get_conversation(
  49. app_model=app_model,
  50. conversation_id=conversation_id,
  51. user=user
  52. )
  53. pinned_conversation = PinnedConversation(
  54. app_id=app_model.id,
  55. conversation_id=conversation.id,
  56. created_by_role='account' if isinstance(user, Account) else 'end_user',
  57. created_by=user.id
  58. )
  59. db.session.add(pinned_conversation)
  60. db.session.commit()
  61. @classmethod
  62. def unpin(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]):
  63. pinned_conversation = db.session.query(PinnedConversation).filter(
  64. PinnedConversation.app_id == app_model.id,
  65. PinnedConversation.conversation_id == conversation_id,
  66. PinnedConversation.created_by_role == ('account' if isinstance(user, Account) else 'end_user'),
  67. PinnedConversation.created_by == user.id
  68. ).first()
  69. if not pinned_conversation:
  70. return
  71. db.session.delete(pinned_conversation)
  72. db.session.commit()