summaryrefslogtreecommitdiff
path: root/repoze
diff options
context:
space:
mode:
authorChris McDonough <chrism@agendaless.com>2008-07-07 04:44:57 +0000
committerChris McDonough <chrism@agendaless.com>2008-07-07 04:44:57 +0000
commit7de404bb4af2744a64c13e31a780fc0229b8f9e5 (patch)
tree49f0b91b005777071050bf72732300f3bcd8d3ad /repoze
parent93a4f5df2f74e4cbefc70733f2c0258859207106 (diff)
downloadpyramid-7de404bb4af2744a64c13e31a780fc0229b8f9e5.tar.gz
pyramid-7de404bb4af2744a64c13e31a780fc0229b8f9e5.tar.bz2
pyramid-7de404bb4af2744a64c13e31a780fc0229b8f9e5.zip
Look up a view after traversal; adapt it to IWSGIApplication.
Diffstat (limited to 'repoze')
-rw-r--r--repoze/bfg/interfaces.py54
-rw-r--r--repoze/bfg/location.py118
-rw-r--r--repoze/bfg/mapply.py71
-rw-r--r--repoze/bfg/router.py45
-rw-r--r--repoze/bfg/tests/test_router.py263
-rw-r--r--repoze/bfg/tests/test_traversal.py72
-rw-r--r--repoze/bfg/tests/test_wsgiadapter.py75
-rw-r--r--repoze/bfg/traversal.py43
-rw-r--r--repoze/bfg/wsgiadapter.py29
9 files changed, 568 insertions, 202 deletions
diff --git a/repoze/bfg/interfaces.py b/repoze/bfg/interfaces.py
index b0b769b02..af68410dd 100644
--- a/repoze/bfg/interfaces.py
+++ b/repoze/bfg/interfaces.py
@@ -1,29 +1,45 @@
from zope.interface import Interface
+from zope.interface import Attribute
-class IWSGIApplicationFactory(Interface):
- def __call__(context):
- """ Return a WSGI (PEP333) application """
+class IResponse(Interface):
+ status = Attribute('WSGI status code of response')
+ headerlist = Attribute('List of response headers')
+ app_iter = Attribute('Iterable representing the response body')
+
+class IView(Interface):
+ def __call__(*arg, **kw):
+ """ Must return an object that implements IResponse; args are
+ mapped into an IView's __call__ by mapply-like code """
+
+class IViewFactory(Interface):
+ def __call__(context, request):
+ """ Return an object that implements IView """
class IRootPolicy(Interface):
def __call__(environ):
""" Return a root object """
-class ITraversalPolicy(Interface):
- def __call__(environ, root):
- """ Return a tuple in the form (context, name, subpath) """
+class IPublishTraverser(Interface):
+ def __call__(path):
+ """ Return a tuple in the form (context, name, subpath), typically
+ the result of an object graph traversal """
-class ISecurityPolicy(Interface):
- def __call__(environ, context, name):
- """ Return a WSGI app on unauthorized or None to signify that
- the request is allowed to continue """
-
-class ITraverser(Interface):
- def __init__(context):
- """ Accept a context """
+class IPublishTraverserFactory(Interface):
+ def __call__(context, request):
+ """ Return an object that implements IPublishTraverser """
- def __call__(environ, name):
- """ Return a subcontext or based on name """
-
-class IWebObRequest(Interface):
- """ Marker interface for a webob.Request object """
+class IWSGIApplication(Interface):
+ def __call__(environ, start_response):
+ """ A PEP 333 application """
+
+class IWSGIApplicationFactory(Interface):
+ def __call__(view, request):
+ """ Return an object that implements IWSGIApplication """
+
+class IRequest(Interface):
+ """ Marker interface for a request object """
+class ILocation(Interface):
+ """Objects that have a structural location"""
+ __parent__ = Attribute("The parent in the location hierarchy")
+ __name__ = Attribute("The name of the object")
diff --git a/repoze/bfg/location.py b/repoze/bfg/location.py
new file mode 100644
index 000000000..37e5386b3
--- /dev/null
+++ b/repoze/bfg/location.py
@@ -0,0 +1,118 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+
+import zope.interface
+from zope.proxy import ProxyBase
+from zope.proxy import getProxiedObject
+from zope.proxy import non_overridable
+from zope.proxy.decorator import DecoratorSpecificationDescriptor
+from repoze.bfg.interfaces import ILocation
+
+class Location(object):
+ """Stupid mix-in that defines `__parent__` and `__name__` attributes."""
+
+ zope.interface.implements(ILocation)
+
+ __parent__ = __name__ = None
+
+def locate(object, parent, name=None):
+ """Locate an object in another
+
+ This method should only be called from trusted code, because it
+ sets attributes that are normally unsettable.
+ """
+
+ object.__parent__ = parent
+ object.__name__ = name
+
+
+def located(object, parent, name=None):
+ """Locate an object in another and return it.
+
+ If the object does not provide ILocation a LocationProxy is returned.
+
+ """
+ if ILocation.providedBy(object):
+ if parent is not object.__parent__ or name != object.__name__:
+ locate(object, parent, name)
+ return object
+ return LocationProxy(object, parent, name)
+
+
+def LocationIterator(object):
+ while object is not None:
+ yield object
+ object = getattr(object, '__parent__', None)
+
+
+def inside(l1, l2):
+ """Is l1 inside l2
+
+ L1 is inside l2 if l2 is an ancestor of l1.
+
+ """
+ while l1 is not None:
+ if l1 is l2:
+ return True
+ l1 = l1.__parent__
+
+ return False
+
+
+class ClassAndInstanceDescr(object):
+
+ def __init__(self, *args):
+ self.funcs = args
+
+ def __get__(self, inst, cls):
+ if inst is None:
+ return self.funcs[1](cls)
+ return self.funcs[0](inst)
+
+
+class LocationProxy(ProxyBase):
+ """Location-object proxy
+
+ This is a non-picklable proxy that can be put around objects that
+ don't implement `ILocation`.
+
+ """
+
+ zope.interface.implements(ILocation)
+
+ __slots__ = '__parent__', '__name__'
+ __safe_for_unpickling__ = True
+
+ def __new__(self, ob, container=None, name=None):
+ return ProxyBase.__new__(self, ob)
+
+ def __init__(self, ob, container=None, name=None):
+ ProxyBase.__init__(self, ob)
+ self.__parent__ = container
+ self.__name__ = name
+
+ @non_overridable
+ def __reduce__(self, proto=None):
+ raise TypeError("Not picklable")
+
+
+ __doc__ = ClassAndInstanceDescr(
+ lambda inst: getProxiedObject(inst).__doc__,
+ lambda cls, __doc__ = __doc__: __doc__,
+ )
+
+ __reduce_ex__ = __reduce__
+
+ __providedBy__ = DecoratorSpecificationDescriptor()
+
diff --git a/repoze/bfg/mapply.py b/repoze/bfg/mapply.py
new file mode 100644
index 000000000..421db2ce3
--- /dev/null
+++ b/repoze/bfg/mapply.py
@@ -0,0 +1,71 @@
+##############################################################################
+#
+# Copyright (c) 2002 Zope Corporation and Contributors. All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""Provide an apply-like facility that works with any mapping object
+"""
+
+def mapply(object,
+ positional=(),
+ keyword={},
+ maybe=True,
+ ):
+
+ if hasattr(object,'__bases__'): # the object is a class
+ raise TypeError('Cannot publish class %s' % object)
+ else:
+ f=object
+ im=0
+ if hasattr(f, 'im_func'):
+ im=1
+ elif not hasattr(f,'func_defaults'):
+ if hasattr(f, '__call__'):
+ f=f.__call__
+ if hasattr(f, 'im_func'):
+ im=1
+ elif not hasattr(f,'func_defaults') and maybe:
+ return object
+ elif maybe:
+ return object
+
+ if im:
+ f=f.im_func
+ c=f.func_code
+ defaults=f.func_defaults
+ names=c.co_varnames[1:c.co_argcount]
+ else:
+ defaults=f.func_defaults
+ c=f.func_code
+ names=c.co_varnames[:c.co_argcount]
+
+ nargs=len(names)
+ if positional:
+ positional=list(positional)
+ if len(positional) > nargs:
+ raise TypeError('too many arguments')
+ args=positional
+ else:
+ args=[]
+
+ get=keyword.get
+ nrequired=len(names) - (len(defaults or ()))
+ for index in range(len(args), len(names)):
+ name=names[index]
+ v=get(name, args)
+ if v is args:
+ if index < nrequired:
+ raise TypeError('Argument %s was omitted' % name)
+ else:
+ v=defaults[index-nrequired]
+ args.append(v)
+
+ args=tuple(args)
+ return object(*args)
diff --git a/repoze/bfg/router.py b/repoze/bfg/router.py
index fde50b528..4f73ddf32 100644
--- a/repoze/bfg/router.py
+++ b/repoze/bfg/router.py
@@ -1,28 +1,45 @@
+from zope.component import getGlobalSiteManager
+from zope.component import getMultiAdapter
from zope.component import queryMultiAdapter
from zope.interface import directlyProvides
from webob import Request
from webob.exc import HTTPNotFound
+from repoze.bfg.interfaces import IPublishTraverserFactory
+from repoze.bfg.interfaces import IViewFactory
from repoze.bfg.interfaces import IWSGIApplicationFactory
-from repoze.bfg.interfaces import IWebObRequest
+
+from repoze.bfg.interfaces import IRequest
+
+_marker = ()
class Router:
- def __init__(self, root_policy, traversal_policy, security_policy):
+ def __init__(self, root_policy):
self.root_policy = root_policy
- self.traversal_policy = traversal_policy
- self.security_policy = security_policy
def __call__(self, environ, start_response):
+ request = Request(environ)
+ directlyProvides(request, IRequest)
root = self.root_policy(environ)
- context, name, subpath = self.traversal_policy(environ, root)
- app = self.security_policy(environ, context, name)
- if app is None:
- environ['repoze.bfg.subpath'] = subpath
- request = Request(environ)
- directlyProvides(request, IWebObRequest)
- app = queryMultiAdapter((context, request),
- IWSGIApplicationFactory, name=name)
- if app is None:
- app = HTTPNotFound(request.url)
+ path = environ.get('PATH_INFO', '/')
+ traverser = getMultiAdapter((root, request), IPublishTraverserFactory)
+ context, name, subpath = traverser(path)
+ request.subpath = subpath
+ app = queryMultiAdapter((context, request), IViewFactory, name=name,
+ default=_marker)
+ if app is _marker:
+ app = HTTPNotFound(request.url)
+ else:
+ app = getMultiAdapter((app, request), IWSGIApplicationFactory)
return app(environ, start_response)
+
+def make_app(**config):
+ from repoze.bfg.traversal import NaivePublishTraverser
+ from repoze.bfg.wsgiadapter import NaiveWSGIViewAdapter
+ gsm = getGlobalSiteManager()
+ gsm.registerAdapter(NaivePublishTraverser, (None, None),
+ IPublishTraverserFactory)
+ gsm.registerAdapter(NaiveWSGIViewAdapter, (None, None),
+ IWSGIApplicationFactory)
+
diff --git a/repoze/bfg/tests/test_router.py b/repoze/bfg/tests/test_router.py
index c684e06b5..bfa72fe41 100644
--- a/repoze/bfg/tests/test_router.py
+++ b/repoze/bfg/tests/test_router.py
@@ -9,12 +9,24 @@ class RouterTests(unittest.TestCase, PlacelessSetup):
def tearDown(self):
PlacelessSetup.tearDown(self)
- def _registerFactory(self, app, name, *for_):
+ def _registerTraverserFactory(self, app, name, *for_):
+ import zope.component
+ gsm = zope.component.getGlobalSiteManager()
+ from repoze.bfg.interfaces import IPublishTraverserFactory
+ gsm.registerAdapter(app, for_, IPublishTraverserFactory, name)
+
+ def _registerViewFactory(self, app, name, *for_):
+ import zope.component
+ gsm = zope.component.getGlobalSiteManager()
+ from repoze.bfg.interfaces import IViewFactory
+ gsm.registerAdapter(app, for_, IViewFactory, name)
+
+ def _registerWSGIFactory(self, app, name, *for_):
import zope.component
gsm = zope.component.getGlobalSiteManager()
from repoze.bfg.interfaces import IWSGIApplicationFactory
gsm.registerAdapter(app, for_, IWSGIApplicationFactory, name)
-
+
def _getTargetClass(self):
from repoze.bfg.router import Router
return Router
@@ -33,151 +45,166 @@ class RouterTests(unittest.TestCase, PlacelessSetup):
environ.update(extras)
return environ
- def test_call_no_app_registered(self):
- statii = []
- headerii = []
- def rootpolicy(environ):
- return None
- def traversalpolicy(environ, root):
- return DummyContext(), 'foo', []
- def securitypolicy(environ, context, name):
- return None
- def start_response(status, headers):
- statii[:] = [status]
- headerii[:] = [headers]
+ def test_call_no_view_registered(self):
+ rootpolicy = make_rootpolicy(None)
environ = self._makeEnviron()
- router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy)
+ context = DummyContext()
+ traversalfactory = make_traversal_factory(context, '', [])
+ self._registerTraverserFactory(traversalfactory, '', None, None)
+ router = self._makeOne(rootpolicy)
+ start_response = DummyStartResponse()
result = router(environ, start_response)
- headers = headerii[0]
+ headers = start_response.headers
self.assertEqual(len(headers), 2)
- status = statii[0]
+ status = start_response.status
self.assertEqual(status, '404 Not Found')
self.failUnless('http://localhost:8080' in result[0], result)
- def test_call_securitypolicy_denies(self):
- statii = []
- headerii = []
- def rootpolicy(environ):
- return None
- def traversalpolicy(environ, root):
- return DummyContext(), 'foo', []
- def securitypolicy(environ, context, name):
- from webob.exc import HTTPUnauthorized
- return HTTPUnauthorized()
- def start_response(status, headers):
- statii[:] = [status]
- headerii[:] = [headers]
- environ = self._makeEnviron()
- router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy)
- result = router(environ, start_response)
- headers = headerii[0]
- self.assertEqual(len(headers), 2)
- status = statii[0]
- self.assertEqual(status, '401 Unauthorized')
-
- def test_call_app_registered_nonspecific_default_path(self):
- def rootpolicy(environ):
- return None
+ def test_call_view_registered_nonspecific_default_path(self):
+ rootpolicy = make_rootpolicy(None)
context = DummyContext()
- def traversalpolicy(environ, root):
- return context, '', []
- def start_response(status, headers):
- pass
- def securitypolicy(environ, context, name):
- return None
+ traversalfactory = make_traversal_factory(context, '', [])
+ response = DummyResponse()
+ viewfactory = make_view_factory(response)
+ wsgifactory = make_wsgi_factory('200 OK', (), ['Hello world'])
environ = self._makeEnviron()
- self._registerFactory(DummyWSGIApplicationFactory, '', None, None)
- router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy)
+ self._registerTraverserFactory(traversalfactory, '', None, None)
+ self._registerViewFactory(viewfactory, '', None, None)
+ self._registerWSGIFactory(wsgifactory, '', None, None)
+ router = self._makeOne(rootpolicy)
+ start_response = DummyStartResponse()
result = router(environ, start_response)
- self.failUnless(result[0] is context)
- import webob
- self.failUnless(isinstance(result[1], webob.Request))
- self.assertEqual(environ['repoze.bfg.subpath'], [])
-
- def test_call_app_registered_nonspecific_nondefault_path_and_subpath(self):
- def rootpolicy(environ):
- return None
+ self.assertEqual(result, ['Hello world'])
+ self.assertEqual(start_response.headers, ())
+ self.assertEqual(start_response.status, '200 OK')
+ request = environ['request']
+ self.assertEqual(environ['request'].subpath, [])
+ self.assertEqual(environ['view'].context, context)
+
+ def test_call_view_registered_nonspecific_nondefault_path_and_subpath(self):
+ rootpolicy = make_rootpolicy(None)
context = DummyContext()
- def traversalpolicy(environ, root):
- return context, 'foo', ['bar', 'baz']
- def start_response(status, headers):
- pass
- def securitypolicy(environ, context, name):
- return None
+ traversalfactory = make_traversal_factory(context, 'foo', ['bar'])
+ response = DummyResponse()
+ viewfactory = make_view_factory(response)
+ wsgifactory = make_wsgi_factory('200 OK', (), ['Hello world'])
environ = self._makeEnviron()
- self._registerFactory(DummyWSGIApplicationFactory, 'foo', None, None)
- router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy)
+ self._registerTraverserFactory(traversalfactory, '', None, None)
+ self._registerViewFactory(viewfactory, 'foo', None, None)
+ self._registerWSGIFactory(wsgifactory, '', None, None)
+ router = self._makeOne(rootpolicy)
+ start_response = DummyStartResponse()
result = router(environ, start_response)
- self.failUnless(result[0] is context)
- import webob
- self.failUnless(isinstance(result[1], webob.Request))
- self.assertEqual(environ['repoze.bfg.subpath'], ['bar', 'baz'])
-
- def test_call_app_registered_specific_success(self):
- def rootpolicy(environ):
- return None
- context = DummyContext()
+ self.assertEqual(result, ['Hello world'])
+ self.assertEqual(start_response.headers, ())
+ self.assertEqual(start_response.status, '200 OK')
+ request = environ['request']
+ self.assertEqual(environ['request'].subpath, ['bar'])
+ self.assertEqual(environ['view'].context, context)
+
+ def test_call_view_registered_specific_success(self):
+ rootpolicy = make_rootpolicy(None)
from zope.interface import Interface
from zope.interface import directlyProvides
class IContext(Interface):
pass
+ from repoze.bfg.interfaces import IRequest
+ context = DummyContext()
directlyProvides(context, IContext)
- def traversalpolicy(environ, root):
- return context, 'foo', ['bar', 'baz']
- def start_response(status, headers):
- pass
- def securitypolicy(environ, context, name):
- return None
+ traversalfactory = make_traversal_factory(context, '', [])
+ response = DummyResponse()
+ viewfactory = make_view_factory(response)
+ wsgifactory = make_wsgi_factory('200 OK', (), ['Hello world'])
environ = self._makeEnviron()
- from repoze.bfg.interfaces import IWebObRequest
- self._registerFactory(DummyWSGIApplicationFactory, 'foo', IContext,
- IWebObRequest)
- router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy)
+ self._registerTraverserFactory(traversalfactory, '', None, None)
+ self._registerViewFactory(viewfactory, '', IContext, IRequest)
+ self._registerWSGIFactory(wsgifactory, '', None, None)
+ router = self._makeOne(rootpolicy)
+ start_response = DummyStartResponse()
result = router(environ, start_response)
- self.failUnless(result[0] is context)
- import webob
- self.failUnless(isinstance(result[1], webob.Request))
- self.assertEqual(environ['repoze.bfg.subpath'], ['bar', 'baz'])
+ self.assertEqual(result, ['Hello world'])
+ self.assertEqual(start_response.headers, ())
+ self.assertEqual(start_response.status, '200 OK')
+ request = environ['request']
+ self.assertEqual(environ['request'].subpath, [])
+ self.assertEqual(environ['view'].context, context)
- def test_call_app_registered_specific_fail(self):
- context = DummyContext()
+ def test_call_view_registered_specific_fail(self):
+ rootpolicy = make_rootpolicy(None)
from zope.interface import Interface
from zope.interface import directlyProvides
- class INotContext(Interface):
- pass
class IContext(Interface):
pass
+ class INotContext(Interface):
+ pass
+ from repoze.bfg.interfaces import IRequest
+ context = DummyContext()
directlyProvides(context, INotContext)
- statii = []
- headerii = []
- def rootpolicy(environ):
- return None
- def traversalpolicy(environ, root):
- return context, 'foo', []
- def start_response(status, headers):
- statii[:] = [status]
- headerii[:] = [headers]
- def securitypolicy(environ, context, name):
- return None
+ traversalfactory = make_traversal_factory(context, '', [''])
+ response = DummyResponse()
+ viewfactory = make_view_factory(response)
+ wsgifactory = make_wsgi_factory('200 OK', (), ['Hello world'])
environ = self._makeEnviron()
- from repoze.bfg.interfaces import IWebObRequest
- self._registerFactory(DummyWSGIApplicationFactory, 'foo', IContext,
- IWebObRequest)
- router = self._makeOne(rootpolicy, traversalpolicy, securitypolicy)
+ self._registerTraverserFactory(traversalfactory, '', None, None)
+ self._registerViewFactory(viewfactory, '', IContext, IRequest)
+ self._registerWSGIFactory(wsgifactory, '', None, None)
+ router = self._makeOne(rootpolicy)
+ start_response = DummyStartResponse()
result = router(environ, start_response)
- headers = headerii[0]
- self.assertEqual(len(headers), 2)
- status = statii[0]
- self.assertEqual(status, '404 Not Found')
- self.failUnless('http://localhost:8080' in result[0], result)
+ self.failUnless('404' in result[0])
+ self.assertEqual(start_response.status, '404 Not Found')
class DummyContext:
pass
-class DummyWSGIApplicationFactory:
- def __init__(self, context, request):
- self.context = context
- self.request = request
+def make_wsgi_factory(status, headers, app_iter):
+ class DummyWSGIApplicationFactory:
+ def __init__(self, view, request):
+ self.view = view
+ self.request = request
+
+ def __call__(self, environ, start_response):
+ environ['view'] = self.view
+ environ['request'] = self.request
+ start_response(status, headers)
+ return app_iter
+
+ return DummyWSGIApplicationFactory
+
+def make_view_factory(response):
+ class DummyViewFactory:
+ def __init__(self, context, request):
+ self.context = context
+ self.request = request
+
+ def __call__(self):
+ return response
+ return DummyViewFactory
- def __call__(self, environ, start_response):
- return self.context, self.request
+def make_traversal_factory(context, name, subpath):
+ class DummyTraversalFactory:
+ def __init__(self, root, request):
+ self.root = root
+ self.request = request
+
+ def __call__(self, path):
+ return context, name, subpath
+ return DummyTraversalFactory
+
+def make_rootpolicy(root):
+ def rootpolicy(environ):
+ return root
+ return rootpolicy
+
+class DummyStartResponse:
+ status = ()
+ headers = ()
+ def __call__(self, status, headers):
+ self.status = status
+ self.headers = headers
+
+class DummyResponse:
+ status = '200 OK'
+ headerlist = ()
+ app_iter = ()
+
diff --git a/repoze/bfg/tests/test_traversal.py b/repoze/bfg/tests/test_traversal.py
index bb0b98b79..115086b8b 100644
--- a/repoze/bfg/tests/test_traversal.py
+++ b/repoze/bfg/tests/test_traversal.py
@@ -36,62 +36,61 @@ class NaivePolicyTests(unittest.TestCase, PlacelessSetup):
PlacelessSetup.tearDown(self)
def _getTargetClass(self):
- from repoze.bfg.traversal import NaiveTraversalPolicy
- return NaiveTraversalPolicy
+ from repoze.bfg.traversal import NaivePublishTraverser
+ return NaivePublishTraverser
def _makeOne(self, *arg, **kw):
import zope.component
gsm = zope.component.getGlobalSiteManager()
- from repoze.bfg.interfaces import ITraverser
- gsm.registerAdapter(DummyTraverser, (None,), ITraverser, '')
klass = self._getTargetClass()
return klass(*arg, **kw)
- def test_class_conforms_to_ITraversalPolicy(self):
+ def test_class_conforms_to_IPublishTraverser(self):
from zope.interface.verify import verifyClass
- from repoze.bfg.interfaces import ITraversalPolicy
- verifyClass(ITraversalPolicy, self._getTargetClass())
+ from repoze.bfg.interfaces import IPublishTraverser
+ verifyClass(IPublishTraverser, self._getTargetClass())
- def test_instance_conforms_to_ITraversalPolicy(self):
+ def test_instance_conforms_to_IPublishTraverser(self):
from zope.interface.verify import verifyObject
- from repoze.bfg.interfaces import ITraversalPolicy
- verifyObject(ITraversalPolicy, self._makeOne())
+ from repoze.bfg.interfaces import IPublishTraverser
+ context = DummyContext()
+ request = DummyRequest()
+ verifyObject(IPublishTraverser, self._makeOne(context, request))
- def test_call_nonkeyerror_raises(self):
- policy = self._makeOne()
- environ = {'PATH_INFO':'/foo'}
- root = None
- self.assertRaises(TypeError, policy, environ, root)
+ def test_call_pathel_with_no_getitem(self):
+ request = DummyRequest()
+ policy = self._makeOne(None, request)
+ ctx, name, subpath = policy('/foo/bar')
+ self.assertEqual(ctx, None)
+ self.assertEqual(name, 'foo')
+ self.assertEqual(subpath, ['bar'])
def test_call_withconn_getitem_emptypath_nosubpath(self):
- policy = self._makeOne()
- context = DummyContext()
- environ = {'PATH_INFO':''}
- root = context
- ctx, name, subpath = policy(environ, root)
- self.assertEqual(context, ctx)
+ root = DummyContext()
+ request = DummyRequest()
+ policy = self._makeOne(root, request)
+ ctx, name, subpath = policy('')
+ self.assertEqual(ctx, root)
self.assertEqual(name, '')
self.assertEqual(subpath, [])
def test_call_withconn_getitem_withpath_nosubpath(self):
- policy = self._makeOne()
- context = DummyContext()
- context2 = DummyContext(context)
- environ = {'PATH_INFO':'/foo/bar'}
- root = context
- ctx, name, subpath = policy(environ, root)
- self.assertEqual(context, ctx)
+ foo = DummyContext()
+ root = DummyContext(foo)
+ request = DummyRequest()
+ policy = self._makeOne(root, request)
+ ctx, name, subpath = policy('/foo/bar')
+ self.assertEqual(ctx, foo)
self.assertEqual(name, 'bar')
self.assertEqual(subpath, [])
def test_call_withconn_getitem_withpath_withsubpath(self):
- policy = self._makeOne()
- context = DummyContext()
- context2 = DummyContext(context)
- environ = {'PATH_INFO':'/foo/bar/baz/buz'}
- root = context
- ctx, name, subpath = policy(environ, root)
- self.assertEqual(context, ctx)
+ foo = DummyContext()
+ request = DummyRequest()
+ root = DummyContext(foo)
+ policy = self._makeOne(root, request)
+ ctx, name, subpath = policy('/foo/bar/baz/buz')
+ self.assertEqual(ctx, foo)
self.assertEqual(name, 'bar')
self.assertEqual(subpath, ['baz', 'buz'])
@@ -103,6 +102,9 @@ class DummyContext:
if self.next is None:
raise KeyError, name
return self.next
+
+class DummyRequest:
+ pass
class DummyTraverser:
def __init__(self, context):
diff --git a/repoze/bfg/tests/test_wsgiadapter.py b/repoze/bfg/tests/test_wsgiadapter.py
new file mode 100644
index 000000000..71e2f5c81
--- /dev/null
+++ b/repoze/bfg/tests/test_wsgiadapter.py
@@ -0,0 +1,75 @@
+import unittest
+
+class NaiveWSGIAdapterTests(unittest.TestCase):
+ def _getTargetClass(self):
+ from repoze.bfg.wsgiadapter import NaiveWSGIViewAdapter
+ return NaiveWSGIViewAdapter
+
+ def _makeOne(self, *arg, **kw):
+ klass = self._getTargetClass()
+ return klass(*arg, **kw)
+
+ def test_view_takes_no_args(self):
+ response = DummyResponse()
+ response.app_iter = ['Hello world']
+ def view():
+ return response
+ request = DummyRequest()
+ adapter = self._makeOne(view, request)
+ environ = {}
+ start_response = DummyStartResponse()
+ result = adapter(environ, start_response)
+ self.assertEqual(result, ['Hello world'])
+ self.assertEqual(start_response.headers, ())
+ self.assertEqual(start_response.status, '200 OK')
+
+ def test_view_takes_pep_333_args(self):
+ response = DummyResponse()
+ response.app_iter = ['Hello world']
+ def view(environ, start_response):
+ response.environ = environ
+ response.start_response = start_response
+ return response
+ request = DummyRequest()
+ adapter = self._makeOne(view, request)
+ environ = {}
+ start_response = DummyStartResponse()
+ result = adapter(environ, start_response)
+ self.assertEqual(result, ['Hello world'])
+ self.assertEqual(start_response.headers, ())
+ self.assertEqual(start_response.status, '200 OK')
+ self.assertEqual(response.environ, environ)
+ self.assertEqual(response.start_response, start_response)
+
+ def test_view_takes_zopey_args(self):
+ request = DummyRequest()
+ response = DummyResponse()
+ response.app_iter = ['Hello world']
+ def view(request):
+ response.request = request
+ return response
+ adapter = self._makeOne(view, request)
+ environ = {}
+ start_response = DummyStartResponse()
+ result = adapter(environ, start_response)
+ self.assertEqual(result, ['Hello world'])
+ self.assertEqual(start_response.headers, ())
+ self.assertEqual(start_response.status, '200 OK')
+ self.assertEqual(response.request, request)
+
+class DummyRequest:
+ pass
+
+class DummyResponse:
+ status = '200 OK'
+ headerlist = ()
+ app_iter = ()
+
+class DummyStartResponse:
+ status = None
+ headers = None
+ def __call__(self, status, headers):
+ self.status = status
+ self.headers = headers
+
+
diff --git a/repoze/bfg/traversal.py b/repoze/bfg/traversal.py
index 1b9cfb332..9380b2a0a 100644
--- a/repoze/bfg/traversal.py
+++ b/repoze/bfg/traversal.py
@@ -1,9 +1,10 @@
import urllib
+from zope.interface import classProvides
from zope.interface import implements
-from repoze.bfg.interfaces import ITraversalPolicy
-from repoze.bfg.interfaces import ITraverser
+from repoze.bfg.interfaces import IPublishTraverser
+from repoze.bfg.interfaces import IPublishTraverserFactory
def split_path(path):
if path.startswith('/'):
@@ -21,27 +22,37 @@ def split_path(path):
clean.append(segment)
return clean
-class NaiveTraversalPolicy:
- implements(ITraversalPolicy)
+def step(ob, name, default):
+ if not hasattr(ob, '__getitem__'):
+ return default
+ try:
+ return ob[name]
+ except KeyError:
+ return default
- def __call__(self, environ, root):
- path = split_path(environ['PATH_INFO'])
-
- ob = root
+_marker = ()
+
+class NaivePublishTraverser:
+ classProvides(IPublishTraverserFactory)
+ implements(IPublishTraverser)
+ def __init__(self, root, request):
+ self.root = root
+ self.request = request
+
+ def __call__(self, path):
+ path = split_path(path)
+
+ ob = self.root
name = ''
while path:
- segment = pop(path)
- traverser = ITraverser(ob)
- next = traverser(environ, segment)
- if next is None:
- if path:
- name = pop(path)
+ segment = path.pop(0)
+ next = step(ob, segment, _marker)
+ if next is _marker:
+ name = segment
break
ob = next
return ob, name, path
-def pop(path):
- return path.pop(0)
diff --git a/repoze/bfg/wsgiadapter.py b/repoze/bfg/wsgiadapter.py
new file mode 100644
index 000000000..f76360c27
--- /dev/null
+++ b/repoze/bfg/wsgiadapter.py
@@ -0,0 +1,29 @@
+from zope.interface import implements
+from zope.interface import classProvides
+
+from repoze.bfg.interfaces import IWSGIApplicationFactory
+from repoze.bfg.interfaces import IWSGIApplication
+from repoze.bfg.mapply import mapply
+
+class NaiveWSGIViewAdapter:
+ classProvides(IWSGIApplicationFactory)
+ implements(IWSGIApplication)
+
+ def __init__(self, view, request):
+ self.view = view
+ self.request = request
+
+ def __call__(self, environ, start_response):
+ catch_response = []
+ def replace_start_response(status, headers):
+ catch_response[:] = (status, headers)
+ kwdict = {
+ 'request':self.request,
+ 'environ':environ,
+ 'start_response':start_response,
+ }
+ response = mapply(self.view, positional = (), keyword = kwdict)
+ if not catch_response:
+ catch_response = (response.status, response.headerlist)
+ start_response(*catch_response)
+ return response.app_iter