Issue #3782: os.write() must not accept unicode strings

This commit is contained in:
Antoine Pitrou 2008-09-15 23:02:56 +00:00
parent 4e80cdd739
commit 9cadb1b6e0
9 changed files with 50 additions and 24 deletions

View file

@ -129,7 +129,7 @@ def fork():
def _writen(fd, data): def _writen(fd, data):
"""Write all the data to a descriptor.""" """Write all the data to a descriptor."""
while data != '': while data:
n = os.write(fd, data) n = os.write(fd, data)
data = data[n:] data = data[n:]

View file

@ -381,8 +381,8 @@ class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests):
if hasattr(asyncore, 'file_wrapper'): if hasattr(asyncore, 'file_wrapper'):
class FileWrapperTest(unittest.TestCase): class FileWrapperTest(unittest.TestCase):
def setUp(self): def setUp(self):
self.d = "It's not dead, it's sleeping!" self.d = b"It's not dead, it's sleeping!"
open(TESTFN, 'w').write(self.d) open(TESTFN, 'wb').write(self.d)
def tearDown(self): def tearDown(self):
unlink(TESTFN) unlink(TESTFN)
@ -400,8 +400,8 @@ if hasattr(asyncore, 'file_wrapper'):
self.assertRaises(OSError, w.read, 1) self.assertRaises(OSError, w.read, 1)
def test_send(self): def test_send(self):
d1 = "Come again?" d1 = b"Come again?"
d2 = "I want to buy some cheese." d2 = b"I want to buy some cheese."
fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND) fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND)
w = asyncore.file_wrapper(fd) w = asyncore.file_wrapper(fd)
os.close(fd) os.close(fd)
@ -409,7 +409,7 @@ if hasattr(asyncore, 'file_wrapper'):
w.write(d1) w.write(d1)
w.send(d2) w.send(d2)
w.close() w.close()
self.assertEqual(open(TESTFN).read(), self.d + d1 + d2) self.assertEqual(open(TESTFN, 'rb').read(), self.d + d1 + d2)
def test_main(): def test_main():

View file

@ -41,7 +41,7 @@ class FileTests(unittest.TestCase):
os.close(second) os.close(second)
# close a fd that is open, and one that isn't # close a fd that is open, and one that isn't
os.closerange(first, first + 2) os.closerange(first, first + 2)
self.assertRaises(OSError, os.write, first, "a") self.assertRaises(OSError, os.write, first, b"a")
def test_rename(self): def test_rename(self):
path = support.TESTFN path = support.TESTFN
@ -50,6 +50,28 @@ class FileTests(unittest.TestCase):
new = sys.getrefcount(path) new = sys.getrefcount(path)
self.assertEqual(old, new) self.assertEqual(old, new)
def test_read(self):
with open(support.TESTFN, "w+b") as fobj:
fobj.write(b"spam")
fobj.flush()
fd = fobj.fileno()
os.lseek(fd, 0, 0)
s = os.read(fd, 4)
self.assertEqual(type(s), bytes)
self.assertEqual(s, b"spam")
def test_write(self):
# os.write() accepts bytes- and buffer-like objects but not strings
fd = os.open(support.TESTFN, os.O_CREAT | os.O_WRONLY)
self.assertRaises(TypeError, os.write, fd, "beans")
os.write(fd, b"bacon\n")
os.write(fd, bytearray(b"eggs\n"))
os.write(fd, memoryview(b"spam\n"))
os.close(fd)
with open(support.TESTFN, "rb") as fobj:
self.assertEqual(fobj.read(), b"bacon\neggs\nspam\n")
class TemporaryFileTests(unittest.TestCase): class TemporaryFileTests(unittest.TestCase):
def setUp(self): def setUp(self):
self.files = [] self.files = []

View file

@ -7,8 +7,8 @@ import signal
from test.support import verbose, TestSkipped, run_unittest from test.support import verbose, TestSkipped, run_unittest
import unittest import unittest
TEST_STRING_1 = "I wish to buy a fish license.\n" TEST_STRING_1 = b"I wish to buy a fish license.\n"
TEST_STRING_2 = "For my pet fish, Eric.\n" TEST_STRING_2 = b"For my pet fish, Eric.\n"
if verbose: if verbose:
def debug(msg): def debug(msg):

View file

@ -128,7 +128,7 @@ class ProcessTestCase(unittest.TestCase):
# stdin is set to open file descriptor # stdin is set to open file descriptor
tf = tempfile.TemporaryFile() tf = tempfile.TemporaryFile()
d = tf.fileno() d = tf.fileno()
os.write(d, "pear") os.write(d, b"pear")
os.lseek(d, 0, 0) os.lseek(d, 0, 0)
p = subprocess.Popen([sys.executable, "-c", p = subprocess.Popen([sys.executable, "-c",
'import sys; sys.exit(sys.stdin.read() == "pear")'], 'import sys; sys.exit(sys.stdin.read() == "pear")'],
@ -237,7 +237,7 @@ class ProcessTestCase(unittest.TestCase):
def test_stdout_filedes_of_stdout(self): def test_stdout_filedes_of_stdout(self):
# stdout is set to 1 (#1531862). # stdout is set to 1 (#1531862).
cmd = r"import sys, os; sys.exit(os.write(sys.stdout.fileno(), '.\n'))" cmd = r"import sys, os; sys.exit(os.write(sys.stdout.fileno(), b'.\n'))"
rc = subprocess.call([sys.executable, "-c", cmd], stdout=1) rc = subprocess.call([sys.executable, "-c", cmd], stdout=1)
self.assertEquals(rc, 2) self.assertEquals(rc, 2)
@ -548,11 +548,12 @@ class ProcessTestCase(unittest.TestCase):
def test_args_string(self): def test_args_string(self):
# args is a string # args is a string
f, fname = self.mkstemp() fd, fname = self.mkstemp()
os.write(f, "#!/bin/sh\n") # reopen in text mode
os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" % with open(fd, "w") as fobj:
sys.executable) fobj.write("#!/bin/sh\n")
os.close(f) fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
sys.executable)
os.chmod(fname, 0o700) os.chmod(fname, 0o700)
p = subprocess.Popen(fname) p = subprocess.Popen(fname)
p.wait() p.wait()
@ -590,11 +591,12 @@ class ProcessTestCase(unittest.TestCase):
def test_call_string(self): def test_call_string(self):
# call() function with string argument on UNIX # call() function with string argument on UNIX
f, fname = self.mkstemp() fd, fname = self.mkstemp()
os.write(f, "#!/bin/sh\n") # reopen in text mode
os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" % with open(fd, "w") as fobj:
sys.executable) fobj.write("#!/bin/sh\n")
os.close(f) fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
sys.executable)
os.chmod(fname, 0o700) os.chmod(fname, 0o700)
rc = subprocess.call(fname) rc = subprocess.call(fname)
os.remove(fname) os.remove(fname)

View file

@ -252,7 +252,7 @@ class test__mkstemp_inner(TC):
# _mkstemp_inner can create files in a user-selected directory # _mkstemp_inner can create files in a user-selected directory
dir = tempfile.mkdtemp() dir = tempfile.mkdtemp()
try: try:
self.do_create(dir=dir).write("blat") self.do_create(dir=dir).write(b"blat")
finally: finally:
os.rmdir(dir) os.rmdir(dir)

View file

@ -10,7 +10,7 @@ try:
fd = int(sys.argv[2]) fd = int(sys.argv[2])
try: try:
os.write(fd, "blat") os.write(fd, b"blat")
except os.error: except os.error:
# Success -- could not write to fd. # Success -- could not write to fd.
sys.exit(0) sys.exit(0)

View file

@ -140,6 +140,8 @@ Library
Extension Modules Extension Modules
----------------- -----------------
- Issue #3782: os.write() must not accept unicode strings.
- Issue #2975: When compiling several extension modules with Visual Studio 2008 - Issue #2975: When compiling several extension modules with Visual Studio 2008
from the same python interpreter, some environment variables would grow from the same python interpreter, some environment variables would grow
without limit. without limit.

View file

@ -4896,7 +4896,7 @@ posix_write(PyObject *self, PyObject *args)
int fd; int fd;
Py_ssize_t size; Py_ssize_t size;
if (!PyArg_ParseTuple(args, "is*:write", &fd, &pbuf)) if (!PyArg_ParseTuple(args, "iy*:write", &fd, &pbuf))
return NULL; return NULL;
Py_BEGIN_ALLOW_THREADS Py_BEGIN_ALLOW_THREADS
size = write(fd, pbuf.buf, (size_t)pbuf.len); size = write(fd, pbuf.buf, (size_t)pbuf.len);