# Copyright 2000-2004 Michael Hudson-Doyle # # All Rights Reserved # # # Permission to use, copy, modify, and distribute this software and # its documentation for any purpose is hereby granted without fee, # provided that the above copyright notice appear in all copies and # that both that copyright notice and this permission notice appear in # supporting documentation. # # THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO # THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY # AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, # INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER # RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF # CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. from __future__ import annotations import io from multiprocessing import Value import os import sys from abc import ABC, abstractmethod from collections import deque from dataclasses import dataclass, field import ctypes from ctypes.wintypes import ( _COORD, WORD, SMALL_RECT, BOOL, HANDLE, CHAR, DWORD, WCHAR, SHORT, ) from ctypes import Structure, POINTER, Union from .console import Event, Console from .trace import trace from .utils import wlen try: from ctypes import GetLastError, WinDLL, windll, WinError # type: ignore[attr-defined] except: # Keep MyPy happy off Windows from ctypes import CDLL as WinDLL, cdll as windll def GetLastError() -> int: return 42 class WinError(OSError): # type: ignore[no-redef] def __init__(self, err: int | None, descr: str | None = None) -> None: self.err = err self.descr = descr TYPE_CHECKING = False if TYPE_CHECKING: from typing import IO VK_MAP: dict[int, str] = { 0x23: "end", # VK_END 0x24: "home", # VK_HOME 0x25: "left", # VK_LEFT 0x26: "up", # VK_UP 0x27: "right", # VK_RIGHT 0x28: "down", # VK_DOWN 0x2E: "delete", # VK_DELETE 0x70: "f1", # VK_F1 0x71: "f2", # VK_F2 0x72: "f3", # VK_F3 0x73: "f4", # VK_F4 0x74: "f5", # VK_F5 0x75: "f6", # VK_F6 0x76: "f7", # VK_F7 0x77: "f8", # VK_F8 0x78: "f9", # VK_F9 0x79: "f10", # VK_F10 0x7A: "f11", # VK_F11 0x7B: "f12", # VK_F12 0x7C: "f13", # VK_F13 0x7D: "f14", # VK_F14 0x7E: "f15", # VK_F15 0x7F: "f16", # VK_F16 0x79: "f17", # VK_F17 0x80: "f18", # VK_F18 0x81: "f19", # VK_F19 0x82: "f20", # VK_F20 } # Console escape codes: https://learn.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences ERASE_IN_LINE = "\x1b[K" MOVE_LEFT = "\x1b[{}D" MOVE_RIGHT = "\x1b[{}C" MOVE_UP = "\x1b[{}A" MOVE_DOWN = "\x1b[{}B" CLEAR = "\x1b[H\x1b[J" class _error(Exception): pass class WindowsConsole(Console): def __init__( self, f_in: IO[bytes] | int = 0, f_out: IO[bytes] | int = 1, term: str = "", encoding: str = "", ): super().__init__(f_in, f_out, term, encoding) SetConsoleMode( OutHandle, ENABLE_WRAP_AT_EOL_OUTPUT | ENABLE_PROCESSED_OUTPUT | ENABLE_VIRTUAL_TERMINAL_PROCESSING, ) self.screen: list[str] = [] self.width = 80 self.height = 25 self.__offset = 0 self.event_queue: deque[Event] = deque() try: self.out = io._WindowsConsoleIO(self.output_fd, "w") # type: ignore[attr-defined] except ValueError: # Console I/O is redirected, fallback... self.out = None def refresh(self, screen: list[str], c_xy: tuple[int, int]) -> None: """ Refresh the console screen. Parameters: - screen (list): List of strings representing the screen contents. - c_xy (tuple): Cursor position (x, y) on the screen. """ cx, cy = c_xy while len(self.screen) < min(len(screen), self.height): self._hide_cursor() self._move_relative(0, len(self.screen) - 1) self.__write("\n") self.__posxy = 0, len(self.screen) self.screen.append("") px, py = self.__posxy old_offset = offset = self.__offset height = self.height # we make sure the cursor is on the screen, and that we're # using all of the screen if we can if cy < offset: offset = cy elif cy >= offset + height: offset = cy - height + 1 scroll_lines = offset - old_offset # Scrolling the buffer as the current input is greater than the visible # portion of the window. We need to scroll the visible portion and the # entire history self._scroll(scroll_lines, self._getscrollbacksize()) self.__posxy = self.__posxy[0], self.__posxy[1] + scroll_lines self.__offset += scroll_lines for i in range(scroll_lines): self.screen.append("") elif offset > 0 and len(screen) < offset + height: offset = max(len(screen) - height, 0) screen.append("") oldscr = self.screen[old_offset : old_offset + height] newscr = screen[offset : offset + height] self.__offset = offset self._hide_cursor() for ( y, oldline, newline, ) in zip(range(offset, offset + height), oldscr, newscr): if oldline != newline: self.__write_changed_line(y, oldline, newline, px) y = len(newscr) while y < len(oldscr): self._move_relative(0, y) self.__posxy = 0, y self._erase_to_end() y += 1 self._show_cursor() self.screen = screen self.move_cursor(cx, cy) def __write_changed_line( self, y: int, oldline: str, newline: str, px_coord: int ) -> None: # this is frustrating; there's no reason to test (say) # self.dch1 inside the loop -- but alternative ways of # structuring this function are equally painful (I'm trying to # avoid writing code generators these days...) minlen = min(wlen(oldline), wlen(newline)) x_pos = 0 x_coord = 0 px_pos = 0 j = 0 for c in oldline: if j >= px_coord: break j += wlen(c) px_pos += 1 # reuse the oldline as much as possible, but stop as soon as we # encounter an ESCAPE, because it might be the start of an escape # sequene while ( x_coord < minlen and oldline[x_pos] == newline[x_pos] and newline[x_pos] != "\x1b" ): x_coord += wlen(newline[x_pos]) x_pos += 1 self._hide_cursor() self._move_relative(x_coord, y) if wlen(oldline) > wlen(newline): self._erase_to_end() self.__write(newline[x_pos:]) if wlen(newline) == self.width: # If we wrapped we want to start at the next line self._move_relative(0, y + 1) self.__posxy = 0, y + 1 else: self.__posxy = wlen(newline), y if "\x1b" in newline or y != self.__posxy[1]: # ANSI escape characters are present, so we can't assume # anything about the position of the cursor. Moving the cursor # to the left margin should work to get to a known position. self.move_cursor(0, y) def _scroll( self, top: int, bottom: int, left: int | None = None, right: int | None = None ) -> None: scroll_rect = SMALL_RECT() scroll_rect.Top = SHORT(top) scroll_rect.Bottom = SHORT(bottom) scroll_rect.Left = SHORT(0 if left is None else left) scroll_rect.Right = SHORT( self.getheightwidth()[1] - 1 if right is None else right ) destination_origin = _COORD() fill_info = CHAR_INFO() fill_info.UnicodeChar = " " if not ScrollConsoleScreenBuffer( OutHandle, scroll_rect, None, destination_origin, fill_info ): raise WinError(GetLastError()) def _hide_cursor(self): self.__write("\x1b[?25l") def _show_cursor(self): self.__write("\x1b[?25h") def _enable_blinking(self): self.__write("\x1b[?12h") def _disable_blinking(self): self.__write("\x1b[?12l") def __write(self, text: str) -> None: if self.out is not None: self.out.write(text.encode(self.encoding, "replace")) self.out.flush() else: os.write(self.output_fd, text.encode(self.encoding, "replace")) @property def screen_xy(self) -> tuple[int, int]: info = CONSOLE_SCREEN_BUFFER_INFO() if not GetConsoleScreenBufferInfo(OutHandle, info): raise WinError(GetLastError()) return info.dwCursorPosition.X, info.dwCursorPosition.Y def _erase_to_end(self) -> None: self.__write(ERASE_IN_LINE) def prepare(self) -> None: trace("prepare") self.screen = [] self.height, self.width = self.getheightwidth() self.__posxy = 0, 0 self.__gone_tall = 0 self.__offset = 0 def restore(self) -> None: pass def _move_relative(self, x: int, y: int) -> None: """Moves relative to the current __posxy""" dx = x - self.__posxy[0] dy = y - self.__posxy[1] if dx < 0: self.__write(MOVE_LEFT.format(-dx)) elif dx > 0: self.__write(MOVE_RIGHT.format(dx)) if dy < 0: self.__write(MOVE_UP.format(-dy)) elif dy > 0: self.__write(MOVE_DOWN.format(dy)) def move_cursor(self, x: int, y: int) -> None: if x < 0 or y < 0: raise ValueError(f"Bad cursor position {x}, {y}") if y < self.__offset or y >= self.__offset + self.height: self.event_queue.insert(0, Event("scroll", "")) else: self._move_relative(x, y) self.__posxy = x, y def set_cursor_vis(self, visible: bool) -> None: if visible: self._show_cursor() else: self._hide_cursor() def getheightwidth(self) -> tuple[int, int]: """Return (height, width) where height and width are the height and width of the terminal window in characters.""" info = CONSOLE_SCREEN_BUFFER_INFO() if not GetConsoleScreenBufferInfo(OutHandle, info): raise WinError(GetLastError()) return ( info.srWindow.Bottom - info.srWindow.Top + 1, info.srWindow.Right - info.srWindow.Left + 1, ) def _getscrollbacksize(self) -> int: info = CONSOLE_SCREEN_BUFFER_INFO() if not GetConsoleScreenBufferInfo(OutHandle, info): raise WinError(GetLastError()) return info.srWindow.Bottom # type: ignore[no-any-return] def _read_input(self) -> INPUT_RECORD | None: rec = INPUT_RECORD() read = DWORD() if not ReadConsoleInput(InHandle, rec, 1, read): raise WinError(GetLastError()) if read.value == 0: return None return rec def get_event(self, block: bool = True) -> Event | None: """Return an Event instance. Returns None if |block| is false and there is no event pending, otherwise waits for the completion of an event.""" if self.event_queue: return self.event_queue.pop() while True: rec = self._read_input() if rec is None: if block: continue return None if rec.EventType == WINDOW_BUFFER_SIZE_EVENT: return Event("resize", "") if rec.EventType != KEY_EVENT or not rec.Event.KeyEvent.bKeyDown: # Only process keys and keydown events if block: continue return None key = rec.Event.KeyEvent.uChar.UnicodeChar if rec.Event.KeyEvent.uChar.UnicodeChar == "\r": # Make enter make unix-like return Event(evt="key", data="\n", raw=b"\n") elif rec.Event.KeyEvent.wVirtualKeyCode == 8: # Turn backspace directly into the command return Event( evt="key", data="backspace", raw=rec.Event.KeyEvent.uChar.UnicodeChar, ) elif rec.Event.KeyEvent.uChar.UnicodeChar == "\x00": # Handle special keys like arrow keys and translate them into the appropriate command code = VK_MAP.get(rec.Event.KeyEvent.wVirtualKeyCode) if code: return Event( evt="key", data=code, raw=rec.Event.KeyEvent.uChar.UnicodeChar ) if block: continue return None return Event(evt="key", data=key, raw=rec.Event.KeyEvent.uChar.UnicodeChar) def push_char(self, char: int | bytes) -> None: """ Push a character to the console event queue. """ raise NotImplementedError("push_char not supported on Windows") def beep(self) -> None: self.__write("\x07") def clear(self) -> None: """Wipe the screen""" self.__write(CLEAR) self.__posxy = 0, 0 self.screen = [""] def finish(self) -> None: """Move the cursor to the end of the display and otherwise get ready for end. XXX could be merged with restore? Hmm.""" y = len(self.screen) - 1 while y >= 0 and not self.screen[y]: y -= 1 self._move_relative(0, min(y, self.height + self.__offset - 1)) self.__write("\r\n") def flushoutput(self) -> None: """Flush all output to the screen (assuming there's some buffering going on somewhere). All output on Windows is unbuffered so this is a nop""" pass def forgetinput(self) -> None: """Forget all pending, but not yet processed input.""" while self._read_input() is not None: pass def getpending(self) -> Event: """Return the characters that have been typed but not yet processed.""" return Event("key", "", b"") def wait(self) -> None: """Wait for an event.""" raise NotImplementedError("No wait support") def repaint(self) -> None: raise NotImplementedError("No repaint support") # Windows interop class CONSOLE_SCREEN_BUFFER_INFO(Structure): _fields_ = [ ("dwSize", _COORD), ("dwCursorPosition", _COORD), ("wAttributes", WORD), ("srWindow", SMALL_RECT), ("dwMaximumWindowSize", _COORD), ] class CONSOLE_CURSOR_INFO(Structure): _fields_ = [ ("dwSize", DWORD), ("bVisible", BOOL), ] class CHAR_INFO(Structure): _fields_ = [ ("UnicodeChar", WCHAR), ("Attributes", WORD), ] class Char(Union): _fields_ = [ ("UnicodeChar", WCHAR), ("Char", CHAR), ] class KeyEvent(ctypes.Structure): _fields_ = [ ("bKeyDown", BOOL), ("wRepeatCount", WORD), ("wVirtualKeyCode", WORD), ("wVirtualScanCode", WORD), ("uChar", Char), ("dwControlKeyState", DWORD), ] class WindowsBufferSizeEvent(ctypes.Structure): _fields_ = [("dwSize", _COORD)] class ConsoleEvent(ctypes.Union): _fields_ = [ ("KeyEvent", KeyEvent), ("WindowsBufferSizeEvent", WindowsBufferSizeEvent), ] class INPUT_RECORD(Structure): _fields_ = [("EventType", WORD), ("Event", ConsoleEvent)] KEY_EVENT = 0x01 FOCUS_EVENT = 0x10 MENU_EVENT = 0x08 MOUSE_EVENT = 0x02 WINDOW_BUFFER_SIZE_EVENT = 0x04 ENABLE_PROCESSED_OUTPUT = 0x01 ENABLE_WRAP_AT_EOL_OUTPUT = 0x02 ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x04 STD_INPUT_HANDLE = -10 STD_OUTPUT_HANDLE = -11 if sys.platform == "win32": _KERNEL32 = WinDLL("kernel32", use_last_error=True) GetStdHandle = windll.kernel32.GetStdHandle GetStdHandle.argtypes = [DWORD] GetStdHandle.restype = HANDLE GetConsoleScreenBufferInfo = _KERNEL32.GetConsoleScreenBufferInfo GetConsoleScreenBufferInfo.argtypes = [ HANDLE, ctypes.POINTER(CONSOLE_SCREEN_BUFFER_INFO), ] GetConsoleScreenBufferInfo.restype = BOOL ScrollConsoleScreenBuffer = _KERNEL32.ScrollConsoleScreenBufferW ScrollConsoleScreenBuffer.argtypes = [ HANDLE, POINTER(SMALL_RECT), POINTER(SMALL_RECT), _COORD, POINTER(CHAR_INFO), ] ScrollConsoleScreenBuffer.restype = BOOL SetConsoleMode = _KERNEL32.SetConsoleMode SetConsoleMode.argtypes = [HANDLE, DWORD] SetConsoleMode.restype = BOOL ReadConsoleInput = _KERNEL32.ReadConsoleInputW ReadConsoleInput.argtypes = [HANDLE, POINTER(INPUT_RECORD), DWORD, POINTER(DWORD)] ReadConsoleInput.restype = BOOL OutHandle = GetStdHandle(STD_OUTPUT_HANDLE) InHandle = GetStdHandle(STD_INPUT_HANDLE) else: def _win_only(*args, **kwargs): raise NotImplementedError("Windows only") GetStdHandle = _win_only GetConsoleScreenBufferInfo = _win_only ScrollConsoleScreenBuffer = _win_only SetConsoleMode = _win_only ReadConsoleInput = _win_only OutHandle = 0 InHandle = 0