remote_files.py 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. import urllib.parse
  2. from typing import cast
  3. import httpx
  4. from flask_login import current_user
  5. from flask_restful import Resource, marshal_with, reqparse
  6. import services
  7. from controllers.common import helpers
  8. from core.file import helpers as file_helpers
  9. from core.helper import ssrf_proxy
  10. from fields.file_fields import file_fields_with_signed_url, remote_file_info_fields
  11. from models.account import Account
  12. from services.file_service import FileService
  13. from .error import (
  14. FileTooLargeError,
  15. UnsupportedFileTypeError,
  16. )
  17. class RemoteFileInfoApi(Resource):
  18. @marshal_with(remote_file_info_fields)
  19. def get(self, url):
  20. decoded_url = urllib.parse.unquote(url)
  21. resp = ssrf_proxy.head(decoded_url)
  22. if resp.status_code != httpx.codes.OK:
  23. # failed back to get method
  24. resp = ssrf_proxy.get(decoded_url, timeout=3)
  25. resp.raise_for_status()
  26. return {
  27. "file_type": resp.headers.get("Content-Type", "application/octet-stream"),
  28. "file_length": int(resp.headers.get("Content-Length", 0)),
  29. }
  30. class RemoteFileUploadApi(Resource):
  31. @marshal_with(file_fields_with_signed_url)
  32. def post(self):
  33. parser = reqparse.RequestParser()
  34. parser.add_argument("url", type=str, required=True, help="URL is required")
  35. args = parser.parse_args()
  36. url = args["url"]
  37. resp = ssrf_proxy.head(url=url)
  38. if resp.status_code != httpx.codes.OK:
  39. resp = ssrf_proxy.get(url=url, timeout=3)
  40. resp.raise_for_status()
  41. file_info = helpers.guess_file_info_from_response(resp)
  42. if not FileService.is_file_size_within_limit(extension=file_info.extension, file_size=file_info.size):
  43. raise FileTooLargeError
  44. content = resp.content if resp.request.method == "GET" else ssrf_proxy.get(url).content
  45. try:
  46. user = cast(Account, current_user)
  47. upload_file = FileService.upload_file(
  48. filename=file_info.filename,
  49. content=content,
  50. mimetype=file_info.mimetype,
  51. user=user,
  52. source_url=url,
  53. )
  54. except services.errors.file.FileTooLargeError as file_too_large_error:
  55. raise FileTooLargeError(file_too_large_error.description)
  56. except services.errors.file.UnsupportedFileTypeError:
  57. raise UnsupportedFileTypeError()
  58. return {
  59. "id": upload_file.id,
  60. "name": upload_file.name,
  61. "size": upload_file.size,
  62. "extension": upload_file.extension,
  63. "url": file_helpers.get_signed_file_url(upload_file_id=upload_file.id),
  64. "mime_type": upload_file.mime_type,
  65. "created_by": upload_file.created_by,
  66. "created_at": upload_file.created_at,
  67. }, 201