Issue #22054: Add os.get_blocking() and os.set_blocking() functions to get and

set the blocking mode of a file descriptor (False if the O_NONBLOCK flag is
set, True otherwise). These functions are not available on Windows.
This commit is contained in:
Victor Stinner 2014-07-29 22:32:47 +02:00
parent 6aa4269ed2
commit 1db9e7bb19
13 changed files with 198 additions and 58 deletions

View file

@ -44,10 +44,6 @@ try:
import threading
except ImportError:
threading = None
try:
import fcntl
except ImportError:
fcntl = None
def _default_chunk_size():
"""Get the default TextIOWrapper chunk size"""
@ -3230,26 +3226,20 @@ class MiscIOTest(unittest.TestCase):
with self.open(support.TESTFN, **kwargs) as f:
self.assertRaises(TypeError, pickle.dumps, f, protocol)
@unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_bigbuf(self):
self._test_nonblock_pipe_write(16*1024)
@unittest.skipUnless(fcntl, 'fcntl required for this test')
def test_nonblock_pipe_write_smallbuf(self):
self._test_nonblock_pipe_write(1024)
def _set_non_blocking(self, fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
self.assertNotEqual(flags, -1)
res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
self.assertEqual(res, 0)
@unittest.skipUnless(hasattr(os, 'set_blocking'),
'os.set_blocking() required for this test')
def _test_nonblock_pipe_write(self, bufsize):
sent = []
received = []
r, w = os.pipe()
self._set_non_blocking(r)
self._set_non_blocking(w)
os.set_blocking(r, False)
os.set_blocking(w, False)
# To exercise all code paths in the C implementation we need
# to play with buffer sizes. For instance, if we choose a