diff options
| -rw-r--r-- | CHANGES.txt | 9 | ||||
| -rw-r--r-- | LICENSE.txt | 24 | ||||
| -rw-r--r-- | pyramid/authentication.py | 164 | ||||
| -rw-r--r-- | pyramid/tests/test_authentication.py | 311 |
4 files changed, 380 insertions, 128 deletions
diff --git a/CHANGES.txt b/CHANGES.txt index 443f9ca4c..8bf4b9376 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,12 @@ +Next release +============ + +Internal +-------- + +- Internalize code previously depended upon as imports from the + ``paste.auth`` module (futureproof). + 1.2a5 (2011-09-04) ================== diff --git a/LICENSE.txt b/LICENSE.txt index f7ace1698..a1f6d4777 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -135,3 +135,27 @@ following license: IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +Portions of the code marked as "stolen from Paste" are provided under the +following license: + + Copyright (c) 2006-2007 Ian Bicking and Contributors + + Permission is hereby granted, free of charge, to any person obtaining + a copy of this software and associated documentation files (the + "Software"), to deal in the Software without restriction, including + without limitation the rights to use, copy, modify, merge, publish, + distribute, sublicense, and/or sell copies of the Software, and to + permit persons to whom the Software is furnished to do so, subject to + the following conditions: + + The above copyright notice and this permission notice shall be + included in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE + LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION + OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION + WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/pyramid/authentication.py b/pyramid/authentication.py index b6d0ca667..61ab4143e 100644 --- a/pyramid/authentication.py +++ b/pyramid/authentication.py @@ -1,11 +1,10 @@ from codecs import utf_8_decode from codecs import utf_8_encode +from hashlib import md5 import datetime import re -import time - -from paste.auth import auth_tkt -from paste.request import get_cookies +import time as time_mod +import urllib from zope.interface import implements @@ -16,7 +15,6 @@ from pyramid.request import add_global_response_headers from pyramid.security import Authenticated from pyramid.security import Everyone - VALID_TOKEN = re.compile(r"^[A-Za-z][A-Za-z0-9+_-]*$") class CallbackAuthenticationPolicy(object): @@ -108,7 +106,6 @@ class CallbackAuthenticationPolicy(object): ) return effective_principals - class RepozeWho1AuthenticationPolicy(CallbackAuthenticationPolicy): """ A :app:`Pyramid` :term:`authentication policy` which obtains data from the :mod:`repoze.who` 1.X WSGI 'API' (the @@ -392,6 +389,140 @@ def b64encode(v): def b64decode(v): return v.decode('base64') +# this class licensed under the MIT license (stolen from Paste) +class AuthTicket(object): + """ + This class represents an authentication token. You must pass in + the shared secret, the userid, and the IP address. Optionally you + can include tokens (a list of strings, representing role names), + 'user_data', which is arbitrary data available for your own use in + later scripts. Lastly, you can override the cookie name and + timestamp. + + Once you provide all the arguments, use .cookie_value() to + generate the appropriate authentication ticket. + + CGI usage:: + + token = auth_tkt.AuthTick('sharedsecret', 'username', + os.environ['REMOTE_ADDR'], tokens=['admin']) + print 'Status: 200 OK' + print 'Content-type: text/html' + print token.cookie() + print + ... redirect HTML ... + + Webware usage:: + + token = auth_tkt.AuthTick('sharedsecret', 'username', + self.request().environ()['REMOTE_ADDR'], tokens=['admin']) + self.response().setCookie('auth_tkt', token.cookie_value()) + """ + + def __init__(self, secret, userid, ip, tokens=(), user_data='', + time=None, cookie_name='auth_tkt', + secure=False): + self.secret = secret + self.userid = userid + self.ip = ip + self.tokens = ','.join(tokens) + self.user_data = user_data + if time is None: + self.time = time_mod.time() + else: + self.time = time + self.cookie_name = cookie_name + self.secure = secure + + def digest(self): + return calculate_digest( + self.ip, self.time, self.secret, self.userid, self.tokens, + self.user_data) + + def cookie_value(self): + v = '%s%08x%s!' % (self.digest(), int(self.time), + urllib.quote(self.userid)) + if self.tokens: + v += self.tokens + '!' + v += self.user_data + return v + +# this class licensed under the MIT license (stolen from Paste) +class BadTicket(Exception): + """ + Exception raised when a ticket can't be parsed. If we get far enough to + determine what the expected digest should have been, expected is set. + This should not be shown by default, but can be useful for debugging. + """ + def __init__(self, msg, expected=None): + self.expected = expected + Exception.__init__(self, msg) + +# this function licensed under the MIT license (stolen from Paste) +def parse_ticket(secret, ticket, ip): + """ + Parse the ticket, returning (timestamp, userid, tokens, user_data). + + If the ticket cannot be parsed, a ``BadTicket`` exception will be raised + with an explanation. + """ + ticket = ticket.strip('"') + digest = ticket[:32] + try: + timestamp = int(ticket[32:40], 16) + except ValueError, e: + raise BadTicket('Timestamp is not a hex integer: %s' % e) + try: + userid, data = ticket[40:].split('!', 1) + except ValueError: + raise BadTicket('userid is not followed by !') + userid = urllib.unquote(userid) + if '!' in data: + tokens, user_data = data.split('!', 1) + else: # pragma: no cover (never generated) + # @@: Is this the right order? + tokens = '' + user_data = data + + expected = calculate_digest(ip, timestamp, secret, + userid, tokens, user_data) + + if expected != digest: + raise BadTicket('Digest signature is not correct', + expected=(expected, digest)) + + tokens = tokens.split(',') + + return (timestamp, userid, tokens, user_data) + +# this function licensed under the MIT license (stolen from Paste) +def calculate_digest(ip, timestamp, secret, userid, tokens, user_data): + secret = maybe_encode(secret) + userid = maybe_encode(userid) + tokens = maybe_encode(tokens) + user_data = maybe_encode(user_data) + digest0 = md5( + encode_ip_timestamp(ip, timestamp) + secret + userid + '\0' + + tokens + '\0' + user_data).hexdigest() + digest = md5(digest0 + secret).hexdigest() + return digest + +# this function licensed under the MIT license (stolen from Paste) +def encode_ip_timestamp(ip, timestamp): + ip_chars = ''.join(map(chr, map(int, ip.split('.')))) + t = int(timestamp) + ts = ((t & 0xff000000) >> 24, + (t & 0xff0000) >> 16, + (t & 0xff00) >> 8, + t & 0xff) + ts_chars = ''.join(map(chr, ts)) + return ip_chars + ts_chars + +def maybe_encode(s, encoding='utf8'): + if isinstance(s, unicode): + s = s.encode(encoding) + return s + EXPIRE = object() class AuthTktCookieHelper(object): @@ -401,7 +532,9 @@ class AuthTktCookieHelper(object): :class:`pyramid.authentication.AuthTktAuthenticationPolicy` for the meanings of the constructor arguments. """ - auth_tkt = auth_tkt # for tests + parse_ticket = staticmethod(parse_ticket) # for tests + AuthTicket = AuthTicket # for tests + BadTicket = BadTicket # for tests now = None # for tests userid_type_decoders = { @@ -487,10 +620,9 @@ class AuthTktCookieHelper(object): """ Return a dictionary with authentication information, or ``None`` if no valid auth_tkt is attached to ``request``""" environ = request.environ - cookies = get_cookies(environ) - cookie = cookies.get(self.cookie_name) + cookie = request.cookies.get(self.cookie_name) - if cookie is None or not cookie.value: + if cookie is None: return None if self.include_ip: @@ -499,15 +631,15 @@ class AuthTktCookieHelper(object): remote_addr = '0.0.0.0' try: - timestamp, userid, tokens, user_data = self.auth_tkt.parse_ticket( - self.secret, cookie.value, remote_addr) - except self.auth_tkt.BadTicket: + timestamp, userid, tokens, user_data = self.parse_ticket( + self.secret, cookie, remote_addr) + except self.BadTicket: return None now = self.now # service tests if now is None: - now = time.time() + now = time_mod.time() if self.timeout and ( (timestamp + self.timeout) < now ): # the auth_tkt data has expired @@ -592,7 +724,7 @@ class AuthTktCookieHelper(object): if not (isinstance(token, str) and VALID_TOKEN.match(token)): raise ValueError("Invalid token %r" % (token,)) - ticket = self.auth_tkt.AuthTicket( + ticket = self.AuthTicket( self.secret, userid, remote_addr, @@ -655,3 +787,5 @@ class SessionAuthenticationPolicy(CallbackAuthenticationPolicy): def unauthenticated_userid(self, request): return request.session.get(self.userid_key) + +# 14a3263f21e58dc0c1a4c994ab640bff4e6448d1ZWRpdG9y!userid_type:b64unicode diff --git a/pyramid/tests/test_authentication.py b/pyramid/tests/test_authentication.py index e8904f9c0..40f6731bf 100644 --- a/pyramid/tests/test_authentication.py +++ b/pyramid/tests/test_authentication.py @@ -435,17 +435,21 @@ class TestAuthTktCookieHelper(unittest.TestCase): return AuthTktCookieHelper def _makeOne(self, *arg, **kw): - plugin = self._getTargetClass()(*arg, **kw) - plugin.auth_tkt = DummyAuthTktModule() - return plugin - - def _makeRequest(self, kw=None): + helper = self._getTargetClass()(*arg, **kw) + # laziness after moving auth_tkt classes and funcs into + # authentication module + auth_tkt = DummyAuthTktModule() + helper.auth_tkt = auth_tkt + helper.AuthTicket = auth_tkt.AuthTicket + helper.parse_ticket = auth_tkt.parse_ticket + helper.BadTicket = auth_tkt.BadTicket + return helper + + def _makeRequest(self, cookie=None): environ = {'wsgi.version': (1,0)} - if kw is not None: - environ.update(kw) environ['REMOTE_ADDR'] = '1.1.1.1' environ['SERVER_NAME'] = 'localhost' - return DummyRequest(environ) + return DummyRequest(environ, cookie=cookie) def _cookieValue(self, cookie): return eval(cookie.value) @@ -464,57 +468,57 @@ class TestAuthTktCookieHelper(unittest.TestCase): return cookies.get('auth_tkt') def test_identify_nocookie(self): - plugin = self._makeOne('secret') + helper = self._makeOne('secret') request = self._makeRequest() - result = plugin.identify(request) + result = helper.identify(request) self.assertEqual(result, None) def test_identify_cookie_value_is_None(self): - plugin = self._makeOne('secret') - request = self._makeRequest({'HTTP_COOKIE':'auth_tkt='}) - result = plugin.identify(request) + helper = self._makeOne('secret') + request = self._makeRequest(None) + result = helper.identify(request) self.assertEqual(result, None) def test_identify_good_cookie_include_ip(self): - plugin = self._makeOne('secret', include_ip=True) - request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=ticket'}) - result = plugin.identify(request) + helper = self._makeOne('secret', include_ip=True) + request = self._makeRequest('ticket') + result = helper.identify(request) self.assertEqual(len(result), 4) self.assertEqual(result['tokens'], ()) self.assertEqual(result['userid'], 'userid') self.assertEqual(result['userdata'], '') self.assertEqual(result['timestamp'], 0) - self.assertEqual(plugin.auth_tkt.value, 'ticket') - self.assertEqual(plugin.auth_tkt.remote_addr, '1.1.1.1') - self.assertEqual(plugin.auth_tkt.secret, 'secret') + self.assertEqual(helper.auth_tkt.value, 'ticket') + self.assertEqual(helper.auth_tkt.remote_addr, '1.1.1.1') + self.assertEqual(helper.auth_tkt.secret, 'secret') environ = request.environ self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) self.assertEqual(environ['REMOTE_USER_DATA'],'') self.assertEqual(environ['AUTH_TYPE'],'cookie') def test_identify_good_cookie_dont_include_ip(self): - plugin = self._makeOne('secret', include_ip=False) - request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=ticket'}) - result = plugin.identify(request) + helper = self._makeOne('secret', include_ip=False) + request = self._makeRequest('ticket') + result = helper.identify(request) self.assertEqual(len(result), 4) self.assertEqual(result['tokens'], ()) self.assertEqual(result['userid'], 'userid') self.assertEqual(result['userdata'], '') self.assertEqual(result['timestamp'], 0) - self.assertEqual(plugin.auth_tkt.value, 'ticket') - self.assertEqual(plugin.auth_tkt.remote_addr, '0.0.0.0') - self.assertEqual(plugin.auth_tkt.secret, 'secret') + self.assertEqual(helper.auth_tkt.value, 'ticket') + self.assertEqual(helper.auth_tkt.remote_addr, '0.0.0.0') + self.assertEqual(helper.auth_tkt.secret, 'secret') environ = request.environ self.assertEqual(environ['REMOTE_USER_TOKENS'], ()) self.assertEqual(environ['REMOTE_USER_DATA'],'') self.assertEqual(environ['AUTH_TYPE'],'cookie') def test_identify_good_cookie_int_useridtype(self): - plugin = self._makeOne('secret', include_ip=False) - plugin.auth_tkt.userid = '1' - plugin.auth_tkt.user_data = 'userid_type:int' - request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=ticket'}) - result = plugin.identify(request) + helper = self._makeOne('secret', include_ip=False) + helper.auth_tkt.userid = '1' + helper.auth_tkt.user_data = 'userid_type:int' + request = self._makeRequest('ticket') + result = helper.identify(request) self.assertEqual(len(result), 4) self.assertEqual(result['tokens'], ()) self.assertEqual(result['userid'], 1) @@ -526,11 +530,11 @@ class TestAuthTktCookieHelper(unittest.TestCase): self.assertEqual(environ['AUTH_TYPE'],'cookie') def test_identify_nonuseridtype_user_data(self): - plugin = self._makeOne('secret', include_ip=False) - plugin.auth_tkt.userid = '1' - plugin.auth_tkt.user_data = 'bogus:int' - request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=ticket'}) - result = plugin.identify(request) + helper = self._makeOne('secret', include_ip=False) + helper.auth_tkt.userid = '1' + helper.auth_tkt.user_data = 'bogus:int' + request = self._makeRequest('ticket') + result = helper.identify(request) self.assertEqual(len(result), 4) self.assertEqual(result['tokens'], ()) self.assertEqual(result['userid'], '1') @@ -542,11 +546,11 @@ class TestAuthTktCookieHelper(unittest.TestCase): self.assertEqual(environ['AUTH_TYPE'],'cookie') def test_identify_good_cookie_unknown_useridtype(self): - plugin = self._makeOne('secret', include_ip=False) - plugin.auth_tkt.userid = 'abc' - plugin.auth_tkt.user_data = 'userid_type:unknown' - request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=ticket'}) - result = plugin.identify(request) + helper = self._makeOne('secret', include_ip=False) + helper.auth_tkt.userid = 'abc' + helper.auth_tkt.user_data = 'userid_type:unknown' + request = self._makeRequest('ticket') + result = helper.identify(request) self.assertEqual(len(result), 4) self.assertEqual(result['tokens'], ()) self.assertEqual(result['userid'], 'abc') @@ -558,11 +562,11 @@ class TestAuthTktCookieHelper(unittest.TestCase): self.assertEqual(environ['AUTH_TYPE'],'cookie') def test_identify_good_cookie_b64str_useridtype(self): - plugin = self._makeOne('secret', include_ip=False) - plugin.auth_tkt.userid = 'encoded'.encode('base64').strip() - plugin.auth_tkt.user_data = 'userid_type:b64str' - request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=ticket'}) - result = plugin.identify(request) + helper = self._makeOne('secret', include_ip=False) + helper.auth_tkt.userid = 'encoded'.encode('base64').strip() + helper.auth_tkt.user_data = 'userid_type:b64str' + request = self._makeRequest('ticket') + result = helper.identify(request) self.assertEqual(len(result), 4) self.assertEqual(result['tokens'], ()) self.assertEqual(result['userid'], 'encoded') @@ -574,11 +578,11 @@ class TestAuthTktCookieHelper(unittest.TestCase): self.assertEqual(environ['AUTH_TYPE'],'cookie') def test_identify_good_cookie_b64unicode_useridtype(self): - plugin = self._makeOne('secret', include_ip=False) - plugin.auth_tkt.userid = '\xc3\xa9ncoded'.encode('base64').strip() - plugin.auth_tkt.user_data = 'userid_type:b64unicode' - request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=ticket'}) - result = plugin.identify(request) + helper = self._makeOne('secret', include_ip=False) + helper.auth_tkt.userid = '\xc3\xa9ncoded'.encode('base64').strip() + helper.auth_tkt.user_data = 'userid_type:b64unicode' + request = self._makeRequest('ticket') + result = helper.identify(request) self.assertEqual(len(result), 4) self.assertEqual(result['tokens'], ()) self.assertEqual(result['userid'], unicode('\xc3\xa9ncoded', 'utf-8')) @@ -590,26 +594,26 @@ class TestAuthTktCookieHelper(unittest.TestCase): self.assertEqual(environ['AUTH_TYPE'],'cookie') def test_identify_bad_cookie(self): - plugin = self._makeOne('secret', include_ip=True) - plugin.auth_tkt.parse_raise = True - request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=bogus'}) - result = plugin.identify(request) + helper = self._makeOne('secret', include_ip=True) + helper.auth_tkt.parse_raise = True + request = self._makeRequest('ticket') + result = helper.identify(request) self.assertEqual(result, None) def test_identify_cookie_timed_out(self): - plugin = self._makeOne('secret', timeout=1) + helper = self._makeOne('secret', timeout=1) request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=bogus'}) - result = plugin.identify(request) + result = helper.identify(request) self.assertEqual(result, None) def test_identify_cookie_reissue(self): import time - plugin = self._makeOne('secret', timeout=10, reissue_time=0) + helper = self._makeOne('secret', timeout=10, reissue_time=0) now = time.time() - plugin.auth_tkt.timestamp = now - plugin.now = now + 1 - request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=bogus'}) - result = plugin.identify(request) + helper.auth_tkt.timestamp = now + helper.now = now + 1 + request = self._makeRequest('bogus') + result = helper.identify(request) self.assertTrue(result) self.assertEqual(len(request.callbacks), 1) response = DummyResponse() @@ -619,37 +623,41 @@ class TestAuthTktCookieHelper(unittest.TestCase): def test_identify_cookie_reissue_already_reissued_this_request(self): import time - plugin = self._makeOne('secret', timeout=10, reissue_time=0) + helper = self._makeOne('secret', timeout=10, reissue_time=0) now = time.time() - plugin.auth_tkt.timestamp = now - plugin.now = now + 1 - request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=bogus'}) + helper.auth_tkt.timestamp = now + helper.now = now + 1 + request = self._makeRequest('bogus') request._authtkt_reissued = True - result = plugin.identify(request) + result = helper.identify(request) self.assertTrue(result) self.assertEqual(len(request.callbacks), 0) def test_identify_cookie_reissue_notyet(self): import time - plugin = self._makeOne('secret', timeout=10, reissue_time=10) + helper = self._makeOne('secret', timeout=10, reissue_time=10) now = time.time() - plugin.auth_tkt.timestamp = now - plugin.now = now + 1 - request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=bogus'}) - result = plugin.identify(request) + helper.auth_tkt.timestamp = now + helper.now = now + 1 + request = self._makeRequest('bogus') + result = helper.identify(request) self.assertTrue(result) self.assertEqual(len(request.callbacks), 0) def test_identify_cookie_reissue_with_tokens_default(self): # see https://github.com/Pylons/pyramid/issues#issue/108 import time - plugin = self._makeOne('secret', timeout=10, reissue_time=0) - plugin.auth_tkt = DummyAuthTktModule(tokens=['']) + helper = self._makeOne('secret', timeout=10, reissue_time=0) + auth_tkt = DummyAuthTktModule(tokens=['']) + helper.auth_tkt = auth_tkt + helper.AuthTicket = auth_tkt.AuthTicket + helper.parse_ticket = auth_tkt.parse_ticket + helper.BadTicket = auth_tkt.BadTicket now = time.time() - plugin.auth_tkt.timestamp = now - plugin.now = now + 1 - request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=bogus'}) - result = plugin.identify(request) + helper.auth_tkt.timestamp = now + helper.now = now + 1 + request = self._makeRequest('bogus') + result = helper.identify(request) self.assertTrue(result) self.assertEqual(len(request.callbacks), 1) response = DummyResponse() @@ -659,9 +667,9 @@ class TestAuthTktCookieHelper(unittest.TestCase): self.assertTrue("'tokens': []" in response.headerlist[0][1]) def test_remember(self): - plugin = self._makeOne('secret') + helper = self._makeOne('secret') request = self._makeRequest() - result = plugin.remember(request, 'userid') + result = helper.remember(request, 'userid') self.assertEqual(len(result), 3) self.assertEqual(result[0][0], 'Set-Cookie') @@ -677,9 +685,9 @@ class TestAuthTktCookieHelper(unittest.TestCase): self.assertTrue(result[2][1].startswith('auth_tkt=')) def test_remember_include_ip(self): - plugin = self._makeOne('secret', include_ip=True) + helper = self._makeOne('secret', include_ip=True) request = self._makeRequest() - result = plugin.remember(request, 'other') + result = helper.remember(request, 'other') self.assertEqual(len(result), 3) self.assertEqual(result[0][0], 'Set-Cookie') @@ -695,10 +703,10 @@ class TestAuthTktCookieHelper(unittest.TestCase): self.assertTrue(result[2][1].startswith('auth_tkt=')) def test_remember_path(self): - plugin = self._makeOne('secret', include_ip=True, + helper = self._makeOne('secret', include_ip=True, path="/cgi-bin/app.cgi/") request = self._makeRequest() - result = plugin.remember(request, 'other') + result = helper.remember(request, 'other') self.assertEqual(len(result), 3) self.assertEqual(result[0][0], 'Set-Cookie') @@ -716,9 +724,9 @@ class TestAuthTktCookieHelper(unittest.TestCase): self.assertTrue(result[2][1].startswith('auth_tkt=')) def test_remember_http_only(self): - plugin = self._makeOne('secret', include_ip=True, http_only=True) + helper = self._makeOne('secret', include_ip=True, http_only=True) request = self._makeRequest() - result = plugin.remember(request, 'other') + result = helper.remember(request, 'other') self.assertEqual(len(result), 3) self.assertEqual(result[0][0], 'Set-Cookie') @@ -734,9 +742,9 @@ class TestAuthTktCookieHelper(unittest.TestCase): self.assertTrue(result[2][1].startswith('auth_tkt=')) def test_remember_secure(self): - plugin = self._makeOne('secret', include_ip=True, secure=True) + helper = self._makeOne('secret', include_ip=True, secure=True) request = self._makeRequest() - result = plugin.remember(request, 'other') + result = helper.remember(request, 'other') self.assertEqual(len(result), 3) self.assertEqual(result[0][0], 'Set-Cookie') @@ -752,9 +760,9 @@ class TestAuthTktCookieHelper(unittest.TestCase): self.assertTrue(result[2][1].startswith('auth_tkt=')) def test_remember_wild_domain_disabled(self): - plugin = self._makeOne('secret', wild_domain=False) + helper = self._makeOne('secret', wild_domain=False) request = self._makeRequest() - result = plugin.remember(request, 'other') + result = helper.remember(request, 'other') self.assertEqual(len(result), 2) self.assertEqual(result[0][0], 'Set-Cookie') @@ -766,10 +774,10 @@ class TestAuthTktCookieHelper(unittest.TestCase): self.assertTrue(result[1][1].startswith('auth_tkt=')) def test_remember_domain_has_port(self): - plugin = self._makeOne('secret', wild_domain=False) + helper = self._makeOne('secret', wild_domain=False) request = self._makeRequest() request.environ['HTTP_HOST'] = 'example.com:80' - result = plugin.remember(request, 'other') + result = helper.remember(request, 'other') self.assertEqual(len(result), 2) self.assertEqual(result[0][0], 'Set-Cookie') @@ -781,9 +789,9 @@ class TestAuthTktCookieHelper(unittest.TestCase): self.assertTrue(result[1][1].startswith('auth_tkt=')) def test_remember_string_userid(self): - plugin = self._makeOne('secret') + helper = self._makeOne('secret') request = self._makeRequest() - result = plugin.remember(request, 'userid') + result = helper.remember(request, 'userid') values = self._parseHeaders(result) self.assertEqual(len(result), 3) val = self._cookieValue(values[0]) @@ -791,9 +799,9 @@ class TestAuthTktCookieHelper(unittest.TestCase): self.assertEqual(val['user_data'], 'userid_type:b64str') def test_remember_int_userid(self): - plugin = self._makeOne('secret') + helper = self._makeOne('secret') request = self._makeRequest() - result = plugin.remember(request, 1) + result = helper.remember(request, 1) values = self._parseHeaders(result) self.assertEqual(len(result), 3) val = self._cookieValue(values[0]) @@ -801,9 +809,9 @@ class TestAuthTktCookieHelper(unittest.TestCase): self.assertEqual(val['user_data'], 'userid_type:int') def test_remember_long_userid(self): - plugin = self._makeOne('secret') + helper = self._makeOne('secret') request = self._makeRequest() - result = plugin.remember(request, long(1)) + result = helper.remember(request, long(1)) values = self._parseHeaders(result) self.assertEqual(len(result), 3) val = self._cookieValue(values[0]) @@ -811,10 +819,10 @@ class TestAuthTktCookieHelper(unittest.TestCase): self.assertEqual(val['user_data'], 'userid_type:int') def test_remember_unicode_userid(self): - plugin = self._makeOne('secret') + helper = self._makeOne('secret') request = self._makeRequest() userid = unicode('\xc2\xa9', 'utf-8') - result = plugin.remember(request, userid) + result = helper.remember(request, userid) values = self._parseHeaders(result) self.assertEqual(len(result), 3) val = self._cookieValue(values[0]) @@ -823,19 +831,19 @@ class TestAuthTktCookieHelper(unittest.TestCase): self.assertEqual(val['user_data'], 'userid_type:b64unicode') def test_remember_insane_userid(self): - plugin = self._makeOne('secret') + helper = self._makeOne('secret') request = self._makeRequest() userid = object() - result = plugin.remember(request, userid) + result = helper.remember(request, userid) values = self._parseHeaders(result) self.assertEqual(len(result), 3) value = values[0] self.assertTrue('userid' in value.value) def test_remember_max_age(self): - plugin = self._makeOne('secret') + helper = self._makeOne('secret') request = self._makeRequest() - result = plugin.remember(request, 'userid', max_age='500') + result = helper.remember(request, 'userid', max_age='500') values = self._parseHeaders(result) self.assertEqual(len(result), 3) @@ -843,9 +851,9 @@ class TestAuthTktCookieHelper(unittest.TestCase): self.assertTrue(values[0]['expires']) def test_remember_tokens(self): - plugin = self._makeOne('secret') + helper = self._makeOne('secret') request = self._makeRequest() - result = plugin.remember(request, 'other', tokens=('foo', 'bar')) + result = helper.remember(request, 'other', tokens=('foo', 'bar')) self.assertEqual(len(result), 3) self.assertEqual(result[0][0], 'Set-Cookie') @@ -858,23 +866,23 @@ class TestAuthTktCookieHelper(unittest.TestCase): self.assertTrue("'tokens': ('foo', 'bar')" in result[2][1]) def test_remember_non_string_token(self): - plugin = self._makeOne('secret') + helper = self._makeOne('secret') request = self._makeRequest() - self.assertRaises(ValueError, plugin.remember, request, 'other', + self.assertRaises(ValueError, helper.remember, request, 'other', tokens=(u'foo',)) def test_remember_invalid_token_format(self): - plugin = self._makeOne('secret') + helper = self._makeOne('secret') request = self._makeRequest() - self.assertRaises(ValueError, plugin.remember, request, 'other', + self.assertRaises(ValueError, helper.remember, request, 'other', tokens=('foo bar',)) - self.assertRaises(ValueError, plugin.remember, request, 'other', + self.assertRaises(ValueError, helper.remember, request, 'other', tokens=('1bar',)) def test_forget(self): - plugin = self._makeOne('secret') + helper = self._makeOne('secret') request = self._makeRequest() - headers = plugin.forget(request) + headers = helper.forget(request) self.assertEqual(len(headers), 3) name, value = headers[0] self.assertEqual(name, 'Set-Cookie') @@ -891,6 +899,66 @@ class TestAuthTktCookieHelper(unittest.TestCase): 'auth_tkt=""; Path=/; Domain=.localhost; Max-Age=0; ' 'Expires=Wed, 31-Dec-97 23:59:59 GMT') +class TestAuthTicket(unittest.TestCase): + def _makeOne(self, *arg, **kw): + from pyramid.authentication import AuthTicket + return AuthTicket(*arg, **kw) + + def test_ctor_with_tokens(self): + ticket = self._makeOne('secret', 'userid', 'ip', tokens=('a', 'b')) + self.assertEqual(ticket.tokens, 'a,b') + + def test_ctor_with_time(self): + ticket = self._makeOne('secret', 'userid', 'ip', time='time') + self.assertEqual(ticket.time, 'time') + + def test_digest(self): + ticket = self._makeOne('secret', 'userid', '0.0.0.0', time=10) + result = ticket.digest() + self.assertEqual(result, '126fd6224912187ee9ffa80e0b81420c') + + def test_cookie_value(self): + ticket = self._makeOne('secret', 'userid', '0.0.0.0', time=10, + tokens=('a', 'b')) + result = ticket.cookie_value() + self.assertEqual(result, + '66f9cc3e423dc57c91df696cf3d1f0d80000000auserid!a,b!') + +class TestBadTicket(unittest.TestCase): + def _makeOne(self, msg, expected=None): + from pyramid.authentication import BadTicket + return BadTicket(msg, expected) + + def test_it(self): + exc = self._makeOne('msg', expected=True) + self.assertEqual(exc.expected, True) + self.assertTrue(isinstance(exc, Exception)) + +class Test_parse_ticket(unittest.TestCase): + def _callFUT(self, secret, ticket, ip): + from pyramid.authentication import parse_ticket + return parse_ticket(secret, ticket, ip) + + def _assertRaisesBadTicket(self, secret, ticket, ip): + from pyramid.authentication import BadTicket + self.assertRaises(BadTicket,self._callFUT, secret, ticket, ip) + + def test_bad_timestamp(self): + ticket = 'x' * 64 + self._assertRaisesBadTicket('secret', ticket, 'ip') + + def test_bad_userid_or_data(self): + ticket = 'x' * 32 + '11111111' + 'x' * 10 + self._assertRaisesBadTicket('secret', ticket, 'ip') + + def test_digest_sig_incorrect(self): + ticket = 'x' * 32 + '11111111' + 'a!b!c' + self._assertRaisesBadTicket('secret', ticket, '0.0.0.0') + + def test_correct_with_user_data(self): + ticket = '66f9cc3e423dc57c91df696cf3d1f0d80000000auserid!a,b!' + result = self._callFUT('secret', ticket, '0.0.0.0') + self.assertEqual(result, (10, 'userid', ['a', 'b'], '')) class TestSessionAuthenticationPolicy(unittest.TestCase): def _getTargetClass(self): @@ -984,15 +1052,32 @@ class TestSessionAuthenticationPolicy(unittest.TestCase): self.assertEqual(request.session.get('userid'), None) self.assertEqual(result, []) +class Test_maybe_encode(unittest.TestCase): + def _callFUT(self, s, encoding='utf-8'): + from pyramid.authentication import maybe_encode + return maybe_encode(s, encoding) + + def test_unicode(self): + result = self._callFUT(u'abc') + self.assertEqual(result, 'abc') + class DummyContext: pass +class DummyCookies(object): + def __init__(self, cookie): + self.cookie = cookie + + def get(self, name): + return self.cookie + class DummyRequest: - def __init__(self, environ=None, session=None, registry=None): + def __init__(self, environ=None, session=None, registry=None, cookie=None): self.environ = environ or {} self.session = session or {} self.registry = registry self.callbacks = [] + self.cookies = DummyCookies(cookie) def add_response_callback(self, callback): self.callbacks.append(callback) |
