mirror of
https://github.com/python/cpython.git
synced 2025-09-27 02:39:58 +00:00
Issue #12138: fix use of transient_internet() in test_urllibnet
This commit is contained in:
commit
f738d33e5d
1 changed files with 62 additions and 77 deletions
|
@ -3,6 +3,7 @@
|
||||||
import unittest
|
import unittest
|
||||||
from test import support
|
from test import support
|
||||||
|
|
||||||
|
import contextlib
|
||||||
import socket
|
import socket
|
||||||
import urllib.request
|
import urllib.request
|
||||||
import sys
|
import sys
|
||||||
|
@ -27,6 +28,7 @@ class URLTimeoutTest(unittest.TestCase):
|
||||||
f = urllib.request.urlopen("http://www.python.org/")
|
f = urllib.request.urlopen("http://www.python.org/")
|
||||||
x = f.read()
|
x = f.read()
|
||||||
|
|
||||||
|
|
||||||
class urlopenNetworkTests(unittest.TestCase):
|
class urlopenNetworkTests(unittest.TestCase):
|
||||||
"""Tests urllib.reqest.urlopen using the network.
|
"""Tests urllib.reqest.urlopen using the network.
|
||||||
|
|
||||||
|
@ -42,43 +44,37 @@ class urlopenNetworkTests(unittest.TestCase):
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
def urlopen(self, *args, **kwargs):
|
def urlopen(self, *args, **kwargs):
|
||||||
resource = args[0]
|
resource = args[0]
|
||||||
cm = support.transient_internet(resource)
|
with support.transient_internet(resource):
|
||||||
cm.__enter__()
|
r = urllib.request.urlopen(*args, **kwargs)
|
||||||
self.addCleanup(cm.__exit__, None, None, None)
|
try:
|
||||||
return urllib.request.urlopen(*args, **kwargs)
|
yield r
|
||||||
|
finally:
|
||||||
|
r.close()
|
||||||
|
|
||||||
def test_basic(self):
|
def test_basic(self):
|
||||||
# Simple test expected to pass.
|
# Simple test expected to pass.
|
||||||
open_url = self.urlopen("http://www.python.org/")
|
with self.urlopen("http://www.python.org/") as open_url:
|
||||||
for attr in ("read", "readline", "readlines", "fileno", "close",
|
for attr in ("read", "readline", "readlines", "fileno", "close",
|
||||||
"info", "geturl"):
|
"info", "geturl"):
|
||||||
self.assertTrue(hasattr(open_url, attr), "object returned from "
|
self.assertTrue(hasattr(open_url, attr), "object returned from "
|
||||||
"urlopen lacks the %s attribute" % attr)
|
"urlopen lacks the %s attribute" % attr)
|
||||||
try:
|
|
||||||
self.assertTrue(open_url.read(), "calling 'read' failed")
|
self.assertTrue(open_url.read(), "calling 'read' failed")
|
||||||
finally:
|
|
||||||
open_url.close()
|
|
||||||
|
|
||||||
def test_readlines(self):
|
def test_readlines(self):
|
||||||
# Test both readline and readlines.
|
# Test both readline and readlines.
|
||||||
open_url = self.urlopen("http://www.python.org/")
|
with self.urlopen("http://www.python.org/") as open_url:
|
||||||
try:
|
|
||||||
self.assertIsInstance(open_url.readline(), bytes,
|
self.assertIsInstance(open_url.readline(), bytes,
|
||||||
"readline did not return a string")
|
"readline did not return a string")
|
||||||
self.assertIsInstance(open_url.readlines(), list,
|
self.assertIsInstance(open_url.readlines(), list,
|
||||||
"readlines did not return a list")
|
"readlines did not return a list")
|
||||||
finally:
|
|
||||||
open_url.close()
|
|
||||||
|
|
||||||
def test_info(self):
|
def test_info(self):
|
||||||
# Test 'info'.
|
# Test 'info'.
|
||||||
open_url = self.urlopen("http://www.python.org/")
|
with self.urlopen("http://www.python.org/") as open_url:
|
||||||
try:
|
|
||||||
info_obj = open_url.info()
|
info_obj = open_url.info()
|
||||||
finally:
|
|
||||||
open_url.close()
|
|
||||||
self.assertIsInstance(info_obj, email.message.Message,
|
self.assertIsInstance(info_obj, email.message.Message,
|
||||||
"object returned by 'info' is not an "
|
"object returned by 'info' is not an "
|
||||||
"instance of email.message.Message")
|
"instance of email.message.Message")
|
||||||
|
@ -87,22 +83,20 @@ class urlopenNetworkTests(unittest.TestCase):
|
||||||
def test_geturl(self):
|
def test_geturl(self):
|
||||||
# Make sure same URL as opened is returned by geturl.
|
# Make sure same URL as opened is returned by geturl.
|
||||||
URL = "http://www.python.org/"
|
URL = "http://www.python.org/"
|
||||||
open_url = self.urlopen(URL)
|
with self.urlopen(URL) as open_url:
|
||||||
try:
|
|
||||||
gotten_url = open_url.geturl()
|
gotten_url = open_url.geturl()
|
||||||
finally:
|
self.assertEqual(gotten_url, URL)
|
||||||
open_url.close()
|
|
||||||
self.assertEqual(gotten_url, URL)
|
|
||||||
|
|
||||||
def test_getcode(self):
|
def test_getcode(self):
|
||||||
# test getcode() with the fancy opener to get 404 error codes
|
# test getcode() with the fancy opener to get 404 error codes
|
||||||
URL = "http://www.python.org/XXXinvalidXXX"
|
URL = "http://www.python.org/XXXinvalidXXX"
|
||||||
open_url = urllib.request.FancyURLopener().open(URL)
|
with support.transient_internet(URL):
|
||||||
try:
|
open_url = urllib.request.FancyURLopener().open(URL)
|
||||||
code = open_url.getcode()
|
try:
|
||||||
finally:
|
code = open_url.getcode()
|
||||||
open_url.close()
|
finally:
|
||||||
self.assertEqual(code, 404)
|
open_url.close()
|
||||||
|
self.assertEqual(code, 404)
|
||||||
|
|
||||||
def test_fileno(self):
|
def test_fileno(self):
|
||||||
if sys.platform in ('win32',):
|
if sys.platform in ('win32',):
|
||||||
|
@ -110,14 +104,11 @@ class urlopenNetworkTests(unittest.TestCase):
|
||||||
# test can't pass on Windows.
|
# test can't pass on Windows.
|
||||||
return
|
return
|
||||||
# Make sure fd returned by fileno is valid.
|
# Make sure fd returned by fileno is valid.
|
||||||
open_url = self.urlopen("http://www.python.org/", timeout=None)
|
with self.urlopen("http://www.python.org/", timeout=None) as open_url:
|
||||||
fd = open_url.fileno()
|
fd = open_url.fileno()
|
||||||
FILE = os.fdopen(fd, encoding='utf-8')
|
with os.fdopen(fd, encoding='utf-8') as f:
|
||||||
try:
|
self.assertTrue(f.read(), "reading from file created using fd "
|
||||||
self.assertTrue(FILE.read(), "reading from file created using fd "
|
"returned by fileno failed")
|
||||||
"returned by fileno failed")
|
|
||||||
finally:
|
|
||||||
FILE.close()
|
|
||||||
|
|
||||||
def test_bad_address(self):
|
def test_bad_address(self):
|
||||||
# Make sure proper exception is raised when connecting to a bogus
|
# Make sure proper exception is raised when connecting to a bogus
|
||||||
|
@ -133,66 +124,60 @@ class urlopenNetworkTests(unittest.TestCase):
|
||||||
urllib.request.urlopen,
|
urllib.request.urlopen,
|
||||||
"http://sadflkjsasf.i.nvali.d/")
|
"http://sadflkjsasf.i.nvali.d/")
|
||||||
|
|
||||||
|
|
||||||
class urlretrieveNetworkTests(unittest.TestCase):
|
class urlretrieveNetworkTests(unittest.TestCase):
|
||||||
"""Tests urllib.request.urlretrieve using the network."""
|
"""Tests urllib.request.urlretrieve using the network."""
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
def urlretrieve(self, *args):
|
def urlretrieve(self, *args):
|
||||||
resource = args[0]
|
resource = args[0]
|
||||||
cm = support.transient_internet(resource)
|
with support.transient_internet(resource):
|
||||||
cm.__enter__()
|
file_location, info = urllib.request.urlretrieve(*args)
|
||||||
self.addCleanup(cm.__exit__, None, None, None)
|
try:
|
||||||
return urllib.request.urlretrieve(*args)
|
yield file_location, info
|
||||||
|
finally:
|
||||||
|
support.unlink(file_location)
|
||||||
|
|
||||||
def test_basic(self):
|
def test_basic(self):
|
||||||
# Test basic functionality.
|
# Test basic functionality.
|
||||||
file_location,info = self.urlretrieve("http://www.python.org/")
|
with self.urlretrieve("http://www.python.org/") as (file_location, info):
|
||||||
self.assertTrue(os.path.exists(file_location), "file location returned by"
|
self.assertTrue(os.path.exists(file_location), "file location returned by"
|
||||||
" urlretrieve is not a valid path")
|
" urlretrieve is not a valid path")
|
||||||
FILE = open(file_location, encoding='utf-8')
|
with open(file_location, encoding='utf-8') as f:
|
||||||
try:
|
self.assertTrue(f.read(), "reading from the file location returned"
|
||||||
self.assertTrue(FILE.read(), "reading from the file location returned"
|
" by urlretrieve failed")
|
||||||
" by urlretrieve failed")
|
|
||||||
finally:
|
|
||||||
FILE.close()
|
|
||||||
os.unlink(file_location)
|
|
||||||
|
|
||||||
def test_specified_path(self):
|
def test_specified_path(self):
|
||||||
# Make sure that specifying the location of the file to write to works.
|
# Make sure that specifying the location of the file to write to works.
|
||||||
file_location,info = self.urlretrieve("http://www.python.org/",
|
with self.urlretrieve("http://www.python.org/",
|
||||||
support.TESTFN)
|
support.TESTFN) as (file_location, info):
|
||||||
self.assertEqual(file_location, support.TESTFN)
|
self.assertEqual(file_location, support.TESTFN)
|
||||||
self.assertTrue(os.path.exists(file_location))
|
self.assertTrue(os.path.exists(file_location))
|
||||||
FILE = open(file_location, encoding='utf-8')
|
with open(file_location, encoding='utf-8') as f:
|
||||||
try:
|
self.assertTrue(f.read(), "reading from temporary file failed")
|
||||||
self.assertTrue(FILE.read(), "reading from temporary file failed")
|
|
||||||
finally:
|
|
||||||
FILE.close()
|
|
||||||
os.unlink(file_location)
|
|
||||||
|
|
||||||
def test_header(self):
|
def test_header(self):
|
||||||
# Make sure header returned as 2nd value from urlretrieve is good.
|
# Make sure header returned as 2nd value from urlretrieve is good.
|
||||||
file_location, header = self.urlretrieve("http://www.python.org/")
|
with self.urlretrieve("http://www.python.org/") as (file_location, info):
|
||||||
os.unlink(file_location)
|
self.assertIsInstance(info, email.message.Message,
|
||||||
self.assertIsInstance(header, email.message.Message,
|
"info is not an instance of email.message.Message")
|
||||||
"header is not an instance of email.message.Message")
|
|
||||||
|
|
||||||
def test_data_header(self):
|
def test_data_header(self):
|
||||||
logo = "http://www.python.org/community/logos/python-logo-master-v3-TM.png"
|
logo = "http://www.python.org/community/logos/python-logo-master-v3-TM.png"
|
||||||
file_location, fileheaders = self.urlretrieve(logo)
|
with self.urlretrieve(logo) as (file_location, fileheaders):
|
||||||
os.unlink(file_location)
|
datevalue = fileheaders.get('Date')
|
||||||
datevalue = fileheaders.get('Date')
|
dateformat = '%a, %d %b %Y %H:%M:%S GMT'
|
||||||
dateformat = '%a, %d %b %Y %H:%M:%S GMT'
|
try:
|
||||||
try:
|
time.strptime(datevalue, dateformat)
|
||||||
time.strptime(datevalue, dateformat)
|
except ValueError:
|
||||||
except ValueError:
|
self.fail('Date value not in %r format', dateformat)
|
||||||
self.fail('Date value not in %r format', dateformat)
|
|
||||||
|
|
||||||
|
|
||||||
def test_main():
|
def test_main():
|
||||||
support.requires('network')
|
support.requires('network')
|
||||||
support.run_unittest(URLTimeoutTest,
|
support.run_unittest(URLTimeoutTest,
|
||||||
urlopenNetworkTests,
|
urlopenNetworkTests,
|
||||||
urlretrieveNetworkTests)
|
urlretrieveNetworkTests)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
test_main()
|
test_main()
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue