client.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. import hashlib
  2. import json
  3. import logging
  4. import os
  5. import threading
  6. import time
  7. from pathlib import Path
  8. from .python_3x import http_request, makedirs_wrapper
  9. from .utils import (
  10. CONFIGURATIONS,
  11. NAMESPACE_NAME,
  12. NOTIFICATION_ID,
  13. get_value_from_dict,
  14. init_ip,
  15. no_key_cache_key,
  16. signature,
  17. url_encode_wrapper,
  18. )
  19. logger = logging.getLogger(__name__)
  20. class ApolloClient:
  21. def __init__(
  22. self,
  23. config_url,
  24. app_id,
  25. cluster="default",
  26. secret="",
  27. start_hot_update=True,
  28. change_listener=None,
  29. _notification_map=None,
  30. ):
  31. # Core routing parameters
  32. self.config_url = config_url
  33. self.cluster = cluster
  34. self.app_id = app_id
  35. # Non-core parameters
  36. self.ip = init_ip()
  37. self.secret = secret
  38. # Check the parameter variables
  39. # Private control variables
  40. self._cycle_time = 5
  41. self._stopping = False
  42. self._cache = {}
  43. self._no_key = {}
  44. self._hash = {}
  45. self._pull_timeout = 75
  46. self._cache_file_path = os.path.expanduser("~") + "/.dify/config/remote-settings/apollo/cache/"
  47. self._long_poll_thread = None
  48. self._change_listener = change_listener # "add" "delete" "update"
  49. if _notification_map is None:
  50. _notification_map = {"application": -1}
  51. self._notification_map = _notification_map
  52. self.last_release_key = None
  53. # Private startup method
  54. self._path_checker()
  55. if start_hot_update:
  56. self._start_hot_update()
  57. # start the heartbeat thread
  58. heartbeat = threading.Thread(target=self._heart_beat)
  59. heartbeat.daemon = True
  60. heartbeat.start()
  61. def get_json_from_net(self, namespace="application"):
  62. url = "{}/configs/{}/{}/{}?releaseKey={}&ip={}".format(
  63. self.config_url, self.app_id, self.cluster, namespace, "", self.ip
  64. )
  65. try:
  66. code, body = http_request(url, timeout=3, headers=self._sign_headers(url))
  67. if code == 200:
  68. if not body:
  69. logger.error(f"get_json_from_net load configs failed, body is {body}")
  70. return None
  71. data = json.loads(body)
  72. data = data["configurations"]
  73. return_data = {CONFIGURATIONS: data}
  74. return return_data
  75. else:
  76. return None
  77. except Exception:
  78. logger.exception("an error occurred in get_json_from_net")
  79. return None
  80. def get_value(self, key, default_val=None, namespace="application"):
  81. try:
  82. # read memory configuration
  83. namespace_cache = self._cache.get(namespace)
  84. val = get_value_from_dict(namespace_cache, key)
  85. if val is not None:
  86. return val
  87. no_key = no_key_cache_key(namespace, key)
  88. if no_key in self._no_key:
  89. return default_val
  90. # read the network configuration
  91. namespace_data = self.get_json_from_net(namespace)
  92. val = get_value_from_dict(namespace_data, key)
  93. if val is not None:
  94. self._update_cache_and_file(namespace_data, namespace)
  95. return val
  96. # read the file configuration
  97. namespace_cache = self._get_local_cache(namespace)
  98. val = get_value_from_dict(namespace_cache, key)
  99. if val is not None:
  100. self._update_cache_and_file(namespace_cache, namespace)
  101. return val
  102. # If all of them are not obtained, the default value is returned
  103. # and the local cache is set to None
  104. self._set_local_cache_none(namespace, key)
  105. return default_val
  106. except Exception:
  107. logger.exception("get_value has error, [key is %s], [namespace is %s]", key, namespace)
  108. return default_val
  109. # Set the key of a namespace to none, and do not set default val
  110. # to ensure the real-time correctness of the function call.
  111. # If the user does not have the same default val twice
  112. # and the default val is used here, there may be a problem.
  113. def _set_local_cache_none(self, namespace, key):
  114. no_key = no_key_cache_key(namespace, key)
  115. self._no_key[no_key] = key
  116. def _start_hot_update(self):
  117. self._long_poll_thread = threading.Thread(target=self._listener)
  118. # When the asynchronous thread is started, the daemon thread will automatically exit
  119. # when the main thread is launched.
  120. self._long_poll_thread.daemon = True
  121. self._long_poll_thread.start()
  122. def stop(self):
  123. self._stopping = True
  124. logger.info("Stopping listener...")
  125. # Call the set callback function, and if it is abnormal, try it out
  126. def _call_listener(self, namespace, old_kv, new_kv):
  127. if self._change_listener is None:
  128. return
  129. if old_kv is None:
  130. old_kv = {}
  131. if new_kv is None:
  132. new_kv = {}
  133. try:
  134. for key in old_kv:
  135. new_value = new_kv.get(key)
  136. old_value = old_kv.get(key)
  137. if new_value is None:
  138. # If newValue is empty, it means key, and the value is deleted.
  139. self._change_listener("delete", namespace, key, old_value)
  140. continue
  141. if new_value != old_value:
  142. self._change_listener("update", namespace, key, new_value)
  143. continue
  144. for key in new_kv:
  145. new_value = new_kv.get(key)
  146. old_value = old_kv.get(key)
  147. if old_value is None:
  148. self._change_listener("add", namespace, key, new_value)
  149. except BaseException as e:
  150. logger.warning(str(e))
  151. def _path_checker(self):
  152. if not os.path.isdir(self._cache_file_path):
  153. makedirs_wrapper(self._cache_file_path)
  154. # update the local cache and file cache
  155. def _update_cache_and_file(self, namespace_data, namespace="application"):
  156. # update the local cache
  157. self._cache[namespace] = namespace_data
  158. # update the file cache
  159. new_string = json.dumps(namespace_data)
  160. new_hash = hashlib.md5(new_string.encode("utf-8")).hexdigest()
  161. if self._hash.get(namespace) == new_hash:
  162. pass
  163. else:
  164. file_path = Path(self._cache_file_path) / f"{self.app_id}_configuration_{namespace}.txt"
  165. file_path.write_text(new_string)
  166. self._hash[namespace] = new_hash
  167. # get the configuration from the local file
  168. def _get_local_cache(self, namespace="application"):
  169. cache_file_path = os.path.join(self._cache_file_path, f"{self.app_id}_configuration_{namespace}.txt")
  170. if os.path.isfile(cache_file_path):
  171. with open(cache_file_path) as f:
  172. result = json.loads(f.readline())
  173. return result
  174. return {}
  175. def _long_poll(self):
  176. notifications = []
  177. for key in self._cache:
  178. namespace_data = self._cache[key]
  179. notification_id = -1
  180. if NOTIFICATION_ID in namespace_data:
  181. notification_id = self._cache[key][NOTIFICATION_ID]
  182. notifications.append({NAMESPACE_NAME: key, NOTIFICATION_ID: notification_id})
  183. try:
  184. # if the length is 0 it is returned directly
  185. if len(notifications) == 0:
  186. return
  187. url = "{}/notifications/v2".format(self.config_url)
  188. params = {
  189. "appId": self.app_id,
  190. "cluster": self.cluster,
  191. "notifications": json.dumps(notifications, ensure_ascii=False),
  192. }
  193. param_str = url_encode_wrapper(params)
  194. url = url + "?" + param_str
  195. code, body = http_request(url, self._pull_timeout, headers=self._sign_headers(url))
  196. http_code = code
  197. if http_code == 304:
  198. logger.debug("No change, loop...")
  199. return
  200. if http_code == 200:
  201. if not body:
  202. logger.error(f"_long_poll load configs failed,body is {body}")
  203. return
  204. data = json.loads(body)
  205. for entry in data:
  206. namespace = entry[NAMESPACE_NAME]
  207. n_id = entry[NOTIFICATION_ID]
  208. logger.info("%s has changes: notificationId=%d", namespace, n_id)
  209. self._get_net_and_set_local(namespace, n_id, call_change=True)
  210. return
  211. else:
  212. logger.warning("Sleep...")
  213. except Exception as e:
  214. logger.warning(str(e))
  215. def _get_net_and_set_local(self, namespace, n_id, call_change=False):
  216. namespace_data = self.get_json_from_net(namespace)
  217. if not namespace_data:
  218. return
  219. namespace_data[NOTIFICATION_ID] = n_id
  220. old_namespace = self._cache.get(namespace)
  221. self._update_cache_and_file(namespace_data, namespace)
  222. if self._change_listener is not None and call_change and old_namespace:
  223. old_kv = old_namespace.get(CONFIGURATIONS)
  224. new_kv = namespace_data.get(CONFIGURATIONS)
  225. self._call_listener(namespace, old_kv, new_kv)
  226. def _listener(self):
  227. logger.info("start long_poll")
  228. while not self._stopping:
  229. self._long_poll()
  230. time.sleep(self._cycle_time)
  231. logger.info("stopped, long_poll")
  232. # add the need for endorsement to the header
  233. def _sign_headers(self, url):
  234. headers = {}
  235. if self.secret == "":
  236. return headers
  237. uri = url[len(self.config_url) : len(url)]
  238. time_unix_now = str(int(round(time.time() * 1000)))
  239. headers["Authorization"] = "Apollo " + self.app_id + ":" + signature(time_unix_now, uri, self.secret)
  240. headers["Timestamp"] = time_unix_now
  241. return headers
  242. def _heart_beat(self):
  243. while not self._stopping:
  244. for namespace in self._notification_map:
  245. self._do_heart_beat(namespace)
  246. time.sleep(60 * 10) # 10分钟
  247. def _do_heart_beat(self, namespace):
  248. url = "{}/configs/{}/{}/{}?ip={}".format(self.config_url, self.app_id, self.cluster, namespace, self.ip)
  249. try:
  250. code, body = http_request(url, timeout=3, headers=self._sign_headers(url))
  251. if code == 200:
  252. if not body:
  253. logger.error(f"_do_heart_beat load configs failed,body is {body}")
  254. return None
  255. data = json.loads(body)
  256. if self.last_release_key == data["releaseKey"]:
  257. return None
  258. self.last_release_key = data["releaseKey"]
  259. data = data["configurations"]
  260. self._update_cache_and_file(data, namespace)
  261. else:
  262. return None
  263. except Exception:
  264. logger.exception("an error occurred in _do_heart_beat")
  265. return None
  266. def get_all_dicts(self, namespace):
  267. namespace_data = self._cache.get(namespace)
  268. if namespace_data is None:
  269. net_namespace_data = self.get_json_from_net(namespace)
  270. if not net_namespace_data:
  271. return namespace_data
  272. namespace_data = net_namespace_data.get(CONFIGURATIONS)
  273. if namespace_data:
  274. self._update_cache_and_file(namespace_data, namespace)
  275. return namespace_data