import io
from collections.abc import Generator
from pathlib import Path

from flask import Flask
from supabase import Client

from extensions.storage.base_storage import BaseStorage


class SupabaseStorage(BaseStorage):
    """Implementation for supabase obs storage.

    Reads ``SUPABASE_URL``, ``SUPABASE_API_KEY`` and ``SUPABASE_BUCKET_NAME``
    from the Flask app config and performs every operation against that
    single bucket.
    """

    def __init__(self, app: Flask):
        """Build the Supabase client and make sure the configured bucket exists.

        Args:
            app: Flask application whose config holds the Supabase settings.
        """
        super().__init__(app)
        app_config = self.app.config
        self.bucket_name = app_config.get("SUPABASE_BUCKET_NAME")
        self.client = Client(
            supabase_url=app_config.get("SUPABASE_URL"),
            supabase_key=app_config.get("SUPABASE_API_KEY"),
        )
        # The bucket id and its display name are both the configured name.
        self.create_bucket(id=self.bucket_name, bucket_name=self.bucket_name)

    def create_bucket(self, id, bucket_name):
        """Create the bucket unless ``bucket_exists()`` reports it is present.

        Note that the existence check looks at ``self.bucket_name``, not at
        the ``id``/``bucket_name`` arguments.

        Args:
            id: Identifier passed to the storage API for the new bucket.
            bucket_name: Name passed to the storage API for the new bucket.
        """
        if not self.bucket_exists():
            self.client.storage.create_bucket(id=id, name=bucket_name)

    def save(self, filename, data):
        """Upload ``data`` to the bucket under the key ``filename``."""
        self.client.storage.from_(self.bucket_name).upload(filename, data)

    def load_once(self, filename: str) -> bytes:
        """Download ``filename`` from the bucket and return its full content."""
        return self.client.storage.from_(self.bucket_name).download(filename)

    def load_stream(self, filename: str) -> Generator:
        """Yield the content of ``filename`` in chunks of at most 4 KB.

        The object is downloaded in full on the first iteration and then
        sliced from memory; nothing is fetched until the generator is consumed.
        """
        result = self.client.storage.from_(self.bucket_name).download(filename)
        byte_stream = io.BytesIO(result)
        while chunk := byte_stream.read(4096):  # Read in chunks of 4KB
            yield chunk

    def download(self, filename, target_filepath):
        """Download ``filename`` from the bucket and write it to ``target_filepath``.

        Args:
            filename: Key of the object inside the bucket.
            target_filepath: Local path the downloaded bytes are written to.
        """
        result = self.client.storage.from_(self.bucket_name).download(filename)
        # Write to the caller-supplied destination; the downloaded bytes are
        # the payload, not the path.
        Path(target_filepath).write_bytes(result)

    def exists(self, filename):
        """Return True when listing the bucket at ``filename`` yields any entry.

        NOTE(review): this relies on the storage ``list`` call returning a
        sized collection of entries for the given path - confirm against the
        storage client in use.
        """
        result = self.client.storage.from_(self.bucket_name).list(filename)
        return len(result) > 0

    def delete(self, filename):
        """Remove ``filename`` from the bucket."""
        # The storage API takes a list of paths, so wrap the single key.
        self.client.storage.from_(self.bucket_name).remove([filename])

    def bucket_exists(self):
        """Return True when a bucket named ``self.bucket_name`` is listed."""
        buckets = self.client.storage.list_buckets()
        return any(bucket.name == self.bucket_name for bucket in buckets)