mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 03:44:55 +00:00 
			
		
		
		
	This switches the main pyrepl event loop to always be non-blocking so that it can listen to incoming interruptions from other threads. This also resolves invalid display of exceptions from other threads (gh-123178). This also fixes freezes with pasting and an active input hook.
		
			
				
	
	
		
			295 lines
		
	
	
	
		
			9.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			295 lines
		
	
	
	
		
			9.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""Test the interactive interpreter."""
 | 
						|
 | 
						|
import os
 | 
						|
import select
 | 
						|
import subprocess
 | 
						|
import sys
 | 
						|
import unittest
 | 
						|
from textwrap import dedent
 | 
						|
from test import support
 | 
						|
from test.support import (
 | 
						|
    cpython_only,
 | 
						|
    has_subprocess_support,
 | 
						|
    os_helper,
 | 
						|
    SuppressCrashReport,
 | 
						|
    SHORT_TIMEOUT,
 | 
						|
)
 | 
						|
from test.support.script_helper import kill_python
 | 
						|
from test.support.import_helper import import_module
 | 
						|
 | 
						|
try:
 | 
						|
    import pty
 | 
						|
except ImportError:
 | 
						|
    pty = None
 | 
						|
 | 
						|
 | 
						|
if not has_subprocess_support:
 | 
						|
    raise unittest.SkipTest("test module requires subprocess")
 | 
						|
 | 
						|
 | 
						|
def spawn_repl(*args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw):
 | 
						|
    """Run the Python REPL with the given arguments.
 | 
						|
 | 
						|
    kw is extra keyword args to pass to subprocess.Popen. Returns a Popen
 | 
						|
    object.
 | 
						|
    """
 | 
						|
 | 
						|
    # To run the REPL without using a terminal, spawn python with the command
 | 
						|
    # line option '-i' and the process name set to '<stdin>'.
 | 
						|
    # The directory of argv[0] must match the directory of the Python
 | 
						|
    # executable for the Popen() call to python to succeed as the directory
 | 
						|
    # path may be used by Py_GetPath() to build the default module search
 | 
						|
    # path.
 | 
						|
    stdin_fname = os.path.join(os.path.dirname(sys.executable), "<stdin>")
 | 
						|
    cmd_line = [stdin_fname, '-I', '-i']
 | 
						|
    cmd_line.extend(args)
 | 
						|
 | 
						|
    # Set TERM=vt100, for the rationale see the comments in spawn_python() of
 | 
						|
    # test.support.script_helper.
 | 
						|
    env = kw.setdefault('env', dict(os.environ))
 | 
						|
    env['TERM'] = 'vt100'
 | 
						|
    return subprocess.Popen(cmd_line,
 | 
						|
                            executable=sys.executable,
 | 
						|
                            text=True,
 | 
						|
                            stdin=subprocess.PIPE,
 | 
						|
                            stdout=stdout, stderr=stderr,
 | 
						|
                            **kw)
 | 
						|
 | 
						|
def run_on_interactive_mode(source):
 | 
						|
    """Spawn a new Python interpreter, pass the given
 | 
						|
    input source code from the stdin and return the
 | 
						|
    result back. If the interpreter exits non-zero, it
 | 
						|
    raises a ValueError."""
 | 
						|
 | 
						|
    process = spawn_repl()
 | 
						|
    process.stdin.write(source)
 | 
						|
    output = kill_python(process)
 | 
						|
 | 
						|
    if process.returncode != 0:
 | 
						|
        raise ValueError("Process didn't exit properly.")
 | 
						|
    return output
 | 
						|
 | 
						|
 | 
						|
class TestInteractiveInterpreter(unittest.TestCase):
 | 
						|
 | 
						|
    @cpython_only
 | 
						|
    # Python built with Py_TRACE_REFS fail with a fatal error in
 | 
						|
    # _PyRefchain_Trace() on memory allocation error.
 | 
						|
    @unittest.skipIf(support.Py_TRACE_REFS, 'cannot test Py_TRACE_REFS build')
 | 
						|
    def test_no_memory(self):
 | 
						|
        import_module("_testcapi")
 | 
						|
        # Issue #30696: Fix the interactive interpreter looping endlessly when
 | 
						|
        # no memory. Check also that the fix does not break the interactive
 | 
						|
        # loop when an exception is raised.
 | 
						|
        user_input = """
 | 
						|
            import sys, _testcapi
 | 
						|
            1/0
 | 
						|
            print('After the exception.')
 | 
						|
            _testcapi.set_nomemory(0)
 | 
						|
            sys.exit(0)
 | 
						|
        """
 | 
						|
        user_input = dedent(user_input)
 | 
						|
        p = spawn_repl()
 | 
						|
        with SuppressCrashReport():
 | 
						|
            p.stdin.write(user_input)
 | 
						|
        output = kill_python(p)
 | 
						|
        self.assertIn('After the exception.', output)
 | 
						|
        # Exit code 120: Py_FinalizeEx() failed to flush stdout and stderr.
 | 
						|
        self.assertIn(p.returncode, (1, 120))
 | 
						|
 | 
						|
    @cpython_only
 | 
						|
    def test_multiline_string_parsing(self):
 | 
						|
        # bpo-39209: Multiline string tokens need to be handled in the tokenizer
 | 
						|
        # in two places: the interactive path and the non-interactive path.
 | 
						|
        user_input = '''\
 | 
						|
        x = """<?xml version="1.0" encoding="iso-8859-1"?>
 | 
						|
        <test>
 | 
						|
            <Users>
 | 
						|
                <fun25>
 | 
						|
                    <limits>
 | 
						|
                        <total>0KiB</total>
 | 
						|
                        <kbps>0</kbps>
 | 
						|
                        <rps>1.3</rps>
 | 
						|
                        <connections>0</connections>
 | 
						|
                    </limits>
 | 
						|
                    <usages>
 | 
						|
                        <total>16738211KiB</total>
 | 
						|
                        <kbps>237.15</kbps>
 | 
						|
                        <rps>1.3</rps>
 | 
						|
                        <connections>0</connections>
 | 
						|
                    </usages>
 | 
						|
                    <time_to_refresh>never</time_to_refresh>
 | 
						|
                    <limit_exceeded_URL>none</limit_exceeded_URL>
 | 
						|
                </fun25>
 | 
						|
            </Users>
 | 
						|
        </test>"""
 | 
						|
        '''
 | 
						|
        user_input = dedent(user_input)
 | 
						|
        p = spawn_repl()
 | 
						|
        p.stdin.write(user_input)
 | 
						|
        output = kill_python(p)
 | 
						|
        self.assertEqual(p.returncode, 0)
 | 
						|
 | 
						|
    def test_close_stdin(self):
 | 
						|
        user_input = dedent('''
 | 
						|
            import os
 | 
						|
            print("before close")
 | 
						|
            os.close(0)
 | 
						|
        ''')
 | 
						|
        prepare_repl = dedent('''
 | 
						|
            from test.support import suppress_msvcrt_asserts
 | 
						|
            suppress_msvcrt_asserts()
 | 
						|
        ''')
 | 
						|
        process = spawn_repl('-c', prepare_repl)
 | 
						|
        output = process.communicate(user_input)[0]
 | 
						|
        self.assertEqual(process.returncode, 0)
 | 
						|
        self.assertIn('before close', output)
 | 
						|
 | 
						|
    def test_interactive_traceback_reporting(self):
 | 
						|
        user_input = "1 / 0 / 3 / 4"
 | 
						|
        p = spawn_repl()
 | 
						|
        p.stdin.write(user_input)
 | 
						|
        output = kill_python(p)
 | 
						|
        self.assertEqual(p.returncode, 0)
 | 
						|
 | 
						|
        traceback_lines = output.splitlines()[-6:-1]
 | 
						|
        expected_lines = [
 | 
						|
            "Traceback (most recent call last):",
 | 
						|
            "  File \"<stdin>\", line 1, in <module>",
 | 
						|
            "    1 / 0 / 3 / 4",
 | 
						|
            "    ~~^~~",
 | 
						|
            "ZeroDivisionError: division by zero",
 | 
						|
        ]
 | 
						|
        self.assertEqual(traceback_lines, expected_lines)
 | 
						|
 | 
						|
    def test_interactive_traceback_reporting_multiple_input(self):
 | 
						|
        user_input1 = dedent("""
 | 
						|
        def foo(x):
 | 
						|
            1 / x
 | 
						|
 | 
						|
        """)
 | 
						|
        p = spawn_repl()
 | 
						|
        p.stdin.write(user_input1)
 | 
						|
        user_input2 = "foo(0)"
 | 
						|
        p.stdin.write(user_input2)
 | 
						|
        output = kill_python(p)
 | 
						|
        self.assertEqual(p.returncode, 0)
 | 
						|
 | 
						|
        traceback_lines = output.splitlines()[-8:-1]
 | 
						|
        expected_lines = [
 | 
						|
            '  File "<stdin>", line 1, in <module>',
 | 
						|
            '    foo(0)',
 | 
						|
            '    ~~~^^^',
 | 
						|
            '  File "<stdin>", line 2, in foo',
 | 
						|
            '    1 / x',
 | 
						|
            '    ~~^~~',
 | 
						|
            'ZeroDivisionError: division by zero'
 | 
						|
        ]
 | 
						|
        self.assertEqual(traceback_lines, expected_lines)
 | 
						|
 | 
						|
    def test_runsource_show_syntax_error_location(self):
 | 
						|
        user_input = dedent("""def f(x, x): ...
 | 
						|
                            """)
 | 
						|
        p = spawn_repl()
 | 
						|
        p.stdin.write(user_input)
 | 
						|
        output = kill_python(p)
 | 
						|
        expected_lines = [
 | 
						|
            '    def f(x, x): ...',
 | 
						|
            '             ^',
 | 
						|
            "SyntaxError: duplicate argument 'x' in function definition"
 | 
						|
        ]
 | 
						|
        self.assertEqual(output.splitlines()[4:-1], expected_lines)
 | 
						|
 | 
						|
    def test_interactive_source_is_in_linecache(self):
 | 
						|
        user_input = dedent("""
 | 
						|
        def foo(x):
 | 
						|
            return x + 1
 | 
						|
 | 
						|
        def bar(x):
 | 
						|
            return foo(x) + 2
 | 
						|
        """)
 | 
						|
        p = spawn_repl()
 | 
						|
        p.stdin.write(user_input)
 | 
						|
        user_input2 = dedent("""
 | 
						|
        import linecache
 | 
						|
        print(linecache.cache['<stdin>-1'])
 | 
						|
        """)
 | 
						|
        p.stdin.write(user_input2)
 | 
						|
        output = kill_python(p)
 | 
						|
        self.assertEqual(p.returncode, 0)
 | 
						|
        expected = "(30, None, [\'def foo(x):\\n\', \'    return x + 1\\n\', \'\\n\'], \'<stdin>\')"
 | 
						|
        self.assertIn(expected, output, expected)
 | 
						|
 | 
						|
    def test_asyncio_repl_reaches_python_startup_script(self):
 | 
						|
        with os_helper.temp_dir() as tmpdir:
 | 
						|
            script = os.path.join(tmpdir, "pythonstartup.py")
 | 
						|
            with open(script, "w") as f:
 | 
						|
                f.write("print('pythonstartup done!')" + os.linesep)
 | 
						|
                f.write("exit(0)" + os.linesep)
 | 
						|
 | 
						|
            env = os.environ.copy()
 | 
						|
            env["PYTHON_HISTORY"] = os.path.join(tmpdir, ".asyncio_history")
 | 
						|
            env["PYTHONSTARTUP"] = script
 | 
						|
            subprocess.check_call(
 | 
						|
                [sys.executable, "-m", "asyncio"],
 | 
						|
                stdout=subprocess.PIPE,
 | 
						|
                stderr=subprocess.PIPE,
 | 
						|
                env=env,
 | 
						|
                timeout=SHORT_TIMEOUT,
 | 
						|
            )
 | 
						|
 | 
						|
    @unittest.skipUnless(pty, "requires pty")
 | 
						|
    def test_asyncio_repl_is_ok(self):
 | 
						|
        m, s = pty.openpty()
 | 
						|
        cmd = [sys.executable, "-I", "-m", "asyncio"]
 | 
						|
        env = os.environ.copy()
 | 
						|
        proc = subprocess.Popen(
 | 
						|
            cmd,
 | 
						|
            stdin=s,
 | 
						|
            stdout=s,
 | 
						|
            stderr=s,
 | 
						|
            text=True,
 | 
						|
            close_fds=True,
 | 
						|
            env=env,
 | 
						|
        )
 | 
						|
        os.close(s)
 | 
						|
        os.write(m, b"await asyncio.sleep(0)\n")
 | 
						|
        os.write(m, b"exit()\n")
 | 
						|
        output = []
 | 
						|
        while select.select([m], [], [], SHORT_TIMEOUT)[0]:
 | 
						|
            try:
 | 
						|
                data = os.read(m, 1024).decode("utf-8")
 | 
						|
                if not data:
 | 
						|
                    break
 | 
						|
            except OSError:
 | 
						|
                break
 | 
						|
            output.append(data)
 | 
						|
        os.close(m)
 | 
						|
        try:
 | 
						|
            exit_code = proc.wait(timeout=SHORT_TIMEOUT)
 | 
						|
        except subprocess.TimeoutExpired:
 | 
						|
            proc.kill()
 | 
						|
            exit_code = proc.wait()
 | 
						|
 | 
						|
        self.assertEqual(exit_code, 0, "".join(output))
 | 
						|
 | 
						|
class TestInteractiveModeSyntaxErrors(unittest.TestCase):
 | 
						|
 | 
						|
    def test_interactive_syntax_error_correct_line(self):
 | 
						|
        output = run_on_interactive_mode(dedent("""\
 | 
						|
        def f():
 | 
						|
            print(0)
 | 
						|
            return yield 42
 | 
						|
        """))
 | 
						|
 | 
						|
        traceback_lines = output.splitlines()[-4:-1]
 | 
						|
        expected_lines = [
 | 
						|
            '    return yield 42',
 | 
						|
            '           ^^^^^',
 | 
						|
            'SyntaxError: invalid syntax'
 | 
						|
        ]
 | 
						|
        self.assertEqual(traceback_lines, expected_lines)
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
    unittest.main()
 |