diff options
| author | Chris McDonough <chrism@agendaless.com> | 2008-07-07 04:44:57 +0000 |
|---|---|---|
| committer | Chris McDonough <chrism@agendaless.com> | 2008-07-07 04:44:57 +0000 |
| commit | 7de404bb4af2744a64c13e31a780fc0229b8f9e5 (patch) | |
| tree | 49f0b91b005777071050bf72732300f3bcd8d3ad /repoze | |
| parent | 93a4f5df2f74e4cbefc70733f2c0258859207106 (diff) | |
| download | pyramid-7de404bb4af2744a64c13e31a780fc0229b8f9e5.tar.gz pyramid-7de404bb4af2744a64c13e31a780fc0229b8f9e5.tar.bz2 pyramid-7de404bb4af2744a64c13e31a780fc0229b8f9e5.zip | |
Look up a view after traversal; adapt it to IWSGIApplication.
Diffstat (limited to 'repoze')
| -rw-r--r-- | repoze/bfg/interfaces.py | 54 | ||||
| -rw-r--r-- | repoze/bfg/location.py | 118 | ||||
| -rw-r--r-- | repoze/bfg/mapply.py | 71 | ||||
| -rw-r--r-- | repoze/bfg/router.py | 45 | ||||
| -rw-r--r-- | repoze/bfg/tests/test_router.py | 263 | ||||
| -rw-r--r-- | repoze/bfg/tests/test_traversal.py | 72 | ||||
| -rw-r--r-- | repoze/bfg/tests/test_wsgiadapter.py | 75 | ||||
| -rw-r--r-- | repoze/bfg/traversal.py | 43 | ||||
| -rw-r--r-- | repoze/bfg/wsgiadapter.py | 29 |
9 files changed, 568 insertions, 202 deletions
diff --git a/repoze/bfg/interfaces.py b/repoze/bfg/interfaces.py index b0b769b02..af68410dd 100644 --- a/repoze/bfg/interfaces.py +++ b/repoze/bfg/interfaces.py @@ -1,29 +1,45 @@ from zope.interface import Interface +from zope.interface import Attribute -class IWSGIApplicationFactory(Interface): - def __call__(context): - """ Return a WSGI (PEP333) application """ +class IResponse(Interface): + status = Attribute('WSGI status code of response') + headerlist = Attribute('List of response headers') + app_iter = Attribute('Iterable representing the response body') + +class IView(Interface): + def __call__(*arg, **kw): + """ Must return an object that implements IResponse; args are + mapped into an IView's __call__ by mapply-like code """ + +class IViewFactory(Interface): + def __call__(context, request): + """ Return an object that implements IView """ class IRootPolicy(Interface): def __call__(environ): """ Return a root object """ -class ITraversalPolicy(Interface): - def __call__(environ, root): - """ Return a tuple in the form (context, name, subpath) """ +class IPublishTraverser(Interface): + def __call__(path): + """ Return a tuple in the form (context, name, subpath), typically + the result of an object graph traversal """ -class ISecurityPolicy(Interface): - def __call__(environ, context, name): - """ Return a WSGI app on unauthorized or None to signify that - the request is allowed to continue """ - -class ITraverser(Interface): - def __init__(context): - """ Accept a context """ +class IPublishTraverserFactory(Interface): + def __call__(context, request): + """ Return an object that implements IPublishTraverser """ - def __call__(environ, name): - """ Return a subcontext or based on name """ - -class IWebObRequest(Interface): - """ Marker interface for a webob.Request object """ +class IWSGIApplication(Interface): + def __call__(environ, start_response): + """ A PEP 333 application """ + +class IWSGIApplicationFactory(Interface): + def __call__(view, request): + """ Return an object that implements IWSGIApplication """ + +class IRequest(Interface): + """ Marker interface for a request object """ +class ILocation(Interface): + """Objects that have a structural location""" + __parent__ = Attribute("The parent in the location hierarchy") + __name__ = Attribute("The name of the object") diff --git a/repoze/bfg/location.py b/repoze/bfg/location.py new file mode 100644 index 000000000..37e5386b3 --- /dev/null +++ b/repoze/bfg/location.py @@ -0,0 +1,118 @@ +############################################################################## +# +# Copyright (c) 2003 Zope Corporation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE. +# +############################################################################## + +import zope.interface +from zope.proxy import ProxyBase +from zope.proxy import getProxiedObject +from zope.proxy import non_overridable +from zope.proxy.decorator import DecoratorSpecificationDescriptor +from repoze.bfg.interfaces import ILocation + +class Location(object): + """Stupid mix-in that defines `__parent__` and `__name__` attributes.""" + + zope.interface.implements(ILocation) + + __parent__ = __name__ = None + +def locate(object, parent, name=None): + """Locate an object in another + + This method should only be called from trusted code, because it + sets attributes that are normally unsettable. + """ + + object.__parent__ = parent + object.__name__ = name + + +def located(object, parent, name=None): + """Locate an object in another and return it. + + If the object does not provide ILocation a LocationProxy is returned. + + """ + if ILocation.providedBy(object): + if parent is not object.__parent__ or name != object.__name__: + locate(object, parent, name) + return object + return LocationProxy(object, parent, name) + + +def LocationIterator(object): + while object is not None: + yield object + object = getattr(object, '__parent__', None) + + +def inside(l1, l2): + """Is l1 inside l2 + + L1 is inside l2 if l2 is an ancestor of l1. + + """ + while l1 is not None: + if l1 is l2: + return True + l1 = l1.__parent__ + + return False + + +class ClassAndInstanceDescr(object): + + def __init__(self, *args): + self.funcs = args + + def __get__(self, inst, cls): + if inst is None: + return self.funcs[1](cls) + return self.funcs[0](inst) + + +class LocationProxy(ProxyBase): + """Location-object proxy + + This is a non-picklable proxy that can be put around objects that + don't implement `ILocation`. + + """ + + zope.interface.implements(ILocation) + + __slots__ = '__parent__', '__name__' + __safe_for_unpickling__ = True + + def __new__(self, ob, container=None, name=None): + return ProxyBase.__new__(self, ob) + + def __init__(self, ob, container=None, name=None): + ProxyBase.__init__(self, ob) + self.__parent__ = container + self.__name__ = name + + @non_overridable + def __reduce__(self, proto=None): + raise TypeError("Not picklable") + + + __doc__ = ClassAndInstanceDescr( + lambda inst: getProxiedObject(inst).__doc__, + lambda cls, __doc__ = __doc__: __doc__, + ) + + __reduce_ex__ = __reduce__ + + __providedBy__ = DecoratorSpecificationDescriptor() + diff --git a/repoze/bfg/mapply.py b/repoze/bfg/mapply.py new file mode 100644 index 000000000..421db2ce3 --- /dev/null +++ b/repoze/bfg/mapply.py @@ -0,0 +1,71 @@ +############################################################################## +# +# Copyright (c) 2002 Zope Corporation and Contributors. All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE +# +############################################################################## +"""Provide an apply-like facility that works with any mapping object +""" + +def mapply(object, + positional=(), + keyword={}, + maybe=True, + ): + + if hasattr(object,'__bases__'): # the object is a class + raise TypeError('Cannot publish class %s' % object) + else: + f=object + im=0 + if hasattr(f, 'im_func'): + im=1 + elif not hasattr(f,'func_defaults'): + if hasattr(f, '__call__'): + f=f.__call__ + if hasattr(f, 'im_func'): + im=1 + elif not hasattr(f,'func_defaults') and maybe: + return object + elif maybe: + return object + + if im: + f=f.im_func + c=f.func_code + defaults=f.func_defaults + names=c.co_varnames[1:c.co_argcount] + else: + defaults=f.func_defaults + c=f.func_code + names=c.co_varnames[:c.co_argcount] + + nargs=len(names) + if positional: + positional=list(positional) + if len(positional) > nargs: + raise TypeError('too many arguments') + args=positional + else: + args=[] + + get=keyword.get + nrequired=len(names) - (len(defaults or ())) + for index in range(len(args), len(names)): + name=names[index] + v=get(name, args) + if v is args: + if index < nrequired: + raise TypeError('Argument %s was omitted' % name) + else: + v=defaults[index-nrequired] + args.append(v) + + args=tuple(args) + return object(*args) diff --git a/repoze/bfg/router.py b/repoze/bfg/router.py index fde50b528..4f73ddf32 100644 --- a/repoze/bfg/router.py +++ b/repoze/bfg/router.py @@ -1,28 +1,45 @@ +from zope.component import getGlobalSiteManager +from zope.component import getMultiAdapter from zope.component import queryMultiAdapter from zope.interface import directlyProvides from webob import Request from webob.exc import HTTPNotFound +from repoze.bfg.interfaces import IPublishTraverserFactory +from repoze.bfg.interfaces import IViewFactory from repoze.bfg.interfaces import IWSGIApplicationFactory -from repoze.bfg.interfaces import IWebObRequest + +from repoze.bfg.interfaces import IRequest + +_marker = () class Router: - def __init__(self, root_policy, traversal_policy, security_policy): + def __init__(self, root_policy): self.root_policy = root_policy - self.traversal_policy = traversal_policy - self.security_policy = security_policy def __call__(self, environ, start_response): + request = Request(environ) + directlyProvides(request, IRequest) root = self.root_policy(environ) - context, name, subpath = self.traversal_policy(environ, root) - app = self.security_policy(environ, context, name) - if app is None: - environ['repoze.bfg.subpath'] = subpath - request = Request(environ) - directlyProvides(request, IWebObRequest) - app = queryMultiAdapter((context, request), - IWSGIApplicationFactory, name=name) - if app is None: - app = HTTPNotFound(request.url) + path = environ.get('PATH_INFO', '/') + traverser = getMultiAdapter((root, request), IPublishTraverserFactory) + context, name, subpath = traverser(path) + request.subpath = subpath + app = queryMultiAdapter((context, request), IViewFactory, name=name, + default=_marker) + if app is _marker: + app = HTTPNotFound(request.url) + else: + app = getMultiAdapter((app, request), IWSGIApplicationFactory) return app(environ, start_response) + +def make_app(**config): + from repoze.bfg.traversal import NaivePublishTraverser + from repoze.bfg.wsgiadapter import NaiveWSGIViewAdapter + gsm = getGlobalSiteManager() + gsm.registerAdapter(NaivePublishTraverser, (None, None), + IPublishTraverserFactory) + gsm.registerAdapter(NaiveWSGIViewAdapter, (None, None), + IWSGIApplicationFactory) + diff --git a/repoze/bfg/tests/test_router.py b/repoze/bfg/tests/test_router.py index c684e06b5..bfa72fe41 100644 --- a/repoze/bfg/tests/test_router.py +++ b/repoze/bfg/tests/test_router.py @@ -9,12 +9,24 @@ class RouterTests(unittest.TestCase, PlacelessSetup): def tearDown(self): PlacelessSetup.tearDown(self) - def _registerFactory(self, app, name, *for_): + def _registerTraverserFactory(self, app, name, *for_): + import zope.component + gsm = zope.component.getGlobalSiteManager() + from repoze.bfg.interfaces import IPublishTraverserFactory + gsm.registerAdapter(app, for_, IPublishTraverserFactory, name) + + def _registerViewFactory(self, app, name, *for_): + import zope.component + gsm = zope.component.getGlobalSiteManager() + from repoze.bfg.interfaces import IViewFactory + gsm.registerAdapter(app, for_, IViewFactory, name) + + def _registerWSGIFactory(self, app, name, *for_): import zope.component gsm = zope.component.getGlobalSiteManager() from repoze.bfg.interfaces import IWSGIApplicationFactory gsm.registerAdapter(app, for_, IWSGIApplicationFactory, name) - + def _getTargetClass(self): from repoze.bfg.router import Router return Router @@ -33,151 +45,166 @@ class RouterTests(unittest.TestCase, PlacelessSetup): environ.update(extras) return environ - def test_call_no_app_registered(self): - statii = [] - headerii = [] - def rootpolicy(environ): - return None - def traversalpolicy(environ, root): - return DummyContext(), 'foo', [] - def securitypolicy(environ, context, name): - return None - def start_response(status, headers): - statii[:] = [status] - headerii[:] = [headers] + def test_call_no_view_registered(self): + rootpolicy = make_rootpolicy(None) environ = self._makeEnviron() - router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy) + context = DummyContext() + traversalfactory = make_traversal_factory(context, '', []) + self._registerTraverserFactory(traversalfactory, '', None, None) + router = self._makeOne(rootpolicy) + start_response = DummyStartResponse() result = router(environ, start_response) - headers = headerii[0] + headers = start_response.headers self.assertEqual(len(headers), 2) - status = statii[0] + status = start_response.status self.assertEqual(status, '404 Not Found') self.failUnless('http://localhost:8080' in result[0], result) - def test_call_securitypolicy_denies(self): - statii = [] - headerii = [] - def rootpolicy(environ): - return None - def traversalpolicy(environ, root): - return DummyContext(), 'foo', [] - def securitypolicy(environ, context, name): - from webob.exc import HTTPUnauthorized - return HTTPUnauthorized() - def start_response(status, headers): - statii[:] = [status] - headerii[:] = [headers] - environ = self._makeEnviron() - router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy) - result = router(environ, start_response) - headers = headerii[0] - self.assertEqual(len(headers), 2) - status = statii[0] - self.assertEqual(status, '401 Unauthorized') - - def test_call_app_registered_nonspecific_default_path(self): - def rootpolicy(environ): - return None + def test_call_view_registered_nonspecific_default_path(self): + rootpolicy = make_rootpolicy(None) context = DummyContext() - def traversalpolicy(environ, root): - return context, '', [] - def start_response(status, headers): - pass - def securitypolicy(environ, context, name): - return None + traversalfactory = make_traversal_factory(context, '', []) + response = DummyResponse() + viewfactory = make_view_factory(response) + wsgifactory = make_wsgi_factory('200 OK', (), ['Hello world']) environ = self._makeEnviron() - self._registerFactory(DummyWSGIApplicationFactory, '', None, None) - router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy) + self._registerTraverserFactory(traversalfactory, '', None, None) + self._registerViewFactory(viewfactory, '', None, None) + self._registerWSGIFactory(wsgifactory, '', None, None) + router = self._makeOne(rootpolicy) + start_response = DummyStartResponse() result = router(environ, start_response) - self.failUnless(result[0] is context) - import webob - self.failUnless(isinstance(result[1], webob.Request)) - self.assertEqual(environ['repoze.bfg.subpath'], []) - - def test_call_app_registered_nonspecific_nondefault_path_and_subpath(self): - def rootpolicy(environ): - return None + self.assertEqual(result, ['Hello world']) + self.assertEqual(start_response.headers, ()) + self.assertEqual(start_response.status, '200 OK') + request = environ['request'] + self.assertEqual(environ['request'].subpath, []) + self.assertEqual(environ['view'].context, context) + + def test_call_view_registered_nonspecific_nondefault_path_and_subpath(self): + rootpolicy = make_rootpolicy(None) context = DummyContext() - def traversalpolicy(environ, root): - return context, 'foo', ['bar', 'baz'] - def start_response(status, headers): - pass - def securitypolicy(environ, context, name): - return None + traversalfactory = make_traversal_factory(context, 'foo', ['bar']) + response = DummyResponse() + viewfactory = make_view_factory(response) + wsgifactory = make_wsgi_factory('200 OK', (), ['Hello world']) environ = self._makeEnviron() - self._registerFactory(DummyWSGIApplicationFactory, 'foo', None, None) - router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy) + self._registerTraverserFactory(traversalfactory, '', None, None) + self._registerViewFactory(viewfactory, 'foo', None, None) + self._registerWSGIFactory(wsgifactory, '', None, None) + router = self._makeOne(rootpolicy) + start_response = DummyStartResponse() result = router(environ, start_response) - self.failUnless(result[0] is context) - import webob - self.failUnless(isinstance(result[1], webob.Request)) - self.assertEqual(environ['repoze.bfg.subpath'], ['bar', 'baz']) - - def test_call_app_registered_specific_success(self): - def rootpolicy(environ): - return None - context = DummyContext() + self.assertEqual(result, ['Hello world']) + self.assertEqual(start_response.headers, ()) + self.assertEqual(start_response.status, '200 OK') + request = environ['request'] + self.assertEqual(environ['request'].subpath, ['bar']) + self.assertEqual(environ['view'].context, context) + + def test_call_view_registered_specific_success(self): + rootpolicy = make_rootpolicy(None) from zope.interface import Interface from zope.interface import directlyProvides class IContext(Interface): pass + from repoze.bfg.interfaces import IRequest + context = DummyContext() directlyProvides(context, IContext) - def traversalpolicy(environ, root): - return context, 'foo', ['bar', 'baz'] - def start_response(status, headers): - pass - def securitypolicy(environ, context, name): - return None + traversalfactory = make_traversal_factory(context, '', []) + response = DummyResponse() + viewfactory = make_view_factory(response) + wsgifactory = make_wsgi_factory('200 OK', (), ['Hello world']) environ = self._makeEnviron() - from repoze.bfg.interfaces import IWebObRequest - self._registerFactory(DummyWSGIApplicationFactory, 'foo', IContext, - IWebObRequest) - router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy) + self._registerTraverserFactory(traversalfactory, '', None, None) + self._registerViewFactory(viewfactory, '', IContext, IRequest) + self._registerWSGIFactory(wsgifactory, '', None, None) + router = self._makeOne(rootpolicy) + start_response = DummyStartResponse() result = router(environ, start_response) - self.failUnless(result[0] is context) - import webob - self.failUnless(isinstance(result[1], webob.Request)) - self.assertEqual(environ['repoze.bfg.subpath'], ['bar', 'baz']) + self.assertEqual(result, ['Hello world']) + self.assertEqual(start_response.headers, ()) + self.assertEqual(start_response.status, '200 OK') + request = environ['request'] + self.assertEqual(environ['request'].subpath, []) + self.assertEqual(environ['view'].context, context) - def test_call_app_registered_specific_fail(self): - context = DummyContext() + def test_call_view_registered_specific_fail(self): + rootpolicy = make_rootpolicy(None) from zope.interface import Interface from zope.interface import directlyProvides - class INotContext(Interface): - pass class IContext(Interface): pass + class INotContext(Interface): + pass + from repoze.bfg.interfaces import IRequest + context = DummyContext() directlyProvides(context, INotContext) - statii = [] - headerii = [] - def rootpolicy(environ): - return None - def traversalpolicy(environ, root): - return context, 'foo', [] - def start_response(status, headers): - statii[:] = [status] - headerii[:] = [headers] - def securitypolicy(environ, context, name): - return None + traversalfactory = make_traversal_factory(context, '', ['']) + response = DummyResponse() + viewfactory = make_view_factory(response) + wsgifactory = make_wsgi_factory('200 OK', (), ['Hello world']) environ = self._makeEnviron() - from repoze.bfg.interfaces import IWebObRequest - self._registerFactory(DummyWSGIApplicationFactory, 'foo', IContext, - IWebObRequest) - router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy) + self._registerTraverserFactory(traversalfactory, '', None, None) + self._registerViewFactory(viewfactory, '', IContext, IRequest) + self._registerWSGIFactory(wsgifactory, '', None, None) + router = self._makeOne(rootpolicy) + start_response = DummyStartResponse() result = router(environ, start_response) - headers = headerii[0] - self.assertEqual(len(headers), 2) - status = statii[0] - self.assertEqual(status, '404 Not Found') - self.failUnless('http://localhost:8080' in result[0], result) + self.failUnless('404' in result[0]) + self.assertEqual(start_response.status, '404 Not Found') class DummyContext: pass -class DummyWSGIApplicationFactory: - def __init__(self, context, request): - self.context = context - self.request = request +def make_wsgi_factory(status, headers, app_iter): + class DummyWSGIApplicationFactory: + def __init__(self, view, request): + self.view = view + self.request = request + + def __call__(self, environ, start_response): + environ['view'] = self.view + environ['request'] = self.request + start_response(status, headers) + return app_iter + + return DummyWSGIApplicationFactory + +def make_view_factory(response): + class DummyViewFactory: + def __init__(self, context, request): + self.context = context + self.request = request + + def __call__(self): + return response + return DummyViewFactory - def __call__(self, environ, start_response): - return self.context, self.request +def make_traversal_factory(context, name, subpath): + class DummyTraversalFactory: + def __init__(self, root, request): + self.root = root + self.request = request + + def __call__(self, path): + return context, name, subpath + return DummyTraversalFactory + +def make_rootpolicy(root): + def rootpolicy(environ): + return root + return rootpolicy + +class DummyStartResponse: + status = () + headers = () + def __call__(self, status, headers): + self.status = status + self.headers = headers + +class DummyResponse: + status = '200 OK' + headerlist = () + app_iter = () + diff --git a/repoze/bfg/tests/test_traversal.py b/repoze/bfg/tests/test_traversal.py index bb0b98b79..115086b8b 100644 --- a/repoze/bfg/tests/test_traversal.py +++ b/repoze/bfg/tests/test_traversal.py @@ -36,62 +36,61 @@ class NaivePolicyTests(unittest.TestCase, PlacelessSetup): PlacelessSetup.tearDown(self) def _getTargetClass(self): - from repoze.bfg.traversal import NaiveTraversalPolicy - return NaiveTraversalPolicy + from repoze.bfg.traversal import NaivePublishTraverser + return NaivePublishTraverser def _makeOne(self, *arg, **kw): import zope.component gsm = zope.component.getGlobalSiteManager() - from repoze.bfg.interfaces import ITraverser - gsm.registerAdapter(DummyTraverser, (None,), ITraverser, '') klass = self._getTargetClass() return klass(*arg, **kw) - def test_class_conforms_to_ITraversalPolicy(self): + def test_class_conforms_to_IPublishTraverser(self): from zope.interface.verify import verifyClass - from repoze.bfg.interfaces import ITraversalPolicy - verifyClass(ITraversalPolicy, self._getTargetClass()) + from repoze.bfg.interfaces import IPublishTraverser + verifyClass(IPublishTraverser, self._getTargetClass()) - def test_instance_conforms_to_ITraversalPolicy(self): + def test_instance_conforms_to_IPublishTraverser(self): from zope.interface.verify import verifyObject - from repoze.bfg.interfaces import ITraversalPolicy - verifyObject(ITraversalPolicy, self._makeOne()) + from repoze.bfg.interfaces import IPublishTraverser + context = DummyContext() + request = DummyRequest() + verifyObject(IPublishTraverser, self._makeOne(context, request)) - def test_call_nonkeyerror_raises(self): - policy = self._makeOne() - environ = {'PATH_INFO':'/foo'} - root = None - self.assertRaises(TypeError, policy, environ, root) + def test_call_pathel_with_no_getitem(self): + request = DummyRequest() + policy = self._makeOne(None, request) + ctx, name, subpath = policy('/foo/bar') + self.assertEqual(ctx, None) + self.assertEqual(name, 'foo') + self.assertEqual(subpath, ['bar']) def test_call_withconn_getitem_emptypath_nosubpath(self): - policy = self._makeOne() - context = DummyContext() - environ = {'PATH_INFO':''} - root = context - ctx, name, subpath = policy(environ, root) - self.assertEqual(context, ctx) + root = DummyContext() + request = DummyRequest() + policy = self._makeOne(root, request) + ctx, name, subpath = policy('') + self.assertEqual(ctx, root) self.assertEqual(name, '') self.assertEqual(subpath, []) def test_call_withconn_getitem_withpath_nosubpath(self): - policy = self._makeOne() - context = DummyContext() - context2 = DummyContext(context) - environ = {'PATH_INFO':'/foo/bar'} - root = context - ctx, name, subpath = policy(environ, root) - self.assertEqual(context, ctx) + foo = DummyContext() + root = DummyContext(foo) + request = DummyRequest() + policy = self._makeOne(root, request) + ctx, name, subpath = policy('/foo/bar') + self.assertEqual(ctx, foo) self.assertEqual(name, 'bar') self.assertEqual(subpath, []) def test_call_withconn_getitem_withpath_withsubpath(self): - policy = self._makeOne() - context = DummyContext() - context2 = DummyContext(context) - environ = {'PATH_INFO':'/foo/bar/baz/buz'} - root = context - ctx, name, subpath = policy(environ, root) - self.assertEqual(context, ctx) + foo = DummyContext() + request = DummyRequest() + root = DummyContext(foo) + policy = self._makeOne(root, request) + ctx, name, subpath = policy('/foo/bar/baz/buz') + self.assertEqual(ctx, foo) self.assertEqual(name, 'bar') self.assertEqual(subpath, ['baz', 'buz']) @@ -103,6 +102,9 @@ class DummyContext: if self.next is None: raise KeyError, name return self.next + +class DummyRequest: + pass class DummyTraverser: def __init__(self, context): diff --git a/repoze/bfg/tests/test_wsgiadapter.py b/repoze/bfg/tests/test_wsgiadapter.py new file mode 100644 index 000000000..71e2f5c81 --- /dev/null +++ b/repoze/bfg/tests/test_wsgiadapter.py @@ -0,0 +1,75 @@ +import unittest + +class NaiveWSGIAdapterTests(unittest.TestCase): + def _getTargetClass(self): + from repoze.bfg.wsgiadapter import NaiveWSGIViewAdapter + return NaiveWSGIViewAdapter + + def _makeOne(self, *arg, **kw): + klass = self._getTargetClass() + return klass(*arg, **kw) + + def test_view_takes_no_args(self): + response = DummyResponse() + response.app_iter = ['Hello world'] + def view(): + return response + request = DummyRequest() + adapter = self._makeOne(view, request) + environ = {} + start_response = DummyStartResponse() + result = adapter(environ, start_response) + self.assertEqual(result, ['Hello world']) + self.assertEqual(start_response.headers, ()) + self.assertEqual(start_response.status, '200 OK') + + def test_view_takes_pep_333_args(self): + response = DummyResponse() + response.app_iter = ['Hello world'] + def view(environ, start_response): + response.environ = environ + response.start_response = start_response + return response + request = DummyRequest() + adapter = self._makeOne(view, request) + environ = {} + start_response = DummyStartResponse() + result = adapter(environ, start_response) + self.assertEqual(result, ['Hello world']) + self.assertEqual(start_response.headers, ()) + self.assertEqual(start_response.status, '200 OK') + self.assertEqual(response.environ, environ) + self.assertEqual(response.start_response, start_response) + + def test_view_takes_zopey_args(self): + request = DummyRequest() + response = DummyResponse() + response.app_iter = ['Hello world'] + def view(request): + response.request = request + return response + adapter = self._makeOne(view, request) + environ = {} + start_response = DummyStartResponse() + result = adapter(environ, start_response) + self.assertEqual(result, ['Hello world']) + self.assertEqual(start_response.headers, ()) + self.assertEqual(start_response.status, '200 OK') + self.assertEqual(response.request, request) + +class DummyRequest: + pass + +class DummyResponse: + status = '200 OK' + headerlist = () + app_iter = () + +class DummyStartResponse: + status = None + headers = None + def __call__(self, status, headers): + self.status = status + self.headers = headers + + diff --git a/repoze/bfg/traversal.py b/repoze/bfg/traversal.py index 1b9cfb332..9380b2a0a 100644 --- a/repoze/bfg/traversal.py +++ b/repoze/bfg/traversal.py @@ -1,9 +1,10 @@ import urllib +from zope.interface import classProvides from zope.interface import implements -from repoze.bfg.interfaces import ITraversalPolicy -from repoze.bfg.interfaces import ITraverser +from repoze.bfg.interfaces import IPublishTraverser +from repoze.bfg.interfaces import IPublishTraverserFactory def split_path(path): if path.startswith('/'): @@ -21,27 +22,37 @@ def split_path(path): clean.append(segment) return clean -class NaiveTraversalPolicy: - implements(ITraversalPolicy) +def step(ob, name, default): + if not hasattr(ob, '__getitem__'): + return default + try: + return ob[name] + except KeyError: + return default - def __call__(self, environ, root): - path = split_path(environ['PATH_INFO']) - - ob = root +_marker = () + +class NaivePublishTraverser: + classProvides(IPublishTraverserFactory) + implements(IPublishTraverser) + def __init__(self, root, request): + self.root = root + self.request = request + + def __call__(self, path): + path = split_path(path) + + ob = self.root name = '' while path: - segment = pop(path) - traverser = ITraverser(ob) - next = traverser(environ, segment) - if next is None: - if path: - name = pop(path) + segment = path.pop(0) + next = step(ob, segment, _marker) + if next is _marker: + name = segment break ob = next return ob, name, path -def pop(path): - return path.pop(0) diff --git a/repoze/bfg/wsgiadapter.py b/repoze/bfg/wsgiadapter.py new file mode 100644 index 000000000..f76360c27 --- /dev/null +++ b/repoze/bfg/wsgiadapter.py @@ -0,0 +1,29 @@ +from zope.interface import implements +from zope.interface import classProvides + +from repoze.bfg.interfaces import IWSGIApplicationFactory +from repoze.bfg.interfaces import IWSGIApplication +from repoze.bfg.mapply import mapply + +class NaiveWSGIViewAdapter: + classProvides(IWSGIApplicationFactory) + implements(IWSGIApplication) + + def __init__(self, view, request): + self.view = view + self.request = request + + def __call__(self, environ, start_response): + catch_response = [] + def replace_start_response(status, headers): + catch_response[:] = (status, headers) + kwdict = { + 'request':self.request, + 'environ':environ, + 'start_response':start_response, + } + response = mapply(self.view, positional = (), keyword = kwdict) + if not catch_response: + catch_response = (response.status, response.headerlist) + start_response(*catch_response) + return response.app_iter |
