enterprise_sso_service.py 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. import logging
  2. from models.account import Account, AccountStatus
  3. from services.account_service import AccountService, TenantService
  4. from services.enterprise.base import EnterpriseRequest
  5. logger = logging.getLogger(__name__)
  6. class EnterpriseSSOService:
  7. @classmethod
  8. def get_sso_saml_login(cls) -> str:
  9. return EnterpriseRequest.send_request('GET', '/sso/saml/login')
  10. @classmethod
  11. def post_sso_saml_acs(cls, saml_response: str) -> str:
  12. response = EnterpriseRequest.send_request('POST', '/sso/saml/acs', json={'SAMLResponse': saml_response})
  13. if 'email' not in response or response['email'] is None:
  14. logger.exception(response)
  15. raise Exception('Saml response is invalid')
  16. return cls.login_with_email(response.get('email'))
  17. @classmethod
  18. def get_sso_oidc_login(cls):
  19. return EnterpriseRequest.send_request('GET', '/sso/oidc/login')
  20. @classmethod
  21. def get_sso_oidc_callback(cls, args: dict):
  22. state_from_query = args['state']
  23. code_from_query = args['code']
  24. state_from_cookies = args['oidc-state']
  25. if state_from_cookies != state_from_query:
  26. raise Exception('invalid state or code')
  27. response = EnterpriseRequest.send_request('GET', '/sso/oidc/callback', params={'code': code_from_query})
  28. if 'email' not in response or response['email'] is None:
  29. logger.exception(response)
  30. raise Exception('OIDC response is invalid')
  31. return cls.login_with_email(response.get('email'))
  32. @classmethod
  33. def login_with_email(cls, email: str) -> str:
  34. account = Account.query.filter_by(email=email).first()
  35. if account is None:
  36. raise Exception('account not found, please contact system admin to invite you to join in a workspace')
  37. if account.status == AccountStatus.BANNED:
  38. raise Exception('account is banned, please contact system admin')
  39. tenants = TenantService.get_join_tenants(account)
  40. if len(tenants) == 0:
  41. raise Exception("workspace not found, please contact system admin to invite you to join in a workspace")
  42. token = AccountService.get_account_jwt_token(account)
  43. return token