mirror of
https://github.com/python/cpython.git
synced 2025-09-06 17:02:26 +00:00
gh-69093: Add indexing and slicing support to sqlite3.Blob (#91599)
Authored-by: Aviv Palivoda <palaviv@gmail.com> Co-authored-by: Erlend E. Aasland <erlend.aasland@innova.no>
This commit is contained in:
parent
1317b70f89
commit
29afb7d2ef
5 changed files with 349 additions and 16 deletions
|
@ -33,7 +33,7 @@ from test.support import (
|
|||
check_disallow_instantiation,
|
||||
threading_helper,
|
||||
)
|
||||
from _testcapi import INT_MAX
|
||||
from _testcapi import INT_MAX, ULLONG_MAX
|
||||
from os import SEEK_SET, SEEK_CUR, SEEK_END
|
||||
from test.support.os_helper import TESTFN, unlink, temp_dir
|
||||
|
||||
|
@ -1138,6 +1138,13 @@ class BlobTests(unittest.TestCase):
|
|||
with self.assertRaisesRegex(ValueError, "data longer than blob"):
|
||||
self.blob.write(b"a" * 1000)
|
||||
|
||||
self.blob.seek(0, SEEK_SET)
|
||||
n = len(self.blob)
|
||||
self.blob.write(b"a" * (n-1))
|
||||
self.blob.write(b"a")
|
||||
with self.assertRaisesRegex(ValueError, "data longer than blob"):
|
||||
self.blob.write(b"a")
|
||||
|
||||
def test_blob_write_error_row_changed(self):
|
||||
self.cx.execute("update test set b='aaaa' where rowid=1")
|
||||
with self.assertRaises(sqlite.OperationalError):
|
||||
|
@ -1162,12 +1169,127 @@ class BlobTests(unittest.TestCase):
|
|||
with self.assertRaisesRegex(sqlite.OperationalError, regex):
|
||||
self.cx.blobopen(*args, **kwds)
|
||||
|
||||
def test_blob_length(self):
|
||||
self.assertEqual(len(self.blob), 50)
|
||||
|
||||
def test_blob_get_item(self):
|
||||
self.assertEqual(self.blob[5], b"b")
|
||||
self.assertEqual(self.blob[6], b"l")
|
||||
self.assertEqual(self.blob[7], b"o")
|
||||
self.assertEqual(self.blob[8], b"b")
|
||||
self.assertEqual(self.blob[-1], b"!")
|
||||
|
||||
def test_blob_set_item(self):
|
||||
self.blob[0] = b"b"
|
||||
expected = b"b" + self.data[1:]
|
||||
actual = self.cx.execute("select b from test").fetchone()[0]
|
||||
self.assertEqual(actual, expected)
|
||||
|
||||
def test_blob_set_item_with_offset(self):
|
||||
self.blob.seek(0, SEEK_END)
|
||||
self.assertEqual(self.blob.read(), b"") # verify that we're at EOB
|
||||
self.blob[0] = b"T"
|
||||
self.blob[-1] = b"."
|
||||
self.blob.seek(0, SEEK_SET)
|
||||
expected = b"This blob data string is exactly fifty bytes long."
|
||||
self.assertEqual(self.blob.read(), expected)
|
||||
|
||||
def test_blob_set_buffer_object(self):
|
||||
from array import array
|
||||
self.blob[0] = memoryview(b"1")
|
||||
self.assertEqual(self.blob[0], b"1")
|
||||
|
||||
self.blob[1] = bytearray(b"2")
|
||||
self.assertEqual(self.blob[1], b"2")
|
||||
|
||||
self.blob[2] = array("b", [4])
|
||||
self.assertEqual(self.blob[2], b"\x04")
|
||||
|
||||
self.blob[0:5] = memoryview(b"12345")
|
||||
self.assertEqual(self.blob[0:5], b"12345")
|
||||
|
||||
self.blob[0:5] = bytearray(b"23456")
|
||||
self.assertEqual(self.blob[0:5], b"23456")
|
||||
|
||||
self.blob[0:5] = array("b", [1, 2, 3, 4, 5])
|
||||
self.assertEqual(self.blob[0:5], b"\x01\x02\x03\x04\x05")
|
||||
|
||||
def test_blob_set_item_negative_index(self):
|
||||
self.blob[-1] = b"z"
|
||||
self.assertEqual(self.blob[-1], b"z")
|
||||
|
||||
def test_blob_get_slice(self):
|
||||
self.assertEqual(self.blob[5:14], b"blob data")
|
||||
|
||||
def test_blob_get_empty_slice(self):
|
||||
self.assertEqual(self.blob[5:5], b"")
|
||||
|
||||
def test_blob_get_slice_negative_index(self):
|
||||
self.assertEqual(self.blob[5:-5], self.data[5:-5])
|
||||
|
||||
def test_blob_get_slice_with_skip(self):
|
||||
self.assertEqual(self.blob[0:10:2], b"ti lb")
|
||||
|
||||
def test_blob_set_slice(self):
|
||||
self.blob[0:5] = b"12345"
|
||||
expected = b"12345" + self.data[5:]
|
||||
actual = self.cx.execute("select b from test").fetchone()[0]
|
||||
self.assertEqual(actual, expected)
|
||||
|
||||
def test_blob_set_empty_slice(self):
|
||||
self.blob[0:0] = b""
|
||||
self.assertEqual(self.blob[:], self.data)
|
||||
|
||||
def test_blob_set_slice_with_skip(self):
|
||||
self.blob[0:10:2] = b"12345"
|
||||
actual = self.cx.execute("select b from test").fetchone()[0]
|
||||
expected = b"1h2s3b4o5 " + self.data[10:]
|
||||
self.assertEqual(actual, expected)
|
||||
|
||||
def test_blob_mapping_invalid_index_type(self):
|
||||
msg = "indices must be integers"
|
||||
with self.assertRaisesRegex(TypeError, msg):
|
||||
self.blob[5:5.5]
|
||||
with self.assertRaisesRegex(TypeError, msg):
|
||||
self.blob[1.5]
|
||||
with self.assertRaisesRegex(TypeError, msg):
|
||||
self.blob["a"] = b"b"
|
||||
|
||||
def test_blob_get_item_error(self):
|
||||
dataset = [len(self.blob), 105, -105]
|
||||
for idx in dataset:
|
||||
with self.subTest(idx=idx):
|
||||
with self.assertRaisesRegex(IndexError, "index out of range"):
|
||||
self.blob[idx]
|
||||
with self.assertRaisesRegex(IndexError, "cannot fit 'int'"):
|
||||
self.blob[ULLONG_MAX]
|
||||
|
||||
def test_blob_set_item_error(self):
|
||||
with self.assertRaisesRegex(ValueError, "must be a single byte"):
|
||||
self.blob[0] = b"multiple"
|
||||
with self.assertRaisesRegex(TypeError, "doesn't support.*deletion"):
|
||||
del self.blob[0]
|
||||
with self.assertRaisesRegex(IndexError, "Blob index out of range"):
|
||||
self.blob[1000] = b"a"
|
||||
|
||||
def test_blob_set_slice_error(self):
|
||||
with self.assertRaisesRegex(IndexError, "wrong size"):
|
||||
self.blob[5:10] = b"a"
|
||||
with self.assertRaisesRegex(IndexError, "wrong size"):
|
||||
self.blob[5:10] = b"a" * 1000
|
||||
with self.assertRaisesRegex(TypeError, "doesn't support.*deletion"):
|
||||
del self.blob[5:10]
|
||||
with self.assertRaisesRegex(ValueError, "step cannot be zero"):
|
||||
self.blob[5:10:0] = b"12345"
|
||||
with self.assertRaises(BufferError):
|
||||
self.blob[5:10] = memoryview(b"abcde")[::2]
|
||||
|
||||
def test_blob_sequence_not_supported(self):
|
||||
with self.assertRaises(TypeError):
|
||||
with self.assertRaisesRegex(TypeError, "unsupported operand"):
|
||||
self.blob + self.blob
|
||||
with self.assertRaises(TypeError):
|
||||
with self.assertRaisesRegex(TypeError, "unsupported operand"):
|
||||
self.blob * 5
|
||||
with self.assertRaises(TypeError):
|
||||
with self.assertRaisesRegex(TypeError, "is not iterable"):
|
||||
b"a" in self.blob
|
||||
|
||||
def test_blob_context_manager(self):
|
||||
|
@ -1209,6 +1331,14 @@ class BlobTests(unittest.TestCase):
|
|||
blob.__enter__()
|
||||
with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
|
||||
blob.__exit__(None, None, None)
|
||||
with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
|
||||
len(blob)
|
||||
with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
|
||||
blob[0]
|
||||
with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
|
||||
blob[0:1]
|
||||
with self.assertRaisesRegex(sqlite.ProgrammingError, msg):
|
||||
blob[0] = b""
|
||||
|
||||
def test_blob_closed_db_read(self):
|
||||
with memory_database() as cx:
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue