1+ # Note: Since Aug 2019 we move all e2e tests into test_e2e.py,
2+ # so this test_application file contains only unit tests without dependency.
13import os
24import json
35import logging
1315from tests .test_token_cache import TokenCacheTestCase
1416
1517
16- THIS_FOLDER = os .path .dirname (__file__ )
17- CONFIG_FILE = os .path .join (THIS_FOLDER , 'config.json' )
18- CONFIG = {}
19- if os .path .exists (CONFIG_FILE ):
20- with open (CONFIG_FILE ) as conf :
21- CONFIG = json .load (conf )
22-
2318logger = logging .getLogger (__name__ )
2419logging .basicConfig (level = logging .DEBUG )
2520
2621
27- class Oauth2TestCase (unittest .TestCase ):
28-
29- def assertLoosely (self , response , assertion = None ,
30- skippable_errors = ("invalid_grant" , "interaction_required" )):
31- if response .get ("error" ) in skippable_errors :
32- logger .debug ("Response = %s" , response )
33- # Some of these errors are configuration issues, not library issues
34- raise unittest .SkipTest (response .get ("error_description" ))
35- else :
36- if assertion is None :
37- assertion = lambda : self .assertIn (
38- "access_token" , response ,
39- "{error}: {error_description}" .format (
40- # Do explicit response.get(...) rather than **response
41- error = response .get ("error" ),
42- error_description = response .get ("error_description" )))
43- assertion ()
44-
45- def assertCacheWorks (self , result_from_wire ):
46- result = result_from_wire
47- # You can filter by predefined username, or let end user to choose one
48- accounts = self .app .get_accounts (username = CONFIG .get ("username" ))
49- self .assertNotEqual (0 , len (accounts ))
50- account = accounts [0 ]
51- # Going to test acquire_token_silent(...) to locate an AT from cache
52- result_from_cache = self .app .acquire_token_silent (
53- CONFIG ["scope" ], account = account )
54- self .assertIsNotNone (result_from_cache )
55- self .assertEqual (result ['access_token' ], result_from_cache ['access_token' ],
56- "We should get a cached AT" )
57-
58- # Going to test acquire_token_silent(...) to obtain an AT by a RT from cache
59- self .app .token_cache ._cache ["AccessToken" ] = {} # A hacky way to clear ATs
60- result_from_cache = self .app .acquire_token_silent (
61- CONFIG ["scope" ], account = account )
62- self .assertIsNotNone (result_from_cache ,
63- "We should get a result from acquire_token_silent(...) call" )
64- self .assertNotEqual (result ['access_token' ], result_from_cache ['access_token' ],
65- "We should get a fresh AT (via RT)" )
66-
67-
68- @unittest .skipUnless ("client_id" in CONFIG , "client_id missing" )
69- class TestConfidentialClientApplication (unittest .TestCase ):
70-
71- def assertCacheWorks (self , result_from_wire , result_from_cache ):
72- self .assertIsNotNone (result_from_cache )
73- self .assertEqual (
74- result_from_wire ['access_token' ], result_from_cache ['access_token' ])
75-
76- @unittest .skipUnless ("client_secret" in CONFIG , "Missing client secret" )
77- def test_client_secret (self ):
78- app = ConfidentialClientApplication (
79- CONFIG ["client_id" ], client_credential = CONFIG .get ("client_secret" ),
80- authority = CONFIG .get ("authority" ))
81- scope = CONFIG .get ("scope" , [])
82- result = app .acquire_token_for_client (scope )
83- self .assertIn ('access_token' , result )
84- self .assertCacheWorks (result , app .acquire_token_silent (scope , account = None ))
85-
86- @unittest .skipUnless ("client_certificate" in CONFIG , "Missing client cert" )
87- def test_client_certificate (self ):
88- client_certificate = CONFIG ["client_certificate" ]
89- assert ("private_key_path" in client_certificate
90- and "thumbprint" in client_certificate )
91- key_path = os .path .join (THIS_FOLDER , client_certificate ['private_key_path' ])
92- with open (key_path ) as f :
93- pem = f .read ()
94- app = ConfidentialClientApplication (
95- CONFIG ['client_id' ],
96- {"private_key" : pem , "thumbprint" : client_certificate ["thumbprint" ]})
97- scope = CONFIG .get ("scope" , [])
98- result = app .acquire_token_for_client (scope )
99- self .assertIn ('access_token' , result )
100- self .assertCacheWorks (result , app .acquire_token_silent (scope , account = None ))
22+ class TestHelperExtractCerts (unittest .TestCase ): # It is used by SNI scenario
10123
10224 def test_extract_a_tag_less_public_cert (self ):
10325 pem = "my_cert"
@@ -116,92 +38,13 @@ def test_extract_multiple_tag_enclosed_certs(self):
11638 -----BEGIN CERTIFICATE-----
11739 my_cert1
11840 -----END CERTIFICATE-----
119-
41+
12042 -----BEGIN CERTIFICATE-----
12143 my_cert2
12244 -----END CERTIFICATE-----
12345 """
12446 self .assertEqual (["my_cert1" , "my_cert2" ], extract_certs (pem ))
12547
126- @unittest .skipUnless ("public_certificate" in CONFIG , "Missing Public cert" )
127- def test_subject_name_issuer_authentication (self ):
128- assert ("private_key_file" in CONFIG
129- and "thumbprint" in CONFIG and "public_certificate" in CONFIG )
130- with open (os .path .join (THIS_FOLDER , CONFIG ['private_key_file' ])) as f :
131- pem = f .read ()
132- with open (os .path .join (THIS_FOLDER , CONFIG ['public_certificate' ])) as f :
133- public_certificate = f .read ()
134- app = ConfidentialClientApplication (
135- CONFIG ['client_id' ], authority = CONFIG ["authority" ],
136- client_credential = {"private_key" : pem , "thumbprint" : CONFIG ["thumbprint" ],
137- "public_certificate" : public_certificate })
138- scope = CONFIG .get ("scope" , [])
139- result = app .acquire_token_for_client (scope )
140- self .assertIn ('access_token' , result )
141- self .assertCacheWorks (result , app .acquire_token_silent (scope , account = None ))
142-
143- @unittest .skipUnless ("client_id" in CONFIG , "client_id missing" )
144- class TestPublicClientApplication (Oauth2TestCase ):
145-
146- @unittest .skipUnless ("username" in CONFIG and "password" in CONFIG , "Missing U/P" )
147- def test_username_password (self ):
148- self .app = PublicClientApplication (
149- CONFIG ["client_id" ], authority = CONFIG ["authority" ])
150- result = self .app .acquire_token_by_username_password (
151- CONFIG ["username" ], CONFIG ["password" ], scopes = CONFIG .get ("scope" ))
152- self .assertLoosely (result )
153- self .assertCacheWorks (result )
154-
155- def test_device_flow (self ):
156- self .app = PublicClientApplication (
157- CONFIG ["client_id" ], authority = CONFIG ["authority" ])
158- flow = self .app .initiate_device_flow (scopes = CONFIG .get ("scope" ))
159- assert "user_code" in flow , str (flow ) # Provision or policy might block DF
160- logging .warning (flow ["message" ])
161-
162- duration = 30
163- logging .warning ("We will wait up to %d seconds for you to sign in" % duration )
164- flow ["expires_at" ] = time .time () + duration # Shorten the time for quick test
165- result = self .app .acquire_token_by_device_flow (flow )
166- self .assertLoosely (
167- result ,
168- assertion = lambda : self .assertIn ('access_token' , result ),
169- skippable_errors = self .app .client .DEVICE_FLOW_RETRIABLE_ERRORS )
170-
171- if "access_token" in result :
172- self .assertCacheWorks (result )
173-
174-
175- @unittest .skipUnless ("client_id" in CONFIG , "client_id missing" )
176- class TestClientApplication (Oauth2TestCase ):
177-
178- @classmethod
179- def setUpClass (cls ):
180- cls .app = ClientApplication (
181- CONFIG ["client_id" ], client_credential = CONFIG .get ("client_secret" ),
182- authority = CONFIG .get ("authority" ))
183-
184- @unittest .skipUnless ("scope" in CONFIG , "Missing scope" )
185- def test_auth_code (self ):
186- from msal .oauth2cli .authcode import obtain_auth_code
187- port = CONFIG .get ("listen_port" , 44331 )
188- redirect_uri = "http://localhost:%s" % port
189- auth_request_uri = self .app .get_authorization_request_url (
190- CONFIG ["scope" ], redirect_uri = redirect_uri )
191- ac = obtain_auth_code (port , auth_uri = auth_request_uri )
192- self .assertNotEqual (ac , None )
193-
194- result = self .app .acquire_token_by_authorization_code (
195- ac , CONFIG ["scope" ], redirect_uri = redirect_uri )
196- logging .debug ("cache = %s" , json .dumps (self .app .token_cache ._cache , indent = 4 ))
197- self .assertIn (
198- "access_token" , result ,
199- "{error}: {error_description}" .format (
200- # Note: No interpolation here, cause error won't always present
201- error = result .get ("error" ),
202- error_description = result .get ("error_description" )))
203- self .assertCacheWorks (result )
204-
20548
20649class TestClientApplicationAcquireTokenSilentFociBehaviors (unittest .TestCase ):
20750
0 commit comments