mirror of
https://github.com/python/cpython.git
synced 2025-08-31 14:07:50 +00:00
Rewrote to use temporary files instead of StringIO objects in most places.
Goal is to work in the direction of universal newline support.
This commit is contained in:
parent
a2c9a98a0a
commit
58fc5d0813
1 changed files with 249 additions and 109 deletions
|
@ -3,8 +3,10 @@
|
|||
# csv package unit tests
|
||||
|
||||
import sys
|
||||
import os
|
||||
import unittest
|
||||
from StringIO import StringIO
|
||||
import tempfile
|
||||
import csv
|
||||
import gc
|
||||
from test import test_support
|
||||
|
@ -55,11 +57,17 @@ class Test_Csv(unittest.TestCase):
|
|||
self._test_attrs(csv.writer(StringIO()))
|
||||
|
||||
def _write_test(self, fields, expect, **kwargs):
|
||||
fileobj = StringIO()
|
||||
writer = csv.writer(fileobj, **kwargs)
|
||||
writer.writerow(fields)
|
||||
self.assertEqual(fileobj.getvalue(),
|
||||
expect + writer.dialect.lineterminator)
|
||||
fd, name = tempfile.mkstemp()
|
||||
fileobj = os.fdopen(fd, "w+b")
|
||||
try:
|
||||
writer = csv.writer(fileobj, **kwargs)
|
||||
writer.writerow(fields)
|
||||
fileobj.seek(0)
|
||||
self.assertEqual(fileobj.read(),
|
||||
expect + writer.dialect.lineterminator)
|
||||
finally:
|
||||
fileobj.close()
|
||||
os.unlink(name)
|
||||
|
||||
def test_write_arg_valid(self):
|
||||
self.assertRaises(csv.Error, self._write_test, None, '')
|
||||
|
@ -114,12 +122,18 @@ class Test_Csv(unittest.TestCase):
|
|||
raise IOError
|
||||
writer = csv.writer(BrokenFile())
|
||||
self.assertRaises(IOError, writer.writerows, [['a']])
|
||||
fileobj = StringIO()
|
||||
writer = csv.writer(fileobj)
|
||||
self.assertRaises(TypeError, writer.writerows, None)
|
||||
writer.writerows([['a','b'],['c','d']])
|
||||
self.assertEqual(fileobj.getvalue(), "a,b\r\nc,d\r\n")
|
||||
|
||||
fd, name = tempfile.mkstemp()
|
||||
fileobj = os.fdopen(fd, "w+b")
|
||||
try:
|
||||
writer = csv.writer(fileobj)
|
||||
self.assertRaises(TypeError, writer.writerows, None)
|
||||
writer.writerows([['a','b'],['c','d']])
|
||||
fileobj.seek(0)
|
||||
self.assertEqual(fileobj.read(), "a,b\r\nc,d\r\n")
|
||||
finally:
|
||||
fileobj.close()
|
||||
os.unlink(name)
|
||||
|
||||
def _read_test(self, input, expect, **kwargs):
|
||||
reader = csv.reader(input, **kwargs)
|
||||
result = list(reader)
|
||||
|
@ -201,11 +215,18 @@ class TestDialectRegistry(unittest.TestCase):
|
|||
quoting = csv.QUOTE_NONE
|
||||
escapechar = "\\"
|
||||
|
||||
s = StringIO("abc def\nc1ccccc1 benzene\n")
|
||||
rdr = csv.reader(s, dialect=space())
|
||||
self.assertEqual(rdr.next(), ["abc", "def"])
|
||||
self.assertEqual(rdr.next(), ["c1ccccc1", "benzene"])
|
||||
|
||||
fd, name = tempfile.mkstemp()
|
||||
fileobj = os.fdopen(fd, "w+b")
|
||||
try:
|
||||
fileobj.write("abc def\nc1ccccc1 benzene\n")
|
||||
fileobj.seek(0)
|
||||
rdr = csv.reader(fileobj, dialect=space())
|
||||
self.assertEqual(rdr.next(), ["abc", "def"])
|
||||
self.assertEqual(rdr.next(), ["c1ccccc1", "benzene"])
|
||||
finally:
|
||||
fileobj.close()
|
||||
os.unlink(name)
|
||||
|
||||
def test_dialect_apply(self):
|
||||
class testA(csv.excel):
|
||||
delimiter = "\t"
|
||||
|
@ -216,30 +237,61 @@ class TestDialectRegistry(unittest.TestCase):
|
|||
|
||||
csv.register_dialect('testC', testC)
|
||||
try:
|
||||
fileobj = StringIO()
|
||||
writer = csv.writer(fileobj)
|
||||
writer.writerow([1,2,3])
|
||||
self.assertEqual(fileobj.getvalue(), "1,2,3\r\n")
|
||||
fd, name = tempfile.mkstemp()
|
||||
fileobj = os.fdopen(fd, "w+b")
|
||||
try:
|
||||
writer = csv.writer(fileobj)
|
||||
writer.writerow([1,2,3])
|
||||
fileobj.seek(0)
|
||||
self.assertEqual(fileobj.read(), "1,2,3\r\n")
|
||||
finally:
|
||||
fileobj.close()
|
||||
os.unlink(name)
|
||||
|
||||
fd, name = tempfile.mkstemp()
|
||||
fileobj = os.fdopen(fd, "w+b")
|
||||
try:
|
||||
writer = csv.writer(fileobj, testA)
|
||||
writer.writerow([1,2,3])
|
||||
fileobj.seek(0)
|
||||
self.assertEqual(fileobj.read(), "1\t2\t3\r\n")
|
||||
finally:
|
||||
fileobj.close()
|
||||
os.unlink(name)
|
||||
|
||||
fileobj = StringIO()
|
||||
writer = csv.writer(fileobj, testA)
|
||||
writer.writerow([1,2,3])
|
||||
self.assertEqual(fileobj.getvalue(), "1\t2\t3\r\n")
|
||||
fd, name = tempfile.mkstemp()
|
||||
fileobj = os.fdopen(fd, "w+b")
|
||||
try:
|
||||
writer = csv.writer(fileobj, dialect=testB())
|
||||
writer.writerow([1,2,3])
|
||||
fileobj.seek(0)
|
||||
self.assertEqual(fileobj.read(), "1:2:3\r\n")
|
||||
finally:
|
||||
fileobj.close()
|
||||
os.unlink(name)
|
||||
|
||||
fileobj = StringIO()
|
||||
writer = csv.writer(fileobj, dialect=testB())
|
||||
writer.writerow([1,2,3])
|
||||
self.assertEqual(fileobj.getvalue(), "1:2:3\r\n")
|
||||
fd, name = tempfile.mkstemp()
|
||||
fileobj = os.fdopen(fd, "w+b")
|
||||
try:
|
||||
writer = csv.writer(fileobj, dialect='testC')
|
||||
writer.writerow([1,2,3])
|
||||
fileobj.seek(0)
|
||||
self.assertEqual(fileobj.read(), "1|2|3\r\n")
|
||||
finally:
|
||||
fileobj.close()
|
||||
os.unlink(name)
|
||||
|
||||
fileobj = StringIO()
|
||||
writer = csv.writer(fileobj, dialect='testC')
|
||||
writer.writerow([1,2,3])
|
||||
self.assertEqual(fileobj.getvalue(), "1|2|3\r\n")
|
||||
fd, name = tempfile.mkstemp()
|
||||
fileobj = os.fdopen(fd, "w+b")
|
||||
try:
|
||||
writer = csv.writer(fileobj, dialect=testA, delimiter=';')
|
||||
writer.writerow([1,2,3])
|
||||
fileobj.seek(0)
|
||||
self.assertEqual(fileobj.read(), "1;2;3\r\n")
|
||||
finally:
|
||||
fileobj.close()
|
||||
os.unlink(name)
|
||||
|
||||
fileobj = StringIO()
|
||||
writer = csv.writer(fileobj, dialect=testA, delimiter=';')
|
||||
writer.writerow([1,2,3])
|
||||
self.assertEqual(fileobj.getvalue(), "1;2;3\r\n")
|
||||
finally:
|
||||
csv.unregister_dialect('testC')
|
||||
|
||||
|
@ -253,15 +305,29 @@ class TestDialectRegistry(unittest.TestCase):
|
|||
|
||||
class TestCsvBase(unittest.TestCase):
|
||||
def readerAssertEqual(self, input, expected_result):
|
||||
reader = csv.reader(StringIO(input), dialect = self.dialect)
|
||||
fields = list(reader)
|
||||
self.assertEqual(fields, expected_result)
|
||||
fd, name = tempfile.mkstemp()
|
||||
fileobj = os.fdopen(fd, "w+b")
|
||||
try:
|
||||
fileobj.write(input)
|
||||
fileobj.seek(0)
|
||||
reader = csv.reader(fileobj, dialect = self.dialect)
|
||||
fields = list(reader)
|
||||
self.assertEqual(fields, expected_result)
|
||||
finally:
|
||||
fileobj.close()
|
||||
os.unlink(name)
|
||||
|
||||
def writerAssertEqual(self, input, expected_result):
|
||||
fileobj = StringIO()
|
||||
writer = csv.writer(fileobj, dialect = self.dialect)
|
||||
writer.writerows(input)
|
||||
self.assertEqual(fileobj.getvalue(), expected_result)
|
||||
fd, name = tempfile.mkstemp()
|
||||
fileobj = os.fdopen(fd, "w+b")
|
||||
try:
|
||||
writer = csv.writer(fileobj, dialect = self.dialect)
|
||||
writer.writerows(input)
|
||||
fileobj.seek(0)
|
||||
self.assertEqual(fileobj.read(), expected_result)
|
||||
finally:
|
||||
fileobj.close()
|
||||
os.unlink(name)
|
||||
|
||||
class TestDialectExcel(TestCsvBase):
|
||||
dialect = 'excel'
|
||||
|
@ -386,51 +452,104 @@ class TestDictFields(unittest.TestCase):
|
|||
### "long" means the row is longer than the number of fieldnames
|
||||
### "short" means there are fewer elements in the row than fieldnames
|
||||
def test_write_simple_dict(self):
|
||||
fileobj = StringIO()
|
||||
writer = csv.DictWriter(fileobj, fieldnames = ["f1", "f2", "f3"])
|
||||
writer.writerow({"f1": 10, "f3": "abc"})
|
||||
self.assertEqual(fileobj.getvalue(), "10,,abc\r\n")
|
||||
fd, name = tempfile.mkstemp()
|
||||
fileobj = os.fdopen(fd, "w+b")
|
||||
try:
|
||||
writer = csv.DictWriter(fileobj, fieldnames = ["f1", "f2", "f3"])
|
||||
writer.writerow({"f1": 10, "f3": "abc"})
|
||||
fileobj.seek(0)
|
||||
self.assertEqual(fileobj.read(), "10,,abc\r\n")
|
||||
finally:
|
||||
fileobj.close()
|
||||
os.unlink(name)
|
||||
|
||||
def test_write_no_fields(self):
|
||||
fileobj = StringIO()
|
||||
self.assertRaises(TypeError, csv.DictWriter, fileobj)
|
||||
|
||||
def test_read_dict_fields(self):
|
||||
reader = csv.DictReader(StringIO("1,2,abc\r\n"),
|
||||
fieldnames=["f1", "f2", "f3"])
|
||||
self.assertEqual(reader.next(), {"f1": '1', "f2": '2', "f3": 'abc'})
|
||||
fd, name = tempfile.mkstemp()
|
||||
fileobj = os.fdopen(fd, "w+b")
|
||||
try:
|
||||
fileobj.write("1,2,abc\r\n")
|
||||
fileobj.seek(0)
|
||||
reader = csv.DictReader(fileobj,
|
||||
fieldnames=["f1", "f2", "f3"])
|
||||
self.assertEqual(reader.next(), {"f1": '1', "f2": '2', "f3": 'abc'})
|
||||
finally:
|
||||
fileobj.close()
|
||||
os.unlink(name)
|
||||
|
||||
def test_read_dict_no_fieldnames(self):
|
||||
reader = csv.DictReader(StringIO("f1,f2,f3\r\n1,2,abc\r\n"))
|
||||
self.assertEqual(reader.next(), {"f1": '1', "f2": '2', "f3": 'abc'})
|
||||
fd, name = tempfile.mkstemp()
|
||||
fileobj = os.fdopen(fd, "w+b")
|
||||
try:
|
||||
fileobj.write("f1,f2,f3\r\n1,2,abc\r\n")
|
||||
fileobj.seek(0)
|
||||
reader = csv.DictReader(fileobj)
|
||||
self.assertEqual(reader.next(), {"f1": '1', "f2": '2', "f3": 'abc'})
|
||||
finally:
|
||||
fileobj.close()
|
||||
os.unlink(name)
|
||||
|
||||
def test_read_long(self):
|
||||
reader = csv.DictReader(StringIO("1,2,abc,4,5,6\r\n"),
|
||||
fieldnames=["f1", "f2"])
|
||||
self.assertEqual(reader.next(), {"f1": '1', "f2": '2',
|
||||
None: ["abc", "4", "5", "6"]})
|
||||
fd, name = tempfile.mkstemp()
|
||||
fileobj = os.fdopen(fd, "w+b")
|
||||
try:
|
||||
fileobj.write("1,2,abc,4,5,6\r\n")
|
||||
fileobj.seek(0)
|
||||
reader = csv.DictReader(fileobj,
|
||||
fieldnames=["f1", "f2"])
|
||||
self.assertEqual(reader.next(), {"f1": '1', "f2": '2',
|
||||
None: ["abc", "4", "5", "6"]})
|
||||
finally:
|
||||
fileobj.close()
|
||||
os.unlink(name)
|
||||
|
||||
def test_read_long_with_rest(self):
|
||||
reader = csv.DictReader(StringIO("1,2,abc,4,5,6\r\n"),
|
||||
fieldnames=["f1", "f2"], restkey="_rest")
|
||||
self.assertEqual(reader.next(), {"f1": '1', "f2": '2',
|
||||
"_rest": ["abc", "4", "5", "6"]})
|
||||
fd, name = tempfile.mkstemp()
|
||||
fileobj = os.fdopen(fd, "w+b")
|
||||
try:
|
||||
fileobj.write("1,2,abc,4,5,6\r\n")
|
||||
fileobj.seek(0)
|
||||
reader = csv.DictReader(fileobj,
|
||||
fieldnames=["f1", "f2"], restkey="_rest")
|
||||
self.assertEqual(reader.next(), {"f1": '1', "f2": '2',
|
||||
"_rest": ["abc", "4", "5", "6"]})
|
||||
finally:
|
||||
fileobj.close()
|
||||
os.unlink(name)
|
||||
|
||||
def test_read_long_with_rest_no_fieldnames(self):
|
||||
reader = csv.DictReader(StringIO("f1,f2\r\n1,2,abc,4,5,6\r\n"),
|
||||
restkey="_rest")
|
||||
self.assertEqual(reader.next(), {"f1": '1', "f2": '2',
|
||||
"_rest": ["abc", "4", "5", "6"]})
|
||||
fd, name = tempfile.mkstemp()
|
||||
fileobj = os.fdopen(fd, "w+b")
|
||||
try:
|
||||
fileobj.write("f1,f2\r\n1,2,abc,4,5,6\r\n")
|
||||
fileobj.seek(0)
|
||||
reader = csv.DictReader(fileobj, restkey="_rest")
|
||||
self.assertEqual(reader.next(), {"f1": '1', "f2": '2',
|
||||
"_rest": ["abc", "4", "5", "6"]})
|
||||
finally:
|
||||
fileobj.close()
|
||||
os.unlink(name)
|
||||
|
||||
def test_read_short(self):
|
||||
reader = csv.DictReader(["1,2,abc,4,5,6\r\n","1,2,abc\r\n"],
|
||||
fieldnames="1 2 3 4 5 6".split(),
|
||||
restval="DEFAULT")
|
||||
self.assertEqual(reader.next(), {"1": '1', "2": '2', "3": 'abc',
|
||||
"4": '4', "5": '5', "6": '6'})
|
||||
self.assertEqual(reader.next(), {"1": '1', "2": '2', "3": 'abc',
|
||||
"4": 'DEFAULT', "5": 'DEFAULT',
|
||||
"6": 'DEFAULT'})
|
||||
fd, name = tempfile.mkstemp()
|
||||
fileobj = os.fdopen(fd, "w+b")
|
||||
try:
|
||||
fileobj.write("1,2,abc,4,5,6\r\n1,2,abc\r\n")
|
||||
fileobj.seek(0)
|
||||
reader = csv.DictReader(fileobj,
|
||||
fieldnames="1 2 3 4 5 6".split(),
|
||||
restval="DEFAULT")
|
||||
self.assertEqual(reader.next(), {"1": '1', "2": '2', "3": 'abc',
|
||||
"4": '4', "5": '5', "6": '6'})
|
||||
self.assertEqual(reader.next(), {"1": '1', "2": '2', "3": 'abc',
|
||||
"4": 'DEFAULT', "5": 'DEFAULT',
|
||||
"6": 'DEFAULT'})
|
||||
finally:
|
||||
fileobj.close()
|
||||
os.unlink(name)
|
||||
|
||||
def test_read_multi(self):
|
||||
sample = [
|
||||
|
@ -468,40 +587,65 @@ class TestArrayWrites(unittest.TestCase):
|
|||
import array
|
||||
contents = [(20-i) for i in range(20)]
|
||||
a = array.array('i', contents)
|
||||
fileobj = StringIO()
|
||||
writer = csv.writer(fileobj, dialect="excel")
|
||||
writer.writerow(a)
|
||||
expected = ",".join([str(i) for i in a])+"\r\n"
|
||||
self.assertEqual(fileobj.getvalue(), expected)
|
||||
|
||||
fd, name = tempfile.mkstemp()
|
||||
fileobj = os.fdopen(fd, "w+b")
|
||||
try:
|
||||
writer = csv.writer(fileobj, dialect="excel")
|
||||
writer.writerow(a)
|
||||
expected = ",".join([str(i) for i in a])+"\r\n"
|
||||
fileobj.seek(0)
|
||||
self.assertEqual(fileobj.read(), expected)
|
||||
finally:
|
||||
fileobj.close()
|
||||
os.unlink(name)
|
||||
|
||||
def test_double_write(self):
|
||||
import array
|
||||
contents = [(20-i)*0.1 for i in range(20)]
|
||||
a = array.array('d', contents)
|
||||
fileobj = StringIO()
|
||||
writer = csv.writer(fileobj, dialect="excel")
|
||||
writer.writerow(a)
|
||||
expected = ",".join([str(i) for i in a])+"\r\n"
|
||||
self.assertEqual(fileobj.getvalue(), expected)
|
||||
fd, name = tempfile.mkstemp()
|
||||
fileobj = os.fdopen(fd, "w+b")
|
||||
try:
|
||||
writer = csv.writer(fileobj, dialect="excel")
|
||||
writer.writerow(a)
|
||||
expected = ",".join([str(i) for i in a])+"\r\n"
|
||||
fileobj.seek(0)
|
||||
self.assertEqual(fileobj.read(), expected)
|
||||
finally:
|
||||
fileobj.close()
|
||||
os.unlink(name)
|
||||
|
||||
def test_float_write(self):
|
||||
import array
|
||||
contents = [(20-i)*0.1 for i in range(20)]
|
||||
a = array.array('f', contents)
|
||||
fileobj = StringIO()
|
||||
writer = csv.writer(fileobj, dialect="excel")
|
||||
writer.writerow(a)
|
||||
expected = ",".join([str(i) for i in a])+"\r\n"
|
||||
self.assertEqual(fileobj.getvalue(), expected)
|
||||
fd, name = tempfile.mkstemp()
|
||||
fileobj = os.fdopen(fd, "w+b")
|
||||
try:
|
||||
writer = csv.writer(fileobj, dialect="excel")
|
||||
writer.writerow(a)
|
||||
expected = ",".join([str(i) for i in a])+"\r\n"
|
||||
fileobj.seek(0)
|
||||
self.assertEqual(fileobj.read(), expected)
|
||||
finally:
|
||||
fileobj.close()
|
||||
os.unlink(name)
|
||||
|
||||
def test_char_write(self):
|
||||
import array, string
|
||||
a = array.array('c', string.letters)
|
||||
fileobj = StringIO()
|
||||
writer = csv.writer(fileobj, dialect="excel")
|
||||
writer.writerow(a)
|
||||
expected = ",".join(a)+"\r\n"
|
||||
self.assertEqual(fileobj.getvalue(), expected)
|
||||
fd, name = tempfile.mkstemp()
|
||||
fileobj = os.fdopen(fd, "w+b")
|
||||
try:
|
||||
writer = csv.writer(fileobj, dialect="excel")
|
||||
writer.writerow(a)
|
||||
expected = ",".join(a)+"\r\n"
|
||||
fileobj.seek(0)
|
||||
self.assertEqual(fileobj.read(), expected)
|
||||
finally:
|
||||
fileobj.close()
|
||||
os.unlink(name)
|
||||
|
||||
class TestDialectValidity(unittest.TestCase):
|
||||
def test_quoting(self):
|
||||
|
@ -696,23 +840,19 @@ else:
|
|||
self.assertEqual(delta < 5, True)
|
||||
|
||||
# commented out for now - csv module doesn't yet support Unicode
|
||||
if 0:
|
||||
from StringIO import StringIO
|
||||
import csv
|
||||
|
||||
class TestUnicode(unittest.TestCase):
|
||||
def test_unicode_read(self):
|
||||
import codecs
|
||||
f = codecs.EncodedFile(StringIO("Martin von Löwis,"
|
||||
"Marc André Lemburg,"
|
||||
"Guido van Rossum,"
|
||||
"François Pinard\r\n"),
|
||||
data_encoding='iso-8859-1')
|
||||
reader = csv.reader(f)
|
||||
self.assertEqual(list(reader), [[u"Martin von Löwis",
|
||||
u"Marc André Lemburg",
|
||||
u"Guido van Rossum",
|
||||
u"François Pinardn"]])
|
||||
## class TestUnicode(unittest.TestCase):
|
||||
## def test_unicode_read(self):
|
||||
## import codecs
|
||||
## f = codecs.EncodedFile(StringIO("Martin von Löwis,"
|
||||
## "Marc André Lemburg,"
|
||||
## "Guido van Rossum,"
|
||||
## "François Pinard\r\n"),
|
||||
## data_encoding='iso-8859-1')
|
||||
## reader = csv.reader(f)
|
||||
## self.assertEqual(list(reader), [[u"Martin von Löwis",
|
||||
## u"Marc André Lemburg",
|
||||
## u"Guido van Rossum",
|
||||
## u"François Pinardn"]])
|
||||
|
||||
def test_main():
|
||||
mod = sys.modules[__name__]
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue