From af3134a984a7e9c53d41607dcf4f1feb60282f85 Mon Sep 17 00:00:00 2001 From: Michael Merickel Date: Tue, 12 Nov 2013 00:56:57 -0600 Subject: centralize and properly escape query string and anchor arguments --- pyramid/config/views.py | 16 +++---- pyramid/encode.py | 4 +- pyramid/tests/test_config/test_views.py | 2 +- pyramid/tests/test_url.py | 10 ++--- pyramid/url.py | 77 +++++++++++++++++++-------------- 5 files changed, 58 insertions(+), 51 deletions(-) diff --git a/pyramid/config/views.py b/pyramid/config/views.py index 5ad235795..0165f96f1 100644 --- a/pyramid/config/views.py +++ b/pyramid/config/views.py @@ -70,6 +70,8 @@ from pyramid.security import NO_PERMISSION_REQUIRED from pyramid.static import static_view from pyramid.threadlocal import get_current_registry +from pyramid.url import parse_url_overrides + from pyramid.view import ( render_view_to_response, AppendSlashNotFoundViewFactory, @@ -1900,23 +1902,15 @@ class StaticURLInfo(object): kw['subpath'] = subpath return request.route_url(route_name, **kw) else: + app_url, scheme, host, port, qs, anchor = \ + parse_url_overrides(kw) parsed = url_parse(url) if not parsed.scheme: url = urlparse.urlunparse(parsed._replace( scheme=request.environ['wsgi.url_scheme'])) subpath = url_quote(subpath) result = urljoin(url, subpath) - if '_query' in kw: - query = kw.pop('_query') - if isinstance(query, string_types): - result += '?' + quote_plus(query) - elif query: - result += '?' + urlencode(query, doseq=True) - if '_anchor' in kw: - anchor = kw.pop('_anchor') - anchor = quote_plus(anchor) - result += '#' + anchor - return result + return result + qs + anchor raise ValueError('No static URL definition matching %s' % path) diff --git a/pyramid/encode.py b/pyramid/encode.py index d2376109e..15da1c511 100644 --- a/pyramid/encode.py +++ b/pyramid/encode.py @@ -65,11 +65,11 @@ def urlencode(query, doseq=True): return result # bw compat api (dnr) -def quote_plus(val): +def quote_plus(val, safe=''): cls = val.__class__ if cls is text_type: val = val.encode('utf-8') elif cls is not binary_type: val = str(val).encode('utf-8') - return _quote_plus(val) + return _quote_plus(val, safe=safe) diff --git a/pyramid/tests/test_config/test_views.py b/pyramid/tests/test_config/test_views.py index 832921713..c722c9166 100644 --- a/pyramid/tests/test_config/test_views.py +++ b/pyramid/tests/test_config/test_views.py @@ -3828,7 +3828,7 @@ class TestStaticURLInfo(unittest.TestCase): result = inst.generate('package:path/abc def', request, a=1, _query='(openlayers)') self.assertEqual(result, - 'http://example.com/abc%20def?%28openlayers%29') + 'http://example.com/abc%20def?(openlayers)') def test_generate_url_with_custom_anchor(self): inst = self._makeOne() diff --git a/pyramid/tests/test_url.py b/pyramid/tests/test_url.py index cbbb933a7..6a8624a9f 100644 --- a/pyramid/tests/test_url.py +++ b/pyramid/tests/test_url.py @@ -99,7 +99,7 @@ class TestURLMethodsMixin(unittest.TestCase): context = DummyContext() result = request.resource_url(context, 'a', query='(openlayers)') self.assertEqual(result, - 'http://example.com:5432/context/a?%28openlayers%29') + 'http://example.com:5432/context/a?(openlayers)') def test_resource_url_with_query_dict(self): request = self._makeOne() @@ -162,13 +162,13 @@ class TestURLMethodsMixin(unittest.TestCase): self.assertEqual(result, 'http://example.com:5432/context/#La+Pe%C3%B1a') - def test_resource_url_anchor_is_urlencoded(self): + def test_resource_url_anchor_is_urlencoded_safe(self): request = self._makeOne() self._registerResourceURL(request.registry) context = DummyContext() - result = request.resource_url(context, anchor=' /#') + result = request.resource_url(context, anchor=' /#?&+') self.assertEqual(result, - 'http://example.com:5432/context/#+%2F%23') + 'http://example.com:5432/context/#+/%23?&+') def test_resource_url_no_IResourceURL_registered(self): # falls back to ResourceURL @@ -481,7 +481,7 @@ class TestURLMethodsMixin(unittest.TestCase): request.registry.registerUtility(mapper, IRoutesMapper) result = request.route_url('flub', _query='(openlayers)') self.assertEqual(result, - 'http://example.com:5432/1/2/3?%28openlayers%29') + 'http://example.com:5432/1/2/3?(openlayers)') def test_route_url_with_empty_query(self): from pyramid.interfaces import IRoutesMapper diff --git a/pyramid/url.py b/pyramid/url.py index 4384e9cd6..e760bb356 100644 --- a/pyramid/url.py +++ b/pyramid/url.py @@ -29,6 +29,48 @@ from pyramid.traversal import ( ) PATH_SAFE = '/:@&+$,' # from webob +QUERY_SAFE = '/?:@!$&\'()*+,;=' # RFC 3986 +ANCHOR_SAFE = QUERY_SAFE + +def parse_url_overrides(kw): + """Parse special arguments passed when generating urls. + + The supplied dictionary is mutated, popping arguments as necessary. + Returns a 6-tuple of the format ``(app_url, scheme, host, port, + qs, anchor)``. + """ + anchor = '' + qs = '' + app_url = None + host = None + scheme = None + port = None + + if '_query' in kw: + query = kw.pop('_query') + if isinstance(query, string_types): + qs = '?' + quote_plus(query, safe=QUERY_SAFE) + elif query: + qs = '?' + urlencode(query, doseq=True) + + if '_anchor' in kw: + anchor = kw.pop('_anchor') + anchor = quote_plus(anchor, safe=ANCHOR_SAFE) + anchor = '#' + anchor + + if '_app_url' in kw: + app_url = kw.pop('_app_url') + + if '_host' in kw: + host = kw.pop('_host') + + if '_scheme' in kw: + scheme = kw.pop('_scheme') + + if '_port' in kw: + port = kw.pop('_port') + + return app_url, scheme, host, port, qs, anchor class URLMethodsMixin(object): """ Request methods mixin for BaseRequest having to do with URL @@ -215,36 +257,7 @@ class URLMethodsMixin(object): if route.pregenerator is not None: elements, kw = route.pregenerator(self, elements, kw) - anchor = '' - qs = '' - app_url = None - host = None - scheme = None - port = None - - if '_query' in kw: - query = kw.pop('_query') - if isinstance(query, string_types): - qs = '?' + quote_plus(query) - elif query: - qs = '?' + urlencode(query, doseq=True) - - if '_anchor' in kw: - anchor = kw.pop('_anchor') - anchor = quote_plus(anchor) - anchor = '#' + anchor - - if '_app_url' in kw: - app_url = kw.pop('_app_url') - - if '_host' in kw: - host = kw.pop('_host') - - if '_scheme' in kw: - scheme = kw.pop('_scheme') - - if '_port' in kw: - port = kw.pop('_port') + app_url, scheme, host, port, qs, anchor = parse_url_overrides(kw) if app_url is None: if (scheme is not None or host is not None or port is not None): @@ -585,13 +598,13 @@ class URLMethodsMixin(object): if 'query' in kw: query = kw['query'] if isinstance(query, string_types): - qs = '?' + quote_plus(query) + qs = '?' + quote_plus(query, safe=QUERY_SAFE) elif query: qs = '?' + urlencode(query, doseq=True) if 'anchor' in kw: anchor = kw['anchor'] - anchor = quote_plus(anchor) + anchor = quote_plus(anchor, safe=ANCHOR_SAFE) anchor = '#' + anchor if elements: -- cgit v1.2.3