google_storage.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. import base64
  2. import io
  3. import json
  4. from collections.abc import Generator
  5. from contextlib import closing
  6. from flask import Flask
  7. from google.cloud import storage as GoogleCloudStorage
  8. from extensions.storage.base_storage import BaseStorage
  9. class GoogleStorage(BaseStorage):
  10. """Implementation for google storage.
  11. """
  12. def __init__(self, app: Flask):
  13. super().__init__(app)
  14. app_config = self.app.config
  15. self.bucket_name = app_config.get('GOOGLE_STORAGE_BUCKET_NAME')
  16. service_account_json_str = app_config.get('GOOGLE_STORAGE_SERVICE_ACCOUNT_JSON_BASE64')
  17. # if service_account_json_str is empty, use Application Default Credentials
  18. if service_account_json_str:
  19. service_account_json = base64.b64decode(service_account_json_str).decode('utf-8')
  20. # convert str to object
  21. service_account_obj = json.loads(service_account_json)
  22. self.client = GoogleCloudStorage.Client.from_service_account_info(service_account_obj)
  23. else:
  24. self.client = GoogleCloudStorage.Client()
  25. def save(self, filename, data):
  26. bucket = self.client.get_bucket(self.bucket_name)
  27. blob = bucket.blob(filename)
  28. with io.BytesIO(data) as stream:
  29. blob.upload_from_file(stream)
  30. def load_once(self, filename: str) -> bytes:
  31. bucket = self.client.get_bucket(self.bucket_name)
  32. blob = bucket.get_blob(filename)
  33. data = blob.download_as_bytes()
  34. return data
  35. def load_stream(self, filename: str) -> Generator:
  36. def generate(filename: str = filename) -> Generator:
  37. bucket = self.client.get_bucket(self.bucket_name)
  38. blob = bucket.get_blob(filename)
  39. with closing(blob.open(mode='rb')) as blob_stream:
  40. while chunk := blob_stream.read(4096):
  41. yield chunk
  42. return generate()
  43. def download(self, filename, target_filepath):
  44. bucket = self.client.get_bucket(self.bucket_name)
  45. blob = bucket.get_blob(filename)
  46. blob.download_to_filename(target_filepath)
  47. def exists(self, filename):
  48. bucket = self.client.get_bucket(self.bucket_name)
  49. blob = bucket.blob(filename)
  50. return blob.exists()
  51. def delete(self, filename):
  52. bucket = self.client.get_bucket(self.bucket_name)
  53. bucket.delete_blob(filename)