gh-121790: Fix interactive console initialization (#121793)

Co-authored-by: Łukasz Langa <lukasz@langa.pl>
This commit is contained in:
Milan Oberkirch 2024-07-16 00:24:18 +02:00 committed by GitHub
parent d23be3947c
commit e5c7216f37
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 85 additions and 42 deletions

View file

@ -1,15 +1,27 @@
"""Test the interactive interpreter."""
import os
import select
import subprocess
import sys
import unittest
from textwrap import dedent
from test import support
from test.support import cpython_only, has_subprocess_support, SuppressCrashReport
from test.support.script_helper import assert_python_failure, kill_python, assert_python_ok
from test.support import (
cpython_only,
has_subprocess_support,
os_helper,
SuppressCrashReport,
SHORT_TIMEOUT,
)
from test.support.script_helper import kill_python
from test.support.import_helper import import_module
try:
import pty
except ImportError:
pty = None
if not has_subprocess_support:
raise unittest.SkipTest("test module requires subprocess")
@ -195,9 +207,56 @@ class TestInteractiveInterpreter(unittest.TestCase):
expected = "(30, None, [\'def foo(x):\\n\', \' return x + 1\\n\', \'\\n\'], \'<stdin>\')"
self.assertIn(expected, output, expected)
def test_asyncio_repl_no_tty_fails(self):
assert assert_python_failure("-m", "asyncio")
def test_asyncio_repl_reaches_python_startup_script(self):
with os_helper.temp_dir() as tmpdir:
script = os.path.join(tmpdir, "pythonstartup.py")
with open(script, "w") as f:
f.write("print('pythonstartup done!')" + os.linesep)
f.write("exit(0)" + os.linesep)
env = os.environ.copy()
env["PYTHONSTARTUP"] = script
subprocess.check_call(
[sys.executable, "-m", "asyncio"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
timeout=SHORT_TIMEOUT,
)
@unittest.skipUnless(pty, "requires pty")
def test_asyncio_repl_is_ok(self):
m, s = pty.openpty()
cmd = [sys.executable, "-m", "asyncio"]
proc = subprocess.Popen(
cmd,
stdin=s,
stdout=s,
stderr=s,
text=True,
close_fds=True,
env=os.environ,
)
os.close(s)
os.write(m, b"await asyncio.sleep(0)\n")
os.write(m, b"exit()\n")
output = []
while select.select([m], [], [], SHORT_TIMEOUT)[0]:
try:
data = os.read(m, 1024).decode("utf-8")
if not data:
break
except OSError:
break
output.append(data)
os.close(m)
try:
exit_code = proc.wait(timeout=SHORT_TIMEOUT)
except subprocess.TimeoutExpired:
proc.kill()
exit_code = proc.wait()
self.assertEqual(exit_code, 0)
class TestInteractiveModeSyntaxErrors(unittest.TestCase):