mirror of
https://github.com/python/cpython.git
synced 2025-07-23 11:15:24 +00:00
gh-111201: Support pyrepl on Windows (#119559)
Co-authored-by: Anthony Shaw <anthony.p.shaw@gmail.com> Co-authored-by: Łukasz Langa <lukasz@langa.pl>
This commit is contained in:
parent
13a5fdc72f
commit
0d07182821
15 changed files with 1020 additions and 49 deletions
|
@ -154,10 +154,10 @@ New Features
|
||||||
A Better Interactive Interpreter
|
A Better Interactive Interpreter
|
||||||
--------------------------------
|
--------------------------------
|
||||||
|
|
||||||
On Unix-like systems like Linux or macOS, Python now uses a new
|
On Unix-like systems like Linux or macOS as well as Windows, Python now
|
||||||
:term:`interactive` shell. When the user starts the :term:`REPL` from an
|
uses a new :term:`interactive` shell. When the user starts the
|
||||||
interactive terminal, and both :mod:`curses` and :mod:`readline` are
|
:term:`REPL` from an interactive terminal the interactive shell now
|
||||||
available, the interactive shell now supports the following new features:
|
supports the following new features:
|
||||||
|
|
||||||
* Colorized prompts.
|
* Colorized prompts.
|
||||||
* Multiline editing with history preservation.
|
* Multiline editing with history preservation.
|
||||||
|
@ -174,10 +174,13 @@ available, the interactive shell now supports the following new features:
|
||||||
If the new interactive shell is not desired, it can be disabled via
|
If the new interactive shell is not desired, it can be disabled via
|
||||||
the :envvar:`PYTHON_BASIC_REPL` environment variable.
|
the :envvar:`PYTHON_BASIC_REPL` environment variable.
|
||||||
|
|
||||||
|
The new shell requires :mod:`curses` on Unix-like systems.
|
||||||
|
|
||||||
For more on interactive mode, see :ref:`tut-interac`.
|
For more on interactive mode, see :ref:`tut-interac`.
|
||||||
|
|
||||||
(Contributed by Pablo Galindo Salgado, Łukasz Langa, and
|
(Contributed by Pablo Galindo Salgado, Łukasz Langa, and
|
||||||
Lysandros Nikolaou in :gh:`111201` based on code from the PyPy project.)
|
Lysandros Nikolaou in :gh:`111201` based on code from the PyPy project.
|
||||||
|
Windows support contributed by Dino Viehland and Anthony Shaw.)
|
||||||
|
|
||||||
.. _whatsnew313-improved-error-messages:
|
.. _whatsnew313-improved-error-messages:
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,11 @@
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
CAN_USE_PYREPL = sys.platform != "win32"
|
CAN_USE_PYREPL: bool
|
||||||
|
if sys.platform != "win32":
|
||||||
|
CAN_USE_PYREPL = True
|
||||||
|
else:
|
||||||
|
CAN_USE_PYREPL = sys.getwindowsversion().build >= 10586 # Windows 10 TH2
|
||||||
|
|
||||||
|
|
||||||
def interactive_console(mainmodule=None, quiet=False, pythonstartup=False):
|
def interactive_console(mainmodule=None, quiet=False, pythonstartup=False):
|
||||||
|
|
|
@ -19,10 +19,18 @@
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import sys
|
||||||
|
|
||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
|
|
||||||
|
|
||||||
|
TYPE_CHECKING = False
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from typing import IO
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Event:
|
class Event:
|
||||||
evt: str
|
evt: str
|
||||||
|
@ -36,6 +44,25 @@ class Console(ABC):
|
||||||
height: int = 25
|
height: int = 25
|
||||||
width: int = 80
|
width: int = 80
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
f_in: IO[bytes] | int = 0,
|
||||||
|
f_out: IO[bytes] | int = 1,
|
||||||
|
term: str = "",
|
||||||
|
encoding: str = "",
|
||||||
|
):
|
||||||
|
self.encoding = encoding or sys.getdefaultencoding()
|
||||||
|
|
||||||
|
if isinstance(f_in, int):
|
||||||
|
self.input_fd = f_in
|
||||||
|
else:
|
||||||
|
self.input_fd = f_in.fileno()
|
||||||
|
|
||||||
|
if isinstance(f_out, int):
|
||||||
|
self.output_fd = f_out
|
||||||
|
else:
|
||||||
|
self.output_fd = f_out.fileno()
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def refresh(self, screen: list[str], xy: tuple[int, int]) -> None: ...
|
def refresh(self, screen: list[str], xy: tuple[int, int]) -> None: ...
|
||||||
|
|
||||||
|
@ -108,5 +135,4 @@ class Console(ABC):
|
||||||
...
|
...
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def repaint(self) -> None:
|
def repaint(self) -> None: ...
|
||||||
...
|
|
||||||
|
|
|
@ -442,14 +442,13 @@ class Reader:
|
||||||
"""
|
"""
|
||||||
if self.arg is None:
|
if self.arg is None:
|
||||||
return default
|
return default
|
||||||
else:
|
return self.arg
|
||||||
return self.arg
|
|
||||||
|
|
||||||
def get_prompt(self, lineno: int, cursor_on_line: bool) -> str:
|
def get_prompt(self, lineno: int, cursor_on_line: bool) -> str:
|
||||||
"""Return what should be in the left-hand margin for line
|
"""Return what should be in the left-hand margin for line
|
||||||
'lineno'."""
|
'lineno'."""
|
||||||
if self.arg is not None and cursor_on_line:
|
if self.arg is not None and cursor_on_line:
|
||||||
prompt = "(arg: %s) " % self.arg
|
prompt = f"(arg: {self.arg}) "
|
||||||
elif self.paste_mode:
|
elif self.paste_mode:
|
||||||
prompt = "(paste) "
|
prompt = "(paste) "
|
||||||
elif "\n" in self.buffer:
|
elif "\n" in self.buffer:
|
||||||
|
@ -515,12 +514,12 @@ class Reader:
|
||||||
offset = l - 1 if in_wrapped_line else l # need to remove backslash
|
offset = l - 1 if in_wrapped_line else l # need to remove backslash
|
||||||
if offset >= pos:
|
if offset >= pos:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
if p + sum(l2) >= self.console.width:
|
||||||
|
pos -= l - 1 # -1 cause backslash is not in buffer
|
||||||
else:
|
else:
|
||||||
if p + sum(l2) >= self.console.width:
|
pos -= l + 1 # +1 cause newline is in buffer
|
||||||
pos -= l - 1 # -1 cause backslash is not in buffer
|
y += 1
|
||||||
else:
|
|
||||||
pos -= l + 1 # +1 cause newline is in buffer
|
|
||||||
y += 1
|
|
||||||
return p + sum(l2[:pos]), y
|
return p + sum(l2[:pos]), y
|
||||||
|
|
||||||
def insert(self, text: str | list[str]) -> None:
|
def insert(self, text: str | list[str]) -> None:
|
||||||
|
@ -582,7 +581,6 @@ class Reader:
|
||||||
for arg in ("msg", "ps1", "ps2", "ps3", "ps4", "paste_mode"):
|
for arg in ("msg", "ps1", "ps2", "ps3", "ps4", "paste_mode"):
|
||||||
setattr(self, arg, prev_state[arg])
|
setattr(self, arg, prev_state[arg])
|
||||||
self.prepare()
|
self.prepare()
|
||||||
pass
|
|
||||||
|
|
||||||
def finish(self) -> None:
|
def finish(self) -> None:
|
||||||
"""Called when a command signals that we're finished."""
|
"""Called when a command signals that we're finished."""
|
||||||
|
|
|
@ -38,7 +38,14 @@ from rlcompleter import Completer as RLCompleter
|
||||||
|
|
||||||
from . import commands, historical_reader
|
from . import commands, historical_reader
|
||||||
from .completing_reader import CompletingReader
|
from .completing_reader import CompletingReader
|
||||||
from .unix_console import UnixConsole, _error
|
from .console import Console as ConsoleType
|
||||||
|
|
||||||
|
Console: type[ConsoleType]
|
||||||
|
_error: tuple[type[Exception], ...] | type[Exception]
|
||||||
|
try:
|
||||||
|
from .unix_console import UnixConsole as Console, _error
|
||||||
|
except ImportError:
|
||||||
|
from .windows_console import WindowsConsole as Console, _error
|
||||||
|
|
||||||
ENCODING = sys.getdefaultencoding() or "latin1"
|
ENCODING = sys.getdefaultencoding() or "latin1"
|
||||||
|
|
||||||
|
@ -328,7 +335,7 @@ class _ReadlineWrapper:
|
||||||
|
|
||||||
def get_reader(self) -> ReadlineAlikeReader:
|
def get_reader(self) -> ReadlineAlikeReader:
|
||||||
if self.reader is None:
|
if self.reader is None:
|
||||||
console = UnixConsole(self.f_in, self.f_out, encoding=ENCODING)
|
console = Console(self.f_in, self.f_out, encoding=ENCODING)
|
||||||
self.reader = ReadlineAlikeReader(console=console, config=self.config)
|
self.reader = ReadlineAlikeReader(console=console, config=self.config)
|
||||||
return self.reader
|
return self.reader
|
||||||
|
|
||||||
|
|
|
@ -34,8 +34,12 @@ import ast
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
|
|
||||||
from .readline import _get_reader, multiline_input
|
from .readline import _get_reader, multiline_input
|
||||||
from .unix_console import _error
|
|
||||||
|
|
||||||
|
_error: tuple[type[Exception], ...] | type[Exception]
|
||||||
|
try:
|
||||||
|
from .unix_console import _error
|
||||||
|
except ModuleNotFoundError:
|
||||||
|
from .windows_console import _error
|
||||||
|
|
||||||
def check() -> str:
|
def check() -> str:
|
||||||
"""Returns the error message if there is a problem initializing the state."""
|
"""Returns the error message if there is a problem initializing the state."""
|
||||||
|
|
|
@ -143,18 +143,7 @@ class UnixConsole(Console):
|
||||||
- term (str): Terminal name.
|
- term (str): Terminal name.
|
||||||
- encoding (str): Encoding to use for I/O operations.
|
- encoding (str): Encoding to use for I/O operations.
|
||||||
"""
|
"""
|
||||||
|
super().__init__(f_in, f_out, term, encoding)
|
||||||
self.encoding = encoding or sys.getdefaultencoding()
|
|
||||||
|
|
||||||
if isinstance(f_in, int):
|
|
||||||
self.input_fd = f_in
|
|
||||||
else:
|
|
||||||
self.input_fd = f_in.fileno()
|
|
||||||
|
|
||||||
if isinstance(f_out, int):
|
|
||||||
self.output_fd = f_out
|
|
||||||
else:
|
|
||||||
self.output_fd = f_out.fileno()
|
|
||||||
|
|
||||||
self.pollob = poll()
|
self.pollob = poll()
|
||||||
self.pollob.register(self.input_fd, select.POLLIN)
|
self.pollob.register(self.input_fd, select.POLLIN)
|
||||||
|
@ -592,14 +581,19 @@ class UnixConsole(Console):
|
||||||
px_pos = 0
|
px_pos = 0
|
||||||
j = 0
|
j = 0
|
||||||
for c in oldline:
|
for c in oldline:
|
||||||
if j >= px_coord: break
|
if j >= px_coord:
|
||||||
|
break
|
||||||
j += wlen(c)
|
j += wlen(c)
|
||||||
px_pos += 1
|
px_pos += 1
|
||||||
|
|
||||||
# reuse the oldline as much as possible, but stop as soon as we
|
# reuse the oldline as much as possible, but stop as soon as we
|
||||||
# encounter an ESCAPE, because it might be the start of an escape
|
# encounter an ESCAPE, because it might be the start of an escape
|
||||||
# sequene
|
# sequene
|
||||||
while x_coord < minlen and oldline[x_pos] == newline[x_pos] and newline[x_pos] != "\x1b":
|
while (
|
||||||
|
x_coord < minlen
|
||||||
|
and oldline[x_pos] == newline[x_pos]
|
||||||
|
and newline[x_pos] != "\x1b"
|
||||||
|
):
|
||||||
x_coord += wlen(newline[x_pos])
|
x_coord += wlen(newline[x_pos])
|
||||||
x_pos += 1
|
x_pos += 1
|
||||||
|
|
||||||
|
@ -619,7 +613,11 @@ class UnixConsole(Console):
|
||||||
self.__posxy = x_coord + character_width, y
|
self.__posxy = x_coord + character_width, y
|
||||||
|
|
||||||
# if it's a single character change in the middle of the line
|
# if it's a single character change in the middle of the line
|
||||||
elif x_coord < minlen and oldline[x_pos + 1 :] == newline[x_pos + 1 :] and wlen(oldline[x_pos]) == wlen(newline[x_pos]):
|
elif (
|
||||||
|
x_coord < minlen
|
||||||
|
and oldline[x_pos + 1 :] == newline[x_pos + 1 :]
|
||||||
|
and wlen(oldline[x_pos]) == wlen(newline[x_pos])
|
||||||
|
):
|
||||||
character_width = wlen(newline[x_pos])
|
character_width = wlen(newline[x_pos])
|
||||||
self.__move(x_coord, y)
|
self.__move(x_coord, y)
|
||||||
self.__write(newline[x_pos])
|
self.__write(newline[x_pos])
|
||||||
|
|
587
Lib/_pyrepl/windows_console.py
Normal file
587
Lib/_pyrepl/windows_console.py
Normal file
|
@ -0,0 +1,587 @@
|
||||||
|
# Copyright 2000-2004 Michael Hudson-Doyle <micahel@gmail.com>
|
||||||
|
#
|
||||||
|
# All Rights Reserved
|
||||||
|
#
|
||||||
|
#
|
||||||
|
# Permission to use, copy, modify, and distribute this software and
|
||||||
|
# its documentation for any purpose is hereby granted without fee,
|
||||||
|
# provided that the above copyright notice appear in all copies and
|
||||||
|
# that both that copyright notice and this permission notice appear in
|
||||||
|
# supporting documentation.
|
||||||
|
#
|
||||||
|
# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
|
||||||
|
# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
|
||||||
|
# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
|
||||||
|
# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
|
||||||
|
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
|
||||||
|
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
|
||||||
|
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import io
|
||||||
|
from multiprocessing import Value
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
from collections import deque
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
import ctypes
|
||||||
|
from ctypes.wintypes import (
|
||||||
|
_COORD,
|
||||||
|
WORD,
|
||||||
|
SMALL_RECT,
|
||||||
|
BOOL,
|
||||||
|
HANDLE,
|
||||||
|
CHAR,
|
||||||
|
DWORD,
|
||||||
|
WCHAR,
|
||||||
|
SHORT,
|
||||||
|
)
|
||||||
|
from ctypes import Structure, POINTER, Union
|
||||||
|
from .console import Event, Console
|
||||||
|
from .trace import trace
|
||||||
|
from .utils import wlen
|
||||||
|
|
||||||
|
try:
|
||||||
|
from ctypes import GetLastError, WinDLL, windll, WinError # type: ignore[attr-defined]
|
||||||
|
except:
|
||||||
|
# Keep MyPy happy off Windows
|
||||||
|
from ctypes import CDLL as WinDLL, cdll as windll
|
||||||
|
|
||||||
|
def GetLastError() -> int:
|
||||||
|
return 42
|
||||||
|
|
||||||
|
class WinError(OSError): # type: ignore[no-redef]
|
||||||
|
def __init__(self, err: int | None, descr: str | None = None) -> None:
|
||||||
|
self.err = err
|
||||||
|
self.descr = descr
|
||||||
|
|
||||||
|
|
||||||
|
TYPE_CHECKING = False
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from typing import IO
|
||||||
|
|
||||||
|
VK_MAP: dict[int, str] = {
|
||||||
|
0x23: "end", # VK_END
|
||||||
|
0x24: "home", # VK_HOME
|
||||||
|
0x25: "left", # VK_LEFT
|
||||||
|
0x26: "up", # VK_UP
|
||||||
|
0x27: "right", # VK_RIGHT
|
||||||
|
0x28: "down", # VK_DOWN
|
||||||
|
0x2E: "delete", # VK_DELETE
|
||||||
|
0x70: "f1", # VK_F1
|
||||||
|
0x71: "f2", # VK_F2
|
||||||
|
0x72: "f3", # VK_F3
|
||||||
|
0x73: "f4", # VK_F4
|
||||||
|
0x74: "f5", # VK_F5
|
||||||
|
0x75: "f6", # VK_F6
|
||||||
|
0x76: "f7", # VK_F7
|
||||||
|
0x77: "f8", # VK_F8
|
||||||
|
0x78: "f9", # VK_F9
|
||||||
|
0x79: "f10", # VK_F10
|
||||||
|
0x7A: "f11", # VK_F11
|
||||||
|
0x7B: "f12", # VK_F12
|
||||||
|
0x7C: "f13", # VK_F13
|
||||||
|
0x7D: "f14", # VK_F14
|
||||||
|
0x7E: "f15", # VK_F15
|
||||||
|
0x7F: "f16", # VK_F16
|
||||||
|
0x79: "f17", # VK_F17
|
||||||
|
0x80: "f18", # VK_F18
|
||||||
|
0x81: "f19", # VK_F19
|
||||||
|
0x82: "f20", # VK_F20
|
||||||
|
}
|
||||||
|
|
||||||
|
# Console escape codes: https://learn.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences
|
||||||
|
ERASE_IN_LINE = "\x1b[K"
|
||||||
|
MOVE_LEFT = "\x1b[{}D"
|
||||||
|
MOVE_RIGHT = "\x1b[{}C"
|
||||||
|
MOVE_UP = "\x1b[{}A"
|
||||||
|
MOVE_DOWN = "\x1b[{}B"
|
||||||
|
CLEAR = "\x1b[H\x1b[J"
|
||||||
|
|
||||||
|
|
||||||
|
class _error(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class WindowsConsole(Console):
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
f_in: IO[bytes] | int = 0,
|
||||||
|
f_out: IO[bytes] | int = 1,
|
||||||
|
term: str = "",
|
||||||
|
encoding: str = "",
|
||||||
|
):
|
||||||
|
super().__init__(f_in, f_out, term, encoding)
|
||||||
|
|
||||||
|
SetConsoleMode(
|
||||||
|
OutHandle,
|
||||||
|
ENABLE_WRAP_AT_EOL_OUTPUT
|
||||||
|
| ENABLE_PROCESSED_OUTPUT
|
||||||
|
| ENABLE_VIRTUAL_TERMINAL_PROCESSING,
|
||||||
|
)
|
||||||
|
self.screen: list[str] = []
|
||||||
|
self.width = 80
|
||||||
|
self.height = 25
|
||||||
|
self.__offset = 0
|
||||||
|
self.event_queue: deque[Event] = deque()
|
||||||
|
try:
|
||||||
|
self.out = io._WindowsConsoleIO(self.output_fd, "w") # type: ignore[attr-defined]
|
||||||
|
except ValueError:
|
||||||
|
# Console I/O is redirected, fallback...
|
||||||
|
self.out = None
|
||||||
|
|
||||||
|
def refresh(self, screen: list[str], c_xy: tuple[int, int]) -> None:
|
||||||
|
"""
|
||||||
|
Refresh the console screen.
|
||||||
|
|
||||||
|
Parameters:
|
||||||
|
- screen (list): List of strings representing the screen contents.
|
||||||
|
- c_xy (tuple): Cursor position (x, y) on the screen.
|
||||||
|
"""
|
||||||
|
cx, cy = c_xy
|
||||||
|
|
||||||
|
while len(self.screen) < min(len(screen), self.height):
|
||||||
|
self._hide_cursor()
|
||||||
|
self._move_relative(0, len(self.screen) - 1)
|
||||||
|
self.__write("\n")
|
||||||
|
self.__posxy = 0, len(self.screen)
|
||||||
|
self.screen.append("")
|
||||||
|
|
||||||
|
px, py = self.__posxy
|
||||||
|
old_offset = offset = self.__offset
|
||||||
|
height = self.height
|
||||||
|
|
||||||
|
# we make sure the cursor is on the screen, and that we're
|
||||||
|
# using all of the screen if we can
|
||||||
|
if cy < offset:
|
||||||
|
offset = cy
|
||||||
|
elif cy >= offset + height:
|
||||||
|
offset = cy - height + 1
|
||||||
|
scroll_lines = offset - old_offset
|
||||||
|
|
||||||
|
# Scrolling the buffer as the current input is greater than the visible
|
||||||
|
# portion of the window. We need to scroll the visible portion and the
|
||||||
|
# entire history
|
||||||
|
self._scroll(scroll_lines, self._getscrollbacksize())
|
||||||
|
self.__posxy = self.__posxy[0], self.__posxy[1] + scroll_lines
|
||||||
|
self.__offset += scroll_lines
|
||||||
|
|
||||||
|
for i in range(scroll_lines):
|
||||||
|
self.screen.append("")
|
||||||
|
elif offset > 0 and len(screen) < offset + height:
|
||||||
|
offset = max(len(screen) - height, 0)
|
||||||
|
screen.append("")
|
||||||
|
|
||||||
|
oldscr = self.screen[old_offset : old_offset + height]
|
||||||
|
newscr = screen[offset : offset + height]
|
||||||
|
|
||||||
|
self.__offset = offset
|
||||||
|
|
||||||
|
self._hide_cursor()
|
||||||
|
for (
|
||||||
|
y,
|
||||||
|
oldline,
|
||||||
|
newline,
|
||||||
|
) in zip(range(offset, offset + height), oldscr, newscr):
|
||||||
|
if oldline != newline:
|
||||||
|
self.__write_changed_line(y, oldline, newline, px)
|
||||||
|
|
||||||
|
y = len(newscr)
|
||||||
|
while y < len(oldscr):
|
||||||
|
self._move_relative(0, y)
|
||||||
|
self.__posxy = 0, y
|
||||||
|
self._erase_to_end()
|
||||||
|
y += 1
|
||||||
|
|
||||||
|
self._show_cursor()
|
||||||
|
|
||||||
|
self.screen = screen
|
||||||
|
self.move_cursor(cx, cy)
|
||||||
|
|
||||||
|
def __write_changed_line(
|
||||||
|
self, y: int, oldline: str, newline: str, px_coord: int
|
||||||
|
) -> None:
|
||||||
|
# this is frustrating; there's no reason to test (say)
|
||||||
|
# self.dch1 inside the loop -- but alternative ways of
|
||||||
|
# structuring this function are equally painful (I'm trying to
|
||||||
|
# avoid writing code generators these days...)
|
||||||
|
minlen = min(wlen(oldline), wlen(newline))
|
||||||
|
x_pos = 0
|
||||||
|
x_coord = 0
|
||||||
|
|
||||||
|
px_pos = 0
|
||||||
|
j = 0
|
||||||
|
for c in oldline:
|
||||||
|
if j >= px_coord:
|
||||||
|
break
|
||||||
|
j += wlen(c)
|
||||||
|
px_pos += 1
|
||||||
|
|
||||||
|
# reuse the oldline as much as possible, but stop as soon as we
|
||||||
|
# encounter an ESCAPE, because it might be the start of an escape
|
||||||
|
# sequene
|
||||||
|
while (
|
||||||
|
x_coord < minlen
|
||||||
|
and oldline[x_pos] == newline[x_pos]
|
||||||
|
and newline[x_pos] != "\x1b"
|
||||||
|
):
|
||||||
|
x_coord += wlen(newline[x_pos])
|
||||||
|
x_pos += 1
|
||||||
|
|
||||||
|
self._hide_cursor()
|
||||||
|
self._move_relative(x_coord, y)
|
||||||
|
if wlen(oldline) > wlen(newline):
|
||||||
|
self._erase_to_end()
|
||||||
|
|
||||||
|
self.__write(newline[x_pos:])
|
||||||
|
if wlen(newline) == self.width:
|
||||||
|
# If we wrapped we want to start at the next line
|
||||||
|
self._move_relative(0, y + 1)
|
||||||
|
self.__posxy = 0, y + 1
|
||||||
|
else:
|
||||||
|
self.__posxy = wlen(newline), y
|
||||||
|
|
||||||
|
if "\x1b" in newline or y != self.__posxy[1]:
|
||||||
|
# ANSI escape characters are present, so we can't assume
|
||||||
|
# anything about the position of the cursor. Moving the cursor
|
||||||
|
# to the left margin should work to get to a known position.
|
||||||
|
self.move_cursor(0, y)
|
||||||
|
|
||||||
|
def _scroll(
|
||||||
|
self, top: int, bottom: int, left: int | None = None, right: int | None = None
|
||||||
|
) -> None:
|
||||||
|
scroll_rect = SMALL_RECT()
|
||||||
|
scroll_rect.Top = SHORT(top)
|
||||||
|
scroll_rect.Bottom = SHORT(bottom)
|
||||||
|
scroll_rect.Left = SHORT(0 if left is None else left)
|
||||||
|
scroll_rect.Right = SHORT(
|
||||||
|
self.getheightwidth()[1] - 1 if right is None else right
|
||||||
|
)
|
||||||
|
destination_origin = _COORD()
|
||||||
|
fill_info = CHAR_INFO()
|
||||||
|
fill_info.UnicodeChar = " "
|
||||||
|
|
||||||
|
if not ScrollConsoleScreenBuffer(
|
||||||
|
OutHandle, scroll_rect, None, destination_origin, fill_info
|
||||||
|
):
|
||||||
|
raise WinError(GetLastError())
|
||||||
|
|
||||||
|
def _hide_cursor(self):
|
||||||
|
self.__write("\x1b[?25l")
|
||||||
|
|
||||||
|
def _show_cursor(self):
|
||||||
|
self.__write("\x1b[?25h")
|
||||||
|
|
||||||
|
def _enable_blinking(self):
|
||||||
|
self.__write("\x1b[?12h")
|
||||||
|
|
||||||
|
def _disable_blinking(self):
|
||||||
|
self.__write("\x1b[?12l")
|
||||||
|
|
||||||
|
def __write(self, text: str) -> None:
|
||||||
|
if self.out is not None:
|
||||||
|
self.out.write(text.encode(self.encoding, "replace"))
|
||||||
|
self.out.flush()
|
||||||
|
else:
|
||||||
|
os.write(self.output_fd, text.encode(self.encoding, "replace"))
|
||||||
|
|
||||||
|
@property
|
||||||
|
def screen_xy(self) -> tuple[int, int]:
|
||||||
|
info = CONSOLE_SCREEN_BUFFER_INFO()
|
||||||
|
if not GetConsoleScreenBufferInfo(OutHandle, info):
|
||||||
|
raise WinError(GetLastError())
|
||||||
|
return info.dwCursorPosition.X, info.dwCursorPosition.Y
|
||||||
|
|
||||||
|
def _erase_to_end(self) -> None:
|
||||||
|
self.__write(ERASE_IN_LINE)
|
||||||
|
|
||||||
|
def prepare(self) -> None:
|
||||||
|
trace("prepare")
|
||||||
|
self.screen = []
|
||||||
|
self.height, self.width = self.getheightwidth()
|
||||||
|
|
||||||
|
self.__posxy = 0, 0
|
||||||
|
self.__gone_tall = 0
|
||||||
|
self.__offset = 0
|
||||||
|
|
||||||
|
def restore(self) -> None:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _move_relative(self, x: int, y: int) -> None:
|
||||||
|
"""Moves relative to the current __posxy"""
|
||||||
|
dx = x - self.__posxy[0]
|
||||||
|
dy = y - self.__posxy[1]
|
||||||
|
if dx < 0:
|
||||||
|
self.__write(MOVE_LEFT.format(-dx))
|
||||||
|
elif dx > 0:
|
||||||
|
self.__write(MOVE_RIGHT.format(dx))
|
||||||
|
|
||||||
|
if dy < 0:
|
||||||
|
self.__write(MOVE_UP.format(-dy))
|
||||||
|
elif dy > 0:
|
||||||
|
self.__write(MOVE_DOWN.format(dy))
|
||||||
|
|
||||||
|
def move_cursor(self, x: int, y: int) -> None:
|
||||||
|
if x < 0 or y < 0:
|
||||||
|
raise ValueError(f"Bad cursor position {x}, {y}")
|
||||||
|
|
||||||
|
if y < self.__offset or y >= self.__offset + self.height:
|
||||||
|
self.event_queue.insert(0, Event("scroll", ""))
|
||||||
|
else:
|
||||||
|
self._move_relative(x, y)
|
||||||
|
self.__posxy = x, y
|
||||||
|
|
||||||
|
def set_cursor_vis(self, visible: bool) -> None:
|
||||||
|
if visible:
|
||||||
|
self._show_cursor()
|
||||||
|
else:
|
||||||
|
self._hide_cursor()
|
||||||
|
|
||||||
|
def getheightwidth(self) -> tuple[int, int]:
|
||||||
|
"""Return (height, width) where height and width are the height
|
||||||
|
and width of the terminal window in characters."""
|
||||||
|
info = CONSOLE_SCREEN_BUFFER_INFO()
|
||||||
|
if not GetConsoleScreenBufferInfo(OutHandle, info):
|
||||||
|
raise WinError(GetLastError())
|
||||||
|
return (
|
||||||
|
info.srWindow.Bottom - info.srWindow.Top + 1,
|
||||||
|
info.srWindow.Right - info.srWindow.Left + 1,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _getscrollbacksize(self) -> int:
|
||||||
|
info = CONSOLE_SCREEN_BUFFER_INFO()
|
||||||
|
if not GetConsoleScreenBufferInfo(OutHandle, info):
|
||||||
|
raise WinError(GetLastError())
|
||||||
|
|
||||||
|
return info.srWindow.Bottom # type: ignore[no-any-return]
|
||||||
|
|
||||||
|
def _read_input(self) -> INPUT_RECORD | None:
|
||||||
|
rec = INPUT_RECORD()
|
||||||
|
read = DWORD()
|
||||||
|
if not ReadConsoleInput(InHandle, rec, 1, read):
|
||||||
|
raise WinError(GetLastError())
|
||||||
|
|
||||||
|
if read.value == 0:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return rec
|
||||||
|
|
||||||
|
def get_event(self, block: bool = True) -> Event | None:
|
||||||
|
"""Return an Event instance. Returns None if |block| is false
|
||||||
|
and there is no event pending, otherwise waits for the
|
||||||
|
completion of an event."""
|
||||||
|
if self.event_queue:
|
||||||
|
return self.event_queue.pop()
|
||||||
|
|
||||||
|
while True:
|
||||||
|
rec = self._read_input()
|
||||||
|
if rec is None:
|
||||||
|
if block:
|
||||||
|
continue
|
||||||
|
return None
|
||||||
|
|
||||||
|
if rec.EventType == WINDOW_BUFFER_SIZE_EVENT:
|
||||||
|
return Event("resize", "")
|
||||||
|
|
||||||
|
if rec.EventType != KEY_EVENT or not rec.Event.KeyEvent.bKeyDown:
|
||||||
|
# Only process keys and keydown events
|
||||||
|
if block:
|
||||||
|
continue
|
||||||
|
return None
|
||||||
|
|
||||||
|
key = rec.Event.KeyEvent.uChar.UnicodeChar
|
||||||
|
|
||||||
|
if rec.Event.KeyEvent.uChar.UnicodeChar == "\r":
|
||||||
|
# Make enter make unix-like
|
||||||
|
return Event(evt="key", data="\n", raw=b"\n")
|
||||||
|
elif rec.Event.KeyEvent.wVirtualKeyCode == 8:
|
||||||
|
# Turn backspace directly into the command
|
||||||
|
return Event(
|
||||||
|
evt="key",
|
||||||
|
data="backspace",
|
||||||
|
raw=rec.Event.KeyEvent.uChar.UnicodeChar,
|
||||||
|
)
|
||||||
|
elif rec.Event.KeyEvent.uChar.UnicodeChar == "\x00":
|
||||||
|
# Handle special keys like arrow keys and translate them into the appropriate command
|
||||||
|
code = VK_MAP.get(rec.Event.KeyEvent.wVirtualKeyCode)
|
||||||
|
if code:
|
||||||
|
return Event(
|
||||||
|
evt="key", data=code, raw=rec.Event.KeyEvent.uChar.UnicodeChar
|
||||||
|
)
|
||||||
|
if block:
|
||||||
|
continue
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
return Event(evt="key", data=key, raw=rec.Event.KeyEvent.uChar.UnicodeChar)
|
||||||
|
|
||||||
|
def push_char(self, char: int | bytes) -> None:
|
||||||
|
"""
|
||||||
|
Push a character to the console event queue.
|
||||||
|
"""
|
||||||
|
raise NotImplementedError("push_char not supported on Windows")
|
||||||
|
|
||||||
|
def beep(self) -> None:
|
||||||
|
self.__write("\x07")
|
||||||
|
|
||||||
|
def clear(self) -> None:
|
||||||
|
"""Wipe the screen"""
|
||||||
|
self.__write(CLEAR)
|
||||||
|
self.__posxy = 0, 0
|
||||||
|
self.screen = [""]
|
||||||
|
|
||||||
|
def finish(self) -> None:
|
||||||
|
"""Move the cursor to the end of the display and otherwise get
|
||||||
|
ready for end. XXX could be merged with restore? Hmm."""
|
||||||
|
y = len(self.screen) - 1
|
||||||
|
while y >= 0 and not self.screen[y]:
|
||||||
|
y -= 1
|
||||||
|
self._move_relative(0, min(y, self.height + self.__offset - 1))
|
||||||
|
self.__write("\r\n")
|
||||||
|
|
||||||
|
def flushoutput(self) -> None:
|
||||||
|
"""Flush all output to the screen (assuming there's some
|
||||||
|
buffering going on somewhere).
|
||||||
|
|
||||||
|
All output on Windows is unbuffered so this is a nop"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
def forgetinput(self) -> None:
|
||||||
|
"""Forget all pending, but not yet processed input."""
|
||||||
|
while self._read_input() is not None:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def getpending(self) -> Event:
|
||||||
|
"""Return the characters that have been typed but not yet
|
||||||
|
processed."""
|
||||||
|
return Event("key", "", b"")
|
||||||
|
|
||||||
|
def wait(self) -> None:
|
||||||
|
"""Wait for an event."""
|
||||||
|
raise NotImplementedError("No wait support")
|
||||||
|
|
||||||
|
def repaint(self) -> None:
|
||||||
|
raise NotImplementedError("No repaint support")
|
||||||
|
|
||||||
|
|
||||||
|
# Windows interop
|
||||||
|
class CONSOLE_SCREEN_BUFFER_INFO(Structure):
|
||||||
|
_fields_ = [
|
||||||
|
("dwSize", _COORD),
|
||||||
|
("dwCursorPosition", _COORD),
|
||||||
|
("wAttributes", WORD),
|
||||||
|
("srWindow", SMALL_RECT),
|
||||||
|
("dwMaximumWindowSize", _COORD),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class CONSOLE_CURSOR_INFO(Structure):
|
||||||
|
_fields_ = [
|
||||||
|
("dwSize", DWORD),
|
||||||
|
("bVisible", BOOL),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class CHAR_INFO(Structure):
|
||||||
|
_fields_ = [
|
||||||
|
("UnicodeChar", WCHAR),
|
||||||
|
("Attributes", WORD),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class Char(Union):
|
||||||
|
_fields_ = [
|
||||||
|
("UnicodeChar", WCHAR),
|
||||||
|
("Char", CHAR),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class KeyEvent(ctypes.Structure):
|
||||||
|
_fields_ = [
|
||||||
|
("bKeyDown", BOOL),
|
||||||
|
("wRepeatCount", WORD),
|
||||||
|
("wVirtualKeyCode", WORD),
|
||||||
|
("wVirtualScanCode", WORD),
|
||||||
|
("uChar", Char),
|
||||||
|
("dwControlKeyState", DWORD),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class WindowsBufferSizeEvent(ctypes.Structure):
|
||||||
|
_fields_ = [("dwSize", _COORD)]
|
||||||
|
|
||||||
|
|
||||||
|
class ConsoleEvent(ctypes.Union):
|
||||||
|
_fields_ = [
|
||||||
|
("KeyEvent", KeyEvent),
|
||||||
|
("WindowsBufferSizeEvent", WindowsBufferSizeEvent),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class INPUT_RECORD(Structure):
|
||||||
|
_fields_ = [("EventType", WORD), ("Event", ConsoleEvent)]
|
||||||
|
|
||||||
|
|
||||||
|
KEY_EVENT = 0x01
|
||||||
|
FOCUS_EVENT = 0x10
|
||||||
|
MENU_EVENT = 0x08
|
||||||
|
MOUSE_EVENT = 0x02
|
||||||
|
WINDOW_BUFFER_SIZE_EVENT = 0x04
|
||||||
|
|
||||||
|
ENABLE_PROCESSED_OUTPUT = 0x01
|
||||||
|
ENABLE_WRAP_AT_EOL_OUTPUT = 0x02
|
||||||
|
ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x04
|
||||||
|
|
||||||
|
STD_INPUT_HANDLE = -10
|
||||||
|
STD_OUTPUT_HANDLE = -11
|
||||||
|
|
||||||
|
if sys.platform == "win32":
|
||||||
|
_KERNEL32 = WinDLL("kernel32", use_last_error=True)
|
||||||
|
|
||||||
|
GetStdHandle = windll.kernel32.GetStdHandle
|
||||||
|
GetStdHandle.argtypes = [DWORD]
|
||||||
|
GetStdHandle.restype = HANDLE
|
||||||
|
|
||||||
|
GetConsoleScreenBufferInfo = _KERNEL32.GetConsoleScreenBufferInfo
|
||||||
|
GetConsoleScreenBufferInfo.argtypes = [
|
||||||
|
HANDLE,
|
||||||
|
ctypes.POINTER(CONSOLE_SCREEN_BUFFER_INFO),
|
||||||
|
]
|
||||||
|
GetConsoleScreenBufferInfo.restype = BOOL
|
||||||
|
|
||||||
|
ScrollConsoleScreenBuffer = _KERNEL32.ScrollConsoleScreenBufferW
|
||||||
|
ScrollConsoleScreenBuffer.argtypes = [
|
||||||
|
HANDLE,
|
||||||
|
POINTER(SMALL_RECT),
|
||||||
|
POINTER(SMALL_RECT),
|
||||||
|
_COORD,
|
||||||
|
POINTER(CHAR_INFO),
|
||||||
|
]
|
||||||
|
ScrollConsoleScreenBuffer.restype = BOOL
|
||||||
|
|
||||||
|
SetConsoleMode = _KERNEL32.SetConsoleMode
|
||||||
|
SetConsoleMode.argtypes = [HANDLE, DWORD]
|
||||||
|
SetConsoleMode.restype = BOOL
|
||||||
|
|
||||||
|
ReadConsoleInput = _KERNEL32.ReadConsoleInputW
|
||||||
|
ReadConsoleInput.argtypes = [HANDLE, POINTER(INPUT_RECORD), DWORD, POINTER(DWORD)]
|
||||||
|
ReadConsoleInput.restype = BOOL
|
||||||
|
|
||||||
|
OutHandle = GetStdHandle(STD_OUTPUT_HANDLE)
|
||||||
|
InHandle = GetStdHandle(STD_INPUT_HANDLE)
|
||||||
|
else:
|
||||||
|
|
||||||
|
def _win_only(*args, **kwargs):
|
||||||
|
raise NotImplementedError("Windows only")
|
||||||
|
|
||||||
|
GetStdHandle = _win_only
|
||||||
|
GetConsoleScreenBufferInfo = _win_only
|
||||||
|
ScrollConsoleScreenBuffer = _win_only
|
||||||
|
SetConsoleMode = _win_only
|
||||||
|
ReadConsoleInput = _win_only
|
||||||
|
OutHandle = 0
|
||||||
|
InHandle = 0
|
|
@ -1,12 +1,14 @@
|
||||||
import os
|
import os
|
||||||
|
import sys
|
||||||
from test.support import requires, load_package_tests
|
from test.support import requires, load_package_tests
|
||||||
from test.support.import_helper import import_module
|
from test.support.import_helper import import_module
|
||||||
|
|
||||||
# Optionally test pyrepl. This currently requires that the
|
if sys.platform != "win32":
|
||||||
# 'curses' resource be given on the regrtest command line using the -u
|
# On non-Windows platforms, testing pyrepl currently requires that the
|
||||||
# option. Additionally, we need to attempt to import curses and readline.
|
# 'curses' resource be given on the regrtest command line using the -u
|
||||||
requires("curses")
|
# option. Additionally, we need to attempt to import curses and readline.
|
||||||
curses = import_module("curses")
|
requires("curses")
|
||||||
|
curses = import_module("curses")
|
||||||
|
|
||||||
|
|
||||||
def load_tests(*args):
|
def load_tests(*args):
|
||||||
|
|
|
@ -55,7 +55,7 @@ def prepare_reader(console: Console, **kwargs):
|
||||||
return reader
|
return reader
|
||||||
|
|
||||||
|
|
||||||
def prepare_console(events: Iterable[Event], **kwargs):
|
def prepare_console(events: Iterable[Event], **kwargs) -> MagicMock | Console:
|
||||||
console = MagicMock()
|
console = MagicMock()
|
||||||
console.get_event.side_effect = events
|
console.get_event.side_effect = events
|
||||||
console.height = 100
|
console.height = 100
|
||||||
|
|
|
@ -508,14 +508,15 @@ class TestPyReplCompleter(TestCase):
|
||||||
reader = ReadlineAlikeReader(console=console, config=config)
|
reader = ReadlineAlikeReader(console=console, config=config)
|
||||||
return reader
|
return reader
|
||||||
|
|
||||||
|
@patch("rlcompleter._readline_available", False)
|
||||||
def test_simple_completion(self):
|
def test_simple_completion(self):
|
||||||
events = code_to_events("os.geten\t\n")
|
events = code_to_events("os.getpid\t\n")
|
||||||
|
|
||||||
namespace = {"os": os}
|
namespace = {"os": os}
|
||||||
reader = self.prepare_reader(events, namespace)
|
reader = self.prepare_reader(events, namespace)
|
||||||
|
|
||||||
output = multiline_input(reader, namespace)
|
output = multiline_input(reader, namespace)
|
||||||
self.assertEqual(output, "os.getenv")
|
self.assertEqual(output, "os.getpid()")
|
||||||
|
|
||||||
def test_completion_with_many_options(self):
|
def test_completion_with_many_options(self):
|
||||||
# Test with something that initially displays many options
|
# Test with something that initially displays many options
|
||||||
|
|
|
@ -1,12 +1,16 @@
|
||||||
import itertools
|
import itertools
|
||||||
|
import sys
|
||||||
|
import unittest
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from unittest import TestCase
|
from unittest import TestCase
|
||||||
from unittest.mock import MagicMock, call, patch, ANY
|
from unittest.mock import MagicMock, call, patch, ANY
|
||||||
|
|
||||||
from .support import handle_all_events, code_to_events
|
from .support import handle_all_events, code_to_events
|
||||||
from _pyrepl.console import Event
|
try:
|
||||||
from _pyrepl.unix_console import UnixConsole
|
from _pyrepl.console import Event
|
||||||
|
from _pyrepl.unix_console import UnixConsole
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
|
||||||
def unix_console(events, **kwargs):
|
def unix_console(events, **kwargs):
|
||||||
console = UnixConsole()
|
console = UnixConsole()
|
||||||
|
@ -67,6 +71,7 @@ TERM_CAPABILITIES = {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@unittest.skipIf(sys.platform == "win32", "No Unix event queue on Windows")
|
||||||
@patch("_pyrepl.curses.tigetstr", lambda s: TERM_CAPABILITIES.get(s))
|
@patch("_pyrepl.curses.tigetstr", lambda s: TERM_CAPABILITIES.get(s))
|
||||||
@patch(
|
@patch(
|
||||||
"_pyrepl.curses.tparm",
|
"_pyrepl.curses.tparm",
|
||||||
|
|
|
@ -1,11 +1,15 @@
|
||||||
import tempfile
|
import tempfile
|
||||||
import unittest
|
import unittest
|
||||||
|
import sys
|
||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
|
||||||
from _pyrepl.console import Event
|
try:
|
||||||
from _pyrepl.unix_eventqueue import EventQueue
|
from _pyrepl.console import Event
|
||||||
|
from _pyrepl.unix_eventqueue import EventQueue
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
@unittest.skipIf(sys.platform == "win32", "No Unix event queue on Windows")
|
||||||
@patch("_pyrepl.curses.tigetstr", lambda x: b"")
|
@patch("_pyrepl.curses.tigetstr", lambda x: b"")
|
||||||
class TestUnixEventQueue(unittest.TestCase):
|
class TestUnixEventQueue(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
|
|
331
Lib/test/test_pyrepl/test_windows_console.py
Normal file
331
Lib/test/test_pyrepl/test_windows_console.py
Normal file
|
@ -0,0 +1,331 @@
|
||||||
|
import itertools
|
||||||
|
import sys
|
||||||
|
import unittest
|
||||||
|
from _pyrepl.console import Event, Console
|
||||||
|
from _pyrepl.windows_console import (
|
||||||
|
MOVE_LEFT,
|
||||||
|
MOVE_RIGHT,
|
||||||
|
MOVE_UP,
|
||||||
|
MOVE_DOWN,
|
||||||
|
ERASE_IN_LINE,
|
||||||
|
)
|
||||||
|
from functools import partial
|
||||||
|
from typing import Iterable
|
||||||
|
from unittest import TestCase, main
|
||||||
|
from unittest.mock import MagicMock, call, patch, ANY
|
||||||
|
|
||||||
|
from .support import handle_all_events, code_to_events
|
||||||
|
|
||||||
|
try:
|
||||||
|
from _pyrepl.console import Event
|
||||||
|
from _pyrepl.windows_console import WindowsConsole
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@unittest.skipIf(sys.platform != "win32", "Test class specifically for Windows")
|
||||||
|
class WindowsConsoleTests(TestCase):
|
||||||
|
def console(self, events, **kwargs) -> Console:
|
||||||
|
console = WindowsConsole()
|
||||||
|
console.get_event = MagicMock(side_effect=events)
|
||||||
|
console._scroll = MagicMock()
|
||||||
|
console._hide_cursor = MagicMock()
|
||||||
|
console._show_cursor = MagicMock()
|
||||||
|
console._getscrollbacksize = MagicMock(42)
|
||||||
|
console.out = MagicMock()
|
||||||
|
|
||||||
|
height = kwargs.get("height", 25)
|
||||||
|
width = kwargs.get("width", 80)
|
||||||
|
console.getheightwidth = MagicMock(side_effect=lambda: (height, width))
|
||||||
|
|
||||||
|
console.prepare()
|
||||||
|
for key, val in kwargs.items():
|
||||||
|
setattr(console, key, val)
|
||||||
|
return console
|
||||||
|
|
||||||
|
def handle_events(self, events: Iterable[Event], **kwargs):
|
||||||
|
return handle_all_events(events, partial(self.console, **kwargs))
|
||||||
|
|
||||||
|
def handle_events_narrow(self, events):
|
||||||
|
return self.handle_events(events, width=5)
|
||||||
|
|
||||||
|
def handle_events_short(self, events):
|
||||||
|
return self.handle_events(events, height=1)
|
||||||
|
|
||||||
|
def handle_events_height_3(self, events):
|
||||||
|
return self.handle_events(events, height=3)
|
||||||
|
|
||||||
|
def test_simple_addition(self):
|
||||||
|
code = "12+34"
|
||||||
|
events = code_to_events(code)
|
||||||
|
_, con = self.handle_events(events)
|
||||||
|
con.out.write.assert_any_call(b"1")
|
||||||
|
con.out.write.assert_any_call(b"2")
|
||||||
|
con.out.write.assert_any_call(b"+")
|
||||||
|
con.out.write.assert_any_call(b"3")
|
||||||
|
con.out.write.assert_any_call(b"4")
|
||||||
|
con.restore()
|
||||||
|
|
||||||
|
def test_wrap(self):
|
||||||
|
code = "12+34"
|
||||||
|
events = code_to_events(code)
|
||||||
|
_, con = self.handle_events_narrow(events)
|
||||||
|
con.out.write.assert_any_call(b"1")
|
||||||
|
con.out.write.assert_any_call(b"2")
|
||||||
|
con.out.write.assert_any_call(b"+")
|
||||||
|
con.out.write.assert_any_call(b"3")
|
||||||
|
con.out.write.assert_any_call(b"\\")
|
||||||
|
con.out.write.assert_any_call(b"\n")
|
||||||
|
con.out.write.assert_any_call(b"4")
|
||||||
|
con.restore()
|
||||||
|
|
||||||
|
def test_resize_wider(self):
|
||||||
|
code = "1234567890"
|
||||||
|
events = code_to_events(code)
|
||||||
|
reader, console = self.handle_events_narrow(events)
|
||||||
|
|
||||||
|
console.height = 20
|
||||||
|
console.width = 80
|
||||||
|
console.getheightwidth = MagicMock(lambda _: (20, 80))
|
||||||
|
|
||||||
|
def same_reader(_):
|
||||||
|
return reader
|
||||||
|
|
||||||
|
def same_console(events):
|
||||||
|
console.get_event = MagicMock(side_effect=events)
|
||||||
|
return console
|
||||||
|
|
||||||
|
_, con = handle_all_events(
|
||||||
|
[Event(evt="resize", data=None)],
|
||||||
|
prepare_reader=same_reader,
|
||||||
|
prepare_console=same_console,
|
||||||
|
)
|
||||||
|
|
||||||
|
con.out.write.assert_any_call(self.move_right(2))
|
||||||
|
con.out.write.assert_any_call(self.move_up(2))
|
||||||
|
con.out.write.assert_any_call(b"567890")
|
||||||
|
|
||||||
|
con.restore()
|
||||||
|
|
||||||
|
def test_resize_narrower(self):
|
||||||
|
code = "1234567890"
|
||||||
|
events = code_to_events(code)
|
||||||
|
reader, console = self.handle_events(events)
|
||||||
|
|
||||||
|
console.height = 20
|
||||||
|
console.width = 4
|
||||||
|
console.getheightwidth = MagicMock(lambda _: (20, 4))
|
||||||
|
|
||||||
|
def same_reader(_):
|
||||||
|
return reader
|
||||||
|
|
||||||
|
def same_console(events):
|
||||||
|
console.get_event = MagicMock(side_effect=events)
|
||||||
|
return console
|
||||||
|
|
||||||
|
_, con = handle_all_events(
|
||||||
|
[Event(evt="resize", data=None)],
|
||||||
|
prepare_reader=same_reader,
|
||||||
|
prepare_console=same_console,
|
||||||
|
)
|
||||||
|
|
||||||
|
con.out.write.assert_any_call(b"456\\")
|
||||||
|
con.out.write.assert_any_call(b"789\\")
|
||||||
|
|
||||||
|
con.restore()
|
||||||
|
|
||||||
|
def test_cursor_left(self):
|
||||||
|
code = "1"
|
||||||
|
events = itertools.chain(
|
||||||
|
code_to_events(code),
|
||||||
|
[Event(evt="key", data="left", raw=bytearray(b"\x1bOD"))],
|
||||||
|
)
|
||||||
|
_, con = self.handle_events(events)
|
||||||
|
con.out.write.assert_any_call(self.move_left())
|
||||||
|
con.restore()
|
||||||
|
|
||||||
|
def test_cursor_left_right(self):
|
||||||
|
code = "1"
|
||||||
|
events = itertools.chain(
|
||||||
|
code_to_events(code),
|
||||||
|
[
|
||||||
|
Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
|
||||||
|
Event(evt="key", data="right", raw=bytearray(b"\x1bOC")),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
_, con = self.handle_events(events)
|
||||||
|
con.out.write.assert_any_call(self.move_left())
|
||||||
|
con.out.write.assert_any_call(self.move_right())
|
||||||
|
con.restore()
|
||||||
|
|
||||||
|
def test_cursor_up(self):
|
||||||
|
code = "1\n2+3"
|
||||||
|
events = itertools.chain(
|
||||||
|
code_to_events(code),
|
||||||
|
[Event(evt="key", data="up", raw=bytearray(b"\x1bOA"))],
|
||||||
|
)
|
||||||
|
_, con = self.handle_events(events)
|
||||||
|
con.out.write.assert_any_call(self.move_up())
|
||||||
|
con.restore()
|
||||||
|
|
||||||
|
def test_cursor_up_down(self):
|
||||||
|
code = "1\n2+3"
|
||||||
|
events = itertools.chain(
|
||||||
|
code_to_events(code),
|
||||||
|
[
|
||||||
|
Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
|
||||||
|
Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
_, con = self.handle_events(events)
|
||||||
|
con.out.write.assert_any_call(self.move_up())
|
||||||
|
con.out.write.assert_any_call(self.move_down())
|
||||||
|
con.restore()
|
||||||
|
|
||||||
|
def test_cursor_back_write(self):
|
||||||
|
events = itertools.chain(
|
||||||
|
code_to_events("1"),
|
||||||
|
[Event(evt="key", data="left", raw=bytearray(b"\x1bOD"))],
|
||||||
|
code_to_events("2"),
|
||||||
|
)
|
||||||
|
_, con = self.handle_events(events)
|
||||||
|
con.out.write.assert_any_call(b"1")
|
||||||
|
con.out.write.assert_any_call(self.move_left())
|
||||||
|
con.out.write.assert_any_call(b"21")
|
||||||
|
con.restore()
|
||||||
|
|
||||||
|
def test_multiline_function_move_up_short_terminal(self):
|
||||||
|
# fmt: off
|
||||||
|
code = (
|
||||||
|
"def f():\n"
|
||||||
|
" foo"
|
||||||
|
)
|
||||||
|
# fmt: on
|
||||||
|
|
||||||
|
events = itertools.chain(
|
||||||
|
code_to_events(code),
|
||||||
|
[
|
||||||
|
Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
|
||||||
|
Event(evt="scroll", data=None),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
_, con = self.handle_events_short(events)
|
||||||
|
con.out.write.assert_any_call(self.move_left(5))
|
||||||
|
con.out.write.assert_any_call(self.move_up())
|
||||||
|
con.restore()
|
||||||
|
|
||||||
|
def test_multiline_function_move_up_down_short_terminal(self):
|
||||||
|
# fmt: off
|
||||||
|
code = (
|
||||||
|
"def f():\n"
|
||||||
|
" foo"
|
||||||
|
)
|
||||||
|
# fmt: on
|
||||||
|
|
||||||
|
events = itertools.chain(
|
||||||
|
code_to_events(code),
|
||||||
|
[
|
||||||
|
Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
|
||||||
|
Event(evt="scroll", data=None),
|
||||||
|
Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
|
||||||
|
Event(evt="scroll", data=None),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
_, con = self.handle_events_short(events)
|
||||||
|
con.out.write.assert_any_call(self.move_left(8))
|
||||||
|
con.out.write.assert_any_call(self.erase_in_line())
|
||||||
|
con.restore()
|
||||||
|
|
||||||
|
def test_resize_bigger_on_multiline_function(self):
|
||||||
|
# fmt: off
|
||||||
|
code = (
|
||||||
|
"def f():\n"
|
||||||
|
" foo"
|
||||||
|
)
|
||||||
|
# fmt: on
|
||||||
|
|
||||||
|
events = itertools.chain(code_to_events(code))
|
||||||
|
reader, console = self.handle_events_short(events)
|
||||||
|
|
||||||
|
console.height = 2
|
||||||
|
console.getheightwidth = MagicMock(lambda _: (2, 80))
|
||||||
|
|
||||||
|
def same_reader(_):
|
||||||
|
return reader
|
||||||
|
|
||||||
|
def same_console(events):
|
||||||
|
console.get_event = MagicMock(side_effect=events)
|
||||||
|
return console
|
||||||
|
|
||||||
|
_, con = handle_all_events(
|
||||||
|
[Event(evt="resize", data=None)],
|
||||||
|
prepare_reader=same_reader,
|
||||||
|
prepare_console=same_console,
|
||||||
|
)
|
||||||
|
con.out.write.assert_has_calls(
|
||||||
|
[
|
||||||
|
call(self.move_left(5)),
|
||||||
|
call(self.move_up()),
|
||||||
|
call(b"def f():"),
|
||||||
|
call(self.move_left(3)),
|
||||||
|
call(self.move_down()),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
console.restore()
|
||||||
|
con.restore()
|
||||||
|
|
||||||
|
def test_resize_smaller_on_multiline_function(self):
|
||||||
|
# fmt: off
|
||||||
|
code = (
|
||||||
|
"def f():\n"
|
||||||
|
" foo"
|
||||||
|
)
|
||||||
|
# fmt: on
|
||||||
|
|
||||||
|
events = itertools.chain(code_to_events(code))
|
||||||
|
reader, console = self.handle_events_height_3(events)
|
||||||
|
|
||||||
|
console.height = 1
|
||||||
|
console.getheightwidth = MagicMock(lambda _: (1, 80))
|
||||||
|
|
||||||
|
def same_reader(_):
|
||||||
|
return reader
|
||||||
|
|
||||||
|
def same_console(events):
|
||||||
|
console.get_event = MagicMock(side_effect=events)
|
||||||
|
return console
|
||||||
|
|
||||||
|
_, con = handle_all_events(
|
||||||
|
[Event(evt="resize", data=None)],
|
||||||
|
prepare_reader=same_reader,
|
||||||
|
prepare_console=same_console,
|
||||||
|
)
|
||||||
|
con.out.write.assert_has_calls(
|
||||||
|
[
|
||||||
|
call(self.move_left(5)),
|
||||||
|
call(self.move_up()),
|
||||||
|
call(self.erase_in_line()),
|
||||||
|
call(b" foo"),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
console.restore()
|
||||||
|
con.restore()
|
||||||
|
|
||||||
|
def move_up(self, lines=1):
|
||||||
|
return MOVE_UP.format(lines).encode("utf8")
|
||||||
|
|
||||||
|
def move_down(self, lines=1):
|
||||||
|
return MOVE_DOWN.format(lines).encode("utf8")
|
||||||
|
|
||||||
|
def move_left(self, cols=1):
|
||||||
|
return MOVE_LEFT.format(cols).encode("utf8")
|
||||||
|
|
||||||
|
def move_right(self, cols=1):
|
||||||
|
return MOVE_RIGHT.format(cols).encode("utf8")
|
||||||
|
|
||||||
|
def erase_in_line(self):
|
||||||
|
return ERASE_IN_LINE.encode("utf8")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
|
@ -0,0 +1 @@
|
||||||
|
Add support for new pyrepl on Windows
|
Loading…
Add table
Add a link
Reference in a new issue