wrapper.py 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. from contextvars import ContextVar
  2. from typing import Generic, TypeVar
  3. T = TypeVar("T")
  4. class HiddenValue:
  5. pass
  6. _default = HiddenValue()
  7. class RecyclableContextVar(Generic[T]):
  8. """
  9. RecyclableContextVar is a wrapper around ContextVar
  10. It's safe to use in gunicorn with thread recycling, but features like `reset` are not available for now
  11. NOTE: you need to call `increment_thread_recycles` before requests
  12. """
  13. _thread_recycles: ContextVar[int] = ContextVar("thread_recycles")
  14. @classmethod
  15. def increment_thread_recycles(cls):
  16. try:
  17. recycles = cls._thread_recycles.get()
  18. cls._thread_recycles.set(recycles + 1)
  19. except LookupError:
  20. cls._thread_recycles.set(0)
  21. def __init__(self, context_var: ContextVar[T]):
  22. self._context_var = context_var
  23. self._updates = ContextVar[int](context_var.name + "_updates", default=0)
  24. def get(self, default: T | HiddenValue = _default) -> T:
  25. thread_recycles = self._thread_recycles.get(0)
  26. self_updates = self._updates.get()
  27. if thread_recycles > self_updates:
  28. self._updates.set(thread_recycles)
  29. # check if thread is recycled and should be updated
  30. if thread_recycles < self_updates:
  31. return self._context_var.get()
  32. else:
  33. # thread_recycles >= self_updates, means current context is invalid
  34. if isinstance(default, HiddenValue) or default is _default:
  35. raise LookupError
  36. else:
  37. return default
  38. def set(self, value: T):
  39. # it leads to a situation that self.updates is less than cls.thread_recycles if `set` was never called before
  40. # increase it manually
  41. thread_recycles = self._thread_recycles.get(0)
  42. self_updates = self._updates.get()
  43. if thread_recycles > self_updates:
  44. self._updates.set(thread_recycles)
  45. if self._updates.get() == self._thread_recycles.get(0):
  46. # after increment,
  47. self._updates.set(self._updates.get() + 1)
  48. # set the context
  49. self._context_var.set(value)