summaryrefslogtreecommitdiff
path: root/repoze/bfg/traversal.py
blob: 3c6756de192a322b4ab2fffa99802f88f74bd9e3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
import re
import urllib

from zope.component import queryUtility
   
from zope.interface import classProvides
from zope.interface import implements
from repoze.bfg.location import LocationProxy
from repoze.bfg.location import lineage

from repoze.bfg.interfaces import ILocation
from repoze.bfg.interfaces import ITraverser
from repoze.bfg.interfaces import ITraverserFactory
from repoze.bfg.interfaces import ISettings

def split_path(path):
    """ Split the URL path string ``path`` into a list of cleaned-up
    path segments.

    Leading and trailing slashes are discarded, every segment is
    URL-unquoted, and empty segments and ``.`` segments are dropped.
    A ``..`` segment removes the most recently collected segment;
    when nothing has been collected yet (the path tries to climb
    above its starting point) the ``..`` is ignored rather than
    raising an ``IndexError``."""
    clean = []
    for segment in path.strip('/').split('/'):
        segment = urllib.unquote(segment) # deal with spaces in path segment
        if not segment or segment == '.':
            continue
        if segment == '..':
            # only step back when there is something to step back from
            if clean:
                del clean[-1]
        else:
            clean.append(segment)
    return clean

def step(ob, name, default, as_unicode=True):
    """ Resolve one path segment ``name`` against the object ``ob``
    and return a ``(name, value)`` tuple.

    When ``as_unicode`` is true, ``name`` is first decoded from UTF-8;
    a ``TypeError`` is raised if it cannot be decoded.  A name that
    starts with ``@@`` is never looked up: the prefix is removed and
    ``default`` is returned as the value.  Otherwise the value is
    ``ob[name]``, or ``default`` when ``ob`` has no ``__getitem__``
    or the lookup raises ``KeyError``."""
    if as_unicode:
        try:
            decoded = name.decode('utf-8')
        except UnicodeDecodeError:
            raise TypeError('Could not decode path segment "%s" using the '
                            'UTF-8 decoding scheme' % name)
        name = decoded

    found = default
    if name.startswith('@@'):
        # explicit view marker: strip it and skip the item lookup
        name = name[2:]
    elif hasattr(ob, '__getitem__'):
        try:
            found = ob[name]
        except KeyError:
            pass
    return name, found

# Unique sentinel handed to ``step`` as its default, so that "no such
# child" can be told apart from any real value (compared by identity).
_marker = []

class ModelGraphTraverser(object):
    """ Traverser which walks a model graph, starting at ``root``,
    following the segments of the ``PATH_INFO`` value of a WSGI
    environment. """
    classProvides(ITraverserFactory)
    implements(ITraverser)

    def __init__(self, root):
        self.root = root
        # children are only wrapped in a LocationProxy when the root
        # itself provides ILocation
        self.locatable = ILocation.providedBy(root)
        settings = queryUtility(ISettings)
        if settings is None:
            # no settings utility registered: decode segments by default
            self.unicode_path_segments = True
        else:
            self.unicode_path_segments = settings.unicode_path_segments

    def __call__(self, environ):
        """ Traverse from the root along ``environ['PATH_INFO']``
        (``'/'`` when the key is absent).

        Returns a 3-tuple: the last object reached, the name of the
        first segment that could not be resolved (``''`` when every
        segment was resolved), and the list of segments left
        unconsumed after that name. """
        as_unicode = self.unicode_path_segments
        wrap = self.locatable
        remaining = split_path(environ.get('PATH_INFO', '/'))
        context = self.root

        while remaining:
            segment, child = step(context, remaining.pop(0), _marker,
                                  as_unicode)
            if child is _marker:
                # traversal stops here; the rest of the path is handed back
                return context, segment, remaining
            if wrap and not ILocation.providedBy(child):
                child = LocationProxy(child, context, segment)
            context = child

        return context, '', remaining

def find_root(model):
    """ Return the root node of the graph that ``model`` belongs to:
    the first object in the lineage of ``model`` whose ``__parent__``
    is ``None``.  If no such object is found, ``model`` itself is
    returned.  ``model`` should be :term:`location`-aware.  Note that
    the root node is also available in the request object by
    accessing the ``request.root`` attribute.
    """
    for candidate in lineage(model):
        if candidate.__parent__ is None:
            return candidate
    return model

def find_model(model, path):
    """ Given a model object and a string representing a path
    reference (a set of names delimited by forward-slashes, such as
    the return value of ``model_path``), return a context in this
    application's model graph at the specified path.

    If the path starts with a slash it is considered absolute and
    traversal starts at the root object (as found by ``find_root``).
    Otherwise the path is considered relative and traversal begins
    at the model object supplied to the function.  In either case, if
    the path cannot be resolved, a ``KeyError`` is raised.  The model
    passed in should be :term:`location`-aware."""
    start = model
    if path.startswith('/'):
        start = find_root(model)

    traverser = ITraverserFactory(start)
    ob, name, _subpath = traverser({'PATH_INFO':path})
    if not name:
        return ob
    # a leftover name means traversal stopped before consuming the path
    raise KeyError('%r has no subelement %s' % (ob, name))

def find_interface(model, interface):
    """ Walk the lineage of ``model`` and return the first object
    which provides the interface ``interface``, or ``None`` when no
    object in the parent chain provides it.  The ``model`` passed in
    should be :term:`location`-aware."""
    match = None
    for candidate in lineage(model):
        if interface.providedBy(candidate):
            match = candidate
            break
    return match

# Maps an original segment (string or unicode) to its URL-quoted form.
_segment_cache = {}

def _urlsegment(s):
    """ Return the URL-quoted form of the path segment ``s``; unicode
    values are encoded to UTF-8 before being quoted.

    As an optimization, every computed result is remembered in the
    module-scope ``_segment_cache`` dictionary under the original
    value, so a repeated segment needs neither re-encoding nor
    re-quoting. """
    cached = _segment_cache.get(s)
    if cached is not None:
        return cached

    if s.__class__ is unicode: # isinstance is slightly slower (~15%)
        raw = s.encode('utf-8')
    else:
        raw = s
    quoted = _url_quote(raw)
    # no lock is needed to mutate _segment_cache: the store below
    # generates exactly one Python bytecode (STORE_SUBSCR)
    _segment_cache[s] = quoted
    return quoted

def model_url(model, request, *elements):
    """ Return a string representing the absolute URL of the model
    object: ``request.application_url`` (based on the
    ``wsgi.url_scheme``, ``HTTP_HOST`` or ``SERVER_NAME`` in the
    request, plus any ``SCRIPT_NAME``) followed by the URL-quoted
    names found in the lineage of the model.  If any model in the
    lineage has a unicode name, it will be converted to UTF-8 before
    being attached to the URL.  Empty names in the model graph are
    ignored.

    Any positional arguments passed in as ``elements`` must be strings
    or unicode objects.  These will be joined by slashes and appended
    to the model URL.  Each of the elements passed in is URL-quoted
    before being appended; if any element is unicode, it will be
    converted to a UTF-8 bytestring before being URL-quoted.

    If ``elements`` are not used, the model URL will end with a
    trailing slash.  If ``elements`` are used, the generated URL will
    *not* end in a trailing slash.

    The ``model`` passed in must be :term:`location`-aware.  The
    overall result of this function is always a string (never
    unicode). """
    quoted_names = []
    for location in lineage(model):
        name = location.__name__
        if not name:
            continue
        quoted_names.append(_urlsegment(name))
    # lineage runs from the model up to the root; URLs read root-first
    quoted_names.reverse()

    prefix = '/'.join(quoted_names)
    suffix = '/'.join([_urlsegment(element) for element in elements])
    path = prefix + '/' + suffix
    if path[:1] != '/':
        path = '/' + path
    # application_url never ends in a slash
    return request.application_url + path

def model_path(model, *elements):
    """ Return a string representing the absolute path of the model
    object based on its position in the model graph, e.g
    ``/foo/bar``. Any positional arguments passed in as ``elements``
    will be joined by slashes and appended to the generated path.  The
    ``model`` passed in must be :term:`location`-aware.

    The path always starts with exactly one slash, including when
    ``elements`` are appended to a root model which has no name
    (``/a``, not ``//a``).  The path of a root model without
    ``elements`` is ``/``.

    Note that this function is not equivalent to ``model_url``:
    individual segments in the path are not quoted in any way.  Thus,
    to ensure that this function generates paths that can later be
    resolved to models via ``find_model``, you should ensure that your
    model names do not contain any slashes.

    Note also that the path returned may be a Unicode string if any
    model name in the model graph is Unicode; otherwise it will be a
    byte-string."""
    segments = []
    for location in lineage(model):
        if location.__name__:
            segments.append(location.__name__)
    # lineage runs from the model up to the root; paths read root-first
    segments.reverse()
    # join names and elements in one pass so a nameless root does not
    # contribute an extra leading slash
    segments.extend(elements)
    return '/' + '/'.join(segments)

always_safe = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
               'abcdefghijklmnopqrstuvwxyz'
               '0123456789' '_.-')
_safemaps = {}
_must_quote = {}

def _url_quote(s, safe = '/'):
    """quote('abc def') -> 'abc%20def'

    Faster version of Python stdlib urllib.quote.  See
    http://bugs.python.org/issue1285086 for more information.

    Each part of a URL, e.g. the path info, the query, etc., has a
    different set of reserved characters that must be quoted.

    RFC 2396 Uniform Resource Identifiers (URI): Generic Syntax lists
    the following reserved characters.

    reserved    = ";" | "/" | "?" | ":" | "@" | "&" | "=" | "+" |
                  "$" | ","

    Each of these characters is reserved in some component of a URL,
    but not necessarily in all of them.

    By default, the quote function is intended for quoting the path
    section of a URL.  Thus, it will not encode '/'.  This character
    is reserved, but in typical usage the quote function is being
    called on a path where the existing slash characters are used as
    reserved characters.
    """
    cachekey = (safe, always_safe)
    try:
        safe_map = _safemaps[cachekey]
        if not _must_quote[cachekey].search(s):
            return s
    except KeyError:
        safe += always_safe
        _must_quote[cachekey] = re.compile(r'[^%s]' % safe)
        safe_map = {}
        for i in range(256):
            c = chr(i)
            safe_map[c] = (c in safe) and c or ('%%%02X' % i)
        _safemaps[cachekey] = safe_map
    res = map(safe_map.__getitem__, s)
    return ''.join(res)