local_storage.py 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. import os
  2. import shutil
  3. from collections.abc import Generator
  4. from pathlib import Path
  5. from flask import Flask
  6. from extensions.storage.base_storage import BaseStorage
  7. class LocalStorage(BaseStorage):
  8. """Implementation for local storage."""
  9. def __init__(self, app: Flask):
  10. super().__init__(app)
  11. folder = self.app.config.get("STORAGE_LOCAL_PATH")
  12. if not os.path.isabs(folder):
  13. folder = os.path.join(app.root_path, folder)
  14. self.folder = folder
  15. def save(self, filename, data):
  16. if not self.folder or self.folder.endswith("/"):
  17. filename = self.folder + filename
  18. else:
  19. filename = self.folder + "/" + filename
  20. folder = os.path.dirname(filename)
  21. os.makedirs(folder, exist_ok=True)
  22. Path(os.path.join(os.getcwd(), filename)).write_bytes(data)
  23. def load_once(self, filename: str) -> bytes:
  24. if not self.folder or self.folder.endswith("/"):
  25. filename = self.folder + filename
  26. else:
  27. filename = self.folder + "/" + filename
  28. if not os.path.exists(filename):
  29. raise FileNotFoundError("File not found")
  30. data = Path(filename).read_bytes()
  31. return data
  32. def load_stream(self, filename: str) -> Generator:
  33. def generate(filename: str = filename) -> Generator:
  34. if not self.folder or self.folder.endswith("/"):
  35. filename = self.folder + filename
  36. else:
  37. filename = self.folder + "/" + filename
  38. if not os.path.exists(filename):
  39. raise FileNotFoundError("File not found")
  40. with open(filename, "rb") as f:
  41. while chunk := f.read(4096): # Read in chunks of 4KB
  42. yield chunk
  43. return generate()
  44. def download(self, filename, target_filepath):
  45. if not self.folder or self.folder.endswith("/"):
  46. filename = self.folder + filename
  47. else:
  48. filename = self.folder + "/" + filename
  49. if not os.path.exists(filename):
  50. raise FileNotFoundError("File not found")
  51. shutil.copyfile(filename, target_filepath)
  52. def exists(self, filename):
  53. if not self.folder or self.folder.endswith("/"):
  54. filename = self.folder + filename
  55. else:
  56. filename = self.folder + "/" + filename
  57. return os.path.exists(filename)
  58. def delete(self, filename):
  59. if not self.folder or self.folder.endswith("/"):
  60. filename = self.folder + filename
  61. else:
  62. filename = self.folder + "/" + filename
  63. if os.path.exists(filename):
  64. os.remove(filename)