1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162 |
- from base64 import b64encode
- from functools import wraps
- from hashlib import sha1
- from hmac import new as hmac_new
- from flask import abort, current_app, request
- from extensions.ext_database import db
- from models.model import EndUser
- def inner_api_only(view):
- @wraps(view)
- def decorated(*args, **kwargs):
- if not current_app.config['INNER_API']:
- abort(404)
- # get header 'X-Inner-Api-Key'
- inner_api_key = request.headers.get('X-Inner-Api-Key')
- if not inner_api_key or inner_api_key != current_app.config['INNER_API_KEY']:
- abort(404)
- return view(*args, **kwargs)
- return decorated
- def inner_api_user_auth(view):
- @wraps(view)
- def decorated(*args, **kwargs):
- if not current_app.config['INNER_API']:
- return view(*args, **kwargs)
- # get header 'X-Inner-Api-Key'
- authorization = request.headers.get('Authorization')
- if not authorization:
- return view(*args, **kwargs)
- parts = authorization.split(':')
- if len(parts) != 2:
- return view(*args, **kwargs)
- user_id, token = parts
- if ' ' in user_id:
- user_id = user_id.split(' ')[1]
- inner_api_key = request.headers.get('X-Inner-Api-Key')
- data_to_sign = f'DIFY {user_id}'
- signature = hmac_new(inner_api_key.encode('utf-8'), data_to_sign.encode('utf-8'), sha1)
- signature = b64encode(signature.digest()).decode('utf-8')
- if signature != token:
- return view(*args, **kwargs)
- kwargs['user'] = db.session.query(EndUser).filter(EndUser.id == user_id).first()
- return view(*args, **kwargs)
- return decorated
|