mirror of
https://github.com/python/cpython.git
synced 2025-10-17 04:08:28 +00:00
gh-76785: Add More Tests to test_interpreters.test_api (gh-117662)
In addition to the increase test coverage, this is a precursor to sorting out how we handle interpreters created directly via the C-API.
This commit is contained in:
parent
0cc71bde00
commit
993c3cca16
18 changed files with 2015 additions and 421 deletions
|
@ -1,6 +1,7 @@
|
|||
import os
|
||||
import pickle
|
||||
from textwrap import dedent
|
||||
import sys
|
||||
from textwrap import dedent, indent
|
||||
import threading
|
||||
import types
|
||||
import unittest
|
||||
|
@ -10,8 +11,13 @@ from test.support import import_helper
|
|||
# Raise SkipTest if subinterpreters not supported.
|
||||
_interpreters = import_helper.import_module('_xxsubinterpreters')
|
||||
from test.support import interpreters
|
||||
from test.support.interpreters import InterpreterNotFoundError
|
||||
from .utils import _captured_script, _run_output, _running, TestBase
|
||||
from test.support.interpreters import (
|
||||
InterpreterError, InterpreterNotFoundError, ExecutionFailed,
|
||||
)
|
||||
from .utils import (
|
||||
_captured_script, _run_output, _running, TestBase,
|
||||
requires_test_modules, _testinternalcapi,
|
||||
)
|
||||
|
||||
|
||||
class ModuleTests(TestBase):
|
||||
|
@ -157,6 +163,20 @@ class GetCurrentTests(TestBase):
|
|||
id2 = id(interp)
|
||||
self.assertNotEqual(id1, id2)
|
||||
|
||||
@requires_test_modules
|
||||
def test_created_with_capi(self):
|
||||
last = 0
|
||||
for id, *_ in _interpreters.list_all():
|
||||
last = max(last, id)
|
||||
expected = _testinternalcapi.next_interpreter_id()
|
||||
text = self.run_temp_from_capi(f"""
|
||||
import {interpreters.__name__} as interpreters
|
||||
interp = interpreters.get_current()
|
||||
print(interp.id)
|
||||
""")
|
||||
interpid = eval(text)
|
||||
self.assertEqual(interpid, expected)
|
||||
|
||||
|
||||
class ListAllTests(TestBase):
|
||||
|
||||
|
@ -199,6 +219,33 @@ class ListAllTests(TestBase):
|
|||
for interp1, interp2 in zip(actual, expected):
|
||||
self.assertIs(interp1, interp2)
|
||||
|
||||
def test_created_with_capi(self):
|
||||
mainid, *_ = _interpreters.get_main()
|
||||
interpid1 = _interpreters.create()
|
||||
interpid2 = _interpreters.create()
|
||||
interpid3 = _interpreters.create()
|
||||
interpid4 = interpid3 + 1
|
||||
interpid5 = interpid4 + 1
|
||||
expected = [
|
||||
(mainid,),
|
||||
(interpid1,),
|
||||
(interpid2,),
|
||||
(interpid3,),
|
||||
(interpid4,),
|
||||
(interpid5,),
|
||||
]
|
||||
expected2 = expected[:-2]
|
||||
text = self.run_temp_from_capi(f"""
|
||||
import {interpreters.__name__} as interpreters
|
||||
interp = interpreters.create()
|
||||
print(
|
||||
[(i.id,) for i in interpreters.list_all()])
|
||||
""")
|
||||
res = eval(text)
|
||||
res2 = [(i.id,) for i in interpreters.list_all()]
|
||||
self.assertEqual(res, expected)
|
||||
self.assertEqual(res2, expected2)
|
||||
|
||||
|
||||
class InterpreterObjectTests(TestBase):
|
||||
|
||||
|
@ -276,6 +323,7 @@ class TestInterpreterIsRunning(TestBase):
|
|||
main = interpreters.get_main()
|
||||
self.assertTrue(main.is_running())
|
||||
|
||||
# XXX Is this still true?
|
||||
@unittest.skip('Fails on FreeBSD')
|
||||
def test_subinterpreter(self):
|
||||
interp = interpreters.create()
|
||||
|
@ -337,6 +385,55 @@ class TestInterpreterIsRunning(TestBase):
|
|||
interp.exec('t.join()')
|
||||
self.assertEqual(os.read(r_interp, 1), FINISHED)
|
||||
|
||||
def test_created_with_capi(self):
|
||||
script = dedent(f"""
|
||||
import {interpreters.__name__} as interpreters
|
||||
interp = interpreters.get_current()
|
||||
print(interp.is_running())
|
||||
""")
|
||||
def parse_results(text):
|
||||
self.assertNotEqual(text, "")
|
||||
try:
|
||||
return eval(text)
|
||||
except Exception:
|
||||
raise Exception(repr(text))
|
||||
|
||||
with self.subTest('running __main__ (from self)'):
|
||||
with self.interpreter_from_capi() as interpid:
|
||||
text = self.run_from_capi(interpid, script, main=True)
|
||||
running = parse_results(text)
|
||||
self.assertTrue(running)
|
||||
|
||||
with self.subTest('running, but not __main__ (from self)'):
|
||||
text = self.run_temp_from_capi(script)
|
||||
running = parse_results(text)
|
||||
self.assertFalse(running)
|
||||
|
||||
with self.subTest('running __main__ (from other)'):
|
||||
with self.interpreter_obj_from_capi() as (interp, interpid):
|
||||
before = interp.is_running()
|
||||
with self.running_from_capi(interpid, main=True):
|
||||
during = interp.is_running()
|
||||
after = interp.is_running()
|
||||
self.assertFalse(before)
|
||||
self.assertTrue(during)
|
||||
self.assertFalse(after)
|
||||
|
||||
with self.subTest('running, but not __main__ (from other)'):
|
||||
with self.interpreter_obj_from_capi() as (interp, interpid):
|
||||
before = interp.is_running()
|
||||
with self.running_from_capi(interpid, main=False):
|
||||
during = interp.is_running()
|
||||
after = interp.is_running()
|
||||
self.assertFalse(before)
|
||||
self.assertFalse(during)
|
||||
self.assertFalse(after)
|
||||
|
||||
with self.subTest('not running (from other)'):
|
||||
with self.interpreter_obj_from_capi() as (interp, _):
|
||||
running = interp.is_running()
|
||||
self.assertFalse(running)
|
||||
|
||||
|
||||
class TestInterpreterClose(TestBase):
|
||||
|
||||
|
@ -364,11 +461,11 @@ class TestInterpreterClose(TestBase):
|
|||
|
||||
def test_main(self):
|
||||
main, = interpreters.list_all()
|
||||
with self.assertRaises(interpreters.InterpreterError):
|
||||
with self.assertRaises(InterpreterError):
|
||||
main.close()
|
||||
|
||||
def f():
|
||||
with self.assertRaises(interpreters.InterpreterError):
|
||||
with self.assertRaises(InterpreterError):
|
||||
main.close()
|
||||
|
||||
t = threading.Thread(target=f)
|
||||
|
@ -419,12 +516,13 @@ class TestInterpreterClose(TestBase):
|
|||
t.start()
|
||||
t.join()
|
||||
|
||||
# XXX Is this still true?
|
||||
@unittest.skip('Fails on FreeBSD')
|
||||
def test_still_running(self):
|
||||
main, = interpreters.list_all()
|
||||
interp = interpreters.create()
|
||||
with _running(interp):
|
||||
with self.assertRaises(interpreters.InterpreterError):
|
||||
with self.assertRaises(InterpreterError):
|
||||
interp.close()
|
||||
self.assertTrue(interp.is_running())
|
||||
|
||||
|
@ -459,6 +557,52 @@ class TestInterpreterClose(TestBase):
|
|||
|
||||
self.assertEqual(os.read(r_interp, 1), FINISHED)
|
||||
|
||||
def test_created_with_capi(self):
|
||||
script = dedent(f"""
|
||||
import {interpreters.__name__} as interpreters
|
||||
interp = interpreters.get_current()
|
||||
interp.close()
|
||||
""")
|
||||
|
||||
with self.subTest('running __main__ (from self)'):
|
||||
with self.interpreter_from_capi() as interpid:
|
||||
with self.assertRaisesRegex(ExecutionFailed,
|
||||
'InterpreterError.*current'):
|
||||
self.run_from_capi(interpid, script, main=True)
|
||||
|
||||
with self.subTest('running, but not __main__ (from self)'):
|
||||
with self.assertRaisesRegex(ExecutionFailed,
|
||||
'InterpreterError.*current'):
|
||||
self.run_temp_from_capi(script)
|
||||
|
||||
with self.subTest('running __main__ (from other)'):
|
||||
with self.interpreter_obj_from_capi() as (interp, interpid):
|
||||
with self.running_from_capi(interpid, main=True):
|
||||
with self.assertRaisesRegex(InterpreterError, 'running'):
|
||||
interp.close()
|
||||
# Make sure it wssn't closed.
|
||||
self.assertTrue(
|
||||
interp.is_running())
|
||||
|
||||
# The rest must be skipped until we deal with running threads when
|
||||
# interp.close() is called.
|
||||
return
|
||||
|
||||
with self.subTest('running, but not __main__ (from other)'):
|
||||
with self.interpreter_obj_from_capi() as (interp, interpid):
|
||||
with self.running_from_capi(interpid, main=False):
|
||||
with self.assertRaisesRegex(InterpreterError, 'not managed'):
|
||||
interp.close()
|
||||
# Make sure it wssn't closed.
|
||||
self.assertFalse(interp.is_running())
|
||||
|
||||
with self.subTest('not running (from other)'):
|
||||
with self.interpreter_obj_from_capi() as (interp, _):
|
||||
with self.assertRaisesRegex(InterpreterError, 'not managed'):
|
||||
interp.close()
|
||||
# Make sure it wssn't closed.
|
||||
self.assertFalse(interp.is_running())
|
||||
|
||||
|
||||
class TestInterpreterPrepareMain(TestBase):
|
||||
|
||||
|
@ -511,26 +655,45 @@ class TestInterpreterPrepareMain(TestBase):
|
|||
interp.prepare_main(spam={'spam': 'eggs', 'foo': 'bar'})
|
||||
|
||||
# Make sure neither was actually bound.
|
||||
with self.assertRaises(interpreters.ExecutionFailed):
|
||||
with self.assertRaises(ExecutionFailed):
|
||||
interp.exec('print(foo)')
|
||||
with self.assertRaises(interpreters.ExecutionFailed):
|
||||
with self.assertRaises(ExecutionFailed):
|
||||
interp.exec('print(spam)')
|
||||
|
||||
def test_running(self):
|
||||
interp = interpreters.create()
|
||||
interp.prepare_main({'spam': True})
|
||||
with self.running(interp):
|
||||
with self.assertRaisesRegex(InterpreterError, 'running'):
|
||||
interp.prepare_main({'spam': False})
|
||||
interp.exec('assert spam is True')
|
||||
|
||||
@requires_test_modules
|
||||
def test_created_with_capi(self):
|
||||
with self.interpreter_from_capi() as interpid:
|
||||
interp = interpreters.Interpreter(interpid)
|
||||
interp.prepare_main({'spam': True})
|
||||
rc = _testinternalcapi.exec_interpreter(interpid,
|
||||
'assert spam is True')
|
||||
assert rc == 0, rc
|
||||
|
||||
|
||||
class TestInterpreterExec(TestBase):
|
||||
|
||||
def test_success(self):
|
||||
interp = interpreters.create()
|
||||
script, file = _captured_script('print("it worked!", end="")')
|
||||
with file:
|
||||
script, results = _captured_script('print("it worked!", end="")')
|
||||
with results:
|
||||
interp.exec(script)
|
||||
out = file.read()
|
||||
results = results.final()
|
||||
results.raise_if_failed()
|
||||
out = results.stdout
|
||||
|
||||
self.assertEqual(out, 'it worked!')
|
||||
|
||||
def test_failure(self):
|
||||
interp = interpreters.create()
|
||||
with self.assertRaises(interpreters.ExecutionFailed):
|
||||
with self.assertRaises(ExecutionFailed):
|
||||
interp.exec('raise Exception')
|
||||
|
||||
def test_display_preserved_exception(self):
|
||||
|
@ -583,15 +746,17 @@ class TestInterpreterExec(TestBase):
|
|||
|
||||
def test_in_thread(self):
|
||||
interp = interpreters.create()
|
||||
script, file = _captured_script('print("it worked!", end="")')
|
||||
with file:
|
||||
script, results = _captured_script('print("it worked!", end="")')
|
||||
with results:
|
||||
def f():
|
||||
interp.exec(script)
|
||||
|
||||
t = threading.Thread(target=f)
|
||||
t.start()
|
||||
t.join()
|
||||
out = file.read()
|
||||
results = results.final()
|
||||
results.raise_if_failed()
|
||||
out = results.stdout
|
||||
|
||||
self.assertEqual(out, 'it worked!')
|
||||
|
||||
|
@ -618,6 +783,7 @@ class TestInterpreterExec(TestBase):
|
|||
content = file.read()
|
||||
self.assertEqual(content, expected)
|
||||
|
||||
# XXX Is this still true?
|
||||
@unittest.skip('Fails on FreeBSD')
|
||||
def test_already_running(self):
|
||||
interp = interpreters.create()
|
||||
|
@ -666,6 +832,11 @@ class TestInterpreterExec(TestBase):
|
|||
self.assertEqual(os.read(r_interp, 1), RAN)
|
||||
self.assertEqual(os.read(r_interp, 1), FINISHED)
|
||||
|
||||
def test_created_with_capi(self):
|
||||
with self.interpreter_obj_from_capi() as (interp, _):
|
||||
with self.assertRaisesRegex(ExecutionFailed, 'it worked'):
|
||||
interp.exec('raise Exception("it worked!")')
|
||||
|
||||
# test_xxsubinterpreters covers the remaining
|
||||
# Interpreter.exec() behavior.
|
||||
|
||||
|
@ -830,7 +1001,7 @@ class TestInterpreterCall(TestBase):
|
|||
raise Exception((args, kwargs))
|
||||
interp.call(callable)
|
||||
|
||||
with self.assertRaises(interpreters.ExecutionFailed):
|
||||
with self.assertRaises(ExecutionFailed):
|
||||
interp.call(call_func_failure)
|
||||
|
||||
def test_call_in_thread(self):
|
||||
|
@ -942,6 +1113,14 @@ class LowLevelTests(TestBase):
|
|||
# encountered by the high-level module, thus they
|
||||
# mostly shouldn't matter as much.
|
||||
|
||||
def interp_exists(self, interpid):
|
||||
try:
|
||||
_interpreters.is_running(interpid)
|
||||
except InterpreterNotFoundError:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def test_new_config(self):
|
||||
# This test overlaps with
|
||||
# test.test_capi.test_misc.InterpreterConfigTests.
|
||||
|
@ -1064,46 +1243,107 @@ class LowLevelTests(TestBase):
|
|||
with self.assertRaises(ValueError):
|
||||
_interpreters.new_config(gil=value)
|
||||
|
||||
def test_get_config(self):
|
||||
# This test overlaps with
|
||||
# test.test_capi.test_misc.InterpreterConfigTests.
|
||||
def test_get_main(self):
|
||||
interpid, = _interpreters.get_main()
|
||||
self.assertEqual(interpid, 0)
|
||||
|
||||
def test_get_current(self):
|
||||
with self.subTest('main'):
|
||||
main, *_ = _interpreters.get_main()
|
||||
interpid, = _interpreters.get_current()
|
||||
self.assertEqual(interpid, main)
|
||||
|
||||
script = f"""
|
||||
import {_interpreters.__name__} as _interpreters
|
||||
interpid, = _interpreters.get_current()
|
||||
print(interpid)
|
||||
"""
|
||||
def parse_stdout(text):
|
||||
parts = text.split()
|
||||
assert len(parts) == 1, parts
|
||||
interpid, = parts
|
||||
interpid = int(interpid)
|
||||
return interpid,
|
||||
|
||||
with self.subTest('from _interpreters'):
|
||||
orig = _interpreters.create()
|
||||
text = self.run_and_capture(orig, script)
|
||||
interpid, = parse_stdout(text)
|
||||
self.assertEqual(interpid, orig)
|
||||
|
||||
with self.subTest('from C-API'):
|
||||
last = 0
|
||||
for id, *_ in _interpreters.list_all():
|
||||
last = max(last, id)
|
||||
expected = last + 1
|
||||
text = self.run_temp_from_capi(script)
|
||||
interpid, = parse_stdout(text)
|
||||
self.assertEqual(interpid, expected)
|
||||
|
||||
def test_list_all(self):
|
||||
mainid, *_ = _interpreters.get_main()
|
||||
interpid1 = _interpreters.create()
|
||||
interpid2 = _interpreters.create()
|
||||
interpid3 = _interpreters.create()
|
||||
expected = [
|
||||
(mainid,),
|
||||
(interpid1,),
|
||||
(interpid2,),
|
||||
(interpid3,),
|
||||
]
|
||||
|
||||
with self.subTest('main'):
|
||||
expected = _interpreters.new_config('legacy')
|
||||
expected.gil = 'own'
|
||||
interpid = _interpreters.get_main()
|
||||
config = _interpreters.get_config(interpid)
|
||||
self.assert_ns_equal(config, expected)
|
||||
res = _interpreters.list_all()
|
||||
self.assertEqual(res, expected)
|
||||
|
||||
with self.subTest('isolated'):
|
||||
expected = _interpreters.new_config('isolated')
|
||||
interpid = _interpreters.create('isolated')
|
||||
config = _interpreters.get_config(interpid)
|
||||
self.assert_ns_equal(config, expected)
|
||||
with self.subTest('from _interpreters'):
|
||||
text = self.run_and_capture(interpid2, f"""
|
||||
import {_interpreters.__name__} as _interpreters
|
||||
print(
|
||||
_interpreters.list_all())
|
||||
""")
|
||||
|
||||
with self.subTest('legacy'):
|
||||
expected = _interpreters.new_config('legacy')
|
||||
interpid = _interpreters.create('legacy')
|
||||
config = _interpreters.get_config(interpid)
|
||||
self.assert_ns_equal(config, expected)
|
||||
res = eval(text)
|
||||
self.assertEqual(res, expected)
|
||||
|
||||
with self.subTest('from C-API'):
|
||||
interpid4 = interpid3 + 1
|
||||
interpid5 = interpid4 + 1
|
||||
expected2 = expected + [
|
||||
(interpid4,),
|
||||
(interpid5,),
|
||||
]
|
||||
expected3 = expected + [
|
||||
(interpid5,),
|
||||
]
|
||||
text = self.run_temp_from_capi(f"""
|
||||
import {_interpreters.__name__} as _interpreters
|
||||
_interpreters.create()
|
||||
print(
|
||||
_interpreters.list_all())
|
||||
""")
|
||||
res2 = eval(text)
|
||||
res3 = _interpreters.list_all()
|
||||
self.assertEqual(res2, expected2)
|
||||
self.assertEqual(res3, expected3)
|
||||
|
||||
def test_create(self):
|
||||
isolated = _interpreters.new_config('isolated')
|
||||
legacy = _interpreters.new_config('legacy')
|
||||
default = isolated
|
||||
|
||||
with self.subTest('no arg'):
|
||||
with self.subTest('no args'):
|
||||
interpid = _interpreters.create()
|
||||
config = _interpreters.get_config(interpid)
|
||||
self.assert_ns_equal(config, default)
|
||||
|
||||
with self.subTest('arg: None'):
|
||||
with self.subTest('config: None'):
|
||||
interpid = _interpreters.create(None)
|
||||
config = _interpreters.get_config(interpid)
|
||||
self.assert_ns_equal(config, default)
|
||||
|
||||
with self.subTest('arg: \'empty\''):
|
||||
with self.assertRaises(interpreters.InterpreterError):
|
||||
with self.subTest('config: \'empty\''):
|
||||
with self.assertRaises(InterpreterError):
|
||||
# The "empty" config isn't viable on its own.
|
||||
_interpreters.create('empty')
|
||||
|
||||
|
@ -1138,6 +1378,230 @@ class LowLevelTests(TestBase):
|
|||
with self.assertRaises(ValueError):
|
||||
_interpreters.create(orig)
|
||||
|
||||
@requires_test_modules
|
||||
def test_destroy(self):
|
||||
with self.subTest('from _interpreters'):
|
||||
interpid = _interpreters.create()
|
||||
before = [id for id, *_ in _interpreters.list_all()]
|
||||
_interpreters.destroy(interpid)
|
||||
after = [id for id, *_ in _interpreters.list_all()]
|
||||
|
||||
self.assertIn(interpid, before)
|
||||
self.assertNotIn(interpid, after)
|
||||
self.assertFalse(
|
||||
self.interp_exists(interpid))
|
||||
|
||||
with self.subTest('main'):
|
||||
interpid, *_ = _interpreters.get_main()
|
||||
with self.assertRaises(InterpreterError):
|
||||
# It is the current interpreter.
|
||||
_interpreters.destroy(interpid)
|
||||
|
||||
with self.subTest('from C-API'):
|
||||
interpid = _testinternalcapi.create_interpreter()
|
||||
_interpreters.destroy(interpid)
|
||||
self.assertFalse(
|
||||
self.interp_exists(interpid))
|
||||
|
||||
def test_get_config(self):
|
||||
# This test overlaps with
|
||||
# test.test_capi.test_misc.InterpreterConfigTests.
|
||||
|
||||
with self.subTest('main'):
|
||||
expected = _interpreters.new_config('legacy')
|
||||
expected.gil = 'own'
|
||||
interpid, *_ = _interpreters.get_main()
|
||||
config = _interpreters.get_config(interpid)
|
||||
self.assert_ns_equal(config, expected)
|
||||
|
||||
with self.subTest('main'):
|
||||
expected = _interpreters.new_config('legacy')
|
||||
expected.gil = 'own'
|
||||
interpid, *_ = _interpreters.get_main()
|
||||
config = _interpreters.get_config(interpid)
|
||||
self.assert_ns_equal(config, expected)
|
||||
|
||||
with self.subTest('isolated'):
|
||||
expected = _interpreters.new_config('isolated')
|
||||
interpid = _interpreters.create('isolated')
|
||||
config = _interpreters.get_config(interpid)
|
||||
self.assert_ns_equal(config, expected)
|
||||
|
||||
with self.subTest('legacy'):
|
||||
expected = _interpreters.new_config('legacy')
|
||||
interpid = _interpreters.create('legacy')
|
||||
config = _interpreters.get_config(interpid)
|
||||
self.assert_ns_equal(config, expected)
|
||||
|
||||
with self.subTest('from C-API'):
|
||||
orig = _interpreters.new_config('isolated')
|
||||
with self.interpreter_from_capi(orig) as interpid:
|
||||
config = _interpreters.get_config(interpid)
|
||||
self.assert_ns_equal(config, orig)
|
||||
|
||||
@requires_test_modules
|
||||
def test_whence(self):
|
||||
with self.subTest('main'):
|
||||
interpid, *_ = _interpreters.get_main()
|
||||
whence = _interpreters.whence(interpid)
|
||||
self.assertEqual(whence, _interpreters.WHENCE_RUNTIME)
|
||||
|
||||
with self.subTest('stdlib'):
|
||||
interpid = _interpreters.create()
|
||||
whence = _interpreters.whence(interpid)
|
||||
self.assertEqual(whence, _interpreters.WHENCE_XI)
|
||||
|
||||
for orig, name in {
|
||||
# XXX Also check WHENCE_UNKNOWN.
|
||||
_interpreters.WHENCE_LEGACY_CAPI: 'legacy C-API',
|
||||
_interpreters.WHENCE_CAPI: 'C-API',
|
||||
_interpreters.WHENCE_XI: 'cross-interpreter C-API',
|
||||
}.items():
|
||||
with self.subTest(f'from C-API ({orig}: {name})'):
|
||||
with self.interpreter_from_capi(whence=orig) as interpid:
|
||||
whence = _interpreters.whence(interpid)
|
||||
self.assertEqual(whence, orig)
|
||||
|
||||
with self.subTest('from C-API, running'):
|
||||
text = self.run_temp_from_capi(dedent(f"""
|
||||
import {_interpreters.__name__} as _interpreters
|
||||
interpid, *_ = _interpreters.get_current()
|
||||
print(_interpreters.whence(interpid))
|
||||
"""),
|
||||
config=True)
|
||||
whence = eval(text)
|
||||
self.assertEqual(whence, _interpreters.WHENCE_CAPI)
|
||||
|
||||
with self.subTest('from legacy C-API, running'):
|
||||
...
|
||||
text = self.run_temp_from_capi(dedent(f"""
|
||||
import {_interpreters.__name__} as _interpreters
|
||||
interpid, *_ = _interpreters.get_current()
|
||||
print(_interpreters.whence(interpid))
|
||||
"""),
|
||||
config=False)
|
||||
whence = eval(text)
|
||||
self.assertEqual(whence, _interpreters.WHENCE_LEGACY_CAPI)
|
||||
|
||||
def test_is_running(self):
|
||||
with self.subTest('main'):
|
||||
interpid, *_ = _interpreters.get_main()
|
||||
running = _interpreters.is_running(interpid)
|
||||
self.assertTrue(running)
|
||||
|
||||
with self.subTest('from _interpreters (running)'):
|
||||
interpid = _interpreters.create()
|
||||
with self.running(interpid):
|
||||
running = _interpreters.is_running(interpid)
|
||||
self.assertTrue(running)
|
||||
|
||||
with self.subTest('from _interpreters (not running)'):
|
||||
interpid = _interpreters.create()
|
||||
running = _interpreters.is_running(interpid)
|
||||
self.assertFalse(running)
|
||||
|
||||
with self.subTest('from C-API (running __main__)'):
|
||||
with self.interpreter_from_capi() as interpid:
|
||||
with self.running_from_capi(interpid, main=True):
|
||||
running = _interpreters.is_running(interpid)
|
||||
self.assertTrue(running)
|
||||
|
||||
with self.subTest('from C-API (running, but not __main__)'):
|
||||
with self.interpreter_from_capi() as interpid:
|
||||
with self.running_from_capi(interpid, main=False):
|
||||
running = _interpreters.is_running(interpid)
|
||||
self.assertFalse(running)
|
||||
|
||||
with self.subTest('from C-API (not running)'):
|
||||
with self.interpreter_from_capi() as interpid:
|
||||
running = _interpreters.is_running(interpid)
|
||||
self.assertFalse(running)
|
||||
|
||||
def test_exec(self):
|
||||
with self.subTest('run script'):
|
||||
interpid = _interpreters.create()
|
||||
script, results = _captured_script('print("it worked!", end="")')
|
||||
with results:
|
||||
exc = _interpreters.exec(interpid, script)
|
||||
results = results.final()
|
||||
results.raise_if_failed()
|
||||
out = results.stdout
|
||||
self.assertEqual(out, 'it worked!')
|
||||
|
||||
with self.subTest('uncaught exception'):
|
||||
interpid = _interpreters.create()
|
||||
script, results = _captured_script("""
|
||||
raise Exception('uh-oh!')
|
||||
print("it worked!", end="")
|
||||
""")
|
||||
with results:
|
||||
exc = _interpreters.exec(interpid, script)
|
||||
out = results.stdout()
|
||||
self.assertEqual(out, '')
|
||||
self.assert_ns_equal(exc, types.SimpleNamespace(
|
||||
type=types.SimpleNamespace(
|
||||
__name__='Exception',
|
||||
__qualname__='Exception',
|
||||
__module__='builtins',
|
||||
),
|
||||
msg='uh-oh!',
|
||||
# We check these in other tests.
|
||||
formatted=exc.formatted,
|
||||
errdisplay=exc.errdisplay,
|
||||
))
|
||||
|
||||
with self.subTest('from C-API'):
|
||||
with self.interpreter_from_capi() as interpid:
|
||||
exc = _interpreters.exec(interpid, 'raise Exception("it worked!")')
|
||||
self.assertIsNot(exc, None)
|
||||
self.assertEqual(exc.msg, 'it worked!')
|
||||
|
||||
def test_call(self):
|
||||
with self.subTest('no args'):
|
||||
interpid = _interpreters.create()
|
||||
exc = _interpreters.call(interpid, call_func_return_shareable)
|
||||
self.assertIs(exc, None)
|
||||
|
||||
with self.subTest('uncaught exception'):
|
||||
interpid = _interpreters.create()
|
||||
exc = _interpreters.call(interpid, call_func_failure)
|
||||
self.assertEqual(exc, types.SimpleNamespace(
|
||||
type=types.SimpleNamespace(
|
||||
__name__='Exception',
|
||||
__qualname__='Exception',
|
||||
__module__='builtins',
|
||||
),
|
||||
msg='spam!',
|
||||
# We check these in other tests.
|
||||
formatted=exc.formatted,
|
||||
errdisplay=exc.errdisplay,
|
||||
))
|
||||
|
||||
def test_set___main___attrs(self):
|
||||
with self.subTest('from _interpreters'):
|
||||
interpid = _interpreters.create()
|
||||
before1 = _interpreters.exec(interpid, 'assert spam == \'eggs\'')
|
||||
before2 = _interpreters.exec(interpid, 'assert ham == 42')
|
||||
self.assertEqual(before1.type.__name__, 'NameError')
|
||||
self.assertEqual(before2.type.__name__, 'NameError')
|
||||
|
||||
_interpreters.set___main___attrs(interpid, dict(
|
||||
spam='eggs',
|
||||
ham=42,
|
||||
))
|
||||
after1 = _interpreters.exec(interpid, 'assert spam == \'eggs\'')
|
||||
after2 = _interpreters.exec(interpid, 'assert ham == 42')
|
||||
after3 = _interpreters.exec(interpid, 'assert spam == 42')
|
||||
self.assertIs(after1, None)
|
||||
self.assertIs(after2, None)
|
||||
self.assertEqual(after3.type.__name__, 'AssertionError')
|
||||
|
||||
with self.subTest('from C-API'):
|
||||
with self.interpreter_from_capi() as interpid:
|
||||
_interpreters.set___main___attrs(interpid, {'spam': True})
|
||||
exc = _interpreters.exec(interpid, 'assert spam is True')
|
||||
self.assertIsNone(exc)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# Test needs to be a package, so we can do relative imports.
|
||||
|
|
|
@ -1,30 +1,344 @@
|
|||
from collections import namedtuple
|
||||
import contextlib
|
||||
import json
|
||||
import io
|
||||
import os
|
||||
import os.path
|
||||
import pickle
|
||||
import queue
|
||||
#import select
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from textwrap import dedent
|
||||
from textwrap import dedent, indent
|
||||
import threading
|
||||
import types
|
||||
import unittest
|
||||
import warnings
|
||||
|
||||
from test import support
|
||||
from test.support import os_helper
|
||||
from test.support import import_helper
|
||||
|
||||
_interpreters = import_helper.import_module('_xxsubinterpreters')
|
||||
from test.support import interpreters
|
||||
|
||||
|
||||
def _captured_script(script):
|
||||
r, w = os.pipe()
|
||||
indented = script.replace('\n', '\n ')
|
||||
wrapped = dedent(f"""
|
||||
import contextlib
|
||||
with open({w}, 'w', encoding='utf-8') as spipe:
|
||||
with contextlib.redirect_stdout(spipe):
|
||||
try:
|
||||
import _testinternalcapi
|
||||
import _testcapi
|
||||
except ImportError:
|
||||
_testinternalcapi = None
|
||||
_testcapi = None
|
||||
|
||||
def requires_test_modules(func):
|
||||
return unittest.skipIf(_testinternalcapi is None, "test requires _testinternalcapi module")(func)
|
||||
|
||||
|
||||
def _dump_script(text):
|
||||
lines = text.splitlines()
|
||||
print()
|
||||
print('-' * 20)
|
||||
for i, line in enumerate(lines, 1):
|
||||
print(f' {i:>{len(str(len(lines)))}} {line}')
|
||||
print('-' * 20)
|
||||
|
||||
|
||||
def _close_file(file):
|
||||
try:
|
||||
if hasattr(file, 'close'):
|
||||
file.close()
|
||||
else:
|
||||
os.close(file)
|
||||
except OSError as exc:
|
||||
if exc.errno != 9:
|
||||
raise # re-raise
|
||||
# It was closed already.
|
||||
|
||||
|
||||
def pack_exception(exc=None):
|
||||
captured = _interpreters.capture_exception(exc)
|
||||
data = dict(captured.__dict__)
|
||||
data['type'] = dict(captured.type.__dict__)
|
||||
return json.dumps(data)
|
||||
|
||||
|
||||
def unpack_exception(packed):
|
||||
try:
|
||||
data = json.loads(packed)
|
||||
except json.decoder.JSONDecodeError:
|
||||
warnings.warn('incomplete exception data', RuntimeWarning)
|
||||
print(packed if isinstance(packed, str) else packed.decode('utf-8'))
|
||||
return None
|
||||
exc = types.SimpleNamespace(**data)
|
||||
exc.type = types.SimpleNamespace(**exc.type)
|
||||
return exc;
|
||||
|
||||
|
||||
class CapturingResults:
|
||||
|
||||
STDIO = dedent("""\
|
||||
with open({w_pipe}, 'wb', buffering=0) as _spipe_{stream}:
|
||||
_captured_std{stream} = io.StringIO()
|
||||
with contextlib.redirect_std{stream}(_captured_std{stream}):
|
||||
#########################
|
||||
# begin wrapped script
|
||||
|
||||
{indented}
|
||||
""")
|
||||
return wrapped, open(r, encoding='utf-8')
|
||||
|
||||
# end wrapped script
|
||||
#########################
|
||||
text = _captured_std{stream}.getvalue()
|
||||
_spipe_{stream}.write(text.encode('utf-8'))
|
||||
""")[:-1]
|
||||
EXC = dedent("""\
|
||||
with open({w_pipe}, 'wb', buffering=0) as _spipe_exc:
|
||||
try:
|
||||
#########################
|
||||
# begin wrapped script
|
||||
|
||||
{indented}
|
||||
|
||||
# end wrapped script
|
||||
#########################
|
||||
except Exception as exc:
|
||||
text = _interp_utils.pack_exception(exc)
|
||||
_spipe_exc.write(text.encode('utf-8'))
|
||||
""")[:-1]
|
||||
|
||||
@classmethod
|
||||
def wrap_script(cls, script, *, stdout=True, stderr=False, exc=False):
|
||||
script = dedent(script).strip(os.linesep)
|
||||
imports = [
|
||||
f'import {__name__} as _interp_utils',
|
||||
]
|
||||
wrapped = script
|
||||
|
||||
# Handle exc.
|
||||
if exc:
|
||||
exc = os.pipe()
|
||||
r_exc, w_exc = exc
|
||||
indented = wrapped.replace('\n', '\n ')
|
||||
wrapped = cls.EXC.format(
|
||||
w_pipe=w_exc,
|
||||
indented=indented,
|
||||
)
|
||||
else:
|
||||
exc = None
|
||||
|
||||
# Handle stdout.
|
||||
if stdout:
|
||||
imports.extend([
|
||||
'import contextlib, io',
|
||||
])
|
||||
stdout = os.pipe()
|
||||
r_out, w_out = stdout
|
||||
indented = wrapped.replace('\n', '\n ')
|
||||
wrapped = cls.STDIO.format(
|
||||
w_pipe=w_out,
|
||||
indented=indented,
|
||||
stream='out',
|
||||
)
|
||||
else:
|
||||
stdout = None
|
||||
|
||||
# Handle stderr.
|
||||
if stderr == 'stdout':
|
||||
stderr = None
|
||||
elif stderr:
|
||||
if not stdout:
|
||||
imports.extend([
|
||||
'import contextlib, io',
|
||||
])
|
||||
stderr = os.pipe()
|
||||
r_err, w_err = stderr
|
||||
indented = wrapped.replace('\n', '\n ')
|
||||
wrapped = cls.STDIO.format(
|
||||
w_pipe=w_err,
|
||||
indented=indented,
|
||||
stream='err',
|
||||
)
|
||||
else:
|
||||
stderr = None
|
||||
|
||||
if wrapped == script:
|
||||
raise NotImplementedError
|
||||
else:
|
||||
for line in imports:
|
||||
wrapped = f'{line}{os.linesep}{wrapped}'
|
||||
|
||||
results = cls(stdout, stderr, exc)
|
||||
return wrapped, results
|
||||
|
||||
def __init__(self, out, err, exc):
|
||||
self._rf_out = None
|
||||
self._rf_err = None
|
||||
self._rf_exc = None
|
||||
self._w_out = None
|
||||
self._w_err = None
|
||||
self._w_exc = None
|
||||
|
||||
if out is not None:
|
||||
r_out, w_out = out
|
||||
self._rf_out = open(r_out, 'rb', buffering=0)
|
||||
self._w_out = w_out
|
||||
|
||||
if err is not None:
|
||||
r_err, w_err = err
|
||||
self._rf_err = open(r_err, 'rb', buffering=0)
|
||||
self._w_err = w_err
|
||||
|
||||
if exc is not None:
|
||||
r_exc, w_exc = exc
|
||||
self._rf_exc = open(r_exc, 'rb', buffering=0)
|
||||
self._w_exc = w_exc
|
||||
|
||||
self._buf_out = b''
|
||||
self._buf_err = b''
|
||||
self._buf_exc = b''
|
||||
self._exc = None
|
||||
|
||||
self._closed = False
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, *args):
|
||||
self.close()
|
||||
|
||||
@property
|
||||
def closed(self):
|
||||
return self._closed
|
||||
|
||||
def close(self):
|
||||
if self._closed:
|
||||
return
|
||||
self._closed = True
|
||||
|
||||
if self._w_out is not None:
|
||||
_close_file(self._w_out)
|
||||
self._w_out = None
|
||||
if self._w_err is not None:
|
||||
_close_file(self._w_err)
|
||||
self._w_err = None
|
||||
if self._w_exc is not None:
|
||||
_close_file(self._w_exc)
|
||||
self._w_exc = None
|
||||
|
||||
self._capture()
|
||||
|
||||
if self._rf_out is not None:
|
||||
_close_file(self._rf_out)
|
||||
self._rf_out = None
|
||||
if self._rf_err is not None:
|
||||
_close_file(self._rf_err)
|
||||
self._rf_err = None
|
||||
if self._rf_exc is not None:
|
||||
_close_file(self._rf_exc)
|
||||
self._rf_exc = None
|
||||
|
||||
def _capture(self):
|
||||
# Ideally this is called only after the script finishes
|
||||
# (and thus has closed the write end of the pipe.
|
||||
if self._rf_out is not None:
|
||||
chunk = self._rf_out.read(100)
|
||||
while chunk:
|
||||
self._buf_out += chunk
|
||||
chunk = self._rf_out.read(100)
|
||||
if self._rf_err is not None:
|
||||
chunk = self._rf_err.read(100)
|
||||
while chunk:
|
||||
self._buf_err += chunk
|
||||
chunk = self._rf_err.read(100)
|
||||
if self._rf_exc is not None:
|
||||
chunk = self._rf_exc.read(100)
|
||||
while chunk:
|
||||
self._buf_exc += chunk
|
||||
chunk = self._rf_exc.read(100)
|
||||
|
||||
def _unpack_stdout(self):
|
||||
return self._buf_out.decode('utf-8')
|
||||
|
||||
def _unpack_stderr(self):
|
||||
return self._buf_err.decode('utf-8')
|
||||
|
||||
def _unpack_exc(self):
|
||||
if self._exc is not None:
|
||||
return self._exc
|
||||
if not self._buf_exc:
|
||||
return None
|
||||
self._exc = unpack_exception(self._buf_exc)
|
||||
return self._exc
|
||||
|
||||
def stdout(self):
|
||||
if self.closed:
|
||||
return self.final().stdout
|
||||
self._capture()
|
||||
return self._unpack_stdout()
|
||||
|
||||
def stderr(self):
|
||||
if self.closed:
|
||||
return self.final().stderr
|
||||
self._capture()
|
||||
return self._unpack_stderr()
|
||||
|
||||
def exc(self):
|
||||
if self.closed:
|
||||
return self.final().exc
|
||||
self._capture()
|
||||
return self._unpack_exc()
|
||||
|
||||
def final(self, *, force=False):
|
||||
try:
|
||||
return self._final
|
||||
except AttributeError:
|
||||
if not self._closed:
|
||||
if not force:
|
||||
raise Exception('no final results available yet')
|
||||
else:
|
||||
return CapturedResults.Proxy(self)
|
||||
self._final = CapturedResults(
|
||||
self._unpack_stdout(),
|
||||
self._unpack_stderr(),
|
||||
self._unpack_exc(),
|
||||
)
|
||||
return self._final
|
||||
|
||||
|
||||
class CapturedResults(namedtuple('CapturedResults', 'stdout stderr exc')):
|
||||
|
||||
class Proxy:
|
||||
def __init__(self, capturing):
|
||||
self._capturing = capturing
|
||||
def _finish(self):
|
||||
if self._capturing is None:
|
||||
return
|
||||
self._final = self._capturing.final()
|
||||
self._capturing = None
|
||||
def __iter__(self):
|
||||
self._finish()
|
||||
yield from self._final
|
||||
def __len__(self):
|
||||
self._finish()
|
||||
return len(self._final)
|
||||
def __getattr__(self, name):
|
||||
self._finish()
|
||||
if name.startswith('_'):
|
||||
raise AttributeError(name)
|
||||
return getattr(self._final, name)
|
||||
|
||||
def raise_if_failed(self):
|
||||
if self.exc is not None:
|
||||
raise interpreters.ExecutionFailed(self.exc)
|
||||
|
||||
|
||||
def _captured_script(script, *, stdout=True, stderr=False, exc=False):
|
||||
return CapturingResults.wrap_script(
|
||||
script,
|
||||
stdout=stdout,
|
||||
stderr=stderr,
|
||||
exc=exc,
|
||||
)
|
||||
|
||||
|
||||
def clean_up_interpreters():
|
||||
|
@ -33,17 +347,17 @@ def clean_up_interpreters():
|
|||
continue
|
||||
try:
|
||||
interp.close()
|
||||
except RuntimeError:
|
||||
except _interpreters.InterpreterError:
|
||||
pass # already destroyed
|
||||
|
||||
|
||||
def _run_output(interp, request, init=None):
|
||||
script, rpipe = _captured_script(request)
|
||||
with rpipe:
|
||||
script, results = _captured_script(request)
|
||||
with results:
|
||||
if init:
|
||||
interp.prepare_main(init)
|
||||
interp.exec(script)
|
||||
return rpipe.read()
|
||||
return results.stdout()
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
|
@ -175,3 +489,184 @@ class TestBase(unittest.TestCase):
|
|||
diff = f'namespace({diff})'
|
||||
standardMsg = self._truncateMessage(standardMsg, diff)
|
||||
self.fail(self._formatMessage(msg, standardMsg))
|
||||
|
||||
def _run_string(self, interp, script):
|
||||
wrapped, results = _captured_script(script, exc=False)
|
||||
#_dump_script(wrapped)
|
||||
with results:
|
||||
if isinstance(interp, interpreters.Interpreter):
|
||||
interp.exec(script)
|
||||
else:
|
||||
err = _interpreters.run_string(interp, wrapped)
|
||||
if err is not None:
|
||||
return None, err
|
||||
return results.stdout(), None
|
||||
|
||||
def run_and_capture(self, interp, script):
|
||||
text, err = self._run_string(interp, script)
|
||||
if err is not None:
|
||||
raise interpreters.ExecutionFailed(err)
|
||||
else:
|
||||
return text
|
||||
|
||||
@requires_test_modules
|
||||
@contextlib.contextmanager
|
||||
def interpreter_from_capi(self, config=None, whence=None):
|
||||
if config is False:
|
||||
if whence is None:
|
||||
whence = _interpreters.WHENCE_LEGACY_CAPI
|
||||
else:
|
||||
assert whence in (_interpreters.WHENCE_LEGACY_CAPI,
|
||||
_interpreters.WHENCE_UNKNOWN), repr(whence)
|
||||
config = None
|
||||
elif config is True:
|
||||
config = _interpreters.new_config('default')
|
||||
elif config is None:
|
||||
if whence not in (
|
||||
_interpreters.WHENCE_LEGACY_CAPI,
|
||||
_interpreters.WHENCE_UNKNOWN,
|
||||
):
|
||||
config = _interpreters.new_config('legacy')
|
||||
elif isinstance(config, str):
|
||||
config = _interpreters.new_config(config)
|
||||
|
||||
if whence is None:
|
||||
whence = _interpreters.WHENCE_XI
|
||||
|
||||
interpid = _testinternalcapi.create_interpreter(config, whence=whence)
|
||||
try:
|
||||
yield interpid
|
||||
finally:
|
||||
try:
|
||||
_testinternalcapi.destroy_interpreter(interpid)
|
||||
except _interpreters.InterpreterNotFoundError:
|
||||
pass
|
||||
|
||||
@contextlib.contextmanager
|
||||
def interpreter_obj_from_capi(self, config='legacy'):
|
||||
with self.interpreter_from_capi(config) as interpid:
|
||||
yield interpreters.Interpreter(interpid), interpid
|
||||
|
||||
@contextlib.contextmanager
|
||||
def capturing(self, script):
|
||||
wrapped, capturing = _captured_script(script, stdout=True, exc=True)
|
||||
#_dump_script(wrapped)
|
||||
with capturing:
|
||||
yield wrapped, capturing.final(force=True)
|
||||
|
||||
@requires_test_modules
|
||||
def run_from_capi(self, interpid, script, *, main=False):
|
||||
with self.capturing(script) as (wrapped, results):
|
||||
rc = _testinternalcapi.exec_interpreter(interpid, wrapped, main=main)
|
||||
assert rc == 0, rc
|
||||
results.raise_if_failed()
|
||||
return results.stdout
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _running(self, run_interp, exec_interp):
|
||||
token = b'\0'
|
||||
r_in, w_in = self.pipe()
|
||||
r_out, w_out = self.pipe()
|
||||
|
||||
def close():
|
||||
_close_file(r_in)
|
||||
_close_file(w_in)
|
||||
_close_file(r_out)
|
||||
_close_file(w_out)
|
||||
|
||||
# Start running (and wait).
|
||||
script = dedent(f"""
|
||||
import os
|
||||
try:
|
||||
# handshake
|
||||
token = os.read({r_in}, 1)
|
||||
os.write({w_out}, token)
|
||||
# Wait for the "done" message.
|
||||
os.read({r_in}, 1)
|
||||
except BrokenPipeError:
|
||||
pass
|
||||
except OSError as exc:
|
||||
if exc.errno != 9:
|
||||
raise # re-raise
|
||||
# It was closed already.
|
||||
""")
|
||||
failed = None
|
||||
def run():
|
||||
nonlocal failed
|
||||
try:
|
||||
run_interp(script)
|
||||
except Exception as exc:
|
||||
failed = exc
|
||||
close()
|
||||
t = threading.Thread(target=run)
|
||||
t.start()
|
||||
|
||||
# handshake
|
||||
try:
|
||||
os.write(w_in, token)
|
||||
token2 = os.read(r_out, 1)
|
||||
assert token2 == token, (token2, token)
|
||||
except OSError:
|
||||
t.join()
|
||||
if failed is not None:
|
||||
raise failed
|
||||
|
||||
# CM __exit__()
|
||||
try:
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
# Send "done".
|
||||
os.write(w_in, b'\0')
|
||||
finally:
|
||||
close()
|
||||
t.join()
|
||||
if failed is not None:
|
||||
raise failed
|
||||
|
||||
@contextlib.contextmanager
|
||||
def running(self, interp):
|
||||
if isinstance(interp, int):
|
||||
interpid = interp
|
||||
def exec_interp(script):
|
||||
exc = _interpreters.exec(interpid, script)
|
||||
assert exc is None, exc
|
||||
run_interp = exec_interp
|
||||
else:
|
||||
def run_interp(script):
|
||||
text = self.run_and_capture(interp, script)
|
||||
assert text == '', repr(text)
|
||||
def exec_interp(script):
|
||||
interp.exec(script)
|
||||
with self._running(run_interp, exec_interp):
|
||||
yield
|
||||
|
||||
@requires_test_modules
|
||||
@contextlib.contextmanager
|
||||
def running_from_capi(self, interpid, *, main=False):
|
||||
def run_interp(script):
|
||||
text = self.run_from_capi(interpid, script, main=main)
|
||||
assert text == '', repr(text)
|
||||
def exec_interp(script):
|
||||
rc = _testinternalcapi.exec_interpreter(interpid, script)
|
||||
assert rc == 0, rc
|
||||
with self._running(run_interp, exec_interp):
|
||||
yield
|
||||
|
||||
@requires_test_modules
|
||||
def run_temp_from_capi(self, script, config='legacy'):
|
||||
if config is False:
|
||||
# Force using Py_NewInterpreter().
|
||||
run_in_interp = (lambda s, c: _testcapi.run_in_subinterp(s))
|
||||
config = None
|
||||
else:
|
||||
run_in_interp = _testinternalcapi.run_in_subinterp_with_config
|
||||
if config is True:
|
||||
config = 'default'
|
||||
if isinstance(config, str):
|
||||
config = _interpreters.new_config(config)
|
||||
with self.capturing(script) as (wrapped, results):
|
||||
rc = run_in_interp(wrapped, config)
|
||||
assert rc == 0, rc
|
||||
results.raise_if_failed()
|
||||
return results.stdout
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue