from typing import Any, Union

import redis
from redis.cluster import ClusterNode, RedisCluster
from redis.connection import Connection, SSLConnection
from redis.sentinel import Sentinel

from configs import dify_config
from dify_app import DifyApp


class RedisClientWrapper:
    """
    Lazy proxy around a Redis client.

    The module-level `redis_client` is imported by other modules before the
    real client exists, so it cannot simply be rebound later. This wrapper
    gives importers a stable object and forwards attribute access to the real
    client once `initialize` has been called.

    Attributes:
        _client: The wrapped client instance, or None until `initialize` is
                 called.

    Methods:
        initialize(client): Stores the client on first call; later calls are
                            ignored while a client is already set.
        __getattr__(item): Delegates attribute access to the wrapped client,
                           raising RuntimeError if none has been set yet.
    """

    def __init__(self):
        self._client = None

    def initialize(self, client):
        """Store `client` as the wrapped client unless one is already set."""
        if self._client is None:
            self._client = client

    def __getattr__(self, item):
        """
        Forward lookups that failed on the wrapper itself to the wrapped client.

        Raises:
            AttributeError: If `_client` itself is missing from the instance.
            RuntimeError: If no client has been initialized yet.
        """
        # __getattr__ only runs after normal lookup fails. Getting here for
        # "_client" means __init__ never ran (e.g. copy/pickle rebuilding the
        # object); reading self._client below would then recurse forever.
        if item == "_client":
            raise AttributeError(item)
        if self._client is None:
            raise RuntimeError("Redis client is not initialized. Call init_app first.")
        return getattr(self._client, item)


redis_client = RedisClientWrapper()


def _parse_nodes(raw: str) -> list[tuple[str, int]]:
    """
    Parse a comma-separated "host:port,host:port" string into (host, port) pairs.

    Surrounding whitespace is stripped from each entry and blank entries
    (e.g. from a trailing comma) are skipped.
    """
    pairs: list[tuple[str, int]] = []
    for entry in raw.split(","):
        entry = entry.strip()
        if not entry:
            continue
        parts = entry.split(":")
        pairs.append((parts[0], int(parts[1])))
    return pairs


def init_app(app: DifyApp) -> None:
    """
    Build the Redis client selected by `dify_config` and register it on the app.

    Exactly one mode is used, checked in this order: Sentinel, Cluster,
    standalone. The resulting client is handed to the module-level
    `redis_client` wrapper, which is then stored in `app.extensions["redis"]`.

    Args:
        app: The application whose `extensions` mapping receives the wrapper.

    Raises:
        ValueError: If Sentinel or Cluster mode is enabled but the matching
                    node list setting is None.
    """
    connection_class: type[Union[Connection, SSLConnection]] = Connection
    if dify_config.REDIS_USE_SSL:
        connection_class = SSLConnection

    redis_params: dict[str, Any] = {
        "username": dify_config.REDIS_USERNAME,
        "password": dify_config.REDIS_PASSWORD or None,  # Temporary fix for empty password
        "db": dify_config.REDIS_DB,
        "encoding": "utf-8",
        "encoding_errors": "strict",
        "decode_responses": False,
    }

    if dify_config.REDIS_USE_SENTINEL:
        # Explicit raise instead of assert: asserts are stripped under `python -O`.
        if dify_config.REDIS_SENTINELS is None:
            raise ValueError("REDIS_SENTINELS must be set when REDIS_USE_SENTINEL is True")
        sentinel = Sentinel(
            _parse_nodes(dify_config.REDIS_SENTINELS),
            sentinel_kwargs={
                "socket_timeout": dify_config.REDIS_SENTINEL_SOCKET_TIMEOUT,
                "username": dify_config.REDIS_SENTINEL_USERNAME,
                "password": dify_config.REDIS_SENTINEL_PASSWORD,
            },
        )
        # NOTE(review): connection_class is only applied in the standalone
        # branch below; confirm whether SSL is intended for Sentinel/Cluster.
        master = sentinel.master_for(dify_config.REDIS_SENTINEL_SERVICE_NAME, **redis_params)
        redis_client.initialize(master)
    elif dify_config.REDIS_USE_CLUSTERS:
        if dify_config.REDIS_CLUSTERS is None:
            raise ValueError("REDIS_CLUSTERS must be set when REDIS_USE_CLUSTERS is True")
        nodes = [ClusterNode(host=host, port=port) for host, port in _parse_nodes(dify_config.REDIS_CLUSTERS)]
        # FIXME: mypy error here, try to figure out how to fix it
        redis_client.initialize(RedisCluster(startup_nodes=nodes, password=dify_config.REDIS_CLUSTERS_PASSWORD))  # type: ignore
    else:
        redis_params.update(
            {
                "host": dify_config.REDIS_HOST,
                "port": dify_config.REDIS_PORT,
                "connection_class": connection_class,
            }
        )
        pool = redis.ConnectionPool(**redis_params)
        redis_client.initialize(redis.Redis(connection_pool=pool))

    app.extensions["redis"] = redis_client