oauth.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. import urllib.parse
  2. from dataclasses import dataclass
  3. import requests
  4. @dataclass
  5. class OAuthUserInfo:
  6. id: str
  7. name: str
  8. email: str
  9. class OAuth:
  10. def __init__(self, client_id: str, client_secret: str, redirect_uri: str):
  11. self.client_id = client_id
  12. self.client_secret = client_secret
  13. self.redirect_uri = redirect_uri
  14. def get_authorization_url(self):
  15. raise NotImplementedError()
  16. def get_access_token(self, code: str):
  17. raise NotImplementedError()
  18. def get_raw_user_info(self, token: str):
  19. raise NotImplementedError()
  20. def get_user_info(self, token: str) -> OAuthUserInfo:
  21. raw_info = self.get_raw_user_info(token)
  22. return self._transform_user_info(raw_info)
  23. def _transform_user_info(self, raw_info: dict) -> OAuthUserInfo:
  24. raise NotImplementedError()
  25. class GitHubOAuth(OAuth):
  26. _AUTH_URL = "https://github.com/login/oauth/authorize"
  27. _TOKEN_URL = "https://github.com/login/oauth/access_token"
  28. _USER_INFO_URL = "https://api.github.com/user"
  29. _EMAIL_INFO_URL = "https://api.github.com/user/emails"
  30. def get_authorization_url(self):
  31. params = {
  32. "client_id": self.client_id,
  33. "redirect_uri": self.redirect_uri,
  34. "scope": "user:email", # Request only basic user information
  35. }
  36. return f"{self._AUTH_URL}?{urllib.parse.urlencode(params)}"
  37. def get_access_token(self, code: str):
  38. data = {
  39. "client_id": self.client_id,
  40. "client_secret": self.client_secret,
  41. "code": code,
  42. "redirect_uri": self.redirect_uri,
  43. }
  44. headers = {"Accept": "application/json"}
  45. response = requests.post(self._TOKEN_URL, data=data, headers=headers)
  46. response_json = response.json()
  47. access_token = response_json.get("access_token")
  48. if not access_token:
  49. raise ValueError(f"Error in GitHub OAuth: {response_json}")
  50. return access_token
  51. def get_raw_user_info(self, token: str):
  52. headers = {"Authorization": f"token {token}"}
  53. response = requests.get(self._USER_INFO_URL, headers=headers)
  54. response.raise_for_status()
  55. user_info = response.json()
  56. email_response = requests.get(self._EMAIL_INFO_URL, headers=headers)
  57. email_info = email_response.json()
  58. primary_email = next((email for email in email_info if email["primary"] == True), None)
  59. return {**user_info, "email": primary_email["email"]}
  60. def _transform_user_info(self, raw_info: dict) -> OAuthUserInfo:
  61. email = raw_info.get("email")
  62. if not email:
  63. email = f"{raw_info['id']}+{raw_info['login']}@users.noreply.github.com"
  64. return OAuthUserInfo(id=str(raw_info["id"]), name=raw_info["name"], email=email)
  65. class GoogleOAuth(OAuth):
  66. _AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth"
  67. _TOKEN_URL = "https://oauth2.googleapis.com/token"
  68. _USER_INFO_URL = "https://www.googleapis.com/oauth2/v3/userinfo"
  69. def get_authorization_url(self):
  70. params = {
  71. "client_id": self.client_id,
  72. "response_type": "code",
  73. "redirect_uri": self.redirect_uri,
  74. "scope": "openid email",
  75. }
  76. return f"{self._AUTH_URL}?{urllib.parse.urlencode(params)}"
  77. def get_access_token(self, code: str):
  78. data = {
  79. "client_id": self.client_id,
  80. "client_secret": self.client_secret,
  81. "code": code,
  82. "grant_type": "authorization_code",
  83. "redirect_uri": self.redirect_uri,
  84. }
  85. headers = {"Accept": "application/json"}
  86. response = requests.post(self._TOKEN_URL, data=data, headers=headers)
  87. response_json = response.json()
  88. access_token = response_json.get("access_token")
  89. if not access_token:
  90. raise ValueError(f"Error in Google OAuth: {response_json}")
  91. return access_token
  92. def get_raw_user_info(self, token: str):
  93. headers = {"Authorization": f"Bearer {token}"}
  94. response = requests.get(self._USER_INFO_URL, headers=headers)
  95. response.raise_for_status()
  96. return response.json()
  97. def _transform_user_info(self, raw_info: dict) -> OAuthUserInfo:
  98. return OAuthUserInfo(id=str(raw_info["sub"]), name=None, email=raw_info["email"])