local_storage.py 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. import os
  2. import shutil
  3. from collections.abc import Generator
  4. from flask import Flask
  5. from extensions.storage.base_storage import BaseStorage
  6. class LocalStorage(BaseStorage):
  7. """Implementation for local storage.
  8. """
  9. def __init__(self, app: Flask):
  10. super().__init__(app)
  11. folder = self.app.config.get('STORAGE_LOCAL_PATH')
  12. if not os.path.isabs(folder):
  13. folder = os.path.join(app.root_path, folder)
  14. self.folder = folder
  15. def save(self, filename, data):
  16. if not self.folder or self.folder.endswith('/'):
  17. filename = self.folder + filename
  18. else:
  19. filename = self.folder + '/' + filename
  20. folder = os.path.dirname(filename)
  21. os.makedirs(folder, exist_ok=True)
  22. with open(os.path.join(os.getcwd(), filename), "wb") as f:
  23. f.write(data)
  24. def load_once(self, filename: str) -> bytes:
  25. if not self.folder or self.folder.endswith('/'):
  26. filename = self.folder + filename
  27. else:
  28. filename = self.folder + '/' + filename
  29. if not os.path.exists(filename):
  30. raise FileNotFoundError("File not found")
  31. with open(filename, "rb") as f:
  32. data = f.read()
  33. return data
  34. def load_stream(self, filename: str) -> Generator:
  35. def generate(filename: str = filename) -> Generator:
  36. if not self.folder or self.folder.endswith('/'):
  37. filename = self.folder + filename
  38. else:
  39. filename = self.folder + '/' + filename
  40. if not os.path.exists(filename):
  41. raise FileNotFoundError("File not found")
  42. with open(filename, "rb") as f:
  43. while chunk := f.read(4096): # Read in chunks of 4KB
  44. yield chunk
  45. return generate()
  46. def download(self, filename, target_filepath):
  47. if not self.folder or self.folder.endswith('/'):
  48. filename = self.folder + filename
  49. else:
  50. filename = self.folder + '/' + filename
  51. if not os.path.exists(filename):
  52. raise FileNotFoundError("File not found")
  53. shutil.copyfile(filename, target_filepath)
  54. def exists(self, filename):
  55. if not self.folder or self.folder.endswith('/'):
  56. filename = self.folder + filename
  57. else:
  58. filename = self.folder + '/' + filename
  59. return os.path.exists(filename)
  60. def delete(self, filename):
  61. if not self.folder or self.folder.endswith('/'):
  62. filename = self.folder + filename
  63. else:
  64. filename = self.folder + '/' + filename
  65. if os.path.exists(filename):
  66. os.remove(filename)