ext_storage.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. import os
  2. import shutil
  3. from collections.abc import Generator
  4. from contextlib import closing
  5. from datetime import datetime, timedelta, timezone
  6. from typing import Union
  7. import boto3
  8. import oss2 as aliyun_s3
  9. from azure.storage.blob import AccountSasPermissions, BlobServiceClient, ResourceTypes, generate_account_sas
  10. from botocore.client import Config
  11. from botocore.exceptions import ClientError
  12. from flask import Flask
  13. class Storage:
  14. def __init__(self):
  15. self.storage_type = None
  16. self.bucket_name = None
  17. self.client = None
  18. self.folder = None
  19. def init_app(self, app: Flask):
  20. self.storage_type = app.config.get('STORAGE_TYPE')
  21. if self.storage_type == 's3':
  22. self.bucket_name = app.config.get('S3_BUCKET_NAME')
  23. self.client = boto3.client(
  24. 's3',
  25. aws_secret_access_key=app.config.get('S3_SECRET_KEY'),
  26. aws_access_key_id=app.config.get('S3_ACCESS_KEY'),
  27. endpoint_url=app.config.get('S3_ENDPOINT'),
  28. region_name=app.config.get('S3_REGION'),
  29. config=Config(s3={'addressing_style': app.config.get('S3_ADDRESS_STYLE')})
  30. )
  31. elif self.storage_type == 'azure-blob':
  32. self.bucket_name = app.config.get('AZURE_BLOB_CONTAINER_NAME')
  33. sas_token = generate_account_sas(
  34. account_name=app.config.get('AZURE_BLOB_ACCOUNT_NAME'),
  35. account_key=app.config.get('AZURE_BLOB_ACCOUNT_KEY'),
  36. resource_types=ResourceTypes(service=True, container=True, object=True),
  37. permission=AccountSasPermissions(read=True, write=True, delete=True, list=True, add=True, create=True),
  38. expiry=datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(hours=1)
  39. )
  40. self.client = BlobServiceClient(account_url=app.config.get('AZURE_BLOB_ACCOUNT_URL'),
  41. credential=sas_token)
  42. elif self.storage_type == 'aliyun-oss':
  43. self.bucket_name = app.config.get('ALIYUN_OSS_BUCKET_NAME')
  44. self.client = aliyun_s3.Bucket(
  45. aliyun_s3.Auth(app.config.get('ALIYUN_OSS_ACCESS_KEY'), app.config.get('ALIYUN_OSS_SECRET_KEY')),
  46. app.config.get('ALIYUN_OSS_ENDPOINT'),
  47. self.bucket_name,
  48. connect_timeout=30
  49. )
  50. else:
  51. self.folder = app.config.get('STORAGE_LOCAL_PATH')
  52. if not os.path.isabs(self.folder):
  53. self.folder = os.path.join(app.root_path, self.folder)
  54. def save(self, filename, data):
  55. if self.storage_type == 's3':
  56. self.client.put_object(Bucket=self.bucket_name, Key=filename, Body=data)
  57. elif self.storage_type == 'azure-blob':
  58. blob_container = self.client.get_container_client(container=self.bucket_name)
  59. blob_container.upload_blob(filename, data)
  60. elif self.storage_type == 'aliyun-oss':
  61. self.client.put_object(filename, data)
  62. else:
  63. if not self.folder or self.folder.endswith('/'):
  64. filename = self.folder + filename
  65. else:
  66. filename = self.folder + '/' + filename
  67. folder = os.path.dirname(filename)
  68. os.makedirs(folder, exist_ok=True)
  69. with open(os.path.join(os.getcwd(), filename), "wb") as f:
  70. f.write(data)
  71. def load(self, filename: str, stream: bool = False) -> Union[bytes, Generator]:
  72. if stream:
  73. return self.load_stream(filename)
  74. else:
  75. return self.load_once(filename)
  76. def load_once(self, filename: str) -> bytes:
  77. if self.storage_type == 's3':
  78. try:
  79. with closing(self.client) as client:
  80. data = client.get_object(Bucket=self.bucket_name, Key=filename)['Body'].read()
  81. except ClientError as ex:
  82. if ex.response['Error']['Code'] == 'NoSuchKey':
  83. raise FileNotFoundError("File not found")
  84. else:
  85. raise
  86. elif self.storage_type == 'azure-blob':
  87. blob = self.client.get_container_client(container=self.bucket_name)
  88. blob = blob.get_blob_client(blob=filename)
  89. data = blob.download_blob().readall()
  90. elif self.storage_type == 'aliyun-oss':
  91. with closing(self.client.get_object(filename)) as obj:
  92. data = obj.read()
  93. else:
  94. if not self.folder or self.folder.endswith('/'):
  95. filename = self.folder + filename
  96. else:
  97. filename = self.folder + '/' + filename
  98. if not os.path.exists(filename):
  99. raise FileNotFoundError("File not found")
  100. with open(filename, "rb") as f:
  101. data = f.read()
  102. return data
  103. def load_stream(self, filename: str) -> Generator:
  104. def generate(filename: str = filename) -> Generator:
  105. if self.storage_type == 's3':
  106. try:
  107. with closing(self.client) as client:
  108. response = client.get_object(Bucket=self.bucket_name, Key=filename)
  109. for chunk in response['Body'].iter_chunks():
  110. yield chunk
  111. except ClientError as ex:
  112. if ex.response['Error']['Code'] == 'NoSuchKey':
  113. raise FileNotFoundError("File not found")
  114. else:
  115. raise
  116. elif self.storage_type == 'azure-blob':
  117. blob = self.client.get_blob_client(container=self.bucket_name, blob=filename)
  118. with closing(blob.download_blob()) as blob_stream:
  119. while chunk := blob_stream.readall(4096):
  120. yield chunk
  121. elif self.storage_type == 'aliyun-oss':
  122. with closing(self.client.get_object(filename)) as obj:
  123. while chunk := obj.read(4096):
  124. yield chunk
  125. else:
  126. if not self.folder or self.folder.endswith('/'):
  127. filename = self.folder + filename
  128. else:
  129. filename = self.folder + '/' + filename
  130. if not os.path.exists(filename):
  131. raise FileNotFoundError("File not found")
  132. with open(filename, "rb") as f:
  133. while chunk := f.read(4096): # Read in chunks of 4KB
  134. yield chunk
  135. return generate()
  136. def download(self, filename, target_filepath):
  137. if self.storage_type == 's3':
  138. with closing(self.client) as client:
  139. client.download_file(self.bucket_name, filename, target_filepath)
  140. elif self.storage_type == 'azure-blob':
  141. blob = self.client.get_blob_client(container=self.bucket_name, blob=filename)
  142. with open(target_filepath, "wb") as my_blob:
  143. blob_data = blob.download_blob()
  144. blob_data.readinto(my_blob)
  145. elif self.storage_type == 'aliyun-oss':
  146. self.client.get_object_to_file(filename, target_filepath)
  147. else:
  148. if not self.folder or self.folder.endswith('/'):
  149. filename = self.folder + filename
  150. else:
  151. filename = self.folder + '/' + filename
  152. if not os.path.exists(filename):
  153. raise FileNotFoundError("File not found")
  154. shutil.copyfile(filename, target_filepath)
  155. def exists(self, filename):
  156. if self.storage_type == 's3':
  157. with closing(self.client) as client:
  158. try:
  159. client.head_object(Bucket=self.bucket_name, Key=filename)
  160. return True
  161. except:
  162. return False
  163. elif self.storage_type == 'azure-blob':
  164. blob = self.client.get_blob_client(container=self.bucket_name, blob=filename)
  165. return blob.exists()
  166. elif self.storage_type == 'aliyun-oss':
  167. return self.client.object_exists(filename)
  168. else:
  169. if not self.folder or self.folder.endswith('/'):
  170. filename = self.folder + filename
  171. else:
  172. filename = self.folder + '/' + filename
  173. return os.path.exists(filename)
  174. def delete(self, filename):
  175. if self.storage_type == 's3':
  176. self.client.delete_object(Bucket=self.bucket_name, Key=filename)
  177. elif self.storage_type == 'azure-blob':
  178. blob_container = self.client.get_container_client(container=self.bucket_name)
  179. blob_container.delete_blob(filename)
  180. elif self.storage_type == 'aliyun-oss':
  181. self.client.delete_object(filename)
  182. else:
  183. if not self.folder or self.folder.endswith('/'):
  184. filename = self.folder + filename
  185. else:
  186. filename = self.folder + '/' + filename
  187. if os.path.exists(filename):
  188. os.remove(filename)
  189. storage = Storage()
  190. def init_app(app: Flask):
  191. storage.init_app(app)