mirror of
https://github.com/python/cpython.git
synced 2025-08-01 07:33:08 +00:00
Don't invoke reload(sys) and use StringIO objects instead of real files to capture stdin and stdout when needed (ensures all sys attributes remain unmodified after test_xmlrpc runs)
This commit is contained in:
parent
7df72dcdf9
commit
8c1ffeb614
2 changed files with 36 additions and 45 deletions
|
@ -659,6 +659,9 @@ def captured_output(stream_name):
|
||||||
def captured_stdout():
|
def captured_stdout():
|
||||||
return captured_output("stdout")
|
return captured_output("stdout")
|
||||||
|
|
||||||
|
def captured_stdin():
|
||||||
|
return captured_output("stdin")
|
||||||
|
|
||||||
def gc_collect():
|
def gc_collect():
|
||||||
"""Force as many objects as possible to be collected.
|
"""Force as many objects as possible to be collected.
|
||||||
|
|
||||||
|
|
|
@ -160,17 +160,17 @@ class XMLRPCTestCase(unittest.TestCase):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# sys.setdefaultencoding() normally doesn't exist after site.py is
|
# sys.setdefaultencoding() normally doesn't exist after site.py is
|
||||||
# loaded. reload(sys) is the way to get it back.
|
# loaded. Import a temporary fresh copy to get access to it
|
||||||
|
# but then restore the original copy to avoid messing with
|
||||||
|
# other potentially modified sys module attributes
|
||||||
old_encoding = sys.getdefaultencoding()
|
old_encoding = sys.getdefaultencoding()
|
||||||
setdefaultencoding_existed = hasattr(sys, "setdefaultencoding")
|
with test_support.CleanImport('sys'):
|
||||||
reload(sys) # ugh!
|
import sys as temp_sys
|
||||||
sys.setdefaultencoding("iso-8859-1")
|
temp_sys.setdefaultencoding("iso-8859-1")
|
||||||
try:
|
try:
|
||||||
(s, d), m = xmlrpclib.loads(utf8)
|
(s, d), m = xmlrpclib.loads(utf8)
|
||||||
finally:
|
finally:
|
||||||
sys.setdefaultencoding(old_encoding)
|
temp_sys.setdefaultencoding(old_encoding)
|
||||||
if not setdefaultencoding_existed:
|
|
||||||
del sys.setdefaultencoding
|
|
||||||
|
|
||||||
items = d.items()
|
items = d.items()
|
||||||
if have_unicode:
|
if have_unicode:
|
||||||
|
@ -831,54 +831,45 @@ class CGIHandlerTestCase(unittest.TestCase):
|
||||||
env['REQUEST_METHOD'] = 'GET'
|
env['REQUEST_METHOD'] = 'GET'
|
||||||
# if the method is GET and no request_text is given, it runs handle_get
|
# if the method is GET and no request_text is given, it runs handle_get
|
||||||
# get sysout output
|
# get sysout output
|
||||||
tmp = sys.stdout
|
with test_support.captured_stdout() as data_out:
|
||||||
sys.stdout = open(test_support.TESTFN, "w")
|
self.cgi.handle_request()
|
||||||
self.cgi.handle_request()
|
|
||||||
sys.stdout.close()
|
|
||||||
sys.stdout = tmp
|
|
||||||
|
|
||||||
# parse Status header
|
# parse Status header
|
||||||
handle = open(test_support.TESTFN, "r").read()
|
data_out.seek(0)
|
||||||
|
handle = data_out.read()
|
||||||
status = handle.split()[1]
|
status = handle.split()[1]
|
||||||
message = ' '.join(handle.split()[2:4])
|
message = ' '.join(handle.split()[2:4])
|
||||||
|
|
||||||
self.assertEqual(status, '400')
|
self.assertEqual(status, '400')
|
||||||
self.assertEqual(message, 'Bad Request')
|
self.assertEqual(message, 'Bad Request')
|
||||||
|
|
||||||
os.remove(test_support.TESTFN)
|
|
||||||
|
|
||||||
def test_cgi_xmlrpc_response(self):
|
def test_cgi_xmlrpc_response(self):
|
||||||
data = """<?xml version='1.0'?>
|
data = """<?xml version='1.0'?>
|
||||||
<methodCall>
|
<methodCall>
|
||||||
<methodName>test_method</methodName>
|
<methodName>test_method</methodName>
|
||||||
<params>
|
<params>
|
||||||
<param>
|
<param>
|
||||||
<value><string>foo</string></value>
|
<value><string>foo</string></value>
|
||||||
</param>
|
</param>
|
||||||
<param>
|
<param>
|
||||||
<value><string>bar</string></value>
|
<value><string>bar</string></value>
|
||||||
</param>
|
</param>
|
||||||
</params>
|
</params>
|
||||||
</methodCall>
|
</methodCall>
|
||||||
"""
|
"""
|
||||||
open("xmldata.txt", "w").write(data)
|
|
||||||
tmp1 = sys.stdin
|
|
||||||
tmp2 = sys.stdout
|
|
||||||
|
|
||||||
sys.stdin = open("xmldata.txt", "r")
|
with test_support.EnvironmentVarGuard() as env, \
|
||||||
sys.stdout = open(test_support.TESTFN, "w")
|
test_support.captured_stdout() as data_out, \
|
||||||
|
test_support.captured_stdin() as data_in:
|
||||||
with test_support.EnvironmentVarGuard() as env:
|
data_in.write(data)
|
||||||
|
data_in.seek(0)
|
||||||
env['CONTENT_LENGTH'] = str(len(data))
|
env['CONTENT_LENGTH'] = str(len(data))
|
||||||
self.cgi.handle_request()
|
self.cgi.handle_request()
|
||||||
|
data_out.seek(0)
|
||||||
sys.stdin.close()
|
|
||||||
sys.stdout.close()
|
|
||||||
sys.stdin = tmp1
|
|
||||||
sys.stdout = tmp2
|
|
||||||
|
|
||||||
# will respond exception, if so, our goal is achieved ;)
|
# will respond exception, if so, our goal is achieved ;)
|
||||||
handle = open(test_support.TESTFN, "r").read()
|
handle = data_out.read()
|
||||||
|
|
||||||
# start with 44th char so as not to get http header, we just need only xml
|
# start with 44th char so as not to get http header, we just need only xml
|
||||||
self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, handle[44:])
|
self.assertRaises(xmlrpclib.Fault, xmlrpclib.loads, handle[44:])
|
||||||
|
@ -895,9 +886,6 @@ class CGIHandlerTestCase(unittest.TestCase):
|
||||||
len(content))
|
len(content))
|
||||||
|
|
||||||
|
|
||||||
os.remove("xmldata.txt")
|
|
||||||
os.remove(test_support.TESTFN)
|
|
||||||
|
|
||||||
class FakeSocket:
|
class FakeSocket:
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue