[3.13] gh-118894: Make asyncio REPL use pyrepl (GH-119433) (#119884)

(cherry picked from commit 2237946af0)

Co-authored-by: Łukasz Langa <lukasz@langa.pl>
This commit is contained in:
Miss Islington (bot) 2024-05-31 23:15:44 +02:00 committed by GitHub
parent 67ac19111f
commit a5272e63ef
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 143 additions and 65 deletions

View file

@ -219,6 +219,11 @@ class interrupt(FinishCommand):
os.kill(os.getpid(), signal.SIGINT) os.kill(os.getpid(), signal.SIGINT)
class ctrl_c(Command):
def do(self) -> None:
raise KeyboardInterrupt
class suspend(Command): class suspend(Command):
def do(self) -> None: def do(self) -> None:
import signal import signal

View file

@ -19,10 +19,14 @@
from __future__ import annotations from __future__ import annotations
import sys import _colorize # type: ignore[import-not-found]
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
import ast
import code
from dataclasses import dataclass, field from dataclasses import dataclass, field
import os.path
import sys
TYPE_CHECKING = False TYPE_CHECKING = False
@ -136,3 +140,54 @@ class Console(ABC):
@abstractmethod @abstractmethod
def repaint(self) -> None: ... def repaint(self) -> None: ...
class InteractiveColoredConsole(code.InteractiveConsole):
def __init__(
self,
locals: dict[str, object] | None = None,
filename: str = "<console>",
*,
local_exit: bool = False,
) -> None:
super().__init__(locals=locals, filename=filename, local_exit=local_exit) # type: ignore[call-arg]
self.can_colorize = _colorize.can_colorize()
def showsyntaxerror(self, filename=None):
super().showsyntaxerror(colorize=self.can_colorize)
def showtraceback(self):
super().showtraceback(colorize=self.can_colorize)
def runsource(self, source, filename="<input>", symbol="single"):
try:
tree = ast.parse(source)
except (SyntaxError, OverflowError, ValueError):
self.showsyntaxerror(filename)
return False
if tree.body:
*_, last_stmt = tree.body
for stmt in tree.body:
wrapper = ast.Interactive if stmt is last_stmt else ast.Module
the_symbol = symbol if stmt is last_stmt else "exec"
item = wrapper([stmt])
try:
code = self.compile.compiler(item, filename, the_symbol, dont_inherit=True)
except SyntaxError as e:
if e.args[0] == "'await' outside function":
python = os.path.basename(sys.executable)
e.add_note(
f"Try the asyncio REPL ({python} -m asyncio) to use"
f" top-level 'await' and run background asyncio tasks."
)
self.showsyntaxerror(filename)
return False
except (OverflowError, ValueError):
self.showsyntaxerror(filename)
return False
if code is None:
return True
self.runcode(code)
return False

View file

@ -131,6 +131,7 @@ default_keymap: tuple[tuple[KeySpec, CommandName], ...] = tuple(
("\\\\", "self-insert"), ("\\\\", "self-insert"),
(r"\x1b[200~", "enable_bracketed_paste"), (r"\x1b[200~", "enable_bracketed_paste"),
(r"\x1b[201~", "disable_bracketed_paste"), (r"\x1b[201~", "disable_bracketed_paste"),
(r"\x03", "ctrl-c"),
] ]
+ [(c, "self-insert") for c in map(chr, range(32, 127)) if c != "\\"] + [(c, "self-insert") for c in map(chr, range(32, 127)) if c != "\\"]
+ [(c, "self-insert") for c in map(chr, range(128, 256)) if c.isalpha()] + [(c, "self-insert") for c in map(chr, range(128, 256)) if c.isalpha()]

View file

@ -25,14 +25,13 @@ allowing multiline input and multiline history entries.
from __future__ import annotations from __future__ import annotations
import _colorize # type: ignore[import-not-found]
import _sitebuiltins import _sitebuiltins
import linecache import linecache
import sys import sys
import code import code
import ast
from types import ModuleType from types import ModuleType
from .console import InteractiveColoredConsole
from .readline import _get_reader, multiline_input from .readline import _get_reader, multiline_input
_error: tuple[type[Exception], ...] | type[Exception] _error: tuple[type[Exception], ...] | type[Exception]
@ -74,57 +73,21 @@ REPL_COMMANDS = {
"clear": _clear_screen, "clear": _clear_screen,
} }
class InteractiveColoredConsole(code.InteractiveConsole):
def __init__(
self,
locals: dict[str, object] | None = None,
filename: str = "<console>",
*,
local_exit: bool = False,
) -> None:
super().__init__(locals=locals, filename=filename, local_exit=local_exit) # type: ignore[call-arg]
self.can_colorize = _colorize.can_colorize()
def showsyntaxerror(self, filename=None):
super().showsyntaxerror(colorize=self.can_colorize)
def showtraceback(self):
super().showtraceback(colorize=self.can_colorize)
def runsource(self, source, filename="<input>", symbol="single"):
try:
tree = ast.parse(source)
except (OverflowError, SyntaxError, ValueError):
self.showsyntaxerror(filename)
return False
if tree.body:
*_, last_stmt = tree.body
for stmt in tree.body:
wrapper = ast.Interactive if stmt is last_stmt else ast.Module
the_symbol = symbol if stmt is last_stmt else "exec"
item = wrapper([stmt])
try:
code = compile(item, filename, the_symbol, dont_inherit=True)
except (OverflowError, ValueError, SyntaxError):
self.showsyntaxerror(filename)
return False
if code is None:
return True
self.runcode(code)
return False
def run_multiline_interactive_console( def run_multiline_interactive_console(
mainmodule: ModuleType | None= None, future_flags: int = 0 mainmodule: ModuleType | None = None,
future_flags: int = 0,
console: code.InteractiveConsole | None = None,
) -> None: ) -> None:
import __main__ import __main__
from .readline import _setup from .readline import _setup
_setup() _setup()
mainmodule = mainmodule or __main__ mainmodule = mainmodule or __main__
console = InteractiveColoredConsole(mainmodule.__dict__, filename="<stdin>") if console is None:
console = InteractiveColoredConsole(
mainmodule.__dict__, filename="<stdin>"
)
if future_flags: if future_flags:
console.compile.compiler.flags |= future_flags console.compile.compiler.flags |= future_flags

View file

@ -1,42 +1,49 @@
import ast import ast
import asyncio import asyncio
import code
import concurrent.futures import concurrent.futures
import inspect import inspect
import os
import site import site
import sys import sys
import threading import threading
import types import types
import warnings import warnings
from _colorize import can_colorize, ANSIColors # type: ignore[import-not-found]
from _pyrepl.console import InteractiveColoredConsole
from . import futures from . import futures
class AsyncIOInteractiveConsole(code.InteractiveConsole): class AsyncIOInteractiveConsole(InteractiveColoredConsole):
def __init__(self, locals, loop): def __init__(self, locals, loop):
super().__init__(locals) super().__init__(locals, filename="<stdin>")
self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT self.compile.compiler.flags |= ast.PyCF_ALLOW_TOP_LEVEL_AWAIT
self.loop = loop self.loop = loop
def runcode(self, code): def runcode(self, code):
global return_code
future = concurrent.futures.Future() future = concurrent.futures.Future()
def callback(): def callback():
global return_code
global repl_future global repl_future
global repl_future_interrupted global keyboard_interrupted
repl_future = None repl_future = None
repl_future_interrupted = False keyboard_interrupted = False
func = types.FunctionType(code, self.locals) func = types.FunctionType(code, self.locals)
try: try:
coro = func() coro = func()
except SystemExit: except SystemExit as se:
raise return_code = se.code
self.loop.stop()
return
except KeyboardInterrupt as ex: except KeyboardInterrupt as ex:
repl_future_interrupted = True keyboard_interrupted = True
future.set_exception(ex) future.set_exception(ex)
return return
except BaseException as ex: except BaseException as ex:
@ -57,10 +64,12 @@ class AsyncIOInteractiveConsole(code.InteractiveConsole):
try: try:
return future.result() return future.result()
except SystemExit: except SystemExit as se:
raise return_code = se.code
self.loop.stop()
return
except BaseException: except BaseException:
if repl_future_interrupted: if keyboard_interrupted:
self.write("\nKeyboardInterrupt\n") self.write("\nKeyboardInterrupt\n")
else: else:
self.showtraceback() self.showtraceback()
@ -69,18 +78,56 @@ class AsyncIOInteractiveConsole(code.InteractiveConsole):
class REPLThread(threading.Thread): class REPLThread(threading.Thread):
def run(self): def run(self):
global return_code
try: try:
banner = ( banner = (
f'asyncio REPL {sys.version} on {sys.platform}\n' f'asyncio REPL {sys.version} on {sys.platform}\n'
f'Use "await" directly instead of "asyncio.run()".\n' f'Use "await" directly instead of "asyncio.run()".\n'
f'Type "help", "copyright", "credits" or "license" ' f'Type "help", "copyright", "credits" or "license" '
f'for more information.\n' f'for more information.\n'
f'{getattr(sys, "ps1", ">>> ")}import asyncio'
) )
console.interact( console.write(banner)
banner=banner,
exitmsg='exiting asyncio REPL...') if startup_path := os.getenv("PYTHONSTARTUP"):
import tokenize
with tokenize.open(startup_path) as f:
startup_code = compile(f.read(), startup_path, "exec")
exec(startup_code, console.locals)
ps1 = getattr(sys, "ps1", ">>> ")
if can_colorize():
ps1 = f"{ANSIColors.BOLD_MAGENTA}{ps1}{ANSIColors.RESET}"
console.write(f"{ps1}import asyncio\n")
try:
import errno
if os.getenv("PYTHON_BASIC_REPL"):
raise RuntimeError("user environment requested basic REPL")
if not os.isatty(sys.stdin.fileno()):
raise OSError(errno.ENOTTY, "tty required", "stdin")
# This import will fail on operating systems with no termios.
from _pyrepl.simple_interact import (
check,
run_multiline_interactive_console,
)
if err := check():
raise RuntimeError(err)
except Exception as e:
console.interact(banner="", exitmsg=exit_message)
else:
try:
run_multiline_interactive_console(console=console)
except SystemExit:
# expected via the `exit` and `quit` commands
pass
except BaseException:
# unexpected issue
console.showtraceback()
console.write("Internal error, ")
return_code = 1
finally: finally:
warnings.filterwarnings( warnings.filterwarnings(
'ignore', 'ignore',
@ -91,6 +138,9 @@ class REPLThread(threading.Thread):
if __name__ == '__main__': if __name__ == '__main__':
CAN_USE_PYREPL = True
return_code = 0
loop = asyncio.new_event_loop() loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
@ -103,7 +153,7 @@ if __name__ == '__main__':
console = AsyncIOInteractiveConsole(repl_locals, loop) console = AsyncIOInteractiveConsole(repl_locals, loop)
repl_future = None repl_future = None
repl_future_interrupted = False keyboard_interrupted = False
try: try:
import readline # NoQA import readline # NoQA
@ -126,7 +176,7 @@ if __name__ == '__main__':
completer = rlcompleter.Completer(console.locals) completer = rlcompleter.Completer(console.locals)
readline.set_completer(completer.complete) readline.set_completer(completer.complete)
repl_thread = REPLThread() repl_thread = REPLThread(name="Interactive thread")
repl_thread.daemon = True repl_thread.daemon = True
repl_thread.start() repl_thread.start()
@ -134,9 +184,12 @@ if __name__ == '__main__':
try: try:
loop.run_forever() loop.run_forever()
except KeyboardInterrupt: except KeyboardInterrupt:
keyboard_interrupted = True
if repl_future and not repl_future.done(): if repl_future and not repl_future.done():
repl_future.cancel() repl_future.cancel()
repl_future_interrupted = True
continue continue
else: else:
break break
console.write('exiting asyncio REPL...\n')
sys.exit(return_code)

View file

@ -6,7 +6,7 @@ from textwrap import dedent
from test.support import force_not_colorized from test.support import force_not_colorized
from _pyrepl.simple_interact import InteractiveColoredConsole from _pyrepl.console import InteractiveColoredConsole
class TestSimpleInteract(unittest.TestCase): class TestSimpleInteract(unittest.TestCase):

View file

@ -0,0 +1 @@
:mod:`asyncio` REPL now has the same capabilities as PyREPL.