mirror of
https://github.com/python/cpython.git
synced 2025-09-03 15:31:08 +00:00

This brings the code under test.support.interpreters, and the corresponding extension modules, in line with recent updates to PEP 734. (Note: PEP 734 has not been accepted at this time. However, we are using an internal copy of the implementation in the test suite to exercise the existing subinterpreters feature.)
328 lines
9.5 KiB
Python
328 lines
9.5 KiB
Python
import threading
|
|
from textwrap import dedent
|
|
import unittest
|
|
import time
|
|
|
|
from test.support import import_helper
|
|
# Raise SkipTest if subinterpreters not supported.
|
|
_channels = import_helper.import_module('_xxinterpchannels')
|
|
from test.support import interpreters
|
|
from test.support.interpreters import channels
|
|
from .utils import _run_output, TestBase
|
|
|
|
|
|
class TestChannels(TestBase):
|
|
|
|
def test_create(self):
|
|
r, s = channels.create()
|
|
self.assertIsInstance(r, channels.RecvChannel)
|
|
self.assertIsInstance(s, channels.SendChannel)
|
|
|
|
def test_list_all(self):
|
|
self.assertEqual(channels.list_all(), [])
|
|
created = set()
|
|
for _ in range(3):
|
|
ch = channels.create()
|
|
created.add(ch)
|
|
after = set(channels.list_all())
|
|
self.assertEqual(after, created)
|
|
|
|
def test_shareable(self):
|
|
rch, sch = channels.create()
|
|
|
|
self.assertTrue(
|
|
interpreters.is_shareable(rch))
|
|
self.assertTrue(
|
|
interpreters.is_shareable(sch))
|
|
|
|
sch.send_nowait(rch)
|
|
sch.send_nowait(sch)
|
|
rch2 = rch.recv()
|
|
sch2 = rch.recv()
|
|
|
|
self.assertEqual(rch2, rch)
|
|
self.assertEqual(sch2, sch)
|
|
|
|
def test_is_closed(self):
|
|
rch, sch = channels.create()
|
|
rbefore = rch.is_closed
|
|
sbefore = sch.is_closed
|
|
rch.close()
|
|
rafter = rch.is_closed
|
|
safter = sch.is_closed
|
|
|
|
self.assertFalse(rbefore)
|
|
self.assertFalse(sbefore)
|
|
self.assertTrue(rafter)
|
|
self.assertTrue(safter)
|
|
|
|
|
|
class TestRecvChannelAttrs(TestBase):
|
|
|
|
def test_id_type(self):
|
|
rch, _ = channels.create()
|
|
self.assertIsInstance(rch.id, _channels.ChannelID)
|
|
|
|
def test_custom_id(self):
|
|
rch = channels.RecvChannel(1)
|
|
self.assertEqual(rch.id, 1)
|
|
|
|
with self.assertRaises(TypeError):
|
|
channels.RecvChannel('1')
|
|
|
|
def test_id_readonly(self):
|
|
rch = channels.RecvChannel(1)
|
|
with self.assertRaises(AttributeError):
|
|
rch.id = 2
|
|
|
|
def test_equality(self):
|
|
ch1, _ = channels.create()
|
|
ch2, _ = channels.create()
|
|
self.assertEqual(ch1, ch1)
|
|
self.assertNotEqual(ch1, ch2)
|
|
|
|
|
|
class TestSendChannelAttrs(TestBase):
|
|
|
|
def test_id_type(self):
|
|
_, sch = channels.create()
|
|
self.assertIsInstance(sch.id, _channels.ChannelID)
|
|
|
|
def test_custom_id(self):
|
|
sch = channels.SendChannel(1)
|
|
self.assertEqual(sch.id, 1)
|
|
|
|
with self.assertRaises(TypeError):
|
|
channels.SendChannel('1')
|
|
|
|
def test_id_readonly(self):
|
|
sch = channels.SendChannel(1)
|
|
with self.assertRaises(AttributeError):
|
|
sch.id = 2
|
|
|
|
def test_equality(self):
|
|
_, ch1 = channels.create()
|
|
_, ch2 = channels.create()
|
|
self.assertEqual(ch1, ch1)
|
|
self.assertNotEqual(ch1, ch2)
|
|
|
|
|
|
class TestSendRecv(TestBase):
|
|
|
|
def test_send_recv_main(self):
|
|
r, s = channels.create()
|
|
orig = b'spam'
|
|
s.send_nowait(orig)
|
|
obj = r.recv()
|
|
|
|
self.assertEqual(obj, orig)
|
|
self.assertIsNot(obj, orig)
|
|
|
|
def test_send_recv_same_interpreter(self):
|
|
interp = interpreters.create()
|
|
interp.exec(dedent("""
|
|
from test.support.interpreters import channels
|
|
r, s = channels.create()
|
|
orig = b'spam'
|
|
s.send_nowait(orig)
|
|
obj = r.recv()
|
|
assert obj == orig, 'expected: obj == orig'
|
|
assert obj is not orig, 'expected: obj is not orig'
|
|
"""))
|
|
|
|
@unittest.skip('broken (see BPO-...)')
|
|
def test_send_recv_different_interpreters(self):
|
|
r1, s1 = channels.create()
|
|
r2, s2 = channels.create()
|
|
orig1 = b'spam'
|
|
s1.send_nowait(orig1)
|
|
out = _run_output(
|
|
interpreters.create(),
|
|
dedent(f"""
|
|
obj1 = r.recv()
|
|
assert obj1 == b'spam', 'expected: obj1 == orig1'
|
|
# When going to another interpreter we get a copy.
|
|
assert id(obj1) != {id(orig1)}, 'expected: obj1 is not orig1'
|
|
orig2 = b'eggs'
|
|
print(id(orig2))
|
|
s.send_nowait(orig2)
|
|
"""),
|
|
channels=dict(r=r1, s=s2),
|
|
)
|
|
obj2 = r2.recv()
|
|
|
|
self.assertEqual(obj2, b'eggs')
|
|
self.assertNotEqual(id(obj2), int(out))
|
|
|
|
def test_send_recv_different_threads(self):
|
|
r, s = channels.create()
|
|
|
|
def f():
|
|
while True:
|
|
try:
|
|
obj = r.recv()
|
|
break
|
|
except channels.ChannelEmptyError:
|
|
time.sleep(0.1)
|
|
s.send(obj)
|
|
t = threading.Thread(target=f)
|
|
t.start()
|
|
|
|
orig = b'spam'
|
|
s.send(orig)
|
|
obj = r.recv()
|
|
t.join()
|
|
|
|
self.assertEqual(obj, orig)
|
|
self.assertIsNot(obj, orig)
|
|
|
|
def test_send_recv_nowait_main(self):
|
|
r, s = channels.create()
|
|
orig = b'spam'
|
|
s.send_nowait(orig)
|
|
obj = r.recv_nowait()
|
|
|
|
self.assertEqual(obj, orig)
|
|
self.assertIsNot(obj, orig)
|
|
|
|
def test_send_recv_nowait_main_with_default(self):
|
|
r, _ = channels.create()
|
|
obj = r.recv_nowait(None)
|
|
|
|
self.assertIsNone(obj)
|
|
|
|
def test_send_recv_nowait_same_interpreter(self):
|
|
interp = interpreters.create()
|
|
interp.exec(dedent("""
|
|
from test.support.interpreters import channels
|
|
r, s = channels.create()
|
|
orig = b'spam'
|
|
s.send_nowait(orig)
|
|
obj = r.recv_nowait()
|
|
assert obj == orig, 'expected: obj == orig'
|
|
# When going back to the same interpreter we get the same object.
|
|
assert obj is not orig, 'expected: obj is not orig'
|
|
"""))
|
|
|
|
@unittest.skip('broken (see BPO-...)')
|
|
def test_send_recv_nowait_different_interpreters(self):
|
|
r1, s1 = channels.create()
|
|
r2, s2 = channels.create()
|
|
orig1 = b'spam'
|
|
s1.send_nowait(orig1)
|
|
out = _run_output(
|
|
interpreters.create(),
|
|
dedent(f"""
|
|
obj1 = r.recv_nowait()
|
|
assert obj1 == b'spam', 'expected: obj1 == orig1'
|
|
# When going to another interpreter we get a copy.
|
|
assert id(obj1) != {id(orig1)}, 'expected: obj1 is not orig1'
|
|
orig2 = b'eggs'
|
|
print(id(orig2))
|
|
s.send_nowait(orig2)
|
|
"""),
|
|
channels=dict(r=r1, s=s2),
|
|
)
|
|
obj2 = r2.recv_nowait()
|
|
|
|
self.assertEqual(obj2, b'eggs')
|
|
self.assertNotEqual(id(obj2), int(out))
|
|
|
|
def test_recv_timeout(self):
|
|
r, _ = channels.create()
|
|
with self.assertRaises(TimeoutError):
|
|
r.recv(timeout=1)
|
|
|
|
def test_recv_channel_does_not_exist(self):
|
|
ch = channels.RecvChannel(1_000_000)
|
|
with self.assertRaises(channels.ChannelNotFoundError):
|
|
ch.recv()
|
|
|
|
def test_send_channel_does_not_exist(self):
|
|
ch = channels.SendChannel(1_000_000)
|
|
with self.assertRaises(channels.ChannelNotFoundError):
|
|
ch.send(b'spam')
|
|
|
|
def test_recv_nowait_channel_does_not_exist(self):
|
|
ch = channels.RecvChannel(1_000_000)
|
|
with self.assertRaises(channels.ChannelNotFoundError):
|
|
ch.recv_nowait()
|
|
|
|
def test_send_nowait_channel_does_not_exist(self):
|
|
ch = channels.SendChannel(1_000_000)
|
|
with self.assertRaises(channels.ChannelNotFoundError):
|
|
ch.send_nowait(b'spam')
|
|
|
|
def test_recv_nowait_empty(self):
|
|
ch, _ = channels.create()
|
|
with self.assertRaises(channels.ChannelEmptyError):
|
|
ch.recv_nowait()
|
|
|
|
def test_recv_nowait_default(self):
|
|
default = object()
|
|
rch, sch = channels.create()
|
|
obj1 = rch.recv_nowait(default)
|
|
sch.send_nowait(None)
|
|
sch.send_nowait(1)
|
|
sch.send_nowait(b'spam')
|
|
sch.send_nowait(b'eggs')
|
|
obj2 = rch.recv_nowait(default)
|
|
obj3 = rch.recv_nowait(default)
|
|
obj4 = rch.recv_nowait()
|
|
obj5 = rch.recv_nowait(default)
|
|
obj6 = rch.recv_nowait(default)
|
|
|
|
self.assertIs(obj1, default)
|
|
self.assertIs(obj2, None)
|
|
self.assertEqual(obj3, 1)
|
|
self.assertEqual(obj4, b'spam')
|
|
self.assertEqual(obj5, b'eggs')
|
|
self.assertIs(obj6, default)
|
|
|
|
def test_send_buffer(self):
|
|
buf = bytearray(b'spamspamspam')
|
|
obj = None
|
|
rch, sch = channels.create()
|
|
|
|
def f():
|
|
nonlocal obj
|
|
while True:
|
|
try:
|
|
obj = rch.recv()
|
|
break
|
|
except channels.ChannelEmptyError:
|
|
time.sleep(0.1)
|
|
t = threading.Thread(target=f)
|
|
t.start()
|
|
|
|
sch.send_buffer(buf)
|
|
t.join()
|
|
|
|
self.assertIsNot(obj, buf)
|
|
self.assertIsInstance(obj, memoryview)
|
|
self.assertEqual(obj, buf)
|
|
|
|
buf[4:8] = b'eggs'
|
|
self.assertEqual(obj, buf)
|
|
obj[4:8] = b'ham.'
|
|
self.assertEqual(obj, buf)
|
|
|
|
def test_send_buffer_nowait(self):
|
|
buf = bytearray(b'spamspamspam')
|
|
rch, sch = channels.create()
|
|
sch.send_buffer_nowait(buf)
|
|
obj = rch.recv()
|
|
|
|
self.assertIsNot(obj, buf)
|
|
self.assertIsInstance(obj, memoryview)
|
|
self.assertEqual(obj, buf)
|
|
|
|
buf[4:8] = b'eggs'
|
|
self.assertEqual(obj, buf)
|
|
obj[4:8] = b'ham.'
|
|
self.assertEqual(obj, buf)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
# Test needs to be a package, so we can do relative imports.
|
|
unittest.main()
|