wraps.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. from base64 import b64encode
  2. from functools import wraps
  3. from hashlib import sha1
  4. from hmac import new as hmac_new
  5. from flask import abort, current_app, request
  6. from extensions.ext_database import db
  7. from models.model import EndUser
  8. def inner_api_only(view):
  9. @wraps(view)
  10. def decorated(*args, **kwargs):
  11. if not current_app.config['INNER_API']:
  12. abort(404)
  13. # get header 'X-Inner-Api-Key'
  14. inner_api_key = request.headers.get('X-Inner-Api-Key')
  15. if not inner_api_key or inner_api_key != current_app.config['INNER_API_KEY']:
  16. abort(404)
  17. return view(*args, **kwargs)
  18. return decorated
  19. def inner_api_user_auth(view):
  20. @wraps(view)
  21. def decorated(*args, **kwargs):
  22. if not current_app.config['INNER_API']:
  23. return view(*args, **kwargs)
  24. # get header 'X-Inner-Api-Key'
  25. authorization = request.headers.get('Authorization')
  26. if not authorization:
  27. return view(*args, **kwargs)
  28. parts = authorization.split(':')
  29. if len(parts) != 2:
  30. return view(*args, **kwargs)
  31. user_id, token = parts
  32. if ' ' in user_id:
  33. user_id = user_id.split(' ')[1]
  34. inner_api_key = request.headers.get('X-Inner-Api-Key')
  35. data_to_sign = f'DIFY {user_id}'
  36. signature = hmac_new(inner_api_key.encode('utf-8'), data_to_sign.encode('utf-8'), sha1)
  37. signature = b64encode(signature.digest()).decode('utf-8')
  38. if signature != token:
  39. return view(*args, **kwargs)
  40. kwargs['user'] = db.session.query(EndUser).filter(EndUser.id == user_id).first()
  41. return view(*args, **kwargs)
  42. return decorated