opendal_storage.py 3.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. import logging
  2. import os
  3. from collections.abc import Generator
  4. from pathlib import Path
  5. import opendal
  6. from dotenv import dotenv_values
  7. from extensions.storage.base_storage import BaseStorage
  8. logger = logging.getLogger(__name__)
  9. def _get_opendal_kwargs(*, scheme: str, env_file_path: str = ".env", prefix: str = "OPENDAL_"):
  10. kwargs = {}
  11. config_prefix = prefix + scheme.upper() + "_"
  12. for key, value in os.environ.items():
  13. if key.startswith(config_prefix):
  14. kwargs[key[len(config_prefix) :].lower()] = value
  15. file_env_vars = dotenv_values(env_file_path)
  16. for key, value in file_env_vars.items():
  17. if key.startswith(config_prefix) and key[len(config_prefix) :].lower() not in kwargs and value:
  18. kwargs[key[len(config_prefix) :].lower()] = value
  19. return kwargs
  20. class OpenDALStorage(BaseStorage):
  21. def __init__(self, scheme: str, **kwargs):
  22. kwargs = kwargs or _get_opendal_kwargs(scheme=scheme)
  23. if scheme == "fs":
  24. root = kwargs.get("root", "storage")
  25. Path(root).mkdir(parents=True, exist_ok=True)
  26. # self.op = opendal.Operator(scheme=scheme, **kwargs)
  27. self.op = opendal.Operator(scheme=scheme, **kwargs)
  28. logger.debug(f"opendal operator created with scheme {scheme}")
  29. retry_layer = opendal.layers.RetryLayer(max_times=3, factor=2.0, jitter=True)
  30. self.op = self.op.layer(retry_layer)
  31. logger.debug("added retry layer to opendal operator")
  32. def save(self, filename: str, data: bytes) -> None:
  33. self.op.write(path=filename, bs=data)
  34. logger.debug(f"file {filename} saved")
  35. def load_once(self, filename: str) -> bytes:
  36. if not self.exists(filename):
  37. raise FileNotFoundError("File not found")
  38. content = self.op.read(path=filename)
  39. logger.debug(f"file {filename} loaded")
  40. return content
  41. def load_stream(self, filename: str) -> Generator:
  42. if not self.exists(filename):
  43. raise FileNotFoundError("File not found")
  44. batch_size = 4096
  45. file = self.op.open(path=filename, mode="rb")
  46. while chunk := file.read(batch_size):
  47. yield chunk
  48. logger.debug(f"file {filename} loaded as stream")
  49. def download(self, filename: str, target_filepath: str):
  50. if not self.exists(filename):
  51. raise FileNotFoundError("File not found")
  52. with Path(target_filepath).open("wb") as f:
  53. f.write(self.op.read(path=filename))
  54. logger.debug(f"file {filename} downloaded to {target_filepath}")
  55. def exists(self, filename: str) -> bool:
  56. # FIXME this is a workaround for opendal python-binding do not have a exists method and no better
  57. # error handler here when opendal python-binding has a exists method, we should use it
  58. # more https://github.com/apache/opendal/blob/main/bindings/python/src/operator.rs
  59. try:
  60. res = self.op.stat(path=filename).mode.is_file()
  61. logger.debug(f"file {filename} checked")
  62. return res
  63. except Exception:
  64. return False
  65. def delete(self, filename: str):
  66. if self.exists(filename):
  67. self.op.delete(path=filename)
  68. logger.debug(f"file {filename} deleted")
  69. return
  70. logger.debug(f"file {filename} not found, skip delete")