Support for buffer protocol for socket and struct.

* Added socket.recv_buf() and socket.recvfrom_buf() methods, that use the buffer
  protocol (send and sendto already did).

* Added struct.pack_to(), that is the corresponding buffer compatible method to
  unpack_from().

* Fixed minor typos in arraymodule.
This commit is contained in:
Martin Blais 2006-05-26 12:03:27 +00:00
parent 1b94940165
commit 2856e5f390
6 changed files with 550 additions and 220 deletions

View file

@ -1,5 +1,8 @@
from test.test_support import TestFailed, verbose, verify
import test.test_support
import struct
import array
import unittest
import sys
ISBIGENDIAN = sys.byteorder == "big"
@ -438,31 +441,6 @@ def test_705836():
test_705836()
def test_unpack_from():
test_string = 'abcd01234'
fmt = '4s'
s = struct.Struct(fmt)
for cls in (str, buffer):
data = cls(test_string)
assert s.unpack_from(data) == ('abcd',)
assert s.unpack_from(data, 2) == ('cd01',)
assert s.unpack_from(data, 4) == ('0123',)
for i in xrange(6):
assert s.unpack_from(data, i) == (data[i:i+4],)
for i in xrange(6, len(test_string) + 1):
simple_err(s.unpack_from, data, i)
for cls in (str, buffer):
data = cls(test_string)
assert struct.unpack_from(fmt, data) == ('abcd',)
assert struct.unpack_from(fmt, data, 2) == ('cd01',)
assert struct.unpack_from(fmt, data, 4) == ('0123',)
for i in xrange(6):
assert struct.unpack_from(fmt, data, i) == (data[i:i+4],)
for i in xrange(6, len(test_string) + 1):
simple_err(struct.unpack_from, fmt, data, i)
test_unpack_from()
def test_1229380():
for endian in ('', '>', '<'):
for cls in (int, long):
@ -478,3 +456,60 @@ def test_1229380():
if 0:
# TODO: bug #1229380
test_1229380()
class PackBufferTestCase(unittest.TestCase):
"""
Test the packing methods that work on buffers.
"""
def test_unpack_from( self ):
test_string = 'abcd01234'
fmt = '4s'
s = struct.Struct(fmt)
for cls in (str, buffer):
data = cls(test_string)
self.assertEquals(s.unpack_from(data), ('abcd',))
self.assertEquals(s.unpack_from(data, 2), ('cd01',))
self.assertEquals(s.unpack_from(data, 4), ('0123',))
for i in xrange(6):
self.assertEquals(s.unpack_from(data, i), (data[i:i+4],))
for i in xrange(6, len(test_string) + 1):
simple_err(s.unpack_from, data, i)
for cls in (str, buffer):
data = cls(test_string)
self.assertEquals(struct.unpack_from(fmt, data), ('abcd',))
self.assertEquals(struct.unpack_from(fmt, data, 2), ('cd01',))
self.assertEquals(struct.unpack_from(fmt, data, 4), ('0123',))
for i in xrange(6):
self.assertEquals(struct.unpack_from(fmt, data, i),
(data[i:i+4],))
for i in xrange(6, len(test_string) + 1):
simple_err(struct.unpack_from, fmt, data, i)
def test_pack_to( self ):
test_string = 'Reykjavik rocks, eow!'
writable_buf = array.array('c', ' '*100)
fmt = '21s'
s = struct.Struct(fmt)
# Test without offset
s.pack_to(writable_buf, 0, test_string)
from_buf = writable_buf.tostring()[:len(test_string)]
self.assertEquals(from_buf, test_string)
# Test with offset.
s.pack_to(writable_buf, 10, test_string)
from_buf = writable_buf.tostring()[:len(test_string)+10]
self.assertEquals(from_buf, (test_string[:10] + test_string))
# Go beyond boundaries.
small_buf = array.array('c', ' '*10)
self.assertRaises(struct.error, s.pack_to, small_buf, 0, test_string)
self.assertRaises(struct.error, s.pack_to, small_buf, 2, test_string)
def test_main():
test.test_support.run_unittest(PackBufferTestCase)
if __name__ == "__main__":
test_main()