diff --git a/Lib/cgi.py b/Lib/cgi.py
index 6018c3608697af..4e227aa05bd554 100755
--- a/Lib/cgi.py
+++ b/Lib/cgi.py
@@ -115,7 +115,8 @@ def closelog():
 # 0 ==> unlimited input
 maxlen = 0
 
-def parse(fp=None, environ=os.environ, keep_blank_values=0, strict_parsing=0):
+def parse(fp=None, environ=os.environ, keep_blank_values=0, strict_parsing=0,
+          separators=('&', ';')):
     """Parse a query in the environment or from a file (default stdin)
 
         Arguments, all optional:
@@ -134,6 +135,8 @@ def parse(fp=None, environ=os.environ, keep_blank_values=0, strict_parsing=0):
         strict_parsing: flag indicating what to do with parsing errors.
             If false (the default), errors are silently ignored.
             If true, errors raise a ValueError exception.
+
+        separators: valid separators in the query string, e.g. ('&',)
     """
     if fp is None:
         fp = sys.stdin
@@ -154,7 +157,7 @@ def parse(fp=None, environ=os.environ, keep_blank_values=0, strict_parsing=0):
     if environ['REQUEST_METHOD'] == 'POST':
         ctype, pdict = parse_header(environ['CONTENT_TYPE'])
         if ctype == 'multipart/form-data':
-            return parse_multipart(fp, pdict)
+            return parse_multipart(fp, pdict, separators=separators)
         elif ctype == 'application/x-www-form-urlencoded':
             clength = int(environ['CONTENT_LENGTH'])
             if maxlen and clength > maxlen:
@@ -178,10 +181,11 @@ def parse(fp=None, environ=os.environ, keep_blank_values=0, strict_parsing=0):
             qs = ""
         environ['QUERY_STRING'] = qs    # XXX Shouldn't, really
     return urllib.parse.parse_qs(qs, keep_blank_values, strict_parsing,
-                                 encoding=encoding)
+                                 encoding=encoding, separators=separators)
 
 
-def parse_multipart(fp, pdict, encoding="utf-8", errors="replace"):
+def parse_multipart(fp, pdict, encoding="utf-8", errors="replace",
+                    separators=('&', ';')):
     """Parse multipart input.
 
     Arguments:
@@ -205,7 +209,7 @@ def parse_multipart(fp, pdict, encoding="utf-8", errors="replace"):
     except KeyError:
         pass
     fs = FieldStorage(fp, headers=headers, encoding=encoding, errors=errors,
-        environ={'REQUEST_METHOD': 'POST'})
+        environ={'REQUEST_METHOD': 'POST'}, separators=separators)
     return {k: fs.getlist(k) for k in fs}
 
 def _parseparam(s):
@@ -315,7 +319,7 @@ class FieldStorage:
     def __init__(self, fp=None, headers=None, outerboundary=b'',
                  environ=os.environ, keep_blank_values=0, strict_parsing=0,
                  limit=None, encoding='utf-8', errors='replace',
-                 max_num_fields=None):
+                 max_num_fields=None, separators=('&', ';')):
         """Constructor.  Read multipart/* until last part.
 
         Arguments, all optional:
@@ -358,11 +362,15 @@ def __init__(self, fp=None, headers=None, outerboundary=b'',
         max_num_fields: int. If set, then __init__ throws a ValueError
             if there are more than n fields read by parse_qsl().
 
+        separators: valid separators in the query string, e.g. ('&',).
+            Passed directly to urllib.parse.parse_qsl internally.
+
         """
         method = 'GET'
         self.keep_blank_values = keep_blank_values
         self.strict_parsing = strict_parsing
         self.max_num_fields = max_num_fields
+        self.separators = separators
         if 'REQUEST_METHOD' in environ:
             method = environ['REQUEST_METHOD'].upper()
         self.qs_on_post = None
@@ -472,7 +480,7 @@ def __init__(self, fp=None, headers=None, outerboundary=b'',
         if ctype == 'application/x-www-form-urlencoded':
             self.read_urlencoded()
         elif ctype[:10] == 'multipart/':
-            self.read_multi(environ, keep_blank_values, strict_parsing)
+            self.read_multi(environ, keep_blank_values, strict_parsing, separators)
         else:
             self.read_single()
 
@@ -589,13 +597,15 @@ def read_urlencoded(self):
         query = urllib.parse.parse_qsl(
             qs, self.keep_blank_values, self.strict_parsing,
             encoding=self.encoding, errors=self.errors,
-            max_num_fields=self.max_num_fields)
+            max_num_fields=self.max_num_fields,
+            separators=self.separators)
         self.list = [MiniFieldStorage(key, value) for key, value in query]
         self.skip_lines()
 
     FieldStorageClass = None
 
-    def read_multi(self, environ, keep_blank_values, strict_parsing):
+    def read_multi(self, environ, keep_blank_values, strict_parsing,
+                   separators=('&', ';')):
         """Internal: read a part that is itself multipart."""
         ib = self.innerboundary
         if not valid_boundary(ib):
@@ -605,7 +615,8 @@ def read_multi(self, environ, keep_blank_values, strict_parsing):
             query = urllib.parse.parse_qsl(
                 self.qs_on_post, self.keep_blank_values, self.strict_parsing,
                 encoding=self.encoding, errors=self.errors,
-                max_num_fields=self.max_num_fields)
+                max_num_fields=self.max_num_fields,
+                separators=self.separators)
             self.list.extend(MiniFieldStorage(key, value)
                              for key, value in query)
         klass = self.FieldStorageClass or self.__class__
@@ -649,7 +660,8 @@ def read_multi(self, environ, keep_blank_values, strict_parsing):
                 else self.limit - self.bytes_read
             part = klass(self.fp, headers, ib, environ, keep_blank_values,
                          strict_parsing, limit,
-                         self.encoding, self.errors, max_num_fields)
+                         self.encoding, self.errors, max_num_fields,
+                         separators)
 
             if max_num_fields is not None:
                 max_num_fields -= 1
diff --git a/Lib/test/test_urlparse.py b/Lib/test/test_urlparse.py
index 762500789f73ac..9c3ab771714a06 100644
--- a/Lib/test/test_urlparse.py
+++ b/Lib/test/test_urlparse.py
@@ -11,7 +11,33 @@
 # Each parse_qsl testcase is a two-tuple that contains
 # a string with the query and a list with the expected result.
 
-parse_qsl_test_cases = [
+parse_qsl_test_cases_allow_semicolon = [
+    (";", []),
+    (";;", []),
+    (";a=b", [('a', 'b')]),
+    ("a=a+b;b=b+c", [('a', 'a b'), ('b', 'b c')]),
+    ("a=1;a=2", [('a', '1'), ('a', '2')]),
+    (b";", []),
+    (b";;", []),
+    (b";a=b", [(b'a', b'b')]),
+    (b"a=a+b;b=b+c", [(b'a', b'a b'), (b'b', b'b c')]),
+    (b"a=1;a=2", [(b'a', b'1'), (b'a', b'2')]),
+]
+
+parse_qsl_test_cases_deny_semicolon = [
+    (";", [(';', '')]),
+    (";;", [(';;', '')]),
+    (";a=b", [(';a', 'b')]),
+    ("a=a+b;b=b+c", [('a', 'a b;b=b c')]),
+    ("a=1;a=2", [('a', '1;a=2')]),
+    (b";", [(b';', b'')]),
+    (b";;", [(b';;', b'')]),
+    (b";a=b", [(b';a', b'b')]),
+    (b"a=a+b;b=b+c", [(b'a', b'a b;b=b c')]),
+    (b"a=1;a=2", [(b'a', b'1;a=2')]),
+]
+
+parse_qsl_test_cases_ampersand = [
     ("", []),
     ("&", []),
     ("&&", []),
@@ -32,22 +58,43 @@
     (b"&a=b", [(b'a', b'b')]),
     (b"a=a+b&b=b+c", [(b'a', b'a b'), (b'b', b'b c')]),
     (b"a=1&a=2", [(b'a', b'1'), (b'a', b'2')]),
-    (";", []),
-    (";;", []),
-    (";a=b", [('a', 'b')]),
-    ("a=a+b;b=b+c", [('a', 'a b'), ('b', 'b c')]),
-    ("a=1;a=2", [('a', '1'), ('a', '2')]),
-    (b";", []),
-    (b";;", []),
-    (b";a=b", [(b'a', b'b')]),
-    (b"a=a+b;b=b+c", [(b'a', b'a b'), (b'b', b'b c')]),
-    (b"a=1;a=2", [(b'a', b'1'), (b'a', b'2')]),
 ]
 
+parse_qsl_test_cases = (parse_qsl_test_cases_ampersand
+                        + parse_qsl_test_cases_allow_semicolon)
+
+parse_qsl_test_cases_semicolon = (parse_qsl_test_cases_ampersand
+                                  + parse_qsl_test_cases_deny_semicolon)
+
 # Each parse_qs testcase is a two-tuple that contains
 # a string with the query and a dictionary with the expected result.
 
+parse_qs_test_cases_allow_semicolon = [
+    (";", {}),
+    (";;", {}),
+    (";a=b", {'a': ['b']}),
+    ("a=a+b;b=b+c", {'a': ['a b'], 'b': ['b c']}),
+    ("a=1;a=2", {'a': ['1', '2']}),
+    (b";", {}),
+    (b";;", {}),
+    (b";a=b", {b'a': [b'b']}),
+    (b"a=a+b;b=b+c", {b'a': [b'a b'], b'b': [b'b c']}),
+    (b"a=1;a=2", {b'a': [b'1', b'2']}),
+]
-parse_qs_test_cases = [
+parse_qs_test_cases_deny_semicolon = [
+    (";", {';': ['']}),
+    (";;", {';;': ['']}),
+    (";a=b", {';a': ['b']}),
+    ("a=a+b;b=b+c", {'a': ['a b;b=b c']}),
+    ("a=1;a=2", {'a': ['1;a=2']}),
+    (b";", {b';': [b'']}),
+    (b";;", {b';;': [b'']}),
+    (b";a=b", {b';a': [b'b']}),
+    (b"a=a+b;b=b+c", {b'a': [b'a b;b=b c']}),
+    (b"a=1;a=2", {b'a': [b'1;a=2']}),
+]
+
+parse_qs_test_cases_ampersand = [
     ("", {}),
     ("&", {}),
     ("&&", {}),
@@ -68,18 +115,15 @@
     (b"&a=b", {b'a': [b'b']}),
     (b"a=a+b&b=b+c", {b'a': [b'a b'], b'b': [b'b c']}),
     (b"a=1&a=2", {b'a': [b'1', b'2']}),
-    (";", {}),
-    (";;", {}),
-    (";a=b", {'a': ['b']}),
-    ("a=a+b;b=b+c", {'a': ['a b'], 'b': ['b c']}),
-    ("a=1;a=2", {'a': ['1', '2']}),
-    (b";", {}),
-    (b";;", {}),
-    (b";a=b", {b'a': [b'b']}),
-    (b"a=a+b;b=b+c", {b'a': [b'a b'], b'b': [b'b c']}),
-    (b"a=1;a=2", {b'a': [b'1', b'2']}),
 ]
 
+parse_qs_test_cases = (parse_qs_test_cases_ampersand
+                       + parse_qs_test_cases_allow_semicolon)
+
+parse_qs_test_cases_semicolon = (parse_qs_test_cases_ampersand
+                                 + parse_qs_test_cases_deny_semicolon)
+
+
 class UrlParseTestCase(unittest.TestCase):
 
     def checkRoundtrips(self, url, parsed, split):
@@ -135,23 +179,55 @@ def checkRoundtrips(self, url, parsed, split):
 
     def test_qsl(self):
         for orig, expect in parse_qsl_test_cases:
-            result = urllib.parse.parse_qsl(orig, keep_blank_values=True)
+            result = urllib.parse.parse_qsl(orig, keep_blank_values=True,
+                                            separators=('&', ';'))
             self.assertEqual(result, expect, "Error parsing %r" % orig)
             expect_without_blanks = [v for v in expect if len(v[1])]
-            result = urllib.parse.parse_qsl(orig, keep_blank_values=False)
+            result = urllib.parse.parse_qsl(orig, keep_blank_values=False,
+                                            separators=('&', ';'))
             self.assertEqual(result, expect_without_blanks,
                              "Error parsing %r" % orig)
 
     def test_qs(self):
         for orig, expect in parse_qs_test_cases:
-            result = urllib.parse.parse_qs(orig, keep_blank_values=True)
+            result = urllib.parse.parse_qs(orig, keep_blank_values=True,
+                                           separators=('&', ';'))
             self.assertEqual(result, expect, "Error parsing %r" % orig)
             expect_without_blanks = {v: expect[v]
                                      for v in expect if len(expect[v][0])}
-            result = urllib.parse.parse_qs(orig, keep_blank_values=False)
+            result = urllib.parse.parse_qs(orig, keep_blank_values=False,
+                                           separators=('&', ';'))
             self.assertEqual(result, expect_without_blanks,
                              "Error parsing %r" % orig)
 
+    def test_qsl_no_semicolon(self):
+        # See bpo-42967 for more information.
+        for orig, expect in parse_qsl_test_cases_semicolon:
+            with self.subTest(f"Original: {orig!r}, Expected: {expect!r}"):
+                result = urllib.parse.parse_qsl(orig, keep_blank_values=True,
+                                                separators=('&',))
+                self.assertEqual(result, expect,
+                                 "Error parsing %r" % orig)
+                expect_without_blanks = [v for v in expect if len(v[1])]
+                result = urllib.parse.parse_qsl(orig, keep_blank_values=False,
+                                                separators=('&',))
+                self.assertEqual(result, expect_without_blanks,
+                                 "Error parsing %r" % orig)
+
+    def test_qs_no_semicolon(self):
+        # See bpo-42967 for more information.
+        for orig, expect in parse_qs_test_cases_semicolon:
+            with self.subTest(f"Original: {orig!r}, Expected: {expect!r}"):
+                result = urllib.parse.parse_qs(orig, keep_blank_values=True,
+                                               separators=('&',))
+                self.assertEqual(result, expect, "Error parsing %r" % orig)
+                expect_without_blanks = {v: expect[v]
+                                         for v in expect if len(expect[v][0])}
+                result = urllib.parse.parse_qs(orig, keep_blank_values=False,
+                                               separators=('&',))
+                self.assertEqual(result, expect_without_blanks,
+                                 "Error parsing %r" % orig)
+
     def test_roundtrips(self):
         str_cases = [
             ('file:///tmp/junk.txt',
@@ -887,7 +963,8 @@ def test_parse_qsl_max_num_fields(self):
         with self.assertRaises(ValueError):
             urllib.parse.parse_qs('&'.join(['a=a']*11), max_num_fields=10)
         with self.assertRaises(ValueError):
-            urllib.parse.parse_qs(';'.join(['a=a']*11), max_num_fields=10)
+            urllib.parse.parse_qs(';'.join(['a=a']*11), max_num_fields=10,
+                                  separators=('&', ';'))
         urllib.parse.parse_qs('&'.join(['a=a']*10), max_num_fields=10)
 
     def test_urlencode_sequences(self):
diff --git a/Lib/urllib/parse.py b/Lib/urllib/parse.py
index ea897c3032257b..745312cbef1a33 100644
--- a/Lib/urllib/parse.py
+++ b/Lib/urllib/parse.py
@@ -662,7 +662,8 @@ def unquote(string, encoding='utf-8', errors='replace'):
 
 
 def parse_qs(qs, keep_blank_values=False, strict_parsing=False,
-             encoding='utf-8', errors='replace', max_num_fields=None):
+             encoding='utf-8', errors='replace', max_num_fields=None,
+             separators=('&', ';')):
     """Parse a query given as a string argument.
 
         Arguments:
@@ -686,12 +687,15 @@ def parse_qs(qs, keep_blank_values=False, strict_parsing=False,
         max_num_fields: int. If set, then throws a ValueError if there
             are more than n fields read by parse_qsl().
 
+        separators: valid separators in the query string, e.g. ('&',)
+
         Returns a dictionary.
""" parsed_result = {} pairs = parse_qsl(qs, keep_blank_values, strict_parsing, encoding=encoding, errors=errors, - max_num_fields=max_num_fields) + max_num_fields=max_num_fields, + separators=separators) for name, value in pairs: if name in parsed_result: parsed_result[name].append(value) @@ -701,7 +705,8 @@ def parse_qs(qs, keep_blank_values=False, strict_parsing=False, def parse_qsl(qs, keep_blank_values=False, strict_parsing=False, - encoding='utf-8', errors='replace', max_num_fields=None): + encoding='utf-8', errors='replace', max_num_fields=None, + separators=('&', ';')): """Parse a query given as a string argument. Arguments: @@ -724,6 +729,8 @@ def parse_qsl(qs, keep_blank_values=False, strict_parsing=False, max_num_fields: int. If set, then throws a ValueError if there are more than n fields read by parse_qsl(). + separators: valid separators in the query string, e.g. ('&',) + Returns a list, as G-d intended. """ qs, _coerce_result = _coerce_args(qs) @@ -732,11 +739,16 @@ def parse_qsl(qs, keep_blank_values=False, strict_parsing=False, # is less than max_num_fields. This prevents a memory exhaustion DOS # attack via post bodies with many fields. if max_num_fields is not None: - num_fields = 1 + qs.count('&') + qs.count(';') + num_fields = 1 + sum(qs.count(sep) for sep in separators) if max_num_fields < num_fields: raise ValueError('Max number of fields exceeded') - pairs = [s2 for s1 in qs.split('&') for s2 in s1.split(';')] + if not separators: + raise ValueError('Separators must be a tuple or list of strings') + for sep in separators[1:]: + qs = qs.replace(sep, separators[0]) + pairs = qs.split(separators[0]) + r = [] for name_value in pairs: if not name_value and not strict_parsing: diff --git a/Misc/ACKS b/Misc/ACKS index 136266965a869b..41d65a6f7712a9 100644 --- a/Misc/ACKS +++ b/Misc/ACKS @@ -613,6 +613,7 @@ Karan Goel Jeroen Van Goey Christoph Gohlke Tim Golden +Adam Goldschmidt Yonatan Goldschmidt Mark Gollahon Mikhail Golubev