bpo-37096: Add large-file tests for modules using sendfile(2) (GH-13676)

This commit is contained in:
Giampaolo Rodola 2019-09-30 12:51:55 +08:00 committed by GitHub
parent 25e115ec00
commit 5bcc6d89bc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -5,17 +5,19 @@ import os
import stat import stat
import sys import sys
import unittest import unittest
from test.support import TESTFN, requires, unlink, bigmemtest import socket
import shutil
import threading
from test.support import TESTFN, requires, unlink, bigmemtest, find_unused_port
import io # C implementation of io import io # C implementation of io
import _pyio as pyio # Python implementation of io import _pyio as pyio # Python implementation of io
# size of file to create (>2 GiB; 2 GiB == 2,147,483,648 bytes) # size of file to create (>2 GiB; 2 GiB == 2,147,483,648 bytes)
size = 2_500_000_000 size = 2_500_000_000
TESTFN2 = TESTFN + '2'
class LargeFileTest: class LargeFileTest:
"""Test that each file function works as expected for large
(i.e. > 2 GiB) files.
"""
def setUp(self): def setUp(self):
if os.path.exists(TESTFN): if os.path.exists(TESTFN):
@ -44,6 +46,13 @@ class LargeFileTest:
if not os.stat(TESTFN)[stat.ST_SIZE] == 0: if not os.stat(TESTFN)[stat.ST_SIZE] == 0:
raise cls.failureException('File was not truncated by opening ' raise cls.failureException('File was not truncated by opening '
'with mode "wb"') 'with mode "wb"')
unlink(TESTFN2)
class TestFileMethods(LargeFileTest):
"""Test that each file function works as expected for large
(i.e. > 2 GiB) files.
"""
# _pyio.FileIO.readall() uses a temporary bytearray then casted to bytes, # _pyio.FileIO.readall() uses a temporary bytearray then casted to bytes,
# so memuse=2 is needed # so memuse=2 is needed
@ -140,6 +149,72 @@ class LargeFileTest:
f.seek(pos) f.seek(pos)
self.assertTrue(f.seekable()) self.assertTrue(f.seekable())
class TestCopyfile(LargeFileTest, unittest.TestCase):
open = staticmethod(io.open)
def test_it(self):
# Internally shutil.copyfile() can use "fast copy" methods like
# os.sendfile().
size = os.path.getsize(TESTFN)
shutil.copyfile(TESTFN, TESTFN2)
self.assertEqual(os.path.getsize(TESTFN2), size)
with open(TESTFN2, 'rb') as f:
self.assertEqual(f.read(5), b'z\x00\x00\x00\x00')
f.seek(size - 5)
self.assertEqual(f.read(), b'\x00\x00\x00\x00a')
@unittest.skipIf(not hasattr(os, 'sendfile'), 'sendfile not supported')
class TestSocketSendfile(LargeFileTest, unittest.TestCase):
open = staticmethod(io.open)
timeout = 3
def setUp(self):
super().setUp()
self.thread = None
def tearDown(self):
super().tearDown()
if self.thread is not None:
self.thread.join(self.timeout)
self.thread = None
def tcp_server(self, sock):
def run(sock):
with sock:
conn, _ = sock.accept()
with conn, open(TESTFN2, 'wb') as f:
event.wait(self.timeout)
while True:
chunk = conn.recv(65536)
if not chunk:
return
f.write(chunk)
event = threading.Event()
sock.settimeout(self.timeout)
self.thread = threading.Thread(target=run, args=(sock, ))
self.thread.start()
event.set()
def test_it(self):
port = find_unused_port()
with socket.create_server(("", port)) as sock:
self.tcp_server(sock)
with socket.create_connection(("127.0.0.1", port)) as client:
with open(TESTFN, 'rb') as f:
client.sendfile(f)
self.tearDown()
size = os.path.getsize(TESTFN)
self.assertEqual(os.path.getsize(TESTFN2), size)
with open(TESTFN2, 'rb') as f:
self.assertEqual(f.read(5), b'z\x00\x00\x00\x00')
f.seek(size - 5)
self.assertEqual(f.read(), b'\x00\x00\x00\x00a')
def setUpModule(): def setUpModule():
try: try:
import signal import signal
@ -176,14 +251,18 @@ def setUpModule():
unlink(TESTFN) unlink(TESTFN)
class CLargeFileTest(LargeFileTest, unittest.TestCase): class CLargeFileTest(TestFileMethods, unittest.TestCase):
open = staticmethod(io.open) open = staticmethod(io.open)
class PyLargeFileTest(LargeFileTest, unittest.TestCase):
class PyLargeFileTest(TestFileMethods, unittest.TestCase):
open = staticmethod(pyio.open) open = staticmethod(pyio.open)
def tearDownModule(): def tearDownModule():
unlink(TESTFN) unlink(TESTFN)
unlink(TESTFN2)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()