| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305 | import hashlibimport jsonimport loggingimport osimport threadingimport timefrom collections.abc import Mappingfrom pathlib import Pathfrom .python_3x import http_request, makedirs_wrapperfrom .utils import (    CONFIGURATIONS,    NAMESPACE_NAME,    NOTIFICATION_ID,    get_value_from_dict,    init_ip,    no_key_cache_key,    signature,    url_encode_wrapper,)logger = logging.getLogger(__name__)class ApolloClient:    def __init__(        self,        config_url,        app_id,        cluster="default",        secret="",        start_hot_update=True,        change_listener=None,        _notification_map=None,    ):        # Core routing parameters        self.config_url = config_url        self.cluster = cluster        self.app_id = app_id        # Non-core parameters        self.ip = init_ip()        self.secret = secret        # Check the parameter variables        # Private control variables        self._cycle_time = 5        self._stopping = False        self._cache = {}        self._no_key = {}        self._hash = {}        self._pull_timeout = 75        self._cache_file_path = os.path.expanduser("~") + "/.dify/config/remote-settings/apollo/cache/"        self._long_poll_thread = None        self._change_listener = change_listener  # "add" "delete" "update"        if _notification_map is None:            _notification_map = {"application": -1}        self._notification_map = _notification_map        self.last_release_key = None        # Private startup method        self._path_checker()        if start_hot_update:            self._start_hot_update()        # start the heartbeat thread        heartbeat = threading.Thread(target=self._heart_beat)        heartbeat.daemon = True        heartbeat.start()    def get_json_from_net(self, namespace="application"):        url = "{}/configs/{}/{}/{}?releaseKey={}&ip={}".format(            self.config_url, self.app_id, self.cluster, namespace, "", self.ip        )        try:            code, body = http_request(url, timeout=3, headers=self._sign_headers(url))            if code == 200:                if not body:                    logger.error(f"get_json_from_net load configs failed, body is {body}")                    return None                data = json.loads(body)                data = data["configurations"]                return_data = {CONFIGURATIONS: data}                return return_data            else:                return None        except Exception:            logger.exception("an error occurred in get_json_from_net")            return None    def get_value(self, key, default_val=None, namespace="application"):        try:            # read memory configuration            namespace_cache = self._cache.get(namespace)            val = get_value_from_dict(namespace_cache, key)            if val is not None:                return val            no_key = no_key_cache_key(namespace, key)            if no_key in self._no_key:                return default_val            # read the network configuration            namespace_data = self.get_json_from_net(namespace)            val = get_value_from_dict(namespace_data, key)            if val is not None:                self._update_cache_and_file(namespace_data, namespace)                return val            # read the file configuration            namespace_cache = self._get_local_cache(namespace)            val = get_value_from_dict(namespace_cache, key)            if val is not None:                self._update_cache_and_file(namespace_cache, namespace)                return val            # If all of them are not obtained, the default value is returned            # and the local cache is set to None            self._set_local_cache_none(namespace, key)            return default_val        except Exception:            logger.exception("get_value has error, [key is %s], [namespace is %s]", key, namespace)            return default_val    # Set the key of a namespace to none, and do not set default val    # to ensure the real-time correctness of the function call.    # If the user does not have the same default val twice    # and the default val is used here, there may be a problem.    def _set_local_cache_none(self, namespace, key):        no_key = no_key_cache_key(namespace, key)        self._no_key[no_key] = key    def _start_hot_update(self):        self._long_poll_thread = threading.Thread(target=self._listener)        # When the asynchronous thread is started, the daemon thread will automatically exit        # when the main thread is launched.        self._long_poll_thread.daemon = True        self._long_poll_thread.start()    def stop(self):        self._stopping = True        logger.info("Stopping listener...")    # Call the set callback function, and if it is abnormal, try it out    def _call_listener(self, namespace, old_kv, new_kv):        if self._change_listener is None:            return        if old_kv is None:            old_kv = {}        if new_kv is None:            new_kv = {}        try:            for key in old_kv:                new_value = new_kv.get(key)                old_value = old_kv.get(key)                if new_value is None:                    # If newValue is empty, it means key, and the value is deleted.                    self._change_listener("delete", namespace, key, old_value)                    continue                if new_value != old_value:                    self._change_listener("update", namespace, key, new_value)                    continue            for key in new_kv:                new_value = new_kv.get(key)                old_value = old_kv.get(key)                if old_value is None:                    self._change_listener("add", namespace, key, new_value)        except BaseException as e:            logger.warning(str(e))    def _path_checker(self):        if not os.path.isdir(self._cache_file_path):            makedirs_wrapper(self._cache_file_path)    # update the local cache and file cache    def _update_cache_and_file(self, namespace_data, namespace="application"):        # update the local cache        self._cache[namespace] = namespace_data        # update the file cache        new_string = json.dumps(namespace_data)        new_hash = hashlib.md5(new_string.encode("utf-8")).hexdigest()        if self._hash.get(namespace) == new_hash:            pass        else:            file_path = Path(self._cache_file_path) / f"{self.app_id}_configuration_{namespace}.txt"            file_path.write_text(new_string)            self._hash[namespace] = new_hash    # get the configuration from the local file    def _get_local_cache(self, namespace="application"):        cache_file_path = os.path.join(self._cache_file_path, f"{self.app_id}_configuration_{namespace}.txt")        if os.path.isfile(cache_file_path):            with open(cache_file_path) as f:                result = json.loads(f.readline())            return result        return {}    def _long_poll(self):        notifications = []        for key in self._cache:            namespace_data = self._cache[key]            notification_id = -1            if NOTIFICATION_ID in namespace_data:                notification_id = self._cache[key][NOTIFICATION_ID]            notifications.append({NAMESPACE_NAME: key, NOTIFICATION_ID: notification_id})        try:            # if the length is 0 it is returned directly            if len(notifications) == 0:                return            url = "{}/notifications/v2".format(self.config_url)            params = {                "appId": self.app_id,                "cluster": self.cluster,                "notifications": json.dumps(notifications, ensure_ascii=False),            }            param_str = url_encode_wrapper(params)            url = url + "?" + param_str            code, body = http_request(url, self._pull_timeout, headers=self._sign_headers(url))            http_code = code            if http_code == 304:                logger.debug("No change, loop...")                return            if http_code == 200:                if not body:                    logger.error(f"_long_poll load configs failed,body is {body}")                    return                data = json.loads(body)                for entry in data:                    namespace = entry[NAMESPACE_NAME]                    n_id = entry[NOTIFICATION_ID]                    logger.info("%s has changes: notificationId=%d", namespace, n_id)                    self._get_net_and_set_local(namespace, n_id, call_change=True)                    return            else:                logger.warning("Sleep...")        except Exception as e:            logger.warning(str(e))    def _get_net_and_set_local(self, namespace, n_id, call_change=False):        namespace_data = self.get_json_from_net(namespace)        if not namespace_data:            return        namespace_data[NOTIFICATION_ID] = n_id        old_namespace = self._cache.get(namespace)        self._update_cache_and_file(namespace_data, namespace)        if self._change_listener is not None and call_change and old_namespace:            old_kv = old_namespace.get(CONFIGURATIONS)            new_kv = namespace_data.get(CONFIGURATIONS)            self._call_listener(namespace, old_kv, new_kv)    def _listener(self):        logger.info("start long_poll")        while not self._stopping:            self._long_poll()            time.sleep(self._cycle_time)        logger.info("stopped, long_poll")    # add the need for endorsement to the header    def _sign_headers(self, url: str) -> Mapping[str, str]:        headers: dict[str, str] = {}        if self.secret == "":            return headers        uri = url[len(self.config_url) : len(url)]        time_unix_now = str(int(round(time.time() * 1000)))        headers["Authorization"] = "Apollo " + self.app_id + ":" + signature(time_unix_now, uri, self.secret)        headers["Timestamp"] = time_unix_now        return headers    def _heart_beat(self):        while not self._stopping:            for namespace in self._notification_map:                self._do_heart_beat(namespace)            time.sleep(60 * 10)  # 10分钟    def _do_heart_beat(self, namespace):        url = "{}/configs/{}/{}/{}?ip={}".format(self.config_url, self.app_id, self.cluster, namespace, self.ip)        try:            code, body = http_request(url, timeout=3, headers=self._sign_headers(url))            if code == 200:                if not body:                    logger.error(f"_do_heart_beat load configs failed,body is {body}")                    return None                data = json.loads(body)                if self.last_release_key == data["releaseKey"]:                    return None                self.last_release_key = data["releaseKey"]                data = data["configurations"]                self._update_cache_and_file(data, namespace)            else:                return None        except Exception:            logger.exception("an error occurred in _do_heart_beat")            return None    def get_all_dicts(self, namespace):        namespace_data = self._cache.get(namespace)        if namespace_data is None:            net_namespace_data = self.get_json_from_net(namespace)            if not net_namespace_data:                return namespace_data            namespace_data = net_namespace_data.get(CONFIGURATIONS)            if namespace_data:                self._update_cache_and_file(namespace_data, namespace)        return namespace_data
 |