diff options
| author | Chris McDonough <chrism@agendaless.com> | 2009-06-11 03:15:15 +0000 |
|---|---|---|
| committer | Chris McDonough <chrism@agendaless.com> | 2009-06-11 03:15:15 +0000 |
| commit | dfc2b65c1b6d2f938f68b7868a14d8f9a4faab9e (patch) | |
| tree | f3241401b7175a401e00286b11e3efe3c21f5093 /repoze | |
| parent | f8b0065b6ede54424d7a7b49f9f113e87634b5ab (diff) | |
| download | pyramid-dfc2b65c1b6d2f938f68b7868a14d8f9a4faab9e.tar.gz pyramid-dfc2b65c1b6d2f938f68b7868a14d8f9a4faab9e.tar.bz2 pyramid-dfc2b65c1b6d2f938f68b7868a14d8f9a4faab9e.zip | |
Merge unifyroutesandtraversal branch into trunk
Diffstat (limited to 'repoze')
| -rw-r--r-- | repoze/bfg/includes/configure.zcml | 14 | ||||
| -rw-r--r-- | repoze/bfg/interfaces.py | 25 | ||||
| -rw-r--r-- | repoze/bfg/request.py | 144 | ||||
| -rw-r--r-- | repoze/bfg/router.py | 86 | ||||
| -rw-r--r-- | repoze/bfg/tests/test_authentication.py | 27 | ||||
| -rw-r--r-- | repoze/bfg/tests/test_integration.py | 24 | ||||
| -rw-r--r-- | repoze/bfg/tests/test_request.py | 170 | ||||
| -rw-r--r-- | repoze/bfg/tests/test_router.py | 125 | ||||
| -rw-r--r-- | repoze/bfg/tests/test_traversal.py | 82 | ||||
| -rw-r--r-- | repoze/bfg/tests/test_urldispatch.py | 269 | ||||
| -rw-r--r-- | repoze/bfg/tests/test_zcml.py | 196 | ||||
| -rw-r--r-- | repoze/bfg/traversal.py | 46 | ||||
| -rw-r--r-- | repoze/bfg/urldispatch.py | 136 | ||||
| -rw-r--r-- | repoze/bfg/view.py | 12 | ||||
| -rw-r--r-- | repoze/bfg/zcml.py | 93 |
15 files changed, 656 insertions, 793 deletions
diff --git a/repoze/bfg/includes/configure.zcml b/repoze/bfg/includes/configure.zcml index 048a0cbc8..7e2036476 100644 --- a/repoze/bfg/includes/configure.zcml +++ b/repoze/bfg/includes/configure.zcml @@ -16,20 +16,6 @@ for="* repoze.bfg.interfaces.IRequest" /> - <!-- URL dispatch adapters --> - - <adapter - factory="repoze.bfg.urldispatch.RoutesModelTraverser" - provides="repoze.bfg.interfaces.ITraverserFactory" - for="repoze.bfg.interfaces.IRoutesContext" - /> - - <adapter - factory="repoze.bfg.urldispatch.RoutesContextURL" - provides="repoze.bfg.interfaces.IContextURL" - for="repoze.bfg.interfaces.IRoutesContext repoze.bfg.interfaces.IRequest" - /> - <include file="meta.zcml" /> </configure> diff --git a/repoze/bfg/interfaces.py b/repoze/bfg/interfaces.py index 79ef25f09..b8f62c05f 100644 --- a/repoze/bfg/interfaces.py +++ b/repoze/bfg/interfaces.py @@ -3,13 +3,6 @@ from zope.interface import Interface from zope.component.interfaces import IObjectEvent -class IRequestFactory(Interface): - """ A utility which generates a request object """ - def __call__(): - """ Return a request factory (a callable that accepts an - environ and returns an object implementing IRequest, - e.g. ``webob.Request``)""" - class IRequest(Interface): """ Request type interface attached to all request objects """ @@ -47,7 +40,11 @@ class IView(Interface): class IRootFactory(Interface): def __call__(environ): - """ Return a root object """ + """ Return a root object based on the WSGI environ """ + +class IDefaultRootFactory(Interface): + def __call__(environ): + """ Return the *default* root object for an application """ class ITraverser(Interface): def __call__(environ): @@ -126,10 +123,6 @@ class IRouter(Interface): registry = Attribute( """Component architecture registry local to this application.""") -class IRoutesContext(Interface): - """ A context (model instance) that is created as a result of URL - dispatching""" - class INewRequest(Interface): """ An event type that is emitted whenever repoze.bfg begins to process a new request """ @@ -165,10 +158,6 @@ class ILogger(Interface): class IRoutesMapper(Interface): """ Interface representing a Routes ``Mapper`` object """ -class IContextNotFound(Interface): - """ Interface implemented by contexts generated by code which - cannot find a context during root finding or traversal """ - class IForbiddenView(Interface): """ A utility which returns an IResponse as the result of the denial of a view invocation by a security policy.""" @@ -210,10 +199,6 @@ class IContextURL(Interface): def __call__(): """ Return a URL that points to the context """ -class IRoutesContextFactory(Interface): - """ A marker interface used to look up the default routes context factory - """ - class IAuthenticationPolicy(Interface): """ An object representing a BFG authentication policy. """ def authenticated_userid(request): diff --git a/repoze/bfg/request.py b/repoze/bfg/request.py index 71f639626..e5482848b 100644 --- a/repoze/bfg/request.py +++ b/repoze/bfg/request.py @@ -1,7 +1,14 @@ from zope.interface import implements from webob import Request as WebobRequest -import repoze.bfg.interfaces +from zope.interface.interface import InterfaceClass + +from repoze.bfg.interfaces import IRequest +from repoze.bfg.interfaces import IGETRequest +from repoze.bfg.interfaces import IPOSTRequest +from repoze.bfg.interfaces import IPUTRequest +from repoze.bfg.interfaces import IDELETERequest +from repoze.bfg.interfaces import IHEADRequest from repoze.bfg.threadlocal import manager @@ -73,6 +80,25 @@ def current_request(): """ return manager.get()['request'] +def request_factory(environ): + try: + method = environ['REQUEST_METHOD'] + except KeyError: + method = None + + if 'bfg.routes.route' in environ: + route = environ['bfg.routes.route'] + request_factories = route.request_factories + else: + request_factories = DEFAULT_REQUEST_FACTORIES + + try: + request_factory = request_factories[method]['factory'] + except KeyError: + request_factory = request_factories[None]['factory'] + + return request_factory(environ) + def make_request_ascii(event): """ An event handler that causes the request charset to be ASCII; used as an INewRequest subscriber so code written before 0.7.0 can @@ -80,53 +106,71 @@ def make_request_ascii(event): request = event.request request.charset = None -class Request(WebobRequest): - implements(repoze.bfg.interfaces.IRequest) - charset = 'utf-8' - -# We use 'precooked' Request subclasses that correspond to HTTP -# request methods within ``router.py`` when constructing a request -# object rather than using ``alsoProvides`` to attach the proper -# interface to an unsubclassed webob.Request. This pattern is purely -# an optimization (e.g. preventing calls to ``alsoProvides`` means the -# difference between 590 r/s and 690 r/s on a MacBook 2GHz). These -# classes are *not* APIs. None of these classes, nor the -# ``HTTP_METHOD_FACTORIES`` or ``HTTP_METHOD_INTERFACES`` lookup dicts -# should be imported directly by user code. - -class GETRequest(WebobRequest): - implements(repoze.bfg.interfaces.IGETRequest) - charset = 'utf-8' - -class POSTRequest(WebobRequest): - implements(repoze.bfg.interfaces.IPOSTRequest) - charset = 'utf-8' - -class PUTRequest(WebobRequest): - implements(repoze.bfg.interfaces.IPUTRequest) - charset = 'utf-8' - -class DELETERequest(WebobRequest): - implements(repoze.bfg.interfaces.IDELETERequest) - charset = 'utf-8' - -class HEADRequest(WebobRequest): - implements(repoze.bfg.interfaces.IHEADRequest) - charset = 'utf-8' - -HTTP_METHOD_FACTORIES = { - 'GET':GETRequest, - 'POST':POSTRequest, - 'PUT':PUTRequest, - 'DELETE':DELETERequest, - 'HEAD':HEADRequest, - } - -HTTP_METHOD_INTERFACES = { - 'GET':repoze.bfg.interfaces.IGETRequest, - 'POST':repoze.bfg.interfaces.IPOSTRequest, - 'PUT':repoze.bfg.interfaces.IPUTRequest, - 'DELETE':repoze.bfg.interfaces.IDELETERequest, - 'HEAD':repoze.bfg.interfaces.IHEADRequest, - } +def named_request_factories(name=None): + # We use 'precooked' Request subclasses that correspond to HTTP + # request methods when returning a request object from + # ``request_factory`` rather than using ``alsoProvides`` to attach + # the proper interface to an unsubclassed webob.Request. This + # pattern is purely an optimization (e.g. preventing calls to + # ``alsoProvides`` means the difference between 590 r/s and 690 + # r/s on a MacBook 2GHz). This method should be never imported + # directly by user code; it is *not* an API. + if name is None: + default_iface = IRequest + get_iface = IGETRequest + post_iface = IPOSTRequest + put_iface = IPUTRequest + delete_iface = IDELETERequest + head_iface = IHEADRequest + else: + default_iface = InterfaceClass('%s_IRequest' % name) + get_iface = InterfaceClass('%s_IGETRequest' % name, (default_iface,)) + post_iface = InterfaceClass('%s_IPOSTRequest' % name, (default_iface,)) + put_iface = InterfaceClass('%s_IPUTRequest' % name, (default_iface,)) + delete_iface = InterfaceClass('%s_IDELETERequest' % name, + (default_iface,)) + head_iface = InterfaceClass('%s_IHEADRequest' % name, (default_iface)) + + class Request(WebobRequest): + implements(default_iface) + charset = 'utf-8' + + class GETRequest(WebobRequest): + implements(get_iface) + charset = 'utf-8' + + class POSTRequest(WebobRequest): + implements(post_iface) + charset = 'utf-8' + class PUTRequest(WebobRequest): + implements(put_iface) + charset = 'utf-8' + + class DELETERequest(WebobRequest): + implements(delete_iface) + charset = 'utf-8' + + class HEADRequest(WebobRequest): + implements(head_iface) + charset = 'utf-8' + + factories = { + IRequest:{'interface':default_iface, 'factory':Request}, + IGETRequest:{'interface':get_iface, 'factory':GETRequest}, + IPOSTRequest:{'interface':post_iface, 'factory':POSTRequest}, + IPUTRequest:{'interface':put_iface, 'factory':PUTRequest}, + IDELETERequest:{'interface':delete_iface, 'factory':DELETERequest}, + IHEADRequest:{'interface':head_iface, 'factory':HEADRequest}, + None:{'interface':default_iface, 'factory':Request}, + 'GET':{'interface':get_iface, 'factory':GETRequest}, + 'POST':{'interface':post_iface, 'factory':POSTRequest}, + 'PUT':{'interface':put_iface, 'factory':PUTRequest}, + 'DELETE':{'interface':delete_iface, 'factory':DELETERequest}, + 'HEAD':{'interface':head_iface, 'factory':HEADRequest}, + } + + return factories + +DEFAULT_REQUEST_FACTORIES = named_request_factories() + diff --git a/repoze/bfg/router.py b/repoze/bfg/router.py index 0c00b239c..d8a55fe7b 100644 --- a/repoze/bfg/router.py +++ b/repoze/bfg/router.py @@ -16,7 +16,6 @@ from repoze.bfg.events import WSGIApplicationCreatedEvent from repoze.bfg.interfaces import ILogger from repoze.bfg.interfaces import ISecurityPolicy -from repoze.bfg.interfaces import IRequestFactory from repoze.bfg.interfaces import IResponseFactory from repoze.bfg.interfaces import IRootFactory from repoze.bfg.interfaces import IRouter @@ -30,16 +29,14 @@ from repoze.bfg.interfaces import IView from repoze.bfg.interfaces import IViewPermission from repoze.bfg.interfaces import IAuthorizationPolicy from repoze.bfg.interfaces import IAuthenticationPolicy -from repoze.bfg.interfaces import IRoutesContext -from repoze.bfg.interfaces import IRoutesContextFactory +from repoze.bfg.interfaces import IDefaultRootFactory from repoze.bfg.log import make_stream_logger from repoze.bfg.registry import Registry from repoze.bfg.registry import populateRegistry -from repoze.bfg.request import HTTP_METHOD_FACTORIES -from repoze.bfg.request import Request +from repoze.bfg.request import request_factory from repoze.bfg.secpols import registerBBBAuthn @@ -68,8 +65,6 @@ class Router(object): self.registry = registry self.logger = registry.queryUtility(ILogger, 'repoze.bfg.debug') - self.request_factory = registry.queryUtility(IRequestFactory) - forbidden = None unauthorized_app_factory = registry.queryUtility( @@ -129,7 +124,8 @@ class Router(object): self.secured = not not registry.queryUtility(IAuthenticationPolicy) - self.root_factory = registry.getUtility(IRootFactory) + self.root_factory = registry.queryUtility(IRootFactory, + default=DefaultRootFactory) self.root_policy = self.root_factory # b/w compat self.traverser_warned = {} @@ -145,33 +141,13 @@ class Router(object): threadlocals = {'registry':registry, 'request':None} self.threadlocal_manager.push(threadlocals) - - def respond(response, view_name): - registry.has_listeners and registry.notify(NewResponse(response)) - try: - start_response(response.status, response.headerlist) - return response.app_iter - except AttributeError: - raise ValueError( - 'Non-response object returned from view %s: %r' % - (view_name, response)) - + try: - if self.request_factory is None: - method = environ['REQUEST_METHOD'] - try: - # for speed we disuse HTTP_METHOD_FACTORIES.get - request_factory = HTTP_METHOD_FACTORIES[method] - except KeyError: - request_factory = Request - else: - request_factory = self.request_factory - + root = self.root_factory(environ) request = request_factory(environ) threadlocals['request'] = request - registry.has_listeners and registry.notify(NewRequest(request)) - root = self.root_factory(environ) + tdict = _traverse(root, environ) if '_deprecation_warning' in tdict: warning = tdict.pop('_deprecation_warning') @@ -198,6 +174,17 @@ class Router(object): request.virtual_root = vroot request.virtual_root_path = vroot_path + def respond(response, view_name): + registry.has_listeners and registry.notify( + NewResponse(response)) + try: + start_response(response.status, response.headerlist) + return response.app_iter + except AttributeError: + raise ValueError( + 'Non-response object returned from view %s: %r' % + (view_name, response)) + if self.secured: permitted = registry.queryMultiAdapter((context, request), @@ -299,10 +286,11 @@ def make_app(root_factory, package=None, filename='configure.zcml', ``repoze.bfg`` WSGI application. ``root_factory`` must be a callable that accepts a WSGI - environment and returns a traversal root object. It may be - ``None``, in which case traversal is not performed at all. - Instead, all URL-to-code mapping is done via URL dispatch (aka - Routes). + environment and returns a traversal root object. The traversal + root returned by the root factory is the *default* traversal root; + it can be overridden on a per-view basis. ``root_factory`` may be + ``None``, in which case a 'default default' traversal root is + used. ``package`` is a Python module representing the application's package. It is optional, defaulting to ``None``. If ``package`` @@ -360,19 +348,17 @@ def make_app(root_factory, package=None, filename='configure.zcml', authorization_policy = ACLAuthorizationPolicy() registry.registerUtility(authorization_policy, IAuthorizationPolicy) + if root_factory is None: + root_factory = DefaultRootFactory + + # register the *default* root factory so we can find it later + registry.registerUtility(root_factory, IDefaultRootFactory) + mapper = RoutesRootFactory(root_factory) registry.registerUtility(mapper, IRoutesMapper) populateRegistry(registry, filename, package) - context_factory = registry.queryUtility( - IRoutesContextFactory, - default=mapper.default_context_factory) - - if IRoutesContext.implementedBy(context_factory): - mapper.decorate_context = False - mapper.default_context_factory = context_factory - if not authentication_policy: # deal with bw compat of <= 0.8 security policies (deprecated) secpol = registry.queryUtility(ISecurityPolicy) @@ -390,13 +376,10 @@ def make_app(root_factory, package=None, filename='configure.zcml', if mapper.has_routes(): # if the user had any <route/> statements in his configuration, - # use the RoutesRootFactory as the root factory + # use the RoutesRootFactory as the IRootFactory; otherwise use the + # default root factory (optimization; we don't want to go through + # the Routes logic if we know there are no routes to match) root_factory = mapper - else: - # otherwise, use only the supplied root_factory (unless it's None) - if root_factory is None: - raise ValueError( - 'root_factory (aka get_root) was None and no routes connected') registry.registerUtility(root_factory, IRootFactory) @@ -415,3 +398,8 @@ def make_app(root_factory, package=None, filename='configure.zcml', return app +class DefaultRootFactory: + __parent__ = None + __name__ = None + def __init__(self, environ): + pass diff --git a/repoze/bfg/tests/test_authentication.py b/repoze/bfg/tests/test_authentication.py index 09782cd3c..258fadfd2 100644 --- a/repoze/bfg/tests/test_authentication.py +++ b/repoze/bfg/tests/test_authentication.py @@ -361,20 +361,23 @@ class TestAuthTktCookieHelper(unittest.TestCase): plugin = self._makeOne('secret') old_val = self._makeTicket(userid='userid') request = self._makeRequest({'HTTP_COOKIE':'auth_tkt=%s' % old_val}) - new_val = self._makeTicket(userid='other', userdata='userdata') result = plugin.remember(request, 'other', userdata='userdata') self.assertEqual(len(result), 3) - self.assertEqual(result[0], - ('Set-Cookie', - 'auth_tkt="%s"; Path=/' % new_val)) - self.assertEqual(result[1], - ('Set-Cookie', - 'auth_tkt="%s"; Path=/; Domain=localhost' - % new_val)) - self.assertEqual(result[2], - ('Set-Cookie', - 'auth_tkt="%s"; Path=/; Domain=.localhost' - % new_val)) + + self.assertEqual(result[0][0], 'Set-Cookie') + self.failUnless(result[0][1].endswith('; Path=/')) + self.failUnless(result[0][1].startswith('auth_tkt=')) + self.failIf(result[0][1].startswith('auth_tkt="%s"' % old_val)) + + self.assertEqual(result[1][0], 'Set-Cookie') + self.failUnless(result[1][1].endswith('; Path=/; Domain=localhost')) + self.failUnless(result[1][1].startswith('auth_tkt=')) + self.failIf(result[1][1].startswith('auth_tkt="%s"' % old_val)) + + self.assertEqual(result[2][0], 'Set-Cookie') + self.failUnless(result[2][1].endswith('; Path=/; Domain=.localhost')) + self.failUnless(result[2][1].startswith('auth_tkt=')) + self.failIf(result[2][1].startswith('auth_tkt="%s"' % old_val)) def test_remember_creds_different_include_ip(self): plugin = self._makeOne('secret', include_ip=True) diff --git a/repoze/bfg/tests/test_integration.py b/repoze/bfg/tests/test_integration.py index 9c2d87135..107c8ae89 100644 --- a/repoze/bfg/tests/test_integration.py +++ b/repoze/bfg/tests/test_integration.py @@ -19,6 +19,10 @@ def wsgiapptest(environ, start_response): """ """ return '123' +def _getRequestInterface(name_or_iface=None): + from repoze.bfg.request import DEFAULT_REQUEST_FACTORIES + return DEFAULT_REQUEST_FACTORIES[name_or_iface]['interface'] + class WGSIAppPlusBFGViewTests(unittest.TestCase): def setUp(self): cleanUp() @@ -36,7 +40,7 @@ class WGSIAppPlusBFGViewTests(unittest.TestCase): self.assertEqual(result, '123') def test_scanned(self): - from repoze.bfg.interfaces import IRequest + IRequest = _getRequestInterface() from repoze.bfg.interfaces import IView from repoze.bfg.zcml import scan context = DummyContext() @@ -76,7 +80,6 @@ class PushPagePlusBFGViewTests(unittest.TestCase): self.assertEqual(result.status, '200 OK') def test_scanned(self): - from repoze.bfg.interfaces import IRequest from repoze.bfg.interfaces import IView from repoze.bfg.zcml import scan context = DummyContext() @@ -85,6 +88,7 @@ class PushPagePlusBFGViewTests(unittest.TestCase): actions = context.actions self.assertEqual(len(actions), 2) action = actions[0] + IRequest = _getRequestInterface() self.assertEqual(action['args'], ('registerAdapter', pushtest, (INothing, IRequest), IView, '', None)) @@ -139,7 +143,6 @@ class TestGrokkedApp(unittest.TestCase): def test_it(self): import inspect from repoze.bfg.interfaces import IPOSTRequest - from repoze.bfg.interfaces import IRequest from repoze.bfg.interfaces import IView import repoze.bfg.tests.grokkedapp as package from zope.configuration import config @@ -150,22 +153,25 @@ class TestGrokkedApp(unittest.TestCase): xmlconfig.include(context, 'configure.zcml', package) actions = context.actions + post_iface = _getRequestInterface(IPOSTRequest) + request_iface = _getRequestInterface() + postview = actions[-1] self.assertEqual(postview[0][1], None) self.assertEqual(postview[0][2], '') - self.assertEqual(postview[0][3], IPOSTRequest) + self.assertEqual(postview[0][3], post_iface) self.assertEqual(postview[0][4], IView) self.assertEqual(postview[2][1], package.grokked_post) - self.assertEqual(postview[2][2], (None, IPOSTRequest)) + self.assertEqual(postview[2][2], (None, post_iface)) self.assertEqual(postview[2][3], IView) klassview = actions[-2] self.assertEqual(klassview[0][1], None) self.assertEqual(klassview[0][2], 'grokked_klass') - self.assertEqual(klassview[0][3], IRequest) + self.assertEqual(klassview[0][3], request_iface) self.assertEqual(klassview[0][4], IView) self.assertEqual(klassview[2][1], package.grokked_klass) - self.assertEqual(klassview[2][2], (None, IRequest)) + self.assertEqual(klassview[2][2], (None, request_iface)) self.assertEqual(klassview[2][3], IView) self.failUnless(inspect.isfunction(package.grokked_klass)) self.assertEqual(package.grokked_klass(None, None), None) @@ -173,10 +179,10 @@ class TestGrokkedApp(unittest.TestCase): funcview = actions[-3] self.assertEqual(funcview[0][1], None) self.assertEqual(funcview[0][2], '') - self.assertEqual(funcview[0][3], IRequest) + self.assertEqual(funcview[0][3], request_iface) self.assertEqual(funcview[0][4], IView) self.assertEqual(funcview[2][1], package.grokked) - self.assertEqual(funcview[2][2], (None, IRequest)) + self.assertEqual(funcview[2][2], (None, request_iface)) self.assertEqual(funcview[2][3], IView) class DummyContext: diff --git a/repoze/bfg/tests/test_request.py b/repoze/bfg/tests/test_request.py index 4771c3e1c..9cae6643a 100644 --- a/repoze/bfg/tests/test_request.py +++ b/repoze/bfg/tests/test_request.py @@ -13,8 +13,8 @@ class TestMakeRequestASCII(unittest.TestCase): class TestSubclassedRequest(unittest.TestCase): def _getTargetClass(self): - from repoze.bfg.request import Request - return Request + from repoze.bfg.request import DEFAULT_REQUEST_FACTORIES + return DEFAULT_REQUEST_FACTORIES[None]['factory'] def _makeOne(self, environ): request = self._getTargetClass()(environ) @@ -56,6 +56,172 @@ class TestCurrentRequest(unittest.TestCase): manager.pop() self.assertEqual(self._callFUT(), None) +class TestRequestFactory(unittest.TestCase): + def _callFUT(self, environ): + from repoze.bfg.request import request_factory + return request_factory(environ) + + def _getRequestFactory(self, name_or_iface=None): + from repoze.bfg.request import DEFAULT_REQUEST_FACTORIES + return DEFAULT_REQUEST_FACTORIES[name_or_iface]['factory'] + + def _makeRoute(self): + route = DummyRoute() + factories = {} + def factory(environ): + return environ + for name in (None, 'GET', 'POST', 'PUT', 'DELETE', 'HEAD'): + factories[name] = {'factory':factory} + route.request_factories = factories + return route + + def test_no_route_no_request_method(self): + from repoze.bfg.interfaces import IRequest + result = self._callFUT({}) + self.assertEqual(result.__class__, self._getRequestFactory()) + self.failUnless(IRequest.providedBy(result)) + + def test_no_route_unknown(self): + from repoze.bfg.interfaces import IRequest + result = self._callFUT({'REQUEST_METHOD':'UNKNOWN'}) + self.assertEqual(result.__class__, self._getRequestFactory()) + self.failUnless(IRequest.providedBy(result)) + + def test_no_route_get(self): + from repoze.bfg.interfaces import IGETRequest + result = self._callFUT({'REQUEST_METHOD':'GET'}) + self.assertEqual(result.__class__, self._getRequestFactory('GET')) + self.failUnless(IGETRequest.providedBy(result)) + + def test_no_route_post(self): + from repoze.bfg.interfaces import IPOSTRequest + result = self._callFUT({'REQUEST_METHOD':'POST'}) + self.assertEqual(result.__class__, self._getRequestFactory('POST')) + self.failUnless(IPOSTRequest.providedBy(result)) + + def test_no_route_put(self): + from repoze.bfg.interfaces import IPUTRequest + result = self._callFUT({'REQUEST_METHOD':'PUT'}) + self.assertEqual(result.__class__, self._getRequestFactory('PUT')) + self.failUnless(IPUTRequest.providedBy(result)) + + def test_no_route_delete(self): + from repoze.bfg.interfaces import IDELETERequest + result = self._callFUT({'REQUEST_METHOD':'DELETE'}) + self.assertEqual(result.__class__, self._getRequestFactory('DELETE')) + self.failUnless(IDELETERequest.providedBy(result)) + + def test_no_route_head(self): + from repoze.bfg.interfaces import IHEADRequest + result = self._callFUT({'REQUEST_METHOD':'HEAD'}) + self.assertEqual(result.__class__, self._getRequestFactory('HEAD')) + self.failUnless(IHEADRequest.providedBy(result)) + + def test_route_no_request_method(self): + route = self._makeRoute() + environ = {'bfg.routes.route':route} + result = self._callFUT(environ) + self.assertEqual(result, environ) + + def test_route_unknown(self): + route = self._makeRoute() + environ = {'bfg.routes.route':route, 'REQUEST_METHOD':'UNKNOWN'} + result = self._callFUT(environ) + self.assertEqual(result, environ) + + def test_route_known(self): + route = self._makeRoute() + environ = {'bfg.routes.route':route, 'REQUEST_METHOD':'GET'} + result = self._callFUT(environ) + self.assertEqual(result, environ) + +class TestNamedRequestFactories(unittest.TestCase): + def _callFUT(self, name): + from repoze.bfg.request import named_request_factories + return named_request_factories(name) + + def test_it_unnamed(self): + factories = self._callFUT(None) + from repoze.bfg.interfaces import IRequest + from repoze.bfg.interfaces import IGETRequest + from repoze.bfg.interfaces import IPOSTRequest + from repoze.bfg.interfaces import IPUTRequest + from repoze.bfg.interfaces import IDELETERequest + from repoze.bfg.interfaces import IHEADRequest + for alias, iface in ( + (None, IRequest), + ('GET', IGETRequest), + ('POST', IPOSTRequest), + ('PUT', IPUTRequest), + ('DELETE', IDELETERequest), + ('HEAD', IHEADRequest), + ): + self.failUnless(alias in factories) + self.failUnless(iface in factories) + self.assertEqual(factories[alias], factories[iface]) + named_iface = factories[alias]['interface'] + named_factory = factories[alias]['factory'] + self.failUnless(named_iface.implementedBy(named_factory)) + self.assertEqual(factories[alias]['interface'], iface) + self.assertEqual(factories[iface]['interface'], iface) + self.assertEqual(factories[alias]['factory'].charset, 'utf-8') + + def test_it_named(self): + factories = self._callFUT('name') + from zope.interface.interface import InterfaceClass + from repoze.bfg.interfaces import IRequest + from repoze.bfg.interfaces import IGETRequest + from repoze.bfg.interfaces import IPOSTRequest + from repoze.bfg.interfaces import IPUTRequest + from repoze.bfg.interfaces import IDELETERequest + from repoze.bfg.interfaces import IHEADRequest + for alias, iface in ( + (None, IRequest), + ('GET', IGETRequest), + ('POST', IPOSTRequest), + ('PUT', IPUTRequest), + ('DELETE', IDELETERequest), + ('HEAD', IHEADRequest), + ): + self.failUnless(alias in factories) + self.failUnless(iface in factories) + self.assertEqual(factories[alias], factories[iface]) + self.assertEqual(factories[alias]['factory'].charset, 'utf-8') + named_iface = factories[alias]['interface'] + named_factory = factories[alias]['factory'] + self.failUnless(named_iface.implementedBy(named_factory)) + +class TestDefaultRequestFactories(unittest.TestCase): + def test_it(self): + from repoze.bfg.request import DEFAULT_REQUEST_FACTORIES as factories + from repoze.bfg.interfaces import IRequest + from repoze.bfg.interfaces import IGETRequest + from repoze.bfg.interfaces import IPOSTRequest + from repoze.bfg.interfaces import IPUTRequest + from repoze.bfg.interfaces import IDELETERequest + from repoze.bfg.interfaces import IHEADRequest + for alias, iface in ( + (None, IRequest), + ('GET', IGETRequest), + ('POST', IPOSTRequest), + ('PUT', IPUTRequest), + ('DELETE', IDELETERequest), + ('HEAD', IHEADRequest), + ): + self.failUnless(alias in factories) + self.failUnless(iface in factories) + self.assertEqual(factories[alias], factories[iface]) + named_iface = factories[alias]['interface'] + named_factory = factories[alias]['factory'] + self.failUnless(named_iface.implementedBy(named_factory)) + self.assertEqual(factories[alias]['interface'], iface) + self.assertEqual(factories[iface]['interface'], iface) + self.assertEqual(factories[alias]['factory'].charset, 'utf-8') + + +class DummyRoute: + pass + class DummyRequest: pass diff --git a/repoze/bfg/tests/test_router.py b/repoze/bfg/tests/test_router.py index 6e18f6089..589843b0f 100644 --- a/repoze/bfg/tests/test_router.py +++ b/repoze/bfg/tests/test_router.py @@ -123,7 +123,7 @@ class RouterTests(unittest.TestCase): environ = self._makeEnviron() context = DummyContext() self._registerTraverserFactory(context) - rootfactory = self._registerRootFactory(None) + rootfactory = self._registerRootFactory('abc') router = self._makeOne() self.assertEqual(router.root_policy, rootfactory) @@ -132,13 +132,11 @@ class RouterTests(unittest.TestCase): def app(): """ """ self.registry.registerUtility(app, IForbiddenView) - self._registerRootFactory(None) router = self._makeOne() self.assertEqual(router.forbidden_view, app) def test_iforbiddenview_nooverride(self): context = DummyContext() - self._registerRootFactory(None) router = self._makeOne() from repoze.bfg.router import default_forbidden_view self.assertEqual(router.forbidden_view, default_forbidden_view) @@ -148,13 +146,11 @@ class RouterTests(unittest.TestCase): def app(): """ """ self.registry.registerUtility(app, INotFoundView) - self._registerRootFactory(None) router = self._makeOne() self.assertEqual(router.notfound_view, app) def test_inotfoundview_nooverride(self): context = DummyContext() - self._registerRootFactory(None) router = self._makeOne() from repoze.bfg.router import default_notfound_view self.assertEqual(router.notfound_view, default_notfound_view) @@ -164,7 +160,6 @@ class RouterTests(unittest.TestCase): environ = self._makeEnviron() context = DummyContext() self._registerTraverserFactory(context) - rootfactory = self._registerRootFactory(None) logger = self._registerLogger() def factory(): return 'yo' @@ -183,7 +178,6 @@ class RouterTests(unittest.TestCase): environ = self._makeEnviron() context = DummyContext() self._registerTraverserFactory(context) - rootfactory = self._registerRootFactory(None) logger = self._registerLogger() def factory(): return 'yo' @@ -202,7 +196,6 @@ class RouterTests(unittest.TestCase): context = DummyContext() self._registerTraverserFactory(context) logger = self._registerLogger() - self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) @@ -214,29 +207,12 @@ class RouterTests(unittest.TestCase): self.failIf('debug_notfound' in result[0]) self.assertEqual(len(logger.messages), 0) - def test_call_root_is_icontextnotfound(self): - from zope.interface import implements - from repoze.bfg.interfaces import IContextNotFound - class NotFound(object): - implements(IContextNotFound) - context = NotFound() - self._registerTraverserFactory(context) - environ = self._makeEnviron() - start_response = DummyStartResponse() - self._registerRootFactory(NotFound()) - router = self._makeOne() - result = router(environ, start_response) - status = start_response.status - self.assertEqual(status, '404 Not Found') - self.failUnless('http://localhost:8080' in result[0], result) - def test_call_no_view_registered_debug_notfound_false(self): environ = self._makeEnviron() context = DummyContext() self._registerTraverserFactory(context) logger = self._registerLogger() self._registerSettings(debug_notfound=False) - self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) @@ -254,7 +230,6 @@ class RouterTests(unittest.TestCase): self._registerTraverserFactory(context) self._registerSettings(debug_notfound=True) logger = self._registerLogger() - self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) @@ -282,7 +257,6 @@ class RouterTests(unittest.TestCase): environ = self._makeEnviron() view = make_view('abc') self._registerView(view, '', None, None) - self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() self.assertRaises(ValueError, router, environ, start_response) @@ -292,7 +266,6 @@ class RouterTests(unittest.TestCase): context = DummyContext() environ = self._makeEnviron() self._registerTraverserFactory(context) - self._registerRootFactory(None) def app(context, request): """ """ self.registry.registerUtility(app, INotFoundView) @@ -318,7 +291,6 @@ class RouterTests(unittest.TestCase): environ = self._makeEnviron() self._registerView(view, '', IContext, IRequest) checker = self._registerViewPermission('', denied) - self._registerRootFactory(None) def app(context, request): """ """ self.registry.registerUtility(app, IForbiddenView) @@ -334,7 +306,7 @@ class RouterTests(unittest.TestCase): view = make_view(response) environ = self._makeEnviron() self._registerView(view, '', None, None) - self._registerRootFactory(None) + rootfactory = self._registerRootFactory(context) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) @@ -344,7 +316,7 @@ class RouterTests(unittest.TestCase): self.assertEqual(environ['webob.adhoc_attrs']['view_name'], '') self.assertEqual(environ['webob.adhoc_attrs']['subpath'], []) self.assertEqual(environ['webob.adhoc_attrs']['context'], context) - self.assertEqual(environ['webob.adhoc_attrs']['root'], None) + self.assertEqual(environ['webob.adhoc_attrs']['root'], context) def test_call_deprecation_warning(self): context = DummyContext() @@ -354,7 +326,6 @@ class RouterTests(unittest.TestCase): view = make_view(response) environ = self._makeEnviron() self._registerView(view, '', None, None) - self._registerRootFactory(None) router = self._makeOne() logger = self._registerLogger() router.logger = logger @@ -368,12 +339,12 @@ class RouterTests(unittest.TestCase): self._registerTraverserFactory(context, view_name='foo', subpath=['bar'], traversed=['context']) + rootfactory = self._registerRootFactory(context) response = DummyResponse() response.app_iter = ['Hello world'] view = make_view(response) environ = self._makeEnviron() self._registerView(view, 'foo', None, None) - self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) @@ -383,7 +354,7 @@ class RouterTests(unittest.TestCase): self.assertEqual(environ['webob.adhoc_attrs']['view_name'], 'foo') self.assertEqual(environ['webob.adhoc_attrs']['subpath'], ['bar']) self.assertEqual(environ['webob.adhoc_attrs']['context'], context) - self.assertEqual(environ['webob.adhoc_attrs']['root'], None) + self.assertEqual(environ['webob.adhoc_attrs']['root'], context) def test_call_view_registered_specific_success(self): from zope.interface import Interface @@ -394,12 +365,12 @@ class RouterTests(unittest.TestCase): context = DummyContext() directlyProvides(context, IContext) self._registerTraverserFactory(context) + rootfactory = self._registerRootFactory(context) response = DummyResponse() response.app_iter = ['Hello world'] view = make_view(response) environ = self._makeEnviron() self._registerView(view, '', IContext, IRequest) - self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) @@ -409,7 +380,7 @@ class RouterTests(unittest.TestCase): self.assertEqual(environ['webob.adhoc_attrs']['view_name'], '') self.assertEqual(environ['webob.adhoc_attrs']['subpath'], []) self.assertEqual(environ['webob.adhoc_attrs']['context'], context) - self.assertEqual(environ['webob.adhoc_attrs']['root'], None) + self.assertEqual(environ['webob.adhoc_attrs']['root'], context) def test_call_view_registered_specific_fail(self): from zope.interface import Interface @@ -426,7 +397,6 @@ class RouterTests(unittest.TestCase): view = make_view(response) environ = self._makeEnviron() self._registerView(view, '', IContext, IRequest) - self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) @@ -446,7 +416,6 @@ class RouterTests(unittest.TestCase): view = make_view(response) environ = self._makeEnviron() self._registerView(view, '', IContext, IRequest) - self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) @@ -466,7 +435,6 @@ class RouterTests(unittest.TestCase): view = make_view(response) environ = self._makeEnviron() self._registerView(view, '', IContext, IRequest) - self._registerRootFactory(None) router = self._makeOne() router.debug_authorization = True start_response = DummyStartResponse() @@ -490,7 +458,6 @@ class RouterTests(unittest.TestCase): view = make_view(response) environ = self._makeEnviron() self._registerView(view, '', IContext, IRequest) - self._registerRootFactory(None) router = self._makeOne() router.debug_authorization = True start_response = DummyStartResponse() @@ -514,7 +481,6 @@ class RouterTests(unittest.TestCase): view = make_view(response) environ = self._makeEnviron() self._registerView(view, '', IContext, IRequest) - self._registerRootFactory(None) router = self._makeOne() router.debug_authorization = False start_response = DummyStartResponse() @@ -537,7 +503,6 @@ class RouterTests(unittest.TestCase): environ = self._makeEnviron() self._registerView(view, '', IContext, IRequest) checker = self._registerViewPermission('', True) - self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) @@ -561,7 +526,6 @@ class RouterTests(unittest.TestCase): environ = self._makeEnviron() self._registerView(view, '', IContext, IRequest) checker = self._registerViewPermission('', denied) - self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) @@ -588,7 +552,6 @@ class RouterTests(unittest.TestCase): self._registerView(view, '', IContext, IRequest) checker = self._registerViewPermission('', denied) self._registerSettings(debug_authorization=False) - self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) @@ -616,7 +579,6 @@ class RouterTests(unittest.TestCase): checker = self._registerViewPermission('', allowed) self._registerSettings(debug_authorization=True) logger = self._registerLogger() - self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) @@ -650,7 +612,6 @@ class RouterTests(unittest.TestCase): from repoze.bfg.interfaces import INewResponse request_events = self._registerEventListener(INewRequest) response_events = self._registerEventListener(INewResponse) - self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) @@ -667,7 +628,6 @@ class RouterTests(unittest.TestCase): view = make_view(response) environ = self._makeEnviron() self._registerView(view, '', None, None) - self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() router.threadlocal_manager = DummyThreadLocalManager() @@ -687,7 +647,6 @@ class RouterTests(unittest.TestCase): view = make_view(response) environ = self._makeEnviron(REQUEST_METHOD='POST') self._registerView(view, '', None, None) - self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() request_events = self._registerEventListener(INewRequest) @@ -709,7 +668,6 @@ class RouterTests(unittest.TestCase): view = make_view(response) environ = self._makeEnviron(REQUEST_METHOD='PUT') self._registerView(view, '', None, None) - self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() request_events = self._registerEventListener(INewRequest) @@ -729,7 +687,6 @@ class RouterTests(unittest.TestCase): view = make_view(response) environ = self._makeEnviron(REQUEST_METHOD='UNKNOWN') self._registerView(view, '', None, None) - self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() request_events = self._registerEventListener(INewRequest) @@ -737,30 +694,6 @@ class RouterTests(unittest.TestCase): request = request_events[0].request self.failUnless(IRequest.providedBy(request)) - def test_call_irequestfactory_override(self): - from repoze.bfg.interfaces import INewRequest - from repoze.bfg.interfaces import IRequestFactory - from repoze.bfg.testing import DummyRequest - self.registry.registerUtility(DummyRequest, IRequestFactory) - context = DummyContext() - self._registerTraverserFactory(context) - response = DummyResponse() - response.app_iter = ['Hello world'] - view = make_view(response) - environ = self._makeEnviron() - self._registerView(view, '', None, None) - self._registerRootFactory(None) - router = self._makeOne() - start_response = DummyStartResponse() - request_events = self._registerEventListener(INewRequest) - result = router(environ, start_response) - request = request_events[0].request - self.failUnless(isinstance(request, DummyRequest)) - self.assertEqual(request.root, None) - self.assertEqual(request.context, context) - self.assertEqual(request.view_name, '') - self.assertEqual(request.subpath, []) - class MakeAppTests(unittest.TestCase): def setUp(self): cleanUp() @@ -844,13 +777,14 @@ class MakeAppTests(unittest.TestCase): self.assertEqual(settings.reload_templates, True) self.assertEqual(settings.debug_authorization, True) self.failUnless(isinstance(rootfactory, RoutesRootFactory)) - self.assertEqual(rootfactory.get_root, rootpolicy) + self.assertEqual(rootfactory.default_root_factory, rootpolicy) self.failUnless(self.regmgr.pushed and self.regmgr.popped) def test_routes_in_config_no_rootpolicy(self): options= {'reload_templates':True, 'debug_authorization':True} from repoze.bfg.urldispatch import RoutesRootFactory + from repoze.bfg.router import DefaultRootFactory from repoze.bfg.tests import routesapp app = self._callFUT(None, routesapp, options=options) from repoze.bfg.interfaces import ISettings @@ -863,15 +797,18 @@ class MakeAppTests(unittest.TestCase): self.assertEqual(settings.reload_templates, True) self.assertEqual(settings.debug_authorization, True) self.failUnless(isinstance(rootfactory, RoutesRootFactory)) - self.assertEqual(rootfactory.get_root, None) + self.assertEqual(rootfactory.default_root_factory, DefaultRootFactory) self.failUnless(self.regmgr.pushed and self.regmgr.popped) def test_no_routes_in_config_no_rootpolicy(self): + from repoze.bfg.router import DefaultRootFactory + from repoze.bfg.interfaces import IRootFactory options= {'reload_templates':True, 'debug_authorization':True} from repoze.bfg.tests import fixtureapp - self.assertRaises(ValueError, self._callFUT, None, fixtureapp, - options=options) + app = self._callFUT(None, fixtureapp, options=options) + rootfactory = app.registry.getUtility(IRootFactory) + self.assertEqual(rootfactory, DefaultRootFactory) def test_authorization_policy_no_authentication_policy(self): from repoze.bfg.interfaces import IAuthorizationPolicy @@ -922,38 +859,6 @@ class MakeAppTests(unittest.TestCase): self.assertEqual(len(logger.messages), 1) self.failUnless('ISecurityPolicy' in logger.messages[0]) - def test_custom_default_context_factory_nodecorate(self): - from repoze.bfg.tests import routesapp - from zope.component import getGlobalSiteManager - from repoze.bfg.interfaces import IRoutesContextFactory - from repoze.bfg.interfaces import IRoutesMapper - class Dummy(object): - pass - gsm = getGlobalSiteManager() - gsm.registerUtility(Dummy, IRoutesContextFactory) - app = self._callFUT(None, routesapp, registry=gsm) - mapper = gsm.getUtility(IRoutesMapper) - self.assertEqual(mapper.default_context_factory, - Dummy) - self.assertEqual(mapper.decorate_context, True) - - def test_custom_default_context_factory_decorate(self): - from repoze.bfg.tests import routesapp - from zope.component import getGlobalSiteManager - from repoze.bfg.interfaces import IRoutesContextFactory - from repoze.bfg.interfaces import IRoutesMapper - from repoze.bfg.interfaces import IRoutesContext - from zope.interface import implements - class Dummy(object): - implements(IRoutesContext) - gsm = getGlobalSiteManager() - gsm.registerUtility(Dummy, IRoutesContextFactory) - app = self._callFUT(None, routesapp, registry=gsm) - mapper = gsm.getUtility(IRoutesMapper) - self.assertEqual(mapper.default_context_factory, - Dummy) - self.assertEqual(mapper.decorate_context, False) - class TestDefaultForbiddenView(unittest.TestCase): def _callFUT(self, context, request): from repoze.bfg.router import default_forbidden_view diff --git a/repoze/bfg/tests/test_traversal.py b/repoze/bfg/tests/test_traversal.py index 41cf667b9..506a69d17 100644 --- a/repoze/bfg/tests/test_traversal.py +++ b/repoze/bfg/tests/test_traversal.py @@ -189,6 +189,67 @@ class ModelGraphTraverserTests(unittest.TestCase): environ = self._getEnviron(PATH_INFO='/%s' % segment) self.assertRaises(TypeError, policy, environ) + def test_withroute_nothingfancy(self): + model = DummyContext() + traverser = self._makeOne(model) + routing_args = ((), {}) + environ = {'bfg.routes.matchdict': {}} + result = traverser(environ) + self.assertEqual(result['context'], model) + self.assertEqual(result['view_name'], '') + self.assertEqual(result['subpath'], []) + self.assertEqual(result['traversed'], []) + self.assertEqual(result['virtual_root'], model) + self.assertEqual(result['virtual_root_path'], []) + + def test_withroute_with_subpath(self): + model = DummyContext() + traverser = self._makeOne(model) + environ = {'bfg.routes.matchdict': {'subpath':'/a/b/c'}} + result = traverser(environ) + self.assertEqual(result['context'], model) + self.assertEqual(result['view_name'], '') + self.assertEqual(result['subpath'], ['a', 'b','c']) + self.assertEqual(result['traversed'], []) + self.assertEqual(result['virtual_root'], model) + self.assertEqual(result['virtual_root_path'], []) + + def test_withroute_with_path_info(self): + model = DummyContext() + traverser = self._makeOne(model) + environ = {'bfg.routes.matchdict': {'path_info':'foo/bar'}, + 'PATH_INFO':'/a/b/foo/bar', 'SCRIPT_NAME':''} + result = traverser(environ) + self.assertEqual(result['context'], model) + self.assertEqual(result['view_name'], '') + self.assertEqual(result['subpath'], []) + self.assertEqual(result['traversed'], []) + self.assertEqual(result['virtual_root'], model) + self.assertEqual(result['virtual_root_path'], []) + self.assertEqual(environ['PATH_INFO'], '/foo/bar') + self.assertEqual(environ['SCRIPT_NAME'], '/a/b') + + def test_withroute_with_path_info_PATH_INFO_w_extra_slash(self): + model = DummyContext() + traverser = self._makeOne(model) + environ = {'bfg.routes.matchdict':{'path_info':'foo/bar'}, + 'PATH_INFO':'/a/b//foo/bar', 'SCRIPT_NAME':''} + traverser(environ) + self.assertEqual(environ['PATH_INFO'], '/foo/bar') + self.assertEqual(environ['SCRIPT_NAME'], '/a/b') + + def test_withroute_and_traverse(self): + model = DummyContext() + traverser = self._makeOne(model) + environ = {'bfg.routes.matchdict': {'traverse':'foo/bar'}} + result = traverser(environ) + self.assertEqual(result['context'], model) + self.assertEqual(result['view_name'], 'foo') + self.assertEqual(result['subpath'], ['bar']) + self.assertEqual(result['traversed'], []) + self.assertEqual(result['virtual_root'], model) + self.assertEqual(result['virtual_root_path'], []) + class FindInterfaceTests(unittest.TestCase): def _callFUT(self, context, iface): from repoze.bfg.traversal import find_interface @@ -637,7 +698,21 @@ class TraversalContextURLTests(unittest.TestCase): context_url = self._makeOne(bar, request) result = context_url() self.assertEqual(result, 'http://example.com:5432//bar/') - + + def test_with_route(self): + root = DummyContext() + root.__name__ = None + root.__parent__ = None + one = DummyContext() + one.__name__ = 'one' + one.__parent__ = root + route = DummyRoute() + request = DummyRequest({'bfg.routes.route':route, + 'bfg.routes.matchdict':{'a':1}}) + context_url = self._makeOne(one, request) + result = context_url() + self.assertEqual(result, 'http://example.com/one/') + self.assertEqual(route.generate_kw, {'a':1, 'traverse':'/one/'}) class TestVirtualRoot(unittest.TestCase): def setUp(self): @@ -839,3 +914,8 @@ class DummyContextURL: def virtual_root(self): return '123' + +class DummyRoute: + def generate(self, **kw): + self.generate_kw = kw + return 'http://example.com' diff --git a/repoze/bfg/tests/test_urldispatch.py b/repoze/bfg/tests/test_urldispatch.py index 2b4578f94..68fda032d 100644 --- a/repoze/bfg/tests/test_urldispatch.py +++ b/repoze/bfg/tests/test_urldispatch.py @@ -22,139 +22,78 @@ class RoutesRootFactoryTests(unittest.TestCase): klass = self._getTargetClass() return klass(get_root) - def test_init_default_context_factory(self): - from zope.component import getGlobalSiteManager - from repoze.bfg.interfaces import IRoutesContextFactory - from repoze.bfg.urldispatch import DefaultRoutesContext - class Dummy(object): - pass - gsm = getGlobalSiteManager() - gsm.registerUtility(Dummy, IRoutesContextFactory) + def test_init_default_root_factory(self): mapper = self._makeOne(None) - self.assertEqual(mapper.default_context_factory, DefaultRoutesContext) - self.assertEqual(mapper.decorate_context, True) + self.assertEqual(mapper.default_root_factory, None) def test_no_route_matches(self): - marker = () - get_root = make_get_root(marker) + get_root = make_get_root(123) mapper = self._makeOne(get_root) environ = self._getEnviron(PATH_INFO='/') result = mapper(environ) - self.assertEqual(result, marker) + self.assertEqual(result, 123) self.assertEqual(mapper.environ, environ) def test_route_matches(self): - marker = () - get_root = make_get_root(marker) + get_root = make_get_root(123) mapper = self._makeOne(get_root) mapper.connect('foo', 'archives/:action/:article', foo='foo') environ = self._getEnviron(PATH_INFO='/archives/action1/article1') result = mapper(environ) - from repoze.bfg.interfaces import IRoutesContext - self.failUnless(IRoutesContext.providedBy(result)) - self.assertEqual(result.foo, 'foo') - self.assertEqual(result.action, 'action1') - self.assertEqual(result.article, 'article1') + self.assertEqual(result, 123) routing_args = environ['wsgiorg.routing_args'][1] self.assertEqual(routing_args['foo'], 'foo') self.assertEqual(routing_args['action'], 'action1') self.assertEqual(routing_args['article'], 'article1') - self.assertEqual(environ['bfg.route'].name, 'foo') + self.assertEqual(environ['bfg.routes.matchdict'], routing_args) + self.assertEqual(environ['bfg.routes.route'].name, 'foo') def test_unnamed_root_route_matches(self): - mapper = self._makeOne(None) + root_factory = make_get_root(123) + mapper = self._makeOne(root_factory) mapper.connect('') environ = self._getEnviron(PATH_INFO='/') result = mapper(environ) - from repoze.bfg.interfaces import IRoutesContext - self.failUnless(IRoutesContext.providedBy(result)) - self.assertEqual(environ['bfg.route'].name, None) + self.assertEqual(result, 123) + self.assertEqual(environ['bfg.routes.route'].name, None) + self.assertEqual(environ['bfg.routes.matchdict'], {}) + self.assertEqual(environ['wsgiorg.routing_args'], ((), {})) def test_named_root_route_matches(self): - mapper = self._makeOne(None) + root_factory = make_get_root(123) + mapper = self._makeOne(root_factory) mapper.connect('root', '') environ = self._getEnviron(PATH_INFO='/') result = mapper(environ) - from repoze.bfg.interfaces import IRoutesContext - self.failUnless(IRoutesContext.providedBy(result)) - self.assertEqual(environ['bfg.route'].name, 'root') + self.assertEqual(result, 123) + self.assertEqual(environ['bfg.routes.route'].name, 'root') + self.assertEqual(environ['bfg.routes.matchdict'], {}) + self.assertEqual(environ['wsgiorg.routing_args'], ((), {})) def test_unicode_in_route_default(self): - marker = () - get_root = make_get_root(marker) - mapper = self._makeOne(get_root) - class DummyRoute2: + root_factory = make_get_root(123) + mapper = self._makeOne(root_factory) + class DummyRoute: routepath = ':id' _factory = None - _provides = () la = unicode('\xc3\xb1a', 'utf-8') - mapper.routematch = lambda *arg: ({la:'id'}, DummyRoute2) + mapper.routematch = lambda *arg: ({la:'id'}, DummyRoute) mapper.connect('whatever', ':la') environ = self._getEnviron(PATH_INFO='/foo') result = mapper(environ) - from repoze.bfg.interfaces import IRoutesContext - self.failUnless(IRoutesContext.providedBy(result)) - self.assertEqual(getattr(result, la.encode('utf-8')), 'id') + self.assertEqual(result, 123) + self.assertEqual(environ['bfg.routes.route'], DummyRoute) + self.assertEqual(environ['bfg.routes.matchdict'], {u'\xf1a': 'id'}) routing_args = environ['wsgiorg.routing_args'][1] - self.assertEqual(routing_args[la.encode('utf-8')], 'id') + self.assertEqual(routing_args[la], 'id') - def test_no_fallback_get_root(self): - from repoze.bfg.urldispatch import RoutesContextNotFound - marker = () - mapper = self._makeOne(None) + def test_fallback_to_default_root_factory(self): + root_factory = make_get_root(123) + mapper = self._makeOne(root_factory) mapper.connect('wont', 'wont/:be/:found') environ = self._getEnviron(PATH_INFO='/archives/action1/article1') result = mapper(environ) - self.failUnless(isinstance(result, RoutesContextNotFound)) - - def test_custom_factory(self): - marker = () - get_root = make_get_root(marker) - mapper = self._makeOne(get_root) - from zope.interface import implements, Interface - class IDummy(Interface): - pass - class Dummy(object): - implements(IDummy) - def __init__(self, **kw): - self.__dict__.update(kw) - mapper.connect('article', 'archives/:action/:article', - _factory=Dummy) - environ = self._getEnviron(PATH_INFO='/archives/action1/article1') - result = mapper(environ) - self.assertEqual(result.action, 'action1') - self.assertEqual(result.article, 'article1') - from repoze.bfg.interfaces import IRoutesContext - self.failUnless(IRoutesContext.providedBy(result)) - self.failUnless(isinstance(result, Dummy)) - self.failUnless(IDummy.providedBy(result)) - self.failIf(hasattr(result, '_factory')) - - def test_decorate_context_false(self): - from repoze.bfg.interfaces import IRoutesContext - class Dummy: - def __init__(self, **kw): - pass - mapper = self._makeOne(None) - mapper.connect('root', '') - environ = self._getEnviron(PATH_INFO='/') - mapper.decorate_context = False - mapper.default_context_factory = Dummy - result = mapper(environ) - self.failIf(IRoutesContext.providedBy(result)) - - def test_decorate_context_true(self): - from repoze.bfg.interfaces import IRoutesContext - class Dummy: - def __init__(self, **kw): - pass - mapper = self._makeOne(None) - mapper.connect('root', '') - environ = self._getEnviron(PATH_INFO='/') - mapper.decorate_context = True - mapper.default_context_factory = Dummy - result = mapper(environ) - self.failUnless(IRoutesContext.providedBy(result)) + self.assertEqual(result, 123) def test_has_routes(self): mapper = self._makeOne(None) @@ -163,163 +102,23 @@ class RoutesRootFactoryTests(unittest.TestCase): self.assertEqual(mapper.has_routes(), True) def test_url_for(self): - marker = () - get_root = make_get_root(marker) - mapper = self._makeOne(get_root) + root_factory = make_get_root(None) + mapper = self._makeOne(root_factory) mapper.connect('whatever', 'archives/:action/:article') environ = self._getEnviron(PATH_INFO='/archives/action1/article1') - route = DummyRoute('yo') - environ['bfg.route'] = route result = mapper(environ) from routes import url_for result = url_for(action='action2', article='article2') self.assertEqual(result, '/archives/action2/article2') -class TestRoutesContextNotFound(unittest.TestCase): - def _getTargetClass(self): - from repoze.bfg.urldispatch import RoutesContextNotFound - return RoutesContextNotFound - - def _makeOne(self, msg): - return self._getTargetClass()(msg) - - def test_it(self): - inst = self._makeOne('hi') - self.assertEqual(inst.msg, 'hi') - def make_get_root(result): def dummy_get_root(environ): return result return dummy_get_root -class RoutesModelTraverserTests(unittest.TestCase): - def _getTargetClass(self): - from repoze.bfg.urldispatch import RoutesModelTraverser - return RoutesModelTraverser - - def _makeOne(self, model): - klass = self._getTargetClass() - return klass(model) - - def test_class_conforms_to_ITraverser(self): - from zope.interface.verify import verifyClass - from repoze.bfg.interfaces import ITraverser - verifyClass(ITraverser, self._getTargetClass()) - - def test_instance_conforms_to_ITraverser(self): - from zope.interface.verify import verifyObject - from repoze.bfg.interfaces import ITraverser - verifyObject(ITraverser, self._makeOne(None)) - - def test_it_nothingfancy(self): - model = DummyContext() - traverser = self._makeOne(model) - routing_args = ((), {}) - route = DummyRoute('yo') - environ = {'wsgiorg.routing_args': routing_args, 'bfg.route': route} - result = traverser(environ) - self.assertEqual(result['context'], model) - self.assertEqual(result['view_name'], 'yo') - self.assertEqual(result['subpath'], []) - self.assertEqual(result['traversed'], None) - self.assertEqual(result['virtual_root'], model) - self.assertEqual(result['virtual_root_path'], None) - - def test_call_with_subpath(self): - model = DummyContext() - traverser = self._makeOne(model) - routing_args = ((), {'subpath':'/a/b/c'}) - route = DummyRoute('yo') - environ = {'wsgiorg.routing_args':routing_args, 'bfg.route': route} - result = traverser(environ) - self.assertEqual(result['context'], model) - self.assertEqual(result['view_name'], 'yo') - self.assertEqual(result['subpath'], ['a', 'b','c']) - self.assertEqual(result['traversed'], None) - self.assertEqual(result['virtual_root'], model) - self.assertEqual(result['virtual_root_path'], None) - - def test_with_path_info(self): - model = DummyContext() - traverser = self._makeOne(model) - routing_args = ((), {'path_info':'foo/bar'}) - route = DummyRoute('yo') - environ = {'wsgiorg.routing_args': routing_args, 'bfg.route': route, - 'PATH_INFO':'/a/b/foo/bar', 'SCRIPT_NAME':''} - result = traverser(environ) - self.assertEqual(result['context'], model) - self.assertEqual(result['view_name'], 'yo') - self.assertEqual(result['subpath'], []) - self.assertEqual(result['traversed'], None) - self.assertEqual(result['virtual_root'], model) - self.assertEqual(result['virtual_root_path'], None) - self.assertEqual(environ['PATH_INFO'], '/foo/bar') - self.assertEqual(environ['SCRIPT_NAME'], '/a/b') - - def test_with_path_info_PATH_INFO_w_extra_slash(self): - model = DummyContext() - traverser = self._makeOne(model) - routing_args = ((), {'path_info':'foo/bar'}) - route = DummyRoute('yo') - environ = {'wsgiorg.routing_args': routing_args, 'bfg.route':route, - 'PATH_INFO':'/a/b//foo/bar', 'SCRIPT_NAME':''} - traverser(environ) - self.assertEqual(environ['PATH_INFO'], '/foo/bar') - self.assertEqual(environ['SCRIPT_NAME'], '/a/b') - -class RoutesContextURLTests(unittest.TestCase): - def _getTargetClass(self): - from repoze.bfg.urldispatch import RoutesContextURL - return RoutesContextURL - - def _makeOne(self, context, request): - return self._getTargetClass()(context, request) - - def test_class_conforms_to_IContextURL(self): - from zope.interface.verify import verifyClass - from repoze.bfg.interfaces import IContextURL - verifyClass(IContextURL, self._getTargetClass()) - - def test_instance_conforms_to_IContextURL(self): - from zope.interface.verify import verifyObject - from repoze.bfg.interfaces import IContextURL - verifyObject(IContextURL, self._makeOne(None, None)) - - def test_get_virtual_root(self): - context_url = self._makeOne(1,2) - self.assertEqual(context_url.virtual_root(), 1) - - def test_call(self): - from routes import Mapper - mapper = Mapper(controller_scan=None, directory=None, - explicit=True, always_scan=False) - args = {'a':'1', 'b':'2', 'c':'3'} - mapper.connect(':a/:b/:c') - mapper.create_regs([]) - environ = {'SERVER_NAME':'example.com', 'wsgi.url_scheme':'http', - 'SERVER_PORT':'80', 'wsgiorg.routing_args':((), args)} - mapper.environ = environ - from routes import request_config - config = request_config() - config.environ = environ - config.mapper = mapper - config.mapper_dict = args - config.host = 'www.example.com' - config.protocol = 'https' - config.redirect = None - request = DummyRequest() - request.environ = environ - context_url = self._makeOne(None, request) - result = context_url() - self.assertEqual(result, '/1/2/3') - class DummyContext(object): """ """ class DummyRequest(object): """ """ -class DummyRoute(object): - def __init__(self, name): - self.name = name - diff --git a/repoze/bfg/tests/test_zcml.py b/repoze/bfg/tests/test_zcml.py index e35944add..2fad91150 100644 --- a/repoze/bfg/tests/test_zcml.py +++ b/repoze/bfg/tests/test_zcml.py @@ -184,111 +184,54 @@ class TestViewDirective(unittest.TestCase): regadapt_discriminator = ('view', IFoo, '', IDummy, IView) self.assertEqual(regadapt['args'][2], (IFoo, IDummy)) - def test_adapted_class(self): - from zope.interface import Interface - import zope.component - - class IFoo(Interface): + def test_with_route_name(self): + class IFoo: pass - class IBar(Interface): + class IDummyRequest: pass - - class AView(object): - zope.component.adapts(IFoo, IBar) - - aview = AView() - context = DummyContext() - self._callFUT(context, view=aview) - + context.request_factories = {'foo':{None:{'interface':IDummyRequest}}} + view = lambda *arg: None + self._callFUT(context, 'repoze.view', IFoo, view=view, route_name='foo') actions = context.actions from repoze.bfg.interfaces import IView + from repoze.bfg.interfaces import IViewPermission + from repoze.bfg.security import ViewPermissionFactory from repoze.bfg.zcml import handler - self.assertEqual(len(actions), 1) - - regadapt = actions[0] - regadapt_discriminator = ('view', IFoo, '', IBar, IView) + self.assertEqual(len(actions), 2) + permission = actions[0] + permission_discriminator = ('permission', IFoo, '', IDummyRequest, + IViewPermission) + self.assertEqual(permission['discriminator'], permission_discriminator) + self.assertEqual(permission['callable'], handler) + self.assertEqual(permission['args'][0], 'registerAdapter') + self.failUnless(isinstance(permission['args'][1],ViewPermissionFactory)) + self.assertEqual(permission['args'][1].permission_name, 'repoze.view') + self.assertEqual(permission['args'][2], (IFoo, IDummyRequest)) + self.assertEqual(permission['args'][3], IViewPermission) + self.assertEqual(permission['args'][4], '') + self.assertEqual(permission['args'][5], None) + + regadapt = actions[1] + regadapt_discriminator = ('view', IFoo, '', IDummyRequest, IView) self.assertEqual(regadapt['discriminator'], regadapt_discriminator) self.assertEqual(regadapt['callable'], handler) self.assertEqual(regadapt['args'][0], 'registerAdapter') - self.assertEqual(regadapt['args'][1], aview) - self.assertEqual(regadapt['args'][2], (IFoo, IBar)) + self.assertEqual(regadapt['args'][1], view) + self.assertEqual(regadapt['args'][2], (IFoo, IDummyRequest)) self.assertEqual(regadapt['args'][3], IView) self.assertEqual(regadapt['args'][4], '') self.assertEqual(regadapt['args'][5], None) - def test_adapted_function(self): - from zope.interface import Interface - import zope.component - - class IFoo(Interface): - pass - class IBar(Interface): - pass - - @zope.component.adapter(IFoo, IBar) - def aview(context, request): - pass - aview(None, None) # dead chicken for test coverage - + def test_with_route_name_bad_order(self): context = DummyContext() - self._callFUT(context, view=aview) - - actions = context.actions - from repoze.bfg.interfaces import IView - from repoze.bfg.zcml import handler - - self.assertEqual(len(actions), 1) - - regadapt = actions[0] - regadapt_discriminator = ('view', IFoo, '', IBar, IView) - - self.assertEqual(regadapt['discriminator'], regadapt_discriminator) - self.assertEqual(regadapt['callable'], handler) - self.assertEqual(regadapt['args'][0], 'registerAdapter') - self.assertEqual(regadapt['args'][1], aview) - self.assertEqual(regadapt['args'][2], (IFoo, IBar)) - self.assertEqual(regadapt['args'][3], IView) - self.assertEqual(regadapt['args'][4], '') - self.assertEqual(regadapt['args'][5], None) - - def test_adapted_nonsense(self): - from repoze.bfg.interfaces import IRequest - from zope.interface import Interface - import zope.component - - class IFoo(Interface): - pass - class IBar(Interface): - pass - - @zope.component.adapter(IFoo) # too few arguments - def aview(context, request): - pass - aview(None, None) # dead chicken for test coverage - - context = DummyContext() - self._callFUT(context, view=aview) - - actions = context.actions - from repoze.bfg.interfaces import IView - from repoze.bfg.zcml import handler - - self.assertEqual(len(actions), 1) - - regadapt = actions[0] - regadapt_discriminator = ('view', None, '', IRequest, IView) - - self.assertEqual(regadapt['discriminator'], regadapt_discriminator) - self.assertEqual(regadapt['callable'], handler) - self.assertEqual(regadapt['args'][0], 'registerAdapter') - self.assertEqual(regadapt['args'][1], aview) - self.assertEqual(regadapt['args'][2], (None, IRequest)) - self.assertEqual(regadapt['args'][3], IView) - self.assertEqual(regadapt['args'][4], '') - self.assertEqual(regadapt['args'][5], None) + context.request_factories = {} + view = lambda *arg: None + from zope.configuration.exceptions import ConfigurationError + self.assertRaises(ConfigurationError, self._callFUT, context, + 'repoze.view', None, view, '', None, 'foo') class TestRouteRequirementFunction(unittest.TestCase): def _callFUT(self, context, attr, expr): @@ -372,7 +315,11 @@ class TestConnectRouteFunction(unittest.TestCase): self.assertEqual(D['_collection_name'], 'c') self.assertEqual(D['_parent_resource'], pr) self.assertEqual(D['conditions'], c) - self.assertEqual(D['_factory'], foo) + route = mapper.matchlist[-1] + self.assertEqual(route._factory, foo) + self.assertEqual(route.request_factories, + directive.context.request_factories['thename']) + def test_condition_subdomain_true(self): mapper = self._registerRoutesMapper() @@ -469,49 +416,45 @@ class TestRoute(unittest.TestCase): from repoze.bfg.zcml import Route return Route - def _makeOne(self, context, path, name, view, **kw): - return self._getTargetClass()(context, path, name, view, **kw) + def _makeOne(self, context, path, name, **kw): + return self._getTargetClass()(context, path, name, **kw) def test_defaults(self): context = DummyContext() - view = Dummy() - route = self._makeOne(context, 'path', 'name', view) + route = self._makeOne(context, 'path', 'name') self.assertEqual(route.path, 'path') self.assertEqual(route.name, 'name') - self.assertEqual(route.view, view) self.assertEqual(route.requirements, {}) def test_parent_collection_name_missing(self): context = DummyContext() - view = Dummy() from zope.configuration.exceptions import ConfigurationError self.assertRaises(ConfigurationError, self._makeOne, context, - 'path', 'name', view, - parent_member_name='a') + 'path', 'name', parent_member_name='a') def test_parent_collection_name_present(self): context = DummyContext() - view = Dummy() - route = self._makeOne(context, 'path', 'name', view, + route = self._makeOne(context, 'path', 'name', parent_member_name='a', parent_collection_name='p') self.assertEqual(route.parent_member_name, 'a') self.assertEqual(route.parent_collection_name, 'p') - def test_after(self): + def test_after_with_view(self): from repoze.bfg.zcml import handler from repoze.bfg.zcml import connect_route - from repoze.bfg.interfaces import IRoutesContext - from repoze.bfg.interfaces import IRequest from repoze.bfg.interfaces import IView context = DummyContext() view = Dummy() - route = self._makeOne(context, 'path', 'name', view) + route = self._makeOne(context, 'path', 'name', view=view) route.after() actions = context.actions self.assertEqual(len(actions), 2) + factories = context.request_factories + request_iface = factories['name'][None]['interface'] + view_action = actions[0] view_callable = view_action['callable'] view_discriminator = view_action['discriminator'] @@ -519,13 +462,13 @@ class TestRoute(unittest.TestCase): self.assertEqual(view_callable, handler) self.assertEqual(len(view_discriminator), 5) self.assertEqual(view_discriminator[0], 'view') - self.assertEqual(view_discriminator[1], IRoutesContext) - self.assertEqual(view_discriminator[2],'name') - self.assertEqual(view_discriminator[3], IRequest) + self.assertEqual(view_discriminator[1], None) + self.assertEqual(view_discriminator[2],'') + self.assertEqual(view_discriminator[3], request_iface) self.assertEqual(view_discriminator[4], IView) self.assertEqual(view_args, ('registerAdapter', view, - (IRoutesContext, IRequest), IView, - 'name', None)) + (None, request_iface), IView, + '', None)) route_action = actions[1] route_callable = route_action['callable'] @@ -534,7 +477,32 @@ class TestRoute(unittest.TestCase): self.assertEqual(route_callable, connect_route) self.assertEqual(len(route_discriminator), 7) self.assertEqual(route_discriminator[0], 'route') - self.assertEqual(route_discriminator[1], 'path') + self.assertEqual(route_discriminator[1], 'name') + self.assertEqual(route_discriminator[2],'{}') + self.assertEqual(route_discriminator[3], None) + self.assertEqual(route_discriminator[4], None) + self.assertEqual(route_discriminator[5], None) + self.assertEqual(route_discriminator[6], None) + self.assertEqual(route_args, (route,)) + + def test_after_without_view(self): + from repoze.bfg.zcml import connect_route + + context = DummyContext() + view = Dummy() + route = self._makeOne(context, 'path', 'name') + route.after() + actions = context.actions + self.assertEqual(len(actions), 1) + + route_action = actions[0] + route_callable = route_action['callable'] + route_discriminator = route_action['discriminator'] + route_args = route_action['args'] + self.assertEqual(route_callable, connect_route) + self.assertEqual(len(route_discriminator), 7) + self.assertEqual(route_discriminator[0], 'route') + self.assertEqual(route_discriminator[1], 'name') self.assertEqual(route_discriminator[2],'{}') self.assertEqual(route_discriminator[3], None) self.assertEqual(route_discriminator[4], None) @@ -611,6 +579,7 @@ class TestBFGViewFunctionGrokker(unittest.TestCase): obj.__for__ = Interface obj.__view_name__ = 'foo.html' obj.__request_type__ = IRequest + obj.__route_name__ = None context = DummyContext() result = grokker.grok('name', obj, context=context) self.assertEqual(result, True) @@ -729,13 +698,20 @@ class DummyRouteDirective: if not 'requirements' in kw: kw['requirements'] = {} self.__dict__.update(kw) + self.context = DummyContext() + self.context.request_factories = {self.name:{}} class DummyMapper: def __init__(self): self.connections = [] + self.matchlist = [] def connect(self, *arg, **kw): self.connections.append((arg, kw)) + self.matchlist.append(DummyRoute()) + +class DummyRoute: + pass from zope.interface import Interface class IDummy(Interface): diff --git a/repoze/bfg/traversal.py b/repoze/bfg/traversal.py index b8ba68faa..a0fdc5c71 100644 --- a/repoze/bfg/traversal.py +++ b/repoze/bfg/traversal.py @@ -489,10 +489,31 @@ class ModelGraphTraverser(object): self.root = root def __call__(self, environ, _marker=_marker): - try: - path = environ['PATH_INFO'] - except KeyError: - path = '/' + if 'bfg.routes.matchdict' in environ: + # this request matched a Routes route + matchdict = environ['bfg.routes.matchdict'] + if 'path_info' in matchdict: + # this is stolen from routes.middleware; if the route map + # has a *path_info capture, use it to influence the path + # info and script_name of the generated environment + oldpath = environ['PATH_INFO'] + newpath = matchdict['path_info'] or '' + environ['PATH_INFO'] = newpath + if not environ['PATH_INFO'].startswith('/'): + environ['PATH_INFO'] = '/' + environ['PATH_INFO'] + pattern = r'^(.*?)/' + re.escape(newpath) + '$' + environ['SCRIPT_NAME'] += re.sub(pattern, r'\1', oldpath) + if environ['SCRIPT_NAME'].endswith('/'): + environ['SCRIPT_NAME'] = environ['SCRIPT_NAME'][:-1] + path = matchdict.get('traverse', '/') + subpath = filter(None, matchdict.get('subpath', '').split('/')) + else: + # this request did not match a Routes route + subpath = [] + try: + path = environ['PATH_INFO'] + except KeyError: + path = '/' try: vroot_path_string = environ[VH_ROOT_KEY] except KeyError: @@ -535,8 +556,9 @@ class ModelGraphTraverser(object): ob = next i += 1 - return dict(context=ob, view_name=u'', subpath=[], traversed=traversed, - virtual_root=vroot, virtual_root_path=vroot_path, + return dict(context=ob, view_name=u'', subpath=subpath, + traversed=traversed, virtual_root=vroot, + virtual_root_path=vroot_path, root=self.root) class TraversalContextURL(object): @@ -584,8 +606,16 @@ class TraversalContextURL(object): else: if path.startswith(vroot_path): path = path[len(vroot_path):] - - app_url = request.application_url # never ends in a slash + + environ = request.environ + if 'bfg.routes.route' in environ: + matchdict = environ['bfg.routes.matchdict'].copy() + matchdict['traverse'] = path + route = environ['bfg.routes.route'] + app_url = route.generate(**matchdict) + else: + app_url = request.application_url # never ends in a slash + return app_url + path always_safe = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ' diff --git a/repoze/bfg/urldispatch.py b/repoze/bfg/urldispatch.py index 612843cfa..bfeae3333 100644 --- a/repoze/bfg/urldispatch.py +++ b/repoze/bfg/urldispatch.py @@ -1,72 +1,25 @@ -import re - -from zope.component import queryUtility - -from zope.interface import implements -from zope.interface import alsoProvides -from zope.interface import classProvides - from routes import Mapper from routes import request_config -from routes import url_for - -from repoze.bfg.interfaces import IContextNotFound -from repoze.bfg.interfaces import IContextURL -from repoze.bfg.interfaces import IRoutesContext -from repoze.bfg.interfaces import IRoutesContextFactory -from repoze.bfg.interfaces import ITraverser -from repoze.bfg.interfaces import ITraverserFactory -_marker = () - -class DefaultRoutesContext(object): - implements(IRoutesContext) - def __init__(self, **kw): - self.__dict__.update(kw) - -class RoutesContextNotFound(object): - implements(IContextNotFound) - def __init__(self, msg): - self.msg = msg - -_notfound = RoutesContextNotFound( - 'Routes context cannot be found and no fallback "get_root"') +_marker = object() class RoutesRootFactory(Mapper): - """ The ``RoutesRootFactory`` is a wrapper for the root factory - callable passed in to the repoze.bfg ``Router`` at initialization - time. When it is instantiated, it wraps the root factory of an - application in such a way that the `Routes - <http://routes.groovie.org/index.html>`_ engine has the 'first - crack' at resolving the current request URL to a repoze.bfg view. - Any view that claims it is 'for' the interface - ``repoze.bfg.interfaces.IRoutesContext`` will be called if its - name matches the Routes route ``name`` name for the match. It - will be passed a context object that has attributes that are - present as Routes match arguments dictionary keys. If no Routes - route matches the current request, the 'fallback' get_root is - called.""" - decorate_context = True - def __init__(self, get_root=None, **kw): - self.get_root = get_root + def __init__(self, default_root_factory, **kw): + self.default_root_factory = default_root_factory kw['controller_scan'] = None kw['always_scan'] = False kw['directory'] = None kw['explicit'] = True Mapper.__init__(self, **kw) self._regs_created = False - self.default_context_factory = DefaultRoutesContext def has_routes(self): return bool(self.matchlist) def connect(self, *arg, **kw): - # we need to deal with our custom attributes specially :-( - factory = None - if '_factory' in kw: - factory = kw.pop('_factory') result = Mapper.connect(self, *arg, **kw) - self.matchlist[-1]._factory = factory + route = self.matchlist[-1] + route._factory = None # overridden by ZCML return result def __call__(self, environ): @@ -89,78 +42,11 @@ class RoutesRootFactory(Mapper): config.host = environ.get('HTTP_HOST', environ['SERVER_NAME']) config.protocol = environ['wsgi.url_scheme'] config.redirect = None - kw = {} - for k, v in args.items(): - # Routes "helpfully" converts default parameter names - # into Unicode; these can't be used as attr names - if k.__class__ is unicode: - k = k.encode('utf-8') - kw[k] = v - factory = route._factory - if factory is None: - context = self.default_context_factory(**kw) - if self.decorate_context: - alsoProvides(context, IRoutesContext) - else: - context = factory(**kw) - alsoProvides(context, IRoutesContext) - environ['wsgiorg.routing_args'] = ((), kw) - environ['bfg.route'] = route - return context - - if self.get_root is None: - return _notfound - - return self.get_root(environ) - -class RoutesModelTraverser(object): - classProvides(ITraverserFactory) - implements(ITraverser) - def __init__(self, context): - self.context = context - - def __call__(self, environ): - route = environ['bfg.route'] - match = environ['wsgiorg.routing_args'][1] - - subpath = match.get('subpath', []) - if subpath: - subpath = filter(None, subpath.split('/')) - - if 'path_info' in match: - # this is stolen from routes.middleware; if the route map - # has a *path_info capture, use it to influence the path - # info and script_name of the generated environment - oldpath = environ['PATH_INFO'] - newpath = match['path_info'] or '' - environ['PATH_INFO'] = newpath - if not environ['PATH_INFO'].startswith('/'): - environ['PATH_INFO'] = '/' + environ['PATH_INFO'] - pattern = r'^(.*?)/' + re.escape(newpath) + '$' - environ['SCRIPT_NAME'] += re.sub(pattern, r'\1', oldpath) - if environ['SCRIPT_NAME'].endswith('/'): - environ['SCRIPT_NAME'] = environ['SCRIPT_NAME'][:-1] - - return dict(context=self.context, view_name=route.name, - subpath=subpath, traversed=None, virtual_root=self.context, - virtual_root_path=None, root=self.context) - -class RoutesContextURL(object): - """ The IContextURL adapter used to generate URLs for a context - object obtained via Routes URL dispatch. This implementation - juses the ``url_for`` Routes API to generate a URL based on - ``environ['wsgiorg.routing_args']``. Routes context objects, - unlike traversal-based context objects, cannot have a virtual root - that differs from its physical root; furthermore, the physical - root of a Routes context is always itself, so the ``virtual_root`` - function returns the context of this adapter unconditionally.""" - implements(IContextURL) - def __init__(self, context, request): - self.context = context - self.request = request + environ['wsgiorg.routing_args'] = ((), args) + environ['bfg.routes.route'] = route + environ['bfg.routes.matchdict'] = args + factory = route._factory or self.default_root_factory + return factory(environ) - def virtual_root(self): - return self.context + return self.default_root_factory(environ) - def __call__(self): - return url_for(**self.request.environ['wsgiorg.routing_args'][1]) diff --git a/repoze/bfg/view.py b/repoze/bfg/view.py index e6cd11939..7eb996a8d 100644 --- a/repoze/bfg/view.py +++ b/repoze/bfg/view.py @@ -146,7 +146,7 @@ class bfg_view(object): from repoze.bfg.interfaces import IRequest @bfg_view(name='my_view', request_type=IRequest, for_=IMyModel, - permission='read')) + permission='read', route_name='site1')) def my_view(context, request): return render_template_to_response('templates/my.pt') @@ -157,6 +157,7 @@ class bfg_view(object): view='.views.my_view' name='my_view' permission='read' + route_name='site1' /> If ``name`` is not supplied, the empty string is used (implying @@ -171,6 +172,10 @@ class bfg_view(object): If ``permission`` is not supplied, no permission is registered for this view (it's accessible by any caller). + If ``route_name`` is not supplied, the view declaration is considered + to be made against the 'default' route (the route which matches when + no ZCML-defined route matches the request). + Any individual or all parameters can be omitted. The simplest bfg_view declaration then becomes:: @@ -217,11 +222,13 @@ class bfg_view(object): <scan package="."/> """ - def __init__(self, name='', request_type=None, for_=None, permission=None): + def __init__(self, name='', request_type=None, for_=None, permission=None, + route_name=None): self.name = name self.request_type = request_type self.for_ = for_ self.permission = permission + self.route_name = route_name def __call__(self, wrapped): _bfg_view = wrapped @@ -244,5 +251,6 @@ class bfg_view(object): _bfg_view.__for__ = self.for_ _bfg_view.__view_name__ = self.name _bfg_view.__request_type__ = self.request_type + _bfg_view.__route_name__ = self.route_name return _bfg_view diff --git a/repoze/bfg/zcml.py b/repoze/bfg/zcml.py index bd12e926c..eeb8dfc32 100644 --- a/repoze/bfg/zcml.py +++ b/repoze/bfg/zcml.py @@ -3,7 +3,6 @@ import types from zope.configuration import xmlconfig -from zope.component import adaptedBy from zope.component import getSiteManager from zope.component import queryUtility @@ -19,13 +18,12 @@ from zope.interface import implements from zope.schema import Bool from zope.schema import TextLine -from repoze.bfg.interfaces import IRequest from repoze.bfg.interfaces import IRoutesMapper -from repoze.bfg.interfaces import IRoutesContext from repoze.bfg.interfaces import IViewPermission from repoze.bfg.interfaces import IView -from repoze.bfg.request import HTTP_METHOD_INTERFACES +from repoze.bfg.request import DEFAULT_REQUEST_FACTORIES +from repoze.bfg.request import named_request_factories from repoze.bfg.security import ViewPermissionFactory @@ -35,10 +33,6 @@ def handler(methodName, *args, **kwargs): method = getattr(getSiteManager(), methodName) method(*args, **kwargs) -class Uncacheable(object): - """ Include in discriminators of actions which are not cacheable; - this class only exists for backwards compatibility (<0.8.1)""" - def view( _context, permission=None, @@ -46,38 +40,29 @@ def view( view=None, name="", request_type=None, + route_name=None, cacheable=True, # not used, here for b/w compat < 0.8 ): if not view: raise ConfigurationError('"view" attribute was not specified') - # adapts() decorations may be used against either functions or - # class instances - if inspect.isfunction(view): - adapted_by = adaptedBy(view) + if route_name is None: + request_factories = DEFAULT_REQUEST_FACTORIES else: - adapted_by = adaptedBy(type(view)) - - if adapted_by is not None: try: - if for_ is None: - for_, _ = adapted_by - if request_type is None: - _, request_type = adapted_by - except ValueError: - # the component adaptation annotation does not conform to - # the view specification; we ignore it. - pass - - if request_type is None: - request_type = IRequest - - elif isinstance(request_type, basestring): - if request_type in HTTP_METHOD_INTERFACES: - request_type = HTTP_METHOD_INTERFACES[request_type] - else: - request_type = _context.resolve(request_type) + request_factories = _context.request_factories[route_name] + except KeyError: + raise ConfigurationError( + 'Unknown route_name "%s". <route> definitions must be ordered ' + 'before the view definition which mentions the route\'s name ' + 'within ZCML (or before the "scan" directive is invoked ' + 'within a bfg_view decorator).' % route_name) + + if request_type in request_factories: + request_type = request_factories[request_type]['interface'] + else: + request_type = _context.resolve(request_type) if inspect.isclass(view): # If the object we've located is a class, turn it into a @@ -110,8 +95,7 @@ def view( discriminator = ('view', for_, name, request_type, IView), callable = handler, args = ('registerAdapter', - view, (for_, request_type), IView, name, - _context.info), + view, (for_, request_type), IView, name, _context.info), ) class IViewDirective(Interface): @@ -148,7 +132,10 @@ class IViewDirective(Interface): required=False ) -PVERSION = 1 + route_name = TextLine( + title = u'The route that must match for this view to be used', + required = False) + def zcml_configure(name, package): context = zope.configuration.config.ConfigurationMachine() @@ -187,9 +174,11 @@ class BFGViewFunctionGrokker(martian.InstanceGrokker): for_ = obj.__for__ name = obj.__view_name__ request_type = obj.__request_type__ + route_name = obj.__route_name__ context = kw['context'] view(context, permission=permission, for_=for_, - view=obj, name=name, request_type=request_type) + view=obj, name=name, request_type=request_type, + route_name=route_name) return True return False @@ -209,7 +198,7 @@ class IRouteDirective(Interface): """ name = TextLine(title=u'name', required=True) path = TextLine(title=u'path', required=True) - view = GlobalObject(title=u'view', required=True) + view = GlobalObject(title=u'view', required=False) permission = TextLine(title=u'permission', required=False) factory = GlobalObject(title=u'context factory', required=False) minimize = Bool(title=u'minimize', required=False) @@ -275,10 +264,12 @@ def connect_route(directive): if conditions: kw['conditions'] = conditions - if directive.factory: - kw['_factory'] = directive.factory - - return mapper.connect(*args, **kw) + result = mapper.connect(*args, **kw) + route = mapper.matchlist[-1] + route._factory = directive.factory + context = directive.context + route.request_factories = context.request_factories[directive.name] + return result class Route(zope.configuration.config.GroupingContextDecorator): """ Handle ``route`` ZCML directives @@ -305,13 +296,12 @@ class Route(zope.configuration.config.GroupingContextDecorator): implements(zope.configuration.config.IConfigurationContext, IRouteDirective) - def __init__(self, context, path, name, view, **kw): + def __init__(self, context, path, name, **kw): self.validate(**kw) self.requirements = {} # mutated by subdirectives self.context = context self.path = path self.name = name - self.view = view self.__dict__.update(**kw) def validate(self, **kw): @@ -324,16 +314,27 @@ class Route(zope.configuration.config.GroupingContextDecorator): 'specified together') def after(self): - view(self.context, self.permission, IRoutesContext, self.view, - self.name, None) + context = self.context + name = self.name + if not hasattr(context, 'request_factories'): + context.request_factories = {} + context.request_factories[name] = named_request_factories(name) + + if self.view: + view(context, self.permission, None, self.view, '', + self.request_type, name) method = self.condition_method or self.request_type self.context.action( - discriminator = ('route', self.path, repr(self.requirements), + discriminator = ('route', self.name, repr(self.requirements), method, self.condition_subdomain, self.condition_function, self.subdomains), callable = connect_route, args = (self,), ) +class Uncacheable(object): + """ Include in discriminators of actions which are not cacheable; + this class only exists for backwards compatibility (<0.8.1)""" + |
