diff options
| author | Michael Merickel <michael@merickel.org> | 2018-10-14 21:11:41 -0500 |
|---|---|---|
| committer | Michael Merickel <michael@merickel.org> | 2018-10-14 21:11:41 -0500 |
| commit | 3670c2cdb732d378ba6d38e72e7cd875ff726aa9 (patch) | |
| tree | 5213452a778c992d42602efe7d3b3655a349abd5 /tests/test_request.py | |
| parent | 2b024920847481592b1a13d4006d2a9fa8881d72 (diff) | |
| download | pyramid-3670c2cdb732d378ba6d38e72e7cd875ff726aa9.tar.gz pyramid-3670c2cdb732d378ba6d38e72e7cd875ff726aa9.tar.bz2 pyramid-3670c2cdb732d378ba6d38e72e7cd875ff726aa9.zip | |
move tests out of the package
Diffstat (limited to 'tests/test_request.py')
| -rw-r--r-- | tests/test_request.py | 588 |
1 files changed, 588 insertions, 0 deletions
diff --git a/tests/test_request.py b/tests/test_request.py new file mode 100644 index 000000000..c79c84d63 --- /dev/null +++ b/tests/test_request.py @@ -0,0 +1,588 @@ +from collections import deque +import unittest +from pyramid import testing + +from pyramid.compat import ( + PY2, + text_, + bytes_, + native_, + ) +from pyramid.security import ( + AuthenticationAPIMixin, + AuthorizationAPIMixin, + ) + +class TestRequest(unittest.TestCase): + def setUp(self): + self.config = testing.setUp() + + def tearDown(self): + testing.tearDown() + + def _getTargetClass(self): + from pyramid.request import Request + return Request + + def _makeOne(self, environ=None): + if environ is None: + environ = {} + return self._getTargetClass()(environ) + + def _registerResourceURL(self): + from pyramid.interfaces import IResourceURL + from zope.interface import Interface + class DummyResourceURL(object): + def __init__(self, context, request): + self.physical_path = '/context/' + self.virtual_path = '/context/' + self.config.registry.registerAdapter( + DummyResourceURL, (Interface, Interface), + IResourceURL) + + def test_class_conforms_to_IRequest(self): + from zope.interface.verify import verifyClass + from pyramid.interfaces import IRequest + verifyClass(IRequest, self._getTargetClass()) + + def test_instance_conforms_to_IRequest(self): + from zope.interface.verify import verifyObject + from pyramid.interfaces import IRequest + verifyObject(IRequest, self._makeOne()) + + def test_ResponseClass_is_pyramid_Response(self): + from pyramid.response import Response + cls = self._getTargetClass() + self.assertEqual(cls.ResponseClass, Response) + + def test_implements_security_apis(self): + apis = (AuthenticationAPIMixin, AuthorizationAPIMixin) + r = self._makeOne() + self.assertTrue(isinstance(r, apis)) + + def test_charset_defaults_to_utf8(self): + r = self._makeOne({'PATH_INFO':'/'}) + self.assertEqual(r.charset, 'UTF-8') + + def test_exception_defaults_to_None(self): + r = self._makeOne({'PATH_INFO':'/'}) + self.assertEqual(r.exception, None) + + def test_matchdict_defaults_to_None(self): + r = self._makeOne({'PATH_INFO':'/'}) + self.assertEqual(r.matchdict, None) + + def test_matched_route_defaults_to_None(self): + r = self._makeOne({'PATH_INFO':'/'}) + self.assertEqual(r.matched_route, None) + + def test_params_decoded_from_utf_8_by_default(self): + environ = { + 'PATH_INFO':'/', + 'QUERY_STRING':'la=La%20Pe%C3%B1a' + } + request = self._makeOne(environ) + request.charset = None + self.assertEqual(request.GET['la'], text_(b'La Pe\xf1a')) + + def test_tmpl_context(self): + from pyramid.request import TemplateContext + inst = self._makeOne() + result = inst.tmpl_context + self.assertEqual(result.__class__, TemplateContext) + + def test_session_configured(self): + from pyramid.interfaces import ISessionFactory + inst = self._makeOne() + def factory(request): + return 'orangejuice' + self.config.registry.registerUtility(factory, ISessionFactory) + inst.registry = self.config.registry + self.assertEqual(inst.session, 'orangejuice') + self.assertEqual(inst.__dict__['session'], 'orangejuice') + + def test_session_not_configured(self): + inst = self._makeOne() + inst.registry = self.config.registry + self.assertRaises(AttributeError, getattr, inst, 'session') + + def test_setattr_and_getattr_dotnotation(self): + inst = self._makeOne() + inst.foo = 1 + self.assertEqual(inst.foo, 1) + + def test_setattr_and_getattr(self): + environ = {} + inst = self._makeOne(environ) + setattr(inst, 'bar', 1) + self.assertEqual(getattr(inst, 'bar'), 1) + self.assertEqual(environ, {}) # make sure we're not using adhoc attrs + + def test_add_response_callback(self): + inst = self._makeOne() + self.assertEqual(len(inst.response_callbacks), 0) + def callback(request, response): + """ """ + inst.add_response_callback(callback) + self.assertEqual(list(inst.response_callbacks), [callback]) + inst.add_response_callback(callback) + self.assertEqual(list(inst.response_callbacks), [callback, callback]) + + def test__process_response_callbacks(self): + inst = self._makeOne() + def callback1(request, response): + request.called1 = True + response.called1 = True + def callback2(request, response): + request.called2 = True + response.called2 = True + inst.add_response_callback(callback1) + inst.add_response_callback(callback2) + response = DummyResponse() + inst._process_response_callbacks(response) + self.assertEqual(inst.called1, True) + self.assertEqual(inst.called2, True) + self.assertEqual(response.called1, True) + self.assertEqual(response.called2, True) + self.assertEqual(len(inst.response_callbacks), 0) + + def test__process_response_callback_adding_response_callback(self): + """ + When a response callback adds another callback, that new callback should still be called. + + See https://github.com/Pylons/pyramid/pull/1373 + """ + inst = self._makeOne() + def callback1(request, response): + request.called1 = True + response.called1 = True + request.add_response_callback(callback2) + def callback2(request, response): + request.called2 = True + response.called2 = True + inst.add_response_callback(callback1) + response = DummyResponse() + inst._process_response_callbacks(response) + self.assertEqual(inst.called1, True) + self.assertEqual(inst.called2, True) + self.assertEqual(response.called1, True) + self.assertEqual(response.called2, True) + self.assertEqual(len(inst.response_callbacks), 0) + + def test_add_finished_callback(self): + inst = self._makeOne() + self.assertEqual(len(inst.finished_callbacks), 0) + def callback(request): + """ """ + inst.add_finished_callback(callback) + self.assertEqual(list(inst.finished_callbacks), [callback]) + inst.add_finished_callback(callback) + self.assertEqual(list(inst.finished_callbacks), [callback, callback]) + + def test__process_finished_callbacks(self): + inst = self._makeOne() + def callback1(request): + request.called1 = True + def callback2(request): + request.called2 = True + inst.add_finished_callback(callback1) + inst.add_finished_callback(callback2) + inst._process_finished_callbacks() + self.assertEqual(inst.called1, True) + self.assertEqual(inst.called2, True) + self.assertEqual(len(inst.finished_callbacks), 0) + + def test_resource_url(self): + self._registerResourceURL() + environ = { + 'PATH_INFO':'/', + 'SERVER_NAME':'example.com', + 'SERVER_PORT':'80', + 'wsgi.url_scheme':'http', + } + inst = self._makeOne(environ) + root = DummyContext() + result = inst.resource_url(root) + self.assertEqual(result, 'http://example.com/context/') + + def test_route_url(self): + environ = { + 'PATH_INFO':'/', + 'SERVER_NAME':'example.com', + 'SERVER_PORT':'5432', + 'QUERY_STRING':'la=La%20Pe%C3%B1a', + 'wsgi.url_scheme':'http', + } + from pyramid.interfaces import IRoutesMapper + inst = self._makeOne(environ) + mapper = DummyRoutesMapper(route=DummyRoute('/1/2/3')) + self.config.registry.registerUtility(mapper, IRoutesMapper) + result = inst.route_url('flub', 'extra1', 'extra2', + a=1, b=2, c=3, _query={'a':1}, + _anchor=text_("foo")) + self.assertEqual(result, + 'http://example.com:5432/1/2/3/extra1/extra2?a=1#foo') + + def test_route_path(self): + environ = { + 'PATH_INFO':'/', + 'SERVER_NAME':'example.com', + 'SERVER_PORT':'5432', + 'QUERY_STRING':'la=La%20Pe%C3%B1a', + 'wsgi.url_scheme':'http', + } + from pyramid.interfaces import IRoutesMapper + inst = self._makeOne(environ) + mapper = DummyRoutesMapper(route=DummyRoute('/1/2/3')) + self.config.registry.registerUtility(mapper, IRoutesMapper) + result = inst.route_path('flub', 'extra1', 'extra2', + a=1, b=2, c=3, _query={'a':1}, + _anchor=text_("foo")) + self.assertEqual(result, '/1/2/3/extra1/extra2?a=1#foo') + + def test_static_url(self): + from pyramid.interfaces import IStaticURLInfo + environ = { + 'PATH_INFO':'/', + 'SERVER_NAME':'example.com', + 'SERVER_PORT':'5432', + 'QUERY_STRING':'', + 'wsgi.url_scheme':'http', + } + request = self._makeOne(environ) + info = DummyStaticURLInfo('abc') + self.config.registry.registerUtility(info, IStaticURLInfo) + result = request.static_url('pyramid.tests:static/foo.css') + self.assertEqual(result, 'abc') + self.assertEqual(info.args, + ('pyramid.tests:static/foo.css', request, {}) ) + + def test_is_response_false(self): + request = self._makeOne() + request.registry = self.config.registry + self.assertEqual(request.is_response('abc'), False) + + def test_is_response_true_ob_is_pyramid_response(self): + from pyramid.response import Response + r = Response('hello') + request = self._makeOne() + request.registry = self.config.registry + self.assertEqual(request.is_response(r), True) + + def test_is_response_false_adapter_is_not_self(self): + from pyramid.interfaces import IResponse + request = self._makeOne() + request.registry = self.config.registry + def adapter(ob): + return object() + class Foo(object): + pass + foo = Foo() + request.registry.registerAdapter(adapter, (Foo,), IResponse) + self.assertEqual(request.is_response(foo), False) + + def test_is_response_adapter_true(self): + from pyramid.interfaces import IResponse + request = self._makeOne() + request.registry = self.config.registry + class Foo(object): + pass + foo = Foo() + def adapter(ob): + return ob + request.registry.registerAdapter(adapter, (Foo,), IResponse) + self.assertEqual(request.is_response(foo), True) + + def test_json_body_invalid_json(self): + request = self._makeOne({'REQUEST_METHOD':'POST'}) + request.body = b'{' + self.assertRaises(ValueError, getattr, request, 'json_body') + + def test_json_body_valid_json(self): + request = self._makeOne({'REQUEST_METHOD':'POST'}) + request.body = b'{"a":1}' + self.assertEqual(request.json_body, {'a':1}) + + def test_json_body_alternate_charset(self): + import json + request = self._makeOne({'REQUEST_METHOD':'POST'}) + inp = text_( + b'/\xe6\xb5\x81\xe8\xa1\x8c\xe8\xb6\x8b\xe5\x8a\xbf', + 'utf-8' + ) + if PY2: + body = json.dumps({'a':inp}).decode('utf-8').encode('utf-16') + else: + body = bytes(json.dumps({'a':inp}), 'utf-16') + request.body = body + request.content_type = 'application/json; charset=utf-16' + self.assertEqual(request.json_body, {'a':inp}) + + def test_json_body_GET_request(self): + request = self._makeOne({'REQUEST_METHOD':'GET'}) + self.assertRaises(ValueError, getattr, request, 'json_body') + + def test_set_property(self): + request = self._makeOne() + opts = [2, 1] + def connect(obj): + return opts.pop() + request.set_property(connect, name='db') + self.assertEqual(1, request.db) + self.assertEqual(2, request.db) + + def test_set_property_reify(self): + request = self._makeOne() + opts = [2, 1] + def connect(obj): + return opts.pop() + request.set_property(connect, name='db', reify=True) + self.assertEqual(1, request.db) + self.assertEqual(1, request.db) + +class Test_route_request_iface(unittest.TestCase): + def _callFUT(self, name): + from pyramid.request import route_request_iface + return route_request_iface(name) + + def test_it(self): + iface = self._callFUT('routename') + self.assertEqual(iface.__name__, 'routename_IRequest') + self.assertTrue(hasattr(iface, 'combined')) + self.assertEqual(iface.combined.__name__, 'routename_combined_IRequest') + + def test_it_routename_with_spaces(self): + # see https://github.com/Pylons/pyramid/issues/232 + iface = self._callFUT('routename with spaces') + self.assertEqual(iface.__name__, 'routename with spaces_IRequest') + self.assertTrue(hasattr(iface, 'combined')) + self.assertEqual(iface.combined.__name__, + 'routename with spaces_combined_IRequest') + + +class Test_add_global_response_headers(unittest.TestCase): + def _callFUT(self, request, headerlist): + from pyramid.request import add_global_response_headers + return add_global_response_headers(request, headerlist) + + def test_it(self): + request = DummyRequest() + response = DummyResponse() + self._callFUT(request, [('c', 1)]) + self.assertEqual(len(request.response_callbacks), 1) + request.response_callbacks[0](None, response) + self.assertEqual(response.headerlist, [('c', 1)] ) + +class Test_call_app_with_subpath_as_path_info(unittest.TestCase): + def _callFUT(self, request, app): + from pyramid.request import call_app_with_subpath_as_path_info + return call_app_with_subpath_as_path_info(request, app) + + def test_it_all_request_and_environment_data_missing(self): + request = DummyRequest({}) + response = self._callFUT(request, 'app') + self.assertTrue(request.copied) + self.assertEqual(response, 'app') + self.assertEqual(request.environ['SCRIPT_NAME'], '') + self.assertEqual(request.environ['PATH_INFO'], '/') + + def test_it_with_subpath_and_path_info(self): + request = DummyRequest({'PATH_INFO':'/hello'}) + request.subpath = ('hello',) + response = self._callFUT(request, 'app') + self.assertTrue(request.copied) + self.assertEqual(response, 'app') + self.assertEqual(request.environ['SCRIPT_NAME'], '') + self.assertEqual(request.environ['PATH_INFO'], '/hello') + + def test_it_with_subpath_and_path_info_path_info_endswith_slash(self): + request = DummyRequest({'PATH_INFO':'/hello/'}) + request.subpath = ('hello',) + response = self._callFUT(request, 'app') + self.assertTrue(request.copied) + self.assertEqual(response, 'app') + self.assertEqual(request.environ['SCRIPT_NAME'], '') + self.assertEqual(request.environ['PATH_INFO'], '/hello/') + + def test_it_with_subpath_and_path_info_extra_script_name(self): + request = DummyRequest({'PATH_INFO':'/hello', 'SCRIPT_NAME':'/script'}) + request.subpath = ('hello',) + response = self._callFUT(request, 'app') + self.assertTrue(request.copied) + self.assertEqual(response, 'app') + self.assertEqual(request.environ['SCRIPT_NAME'], '/script') + self.assertEqual(request.environ['PATH_INFO'], '/hello') + + def test_it_with_extra_slashes_in_path_info(self): + request = DummyRequest({'PATH_INFO':'//hello/', + 'SCRIPT_NAME':'/script'}) + request.subpath = ('hello',) + response = self._callFUT(request, 'app') + self.assertTrue(request.copied) + self.assertEqual(response, 'app') + self.assertEqual(request.environ['SCRIPT_NAME'], '/script') + self.assertEqual(request.environ['PATH_INFO'], '/hello/') + + def test_subpath_path_info_and_script_name_have_utf8(self): + encoded = native_(text_(b'La Pe\xc3\xb1a')) + decoded = text_(bytes_(encoded), 'utf-8') + request = DummyRequest({'PATH_INFO':'/' + encoded, + 'SCRIPT_NAME':'/' + encoded}) + request.subpath = (decoded, ) + response = self._callFUT(request, 'app') + self.assertTrue(request.copied) + self.assertEqual(response, 'app') + self.assertEqual(request.environ['SCRIPT_NAME'], '/' + encoded) + self.assertEqual(request.environ['PATH_INFO'], '/' + encoded) + +class Test_apply_request_extensions(unittest.TestCase): + def setUp(self): + self.config = testing.setUp() + + def tearDown(self): + testing.tearDown() + + def _callFUT(self, request, extensions=None): + from pyramid.request import apply_request_extensions + return apply_request_extensions(request, extensions=extensions) + + def test_it_with_registry(self): + from pyramid.interfaces import IRequestExtensions + extensions = Dummy() + extensions.methods = {'foo': lambda x, y: y} + extensions.descriptors = {'bar': property(lambda x: 'bar')} + self.config.registry.registerUtility(extensions, IRequestExtensions) + request = DummyRequest() + request.registry = self.config.registry + self._callFUT(request) + self.assertEqual(request.bar, 'bar') + self.assertEqual(request.foo('abc'), 'abc') + + def test_it_override_extensions(self): + from pyramid.interfaces import IRequestExtensions + ignore = Dummy() + ignore.methods = {'x': lambda x, y, z: 'asdf'} + ignore.descriptors = {'bar': property(lambda x: 'asdf')} + self.config.registry.registerUtility(ignore, IRequestExtensions) + request = DummyRequest() + request.registry = self.config.registry + + extensions = Dummy() + extensions.methods = {'foo': lambda x, y: y} + extensions.descriptors = {'bar': property(lambda x: 'bar')} + self._callFUT(request, extensions=extensions) + self.assertRaises(AttributeError, lambda: request.x) + self.assertEqual(request.bar, 'bar') + self.assertEqual(request.foo('abc'), 'abc') + +class Dummy(object): + pass + +class Test_subclassing_Request(unittest.TestCase): + def test_subclass(self): + from pyramid.interfaces import IRequest + from pyramid.request import Request + + class RequestSub(Request): + pass + + self.assertTrue(hasattr(Request, '__provides__')) + self.assertTrue(hasattr(Request, '__implemented__')) + self.assertTrue(hasattr(Request, '__providedBy__')) + self.assertFalse(hasattr(RequestSub, '__provides__')) + self.assertTrue(hasattr(RequestSub, '__providedBy__')) + self.assertTrue(hasattr(RequestSub, '__implemented__')) + + self.assertTrue(IRequest.implementedBy(RequestSub)) + # The call to implementedBy will add __provides__ to the class + self.assertTrue(hasattr(RequestSub, '__provides__')) + + + def test_subclass_with_implementer(self): + from pyramid.interfaces import IRequest + from pyramid.request import Request + from pyramid.util import InstancePropertyHelper + from zope.interface import implementer + + @implementer(IRequest) + class RequestSub(Request): + pass + + self.assertTrue(hasattr(Request, '__provides__')) + self.assertTrue(hasattr(Request, '__implemented__')) + self.assertTrue(hasattr(Request, '__providedBy__')) + self.assertTrue(hasattr(RequestSub, '__provides__')) + self.assertTrue(hasattr(RequestSub, '__providedBy__')) + self.assertTrue(hasattr(RequestSub, '__implemented__')) + + req = RequestSub({}) + helper = InstancePropertyHelper() + helper.apply_properties(req, {'b': 'b'}) + + self.assertTrue(IRequest.providedBy(req)) + self.assertTrue(IRequest.implementedBy(RequestSub)) + + def test_subclass_mutate_before_providedBy(self): + from pyramid.interfaces import IRequest + from pyramid.request import Request + from pyramid.util import InstancePropertyHelper + + class RequestSub(Request): + pass + + req = RequestSub({}) + helper = InstancePropertyHelper() + helper.apply_properties(req, {'b': 'b'}) + + self.assertTrue(IRequest.providedBy(req)) + self.assertTrue(IRequest.implementedBy(RequestSub)) + + +class DummyRequest(object): + def __init__(self, environ=None): + if environ is None: + environ = {} + self.environ = environ + + def add_response_callback(self, callback): + self.response_callbacks = [callback] + + def get_response(self, app): + return app + + def copy(self): + self.copied = True + return self + +class DummyResponse: + def __init__(self): + self.headerlist = [] + + +class DummyContext: + pass + +class DummyRoutesMapper: + raise_exc = None + def __init__(self, route=None, raise_exc=False): + self.route = route + + def get_route(self, route_name): + return self.route + +class DummyRoute: + pregenerator = None + def __init__(self, result='/1/2/3'): + self.result = result + + def generate(self, kw): + self.kw = kw + return self.result + +class DummyStaticURLInfo: + def __init__(self, result): + self.result = result + + def generate(self, path, request, **kw): + self.args = path, request, kw + return self.result |
