import contextlib
import os
import threading
from textwrap import dedent
import unittest

from test.support import interpreters


def _captured_script(script):
    """Wrap *script* so that everything it prints goes into a new pipe.

    Returns a ``(wrapped, rpipe)`` pair.  ``wrapped`` is source text that
    opens the pipe's write end by file descriptor and runs *script* with
    stdout redirected to it.  ``rpipe`` is the pipe's read end, already
    opened as a UTF-8 text file; the caller is responsible for closing it.
    """
    r, w = os.pipe()
    # Continuation lines of the script must carry the same indentation as
    # the ``{indented}`` placeholder in the template below (16 columns), so
    # that after dedent() the whole script stays nested under both
    # ``with`` statements.
    placeholder_indent = ' ' * 16
    indented = script.replace('\n', '\n' + placeholder_indent)
    wrapped = dedent(f"""
        import contextlib
        with open({w}, 'w', encoding='utf-8') as spipe:
            with contextlib.redirect_stdout(spipe):
                {indented}
        """)
    return wrapped, open(r, encoding='utf-8')


def clean_up_interpreters():
    """Close every interpreter returned by list_all() except the main one."""
    for interp in interpreters.list_all():
        if interp.id == 0:  # main
            continue
        try:
            interp.close()
        except RuntimeError:
            pass  # already destroyed


def _run_output(interp, request, channels=None):
    """Run *request* in *interp* and return what it wrote to stdout.

    *channels* is passed through unchanged to ``interp.exec_sync()``.
    """
    script, rpipe = _captured_script(request)
    with rpipe:
        interp.exec_sync(script, channels=channels)
        return rpipe.read()


@contextlib.contextmanager
def _running(interp):
    """Keep *interp* busy executing a script while the with-block runs.

    A background thread runs a script in *interp* that blocks reading from
    a pipe.  When the with-block exits -- normally or through an
    exception -- the pipe is written to and closed, which lets the script
    finish, and the thread is joined.
    """
    r, w = os.pipe()

    def run():
        interp.exec_sync(dedent(f"""
            # wait for "signal"
            with open({r}) as rpipe:
                rpipe.read()
            """))

    t = threading.Thread(target=run)
    t.start()

    try:
        yield
    finally:
        # Always release the script, even when the with-body raised;
        # otherwise the thread would stay blocked on the pipe read and
        # the write end would never be closed.
        with open(w, 'w') as spipe:
            spipe.write('done')
        t.join()


class TestBase(unittest.TestCase):
    """Base test case that closes non-main interpreters after each test."""

    def pipe(self):
        """Create an OS pipe whose two ends are closed during cleanup.

        Returns the ``(r, w)`` file descriptors from ``os.pipe()``.
        """
        def ensure_closed(fd):
            # Best effort: an OSError from close() (for instance because
            # the test already closed the descriptor) is ignored.
            try:
                os.close(fd)
            except OSError:
                pass
        r, w = os.pipe()
        self.addCleanup(ensure_closed, r)
        self.addCleanup(ensure_closed, w)
        return r, w

    def tearDown(self):
        clean_up_interpreters()