data_source.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. import datetime
  2. import json
  3. from flask import request
  4. from flask_login import current_user
  5. from flask_restful import Resource, marshal_with, reqparse
  6. from sqlalchemy import select
  7. from sqlalchemy.orm import Session
  8. from werkzeug.exceptions import NotFound
  9. from controllers.console import api
  10. from controllers.console.setup import setup_required
  11. from controllers.console.wraps import account_initialization_required
  12. from core.indexing_runner import IndexingRunner
  13. from core.rag.extractor.entity.extract_setting import ExtractSetting
  14. from core.rag.extractor.notion_extractor import NotionExtractor
  15. from extensions.ext_database import db
  16. from fields.data_source_fields import integrate_list_fields, integrate_notion_info_list_fields
  17. from libs.login import login_required
  18. from models import DataSourceOauthBinding, Document
  19. from services.dataset_service import DatasetService, DocumentService
  20. from tasks.document_indexing_sync_task import document_indexing_sync_task
  21. class DataSourceApi(Resource):
  22. @setup_required
  23. @login_required
  24. @account_initialization_required
  25. @marshal_with(integrate_list_fields)
  26. def get(self):
  27. # get workspace data source integrates
  28. data_source_integrates = (
  29. db.session.query(DataSourceOauthBinding)
  30. .filter(
  31. DataSourceOauthBinding.tenant_id == current_user.current_tenant_id,
  32. DataSourceOauthBinding.disabled == False,
  33. )
  34. .all()
  35. )
  36. base_url = request.url_root.rstrip("/")
  37. data_source_oauth_base_path = "/console/api/oauth/data-source"
  38. providers = ["notion"]
  39. integrate_data = []
  40. for provider in providers:
  41. # existing_integrate = next((ai for ai in data_source_integrates if ai.provider == provider), None)
  42. existing_integrates = filter(lambda item: item.provider == provider, data_source_integrates)
  43. if existing_integrates:
  44. for existing_integrate in list(existing_integrates):
  45. integrate_data.append(
  46. {
  47. "id": existing_integrate.id,
  48. "provider": provider,
  49. "created_at": existing_integrate.created_at,
  50. "is_bound": True,
  51. "disabled": existing_integrate.disabled,
  52. "source_info": existing_integrate.source_info,
  53. "link": f"{base_url}{data_source_oauth_base_path}/{provider}",
  54. }
  55. )
  56. else:
  57. integrate_data.append(
  58. {
  59. "id": None,
  60. "provider": provider,
  61. "created_at": None,
  62. "source_info": None,
  63. "is_bound": False,
  64. "disabled": None,
  65. "link": f"{base_url}{data_source_oauth_base_path}/{provider}",
  66. }
  67. )
  68. return {"data": integrate_data}, 200
  69. @setup_required
  70. @login_required
  71. @account_initialization_required
  72. def patch(self, binding_id, action):
  73. binding_id = str(binding_id)
  74. action = str(action)
  75. with Session(db.engine) as session:
  76. data_source_binding = session.execute(
  77. select(DataSourceOauthBinding).filter_by(id=binding_id)
  78. ).scalar_one_or_none()
  79. if data_source_binding is None:
  80. raise NotFound("Data source binding not found.")
  81. # enable binding
  82. if action == "enable":
  83. if data_source_binding.disabled:
  84. data_source_binding.disabled = False
  85. data_source_binding.updated_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
  86. db.session.add(data_source_binding)
  87. db.session.commit()
  88. else:
  89. raise ValueError("Data source is not disabled.")
  90. # disable binding
  91. if action == "disable":
  92. if not data_source_binding.disabled:
  93. data_source_binding.disabled = True
  94. data_source_binding.updated_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
  95. db.session.add(data_source_binding)
  96. db.session.commit()
  97. else:
  98. raise ValueError("Data source is disabled.")
  99. return {"result": "success"}, 200
  100. class DataSourceNotionListApi(Resource):
  101. @setup_required
  102. @login_required
  103. @account_initialization_required
  104. @marshal_with(integrate_notion_info_list_fields)
  105. def get(self):
  106. dataset_id = request.args.get("dataset_id", default=None, type=str)
  107. exist_page_ids = []
  108. with Session(db.engine) as session:
  109. # import notion in the exist dataset
  110. if dataset_id:
  111. dataset = DatasetService.get_dataset(dataset_id)
  112. if not dataset:
  113. raise NotFound("Dataset not found.")
  114. if dataset.data_source_type != "notion_import":
  115. raise ValueError("Dataset is not notion type.")
  116. documents = session.execute(
  117. select(Document).filter_by(
  118. dataset_id=dataset_id,
  119. tenant_id=current_user.current_tenant_id,
  120. data_source_type="notion_import",
  121. enabled=True,
  122. )
  123. ).all()
  124. if documents:
  125. for document in documents:
  126. data_source_info = json.loads(document.data_source_info)
  127. exist_page_ids.append(data_source_info["notion_page_id"])
  128. # get all authorized pages
  129. data_source_bindings = session.execute(
  130. select(DataSourceOauthBinding).filter_by(
  131. tenant_id=current_user.current_tenant_id, provider="notion", disabled=False
  132. )
  133. ).all()
  134. if not data_source_bindings:
  135. return {"notion_info": []}, 200
  136. pre_import_info_list = []
  137. for data_source_binding in data_source_bindings:
  138. source_info = data_source_binding.source_info
  139. pages = source_info["pages"]
  140. # Filter out already bound pages
  141. for page in pages:
  142. if page["page_id"] in exist_page_ids:
  143. page["is_bound"] = True
  144. else:
  145. page["is_bound"] = False
  146. pre_import_info = {
  147. "workspace_name": source_info["workspace_name"],
  148. "workspace_icon": source_info["workspace_icon"],
  149. "workspace_id": source_info["workspace_id"],
  150. "pages": pages,
  151. }
  152. pre_import_info_list.append(pre_import_info)
  153. return {"notion_info": pre_import_info_list}, 200
  154. class DataSourceNotionApi(Resource):
  155. @setup_required
  156. @login_required
  157. @account_initialization_required
  158. def get(self, workspace_id, page_id, page_type):
  159. workspace_id = str(workspace_id)
  160. page_id = str(page_id)
  161. with Session(db.engine) as session:
  162. data_source_binding = session.execute(
  163. select(DataSourceOauthBinding).filter(
  164. db.and_(
  165. DataSourceOauthBinding.tenant_id == current_user.current_tenant_id,
  166. DataSourceOauthBinding.provider == "notion",
  167. DataSourceOauthBinding.disabled == False,
  168. DataSourceOauthBinding.source_info["workspace_id"] == f'"{workspace_id}"',
  169. )
  170. )
  171. ).scalar_one_or_none()
  172. if not data_source_binding:
  173. raise NotFound("Data source binding not found.")
  174. extractor = NotionExtractor(
  175. notion_workspace_id=workspace_id,
  176. notion_obj_id=page_id,
  177. notion_page_type=page_type,
  178. notion_access_token=data_source_binding.access_token,
  179. tenant_id=current_user.current_tenant_id,
  180. )
  181. text_docs = extractor.extract()
  182. return {"content": "\n".join([doc.page_content for doc in text_docs])}, 200
  183. @setup_required
  184. @login_required
  185. @account_initialization_required
  186. def post(self):
  187. parser = reqparse.RequestParser()
  188. parser.add_argument("notion_info_list", type=list, required=True, nullable=True, location="json")
  189. parser.add_argument("process_rule", type=dict, required=True, nullable=True, location="json")
  190. parser.add_argument("doc_form", type=str, default="text_model", required=False, nullable=False, location="json")
  191. parser.add_argument(
  192. "doc_language", type=str, default="English", required=False, nullable=False, location="json"
  193. )
  194. args = parser.parse_args()
  195. # validate args
  196. DocumentService.estimate_args_validate(args)
  197. notion_info_list = args["notion_info_list"]
  198. extract_settings = []
  199. for notion_info in notion_info_list:
  200. workspace_id = notion_info["workspace_id"]
  201. for page in notion_info["pages"]:
  202. extract_setting = ExtractSetting(
  203. datasource_type="notion_import",
  204. notion_info={
  205. "notion_workspace_id": workspace_id,
  206. "notion_obj_id": page["page_id"],
  207. "notion_page_type": page["type"],
  208. "tenant_id": current_user.current_tenant_id,
  209. },
  210. document_model=args["doc_form"],
  211. )
  212. extract_settings.append(extract_setting)
  213. indexing_runner = IndexingRunner()
  214. response = indexing_runner.indexing_estimate(
  215. current_user.current_tenant_id,
  216. extract_settings,
  217. args["process_rule"],
  218. args["doc_form"],
  219. args["doc_language"],
  220. )
  221. return response, 200
  222. class DataSourceNotionDatasetSyncApi(Resource):
  223. @setup_required
  224. @login_required
  225. @account_initialization_required
  226. def get(self, dataset_id):
  227. dataset_id_str = str(dataset_id)
  228. dataset = DatasetService.get_dataset(dataset_id_str)
  229. if dataset is None:
  230. raise NotFound("Dataset not found.")
  231. documents = DocumentService.get_document_by_dataset_id(dataset_id_str)
  232. for document in documents:
  233. document_indexing_sync_task.delay(dataset_id_str, document.id)
  234. return 200
  235. class DataSourceNotionDocumentSyncApi(Resource):
  236. @setup_required
  237. @login_required
  238. @account_initialization_required
  239. def get(self, dataset_id, document_id):
  240. dataset_id_str = str(dataset_id)
  241. document_id_str = str(document_id)
  242. dataset = DatasetService.get_dataset(dataset_id_str)
  243. if dataset is None:
  244. raise NotFound("Dataset not found.")
  245. document = DocumentService.get_document(dataset_id_str, document_id_str)
  246. if document is None:
  247. raise NotFound("Document not found.")
  248. document_indexing_sync_task.delay(dataset_id_str, document_id_str)
  249. return 200
  250. api.add_resource(DataSourceApi, "/data-source/integrates", "/data-source/integrates/<uuid:binding_id>/<string:action>")
  251. api.add_resource(DataSourceNotionListApi, "/notion/pre-import/pages")
  252. api.add_resource(
  253. DataSourceNotionApi,
  254. "/notion/workspaces/<uuid:workspace_id>/pages/<uuid:page_id>/<string:page_type>/preview",
  255. "/datasets/notion-indexing-estimate",
  256. )
  257. api.add_resource(DataSourceNotionDatasetSyncApi, "/datasets/<uuid:dataset_id>/notion/sync")
  258. api.add_resource(
  259. DataSourceNotionDocumentSyncApi, "/datasets/<uuid:dataset_id>/documents/<uuid:document_id>/notion/sync"
  260. )