diff --git a/tests/test_sso.py b/tests/test_sso.py index 7990cf9c..c29de60e 100644 --- a/tests/test_sso.py +++ b/tests/test_sso.py @@ -294,6 +294,34 @@ def test_get_profile_and_token_without_first_name_or_last_name_returns_expected_ "grant_type": "authorization_code", } + def test_get_profile_and_token_with_code_verifier( + self, mock_profile, capture_and_mock_http_client_request + ): + response_dict = { + "profile": mock_profile, + "access_token": "01DY34ACQTM3B1CSX1YSZ8Z00D", + } + + request_kwargs = capture_and_mock_http_client_request( + self.http_client, response_dict, 200 + ) + + profile_and_token = syncify( + self.sso.get_profile_and_token("123", code_verifier="test_code_verifier") + ) + + assert profile_and_token.access_token == "01DY34ACQTM3B1CSX1YSZ8Z00D" + assert profile_and_token.profile.dict() == mock_profile + assert request_kwargs["url"].endswith("/sso/token") + assert request_kwargs["method"] == "post" + assert request_kwargs["json"] == { + "client_id": "client_b27needthisforssotemxo", + "client_secret": "sk_test", + "code": "123", + "grant_type": "authorization_code", + "code_verifier": "test_code_verifier", + } + def test_get_profile(self, mock_profile, capture_and_mock_http_client_request): request_kwargs = capture_and_mock_http_client_request( self.http_client, mock_profile, 200 diff --git a/workos/sso.py b/workos/sso.py index 7fa722a6..63e271da 100644 --- a/workos/sso.py +++ b/workos/sso.py @@ -117,7 +117,9 @@ def get_profile(self, access_token: str) -> SyncOrAsync[Profile]: """ ... - def get_profile_and_token(self, code: str) -> SyncOrAsync[ProfileAndToken]: + def get_profile_and_token( + self, code: str, code_verifier: Optional[str] = None + ) -> SyncOrAsync[ProfileAndToken]: """Get the profile of an authenticated User Once authenticated, using the code returned having followed the authorization URL, @@ -125,6 +127,7 @@ def get_profile_and_token(self, code: str) -> SyncOrAsync[ProfileAndToken]: Args: code (str): Code returned by WorkOS on completion of OAuth 2.0 workflow. + code_verifier (str): The code verifier for the PKCE flow. (Optional) Returns: ProfileAndToken: WorkOSProfileAndToken object representing the User. @@ -217,7 +220,9 @@ def get_profile(self, access_token: str) -> Profile: return Profile.model_validate(response) - def get_profile_and_token(self, code: str) -> ProfileAndToken: + def get_profile_and_token( + self, code: str, code_verifier: Optional[str] = None + ) -> ProfileAndToken: json = { "client_id": self._http_client.client_id, "client_secret": self._http_client.api_key, @@ -225,6 +230,9 @@ def get_profile_and_token(self, code: str) -> ProfileAndToken: "grant_type": OAUTH_GRANT_TYPE, } + if code_verifier is not None: + json["code_verifier"] = code_verifier + response = self._http_client.request( TOKEN_PATH, method=REQUEST_METHOD_POST, json=json ) @@ -321,7 +329,9 @@ async def get_profile(self, access_token: str) -> Profile: return Profile.model_validate(response) - async def get_profile_and_token(self, code: str) -> ProfileAndToken: + async def get_profile_and_token( + self, code: str, code_verifier: Optional[str] = None + ) -> ProfileAndToken: json = { "client_id": self._http_client.client_id, "client_secret": self._http_client.api_key, @@ -329,6 +339,9 @@ async def get_profile_and_token(self, code: str) -> ProfileAndToken: "grant_type": OAUTH_GRANT_TYPE, } + if code_verifier is not None: + json["code_verifier"] = code_verifier + response = await self._http_client.request( TOKEN_PATH, method=REQUEST_METHOD_POST, json=json )