diff options
39 files changed, 3489 insertions, 1879 deletions
diff --git a/CHANGES.txt b/CHANGES.txt index 40a810305..5891313fd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,116 @@ +Next release +============ + +Features +-------- + +- New API functions named ``forget`` and ``remember`` are available in + the ``security`` module. The ``forget`` function returns headers + which will cause the currently authenticated user to be logged out + when set in a response. The ``remember`` function (when passed the + proper arguments) will return headers which will cause a principal + to be "logged in" when set in a response. See the Security API + chapter of the docs for more info. + +- New keyword arguments to the ``repoze.bfg.router.make_app`` call + have been added: ``authentication_policy`` and + ``authorization_policy``. These should, respectively, be an + implementation of an authentication policy (an object implementing + the ``repoze.bfg.interfaces.IAuthenticationPolicy`` interface) and + an implementation of an authorization policy (an object implementing + ``repoze.bfg.interfaces.IAuthorizationPolicy). Concrete + implementations of authentication policies exist in + ``repoze.bfg.authentication``. Concrete implementations of + authorization policies exist in ``repoze.bfg.authorization``. + + Both ``authentication_policy`` and ``authorization_policy`` default + to ``None``. + + If ``authentication_policy`` is ``None``, but + ``authorization_policy`` is *not* ``None``, then + ``authorization_policy`` is ignored (the ability to do authorization + depends on authentication). + + If the ``authentication_policy`` argument is *not* ``None``, and the + ``authorization_policy`` argument *is* ``None``, the authorization + policy defaults to an authorization implementation that uses ACLs + (``repoze.bfg.authorization.ACLAuthorizationPolicy``). + + .. note:: we no longer encourage configuration of "security + policies" using ZCML, as previously we did for + ``ISecurityPolicy``. This is because it's not uncommon to need to + configure settings for concrete authorization or authentication + policies using paste .ini parameters; the app entry point for your + applicatio is the natural place to do this. + +- Two new abstractions have been added in the way of adapters used by + the system: an ``IAuthorizationPolicy`` and an + ``IAuthenticationPolicy``. A combination of these (as registered by + the ``securitypolicy`` ZCML directive) take the place of the + ``ISecurityPolicy`` abstraction in previous releases of repoze.who. + The API functions in ``repoze.who.security`` (such as + ``authentication_userid``, ``effective_principals``, + ``has_permission``, and so on) have been changed to try to make use + of these new adapters. If you're using an older ``ISecurityPolicy`` + adapter, the system will still work, but it will print deprecation + warnings when such a policy is used. + +- The way the (internal) IViewPermission utilities registered via ZCML + are invoked has changed. They are purely adapters now, returning a + boolean result, rather than returning a callable. You shouldn't have + been using these anyway. ;-) + +- New concrete implementations of IAuthenticationPolicy have been + added to the ``repoze.bfg.authentication`` module: + ``RepozeWho1AuthenticationPolicy`` which uses ``repoze.who`` + identity to retrieve authentication data from and + ``RemoteUserAuthenticationPolicy``, which uses the ``REMOTE_USER`` + value in the WSGI environment to retrieve authentication data. + +- A new concrete implementation of IAuthorizationPolicy has been added + to the ``repoze.bfg.authorization`` module: + ``ACLAuthorizationPolicy`` which uses ACL inheritance to do + authorization. + +- It is now possible to register a custom + ``repoze.bfg.interfaces.IForbiddenResponseFactory`` for a given + application. This feature replaces the + ``repoze.bfg.interfaces.IUnauthorizedAppFactory`` feature previously + described in the Hooks chapter. The IForbiddenResponseFactory will + be called when the framework detects an authorization failure; it + should accept a context object and a request object; it should + return an IResponse object (a webob response, basically). Read the + below point for more info and see the Hooks narrative chapter of the + BFG docs for more info. + +Backwards Incompatibilities +--------------------------- + +- Custom NotFound and Forbidden (nee' Unauthorized) WSGI applications + (registered as a utility for INotFoundAppFactory and + IUnauthorizedAppFactory) could rely on an environment key named + ``message`` describing the circumstance of the response. This key + has been renamed to ``repoze.bfg.message`` (as per the WSGI spec, + which requires environment extensions to contain dots). + +Deprecations +------------ + +- The ``repoze.bfg.interfaces.IUnauthorizedAppFactory`` interface has + been deprecated in favor of using the new + ``repoze.bfg.interfaces.IForbiddenResponseFactory`` mechanism. + +- The ``view_execution_permitted`` API should now be imported from the + ``repoze.bfg.security`` module instead of the ``repoze.bfg.view`` + module. + +- The ``authenticated_userid`` and ``effective_principals`` APIs in + ``repoze.bfg.security`` used to only take a single argument + (request). They now accept two arguments (``context`` and + ``request``). Calling them with a single argument is still + supported but issues a deprecation warning. + + 0.8.1 (2009-05-21) ================== diff --git a/docs/api/authentication.rst b/docs/api/authentication.rst new file mode 100644 index 000000000..f51eeb302 --- /dev/null +++ b/docs/api/authentication.rst @@ -0,0 +1,14 @@ +:mod:`repoze.bfg.authentication` +================================ + +.. automodule:: repoze.bfg.authentication + +.. _authentication_policies_api_section: + +Authentication Policies +~~~~~~~~~~~~~~~~~~~~~~~ + +.. autoclass:: RepozeWho1AuthenticationPolicy + +.. autoclass:: RemoteUserAuthenticationPolicy + diff --git a/docs/api/authorization.rst b/docs/api/authorization.rst new file mode 100644 index 000000000..e44b1e293 --- /dev/null +++ b/docs/api/authorization.rst @@ -0,0 +1,11 @@ +:mod:`repoze.bfg.authorization` +=============================== + +.. automodule:: repoze.bfg.authorization + +.. _authorization_policies_api_section: + +Authorization Policies +~~~~~~~~~~~~~~~~~~~~~~ + +.. autoclass:: ACLAuthorizationPolicy diff --git a/docs/api/security.rst b/docs/api/security.rst index 5990f1809..77b60f5d0 100644 --- a/docs/api/security.rst +++ b/docs/api/security.rst @@ -8,14 +8,21 @@ API Functions ~~~~~~~~~~~~~ -.. autofunction:: authenticated_userid +.. autofunction:: authenticated_userid(context, request) -.. autofunction:: effective_principals +.. autofunction:: effective_principals(context, request) .. autofunction:: has_permission .. autofunction:: principals_allowed_by_permission +.. autofunction:: forget + +.. autofunction:: remember + +.. autofunction:: view_execution_permitted + + Constants ~~~~~~~~~ @@ -29,8 +36,8 @@ Constants The special principal id named 'Authenticated'. This principal id is granted to all requests which contain any other non-Everyone - principal id (according to the security policy). Its actual value - is the string 'system.Authenticated'. + principal id (according to the :term:`authentication policy`). + Its actual value is the string 'system.Authenticated'. .. attribute:: ALL_PERMISSIONS @@ -73,15 +80,3 @@ Return Values .. autoclass:: Allowed :members: -.. _security_policies_api_section: - -Security Policies -~~~~~~~~~~~~~~~~~ - -.. autofunction:: WhoACLSecurityPolicy - -.. autofunction:: WhoInheritingACLSecurityPolicy - -.. autofunction:: RemoteUserACLSecurityPolicy - -.. autofunction:: RemoteUserInheritingACLSecurityPolicy diff --git a/docs/api/traversal.rst b/docs/api/traversal.rst index 2f57fd3aa..4b7ac41f5 100644 --- a/docs/api/traversal.rst +++ b/docs/api/traversal.rst @@ -21,9 +21,6 @@ .. autofunction:: traverse -.. note:: A function named ``model_url`` used to be present in this - module. It was moved to :ref:`url_module` in version 0.6.1. - Secondary APIs ~~~~~~~~~~~~~~ diff --git a/docs/api/view.rst b/docs/api/view.rst index 3b34b7e22..40c69d24b 100644 --- a/docs/api/view.rst +++ b/docs/api/view.rst @@ -13,8 +13,6 @@ .. autofunction:: is_response - .. autofunction:: view_execution_permitted - .. autoclass:: bfg_view :members: diff --git a/docs/conf.py b/docs/conf.py index ae594c7d3..bfb096b3b 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -51,9 +51,9 @@ copyright = '2008, Agendaless Consulting' # other places throughout the built documents. # # The short X.Y version. -version = '0.8.1' +version = '0.9dev' # The full version, including alpha/beta/rc tags. -release = '0.8.1' +release = '0.9dev' # There are two options for replacing |today|: either, you set today to some # non-false value, then it is used: diff --git a/docs/glossary.rst b/docs/glossary.rst index 92e2264f9..e6efcc565 100644 --- a/docs/glossary.rst +++ b/docs/glossary.rst @@ -141,14 +141,16 @@ Glossary perfom authentication: it leaves it up to an upstream component such as :term:`repoze.who`. :mod:`repoze.bfg` uses the :term:`authentication` data supplied by the upstream component as - one input during :term:`authorization`. + one input during :term:`authorization`. Authentication in + :mod:`repoze.bfg` is performed via an :term:`authentication + policy`. Authorization The act of determining whether a user can perform a specific action. In bfg terms, this means determining whether, for a given context, any :term:`principal` (or principals) associated with the request have the requisite :term:`permission` to allow the request to continue. Authorization in :mod:`repoze.bfg` is performed via - its :term:`security policy`. + its :term:`authorization policy`. Principal A *principal* is a string or unicode object representing a user or a user's membership in a group. It is provided by the @@ -158,14 +160,16 @@ Glossary bar", the request might have information attached to it that would indictate that Bob was represented by three principals: "bob", "group foo" and "group bar". - Security Policy - A security policy in :mod:`repoze.bfg` terms is a bit of code - which accepts a request, the :term:`ACL` associated with a - context, and the :term:`permission` associated with a particular - view, and subsequently determines whether or not the principals - associated with the request can perform the action associated with - the permission based on the ACL found on the :term:`context` (or - any of its parents). + Authorization Policy + An authorization policy in :mod:`repoze.bfg` terms is a bit of + code which has an API which determines whether or not the + principals associated with the request can perform an action + associated with a permission, based on the information found on the + :term:`context`. + Authentication Policy + An authentication policy in :mod:`repoze.bfg` terms is a bit of + code which has an API which determines the current + :term:`principal` (or principals) associated with a request. WSGI `Web Server Gateway Interface <http://wsgi.org/>`_. This is a Python standard for connecting web applications to web servers, @@ -274,7 +278,7 @@ Glossary object. In :mod:`repoze.bfg`, an interface may be attached to an model object or a request object in order to identify that the object is "of a type". Interfaces are used internally by - :mod:`repoze.bfg` to perform view lookups and security policy + :mod:`repoze.bfg` to perform view lookups and other policy lookups. Interfaces are exposed to application programmers by the ``view`` ZCML directive or the corresponding ``bfg_view`` decorator in the form of both the ``for`` attribute and the diff --git a/docs/index.rst b/docs/index.rst index 20c564396..d86cd895e 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -50,15 +50,17 @@ Per-module :mod:`repoze.bfg` API documentation. .. toctree:: :maxdepth: 2 + api/authentication + api/authorization api/events api/interfaces + api/location api/push api/router api/security api/template api/testing api/traversal - api/location api/url api/view api/wsgi diff --git a/docs/narr/hooks.rst b/docs/narr/hooks.rst index 21906e466..657ad8a67 100644 --- a/docs/narr/hooks.rst +++ b/docs/narr/hooks.rst @@ -88,7 +88,7 @@ an object that implements any particular interface; it simply needs have a ``status`` attribute, a ``headerlist`` attribute, and and ``app_iter`` attribute. -Changing the NotFound application +Changing the NotFound Application --------------------------------- When :mod:`repoze.bfg` can't map a URL to code, it creates and invokes @@ -119,54 +119,55 @@ sample code that implements a minimal NotFound application factory: .. note:: When a NotFound application factory is invoked, it is passed the WSGI environ and the WSGI ``start_response`` handler by :mod:`repoze.bfg`. Within the WSGI environ will be a key named - ``message`` that has a value explaining why the not found error was - raised. This error will be different when the ``debug_notfound`` - environment setting is true than it is when it is false. + ``repoze.bfg.message`` that has a value explaining why the not + found error was raised. This error will be different when the + ``debug_notfound`` environment setting is true than it is when it + is false. -Changing the Unauthorized application -------------------------------------- +Changing the Forbidden Response +------------------------------- When :mod:`repoze.bfg` can't authorize execution of a view based on -the security policy in use, it creates and invokes an Unauthorized -WSGI application. The application it invokes can be customized by -placing something like the following ZCML in your ``configure.zcml`` -file. +the authorization policy in use, it invokes a "forbidden response +factory". Usually this forbidden response factory is a default 401 +response, but it can be overridden as necessary by placing something +like the following ZCML in your ``configure.zcml`` file. .. code-block:: xml :linenos: - <utility provides="repoze.bfg.interfaces.IUnauthorizedAppFactory" - component="helloworld.factories.unauthorized_app_factory"/> + <utility provides="repoze.bfg.interfaces.IForbiddenResponseFactory" + component="helloworld.factories.forbidden_response_factory"/> -Replace ``helloworld.factories.unauthorized_app_factory`` with the -Python dotted name to the request factory you want to use. Here's -some sample code that implements a minimal Unauthorized application -factory: +Replace ``helloworld.factories.forbidden_app_factory`` with the Python +dotted name to the forbidden response factory you want to use. The +response factory must accept two parameters: ``context`` and +``request``. The ``context`` is the context found by the router when +the view invocation was denied. The ``request`` is the current +:term:`request` representing the denied action. Here's some sample +code that implements a minimal forbidden response factory: .. code-block:: python - from webob.exc import HTTPUnauthorized + from repoze.bfg.chameleon_zpt import render_template_to_response - class MyUnauthorized(HTTPUnauthorized): - pass + def forbidden_response_factory(context, request): + return render_template_to_response('templates/login_form.pt') - def notfound_app_factory(): - return MyUnauthorized +.. note:: When an forbidden response factory is invoked, it is passed + the request as the second argument. An attribute of the request is + ``environ``, which is the WSGI environment. Within the WSGI + environ will be a key named ``repoze.bfg.message`` that has a value + explaining why the current view invocation was forbidden. This + error will be different when the ``debug_authorization`` + environment setting is true than it is when it is false. -.. note:: When an Unauthorized application factory is invoked, it is - passed the WSGI environ and the WSGI ``start_response`` handler by - :mod:`repoze.bfg`. Within the WSGI environ will be a key named - ``message`` that has a value explaining why the action was not - authorized. This error will be different when the - ``debug_authorization`` environment setting is true than it is when - it is false. - -.. note:: You can influence the status code of Unauthorized responses - by using an alterate unauthorized application factory. For - example, you may return an unauthorized application with a ``403 - Forbidden`` status code, rather than use the default unauthorized - application factory, which sends a response with a ``401 - Unauthorized`` status code. +.. warning:: the default forbidden application factory sends a + response with a ``401 Unauthorized`` status code for backwards + compatibility reasons. You can influence the status code of + Forbidden responses by using an alterate forbidden application + factory. For example, it would make sense to return an forbidden + application with a ``403 Forbidden`` status code. Changing the Default Routes Context Factory ------------------------------------------- diff --git a/docs/narr/security.rst b/docs/narr/security.rst index 3de7f9140..365f19010 100644 --- a/docs/narr/security.rst +++ b/docs/narr/security.rst @@ -9,53 +9,64 @@ from being invoked when the user represented by credentials in the :term:`request` does not have an appropriate level of access in a specific context. -Authorization is enabled by adding configuration to your -``configure.zcml`` which specifies a :term:`security policy`. +Authorization is enabled by modifying your application's invocation of +``repoze.bfg.router.make_app``, often located in the ``run.py`` module +of a :mod:`repoze.bfg` application. -Enabling a Security Policy --------------------------- +Enabling an Authorization Policy +-------------------------------- -By default, :mod:`repoze.bfg` enables no security policy. All views -are accessible by completely anonymous users. +By default, :mod:`repoze.bfg` enables no authorization policy. All +views are accessible by completely anonymous users. -However, if you add the following bit of code to your application's -``configure.zcml``, you will enable a security policy: +However, if you change the call to ``repoze.bfg.router.make_app`` +(usually found within the ``run.py`` module in your application), you +will enable an authorization policy. -.. code-block:: xml +You must enable a a :term:`authentication policy` in order to enable +an authorization policy. + +For example, to enable a policy which compares the ``REMOTE_USER`` +variable passed in the request's environment (as the sole +:term:`principal`) against the principals present in any :term:`ACL` +found in model data when attempting to call some :term:`view`, modify +your ``run.py`` to look something like this: + +.. code-block:: python :linenos: - <utility - provides="repoze.bfg.interfaces.ISecurityPolicy" - factory="repoze.bfg.security.RemoteUserInheritingACLSecurityPolicy" - /> - -The above inscrutable stanza enables the -``RemoteUserInheritingACLSecurityPolicy`` to be in effect for every -request to your application. The -``RemoteUserInheritingACLSecurityPolicy`` is a policy which compares -the ``REMOTE_USER`` variable passed in the request's environment (as -the sole :term:`principal`) against the principals present in any -:term:`ACL` found in model data when attempting to call some -:term:`view`. The policy either allows the view that the permission -was declared for to be called, or returns a ``401 Unathorized`` -response code to the upstream WSGI server. - -.. note:: Another "inheriting" security policy also exists: - ``WhoInheritingACLSecurityPolicy``. This policy uses principal - information found in the ``repoze.who.identity`` value set into the - WSGI environment by the :term:`repoze.who` middleware rather than - ``REMOTE_USER`` information. This policy only works properly when - :term:`repoze.who` middleware is present in the WSGI pipeline. - -.. note:: "non-inheriting" security policy variants of the - (``WhoACLSecurityPolicy`` and ``RemoteUserACLSecurityPolicy``) also - exist. These policies use the *first* ACL found as the canonical - ACL; they do not continue searching up the context lineage to find - "inherited" ACLs. It is recommended that you use the inheriting - variants unless you need this feature. - -.. note:: See :ref:`security_policies_api_section` for more - information about the features of the default security policies. + from repoze.bfg.router import make_app + from repoze.bfg.authentication import RemoteUserAuthenticationPolicy + + def app(global_config, **kw): + """ This function returns a repoze.bfg.router.Router object. It + is usually called by the PasteDeploy framework during ``paster + serve``""" + # paster app config callback + from myproject.models import get_root + import myproject + policy = RemoteUserAuthenticationPolicy() + return make_app(get_root, myproject, authentication_policy=policy, + options=kw) + +This injects an instance of the +``repoze.bfg.authentication.RemoteUserAuthenticationPolicy`` as the +:term:`authentication policy`. It is possible to use a different +authentication policy. :mod:`repoze.bfg` ships with a few prechewed +authentication policies that should prove useful (see +:ref:`authentication_policies_api_section`). It is also possible to +construct your own authentication policy. Any instance which +implements the interface defined in +``repoze.bfg.interfaces.IAuthenticationPolicy`` can be used. + +It's common but it is also possible to change the default +:term:`authorization policy` (to use some other persistent +authorization mechanism other than ACLs). To do so, pass an object +which implements the ``repoze.bfg.interfaces.IAuthorizationPolicy``) +to ``make_app`` as the ``authorization_policy`` value. +:mod:`repoze.who` ships with only one. See +:ref:`authorization_policies_api_section` for the details of the ACL +authorization policy which is the default Protecting Views with Permissions --------------------------------- @@ -91,9 +102,9 @@ your project's package """ Add blog entry code goes here """ pass -If a security policy is in place when this view is found during normal -application operations, the user will need to possess the ``add`` -permission against the context to be able to invoke the +If an authorization policy is in place when this view is found during +normal application operations, the user will need to possess the +``add`` permission against the context to be able to invoke the ``blog_entry_add_view`` view. Permission names are just strings. They hold no special significance @@ -192,18 +203,18 @@ ACE, as below. ] A principal is usually a user id, however it also may be a group id if -your authentication system provides group information and the security -policy is written to respect them. The -``RemoteUserInheritingACLSecurityPolicy`` does not respect group -information, but other security policies that come with -:mod:`repoze.bfg` do (see the :mod:`repoze.bfg.security` API docs for -more info). +your authentication system provides group information and the +effective :term:`authentication policy` policy is written to respect +group information. The ``RepozeWho1AuthenicationPolicy`` +authentication policy that comes with :mod:`repoze.bfg` respects group +information (see the :mod:`repoze.bfg.security` API docs for more +info on authentication policies). Each tuple within an ACL structure is known as a :term:`ACE`, which stands for "access control entry". For example, in the above ACL, ``(Allow, Everyone, 'view')`` is an ACE. Each ACE in an ACL is -processed by a security policy *in the order dictated by the ACL*. So -if you have an ACL like this: +processed by an authorization policy *in the order dictated by the +ACL*. So if you have an ACL like this: .. code-block:: python :linenos: @@ -217,9 +228,9 @@ if you have an ACL like this: (Deny, Everyone, 'view'), ] -The security policy will *allow* everyone the view permission, even -though later in the ACL you have an ACE that denies everyone the view -permission. On the other hand, if you have an ACL like this: +The authorization policy will *allow* everyone the view permission, +even though later in the ACL you have an ACE that denies everyone the +view permission. On the other hand, if you have an ACL like this: .. code-block:: python :linenos: @@ -233,7 +244,7 @@ permission. On the other hand, if you have an ACL like this: (Allow, Everyone, 'view'), ] -The security policy will deny Everyone the view permission, even +The authorization policy will deny Everyone the view permission, even though later in the ACL is an ACE that allows everyone. Special Principal Names @@ -271,8 +282,8 @@ module. These can be imported for use in ACLs. ACL like so: ``(Allow, 'fred', ALL_PERMISSIONS)``. The ``ALL_PERMISSIONS`` object is actually a standin object that has a ``__contains__`` method that always returns True, which, for all - known security policies, has the effect of indicating that a given - principal "has" any permission asked for by the system. + known authorization policies, has the effect of indicating that a + given principal "has" any permission asked for by the system. Special ACEs ------------ @@ -286,11 +297,11 @@ following: (Deny, Everyone, ALL_PERMISSIONS) This ACE is often used as the *last* ACE of an ACL to explicitly cause -inheriting security policies to "stop looking up the traversal tree" -(effectively breaking any inheritance). For example, an ACL which -allows *only* ``fred`` the view permission in a particular traversal -context despite what inherited ACLs may say when an inheriting -security policy is in effect might look like so: +inheriting authorization policies to "stop looking up the traversal +tree" (effectively breaking any inheritance). For example, an ACL +which allows *only* ``fred`` the view permission in a particular +traversal context despite what inherited ACLs may say when the default +authorization policy is in effect might look like so: .. code-block:: python :linenos: @@ -304,39 +315,11 @@ security policy is in effect might look like so: ACL Inheritance --------------- -While any security policy is in place, if a model object does not have -an ACL when it is the context, its *parent* is consulted for an ACL. -If that object does not have an ACL, *its* parent is consulted for an -ACL, ad infinitum, until we've reached the root and there are no more -parents left. - -With *non-inheriting* security policy variants -(e.g. ``WhoACLSecurityPolicy`` and ``RemoteUserACLSecurityPolicy``), -the *first* ACL found by the security policy will be used as the -effective ACL. No combination of ACLs found during traversal or -backtracking is done. - -With *inheriting* security policy variants -(e.g. ``WhoInheritingACLSecurityPolicy`` and -``RemoteUserInheritingACLSecurityPolicy``), *all* ACLs in the -context's :term:`lineage` are consulted when determining whether -access is allowed or denied. - -:ref:`security_policies_api_section` for more information about the -features of the default security policies and the difference between -the inheriting and non-inheriting variants. - -.. note:: It is recommended that you use the inheriting variant of a - security policy. Inheriting variants of security policies make it - possible for you to form a security strategy based on context ACL - "inheritance" rather than needing to keep all information about an - object's security state in a single ACL attached to that object. - It's much easier to code applications that dynamically change ACLs - if ACL inheritance is used. In reality, the non-inheriting - security policy variants exist only for backwards compatibility - with applications that used them in versions of :mod:`repoze.bfg` - before 0.8. If this backwards compatibility was not required, the - non-inheriting variants probably just wouldn't exist. +While the default :term:`authorization policy` is in place, if a model +object does not have an ACL when it is the context, its *parent* is +consulted for an ACL. If that object does not have an ACL, *its* +parent is consulted for an ACL, ad infinitum, until we've reached the +root and there are no more parents left. Location-Awareness ------------------ diff --git a/docs/narr/traversal.rst b/docs/narr/traversal.rst index 467916779..8eba623e4 100644 --- a/docs/narr/traversal.rst +++ b/docs/narr/traversal.rst @@ -135,13 +135,13 @@ code to execute: for the name ``b``, the router deems that the context is "object ``a``", the view name is ``b`` and the subpath is ``['c']``. -#. If a :term:`security policy` is configured, the router performs a - permission lookup. If a permission declaration is found for the - view name and context implied by the current request, the security - policy is consulted to see if the "current user" (also determined - by the security policy) can perform the action. If he can, - processing continues. If he cannot, an ``HTTPUnauthorized`` error - is raised. +#. If a :term:`authentication policy` is configured, the router + performs a permission lookup. If a permission declaration is + found for the view name and context implied by the current + request, an :term:`authorization policy` is consulted to see if + the "current user" (al determined by the the authentication + policy) can perform the action. If he can, processing continues. + If he cannot, an ``HTTPUnauthorized`` error is raised. #. Armed with the context, the view name, and the subpath, the router performs a view lookup. It attemtps to look up a view from the diff --git a/docs/narr/urldispatch.rst b/docs/narr/urldispatch.rst index c27165731..d43a344f2 100644 --- a/docs/narr/urldispatch.rst +++ b/docs/narr/urldispatch.rst @@ -420,14 +420,14 @@ Using :mod:`repoze.bfg` Security With URL Dispatch -------------------------------------------------- :mod:`repoze.bfg` provides its own security framework which consults a -:term:`security policy` before allowing any application code to be -called. This framework operates in terms of ACLs (Access Control +:term:`authorization policy` before allowing any application code to +be called. This framework operates in terms of ACLs (Access Control Lists, see :ref:`security_chapter` for more information about the -:mod:`repoze.bfg` security subsystem). A common thing to want to do -is to attach an ``__acl__`` to the context object dynamically for -declarative security purposes. You can use the ``factory`` -argument that points at a context factory which attaches a custom -``__acl__`` to an object at its creation time. +:mod:`repoze.bfg` authorization subsystem). A common thing to want to +do is to attach an ``__acl__`` to the context object dynamically for +declarative security purposes. You can use the ``factory`` argument +that points at a context factory which attaches a custom ``__acl__`` +to an object at its creation time. Such a ``factory`` might look like so: diff --git a/docs/narr/views.rst b/docs/narr/views.rst index ac5a8383f..9e9c55236 100644 --- a/docs/narr/views.rst +++ b/docs/narr/views.rst @@ -546,12 +546,12 @@ will be called. View Security ------------- -If a :term:`security policy` is active, any :term:`permission` -attached to a ``view`` declaration will be consulted to ensure -that the currently authenticated user possesses that permission -against the context before the view function is actually called. -Here's an example of specifying a permission in a ``view`` -declaration: +If a :term:`authentication policy` (and a :term:`authorization +policy`) is active, any :term:`permission` attached to a ``view`` +declaration will be consulted to ensure that the currently +authenticated user possesses that permission against the context +before the view function is actually called. Here's an example of +specifying a permission in a ``view`` declaration: .. code-block:: xml :linenos: @@ -563,16 +563,16 @@ declaration: permission="add" /> -When a security policy is enabled, this view will be protected with -the ``add`` permission. The view will *not be called* if the user -does not possess the ``add`` permission relative to the current -:term:`context` and a security policy is enabled. Instead an HTTP -``Unauthorized`` status will be returned to the client. +When an authentication policy is enabled, this view will be protected +with the ``add`` permission. The view will *not be called* if the +user does not possess the ``add`` permission relative to the current +:term:`context` and an authorization policy is enabled. Instead an +HTTP ``Unauthorized`` status will be returned to the client. .. note:: See the :ref:`security_chapter` chapter to find out how to turn on - a security policy. + an authentication policy. .. note:: diff --git a/repoze/bfg/authentication.py b/repoze/bfg/authentication.py new file mode 100644 index 000000000..487a5e6a8 --- /dev/null +++ b/repoze/bfg/authentication.py @@ -0,0 +1,86 @@ +from zope.interface import implements +from repoze.bfg.interfaces import IAuthenticationPolicy +from repoze.bfg.security import Everyone +from repoze.bfg.security import Authenticated + +class RepozeWho1AuthenticationPolicy(object): + """ A BFG authentication policy which obtains data from the + repoze.who 1.X WSGI API """ + implements(IAuthenticationPolicy) + identifier_name = 'auth_tkt' + + def _get_identity(self, request): + return request.environ.get('repoze.who.identity') + + def _get_identifier(self, request): + plugins = request.environ.get('repoze.who.plugins') + if plugins is None: + return None + identifier = plugins[self.identifier_name] + return identifier + + def authenticated_userid(self, context, request): + identity = self._get_identity(request) + if identity is None: + return None + return identity['repoze.who.userid'] + + def effective_principals(self, context, request): + effective_principals = [Everyone] + identity = self._get_identity(request) + if identity is None: + return effective_principals + + effective_principals.append(Authenticated) + userid = identity['repoze.who.userid'] + groups = identity.get('groups', []) + effective_principals.append(userid) + effective_principals.extend(groups) + + return effective_principals + + def remember(self, context, request, principal, **kw): + identifier = self._get_identifier(request) + if identifier is None: + return [] + environ = request.environ + identity = {'repoze.who.userid':principal} + return identifier.remember(environ, identity) + + def forget(self, context, request): + identifier = self._get_identifier(request) + if identifier is None: + return [] + identity = self._get_identity(request) + return identifier.forget(request.environ, identity) + +class RemoteUserAuthenticationPolicy(object): + """ A BFG authentication policy which obtains data from the + REMOTE_USER WSGI envvar """ + implements(IAuthenticationPolicy) + + def _get_identity(self, request): + return request.environ.get('REMOTE_USER') + + def authenticated_userid(self, context, request): + identity = self._get_identity(request) + if identity is None: + return None + return identity + + def effective_principals(self, context, request): + effective_principals = [Everyone] + identity = self._get_identity(request) + if identity is None: + return effective_principals + + effective_principals.append(Authenticated) + effective_principals.append(identity) + + return effective_principals + + def remember(self, context, request, principal, **kw): + return [] + + def forget(self, context, request): + return [] diff --git a/repoze/bfg/authorization.py b/repoze/bfg/authorization.py new file mode 100644 index 000000000..e131e6a21 --- /dev/null +++ b/repoze/bfg/authorization.py @@ -0,0 +1,112 @@ +from zope.interface import implements + +from repoze.bfg.interfaces import IAuthorizationPolicy +from repoze.bfg.location import lineage +from repoze.bfg.security import Allow +from repoze.bfg.security import Deny +from repoze.bfg.security import ACLAllowed +from repoze.bfg.security import ACLDenied +from repoze.bfg.security import Everyone + +class ACLAuthorizationPolicy(object): + """ An authorization policy which uses ACLs in the following ways: + + - When checking whether a user is permitted (via the ``permits`` + method), the security policy consults the ``context`` for an ACL + first. If no ACL exists on the context, or one does exist but + the ACL does not explicitly allow or deny access for any of the + effective principals, consult the context's parent ACL, and so + on, until the lineage is exhausted or we determine that the + policy permits or denies. + + During this processing, if any ``Deny`` ACE is found matching + any principal in ``principals``, stop processing by returning an + ``ACLDenied`` (equals False) immediately. If any ``Allow`` ACE + is found matching any principal, stop processing by returning an + ``ACLAllowed`` (equals True) immediately. If we exhaust the + context's lineage, and no ACE has explicitly permitted or denied + access, return an ``ACLDenied``. This differs from the + non-inheriting security policy (the ``ACLSecurityPolicy``) by + virtue of the fact that it does not stop looking for ACLs in the + object lineage after it finds the first one. + + - When computing principals allowed by a permission via the + ``principals_allowed_by_permission`` method, we compute the set + of principals that are explicitly granted the ``permission`` in + the provided ``context``. We do this by walking 'up' the object + graph *from the root* to the context. During this walking + process, if we find an explicit ``Allow`` ACE for a principal + that matches the ``permission``, the principal is included in + the allow list. However, if later in the walking process that + user is mentioned in any ``Deny`` ACE for the permission, the + user is removed from the allow list. If a ``Deny`` to the + principal ``Everyone`` is encountered during the walking process + that matches the ``permission``, the allow list is cleared for + all principals encountered in previous ACLs. The walking + process ends after we've processed the any ACL directly attached + to ``context``; a set of principals is returned. + """ + + implements(IAuthorizationPolicy) + + def permits(self, context, principals, permission): + """ Return ``ACLAllowed`` if the policy permits access, + ``ACLDenied`` if not. """ + + for location in lineage(context): + try: + acl = location.__acl__ + except AttributeError: + continue + + for ace in acl: + ace_action, ace_principal, ace_permissions = ace + if ace_principal in principals: + if not hasattr(ace_permissions, '__iter__'): + ace_permissions = [ace_permissions] + if permission in ace_permissions: + if ace_action == Allow: + return ACLAllowed(ace, acl, permission, + principals, location) + else: + return ACLDenied(ace, acl, permission, + principals, location) + + # default deny if no ACL in lineage at all + return ACLDenied(None, None, permission, principals, context) + + def principals_allowed_by_permission(self, context, permission): + """ Return the set of principals explicitly granted the + permission named ``permission`` according to the ACL directly + attached to the context context as well as inherited ACLs. """ + allowed = set() + + for location in reversed(list(lineage(context))): + # NB: we're walking *up* the object graph from the root + try: + acl = location.__acl__ + except AttributeError: + continue + + allowed_here = set() + denied_here = set() + + for ace_action, ace_principal, ace_permissions in acl: + if not hasattr(ace_permissions, '__iter__'): + ace_permissions = [ace_permissions] + if ace_action == Allow and permission in ace_permissions: + if not ace_principal in denied_here: + allowed_here.add(ace_principal) + if ace_action == Deny and permission in ace_permissions: + denied_here.add(ace_principal) + if ace_principal == Everyone: + # clear the entire allowed set, as we've hit a + # deny of Everyone ala (Deny, Everyone, ALL) + allowed = set() + break + elif ace_principal in allowed: + allowed.remove(ace_principal) + + allowed.update(allowed_here) + + return allowed diff --git a/repoze/bfg/interfaces.py b/repoze/bfg/interfaces.py index cecc3a397..2b00ac18f 100644 --- a/repoze/bfg/interfaces.py +++ b/repoze/bfg/interfaces.py @@ -197,15 +197,26 @@ class INotFoundAppFactory(Interface): a``message`` key in the WSGI environ provides information pertaining to the reason for the notfound.""" +class IForbiddenResponseFactory(Interface): + """ A utility which returns an IResponse as the result of the + denial of a view invocation by a security policy.""" + def __call__(context, request): + """ Return an object implementing IResponse (an object with + the status, headerlist, and app_iter attributes) as a result + of a view invocation denial by a security policy. + + Note that the ``message`` key in the WSGI environ + (request.environ) provides information pertaining to the + reason for the view invocation denial. The ``context`` passed + to the forbidden app factory will be the context found by the + repoze.bfg router during traversal or url dispatch. The + ``request`` will be the request object which caused the deny.""" + class IUnauthorizedAppFactory(Interface): """ A utility which returns an Unauthorized WSGI application - factory""" - def __call__(): - """ Return a callable which returns an unauthorized WSGI - application. When the WSGI application is invoked, a - ``message`` key in the WSGI environ provides information - pertaining to the reason for the unauthorized.""" - + factory (deprecated in repoze.bfg 0.8.2) in favor of + IForbiddenResponseFactory """ + class IContextURL(Interface): """ An adapter which deals with URLs related to a context. """ @@ -220,6 +231,37 @@ class IRoutesContextFactory(Interface): """ A marker interface used to look up the default routes context factory """ +class IAuthenticationPolicy(Interface): + """ A multi-adapter on context and request """ + def authenticated_userid(context, request): + """ Return the authenticated userid or ``None`` if no + authenticated userid can be found. """ + + def effective_principals(context, request): + """ Return a sequence representing the effective principals + including the userid and any groups belonged to by the current + user, including 'system' groups such as Everyone and + Authenticated. """ + + def remember(context, request, principal, **kw): + """ Return a set of headers suitable for 'remembering' the + principal named ``principal`` when set in a response. An + individual authentication policy and its consumers can decide + on the composition and meaning of **kw. """ + + def forget(context, request): + """ Return a set of headers suitable for 'forgetting' the + current user on subsequent requests. """ + +class IAuthorizationPolicy(Interface): + """ A adapter on context """ + def permits(context, principals, permission): + """ Return True if any of the principals is allowed the + permission in the current context, else return False """ + + def principals_allowed_by_permission(context, permission): + """ Return a set of principal identifiers allowed by the permission """ + # VH_ROOT_KEY is an interface; its imported from other packages (e.g. # traversalwrapper) VH_ROOT_KEY = 'HTTP_X_VHM_ROOT' diff --git a/repoze/bfg/registry.py b/repoze/bfg/registry.py index 0cf8e306b..cacf9806c 100644 --- a/repoze/bfg/registry.py +++ b/repoze/bfg/registry.py @@ -10,6 +10,8 @@ from zope.component.registry import Components from zope.deprecation import deprecated +from repoze.bfg.threadlocal import manager + from repoze.bfg.zcml import zcml_configure from repoze.bfg.settings import Settings # alias for deprecation below @@ -48,35 +50,8 @@ class Registry(Components): for ignored in self.subscribers(events, None): """ """ -class ThreadLocalRegistryManager(threading.local): - def __init__(self): - self.stack = [] - - def push(self, registry): - self.stack.append(registry) - - set = push # backwards compatibility - - def pop(self): - if self.stack: - return self.stack.pop() - - def get(self): - try: - return self.stack[-1] - except IndexError: - return getGlobalSiteManager() - - def clear(self): - self.stack[:] = [] - -registry_manager = ThreadLocalRegistryManager() - -def setRegistryManager(manager): # for unit tests - global registry_manager - old_registry_manager = registry_manager - registry_manager = manager - return old_registry_manager +def get_registry(): + return manager.get()['registry'] def populateRegistry(registry, filename, package, lock=threading.Lock()): @@ -95,19 +70,19 @@ def populateRegistry(registry, filename, package, lock=threading.Lock()): registry.""" lock.acquire() - registry_manager.push(registry) + manager.push({'registry':registry, 'request':None}) try: original_getSiteManager.sethook(getSiteManager) - zope.component.getGlobalSiteManager = registry_manager.get + zope.component.getGlobalSiteManager = get_registry zcml_configure(filename, package) finally: zope.component.getGlobalSiteManager = getGlobalSiteManager lock.release() - registry_manager.pop() + manager.pop() def getSiteManager(context=None): if context is None: - return registry_manager.get() + return get_registry() else: try: return IComponentLookup(context) diff --git a/repoze/bfg/router.py b/repoze/bfg/router.py index 81bc6e4ef..1b535c442 100644 --- a/repoze/bfg/router.py +++ b/repoze/bfg/router.py @@ -1,42 +1,55 @@ +from cgi import escape import sys from webob import Request as WebObRequest +from webob import Response from zope.component.event import dispatch +from zope.component import queryUtility from zope.interface import implements +from repoze.bfg.authorization import ACLAuthorizationPolicy + from repoze.bfg.events import NewRequest from repoze.bfg.events import NewResponse from repoze.bfg.events import WSGIApplicationCreatedEvent from repoze.bfg.interfaces import ILogger +from repoze.bfg.interfaces import ISecurityPolicy from repoze.bfg.interfaces import INotFoundAppFactory from repoze.bfg.interfaces import IRequestFactory +from repoze.bfg.interfaces import IResponseFactory from repoze.bfg.interfaces import IRootFactory from repoze.bfg.interfaces import IRouter from repoze.bfg.interfaces import IRoutesMapper -from repoze.bfg.interfaces import ISecurityPolicy from repoze.bfg.interfaces import ISettings +from repoze.bfg.interfaces import IForbiddenResponseFactory from repoze.bfg.interfaces import IUnauthorizedAppFactory from repoze.bfg.interfaces import IView from repoze.bfg.interfaces import IViewPermission +from repoze.bfg.interfaces import IAuthorizationPolicy +from repoze.bfg.interfaces import IAuthenticationPolicy from repoze.bfg.log import make_stream_logger from repoze.bfg.registry import Registry -from repoze.bfg.registry import registry_manager from repoze.bfg.registry import populateRegistry from repoze.bfg.request import HTTP_METHOD_FACTORIES from repoze.bfg.request import Request +from repoze.bfg.secpols import registerBBBAuthn + +from repoze.bfg.security import Allowed + from repoze.bfg.settings import Settings -from repoze.bfg.urldispatch import RoutesRootFactory +from repoze.bfg.threadlocal import manager from repoze.bfg.traversal import _traverse -from repoze.bfg.view import _view_execution_permitted -from repoze.bfg.wsgi import Unauthorized + +from repoze.bfg.urldispatch import RoutesRootFactory + from repoze.bfg.wsgi import NotFound _marker = object() @@ -47,25 +60,51 @@ class Router(object): debug_authorization = False debug_notfound = False + threadlocal_manager = manager def __init__(self, registry): self.registry = registry + self.logger = registry.queryUtility(ILogger, 'repoze.bfg.debug') self.request_factory = registry.queryUtility(IRequestFactory) - self.security_policy = registry.queryUtility(ISecurityPolicy) - self.notfound_app_factory = registry.queryUtility( - INotFoundAppFactory, - default=NotFound) - self.unauth_app_factory = registry.queryUtility( - IUnauthorizedAppFactory, - default=Unauthorized) + unauthorized_app_factory = registry.queryUtility( + IUnauthorizedAppFactory) + + forbidden = None + + if unauthorized_app_factory is not None: + warning = ( + 'Instead of registering a utility against the ' + 'repoze.bfg.interfaces.IUnauthorizedAppFactory interface ' + 'to return a custom forbidden response, you should now ' + 'register a "repoze.interfaces.IForbiddenResponseFactory". ' + 'The IUnauthorizedAppFactory interface was deprecated in ' + 'repoze.bfg 0.9 and will be removed in a subsequent version ' + 'of repoze.bfg. See the "Hooks" chapter of the repoze.bfg ' + 'documentation for more information about ' + 'IForbiddenResponseFactory.') + self.logger and self.logger.warn(warning) + def forbidden(context, request): + app = unauthorized_app_factory() + response = request.get_response(app) + return response + + forbidden = registry.queryUtility(IForbiddenResponseFactory, + default=forbidden) + + self.forbidden_resp_factory = forbidden or default_forbidden_view + + self.notfound_app_factory = registry.queryUtility(INotFoundAppFactory, + default=NotFound) + settings = registry.queryUtility(ISettings) if settings is not None: self.debug_authorization = settings.debug_authorization self.debug_notfound = settings.debug_notfound + + self.secured = not not registry.queryUtility(IAuthenticationPolicy) - self.logger = registry.queryUtility(ILogger, 'repoze.bfg.debug') self.root_factory = registry.getUtility(IRootFactory) self.root_policy = self.root_factory # b/w compat self.traverser_warned = {} @@ -78,7 +117,11 @@ class Router(object): iterable. """ registry = self.registry - registry_manager.push(registry) + threadlocals = {'registry':registry, 'request':None} + self.threadlocal_manager.push(threadlocals) + + logger = self.logger + request = None try: if self.request_factory is None: @@ -92,6 +135,7 @@ class Router(object): request_factory = self.request_factory request = request_factory(environ) + threadlocals['request'] = request registry.has_listeners and registry.notify(NewRequest(request)) root = self.root_factory(environ) @@ -121,37 +165,48 @@ class Router(object): request.virtual_root = vroot request.virtual_root_path = vroot_path - security_policy = self.security_policy - - permission = None - - if security_policy is not None: - permission = registry.queryMultiAdapter((context, request), - IViewPermission, - name=view_name) + if self.secured: - debug_authorization = self.debug_authorization + permitted = registry.queryMultiAdapter((context, request), + IViewPermission, + name=view_name) - permitted = _view_execution_permitted(context, request, view_name, - security_policy, permission, - debug_authorization) + if permitted is None: + if self.debug_authorization: + permitted = Allowed( + 'Allowed: view name %r in context %r (no ' + 'permission registered).' % + (view_name, context)) + else: + permitted = True - logger = self.logger + + else: + if self.debug_authorization: + permitted = Allowed( + 'Allowed: view name %r in context %r (no ' + 'authentication policy in use).' % (view_name, context)) + else: + permitted = True - if debug_authorization: + if self.debug_authorization: logger and logger.debug( 'debug_authorization of url %s (view name %r against ' 'context %r): %s' % ( request.url, view_name, context, permitted) ) + if not permitted: - if debug_authorization: + if self.debug_authorization: msg = str(permitted) else: msg = 'Unauthorized: failed security policy check' - environ['message'] = msg - unauth_app = self.unauth_app_factory() - return unauth_app(environ, start_response) + + environ['repoze.bfg.message'] = msg + + response = self.forbidden_resp_factory(context, request) + start_response(response.status, response.headerlist) + return response.app_iter response = registry.queryMultiAdapter( (context, request), IView, name=view_name) @@ -168,7 +223,7 @@ class Router(object): logger and logger.debug(msg) else: msg = request.url - environ['message'] = msg + environ['repoze.bfg.message'] = msg notfound_app = self.notfound_app_factory() return notfound_app(environ, start_response) @@ -182,28 +237,89 @@ class Router(object): 'Non-response object returned from view: %r' % response) finally: - registry_manager.pop() + self.threadlocal_manager.pop() + +def default_forbidden_view(context, request): + status = '401 Unauthorized' + try: + msg = escape(request.environ['repoze.bfg.message']) + except KeyError: + msg = '' + html = """ + <html> + <title>%s</title> + <body> + <h1>%s</h1> + <code>%s</code> + </body> + </html> + """ % (status, status, msg) + headers = [('Content-Length', str(len(html))), + ('Content-Type', 'text/html')] + response_factory = queryUtility(IResponseFactory, default=Response) + return response_factory(status = status, + headerlist = headers, + app_iter = [html]) def make_app(root_factory, package=None, filename='configure.zcml', - options=None): - """ Return a Router object, representing a ``repoze.bfg`` WSGI - application. ``root_factory`` must be a callable that accepts a - WSGI environment and returns a root object. ``package`` is a - Python module representing the application's package, ``filename`` - is the filesystem path to a ZCML file (optionally relative to the - package path) that should be parsed to create the application - registry. ``options``, if used, should be a dictionary containing - runtime options (e.g. the key/value pairs in an app section of a + authentication_policy=None, authorization_policy=None, + options=None, registry=None, debug_logger=None): + # registry and debug_logger *only* for unittests + """ Return a Router object, representing a fully configured + ``repoze.bfg`` WSGI application. + + ``root_factory`` must be a callable that accepts a WSGI + environment and returns a traversal root object. It may be + ``None``, in which case traversal is not performed at all. + Instead, all URL-to-code mapping is done via URL dispatch (aka + Routes). + + ``package`` is a Python module representing the application's + package. It is optional, defaulting to ``None``. If ``package`` + is ``None``, the ``filename`` passed must be an absolute pathname + to a ZCML file that represents the application's configuration. + + ``filename`` is the filesystem path to a ZCML file (optionally + relative to the package path) that should be parsed to create the + application registry. It defaults to ``configure.zcml``. + + ``authentication_policy`` should be an object that implements the + ``repoze.bfg.interfaces.IAuthenticationPolicy`` interface (e.g. + it might be an instance of + ``repoze.bfg.authentication.RemoteUserAuthenticationPolicy``) or + ``None``. If ``authentication_policy`` is ``None``, no + authentication or authorization will be performed. Instead, BFG + will ignore any view permission assertions in your application and + imperative security checks performed by your application will + always return ``True``. + + ``authorization_policy`` is an object that implements the + ``repoze.bfg.interfaces.IAuthorizationPoicy`` interface + (notionally) or ``None``. If the ``authentication_policy`` + argument is ``None``, this argument is ignored entirely because + being able to authorize access to a user depends on being able to + authenticate that user. If the ``authentication_policy`` argument + is *not* ``None``, and the ``authorization_policy`` argument *is* + ``None``, the authorization policy defaults to an authorization + implementation that uses ACLs. + + ``options``, if used, should be a dictionary containing runtime + options (e.g. the key/value pairs in an app section of a PasteDeploy file), with each key representing the option and the key's value representing the specific option value, e.g. ``{'reload_templates':True}``""" if options is None: options = {} + regname = filename + if package: regname = package.__name__ - registry = Registry(regname) - debug_logger = make_stream_logger('repoze.bfg.debug', sys.stderr) + if registry is None: + registry = Registry(regname) + + if debug_logger is None: + debug_logger = make_stream_logger('repoze.bfg.debug', sys.stderr) registry.registerUtility(debug_logger, ILogger, 'repoze.bfg.debug') settings = Settings(options) registry.registerUtility(settings, ISettings) @@ -221,18 +337,39 @@ def make_app(root_factory, package=None, filename='configure.zcml', 'root_factory (aka get_root) was None and no routes connected') registry.registerUtility(root_factory, IRootFactory) + + if authentication_policy: + registry.registerUtility(authentication_policy, IAuthenticationPolicy) + if authorization_policy is None: + authorization_policy = ACLAuthorizationPolicy() + registry.registerUtility(authorization_policy, IAuthorizationPolicy) + else: + # deal with bw compat of <= 0.8 security policies (deprecated) + secpol = registry.queryUtility(ISecurityPolicy) + if secpol is not None: + debug_logger.warn( + 'Your application is using a repoze.bfg ``ISecurityPolicy`` ' + '(probably registered via ZCML). This form of security policy ' + 'has been deprecated in BFG 0.9. See the "Security" chapter ' + 'of the repoze.bfg documentation to see how to register a more ' + 'up to date set of security policies (an authentication ' + 'policy and an authorization policy). ISecurityPolicy-based ' + 'security policies will cease to work in a later BFG ' + 'release.') + registerBBBAuthn(secpol, registry) + app = Router(registry) # We push the registry on to the stack here in case any ZCA API is # used in listeners subscribed to the WSGIApplicationCreatedEvent # we send. - registry_manager.push(registry) + manager.push({'registry':registry, 'request':None}) try: # use dispatch here instead of registry.notify to make unit # tests possible dispatch(WSGIApplicationCreatedEvent(app)) finally: - registry_manager.pop() + manager.pop() return app diff --git a/repoze/bfg/scifi.py b/repoze/bfg/scifi.py new file mode 100644 index 000000000..dcc8fa1c1 --- /dev/null +++ b/repoze/bfg/scifi.py @@ -0,0 +1,257 @@ +# Tweak BFG slightly to allow for separate authentication and +# authorization policies, so we don't have security policies named +# e.g. "RepozeWhoInheritingACLSecurityPolicy". We'll add +# IAuthenticationPolicy objects and IAuthorizationPolicy objects; +# these will be adapters. We'll also change ISecurityPolicy to be an +# adapter rather than a utility. We'll tweak the function API to use +# these adapters. + +# b/w incompats: the "authorization" policy needs access to the +# context, which the APIs it maps to in a current BFG security policy +# don't have; code which depends on these APIs will need to change. + +from zope.interface import implements +from zope.interface import Interface + +class IAuthenticationPolicy(Interface): + """ A multi-adapter on context and request """ + def authenticated_userid(): + """ Return the authenticated userid or None if no + authenticated userid can be found """ + + def effective_principals(): + """ Return a sequence representing the effective principals + (including the userid and any groups belonged to by the + current user, including 'system' groups such as Everyone and + Authenticated""" + + def challenge(): + """ Return an IResponse object representing a challenge, such + as a login form or a basic auth dialog """ + + def remember(self, principal, token): + """ Return a set of headers suitable for 'remembering' the + principal on subsequent requests """ + + def forget(): + """ Return a set of headers suitable for 'forgetting' the + current user on subsequent requests""" + +class IAuthorizationPolicy(Interface): + """ An adapter on context """ + def permits(self, principals, permission): + """ Return True if any of the principals is allowed the + permission in the current context, else return False """ + + def principals_allowed_by_permission(self, permission): + """ Return a set of principal identifiers allowed by the permission """ + +class ISecurityPolicy(Interface): + """ A multi-adapter on context and request """ + def permits(permission): + """ Returns True if the combination of the authorization + information in the context and the authentication data in + the request allow the action implied by the permission """ + + def authenticated_userid(): + """ Return the userid of the currently authenticated user or + None if there is no currently authenticated user """ + + def effective_principals(): + """ Return the list of 'effective' principals for the request. + This must include the userid of the currently authenticated + user if a user is currently authenticated.""" + + def principals_allowed_by_permission(permission): + """ Return a sequence of principal identifiers allowed by the + ``permission`` in the model implied by ``context``. This + method may not be supported by a given security policy + implementation, in which case, it should raise a + ``NotImplementedError`` exception.""" + + def forbidden(): + """ This method should return an IResponse object (an object + with the attributes ``status``, ``headerlist``, and + ``app_iter``) as a result of a view invocation denial. The + ``forbidden`` method of a security policy will be called by + ``repoze.bfg`` when view invocation is denied (usually as a + result of the ``permit`` method of the same security policy + returning False to the Router). + + The ``forbidden`` method of a security will not be called when + an ``IForbiddenResponseFactory`` utility is registered; + instead the ``IForbiddenResponseFactory`` utility will serve + the forbidden response. + + Note that the ``repoze.bfg.message`` key in the environ passed + to the WSGI app will contain the 'raw' reason that view + invocation was denied by repoze.bfg. The ``context`` object + passed in will be the context found by ``repoze.bfg`` when the + denial was found and the ``request`` will be the request which + caused the denial.""" + +# an implementation of an authentication policy that uses repoze.who + +class RepozeWhoAuthenticationPolicy(object): + """ A BFG authentication policy which obtains data from the + repoze.who API """ + implements(IAuthenticationPolicy) + def __init__(self, context, request): + self.context = context + self.request = request + from repoze.who.api import api_from_environ + self.api = api_from_environ(request.environ) + + def authenticated_userid(self): + identity = self.api.authenticate() + if identity is None: + return None + return identity['repoze.who.userid'] + + def effective_principals(self): + effective_principals = [Everyone] + identity = self.api.authenticate() + if identity is None: + return effective_principals + + effective_principals.append(Authenticated) + userid = identity['repoze.who.userid'] + groups = identity.get('groups', []) + effective_principals.append(userid) + effective_principals.extend(groups) + + return effective_principals + + def challenge(self): + return self.api.challenge() + + def remember(self, principal, token): + return self.api.remember({'repoze.who.userid':principal, + 'password':token}) + + def forget(self): + return self.api.forget() + +# an implementation of an authentication policy that uses a cookie + +class StandaloneAuthenticationPolicy: + """ A BFG authentication policy which obtains data from a cookie + and a local storage system """ + def __init__(self, context, request): + self.context = context + self.request = request + + def _check_password(self, userid, password): + """ Return true if the password is good for the userid """ + raise NotImplementedError # you get the idea + + def _groups_from_userid(self, userid): + """ Return a sequence of groups given a user id """ + raise NotImplementedError # you get the idea + + def _userid_from_login(self, login): + """ Return a userid given a login name """ + raise NotImplementedError # you get the idea + + def _decrypt(self, cookieval): + """ Return decrypted login and password""" + raise NotImplementedError # you get the idea + + def _encrypt(self, userid, password): + """ Return encrypted hash of userid and password for cookie val """ + raise NotImplementedError # you get the idea + + def authenticated_userid(self): + cookieval = request.cookies.get('oatmeal') + try: + login, password = self._decrypt(cookieval) + except: + return None + userid = self._userid_from_login(login) + if self._check_password(userid, password): + return userid + + def effective_principals(self): + effective_principals = [Everyone] + userid = self.authenticated_userid() + if userid is None: + return effective_principals + + effective_principals.append(Authenticated) + groups = self._groups_from_userid(userid) + effective_principals.append(userid) + effective_principals.extend(groups) + + return effective_principals + + def challenge(self): + userid = self.authenticated_userid() + if userid: + return render_template_to_response('templates/forbidden.pt') + else: + return render_template_to_response('templates/login_form.pt') + + def remember(self, principal, token): + cookieval = self._encrypt(principal, token) + return [ ('Set-Cookie', 'oatmeal=%s' % cookieval) ] + + def forget(self): + return [ ('Set-Cookie', 'oatmeal=') ] + +# an implementation of an authorization policy that uses ACLs + +class ACLAuthorizationPolicy(object): + implements(IAuthorizationPolicy) + def __init__(self, context): + self.context = context + + def permits(self, principals, permission): + """ """ + # do stuff to figure out of any of the principals is allowed + # the permission by any ACL in the current context's hierarchy + + def principals_allowed_by_permission(self, permission): + """ """ + # return the sequence of principals allowed by the permission + # according to the ACLs in the current context' hierarchy + +# present a rolled up "face" to both authn and autz policies in the +# form of a "security policy"; users will interact with this API +# rather than the authn or authz policies directly. + +class SecurityPolicy(object): + """ Use separate authn and authz to form a BFG security policy; + this will never be overridden, it is concrete. It is an adapter. """ + implements(ISecurityPolicy) + def __init__(self, context, request): + self.context = context + self.request = request + self.authn = getMultiAdapter((self.context, self.request), + IAuthenticationPolicy) + self.authz = getAdapter(context, IAuthorizationPolicy) + + def permits(self, permission): + principals = set(self.authn.effective_principals()) + + if authz.permits(principals, permission): + return True + + return False + + def authenticated_userid(self): + return self.authn.authenticated_userid() + + def effective_principals(self): + return self.authn.effective_principals() + + def principals_allowed_by_permission(self): + return self.authz.principals_allowed_by_permission() + + def forbidden(self): + return self.authn.challenge() + + def remember(self, principal, token): + return self.authn.remember(principal, token) + + def forget(self): + return self.authn.forget() diff --git a/repoze/bfg/secpols.py b/repoze/bfg/secpols.py new file mode 100644 index 000000000..0f0fc7e66 --- /dev/null +++ b/repoze/bfg/secpols.py @@ -0,0 +1,469 @@ +from zope.deprecation import deprecated +from zope.interface import implements + +from repoze.bfg.interfaces import ISecurityPolicy +from repoze.bfg.interfaces import IAuthorizationPolicy +from repoze.bfg.interfaces import IAuthenticationPolicy + +from repoze.bfg.location import lineage + +from repoze.bfg.threadlocal import manager + +from repoze.bfg.security import Allow +from repoze.bfg.security import Deny +from repoze.bfg.security import ACLAllowed +from repoze.bfg.security import ACLDenied +from repoze.bfg.security import Everyone +from repoze.bfg.security import Authenticated + +class ACLSecurityPolicy(object): + implements(ISecurityPolicy) + + def __init__(self, get_principals): + self.get_principals = get_principals + + def permits(self, context, request, permission): + """ Return ``ACLAllowed`` if the policy permits access, + ``ACLDenied`` if not. """ + principals = set(self.effective_principals(request)) + + for location in lineage(context): + try: + acl = location.__acl__ + except AttributeError: + continue + + for ace in acl: + ace_action, ace_principal, ace_permissions = ace + if ace_principal in principals: + if not hasattr(ace_permissions, '__iter__'): + ace_permissions = [ace_permissions] + if permission in ace_permissions: + if ace_action == Allow: + return ACLAllowed(ace, acl, permission, + principals, location) + else: + return ACLDenied(ace, acl, permission, + principals, location) + + # default deny if no ACE matches in the ACL found + result = ACLDenied(None, acl, permission, principals, location) + return result + + # default deny if no ACL in lineage at all + return ACLDenied(None, None, permission, principals, context) + + def authenticated_userid(self, request): + principals = self.get_principals(request) + if principals: + return principals[0] + + def effective_principals(self, request): + effective_principals = [Everyone] + principal_ids = self.get_principals(request) + + if principal_ids: + effective_principals.append(Authenticated) + effective_principals.extend(principal_ids) + + return effective_principals + + def principals_allowed_by_permission(self, context, permission): + for location in lineage(context): + try: + acl = location.__acl__ + except AttributeError: + continue + + allowed = {} + + for ace_action, ace_principal, ace_permissions in acl: + if ace_action == Allow: + if not hasattr(ace_permissions, '__iter__'): + ace_permissions = [ace_permissions] + if permission in ace_permissions: + allowed[ace_principal] = True + return sorted(allowed.keys()) + + return [] + +class InheritingACLSecurityPolicy(object): + """ A security policy which uses ACLs in the following ways: + + - When checking whether a user is permitted (via the ``permits`` + method), the security policy consults the ``context`` for an ACL + first. If no ACL exists on the context, or one does exist but + the ACL does not explicitly allow or deny access for any of the + effective principals, consult the context's parent ACL, and so + on, until the lineage is exhausted or we determine that the + policy permits or denies. + + During this processing, if any ``Deny`` ACE is found matching + any effective principal, stop processing by returning an + ``ACLDenied`` (equals False) immediately. If any ``Allow`` ACE + is found matching any effective principal, stop processing by + returning an ``ACLAllowed`` (equals True) immediately. If we + exhaust the context's lneage, and no ACE has explicitly + permitted or denied access, return an ``ACLDenied``. This + differs from the non-inheriting security policy (the + ``ACLSecurityPolicy``) by virtue of the fact that it does not + stop looking for ACLs in the object lineage after it finds the + first one. + + - When computing principals allowed by a permission via the + ``principals_allowed_by_permission`` method, we compute the set + of principals that are explicitly granted the ``permission``. + We do this by walking 'up' the object graph *from the root* to + the context. During this walking process, if we find an + explicit ``Allow`` ACE for a principal that matches the + ``permission``, the principal is included in the allow list. + However, if later in the walking process that user is mentioned + in any ``Deny`` ACE for the permission, the user is removed from + the allow list. If a ``Deny`` to the principal ``Everyone`` is + encountered during the walking process that matches the + ``permission``, the allow list is cleared for all principals + encountered in previous ACLs. The walking process ends after + we've processed the any ACL directly attached to ``context``; a + list of principals is returned. + + - Other aspects of this policy are the same as those in the + ACLSecurityPolicy (e.g. ``effective_principals``, + ``authenticated_userid``). + """ + implements(ISecurityPolicy) + + def __init__(self, get_principals): + self.get_principals = get_principals + + def permits(self, context, request, permission): + """ Return ``ACLAllowed`` if the policy permits access, + ``ACLDenied`` if not. """ + principals = set(self.effective_principals(request)) + + for location in lineage(context): + try: + acl = location.__acl__ + except AttributeError: + continue + + for ace in acl: + ace_action, ace_principal, ace_permissions = ace + if ace_principal in principals: + if not hasattr(ace_permissions, '__iter__'): + ace_permissions = [ace_permissions] + if permission in ace_permissions: + if ace_action == Allow: + return ACLAllowed(ace, acl, permission, + principals, location) + else: + return ACLDenied(ace, acl, permission, + principals, location) + + # default deny if no ACL in lineage at all + return ACLDenied(None, None, permission, principals, context) + + def authenticated_userid(self, request): + principals = self.get_principals(request) + if principals: + return principals[0] + + def effective_principals(self, request): + effective_principals = [Everyone] + principal_ids = self.get_principals(request) + + if principal_ids: + effective_principals.append(Authenticated) + effective_principals.extend(principal_ids) + + return effective_principals + + def principals_allowed_by_permission(self, context, permission): + allowed = set() + + for location in reversed(list(lineage(context))): + # NB: we're walking *up* the object graph from the root + try: + acl = location.__acl__ + except AttributeError: + continue + + allowed_here = set() + denied_here = set() + + for ace_action, ace_principal, ace_permissions in acl: + if not hasattr(ace_permissions, '__iter__'): + ace_permissions = [ace_permissions] + if ace_action == Allow and permission in ace_permissions: + if not ace_principal in denied_here: + allowed_here.add(ace_principal) + if ace_action == Deny and permission in ace_permissions: + denied_here.add(ace_principal) + if ace_principal == Everyone: + # clear the entire allowed set, as we've hit a + # deny of Everyone ala (Deny, Everyone, ALL) + allowed = set() + break + elif ace_principal in allowed: + allowed.remove(ace_principal) + + allowed.update(allowed_here) + + return allowed + +def get_remoteuser(request): + user_id = request.environ.get('REMOTE_USER') + if user_id: + return [user_id] + return [] + +def RemoteUserACLSecurityPolicy(): + """ A security policy which: + + - examines the request.environ for the REMOTE_USER variable and + uses any non-false value as a principal id for this request. + + - uses an ACL-based authorization model which attempts to find the + *first* ACL in the context' lineage. It returns ``Allowed`` from + its 'permits' method if the single ACL found grants access to the + current principal. It returns ``Denied`` if permission was not + granted (either explicitly via a deny or implicitly by not finding + a matching ACE action). The *first* ACL found in the context's + lineage is considered canonical; no searching is done for other + security attributes after the first ACL is found in the context' + lineage. Use the 'inheriting' variant of this policy to consider + more than one ACL in the lineage. + + An ACL is an ordered sequence of ACE tuples, e.g. ``[(Allow, + Everyone, 'read'), (Deny, 'george', 'write')]``. ACLs stored on + model instance objects as their ``__acl__`` attribute will be used + by the security machinery to grant or deny access. + + Enable this security policy by adding the following to your + application's ``configure.zcml``: + + .. code-block:: xml + + <utility + provides="repoze.bfg.interfaces.ISecurityPolicy" + factory="repoze.bfg.security.RemoteUserACLSecurityPolicy" + /> + + """ + return ACLSecurityPolicy(get_remoteuser) + +def RemoteUserInheritingACLSecurityPolicy(): + """ A security policy which: + + - examines the request.environ for the REMOTE_USER variable and + uses any non-false value as a principal id for this request. + + - Differs from the non-inheriting security policy variants + (e.g. ``ACLSecurityPolicy``) by virtue of the fact that it does + not stop looking for ACLs in the object lineage after it finds + the first one. + + - When checking whether a user is permitted (via the ``permits`` + method), the security policy consults the ``context`` for an ACL + first. If no ACL exists on the context, or one does exist but + the ACL does not explicitly allow or deny access for any of the + effective principals, consult the context's parent ACL, and so + on, until the lineage is exhausted or we determine that the + policy permits or denies. + + During this processing, if any ``Deny`` ACE is found matching + any effective principal, stop processing by returning an + ``ACLDenied`` (equals False) immediately. If any ``Allow`` ACE + is found matching any effective principal, stop processing by + returning an ``ACLAllowed`` (equals True) immediately. If we + exhaust the context's lneage, and no ACE has explicitly + permitted or denied access, return an ``ACLDenied``. + + - When computing principals allowed by a permission via the + ``principals_allowed_by_permission`` method, we compute the set + of principals that are explicitly granted the ``permission``. + We do this by walking 'up' the object graph *from the root* to + the context. During this walking process, if we find an + explicit ``Allow`` ACE for a principal that matches the + ``permission``, the principal is included in the allow list. + However, if later in the walking process that user is mentioned + in any ``Deny`` ACE for the permission, the user is removed from + the allow list. If a ``Deny`` to the principal ``Everyone`` is + encountered during the walking process that matches the + ``permission``, the allow list is cleared for all principals + encountered in previous ACLs. The walking process ends after + we've processed the any ACL directly attached to ``context``; a + list of principals is returned. + + - Other aspects of this policy are the same as those in the + ACLSecurityPolicy (e.g. ``effective_principals``, + ``authenticated_userid``). + + Enable this security policy by adding the following to your + application's ``configure.zcml``: + + .. code-block:: xml + + <utility + provides="repoze.bfg.interfaces.ISecurityPolicy" + factory="repoze.bfg.security.RemoteUserInheritingACLSecurityPolicy" + /> + + """ + return InheritingACLSecurityPolicy(get_remoteuser) + +def get_who_principals(request): + identity = request.environ.get('repoze.who.identity') + if not identity: + return [] + principals = [identity['repoze.who.userid']] + principals.extend(identity.get('groups', [])) + return principals + +def WhoACLSecurityPolicy(): + """ + A security policy which: + + - examines the request.environ for the ``repoze.who.identity`` + dictionary. If one is found, the principal ids for the request + are composed of ``repoze.who.identity['repoze.who.userid']`` + plus ``repoze.who.identity.get('groups', [])``. + + - uses an ACL-based authorization model which attempts to find the + *first* ACL in the context' lineage. It returns ``Allowed`` from + its 'permits' method if the single ACL found grants access to the + current principal. It returns ``Denied`` if permission was not + granted (either explicitly via a deny or implicitly by not finding + a matching ACE action). The *first* ACL found in the context's + lineage is considered canonical; no searching is done for other + security attributes after the first ACL is found in the context' + lineage. Use the 'inheriting' variant of this policy to consider + more than one ACL in the lineage. + + An ACL is an ordered sequence of ACE tuples, e.g. ``[(Allow, + Everyone, 'read'), (Deny, 'george', 'write')]``. ACLs stored on + model instance objects as their ``__acl__`` attribute will be used + by the security machinery to grant or deny access. + + Enable this security policy by adding the following to your + application's ``configure.zcml``: + + .. code-block:: xml + + <utility + provides="repoze.bfg.interfaces.ISecurityPolicy" + factory="repoze.bfg.security.WhoACLSecurityPolicy" + /> + """ + return ACLSecurityPolicy(get_who_principals) + +RepozeWhoIdentityACLSecurityPolicy = WhoACLSecurityPolicy + +deprecated('RepozeWhoIdentityACLSecurityPolicy', + '(repoze.bfg.security.RepozeWhoIdentityACLSecurityPolicy ' + 'should now be imported as ' + 'repoze.bfg.security.WhoACLSecurityPolicy)', + ) + +def WhoInheritingACLSecurityPolicy(): + """ A security policy which: + + - examines the request.environ for the ``repoze.who.identity`` + dictionary. If one is found, the principal ids for the request + are composed of ``repoze.who.identity['repoze.who.userid']`` + plus ``repoze.who.identity.get('groups', [])``. + + - Differs from the non-inheriting security policy variants + (e.g. ``ACLSecurityPolicy``) by virtue of the fact that it does + not stop looking for ACLs in the object lineage after it finds + the first one. + + - When checking whether a user is permitted (via the ``permits`` + method), the security policy consults the ``context`` for an ACL + first. If no ACL exists on the context, or one does exist but + the ACL does not explicitly allow or deny access for any of the + effective principals, consult the context's parent ACL, and so + on, until the lineage is exhausted or we determine that the + policy permits or denies. + + During this processing, if any ``Deny`` ACE is found matching + any effective principal, stop processing by returning an + ``ACLDenied`` (equals False) immediately. If any ``Allow`` ACE + is found matching any effective principal, stop processing by + returning an ``ACLAllowed`` (equals True) immediately. If we + exhaust the context's lneage, and no ACE has explicitly + permitted or denied access, return an ``ACLDenied``. + + - When computing principals allowed by a permission via the + ``principals_allowed_by_permission`` method, we compute the set + of principals that are explicitly granted the ``permission``. + We do this by walking 'up' the object graph *from the root* to + the context. During this walking process, if we find an + explicit ``Allow`` ACE for a principal that matches the + ``permission``, the principal is included in the allow list. + However, if later in the walking process that user is mentioned + in any ``Deny`` ACE for the permission, the user is removed from + the allow list. If a ``Deny`` to the principal ``Everyone`` is + encountered during the walking process that matches the + ``permission``, the allow list is cleared for all principals + encountered in previous ACLs. The walking process ends after + we've processed the any ACL directly attached to ``context``; a + list of principals is returned. + + - Other aspects of this policy are the same as those in the + ACLSecurityPolicy (e.g. ``effective_principals``, + ``authenticated_userid``). + + Enable this security policy by adding the following to your + application's ``configure.zcml``: + + .. code-block:: xml + + <utility + provides="repoze.bfg.interfaces.ISecurityPolicy" + factory="repoze.bfg.security.WhoInheritingACLSecurityPolicy" + /> + """ + return InheritingACLSecurityPolicy(get_who_principals) + + +class SecurityPolicyToAuthorizationPolicyAdapter(object): + """ An adapter registered when an old-style ISecurityPolicy + utility is configured in ZCML instead of an IAuthorizationPolicy + utility """ + implements(IAuthorizationPolicy) + def __init__(self, secpol): + self.secpol = secpol + + def permits(self, context, principals, permission): + request = manager.get()['request'] + return self.secpol.permits(context, request, permission) + + def principals_allowed_by_permission(self, context, permission): + return self.secpol.principals_allowed_by_permission(context, permission) + +class SecurityPolicyToAuthenticationPolicyAdapter(object): + implements(IAuthenticationPolicy) + def __init__(self, secpol): + self.secpol = secpol + + def authenticated_userid(self, context, request): + return self.secpol.authenticated_userid(request) + + def effective_principals(self, context, request): + return self.secpol.effective_principals(request) + + def remember(self, context, request, principal, **kw): + return [] + + def forget(self, context, request): + return [] + +def registerBBBAuthn(secpol, registry): + # Used when an explicit authentication policy is not defined, and + # an an old-style ISecurityPolicy is registered (via ZCML), turn + # it into separate authorization and authentication utilities + # using adapters + authn = SecurityPolicyToAuthenticationPolicyAdapter(secpol) + authz = SecurityPolicyToAuthorizationPolicyAdapter(secpol) + registry.registerUtility(authn, IAuthenticationPolicy) + registry.registerUtility(authz, IAuthorizationPolicy) diff --git a/repoze/bfg/security.py b/repoze/bfg/security.py index 90916bac2..5f5252ff3 100644 --- a/repoze/bfg/security.py +++ b/repoze/bfg/security.py @@ -1,10 +1,11 @@ +import warnings + +from zope.component import queryMultiAdapter from zope.component import queryUtility -from zope.deprecation import deprecated from zope.interface import implements -from repoze.bfg.location import lineage - -from repoze.bfg.interfaces import ISecurityPolicy +from repoze.bfg.interfaces import IAuthenticationPolicy +from repoze.bfg.interfaces import IAuthorizationPolicy from repoze.bfg.interfaces import IViewPermission from repoze.bfg.interfaces import IViewPermissionFactory @@ -31,458 +32,155 @@ def has_permission(permission, context, request): ``Allowed`` if the permission is granted in this context to the user implied by the request. Return an instance of ``Denied`` if this permission is not granted in this context to this user. This - delegates to the current security policy. Return True - unconditionally if no security policy has been configured in this - application.""" - policy = queryUtility(ISecurityPolicy) - if policy is None: - return Allowed('No security policy in use.') - return policy.permits(context, request, permission) - -def authenticated_userid(request): + function delegates to the current authentication and authorization + policies. Return ``Allowed`` unconditionally if no authentication + policy has been configured in this application.""" + authn_policy = queryUtility(IAuthenticationPolicy) + if authn_policy is None: + return Allowed('No authentication policy in use.') + + authz_policy = queryUtility(IAuthorizationPolicy) + if authz_policy is None: + raise ValueError('Authentication policy registered without ' + 'authorization policy') # should never happen + principals = authn_policy.effective_principals(context, request) + return authz_policy.permits(context, principals, permission) + +def authenticated_userid(*args): """ Return the userid of the currently authenticated user or - ``None`` if there is no security policy in effect or there is no - currently authenticated user""" - policy = queryUtility(ISecurityPolicy) + ``None`` if there is no authentication policy in effect or there + is no currently authenticated user. """ + + largs = len(args) + if largs > 2: + raise TypeError(args) + if largs == 1: + request = args[0] + context = None + warnings.warn( + 'As of BFG 0.9, the "repoze.bfg.security.authenticated_userid" ' + 'API now takes two arguments: "context" and "request". ' + 'It is being called it with a single argument' + '(assumed to be a request). In a future version, the ' + '"authenticated_userid API will stop accepting calls with a ' + 'single argument; please fix the calling code.', + stacklevel=2) + else: + context, request = args + + policy = queryUtility(IAuthenticationPolicy) if policy is None: return None - return policy.authenticated_userid(request) + return policy.authenticated_userid(context, request) -def effective_principals(request): +def effective_principals(*args): """ Return the list of 'effective' principal identifiers for the request. This will include the userid of the currently authenticated user if a user is currently authenticated. If no - security policy is in effect, this will return an empty sequence.""" - policy = queryUtility(ISecurityPolicy) + authentication policy is in effect, this will return an empty + sequence.""" + + largs = len(args) + if largs > 2: + raise TypeError(args) + if largs == 1: + request = args[0] + context = None + warnings.warn( + 'As of BFG 0.9, the "repoze.bfg.security.effective_principals " ' + 'API now takes two arguments: "context" and "request". ' + 'It is being called it with a single argument' + '(assumed to be a request). In a future version, the ' + '"effective_principals API will stop accepting calls with a ' + 'single argument; please fix the calling code.', + stacklevel=2) + else: + context, request = args + + policy = queryUtility(IAuthenticationPolicy) if policy is None: return [] - return policy.effective_principals(request) + return policy.effective_principals(context, request) def principals_allowed_by_permission(context, permission): """ Provided a context (a model object), and a permission (a string or unicode object), return a sequence of principal ids that - possess the permission in the context. If no security policy is - in effect, this will return a sequence with the single value + possess the permission in the context. If no authorization policy + is in effect, this will return a sequence with the single value representing ``Everyone`` (the special principal identifier - representing all principals). Note that even if a security policy - *is* in effect, some security policies may not implement the - required machinery for this function; those will cause a - ``NotImplementedError`` exception to be raised when this function - is invoked.""" - policy = queryUtility(ISecurityPolicy) + representing all principals). + + .. note:: even if an authorization policy *is* in effect, some + (exotic) authorization policies may not implement the required + machinery for this function; those will cause a + ``NotImplementedError`` exception to be raised when this + function is invoked. + """ + policy = queryUtility(IAuthorizationPolicy) if policy is None: return [Everyone] return policy.principals_allowed_by_permission(context, permission) -class ACLSecurityPolicy(object): - implements(ISecurityPolicy) - - def __init__(self, get_principals): - self.get_principals = get_principals - - def permits(self, context, request, permission): - """ Return ``ACLAllowed`` if the policy permits access, - ``ACLDenied`` if not. """ - principals = set(self.effective_principals(request)) - - for location in lineage(context): - try: - acl = location.__acl__ - except AttributeError: - continue - - for ace in acl: - ace_action, ace_principal, ace_permissions = ace - if ace_principal in principals: - if not hasattr(ace_permissions, '__iter__'): - ace_permissions = [ace_permissions] - if permission in ace_permissions: - if ace_action == Allow: - return ACLAllowed(ace, acl, permission, - principals, location) - else: - return ACLDenied(ace, acl, permission, - principals, location) - - # default deny if no ACE matches in the ACL found - result = ACLDenied(None, acl, permission, principals, location) - return result - - # default deny if no ACL in lineage at all - return ACLDenied(None, None, permission, principals, context) - - def authenticated_userid(self, request): - principals = self.get_principals(request) - if principals: - return principals[0] - - def effective_principals(self, request): - effective_principals = [Everyone] - principal_ids = self.get_principals(request) - - if principal_ids: - effective_principals.append(Authenticated) - effective_principals.extend(principal_ids) - - return effective_principals - - def principals_allowed_by_permission(self, context, permission): - for location in lineage(context): - try: - acl = location.__acl__ - except AttributeError: - continue - - allowed = {} - - for ace_action, ace_principal, ace_permissions in acl: - if ace_action == Allow: - if not hasattr(ace_permissions, '__iter__'): - ace_permissions = [ace_permissions] - if permission in ace_permissions: - allowed[ace_principal] = True - return sorted(allowed.keys()) - +def view_execution_permitted(context, request, name=''): + """ If the view specified by ``context`` and ``name`` is protected + by a permission, check the permission associated with the view + using the effective authentication/authorization policies and the + ``request``. Return a boolean result. If no authentication + policy is in effect, or if the view is not protected by a + permission, return True.""" + result = queryMultiAdapter((context, request), IViewPermission, + name=name, default=None) + if result is None: + return Allowed( + 'Allowed: view name %r in context %r (no permission defined)' % + (name, context)) + return result + +def remember(context, request, principal, **kw): + """ Return a sequence of header tuples (e.g. ``[('Set-Cookie', + 'foo=abc')]``) suitable for 'remembering' a set of credentials + implied by the data passed as ``principal`` and ``*kw`` using the + current authentication policy. Common usage might look like so + within the body of a view function (``response`` is assumed to be + an WebOb-style response object computed previously by the view + code):: + + from repoze.bfg.security import forget + headers = remember(context, request, 'chrism', password='123') + response.headerlist.extend(headers) + return response + + If no authentication policy is in use, this function will always + return an empty sequence. If used, the composition and meaning of + ``**kw`` must be agreed upon by the calling code and the effective + authentication policy.""" + policy = queryUtility(IAuthenticationPolicy) + if policy is None: return [] - -class InheritingACLSecurityPolicy(object): - """ A security policy which uses ACLs in the following ways: - - - When checking whether a user is permitted (via the ``permits`` - method), the security policy consults the ``context`` for an ACL - first. If no ACL exists on the context, or one does exist but - the ACL does not explicitly allow or deny access for any of the - effective principals, consult the context's parent ACL, and so - on, until the lineage is exhausted or we determine that the - policy permits or denies. - - During this processing, if any ``Deny`` ACE is found matching - any effective principal, stop processing by returning an - ``ACLDenied`` (equals False) immediately. If any ``Allow`` ACE - is found matching any effective principal, stop processing by - returning an ``ACLAllowed`` (equals True) immediately. If we - exhaust the context's lneage, and no ACE has explicitly - permitted or denied access, return an ``ACLDenied``. This - differs from the non-inheriting security policy (the - ``ACLSecurityPolicy``) by virtue of the fact that it does not - stop looking for ACLs in the object lineage after it finds the - first one. - - - When computing principals allowed by a permission via the - ``principals_allowed_by_permission`` method, we compute the set - of principals that are explicitly granted the ``permission``. - We do this by walking 'up' the object graph *from the root* to - the context. During this walking process, if we find an - explicit ``Allow`` ACE for a principal that matches the - ``permission``, the principal is included in the allow list. - However, if later in the walking process that user is mentioned - in any ``Deny`` ACE for the permission, the user is removed from - the allow list. If a ``Deny`` to the principal ``Everyone`` is - encountered during the walking process that matches the - ``permission``, the allow list is cleared for all principals - encountered in previous ACLs. The walking process ends after - we've processed the any ACL directly attached to ``context``; a - list of principals is returned. - - - Other aspects of this policy are the same as those in the - ACLSecurityPolicy (e.g. ``effective_principals``, - ``authenticated_userid``). - """ - implements(ISecurityPolicy) - - def __init__(self, get_principals): - self.get_principals = get_principals - - def permits(self, context, request, permission): - """ Return ``ACLAllowed`` if the policy permits access, - ``ACLDenied`` if not. """ - principals = set(self.effective_principals(request)) - - for location in lineage(context): - try: - acl = location.__acl__ - except AttributeError: - continue - - for ace in acl: - ace_action, ace_principal, ace_permissions = ace - if ace_principal in principals: - if not hasattr(ace_permissions, '__iter__'): - ace_permissions = [ace_permissions] - if permission in ace_permissions: - if ace_action == Allow: - return ACLAllowed(ace, acl, permission, - principals, location) - else: - return ACLDenied(ace, acl, permission, - principals, location) - - # default deny if no ACL in lineage at all - return ACLDenied(None, None, permission, principals, context) - - def authenticated_userid(self, request): - principals = self.get_principals(request) - if principals: - return principals[0] - - def effective_principals(self, request): - effective_principals = [Everyone] - principal_ids = self.get_principals(request) - - if principal_ids: - effective_principals.append(Authenticated) - effective_principals.extend(principal_ids) - - return effective_principals - - def principals_allowed_by_permission(self, context, permission): - allowed = set() - - for location in reversed(list(lineage(context))): - # NB: we're walking *up* the object graph from the root - try: - acl = location.__acl__ - except AttributeError: - continue - - allowed_here = set() - denied_here = set() - - for ace_action, ace_principal, ace_permissions in acl: - if not hasattr(ace_permissions, '__iter__'): - ace_permissions = [ace_permissions] - if ace_action == Allow and permission in ace_permissions: - if not ace_principal in denied_here: - allowed_here.add(ace_principal) - if ace_action == Deny and permission in ace_permissions: - denied_here.add(ace_principal) - if ace_principal == Everyone: - # clear the entire allowed set, as we've hit a - # deny of Everyone ala (Deny, Everyone, ALL) - allowed = set() - break - elif ace_principal in allowed: - allowed.remove(ace_principal) - - allowed.update(allowed_here) - - return allowed - -def get_remoteuser(request): - user_id = request.environ.get('REMOTE_USER') - if user_id: - return [user_id] - return [] - -def RemoteUserACLSecurityPolicy(): - """ A security policy which: - - - examines the request.environ for the REMOTE_USER variable and - uses any non-false value as a principal id for this request. - - - uses an ACL-based authorization model which attempts to find the - *first* ACL in the context' lineage. It returns ``Allowed`` from - its 'permits' method if the single ACL found grants access to the - current principal. It returns ``Denied`` if permission was not - granted (either explicitly via a deny or implicitly by not finding - a matching ACE action). The *first* ACL found in the context's - lineage is considered canonical; no searching is done for other - security attributes after the first ACL is found in the context' - lineage. Use the 'inheriting' variant of this policy to consider - more than one ACL in the lineage. - - An ACL is an ordered sequence of ACE tuples, e.g. ``[(Allow, - Everyone, 'read'), (Deny, 'george', 'write')]``. ACLs stored on - model instance objects as their ``__acl__`` attribute will be used - by the security machinery to grant or deny access. - - Enable this security policy by adding the following to your - application's ``configure.zcml``: - - .. code-block:: xml - - <utility - provides="repoze.bfg.interfaces.ISecurityPolicy" - factory="repoze.bfg.security.RemoteUserACLSecurityPolicy" - /> - - """ - return ACLSecurityPolicy(get_remoteuser) - -def RemoteUserInheritingACLSecurityPolicy(): - """ A security policy which: - - - examines the request.environ for the REMOTE_USER variable and - uses any non-false value as a principal id for this request. - - - Differs from the non-inheriting security policy variants - (e.g. ``ACLSecurityPolicy``) by virtue of the fact that it does - not stop looking for ACLs in the object lineage after it finds - the first one. - - - When checking whether a user is permitted (via the ``permits`` - method), the security policy consults the ``context`` for an ACL - first. If no ACL exists on the context, or one does exist but - the ACL does not explicitly allow or deny access for any of the - effective principals, consult the context's parent ACL, and so - on, until the lineage is exhausted or we determine that the - policy permits or denies. - - During this processing, if any ``Deny`` ACE is found matching - any effective principal, stop processing by returning an - ``ACLDenied`` (equals False) immediately. If any ``Allow`` ACE - is found matching any effective principal, stop processing by - returning an ``ACLAllowed`` (equals True) immediately. If we - exhaust the context's lneage, and no ACE has explicitly - permitted or denied access, return an ``ACLDenied``. - - - When computing principals allowed by a permission via the - ``principals_allowed_by_permission`` method, we compute the set - of principals that are explicitly granted the ``permission``. - We do this by walking 'up' the object graph *from the root* to - the context. During this walking process, if we find an - explicit ``Allow`` ACE for a principal that matches the - ``permission``, the principal is included in the allow list. - However, if later in the walking process that user is mentioned - in any ``Deny`` ACE for the permission, the user is removed from - the allow list. If a ``Deny`` to the principal ``Everyone`` is - encountered during the walking process that matches the - ``permission``, the allow list is cleared for all principals - encountered in previous ACLs. The walking process ends after - we've processed the any ACL directly attached to ``context``; a - list of principals is returned. - - - Other aspects of this policy are the same as those in the - ACLSecurityPolicy (e.g. ``effective_principals``, - ``authenticated_userid``). - - Enable this security policy by adding the following to your - application's ``configure.zcml``: - - .. code-block:: xml - - <utility - provides="repoze.bfg.interfaces.ISecurityPolicy" - factory="repoze.bfg.security.RemoteUserInheritingACLSecurityPolicy" - /> - - """ - return InheritingACLSecurityPolicy(get_remoteuser) - -def get_who_principals(request): - identity = request.environ.get('repoze.who.identity') - if not identity: + else: + return policy.remember(context, request, principal, **kw) + +def forget(context, request): + """ Return a sequence of header tuples (e.g. ``[('Set-Cookie', + 'foo=abc')]``) suitable for 'forgetting' the set of credentials + possessed by the currently authenticated user. A common usage + might look like so within the body of a view function + (``response`` is assumed to be an WebOb-style response object + computed previously by the view code):: + + from repoze.bfg.security import forget + headers = forget(context, request) + response.headerlist.extend(headers) + return response + + If no authentication policy is in use, this function will always + return an empty sequence.""" + policy = queryUtility(IAuthenticationPolicy) + if policy is None: return [] - principals = [identity['repoze.who.userid']] - principals.extend(identity.get('groups', [])) - return principals - -def WhoACLSecurityPolicy(): - """ - A security policy which: - - - examines the request.environ for the ``repoze.who.identity`` - dictionary. If one is found, the principal ids for the request - are composed of ``repoze.who.identity['repoze.who.userid']`` - plus ``repoze.who.identity.get('groups', [])``. - - - uses an ACL-based authorization model which attempts to find the - *first* ACL in the context' lineage. It returns ``Allowed`` from - its 'permits' method if the single ACL found grants access to the - current principal. It returns ``Denied`` if permission was not - granted (either explicitly via a deny or implicitly by not finding - a matching ACE action). The *first* ACL found in the context's - lineage is considered canonical; no searching is done for other - security attributes after the first ACL is found in the context' - lineage. Use the 'inheriting' variant of this policy to consider - more than one ACL in the lineage. - - An ACL is an ordered sequence of ACE tuples, e.g. ``[(Allow, - Everyone, 'read'), (Deny, 'george', 'write')]``. ACLs stored on - model instance objects as their ``__acl__`` attribute will be used - by the security machinery to grant or deny access. - - Enable this security policy by adding the following to your - application's ``configure.zcml``: - - .. code-block:: xml - - <utility - provides="repoze.bfg.interfaces.ISecurityPolicy" - factory="repoze.bfg.security.WhoACLSecurityPolicy" - /> - """ - return ACLSecurityPolicy(get_who_principals) - -RepozeWhoIdentityACLSecurityPolicy = WhoACLSecurityPolicy - -deprecated('RepozeWhoIdentityACLSecurityPolicy', - '(repoze.bfg.security.RepozeWhoIdentityACLSecurityPolicy ' - 'should now be imported as ' - 'repoze.bfg.security.WhoACLSecurityPolicy)', - ) - -def WhoInheritingACLSecurityPolicy(): - """ A security policy which: - - - examines the request.environ for the ``repoze.who.identity`` - dictionary. If one is found, the principal ids for the request - are composed of ``repoze.who.identity['repoze.who.userid']`` - plus ``repoze.who.identity.get('groups', [])``. - - - Differs from the non-inheriting security policy variants - (e.g. ``ACLSecurityPolicy``) by virtue of the fact that it does - not stop looking for ACLs in the object lineage after it finds - the first one. - - - When checking whether a user is permitted (via the ``permits`` - method), the security policy consults the ``context`` for an ACL - first. If no ACL exists on the context, or one does exist but - the ACL does not explicitly allow or deny access for any of the - effective principals, consult the context's parent ACL, and so - on, until the lineage is exhausted or we determine that the - policy permits or denies. - - During this processing, if any ``Deny`` ACE is found matching - any effective principal, stop processing by returning an - ``ACLDenied`` (equals False) immediately. If any ``Allow`` ACE - is found matching any effective principal, stop processing by - returning an ``ACLAllowed`` (equals True) immediately. If we - exhaust the context's lneage, and no ACE has explicitly - permitted or denied access, return an ``ACLDenied``. - - - When computing principals allowed by a permission via the - ``principals_allowed_by_permission`` method, we compute the set - of principals that are explicitly granted the ``permission``. - We do this by walking 'up' the object graph *from the root* to - the context. During this walking process, if we find an - explicit ``Allow`` ACE for a principal that matches the - ``permission``, the principal is included in the allow list. - However, if later in the walking process that user is mentioned - in any ``Deny`` ACE for the permission, the user is removed from - the allow list. If a ``Deny`` to the principal ``Everyone`` is - encountered during the walking process that matches the - ``permission``, the allow list is cleared for all principals - encountered in previous ACLs. The walking process ends after - we've processed the any ACL directly attached to ``context``; a - list of principals is returned. - - - Other aspects of this policy are the same as those in the - ACLSecurityPolicy (e.g. ``effective_principals``, - ``authenticated_userid``). - - Enable this security policy by adding the following to your - application's ``configure.zcml``: - - .. code-block:: xml - - <utility - provides="repoze.bfg.interfaces.ISecurityPolicy" - factory="repoze.bfg.security.WhoInheritingACLSecurityPolicy" - /> - """ - return InheritingACLSecurityPolicy(get_who_principals) - + else: + return policy.forget(context, request) + class PermitsResult(int): def __new__(cls, s, *args): inst = int.__new__(cls, cls.boolval) @@ -503,17 +201,18 @@ class PermitsResult(int): self.msg) class Denied(PermitsResult): - """ An instance of ``Denied`` is returned when a security policy - or other ``repoze.bfg`` code denies an action unlrelated to an ACL - check. It evaluates equal to all boolean false types. It has an - attribute named ``msg`` describing the circumstances for the deny.""" + """ An instance of ``Denied`` is returned when a security-related + API or other ``repoze.bfg`` code denies an action unlrelated to an + ACL check. It evaluates equal to all boolean false types. It has + an attribute named ``msg`` describing the circumstances for the + deny.""" boolval = 0 class Allowed(PermitsResult): - """ An instance of ``Allowed`` is returned when a security policy - or other ``repoze.bfg`` code allows an action unrelated to an ACL - check. It evaluates equal to all boolean true types. It has an - attribute named ``msg`` describing the circumstances for the + """ An instance of ``Allowed`` is returned when a security-related + API or other ``repoze.bfg`` code allows an action unrelated to an + ACL check. It evaluates equal to all boolean true types. It has + an attribute named ``msg`` describing the circumstances for the allow.""" boolval = 1 @@ -566,35 +265,25 @@ class ACLAllowed(ACLPermitsResult): as the ``msg`` attribute.""" boolval = 1 -class ViewPermission(object): - implements(IViewPermission) - def __init__(self, context, request, permission_name): - self.context = context - self.request = request - self.permission_name = permission_name - - def __call__(self, security_policy, debug_info=None): - return security_policy.permits(self.context, - self.request, - self.permission_name) - - def __repr__(self): - view_name = getattr(self.request, 'view_name', None) - return '<Permission at %s named %r for %r>' % (id(self), - self.permission_name, - view_name) - class ViewPermissionFactory(object): implements(IViewPermissionFactory) + def __init__(self, permission_name): self.permission_name = permission_name def __call__(self, context, request): - return ViewPermission(context, request, self.permission_name) - + return has_permission(self.permission_name, context, request) + class Unauthorized(Exception): pass +# BBB imports: these must come at the end of the file, as there's a +# circular dependency between secpols and security +from repoze.bfg.secpols import ACLSecurityPolicy +from repoze.bfg.secpols import InheritingACLSecurityPolicy +from repoze.bfg.secpols import RemoteUserACLSecurityPolicy +from repoze.bfg.secpols import RemoteUserInheritingACLSecurityPolicy +from repoze.bfg.secpols import WhoACLSecurityPolicy +from repoze.bfg.secpols import WhoInheritingACLSecurityPolicy +# /BBB imports - - diff --git a/repoze/bfg/testing.py b/repoze/bfg/testing.py index d4ff6fe4d..bd3104a9d 100644 --- a/repoze/bfg/testing.py +++ b/repoze/bfg/testing.py @@ -8,22 +8,23 @@ from repoze.bfg.interfaces import IRequest _marker = [] def registerDummySecurityPolicy(userid=None, groupids=(), permissive=True): - """ Registers a dummy ``repoze.bfg`` security policy using the - userid ``userid`` and the group ids ``groupids``. If - ``permissive`` is true, a 'permissive' security policy is - registered; this policy allows all access. If ``permissive`` is - false, a nonpermissive security policy is registered; this policy - denies all access. This function is most useful when testing code - that uses the ``repoze.bfg.security`` APIs named - ``has_permission``, ``authenticated_userid``, - ``effective_principals``, and ``principals_allowed_by_permission``. + """ Registers a dummy ``repoze.bfg`` security policy (actually, a + pair of policies: an authorization policy and an authentication + policy) using the userid ``userid`` and the group ids + ``groupids``. If ``permissive`` is true, a 'permissive' security + policy is registered; this policy allows all access. If + ``permissive`` is false, a nonpermissive security policy is + registered; this policy denies all access. This function is most + useful when testing code that uses the ``repoze.bfg.security`` + APIs named ``has_permission``, ``authenticated_userid``, + ``effective_principals``, ``principals_allowed_by_permission``, + and so on. """ - if permissive: - policy = DummyAllowingSecurityPolicy(userid, groupids) - else: - policy = DummyDenyingSecurityPolicy(userid, groupids) - from repoze.bfg.interfaces import ISecurityPolicy - return registerUtility(policy, ISecurityPolicy) + policy = DummySecurityPolicy(userid, groupids, permissive) + from repoze.bfg.interfaces import IAuthorizationPolicy + from repoze.bfg.interfaces import IAuthenticationPolicy + registerUtility(policy, IAuthorizationPolicy) + registerUtility(policy, IAuthenticationPolicy) def registerModels(models): """ Registers a dictionary of models. This is most useful for @@ -76,7 +77,9 @@ def registerView(name, result='', view=None, for_=(Interface, Interface)): with code that wants to call, e.g. ``repoze.bfg.view.render_view_to_response``.""" if view is None: - view = make_view(result) + def view(context, request): + from webob import Response + return Response(result) from repoze.bfg.interfaces import IView return registerAdapter(view, for_, IView, name) @@ -101,7 +104,8 @@ def registerViewPermission(name, result=True, viewpermission=None, else: result = Denied('message') if viewpermission is None: - viewpermission = make_view_permission(result) + def viewpermission(context, request): + return result from repoze.bfg.interfaces import IViewPermission return registerAdapter(viewpermission, for_, IViewPermission, name) @@ -165,15 +169,17 @@ def registerTraverserFactory(traverser, for_=Interface): from repoze.bfg.interfaces import ITraverserFactory return registerAdapter(traverser, for_, ITraverserFactory) -class _DummySecurityPolicy: - def __init__(self, userid=None, groupids=()): +class DummySecurityPolicy: + """ A standin for both an IAuthentication and IAuthorization policy """ + def __init__(self, userid=None, groupids=(), permissive=True): self.userid = userid self.groupids = groupids + self.permissive = permissive - def authenticated_userid(self, request): + def authenticated_userid(self, context, request): return self.userid - def effective_principals(self, request): + def effective_principals(self, context, request): from repoze.bfg.security import Everyone from repoze.bfg.security import Authenticated effective_principals = [Everyone] @@ -183,19 +189,17 @@ class _DummySecurityPolicy: effective_principals.extend(self.groupids) return effective_principals -class DummyAllowingSecurityPolicy(_DummySecurityPolicy): - def permits(self, context, request, permission): - return True + def remember(self, context, request, principal, **kw): + return [] - def principals_allowed_by_permission(self, context, permission): - return self.effective_principals(None) + def forget(self, context, request): + return [] -class DummyDenyingSecurityPolicy(_DummySecurityPolicy): - def permits(self, context, request, permission): - return False + def permits(self, context, principals, permission): + return self.permissive def principals_allowed_by_permission(self, context, permission): - return [] + return self.effective_principals(None, None) def make_traverser_factory(root): class DummyTraverserFactory: @@ -258,23 +262,6 @@ class DummyTemplateRenderer: v, k, myval)) return True -def make_view(result): - def dummy_view(context, request): - from webob import Response - return Response(result) - return dummy_view - -def make_view_permission(result): - class DummyViewPermission: - def __init__(self, context, request): - self.context = context - self.request = request - - def __call__(self, secpol): - return result - - return DummyViewPermission - class DummyModel: """ A dummy :mod:`repoze.bfg` model object. The value of ``name`` to the constructor will be used as the ``__name__`` attribute of @@ -432,8 +419,8 @@ def cleanUp(): from zope.component.globalregistry import base from zope.configuration.xmlconfig import _clearContext from repoze.bfg.registry import original_getSiteManager -from repoze.bfg.registry import registry_manager +from repoze.bfg.threadlocal import manager addCleanUp(original_getSiteManager.reset) -addCleanUp(registry_manager.clear) +addCleanUp(manager.clear) addCleanUp(lambda: base.__init__('base')) addCleanUp(_clearContext) diff --git a/repoze/bfg/tests/test_authentication.py b/repoze/bfg/tests/test_authentication.py new file mode 100644 index 000000000..a23ffeac2 --- /dev/null +++ b/repoze/bfg/tests/test_authentication.py @@ -0,0 +1,176 @@ +import unittest + +class TestRepozeWho1AuthenticationPolicy(unittest.TestCase): + def _getTargetClass(self): + from repoze.bfg.authentication import RepozeWho1AuthenticationPolicy + return RepozeWho1AuthenticationPolicy + + def _makeOne(self): + return self._getTargetClass()() + + def test_class_implements_IAuthenticationPolicy(self): + from zope.interface.verify import verifyClass + from repoze.bfg.interfaces import IAuthenticationPolicy + verifyClass(IAuthenticationPolicy, self._getTargetClass()) + + def test_instance_implements_IAuthenticationPolicy(self): + from zope.interface.verify import verifyObject + from repoze.bfg.interfaces import IAuthenticationPolicy + verifyObject(IAuthenticationPolicy, self._makeOne()) + + def test_authenticated_userid_None(self): + context = DummyContext() + request = DummyRequest({}) + policy = self._makeOne() + self.assertEqual(policy.authenticated_userid(context, request), None) + + def test_authenticated_userid(self): + context = DummyContext() + request = DummyRequest( + {'repoze.who.identity':{'repoze.who.userid':'fred'}}) + policy = self._makeOne() + self.assertEqual(policy.authenticated_userid(context, request), 'fred') + + def test_effective_principals_None(self): + from repoze.bfg.security import Everyone + context = DummyContext() + request = DummyRequest({}) + policy = self._makeOne() + self.assertEqual(policy.effective_principals(context, request), + [Everyone]) + + def test_effective_principals_userid_only(self): + from repoze.bfg.security import Everyone + from repoze.bfg.security import Authenticated + context = DummyContext() + request = DummyRequest( + {'repoze.who.identity':{'repoze.who.userid':'fred'}}) + policy = self._makeOne() + self.assertEqual(policy.effective_principals(context, request), + [Everyone, Authenticated, 'fred']) + + def test_effective_principals_userid_and_groups(self): + from repoze.bfg.security import Everyone + from repoze.bfg.security import Authenticated + context = DummyContext() + request = DummyRequest( + {'repoze.who.identity':{'repoze.who.userid':'fred', + 'groups':['quux', 'biz']}}) + policy = self._makeOne() + self.assertEqual(policy.effective_principals(context, request), + [Everyone, Authenticated, 'fred', 'quux', 'biz']) + + def test_remember_no_plugins(self): + context = DummyContext() + authtkt = DummyPlugin() + request = DummyRequest({}) + policy = self._makeOne() + result = policy.remember(context, request, 'fred') + self.assertEqual(result, []) + + def test_remember(self): + context = DummyContext() + authtkt = DummyPlugin() + request = DummyRequest( + {'repoze.who.plugins':{'auth_tkt':authtkt}}) + policy = self._makeOne() + result = policy.remember(context, request, 'fred') + self.assertEqual(result[0], request.environ) + self.assertEqual(result[1], {'repoze.who.userid':'fred'}) + + def test_forget_no_plugins(self): + context = DummyContext() + authtkt = DummyPlugin() + request = DummyRequest({}) + policy = self._makeOne() + result = policy.forget(context, request) + self.assertEqual(result, []) + + def test_forget(self): + context = DummyContext() + authtkt = DummyPlugin() + request = DummyRequest( + {'repoze.who.plugins':{'auth_tkt':authtkt}, + 'repoze.who.identity':{'repoze.who.userid':'fred'}, + }) + policy = self._makeOne() + result = policy.forget(context, request) + self.assertEqual(result[0], request.environ) + self.assertEqual(result[1], request.environ['repoze.who.identity']) + +class TestRemoteUserAuthenticationPolicy(unittest.TestCase): + def _getTargetClass(self): + from repoze.bfg.authentication import RemoteUserAuthenticationPolicy + return RemoteUserAuthenticationPolicy + + def _makeOne(self): + return self._getTargetClass()() + + def test_class_implements_IAuthenticationPolicy(self): + from zope.interface.verify import verifyClass + from repoze.bfg.interfaces import IAuthenticationPolicy + verifyClass(IAuthenticationPolicy, self._getTargetClass()) + + def test_instance_implements_IAuthenticationPolicy(self): + from zope.interface.verify import verifyObject + from repoze.bfg.interfaces import IAuthenticationPolicy + verifyObject(IAuthenticationPolicy, self._makeOne()) + + def test_authenticated_userid_None(self): + context = DummyContext() + request = DummyRequest({}) + policy = self._makeOne() + self.assertEqual(policy.authenticated_userid(context, request), None) + + def test_authenticated_userid(self): + context = DummyContext() + request = DummyRequest({'REMOTE_USER':'fred'}) + policy = self._makeOne() + self.assertEqual(policy.authenticated_userid(context, request), 'fred') + + def test_effective_principals_None(self): + from repoze.bfg.security import Everyone + context = DummyContext() + request = DummyRequest({}) + policy = self._makeOne() + self.assertEqual(policy.effective_principals(context, request), + [Everyone]) + + def test_effective_principals(self): + from repoze.bfg.security import Everyone + from repoze.bfg.security import Authenticated + context = DummyContext() + request = DummyRequest({'REMOTE_USER':'fred'}) + policy = self._makeOne() + self.assertEqual(policy.effective_principals(context, request), + [Everyone, Authenticated, 'fred']) + + def test_remember(self): + context = DummyContext() + authtkt = DummyPlugin() + request = DummyRequest({'REMOTE_USER':'fred'}) + policy = self._makeOne() + result = policy.remember(context, request, 'fred') + self.assertEqual(result, []) + + def test_forget(self): + context = DummyContext() + authtkt = DummyPlugin() + request = DummyRequest({'REMOTE_USER':'fred'}) + policy = self._makeOne() + result = policy.forget(context, request) + self.assertEqual(result, []) + +class DummyContext: + pass + +class DummyRequest: + def __init__(self, environ): + self.environ = environ + +class DummyPlugin: + def remember(self, environ, identity): + return environ, identity + + def forget(self, environ, identity): + return environ, identity diff --git a/repoze/bfg/tests/test_authorization.py b/repoze/bfg/tests/test_authorization.py new file mode 100644 index 000000000..8aa9b9abf --- /dev/null +++ b/repoze/bfg/tests/test_authorization.py @@ -0,0 +1,179 @@ +import unittest + +from repoze.bfg.testing import cleanUp + +class TestACLAuthorizationPolicy(unittest.TestCase): + def setUp(self): + cleanUp() + + def tearDown(self): + cleanUp() + + def _getTargetClass(self): + from repoze.bfg.authorization import ACLAuthorizationPolicy + return ACLAuthorizationPolicy + + def _makeOne(self): + return self._getTargetClass()() + + def test_class_implements_IAuthorizationPolicy(self): + from zope.interface.verify import verifyClass + from repoze.bfg.interfaces import IAuthorizationPolicy + verifyClass(IAuthorizationPolicy, self._getTargetClass()) + + def test_instance_implements_IAuthorizationPolicy(self): + from zope.interface.verify import verifyObject + from repoze.bfg.interfaces import IAuthorizationPolicy + verifyObject(IAuthorizationPolicy, self._makeOne()) + + def test_permits_no_acl(self): + context = DummyContext() + policy = self._makeOne() + self.assertEqual(policy.permits(context, [], 'view'), False) + + def test_permits(self): + from repoze.bfg.security import Deny + from repoze.bfg.security import Allow + from repoze.bfg.security import Everyone + from repoze.bfg.security import Authenticated + from repoze.bfg.security import ALL_PERMISSIONS + from repoze.bfg.security import DENY_ALL + root = DummyContext() + community = DummyContext(__name__='community', __parent__=root) + blog = DummyContext(__name__='blog', __parent__=community) + root.__acl__ = [ + (Allow, Authenticated, VIEW), + ] + community.__acl__ = [ + (Allow, 'fred', ALL_PERMISSIONS), + (Allow, 'wilma', VIEW), + DENY_ALL, + ] + blog.__acl__ = [ + (Allow, 'barney', MEMBER_PERMS), + (Allow, 'wilma', VIEW), + ] + + policy = self._makeOne() + + result = policy.permits(blog, [Everyone, Authenticated, 'wilma'], + 'view') + self.assertEqual(result, True) + self.assertEqual(result.context, blog) + self.assertEqual(result.ace, (Allow, 'wilma', VIEW)) + + result = policy.permits(blog, [Everyone, Authenticated, 'wilma'], + 'delete') + self.assertEqual(result, False) + self.assertEqual(result.context, community) + self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS)) + + result = policy.permits(blog, [Everyone, Authenticated, 'fred'], 'view') + self.assertEqual(result, True) + self.assertEqual(result.context, community) + self.assertEqual(result.ace, (Allow, 'fred', ALL_PERMISSIONS)) + result = policy.permits(blog, [Everyone, Authenticated, 'fred'], + 'doesntevenexistyet') + self.assertEqual(result, True) + self.assertEqual(result.context, community) + self.assertEqual(result.ace, (Allow, 'fred', ALL_PERMISSIONS)) + + result = policy.permits(blog, [Everyone, Authenticated, 'barney'], + 'view') + self.assertEqual(result, True) + self.assertEqual(result.context, blog) + self.assertEqual(result.ace, (Allow, 'barney', MEMBER_PERMS)) + result = policy.permits(blog, [Everyone, Authenticated, 'barney'], + 'administer') + self.assertEqual(result, False) + self.assertEqual(result.context, community) + self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS)) + + result = policy.permits(root, [Everyone, Authenticated, 'someguy'], + 'view') + self.assertEqual(result, True) + self.assertEqual(result.context, root) + self.assertEqual(result.ace, (Allow, Authenticated, VIEW)) + result = policy.permits(blog, + [Everyone, Authenticated, 'someguy'], 'view') + self.assertEqual(result, False) + self.assertEqual(result.context, community) + self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS)) + + result = policy.permits(root, [Everyone], 'view') + self.assertEqual(result, False) + self.assertEqual(result.context, root) + self.assertEqual(result.ace, None) + + context = DummyContext() + result = policy.permits(context, [Everyone], 'view') + self.assertEqual(result, False) + + def test_principals_allowed_by_permission_direct(self): + from repoze.bfg.security import Allow + from repoze.bfg.security import DENY_ALL + context = DummyContext() + acl = [ (Allow, 'chrism', ('read', 'write')), + DENY_ALL, + (Allow, 'other', 'read') ] + context.__acl__ = acl + policy = self._makeOne() + result = sorted( + policy.principals_allowed_by_permission(context, 'read')) + self.assertEqual(result, ['chrism']) + + def test_principals_allowed_by_permission(self): + from repoze.bfg.security import Allow + from repoze.bfg.security import Deny + from repoze.bfg.security import DENY_ALL + from repoze.bfg.security import ALL_PERMISSIONS + root = DummyContext(__name__='', __parent__=None) + community = DummyContext(__name__='community', __parent__=root) + blog = DummyContext(__name__='blog', __parent__=community) + root.__acl__ = [ (Allow, 'chrism', ('read', 'write')), + (Allow, 'other', ('read',)), + (Allow, 'jim', ALL_PERMISSIONS)] + community.__acl__ = [ (Deny, 'flooz', 'read'), + (Allow, 'flooz', 'read'), + (Allow, 'mork', 'read'), + (Deny, 'jim', 'read'), + (Allow, 'someguy', 'manage')] + blog.__acl__ = [ (Allow, 'fred', 'read'), + DENY_ALL] + + policy = self._makeOne() + + result = sorted(policy.principals_allowed_by_permission(blog, 'read')) + self.assertEqual(result, ['fred']) + result = sorted(policy.principals_allowed_by_permission(community, + 'read')) + self.assertEqual(result, ['chrism', 'mork', 'other']) + result = sorted(policy.principals_allowed_by_permission(community, + 'read')) + result = sorted(policy.principals_allowed_by_permission(root, 'read')) + self.assertEqual(result, ['chrism', 'jim', 'other']) + + def test_principals_allowed_by_permission_no_acls(self): + context = DummyContext() + policy = self._makeOne() + result = sorted(policy.principals_allowed_by_permission(context,'read')) + self.assertEqual(result, []) + +class DummyContext: + def __init__(self, *arg, **kw): + self.__dict__.update(kw) + + +VIEW = 'view' +EDIT = 'edit' +CREATE = 'create' +DELETE = 'delete' +MODERATE = 'moderate' +ADMINISTER = 'administer' +COMMENT = 'comment' + +GUEST_PERMS = (VIEW, COMMENT) +MEMBER_PERMS = GUEST_PERMS + (EDIT, CREATE, DELETE) +MODERATOR_PERMS = MEMBER_PERMS + (MODERATE,) +ADMINISTRATOR_PERMS = MODERATOR_PERMS + (ADMINISTER,) + diff --git a/repoze/bfg/tests/test_integration.py b/repoze/bfg/tests/test_integration.py index fdb85b87e..3630a2778 100644 --- a/repoze/bfg/tests/test_integration.py +++ b/repoze/bfg/tests/test_integration.py @@ -120,7 +120,7 @@ class TestFixtureApp(unittest.TestCase): def tearDown(self): cleanUp() - def test_registry_actions_can_be_pickled_and_unpickled(self): + def test_execute_actions(self): import repoze.bfg.tests.fixtureapp as package from zope.configuration import config from zope.configuration import xmlconfig @@ -129,11 +129,6 @@ class TestFixtureApp(unittest.TestCase): context.package = package xmlconfig.include(context, 'configure.zcml', package) context.execute_actions(clear=False) - actions = context.actions - import cPickle - dumped = cPickle.dumps(actions, -1) - new = cPickle.loads(dumped) - self.assertEqual(len(actions), len(new)) class TestGrokkedApp(unittest.TestCase): def setUp(self): diff --git a/repoze/bfg/tests/test_registry.py b/repoze/bfg/tests/test_registry.py index 77031806f..d145e7700 100644 --- a/repoze/bfg/tests/test_registry.py +++ b/repoze/bfg/tests/test_registry.py @@ -53,10 +53,10 @@ class TestPopulateRegistry(unittest.TestCase): def test_it(self): from repoze.bfg.tests import fixtureapp dummylock = DummyLock() - dummyregmgr = DummyRegistrationManager() - import repoze.bfg.registry + dummyregmgr = DummyThreadLocalManager({'registry':None}) + import repoze.bfg.threadlocal try: - old = repoze.bfg.registry.setRegistryManager(dummyregmgr) + old = repoze.bfg.threadlocal.setManager(dummyregmgr) from zope.component.registry import Components registry = Components('hello') self._callFUT(registry, @@ -65,49 +65,9 @@ class TestPopulateRegistry(unittest.TestCase): lock=dummylock) self.assertEqual(dummylock.acquired, True) self.assertEqual(dummylock.released, True) - self.assertEqual(dummyregmgr.registry, registry) + self.assertEqual(dummyregmgr.data['registry'], None) finally: - repoze.bfg.registry.setRegistryManager(old) - -class TestThreadLocalRegistryManager(unittest.TestCase): - def setUp(self): - cleanUp() - - def tearDown(self): - cleanUp() - - def _getTargetClass(self): - from repoze.bfg.registry import ThreadLocalRegistryManager - return ThreadLocalRegistryManager - - def _makeOne(self): - return self._getTargetClass()() - - def test_init(self): - local = self._makeOne() - from zope.component import getGlobalSiteManager - self.assertEqual(local.stack, []) - self.assertEqual(local.get(), getGlobalSiteManager()) - - def test_push_and_pop(self): - local = self._makeOne() - from zope.component import getGlobalSiteManager - local.push(True) - self.assertEqual(local.get(), True) - self.assertEqual(local.pop(), True) - self.assertEqual(local.pop(), None) - self.assertEqual(local.get(), getGlobalSiteManager()) - - def test_set_get_and_clear(self): - local = self._makeOne() - from zope.component import getGlobalSiteManager - local.set(None) - self.assertEqual(local.stack, [None]) - self.assertEqual(local.get(), None) - local.clear() - self.assertEqual(local.get(), getGlobalSiteManager()) - local.clear() - self.assertEqual(local.get(), getGlobalSiteManager()) + repoze.bfg.threadlocal.setManager(old) class GetSiteManagerTests(unittest.TestCase): def _callFUT(self, context=None): @@ -122,16 +82,10 @@ class GetSiteManagerTests(unittest.TestCase): from zope.component.interfaces import ComponentLookupError self.assertRaises(ComponentLookupError, self._callFUT, object) -class DummyRegistrationManager: - def push(self, registry): - self.registry = registry - - def pop(self): - self.popped = True - - def get(self): - return self.registry - +class DummyThreadLocalManager: + def __init__(self, data): + self.data = data + class DummyLock: def acquire(self): self.acquired = True diff --git a/repoze/bfg/tests/test_router.py b/repoze/bfg/tests/test_router.py index db47f832e..9a29967a3 100644 --- a/repoze/bfg/tests/test_router.py +++ b/repoze/bfg/tests/test_router.py @@ -16,14 +16,7 @@ class RouterTests(unittest.TestCase): def _registerLogger(self): from repoze.bfg.interfaces import ILogger - class Logger: - def __init__(self): - self.messages = [] - def info(self, msg): - self.messages.append(msg) - warn = info - debug = info - logger = Logger() + logger = DummyLogger() self.registry.registerUtility(logger, ILogger, name='repoze.bfg.debug') return logger @@ -38,6 +31,12 @@ class RouterTests(unittest.TestCase): settings = Settings(**defaultkw) self.registry.registerUtility(settings, ISettings) + def _registerAuthenticationPolicy(self): + from repoze.bfg.interfaces import IAuthenticationPolicy + policy = DummyAuthenticationPolicy() + self.registry.registerUtility(policy, IAuthenticationPolicy) + return policy + def _registerTraverserFactory(self, context, view_name='', subpath=None, traversed=None, virtual_root=None, virtual_root_path=None, **kw): @@ -74,13 +73,19 @@ class RouterTests(unittest.TestCase): from repoze.bfg.interfaces import IView self.registry.registerAdapter(app, for_, IView, name) - def _registerPermission(self, permission, name, *for_): + def _registerViewPermission(self, view_name, allow=True): + from zope.interface import Interface from repoze.bfg.interfaces import IViewPermission - self.registry.registerAdapter(permission, for_, IViewPermission, name) - - def _registerSecurityPolicy(self, secpol): - from repoze.bfg.interfaces import ISecurityPolicy - self.registry.registerUtility(secpol, ISecurityPolicy) + class Checker(object): + def __call__(self, context, request): + self.context = context + self.request = request + return allow + checker = Checker() + self.registry.registerAdapter(checker, (Interface, Interface), + IViewPermission, + view_name) + return checker def _registerEventListener(self, iface): L = [] @@ -89,9 +94,11 @@ class RouterTests(unittest.TestCase): self.registry.registerHandler(listener, (iface,)) return L - def _registerRootFactory(self, root_factory): + def _registerRootFactory(self, val): + rootfactory = make_rootfactory(val) from repoze.bfg.interfaces import IRootFactory - self.registry.registerUtility(root_factory, IRootFactory) + self.registry.registerUtility(rootfactory, IRootFactory) + return rootfactory def _getTargetClass(self): from repoze.bfg.router import Router @@ -113,21 +120,63 @@ class RouterTests(unittest.TestCase): return environ def test_root_policy(self): - rootfactory = make_rootfactory(None) environ = self._makeEnviron() context = DummyContext() self._registerTraverserFactory(context) - self._registerRootFactory(rootfactory) + rootfactory = self._registerRootFactory(None) router = self._makeOne() self.assertEqual(router.root_policy, rootfactory) + def test_inotfound_appfactory_override(self): + from repoze.bfg.interfaces import INotFoundAppFactory + def app(): + """ """ + self.registry.registerUtility(app, INotFoundAppFactory) + self._registerRootFactory(None) + router = self._makeOne() + self.assertEqual(router.notfound_app_factory, app) + + def test_iforbidden_responsefactory_override(self): + from repoze.bfg.interfaces import IForbiddenResponseFactory + def app(): + """ """ + self.registry.registerUtility(app, IForbiddenResponseFactory) + self._registerRootFactory(None) + router = self._makeOne() + self.assertEqual(router.forbidden_resp_factory, app) + + def test_iforbidden_responsefactory_nooverride(self): + context = DummyContext() + self._registerRootFactory(None) + router = self._makeOne() + from repoze.bfg.router import default_forbidden_view + self.assertEqual(router.forbidden_resp_factory, default_forbidden_view) + + def test_secpol_with_iunauthorized_appfactory(self): + from repoze.bfg.interfaces import IUnauthorizedAppFactory + environ = self._makeEnviron() + context = DummyContext() + self._registerTraverserFactory(context) + rootfactory = self._registerRootFactory(None) + logger = self._registerLogger() + def factory(): + return 'yo' + self.registry.registerUtility(factory, IUnauthorizedAppFactory) + router = self._makeOne() + self.assertEqual(len(logger.messages), 1) + self.failUnless('IForbiddenResponseFactory' in logger.messages[0]) + class DummyRequest: + def get_response(self, app): + return app + req = DummyRequest() + self.assertEqual(router.forbidden_resp_factory(None, req), 'yo') + def test_call_no_view_registered_no_isettings(self): - rootfactory = make_rootfactory(None) environ = self._makeEnviron() context = DummyContext() self._registerTraverserFactory(context) logger = self._registerLogger() - self._registerRootFactory(rootfactory) + self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) @@ -148,8 +197,7 @@ class RouterTests(unittest.TestCase): self._registerTraverserFactory(context) environ = self._makeEnviron() start_response = DummyStartResponse() - rootfactory = make_rootfactory(NotFound()) - self._registerRootFactory(rootfactory) + self._registerRootFactory(NotFound()) router = self._makeOne() result = router(environ, start_response) status = start_response.status @@ -157,13 +205,12 @@ class RouterTests(unittest.TestCase): self.failUnless('http://localhost:8080' in result[0], result) def test_call_no_view_registered_debug_notfound_false(self): - rootfactory = make_rootfactory(None) environ = self._makeEnviron() context = DummyContext() self._registerTraverserFactory(context) logger = self._registerLogger() self._registerSettings(debug_notfound=False) - self._registerRootFactory(rootfactory) + self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) @@ -176,13 +223,12 @@ class RouterTests(unittest.TestCase): self.assertEqual(len(logger.messages), 0) def test_call_no_view_registered_debug_notfound_true(self): - rootfactory = make_rootfactory(None) environ = self._makeEnviron() context = DummyContext() self._registerTraverserFactory(context) self._registerSettings(debug_notfound=True) logger = self._registerLogger() - self._registerRootFactory(rootfactory) + self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) @@ -205,19 +251,17 @@ class RouterTests(unittest.TestCase): self.failUnless("subpath: []" in message) def test_call_view_returns_nonresponse(self): - rootfactory = make_rootfactory(None) context = DummyContext() self._registerTraverserFactory(context) environ = self._makeEnviron() view = make_view('abc') self._registerView(view, '', None, None) - self._registerRootFactory(rootfactory) + self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() self.assertRaises(ValueError, router, environ, start_response) def test_call_view_registered_nonspecific_default_path(self): - rootfactory = make_rootfactory(None) context = DummyContext() self._registerTraverserFactory(context) response = DummyResponse() @@ -225,7 +269,7 @@ class RouterTests(unittest.TestCase): view = make_view(response) environ = self._makeEnviron() self._registerView(view, '', None, None) - self._registerRootFactory(rootfactory) + self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) @@ -238,7 +282,6 @@ class RouterTests(unittest.TestCase): self.assertEqual(environ['webob.adhoc_attrs']['root'], None) def test_call_deprecation_warning(self): - rootfactory = make_rootfactory(None) context = DummyContext() self._registerTraverserFactory(context, _deprecation_warning='abc') response = DummyResponse() @@ -246,7 +289,7 @@ class RouterTests(unittest.TestCase): view = make_view(response) environ = self._makeEnviron() self._registerView(view, '', None, None) - self._registerRootFactory(rootfactory) + self._registerRootFactory(None) router = self._makeOne() logger = self._registerLogger() router.logger = logger @@ -256,7 +299,6 @@ class RouterTests(unittest.TestCase): self.assertEqual(logger.messages[0], 'abc') def test_call_view_registered_nonspecific_nondefault_path_and_subpath(self): - rootfactory = make_rootfactory(None) context = DummyContext() self._registerTraverserFactory(context, view_name='foo', subpath=['bar'], @@ -266,7 +308,7 @@ class RouterTests(unittest.TestCase): view = make_view(response) environ = self._makeEnviron() self._registerView(view, 'foo', None, None) - self._registerRootFactory(rootfactory) + self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) @@ -279,7 +321,6 @@ class RouterTests(unittest.TestCase): self.assertEqual(environ['webob.adhoc_attrs']['root'], None) def test_call_view_registered_specific_success(self): - rootfactory = make_rootfactory(None) from zope.interface import Interface from zope.interface import directlyProvides class IContext(Interface): @@ -293,7 +334,7 @@ class RouterTests(unittest.TestCase): view = make_view(response) environ = self._makeEnviron() self._registerView(view, '', IContext, IRequest) - self._registerRootFactory(rootfactory) + self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) @@ -306,7 +347,6 @@ class RouterTests(unittest.TestCase): self.assertEqual(environ['webob.adhoc_attrs']['root'], None) def test_call_view_registered_specific_fail(self): - rootfactory = make_rootfactory(None) from zope.interface import Interface from zope.interface import directlyProvides class IContext(Interface): @@ -321,15 +361,14 @@ class RouterTests(unittest.TestCase): view = make_view(response) environ = self._makeEnviron() self._registerView(view, '', IContext, IRequest) - self._registerRootFactory(rootfactory) + self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) self.assertEqual(start_response.status, '404 Not Found') self.failUnless('404' in result[0]) - def test_call_view_registered_security_policy_permission_none(self): - rootfactory = make_rootfactory(None) + def test_call_view_permission_none(self): from zope.interface import Interface from zope.interface import directlyProvides class IContext(Interface): @@ -342,16 +381,14 @@ class RouterTests(unittest.TestCase): view = make_view(response) environ = self._makeEnviron() self._registerView(view, '', IContext, IRequest) - secpol = DummySecurityPolicy() - self._registerSecurityPolicy(secpol) - self._registerRootFactory(rootfactory) + self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) self.assertEqual(start_response.status, '200 OK') - def test_call_view_registered_security_policy_permission_succeeds(self): - rootfactory = make_rootfactory(None) + def test_call_view_no_authentication_policy_debug_authorization(self): + logger = self._registerLogger() from zope.interface import Interface from zope.interface import directlyProvides class IContext(Interface): @@ -362,21 +399,87 @@ class RouterTests(unittest.TestCase): self._registerTraverserFactory(context, subpath=['']) response = DummyResponse() view = make_view(response) - secpol = DummySecurityPolicy() - permissionfactory = make_permission_factory(True) environ = self._makeEnviron() self._registerView(view, '', IContext, IRequest) - self._registerSecurityPolicy(secpol) - self._registerPermission(permissionfactory, '', IContext, IRequest) - self._registerRootFactory(rootfactory) + self._registerRootFactory(None) router = self._makeOne() + router.debug_authorization = True start_response = DummyStartResponse() result = router(environ, start_response) self.assertEqual(start_response.status, '200 OK') - self.assertEqual(permissionfactory.checked_with, secpol) + self.assertEqual(len(logger.messages), 1) + self.failUnless('no authentication policy' in logger.messages[0]) + + def test_call_view_no_permission_registered_debug_authorization(self): + self._registerAuthenticationPolicy() + logger = self._registerLogger() + from zope.interface import Interface + from zope.interface import directlyProvides + class IContext(Interface): + pass + from repoze.bfg.interfaces import IRequest + context = DummyContext() + directlyProvides(context, IContext) + self._registerTraverserFactory(context, subpath=['']) + response = DummyResponse() + view = make_view(response) + environ = self._makeEnviron() + self._registerView(view, '', IContext, IRequest) + self._registerRootFactory(None) + router = self._makeOne() + router.debug_authorization = True + start_response = DummyStartResponse() + result = router(environ, start_response) + self.assertEqual(start_response.status, '200 OK') + self.assertEqual(len(logger.messages), 1) + self.failUnless('no permission registered' in logger.messages[0]) + + def test_call_view_no_permission_registered_no_debug(self): + self._registerAuthenticationPolicy() + logger = self._registerLogger() + from zope.interface import Interface + from zope.interface import directlyProvides + class IContext(Interface): + pass + from repoze.bfg.interfaces import IRequest + context = DummyContext() + directlyProvides(context, IContext) + self._registerTraverserFactory(context, subpath=['']) + response = DummyResponse() + view = make_view(response) + environ = self._makeEnviron() + self._registerView(view, '', IContext, IRequest) + self._registerRootFactory(None) + router = self._makeOne() + router.debug_authorization = False + start_response = DummyStartResponse() + result = router(environ, start_response) + self.assertEqual(start_response.status, '200 OK') + self.assertEqual(len(logger.messages), 0) + + def test_call_view_permission_succeeds(self): + from zope.interface import Interface + from zope.interface import directlyProvides + class IContext(Interface): + pass + from repoze.bfg.interfaces import IRequest + context = DummyContext() + directlyProvides(context, IContext) + self._registerTraverserFactory(context, subpath=['']) + self._registerAuthenticationPolicy() + response = DummyResponse() + view = make_view(response) + environ = self._makeEnviron() + self._registerView(view, '', IContext, IRequest) + checker = self._registerViewPermission('', True) + self._registerRootFactory(None) + router = self._makeOne() + start_response = DummyStartResponse() + result = router(environ, start_response) + self.assertEqual(start_response.status, '200 OK') + self.assertEqual(checker.context, context) def test_call_view_permission_fails_nosettings(self): - rootfactory = make_rootfactory(None) from zope.interface import Interface from zope.interface import directlyProvides class IContext(Interface): @@ -385,28 +488,24 @@ class RouterTests(unittest.TestCase): context = DummyContext() directlyProvides(context, IContext) self._registerTraverserFactory(context, subpath=['']) + self._registerAuthenticationPolicy() response = DummyResponse() view = make_view(response) - secpol = DummySecurityPolicy() from repoze.bfg.security import ACLDenied - permissionfactory = make_permission_factory( - ACLDenied('ace', 'acl', 'permission', ['principals'], context) - ) + denied = ACLDenied('ace', 'acl', 'permission', ['principals'], context) environ = self._makeEnviron() self._registerView(view, '', IContext, IRequest) - self._registerSecurityPolicy(secpol) - self._registerPermission(permissionfactory, '', IContext, IRequest) - self._registerRootFactory(rootfactory) + checker = self._registerViewPermission('', denied) + self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) self.assertEqual(start_response.status, '401 Unauthorized') - message = result[0] - self.failUnless('failed security policy check' in message) - self.assertEqual(permissionfactory.checked_with, secpol) + message = environ['repoze.bfg.message'] + self.assertEqual(message, 'Unauthorized: failed security policy check') + self.assertEqual(checker.context, context) def test_call_view_permission_fails_no_debug_auth(self): - rootfactory = make_rootfactory(None) from zope.interface import Interface from zope.interface import directlyProvides class IContext(Interface): @@ -415,29 +514,25 @@ class RouterTests(unittest.TestCase): context = DummyContext() directlyProvides(context, IContext) self._registerTraverserFactory(context, subpath=['']) + self._registerAuthenticationPolicy() response = DummyResponse() view = make_view(response) - secpol = DummySecurityPolicy() from repoze.bfg.security import ACLDenied - permissionfactory = make_permission_factory( - ACLDenied('ace', 'acl', 'permission', ['principals'], context) - ) + denied = ACLDenied('ace', 'acl', 'permission', ['principals'], context) environ = self._makeEnviron() self._registerView(view, '', IContext, IRequest) - self._registerSecurityPolicy(secpol) - self._registerPermission(permissionfactory, '', IContext, IRequest) + checker = self._registerViewPermission('', denied) self._registerSettings(debug_authorization=False) - self._registerRootFactory(rootfactory) + self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) self.assertEqual(start_response.status, '401 Unauthorized') - message = result[0] + message = environ['repoze.bfg.message'] self.failUnless('failed security policy check' in message) - self.assertEqual(permissionfactory.checked_with, secpol) + self.assertEqual(checker.context, context) def test_call_view_permission_fails_with_debug_auth(self): - rootfactory = make_rootfactory(None) from zope.interface import Interface from zope.interface import directlyProvides class IContext(Interface): @@ -445,31 +540,28 @@ class RouterTests(unittest.TestCase): from repoze.bfg.interfaces import IRequest context = DummyContext() directlyProvides(context, IContext) + self._registerAuthenticationPolicy() self._registerTraverserFactory(context, subpath=['']) response = DummyResponse() view = make_view(response) - secpol = DummySecurityPolicy() from repoze.bfg.security import ACLDenied - permissionfactory = make_permission_factory( - ACLDenied('ace', 'acl', 'permission', ['principals'], context) - ) environ = self._makeEnviron() self._registerView(view, '', IContext, IRequest) - self._registerSecurityPolicy(secpol) - self._registerPermission(permissionfactory, '', IContext, IRequest) + allowed = ACLDenied('ace', 'acl', 'permission', ['principals'], context) + checker = self._registerViewPermission('', allowed) self._registerSettings(debug_authorization=True) logger = self._registerLogger() - self._registerRootFactory(rootfactory) + self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) self.assertEqual(start_response.status, '401 Unauthorized') - message = result[0] + message = environ['repoze.bfg.message'] self.failUnless( "ACLDenied permission 'permission' via ACE 'ace' in ACL 'acl' " "on context" in message) self.failUnless("for principals ['principals']" in message) - self.assertEqual(permissionfactory.checked_with, secpol) + self.assertEqual(checker.context, context) self.assertEqual(len(logger.messages), 1) logged = logger.messages[0] self.failUnless( @@ -482,7 +574,6 @@ class RouterTests(unittest.TestCase): "for principals ['principals']" in logged) def test_call_eventsends(self): - rootfactory = make_rootfactory(None) context = DummyContext() self._registerTraverserFactory(context) response = DummyResponse() @@ -494,7 +585,7 @@ class RouterTests(unittest.TestCase): from repoze.bfg.interfaces import INewResponse request_events = self._registerEventListener(INewRequest) response_events = self._registerEventListener(INewResponse) - self._registerRootFactory(rootfactory) + self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) @@ -503,12 +594,27 @@ class RouterTests(unittest.TestCase): self.assertEqual(len(response_events), 1) self.assertEqual(response_events[0].response, response) + def test_call_pushes_and_pops_threadlocal_manager(self): + context = DummyContext() + self._registerTraverserFactory(context) + response = DummyResponse() + response.app_iter = ['Hello world'] + view = make_view(response) + environ = self._makeEnviron() + self._registerView(view, '', None, None) + self._registerRootFactory(None) + router = self._makeOne() + start_response = DummyStartResponse() + router.threadlocal_manager = DummyThreadLocalManager() + result = router(environ, start_response) + self.assertEqual(len(router.threadlocal_manager.pushed), 1) + self.assertEqual(len(router.threadlocal_manager.popped), 1) + def test_call_post_method(self): from repoze.bfg.interfaces import INewRequest from repoze.bfg.interfaces import IPOSTRequest from repoze.bfg.interfaces import IPUTRequest from repoze.bfg.interfaces import IRequest - rootfactory = make_rootfactory(None) context = DummyContext() self._registerTraverserFactory(context) response = DummyResponse() @@ -516,7 +622,7 @@ class RouterTests(unittest.TestCase): view = make_view(response) environ = self._makeEnviron(REQUEST_METHOD='POST') self._registerView(view, '', None, None) - self._registerRootFactory(rootfactory) + self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() request_events = self._registerEventListener(INewRequest) @@ -531,7 +637,6 @@ class RouterTests(unittest.TestCase): from repoze.bfg.interfaces import IPUTRequest from repoze.bfg.interfaces import IPOSTRequest from repoze.bfg.interfaces import IRequest - rootfactory = make_rootfactory(None) context = DummyContext() self._registerTraverserFactory(context) response = DummyResponse() @@ -539,7 +644,7 @@ class RouterTests(unittest.TestCase): view = make_view(response) environ = self._makeEnviron(REQUEST_METHOD='PUT') self._registerView(view, '', None, None) - self._registerRootFactory(rootfactory) + self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() request_events = self._registerEventListener(INewRequest) @@ -552,7 +657,6 @@ class RouterTests(unittest.TestCase): def test_call_unknown_method(self): from repoze.bfg.interfaces import INewRequest from repoze.bfg.interfaces import IRequest - rootfactory = make_rootfactory(None) context = DummyContext() self._registerTraverserFactory(context) response = DummyResponse() @@ -560,7 +664,7 @@ class RouterTests(unittest.TestCase): view = make_view(response) environ = self._makeEnviron(REQUEST_METHOD='UNKNOWN') self._registerView(view, '', None, None) - self._registerRootFactory(rootfactory) + self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() request_events = self._registerEventListener(INewRequest) @@ -573,7 +677,6 @@ class RouterTests(unittest.TestCase): from repoze.bfg.interfaces import IRequestFactory from repoze.bfg.testing import DummyRequest self.registry.registerUtility(DummyRequest, IRequestFactory) - rootfactory = make_rootfactory(None) context = DummyContext() self._registerTraverserFactory(context) response = DummyResponse() @@ -581,7 +684,7 @@ class RouterTests(unittest.TestCase): view = make_view(response) environ = self._makeEnviron() self._registerView(view, '', None, None) - self._registerRootFactory(rootfactory) + self._registerRootFactory(None) router = self._makeOne() start_response = DummyStartResponse() request_events = self._registerEventListener(INewRequest) @@ -593,54 +696,19 @@ class RouterTests(unittest.TestCase): self.assertEqual(request.view_name, '') self.assertEqual(request.subpath, []) - def test_call_inotfound_appfactory_override(self): - from repoze.bfg.interfaces import INotFoundAppFactory - def app(): - """ """ - self.registry.registerUtility(app, INotFoundAppFactory) - rootfactory = make_rootfactory(None) - context = DummyContext() - self._registerTraverserFactory(context) - response = DummyResponse() - response.app_iter = ['Hello world'] - view = make_view(response) - environ = self._makeEnviron() - self._registerView(view, '', None, None) - self._registerRootFactory(rootfactory) - router = self._makeOne() - self.assertEqual(router.notfound_app_factory, app) - - def test_call_iunauth_appfactory_override(self): - from repoze.bfg.interfaces import IUnauthorizedAppFactory - def app(): - """ """ - self.registry.registerUtility(app, IUnauthorizedAppFactory) - rootfactory = make_rootfactory(None) - context = DummyContext() - self._registerTraverserFactory(context) - response = DummyResponse() - response.app_iter = ['Hello world'] - view = make_view(response) - environ = self._makeEnviron() - self._registerView(view, '', None, None) - self._registerRootFactory(rootfactory) - router = self._makeOne() - self.assertEqual(router.unauth_app_factory, app) - class MakeAppTests(unittest.TestCase): def setUp(self): cleanUp() import repoze.bfg.router - self.old_registry_manager = repoze.bfg.router.registry_manager + self.old_tl_manager = repoze.bfg.router.manager self.regmgr = DummyRegistryManager() - repoze.bfg.router.registry_manager = self.regmgr + repoze.bfg.router.manager = self.regmgr def tearDown(self): cleanUp() import repoze.bfg.router - repoze.bfg.router.registry_manager = self.old_registry_manager + repoze.bfg.router.threadlocal_manager = self.old_tl_manager - def _callFUT(self, *arg, **kw): from repoze.bfg.router import make_app return make_app(*arg, **kw) @@ -729,6 +797,73 @@ class MakeAppTests(unittest.TestCase): self.assertRaises(ValueError, self._callFUT, None, fixtureapp, options=options) + def test_authorization_policy_no_authentication_policy(self): + from repoze.bfg.interfaces import IAuthorizationPolicy + authzpolicy = DummyContext() + from repoze.bfg.tests import routesapp + app = self._callFUT(None, routesapp, authorization_policy=authzpolicy) + self.failIf(app.registry.queryUtility(IAuthorizationPolicy)) + + def test_authentication_policy_no_authorization_policy(self): + from repoze.bfg.interfaces import IAuthorizationPolicy + from repoze.bfg.interfaces import IAuthenticationPolicy + from repoze.bfg.authorization import ACLAuthorizationPolicy + authnpolicy = DummyContext() + from repoze.bfg.tests import routesapp + app = self._callFUT(None, routesapp, authentication_policy=authnpolicy) + self.assertEqual(app.registry.getUtility(IAuthenticationPolicy), + authnpolicy) + self.assertEqual( + app.registry.getUtility(IAuthorizationPolicy).__class__, + ACLAuthorizationPolicy) + + def test_authentication_policy_and_authorization_policy(self): + from repoze.bfg.interfaces import IAuthorizationPolicy + from repoze.bfg.interfaces import IAuthenticationPolicy + authnpolicy = DummyContext() + authzpolicy = DummyContext() + from repoze.bfg.tests import routesapp + app = self._callFUT(None, routesapp, authentication_policy=authnpolicy, + authorization_policy = authzpolicy) + self.assertEqual(app.registry.getUtility(IAuthenticationPolicy), + authnpolicy) + self.assertEqual(app.registry.getUtility(IAuthorizationPolicy), + authzpolicy) + + def test_secpol_BBB_registrations(self): + from repoze.bfg.interfaces import IAuthorizationPolicy + from repoze.bfg.interfaces import IAuthenticationPolicy + from repoze.bfg.interfaces import ISecurityPolicy + secpol = DummySecurityPolicy() + from zope.component import getGlobalSiteManager + gsm = getGlobalSiteManager() + gsm.registerUtility(secpol, ISecurityPolicy) + from repoze.bfg.tests import routesapp + logger = DummyLogger() + app = self._callFUT(None, routesapp, registry=gsm, debug_logger=logger) + self.failUnless(app.registry.queryUtility(IAuthenticationPolicy)) + self.failUnless(app.registry.queryUtility(IAuthorizationPolicy)) + self.assertEqual(len(logger.messages), 1) + self.failUnless('ISecurityPolicy' in logger.messages[0]) + +class TestDefaultForbiddenView(unittest.TestCase): + def _callFUT(self, context, request): + from repoze.bfg.router import default_forbidden_view + return default_forbidden_view(context, request) + + def test_nomessage(self): + request = DummyRequest({}) + context = DummyContext() + response = self._callFUT(context, request) + self.failUnless('<code></code>' in response.body) + + def test_withmessage(self): + request = DummyRequest({'repoze.bfg.message':'abc&123'}) + context = DummyContext() + response = self._callFUT(context, request) + self.failUnless('<code>abc&123</code>' in response.body) + + class DummyRegistryManager: def push(self, registry): self.pushed = True @@ -744,18 +879,6 @@ def make_view(response): return response return view -def make_permission_factory(result): - class DummyPermissionFactory: - def __init__(self, context, request): - self.context = context - self.request = request - - def __call__(self, secpol): - self.__class__.checked_with = secpol - return result - - return DummyPermissionFactory - def make_rootfactory(root): def rootpolicy(environ): return root @@ -776,3 +899,29 @@ class DummyResponse: class DummySecurityPolicy: pass +class DummyRequest: + def __init__(self, environ): + self.environ = environ + + +class DummyLogger: + def __init__(self): + self.messages = [] + def info(self, msg): + self.messages.append(msg) + warn = info + debug = info + +class DummyThreadLocalManager: + def __init__(self): + self.pushed = [] + self.popped = [] + + def push(self, val): + self.pushed.append(val) + + def pop(self): + self.popped.append(True) + +class DummyAuthenticationPolicy: + pass diff --git a/repoze/bfg/tests/test_secpols.py b/repoze/bfg/tests/test_secpols.py new file mode 100644 index 000000000..2b0449e89 --- /dev/null +++ b/repoze/bfg/tests/test_secpols.py @@ -0,0 +1,766 @@ +import unittest + +from repoze.bfg.testing import cleanUp + +class TestAPIFunctionsSecpolBBB(unittest.TestCase): + def setUp(self): + cleanUp() + + def tearDown(self): + cleanUp() + try: + del globals()['__warningregistry__'] + except KeyError: + pass + + def _testWithWarnings(self, f, *args, **kw): + messages = [] + def showwarning(message, category, filename, lineno, file=None): + messages.append(message) + try: + import warnings + _old_showwarning = warnings.showwarning + warnings.showwarning = showwarning + result = f(*args, **kw) + return result, messages + finally: + warnings.showwarning = _old_showwarning + + def _registerSecurityPolicy(self, secpol): + import zope.component + from repoze.bfg.secpols import registerBBBAuthn + gsm = zope.component.getGlobalSiteManager() + registerBBBAuthn(secpol, gsm) + + def test_has_permission_registered(self): + secpol = DummySecurityPolicy(False) + self._registerSecurityPolicy(secpol) + from repoze.bfg.security import has_permission + self.assertEqual(has_permission('view', None, None), False) + + def test_has_permission_not_registered(self): + from repoze.bfg.security import has_permission + result = has_permission('view', None, None) + self.assertEqual(result, True) + self.assertEqual(result.msg, 'No authentication policy in use.') + + def test_authenticated_userid_registered(self): + secpol = DummySecurityPolicy(False) + self._registerSecurityPolicy(secpol) + from repoze.bfg.security import authenticated_userid + request = DummyRequest({}) + result, warnings = self._testWithWarnings(authenticated_userid, + request) + self.assertEqual(result, 'fred') + self.assertEqual(len(warnings), 1) + + def test_authenticated_userid_not_registered(self): + from repoze.bfg.security import authenticated_userid + request = DummyRequest({}) + result, warnings = self._testWithWarnings(authenticated_userid, + request) + self.assertEqual(result, None) + self.assertEqual(len(warnings), 1) + + def test_authenticated_userid_too_many_args(self): + from repoze.bfg.security import authenticated_userid + self.assertRaises(TypeError, authenticated_userid, None, None, None) + + def test_effective_principals_registered(self): + secpol = DummySecurityPolicy(False) + self._registerSecurityPolicy(secpol) + from repoze.bfg.security import effective_principals + request = DummyRequest({}) + result, warnings = self._testWithWarnings(effective_principals, request) + self.assertEqual(result, ['fred', 'bob']) + self.assertEqual(len(warnings), 1) + + def test_effective_principals_not_registered(self): + from repoze.bfg.security import effective_principals + request = DummyRequest({}) + result, warnings = self._testWithWarnings(effective_principals, request) + self.assertEqual(result, []) + self.assertEqual(len(warnings), 1) + + def test_effective_principals_too_many_args(self): + from repoze.bfg.security import effective_principals + self.assertRaises(TypeError, effective_principals, None, None, None) + + + def test_principals_allowed_by_permission_not_registered(self): + from repoze.bfg.security import principals_allowed_by_permission + from repoze.bfg.security import Everyone + self.assertEqual(principals_allowed_by_permission(None, None), + [Everyone]) + + def test_principals_allowed_by_permission_registered(self): + secpol = DummySecurityPolicy(False) + self._registerSecurityPolicy(secpol) + from repoze.bfg.security import principals_allowed_by_permission + self.assertEqual(principals_allowed_by_permission(None, None), + ['fred', 'bob']) + + +class TestACLSecurityPolicy(unittest.TestCase): + def setUp(self): + cleanUp() + + def tearDown(self): + cleanUp() + + def _getTargetClass(self): + from repoze.bfg.secpols import ACLSecurityPolicy + return ACLSecurityPolicy + + def _makeOne(self, *arg, **kw): + klass = self._getTargetClass() + return klass(*arg, **kw) + + def test_class_implements_ISecurityPolicy(self): + from zope.interface.verify import verifyClass + from repoze.bfg.interfaces import ISecurityPolicy + verifyClass(ISecurityPolicy, self._getTargetClass()) + + def test_instance_implements_ISecurityPolicy(self): + from zope.interface.verify import verifyObject + from repoze.bfg.interfaces import ISecurityPolicy + verifyObject(ISecurityPolicy, self._makeOne(lambda *arg: None)) + + def test_permits_no_principals_no_acl_info_on_context(self): + context = DummyContext() + request = DummyRequest({}) + policy = self._makeOne(lambda *arg: []) + result = policy.permits(context, request, 'view') + self.assertEqual(result, False) + from repoze.bfg.security import Everyone + self.assertEqual(result.principals, set([Everyone])) + self.assertEqual(result.permission, 'view') + self.assertEqual(result.context, context) + + def test_permits_no_principals_empty_acl_info_on_context(self): + context = DummyContext() + context.__acl__ = [] + request = DummyRequest({}) + policy = self._makeOne(lambda *arg: []) + result = policy.permits(context, request, 'view') + self.assertEqual(result, False) + from repoze.bfg.security import Everyone + self.assertEqual(result.principals, set([Everyone])) + self.assertEqual(result.permission, 'view') + self.assertEqual(result.context, context) + + def test_permits_no_principals_root_has_empty_acl_info(self): + context = DummyContext() + context.__name__ = None + context.__parent__ = None + context.__acl__ = [] + context2 = DummyContext() + context2.__name__ = 'context2' + context2.__parent__ = context + request = DummyRequest({}) + policy = self._makeOne(lambda *arg: []) + result = policy.permits(context, request, 'view') + self.assertEqual(result, False) + from repoze.bfg.security import Everyone + self.assertEqual(result.principals, set([Everyone])) + self.assertEqual(result.permission, 'view') + self.assertEqual(result.context, context) + + def test_permits_no_principals_root_allows_everyone(self): + context = DummyContext() + context.__name__ = None + context.__parent__ = None + from repoze.bfg.security import Allow, Everyone + context.__acl__ = [ (Allow, Everyone, 'view') ] + context2 = DummyContext() + context2.__name__ = 'context2' + context2.__parent__ = context + request = DummyRequest({}) + policy = self._makeOne(lambda *arg: []) + result = policy.permits(context, request, 'view') + self.assertEqual(result, True) + self.assertEqual(result.principals, set([Everyone])) + self.assertEqual(result.permission, 'view') + self.assertEqual(result.context, context) + + def test_permits_deny_implicit(self): + from repoze.bfg.security import Allow, Authenticated, Everyone + context = DummyContext() + context.__acl__ = [ (Allow, 'somebodyelse', 'read') ] + policy = self._makeOne(lambda *arg: ['fred']) + request = DummyRequest({}) + result = policy.permits(context, request, 'read') + self.assertEqual(result, False) + self.assertEqual(result.principals, + set(['fred', Authenticated, Everyone])) + self.assertEqual(result.permission, 'read') + self.assertEqual(result.context, context) + self.assertEqual(result.ace, None) + + def test_permits_deny_explicit(self): + from repoze.bfg.security import Deny, Authenticated, Everyone + context = DummyContext() + context.__acl__ = [ (Deny, 'fred', 'read') ] + policy = self._makeOne(lambda *arg: ['fred']) + request = DummyRequest({}) + result = policy.permits(context, request, 'read') + self.assertEqual(result, False) + self.assertEqual(result.principals, + set(['fred', Authenticated, Everyone])) + self.assertEqual(result.permission, 'read') + self.assertEqual(result.context, context) + self.assertEqual(result.ace, (Deny, 'fred', 'read')) + + def test_permits_deny_twoacl_implicit(self): + from repoze.bfg.security import Allow, Authenticated, Everyone + context = DummyContext() + acl = [(Allow, 'somebody', 'view'), (Allow, 'somebody', 'write')] + context.__acl__ = acl + policy = self._makeOne(lambda *arg: ['fred']) + request = DummyRequest({}) + result = policy.permits(context, request, 'read') + self.assertEqual(result, False) + self.assertEqual(result.principals, + set(['fred', Authenticated, Everyone])) + self.assertEqual(result.permission, 'read') + self.assertEqual(result.context, context) + self.assertEqual(result.ace, None) + + def test_permits_allow_twoacl_multiperm(self): + from repoze.bfg.security import Allow, Deny, Authenticated, Everyone + context = DummyContext() + acl = [ (Allow, 'fred', ('write', 'view') ), (Deny, 'fred', 'view') ] + context.__acl__ = acl + policy = self._makeOne(lambda *arg: ['fred']) + request = DummyRequest({}) + result = policy.permits(context, request, 'view') + self.assertEqual(result, True) + self.assertEqual(result.principals, + set(['fred', Authenticated, Everyone])) + self.assertEqual(result.permission, 'view') + self.assertEqual(result.context, context) + self.assertEqual(result.ace, (Allow, 'fred', ('write', 'view') )) + + def test_permits_deny_twoacl_multiperm(self): + from repoze.bfg.security import Allow, Deny, Authenticated, Everyone + context = DummyContext() + acl = [] + deny = (Deny, 'fred', ('view', 'read')) + allow = (Allow, 'fred', 'view') + context.__acl__ = [deny, allow] + policy = self._makeOne(lambda *arg: ['fred']) + request = DummyRequest({}) + result = policy.permits(context, request, 'read') + self.assertEqual(result, False) + self.assertEqual(result.principals, + set(['fred', Authenticated, Everyone])) + self.assertEqual(result.permission, 'read') + self.assertEqual(result.context, context) + self.assertEqual(result.ace, deny) + + def test_permits_allow_via_location_parent(self): + from repoze.bfg.security import Allow, Authenticated, Everyone + context = DummyContext() + context.__parent__ = None + context.__name__ = None + context.__acl__ = [ (Allow, 'fred', 'read') ] + context2 = DummyContext() + context2.__parent__ = context + context2.__name__ = 'myname' + + policy = self._makeOne(lambda *arg: ['fred']) + request = DummyRequest({}) + result = policy.permits(context2, request, 'read') + self.assertEqual(result, True) + self.assertEqual(result.principals, + set(['fred', Authenticated, Everyone])) + self.assertEqual(result.permission, 'read') + self.assertEqual(result.context, context) + self.assertEqual(result.ace, ('Allow', 'fred', 'read')) + + def test_permits_deny_byorder(self): + from repoze.bfg.security import Allow, Deny, Authenticated, Everyone + context = DummyContext() + acl = [] + deny = (Deny, 'fred', 'read') + allow = (Allow, 'fred', 'view') + context.__acl__ = [deny, allow] + policy = self._makeOne(lambda *arg: ['fred']) + request = DummyRequest({}) + result = policy.permits(context, request, 'read') + self.assertEqual(result, False) + self.assertEqual(result.principals, + set(['fred', Authenticated, Everyone])) + self.assertEqual(result.permission, 'read') + self.assertEqual(result.context, context) + self.assertEqual(result.ace, deny) + + def test_permits_allow_byorder(self): + from repoze.bfg.security import Allow, Deny, Authenticated, Everyone + context = DummyContext() + acl = [] + deny = (Deny, 'fred', ('view', 'read')) + allow = (Allow, 'fred', 'view') + context.__acl__ = [allow, deny] + policy = self._makeOne(lambda *arg: ['fred']) + request = DummyRequest({}) + result = policy.permits(context, request, 'view') + self.assertEqual(result, True) + self.assertEqual(result.principals, + set(['fred', Authenticated, Everyone])) + self.assertEqual(result.permission, 'view') + self.assertEqual(result.context, context) + self.assertEqual(result.ace, allow) + + def test_principals_allowed_by_permission_direct(self): + from repoze.bfg.security import Allow + context = DummyContext() + acl = [ (Allow, 'chrism', ('read', 'write')), + (Allow, 'other', 'read') ] + context.__acl__ = acl + policy = self._makeOne(lambda *arg: None) + result = policy.principals_allowed_by_permission(context, 'read') + self.assertEqual(result, ['chrism', 'other']) + + def test_principals_allowed_by_permission_acquired(self): + from repoze.bfg.security import Allow + context = DummyContext() + acl = [ (Allow, 'chrism', ('read', 'write')), + (Allow, 'other', ('read',)) ] + context.__acl__ = acl + context.__parent__ = None + context.__name__ = 'context' + inter = DummyContext() + inter.__name__ = None + inter.__parent__ = context + policy = self._makeOne(lambda *arg: None) + result = policy.principals_allowed_by_permission(inter, 'read') + self.assertEqual(result, ['chrism', 'other']) + + def test_principals_allowed_by_permission_no_acls(self): + policy = self._makeOne(lambda *arg: None) + result = policy.principals_allowed_by_permission(None, 'read') + self.assertEqual(result, []) + +class TestInheritingACLSecurityPolicy(unittest.TestCase): + def setUp(self): + cleanUp() + + def tearDown(self): + cleanUp() + + def _getTargetClass(self): + from repoze.bfg.secpols import InheritingACLSecurityPolicy + return InheritingACLSecurityPolicy + + def _makeOne(self, *arg, **kw): + klass = self._getTargetClass() + return klass(*arg, **kw) + + def test_class_implements_ISecurityPolicy(self): + from zope.interface.verify import verifyClass + from repoze.bfg.interfaces import ISecurityPolicy + verifyClass(ISecurityPolicy, self._getTargetClass()) + + def test_instance_implements_ISecurityPolicy(self): + from zope.interface.verify import verifyObject + from repoze.bfg.interfaces import ISecurityPolicy + verifyObject(ISecurityPolicy, self._makeOne(lambda *arg: None)) + + def test_permits(self): + from repoze.bfg.security import Deny + from repoze.bfg.security import Allow + from repoze.bfg.security import Everyone + from repoze.bfg.security import Authenticated + from repoze.bfg.security import ALL_PERMISSIONS + from repoze.bfg.security import DENY_ALL + policy = self._makeOne(lambda *arg: []) + root = DummyContext() + community = DummyContext(__name__='community', __parent__=root) + blog = DummyContext(__name__='blog', __parent__=community) + root.__acl__ = [ + (Allow, Authenticated, VIEW), + ] + community.__acl__ = [ + (Allow, 'fred', ALL_PERMISSIONS), + (Allow, 'wilma', VIEW), + DENY_ALL, + ] + blog.__acl__ = [ + (Allow, 'barney', MEMBER_PERMS), + (Allow, 'wilma', VIEW), + ] + policy = self._makeOne(lambda request: request.principals) + request = DummyRequest({}) + + request.principals = ['wilma'] + result = policy.permits(blog, request, 'view') + self.assertEqual(result, True) + self.assertEqual(result.context, blog) + self.assertEqual(result.ace, (Allow, 'wilma', VIEW)) + result = policy.permits(blog, request, 'delete') + self.assertEqual(result, False) + self.assertEqual(result.context, community) + self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS)) + + request.principals = ['fred'] + result = policy.permits(blog, request, 'view') + self.assertEqual(result, True) + self.assertEqual(result.context, community) + self.assertEqual(result.ace, (Allow, 'fred', ALL_PERMISSIONS)) + result = policy.permits(blog, request, 'doesntevenexistyet') + self.assertEqual(result, True) + self.assertEqual(result.context, community) + self.assertEqual(result.ace, (Allow, 'fred', ALL_PERMISSIONS)) + + request.principals = ['barney'] + result = policy.permits(blog, request, 'view') + self.assertEqual(result, True) + self.assertEqual(result.context, blog) + self.assertEqual(result.ace, (Allow, 'barney', MEMBER_PERMS)) + result = policy.permits(blog, request, 'administer') + self.assertEqual(result, False) + self.assertEqual(result.context, community) + self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS)) + + request.principals = ['someguy'] + result = policy.permits(root, request, 'view') + self.assertEqual(result, True) + self.assertEqual(result.context, root) + self.assertEqual(result.ace, (Allow, Authenticated, VIEW)) + result = policy.permits(blog, request, 'view') + self.assertEqual(result, False) + self.assertEqual(result.context, community) + self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS)) + + request.principals = [] + result = policy.permits(root, request, 'view') + self.assertEqual(result, False) + self.assertEqual(result.context, root) + self.assertEqual(result.ace, None) + + request.principals = [] + context = DummyContext() + result = policy.permits(context, request, 'view') + self.assertEqual(result, False) + + def test_principals_allowed_by_permission_direct(self): + from repoze.bfg.security import Allow + from repoze.bfg.security import DENY_ALL + context = DummyContext() + acl = [ (Allow, 'chrism', ('read', 'write')), + DENY_ALL, + (Allow, 'other', 'read') ] + context.__acl__ = acl + policy = self._makeOne(lambda *arg: None) + result = sorted( + policy.principals_allowed_by_permission(context, 'read')) + self.assertEqual(result, ['chrism']) + + def test_principals_allowed_by_permission(self): + from repoze.bfg.security import Allow + from repoze.bfg.security import Deny + from repoze.bfg.security import DENY_ALL + from repoze.bfg.security import ALL_PERMISSIONS + root = DummyContext(__name__='', __parent__=None) + community = DummyContext(__name__='community', __parent__=root) + blog = DummyContext(__name__='blog', __parent__=community) + root.__acl__ = [ (Allow, 'chrism', ('read', 'write')), + (Allow, 'other', ('read',)), + (Allow, 'jim', ALL_PERMISSIONS)] + community.__acl__ = [ (Deny, 'flooz', 'read'), + (Allow, 'flooz', 'read'), + (Allow, 'mork', 'read'), + (Deny, 'jim', 'read'), + (Allow, 'someguy', 'manage')] + blog.__acl__ = [ (Allow, 'fred', 'read'), + DENY_ALL] + + policy = self._makeOne(lambda *arg: None) + result = sorted(policy.principals_allowed_by_permission(blog, 'read')) + self.assertEqual(result, ['fred']) + result = sorted(policy.principals_allowed_by_permission(community, + 'read')) + self.assertEqual(result, ['chrism', 'mork', 'other']) + result = sorted(policy.principals_allowed_by_permission(community, + 'read')) + result = sorted(policy.principals_allowed_by_permission(root, 'read')) + self.assertEqual(result, ['chrism', 'jim', 'other']) + + def test_principals_allowed_by_permission_no_acls(self): + policy = self._makeOne(lambda *arg: None) + context = DummyContext() + result = sorted(policy.principals_allowed_by_permission(context,'read')) + self.assertEqual(result, []) + + def test_effective_principals(self): + context = DummyContext() + request = DummyRequest({}) + request.principals = ['fred'] + policy = self._makeOne(lambda request: request.principals) + result = sorted(policy.effective_principals(request)) + from repoze.bfg.security import Everyone + from repoze.bfg.security import Authenticated + self.assertEqual(result, + ['fred', Authenticated, Everyone]) + + def test_no_effective_principals(self): + context = DummyContext() + request = DummyRequest({}) + request.principals = [] + policy = self._makeOne(lambda request: request.principals) + result = sorted(policy.effective_principals(request)) + from repoze.bfg.security import Everyone + self.assertEqual(result, [Everyone]) + + def test_authenticated_userid(self): + context = DummyContext() + request = DummyRequest({}) + request.principals = ['fred'] + policy = self._makeOne(lambda request: request.principals) + result = policy.authenticated_userid(request) + self.assertEqual(result, 'fred') + + def test_no_authenticated_userid(self): + context = DummyContext() + request = DummyRequest({}) + request.principals = [] + policy = self._makeOne(lambda request: request.principals) + result = policy.authenticated_userid(request) + self.assertEqual(result, None) + +class TestRemoteUserACLSecurityPolicy(unittest.TestCase): + def setUp(self): + cleanUp() + + def tearDown(self): + cleanUp() + + def _getTargetClass(self): + from repoze.bfg.secpols import RemoteUserACLSecurityPolicy + return RemoteUserACLSecurityPolicy + + def _makeOne(self, *arg, **kw): + klass = self._getTargetClass() + return klass(*arg, **kw) + + def test_instance_implements_ISecurityPolicy(self): + from zope.interface.verify import verifyObject + from repoze.bfg.interfaces import ISecurityPolicy + verifyObject(ISecurityPolicy, self._makeOne()) + + def test_authenticated_userid(self): + context = DummyContext() + request = DummyRequest({'REMOTE_USER':'fred'}) + policy = self._makeOne() + result = policy.authenticated_userid(request) + self.assertEqual(result, 'fred') + + def test_authenticated_userid_no_remote_user(self): + context = DummyContext() + request = DummyRequest({}) + policy = self._makeOne() + result = policy.authenticated_userid(request) + self.assertEqual(result, None) + + def test_effective_principals(self): + context = DummyContext() + request = DummyRequest({'REMOTE_USER':'fred'}) + policy = self._makeOne() + result = policy.effective_principals(request) + from repoze.bfg.security import Everyone + from repoze.bfg.security import Authenticated + self.assertEqual(result, [Everyone, Authenticated, 'fred']) + + def test_effective_principals_no_remote_user(self): + context = DummyContext() + request = DummyRequest({}) + policy = self._makeOne() + result = policy.effective_principals(request) + from repoze.bfg.security import Everyone + self.assertEqual(result, [Everyone]) + +class TestRemoteUserInheritingACLSecurityPolicy(TestRemoteUserACLSecurityPolicy): + def _getTargetClass(self): + from repoze.bfg.secpols import RemoteUserInheritingACLSecurityPolicy + return RemoteUserInheritingACLSecurityPolicy + +class TestWhoACLSecurityPolicy(unittest.TestCase): + def setUp(self): + cleanUp() + + def tearDown(self): + cleanUp() + + def _getTargetClass(self): + from repoze.bfg.secpols import WhoACLSecurityPolicy + return WhoACLSecurityPolicy + + def _makeOne(self, *arg, **kw): + klass = self._getTargetClass() + return klass(*arg, **kw) + + def test_instance_implements_ISecurityPolicy(self): + from zope.interface.verify import verifyObject + from repoze.bfg.interfaces import ISecurityPolicy + verifyObject(ISecurityPolicy, self._makeOne()) + + def test_authenticated_userid(self): + context = DummyContext() + identity = {'repoze.who.identity':{'repoze.who.userid':'fred'}} + request = DummyRequest(identity) + policy = self._makeOne() + result = policy.authenticated_userid(request) + self.assertEqual(result, 'fred') + + def test_authenticated_userid_no_who_ident(self): + context = DummyContext() + request = DummyRequest({}) + policy = self._makeOne() + result = policy.authenticated_userid(request) + self.assertEqual(result, None) + + def test_effective_principals(self): + context = DummyContext() + identity = {'repoze.who.identity':{'repoze.who.userid':'fred'}} + request = DummyRequest(identity) + policy = self._makeOne() + result = policy.effective_principals(request) + from repoze.bfg.security import Everyone + from repoze.bfg.security import Authenticated + self.assertEqual(result, [Everyone, Authenticated, 'fred']) + + def test_effective_principals_no_who_ident(self): + context = DummyContext() + request = DummyRequest({}) + policy = self._makeOne() + result = policy.effective_principals(request) + from repoze.bfg.security import Everyone + self.assertEqual(result, [Everyone]) + +class TestWhoInheritingACLSecurityPolicy(TestWhoACLSecurityPolicy): + def _getTargetClass(self): + from repoze.bfg.secpols import WhoInheritingACLSecurityPolicy + return WhoInheritingACLSecurityPolicy + +class TestSecurityPolicyToAuthenticationPolicyAdapter(unittest.TestCase): + def _getTargetClass(self): + from repoze.bfg.secpols import \ + SecurityPolicyToAuthenticationPolicyAdapter + return SecurityPolicyToAuthenticationPolicyAdapter + + def _makeOne(self, secpol): + return self._getTargetClass()(secpol) + + def test_class_implements_IAuthenticationPolicy(self): + from zope.interface.verify import verifyClass + from repoze.bfg.interfaces import IAuthenticationPolicy + verifyClass(IAuthenticationPolicy, self._getTargetClass()) + + def test_instance_implements_IAuthenticationPolicy(self): + from zope.interface.verify import verifyObject + from repoze.bfg.interfaces import IAuthenticationPolicy + verifyObject(IAuthenticationPolicy, self._makeOne(None)) + + def test_authenticated_userid(self): + secpol = DummySecurityPolicy(None) + adapter = self._makeOne(secpol) + result = adapter.authenticated_userid(None, None) + self.assertEqual(result, 'fred') + + def test_effective_principals(self): + secpol = DummySecurityPolicy(None) + adapter = self._makeOne(secpol) + result = adapter.effective_principals(None, None) + self.assertEqual(result, ['fred', 'bob']) + + def test_remember(self): + secpol = DummySecurityPolicy(None) + adapter = self._makeOne(secpol) + result = adapter.remember(None, None, None) + self.assertEqual(result, []) + + def test_forget(self): + secpol = DummySecurityPolicy(None) + adapter = self._makeOne(secpol) + result = adapter.forget(None, None) + self.assertEqual(result, []) + +class TestSecurityPolicyToAuthorizationPolicyAdapter(unittest.TestCase): + def _getTargetClass(self): + from repoze.bfg.secpols import \ + SecurityPolicyToAuthorizationPolicyAdapter + return SecurityPolicyToAuthorizationPolicyAdapter + + def _makeOne(self, secpol): + return self._getTargetClass()(secpol) + + def test_class_implements_IAuthorizationPolicy(self): + from zope.interface.verify import verifyClass + from repoze.bfg.interfaces import IAuthorizationPolicy + verifyClass(IAuthorizationPolicy, self._getTargetClass()) + + def test_instance_implements_IAuthorizationPolicy(self): + from zope.interface.verify import verifyObject + from repoze.bfg.interfaces import IAuthorizationPolicy + verifyObject(IAuthorizationPolicy, self._makeOne(None)) + + def test_permits(self): + from repoze.bfg.threadlocal import manager + manager.push({'request':1}) + try: + secpol = DummySecurityPolicy(None) + adapter = self._makeOne(secpol) + result = adapter.permits(None, None, None) + self.assertEqual(result, None) + self.assertEqual(secpol.checked, (None, 1, None)) + finally: + manager.pop() + + def test_principals_allowed_by_permission(self): + secpol = DummySecurityPolicy(None) + adapter = self._makeOne(secpol) + result = adapter.principals_allowed_by_permission(None, None) + self.assertEqual(result, ['fred', 'bob']) + + + +class DummyContext: + def __init__(self, *arg, **kw): + self.__dict__.update(kw) + +class DummyRequest: + def __init__(self, environ): + self.environ = environ + + +VIEW = 'view' +EDIT = 'edit' +CREATE = 'create' +DELETE = 'delete' +MODERATE = 'moderate' +ADMINISTER = 'administer' +COMMENT = 'comment' + +GUEST_PERMS = (VIEW, COMMENT) +MEMBER_PERMS = GUEST_PERMS + (EDIT, CREATE, DELETE) +MODERATOR_PERMS = MEMBER_PERMS + (MODERATE,) +ADMINISTRATOR_PERMS = MODERATOR_PERMS + (ADMINISTER,) + +class DummySecurityPolicy: + def __init__(self, result): + self.result = result + + def permits(self, *args): + self.checked = args + return self.result + + def authenticated_userid(self, request): + return 'fred' + + def effective_principals(self, request): + return ['fred', 'bob'] + + def principals_allowed_by_permission(self, context, permission): + return ['fred', 'bob'] + diff --git a/repoze/bfg/tests/test_security.py b/repoze/bfg/tests/test_security.py index 03a466e7c..3f18d3a4a 100644 --- a/repoze/bfg/tests/test_security.py +++ b/repoze/bfg/tests/test_security.py @@ -2,433 +2,6 @@ import unittest from repoze.bfg.testing import cleanUp -class TestACLSecurityPolicy(unittest.TestCase): - def setUp(self): - cleanUp() - - def tearDown(self): - cleanUp() - - def _getTargetClass(self): - from repoze.bfg.security import ACLSecurityPolicy - return ACLSecurityPolicy - - def _makeOne(self, *arg, **kw): - klass = self._getTargetClass() - return klass(*arg, **kw) - - def test_class_implements_ISecurityPolicy(self): - from zope.interface.verify import verifyClass - from repoze.bfg.interfaces import ISecurityPolicy - verifyClass(ISecurityPolicy, self._getTargetClass()) - - def test_instance_implements_ISecurityPolicy(self): - from zope.interface.verify import verifyObject - from repoze.bfg.interfaces import ISecurityPolicy - verifyObject(ISecurityPolicy, self._makeOne(lambda *arg: None)) - - def test_permits_no_principals_no_acl_info_on_context(self): - context = DummyContext() - request = DummyRequest({}) - policy = self._makeOne(lambda *arg: []) - result = policy.permits(context, request, 'view') - self.assertEqual(result, False) - from repoze.bfg.security import Everyone - self.assertEqual(result.principals, set([Everyone])) - self.assertEqual(result.permission, 'view') - self.assertEqual(result.context, context) - - def test_permits_no_principals_empty_acl_info_on_context(self): - context = DummyContext() - context.__acl__ = [] - request = DummyRequest({}) - policy = self._makeOne(lambda *arg: []) - result = policy.permits(context, request, 'view') - self.assertEqual(result, False) - from repoze.bfg.security import Everyone - self.assertEqual(result.principals, set([Everyone])) - self.assertEqual(result.permission, 'view') - self.assertEqual(result.context, context) - - def test_permits_no_principals_root_has_empty_acl_info(self): - context = DummyContext() - context.__name__ = None - context.__parent__ = None - context.__acl__ = [] - context2 = DummyContext() - context2.__name__ = 'context2' - context2.__parent__ = context - request = DummyRequest({}) - policy = self._makeOne(lambda *arg: []) - result = policy.permits(context, request, 'view') - self.assertEqual(result, False) - from repoze.bfg.security import Everyone - self.assertEqual(result.principals, set([Everyone])) - self.assertEqual(result.permission, 'view') - self.assertEqual(result.context, context) - - def test_permits_no_principals_root_allows_everyone(self): - context = DummyContext() - context.__name__ = None - context.__parent__ = None - from repoze.bfg.security import Allow, Everyone - context.__acl__ = [ (Allow, Everyone, 'view') ] - context2 = DummyContext() - context2.__name__ = 'context2' - context2.__parent__ = context - request = DummyRequest({}) - policy = self._makeOne(lambda *arg: []) - result = policy.permits(context, request, 'view') - self.assertEqual(result, True) - self.assertEqual(result.principals, set([Everyone])) - self.assertEqual(result.permission, 'view') - self.assertEqual(result.context, context) - - def test_permits_deny_implicit(self): - from repoze.bfg.security import Allow, Authenticated, Everyone - context = DummyContext() - context.__acl__ = [ (Allow, 'somebodyelse', 'read') ] - policy = self._makeOne(lambda *arg: ['fred']) - request = DummyRequest({}) - result = policy.permits(context, request, 'read') - self.assertEqual(result, False) - self.assertEqual(result.principals, - set(['fred', Authenticated, Everyone])) - self.assertEqual(result.permission, 'read') - self.assertEqual(result.context, context) - self.assertEqual(result.ace, None) - - def test_permits_deny_explicit(self): - from repoze.bfg.security import Deny, Authenticated, Everyone - context = DummyContext() - context.__acl__ = [ (Deny, 'fred', 'read') ] - policy = self._makeOne(lambda *arg: ['fred']) - request = DummyRequest({}) - result = policy.permits(context, request, 'read') - self.assertEqual(result, False) - self.assertEqual(result.principals, - set(['fred', Authenticated, Everyone])) - self.assertEqual(result.permission, 'read') - self.assertEqual(result.context, context) - self.assertEqual(result.ace, (Deny, 'fred', 'read')) - - def test_permits_deny_twoacl_implicit(self): - from repoze.bfg.security import Allow, Authenticated, Everyone - context = DummyContext() - acl = [(Allow, 'somebody', 'view'), (Allow, 'somebody', 'write')] - context.__acl__ = acl - policy = self._makeOne(lambda *arg: ['fred']) - request = DummyRequest({}) - result = policy.permits(context, request, 'read') - self.assertEqual(result, False) - self.assertEqual(result.principals, - set(['fred', Authenticated, Everyone])) - self.assertEqual(result.permission, 'read') - self.assertEqual(result.context, context) - self.assertEqual(result.ace, None) - - def test_permits_allow_twoacl_multiperm(self): - from repoze.bfg.security import Allow, Deny, Authenticated, Everyone - context = DummyContext() - acl = [ (Allow, 'fred', ('write', 'view') ), (Deny, 'fred', 'view') ] - context.__acl__ = acl - policy = self._makeOne(lambda *arg: ['fred']) - request = DummyRequest({}) - result = policy.permits(context, request, 'view') - self.assertEqual(result, True) - self.assertEqual(result.principals, - set(['fred', Authenticated, Everyone])) - self.assertEqual(result.permission, 'view') - self.assertEqual(result.context, context) - self.assertEqual(result.ace, (Allow, 'fred', ('write', 'view') )) - - def test_permits_deny_twoacl_multiperm(self): - from repoze.bfg.security import Allow, Deny, Authenticated, Everyone - context = DummyContext() - acl = [] - deny = (Deny, 'fred', ('view', 'read')) - allow = (Allow, 'fred', 'view') - context.__acl__ = [deny, allow] - policy = self._makeOne(lambda *arg: ['fred']) - request = DummyRequest({}) - result = policy.permits(context, request, 'read') - self.assertEqual(result, False) - self.assertEqual(result.principals, - set(['fred', Authenticated, Everyone])) - self.assertEqual(result.permission, 'read') - self.assertEqual(result.context, context) - self.assertEqual(result.ace, deny) - - def test_permits_allow_via_location_parent(self): - from repoze.bfg.security import Allow, Authenticated, Everyone - context = DummyContext() - context.__parent__ = None - context.__name__ = None - context.__acl__ = [ (Allow, 'fred', 'read') ] - context2 = DummyContext() - context2.__parent__ = context - context2.__name__ = 'myname' - - policy = self._makeOne(lambda *arg: ['fred']) - request = DummyRequest({}) - result = policy.permits(context2, request, 'read') - self.assertEqual(result, True) - self.assertEqual(result.principals, - set(['fred', Authenticated, Everyone])) - self.assertEqual(result.permission, 'read') - self.assertEqual(result.context, context) - self.assertEqual(result.ace, ('Allow', 'fred', 'read')) - - def test_permits_deny_byorder(self): - from repoze.bfg.security import Allow, Deny, Authenticated, Everyone - context = DummyContext() - acl = [] - deny = (Deny, 'fred', 'read') - allow = (Allow, 'fred', 'view') - context.__acl__ = [deny, allow] - policy = self._makeOne(lambda *arg: ['fred']) - request = DummyRequest({}) - result = policy.permits(context, request, 'read') - self.assertEqual(result, False) - self.assertEqual(result.principals, - set(['fred', Authenticated, Everyone])) - self.assertEqual(result.permission, 'read') - self.assertEqual(result.context, context) - self.assertEqual(result.ace, deny) - - def test_permits_allow_byorder(self): - from repoze.bfg.security import Allow, Deny, Authenticated, Everyone - context = DummyContext() - acl = [] - deny = (Deny, 'fred', ('view', 'read')) - allow = (Allow, 'fred', 'view') - context.__acl__ = [allow, deny] - policy = self._makeOne(lambda *arg: ['fred']) - request = DummyRequest({}) - result = policy.permits(context, request, 'view') - self.assertEqual(result, True) - self.assertEqual(result.principals, - set(['fred', Authenticated, Everyone])) - self.assertEqual(result.permission, 'view') - self.assertEqual(result.context, context) - self.assertEqual(result.ace, allow) - - def test_principals_allowed_by_permission_direct(self): - from repoze.bfg.security import Allow - context = DummyContext() - acl = [ (Allow, 'chrism', ('read', 'write')), - (Allow, 'other', 'read') ] - context.__acl__ = acl - policy = self._makeOne(lambda *arg: None) - result = policy.principals_allowed_by_permission(context, 'read') - self.assertEqual(result, ['chrism', 'other']) - - def test_principals_allowed_by_permission_acquired(self): - from repoze.bfg.security import Allow - context = DummyContext() - acl = [ (Allow, 'chrism', ('read', 'write')), - (Allow, 'other', ('read',)) ] - context.__acl__ = acl - context.__parent__ = None - context.__name__ = 'context' - inter = DummyContext() - inter.__name__ = None - inter.__parent__ = context - policy = self._makeOne(lambda *arg: None) - result = policy.principals_allowed_by_permission(inter, 'read') - self.assertEqual(result, ['chrism', 'other']) - - def test_principals_allowed_by_permission_no_acls(self): - policy = self._makeOne(lambda *arg: None) - result = policy.principals_allowed_by_permission(None, 'read') - self.assertEqual(result, []) - -class TestInheritingACLSecurityPolicy(unittest.TestCase): - def setUp(self): - cleanUp() - - def tearDown(self): - cleanUp() - - def _getTargetClass(self): - from repoze.bfg.security import InheritingACLSecurityPolicy - return InheritingACLSecurityPolicy - - def _makeOne(self, *arg, **kw): - klass = self._getTargetClass() - return klass(*arg, **kw) - - def test_class_implements_ISecurityPolicy(self): - from zope.interface.verify import verifyClass - from repoze.bfg.interfaces import ISecurityPolicy - verifyClass(ISecurityPolicy, self._getTargetClass()) - - def test_instance_implements_ISecurityPolicy(self): - from zope.interface.verify import verifyObject - from repoze.bfg.interfaces import ISecurityPolicy - verifyObject(ISecurityPolicy, self._makeOne(lambda *arg: None)) - - def test_permits(self): - from repoze.bfg.security import Deny - from repoze.bfg.security import Allow - from repoze.bfg.security import Everyone - from repoze.bfg.security import Authenticated - from repoze.bfg.security import ALL_PERMISSIONS - from repoze.bfg.security import DENY_ALL - policy = self._makeOne(lambda *arg: []) - root = DummyContext() - community = DummyContext(__name__='community', __parent__=root) - blog = DummyContext(__name__='blog', __parent__=community) - root.__acl__ = [ - (Allow, Authenticated, VIEW), - ] - community.__acl__ = [ - (Allow, 'fred', ALL_PERMISSIONS), - (Allow, 'wilma', VIEW), - DENY_ALL, - ] - blog.__acl__ = [ - (Allow, 'barney', MEMBER_PERMS), - (Allow, 'wilma', VIEW), - ] - policy = self._makeOne(lambda request: request.principals) - request = DummyRequest({}) - - request.principals = ['wilma'] - result = policy.permits(blog, request, 'view') - self.assertEqual(result, True) - self.assertEqual(result.context, blog) - self.assertEqual(result.ace, (Allow, 'wilma', VIEW)) - result = policy.permits(blog, request, 'delete') - self.assertEqual(result, False) - self.assertEqual(result.context, community) - self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS)) - - request.principals = ['fred'] - result = policy.permits(blog, request, 'view') - self.assertEqual(result, True) - self.assertEqual(result.context, community) - self.assertEqual(result.ace, (Allow, 'fred', ALL_PERMISSIONS)) - result = policy.permits(blog, request, 'doesntevenexistyet') - self.assertEqual(result, True) - self.assertEqual(result.context, community) - self.assertEqual(result.ace, (Allow, 'fred', ALL_PERMISSIONS)) - - request.principals = ['barney'] - result = policy.permits(blog, request, 'view') - self.assertEqual(result, True) - self.assertEqual(result.context, blog) - self.assertEqual(result.ace, (Allow, 'barney', MEMBER_PERMS)) - result = policy.permits(blog, request, 'administer') - self.assertEqual(result, False) - self.assertEqual(result.context, community) - self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS)) - - request.principals = ['someguy'] - result = policy.permits(root, request, 'view') - self.assertEqual(result, True) - self.assertEqual(result.context, root) - self.assertEqual(result.ace, (Allow, Authenticated, VIEW)) - result = policy.permits(blog, request, 'view') - self.assertEqual(result, False) - self.assertEqual(result.context, community) - self.assertEqual(result.ace, (Deny, Everyone, ALL_PERMISSIONS)) - - request.principals = [] - result = policy.permits(root, request, 'view') - self.assertEqual(result, False) - self.assertEqual(result.context, root) - self.assertEqual(result.ace, None) - - request.principals = [] - context = DummyContext() - result = policy.permits(context, request, 'view') - self.assertEqual(result, False) - - def test_principals_allowed_by_permission_direct(self): - from repoze.bfg.security import Allow - from repoze.bfg.security import DENY_ALL - context = DummyContext() - acl = [ (Allow, 'chrism', ('read', 'write')), - DENY_ALL, - (Allow, 'other', 'read') ] - context.__acl__ = acl - policy = self._makeOne(lambda *arg: None) - result = sorted( - policy.principals_allowed_by_permission(context, 'read')) - self.assertEqual(result, ['chrism']) - - def test_principals_allowed_by_permission(self): - from repoze.bfg.security import Allow - from repoze.bfg.security import Deny - from repoze.bfg.security import DENY_ALL - from repoze.bfg.security import ALL_PERMISSIONS - root = DummyContext(__name__='', __parent__=None) - community = DummyContext(__name__='community', __parent__=root) - blog = DummyContext(__name__='blog', __parent__=community) - root.__acl__ = [ (Allow, 'chrism', ('read', 'write')), - (Allow, 'other', ('read',)), - (Allow, 'jim', ALL_PERMISSIONS)] - community.__acl__ = [ (Deny, 'flooz', 'read'), - (Allow, 'flooz', 'read'), - (Allow, 'mork', 'read'), - (Deny, 'jim', 'read'), - (Allow, 'someguy', 'manage')] - blog.__acl__ = [ (Allow, 'fred', 'read'), - DENY_ALL] - - policy = self._makeOne(lambda *arg: None) - result = sorted(policy.principals_allowed_by_permission(blog, 'read')) - self.assertEqual(result, ['fred']) - result = sorted(policy.principals_allowed_by_permission(community, - 'read')) - self.assertEqual(result, ['chrism', 'mork', 'other']) - result = sorted(policy.principals_allowed_by_permission(community, - 'read')) - result = sorted(policy.principals_allowed_by_permission(root, 'read')) - self.assertEqual(result, ['chrism', 'jim', 'other']) - - def test_principals_allowed_by_permission_no_acls(self): - policy = self._makeOne(lambda *arg: None) - context = DummyContext() - result = sorted(policy.principals_allowed_by_permission(context,'read')) - self.assertEqual(result, []) - - def test_effective_principals(self): - context = DummyContext() - request = DummyRequest({}) - request.principals = ['fred'] - policy = self._makeOne(lambda request: request.principals) - result = sorted(policy.effective_principals(request)) - from repoze.bfg.security import Everyone - from repoze.bfg.security import Authenticated - self.assertEqual(result, - ['fred', Authenticated, Everyone]) - - def test_no_effective_principals(self): - context = DummyContext() - request = DummyRequest({}) - request.principals = [] - policy = self._makeOne(lambda request: request.principals) - result = sorted(policy.effective_principals(request)) - from repoze.bfg.security import Everyone - self.assertEqual(result, [Everyone]) - - def test_authenticated_userid(self): - context = DummyContext() - request = DummyRequest({}) - request.principals = ['fred'] - policy = self._makeOne(lambda request: request.principals) - result = policy.authenticated_userid(request) - self.assertEqual(result, 'fred') - - def test_no_authenticated_userid(self): - context = DummyContext() - request = DummyRequest({}) - request.principals = [] - policy = self._makeOne(lambda request: request.principals) - result = policy.authenticated_userid(request) - self.assertEqual(result, None) class TestAllPermissionsList(unittest.TestCase): def setUp(self): @@ -454,211 +27,13 @@ class TestAllPermissionsList(unittest.TestCase): from repoze.bfg.security import ALL_PERMISSIONS self.assertEqual(ALL_PERMISSIONS.__class__, self._getTargetClass()) -class TestRemoteUserACLSecurityPolicy(unittest.TestCase): - def setUp(self): - cleanUp() - - def tearDown(self): - cleanUp() - - def _getTargetClass(self): - from repoze.bfg.security import RemoteUserACLSecurityPolicy - return RemoteUserACLSecurityPolicy - - def _makeOne(self, *arg, **kw): - klass = self._getTargetClass() - return klass(*arg, **kw) - - def test_instance_implements_ISecurityPolicy(self): - from zope.interface.verify import verifyObject - from repoze.bfg.interfaces import ISecurityPolicy - verifyObject(ISecurityPolicy, self._makeOne()) - - def test_authenticated_userid(self): - context = DummyContext() - request = DummyRequest({'REMOTE_USER':'fred'}) - policy = self._makeOne() - result = policy.authenticated_userid(request) - self.assertEqual(result, 'fred') - - def test_authenticated_userid_no_remote_user(self): - context = DummyContext() - request = DummyRequest({}) - policy = self._makeOne() - result = policy.authenticated_userid(request) - self.assertEqual(result, None) - - def test_effective_principals(self): - context = DummyContext() - request = DummyRequest({'REMOTE_USER':'fred'}) - policy = self._makeOne() - result = policy.effective_principals(request) - from repoze.bfg.security import Everyone - from repoze.bfg.security import Authenticated - self.assertEqual(result, [Everyone, Authenticated, 'fred']) - - def test_effective_principals_no_remote_user(self): - context = DummyContext() - request = DummyRequest({}) - policy = self._makeOne() - result = policy.effective_principals(request) - from repoze.bfg.security import Everyone - self.assertEqual(result, [Everyone]) - -class TestRemoteUserInheritingACLSecurityPolicy(TestRemoteUserACLSecurityPolicy): - def _getTargetClass(self): - from repoze.bfg.security import RemoteUserInheritingACLSecurityPolicy - return RemoteUserInheritingACLSecurityPolicy - -class TestWhoACLSecurityPolicy(unittest.TestCase): +class TestViewPermissionFactory(unittest.TestCase): def setUp(self): cleanUp() def tearDown(self): cleanUp() - - def _getTargetClass(self): - from repoze.bfg.security import WhoACLSecurityPolicy - return WhoACLSecurityPolicy - - def _makeOne(self, *arg, **kw): - klass = self._getTargetClass() - return klass(*arg, **kw) - - def test_instance_implements_ISecurityPolicy(self): - from zope.interface.verify import verifyObject - from repoze.bfg.interfaces import ISecurityPolicy - verifyObject(ISecurityPolicy, self._makeOne()) - - def test_authenticated_userid(self): - context = DummyContext() - identity = {'repoze.who.identity':{'repoze.who.userid':'fred'}} - request = DummyRequest(identity) - policy = self._makeOne() - result = policy.authenticated_userid(request) - self.assertEqual(result, 'fred') - - def test_authenticated_userid_no_who_ident(self): - context = DummyContext() - request = DummyRequest({}) - policy = self._makeOne() - result = policy.authenticated_userid(request) - self.assertEqual(result, None) - - def test_effective_principals(self): - context = DummyContext() - identity = {'repoze.who.identity':{'repoze.who.userid':'fred'}} - request = DummyRequest(identity) - policy = self._makeOne() - result = policy.effective_principals(request) - from repoze.bfg.security import Everyone - from repoze.bfg.security import Authenticated - self.assertEqual(result, [Everyone, Authenticated, 'fred']) - - def test_effective_principals_no_who_ident(self): - context = DummyContext() - request = DummyRequest({}) - policy = self._makeOne() - result = policy.effective_principals(request) - from repoze.bfg.security import Everyone - self.assertEqual(result, [Everyone]) - -class TestWhoInheritingACLSecurityPolicy(TestWhoACLSecurityPolicy): - def _getTargetClass(self): - from repoze.bfg.security import WhoInheritingACLSecurityPolicy - return WhoInheritingACLSecurityPolicy - -class TestAPIFunctions(unittest.TestCase): - def setUp(self): - cleanUp() - def tearDown(self): - cleanUp() - - def _registerSecurityPolicy(self, secpol): - import zope.component - gsm = zope.component.getGlobalSiteManager() - from repoze.bfg.interfaces import ISecurityPolicy - gsm.registerUtility(secpol, ISecurityPolicy) - - def test_has_permission_registered(self): - secpol = DummySecurityPolicy(False) - self._registerSecurityPolicy(secpol) - from repoze.bfg.security import has_permission - self.assertEqual(has_permission('view', None, None), False) - - def test_has_permission_not_registered(self): - from repoze.bfg.security import has_permission - result = has_permission('view', None, None) - self.assertEqual(result, True) - self.assertEqual(result.msg, 'No security policy in use.') - - def test_authenticated_userid_registered(self): - secpol = DummySecurityPolicy(False) - self._registerSecurityPolicy(secpol) - from repoze.bfg.security import authenticated_userid - request = DummyRequest({}) - self.assertEqual(authenticated_userid(request), 'fred') - - def test_authenticated_userid_not_registered(self): - from repoze.bfg.security import authenticated_userid - request = DummyRequest({}) - self.assertEqual(authenticated_userid(request), None) - - def test_effective_principals_registered(self): - secpol = DummySecurityPolicy(False) - self._registerSecurityPolicy(secpol) - from repoze.bfg.security import effective_principals - request = DummyRequest({}) - self.assertEqual(effective_principals(request), ['fred', 'bob']) - - def test_effective_principals_not_registered(self): - from repoze.bfg.security import effective_principals - request = DummyRequest({}) - self.assertEqual(effective_principals(request), []) - - def test_principals_allowed_by_permission_not_registered(self): - from repoze.bfg.security import principals_allowed_by_permission - from repoze.bfg.security import Everyone - self.assertEqual(principals_allowed_by_permission(None, None), - [Everyone]) - - def test_principals_allowed_by_permission_registered(self): - secpol = DummySecurityPolicy(False) - self._registerSecurityPolicy(secpol) - from repoze.bfg.security import principals_allowed_by_permission - self.assertEqual(principals_allowed_by_permission(None, None), - ['fred', 'bob']) - -class TestViewPermission(unittest.TestCase): - def _getTargetClass(self): - from repoze.bfg.security import ViewPermission - return ViewPermission - - def _makeOne(self, *arg, **kw): - klass = self._getTargetClass() - return klass(*arg, **kw) - - def test_call(self): - context = DummyContext() - request = DummyRequest({}) - secpol = DummySecurityPolicy(True) - permission = self._makeOne(context, request, 'repoze.view') - result = permission(secpol) - self.assertEqual(result, True) - self.assertEqual(secpol.checked, (context, request, 'repoze.view')) - - def test_repr(self): - context = DummyContext() - request = DummyRequest({}) - request.view_name = 'viewname' - secpol = DummySecurityPolicy(True) - permission = self._makeOne(context, request, 'repoze.view') - result = repr(permission) - self.failUnless(result.startswith('<Permission at ')) - self.failUnless(result.endswith(" named 'repoze.view' for 'viewname'>")) - -class TestViewPermissionFactory(unittest.TestCase): def _getTargetClass(self): from repoze.bfg.security import ViewPermissionFactory return ViewPermissionFactory @@ -671,10 +46,9 @@ class TestViewPermissionFactory(unittest.TestCase): context = DummyContext() request = DummyRequest({}) factory = self._makeOne('repoze.view') + self.assertEqual(factory.permission_name, 'repoze.view') result = factory(context, request) - self.assertEqual(result.permission_name, 'repoze.view') - self.assertEqual(result.context, context) - self.assertEqual(result.request, request) + self.assertEqual(result, True) class TestAllowed(unittest.TestCase): def _getTargetClass(self): @@ -752,6 +126,222 @@ class TestACLDenied(unittest.TestCase): self.failUnless('<ACLDenied instance at ' in repr(denied)) self.failUnless("with msg %r>" % msg in repr(denied)) +class TestViewExecutionPermitted(unittest.TestCase): + def setUp(self): + cleanUp() + + def tearDown(self): + cleanUp() + + def _callFUT(self, *arg, **kw): + from repoze.bfg.security import view_execution_permitted + return view_execution_permitted(*arg, **kw) + + def _registerViewPermission(self, view_name, allow=True): + import zope.component + from zope.interface import Interface + from repoze.bfg.interfaces import IViewPermission + class Checker(object): + def __call__(self, context, request): + self.context = context + self.request = request + return allow + checker = Checker() + gsm = zope.component.getGlobalSiteManager() + gsm.registerAdapter(checker, (Interface, Interface), + IViewPermission, + view_name) + return checker + + def test_no_permission(self): + import zope.component + gsm = zope.component.getGlobalSiteManager() + from repoze.bfg.interfaces import ISettings + settings = DummySettings(debug_authorization=True) + gsm.registerUtility(settings, ISettings) + context = DummyContext() + request = DummyRequest({}) + result = self._callFUT(context, request, '') + msg = result.msg + self.failUnless("Allowed: view name '' in context" in msg) + self.failUnless('(no permission defined)' in msg) + self.assertEqual(result, True) + + def test_with_permission(self): + from zope.interface import Interface + from zope.interface import directlyProvides + from repoze.bfg.interfaces import IRequest + class IContext(Interface): + pass + context = DummyContext() + directlyProvides(context, IContext) + checker = self._registerViewPermission('', True) + request = DummyRequest({}) + directlyProvides(request, IRequest) + result = self._callFUT(context, request, '') + self.failUnless(result is True) + +def _registerAuthenticationPolicy(result): + from repoze.bfg.interfaces import IAuthenticationPolicy + policy = DummyAuthenticationPolicy(result) + import zope.component + gsm = zope.component.getGlobalSiteManager() + gsm.registerUtility(policy, IAuthenticationPolicy) + return policy + +def _registerAuthorizationPolicy(result): + from repoze.bfg.interfaces import IAuthorizationPolicy + policy = DummyAuthorizationPolicy(result) + import zope.component + gsm = zope.component.getGlobalSiteManager() + gsm.registerUtility(policy, IAuthorizationPolicy) + return policy + + +class TestHasPermission(unittest.TestCase): + def setUp(self): + cleanUp() + + def tearDown(self): + cleanUp() + + def _callFUT(self, *arg): + from repoze.bfg.security import has_permission + return has_permission(*arg) + + def test_no_authentication_policy(self): + result = self._callFUT('view', None, None) + self.assertEqual(result, True) + self.assertEqual(result.msg, 'No authentication policy in use.') + + def test_authentication_policy_no_authorization_policy(self): + _registerAuthenticationPolicy(None) + self.assertRaises(ValueError, self._callFUT, 'view', None, None) + + def test_authn_and_authz_policies_registered(self): + _registerAuthenticationPolicy(None) + pol = _registerAuthorizationPolicy('yo') + self.assertEqual(self._callFUT('view', None, None), 'yo') + +class TestAuthenticatedUserId(unittest.TestCase): + def setUp(self): + cleanUp() + + def tearDown(self): + cleanUp() + + def _callFUT(self, *arg): + from repoze.bfg.security import authenticated_userid + return authenticated_userid(*arg) + + def test_no_authentication_policy(self): + context = DummyContext() + request = DummyRequest({}) + result = self._callFUT(context, request) + self.assertEqual(result, None) + + def test_with_authentication_policy(self): + _registerAuthenticationPolicy('yo') + context = DummyContext() + request = DummyRequest({}) + result = self._callFUT(context, request) + self.assertEqual(result, 'yo') + +class TestEffectivePrincipals(unittest.TestCase): + def setUp(self): + cleanUp() + + def tearDown(self): + cleanUp() + + def _callFUT(self, *arg): + from repoze.bfg.security import effective_principals + return effective_principals(*arg) + + def test_no_authentication_policy(self): + context = DummyContext() + request = DummyRequest({}) + result = self._callFUT(context, request) + self.assertEqual(result, []) + + def test_with_authentication_policy(self): + _registerAuthenticationPolicy('yo') + context = DummyContext() + request = DummyRequest({}) + result = self._callFUT(context, request) + self.assertEqual(result, 'yo') + +class TestPrincipalsAllowedByPermission(unittest.TestCase): + def setUp(self): + cleanUp() + + def tearDown(self): + cleanUp() + + def _callFUT(self, *arg): + from repoze.bfg.security import principals_allowed_by_permission + return principals_allowed_by_permission(*arg) + + def test_no_authorization_policy(self): + from repoze.bfg.security import Everyone + context = DummyContext() + result = self._callFUT(context, 'view') + self.assertEqual(result, [Everyone]) + + def test_with_authorization_policy(self): + _registerAuthorizationPolicy('yo') + context = DummyContext() + result = self._callFUT(context, 'view') + self.assertEqual(result, 'yo') + +class TestRemember(unittest.TestCase): + def setUp(self): + cleanUp() + + def tearDown(self): + cleanUp() + + def _callFUT(self, *arg): + from repoze.bfg.security import remember + return remember(*arg) + + def test_no_authentication_policy(self): + context = DummyContext() + request = DummyRequest({}) + result = self._callFUT(context, request, 'me') + self.assertEqual(result, []) + + def test_with_authentication_policy(self): + _registerAuthenticationPolicy('yo') + context = DummyContext() + request = DummyRequest({}) + result = self._callFUT(context, request, 'me') + self.assertEqual(result, 'yo') + +class TestForget(unittest.TestCase): + def setUp(self): + cleanUp() + + def tearDown(self): + cleanUp() + + def _callFUT(self, *arg): + from repoze.bfg.security import forget + return forget(*arg) + + def test_no_authentication_policy(self): + context = DummyContext() + request = DummyRequest({}) + result = self._callFUT(context, request) + self.assertEqual(result, []) + + def test_with_authentication_policy(self): + _registerAuthenticationPolicy('yo') + context = DummyContext() + request = DummyRequest({}) + result = self._callFUT(context, request) + self.assertEqual(result, 'yo') + class DummyContext: def __init__(self, *arg, **kw): self.__dict__.update(kw) @@ -760,33 +350,33 @@ class DummyRequest: def __init__(self, environ): self.environ = environ -class DummySecurityPolicy: +class DummyAuthenticationPolicy: def __init__(self, result): self.result = result - def permits(self, *args): - self.checked = args + def effective_principals(self, context, request): return self.result - def authenticated_userid(self, request): - return 'fred' + def authenticated_userid(self, context, request): + return self.result - def effective_principals(self, request): - return ['fred', 'bob'] + def remember(self, context, request, principal, **kw): + return self.result + + def forget(self, context, request): + return self.result + +class DummyAuthorizationPolicy: + def __init__(self, result): + self.result = result + + def permits(self, context, principals, permission): + return self.result def principals_allowed_by_permission(self, context, permission): - return ['fred', 'bob'] - -VIEW = 'view' -EDIT = 'edit' -CREATE = 'create' -DELETE = 'delete' -MODERATE = 'moderate' -ADMINISTER = 'administer' -COMMENT = 'comment' - -GUEST_PERMS = (VIEW, COMMENT) -MEMBER_PERMS = GUEST_PERMS + (EDIT, CREATE, DELETE) -MODERATOR_PERMS = MEMBER_PERMS + (MODERATE,) -ADMINISTRATOR_PERMS = MODERATOR_PERMS + (ADMINISTER,) + return self.result + +class DummySettings: + def __init__(self, **kw): + self.__dict__.update(kw) diff --git a/repoze/bfg/tests/test_testing.py b/repoze/bfg/tests/test_testing.py index 504151ce2..5024c41ba 100644 --- a/repoze/bfg/tests/test_testing.py +++ b/repoze/bfg/tests/test_testing.py @@ -8,29 +8,20 @@ class TestTestingFunctions(unittest.TestCase): def tearDown(self): cleanUp() - def test_registerDummySecurityPolicy_permissive(self): - from repoze.bfg import testing - testing.registerDummySecurityPolicy('user', ('group1', 'group2'), - permissive=True) - from repoze.bfg.interfaces import ISecurityPolicy - from zope.component import getUtility - ut = getUtility(ISecurityPolicy) - from repoze.bfg.testing import DummyAllowingSecurityPolicy - self.failUnless(isinstance(ut, DummyAllowingSecurityPolicy)) - self.assertEqual(ut.userid, 'user') - self.assertEqual(ut.groupids, ('group1', 'group2')) - - def test_registerDummySecurityPolicy_nonpermissive(self): + def test_registerDummySecurityPolicy(self): from repoze.bfg import testing testing.registerDummySecurityPolicy('user', ('group1', 'group2'), permissive=False) - from repoze.bfg.interfaces import ISecurityPolicy + from repoze.bfg.interfaces import IAuthenticationPolicy + from repoze.bfg.interfaces import IAuthorizationPolicy from zope.component import getUtility - ut = getUtility(ISecurityPolicy) - from repoze.bfg.testing import DummyDenyingSecurityPolicy - self.failUnless(isinstance(ut, DummyDenyingSecurityPolicy)) + ut = getUtility(IAuthenticationPolicy) + from repoze.bfg.testing import DummySecurityPolicy + self.failUnless(isinstance(ut, DummySecurityPolicy)) + ut = getUtility(IAuthorizationPolicy) self.assertEqual(ut.userid, 'user') self.assertEqual(ut.groupids, ('group1', 'group2')) + self.assertEqual(ut.permissive, False) def test_registerModels(self): ob1 = object() @@ -153,36 +144,30 @@ class TestTestingFunctions(unittest.TestCase): self.assertEqual(response.body, '123') def test_registerViewPermission_defaults(self): + from repoze.bfg.security import view_execution_permitted from repoze.bfg import testing view = testing.registerViewPermission('moo.html') - from repoze.bfg.view import view_execution_permitted testing.registerDummySecurityPolicy() result = view_execution_permitted(None, None, 'moo.html') self.failUnless(result) self.assertEqual(result.msg, 'message') def test_registerViewPermission_denying(self): + from repoze.bfg.security import view_execution_permitted from repoze.bfg import testing view = testing.registerViewPermission('moo.html', result=False) - from repoze.bfg.view import view_execution_permitted testing.registerDummySecurityPolicy() result = view_execution_permitted(None, None, 'moo.html') self.failIf(result) self.assertEqual(result.msg, 'message') def test_registerViewPermission_custom(self): - class ViewPermission: - def __init__(self, context, request): - self.context = context - self.request = request - - def __call__(self, secpol): - return True - + from repoze.bfg.security import view_execution_permitted + def viewperm(context, request): + return True from repoze.bfg import testing view = testing.registerViewPermission('moo.html', - viewpermission=ViewPermission) - from repoze.bfg.view import view_execution_permitted + viewpermission=viewperm) testing.registerDummySecurityPolicy() result = view_execution_permitted(None, None, 'moo.html') self.failUnless(result is True) @@ -226,30 +211,30 @@ class TestTestingFunctions(unittest.TestCase): testing.registerUtility(utility, iface, name='mudge') self.assertEqual(getUtility(iface, name='mudge')(), 'foo') -class TestDummyAllowingSecurityPolicy(unittest.TestCase): +class TestDummySecurityPolicy(unittest.TestCase): def _getTargetClass(self): - from repoze.bfg.testing import DummyAllowingSecurityPolicy - return DummyAllowingSecurityPolicy + from repoze.bfg.testing import DummySecurityPolicy + return DummySecurityPolicy - def _makeOne(self, userid=None, groupids=()): + def _makeOne(self, userid=None, groupids=(), permissive=True): klass = self._getTargetClass() - return klass(userid, groupids) + return klass(userid, groupids, permissive) def test_authenticated_userid(self): policy = self._makeOne('user') - self.assertEqual(policy.authenticated_userid(None), 'user') + self.assertEqual(policy.authenticated_userid(None, None), 'user') def test_effective_principals_userid(self): policy = self._makeOne('user', ('group1',)) from repoze.bfg.security import Everyone from repoze.bfg.security import Authenticated - self.assertEqual(policy.effective_principals(None), + self.assertEqual(policy.effective_principals(None, None), [Everyone, Authenticated, 'user', 'group1']) def test_effective_principals_nouserid(self): policy = self._makeOne() from repoze.bfg.security import Everyone - self.assertEqual(policy.effective_principals(None), [Everyone]) + self.assertEqual(policy.effective_principals(None, None), [Everyone]) def test_permits(self): policy = self._makeOne() @@ -259,44 +244,19 @@ class TestDummyAllowingSecurityPolicy(unittest.TestCase): policy = self._makeOne('user', ('group1',)) from repoze.bfg.security import Everyone from repoze.bfg.security import Authenticated - self.assertEqual(policy.principals_allowed_by_permission(None, None), - [Everyone, Authenticated, 'user', 'group1']) - - -class TestDummyDenyingSecurityPolicy(unittest.TestCase): - def _getTargetClass(self): - from repoze.bfg.testing import DummyDenyingSecurityPolicy - return DummyDenyingSecurityPolicy - - def _makeOne(self, userid=None, groupids=()): - klass = self._getTargetClass() - return klass(userid, groupids) + result = policy.principals_allowed_by_permission(None, None) + self.assertEqual(result, [Everyone, Authenticated, 'user', 'group1']) - def test_authenticated_userid(self): - policy = self._makeOne('user') - self.assertEqual(policy.authenticated_userid(None), 'user') - - def test_effective_principals_userid(self): - policy = self._makeOne('user', ('group1',)) - from repoze.bfg.security import Everyone - from repoze.bfg.security import Authenticated - self.assertEqual(policy.effective_principals(None), - [Everyone, Authenticated, 'user', 'group1']) - - def test_effective_principals_nouserid(self): + def test_forget(self): policy = self._makeOne() - from repoze.bfg.security import Everyone - self.assertEqual(policy.effective_principals(None), [Everyone]) - - def test_permits(self): + self.assertEqual(policy.forget(None, None), []) + + def test_remember(self): policy = self._makeOne() - self.assertEqual(policy.permits(None, None, None), False) + self.assertEqual(policy.remember(None, None, None), []) - def test_principals_allowed_by_permission(self): - policy = self._makeOne('user', ('group1',)) - self.assertEqual(policy.principals_allowed_by_permission(None, None), - []) + class TestDummyModel(unittest.TestCase): def _getTargetClass(self): from repoze.bfg.testing import DummyModel diff --git a/repoze/bfg/tests/test_threadlocal.py b/repoze/bfg/tests/test_threadlocal.py new file mode 100644 index 000000000..230bb3726 --- /dev/null +++ b/repoze/bfg/tests/test_threadlocal.py @@ -0,0 +1,46 @@ +from repoze.bfg.testing import cleanUp +import unittest + +class TestThreadLocalManager(unittest.TestCase): + def setUp(self): + cleanUp() + + def tearDown(self): + cleanUp() + + def _getTargetClass(self): + from repoze.bfg.threadlocal import ThreadLocalManager + return ThreadLocalManager + + def _makeOne(self, default=lambda *x: 1): + return self._getTargetClass()(default) + + def test_init(self): + local = self._makeOne() + self.assertEqual(local.stack, []) + self.assertEqual(local.get(), 1) + + def test_default(self): + from zope.component import getGlobalSiteManager + local = self._makeOne(getGlobalSiteManager) + self.assertEqual(local.stack, []) + self.assertEqual(local.get(), getGlobalSiteManager()) + + def test_push_and_pop(self): + local = self._makeOne() + local.push(True) + self.assertEqual(local.get(), True) + self.assertEqual(local.pop(), True) + self.assertEqual(local.pop(), None) + self.assertEqual(local.get(), 1) + + def test_set_get_and_clear(self): + local = self._makeOne() + local.set(None) + self.assertEqual(local.stack, [None]) + self.assertEqual(local.get(), None) + local.clear() + self.assertEqual(local.get(), 1) + local.clear() + self.assertEqual(local.get(), 1) + diff --git a/repoze/bfg/tests/test_view.py b/repoze/bfg/tests/test_view.py index 08beffeaa..cc293d6ef 100644 --- a/repoze/bfg/tests/test_view.py +++ b/repoze/bfg/tests/test_view.py @@ -15,11 +15,21 @@ class BaseTest(object): from repoze.bfg.interfaces import IView gsm.registerAdapter(app, for_, IView, name) - def _registerPermission(self, permission, name, *for_): + def _registerViewPermission(self, view_name, allow=True): import zope.component - gsm = zope.component.getGlobalSiteManager() + from zope.interface import Interface from repoze.bfg.interfaces import IViewPermission - gsm.registerAdapter(permission, for_, IViewPermission, name) + class Checker(object): + def __call__(self, context, request): + self.context = context + self.request = request + return allow + checker = Checker() + gsm = zope.component.getGlobalSiteManager() + gsm.registerAdapter(checker, (Interface, Interface), + IViewPermission, + view_name) + return checker def _registerSecurityPolicy(self, secpol): import zope.component @@ -61,12 +71,10 @@ class RenderViewToResponseTests(BaseTest, unittest.TestCase): directlyProvides(context, IContext) response = DummyResponse() secpol = DummySecurityPolicy() - permissionfactory = make_permission_factory(False) view = make_view(response) self._registerView(view, 'registered', IContext, IRequest) self._registerSecurityPolicy(secpol) - self._registerPermission(permissionfactory, 'registered', IContext, - IRequest) + self._registerViewPermission('registered', False) environ = self._makeEnviron() from webob import Request request = Request(environ) @@ -85,12 +93,10 @@ class RenderViewToResponseTests(BaseTest, unittest.TestCase): directlyProvides(context, IContext) response = DummyResponse() secpol = DummySecurityPolicy() - permissionfactory = make_permission_factory(True) view = make_view(response) self._registerView(view, 'registered', IContext, IRequest) self._registerSecurityPolicy(secpol) - self._registerPermission(permissionfactory, 'registered', IContext, - IRequest) + self._registerViewPermission('registered', True) environ = self._makeEnviron() from webob import Request request = Request(environ) @@ -109,12 +115,10 @@ class RenderViewToResponseTests(BaseTest, unittest.TestCase): directlyProvides(context, IContext) response = DummyResponse() secpol = DummySecurityPolicy() - permissionfactory = make_permission_factory(False) view = make_view(response) self._registerView(view, 'registered', IContext, IRequest) self._registerSecurityPolicy(secpol) - self._registerPermission(permissionfactory, 'registered', IContext, - IRequest) + self._registerViewPermission('registered', False) environ = self._makeEnviron() from webob import Request request = Request(environ) @@ -146,12 +150,10 @@ class RenderViewToIterableTests(BaseTest, unittest.TestCase): directlyProvides(context, IContext) response = DummyResponse() secpol = DummySecurityPolicy() - permissionfactory = make_permission_factory(False) view = make_view(response) self._registerView(view, 'registered', IContext, IRequest) self._registerSecurityPolicy(secpol) - self._registerPermission(permissionfactory, 'registered', IContext, - IRequest) + self._registerViewPermission('registered', False) environ = self._makeEnviron() from webob import Request request = Request(environ) @@ -170,12 +172,10 @@ class RenderViewToIterableTests(BaseTest, unittest.TestCase): directlyProvides(context, IContext) response = DummyResponse() secpol = DummySecurityPolicy() - permissionfactory = make_permission_factory(True) view = make_view(response) self._registerView(view, 'registered', IContext, IRequest) self._registerSecurityPolicy(secpol) - self._registerPermission(permissionfactory, 'registered', IContext, - IRequest) + self._registerViewPermission('registered', True) environ = self._makeEnviron() from webob import Request request = Request(environ) @@ -194,12 +194,10 @@ class RenderViewToIterableTests(BaseTest, unittest.TestCase): directlyProvides(context, IContext) response = DummyResponse() secpol = DummySecurityPolicy() - permissionfactory = make_permission_factory(False) view = make_view(response) self._registerView(view, 'registered', IContext, IRequest) self._registerSecurityPolicy(secpol) - self._registerPermission(permissionfactory, 'registered', IContext, - IRequest) + self._registerViewPermission('registered', False) environ = self._makeEnviron() from webob import Request request = Request(environ) @@ -231,12 +229,10 @@ class RenderViewTests(unittest.TestCase, BaseTest): directlyProvides(context, IContext) response = DummyResponse() secpol = DummySecurityPolicy() - permissionfactory = make_permission_factory(False) view = make_view(response) self._registerView(view, 'registered', IContext, IRequest) self._registerSecurityPolicy(secpol) - self._registerPermission(permissionfactory, 'registered', IContext, - IRequest) + self._registerViewPermission('registered', False) environ = self._makeEnviron() from webob import Request request = Request(environ) @@ -255,12 +251,10 @@ class RenderViewTests(unittest.TestCase, BaseTest): directlyProvides(context, IContext) response = DummyResponse() secpol = DummySecurityPolicy() - permissionfactory = make_permission_factory(True) view = make_view(response) self._registerView(view, 'registered', IContext, IRequest) self._registerSecurityPolicy(secpol) - self._registerPermission(permissionfactory, 'registered', IContext, - IRequest) + self._registerViewPermission('registered', True) environ = self._makeEnviron() from webob import Request request = Request(environ) @@ -278,12 +272,10 @@ class RenderViewTests(unittest.TestCase, BaseTest): directlyProvides(context, IContext) response = DummyResponse() secpol = DummySecurityPolicy() - permissionfactory = make_permission_factory(False) view = make_view(response) self._registerView(view, 'registered', IContext, IRequest) self._registerSecurityPolicy(secpol) - self._registerPermission(permissionfactory, 'registered', IContext, - IRequest) + self._registerViewPermission('registered', False) environ = self._makeEnviron() from webob import Request request = Request(environ) @@ -314,67 +306,6 @@ class TestIsResponse(unittest.TestCase): response.status = None self.assertEqual(self._callFUT(response), False) -class TestViewExecutionPermitted(unittest.TestCase): - def setUp(self): - cleanUp() - - def tearDown(self): - cleanUp() - - def _callFUT(self, *arg, **kw): - from repoze.bfg.view import view_execution_permitted - return view_execution_permitted(*arg, **kw) - - def _registerSecurityPolicy(self, secpol): - import zope.component - gsm = zope.component.getGlobalSiteManager() - from repoze.bfg.interfaces import ISecurityPolicy - gsm.registerUtility(secpol, ISecurityPolicy) - - def _registerPermission(self, permission, name, *for_): - import zope.component - gsm = zope.component.getGlobalSiteManager() - from repoze.bfg.interfaces import IViewPermission - gsm.registerAdapter(permission, for_, IViewPermission, name) - - def test_no_secpol(self): - context = DummyContext() - request = DummyRequest() - result = self._callFUT(context, request, '') - msg = result.msg - self.failUnless("Allowed: view name '' in context" in msg) - self.failUnless('(no security policy in use)' in msg) - self.assertEqual(result, True) - - def test_secpol_no_permission(self): - secpol = DummySecurityPolicy() - self._registerSecurityPolicy(secpol) - context = DummyContext() - request = DummyRequest() - result = self._callFUT(context, request, '') - msg = result.msg - self.failUnless("Allowed: view name '' in context" in msg) - self.failUnless("(no permission registered for name '')" in msg) - self.assertEqual(result, True) - - def test_secpol_and_permission(self): - from zope.interface import Interface - from zope.interface import directlyProvides - from repoze.bfg.interfaces import IRequest - class IContext(Interface): - pass - context = DummyContext() - directlyProvides(context, IContext) - permissionfactory = make_permission_factory(True) - self._registerPermission(permissionfactory, '', IContext, - IRequest) - secpol = DummySecurityPolicy() - self._registerSecurityPolicy(secpol) - request = DummyRequest() - directlyProvides(request, IRequest) - result = self._callFUT(context, request, '') - self.failUnless(result is True) - class TestStaticView(unittest.TestCase, BaseTest): def setUp(self): cleanUp() @@ -527,18 +458,6 @@ def make_view(response): return response return view -def make_permission_factory(result): - class DummyPermissionFactory: - def __init__(self, context, request): - self.context = context - self.request = request - - def __call__(self, secpol): - self.__class__.checked_with = secpol - return result - - return DummyPermissionFactory - class DummyResponse: status = '200 OK' headerlist = () diff --git a/repoze/bfg/tests/test_wsgi.py b/repoze/bfg/tests/test_wsgi.py index b9568eb82..893364635 100644 --- a/repoze/bfg/tests/test_wsgi.py +++ b/repoze/bfg/tests/test_wsgi.py @@ -131,7 +131,7 @@ class TestNotFound(unittest.TestCase): ('Content-Type', 'text/html')]) def test_with_message(self): - environ = {'message':'<hi!>'} + environ = {'repoze.bfg.message':'<hi!>'} L = [] def start_response(status, headers): L.append((status, headers)) @@ -166,7 +166,7 @@ class TestUnauthorized(unittest.TestCase): ('Content-Type', 'text/html')]) def test_with_message(self): - environ = {'message':'<hi!>'} + environ = {'repoze.bfg.message':'<hi!>'} L = [] def start_response(status, headers): L.append((status, headers)) diff --git a/repoze/bfg/threadlocal.py b/repoze/bfg/threadlocal.py new file mode 100644 index 000000000..7df4f34f4 --- /dev/null +++ b/repoze/bfg/threadlocal.py @@ -0,0 +1,39 @@ +import threading +from zope.component import getGlobalSiteManager + +class ThreadLocalManager(threading.local): + def __init__(self, default): + self.stack = [] + self.default = default + + def push(self, info): + self.stack.append(info) + + set = push # b/c + + def pop(self): + if self.stack: + return self.stack.pop() + + def get(self): + try: + return self.stack[-1] + except IndexError: + return self.default() + + def clear(self): + self.stack[:] = [] + +def defaults(): + defaults = {'request':None} + gsm = getGlobalSiteManager() + defaults['registry'] = gsm + return defaults + +manager = ThreadLocalManager(defaults) + +def setManager(new_manager): # for unit tests + global manager + old_manager = manager + manager = new_manager + return old_manager diff --git a/repoze/bfg/view.py b/repoze/bfg/view.py index a867987a5..1698bc470 100644 --- a/repoze/bfg/view.py +++ b/repoze/bfg/view.py @@ -2,61 +2,25 @@ import inspect from paste.urlparser import StaticURLParser from zope.component import queryMultiAdapter -from zope.component import queryUtility +from zope.deprecation import deprecated -from repoze.bfg.interfaces import ISecurityPolicy -from repoze.bfg.interfaces import IViewPermission -from repoze.bfg.interfaces import IView +from zope.interface import Interface + +from repoze.bfg.interfaces import IRequest +from repoze.bfg.interfaces import IView from repoze.bfg.path import caller_path +from repoze.bfg.security import view_execution_permitted from repoze.bfg.security import Unauthorized -from repoze.bfg.security import Allowed -from zope.interface import Interface - -from repoze.bfg.interfaces import IRequest +deprecated('view_execution_permitted', + "('from repoze.bfg.view import view_execution_permitted' is now " + "deprecated; instead use 'from repoze.bfg.security import " + "view_execution_permitted')", + ) _marker = object() -def view_execution_permitted(context, request, name=''): - """ If the view specified by ``context`` and ``name`` is protected - by a permission, check the permission associated with the view - using the effective security policy and the ``request``. Return a - boolean result. If no security policy is in effect, or if the - view is not protected by a permission, return True.""" - security_policy = queryUtility(ISecurityPolicy) - permission = queryMultiAdapter((context, request), IViewPermission, - name=name) - return _view_execution_permitted(context, request, name, security_policy, - permission, True) - -def _view_execution_permitted(context, request, view_name, security_policy, - permission, debug_authorization): - """ Rawer (faster) form of view_execution_permitted which does not - need to do a CA lookup for the security policy or other values and - which returns plain booleans if debug_authorization is off instead - of constructing ``Allowed`` objects. This function is used by - ``view_execution_permitted`` and the Router; it is not a public - API.""" - if security_policy is None: - if debug_authorization: - return Allowed( - 'Allowed: view name %r in context %r (no security policy in ' - 'use)', view_name, context) - else: - return True - - elif permission is None: - if debug_authorization: - return Allowed( - 'Allowed: view name %r in context %r (no permission ' - 'registered for name %r).', view_name, context, view_name) - else: - return True - - else: - return permission(security_policy) - def render_view_to_response(context, request, name='', secure=True): """ Render the view named ``name`` against the specified ``context`` and ``request`` to an object implementing diff --git a/repoze/bfg/wsgi.py b/repoze/bfg/wsgi.py index abe7ebead..027345673 100644 --- a/repoze/bfg/wsgi.py +++ b/repoze/bfg/wsgi.py @@ -105,7 +105,7 @@ def wsgiapp2(wrapped): class HTTPException(object): def __call__(self, environ, start_response, exc_info=False): try: - msg = escape(environ['message']) + msg = escape(environ['repoze.bfg.message']) except KeyError: msg = '' html = """<body> diff --git a/repoze/bfg/zcml.py b/repoze/bfg/zcml.py index 737e2409b..39366b0b1 100644 --- a/repoze/bfg/zcml.py +++ b/repoze/bfg/zcml.py @@ -37,14 +37,15 @@ class Uncacheable(object): """ Include in discriminators of actions which are not cacheable; this class only exists for backwards compatibility (<0.8.1)""" -def view(_context, - permission=None, - for_=None, - view=None, - name="", - request_type=None, - cacheable=True, # not used, here for b/w compat < 0.8 - ): +def view( + _context, + permission=None, + for_=None, + view=None, + name="", + request_type=None, + cacheable=True, # not used, here for b/w compat < 0.8 + ): if not view: raise ConfigurationError('"view" attribute was not specified') @@ -12,7 +12,7 @@ # ############################################################################## -__version__ = '0.8.1' +__version__ = '0.9dev' import os |
