helpers.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. import base64
  2. import hashlib
  3. import hmac
  4. import os
  5. import time
  6. from configs import dify_config
  def get_signed_file_url(upload_file_id: str) -> str:
      """Build an HMAC-signed preview URL for an uploaded file.

      The signature is an HMAC-SHA256 (keyed with ``dify_config.SECRET_KEY``) over
      ``file-preview|<upload_file_id>|<timestamp>|<nonce>``, encoded as URL-safe base64.

      Args:
          upload_file_id: Identifier placed in the URL path and in the signed message.

      Returns:
          ``{FILES_URL}/files/<upload_file_id>/file-preview`` followed by the
          ``timestamp``, ``nonce`` and ``sign`` query parameters, in that order.
      """
      preview_url = f"{dify_config.FILES_URL}/files/{upload_file_id}/file-preview"

      # Issue time in whole seconds plus 16 random bytes (hex) so each URL is unique.
      issued_at = str(int(time.time()))
      salt = os.urandom(16).hex()

      payload = f"file-preview|{upload_file_id}|{issued_at}|{salt}".encode()
      mac = hmac.new(dify_config.SECRET_KEY.encode(), payload, hashlib.sha256)
      signature = base64.urlsafe_b64encode(mac.digest()).decode()

      return f"{preview_url}?timestamp={issued_at}&nonce={salt}&sign={signature}"
  16. def get_signed_file_url_for_plugin(filename: str, mimetype: str, tenant_id: str, user_id: str) -> str:
  17. url = f"{dify_config.FILES_URL}/files/upload/for-plugin"
  18. if user_id is None:
  19. user_id = "DEFAULT-USER"
  20. timestamp = str(int(time.time()))
  21. nonce = os.urandom(16).hex()
  22. key = dify_config.SECRET_KEY.encode()
  23. msg = f"upload|{filename}|{mimetype}|{tenant_id}|{user_id}|{timestamp}|{nonce}"
  24. sign = hmac.new(key, msg.encode(), hashlib.sha256).digest()
  25. encoded_sign = base64.urlsafe_b64encode(sign).decode()
  26. return f"{url}?timestamp={timestamp}&nonce={nonce}&sign={encoded_sign}&user_id={user_id}&tenant_id={tenant_id}"
  27. def verify_plugin_file_signature(
  28. *, filename: str, mimetype: str, tenant_id: str, user_id: str | None, timestamp: str, nonce: str, sign: str
  29. ) -> bool:
  30. if user_id is None:
  31. user_id = "DEFAULT-USER"
  32. data_to_sign = f"upload|{filename}|{mimetype}|{tenant_id}|{user_id}|{timestamp}|{nonce}"
  33. secret_key = dify_config.SECRET_KEY.encode()
  34. recalculated_sign = hmac.new(secret_key, data_to_sign.encode(), hashlib.sha256).digest()
  35. recalculated_encoded_sign = base64.urlsafe_b64encode(recalculated_sign).decode()
  36. # verify signature
  37. if sign != recalculated_encoded_sign:
  38. return False
  39. current_time = int(time.time())
  40. return current_time - int(timestamp) <= dify_config.FILES_ACCESS_TIMEOUT
  def verify_image_signature(*, upload_file_id: str, timestamp: str, nonce: str, sign: str) -> bool:
      """Check the signature and freshness of a signed image-preview URL.

      Recomputes the HMAC-SHA256 (keyed with ``dify_config.SECRET_KEY``) over
      ``image-preview|<upload_file_id>|<timestamp>|<nonce>`` and compares its
      URL-safe base64 form with ``sign``.

      Args:
          upload_file_id: Identifier that was bound into the signature.
          timestamp: Issue time as a decimal string of seconds since the epoch.
          nonce: Random value that was bound into the signature.
          sign: Signature supplied by the caller.

      Returns:
          ``True`` only if ``sign`` matches and the timestamp is no older than
          ``dify_config.FILES_ACCESS_TIMEOUT`` seconds; ``False`` otherwise.

      Raises:
          ValueError: If the signature matches but ``timestamp`` is not an integer string.
      """
      data_to_sign = f"image-preview|{upload_file_id}|{timestamp}|{nonce}"
      secret_key = dify_config.SECRET_KEY.encode()
      recalculated_sign = hmac.new(secret_key, data_to_sign.encode(), hashlib.sha256).digest()
      recalculated_encoded_sign = base64.urlsafe_b64encode(recalculated_sign).decode()

      # A base64 signature is always ASCII text. Rejecting anything else up front
      # keeps compare_digest from raising TypeError on non-ASCII or non-str input.
      if not isinstance(sign, str) or not sign.isascii():
          return False

      # Constant-time comparison: `!=` stops at the first differing character and
      # would leak how much of a forged signature is correct through timing.
      if not hmac.compare_digest(sign, recalculated_encoded_sign):
          return False

      current_time = int(time.time())
      return current_time - int(timestamp) <= dify_config.FILES_ACCESS_TIMEOUT
  def verify_file_signature(*, upload_file_id: str, timestamp: str, nonce: str, sign: str) -> bool:
      """Check the signature and freshness of a signed file-preview URL.

      Recomputes the HMAC-SHA256 (keyed with ``dify_config.SECRET_KEY``) over
      ``file-preview|<upload_file_id>|<timestamp>|<nonce>`` and compares its
      URL-safe base64 form with ``sign``. This is the same message layout
      that ``get_signed_file_url`` signs.

      Args:
          upload_file_id: Identifier that was bound into the signature.
          timestamp: Issue time as a decimal string of seconds since the epoch.
          nonce: Random value that was bound into the signature.
          sign: Signature supplied by the caller.

      Returns:
          ``True`` only if ``sign`` matches and the timestamp is no older than
          ``dify_config.FILES_ACCESS_TIMEOUT`` seconds; ``False`` otherwise.

      Raises:
          ValueError: If the signature matches but ``timestamp`` is not an integer string.
      """
      data_to_sign = f"file-preview|{upload_file_id}|{timestamp}|{nonce}"
      secret_key = dify_config.SECRET_KEY.encode()
      recalculated_sign = hmac.new(secret_key, data_to_sign.encode(), hashlib.sha256).digest()
      recalculated_encoded_sign = base64.urlsafe_b64encode(recalculated_sign).decode()

      # A base64 signature is always ASCII text. Rejecting anything else up front
      # keeps compare_digest from raising TypeError on non-ASCII or non-str input.
      if not isinstance(sign, str) or not sign.isascii():
          return False

      # Constant-time comparison: `!=` stops at the first differing character and
      # would leak how much of a forged signature is correct through timing.
      if not hmac.compare_digest(sign, recalculated_encoded_sign):
          return False

      current_time = int(time.time())
      return current_time - int(timestamp) <= dify_config.FILES_ACCESS_TIMEOUT