summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGES.txt113
-rw-r--r--docs/api/authentication.rst14
-rw-r--r--docs/api/authorization.rst11
-rw-r--r--docs/api/security.rst27
-rw-r--r--docs/api/traversal.rst3
-rw-r--r--docs/api/view.rst2
-rw-r--r--docs/conf.py4
-rw-r--r--docs/glossary.rst26
-rw-r--r--docs/index.rst4
-rw-r--r--docs/narr/hooks.rst71
-rw-r--r--docs/narr/security.rst175
-rw-r--r--docs/narr/traversal.rst14
-rw-r--r--docs/narr/urldispatch.rst14
-rw-r--r--docs/narr/views.rst24
-rw-r--r--repoze/bfg/authentication.py86
-rw-r--r--repoze/bfg/authorization.py112
-rw-r--r--repoze/bfg/interfaces.py56
-rw-r--r--repoze/bfg/registry.py41
-rw-r--r--repoze/bfg/router.py231
-rw-r--r--repoze/bfg/scifi.py257
-rw-r--r--repoze/bfg/secpols.py469
-rw-r--r--repoze/bfg/security.py621
-rw-r--r--repoze/bfg/testing.py85
-rw-r--r--repoze/bfg/tests/test_authentication.py176
-rw-r--r--repoze/bfg/tests/test_authorization.py179
-rw-r--r--repoze/bfg/tests/test_integration.py7
-rw-r--r--repoze/bfg/tests/test_registry.py64
-rw-r--r--repoze/bfg/tests/test_router.py433
-rw-r--r--repoze/bfg/tests/test_secpols.py766
-rw-r--r--repoze/bfg/tests/test_security.py890
-rw-r--r--repoze/bfg/tests/test_testing.py100
-rw-r--r--repoze/bfg/tests/test_threadlocal.py46
-rw-r--r--repoze/bfg/tests/test_view.py125
-rw-r--r--repoze/bfg/tests/test_wsgi.py4
-rw-r--r--repoze/bfg/threadlocal.py39
-rw-r--r--repoze/bfg/view.py58
-rw-r--r--repoze/bfg/wsgi.py2
-rw-r--r--repoze/bfg/zcml.py17
-rw-r--r--setup.py2
39 files changed, 3489 insertions, 1879 deletions
diff --git a/CHANGES.txt b/CHANGES.txt
index 40a810305..5891313fd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,116 @@
+Next release
+============
+
+Features
+--------
+
+- New API functions named ``forget`` and ``remember`` are available in
+ the ``security`` module. The ``forget`` function returns headers
+ which will cause the currently authenticated user to be logged out
+ when set in a response. The ``remember`` function (when passed the
+ proper arguments) will return headers which will cause a principal
+ to be "logged in" when set in a response. See the Security API
+ chapter of the docs for more info.
+
+- New keyword arguments to the ``repoze.bfg.router.make_app`` call
+ have been added: ``authentication_policy`` and
+ ``authorization_policy``. These should, respectively, be an
+ implementation of an authentication policy (an object implementing
+ the ``repoze.bfg.interfaces.IAuthenticationPolicy`` interface) and
+ an implementation of an authorization policy (an object implementing
+ ``repoze.bfg.interfaces.IAuthorizationPolicy). Concrete
+ implementations of authentication policies exist in
+ ``repoze.bfg.authentication``. Concrete implementations of
+ authorization policies exist in ``repoze.bfg.authorization``.
+
+ Both ``authentication_policy`` and ``authorization_policy`` default
+ to ``None``.
+
+ If ``authentication_policy`` is ``None``, but
+ ``authorization_policy`` is *not* ``None``, then
+ ``authorization_policy`` is ignored (the ability to do authorization
+ depends on authentication).
+
+ If the ``authentication_policy`` argument is *not* ``None``, and the
+ ``authorization_policy`` argument *is* ``None``, the authorization
+ policy defaults to an authorization implementation that uses ACLs
+ (``repoze.bfg.authorization.ACLAuthorizationPolicy``).
+
+ .. note:: we no longer encourage configuration of "security
+ policies" using ZCML, as previously we did for
+ ``ISecurityPolicy``. This is because it's not uncommon to need to
+ configure settings for concrete authorization or authentication
+ policies using paste .ini parameters; the app entry point for your
+ applicatio is the natural place to do this.
+
+- Two new abstractions have been added in the way of adapters used by
+ the system: an ``IAuthorizationPolicy`` and an
+ ``IAuthenticationPolicy``. A combination of these (as registered by
+ the ``securitypolicy`` ZCML directive) take the place of the
+ ``ISecurityPolicy`` abstraction in previous releases of repoze.who.
+ The API functions in ``repoze.who.security`` (such as
+ ``authentication_userid``, ``effective_principals``,
+ ``has_permission``, and so on) have been changed to try to make use
+ of these new adapters. If you're using an older ``ISecurityPolicy``
+ adapter, the system will still work, but it will print deprecation
+ warnings when such a policy is used.
+
+- The way the (internal) IViewPermission utilities registered via ZCML
+ are invoked has changed. They are purely adapters now, returning a
+ boolean result, rather than returning a callable. You shouldn't have
+ been using these anyway. ;-)
+
+- New concrete implementations of IAuthenticationPolicy have been
+ added to the ``repoze.bfg.authentication`` module:
+ ``RepozeWho1AuthenticationPolicy`` which uses ``repoze.who``
+ identity to retrieve authentication data from and
+ ``RemoteUserAuthenticationPolicy``, which uses the ``REMOTE_USER``
+ value in the WSGI environment to retrieve authentication data.
+
+- A new concrete implementation of IAuthorizationPolicy has been added
+ to the ``repoze.bfg.authorization`` module:
+ ``ACLAuthorizationPolicy`` which uses ACL inheritance to do
+ authorization.
+
+- It is now possible to register a custom
+ ``repoze.bfg.interfaces.IForbiddenResponseFactory`` for a given
+ application. This feature replaces the
+ ``repoze.bfg.interfaces.IUnauthorizedAppFactory`` feature previously
+ described in the Hooks chapter. The IForbiddenResponseFactory will
+ be called when the framework detects an authorization failure; it
+ should accept a context object and a request object; it should
+ return an IResponse object (a webob response, basically). Read the
+ below point for more info and see the Hooks narrative chapter of the
+ BFG docs for more info.
+
+Backwards Incompatibilities
+---------------------------
+
+- Custom NotFound and Forbidden (nee' Unauthorized) WSGI applications
+ (registered as a utility for INotFoundAppFactory and
+ IUnauthorizedAppFactory) could rely on an environment key named
+ ``message`` describing the circumstance of the response. This key
+ has been renamed to ``repoze.bfg.message`` (as per the WSGI spec,
+ which requires environment extensions to contain dots).
+
+Deprecations
+------------
+
+- The ``repoze.bfg.interfaces.IUnauthorizedAppFactory`` interface has
+ been deprecated in favor of using the new
+ ``repoze.bfg.interfaces.IForbiddenResponseFactory`` mechanism.
+
+- The ``view_execution_permitted`` API should now be imported from the
+ ``repoze.bfg.security`` module instead of the ``repoze.bfg.view``
+ module.
+
+- The ``authenticated_userid`` and ``effective_principals`` APIs in
+ ``repoze.bfg.security`` used to only take a single argument
+ (request). They now accept two arguments (``context`` and
+ ``request``). Calling them with a single argument is still
+ supported but issues a deprecation warning.
+
+
0.8.1 (2009-05-21)
==================
diff --git a/docs/api/authentication.rst b/docs/api/authentication.rst
new file mode 100644
index 000000000..f51eeb302
--- /dev/null
+++ b/docs/api/authentication.rst
@@ -0,0 +1,14 @@
+:mod:`repoze.bfg.authentication`
+================================
+
+.. automodule:: repoze.bfg.authentication
+
+.. _authentication_policies_api_section:
+
+Authentication Policies
+~~~~~~~~~~~~~~~~~~~~~~~
+
+.. autoclass:: RepozeWho1AuthenticationPolicy
+
+.. autoclass:: RemoteUserAuthenticationPolicy
+
diff --git a/docs/api/authorization.rst b/docs/api/authorization.rst
new file mode 100644
index 000000000..e44b1e293
--- /dev/null
+++ b/docs/api/authorization.rst
@@ -0,0 +1,11 @@
+:mod:`repoze.bfg.authorization`
+===============================
+
+.. automodule:: repoze.bfg.authorization
+
+.. _authorization_policies_api_section:
+
+Authorization Policies
+~~~~~~~~~~~~~~~~~~~~~~
+
+.. autoclass:: ACLAuthorizationPolicy
diff --git a/docs/api/security.rst b/docs/api/security.rst
index 5990f1809..77b60f5d0 100644
--- a/docs/api/security.rst
+++ b/docs/api/security.rst
@@ -8,14 +8,21 @@
API Functions
~~~~~~~~~~~~~
-.. autofunction:: authenticated_userid
+.. autofunction:: authenticated_userid(context, request)
-.. autofunction:: effective_principals
+.. autofunction:: effective_principals(context, request)
.. autofunction:: has_permission
.. autofunction:: principals_allowed_by_permission
+.. autofunction:: forget
+
+.. autofunction:: remember
+
+.. autofunction:: view_execution_permitted
+
+
Constants
~~~~~~~~~
@@ -29,8 +36,8 @@ Constants
The special principal id named 'Authenticated'. This principal id
is granted to all requests which contain any other non-Everyone
- principal id (according to the security policy). Its actual value
- is the string 'system.Authenticated'.
+ principal id (according to the :term:`authentication policy`).
+ Its actual value is the string 'system.Authenticated'.
.. attribute:: ALL_PERMISSIONS
@@ -73,15 +80,3 @@ Return Values
.. autoclass:: Allowed
:members:
-.. _security_policies_api_section:
-
-Security Policies
-~~~~~~~~~~~~~~~~~
-
-.. autofunction:: WhoACLSecurityPolicy
-
-.. autofunction:: WhoInheritingACLSecurityPolicy
-
-.. autofunction:: RemoteUserACLSecurityPolicy
-
-.. autofunction:: RemoteUserInheritingACLSecurityPolicy
diff --git a/docs/api/traversal.rst b/docs/api/traversal.rst
index 2f57fd3aa..4b7ac41f5 100644
--- a/docs/api/traversal.rst
+++ b/docs/api/traversal.rst
@@ -21,9 +21,6 @@
.. autofunction:: traverse
-.. note:: A function named ``model_url`` used to be present in this
- module. It was moved to :ref:`url_module` in version 0.6.1.
-
Secondary APIs
~~~~~~~~~~~~~~
diff --git a/docs/api/view.rst b/docs/api/view.rst
index 3b34b7e22..40c69d24b 100644
--- a/docs/api/view.rst
+++ b/docs/api/view.rst
@@ -13,8 +13,6 @@
.. autofunction:: is_response
- .. autofunction:: view_execution_permitted
-
.. autoclass:: bfg_view
:members:
diff --git a/docs/conf.py b/docs/conf.py
index ae594c7d3..bfb096b3b 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -51,9 +51,9 @@ copyright = '2008, Agendaless Consulting'
# other places throughout the built documents.
#
# The short X.Y version.
-version = '0.8.1'
+version = '0.9dev'
# The full version, including alpha/beta/rc tags.
-release = '0.8.1'
+release = '0.9dev'
# There are two options for replacing |today|: either, you set today to some
# non-false value, then it is used:
diff --git a/docs/glossary.rst b/docs/glossary.rst
index 92e2264f9..e6efcc565 100644
--- a/docs/glossary.rst
+++ b/docs/glossary.rst
@@ -141,14 +141,16 @@ Glossary
perfom authentication: it leaves it up to an upstream component
such as :term:`repoze.who`. :mod:`repoze.bfg` uses the
:term:`authentication` data supplied by the upstream component as
- one input during :term:`authorization`.
+ one input during :term:`authorization`. Authentication in
+ :mod:`repoze.bfg` is performed via an :term:`authentication
+ policy`.
Authorization
The act of determining whether a user can perform a specific
action. In bfg terms, this means determining whether, for a given
context, any :term:`principal` (or principals) associated with the
request have the requisite :term:`permission` to allow the request
to continue. Authorization in :mod:`repoze.bfg` is performed via
- its :term:`security policy`.
+ its :term:`authorization policy`.
Principal
A *principal* is a string or unicode object representing a user or
a user's membership in a group. It is provided by the
@@ -158,14 +160,16 @@ Glossary
bar", the request might have information attached to it that would
indictate that Bob was represented by three principals: "bob",
"group foo" and "group bar".
- Security Policy
- A security policy in :mod:`repoze.bfg` terms is a bit of code
- which accepts a request, the :term:`ACL` associated with a
- context, and the :term:`permission` associated with a particular
- view, and subsequently determines whether or not the principals
- associated with the request can perform the action associated with
- the permission based on the ACL found on the :term:`context` (or
- any of its parents).
+ Authorization Policy
+ An authorization policy in :mod:`repoze.bfg` terms is a bit of
+ code which has an API which determines whether or not the
+ principals associated with the request can perform an action
+ associated with a permission, based on the information found on the
+ :term:`context`.
+ Authentication Policy
+ An authentication policy in :mod:`repoze.bfg` terms is a bit of
+ code which has an API which determines the current
+ :term:`principal` (or principals) associated with a request.
WSGI
`Web Server Gateway Interface <http://wsgi.org/>`_. This is a
Python standard for connecting web applications to web servers,
@@ -274,7 +278,7 @@ Glossary
object. In :mod:`repoze.bfg`, an interface may be attached to an
model object or a request object in order to identify that the
object is "of a type". Interfaces are used internally by
- :mod:`repoze.bfg` to perform view lookups and security policy
+ :mod:`repoze.bfg` to perform view lookups and other policy
lookups. Interfaces are exposed to application programmers by the
``view`` ZCML directive or the corresponding ``bfg_view``
decorator in the form of both the ``for`` attribute and the
diff --git a/docs/index.rst b/docs/index.rst
index 20c564396..d86cd895e 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -50,15 +50,17 @@ Per-module :mod:`repoze.bfg` API documentation.
.. toctree::
:maxdepth: 2
+ api/authentication
+ api/authorization
api/events
api/interfaces
+ api/location
api/push
api/router
api/security
api/template
api/testing
api/traversal
- api/location
api/url
api/view
api/wsgi
diff --git a/docs/narr/hooks.rst b/docs/narr/hooks.rst
index 21906e466..657ad8a67 100644
--- a/docs/narr/hooks.rst
+++ b/docs/narr/hooks.rst
@@ -88,7 +88,7 @@ an object that implements any particular interface; it simply needs
have a ``status`` attribute, a ``headerlist`` attribute, and and
``app_iter`` attribute.
-Changing the NotFound application
+Changing the NotFound Application
---------------------------------
When :mod:`repoze.bfg` can't map a URL to code, it creates and invokes
@@ -119,54 +119,55 @@ sample code that implements a minimal NotFound application factory:
.. note:: When a NotFound application factory is invoked, it is passed
the WSGI environ and the WSGI ``start_response`` handler by
:mod:`repoze.bfg`. Within the WSGI environ will be a key named
- ``message`` that has a value explaining why the not found error was
- raised. This error will be different when the ``debug_notfound``
- environment setting is true than it is when it is false.
+ ``repoze.bfg.message`` that has a value explaining why the not
+ found error was raised. This error will be different when the
+ ``debug_notfound`` environment setting is true than it is when it
+ is false.
-Changing the Unauthorized application
--------------------------------------
+Changing the Forbidden Response
+-------------------------------
When :mod:`repoze.bfg` can't authorize execution of a view based on
-the security policy in use, it creates and invokes an Unauthorized
-WSGI application. The application it invokes can be customized by
-placing something like the following ZCML in your ``configure.zcml``
-file.
+the authorization policy in use, it invokes a "forbidden response
+factory". Usually this forbidden response factory is a default 401
+response, but it can be overridden as necessary by placing something
+like the following ZCML in your ``configure.zcml`` file.
.. code-block:: xml
:linenos:
- <utility provides="repoze.bfg.interfaces.IUnauthorizedAppFactory"
- component="helloworld.factories.unauthorized_app_factory"/>
+ <utility provides="repoze.bfg.interfaces.IForbiddenResponseFactory"
+ component="helloworld.factories.forbidden_response_factory"/>
-Replace ``helloworld.factories.unauthorized_app_factory`` with the
-Python dotted name to the request factory you want to use. Here's
-some sample code that implements a minimal Unauthorized application
-factory:
+Replace ``helloworld.factories.forbidden_app_factory`` with the Python
+dotted name to the forbidden response factory you want to use. The
+response factory must accept two parameters: ``context`` and
+``request``. The ``context`` is the context found by the router when
+the view invocation was denied. The ``request`` is the current
+:term:`request` representing the denied action. Here's some sample
+code that implements a minimal forbidden response factory:
.. code-block:: python
- from webob.exc import HTTPUnauthorized
+ from repoze.bfg.chameleon_zpt import render_template_to_response
- class MyUnauthorized(HTTPUnauthorized):
- pass
+ def forbidden_response_factory(context, request):
+ return render_template_to_response('templates/login_form.pt')
- def notfound_app_factory():
- return MyUnauthorized
+.. note:: When an forbidden response factory is invoked, it is passed
+ the request as the second argument. An attribute of the request is
+ ``environ``, which is the WSGI environment. Within the WSGI
+ environ will be a key named ``repoze.bfg.message`` that has a value
+ explaining why the current view invocation was forbidden. This
+ error will be different when the ``debug_authorization``
+ environment setting is true than it is when it is false.
-.. note:: When an Unauthorized application factory is invoked, it is
- passed the WSGI environ and the WSGI ``start_response`` handler by
- :mod:`repoze.bfg`. Within the WSGI environ will be a key named
- ``message`` that has a value explaining why the action was not
- authorized. This error will be different when the
- ``debug_authorization`` environment setting is true than it is when
- it is false.
-
-.. note:: You can influence the status code of Unauthorized responses
- by using an alterate unauthorized application factory. For
- example, you may return an unauthorized application with a ``403
- Forbidden`` status code, rather than use the default unauthorized
- application factory, which sends a response with a ``401
- Unauthorized`` status code.
+.. warning:: the default forbidden application factory sends a
+ response with a ``401 Unauthorized`` status code for backwards
+ compatibility reasons. You can influence the status code of
+ Forbidden responses by using an alterate forbidden application
+ factory. For example, it would make sense to return an forbidden
+ application with a ``403 Forbidden`` status code.
Changing the Default Routes Context Factory
-------------------------------------------
diff --git a/docs/narr/security.rst b/docs/narr/security.rst
index 3de7f9140..365f19010 100644
--- a/docs/narr/security.rst
+++ b/docs/narr/security.rst
@@ -9,53 +9,64 @@ from being invoked when the user represented by credentials in the
:term:`request` does not have an appropriate level of access in a
specific context.
-Authorization is enabled by adding configuration to your
-``configure.zcml`` which specifies a :term:`security policy`.
+Authorization is enabled by modifying your application's invocation of
+``repoze.bfg.router.make_app``, often located in the ``run.py`` module
+of a :mod:`repoze.bfg` application.
-Enabling a Security Policy
---------------------------
+Enabling an Authorization Policy
+--------------------------------
-By default, :mod:`repoze.bfg` enables no security policy. All views
-are accessible by completely anonymous users.
+By default, :mod:`repoze.bfg` enables no authorization policy. All
+views are accessible by completely anonymous users.
-However, if you add the following bit of code to your application's
-``configure.zcml``, you will enable a security policy:
+However, if you change the call to ``repoze.bfg.router.make_app``
+(usually found within the ``run.py`` module in your application), you
+will enable an authorization policy.
-.. code-block:: xml
+You must enable a a :term:`authentication policy` in order to enable
+an authorization policy.
+
+For example, to enable a policy which compares the ``REMOTE_USER``
+variable passed in the request's environment (as the sole
+:term:`principal`) against the principals present in any :term:`ACL`
+found in model data when attempting to call some :term:`view`, modify
+your ``run.py`` to look something like this:
+
+.. code-block:: python
:linenos:
- <utility
- provides="repoze.bfg.interfaces.ISecurityPolicy"
- factory="repoze.bfg.security.RemoteUserInheritingACLSecurityPolicy"
- />
-
-The above inscrutable stanza enables the
-``RemoteUserInheritingACLSecurityPolicy`` to be in effect for every
-request to your application. The
-``RemoteUserInheritingACLSecurityPolicy`` is a policy which compares
-the ``REMOTE_USER`` variable passed in the request's environment (as
-the sole :term:`principal`) against the principals present in any
-:term:`ACL` found in model data when attempting to call some
-:term:`view`. The policy either allows the view that the permission
-was declared for to be called, or returns a ``401 Unathorized``
-response code to the upstream WSGI server.
-
-.. note:: Another "inheriting" security policy also exists:
- ``WhoInheritingACLSecurityPolicy``. This policy uses principal
- information found in the ``repoze.who.identity`` value set into the
- WSGI environment by the :term:`repoze.who` middleware rather than
- ``REMOTE_USER`` information. This policy only works properly when
- :term:`repoze.who` middleware is present in the WSGI pipeline.
-
-.. note:: "non-inheriting" security policy variants of the
- (``WhoACLSecurityPolicy`` and ``RemoteUserACLSecurityPolicy``) also
- exist. These policies use the *first* ACL found as the canonical
- ACL; they do not continue searching up the context lineage to find
- "inherited" ACLs. It is recommended that you use the inheriting
- variants unless you need this feature.
-
-.. note:: See :ref:`security_policies_api_section` for more
- information about the features of the default security policies.
+ from repoze.bfg.router import make_app
+ from repoze.bfg.authentication import RemoteUserAuthenticationPolicy
+
+ def app(global_config, **kw):
+ """ This function returns a repoze.bfg.router.Router object. It
+ is usually called by the PasteDeploy framework during ``paster
+ serve``"""
+ # paster app config callback
+ from myproject.models import get_root
+ import myproject
+ policy = RemoteUserAuthenticationPolicy()
+ return make_app(get_root, myproject, authentication_policy=policy,
+ options=kw)
+
+This injects an instance of the
+``repoze.bfg.authentication.RemoteUserAuthenticationPolicy`` as the
+:term:`authentication policy`. It is possible to use a different
+authentication policy. :mod:`repoze.bfg` ships with a few prechewed
+authentication policies that should prove useful (see
+:ref:`authentication_policies_api_section`). It is also possible to
+construct your own authentication policy. Any instance which
+implements the interface defined in
+``repoze.bfg.interfaces.IAuthenticationPolicy`` can be used.
+
+It's common but it is also possible to change the default
+:term:`authorization policy` (to use some other persistent
+authorization mechanism other than ACLs). To do so, pass an object
+which implements the ``repoze.bfg.interfaces.IAuthorizationPolicy``)
+to ``make_app`` as the ``authorization_policy`` value.
+:mod:`repoze.who` ships with only one. See
+:ref:`authorization_policies_api_section` for the details of the ACL
+authorization policy which is the default
Protecting Views with Permissions
---------------------------------
@@ -91,9 +102,9 @@ your project's package
""" Add blog entry code goes here """
pass
-If a security policy is in place when this view is found during normal
-application operations, the user will need to possess the ``add``
-permission against the context to be able to invoke the
+If an authorization policy is in place when this view is found during
+normal application operations, the user will need to possess the
+``add`` permission against the context to be able to invoke the
``blog_entry_add_view`` view.
Permission names are just strings. They hold no special significance
@@ -192,18 +203,18 @@ ACE, as below.
]
A principal is usually a user id, however it also may be a group id if
-your authentication system provides group information and the security
-policy is written to respect them. The
-``RemoteUserInheritingACLSecurityPolicy`` does not respect group
-information, but other security policies that come with
-:mod:`repoze.bfg` do (see the :mod:`repoze.bfg.security` API docs for
-more info).
+your authentication system provides group information and the
+effective :term:`authentication policy` policy is written to respect
+group information. The ``RepozeWho1AuthenicationPolicy``
+authentication policy that comes with :mod:`repoze.bfg` respects group
+information (see the :mod:`repoze.bfg.security` API docs for more
+info on authentication policies).
Each tuple within an ACL structure is known as a :term:`ACE`, which
stands for "access control entry". For example, in the above ACL,
``(Allow, Everyone, 'view')`` is an ACE. Each ACE in an ACL is
-processed by a security policy *in the order dictated by the ACL*. So
-if you have an ACL like this:
+processed by an authorization policy *in the order dictated by the
+ACL*. So if you have an ACL like this:
.. code-block:: python
:linenos:
@@ -217,9 +228,9 @@ if you have an ACL like this:
(Deny, Everyone, 'view'),
]
-The security policy will *allow* everyone the view permission, even
-though later in the ACL you have an ACE that denies everyone the view
-permission. On the other hand, if you have an ACL like this:
+The authorization policy will *allow* everyone the view permission,
+even though later in the ACL you have an ACE that denies everyone the
+view permission. On the other hand, if you have an ACL like this:
.. code-block:: python
:linenos:
@@ -233,7 +244,7 @@ permission. On the other hand, if you have an ACL like this:
(Allow, Everyone, 'view'),
]
-The security policy will deny Everyone the view permission, even
+The authorization policy will deny Everyone the view permission, even
though later in the ACL is an ACE that allows everyone.
Special Principal Names
@@ -271,8 +282,8 @@ module. These can be imported for use in ACLs.
ACL like so: ``(Allow, 'fred', ALL_PERMISSIONS)``. The
``ALL_PERMISSIONS`` object is actually a standin object that has a
``__contains__`` method that always returns True, which, for all
- known security policies, has the effect of indicating that a given
- principal "has" any permission asked for by the system.
+ known authorization policies, has the effect of indicating that a
+ given principal "has" any permission asked for by the system.
Special ACEs
------------
@@ -286,11 +297,11 @@ following:
(Deny, Everyone, ALL_PERMISSIONS)
This ACE is often used as the *last* ACE of an ACL to explicitly cause
-inheriting security policies to "stop looking up the traversal tree"
-(effectively breaking any inheritance). For example, an ACL which
-allows *only* ``fred`` the view permission in a particular traversal
-context despite what inherited ACLs may say when an inheriting
-security policy is in effect might look like so:
+inheriting authorization policies to "stop looking up the traversal
+tree" (effectively breaking any inheritance). For example, an ACL
+which allows *only* ``fred`` the view permission in a particular
+traversal context despite what inherited ACLs may say when the default
+authorization policy is in effect might look like so:
.. code-block:: python
:linenos:
@@ -304,39 +315,11 @@ security policy is in effect might look like so:
ACL Inheritance
---------------
-While any security policy is in place, if a model object does not have
-an ACL when it is the context, its *parent* is consulted for an ACL.
-If that object does not have an ACL, *its* parent is consulted for an
-ACL, ad infinitum, until we've reached the root and there are no more
-parents left.
-
-With *non-inheriting* security policy variants
-(e.g. ``WhoACLSecurityPolicy`` and ``RemoteUserACLSecurityPolicy``),
-the *first* ACL found by the security policy will be used as the
-effective ACL. No combination of ACLs found during traversal or
-backtracking is done.
-
-With *inheriting* security policy variants
-(e.g. ``WhoInheritingACLSecurityPolicy`` and
-``RemoteUserInheritingACLSecurityPolicy``), *all* ACLs in the
-context's :term:`lineage` are consulted when determining whether
-access is allowed or denied.
-
-:ref:`security_policies_api_section` for more information about the
-features of the default security policies and the difference between
-the inheriting and non-inheriting variants.
-
-.. note:: It is recommended that you use the inheriting variant of a
- security policy. Inheriting variants of security policies make it
- possible for you to form a security strategy based on context ACL
- "inheritance" rather than needing to keep all information about an
- object's security state in a single ACL attached to that object.
- It's much easier to code applications that dynamically change ACLs
- if ACL inheritance is used. In reality, the non-inheriting
- security policy variants exist only for backwards compatibility
- with applications that used them in versions of :mod:`repoze.bfg`
- before 0.8. If this backwards compatibility was not required, the
- non-inheriting variants probably just wouldn't exist.
+While the default :term:`authorization policy` is in place, if a model
+object does not have an ACL when it is the context, its *parent* is
+consulted for an ACL. If that object does not have an ACL, *its*
+parent is consulted for an ACL, ad infinitum, until we've reached the
+root and there are no more parents left.
Location-Awareness
------------------
diff --git a/docs/narr/traversal.rst b/docs/narr/traversal.rst
index 467916779..8eba623e4 100644
--- a/docs/narr/traversal.rst
+++ b/docs/narr/traversal.rst
@@ -135,13 +135,13 @@ code to execute:
for the name ``b``, the router deems that the context is "object
``a``", the view name is ``b`` and the subpath is ``['c']``.
-#. If a :term:`security policy` is configured, the router performs a
- permission lookup. If a permission declaration is found for the
- view name and context implied by the current request, the security
- policy is consulted to see if the "current user" (also determined
- by the security policy) can perform the action. If he can,
- processing continues. If he cannot, an ``HTTPUnauthorized`` error
- is raised.
+#. If a :term:`authentication policy` is configured, the router
+ performs a permission lookup. If a permission declaration is
+ found for the view name and context implied by the current
+ request, an :term:`authorization policy` is consulted to see if
+ the "current user" (al determined by the the authentication
+ policy) can perform the action. If he can, processing continues.
+ If he cannot, an ``HTTPUnauthorized`` error is raised.
#. Armed with the context, the view name, and the subpath, the router
performs a view lookup. It attemtps to look up a view from the
diff --git a/docs/narr/urldispatch.rst b/docs/narr/urldispatch.rst
index c27165731..d43a344f2 100644
--- a/docs/narr/urldispatch.rst
+++ b/docs/narr/urldispatch.rst
@@ -420,14 +420,14 @@ Using :mod:`repoze.bfg` Security With URL Dispatch
--------------------------------------------------
:mod:`repoze.bfg` provides its own security framework which consults a
-:term:`security policy` before allowing any application code to be
-called. This framework operates in terms of ACLs (Access Control
+:term:`authorization policy` before allowing any application code to
+be called. This framework operates in terms of ACLs (Access Control
Lists, see :ref:`security_chapter` for more information about the
-:mod:`repoze.bfg` security subsystem). A common thing to want to do
-is to attach an ``__acl__`` to the context object dynamically for
-declarative security purposes. You can use the ``factory``
-argument that points at a context factory which attaches a custom
-``__acl__`` to an object at its creation time.
+:mod:`repoze.bfg` authorization subsystem). A common thing to want to
+do is to attach an ``__acl__`` to the context object dynamically for
+declarative security purposes. You can use the ``factory`` argument
+that points at a context factory which attaches a custom ``__acl__``
+to an object at its creation time.
Such a ``factory`` might look like so:
diff --git a/docs/narr/views.rst b/docs/narr/views.rst
index ac5a8383f..9e9c55236 100644
--- a/docs/narr/views.rst
+++ b/docs/narr/views.rst
@@ -546,12 +546,12 @@ will be called.
View Security
-------------
-If a :term:`security policy` is active, any :term:`permission`
-attached to a ``view`` declaration will be consulted to ensure
-that the currently authenticated user possesses that permission
-against the context before the view function is actually called.
-Here's an example of specifying a permission in a ``view``
-declaration:
+If a :term:`authentication policy` (and a :term:`authorization
+policy`) is active, any :term:`permission` attached to a ``view``
+declaration will be consulted to ensure that the currently
+authenticated user possesses that permission against the context
+before the view function is actually called. Here's an example of
+specifying a permission in a ``view`` declaration:
.. code-block:: xml
:linenos:
@@ -563,16 +563,16 @@ declaration:
permission="add"
/>
-When a security policy is enabled, this view will be protected with
-the ``add`` permission. The view will *not be called* if the user
-does not possess the ``add`` permission relative to the current
-:term:`context` and a security policy is enabled. Instead an HTTP
-``Unauthorized`` status will be returned to the client.
+When an authentication policy is enabled, this view will be protected
+with the ``add`` permission. The view will *not be called* if the
+user does not possess the ``add`` permission relative to the current
+:term:`context` and an authorization policy is enabled. Instead an
+HTTP ``Unauthorized`` status will be returned to the client.
.. note::
See the :ref:`security_chapter` chapter to find out how to turn on
- a security policy.
+ an authentication policy.
.. note::
diff --git a/repoze/bfg/authentication.py b/repoze/bfg/authentication.py
new file mode 100644
index 000000000..487a5e6a8
--- /dev/null
+++ b/repoze/bfg/authentication.py
@@ -0,0 +1,86 @@
+from zope.interface import implements
+from repoze.bfg.interfaces import IAuthenticationPolicy
+from repoze.bfg.security import Everyone
+from repoze.bfg.security import Authenticated
+
+class RepozeWho1AuthenticationPolicy(object):
+ """ A BFG authentication policy which obtains data from the
+ repoze.who 1.X WSGI API """
+ implements(IAuthenticationPolicy)
+ identifier_name = 'auth_tkt'
+
+ def _get_identity(self, request):
+ return request.environ.get('repoze.who.identity')
+
+ def _get_identifier(self, request):
+ plugins = request.environ.get('repoze.who.plugins')
+ if plugins is None:
+ return None
+ identifier = plugins[self.identifier_name]
+ return identifier
+
+ def authenticated_userid(self, context, request):
+ identity = self._get_identity(request)
+ if identity is None:
+ return None
+ return identity['repoze.who.userid']
+
+ def effective_principals(self, context, request):
+ effective_principals = [Everyone]
+ identity = self._get_identity(request)
+ if identity is None:
+ return effective_principals
+
+ effective_principals.append(Authenticated)
+ userid = identity['repoze.who.userid']
+ groups = identity.get('groups', [])
+ effective_principals.append(userid)
+ effective_principals.extend(groups)
+
+ return effective_principals
+
+ def remember(self, context, request, principal, **kw):
+ identifier = self._get_identifier(request)
+ if identifier is None:
+ return []
+ environ = request.environ
+ identity = {'repoze.who.userid':principal}
+ return identifier.remember(environ, identity)
+
+ def forget(self, context, request):
+ identifier = self._get_identifier(request)
+ if identifier is None:
+ return []
+ identity = self._get_identity(request)
+ return identifier.forget(request.environ, identity)
+
+class RemoteUserAuthenticationPolicy(object):
+ """ A BFG authentication policy which obtains data from the
+ REMOTE_USER WSGI envvar """
+ implements(IAuthenticationPolicy)
+
+ def _get_identity(self, request):
+ return request.environ.get('REMOTE_USER')
+
+ def authenticated_userid(self, context, request):
+ identity = self._get_identity(request)
+ if identity is None:
+ return None
+ return identity
+
+ def effective_principals(self, context, request):
+ effective_principals = [Everyone]
+ identity = self._get_identity(request)
+ if identity is None:
+ return effective_principals
+
+ effective_principals.append(Authenticated)
+ effective_principals.append(identity)
+
+ return effective_principals
+
+ def remember(self, context, request, principal, **kw):
+ return []
+
+ def forget(self, context, request):
+ return []
diff --git a/repoze/bfg/authorization.py b/repoze/bfg/authorization.py
new file mode 100644
index 000000000..e131e6a21
--- /dev/null
+++ b/repoze/bfg/authorization.py
@@ -0,0 +1,112 @@
+from zope.interface import implements
+
+from repoze.bfg.interfaces import IAuthorizationPolicy
+from repoze.bfg.location import lineage
+from repoze.bfg.security import Allow
+from repoze.bfg.security import Deny
+from repoze.bfg.security import ACLAllowed
+from repoze.bfg.security import ACLDenied
+from repoze.bfg.security import Everyone
+
+class ACLAuthorizationPolicy(object):
+ """ An authorization policy which uses ACLs in the following ways:
+
+ - When checking whether a user is permitted (via the ``permits``
+ method), the security policy consults the ``context`` for an ACL
+ first. If no ACL exists on the context, or one does exist but
+ the ACL does not explicitly allow or deny access for any of the
+ effective principals, consult the context's parent ACL, and so
+ on, until the lineage is exhausted or we determine that the
+ policy permits or denies.
+
+ During this processing, if any ``Deny`` ACE is found matching
+ any principal in ``principals``, stop processing by returning an
+ ``ACLDenied`` (equals False) immediately. If any ``Allow`` ACE
+ is found matching any principal, stop processing by returning an
+ ``ACLAllowed`` (equals True) immediately. If we exhaust the
+ context's lineage, and no ACE has explicitly permitted or denied
+ access, return an ``ACLDenied``. This differs from the
+ non-inheriting security policy (the ``ACLSecurityPolicy``) by
+ virtue of the fact that it does not stop looking for ACLs in the
+ object lineage after it finds the first one.
+
+ - When computing principals allowed by a permission via the
+ ``principals_allowed_by_permission`` method, we compute the set
+ of principals that are explicitly granted the ``permission`` in
+ the provided ``context``. We do this by walking 'up' the object
+ graph *from the root* to the context. During this walking
+ process, if we find an explicit ``Allow`` ACE for a principal
+ that matches the ``permission``, the principal is included in
+ the allow list. However, if later in the walking process that
+ user is mentioned in any ``Deny`` ACE for the permission, the
+ user is removed from the allow list. If a ``Deny`` to the
+ principal ``Everyone`` is encountered during the walking process
+ that matches the ``permission``, the allow list is cleared for
+ all principals encountered in previous ACLs. The walking
+ process ends after we've processed the any ACL directly attached
+ to ``context``; a set of principals is returned.
+ """
+
+ implements(IAuthorizationPolicy)
+
+ def permits(self, context, principals, permission):
+ """ Return ``ACLAllowed`` if the policy permits access,
+ ``ACLDenied`` if not. """
+
+ for location in lineage(context):
+ try:
+ acl = location.__acl__
+ except AttributeError:
+ continue
+
+ for ace in acl:
+ ace_action, ace_principal, ace_permissions = ace
+ if ace_principal in principals:
+ if not hasattr(ace_permissions, '__iter__'):
+ ace_permissions = [ace_permissions]
+ if permission in ace_permissions:
+ if ace_action == Allow:
+ return ACLAllowed(ace, acl, permission,
+ principals, location)
+ else:
+ return ACLDenied(ace, acl, permission,
+ principals, location)
+
+ # default deny if no ACL in lineage at all
+ return ACLDenied(None, None, permission, principals, context)
+
+ def principals_allowed_by_permission(self, context, permission):
+ """ Return the set of principals explicitly granted the
+ permission named ``permission`` according to the ACL directly
+ attached to the context context as well as inherited ACLs. """
+ allowed = set()
+
+ for location in reversed(list(lineage(context))):
+ # NB: we're walking *up* the object graph from the root
+ try:
+ acl = location.__acl__
+ except AttributeError:
+ continue
+
+ allowed_here = set()
+ denied_here = set()
+
+ for ace_action, ace_principal, ace_permissions in acl:
+ if not hasattr(ace_permissions, '__iter__'):
+ ace_permissions = [ace_permissions]
+ if ace_action == Allow and permission in ace_permissions:
+ if not ace_principal in denied_here:
+ allowed_here.add(ace_principal)
+ if ace_action == Deny and permission in ace_permissions:
+ denied_here.add(ace_principal)
+ if ace_principal == Everyone:
+ # clear the entire allowed set, as we've hit a
+ # deny of Everyone ala (Deny, Everyone, ALL)
+ allowed = set()
+ break
+ elif ace_principal in allowed:
+ allowed.remove(ace_principal)
+
+ allowed.update(allowed_here)
+
+ return allowed
diff --git a/repoze/bfg/interfaces.py b/repoze/bfg/interfaces.py
index cecc3a397..2b00ac18f 100644
--- a/repoze/bfg/interfaces.py
+++ b/repoze/bfg/interfaces.py
@@ -197,15 +197,26 @@ class INotFoundAppFactory(Interface):
a``message`` key in the WSGI environ provides information
pertaining to the reason for the notfound."""
+class IForbiddenResponseFactory(Interface):
+ """ A utility which returns an IResponse as the result of the
+ denial of a view invocation by a security policy."""
+ def __call__(context, request):
+ """ Return an object implementing IResponse (an object with
+ the status, headerlist, and app_iter attributes) as a result
+ of a view invocation denial by a security policy.
+
+ Note that the ``message`` key in the WSGI environ
+ (request.environ) provides information pertaining to the
+ reason for the view invocation denial. The ``context`` passed
+ to the forbidden app factory will be the context found by the
+ repoze.bfg router during traversal or url dispatch. The
+ ``request`` will be the request object which caused the deny."""
+
class IUnauthorizedAppFactory(Interface):
""" A utility which returns an Unauthorized WSGI application
- factory"""
- def __call__():
- """ Return a callable which returns an unauthorized WSGI
- application. When the WSGI application is invoked, a
- ``message`` key in the WSGI environ provides information
- pertaining to the reason for the unauthorized."""
-
+ factory (deprecated in repoze.bfg 0.8.2) in favor of
+ IForbiddenResponseFactory """
+
class IContextURL(Interface):
""" An adapter which deals with URLs related to a context.
"""
@@ -220,6 +231,37 @@ class IRoutesContextFactory(Interface):
""" A marker interface used to look up the default routes context factory
"""
+class IAuthenticationPolicy(Interface):
+ """ A multi-adapter on context and request """
+ def authenticated_userid(context, request):
+ """ Return the authenticated userid or ``None`` if no
+ authenticated userid can be found. """
+
+ def effective_principals(context, request):
+ """ Return a sequence representing the effective principals
+ including the userid and any groups belonged to by the current
+ user, including 'system' groups such as Everyone and
+ Authenticated. """
+
+ def remember(context, request, principal, **kw):
+ """ Return a set of headers suitable for 'remembering' the
+ principal named ``principal`` when set in a response. An
+ individual authentication policy and its consumers can decide
+ on the composition and meaning of **kw. """
+
+ def forget(context, request):
+ """ Return a set of headers suitable for 'forgetting' the
+ current user on subsequent requests. """
+
+class IAuthorizationPolicy(Interface):
+ """ A adapter on context """
+ def permits(context, principals, permission):
+ """ Return True if any of the principals is allowed the
+ permission in the current context, else return False """
+
+ def principals_allowed_by_permission(context, permission):
+ """ Return a set of principal identifiers allowed by the permission """
+
# VH_ROOT_KEY is an interface; its imported from other packages (e.g.
# traversalwrapper)
VH_ROOT_KEY = 'HTTP_X_VHM_ROOT'
diff --git a/repoze/bfg/registry.py b/repoze/bfg/registry.py
index 0cf8e306b..cacf9806c 100644
--- a/repoze/bfg/registry.py
+++ b/repoze/bfg/registry.py
@@ -10,6 +10,8 @@ from zope.component.registry import Components
from zope.deprecation import deprecated
+from repoze.bfg.threadlocal import manager
+
from repoze.bfg.zcml import zcml_configure
from repoze.bfg.settings import Settings # alias for deprecation below
@@ -48,35 +50,8 @@ class Registry(Components):
for ignored in self.subscribers(events, None):
""" """
-class ThreadLocalRegistryManager(threading.local):
- def __init__(self):
- self.stack = []
-
- def push(self, registry):
- self.stack.append(registry)
-
- set = push # backwards compatibility
-
- def pop(self):
- if self.stack:
- return self.stack.pop()
-
- def get(self):
- try:
- return self.stack[-1]
- except IndexError:
- return getGlobalSiteManager()
-
- def clear(self):
- self.stack[:] = []
-
-registry_manager = ThreadLocalRegistryManager()
-
-def setRegistryManager(manager): # for unit tests
- global registry_manager
- old_registry_manager = registry_manager
- registry_manager = manager
- return old_registry_manager
+def get_registry():
+ return manager.get()['registry']
def populateRegistry(registry, filename, package, lock=threading.Lock()):
@@ -95,19 +70,19 @@ def populateRegistry(registry, filename, package, lock=threading.Lock()):
registry."""
lock.acquire()
- registry_manager.push(registry)
+ manager.push({'registry':registry, 'request':None})
try:
original_getSiteManager.sethook(getSiteManager)
- zope.component.getGlobalSiteManager = registry_manager.get
+ zope.component.getGlobalSiteManager = get_registry
zcml_configure(filename, package)
finally:
zope.component.getGlobalSiteManager = getGlobalSiteManager
lock.release()
- registry_manager.pop()
+ manager.pop()
def getSiteManager(context=None):
if context is None:
- return registry_manager.get()
+ return get_registry()
else:
try:
return IComponentLookup(context)
diff --git a/repoze/bfg/router.py b/repoze/bfg/router.py
index 81bc6e4ef..1b535c442 100644
--- a/repoze/bfg/router.py
+++ b/repoze/bfg/router.py
@@ -1,42 +1,55 @@
+from cgi import escape
import sys
from webob import Request as WebObRequest
+from webob import Response
from zope.component.event import dispatch
+from zope.component import queryUtility
from zope.interface import implements
+from repoze.bfg.authorization import ACLAuthorizationPolicy
+
from repoze.bfg.events import NewRequest
from repoze.bfg.events import NewResponse
from repoze.bfg.events import WSGIApplicationCreatedEvent
from repoze.bfg.interfaces import ILogger
+from repoze.bfg.interfaces import ISecurityPolicy
from repoze.bfg.interfaces import INotFoundAppFactory
from repoze.bfg.interfaces import IRequestFactory
+from repoze.bfg.interfaces import IResponseFactory
from repoze.bfg.interfaces import IRootFactory
from repoze.bfg.interfaces import IRouter
from repoze.bfg.interfaces import IRoutesMapper
-from repoze.bfg.interfaces import ISecurityPolicy
from repoze.bfg.interfaces import ISettings
+from repoze.bfg.interfaces import IForbiddenResponseFactory
from repoze.bfg.interfaces import IUnauthorizedAppFactory
from repoze.bfg.interfaces import IView
from repoze.bfg.interfaces import IViewPermission
+from repoze.bfg.interfaces import IAuthorizationPolicy
+from repoze.bfg.interfaces import IAuthenticationPolicy
from repoze.bfg.log import make_stream_logger
from repoze.bfg.registry import Registry
-from repoze.bfg.registry import registry_manager
from repoze.bfg.registry import populateRegistry
from repoze.bfg.request import HTTP_METHOD_FACTORIES
from repoze.bfg.request import Request
+from repoze.bfg.secpols import registerBBBAuthn
+
+from repoze.bfg.security import Allowed
+
from repoze.bfg.settings import Settings
-from repoze.bfg.urldispatch import RoutesRootFactory
+from repoze.bfg.threadlocal import manager
from repoze.bfg.traversal import _traverse
-from repoze.bfg.view import _view_execution_permitted
-from repoze.bfg.wsgi import Unauthorized
+
+from repoze.bfg.urldispatch import RoutesRootFactory
+
from repoze.bfg.wsgi import NotFound
_marker = object()
@@ -47,25 +60,51 @@ class Router(object):
debug_authorization = False
debug_notfound = False
+ threadlocal_manager = manager
def __init__(self, registry):
self.registry = registry
+ self.logger = registry.queryUtility(ILogger, 'repoze.bfg.debug')
self.request_factory = registry.queryUtility(IRequestFactory)
- self.security_policy = registry.queryUtility(ISecurityPolicy)
- self.notfound_app_factory = registry.queryUtility(
- INotFoundAppFactory,
- default=NotFound)
- self.unauth_app_factory = registry.queryUtility(
- IUnauthorizedAppFactory,
- default=Unauthorized)
+ unauthorized_app_factory = registry.queryUtility(
+ IUnauthorizedAppFactory)
+
+ forbidden = None
+
+ if unauthorized_app_factory is not None:
+ warning = (
+ 'Instead of registering a utility against the '
+ 'repoze.bfg.interfaces.IUnauthorizedAppFactory interface '
+ 'to return a custom forbidden response, you should now '
+ 'register a "repoze.interfaces.IForbiddenResponseFactory". '
+ 'The IUnauthorizedAppFactory interface was deprecated in '
+ 'repoze.bfg 0.9 and will be removed in a subsequent version '
+ 'of repoze.bfg. See the "Hooks" chapter of the repoze.bfg '
+ 'documentation for more information about '
+ 'IForbiddenResponseFactory.')
+ self.logger and self.logger.warn(warning)
+ def forbidden(context, request):
+ app = unauthorized_app_factory()
+ response = request.get_response(app)
+ return response
+
+ forbidden = registry.queryUtility(IForbiddenResponseFactory,
+ default=forbidden)
+
+ self.forbidden_resp_factory = forbidden or default_forbidden_view
+
+ self.notfound_app_factory = registry.queryUtility(INotFoundAppFactory,
+ default=NotFound)
+
settings = registry.queryUtility(ISettings)
if settings is not None:
self.debug_authorization = settings.debug_authorization
self.debug_notfound = settings.debug_notfound
+
+ self.secured = not not registry.queryUtility(IAuthenticationPolicy)
- self.logger = registry.queryUtility(ILogger, 'repoze.bfg.debug')
self.root_factory = registry.getUtility(IRootFactory)
self.root_policy = self.root_factory # b/w compat
self.traverser_warned = {}
@@ -78,7 +117,11 @@ class Router(object):
iterable.
"""
registry = self.registry
- registry_manager.push(registry)
+ threadlocals = {'registry':registry, 'request':None}
+ self.threadlocal_manager.push(threadlocals)
+
+ logger = self.logger
+ request = None
try:
if self.request_factory is None:
@@ -92,6 +135,7 @@ class Router(object):
request_factory = self.request_factory
request = request_factory(environ)
+ threadlocals['request'] = request
registry.has_listeners and registry.notify(NewRequest(request))
root = self.root_factory(environ)
@@ -121,37 +165,48 @@ class Router(object):
request.virtual_root = vroot
request.virtual_root_path = vroot_path
- security_policy = self.security_policy
-
- permission = None
-
- if security_policy is not None:
- permission = registry.queryMultiAdapter((context, request),
- IViewPermission,
- name=view_name)
+ if self.secured:
- debug_authorization = self.debug_authorization
+ permitted = registry.queryMultiAdapter((context, request),
+ IViewPermission,
+ name=view_name)
- permitted = _view_execution_permitted(context, request, view_name,
- security_policy, permission,
- debug_authorization)
+ if permitted is None:
+ if self.debug_authorization:
+ permitted = Allowed(
+ 'Allowed: view name %r in context %r (no '
+ 'permission registered).' %
+ (view_name, context))
+ else:
+ permitted = True
- logger = self.logger
+
+ else:
+ if self.debug_authorization:
+ permitted = Allowed(
+ 'Allowed: view name %r in context %r (no '
+ 'authentication policy in use).' % (view_name, context))
+ else:
+ permitted = True
- if debug_authorization:
+ if self.debug_authorization:
logger and logger.debug(
'debug_authorization of url %s (view name %r against '
'context %r): %s' % (
request.url, view_name, context, permitted)
)
+
if not permitted:
- if debug_authorization:
+ if self.debug_authorization:
msg = str(permitted)
else:
msg = 'Unauthorized: failed security policy check'
- environ['message'] = msg
- unauth_app = self.unauth_app_factory()
- return unauth_app(environ, start_response)
+
+ environ['repoze.bfg.message'] = msg
+
+ response = self.forbidden_resp_factory(context, request)
+ start_response(response.status, response.headerlist)
+ return response.app_iter
response = registry.queryMultiAdapter(
(context, request), IView, name=view_name)
@@ -168,7 +223,7 @@ class Router(object):
logger and logger.debug(msg)
else:
msg = request.url
- environ['message'] = msg
+ environ['repoze.bfg.message'] = msg
notfound_app = self.notfound_app_factory()
return notfound_app(environ, start_response)
@@ -182,28 +237,89 @@ class Router(object):
'Non-response object returned from view: %r' % response)
finally:
- registry_manager.pop()
+ self.threadlocal_manager.pop()
+
+def default_forbidden_view(context, request):
+ status = '401 Unauthorized'
+ try:
+ msg = escape(request.environ['repoze.bfg.message'])
+ except KeyError:
+ msg = ''
+ html = """
+ <html>
+ <title>%s</title>
+ <body>
+ <h1>%s</h1>
+ <code>%s</code>
+ </body>
+ </html>
+ """ % (status, status, msg)
+ headers = [('Content-Length', str(len(html))),
+ ('Content-Type', 'text/html')]
+ response_factory = queryUtility(IResponseFactory, default=Response)
+ return response_factory(status = status,
+ headerlist = headers,
+ app_iter = [html])
def make_app(root_factory, package=None, filename='configure.zcml',
- options=None):
- """ Return a Router object, representing a ``repoze.bfg`` WSGI
- application. ``root_factory`` must be a callable that accepts a
- WSGI environment and returns a root object. ``package`` is a
- Python module representing the application's package, ``filename``
- is the filesystem path to a ZCML file (optionally relative to the
- package path) that should be parsed to create the application
- registry. ``options``, if used, should be a dictionary containing
- runtime options (e.g. the key/value pairs in an app section of a
+ authentication_policy=None, authorization_policy=None,
+ options=None, registry=None, debug_logger=None):
+ # registry and debug_logger *only* for unittests
+ """ Return a Router object, representing a fully configured
+ ``repoze.bfg`` WSGI application.
+
+ ``root_factory`` must be a callable that accepts a WSGI
+ environment and returns a traversal root object. It may be
+ ``None``, in which case traversal is not performed at all.
+ Instead, all URL-to-code mapping is done via URL dispatch (aka
+ Routes).
+
+ ``package`` is a Python module representing the application's
+ package. It is optional, defaulting to ``None``. If ``package``
+ is ``None``, the ``filename`` passed must be an absolute pathname
+ to a ZCML file that represents the application's configuration.
+
+ ``filename`` is the filesystem path to a ZCML file (optionally
+ relative to the package path) that should be parsed to create the
+ application registry. It defaults to ``configure.zcml``.
+
+ ``authentication_policy`` should be an object that implements the
+ ``repoze.bfg.interfaces.IAuthenticationPolicy`` interface (e.g.
+ it might be an instance of
+ ``repoze.bfg.authentication.RemoteUserAuthenticationPolicy``) or
+ ``None``. If ``authentication_policy`` is ``None``, no
+ authentication or authorization will be performed. Instead, BFG
+ will ignore any view permission assertions in your application and
+ imperative security checks performed by your application will
+ always return ``True``.
+
+ ``authorization_policy`` is an object that implements the
+ ``repoze.bfg.interfaces.IAuthorizationPoicy`` interface
+ (notionally) or ``None``. If the ``authentication_policy``
+ argument is ``None``, this argument is ignored entirely because
+ being able to authorize access to a user depends on being able to
+ authenticate that user. If the ``authentication_policy`` argument
+ is *not* ``None``, and the ``authorization_policy`` argument *is*
+ ``None``, the authorization policy defaults to an authorization
+ implementation that uses ACLs.
+
+ ``options``, if used, should be a dictionary containing runtime
+ options (e.g. the key/value pairs in an app section of a
PasteDeploy file), with each key representing the option and the
key's value representing the specific option value,
e.g. ``{'reload_templates':True}``"""
if options is None:
options = {}
+
regname = filename
+
if package:
regname = package.__name__
- registry = Registry(regname)
- debug_logger = make_stream_logger('repoze.bfg.debug', sys.stderr)
+ if registry is None:
+ registry = Registry(regname)
+
+ if debug_logger is None:
+ debug_logger = make_stream_logger('repoze.bfg.debug', sys.stderr)
registry.registerUtility(debug_logger, ILogger, 'repoze.bfg.debug')
settings = Settings(options)
registry.registerUtility(settings, ISettings)
@@ -221,18 +337,39 @@ def make_app(root_factory, package=None, filename='configure.zcml',
'root_factory (aka get_root) was None and no routes connected')
registry.registerUtility(root_factory, IRootFactory)
+
+ if authentication_policy:
+ registry.registerUtility(authentication_policy, IAuthenticationPolicy)
+ if authorization_policy is None:
+ authorization_policy = ACLAuthorizationPolicy()
+ registry.registerUtility(authorization_policy, IAuthorizationPolicy)
+ else:
+ # deal with bw compat of <= 0.8 security policies (deprecated)
+ secpol = registry.queryUtility(ISecurityPolicy)
+ if secpol is not None:
+ debug_logger.warn(
+ 'Your application is using a repoze.bfg ``ISecurityPolicy`` '
+ '(probably registered via ZCML). This form of security policy '
+ 'has been deprecated in BFG 0.9. See the "Security" chapter '
+ 'of the repoze.bfg documentation to see how to register a more '
+ 'up to date set of security policies (an authentication '
+ 'policy and an authorization policy). ISecurityPolicy-based '
+ 'security policies will cease to work in a later BFG '
+ 'release.')
+ registerBBBAuthn(secpol, registry)
+
app = Router(registry)
# We push the registry on to the stack here in case any ZCA API is
# used in listeners subscribed to the WSGIApplicationCreatedEvent
# we send.
- registry_manager.push(registry)
+ manager.push({'registry':registry, 'request':None})
try:
# use dispatch here instead of registry.notify to make unit
# tests possible
dispatch(WSGIApplicationCreatedEvent(app))
finally:
- registry_manager.pop()
+ manager.pop()
return app
diff --git a/repoze/bfg/scifi.py b/repoze/bfg/scifi.py
new file mode 100644
index 000000000..dcc8fa1c1
--- /dev/null
+++ b/repoze/bfg/scifi.py
@@ -0,0 +1,257 @@
+# Tweak BFG slightly to allow for separate authentication and
+# authorization policies, so we don't have security policies named
+# e.g. "RepozeWhoInheritingACLSecurityPolicy". We'll add
+# IAuthenticationPolicy objects and IAuthorizationPolicy objects;
+# these will be adapters. We'll also change ISecurityPolicy to be an
+# adapter rather than a utility. We'll tweak the function API to use
+# these adapters.
+
+# b/w incompats: the "authorization" policy needs access to the
+# context, which the APIs it maps to in a current BFG security policy
+# don't have; code which depends on these APIs will need to change.
+
+from zope.interface import implements
+from zope.interface import Interface
+
+class IAuthenticationPolicy(Interface):
+ """ A multi-adapter on context and request """
+ def authenticated_userid():
+ """ Return the authenticated userid or None if no
+ authenticated userid can be found """
+
+ def effective_principals():
+ """ Return a sequence representing the effective principals
+ (including the userid and any groups belonged to by the
+ current user, including 'system' groups such as Everyone and
+ Authenticated"""
+
+ def challenge():
+ """ Return an IResponse object representing a challenge, such
+ as a login form or a basic auth dialog """
+
+ def remember(self, principal, token):
+ """ Return a set of headers suitable for 'remembering' the
+ principal on subsequent requests """
+
+ def forget():
+ """ Return a set of headers suitable for 'forgetting' the
+ current user on subsequent requests"""
+
+class IAuthorizationPolicy(Interface):
+ """ An adapter on context """
+ def permits(self, principals, permission):
+ """ Return True if any of the principals is allowed the
+ permission in the current context, else return False """
+
+ def principals_allowed_by_permission(self, permission):
+ """ Return a set of principal identifiers allowed by the permission """
+
+class ISecurityPolicy(Interface):
+ """ A multi-adapter on context and request """
+ def permits(permission):
+ """ Returns True if the combination of the authorization
+ information in the context and the authentication data in
+ the request allow the action implied by the permission """
+
+ def authenticated_userid():
+ """ Return the userid of the currently authenticated user or
+ None if there is no currently authenticated user """
+
+ def effective_principals():
+ """ Return the list of 'effective' principals for the request.
+ This must include the userid of the currently authenticated
+ user if a user is currently authenticated."""
+
+ def principals_allowed_by_permission(permission):
+ """ Return a sequence of principal identifiers allowed by the
+ ``permission`` in the model implied by ``context``. This
+ method may not be supported by a given security policy
+ implementation, in which case, it should raise a
+ ``NotImplementedError`` exception."""
+
+ def forbidden():
+ """ This method should return an IResponse object (an object
+ with the attributes ``status``, ``headerlist``, and
+ ``app_iter``) as a result of a view invocation denial. The
+ ``forbidden`` method of a security policy will be called by
+ ``repoze.bfg`` when view invocation is denied (usually as a
+ result of the ``permit`` method of the same security policy
+ returning False to the Router).
+
+ The ``forbidden`` method of a security will not be called when
+ an ``IForbiddenResponseFactory`` utility is registered;
+ instead the ``IForbiddenResponseFactory`` utility will serve
+ the forbidden response.
+
+ Note that the ``repoze.bfg.message`` key in the environ passed
+ to the WSGI app will contain the 'raw' reason that view
+ invocation was denied by repoze.bfg. The ``context`` object
+ passed in will be the context found by ``repoze.bfg`` when the
+ denial was found and the ``request`` will be the request which
+ caused the denial."""
+
+# an implementation of an authentication policy that uses repoze.who
+
+class RepozeWhoAuthenticationPolicy(object):
+ """ A BFG authentication policy which obtains data from the
+ repoze.who API """
+ implements(IAuthenticationPolicy)
+ def __init__(self, context, request):
+ self.context = context
+ self.request = request
+ from repoze.who.api import api_from_environ
+ self.api = api_from_environ(request.environ)
+
+ def authenticated_userid(self):
+ identity = self.api.authenticate()
+ if identity is None:
+ return None
+ return identity['repoze.who.userid']
+
+ def effective_principals(self):
+ effective_principals = [Everyone]
+ identity = self.api.authenticate()
+ if identity is None:
+ return effective_principals
+
+ effective_principals.append(Authenticated)
+ userid = identity['repoze.who.userid']
+ groups = identity.get('groups', [])
+ effective_principals.append(userid)
+ effective_principals.extend(groups)
+
+ return effective_principals
+
+ def challenge(self):
+ return self.api.challenge()
+
+ def remember(self, principal, token):
+ return self.api.remember({'repoze.who.userid':principal,
+ 'password':token})
+
+ def forget(self):
+ return self.api.forget()
+
+# an implementation of an authentication policy that uses a cookie
+
+class StandaloneAuthenticationPolicy:
+ """ A BFG authentication policy which obtains data from a cookie
+ and a local storage system """
+ def __init__(self, context, request):
+ self.context = context
+ self.request = request
+
+ def _check_password(self, userid, password):
+ """ Return true if the password is good for the userid """
+ raise NotImplementedError # you get the idea
+
+ def _groups_from_userid(self, userid):
+ """ Return a sequence of groups given a user id """
+ raise NotImplementedError # you get the idea
+
+ def _userid_from_login(self, login):
+ """ Return a userid given a login name """
+ raise NotImplementedError # you get the idea
+
+ def _decrypt(self, cookieval):
+ """ Return decrypted login and password"""
+ raise NotImplementedError # you get the idea
+
+ def _encrypt(self, userid, password):
+ """ Return encrypted hash of userid and password for cookie val """
+ raise NotImplementedError # you get the idea
+
+ def authenticated_userid(self):
+ cookieval = request.cookies.get('oatmeal')
+ try:
+ login, password = self._decrypt(cookieval)
+ except:
+ return None
+ userid = self._userid_from_login(login)
+ if self._check_password(userid, password):
+ return userid
+
+ def effective_principals(self):
+ effective_principals = [Everyone]
+ userid = self.authenticated_userid()
+ if userid is None:
+ return effective_principals
+
+ effective_principals.append(Authenticated)
+ groups = self._groups_from_userid(userid)
+ effective_principals.append(userid)
+ effective_principals.extend(groups)
+
+ return effective_principals
+
+ def challenge(self):
+ userid = self.authenticated_userid()
+ if userid:
+ return render_template_to_response('templates/forbidden.pt')
+ else:
+ return render_template_to_response('templates/login_form.pt')
+
+ def remember(self, principal, token):
+ cookieval = self._encrypt(principal, token)
+ return [ ('Set-Cookie', 'oatmeal=%s' % cookieval) ]
+
+ def forget(self):
+ return [ ('Set-Cookie', 'oatmeal=') ]
+
+# an implementation of an authorization policy that uses ACLs
+
+class ACLAuthorizationPolicy(object):
+ implements(IAuthorizationPolicy)
+ def __init__(self, context):
+ self.context = context
+
+ def permits(self, principals, permission):
+ """ """
+ # do stuff to figure out of any of the principals is allowed
+ # the permission by any ACL in the current context's hierarchy
+
+ def principals_allowed_by_permission(self, permission):
+ """ """
+ # return the sequence of principals allowed by the permission
+ # according to the ACLs in the current context' hierarchy
+
+# present a rolled up "face" to both authn and autz policies in the
+# form of a "security policy"; users will interact with this API
+# rather than the authn or authz policies directly.
+
+class SecurityPolicy(object):
+ """ Use separate authn and authz to form a BFG security policy;
+ this will never be overridden, it is concrete. It is an adapter. """
+ implements(ISecurityPolicy)
+ def __init__(self, context, request):
+ self.context = context
+ self.request = request
+ self.authn = getMultiAdapter((self.context, self.request),
+ IAuthenticationPolicy)
+ self.authz = getAdapter(context, IAuthorizationPolicy)
+
+ def permits(self, permission):
+ principals = set(self.authn.effective_principals())
+
+ if authz.permits(principals, permission):
+ return True
+
+ return False
+
+ def authenticated_userid(self):
+ return self.authn.authenticated_userid()
+
+ def effective_principals(self):
+ return self.authn.effective_principals()
+
+ def principals_allowed_by_permission(self):
+ return self.authz.principals_allowed_by_permission()
+
+ def forbidden(self):
+ return self.authn.challenge()
+
+ def remember(self, principal, token):
+ return self.authn.remember(principal, token)
+
+ def forget(self):
+ return self.authn.forget()
diff --git a/repoze/bfg/secpols.py b/repoze/bfg/secpols.py
new file mode 100644
index 000000000..0f0fc7e66
--- /dev/null
+++ b/repoze/bfg/secpols.py
@@ -0,0 +1,469 @@
+from zope.deprecation import deprecated
+from zope.interface import implements
+
+from repoze.bfg.interfaces import ISecurityPolicy
+from repoze.bfg.interfaces import IAuthorizationPolicy
+from repoze.bfg.interfaces import IAuthenticationPolicy
+
+from repoze.bfg.location import lineage
+
+from repoze.bfg.threadlocal import manager
+
+from repoze.bfg.security import Allow
+from repoze.bfg.security import Deny
+from repoze.bfg.security import ACLAllowed
+from repoze.bfg.security import ACLDenied
+from repoze.bfg.security import Everyone
+from repoze.bfg.security import Authenticated
+
+class ACLSecurityPolicy(object):
+ implements(ISecurityPolicy)
+
+ def __init__(self, get_principals):
+ self.get_principals = get_principals
+
+ def permits(self, context, request, permission):
+ """ Return ``ACLAllowed`` if the policy permits access,
+ ``ACLDenied`` if not. """
+ principals = set(self.effective_principals(request))
+
+ for location in lineage(context):
+ try:
+ acl = location.__acl__
+ except AttributeError:
+ continue
+
+ for ace in acl:
+ ace_action, ace_principal, ace_permissions = ace
+ if ace_principal in principals:
+ if not hasattr(ace_permissions, '__iter__'):
+ ace_permissions = [ace_permissions]
+ if permission in ace_permissions:
+ if ace_action == Allow:
+ return ACLAllowed(ace, acl, permission,
+ principals, location)
+ else:
+ return ACLDenied(ace, acl, permission,
+ principals, location)
+
+ # default deny if no ACE matches in the ACL found
+ result = ACLDenied(None, acl, permission, principals, location)
+ return result
+
+ # default deny if no ACL in lineage at all
+ return ACLDenied(None, None, permission, principals, context)
+
+ def authenticated_userid(self, request):
+ principals = self.get_principals(request)
+ if principals:
+ return principals[0]
+
+ def effective_principals(self, request):
+ effective_principals = [Everyone]
+ principal_ids = self.get_principals(request)
+
+ if principal_ids:
+ effective_principals.append(Authenticated)
+ effective_principals.extend(principal_ids)
+
+ return effective_principals
+
+ def principals_allowed_by_permission(self, context, permission):
+ for location in lineage(context):
+ try:
+ acl = location.__acl__
+ except AttributeError:
+ continue
+
+ allowed = {}
+
+ for ace_action, ace_principal, ace_permissions in acl:
+ if ace_action == Allow:
+ if not hasattr(ace_permissions, '__iter__'):
+ ace_permissions = [ace_permissions]
+ if permission in ace_permissions:
+ allowed[ace_principal] = True
+ return sorted(allowed.keys())
+
+ return []
+
+class InheritingACLSecurityPolicy(object):
+ """ A security policy which uses ACLs in the following ways:
+
+ - When checking whether a user is permitted (via the ``permits``
+ method), the security policy consults the ``context`` for an ACL
+ first. If no ACL exists on the context, or one does exist but
+ the ACL does not explicitly allow or deny access for any of the
+ effective principals, consult the context's parent ACL, and so
+ on, until the lineage is exhausted or we determine that the
+ policy permits or denies.
+
+ During this processing, if any ``Deny`` ACE is found matching
+ any effective principal, stop processing by returning an
+ ``ACLDenied`` (equals False) immediately. If any ``Allow`` ACE
+ is found matching any effective principal, stop processing by
+ returning an ``ACLAllowed`` (equals True) immediately. If we
+ exhaust the context's lneage, and no ACE has explicitly
+ permitted or denied access, return an ``ACLDenied``. This
+ differs from the non-inheriting security policy (the
+ ``ACLSecurityPolicy``) by virtue of the fact that it does not
+ stop looking for ACLs in the object lineage after it finds the
+ first one.
+
+ - When computing principals allowed by a permission via the
+ ``principals_allowed_by_permission`` method, we compute the set
+ of principals that are explicitly granted the ``permission``.
+ We do this by walking 'up' the object graph *from the root* to
+ the context. During this walking process, if we find an
+ explicit ``Allow`` ACE for a principal that matches the
+ ``permission``, the principal is included in the allow list.
+ However, if later in the walking process that user is mentioned
+ in any ``Deny`` ACE for the permission, the user is removed from
+ the allow list. If a ``Deny`` to the principal ``Everyone`` is
+ encountered during the walking process that matches the
+ ``permission``, the allow list is cleared for all principals
+ encountered in previous ACLs. The walking process ends after
+ we've processed the any ACL directly attached to ``context``; a
+ list of principals is returned.
+
+ - Other aspects of this policy are the same as those in the
+ ACLSecurityPolicy (e.g. ``effective_principals``,
+ ``authenticated_userid``).
+ """
+ implements(ISecurityPolicy)
+
+ def __init__(self, get_principals):
+ self.get_principals = get_principals
+
+ def permits(self, context, request, permission):
+ """ Return ``ACLAllowed`` if the policy permits access,
+ ``ACLDenied`` if not. """
+ principals = set(self.effective_principals(request))
+
+ for location in lineage(context):
+ try:
+ acl = location.__acl__
+ except AttributeError:
+ continue
+
+ for ace in acl:
+ ace_action, ace_principal, ace_permissions = ace
+ if ace_principal in principals:
+ if not hasattr(ace_permissions, '__iter__'):
+ ace_permissions = [ace_permissions]
+ if permission in ace_permissions:
+ if ace_action == Allow:
+ return ACLAllowed(ace, acl, permission,
+ principals, location)
+ else:
+ return ACLDenied(ace, acl, permission,
+ principals, location)
+
+ # default deny if no ACL in lineage at all
+ return ACLDenied(None, None, permission, principals, context)
+
+ def authenticated_userid(self, request):
+ principals = self.get_principals(request)
+ if principals:
+ return principals[0]
+
+ def effective_principals(self, request):
+ effective_principals = [Everyone]
+ principal_ids = self.get_principals(request)
+
+ if principal_ids:
+ effective_principals.append(Authenticated)
+ effective_principals.extend(principal_ids)
+
+ return effective_principals
+
+ def principals_allowed_by_permission(self, context, permission):
+ allowed = set()
+
+ for location in reversed(list(lineage(context))):
+ # NB: we're walking *up* the object graph from the root
+ try:
+ acl = location.__acl__
+ except AttributeError:
+ continue
+
+ allowed_here = set()
+ denied_here = set()
+
+ for ace_action, ace_principal, ace_permissions in acl:
+ if not hasattr(ace_permissions, '__iter__'):
+ ace_permissions = [ace_permissions]
+ if ace_action == Allow and permission in ace_permissions:
+ if not ace_principal in denied_here:
+ allowed_here.add(ace_principal)
+ if ace_action == Deny and permission in ace_permissions:
+ denied_here.add(ace_principal)
+ if ace_principal == Everyone:
+ # clear the entire allowed set, as we've hit a
+ # deny of Everyone ala (Deny, Everyone, ALL)
+ allowed = set()
+ break
+ elif ace_principal in allowed:
+ allowed.remove(ace_principal)
+
+ allowed.update(allowed_here)
+
+ return allowed
+
+def get_remoteuser(request):
+ user_id = request.environ.get('REMOTE_USER')
+ if user_id:
+ return [user_id]
+ return []
+
+def RemoteUserACLSecurityPolicy():
+ """ A security policy which:
+
+ - examines the request.environ for the REMOTE_USER variable and
+ uses any non-false value as a principal id for this request.
+
+ - uses an ACL-based authorization model which attempts to find the
+ *first* ACL in the context' lineage. It returns ``Allowed`` from
+ its 'permits' method if the single ACL found grants access to the
+ current principal. It returns ``Denied`` if permission was not
+ granted (either explicitly via a deny or implicitly by not finding
+ a matching ACE action). The *first* ACL found in the context's
+ lineage is considered canonical; no searching is done for other
+ security attributes after the first ACL is found in the context'
+ lineage. Use the 'inheriting' variant of this policy to consider
+ more than one ACL in the lineage.
+
+ An ACL is an ordered sequence of ACE tuples, e.g. ``[(Allow,
+ Everyone, 'read'), (Deny, 'george', 'write')]``. ACLs stored on
+ model instance objects as their ``__acl__`` attribute will be used
+ by the security machinery to grant or deny access.
+
+ Enable this security policy by adding the following to your
+ application's ``configure.zcml``:
+
+ .. code-block:: xml
+
+ <utility
+ provides="repoze.bfg.interfaces.ISecurityPolicy"
+ factory="repoze.bfg.security.RemoteUserACLSecurityPolicy"
+ />
+
+ """
+ return ACLSecurityPolicy(get_remoteuser)
+
+def RemoteUserInheritingACLSecurityPolicy():
+ """ A security policy which:
+
+ - examines the request.environ for the REMOTE_USER variable and
+ uses any non-false value as a principal id for this request.
+
+ - Differs from the non-inheriting security policy variants
+ (e.g. ``ACLSecurityPolicy``) by virtue of the fact that it does
+ not stop looking for ACLs in the object lineage after it finds
+ the first one.
+
+ - When checking whether a user is permitted (via the ``permits``
+ method), the security policy consults the ``context`` for an ACL
+ first. If no ACL exists on the context, or one does exist but
+ the ACL does not explicitly allow or deny access for any of the
+ effective principals, consult the context's parent ACL, and so
+ on, until the lineage is exhausted or we determine that the
+ policy permits or denies.
+
+ During this processing, if any ``Deny`` ACE is found matching
+ any effective principal, stop processing by returning an
+ ``ACLDenied`` (equals False) immediately. If any ``Allow`` ACE
+ is found matching any effective principal, stop processing by
+ returning an ``ACLAllowed`` (equals True) immediately. If we
+ exhaust the context's lneage, and no ACE has explicitly
+ permitted or denied access, return an ``ACLDenied``.
+
+ - When computing principals allowed by a permission via the
+ ``principals_allowed_by_permission`` method, we compute the set
+ of principals that are explicitly granted the ``permission``.
+ We do this by walking 'up' the object graph *from the root* to
+ the context. During this walking process, if we find an
+ explicit ``Allow`` ACE for a principal that matches the
+ ``permission``, the principal is included in the allow list.
+ However, if later in the walking process that user is mentioned
+ in any ``Deny`` ACE for the permission, the user is removed from
+ the allow list. If a ``Deny`` to the principal ``Everyone`` is
+ encountered during the walking process that matches the
+ ``permission``, the allow list is cleared for all principals
+ encountered in previous ACLs. The walking process ends after
+ we've processed the any ACL directly attached to ``context``; a
+ list of principals is returned.
+
+ - Other aspects of this policy are the same as those in the
+ ACLSecurityPolicy (e.g. ``effective_principals``,
+ ``authenticated_userid``).
+
+ Enable this security policy by adding the following to your
+ application's ``configure.zcml``:
+
+ .. code-block:: xml
+
+ <utility
+ provides="repoze.bfg.interfaces.ISecurityPolicy"
+ factory="repoze.bfg.security.RemoteUserInheritingACLSecurityPolicy"
+ />
+
+ """
+ return InheritingACLSecurityPolicy(get_remoteuser)
+
+def get_who_principals(request):
+ identity = request.environ.get('repoze.who.identity')
+ if not identity:
+ return []
+ principals = [identity['repoze.who.userid']]
+ principals.extend(identity.get('groups', []))
+ return principals
+
+def WhoACLSecurityPolicy():
+ """
+ A security policy which:
+
+ - examines the request.environ for the ``repoze.who.identity``
+ dictionary. If one is found, the principal ids for the request
+ are composed of ``repoze.who.identity['repoze.who.userid']``
+ plus ``repoze.who.identity.get('groups', [])``.
+
+ - uses an ACL-based authorization model which attempts to find the
+ *first* ACL in the context' lineage. It returns ``Allowed`` from
+ its 'permits' method if the single ACL found grants access to the
+ current principal. It returns ``Denied`` if permission was not
+ granted (either explicitly via a deny or implicitly by not finding
+ a matching ACE action). The *first* ACL found in the context's
+ lineage is considered canonical; no searching is done for other
+ security attributes after the first ACL is found in the context'
+ lineage. Use the 'inheriting' variant of this policy to consider
+ more than one ACL in the lineage.
+
+ An ACL is an ordered sequence of ACE tuples, e.g. ``[(Allow,
+ Everyone, 'read'), (Deny, 'george', 'write')]``. ACLs stored on
+ model instance objects as their ``__acl__`` attribute will be used
+ by the security machinery to grant or deny access.
+
+ Enable this security policy by adding the following to your
+ application's ``configure.zcml``:
+
+ .. code-block:: xml
+
+ <utility
+ provides="repoze.bfg.interfaces.ISecurityPolicy"
+ factory="repoze.bfg.security.WhoACLSecurityPolicy"
+ />
+ """
+ return ACLSecurityPolicy(get_who_principals)
+
+RepozeWhoIdentityACLSecurityPolicy = WhoACLSecurityPolicy
+
+deprecated('RepozeWhoIdentityACLSecurityPolicy',
+ '(repoze.bfg.security.RepozeWhoIdentityACLSecurityPolicy '
+ 'should now be imported as '
+ 'repoze.bfg.security.WhoACLSecurityPolicy)',
+ )
+
+def WhoInheritingACLSecurityPolicy():
+ """ A security policy which:
+
+ - examines the request.environ for the ``repoze.who.identity``
+ dictionary. If one is found, the principal ids for the request
+ are composed of ``repoze.who.identity['repoze.who.userid']``
+ plus ``repoze.who.identity.get('groups', [])``.
+
+ - Differs from the non-inheriting security policy variants
+ (e.g. ``ACLSecurityPolicy``) by virtue of the fact that it does
+ not stop looking for ACLs in the object lineage after it finds
+ the first one.
+
+ - When checking whether a user is permitted (via the ``permits``
+ method), the security policy consults the ``context`` for an ACL
+ first. If no ACL exists on the context, or one does exist but
+ the ACL does not explicitly allow or deny access for any of the
+ effective principals, consult the context's parent ACL, and so
+ on, until the lineage is exhausted or we determine that the
+ policy permits or denies.
+
+ During this processing, if any ``Deny`` ACE is found matching
+ any effective principal, stop processing by returning an
+ ``ACLDenied`` (equals False) immediately. If any ``Allow`` ACE
+ is found matching any effective principal, stop processing by
+ returning an ``ACLAllowed`` (equals True) immediately. If we
+ exhaust the context's lneage, and no ACE has explicitly
+ permitted or denied access, return an ``ACLDenied``.
+
+ - When computing principals allowed by a permission via the
+ ``principals_allowed_by_permission`` method, we compute the set
+ of principals that are explicitly granted the ``permission``.
+ We do this by walking 'up' the object graph *from the root* to
+ the context. During this walking process, if we find an
+ explicit ``Allow`` ACE for a principal that matches the
+ ``permission``, the principal is included in the allow list.
+ However, if later in the walking process that user is mentioned
+ in any ``Deny`` ACE for the permission, the user is removed from
+ the allow list. If a ``Deny`` to the principal ``Everyone`` is
+ encountered during the walking process that matches the
+ ``permission``, the allow list is cleared for all principals
+ encountered in previous ACLs. The walking process ends after
+ we've processed the any ACL directly attached to ``context``; a
+ list of principals is returned.
+
+ - Other aspects of this policy are the same as those in the
+ ACLSecurityPolicy (e.g. ``effective_principals``,
+ ``authenticated_userid``).
+
+ Enable this security policy by adding the following to your
+ application's ``configure.zcml``:
+
+ .. code-block:: xml
+
+ <utility
+ provides="repoze.bfg.interfaces.ISecurityPolicy"
+ factory="repoze.bfg.security.WhoInheritingACLSecurityPolicy"
+ />
+ """
+ return InheritingACLSecurityPolicy(get_who_principals)
+
+
+class SecurityPolicyToAuthorizationPolicyAdapter(object):
+ """ An adapter registered when an old-style ISecurityPolicy
+ utility is configured in ZCML instead of an IAuthorizationPolicy
+ utility """
+ implements(IAuthorizationPolicy)
+ def __init__(self, secpol):
+ self.secpol = secpol
+
+ def permits(self, context, principals, permission):
+ request = manager.get()['request']
+ return self.secpol.permits(context, request, permission)
+
+ def principals_allowed_by_permission(self, context, permission):
+ return self.secpol.principals_allowed_by_permission(context, permission)
+
+class SecurityPolicyToAuthenticationPolicyAdapter(object):
+ implements(IAuthenticationPolicy)
+ def __init__(self, secpol):
+ self.secpol = secpol
+
+ def authenticated_userid(self, context, request):
+ return self.secpol.authenticated_userid(request)
+
+ def effective_principals(self, context, request):
+ return self.secpol.effective_principals(request)
+
+ def remember(self, context, request, principal, **kw):
+ return []
+
+ def forget(self, context, request):
+ return []
+
+def registerBBBAuthn(secpol, registry):
+ # Used when an explicit authentication policy is not defined, and
+ # an an old-style ISecurityPolicy is registered (via ZCML), turn
+ # it into separate authorization and authentication utilities
+ # using adapters
+ authn = SecurityPolicyToAuthenticationPolicyAdapter(secpol)
+ authz = SecurityPolicyToAuthorizationPolicyAdapter(secpol)
+ registry.registerUtility(authn, IAuthenticationPolicy)
+ registry.registerUtility(authz, IAuthorizationPolicy)
diff --git a/repoze/bfg/security.py b/repoze/bfg/security.py
index 90916bac2..5f5252ff3 100644
--- a/repoze/bfg/security.py
+++ b/repoze/bfg/security.py
@@ -1,10 +1,11 @@
+import warnings
+
+from zope.component import queryMultiAdapter
from zope.component import queryUtility
-from zope.deprecation import deprecated
from zope.interface import implements
-from repoze.bfg.location import lineage
-
-from repoze.bfg.interfaces import ISecurityPolicy
+from repoze.bfg.interfaces import IAuthenticationPolicy
+from repoze.bfg.interfaces import IAuthorizationPolicy
from repoze.bfg.interfaces import IViewPermission
from repoze.bfg.interfaces import IViewPermissionFactory
@@ -31,458 +32,155 @@ def has_permission(permission, context, request):
``Allowed`` if the permission is granted in this context to the
user implied by the request. Return an instance of ``Denied`` if
this permission is not granted in this context to this user. This
- delegates to the current security policy. Return True
- unconditionally if no security policy has been configured in this
- application."""
- policy = queryUtility(ISecurityPolicy)
- if policy is None:
- return Allowed('No security policy in use.')
- return policy.permits(context, request, permission)
-
-def authenticated_userid(request):
+ function delegates to the current authentication and authorization
+ policies. Return ``Allowed`` unconditionally if no authentication
+ policy has been configured in this application."""
+ authn_policy = queryUtility(IAuthenticationPolicy)
+ if authn_policy is None:
+ return Allowed('No authentication policy in use.')
+
+ authz_policy = queryUtility(IAuthorizationPolicy)
+ if authz_policy is None:
+ raise ValueError('Authentication policy registered without '
+ 'authorization policy') # should never happen
+ principals = authn_policy.effective_principals(context, request)
+ return authz_policy.permits(context, principals, permission)
+
+def authenticated_userid(*args):
""" Return the userid of the currently authenticated user or
- ``None`` if there is no security policy in effect or there is no
- currently authenticated user"""
- policy = queryUtility(ISecurityPolicy)
+ ``None`` if there is no authentication policy in effect or there
+ is no currently authenticated user. """
+
+ largs = len(args)
+ if largs > 2:
+ raise TypeError(args)
+ if largs == 1:
+ request = args[0]
+ context = None
+ warnings.warn(
+ 'As of BFG 0.9, the "repoze.bfg.security.authenticated_userid" '
+ 'API now takes two arguments: "context" and "request". '
+ 'It is being called it with a single argument'
+ '(assumed to be a request). In a future version, the '
+ '"authenticated_userid API will stop accepting calls with a '
+ 'single argument; please fix the calling code.',
+ stacklevel=2)
+ else:
+ context, request = args
+
+ policy = queryUtility(IAuthenticationPolicy)
if policy is None:
return None
- return policy.authenticated_userid(request)
+ return policy.authenticated_userid(context, request)
-def effective_principals(request):
+def effective_principals(*args):
""" Return the list of 'effective' principal identifiers for the
request. This will include the userid of the currently
authenticated user if a user is currently authenticated. If no
- security policy is in effect, this will return an empty sequence."""
- policy = queryUtility(ISecurityPolicy)
+ authentication policy is in effect, this will return an empty
+ sequence."""
+
+ largs = len(args)
+ if largs > 2:
+ raise TypeError(args)
+ if largs == 1:
+ request = args[0]
+ context = None
+ warnings.warn(
+ 'As of BFG 0.9, the "repoze.bfg.security.effective_principals " '
+ 'API now takes two arguments: "context" and "request". '
+ 'It is being called it with a single argument'
+ '(assumed to be a request). In a future version, the '
+ '"effective_principals API will stop accepting calls with a '
+ 'single argument; please fix the calling code.',
+ stacklevel=2)
+ else:
+ context, request = args
+
+ policy = queryUtility(IAuthenticationPolicy)
if policy is None:
return []
- return policy.effective_principals(request)
+ return policy.effective_principals(context, request)
def principals_allowed_by_permission(context, permission):
""" Provided a context (a model object), and a permission (a
string or unicode object), return a sequence of principal ids that
- possess the permission in the context. If no security policy is
- in effect, this will return a sequence with the single value
+ possess the permission in the context. If no authorization policy
+ is in effect, this will return a sequence with the single value
representing ``Everyone`` (the special principal identifier
- representing all principals). Note that even if a security policy
- *is* in effect, some security policies may not implement the
- required machinery for this function; those will cause a
- ``NotImplementedError`` exception to be raised when this function
- is invoked."""
- policy = queryUtility(ISecurityPolicy)
+ representing all principals).
+
+ .. note:: even if an authorization policy *is* in effect, some
+ (exotic) authorization policies may not implement the required
+ machinery for this function; those will cause a
+ ``NotImplementedError`` exception to be raised when this
+ function is invoked.
+ """
+ policy = queryUtility(IAuthorizationPolicy)
if policy is None:
return [Everyone]
return policy.principals_allowed_by_permission(context, permission)
-class ACLSecurityPolicy(object):
- implements(ISecurityPolicy)
-
- def __init__(self, get_principals):
- self.get_principals = get_principals
-
- def permits(self, context, request, permission):
- """ Return ``ACLAllowed`` if the policy permits access,
- ``ACLDenied`` if not. """
- principals = set(self.effective_principals(request))
-
- for location in lineage(context):
- try:
- acl = location.__acl__
- except AttributeError:
- continue
-
- for ace in acl:
- ace_action, ace_principal, ace_permissions = ace
- if ace_principal in principals:
- if not hasattr(ace_permissions, '__iter__'):
- ace_permissions = [ace_permissions]
- if permission in ace_permissions:
- if ace_action == Allow:
- return ACLAllowed(ace, acl, permission,
- principals, location)
- else:
- return ACLDenied(ace, acl, permission,
- principals, location)
-
- # default deny if no ACE matches in the ACL found
- result = ACLDenied(None, acl, permission, principals, location)
- return result
-
- # default deny if no ACL in lineage at all
- return ACLDenied(None, None, permission, principals, context)
-
- def authenticated_userid(self, request):
- principals = self.get_principals(request)
- if principals:
- return principals[0]
-
- def effective_principals(self, request):
- effective_principals = [Everyone]
- principal_ids = self.get_principals(request)
-
- if principal_ids:
- effective_principals.append(Authenticated)
- effective_principals.extend(principal_ids)
-
- return effective_principals
-
- def principals_allowed_by_permission(self, context, permission):
- for location in lineage(context):
- try:
- acl = location.__acl__
- except AttributeError:
- continue
-
- allowed = {}
-
- for ace_action, ace_principal, ace_permissions in acl:
- if ace_action == Allow:
- if not hasattr(ace_permissions, '__iter__'):
- ace_permissions = [ace_permissions]
- if permission in ace_permissions:
- allowed[ace_principal] = True
- return sorted(allowed.keys())
-
+def view_execution_permitted(context, request, name=''):
+ """ If the view specified by ``context`` and ``name`` is protected
+ by a permission, check the permission associated with the view
+ using the effective authentication/authorization policies and the
+ ``request``. Return a boolean result. If no authentication
+ policy is in effect, or if the view is not protected by a
+ permission, return True."""
+ result = queryMultiAdapter((context, request), IViewPermission,
+ name=name, default=None)
+ if result is None:
+ return Allowed(
+ 'Allowed: view name %r in context %r (no permission defined)' %
+ (name, context))
+ return result
+
+def remember(context, request, principal, **kw):
+ """ Return a sequence of header tuples (e.g. ``[('Set-Cookie',
+ 'foo=abc')]``) suitable for 'remembering' a set of credentials
+ implied by the data passed as ``principal`` and ``*kw`` using the
+ current authentication policy. Common usage might look like so
+ within the body of a view function (``response`` is assumed to be
+ an WebOb-style response object computed previously by the view
+ code)::
+
+ from repoze.bfg.security import forget
+ headers = remember(context, request, 'chrism', password='123')
+ response.headerlist.extend(headers)
+ return response
+
+ If no authentication policy is in use, this function will always
+ return an empty sequence. If used, the composition and meaning of
+ ``**kw`` must be agreed upon by the calling code and the effective
+ authentication policy."""
+ policy = queryUtility(IAuthenticationPolicy)
+ if policy is None:
return []
-
-class InheritingACLSecurityPolicy(object):
- """ A security policy which uses ACLs in the following ways:
-
- - When checking whether a user is permitted (via the ``permits``
- method), the security policy consults the ``context`` for an ACL
- first. If no ACL exists on the context, or one does exist but
- the ACL does not explicitly allow or deny access for any of the
- effective principals, consult the context's parent ACL, and so
- on, until the lineage is exhausted or we determine that the
- policy permits or denies.
-
- During this processing, if any ``Deny`` ACE is found matching
- any effective principal, stop processing by returning an
- ``ACLDenied`` (equals False) immediately. If any ``Allow`` ACE
- is found matching any effective principal, stop processing by
- returning an ``ACLAllowed`` (equals True) immediately. If we
- exhaust the context's lneage, and no ACE has explicitly
- permitted or denied access, return an ``ACLDenied``. This
- differs from the non-inheriting security policy (the
- ``ACLSecurityPolicy``) by virtue of the fact that it does not
- stop looking for ACLs in the object lineage after it finds the
- first one.
-
- - When computing principals allowed by a permission via the
- ``principals_allowed_by_permission`` method, we compute the set
- of principals that are explicitly granted the ``permission``.
- We do this by walking 'up' the object graph *from the root* to
- the context. During this walking process, if we find an
- explicit ``Allow`` ACE for a principal that matches the
- ``permission``, the principal is included in the allow list.
- However, if later in the walking process that user is mentioned
- in any ``Deny`` ACE for the permission, the user is removed from
- the allow list. If a ``Deny`` to the principal ``Everyone`` is
- encountered during the walking process that matches the
- ``permission``, the allow list is cleared for all principals
- encountered in previous ACLs. The walking process ends after
- we've processed the any ACL directly attached to ``context``; a
- list of principals is returned.
-
- - Other aspects of this policy are the same as those in the
- ACLSecurityPolicy (e.g. ``effective_principals``,
- ``authenticated_userid``).
- """
- implements(ISecurityPolicy)
-
- def __init__(self, get_principals):
- self.get_principals = get_principals
-
- def permits(self, context, request, permission):
- """ Return ``ACLAllowed`` if the policy permits access,
- ``ACLDenied`` if not. """
- principals = set(self.effective_principals(request))
-
- for location in lineage(context):
- try:
- acl = location.__acl__
- except AttributeError:
- continue
-
- for ace in acl:
- ace_action, ace_principal, ace_permissions = ace
- if ace_principal in principals:
- if not hasattr(ace_permissions, '__iter__'):
- ace_permissions = [ace_permissions]
- if permission in ace_permissions:
- if ace_action == Allow:
- return ACLAllowed(ace, acl, permission,
- principals, location)
- else:
- return ACLDenied(ace, acl, permission,
- principals, location)
-
- # default deny if no ACL in lineage at all
- return ACLDenied(None, None, permission, principals, context)
-
- def authenticated_userid(self, request):
- principals = self.get_principals(request)
- if principals:
- return principals[0]
-
- def effective_principals(self, request):
- effective_principals = [Everyone]
- principal_ids = self.get_principals(request)
-
- if principal_ids:
- effective_principals.append(Authenticated)
- effective_principals.extend(principal_ids)
-
- return effective_principals
-
- def principals_allowed_by_permission(self, context, permission):
- allowed = set()
-
- for location in reversed(list(lineage(context))):
- # NB: we're walking *up* the object graph from the root
- try:
- acl = location.__acl__
- except AttributeError:
- continue
-
- allowed_here = set()
- denied_here = set()
-
- for ace_action, ace_principal, ace_permissions in acl:
- if not hasattr(ace_permissions, '__iter__'):
- ace_permissions = [ace_permissions]
- if ace_action == Allow and permission in ace_permissions:
- if not ace_principal in denied_here:
- allowed_here.add(ace_principal)
- if ace_action == Deny and permission in ace_permissions:
- denied_here.add(ace_principal)
- if ace_principal == Everyone:
- # clear the entire allowed set, as we've hit a
- # deny of Everyone ala (Deny, Everyone, ALL)
- allowed = set()
- break
- elif ace_principal in allowed:
- allowed.remove(ace_principal)
-
- allowed.update(allowed_here)
-
- return allowed
-
-def get_remoteuser(request):
- user_id = request.environ.get('REMOTE_USER')
- if user_id:
- return [user_id]
- return []
-
-def RemoteUserACLSecurityPolicy():
- """ A security policy which:
-
- - examines the request.environ for the REMOTE_USER variable and
- uses any non-false value as a principal id for this request.
-
- - uses an ACL-based authorization model which attempts to find the
- *first* ACL in the context' lineage. It returns ``Allowed`` from
- its 'permits' method if the single ACL found grants access to the
- current principal. It returns ``Denied`` if permission was not
- granted (either explicitly via a deny or implicitly by not finding
- a matching ACE action). The *first* ACL found in the context's
- lineage is considered canonical; no searching is done for other
- security attributes after the first ACL is found in the context'
- lineage. Use the 'inheriting' variant of this policy to consider
- more than one ACL in the lineage.
-
- An ACL is an ordered sequence of ACE tuples, e.g. ``[(Allow,
- Everyone, 'read'), (Deny, 'george', 'write')]``. ACLs stored on
- model instance objects as their ``__acl__`` attribute will be used
- by the security machinery to grant or deny access.
-
- Enable this security policy by adding the following to your
- application's ``configure.zcml``:
-
- .. code-block:: xml
-
- <utility
- provides="repoze.bfg.interfaces.ISecurityPolicy"
- factory="repoze.bfg.security.RemoteUserACLSecurityPolicy"
- />
-
- """
- return ACLSecurityPolicy(get_remoteuser)
-
-def RemoteUserInheritingACLSecurityPolicy():
- """ A security policy which:
-
- - examines the request.environ for the REMOTE_USER variable and
- uses any non-false value as a principal id for this request.
-
- - Differs from the non-inheriting security policy variants
- (e.g. ``ACLSecurityPolicy``) by virtue of the fact that it does
- not stop looking for ACLs in the object lineage after it finds
- the first one.
-
- - When checking whether a user is permitted (via the ``permits``
- method), the security policy consults the ``context`` for an ACL
- first. If no ACL exists on the context, or one does exist but
- the ACL does not explicitly allow or deny access for any of the
- effective principals, consult the context's parent ACL, and so
- on, until the lineage is exhausted or we determine that the
- policy permits or denies.
-
- During this processing, if any ``Deny`` ACE is found matching
- any effective principal, stop processing by returning an
- ``ACLDenied`` (equals False) immediately. If any ``Allow`` ACE
- is found matching any effective principal, stop processing by
- returning an ``ACLAllowed`` (equals True) immediately. If we
- exhaust the context's lneage, and no ACE has explicitly
- permitted or denied access, return an ``ACLDenied``.
-
- - When computing principals allowed by a permission via the
- ``principals_allowed_by_permission`` method, we compute the set
- of principals that are explicitly granted the ``permission``.
- We do this by walking 'up' the object graph *from the root* to
- the context. During this walking process, if we find an
- explicit ``Allow`` ACE for a principal that matches the
- ``permission``, the principal is included in the allow list.
- However, if later in the walking process that user is mentioned
- in any ``Deny`` ACE for the permission, the user is removed from
- the allow list. If a ``Deny`` to the principal ``Everyone`` is
- encountered during the walking process that matches the
- ``permission``, the allow list is cleared for all principals
- encountered in previous ACLs. The walking process ends after
- we've processed the any ACL directly attached to ``context``; a
- list of principals is returned.
-
- - Other aspects of this policy are the same as those in the
- ACLSecurityPolicy (e.g. ``effective_principals``,
- ``authenticated_userid``).
-
- Enable this security policy by adding the following to your
- application's ``configure.zcml``:
-
- .. code-block:: xml
-
- <utility
- provides="repoze.bfg.interfaces.ISecurityPolicy"
- factory="repoze.bfg.security.RemoteUserInheritingACLSecurityPolicy"
- />
-
- """
- return InheritingACLSecurityPolicy(get_remoteuser)
-
-def get_who_principals(request):
- identity = request.environ.get('repoze.who.identity')
- if not identity:
+ else:
+ return policy.remember(context, request, principal, **kw)
+
+def forget(context, request):
+ """ Return a sequence of header tuples (e.g. ``[('Set-Cookie',
+ 'foo=abc')]``) suitable for 'forgetting' the set of credentials
+ possessed by the currently authenticated user. A common usage
+ might look like so within the body of a view function
+ (``response`` is assumed to be an WebOb-style response object
+ computed previously by the view code)::
+
+ from repoze.bfg.security import forget
+ headers = forget(context, request)
+ response.headerlist.extend(headers)
+ return response
+
+ If no authentication policy is in use, this function will always
+ return an empty sequence."""
+ policy = queryUtility(IAuthenticationPolicy)
+ if policy is None:
return []
- principals = [identity['repoze.who.userid']]
- principals.extend(identity.get('groups', []))
- return principals
-
-def WhoACLSecurityPolicy():
- """
- A security policy which:
-
- - examines the request.environ for the ``repoze.who.identity``
- dictionary. If one is found, the principal ids for the request
- are composed of ``repoze.who.identity['repoze.who.userid']``
- plus ``repoze.who.identity.get('groups', [])``.
-
- - uses an ACL-based authorization model which attempts to find the
- *first* ACL in the context' lineage. It returns ``Allowed`` from
- its 'permits' method if the single ACL found grants access to the
- current principal. It returns ``Denied`` if permission was not
- granted (either explicitly via a deny or implicitly by not finding
- a matching ACE action). The *first* ACL found in the context's
- lineage is considered canonical; no searching is done for other
- security attributes after the first ACL is found in the context'
- lineage. Use the 'inheriting' variant of this policy to consider
- more than one ACL in the lineage.
-
- An ACL is an ordered sequence of ACE tuples, e.g. ``[(Allow,
- Everyone, 'read'), (Deny, 'george', 'write')]``. ACLs stored on
- model instance objects as their ``__acl__`` attribute will be used
- by the security machinery to grant or deny access.
-
- Enable this security policy by adding the following to your
- application's ``configure.zcml``:
-
- .. code-block:: xml
-
- <utility
- provides="repoze.bfg.interfaces.ISecurityPolicy"
- factory="repoze.bfg.security.WhoACLSecurityPolicy"
- />
- """
- return ACLSecurityPolicy(get_who_principals)
-
-RepozeWhoIdentityACLSecurityPolicy = WhoACLSecurityPolicy
-
-deprecated('RepozeWhoIdentityACLSecurityPolicy',
- '(repoze.bfg.security.RepozeWhoIdentityACLSecurityPolicy '
- 'should now be imported as '
- 'repoze.bfg.security.WhoACLSecurityPolicy)',
- )
-
-def WhoInheritingACLSecurityPolicy():
- """ A security policy which:
-
- - examines the request.environ for the ``repoze.who.identity``
- dictionary. If one is found, the principal ids for the request
- are composed of ``repoze.who.identity['repoze.who.userid']``
- plus ``repoze.who.identity.get('groups', [])``.
-
- - Differs from the non-inheriting security policy variants
- (e.g. ``ACLSecurityPolicy``) by virtue of the fact that it does
- not stop looking for ACLs in the object lineage after it finds
- the first one.
-
- - When checking whether a user is permitted (via the ``permits``
- method), the security policy consults the ``context`` for an ACL
- first. If no ACL exists on the context, or one does exist but
- the ACL does not explicitly allow or deny access for any of the
- effective principals, consult the context's parent ACL, and so
- on, until the lineage is exhausted or we determine that the
- policy permits or denies.
-
- During this processing, if any ``Deny`` ACE is found matching
- any effective principal, stop processing by returning an
- ``ACLDenied`` (equals False) immediately. If any ``Allow`` ACE
- is found matching any effective principal, stop processing by
- returning an ``ACLAllowed`` (equals True) immediately. If we
- exhaust the context's lneage, and no ACE has explicitly
- permitted or denied access, return an ``ACLDenied``.
-
- - When computing principals allowed by a permission via the
- ``principals_allowed_by_permission`` method, we compute the set
- of principals that are explicitly granted the ``permission``.
- We do this by walking 'up' the object graph *from the root* to
- the context. During this walking process, if we find an
- explicit ``Allow`` ACE for a principal that matches the
- ``permission``, the principal is included in the allow list.
- However, if later in the walking process that user is mentioned
- in any ``Deny`` ACE for the permission, the user is removed from
- the allow list. If a ``Deny`` to the principal ``Everyone`` is
- encountered during the walking process that matches the
- ``permission``, the allow list is cleared for all principals
- encountered in previous ACLs. The walking process ends after
- we've processed the any ACL directly attached to ``context``; a
- list of principals is returned.
-
- - Other aspects of this policy are the same as those in the
- ACLSecurityPolicy (e.g. ``effective_principals``,
- ``authenticated_userid``).
-
- Enable this security policy by adding the following to your
- application's ``configure.zcml``:
-
- .. code-block:: xml
-
- <utility
- provides="repoze.bfg.interfaces.ISecurityPolicy"
- factory="repoze.bfg.security.WhoInheritingACLSecurityPolicy"
- />
- """
- return InheritingACLSecurityPolicy(get_who_principals)
-
+ else:
+ return policy.forget(context, request)
+
class PermitsResult(int):
def __new__(cls, s, *args):
inst = int.__new__(cls, cls.boolval)
@@ -503,17 +201,18 @@ class PermitsResult(int):
self.msg)
class Denied(PermitsResult):
- """ An instance of ``Denied`` is returned when a security policy
- or other ``repoze.bfg`` code denies an action unlrelated to an ACL
- check. It evaluates equal to all boolean false types. It has an
- attribute named ``msg`` describing the circumstances for the deny."""
+ """ An instance of ``Denied`` is returned when a security-related
+ API or other ``repoze.bfg`` code denies an action unlrelated to an
+ ACL check. It evaluates equal to all boolean false types. It has
+ an attribute named ``msg`` describing the circumstances for the
+ deny."""
boolval = 0
class Allowed(PermitsResult):
- """ An instance of ``Allowed`` is returned when a security policy
- or other ``repoze.bfg`` code allows an action unrelated to an ACL
- check. It evaluates equal to all boolean true types. It has an
- attribute named ``msg`` describing the circumstances for the
+ """ An instance of ``Allowed`` is returned when a security-related
+ API or other ``repoze.bfg`` code allows an action unrelated to an
+ ACL check. It evaluates equal to all boolean true types. It has
+ an attribute named ``msg`` describing the circumstances for the
allow."""
boolval = 1
@@ -566,35 +265,25 @@ class ACLAllowed(ACLPermitsResult):
as the ``msg`` attribute."""
boolval = 1
-class ViewPermission(object):
- implements(IViewPermission)
- def __init__(self, context, request, permission_name):
- self.context = context
- self.request = request
- self.permission_name = permission_name
-
- def __call__(self, security_policy, debug_info=None):
- return security_policy.permits(self.context,
- self.request,
- self.permission_name)
-
- def __repr__(self):
- view_name = getattr(self.request, 'view_name', None)
- return '<Permission at %s named %r for %r>' % (id(self),
- self.permission_name,
- view_name)
-
class ViewPermissionFactory(object):
implements(IViewPermissionFactory)
+
def __init__(self, permission_name):
self.permission_name = permission_name
def __call__(self, context, request):
- return ViewPermission(context, request, self.permission_name)
-
+ return has_permission(self.permission_name, context, request)
+
class Unauthorized(Exception):
pass
+# BBB imports: these must come at the end of the file, as there's a
+# circular dependency between secpols and security
+from repoze.bfg.secpols import ACLSecurityPolicy
+from repoze.bfg.secpols import InheritingACLSecurityPolicy
+from repoze.bfg.secpols import RemoteUserACLSecurityPolicy
+from repoze.bfg.secpols import RemoteUserInheritingACLSecurityPolicy
+from repoze.bfg.secpols import WhoACLSecurityPolicy
+from repoze.bfg.secpols import WhoInheritingACLSecurityPolicy
+# /BBB imports
-
-
diff --git a/repoze/bfg/testing.py b/repoze/bfg/testing.py
index d4ff6fe4d..bd3104a9d 100644
--- a/repoze/bfg/testing.py
+++ b/repoze/bfg/testing.py
@@ -8,22 +8,23 @@ from repoze.bfg.interfaces import IRequest
_marker = []
def registerDummySecurityPolicy(userid=None, groupids=(), permissive=True):
- """ Registers a dummy ``repoze.bfg`` security policy using the
- userid ``userid`` and the group ids ``groupids``. If
- ``permissive`` is true, a 'permissive' security policy is
- registered; this policy allows all access. If ``permissive`` is
- false, a nonpermissive security policy is registered; this policy
- denies all access. This function is most useful when testing code
- that uses the ``repoze.bfg.security`` APIs named
- ``has_permission``, ``authenticated_userid``,
- ``effective_principals``, and ``principals_allowed_by_permission``.
+ """ Registers a dummy ``repoze.bfg`` security policy (actually, a
+ pair of policies: an authorization policy and an authentication
+ policy) using the userid ``userid`` and the group ids
+ ``groupids``. If ``permissive`` is true, a 'permissive' security
+ policy is registered; this policy allows all access. If
+ ``permissive`` is false, a nonpermissive security policy is
+ registered; this policy denies all access. This function is most
+ useful when testing code that uses the ``repoze.bfg.security``
+ APIs named ``has_permission``, ``authenticated_userid``,
+ ``effective_principals``, ``principals_allowed_by_permission``,
+ and so on.
"""
- if permissive:
- policy = DummyAllowingSecurityPolicy(userid, groupids)
- else:
- policy = DummyDenyingSecurityPolicy(userid, groupids)
- from repoze.bfg.interfaces import ISecurityPolicy
- return registerUtility(policy, ISecurityPolicy)
+ policy = DummySecurityPolicy(userid, groupids, permissive)
+ from repoze.bfg.interfaces import IAuthorizationPolicy
+ from repoze.bfg.interfaces import IAuthenticationPolicy
+ registerUtility(policy, IAuthorizationPolicy)
+ registerUtility(policy, IAuthenticationPolicy)
def registerModels(models):
""" Registers a dictionary of models. This is most useful for
@@ -76,7 +77,9 @@ def registerView(name, result='', view=None, for_=(Interface, Interface)):
with code that wants to call,
e.g. ``repoze.bfg.view.render_view_to_response``."""
if view is None:
- view = make_view(result)
+ def view(context, request):
+ from webob import Response
+ return Response(result)
from repoze.bfg.interfaces import IView
return registerAdapter(view, for_, IView, name)
@@ -101,7 +104,8 @@ def registerViewPermission(name, result=True, viewpermission=None,
else:
result = Denied('message')
if viewpermission is None:
- viewpermission = make_view_permission(result)
+ def viewpermission(context, request):
+ return result
from repoze.bfg.interfaces import IViewPermission
return registerAdapter(viewpermission, for_, IViewPermission, name)
@@ -165,15 +169,17 @@ def registerTraverserFactory(traverser, for_=Interface):
from repoze.bfg.interfaces import ITraverserFactory
return registerAdapter(traverser, for_, ITraverserFactory)
-class _DummySecurityPolicy:
- def __init__(self, userid=None, groupids=()):
+class DummySecurityPolicy:
+ """ A standin for both an IAuthentication and IAuthorization policy """
+ def __init__(self, userid=None, groupids=(), permissive=True):
self.userid = userid
self.groupids = groupids
+ self.permissive = permissive
- def authenticated_userid(self, request):
+ def authenticated_userid(self, context, request):
return self.userid
- def effective_principals(self, request):
+ def effective_principals(self, context, request):
from repoze.bfg.security import Everyone
from repoze.bfg.security import Authenticated
effective_principals = [Everyone]
@@ -183,19 +189,17 @@ class _DummySecurityPolicy:
effective_principals.extend(self.groupids)
return effective_principals
-class DummyAllowingSecurityPolicy(_DummySecurityPolicy):
- def permits(self, context, request, permission):
- return True
+ def remember(self, context, request, principal, **kw):
+ return []
- def principals_allowed_by_permission(self, context, permission):
- return self.effective_principals(None)
+ def forget(self, context, request):
+ return []
-class DummyDenyingSecurityPolicy(_DummySecurityPolicy):
- def permits(self, context, request, permission):
- return False
+ def permits(self, context, principals, permission):
+ return self.permissive
def principals_allowed_by_permission(self, context, permission):
- return []
+ return self.effective_principals(None, None)
def make_traverser_factory(root):
class DummyTraverserFactory:
@@ -258,23 +262,6 @@ class DummyTemplateRenderer:
v, k, myval))
return True
-def make_view(result):
- def dummy_view(context, request):
- from webob import Response
- return Response(result)
- return dummy_view
-
-def make_view_permission(result):
- class DummyViewPermission:
- def __init__(self, context, request):
- self.context = context
- self.request = request
-
- def __call__(self, secpol):
- return result
-
- return DummyViewPermission
-
class DummyModel:
""" A dummy :mod:`repoze.bfg` model object. The value of ``name``
to the constructor will be used as the ``__name__`` attribute of
@@ -432,8 +419,8 @@ def cleanUp():
from zope.component.globalregistry import base
from zope.configuration.xmlconfig import _clearContext
from repoze.bfg.registry import original_getSiteManager
-from repoze.bfg.registry import registry_manager
+from repoze.bfg.threadlocal import manager
addCleanUp(original_getSiteManager.reset)
-addCleanUp(registry_manager.clear)
+addCleanUp(manager.clear)
addCleanUp(lambda: base.__init__('base'))
addCleanUp(_clearContext)
diff --git a/repoze/bfg/tests/test_authentication.py b/repoze/bfg/tests/test_authentication.py
new file mode 100644
index 000000000..a23ffeac2
--- /dev/null
+++ b/repoze/bfg/tests/test_authentication.py
@@ -0,0 +1,176 @@
+import unittest
+
+class TestRepozeWho1AuthenticationPolicy(unittest.TestCase):
+ def _getTargetClass(self):
+ from repoze.bfg.authentication import RepozeWho1AuthenticationPolicy
+ return RepozeWho1AuthenticationPolicy
+
+ def _makeOne(self):
+ return self._getTargetClass()()
+
+ def test_class_implements_IAuthenticationPolicy(self):
+ from zope.interface.verify import verifyClass
+ from repoze.bfg.interfaces import IAuthenticationPolicy
+ verifyClass(IAuthenticationPolicy, self._getTargetClass())
+
+ def test_instance_implements_IAuthenticationPolicy(self):
+ from zope.interface.verify import verifyObject
+ from repoze.bfg.interfaces import IAuthenticationPolicy
+ verifyObject(IAuthenticationPolicy, self._makeOne())
+
+ def test_authenticated_userid_None(self):
+ context = DummyContext()
+ request = DummyRequest({})
+ policy = self._makeOne()
+ self.assertEqual(policy.authenticated_userid(context, request), None)
+
+ def test_authenticated_userid(self):
+ context = DummyContext()
+ request = DummyRequest(
+ {'repoze.who.identity':{'repoze.who.userid':'fred'}})
+ policy = self._makeOne()
+ self.assertEqual(policy.authenticated_userid(context, request), 'fred')
+
+ def test_effective_principals_None(self):
+ from repoze.bfg.security import Everyone
+ context = DummyContext()
+ request = DummyRequest({})
+ policy = self._makeOne()
+ self.assertEqual(policy.effective_principals(context, request),
+ [Everyone])
+
+ def test_effective_principals_userid_only(self):
+ from repoze.bfg.security import Everyone
+ from repoze.bfg.security import Authenticated
+ context = DummyContext()
+ request = DummyRequest(
+ {'repoze.who.identity':{'repoze.who.userid':'fred'}})
+ policy = self._makeOne()
+ self.assertEqual(policy.effective_principals(context, request),
+ [Everyone, Authenticated, 'fred'])
+
+ def test_effective_principals_userid_and_groups(self):
+ from repoze.bfg.security import Everyone
+ from repoze.bfg.security import Authenticated
+ context = DummyContext()
+ request = DummyRequest(
+ {'repoze.who.identity':{'repoze.who.userid':'fred',
+ 'groups':['quux', 'biz']}})
+ policy = self._makeOne()
+ self.assertEqual(policy.effective_principals(context, request),
+ [Everyone, Authenticated, 'fred', 'quux', 'biz'])
+
+ def test_remember_no_plugins(self):
+ context = DummyContext()
+ authtkt = DummyPlugin()
+ request = DummyRequest({})
+ policy = self._makeOne()
+ result = policy.remember(context, request, 'fred')
+ self.assertEqual(result, [])
+
+ def test_remember(self):
+ context = DummyContext()
+ authtkt = DummyPlugin()
+ request = DummyRequest(
+ {'repoze.who.plugins':{'auth_tkt':authtkt}})
+ policy = self._makeOne()
+ result = policy.remember(context, request, 'fred')
+ self.assertEqual(result[0], request.environ)
+ self.assertEqual(result[1], {'repoze.who.userid':'fred'})
+
+ def test_forget_no_plugins(self):
+ context = DummyContext()
+ authtkt = DummyPlugin()
+ request = DummyRequest({})
+ policy = self._makeOne()
+ result = policy.forget(context, request)
+ self.assertEqual(result, [])
+
+ def test_forget(self):
+ context = DummyContext()
+ authtkt = DummyPlugin()
+ request = DummyRequest(
+ {'repoze.who.plugins':{'auth_tkt':authtkt},
+ 'repoze.who.identity':{'repoze.who.userid':'fred'},
+ })
+ policy = self._makeOne()
+ result = policy.forget(context, request)
+ self.assertEqual(result[0], request.environ)
+ self.assertEqual(result[1], request.environ['repoze.who.identity'])
+
+class TestRemoteUserAuthenticationPolicy(unittest.TestCase):
+ def _getTargetClass(self):
+ from repoze.bfg.authentication import RemoteUserAuthenticationPolicy
+ return RemoteUserAuthenticationPolicy
+
+ def _makeOne(self):
+ return self._getTargetClass()()
+
+ def test_class_implements_IAuthenticationPolicy(self):
+ from zope.interface.verify import verifyClass
+ from repoze.bfg.interfaces import IAuthenticationPolicy
+ verifyClass(IAuthenticationPolicy, self._getTargetClass())
+
+ def test_instance_implements_IAuthenticationPolicy(self):
+ from zope.interface.verify import verifyObject
+ from repoze.bfg.interfaces import IAuthenticationPolicy
+ verifyObject(IAuthenticationPolicy, self._makeOne())
+
+ def test_authenticated_userid_None(self):
+ context = DummyContext()
+ request = DummyRequest({})
+ policy = self._makeOne()
+ self.assertEqual(policy.authenticated_userid(context, request), None)
+
+ def test_authenticated_userid(self):
+ context = DummyContext()
+ request = DummyRequest({'REMOTE_USER':'fred'})
+ policy = self._makeOne()
+ self.assertEqual(policy.authenticated_userid(context, request), 'fred')
+
+ def test_effective_principals_None(self):
+ from repoze.bfg.security import Everyone
+ context = DummyContext()
+ request = DummyRequest({})
+ policy = self._makeOne()
+ self.assertEqual(policy.effective_principals(context, request),
+ [Everyone])
+
+ def test_effective_principals(self):
+ from repoze.bfg.security import Everyone
+ from repoze.bfg.security import Authenticated
+ context = DummyContext()
+ request = DummyRequest({'REMOTE_USER':'fred'})
+ policy = self._makeOne()
+ self.assertEqual(policy.effective_principals(context, request),
+ [Everyone, Authenticated, 'fred'])
+
+ def test_remember(self):
+ context = DummyContext()
+ authtkt = DummyPlugin()
+ request = DummyRequest({'REMOTE_USER':'fred'})
+ policy = self._makeOne()
+ result = policy.remember(context, request, 'fred')
+ self.assertEqual(result, [])
+
+ def test_forget(self):
+ context = DummyContext()
+ authtkt = DummyPlugin()
+ request = DummyRequest({'REMOTE_USER':'fred'})
+ policy = self._makeOne()
+ result = policy.forget(context, request)
+ self.assertEqual(result, [])
+
+class DummyContext:
+ pass
+
+class DummyRequest:
+ def __init__(self, environ):
+ self.environ = environ
+
+class DummyPlugin:
+ def remember(self, environ, identity):
+ return environ, identity
+
+ def forget(self, environ, identity):
+ return environ, identity
diff --git a/repoze/bfg/tests/test_authorization.py b/repoze/bfg/tests/test_authorization.py
new file mode 100644
index 000000000..8aa9b9abf
--- /dev/null
+++ b/repoze/bfg/tests/test_authorization.py
@@ -0,0 +1,179 @@
+import unittest
+
+from repoze.bfg.testing import cleanUp
+
+class TestACLAuthorizationPolicy(unittest.TestCase):
+ def setUp(self):
+ cleanUp()
+
+ def tearDown(self):
+ cleanUp()
+
+ def _getTargetClass(self):
+ from repoze.bfg.authorization import ACLAuthorizationPolicy
+ return ACLAuthorizationPolicy
+
+ def _makeOne(self):
+ return self._getTargetClass()()
+
+ def test_class_implements_IAuthorizationPolicy(self):
+ from zope.interface.verify import verifyClass
+ from repoze.bfg.interfaces import IAuthorizationPolicy
+ verifyClass(IAuthorizationPolicy, self._getTargetClass())
+
+ def test_instance_implements_IAuthorizationPolicy(self):
+ from zope.interface.verify import verifyObject
+ from repoze.bfg.interfaces import IAuthorizationPolicy
+ verifyObject(IAuthorizationPolicy, self._makeOne())
+
+ def test_permits_no_acl(self):
+ context = DummyContext()
+ policy = self._makeOne()
+ self.assertEqual(policy.permits(context, [], 'view'), False)
+
+ def test_permits(self):
+ from repoze.bfg.security import Deny
+ from repoze.bfg.security import Allow
+ from repoze.bfg.security import Everyone
+ from repoze.bfg.security import Authenticated
+ from repoze.bfg.security import ALL_PERMISSIONS
+ from repoze.bfg.security import DENY_ALL
+ root = DummyContext()
+ community = DummyContext(__name__='community', __parent__=root)
+ blog = DummyContext(__name__='blog', __parent__=community)
+ root.__acl__ = [
+ (Allow, Authenticated, VIEW),
+ ]
+ community.__acl__ = [
+ (Allow, 'fred', ALL_PERMISSIONS),
+ (Allow, 'wilma', VIEW),
+ DENY_ALL,
+ ]
+ blog.__acl__ = [
+ (Allow, 'barney', MEMBER_PERMS),
+ (Allow, 'wilma', VIEW),
+ ]
+
+ policy = self._makeOne()
+
+ result = policy.permits(blog, [Everyone, Authenticated, 'wilma'],
+ 'view')
+ self.assertEqual(result, True)
+ self.assertEqual(result.context, blog)
+ self.assertEqual(result.ace, (Allow, 'wilma', VIEW))
+
+ result = policy.permits(blog, [Everyone, Authenticated, 'wilma'],
+ 'delete')
+ self.assertEqual(result, False)
+ self.assertEqual(result.context, community)
+ self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS))
+
+ result = policy.permits(blog, [Everyone, Authenticated, 'fred'], 'view')
+ self.assertEqual(result, True)
+ self.assertEqual(result.context, community)
+ self.assertEqual(result.ace, (Allow, 'fred', ALL_PERMISSIONS))
+ result = policy.permits(blog, [Everyone, Authenticated, 'fred'],
+ 'doesntevenexistyet')
+ self.assertEqual(result, True)
+ self.assertEqual(result.context, community)
+ self.assertEqual(result.ace, (Allow, 'fred', ALL_PERMISSIONS))
+
+ result = policy.permits(blog, [Everyone, Authenticated, 'barney'],
+ 'view')
+ self.assertEqual(result, True)
+ self.assertEqual(result.context, blog)
+ self.assertEqual(result.ace, (Allow, 'barney', MEMBER_PERMS))
+ result = policy.permits(blog, [Everyone, Authenticated, 'barney'],
+ 'administer')
+ self.assertEqual(result, False)
+ self.assertEqual(result.context, community)
+ self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS))
+
+ result = policy.permits(root, [Everyone, Authenticated, 'someguy'],
+ 'view')
+ self.assertEqual(result, True)
+ self.assertEqual(result.context, root)
+ self.assertEqual(result.ace, (Allow, Authenticated, VIEW))
+ result = policy.permits(blog,
+ [Everyone, Authenticated, 'someguy'], 'view')
+ self.assertEqual(result, False)
+ self.assertEqual(result.context, community)
+ self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS))
+
+ result = policy.permits(root, [Everyone], 'view')
+ self.assertEqual(result, False)
+ self.assertEqual(result.context, root)
+ self.assertEqual(result.ace, None)
+
+ context = DummyContext()
+ result = policy.permits(context, [Everyone], 'view')
+ self.assertEqual(result, False)
+
+ def test_principals_allowed_by_permission_direct(self):
+ from repoze.bfg.security import Allow
+ from repoze.bfg.security import DENY_ALL
+ context = DummyContext()
+ acl = [ (Allow, 'chrism', ('read', 'write')),
+ DENY_ALL,
+ (Allow, 'other', 'read') ]
+ context.__acl__ = acl
+ policy = self._makeOne()
+ result = sorted(
+ policy.principals_allowed_by_permission(context, 'read'))
+ self.assertEqual(result, ['chrism'])
+
+ def test_principals_allowed_by_permission(self):
+ from repoze.bfg.security import Allow
+ from repoze.bfg.security import Deny
+ from repoze.bfg.security import DENY_ALL
+ from repoze.bfg.security import ALL_PERMISSIONS
+ root = DummyContext(__name__='', __parent__=None)
+ community = DummyContext(__name__='community', __parent__=root)
+ blog = DummyContext(__name__='blog', __parent__=community)
+ root.__acl__ = [ (Allow, 'chrism', ('read', 'write')),
+ (Allow, 'other', ('read',)),
+ (Allow, 'jim', ALL_PERMISSIONS)]
+ community.__acl__ = [ (Deny, 'flooz', 'read'),
+ (Allow, 'flooz', 'read'),
+ (Allow, 'mork', 'read'),
+ (Deny, 'jim', 'read'),
+ (Allow, 'someguy', 'manage')]
+ blog.__acl__ = [ (Allow, 'fred', 'read'),
+ DENY_ALL]
+
+ policy = self._makeOne()
+
+ result = sorted(policy.principals_allowed_by_permission(blog, 'read'))
+ self.assertEqual(result, ['fred'])
+ result = sorted(policy.principals_allowed_by_permission(community,
+ 'read'))
+ self.assertEqual(result, ['chrism', 'mork', 'other'])
+ result = sorted(policy.principals_allowed_by_permission(community,
+ 'read'))
+ result = sorted(policy.principals_allowed_by_permission(root, 'read'))
+ self.assertEqual(result, ['chrism', 'jim', 'other'])
+
+ def test_principals_allowed_by_permission_no_acls(self):
+ context = DummyContext()
+ policy = self._makeOne()
+ result = sorted(policy.principals_allowed_by_permission(context,'read'))
+ self.assertEqual(result, [])
+
+class DummyContext:
+ def __init__(self, *arg, **kw):
+ self.__dict__.update(kw)
+
+
+VIEW = 'view'
+EDIT = 'edit'
+CREATE = 'create'
+DELETE = 'delete'
+MODERATE = 'moderate'
+ADMINISTER = 'administer'
+COMMENT = 'comment'
+
+GUEST_PERMS = (VIEW, COMMENT)
+MEMBER_PERMS = GUEST_PERMS + (EDIT, CREATE, DELETE)
+MODERATOR_PERMS = MEMBER_PERMS + (MODERATE,)
+ADMINISTRATOR_PERMS = MODERATOR_PERMS + (ADMINISTER,)
+
diff --git a/repoze/bfg/tests/test_integration.py b/repoze/bfg/tests/test_integration.py
index fdb85b87e..3630a2778 100644
--- a/repoze/bfg/tests/test_integration.py
+++ b/repoze/bfg/tests/test_integration.py
@@ -120,7 +120,7 @@ class TestFixtureApp(unittest.TestCase):
def tearDown(self):
cleanUp()
- def test_registry_actions_can_be_pickled_and_unpickled(self):
+ def test_execute_actions(self):
import repoze.bfg.tests.fixtureapp as package
from zope.configuration import config
from zope.configuration import xmlconfig
@@ -129,11 +129,6 @@ class TestFixtureApp(unittest.TestCase):
context.package = package
xmlconfig.include(context, 'configure.zcml', package)
context.execute_actions(clear=False)
- actions = context.actions
- import cPickle
- dumped = cPickle.dumps(actions, -1)
- new = cPickle.loads(dumped)
- self.assertEqual(len(actions), len(new))
class TestGrokkedApp(unittest.TestCase):
def setUp(self):
diff --git a/repoze/bfg/tests/test_registry.py b/repoze/bfg/tests/test_registry.py
index 77031806f..d145e7700 100644
--- a/repoze/bfg/tests/test_registry.py
+++ b/repoze/bfg/tests/test_registry.py
@@ -53,10 +53,10 @@ class TestPopulateRegistry(unittest.TestCase):
def test_it(self):
from repoze.bfg.tests import fixtureapp
dummylock = DummyLock()
- dummyregmgr = DummyRegistrationManager()
- import repoze.bfg.registry
+ dummyregmgr = DummyThreadLocalManager({'registry':None})
+ import repoze.bfg.threadlocal
try:
- old = repoze.bfg.registry.setRegistryManager(dummyregmgr)
+ old = repoze.bfg.threadlocal.setManager(dummyregmgr)
from zope.component.registry import Components
registry = Components('hello')
self._callFUT(registry,
@@ -65,49 +65,9 @@ class TestPopulateRegistry(unittest.TestCase):
lock=dummylock)
self.assertEqual(dummylock.acquired, True)
self.assertEqual(dummylock.released, True)
- self.assertEqual(dummyregmgr.registry, registry)
+ self.assertEqual(dummyregmgr.data['registry'], None)
finally:
- repoze.bfg.registry.setRegistryManager(old)
-
-class TestThreadLocalRegistryManager(unittest.TestCase):
- def setUp(self):
- cleanUp()
-
- def tearDown(self):
- cleanUp()
-
- def _getTargetClass(self):
- from repoze.bfg.registry import ThreadLocalRegistryManager
- return ThreadLocalRegistryManager
-
- def _makeOne(self):
- return self._getTargetClass()()
-
- def test_init(self):
- local = self._makeOne()
- from zope.component import getGlobalSiteManager
- self.assertEqual(local.stack, [])
- self.assertEqual(local.get(), getGlobalSiteManager())
-
- def test_push_and_pop(self):
- local = self._makeOne()
- from zope.component import getGlobalSiteManager
- local.push(True)
- self.assertEqual(local.get(), True)
- self.assertEqual(local.pop(), True)
- self.assertEqual(local.pop(), None)
- self.assertEqual(local.get(), getGlobalSiteManager())
-
- def test_set_get_and_clear(self):
- local = self._makeOne()
- from zope.component import getGlobalSiteManager
- local.set(None)
- self.assertEqual(local.stack, [None])
- self.assertEqual(local.get(), None)
- local.clear()
- self.assertEqual(local.get(), getGlobalSiteManager())
- local.clear()
- self.assertEqual(local.get(), getGlobalSiteManager())
+ repoze.bfg.threadlocal.setManager(old)
class GetSiteManagerTests(unittest.TestCase):
def _callFUT(self, context=None):
@@ -122,16 +82,10 @@ class GetSiteManagerTests(unittest.TestCase):
from zope.component.interfaces import ComponentLookupError
self.assertRaises(ComponentLookupError, self._callFUT, object)
-class DummyRegistrationManager:
- def push(self, registry):
- self.registry = registry
-
- def pop(self):
- self.popped = True
-
- def get(self):
- return self.registry
-
+class DummyThreadLocalManager:
+ def __init__(self, data):
+ self.data = data
+
class DummyLock:
def acquire(self):
self.acquired = True
diff --git a/repoze/bfg/tests/test_router.py b/repoze/bfg/tests/test_router.py
index db47f832e..9a29967a3 100644
--- a/repoze/bfg/tests/test_router.py
+++ b/repoze/bfg/tests/test_router.py
@@ -16,14 +16,7 @@ class RouterTests(unittest.TestCase):
def _registerLogger(self):
from repoze.bfg.interfaces import ILogger
- class Logger:
- def __init__(self):
- self.messages = []
- def info(self, msg):
- self.messages.append(msg)
- warn = info
- debug = info
- logger = Logger()
+ logger = DummyLogger()
self.registry.registerUtility(logger, ILogger, name='repoze.bfg.debug')
return logger
@@ -38,6 +31,12 @@ class RouterTests(unittest.TestCase):
settings = Settings(**defaultkw)
self.registry.registerUtility(settings, ISettings)
+ def _registerAuthenticationPolicy(self):
+ from repoze.bfg.interfaces import IAuthenticationPolicy
+ policy = DummyAuthenticationPolicy()
+ self.registry.registerUtility(policy, IAuthenticationPolicy)
+ return policy
+
def _registerTraverserFactory(self, context, view_name='', subpath=None,
traversed=None, virtual_root=None,
virtual_root_path=None, **kw):
@@ -74,13 +73,19 @@ class RouterTests(unittest.TestCase):
from repoze.bfg.interfaces import IView
self.registry.registerAdapter(app, for_, IView, name)
- def _registerPermission(self, permission, name, *for_):
+ def _registerViewPermission(self, view_name, allow=True):
+ from zope.interface import Interface
from repoze.bfg.interfaces import IViewPermission
- self.registry.registerAdapter(permission, for_, IViewPermission, name)
-
- def _registerSecurityPolicy(self, secpol):
- from repoze.bfg.interfaces import ISecurityPolicy
- self.registry.registerUtility(secpol, ISecurityPolicy)
+ class Checker(object):
+ def __call__(self, context, request):
+ self.context = context
+ self.request = request
+ return allow
+ checker = Checker()
+ self.registry.registerAdapter(checker, (Interface, Interface),
+ IViewPermission,
+ view_name)
+ return checker
def _registerEventListener(self, iface):
L = []
@@ -89,9 +94,11 @@ class RouterTests(unittest.TestCase):
self.registry.registerHandler(listener, (iface,))
return L
- def _registerRootFactory(self, root_factory):
+ def _registerRootFactory(self, val):
+ rootfactory = make_rootfactory(val)
from repoze.bfg.interfaces import IRootFactory
- self.registry.registerUtility(root_factory, IRootFactory)
+ self.registry.registerUtility(rootfactory, IRootFactory)
+ return rootfactory
def _getTargetClass(self):
from repoze.bfg.router import Router
@@ -113,21 +120,63 @@ class RouterTests(unittest.TestCase):
return environ
def test_root_policy(self):
- rootfactory = make_rootfactory(None)
environ = self._makeEnviron()
context = DummyContext()
self._registerTraverserFactory(context)
- self._registerRootFactory(rootfactory)
+ rootfactory = self._registerRootFactory(None)
router = self._makeOne()
self.assertEqual(router.root_policy, rootfactory)
+ def test_inotfound_appfactory_override(self):
+ from repoze.bfg.interfaces import INotFoundAppFactory
+ def app():
+ """ """
+ self.registry.registerUtility(app, INotFoundAppFactory)
+ self._registerRootFactory(None)
+ router = self._makeOne()
+ self.assertEqual(router.notfound_app_factory, app)
+
+ def test_iforbidden_responsefactory_override(self):
+ from repoze.bfg.interfaces import IForbiddenResponseFactory
+ def app():
+ """ """
+ self.registry.registerUtility(app, IForbiddenResponseFactory)
+ self._registerRootFactory(None)
+ router = self._makeOne()
+ self.assertEqual(router.forbidden_resp_factory, app)
+
+ def test_iforbidden_responsefactory_nooverride(self):
+ context = DummyContext()
+ self._registerRootFactory(None)
+ router = self._makeOne()
+ from repoze.bfg.router import default_forbidden_view
+ self.assertEqual(router.forbidden_resp_factory, default_forbidden_view)
+
+ def test_secpol_with_iunauthorized_appfactory(self):
+ from repoze.bfg.interfaces import IUnauthorizedAppFactory
+ environ = self._makeEnviron()
+ context = DummyContext()
+ self._registerTraverserFactory(context)
+ rootfactory = self._registerRootFactory(None)
+ logger = self._registerLogger()
+ def factory():
+ return 'yo'
+ self.registry.registerUtility(factory, IUnauthorizedAppFactory)
+ router = self._makeOne()
+ self.assertEqual(len(logger.messages), 1)
+ self.failUnless('IForbiddenResponseFactory' in logger.messages[0])
+ class DummyRequest:
+ def get_response(self, app):
+ return app
+ req = DummyRequest()
+ self.assertEqual(router.forbidden_resp_factory(None, req), 'yo')
+
def test_call_no_view_registered_no_isettings(self):
- rootfactory = make_rootfactory(None)
environ = self._makeEnviron()
context = DummyContext()
self._registerTraverserFactory(context)
logger = self._registerLogger()
- self._registerRootFactory(rootfactory)
+ self._registerRootFactory(None)
router = self._makeOne()
start_response = DummyStartResponse()
result = router(environ, start_response)
@@ -148,8 +197,7 @@ class RouterTests(unittest.TestCase):
self._registerTraverserFactory(context)
environ = self._makeEnviron()
start_response = DummyStartResponse()
- rootfactory = make_rootfactory(NotFound())
- self._registerRootFactory(rootfactory)
+ self._registerRootFactory(NotFound())
router = self._makeOne()
result = router(environ, start_response)
status = start_response.status
@@ -157,13 +205,12 @@ class RouterTests(unittest.TestCase):
self.failUnless('http://localhost:8080' in result[0], result)
def test_call_no_view_registered_debug_notfound_false(self):
- rootfactory = make_rootfactory(None)
environ = self._makeEnviron()
context = DummyContext()
self._registerTraverserFactory(context)
logger = self._registerLogger()
self._registerSettings(debug_notfound=False)
- self._registerRootFactory(rootfactory)
+ self._registerRootFactory(None)
router = self._makeOne()
start_response = DummyStartResponse()
result = router(environ, start_response)
@@ -176,13 +223,12 @@ class RouterTests(unittest.TestCase):
self.assertEqual(len(logger.messages), 0)
def test_call_no_view_registered_debug_notfound_true(self):
- rootfactory = make_rootfactory(None)
environ = self._makeEnviron()
context = DummyContext()
self._registerTraverserFactory(context)
self._registerSettings(debug_notfound=True)
logger = self._registerLogger()
- self._registerRootFactory(rootfactory)
+ self._registerRootFactory(None)
router = self._makeOne()
start_response = DummyStartResponse()
result = router(environ, start_response)
@@ -205,19 +251,17 @@ class RouterTests(unittest.TestCase):
self.failUnless("subpath: []" in message)
def test_call_view_returns_nonresponse(self):
- rootfactory = make_rootfactory(None)
context = DummyContext()
self._registerTraverserFactory(context)
environ = self._makeEnviron()
view = make_view('abc')
self._registerView(view, '', None, None)
- self._registerRootFactory(rootfactory)
+ self._registerRootFactory(None)
router = self._makeOne()
start_response = DummyStartResponse()
self.assertRaises(ValueError, router, environ, start_response)
def test_call_view_registered_nonspecific_default_path(self):
- rootfactory = make_rootfactory(None)
context = DummyContext()
self._registerTraverserFactory(context)
response = DummyResponse()
@@ -225,7 +269,7 @@ class RouterTests(unittest.TestCase):
view = make_view(response)
environ = self._makeEnviron()
self._registerView(view, '', None, None)
- self._registerRootFactory(rootfactory)
+ self._registerRootFactory(None)
router = self._makeOne()
start_response = DummyStartResponse()
result = router(environ, start_response)
@@ -238,7 +282,6 @@ class RouterTests(unittest.TestCase):
self.assertEqual(environ['webob.adhoc_attrs']['root'], None)
def test_call_deprecation_warning(self):
- rootfactory = make_rootfactory(None)
context = DummyContext()
self._registerTraverserFactory(context, _deprecation_warning='abc')
response = DummyResponse()
@@ -246,7 +289,7 @@ class RouterTests(unittest.TestCase):
view = make_view(response)
environ = self._makeEnviron()
self._registerView(view, '', None, None)
- self._registerRootFactory(rootfactory)
+ self._registerRootFactory(None)
router = self._makeOne()
logger = self._registerLogger()
router.logger = logger
@@ -256,7 +299,6 @@ class RouterTests(unittest.TestCase):
self.assertEqual(logger.messages[0], 'abc')
def test_call_view_registered_nonspecific_nondefault_path_and_subpath(self):
- rootfactory = make_rootfactory(None)
context = DummyContext()
self._registerTraverserFactory(context, view_name='foo',
subpath=['bar'],
@@ -266,7 +308,7 @@ class RouterTests(unittest.TestCase):
view = make_view(response)
environ = self._makeEnviron()
self._registerView(view, 'foo', None, None)
- self._registerRootFactory(rootfactory)
+ self._registerRootFactory(None)
router = self._makeOne()
start_response = DummyStartResponse()
result = router(environ, start_response)
@@ -279,7 +321,6 @@ class RouterTests(unittest.TestCase):
self.assertEqual(environ['webob.adhoc_attrs']['root'], None)
def test_call_view_registered_specific_success(self):
- rootfactory = make_rootfactory(None)
from zope.interface import Interface
from zope.interface import directlyProvides
class IContext(Interface):
@@ -293,7 +334,7 @@ class RouterTests(unittest.TestCase):
view = make_view(response)
environ = self._makeEnviron()
self._registerView(view, '', IContext, IRequest)
- self._registerRootFactory(rootfactory)
+ self._registerRootFactory(None)
router = self._makeOne()
start_response = DummyStartResponse()
result = router(environ, start_response)
@@ -306,7 +347,6 @@ class RouterTests(unittest.TestCase):
self.assertEqual(environ['webob.adhoc_attrs']['root'], None)
def test_call_view_registered_specific_fail(self):
- rootfactory = make_rootfactory(None)
from zope.interface import Interface
from zope.interface import directlyProvides
class IContext(Interface):
@@ -321,15 +361,14 @@ class RouterTests(unittest.TestCase):
view = make_view(response)
environ = self._makeEnviron()
self._registerView(view, '', IContext, IRequest)
- self._registerRootFactory(rootfactory)
+ self._registerRootFactory(None)
router = self._makeOne()
start_response = DummyStartResponse()
result = router(environ, start_response)
self.assertEqual(start_response.status, '404 Not Found')
self.failUnless('404' in result[0])
- def test_call_view_registered_security_policy_permission_none(self):
- rootfactory = make_rootfactory(None)
+ def test_call_view_permission_none(self):
from zope.interface import Interface
from zope.interface import directlyProvides
class IContext(Interface):
@@ -342,16 +381,14 @@ class RouterTests(unittest.TestCase):
view = make_view(response)
environ = self._makeEnviron()
self._registerView(view, '', IContext, IRequest)
- secpol = DummySecurityPolicy()
- self._registerSecurityPolicy(secpol)
- self._registerRootFactory(rootfactory)
+ self._registerRootFactory(None)
router = self._makeOne()
start_response = DummyStartResponse()
result = router(environ, start_response)
self.assertEqual(start_response.status, '200 OK')
- def test_call_view_registered_security_policy_permission_succeeds(self):
- rootfactory = make_rootfactory(None)
+ def test_call_view_no_authentication_policy_debug_authorization(self):
+ logger = self._registerLogger()
from zope.interface import Interface
from zope.interface import directlyProvides
class IContext(Interface):
@@ -362,21 +399,87 @@ class RouterTests(unittest.TestCase):
self._registerTraverserFactory(context, subpath=[''])
response = DummyResponse()
view = make_view(response)
- secpol = DummySecurityPolicy()
- permissionfactory = make_permission_factory(True)
environ = self._makeEnviron()
self._registerView(view, '', IContext, IRequest)
- self._registerSecurityPolicy(secpol)
- self._registerPermission(permissionfactory, '', IContext, IRequest)
- self._registerRootFactory(rootfactory)
+ self._registerRootFactory(None)
router = self._makeOne()
+ router.debug_authorization = True
start_response = DummyStartResponse()
result = router(environ, start_response)
self.assertEqual(start_response.status, '200 OK')
- self.assertEqual(permissionfactory.checked_with, secpol)
+ self.assertEqual(len(logger.messages), 1)
+ self.failUnless('no authentication policy' in logger.messages[0])
+
+ def test_call_view_no_permission_registered_debug_authorization(self):
+ self._registerAuthenticationPolicy()
+ logger = self._registerLogger()
+ from zope.interface import Interface
+ from zope.interface import directlyProvides
+ class IContext(Interface):
+ pass
+ from repoze.bfg.interfaces import IRequest
+ context = DummyContext()
+ directlyProvides(context, IContext)
+ self._registerTraverserFactory(context, subpath=[''])
+ response = DummyResponse()
+ view = make_view(response)
+ environ = self._makeEnviron()
+ self._registerView(view, '', IContext, IRequest)
+ self._registerRootFactory(None)
+ router = self._makeOne()
+ router.debug_authorization = True
+ start_response = DummyStartResponse()
+ result = router(environ, start_response)
+ self.assertEqual(start_response.status, '200 OK')
+ self.assertEqual(len(logger.messages), 1)
+ self.failUnless('no permission registered' in logger.messages[0])
+
+ def test_call_view_no_permission_registered_no_debug(self):
+ self._registerAuthenticationPolicy()
+ logger = self._registerLogger()
+ from zope.interface import Interface
+ from zope.interface import directlyProvides
+ class IContext(Interface):
+ pass
+ from repoze.bfg.interfaces import IRequest
+ context = DummyContext()
+ directlyProvides(context, IContext)
+ self._registerTraverserFactory(context, subpath=[''])
+ response = DummyResponse()
+ view = make_view(response)
+ environ = self._makeEnviron()
+ self._registerView(view, '', IContext, IRequest)
+ self._registerRootFactory(None)
+ router = self._makeOne()
+ router.debug_authorization = False
+ start_response = DummyStartResponse()
+ result = router(environ, start_response)
+ self.assertEqual(start_response.status, '200 OK')
+ self.assertEqual(len(logger.messages), 0)
+
+ def test_call_view_permission_succeeds(self):
+ from zope.interface import Interface
+ from zope.interface import directlyProvides
+ class IContext(Interface):
+ pass
+ from repoze.bfg.interfaces import IRequest
+ context = DummyContext()
+ directlyProvides(context, IContext)
+ self._registerTraverserFactory(context, subpath=[''])
+ self._registerAuthenticationPolicy()
+ response = DummyResponse()
+ view = make_view(response)
+ environ = self._makeEnviron()
+ self._registerView(view, '', IContext, IRequest)
+ checker = self._registerViewPermission('', True)
+ self._registerRootFactory(None)
+ router = self._makeOne()
+ start_response = DummyStartResponse()
+ result = router(environ, start_response)
+ self.assertEqual(start_response.status, '200 OK')
+ self.assertEqual(checker.context, context)
def test_call_view_permission_fails_nosettings(self):
- rootfactory = make_rootfactory(None)
from zope.interface import Interface
from zope.interface import directlyProvides
class IContext(Interface):
@@ -385,28 +488,24 @@ class RouterTests(unittest.TestCase):
context = DummyContext()
directlyProvides(context, IContext)
self._registerTraverserFactory(context, subpath=[''])
+ self._registerAuthenticationPolicy()
response = DummyResponse()
view = make_view(response)
- secpol = DummySecurityPolicy()
from repoze.bfg.security import ACLDenied
- permissionfactory = make_permission_factory(
- ACLDenied('ace', 'acl', 'permission', ['principals'], context)
- )
+ denied = ACLDenied('ace', 'acl', 'permission', ['principals'], context)
environ = self._makeEnviron()
self._registerView(view, '', IContext, IRequest)
- self._registerSecurityPolicy(secpol)
- self._registerPermission(permissionfactory, '', IContext, IRequest)
- self._registerRootFactory(rootfactory)
+ checker = self._registerViewPermission('', denied)
+ self._registerRootFactory(None)
router = self._makeOne()
start_response = DummyStartResponse()
result = router(environ, start_response)
self.assertEqual(start_response.status, '401 Unauthorized')
- message = result[0]
- self.failUnless('failed security policy check' in message)
- self.assertEqual(permissionfactory.checked_with, secpol)
+ message = environ['repoze.bfg.message']
+ self.assertEqual(message, 'Unauthorized: failed security policy check')
+ self.assertEqual(checker.context, context)
def test_call_view_permission_fails_no_debug_auth(self):
- rootfactory = make_rootfactory(None)
from zope.interface import Interface
from zope.interface import directlyProvides
class IContext(Interface):
@@ -415,29 +514,25 @@ class RouterTests(unittest.TestCase):
context = DummyContext()
directlyProvides(context, IContext)
self._registerTraverserFactory(context, subpath=[''])
+ self._registerAuthenticationPolicy()
response = DummyResponse()
view = make_view(response)
- secpol = DummySecurityPolicy()
from repoze.bfg.security import ACLDenied
- permissionfactory = make_permission_factory(
- ACLDenied('ace', 'acl', 'permission', ['principals'], context)
- )
+ denied = ACLDenied('ace', 'acl', 'permission', ['principals'], context)
environ = self._makeEnviron()
self._registerView(view, '', IContext, IRequest)
- self._registerSecurityPolicy(secpol)
- self._registerPermission(permissionfactory, '', IContext, IRequest)
+ checker = self._registerViewPermission('', denied)
self._registerSettings(debug_authorization=False)
- self._registerRootFactory(rootfactory)
+ self._registerRootFactory(None)
router = self._makeOne()
start_response = DummyStartResponse()
result = router(environ, start_response)
self.assertEqual(start_response.status, '401 Unauthorized')
- message = result[0]
+ message = environ['repoze.bfg.message']
self.failUnless('failed security policy check' in message)
- self.assertEqual(permissionfactory.checked_with, secpol)
+ self.assertEqual(checker.context, context)
def test_call_view_permission_fails_with_debug_auth(self):
- rootfactory = make_rootfactory(None)
from zope.interface import Interface
from zope.interface import directlyProvides
class IContext(Interface):
@@ -445,31 +540,28 @@ class RouterTests(unittest.TestCase):
from repoze.bfg.interfaces import IRequest
context = DummyContext()
directlyProvides(context, IContext)
+ self._registerAuthenticationPolicy()
self._registerTraverserFactory(context, subpath=[''])
response = DummyResponse()
view = make_view(response)
- secpol = DummySecurityPolicy()
from repoze.bfg.security import ACLDenied
- permissionfactory = make_permission_factory(
- ACLDenied('ace', 'acl', 'permission', ['principals'], context)
- )
environ = self._makeEnviron()
self._registerView(view, '', IContext, IRequest)
- self._registerSecurityPolicy(secpol)
- self._registerPermission(permissionfactory, '', IContext, IRequest)
+ allowed = ACLDenied('ace', 'acl', 'permission', ['principals'], context)
+ checker = self._registerViewPermission('', allowed)
self._registerSettings(debug_authorization=True)
logger = self._registerLogger()
- self._registerRootFactory(rootfactory)
+ self._registerRootFactory(None)
router = self._makeOne()
start_response = DummyStartResponse()
result = router(environ, start_response)
self.assertEqual(start_response.status, '401 Unauthorized')
- message = result[0]
+ message = environ['repoze.bfg.message']
self.failUnless(
"ACLDenied permission 'permission' via ACE 'ace' in ACL 'acl' "
"on context" in message)
self.failUnless("for principals ['principals']" in message)
- self.assertEqual(permissionfactory.checked_with, secpol)
+ self.assertEqual(checker.context, context)
self.assertEqual(len(logger.messages), 1)
logged = logger.messages[0]
self.failUnless(
@@ -482,7 +574,6 @@ class RouterTests(unittest.TestCase):
"for principals ['principals']" in logged)
def test_call_eventsends(self):
- rootfactory = make_rootfactory(None)
context = DummyContext()
self._registerTraverserFactory(context)
response = DummyResponse()
@@ -494,7 +585,7 @@ class RouterTests(unittest.TestCase):
from repoze.bfg.interfaces import INewResponse
request_events = self._registerEventListener(INewRequest)
response_events = self._registerEventListener(INewResponse)
- self._registerRootFactory(rootfactory)
+ self._registerRootFactory(None)
router = self._makeOne()
start_response = DummyStartResponse()
result = router(environ, start_response)
@@ -503,12 +594,27 @@ class RouterTests(unittest.TestCase):
self.assertEqual(len(response_events), 1)
self.assertEqual(response_events[0].response, response)
+ def test_call_pushes_and_pops_threadlocal_manager(self):
+ context = DummyContext()
+ self._registerTraverserFactory(context)
+ response = DummyResponse()
+ response.app_iter = ['Hello world']
+ view = make_view(response)
+ environ = self._makeEnviron()
+ self._registerView(view, '', None, None)
+ self._registerRootFactory(None)
+ router = self._makeOne()
+ start_response = DummyStartResponse()
+ router.threadlocal_manager = DummyThreadLocalManager()
+ result = router(environ, start_response)
+ self.assertEqual(len(router.threadlocal_manager.pushed), 1)
+ self.assertEqual(len(router.threadlocal_manager.popped), 1)
+
def test_call_post_method(self):
from repoze.bfg.interfaces import INewRequest
from repoze.bfg.interfaces import IPOSTRequest
from repoze.bfg.interfaces import IPUTRequest
from repoze.bfg.interfaces import IRequest
- rootfactory = make_rootfactory(None)
context = DummyContext()
self._registerTraverserFactory(context)
response = DummyResponse()
@@ -516,7 +622,7 @@ class RouterTests(unittest.TestCase):
view = make_view(response)
environ = self._makeEnviron(REQUEST_METHOD='POST')
self._registerView(view, '', None, None)
- self._registerRootFactory(rootfactory)
+ self._registerRootFactory(None)
router = self._makeOne()
start_response = DummyStartResponse()
request_events = self._registerEventListener(INewRequest)
@@ -531,7 +637,6 @@ class RouterTests(unittest.TestCase):
from repoze.bfg.interfaces import IPUTRequest
from repoze.bfg.interfaces import IPOSTRequest
from repoze.bfg.interfaces import IRequest
- rootfactory = make_rootfactory(None)
context = DummyContext()
self._registerTraverserFactory(context)
response = DummyResponse()
@@ -539,7 +644,7 @@ class RouterTests(unittest.TestCase):
view = make_view(response)
environ = self._makeEnviron(REQUEST_METHOD='PUT')
self._registerView(view, '', None, None)
- self._registerRootFactory(rootfactory)
+ self._registerRootFactory(None)
router = self._makeOne()
start_response = DummyStartResponse()
request_events = self._registerEventListener(INewRequest)
@@ -552,7 +657,6 @@ class RouterTests(unittest.TestCase):
def test_call_unknown_method(self):
from repoze.bfg.interfaces import INewRequest
from repoze.bfg.interfaces import IRequest
- rootfactory = make_rootfactory(None)
context = DummyContext()
self._registerTraverserFactory(context)
response = DummyResponse()
@@ -560,7 +664,7 @@ class RouterTests(unittest.TestCase):
view = make_view(response)
environ = self._makeEnviron(REQUEST_METHOD='UNKNOWN')
self._registerView(view, '', None, None)
- self._registerRootFactory(rootfactory)
+ self._registerRootFactory(None)
router = self._makeOne()
start_response = DummyStartResponse()
request_events = self._registerEventListener(INewRequest)
@@ -573,7 +677,6 @@ class RouterTests(unittest.TestCase):
from repoze.bfg.interfaces import IRequestFactory
from repoze.bfg.testing import DummyRequest
self.registry.registerUtility(DummyRequest, IRequestFactory)
- rootfactory = make_rootfactory(None)
context = DummyContext()
self._registerTraverserFactory(context)
response = DummyResponse()
@@ -581,7 +684,7 @@ class RouterTests(unittest.TestCase):
view = make_view(response)
environ = self._makeEnviron()
self._registerView(view, '', None, None)
- self._registerRootFactory(rootfactory)
+ self._registerRootFactory(None)
router = self._makeOne()
start_response = DummyStartResponse()
request_events = self._registerEventListener(INewRequest)
@@ -593,54 +696,19 @@ class RouterTests(unittest.TestCase):
self.assertEqual(request.view_name, '')
self.assertEqual(request.subpath, [])
- def test_call_inotfound_appfactory_override(self):
- from repoze.bfg.interfaces import INotFoundAppFactory
- def app():
- """ """
- self.registry.registerUtility(app, INotFoundAppFactory)
- rootfactory = make_rootfactory(None)
- context = DummyContext()
- self._registerTraverserFactory(context)
- response = DummyResponse()
- response.app_iter = ['Hello world']
- view = make_view(response)
- environ = self._makeEnviron()
- self._registerView(view, '', None, None)
- self._registerRootFactory(rootfactory)
- router = self._makeOne()
- self.assertEqual(router.notfound_app_factory, app)
-
- def test_call_iunauth_appfactory_override(self):
- from repoze.bfg.interfaces import IUnauthorizedAppFactory
- def app():
- """ """
- self.registry.registerUtility(app, IUnauthorizedAppFactory)
- rootfactory = make_rootfactory(None)
- context = DummyContext()
- self._registerTraverserFactory(context)
- response = DummyResponse()
- response.app_iter = ['Hello world']
- view = make_view(response)
- environ = self._makeEnviron()
- self._registerView(view, '', None, None)
- self._registerRootFactory(rootfactory)
- router = self._makeOne()
- self.assertEqual(router.unauth_app_factory, app)
-
class MakeAppTests(unittest.TestCase):
def setUp(self):
cleanUp()
import repoze.bfg.router
- self.old_registry_manager = repoze.bfg.router.registry_manager
+ self.old_tl_manager = repoze.bfg.router.manager
self.regmgr = DummyRegistryManager()
- repoze.bfg.router.registry_manager = self.regmgr
+ repoze.bfg.router.manager = self.regmgr
def tearDown(self):
cleanUp()
import repoze.bfg.router
- repoze.bfg.router.registry_manager = self.old_registry_manager
+ repoze.bfg.router.threadlocal_manager = self.old_tl_manager
-
def _callFUT(self, *arg, **kw):
from repoze.bfg.router import make_app
return make_app(*arg, **kw)
@@ -729,6 +797,73 @@ class MakeAppTests(unittest.TestCase):
self.assertRaises(ValueError, self._callFUT, None, fixtureapp,
options=options)
+ def test_authorization_policy_no_authentication_policy(self):
+ from repoze.bfg.interfaces import IAuthorizationPolicy
+ authzpolicy = DummyContext()
+ from repoze.bfg.tests import routesapp
+ app = self._callFUT(None, routesapp, authorization_policy=authzpolicy)
+ self.failIf(app.registry.queryUtility(IAuthorizationPolicy))
+
+ def test_authentication_policy_no_authorization_policy(self):
+ from repoze.bfg.interfaces import IAuthorizationPolicy
+ from repoze.bfg.interfaces import IAuthenticationPolicy
+ from repoze.bfg.authorization import ACLAuthorizationPolicy
+ authnpolicy = DummyContext()
+ from repoze.bfg.tests import routesapp
+ app = self._callFUT(None, routesapp, authentication_policy=authnpolicy)
+ self.assertEqual(app.registry.getUtility(IAuthenticationPolicy),
+ authnpolicy)
+ self.assertEqual(
+ app.registry.getUtility(IAuthorizationPolicy).__class__,
+ ACLAuthorizationPolicy)
+
+ def test_authentication_policy_and_authorization_policy(self):
+ from repoze.bfg.interfaces import IAuthorizationPolicy
+ from repoze.bfg.interfaces import IAuthenticationPolicy
+ authnpolicy = DummyContext()
+ authzpolicy = DummyContext()
+ from repoze.bfg.tests import routesapp
+ app = self._callFUT(None, routesapp, authentication_policy=authnpolicy,
+ authorization_policy = authzpolicy)
+ self.assertEqual(app.registry.getUtility(IAuthenticationPolicy),
+ authnpolicy)
+ self.assertEqual(app.registry.getUtility(IAuthorizationPolicy),
+ authzpolicy)
+
+ def test_secpol_BBB_registrations(self):
+ from repoze.bfg.interfaces import IAuthorizationPolicy
+ from repoze.bfg.interfaces import IAuthenticationPolicy
+ from repoze.bfg.interfaces import ISecurityPolicy
+ secpol = DummySecurityPolicy()
+ from zope.component import getGlobalSiteManager
+ gsm = getGlobalSiteManager()
+ gsm.registerUtility(secpol, ISecurityPolicy)
+ from repoze.bfg.tests import routesapp
+ logger = DummyLogger()
+ app = self._callFUT(None, routesapp, registry=gsm, debug_logger=logger)
+ self.failUnless(app.registry.queryUtility(IAuthenticationPolicy))
+ self.failUnless(app.registry.queryUtility(IAuthorizationPolicy))
+ self.assertEqual(len(logger.messages), 1)
+ self.failUnless('ISecurityPolicy' in logger.messages[0])
+
+class TestDefaultForbiddenView(unittest.TestCase):
+ def _callFUT(self, context, request):
+ from repoze.bfg.router import default_forbidden_view
+ return default_forbidden_view(context, request)
+
+ def test_nomessage(self):
+ request = DummyRequest({})
+ context = DummyContext()
+ response = self._callFUT(context, request)
+ self.failUnless('<code></code>' in response.body)
+
+ def test_withmessage(self):
+ request = DummyRequest({'repoze.bfg.message':'abc&123'})
+ context = DummyContext()
+ response = self._callFUT(context, request)
+ self.failUnless('<code>abc&amp;123</code>' in response.body)
+
+
class DummyRegistryManager:
def push(self, registry):
self.pushed = True
@@ -744,18 +879,6 @@ def make_view(response):
return response
return view
-def make_permission_factory(result):
- class DummyPermissionFactory:
- def __init__(self, context, request):
- self.context = context
- self.request = request
-
- def __call__(self, secpol):
- self.__class__.checked_with = secpol
- return result
-
- return DummyPermissionFactory
-
def make_rootfactory(root):
def rootpolicy(environ):
return root
@@ -776,3 +899,29 @@ class DummyResponse:
class DummySecurityPolicy:
pass
+class DummyRequest:
+ def __init__(self, environ):
+ self.environ = environ
+
+
+class DummyLogger:
+ def __init__(self):
+ self.messages = []
+ def info(self, msg):
+ self.messages.append(msg)
+ warn = info
+ debug = info
+
+class DummyThreadLocalManager:
+ def __init__(self):
+ self.pushed = []
+ self.popped = []
+
+ def push(self, val):
+ self.pushed.append(val)
+
+ def pop(self):
+ self.popped.append(True)
+
+class DummyAuthenticationPolicy:
+ pass
diff --git a/repoze/bfg/tests/test_secpols.py b/repoze/bfg/tests/test_secpols.py
new file mode 100644
index 000000000..2b0449e89
--- /dev/null
+++ b/repoze/bfg/tests/test_secpols.py
@@ -0,0 +1,766 @@
+import unittest
+
+from repoze.bfg.testing import cleanUp
+
+class TestAPIFunctionsSecpolBBB(unittest.TestCase):
+ def setUp(self):
+ cleanUp()
+
+ def tearDown(self):
+ cleanUp()
+ try:
+ del globals()['__warningregistry__']
+ except KeyError:
+ pass
+
+ def _testWithWarnings(self, f, *args, **kw):
+ messages = []
+ def showwarning(message, category, filename, lineno, file=None):
+ messages.append(message)
+ try:
+ import warnings
+ _old_showwarning = warnings.showwarning
+ warnings.showwarning = showwarning
+ result = f(*args, **kw)
+ return result, messages
+ finally:
+ warnings.showwarning = _old_showwarning
+
+ def _registerSecurityPolicy(self, secpol):
+ import zope.component
+ from repoze.bfg.secpols import registerBBBAuthn
+ gsm = zope.component.getGlobalSiteManager()
+ registerBBBAuthn(secpol, gsm)
+
+ def test_has_permission_registered(self):
+ secpol = DummySecurityPolicy(False)
+ self._registerSecurityPolicy(secpol)
+ from repoze.bfg.security import has_permission
+ self.assertEqual(has_permission('view', None, None), False)
+
+ def test_has_permission_not_registered(self):
+ from repoze.bfg.security import has_permission
+ result = has_permission('view', None, None)
+ self.assertEqual(result, True)
+ self.assertEqual(result.msg, 'No authentication policy in use.')
+
+ def test_authenticated_userid_registered(self):
+ secpol = DummySecurityPolicy(False)
+ self._registerSecurityPolicy(secpol)
+ from repoze.bfg.security import authenticated_userid
+ request = DummyRequest({})
+ result, warnings = self._testWithWarnings(authenticated_userid,
+ request)
+ self.assertEqual(result, 'fred')
+ self.assertEqual(len(warnings), 1)
+
+ def test_authenticated_userid_not_registered(self):
+ from repoze.bfg.security import authenticated_userid
+ request = DummyRequest({})
+ result, warnings = self._testWithWarnings(authenticated_userid,
+ request)
+ self.assertEqual(result, None)
+ self.assertEqual(len(warnings), 1)
+
+ def test_authenticated_userid_too_many_args(self):
+ from repoze.bfg.security import authenticated_userid
+ self.assertRaises(TypeError, authenticated_userid, None, None, None)
+
+ def test_effective_principals_registered(self):
+ secpol = DummySecurityPolicy(False)
+ self._registerSecurityPolicy(secpol)
+ from repoze.bfg.security import effective_principals
+ request = DummyRequest({})
+ result, warnings = self._testWithWarnings(effective_principals, request)
+ self.assertEqual(result, ['fred', 'bob'])
+ self.assertEqual(len(warnings), 1)
+
+ def test_effective_principals_not_registered(self):
+ from repoze.bfg.security import effective_principals
+ request = DummyRequest({})
+ result, warnings = self._testWithWarnings(effective_principals, request)
+ self.assertEqual(result, [])
+ self.assertEqual(len(warnings), 1)
+
+ def test_effective_principals_too_many_args(self):
+ from repoze.bfg.security import effective_principals
+ self.assertRaises(TypeError, effective_principals, None, None, None)
+
+
+ def test_principals_allowed_by_permission_not_registered(self):
+ from repoze.bfg.security import principals_allowed_by_permission
+ from repoze.bfg.security import Everyone
+ self.assertEqual(principals_allowed_by_permission(None, None),
+ [Everyone])
+
+ def test_principals_allowed_by_permission_registered(self):
+ secpol = DummySecurityPolicy(False)
+ self._registerSecurityPolicy(secpol)
+ from repoze.bfg.security import principals_allowed_by_permission
+ self.assertEqual(principals_allowed_by_permission(None, None),
+ ['fred', 'bob'])
+
+
+class TestACLSecurityPolicy(unittest.TestCase):
+ def setUp(self):
+ cleanUp()
+
+ def tearDown(self):
+ cleanUp()
+
+ def _getTargetClass(self):
+ from repoze.bfg.secpols import ACLSecurityPolicy
+ return ACLSecurityPolicy
+
+ def _makeOne(self, *arg, **kw):
+ klass = self._getTargetClass()
+ return klass(*arg, **kw)
+
+ def test_class_implements_ISecurityPolicy(self):
+ from zope.interface.verify import verifyClass
+ from repoze.bfg.interfaces import ISecurityPolicy
+ verifyClass(ISecurityPolicy, self._getTargetClass())
+
+ def test_instance_implements_ISecurityPolicy(self):
+ from zope.interface.verify import verifyObject
+ from repoze.bfg.interfaces import ISecurityPolicy
+ verifyObject(ISecurityPolicy, self._makeOne(lambda *arg: None))
+
+ def test_permits_no_principals_no_acl_info_on_context(self):
+ context = DummyContext()
+ request = DummyRequest({})
+ policy = self._makeOne(lambda *arg: [])
+ result = policy.permits(context, request, 'view')
+ self.assertEqual(result, False)
+ from repoze.bfg.security import Everyone
+ self.assertEqual(result.principals, set([Everyone]))
+ self.assertEqual(result.permission, 'view')
+ self.assertEqual(result.context, context)
+
+ def test_permits_no_principals_empty_acl_info_on_context(self):
+ context = DummyContext()
+ context.__acl__ = []
+ request = DummyRequest({})
+ policy = self._makeOne(lambda *arg: [])
+ result = policy.permits(context, request, 'view')
+ self.assertEqual(result, False)
+ from repoze.bfg.security import Everyone
+ self.assertEqual(result.principals, set([Everyone]))
+ self.assertEqual(result.permission, 'view')
+ self.assertEqual(result.context, context)
+
+ def test_permits_no_principals_root_has_empty_acl_info(self):
+ context = DummyContext()
+ context.__name__ = None
+ context.__parent__ = None
+ context.__acl__ = []
+ context2 = DummyContext()
+ context2.__name__ = 'context2'
+ context2.__parent__ = context
+ request = DummyRequest({})
+ policy = self._makeOne(lambda *arg: [])
+ result = policy.permits(context, request, 'view')
+ self.assertEqual(result, False)
+ from repoze.bfg.security import Everyone
+ self.assertEqual(result.principals, set([Everyone]))
+ self.assertEqual(result.permission, 'view')
+ self.assertEqual(result.context, context)
+
+ def test_permits_no_principals_root_allows_everyone(self):
+ context = DummyContext()
+ context.__name__ = None
+ context.__parent__ = None
+ from repoze.bfg.security import Allow, Everyone
+ context.__acl__ = [ (Allow, Everyone, 'view') ]
+ context2 = DummyContext()
+ context2.__name__ = 'context2'
+ context2.__parent__ = context
+ request = DummyRequest({})
+ policy = self._makeOne(lambda *arg: [])
+ result = policy.permits(context, request, 'view')
+ self.assertEqual(result, True)
+ self.assertEqual(result.principals, set([Everyone]))
+ self.assertEqual(result.permission, 'view')
+ self.assertEqual(result.context, context)
+
+ def test_permits_deny_implicit(self):
+ from repoze.bfg.security import Allow, Authenticated, Everyone
+ context = DummyContext()
+ context.__acl__ = [ (Allow, 'somebodyelse', 'read') ]
+ policy = self._makeOne(lambda *arg: ['fred'])
+ request = DummyRequest({})
+ result = policy.permits(context, request, 'read')
+ self.assertEqual(result, False)
+ self.assertEqual(result.principals,
+ set(['fred', Authenticated, Everyone]))
+ self.assertEqual(result.permission, 'read')
+ self.assertEqual(result.context, context)
+ self.assertEqual(result.ace, None)
+
+ def test_permits_deny_explicit(self):
+ from repoze.bfg.security import Deny, Authenticated, Everyone
+ context = DummyContext()
+ context.__acl__ = [ (Deny, 'fred', 'read') ]
+ policy = self._makeOne(lambda *arg: ['fred'])
+ request = DummyRequest({})
+ result = policy.permits(context, request, 'read')
+ self.assertEqual(result, False)
+ self.assertEqual(result.principals,
+ set(['fred', Authenticated, Everyone]))
+ self.assertEqual(result.permission, 'read')
+ self.assertEqual(result.context, context)
+ self.assertEqual(result.ace, (Deny, 'fred', 'read'))
+
+ def test_permits_deny_twoacl_implicit(self):
+ from repoze.bfg.security import Allow, Authenticated, Everyone
+ context = DummyContext()
+ acl = [(Allow, 'somebody', 'view'), (Allow, 'somebody', 'write')]
+ context.__acl__ = acl
+ policy = self._makeOne(lambda *arg: ['fred'])
+ request = DummyRequest({})
+ result = policy.permits(context, request, 'read')
+ self.assertEqual(result, False)
+ self.assertEqual(result.principals,
+ set(['fred', Authenticated, Everyone]))
+ self.assertEqual(result.permission, 'read')
+ self.assertEqual(result.context, context)
+ self.assertEqual(result.ace, None)
+
+ def test_permits_allow_twoacl_multiperm(self):
+ from repoze.bfg.security import Allow, Deny, Authenticated, Everyone
+ context = DummyContext()
+ acl = [ (Allow, 'fred', ('write', 'view') ), (Deny, 'fred', 'view') ]
+ context.__acl__ = acl
+ policy = self._makeOne(lambda *arg: ['fred'])
+ request = DummyRequest({})
+ result = policy.permits(context, request, 'view')
+ self.assertEqual(result, True)
+ self.assertEqual(result.principals,
+ set(['fred', Authenticated, Everyone]))
+ self.assertEqual(result.permission, 'view')
+ self.assertEqual(result.context, context)
+ self.assertEqual(result.ace, (Allow, 'fred', ('write', 'view') ))
+
+ def test_permits_deny_twoacl_multiperm(self):
+ from repoze.bfg.security import Allow, Deny, Authenticated, Everyone
+ context = DummyContext()
+ acl = []
+ deny = (Deny, 'fred', ('view', 'read'))
+ allow = (Allow, 'fred', 'view')
+ context.__acl__ = [deny, allow]
+ policy = self._makeOne(lambda *arg: ['fred'])
+ request = DummyRequest({})
+ result = policy.permits(context, request, 'read')
+ self.assertEqual(result, False)
+ self.assertEqual(result.principals,
+ set(['fred', Authenticated, Everyone]))
+ self.assertEqual(result.permission, 'read')
+ self.assertEqual(result.context, context)
+ self.assertEqual(result.ace, deny)
+
+ def test_permits_allow_via_location_parent(self):
+ from repoze.bfg.security import Allow, Authenticated, Everyone
+ context = DummyContext()
+ context.__parent__ = None
+ context.__name__ = None
+ context.__acl__ = [ (Allow, 'fred', 'read') ]
+ context2 = DummyContext()
+ context2.__parent__ = context
+ context2.__name__ = 'myname'
+
+ policy = self._makeOne(lambda *arg: ['fred'])
+ request = DummyRequest({})
+ result = policy.permits(context2, request, 'read')
+ self.assertEqual(result, True)
+ self.assertEqual(result.principals,
+ set(['fred', Authenticated, Everyone]))
+ self.assertEqual(result.permission, 'read')
+ self.assertEqual(result.context, context)
+ self.assertEqual(result.ace, ('Allow', 'fred', 'read'))
+
+ def test_permits_deny_byorder(self):
+ from repoze.bfg.security import Allow, Deny, Authenticated, Everyone
+ context = DummyContext()
+ acl = []
+ deny = (Deny, 'fred', 'read')
+ allow = (Allow, 'fred', 'view')
+ context.__acl__ = [deny, allow]
+ policy = self._makeOne(lambda *arg: ['fred'])
+ request = DummyRequest({})
+ result = policy.permits(context, request, 'read')
+ self.assertEqual(result, False)
+ self.assertEqual(result.principals,
+ set(['fred', Authenticated, Everyone]))
+ self.assertEqual(result.permission, 'read')
+ self.assertEqual(result.context, context)
+ self.assertEqual(result.ace, deny)
+
+ def test_permits_allow_byorder(self):
+ from repoze.bfg.security import Allow, Deny, Authenticated, Everyone
+ context = DummyContext()
+ acl = []
+ deny = (Deny, 'fred', ('view', 'read'))
+ allow = (Allow, 'fred', 'view')
+ context.__acl__ = [allow, deny]
+ policy = self._makeOne(lambda *arg: ['fred'])
+ request = DummyRequest({})
+ result = policy.permits(context, request, 'view')
+ self.assertEqual(result, True)
+ self.assertEqual(result.principals,
+ set(['fred', Authenticated, Everyone]))
+ self.assertEqual(result.permission, 'view')
+ self.assertEqual(result.context, context)
+ self.assertEqual(result.ace, allow)
+
+ def test_principals_allowed_by_permission_direct(self):
+ from repoze.bfg.security import Allow
+ context = DummyContext()
+ acl = [ (Allow, 'chrism', ('read', 'write')),
+ (Allow, 'other', 'read') ]
+ context.__acl__ = acl
+ policy = self._makeOne(lambda *arg: None)
+ result = policy.principals_allowed_by_permission(context, 'read')
+ self.assertEqual(result, ['chrism', 'other'])
+
+ def test_principals_allowed_by_permission_acquired(self):
+ from repoze.bfg.security import Allow
+ context = DummyContext()
+ acl = [ (Allow, 'chrism', ('read', 'write')),
+ (Allow, 'other', ('read',)) ]
+ context.__acl__ = acl
+ context.__parent__ = None
+ context.__name__ = 'context'
+ inter = DummyContext()
+ inter.__name__ = None
+ inter.__parent__ = context
+ policy = self._makeOne(lambda *arg: None)
+ result = policy.principals_allowed_by_permission(inter, 'read')
+ self.assertEqual(result, ['chrism', 'other'])
+
+ def test_principals_allowed_by_permission_no_acls(self):
+ policy = self._makeOne(lambda *arg: None)
+ result = policy.principals_allowed_by_permission(None, 'read')
+ self.assertEqual(result, [])
+
+class TestInheritingACLSecurityPolicy(unittest.TestCase):
+ def setUp(self):
+ cleanUp()
+
+ def tearDown(self):
+ cleanUp()
+
+ def _getTargetClass(self):
+ from repoze.bfg.secpols import InheritingACLSecurityPolicy
+ return InheritingACLSecurityPolicy
+
+ def _makeOne(self, *arg, **kw):
+ klass = self._getTargetClass()
+ return klass(*arg, **kw)
+
+ def test_class_implements_ISecurityPolicy(self):
+ from zope.interface.verify import verifyClass
+ from repoze.bfg.interfaces import ISecurityPolicy
+ verifyClass(ISecurityPolicy, self._getTargetClass())
+
+ def test_instance_implements_ISecurityPolicy(self):
+ from zope.interface.verify import verifyObject
+ from repoze.bfg.interfaces import ISecurityPolicy
+ verifyObject(ISecurityPolicy, self._makeOne(lambda *arg: None))
+
+ def test_permits(self):
+ from repoze.bfg.security import Deny
+ from repoze.bfg.security import Allow
+ from repoze.bfg.security import Everyone
+ from repoze.bfg.security import Authenticated
+ from repoze.bfg.security import ALL_PERMISSIONS
+ from repoze.bfg.security import DENY_ALL
+ policy = self._makeOne(lambda *arg: [])
+ root = DummyContext()
+ community = DummyContext(__name__='community', __parent__=root)
+ blog = DummyContext(__name__='blog', __parent__=community)
+ root.__acl__ = [
+ (Allow, Authenticated, VIEW),
+ ]
+ community.__acl__ = [
+ (Allow, 'fred', ALL_PERMISSIONS),
+ (Allow, 'wilma', VIEW),
+ DENY_ALL,
+ ]
+ blog.__acl__ = [
+ (Allow, 'barney', MEMBER_PERMS),
+ (Allow, 'wilma', VIEW),
+ ]
+ policy = self._makeOne(lambda request: request.principals)
+ request = DummyRequest({})
+
+ request.principals = ['wilma']
+ result = policy.permits(blog, request, 'view')
+ self.assertEqual(result, True)
+ self.assertEqual(result.context, blog)
+ self.assertEqual(result.ace, (Allow, 'wilma', VIEW))
+ result = policy.permits(blog, request, 'delete')
+ self.assertEqual(result, False)
+ self.assertEqual(result.context, community)
+ self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS))
+
+ request.principals = ['fred']
+ result = policy.permits(blog, request, 'view')
+ self.assertEqual(result, True)
+ self.assertEqual(result.context, community)
+ self.assertEqual(result.ace, (Allow, 'fred', ALL_PERMISSIONS))
+ result = policy.permits(blog, request, 'doesntevenexistyet')
+ self.assertEqual(result, True)
+ self.assertEqual(result.context, community)
+ self.assertEqual(result.ace, (Allow, 'fred', ALL_PERMISSIONS))
+
+ request.principals = ['barney']
+ result = policy.permits(blog, request, 'view')
+ self.assertEqual(result, True)
+ self.assertEqual(result.context, blog)
+ self.assertEqual(result.ace, (Allow, 'barney', MEMBER_PERMS))
+ result = policy.permits(blog, request, 'administer')
+ self.assertEqual(result, False)
+ self.assertEqual(result.context, community)
+ self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS))
+
+ request.principals = ['someguy']
+ result = policy.permits(root, request, 'view')
+ self.assertEqual(result, True)
+ self.assertEqual(result.context, root)
+ self.assertEqual(result.ace, (Allow, Authenticated, VIEW))
+ result = policy.permits(blog, request, 'view')
+ self.assertEqual(result, False)
+ self.assertEqual(result.context, community)
+ self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS))
+
+ request.principals = []
+ result = policy.permits(root, request, 'view')
+ self.assertEqual(result, False)
+ self.assertEqual(result.context, root)
+ self.assertEqual(result.ace, None)
+
+ request.principals = []
+ context = DummyContext()
+ result = policy.permits(context, request, 'view')
+ self.assertEqual(result, False)
+
+ def test_principals_allowed_by_permission_direct(self):
+ from repoze.bfg.security import Allow
+ from repoze.bfg.security import DENY_ALL
+ context = DummyContext()
+ acl = [ (Allow, 'chrism', ('read', 'write')),
+ DENY_ALL,
+ (Allow, 'other', 'read') ]
+ context.__acl__ = acl
+ policy = self._makeOne(lambda *arg: None)
+ result = sorted(
+ policy.principals_allowed_by_permission(context, 'read'))
+ self.assertEqual(result, ['chrism'])
+
+ def test_principals_allowed_by_permission(self):
+ from repoze.bfg.security import Allow
+ from repoze.bfg.security import Deny
+ from repoze.bfg.security import DENY_ALL
+ from repoze.bfg.security import ALL_PERMISSIONS
+ root = DummyContext(__name__='', __parent__=None)
+ community = DummyContext(__name__='community', __parent__=root)
+ blog = DummyContext(__name__='blog', __parent__=community)
+ root.__acl__ = [ (Allow, 'chrism', ('read', 'write')),
+ (Allow, 'other', ('read',)),
+ (Allow, 'jim', ALL_PERMISSIONS)]
+ community.__acl__ = [ (Deny, 'flooz', 'read'),
+ (Allow, 'flooz', 'read'),
+ (Allow, 'mork', 'read'),
+ (Deny, 'jim', 'read'),
+ (Allow, 'someguy', 'manage')]
+ blog.__acl__ = [ (Allow, 'fred', 'read'),
+ DENY_ALL]
+
+ policy = self._makeOne(lambda *arg: None)
+ result = sorted(policy.principals_allowed_by_permission(blog, 'read'))
+ self.assertEqual(result, ['fred'])
+ result = sorted(policy.principals_allowed_by_permission(community,
+ 'read'))
+ self.assertEqual(result, ['chrism', 'mork', 'other'])
+ result = sorted(policy.principals_allowed_by_permission(community,
+ 'read'))
+ result = sorted(policy.principals_allowed_by_permission(root, 'read'))
+ self.assertEqual(result, ['chrism', 'jim', 'other'])
+
+ def test_principals_allowed_by_permission_no_acls(self):
+ policy = self._makeOne(lambda *arg: None)
+ context = DummyContext()
+ result = sorted(policy.principals_allowed_by_permission(context,'read'))
+ self.assertEqual(result, [])
+
+ def test_effective_principals(self):
+ context = DummyContext()
+ request = DummyRequest({})
+ request.principals = ['fred']
+ policy = self._makeOne(lambda request: request.principals)
+ result = sorted(policy.effective_principals(request))
+ from repoze.bfg.security import Everyone
+ from repoze.bfg.security import Authenticated
+ self.assertEqual(result,
+ ['fred', Authenticated, Everyone])
+
+ def test_no_effective_principals(self):
+ context = DummyContext()
+ request = DummyRequest({})
+ request.principals = []
+ policy = self._makeOne(lambda request: request.principals)
+ result = sorted(policy.effective_principals(request))
+ from repoze.bfg.security import Everyone
+ self.assertEqual(result, [Everyone])
+
+ def test_authenticated_userid(self):
+ context = DummyContext()
+ request = DummyRequest({})
+ request.principals = ['fred']
+ policy = self._makeOne(lambda request: request.principals)
+ result = policy.authenticated_userid(request)
+ self.assertEqual(result, 'fred')
+
+ def test_no_authenticated_userid(self):
+ context = DummyContext()
+ request = DummyRequest({})
+ request.principals = []
+ policy = self._makeOne(lambda request: request.principals)
+ result = policy.authenticated_userid(request)
+ self.assertEqual(result, None)
+
+class TestRemoteUserACLSecurityPolicy(unittest.TestCase):
+ def setUp(self):
+ cleanUp()
+
+ def tearDown(self):
+ cleanUp()
+
+ def _getTargetClass(self):
+ from repoze.bfg.secpols import RemoteUserACLSecurityPolicy
+ return RemoteUserACLSecurityPolicy
+
+ def _makeOne(self, *arg, **kw):
+ klass = self._getTargetClass()
+ return klass(*arg, **kw)
+
+ def test_instance_implements_ISecurityPolicy(self):
+ from zope.interface.verify import verifyObject
+ from repoze.bfg.interfaces import ISecurityPolicy
+ verifyObject(ISecurityPolicy, self._makeOne())
+
+ def test_authenticated_userid(self):
+ context = DummyContext()
+ request = DummyRequest({'REMOTE_USER':'fred'})
+ policy = self._makeOne()
+ result = policy.authenticated_userid(request)
+ self.assertEqual(result, 'fred')
+
+ def test_authenticated_userid_no_remote_user(self):
+ context = DummyContext()
+ request = DummyRequest({})
+ policy = self._makeOne()
+ result = policy.authenticated_userid(request)
+ self.assertEqual(result, None)
+
+ def test_effective_principals(self):
+ context = DummyContext()
+ request = DummyRequest({'REMOTE_USER':'fred'})
+ policy = self._makeOne()
+ result = policy.effective_principals(request)
+ from repoze.bfg.security import Everyone
+ from repoze.bfg.security import Authenticated
+ self.assertEqual(result, [Everyone, Authenticated, 'fred'])
+
+ def test_effective_principals_no_remote_user(self):
+ context = DummyContext()
+ request = DummyRequest({})
+ policy = self._makeOne()
+ result = policy.effective_principals(request)
+ from repoze.bfg.security import Everyone
+ self.assertEqual(result, [Everyone])
+
+class TestRemoteUserInheritingACLSecurityPolicy(TestRemoteUserACLSecurityPolicy):
+ def _getTargetClass(self):
+ from repoze.bfg.secpols import RemoteUserInheritingACLSecurityPolicy
+ return RemoteUserInheritingACLSecurityPolicy
+
+class TestWhoACLSecurityPolicy(unittest.TestCase):
+ def setUp(self):
+ cleanUp()
+
+ def tearDown(self):
+ cleanUp()
+
+ def _getTargetClass(self):
+ from repoze.bfg.secpols import WhoACLSecurityPolicy
+ return WhoACLSecurityPolicy
+
+ def _makeOne(self, *arg, **kw):
+ klass = self._getTargetClass()
+ return klass(*arg, **kw)
+
+ def test_instance_implements_ISecurityPolicy(self):
+ from zope.interface.verify import verifyObject
+ from repoze.bfg.interfaces import ISecurityPolicy
+ verifyObject(ISecurityPolicy, self._makeOne())
+
+ def test_authenticated_userid(self):
+ context = DummyContext()
+ identity = {'repoze.who.identity':{'repoze.who.userid':'fred'}}
+ request = DummyRequest(identity)
+ policy = self._makeOne()
+ result = policy.authenticated_userid(request)
+ self.assertEqual(result, 'fred')
+
+ def test_authenticated_userid_no_who_ident(self):
+ context = DummyContext()
+ request = DummyRequest({})
+ policy = self._makeOne()
+ result = policy.authenticated_userid(request)
+ self.assertEqual(result, None)
+
+ def test_effective_principals(self):
+ context = DummyContext()
+ identity = {'repoze.who.identity':{'repoze.who.userid':'fred'}}
+ request = DummyRequest(identity)
+ policy = self._makeOne()
+ result = policy.effective_principals(request)
+ from repoze.bfg.security import Everyone
+ from repoze.bfg.security import Authenticated
+ self.assertEqual(result, [Everyone, Authenticated, 'fred'])
+
+ def test_effective_principals_no_who_ident(self):
+ context = DummyContext()
+ request = DummyRequest({})
+ policy = self._makeOne()
+ result = policy.effective_principals(request)
+ from repoze.bfg.security import Everyone
+ self.assertEqual(result, [Everyone])
+
+class TestWhoInheritingACLSecurityPolicy(TestWhoACLSecurityPolicy):
+ def _getTargetClass(self):
+ from repoze.bfg.secpols import WhoInheritingACLSecurityPolicy
+ return WhoInheritingACLSecurityPolicy
+
+class TestSecurityPolicyToAuthenticationPolicyAdapter(unittest.TestCase):
+ def _getTargetClass(self):
+ from repoze.bfg.secpols import \
+ SecurityPolicyToAuthenticationPolicyAdapter
+ return SecurityPolicyToAuthenticationPolicyAdapter
+
+ def _makeOne(self, secpol):
+ return self._getTargetClass()(secpol)
+
+ def test_class_implements_IAuthenticationPolicy(self):
+ from zope.interface.verify import verifyClass
+ from repoze.bfg.interfaces import IAuthenticationPolicy
+ verifyClass(IAuthenticationPolicy, self._getTargetClass())
+
+ def test_instance_implements_IAuthenticationPolicy(self):
+ from zope.interface.verify import verifyObject
+ from repoze.bfg.interfaces import IAuthenticationPolicy
+ verifyObject(IAuthenticationPolicy, self._makeOne(None))
+
+ def test_authenticated_userid(self):
+ secpol = DummySecurityPolicy(None)
+ adapter = self._makeOne(secpol)
+ result = adapter.authenticated_userid(None, None)
+ self.assertEqual(result, 'fred')
+
+ def test_effective_principals(self):
+ secpol = DummySecurityPolicy(None)
+ adapter = self._makeOne(secpol)
+ result = adapter.effective_principals(None, None)
+ self.assertEqual(result, ['fred', 'bob'])
+
+ def test_remember(self):
+ secpol = DummySecurityPolicy(None)
+ adapter = self._makeOne(secpol)
+ result = adapter.remember(None, None, None)
+ self.assertEqual(result, [])
+
+ def test_forget(self):
+ secpol = DummySecurityPolicy(None)
+ adapter = self._makeOne(secpol)
+ result = adapter.forget(None, None)
+ self.assertEqual(result, [])
+
+class TestSecurityPolicyToAuthorizationPolicyAdapter(unittest.TestCase):
+ def _getTargetClass(self):
+ from repoze.bfg.secpols import \
+ SecurityPolicyToAuthorizationPolicyAdapter
+ return SecurityPolicyToAuthorizationPolicyAdapter
+
+ def _makeOne(self, secpol):
+ return self._getTargetClass()(secpol)
+
+ def test_class_implements_IAuthorizationPolicy(self):
+ from zope.interface.verify import verifyClass
+ from repoze.bfg.interfaces import IAuthorizationPolicy
+ verifyClass(IAuthorizationPolicy, self._getTargetClass())
+
+ def test_instance_implements_IAuthorizationPolicy(self):
+ from zope.interface.verify import verifyObject
+ from repoze.bfg.interfaces import IAuthorizationPolicy
+ verifyObject(IAuthorizationPolicy, self._makeOne(None))
+
+ def test_permits(self):
+ from repoze.bfg.threadlocal import manager
+ manager.push({'request':1})
+ try:
+ secpol = DummySecurityPolicy(None)
+ adapter = self._makeOne(secpol)
+ result = adapter.permits(None, None, None)
+ self.assertEqual(result, None)
+ self.assertEqual(secpol.checked, (None, 1, None))
+ finally:
+ manager.pop()
+
+ def test_principals_allowed_by_permission(self):
+ secpol = DummySecurityPolicy(None)
+ adapter = self._makeOne(secpol)
+ result = adapter.principals_allowed_by_permission(None, None)
+ self.assertEqual(result, ['fred', 'bob'])
+
+
+
+class DummyContext:
+ def __init__(self, *arg, **kw):
+ self.__dict__.update(kw)
+
+class DummyRequest:
+ def __init__(self, environ):
+ self.environ = environ
+
+
+VIEW = 'view'
+EDIT = 'edit'
+CREATE = 'create'
+DELETE = 'delete'
+MODERATE = 'moderate'
+ADMINISTER = 'administer'
+COMMENT = 'comment'
+
+GUEST_PERMS = (VIEW, COMMENT)
+MEMBER_PERMS = GUEST_PERMS + (EDIT, CREATE, DELETE)
+MODERATOR_PERMS = MEMBER_PERMS + (MODERATE,)
+ADMINISTRATOR_PERMS = MODERATOR_PERMS + (ADMINISTER,)
+
+class DummySecurityPolicy:
+ def __init__(self, result):
+ self.result = result
+
+ def permits(self, *args):
+ self.checked = args
+ return self.result
+
+ def authenticated_userid(self, request):
+ return 'fred'
+
+ def effective_principals(self, request):
+ return ['fred', 'bob']
+
+ def principals_allowed_by_permission(self, context, permission):
+ return ['fred', 'bob']
+
diff --git a/repoze/bfg/tests/test_security.py b/repoze/bfg/tests/test_security.py
index 03a466e7c..3f18d3a4a 100644
--- a/repoze/bfg/tests/test_security.py
+++ b/repoze/bfg/tests/test_security.py
@@ -2,433 +2,6 @@ import unittest
from repoze.bfg.testing import cleanUp
-class TestACLSecurityPolicy(unittest.TestCase):
- def setUp(self):
- cleanUp()
-
- def tearDown(self):
- cleanUp()
-
- def _getTargetClass(self):
- from repoze.bfg.security import ACLSecurityPolicy
- return ACLSecurityPolicy
-
- def _makeOne(self, *arg, **kw):
- klass = self._getTargetClass()
- return klass(*arg, **kw)
-
- def test_class_implements_ISecurityPolicy(self):
- from zope.interface.verify import verifyClass
- from repoze.bfg.interfaces import ISecurityPolicy
- verifyClass(ISecurityPolicy, self._getTargetClass())
-
- def test_instance_implements_ISecurityPolicy(self):
- from zope.interface.verify import verifyObject
- from repoze.bfg.interfaces import ISecurityPolicy
- verifyObject(ISecurityPolicy, self._makeOne(lambda *arg: None))
-
- def test_permits_no_principals_no_acl_info_on_context(self):
- context = DummyContext()
- request = DummyRequest({})
- policy = self._makeOne(lambda *arg: [])
- result = policy.permits(context, request, 'view')
- self.assertEqual(result, False)
- from repoze.bfg.security import Everyone
- self.assertEqual(result.principals, set([Everyone]))
- self.assertEqual(result.permission, 'view')
- self.assertEqual(result.context, context)
-
- def test_permits_no_principals_empty_acl_info_on_context(self):
- context = DummyContext()
- context.__acl__ = []
- request = DummyRequest({})
- policy = self._makeOne(lambda *arg: [])
- result = policy.permits(context, request, 'view')
- self.assertEqual(result, False)
- from repoze.bfg.security import Everyone
- self.assertEqual(result.principals, set([Everyone]))
- self.assertEqual(result.permission, 'view')
- self.assertEqual(result.context, context)
-
- def test_permits_no_principals_root_has_empty_acl_info(self):
- context = DummyContext()
- context.__name__ = None
- context.__parent__ = None
- context.__acl__ = []
- context2 = DummyContext()
- context2.__name__ = 'context2'
- context2.__parent__ = context
- request = DummyRequest({})
- policy = self._makeOne(lambda *arg: [])
- result = policy.permits(context, request, 'view')
- self.assertEqual(result, False)
- from repoze.bfg.security import Everyone
- self.assertEqual(result.principals, set([Everyone]))
- self.assertEqual(result.permission, 'view')
- self.assertEqual(result.context, context)
-
- def test_permits_no_principals_root_allows_everyone(self):
- context = DummyContext()
- context.__name__ = None
- context.__parent__ = None
- from repoze.bfg.security import Allow, Everyone
- context.__acl__ = [ (Allow, Everyone, 'view') ]
- context2 = DummyContext()
- context2.__name__ = 'context2'
- context2.__parent__ = context
- request = DummyRequest({})
- policy = self._makeOne(lambda *arg: [])
- result = policy.permits(context, request, 'view')
- self.assertEqual(result, True)
- self.assertEqual(result.principals, set([Everyone]))
- self.assertEqual(result.permission, 'view')
- self.assertEqual(result.context, context)
-
- def test_permits_deny_implicit(self):
- from repoze.bfg.security import Allow, Authenticated, Everyone
- context = DummyContext()
- context.__acl__ = [ (Allow, 'somebodyelse', 'read') ]
- policy = self._makeOne(lambda *arg: ['fred'])
- request = DummyRequest({})
- result = policy.permits(context, request, 'read')
- self.assertEqual(result, False)
- self.assertEqual(result.principals,
- set(['fred', Authenticated, Everyone]))
- self.assertEqual(result.permission, 'read')
- self.assertEqual(result.context, context)
- self.assertEqual(result.ace, None)
-
- def test_permits_deny_explicit(self):
- from repoze.bfg.security import Deny, Authenticated, Everyone
- context = DummyContext()
- context.__acl__ = [ (Deny, 'fred', 'read') ]
- policy = self._makeOne(lambda *arg: ['fred'])
- request = DummyRequest({})
- result = policy.permits(context, request, 'read')
- self.assertEqual(result, False)
- self.assertEqual(result.principals,
- set(['fred', Authenticated, Everyone]))
- self.assertEqual(result.permission, 'read')
- self.assertEqual(result.context, context)
- self.assertEqual(result.ace, (Deny, 'fred', 'read'))
-
- def test_permits_deny_twoacl_implicit(self):
- from repoze.bfg.security import Allow, Authenticated, Everyone
- context = DummyContext()
- acl = [(Allow, 'somebody', 'view'), (Allow, 'somebody', 'write')]
- context.__acl__ = acl
- policy = self._makeOne(lambda *arg: ['fred'])
- request = DummyRequest({})
- result = policy.permits(context, request, 'read')
- self.assertEqual(result, False)
- self.assertEqual(result.principals,
- set(['fred', Authenticated, Everyone]))
- self.assertEqual(result.permission, 'read')
- self.assertEqual(result.context, context)
- self.assertEqual(result.ace, None)
-
- def test_permits_allow_twoacl_multiperm(self):
- from repoze.bfg.security import Allow, Deny, Authenticated, Everyone
- context = DummyContext()
- acl = [ (Allow, 'fred', ('write', 'view') ), (Deny, 'fred', 'view') ]
- context.__acl__ = acl
- policy = self._makeOne(lambda *arg: ['fred'])
- request = DummyRequest({})
- result = policy.permits(context, request, 'view')
- self.assertEqual(result, True)
- self.assertEqual(result.principals,
- set(['fred', Authenticated, Everyone]))
- self.assertEqual(result.permission, 'view')
- self.assertEqual(result.context, context)
- self.assertEqual(result.ace, (Allow, 'fred', ('write', 'view') ))
-
- def test_permits_deny_twoacl_multiperm(self):
- from repoze.bfg.security import Allow, Deny, Authenticated, Everyone
- context = DummyContext()
- acl = []
- deny = (Deny, 'fred', ('view', 'read'))
- allow = (Allow, 'fred', 'view')
- context.__acl__ = [deny, allow]
- policy = self._makeOne(lambda *arg: ['fred'])
- request = DummyRequest({})
- result = policy.permits(context, request, 'read')
- self.assertEqual(result, False)
- self.assertEqual(result.principals,
- set(['fred', Authenticated, Everyone]))
- self.assertEqual(result.permission, 'read')
- self.assertEqual(result.context, context)
- self.assertEqual(result.ace, deny)
-
- def test_permits_allow_via_location_parent(self):
- from repoze.bfg.security import Allow, Authenticated, Everyone
- context = DummyContext()
- context.__parent__ = None
- context.__name__ = None
- context.__acl__ = [ (Allow, 'fred', 'read') ]
- context2 = DummyContext()
- context2.__parent__ = context
- context2.__name__ = 'myname'
-
- policy = self._makeOne(lambda *arg: ['fred'])
- request = DummyRequest({})
- result = policy.permits(context2, request, 'read')
- self.assertEqual(result, True)
- self.assertEqual(result.principals,
- set(['fred', Authenticated, Everyone]))
- self.assertEqual(result.permission, 'read')
- self.assertEqual(result.context, context)
- self.assertEqual(result.ace, ('Allow', 'fred', 'read'))
-
- def test_permits_deny_byorder(self):
- from repoze.bfg.security import Allow, Deny, Authenticated, Everyone
- context = DummyContext()
- acl = []
- deny = (Deny, 'fred', 'read')
- allow = (Allow, 'fred', 'view')
- context.__acl__ = [deny, allow]
- policy = self._makeOne(lambda *arg: ['fred'])
- request = DummyRequest({})
- result = policy.permits(context, request, 'read')
- self.assertEqual(result, False)
- self.assertEqual(result.principals,
- set(['fred', Authenticated, Everyone]))
- self.assertEqual(result.permission, 'read')
- self.assertEqual(result.context, context)
- self.assertEqual(result.ace, deny)
-
- def test_permits_allow_byorder(self):
- from repoze.bfg.security import Allow, Deny, Authenticated, Everyone
- context = DummyContext()
- acl = []
- deny = (Deny, 'fred', ('view', 'read'))
- allow = (Allow, 'fred', 'view')
- context.__acl__ = [allow, deny]
- policy = self._makeOne(lambda *arg: ['fred'])
- request = DummyRequest({})
- result = policy.permits(context, request, 'view')
- self.assertEqual(result, True)
- self.assertEqual(result.principals,
- set(['fred', Authenticated, Everyone]))
- self.assertEqual(result.permission, 'view')
- self.assertEqual(result.context, context)
- self.assertEqual(result.ace, allow)
-
- def test_principals_allowed_by_permission_direct(self):
- from repoze.bfg.security import Allow
- context = DummyContext()
- acl = [ (Allow, 'chrism', ('read', 'write')),
- (Allow, 'other', 'read') ]
- context.__acl__ = acl
- policy = self._makeOne(lambda *arg: None)
- result = policy.principals_allowed_by_permission(context, 'read')
- self.assertEqual(result, ['chrism', 'other'])
-
- def test_principals_allowed_by_permission_acquired(self):
- from repoze.bfg.security import Allow
- context = DummyContext()
- acl = [ (Allow, 'chrism', ('read', 'write')),
- (Allow, 'other', ('read',)) ]
- context.__acl__ = acl
- context.__parent__ = None
- context.__name__ = 'context'
- inter = DummyContext()
- inter.__name__ = None
- inter.__parent__ = context
- policy = self._makeOne(lambda *arg: None)
- result = policy.principals_allowed_by_permission(inter, 'read')
- self.assertEqual(result, ['chrism', 'other'])
-
- def test_principals_allowed_by_permission_no_acls(self):
- policy = self._makeOne(lambda *arg: None)
- result = policy.principals_allowed_by_permission(None, 'read')
- self.assertEqual(result, [])
-
-class TestInheritingACLSecurityPolicy(unittest.TestCase):
- def setUp(self):
- cleanUp()
-
- def tearDown(self):
- cleanUp()
-
- def _getTargetClass(self):
- from repoze.bfg.security import InheritingACLSecurityPolicy
- return InheritingACLSecurityPolicy
-
- def _makeOne(self, *arg, **kw):
- klass = self._getTargetClass()
- return klass(*arg, **kw)
-
- def test_class_implements_ISecurityPolicy(self):
- from zope.interface.verify import verifyClass
- from repoze.bfg.interfaces import ISecurityPolicy
- verifyClass(ISecurityPolicy, self._getTargetClass())
-
- def test_instance_implements_ISecurityPolicy(self):
- from zope.interface.verify import verifyObject
- from repoze.bfg.interfaces import ISecurityPolicy
- verifyObject(ISecurityPolicy, self._makeOne(lambda *arg: None))
-
- def test_permits(self):
- from repoze.bfg.security import Deny
- from repoze.bfg.security import Allow
- from repoze.bfg.security import Everyone
- from repoze.bfg.security import Authenticated
- from repoze.bfg.security import ALL_PERMISSIONS
- from repoze.bfg.security import DENY_ALL
- policy = self._makeOne(lambda *arg: [])
- root = DummyContext()
- community = DummyContext(__name__='community', __parent__=root)
- blog = DummyContext(__name__='blog', __parent__=community)
- root.__acl__ = [
- (Allow, Authenticated, VIEW),
- ]
- community.__acl__ = [
- (Allow, 'fred', ALL_PERMISSIONS),
- (Allow, 'wilma', VIEW),
- DENY_ALL,
- ]
- blog.__acl__ = [
- (Allow, 'barney', MEMBER_PERMS),
- (Allow, 'wilma', VIEW),
- ]
- policy = self._makeOne(lambda request: request.principals)
- request = DummyRequest({})
-
- request.principals = ['wilma']
- result = policy.permits(blog, request, 'view')
- self.assertEqual(result, True)
- self.assertEqual(result.context, blog)
- self.assertEqual(result.ace, (Allow, 'wilma', VIEW))
- result = policy.permits(blog, request, 'delete')
- self.assertEqual(result, False)
- self.assertEqual(result.context, community)
- self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS))
-
- request.principals = ['fred']
- result = policy.permits(blog, request, 'view')
- self.assertEqual(result, True)
- self.assertEqual(result.context, community)
- self.assertEqual(result.ace, (Allow, 'fred', ALL_PERMISSIONS))
- result = policy.permits(blog, request, 'doesntevenexistyet')
- self.assertEqual(result, True)
- self.assertEqual(result.context, community)
- self.assertEqual(result.ace, (Allow, 'fred', ALL_PERMISSIONS))
-
- request.principals = ['barney']
- result = policy.permits(blog, request, 'view')
- self.assertEqual(result, True)
- self.assertEqual(result.context, blog)
- self.assertEqual(result.ace, (Allow, 'barney', MEMBER_PERMS))
- result = policy.permits(blog, request, 'administer')
- self.assertEqual(result, False)
- self.assertEqual(result.context, community)
- self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS))
-
- request.principals = ['someguy']
- result = policy.permits(root, request, 'view')
- self.assertEqual(result, True)
- self.assertEqual(result.context, root)
- self.assertEqual(result.ace, (Allow, Authenticated, VIEW))
- result = policy.permits(blog, request, 'view')
- self.assertEqual(result, False)
- self.assertEqual(result.context, community)
- self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS))
-
- request.principals = []
- result = policy.permits(root, request, 'view')
- self.assertEqual(result, False)
- self.assertEqual(result.context, root)
- self.assertEqual(result.ace, None)
-
- request.principals = []
- context = DummyContext()
- result = policy.permits(context, request, 'view')
- self.assertEqual(result, False)
-
- def test_principals_allowed_by_permission_direct(self):
- from repoze.bfg.security import Allow
- from repoze.bfg.security import DENY_ALL
- context = DummyContext()
- acl = [ (Allow, 'chrism', ('read', 'write')),
- DENY_ALL,
- (Allow, 'other', 'read') ]
- context.__acl__ = acl
- policy = self._makeOne(lambda *arg: None)
- result = sorted(
- policy.principals_allowed_by_permission(context, 'read'))
- self.assertEqual(result, ['chrism'])
-
- def test_principals_allowed_by_permission(self):
- from repoze.bfg.security import Allow
- from repoze.bfg.security import Deny
- from repoze.bfg.security import DENY_ALL
- from repoze.bfg.security import ALL_PERMISSIONS
- root = DummyContext(__name__='', __parent__=None)
- community = DummyContext(__name__='community', __parent__=root)
- blog = DummyContext(__name__='blog', __parent__=community)
- root.__acl__ = [ (Allow, 'chrism', ('read', 'write')),
- (Allow, 'other', ('read',)),
- (Allow, 'jim', ALL_PERMISSIONS)]
- community.__acl__ = [ (Deny, 'flooz', 'read'),
- (Allow, 'flooz', 'read'),
- (Allow, 'mork', 'read'),
- (Deny, 'jim', 'read'),
- (Allow, 'someguy', 'manage')]
- blog.__acl__ = [ (Allow, 'fred', 'read'),
- DENY_ALL]
-
- policy = self._makeOne(lambda *arg: None)
- result = sorted(policy.principals_allowed_by_permission(blog, 'read'))
- self.assertEqual(result, ['fred'])
- result = sorted(policy.principals_allowed_by_permission(community,
- 'read'))
- self.assertEqual(result, ['chrism', 'mork', 'other'])
- result = sorted(policy.principals_allowed_by_permission(community,
- 'read'))
- result = sorted(policy.principals_allowed_by_permission(root, 'read'))
- self.assertEqual(result, ['chrism', 'jim', 'other'])
-
- def test_principals_allowed_by_permission_no_acls(self):
- policy = self._makeOne(lambda *arg: None)
- context = DummyContext()
- result = sorted(policy.principals_allowed_by_permission(context,'read'))
- self.assertEqual(result, [])
-
- def test_effective_principals(self):
- context = DummyContext()
- request = DummyRequest({})
- request.principals = ['fred']
- policy = self._makeOne(lambda request: request.principals)
- result = sorted(policy.effective_principals(request))
- from repoze.bfg.security import Everyone
- from repoze.bfg.security import Authenticated
- self.assertEqual(result,
- ['fred', Authenticated, Everyone])
-
- def test_no_effective_principals(self):
- context = DummyContext()
- request = DummyRequest({})
- request.principals = []
- policy = self._makeOne(lambda request: request.principals)
- result = sorted(policy.effective_principals(request))
- from repoze.bfg.security import Everyone
- self.assertEqual(result, [Everyone])
-
- def test_authenticated_userid(self):
- context = DummyContext()
- request = DummyRequest({})
- request.principals = ['fred']
- policy = self._makeOne(lambda request: request.principals)
- result = policy.authenticated_userid(request)
- self.assertEqual(result, 'fred')
-
- def test_no_authenticated_userid(self):
- context = DummyContext()
- request = DummyRequest({})
- request.principals = []
- policy = self._makeOne(lambda request: request.principals)
- result = policy.authenticated_userid(request)
- self.assertEqual(result, None)
class TestAllPermissionsList(unittest.TestCase):
def setUp(self):
@@ -454,211 +27,13 @@ class TestAllPermissionsList(unittest.TestCase):
from repoze.bfg.security import ALL_PERMISSIONS
self.assertEqual(ALL_PERMISSIONS.__class__, self._getTargetClass())
-class TestRemoteUserACLSecurityPolicy(unittest.TestCase):
- def setUp(self):
- cleanUp()
-
- def tearDown(self):
- cleanUp()
-
- def _getTargetClass(self):
- from repoze.bfg.security import RemoteUserACLSecurityPolicy
- return RemoteUserACLSecurityPolicy
-
- def _makeOne(self, *arg, **kw):
- klass = self._getTargetClass()
- return klass(*arg, **kw)
-
- def test_instance_implements_ISecurityPolicy(self):
- from zope.interface.verify import verifyObject
- from repoze.bfg.interfaces import ISecurityPolicy
- verifyObject(ISecurityPolicy, self._makeOne())
-
- def test_authenticated_userid(self):
- context = DummyContext()
- request = DummyRequest({'REMOTE_USER':'fred'})
- policy = self._makeOne()
- result = policy.authenticated_userid(request)
- self.assertEqual(result, 'fred')
-
- def test_authenticated_userid_no_remote_user(self):
- context = DummyContext()
- request = DummyRequest({})
- policy = self._makeOne()
- result = policy.authenticated_userid(request)
- self.assertEqual(result, None)
-
- def test_effective_principals(self):
- context = DummyContext()
- request = DummyRequest({'REMOTE_USER':'fred'})
- policy = self._makeOne()
- result = policy.effective_principals(request)
- from repoze.bfg.security import Everyone
- from repoze.bfg.security import Authenticated
- self.assertEqual(result, [Everyone, Authenticated, 'fred'])
-
- def test_effective_principals_no_remote_user(self):
- context = DummyContext()
- request = DummyRequest({})
- policy = self._makeOne()
- result = policy.effective_principals(request)
- from repoze.bfg.security import Everyone
- self.assertEqual(result, [Everyone])
-
-class TestRemoteUserInheritingACLSecurityPolicy(TestRemoteUserACLSecurityPolicy):
- def _getTargetClass(self):
- from repoze.bfg.security import RemoteUserInheritingACLSecurityPolicy
- return RemoteUserInheritingACLSecurityPolicy
-
-class TestWhoACLSecurityPolicy(unittest.TestCase):
+class TestViewPermissionFactory(unittest.TestCase):
def setUp(self):
cleanUp()
def tearDown(self):
cleanUp()
-
- def _getTargetClass(self):
- from repoze.bfg.security import WhoACLSecurityPolicy
- return WhoACLSecurityPolicy
-
- def _makeOne(self, *arg, **kw):
- klass = self._getTargetClass()
- return klass(*arg, **kw)
-
- def test_instance_implements_ISecurityPolicy(self):
- from zope.interface.verify import verifyObject
- from repoze.bfg.interfaces import ISecurityPolicy
- verifyObject(ISecurityPolicy, self._makeOne())
-
- def test_authenticated_userid(self):
- context = DummyContext()
- identity = {'repoze.who.identity':{'repoze.who.userid':'fred'}}
- request = DummyRequest(identity)
- policy = self._makeOne()
- result = policy.authenticated_userid(request)
- self.assertEqual(result, 'fred')
-
- def test_authenticated_userid_no_who_ident(self):
- context = DummyContext()
- request = DummyRequest({})
- policy = self._makeOne()
- result = policy.authenticated_userid(request)
- self.assertEqual(result, None)
-
- def test_effective_principals(self):
- context = DummyContext()
- identity = {'repoze.who.identity':{'repoze.who.userid':'fred'}}
- request = DummyRequest(identity)
- policy = self._makeOne()
- result = policy.effective_principals(request)
- from repoze.bfg.security import Everyone
- from repoze.bfg.security import Authenticated
- self.assertEqual(result, [Everyone, Authenticated, 'fred'])
-
- def test_effective_principals_no_who_ident(self):
- context = DummyContext()
- request = DummyRequest({})
- policy = self._makeOne()
- result = policy.effective_principals(request)
- from repoze.bfg.security import Everyone
- self.assertEqual(result, [Everyone])
-
-class TestWhoInheritingACLSecurityPolicy(TestWhoACLSecurityPolicy):
- def _getTargetClass(self):
- from repoze.bfg.security import WhoInheritingACLSecurityPolicy
- return WhoInheritingACLSecurityPolicy
-
-class TestAPIFunctions(unittest.TestCase):
- def setUp(self):
- cleanUp()
- def tearDown(self):
- cleanUp()
-
- def _registerSecurityPolicy(self, secpol):
- import zope.component
- gsm = zope.component.getGlobalSiteManager()
- from repoze.bfg.interfaces import ISecurityPolicy
- gsm.registerUtility(secpol, ISecurityPolicy)
-
- def test_has_permission_registered(self):
- secpol = DummySecurityPolicy(False)
- self._registerSecurityPolicy(secpol)
- from repoze.bfg.security import has_permission
- self.assertEqual(has_permission('view', None, None), False)
-
- def test_has_permission_not_registered(self):
- from repoze.bfg.security import has_permission
- result = has_permission('view', None, None)
- self.assertEqual(result, True)
- self.assertEqual(result.msg, 'No security policy in use.')
-
- def test_authenticated_userid_registered(self):
- secpol = DummySecurityPolicy(False)
- self._registerSecurityPolicy(secpol)
- from repoze.bfg.security import authenticated_userid
- request = DummyRequest({})
- self.assertEqual(authenticated_userid(request), 'fred')
-
- def test_authenticated_userid_not_registered(self):
- from repoze.bfg.security import authenticated_userid
- request = DummyRequest({})
- self.assertEqual(authenticated_userid(request), None)
-
- def test_effective_principals_registered(self):
- secpol = DummySecurityPolicy(False)
- self._registerSecurityPolicy(secpol)
- from repoze.bfg.security import effective_principals
- request = DummyRequest({})
- self.assertEqual(effective_principals(request), ['fred', 'bob'])
-
- def test_effective_principals_not_registered(self):
- from repoze.bfg.security import effective_principals
- request = DummyRequest({})
- self.assertEqual(effective_principals(request), [])
-
- def test_principals_allowed_by_permission_not_registered(self):
- from repoze.bfg.security import principals_allowed_by_permission
- from repoze.bfg.security import Everyone
- self.assertEqual(principals_allowed_by_permission(None, None),
- [Everyone])
-
- def test_principals_allowed_by_permission_registered(self):
- secpol = DummySecurityPolicy(False)
- self._registerSecurityPolicy(secpol)
- from repoze.bfg.security import principals_allowed_by_permission
- self.assertEqual(principals_allowed_by_permission(None, None),
- ['fred', 'bob'])
-
-class TestViewPermission(unittest.TestCase):
- def _getTargetClass(self):
- from repoze.bfg.security import ViewPermission
- return ViewPermission
-
- def _makeOne(self, *arg, **kw):
- klass = self._getTargetClass()
- return klass(*arg, **kw)
-
- def test_call(self):
- context = DummyContext()
- request = DummyRequest({})
- secpol = DummySecurityPolicy(True)
- permission = self._makeOne(context, request, 'repoze.view')
- result = permission(secpol)
- self.assertEqual(result, True)
- self.assertEqual(secpol.checked, (context, request, 'repoze.view'))
-
- def test_repr(self):
- context = DummyContext()
- request = DummyRequest({})
- request.view_name = 'viewname'
- secpol = DummySecurityPolicy(True)
- permission = self._makeOne(context, request, 'repoze.view')
- result = repr(permission)
- self.failUnless(result.startswith('<Permission at '))
- self.failUnless(result.endswith(" named 'repoze.view' for 'viewname'>"))
-
-class TestViewPermissionFactory(unittest.TestCase):
def _getTargetClass(self):
from repoze.bfg.security import ViewPermissionFactory
return ViewPermissionFactory
@@ -671,10 +46,9 @@ class TestViewPermissionFactory(unittest.TestCase):
context = DummyContext()
request = DummyRequest({})
factory = self._makeOne('repoze.view')
+ self.assertEqual(factory.permission_name, 'repoze.view')
result = factory(context, request)
- self.assertEqual(result.permission_name, 'repoze.view')
- self.assertEqual(result.context, context)
- self.assertEqual(result.request, request)
+ self.assertEqual(result, True)
class TestAllowed(unittest.TestCase):
def _getTargetClass(self):
@@ -752,6 +126,222 @@ class TestACLDenied(unittest.TestCase):
self.failUnless('<ACLDenied instance at ' in repr(denied))
self.failUnless("with msg %r>" % msg in repr(denied))
+class TestViewExecutionPermitted(unittest.TestCase):
+ def setUp(self):
+ cleanUp()
+
+ def tearDown(self):
+ cleanUp()
+
+ def _callFUT(self, *arg, **kw):
+ from repoze.bfg.security import view_execution_permitted
+ return view_execution_permitted(*arg, **kw)
+
+ def _registerViewPermission(self, view_name, allow=True):
+ import zope.component
+ from zope.interface import Interface
+ from repoze.bfg.interfaces import IViewPermission
+ class Checker(object):
+ def __call__(self, context, request):
+ self.context = context
+ self.request = request
+ return allow
+ checker = Checker()
+ gsm = zope.component.getGlobalSiteManager()
+ gsm.registerAdapter(checker, (Interface, Interface),
+ IViewPermission,
+ view_name)
+ return checker
+
+ def test_no_permission(self):
+ import zope.component
+ gsm = zope.component.getGlobalSiteManager()
+ from repoze.bfg.interfaces import ISettings
+ settings = DummySettings(debug_authorization=True)
+ gsm.registerUtility(settings, ISettings)
+ context = DummyContext()
+ request = DummyRequest({})
+ result = self._callFUT(context, request, '')
+ msg = result.msg
+ self.failUnless("Allowed: view name '' in context" in msg)
+ self.failUnless('(no permission defined)' in msg)
+ self.assertEqual(result, True)
+
+ def test_with_permission(self):
+ from zope.interface import Interface
+ from zope.interface import directlyProvides
+ from repoze.bfg.interfaces import IRequest
+ class IContext(Interface):
+ pass
+ context = DummyContext()
+ directlyProvides(context, IContext)
+ checker = self._registerViewPermission('', True)
+ request = DummyRequest({})
+ directlyProvides(request, IRequest)
+ result = self._callFUT(context, request, '')
+ self.failUnless(result is True)
+
+def _registerAuthenticationPolicy(result):
+ from repoze.bfg.interfaces import IAuthenticationPolicy
+ policy = DummyAuthenticationPolicy(result)
+ import zope.component
+ gsm = zope.component.getGlobalSiteManager()
+ gsm.registerUtility(policy, IAuthenticationPolicy)
+ return policy
+
+def _registerAuthorizationPolicy(result):
+ from repoze.bfg.interfaces import IAuthorizationPolicy
+ policy = DummyAuthorizationPolicy(result)
+ import zope.component
+ gsm = zope.component.getGlobalSiteManager()
+ gsm.registerUtility(policy, IAuthorizationPolicy)
+ return policy
+
+
+class TestHasPermission(unittest.TestCase):
+ def setUp(self):
+ cleanUp()
+
+ def tearDown(self):
+ cleanUp()
+
+ def _callFUT(self, *arg):
+ from repoze.bfg.security import has_permission
+ return has_permission(*arg)
+
+ def test_no_authentication_policy(self):
+ result = self._callFUT('view', None, None)
+ self.assertEqual(result, True)
+ self.assertEqual(result.msg, 'No authentication policy in use.')
+
+ def test_authentication_policy_no_authorization_policy(self):
+ _registerAuthenticationPolicy(None)
+ self.assertRaises(ValueError, self._callFUT, 'view', None, None)
+
+ def test_authn_and_authz_policies_registered(self):
+ _registerAuthenticationPolicy(None)
+ pol = _registerAuthorizationPolicy('yo')
+ self.assertEqual(self._callFUT('view', None, None), 'yo')
+
+class TestAuthenticatedUserId(unittest.TestCase):
+ def setUp(self):
+ cleanUp()
+
+ def tearDown(self):
+ cleanUp()
+
+ def _callFUT(self, *arg):
+ from repoze.bfg.security import authenticated_userid
+ return authenticated_userid(*arg)
+
+ def test_no_authentication_policy(self):
+ context = DummyContext()
+ request = DummyRequest({})
+ result = self._callFUT(context, request)
+ self.assertEqual(result, None)
+
+ def test_with_authentication_policy(self):
+ _registerAuthenticationPolicy('yo')
+ context = DummyContext()
+ request = DummyRequest({})
+ result = self._callFUT(context, request)
+ self.assertEqual(result, 'yo')
+
+class TestEffectivePrincipals(unittest.TestCase):
+ def setUp(self):
+ cleanUp()
+
+ def tearDown(self):
+ cleanUp()
+
+ def _callFUT(self, *arg):
+ from repoze.bfg.security import effective_principals
+ return effective_principals(*arg)
+
+ def test_no_authentication_policy(self):
+ context = DummyContext()
+ request = DummyRequest({})
+ result = self._callFUT(context, request)
+ self.assertEqual(result, [])
+
+ def test_with_authentication_policy(self):
+ _registerAuthenticationPolicy('yo')
+ context = DummyContext()
+ request = DummyRequest({})
+ result = self._callFUT(context, request)
+ self.assertEqual(result, 'yo')
+
+class TestPrincipalsAllowedByPermission(unittest.TestCase):
+ def setUp(self):
+ cleanUp()
+
+ def tearDown(self):
+ cleanUp()
+
+ def _callFUT(self, *arg):
+ from repoze.bfg.security import principals_allowed_by_permission
+ return principals_allowed_by_permission(*arg)
+
+ def test_no_authorization_policy(self):
+ from repoze.bfg.security import Everyone
+ context = DummyContext()
+ result = self._callFUT(context, 'view')
+ self.assertEqual(result, [Everyone])
+
+ def test_with_authorization_policy(self):
+ _registerAuthorizationPolicy('yo')
+ context = DummyContext()
+ result = self._callFUT(context, 'view')
+ self.assertEqual(result, 'yo')
+
+class TestRemember(unittest.TestCase):
+ def setUp(self):
+ cleanUp()
+
+ def tearDown(self):
+ cleanUp()
+
+ def _callFUT(self, *arg):
+ from repoze.bfg.security import remember
+ return remember(*arg)
+
+ def test_no_authentication_policy(self):
+ context = DummyContext()
+ request = DummyRequest({})
+ result = self._callFUT(context, request, 'me')
+ self.assertEqual(result, [])
+
+ def test_with_authentication_policy(self):
+ _registerAuthenticationPolicy('yo')
+ context = DummyContext()
+ request = DummyRequest({})
+ result = self._callFUT(context, request, 'me')
+ self.assertEqual(result, 'yo')
+
+class TestForget(unittest.TestCase):
+ def setUp(self):
+ cleanUp()
+
+ def tearDown(self):
+ cleanUp()
+
+ def _callFUT(self, *arg):
+ from repoze.bfg.security import forget
+ return forget(*arg)
+
+ def test_no_authentication_policy(self):
+ context = DummyContext()
+ request = DummyRequest({})
+ result = self._callFUT(context, request)
+ self.assertEqual(result, [])
+
+ def test_with_authentication_policy(self):
+ _registerAuthenticationPolicy('yo')
+ context = DummyContext()
+ request = DummyRequest({})
+ result = self._callFUT(context, request)
+ self.assertEqual(result, 'yo')
+
class DummyContext:
def __init__(self, *arg, **kw):
self.__dict__.update(kw)
@@ -760,33 +350,33 @@ class DummyRequest:
def __init__(self, environ):
self.environ = environ
-class DummySecurityPolicy:
+class DummyAuthenticationPolicy:
def __init__(self, result):
self.result = result
- def permits(self, *args):
- self.checked = args
+ def effective_principals(self, context, request):
return self.result
- def authenticated_userid(self, request):
- return 'fred'
+ def authenticated_userid(self, context, request):
+ return self.result
- def effective_principals(self, request):
- return ['fred', 'bob']
+ def remember(self, context, request, principal, **kw):
+ return self.result
+
+ def forget(self, context, request):
+ return self.result
+
+class DummyAuthorizationPolicy:
+ def __init__(self, result):
+ self.result = result
+
+ def permits(self, context, principals, permission):
+ return self.result
def principals_allowed_by_permission(self, context, permission):
- return ['fred', 'bob']
-
-VIEW = 'view'
-EDIT = 'edit'
-CREATE = 'create'
-DELETE = 'delete'
-MODERATE = 'moderate'
-ADMINISTER = 'administer'
-COMMENT = 'comment'
-
-GUEST_PERMS = (VIEW, COMMENT)
-MEMBER_PERMS = GUEST_PERMS + (EDIT, CREATE, DELETE)
-MODERATOR_PERMS = MEMBER_PERMS + (MODERATE,)
-ADMINISTRATOR_PERMS = MODERATOR_PERMS + (ADMINISTER,)
+ return self.result
+
+class DummySettings:
+ def __init__(self, **kw):
+ self.__dict__.update(kw)
diff --git a/repoze/bfg/tests/test_testing.py b/repoze/bfg/tests/test_testing.py
index 504151ce2..5024c41ba 100644
--- a/repoze/bfg/tests/test_testing.py
+++ b/repoze/bfg/tests/test_testing.py
@@ -8,29 +8,20 @@ class TestTestingFunctions(unittest.TestCase):
def tearDown(self):
cleanUp()
- def test_registerDummySecurityPolicy_permissive(self):
- from repoze.bfg import testing
- testing.registerDummySecurityPolicy('user', ('group1', 'group2'),
- permissive=True)
- from repoze.bfg.interfaces import ISecurityPolicy
- from zope.component import getUtility
- ut = getUtility(ISecurityPolicy)
- from repoze.bfg.testing import DummyAllowingSecurityPolicy
- self.failUnless(isinstance(ut, DummyAllowingSecurityPolicy))
- self.assertEqual(ut.userid, 'user')
- self.assertEqual(ut.groupids, ('group1', 'group2'))
-
- def test_registerDummySecurityPolicy_nonpermissive(self):
+ def test_registerDummySecurityPolicy(self):
from repoze.bfg import testing
testing.registerDummySecurityPolicy('user', ('group1', 'group2'),
permissive=False)
- from repoze.bfg.interfaces import ISecurityPolicy
+ from repoze.bfg.interfaces import IAuthenticationPolicy
+ from repoze.bfg.interfaces import IAuthorizationPolicy
from zope.component import getUtility
- ut = getUtility(ISecurityPolicy)
- from repoze.bfg.testing import DummyDenyingSecurityPolicy
- self.failUnless(isinstance(ut, DummyDenyingSecurityPolicy))
+ ut = getUtility(IAuthenticationPolicy)
+ from repoze.bfg.testing import DummySecurityPolicy
+ self.failUnless(isinstance(ut, DummySecurityPolicy))
+ ut = getUtility(IAuthorizationPolicy)
self.assertEqual(ut.userid, 'user')
self.assertEqual(ut.groupids, ('group1', 'group2'))
+ self.assertEqual(ut.permissive, False)
def test_registerModels(self):
ob1 = object()
@@ -153,36 +144,30 @@ class TestTestingFunctions(unittest.TestCase):
self.assertEqual(response.body, '123')
def test_registerViewPermission_defaults(self):
+ from repoze.bfg.security import view_execution_permitted
from repoze.bfg import testing
view = testing.registerViewPermission('moo.html')
- from repoze.bfg.view import view_execution_permitted
testing.registerDummySecurityPolicy()
result = view_execution_permitted(None, None, 'moo.html')
self.failUnless(result)
self.assertEqual(result.msg, 'message')
def test_registerViewPermission_denying(self):
+ from repoze.bfg.security import view_execution_permitted
from repoze.bfg import testing
view = testing.registerViewPermission('moo.html', result=False)
- from repoze.bfg.view import view_execution_permitted
testing.registerDummySecurityPolicy()
result = view_execution_permitted(None, None, 'moo.html')
self.failIf(result)
self.assertEqual(result.msg, 'message')
def test_registerViewPermission_custom(self):
- class ViewPermission:
- def __init__(self, context, request):
- self.context = context
- self.request = request
-
- def __call__(self, secpol):
- return True
-
+ from repoze.bfg.security import view_execution_permitted
+ def viewperm(context, request):
+ return True
from repoze.bfg import testing
view = testing.registerViewPermission('moo.html',
- viewpermission=ViewPermission)
- from repoze.bfg.view import view_execution_permitted
+ viewpermission=viewperm)
testing.registerDummySecurityPolicy()
result = view_execution_permitted(None, None, 'moo.html')
self.failUnless(result is True)
@@ -226,30 +211,30 @@ class TestTestingFunctions(unittest.TestCase):
testing.registerUtility(utility, iface, name='mudge')
self.assertEqual(getUtility(iface, name='mudge')(), 'foo')
-class TestDummyAllowingSecurityPolicy(unittest.TestCase):
+class TestDummySecurityPolicy(unittest.TestCase):
def _getTargetClass(self):
- from repoze.bfg.testing import DummyAllowingSecurityPolicy
- return DummyAllowingSecurityPolicy
+ from repoze.bfg.testing import DummySecurityPolicy
+ return DummySecurityPolicy
- def _makeOne(self, userid=None, groupids=()):
+ def _makeOne(self, userid=None, groupids=(), permissive=True):
klass = self._getTargetClass()
- return klass(userid, groupids)
+ return klass(userid, groupids, permissive)
def test_authenticated_userid(self):
policy = self._makeOne('user')
- self.assertEqual(policy.authenticated_userid(None), 'user')
+ self.assertEqual(policy.authenticated_userid(None, None), 'user')
def test_effective_principals_userid(self):
policy = self._makeOne('user', ('group1',))
from repoze.bfg.security import Everyone
from repoze.bfg.security import Authenticated
- self.assertEqual(policy.effective_principals(None),
+ self.assertEqual(policy.effective_principals(None, None),
[Everyone, Authenticated, 'user', 'group1'])
def test_effective_principals_nouserid(self):
policy = self._makeOne()
from repoze.bfg.security import Everyone
- self.assertEqual(policy.effective_principals(None), [Everyone])
+ self.assertEqual(policy.effective_principals(None, None), [Everyone])
def test_permits(self):
policy = self._makeOne()
@@ -259,44 +244,19 @@ class TestDummyAllowingSecurityPolicy(unittest.TestCase):
policy = self._makeOne('user', ('group1',))
from repoze.bfg.security import Everyone
from repoze.bfg.security import Authenticated
- self.assertEqual(policy.principals_allowed_by_permission(None, None),
- [Everyone, Authenticated, 'user', 'group1'])
-
-
-class TestDummyDenyingSecurityPolicy(unittest.TestCase):
- def _getTargetClass(self):
- from repoze.bfg.testing import DummyDenyingSecurityPolicy
- return DummyDenyingSecurityPolicy
-
- def _makeOne(self, userid=None, groupids=()):
- klass = self._getTargetClass()
- return klass(userid, groupids)
+ result = policy.principals_allowed_by_permission(None, None)
+ self.assertEqual(result, [Everyone, Authenticated, 'user', 'group1'])
- def test_authenticated_userid(self):
- policy = self._makeOne('user')
- self.assertEqual(policy.authenticated_userid(None), 'user')
-
- def test_effective_principals_userid(self):
- policy = self._makeOne('user', ('group1',))
- from repoze.bfg.security import Everyone
- from repoze.bfg.security import Authenticated
- self.assertEqual(policy.effective_principals(None),
- [Everyone, Authenticated, 'user', 'group1'])
-
- def test_effective_principals_nouserid(self):
+ def test_forget(self):
policy = self._makeOne()
- from repoze.bfg.security import Everyone
- self.assertEqual(policy.effective_principals(None), [Everyone])
-
- def test_permits(self):
+ self.assertEqual(policy.forget(None, None), [])
+
+ def test_remember(self):
policy = self._makeOne()
- self.assertEqual(policy.permits(None, None, None), False)
+ self.assertEqual(policy.remember(None, None, None), [])
- def test_principals_allowed_by_permission(self):
- policy = self._makeOne('user', ('group1',))
- self.assertEqual(policy.principals_allowed_by_permission(None, None),
- [])
+
class TestDummyModel(unittest.TestCase):
def _getTargetClass(self):
from repoze.bfg.testing import DummyModel
diff --git a/repoze/bfg/tests/test_threadlocal.py b/repoze/bfg/tests/test_threadlocal.py
new file mode 100644
index 000000000..230bb3726
--- /dev/null
+++ b/repoze/bfg/tests/test_threadlocal.py
@@ -0,0 +1,46 @@
+from repoze.bfg.testing import cleanUp
+import unittest
+
+class TestThreadLocalManager(unittest.TestCase):
+ def setUp(self):
+ cleanUp()
+
+ def tearDown(self):
+ cleanUp()
+
+ def _getTargetClass(self):
+ from repoze.bfg.threadlocal import ThreadLocalManager
+ return ThreadLocalManager
+
+ def _makeOne(self, default=lambda *x: 1):
+ return self._getTargetClass()(default)
+
+ def test_init(self):
+ local = self._makeOne()
+ self.assertEqual(local.stack, [])
+ self.assertEqual(local.get(), 1)
+
+ def test_default(self):
+ from zope.component import getGlobalSiteManager
+ local = self._makeOne(getGlobalSiteManager)
+ self.assertEqual(local.stack, [])
+ self.assertEqual(local.get(), getGlobalSiteManager())
+
+ def test_push_and_pop(self):
+ local = self._makeOne()
+ local.push(True)
+ self.assertEqual(local.get(), True)
+ self.assertEqual(local.pop(), True)
+ self.assertEqual(local.pop(), None)
+ self.assertEqual(local.get(), 1)
+
+ def test_set_get_and_clear(self):
+ local = self._makeOne()
+ local.set(None)
+ self.assertEqual(local.stack, [None])
+ self.assertEqual(local.get(), None)
+ local.clear()
+ self.assertEqual(local.get(), 1)
+ local.clear()
+ self.assertEqual(local.get(), 1)
+
diff --git a/repoze/bfg/tests/test_view.py b/repoze/bfg/tests/test_view.py
index 08beffeaa..cc293d6ef 100644
--- a/repoze/bfg/tests/test_view.py
+++ b/repoze/bfg/tests/test_view.py
@@ -15,11 +15,21 @@ class BaseTest(object):
from repoze.bfg.interfaces import IView
gsm.registerAdapter(app, for_, IView, name)
- def _registerPermission(self, permission, name, *for_):
+ def _registerViewPermission(self, view_name, allow=True):
import zope.component
- gsm = zope.component.getGlobalSiteManager()
+ from zope.interface import Interface
from repoze.bfg.interfaces import IViewPermission
- gsm.registerAdapter(permission, for_, IViewPermission, name)
+ class Checker(object):
+ def __call__(self, context, request):
+ self.context = context
+ self.request = request
+ return allow
+ checker = Checker()
+ gsm = zope.component.getGlobalSiteManager()
+ gsm.registerAdapter(checker, (Interface, Interface),
+ IViewPermission,
+ view_name)
+ return checker
def _registerSecurityPolicy(self, secpol):
import zope.component
@@ -61,12 +71,10 @@ class RenderViewToResponseTests(BaseTest, unittest.TestCase):
directlyProvides(context, IContext)
response = DummyResponse()
secpol = DummySecurityPolicy()
- permissionfactory = make_permission_factory(False)
view = make_view(response)
self._registerView(view, 'registered', IContext, IRequest)
self._registerSecurityPolicy(secpol)
- self._registerPermission(permissionfactory, 'registered', IContext,
- IRequest)
+ self._registerViewPermission('registered', False)
environ = self._makeEnviron()
from webob import Request
request = Request(environ)
@@ -85,12 +93,10 @@ class RenderViewToResponseTests(BaseTest, unittest.TestCase):
directlyProvides(context, IContext)
response = DummyResponse()
secpol = DummySecurityPolicy()
- permissionfactory = make_permission_factory(True)
view = make_view(response)
self._registerView(view, 'registered', IContext, IRequest)
self._registerSecurityPolicy(secpol)
- self._registerPermission(permissionfactory, 'registered', IContext,
- IRequest)
+ self._registerViewPermission('registered', True)
environ = self._makeEnviron()
from webob import Request
request = Request(environ)
@@ -109,12 +115,10 @@ class RenderViewToResponseTests(BaseTest, unittest.TestCase):
directlyProvides(context, IContext)
response = DummyResponse()
secpol = DummySecurityPolicy()
- permissionfactory = make_permission_factory(False)
view = make_view(response)
self._registerView(view, 'registered', IContext, IRequest)
self._registerSecurityPolicy(secpol)
- self._registerPermission(permissionfactory, 'registered', IContext,
- IRequest)
+ self._registerViewPermission('registered', False)
environ = self._makeEnviron()
from webob import Request
request = Request(environ)
@@ -146,12 +150,10 @@ class RenderViewToIterableTests(BaseTest, unittest.TestCase):
directlyProvides(context, IContext)
response = DummyResponse()
secpol = DummySecurityPolicy()
- permissionfactory = make_permission_factory(False)
view = make_view(response)
self._registerView(view, 'registered', IContext, IRequest)
self._registerSecurityPolicy(secpol)
- self._registerPermission(permissionfactory, 'registered', IContext,
- IRequest)
+ self._registerViewPermission('registered', False)
environ = self._makeEnviron()
from webob import Request
request = Request(environ)
@@ -170,12 +172,10 @@ class RenderViewToIterableTests(BaseTest, unittest.TestCase):
directlyProvides(context, IContext)
response = DummyResponse()
secpol = DummySecurityPolicy()
- permissionfactory = make_permission_factory(True)
view = make_view(response)
self._registerView(view, 'registered', IContext, IRequest)
self._registerSecurityPolicy(secpol)
- self._registerPermission(permissionfactory, 'registered', IContext,
- IRequest)
+ self._registerViewPermission('registered', True)
environ = self._makeEnviron()
from webob import Request
request = Request(environ)
@@ -194,12 +194,10 @@ class RenderViewToIterableTests(BaseTest, unittest.TestCase):
directlyProvides(context, IContext)
response = DummyResponse()
secpol = DummySecurityPolicy()
- permissionfactory = make_permission_factory(False)
view = make_view(response)
self._registerView(view, 'registered', IContext, IRequest)
self._registerSecurityPolicy(secpol)
- self._registerPermission(permissionfactory, 'registered', IContext,
- IRequest)
+ self._registerViewPermission('registered', False)
environ = self._makeEnviron()
from webob import Request
request = Request(environ)
@@ -231,12 +229,10 @@ class RenderViewTests(unittest.TestCase, BaseTest):
directlyProvides(context, IContext)
response = DummyResponse()
secpol = DummySecurityPolicy()
- permissionfactory = make_permission_factory(False)
view = make_view(response)
self._registerView(view, 'registered', IContext, IRequest)
self._registerSecurityPolicy(secpol)
- self._registerPermission(permissionfactory, 'registered', IContext,
- IRequest)
+ self._registerViewPermission('registered', False)
environ = self._makeEnviron()
from webob import Request
request = Request(environ)
@@ -255,12 +251,10 @@ class RenderViewTests(unittest.TestCase, BaseTest):
directlyProvides(context, IContext)
response = DummyResponse()
secpol = DummySecurityPolicy()
- permissionfactory = make_permission_factory(True)
view = make_view(response)
self._registerView(view, 'registered', IContext, IRequest)
self._registerSecurityPolicy(secpol)
- self._registerPermission(permissionfactory, 'registered', IContext,
- IRequest)
+ self._registerViewPermission('registered', True)
environ = self._makeEnviron()
from webob import Request
request = Request(environ)
@@ -278,12 +272,10 @@ class RenderViewTests(unittest.TestCase, BaseTest):
directlyProvides(context, IContext)
response = DummyResponse()
secpol = DummySecurityPolicy()
- permissionfactory = make_permission_factory(False)
view = make_view(response)
self._registerView(view, 'registered', IContext, IRequest)
self._registerSecurityPolicy(secpol)
- self._registerPermission(permissionfactory, 'registered', IContext,
- IRequest)
+ self._registerViewPermission('registered', False)
environ = self._makeEnviron()
from webob import Request
request = Request(environ)
@@ -314,67 +306,6 @@ class TestIsResponse(unittest.TestCase):
response.status = None
self.assertEqual(self._callFUT(response), False)
-class TestViewExecutionPermitted(unittest.TestCase):
- def setUp(self):
- cleanUp()
-
- def tearDown(self):
- cleanUp()
-
- def _callFUT(self, *arg, **kw):
- from repoze.bfg.view import view_execution_permitted
- return view_execution_permitted(*arg, **kw)
-
- def _registerSecurityPolicy(self, secpol):
- import zope.component
- gsm = zope.component.getGlobalSiteManager()
- from repoze.bfg.interfaces import ISecurityPolicy
- gsm.registerUtility(secpol, ISecurityPolicy)
-
- def _registerPermission(self, permission, name, *for_):
- import zope.component
- gsm = zope.component.getGlobalSiteManager()
- from repoze.bfg.interfaces import IViewPermission
- gsm.registerAdapter(permission, for_, IViewPermission, name)
-
- def test_no_secpol(self):
- context = DummyContext()
- request = DummyRequest()
- result = self._callFUT(context, request, '')
- msg = result.msg
- self.failUnless("Allowed: view name '' in context" in msg)
- self.failUnless('(no security policy in use)' in msg)
- self.assertEqual(result, True)
-
- def test_secpol_no_permission(self):
- secpol = DummySecurityPolicy()
- self._registerSecurityPolicy(secpol)
- context = DummyContext()
- request = DummyRequest()
- result = self._callFUT(context, request, '')
- msg = result.msg
- self.failUnless("Allowed: view name '' in context" in msg)
- self.failUnless("(no permission registered for name '')" in msg)
- self.assertEqual(result, True)
-
- def test_secpol_and_permission(self):
- from zope.interface import Interface
- from zope.interface import directlyProvides
- from repoze.bfg.interfaces import IRequest
- class IContext(Interface):
- pass
- context = DummyContext()
- directlyProvides(context, IContext)
- permissionfactory = make_permission_factory(True)
- self._registerPermission(permissionfactory, '', IContext,
- IRequest)
- secpol = DummySecurityPolicy()
- self._registerSecurityPolicy(secpol)
- request = DummyRequest()
- directlyProvides(request, IRequest)
- result = self._callFUT(context, request, '')
- self.failUnless(result is True)
-
class TestStaticView(unittest.TestCase, BaseTest):
def setUp(self):
cleanUp()
@@ -527,18 +458,6 @@ def make_view(response):
return response
return view
-def make_permission_factory(result):
- class DummyPermissionFactory:
- def __init__(self, context, request):
- self.context = context
- self.request = request
-
- def __call__(self, secpol):
- self.__class__.checked_with = secpol
- return result
-
- return DummyPermissionFactory
-
class DummyResponse:
status = '200 OK'
headerlist = ()
diff --git a/repoze/bfg/tests/test_wsgi.py b/repoze/bfg/tests/test_wsgi.py
index b9568eb82..893364635 100644
--- a/repoze/bfg/tests/test_wsgi.py
+++ b/repoze/bfg/tests/test_wsgi.py
@@ -131,7 +131,7 @@ class TestNotFound(unittest.TestCase):
('Content-Type', 'text/html')])
def test_with_message(self):
- environ = {'message':'<hi!>'}
+ environ = {'repoze.bfg.message':'<hi!>'}
L = []
def start_response(status, headers):
L.append((status, headers))
@@ -166,7 +166,7 @@ class TestUnauthorized(unittest.TestCase):
('Content-Type', 'text/html')])
def test_with_message(self):
- environ = {'message':'<hi!>'}
+ environ = {'repoze.bfg.message':'<hi!>'}
L = []
def start_response(status, headers):
L.append((status, headers))
diff --git a/repoze/bfg/threadlocal.py b/repoze/bfg/threadlocal.py
new file mode 100644
index 000000000..7df4f34f4
--- /dev/null
+++ b/repoze/bfg/threadlocal.py
@@ -0,0 +1,39 @@
+import threading
+from zope.component import getGlobalSiteManager
+
+class ThreadLocalManager(threading.local):
+ def __init__(self, default):
+ self.stack = []
+ self.default = default
+
+ def push(self, info):
+ self.stack.append(info)
+
+ set = push # b/c
+
+ def pop(self):
+ if self.stack:
+ return self.stack.pop()
+
+ def get(self):
+ try:
+ return self.stack[-1]
+ except IndexError:
+ return self.default()
+
+ def clear(self):
+ self.stack[:] = []
+
+def defaults():
+ defaults = {'request':None}
+ gsm = getGlobalSiteManager()
+ defaults['registry'] = gsm
+ return defaults
+
+manager = ThreadLocalManager(defaults)
+
+def setManager(new_manager): # for unit tests
+ global manager
+ old_manager = manager
+ manager = new_manager
+ return old_manager
diff --git a/repoze/bfg/view.py b/repoze/bfg/view.py
index a867987a5..1698bc470 100644
--- a/repoze/bfg/view.py
+++ b/repoze/bfg/view.py
@@ -2,61 +2,25 @@ import inspect
from paste.urlparser import StaticURLParser
from zope.component import queryMultiAdapter
-from zope.component import queryUtility
+from zope.deprecation import deprecated
-from repoze.bfg.interfaces import ISecurityPolicy
-from repoze.bfg.interfaces import IViewPermission
-from repoze.bfg.interfaces import IView
+from zope.interface import Interface
+
+from repoze.bfg.interfaces import IRequest
+from repoze.bfg.interfaces import IView
from repoze.bfg.path import caller_path
+from repoze.bfg.security import view_execution_permitted
from repoze.bfg.security import Unauthorized
-from repoze.bfg.security import Allowed
-from zope.interface import Interface
-
-from repoze.bfg.interfaces import IRequest
+deprecated('view_execution_permitted',
+ "('from repoze.bfg.view import view_execution_permitted' is now "
+ "deprecated; instead use 'from repoze.bfg.security import "
+ "view_execution_permitted')",
+ )
_marker = object()
-def view_execution_permitted(context, request, name=''):
- """ If the view specified by ``context`` and ``name`` is protected
- by a permission, check the permission associated with the view
- using the effective security policy and the ``request``. Return a
- boolean result. If no security policy is in effect, or if the
- view is not protected by a permission, return True."""
- security_policy = queryUtility(ISecurityPolicy)
- permission = queryMultiAdapter((context, request), IViewPermission,
- name=name)
- return _view_execution_permitted(context, request, name, security_policy,
- permission, True)
-
-def _view_execution_permitted(context, request, view_name, security_policy,
- permission, debug_authorization):
- """ Rawer (faster) form of view_execution_permitted which does not
- need to do a CA lookup for the security policy or other values and
- which returns plain booleans if debug_authorization is off instead
- of constructing ``Allowed`` objects. This function is used by
- ``view_execution_permitted`` and the Router; it is not a public
- API."""
- if security_policy is None:
- if debug_authorization:
- return Allowed(
- 'Allowed: view name %r in context %r (no security policy in '
- 'use)', view_name, context)
- else:
- return True
-
- elif permission is None:
- if debug_authorization:
- return Allowed(
- 'Allowed: view name %r in context %r (no permission '
- 'registered for name %r).', view_name, context, view_name)
- else:
- return True
-
- else:
- return permission(security_policy)
-
def render_view_to_response(context, request, name='', secure=True):
""" Render the view named ``name`` against the specified
``context`` and ``request`` to an object implementing
diff --git a/repoze/bfg/wsgi.py b/repoze/bfg/wsgi.py
index abe7ebead..027345673 100644
--- a/repoze/bfg/wsgi.py
+++ b/repoze/bfg/wsgi.py
@@ -105,7 +105,7 @@ def wsgiapp2(wrapped):
class HTTPException(object):
def __call__(self, environ, start_response, exc_info=False):
try:
- msg = escape(environ['message'])
+ msg = escape(environ['repoze.bfg.message'])
except KeyError:
msg = ''
html = """<body>
diff --git a/repoze/bfg/zcml.py b/repoze/bfg/zcml.py
index 737e2409b..39366b0b1 100644
--- a/repoze/bfg/zcml.py
+++ b/repoze/bfg/zcml.py
@@ -37,14 +37,15 @@ class Uncacheable(object):
""" Include in discriminators of actions which are not cacheable;
this class only exists for backwards compatibility (<0.8.1)"""
-def view(_context,
- permission=None,
- for_=None,
- view=None,
- name="",
- request_type=None,
- cacheable=True, # not used, here for b/w compat < 0.8
- ):
+def view(
+ _context,
+ permission=None,
+ for_=None,
+ view=None,
+ name="",
+ request_type=None,
+ cacheable=True, # not used, here for b/w compat < 0.8
+ ):
if not view:
raise ConfigurationError('"view" attribute was not specified')
diff --git a/setup.py b/setup.py
index a06c41f11..a0106d349 100644
--- a/setup.py
+++ b/setup.py
@@ -12,7 +12,7 @@
#
##############################################################################
-__version__ = '0.8.1'
+__version__ = '0.9dev'
import os