mirror of
https://github.com/python/cpython.git
synced 2025-07-08 03:45:36 +00:00
117 lines
3.8 KiB
Python
117 lines
3.8 KiB
Python
import io
|
|
import _pyio as pyio
|
|
import threading
|
|
from unittest import TestCase
|
|
from test.support import threading_helper
|
|
from random import randint
|
|
from sys import getsizeof
|
|
|
|
|
|
class ThreadSafetyMixin:
|
|
# Test pretty much everything that can break under free-threading.
|
|
# Non-deterministic, but at least one of these things will fail if
|
|
# BytesIO object is not free-thread safe.
|
|
|
|
def check(self, funcs, *args):
|
|
barrier = threading.Barrier(len(funcs))
|
|
threads = []
|
|
|
|
for func in funcs:
|
|
thread = threading.Thread(target=func, args=(barrier, *args))
|
|
|
|
threads.append(thread)
|
|
|
|
with threading_helper.start_threads(threads):
|
|
pass
|
|
|
|
@threading_helper.requires_working_threading()
|
|
@threading_helper.reap_threads
|
|
def test_free_threading(self):
|
|
"""Test for segfaults and aborts."""
|
|
|
|
def write(barrier, b, *ignore):
|
|
barrier.wait()
|
|
try: b.write(b'0' * randint(100, 1000))
|
|
except ValueError: pass # ignore write fail to closed file
|
|
|
|
def writelines(barrier, b, *ignore):
|
|
barrier.wait()
|
|
b.write(b'0\n' * randint(100, 1000))
|
|
|
|
def truncate(barrier, b, *ignore):
|
|
barrier.wait()
|
|
try: b.truncate(0)
|
|
except: BufferError # ignore exported buffer
|
|
|
|
def read(barrier, b, *ignore):
|
|
barrier.wait()
|
|
b.read()
|
|
|
|
def read1(barrier, b, *ignore):
|
|
barrier.wait()
|
|
b.read1()
|
|
|
|
def readline(barrier, b, *ignore):
|
|
barrier.wait()
|
|
b.readline()
|
|
|
|
def readlines(barrier, b, *ignore):
|
|
barrier.wait()
|
|
b.readlines()
|
|
|
|
def readinto(barrier, b, into, *ignore):
|
|
barrier.wait()
|
|
b.readinto(into)
|
|
|
|
def close(barrier, b, *ignore):
|
|
barrier.wait()
|
|
b.close()
|
|
|
|
def getvalue(barrier, b, *ignore):
|
|
barrier.wait()
|
|
b.getvalue()
|
|
|
|
def getbuffer(barrier, b, *ignore):
|
|
barrier.wait()
|
|
b.getbuffer()
|
|
|
|
def iter(barrier, b, *ignore):
|
|
barrier.wait()
|
|
list(b)
|
|
|
|
def getstate(barrier, b, *ignore):
|
|
barrier.wait()
|
|
b.__getstate__()
|
|
|
|
def setstate(barrier, b, st, *ignore):
|
|
barrier.wait()
|
|
b.__setstate__(st)
|
|
|
|
def sizeof(barrier, b, *ignore):
|
|
barrier.wait()
|
|
getsizeof(b)
|
|
|
|
self.check([write] * 10, self.ioclass())
|
|
self.check([writelines] * 10, self.ioclass())
|
|
self.check([write] * 10 + [truncate] * 10, self.ioclass())
|
|
self.check([truncate] + [read] * 10, self.ioclass(b'0\n'*204800))
|
|
self.check([truncate] + [read1] * 10, self.ioclass(b'0\n'*204800))
|
|
self.check([truncate] + [readline] * 10, self.ioclass(b'0\n'*20480))
|
|
self.check([truncate] + [readlines] * 10, self.ioclass(b'0\n'*20480))
|
|
self.check([truncate] + [readinto] * 10, self.ioclass(b'0\n'*204800), bytearray(b'0\n'*204800))
|
|
self.check([close] + [write] * 10, self.ioclass())
|
|
self.check([truncate] + [getvalue] * 10, self.ioclass(b'0\n'*204800))
|
|
self.check([truncate] + [getbuffer] * 10, self.ioclass(b'0\n'*204800))
|
|
self.check([truncate] + [iter] * 10, self.ioclass(b'0\n'*20480))
|
|
self.check([truncate] + [getstate] * 10, self.ioclass(b'0\n'*204800))
|
|
state = self.ioclass(b'123').__getstate__()
|
|
self.check([truncate] + [setstate] * 10, self.ioclass(b'0\n'*204800), state)
|
|
self.check([truncate] + [sizeof] * 10, self.ioclass(b'0\n'*204800))
|
|
|
|
# no tests for seek or tell because they don't break anything
|
|
|
|
class CBytesIOTest(ThreadSafetyMixin, TestCase):
|
|
ioclass = io.BytesIO
|
|
|
|
class PyBytesIOTest(ThreadSafetyMixin, TestCase):
|
|
ioclass = pyio.BytesIO
|