volcengine_storage.py 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. from collections.abc import Generator
  2. import tos
  3. from flask import Flask
  4. from extensions.storage.base_storage import BaseStorage
  5. class VolcengineStorage(BaseStorage):
  6. """Implementation for Volcengine TOS storage."""
  7. def __init__(self, app: Flask):
  8. super().__init__(app)
  9. app_config = self.app.config
  10. self.bucket_name = app_config.get("VOLCENGINE_TOS_BUCKET_NAME")
  11. self.client = tos.TosClientV2(
  12. ak=app_config.get("VOLCENGINE_TOS_ACCESS_KEY"),
  13. sk=app_config.get("VOLCENGINE_TOS_SECRET_KEY"),
  14. endpoint=app_config.get("VOLCENGINE_TOS_ENDPOINT"),
  15. region=app_config.get("VOLCENGINE_TOS_REGION"),
  16. )
  17. def save(self, filename, data):
  18. self.client.put_object(bucket=self.bucket_name, key=filename, content=data)
  19. def load_once(self, filename: str) -> bytes:
  20. data = self.client.get_object(bucket=self.bucket_name, key=filename).read()
  21. return data
  22. def load_stream(self, filename: str) -> Generator:
  23. def generate(filename: str = filename) -> Generator:
  24. response = self.client.get_object(bucket=self.bucket_name, key=filename)
  25. while chunk := response.read(4096):
  26. yield chunk
  27. return generate()
  28. def download(self, filename, target_filepath):
  29. self.client.get_object_to_file(bucket=self.bucket_name, key=filename, file_path=target_filepath)
  30. def exists(self, filename):
  31. res = self.client.head_object(bucket=self.bucket_name, key=filename)
  32. if res.status_code != 200:
  33. return False
  34. return True
  35. def delete(self, filename):
  36. self.client.delete_object(bucket=self.bucket_name, key=filename)