mirror of
https://github.com/python/cpython.git
synced 2025-07-23 19:25:40 +00:00
gh-131591: Allow pdb to attach to a running process (#132451)
Co-authored-by: Pablo Galindo <pablogsal@gmail.com>
This commit is contained in:
parent
3a39e33ee4
commit
797b29b1b5
5 changed files with 1330 additions and 11 deletions
|
@ -99,7 +99,9 @@ PEP 768: Safe external debugger interface for CPython
|
|||
|
||||
:pep:`768` introduces a zero-overhead debugging interface that allows debuggers and profilers
|
||||
to safely attach to running Python processes. This is a significant enhancement to Python's
|
||||
debugging capabilities allowing debuggers to forego unsafe alternatives.
|
||||
debugging capabilities allowing debuggers to forego unsafe alternatives. See
|
||||
:ref:`below <whatsnew314-remote-pdb>` for how this feature is leveraged to
|
||||
implement the new :mod:`pdb` module's remote attaching capabilities.
|
||||
|
||||
The new interface provides safe execution points for attaching debugger code without modifying
|
||||
the interpreter's normal execution path or adding runtime overhead. This enables tools to
|
||||
|
@ -149,6 +151,32 @@ See :pep:`768` for more details.
|
|||
|
||||
(Contributed by Pablo Galindo Salgado, Matt Wozniski, and Ivona Stojanovic in :gh:`131591`.)
|
||||
|
||||
|
||||
.. _whatsnew314-remote-pdb:
|
||||
|
||||
Remote attaching to a running Python process with PDB
|
||||
-----------------------------------------------------
|
||||
|
||||
The :mod:`pdb` module now supports remote attaching to a running Python process
|
||||
using a new ``-p PID`` command-line option:
|
||||
|
||||
.. code-block:: sh
|
||||
|
||||
python -m pdb -p 1234
|
||||
|
||||
This will connect to the Python process with the given PID and allow you to
|
||||
debug it interactively. Notice that due to how the Python interpreter works
|
||||
attaching to a remote process that is blocked in a system call or waiting for
|
||||
I/O will only work once the next bytecode instruction is executed or when the
|
||||
process receives a signal.
|
||||
|
||||
This feature leverages :pep:`768` and the :func:`sys.remote_exec` function
|
||||
to attach to the remote process and send the PDB commands to it.
|
||||
|
||||
|
||||
(Contributed by Matt Wozniski and Pablo Galindo in :gh:`131591`.)
|
||||
|
||||
|
||||
.. _whatsnew314-pep758:
|
||||
|
||||
PEP 758 – Allow except and except* expressions without parentheses
|
||||
|
|
616
Lib/pdb.py
616
Lib/pdb.py
|
@ -74,13 +74,19 @@ import bdb
|
|||
import dis
|
||||
import code
|
||||
import glob
|
||||
import json
|
||||
import token
|
||||
import types
|
||||
import codeop
|
||||
import pprint
|
||||
import signal
|
||||
import socket
|
||||
import typing
|
||||
import asyncio
|
||||
import inspect
|
||||
import weakref
|
||||
import builtins
|
||||
import tempfile
|
||||
import textwrap
|
||||
import tokenize
|
||||
import itertools
|
||||
|
@ -88,6 +94,7 @@ import traceback
|
|||
import linecache
|
||||
import _colorize
|
||||
|
||||
from contextlib import closing
|
||||
from contextlib import contextmanager
|
||||
from rlcompleter import Completer
|
||||
from types import CodeType
|
||||
|
@ -918,7 +925,7 @@ class Pdb(bdb.Bdb, cmd.Cmd):
|
|||
if cmd == 'end':
|
||||
return True # end of cmd list
|
||||
elif cmd == 'EOF':
|
||||
print('')
|
||||
self.message('')
|
||||
return True # end of cmd list
|
||||
cmdlist = self.commands[self.commands_bnum]
|
||||
if cmd == 'silent':
|
||||
|
@ -1458,6 +1465,13 @@ class Pdb(bdb.Bdb, cmd.Cmd):
|
|||
|
||||
complete_ignore = _complete_bpnumber
|
||||
|
||||
def _prompt_for_confirmation(self, prompt, default):
|
||||
try:
|
||||
reply = input(prompt)
|
||||
except EOFError:
|
||||
reply = default
|
||||
return reply.strip().lower()
|
||||
|
||||
def do_clear(self, arg):
|
||||
"""cl(ear) [filename:lineno | bpnumber ...]
|
||||
|
||||
|
@ -1467,11 +1481,10 @@ class Pdb(bdb.Bdb, cmd.Cmd):
|
|||
clear all breaks at that line in that file.
|
||||
"""
|
||||
if not arg:
|
||||
try:
|
||||
reply = input('Clear all breaks? ')
|
||||
except EOFError:
|
||||
reply = 'no'
|
||||
reply = reply.strip().lower()
|
||||
reply = self._prompt_for_confirmation(
|
||||
'Clear all breaks? ',
|
||||
default='no',
|
||||
)
|
||||
if reply in ('y', 'yes'):
|
||||
bplist = [bp for bp in bdb.Breakpoint.bpbynumber if bp]
|
||||
self.clear_all_breaks()
|
||||
|
@ -1775,6 +1788,9 @@ class Pdb(bdb.Bdb, cmd.Cmd):
|
|||
self.error('Jump failed: %s' % e)
|
||||
do_j = do_jump
|
||||
|
||||
def _create_recursive_debugger(self):
|
||||
return Pdb(self.completekey, self.stdin, self.stdout)
|
||||
|
||||
def do_debug(self, arg):
|
||||
"""debug code
|
||||
|
||||
|
@ -1788,7 +1804,7 @@ class Pdb(bdb.Bdb, cmd.Cmd):
|
|||
self.stop_trace()
|
||||
globals = self.curframe.f_globals
|
||||
locals = self.curframe.f_locals
|
||||
p = Pdb(self.completekey, self.stdin, self.stdout)
|
||||
p = self._create_recursive_debugger()
|
||||
p.prompt = "(%s) " % self.prompt.strip()
|
||||
self.message("ENTERING RECURSIVE DEBUGGER")
|
||||
try:
|
||||
|
@ -2485,6 +2501,581 @@ def set_trace(*, header=None, commands=None):
|
|||
pdb.message(header)
|
||||
pdb.set_trace(sys._getframe().f_back, commands=commands)
|
||||
|
||||
# Remote PDB
|
||||
|
||||
class _PdbServer(Pdb):
|
||||
def __init__(self, sockfile, owns_sockfile=True, **kwargs):
|
||||
self._owns_sockfile = owns_sockfile
|
||||
self._interact_state = None
|
||||
self._sockfile = sockfile
|
||||
self._command_name_cache = []
|
||||
self._write_failed = False
|
||||
super().__init__(**kwargs)
|
||||
|
||||
@staticmethod
|
||||
def protocol_version():
|
||||
# By default, assume a client and server are compatible if they run
|
||||
# the same Python major.minor version. We'll try to keep backwards
|
||||
# compatibility between patch versions of a minor version if possible.
|
||||
# If we do need to change the protocol in a patch version, we'll change
|
||||
# `revision` to the patch version where the protocol changed.
|
||||
# We can ignore compatibility for pre-release versions; sys.remote_exec
|
||||
# can't attach to a pre-release version except from that same version.
|
||||
v = sys.version_info
|
||||
revision = 0
|
||||
return int(f"{v.major:02X}{v.minor:02X}{revision:02X}F0", 16)
|
||||
|
||||
def _ensure_valid_message(self, msg):
|
||||
# Ensure the message conforms to our protocol.
|
||||
# If anything needs to be changed here for a patch release of Python,
|
||||
# the 'revision' in protocol_version() should be updated.
|
||||
match msg:
|
||||
case {"message": str(), "type": str()}:
|
||||
# Have the client show a message. The client chooses how to
|
||||
# format the message based on its type. The currently defined
|
||||
# types are "info" and "error". If a message has a type the
|
||||
# client doesn't recognize, it must be treated as "info".
|
||||
pass
|
||||
case {"help": str()}:
|
||||
# Have the client show the help for a given argument.
|
||||
pass
|
||||
case {"prompt": str(), "state": str()}:
|
||||
# Have the client display the given prompt and wait for a reply
|
||||
# from the user. If the client recognizes the state it may
|
||||
# enable mode-specific features like multi-line editing.
|
||||
# If it doesn't recognize the state it must prompt for a single
|
||||
# line only and send it directly to the server. A server won't
|
||||
# progress until it gets a "reply" or "signal" message, but can
|
||||
# process "complete" requests while waiting for the reply.
|
||||
pass
|
||||
case {
|
||||
"completions": list(completions)
|
||||
} if all(isinstance(c, str) for c in completions):
|
||||
# Return valid completions for a client's "complete" request.
|
||||
pass
|
||||
case {
|
||||
"command_list": list(command_list)
|
||||
} if all(isinstance(c, str) for c in command_list):
|
||||
# Report the list of legal PDB commands to the client.
|
||||
# Due to aliases this list is not static, but the client
|
||||
# needs to know it for multi-line editing.
|
||||
pass
|
||||
case _:
|
||||
raise AssertionError(
|
||||
f"PDB message doesn't follow the schema! {msg}"
|
||||
)
|
||||
|
||||
def _send(self, **kwargs):
|
||||
self._ensure_valid_message(kwargs)
|
||||
json_payload = json.dumps(kwargs)
|
||||
try:
|
||||
self._sockfile.write(json_payload.encode() + b"\n")
|
||||
self._sockfile.flush()
|
||||
except OSError:
|
||||
# This means that the client has abruptly disconnected, but we'll
|
||||
# handle that the next time we try to read from the client instead
|
||||
# of trying to handle it from everywhere _send() may be called.
|
||||
# Track this with a flag rather than assuming readline() will ever
|
||||
# return an empty string because the socket may be half-closed.
|
||||
self._write_failed = True
|
||||
|
||||
@typing.override
|
||||
def message(self, msg, end="\n"):
|
||||
self._send(message=str(msg) + end, type="info")
|
||||
|
||||
@typing.override
|
||||
def error(self, msg):
|
||||
self._send(message=str(msg), type="error")
|
||||
|
||||
def _get_input(self, prompt, state) -> str:
|
||||
# Before displaying a (Pdb) prompt, send the list of PDB commands
|
||||
# unless we've already sent an up-to-date list.
|
||||
if state == "pdb" and not self._command_name_cache:
|
||||
self._command_name_cache = self.completenames("", "", 0, 0)
|
||||
self._send(command_list=self._command_name_cache)
|
||||
self._send(prompt=prompt, state=state)
|
||||
return self._read_reply()
|
||||
|
||||
def _read_reply(self):
|
||||
# Loop until we get a 'reply' or 'signal' from the client,
|
||||
# processing out-of-band 'complete' requests as they arrive.
|
||||
while True:
|
||||
if self._write_failed:
|
||||
raise EOFError
|
||||
|
||||
msg = self._sockfile.readline()
|
||||
if not msg:
|
||||
raise EOFError
|
||||
|
||||
try:
|
||||
payload = json.loads(msg)
|
||||
except json.JSONDecodeError:
|
||||
self.error(f"Disconnecting: client sent invalid JSON {msg}")
|
||||
raise EOFError
|
||||
|
||||
match payload:
|
||||
case {"reply": str(reply)}:
|
||||
return reply
|
||||
case {"signal": str(signal)}:
|
||||
if signal == "INT":
|
||||
raise KeyboardInterrupt
|
||||
elif signal == "EOF":
|
||||
raise EOFError
|
||||
else:
|
||||
self.error(
|
||||
f"Received unrecognized signal: {signal}"
|
||||
)
|
||||
# Our best hope of recovering is to pretend we
|
||||
# got an EOF to exit whatever mode we're in.
|
||||
raise EOFError
|
||||
case {
|
||||
"complete": {
|
||||
"text": str(text),
|
||||
"line": str(line),
|
||||
"begidx": int(begidx),
|
||||
"endidx": int(endidx),
|
||||
}
|
||||
}:
|
||||
items = self._complete_any(text, line, begidx, endidx)
|
||||
self._send(completions=items)
|
||||
continue
|
||||
# Valid JSON, but doesn't meet the schema.
|
||||
self.error(f"Ignoring invalid message from client: {msg}")
|
||||
|
||||
def _complete_any(self, text, line, begidx, endidx):
|
||||
if begidx == 0:
|
||||
return self.completenames(text, line, begidx, endidx)
|
||||
|
||||
cmd = self.parseline(line)[0]
|
||||
if cmd:
|
||||
compfunc = getattr(self, "complete_" + cmd, self.completedefault)
|
||||
else:
|
||||
compfunc = self.completedefault
|
||||
return compfunc(text, line, begidx, endidx)
|
||||
|
||||
def cmdloop(self, intro=None):
|
||||
self.preloop()
|
||||
if intro is not None:
|
||||
self.intro = intro
|
||||
if self.intro:
|
||||
self.message(str(self.intro))
|
||||
stop = None
|
||||
while not stop:
|
||||
if self._interact_state is not None:
|
||||
try:
|
||||
reply = self._get_input(prompt=">>> ", state="interact")
|
||||
except KeyboardInterrupt:
|
||||
# Match how KeyboardInterrupt is handled in a REPL
|
||||
self.message("\nKeyboardInterrupt")
|
||||
except EOFError:
|
||||
self.message("\n*exit from pdb interact command*")
|
||||
self._interact_state = None
|
||||
else:
|
||||
self._run_in_python_repl(reply)
|
||||
continue
|
||||
|
||||
if not self.cmdqueue:
|
||||
try:
|
||||
state = "commands" if self.commands_defining else "pdb"
|
||||
reply = self._get_input(prompt=self.prompt, state=state)
|
||||
except EOFError:
|
||||
reply = "EOF"
|
||||
|
||||
self.cmdqueue.append(reply)
|
||||
|
||||
line = self.cmdqueue.pop(0)
|
||||
line = self.precmd(line)
|
||||
stop = self.onecmd(line)
|
||||
stop = self.postcmd(stop, line)
|
||||
self.postloop()
|
||||
|
||||
def postloop(self):
|
||||
super().postloop()
|
||||
if self.quitting:
|
||||
self.detach()
|
||||
|
||||
def detach(self):
|
||||
# Detach the debugger and close the socket without raising BdbQuit
|
||||
self.quitting = False
|
||||
if self._owns_sockfile:
|
||||
# Don't try to reuse this instance, it's not valid anymore.
|
||||
Pdb._last_pdb_instance = None
|
||||
try:
|
||||
self._sockfile.close()
|
||||
except OSError:
|
||||
# close() can fail if the connection was broken unexpectedly.
|
||||
pass
|
||||
|
||||
def do_debug(self, arg):
|
||||
# Clear our cached list of valid commands; the recursive debugger might
|
||||
# send its own differing list, and so ours needs to be re-sent.
|
||||
self._command_name_cache = []
|
||||
return super().do_debug(arg)
|
||||
|
||||
def do_alias(self, arg):
|
||||
# Clear our cached list of valid commands; one might be added.
|
||||
self._command_name_cache = []
|
||||
return super().do_alias(arg)
|
||||
|
||||
def do_unalias(self, arg):
|
||||
# Clear our cached list of valid commands; one might be removed.
|
||||
self._command_name_cache = []
|
||||
return super().do_unalias(arg)
|
||||
|
||||
def do_help(self, arg):
|
||||
# Tell the client to render the help, since it might need a pager.
|
||||
self._send(help=arg)
|
||||
|
||||
do_h = do_help
|
||||
|
||||
def _interact_displayhook(self, obj):
|
||||
# Like the default `sys.displayhook` except sending a socket message.
|
||||
if obj is not None:
|
||||
self.message(repr(obj))
|
||||
builtins._ = obj
|
||||
|
||||
def _run_in_python_repl(self, lines):
|
||||
# Run one 'interact' mode code block against an existing namespace.
|
||||
assert self._interact_state
|
||||
save_displayhook = sys.displayhook
|
||||
try:
|
||||
sys.displayhook = self._interact_displayhook
|
||||
code_obj = self._interact_state["compiler"](lines + "\n")
|
||||
if code_obj is None:
|
||||
raise SyntaxError("Incomplete command")
|
||||
exec(code_obj, self._interact_state["ns"])
|
||||
except:
|
||||
self._error_exc()
|
||||
finally:
|
||||
sys.displayhook = save_displayhook
|
||||
|
||||
def do_interact(self, arg):
|
||||
# Prepare to run 'interact' mode code blocks, and trigger the client
|
||||
# to start treating all input as Python commands, not PDB ones.
|
||||
self.message("*pdb interact start*")
|
||||
self._interact_state = dict(
|
||||
compiler=codeop.CommandCompiler(),
|
||||
ns={**self.curframe.f_globals, **self.curframe.f_locals},
|
||||
)
|
||||
|
||||
@typing.override
|
||||
def _create_recursive_debugger(self):
|
||||
return _PdbServer(self._sockfile, owns_sockfile=False)
|
||||
|
||||
@typing.override
|
||||
def _prompt_for_confirmation(self, prompt, default):
|
||||
try:
|
||||
return self._get_input(prompt=prompt, state="confirm")
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
return default
|
||||
|
||||
def do_run(self, arg):
|
||||
self.error("remote PDB cannot restart the program")
|
||||
|
||||
do_restart = do_run
|
||||
|
||||
def _error_exc(self):
|
||||
if self._interact_state and isinstance(sys.exception(), SystemExit):
|
||||
# If we get a SystemExit in 'interact' mode, exit the REPL.
|
||||
self._interact_state = None
|
||||
ret = super()._error_exc()
|
||||
self.message("*exit from pdb interact command*")
|
||||
return ret
|
||||
else:
|
||||
return super()._error_exc()
|
||||
|
||||
def default(self, line):
|
||||
# Unlike Pdb, don't prompt for more lines of a multi-line command.
|
||||
# The remote needs to send us the whole block in one go.
|
||||
try:
|
||||
candidate = line.removeprefix("!") + "\n"
|
||||
if codeop.compile_command(candidate, "<stdin>", "single") is None:
|
||||
raise SyntaxError("Incomplete command")
|
||||
return super().default(candidate)
|
||||
except:
|
||||
self._error_exc()
|
||||
|
||||
|
||||
class _PdbClient:
|
||||
def __init__(self, pid, sockfile, interrupt_script):
|
||||
self.pid = pid
|
||||
self.sockfile = sockfile
|
||||
self.interrupt_script = interrupt_script
|
||||
self.pdb_instance = Pdb()
|
||||
self.pdb_commands = set()
|
||||
self.completion_matches = []
|
||||
self.state = "dumb"
|
||||
self.write_failed = False
|
||||
|
||||
def _ensure_valid_message(self, msg):
|
||||
# Ensure the message conforms to our protocol.
|
||||
# If anything needs to be changed here for a patch release of Python,
|
||||
# the 'revision' in protocol_version() should be updated.
|
||||
match msg:
|
||||
case {"reply": str()}:
|
||||
# Send input typed by a user at a prompt to the remote PDB.
|
||||
pass
|
||||
case {"signal": "EOF"}:
|
||||
# Tell the remote PDB that the user pressed ^D at a prompt.
|
||||
pass
|
||||
case {"signal": "INT"}:
|
||||
# Tell the remote PDB that the user pressed ^C at a prompt.
|
||||
pass
|
||||
case {
|
||||
"complete": {
|
||||
"text": str(),
|
||||
"line": str(),
|
||||
"begidx": int(),
|
||||
"endidx": int(),
|
||||
}
|
||||
}:
|
||||
# Ask the remote PDB what completions are valid for the given
|
||||
# parameters, using readline's completion protocol.
|
||||
pass
|
||||
case _:
|
||||
raise AssertionError(
|
||||
f"PDB message doesn't follow the schema! {msg}"
|
||||
)
|
||||
|
||||
def _send(self, **kwargs):
|
||||
self._ensure_valid_message(kwargs)
|
||||
json_payload = json.dumps(kwargs)
|
||||
try:
|
||||
self.sockfile.write(json_payload.encode() + b"\n")
|
||||
self.sockfile.flush()
|
||||
except OSError:
|
||||
# This means that the client has abruptly disconnected, but we'll
|
||||
# handle that the next time we try to read from the client instead
|
||||
# of trying to handle it from everywhere _send() may be called.
|
||||
# Track this with a flag rather than assuming readline() will ever
|
||||
# return an empty string because the socket may be half-closed.
|
||||
self.write_failed = True
|
||||
|
||||
def read_command(self, prompt):
|
||||
reply = input(prompt)
|
||||
|
||||
if self.state == "dumb":
|
||||
# No logic applied whatsoever, just pass the raw reply back.
|
||||
return reply
|
||||
|
||||
prefix = ""
|
||||
if self.state == "pdb":
|
||||
# PDB command entry mode
|
||||
cmd = self.pdb_instance.parseline(reply)[0]
|
||||
if cmd in self.pdb_commands or reply.strip() == "":
|
||||
# Recognized PDB command, or blank line repeating last command
|
||||
return reply
|
||||
|
||||
# Otherwise, explicit or implicit exec command
|
||||
if reply.startswith("!"):
|
||||
prefix = "!"
|
||||
reply = reply.removeprefix(prefix).lstrip()
|
||||
|
||||
if codeop.compile_command(reply + "\n", "<stdin>", "single") is not None:
|
||||
# Valid single-line statement
|
||||
return prefix + reply
|
||||
|
||||
# Otherwise, valid first line of a multi-line statement
|
||||
continue_prompt = "...".ljust(len(prompt))
|
||||
while codeop.compile_command(reply, "<stdin>", "single") is None:
|
||||
reply += "\n" + input(continue_prompt)
|
||||
|
||||
return prefix + reply
|
||||
|
||||
@contextmanager
|
||||
def readline_completion(self, completer):
|
||||
try:
|
||||
import readline
|
||||
except ImportError:
|
||||
yield
|
||||
return
|
||||
|
||||
old_completer = readline.get_completer()
|
||||
try:
|
||||
readline.set_completer(completer)
|
||||
if readline.backend == "editline":
|
||||
# libedit uses "^I" instead of "tab"
|
||||
command_string = "bind ^I rl_complete"
|
||||
else:
|
||||
command_string = "tab: complete"
|
||||
readline.parse_and_bind(command_string)
|
||||
yield
|
||||
finally:
|
||||
readline.set_completer(old_completer)
|
||||
|
||||
def cmdloop(self):
|
||||
with self.readline_completion(self.complete):
|
||||
while not self.write_failed:
|
||||
try:
|
||||
if not (payload_bytes := self.sockfile.readline()):
|
||||
break
|
||||
except KeyboardInterrupt:
|
||||
self.send_interrupt()
|
||||
continue
|
||||
|
||||
try:
|
||||
payload = json.loads(payload_bytes)
|
||||
except json.JSONDecodeError:
|
||||
print(
|
||||
f"*** Invalid JSON from remote: {payload_bytes}",
|
||||
flush=True,
|
||||
)
|
||||
continue
|
||||
|
||||
self.process_payload(payload)
|
||||
|
||||
def send_interrupt(self):
|
||||
print(
|
||||
"\n*** Program will stop at the next bytecode instruction."
|
||||
" (Use 'cont' to resume)."
|
||||
)
|
||||
sys.remote_exec(self.pid, self.interrupt_script)
|
||||
|
||||
def process_payload(self, payload):
|
||||
match payload:
|
||||
case {
|
||||
"command_list": command_list
|
||||
} if all(isinstance(c, str) for c in command_list):
|
||||
self.pdb_commands = set(command_list)
|
||||
case {"message": str(msg), "type": str(msg_type)}:
|
||||
if msg_type == "error":
|
||||
print("***", msg, flush=True)
|
||||
else:
|
||||
print(msg, end="", flush=True)
|
||||
case {"help": str(arg)}:
|
||||
self.pdb_instance.do_help(arg)
|
||||
case {"prompt": str(prompt), "state": str(state)}:
|
||||
if state not in ("pdb", "interact"):
|
||||
state = "dumb"
|
||||
self.state = state
|
||||
self.prompt_for_reply(prompt)
|
||||
case _:
|
||||
raise RuntimeError(f"Unrecognized payload {payload}")
|
||||
|
||||
def prompt_for_reply(self, prompt):
|
||||
while True:
|
||||
try:
|
||||
payload = {"reply": self.read_command(prompt)}
|
||||
except EOFError:
|
||||
payload = {"signal": "EOF"}
|
||||
except KeyboardInterrupt:
|
||||
payload = {"signal": "INT"}
|
||||
except Exception as exc:
|
||||
msg = traceback.format_exception_only(exc)[-1].strip()
|
||||
print("***", msg, flush=True)
|
||||
continue
|
||||
|
||||
self._send(**payload)
|
||||
return
|
||||
|
||||
def complete(self, text, state):
|
||||
import readline
|
||||
|
||||
if state == 0:
|
||||
self.completion_matches = []
|
||||
if self.state not in ("pdb", "interact"):
|
||||
return None
|
||||
|
||||
origline = readline.get_line_buffer()
|
||||
line = origline.lstrip()
|
||||
stripped = len(origline) - len(line)
|
||||
begidx = readline.get_begidx() - stripped
|
||||
endidx = readline.get_endidx() - stripped
|
||||
|
||||
msg = {
|
||||
"complete": {
|
||||
"text": text,
|
||||
"line": line,
|
||||
"begidx": begidx,
|
||||
"endidx": endidx,
|
||||
}
|
||||
}
|
||||
|
||||
self._send(**msg)
|
||||
if self.write_failed:
|
||||
return None
|
||||
|
||||
payload = self.sockfile.readline()
|
||||
if not payload:
|
||||
return None
|
||||
|
||||
payload = json.loads(payload)
|
||||
if "completions" not in payload:
|
||||
raise RuntimeError(
|
||||
f"Failed to get valid completions. Got: {payload}"
|
||||
)
|
||||
|
||||
self.completion_matches = payload["completions"]
|
||||
try:
|
||||
return self.completion_matches[state]
|
||||
except IndexError:
|
||||
return None
|
||||
|
||||
|
||||
def _connect(host, port, frame, commands, version):
|
||||
with closing(socket.create_connection((host, port))) as conn:
|
||||
sockfile = conn.makefile("rwb")
|
||||
|
||||
remote_pdb = _PdbServer(sockfile)
|
||||
weakref.finalize(remote_pdb, sockfile.close)
|
||||
|
||||
if Pdb._last_pdb_instance is not None:
|
||||
remote_pdb.error("Another PDB instance is already attached.")
|
||||
elif version != remote_pdb.protocol_version():
|
||||
target_ver = f"0x{remote_pdb.protocol_version():08X}"
|
||||
attach_ver = f"0x{version:08X}"
|
||||
remote_pdb.error(
|
||||
f"The target process is running a Python version that is"
|
||||
f" incompatible with this PDB module."
|
||||
f"\nTarget process pdb protocol version: {target_ver}"
|
||||
f"\nLocal pdb module's protocol version: {attach_ver}"
|
||||
)
|
||||
else:
|
||||
remote_pdb.rcLines.extend(commands.splitlines())
|
||||
remote_pdb.set_trace(frame=frame)
|
||||
|
||||
|
||||
def attach(pid, commands=()):
|
||||
"""Attach to a running process with the given PID."""
|
||||
with closing(socket.create_server(("localhost", 0))) as server:
|
||||
port = server.getsockname()[1]
|
||||
|
||||
with tempfile.NamedTemporaryFile("w", delete_on_close=False) as connect_script:
|
||||
connect_script.write(
|
||||
textwrap.dedent(
|
||||
f"""
|
||||
import pdb, sys
|
||||
pdb._connect(
|
||||
host="localhost",
|
||||
port={port},
|
||||
frame=sys._getframe(1),
|
||||
commands={json.dumps("\n".join(commands))},
|
||||
version={_PdbServer.protocol_version()},
|
||||
)
|
||||
"""
|
||||
)
|
||||
)
|
||||
connect_script.close()
|
||||
sys.remote_exec(pid, connect_script.name)
|
||||
|
||||
# TODO Add a timeout? Or don't bother since the user can ^C?
|
||||
client_sock, _ = server.accept()
|
||||
|
||||
with closing(client_sock):
|
||||
sockfile = client_sock.makefile("rwb")
|
||||
|
||||
with closing(sockfile):
|
||||
with tempfile.NamedTemporaryFile("w", delete_on_close=False) as interrupt_script:
|
||||
interrupt_script.write(
|
||||
'import pdb, sys\n'
|
||||
'if inst := pdb.Pdb._last_pdb_instance:\n'
|
||||
' inst.set_trace(sys._getframe(1))\n'
|
||||
)
|
||||
interrupt_script.close()
|
||||
|
||||
_PdbClient(pid, sockfile, interrupt_script.name).cmdloop()
|
||||
|
||||
|
||||
# Post-Mortem interface
|
||||
|
||||
def post_mortem(t=None):
|
||||
|
@ -2554,7 +3145,7 @@ To let the script run up to a given line X in the debugged file, use
|
|||
def main():
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(usage="%(prog)s [-h] [-c command] (-m module | pyfile) [args ...]",
|
||||
parser = argparse.ArgumentParser(usage="%(prog)s [-h] [-c command] (-m module | -p pid | pyfile) [args ...]",
|
||||
description=_usage,
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
allow_abbrev=False)
|
||||
|
@ -2565,6 +3156,7 @@ def main():
|
|||
parser.add_argument('-c', '--command', action='append', default=[], metavar='command', dest='commands',
|
||||
help='pdb commands to execute as if given in a .pdbrc file')
|
||||
parser.add_argument('-m', metavar='module', dest='module')
|
||||
parser.add_argument('-p', '--pid', type=int, help="attach to the specified PID", default=None)
|
||||
|
||||
if len(sys.argv) == 1:
|
||||
# If no arguments were given (python -m pdb), print the whole help message.
|
||||
|
@ -2574,7 +3166,15 @@ def main():
|
|||
|
||||
opts, args = parser.parse_known_args()
|
||||
|
||||
if opts.pid:
|
||||
# If attaching to a remote pid, unrecognized arguments are not allowed.
|
||||
# This will raise an error if there are extra unrecognized arguments.
|
||||
opts = parser.parse_args()
|
||||
if opts.module:
|
||||
parser.error("argument -m: not allowed with argument --pid")
|
||||
attach(opts.pid, opts.commands)
|
||||
return
|
||||
elif opts.module:
|
||||
# If a module is being debugged, we consider the arguments after "-m module" to
|
||||
# be potential arguments to the module itself. We need to parse the arguments
|
||||
# before "-m" to check if there is any invalid argument.
|
||||
|
|
|
@ -253,7 +253,8 @@ class PyclbrTest(TestCase):
|
|||
cm(
|
||||
'pdb',
|
||||
# pyclbr does not handle elegantly `typing` or properties
|
||||
ignore=('Union', '_ModuleTarget', '_ScriptTarget', '_ZipTarget', 'curframe_locals'),
|
||||
ignore=('Union', '_ModuleTarget', '_ScriptTarget', '_ZipTarget', 'curframe_locals',
|
||||
'_InteractState'),
|
||||
)
|
||||
cm('pydoc', ignore=('input', 'output',)) # properties
|
||||
|
||||
|
|
687
Lib/test/test_remote_pdb.py
Normal file
687
Lib/test/test_remote_pdb.py
Normal file
|
@ -0,0 +1,687 @@
|
|||
import io
|
||||
import json
|
||||
import os
|
||||
import signal
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import textwrap
|
||||
import threading
|
||||
import unittest
|
||||
import unittest.mock
|
||||
from contextlib import contextmanager
|
||||
from pathlib import Path
|
||||
from test.support import is_wasi, os_helper
|
||||
from test.support.os_helper import temp_dir, TESTFN, unlink
|
||||
from typing import Dict, List, Optional, Tuple, Union, Any
|
||||
|
||||
import pdb
|
||||
from pdb import _PdbServer, _PdbClient
|
||||
|
||||
|
||||
class MockSocketFile:
|
||||
"""Mock socket file for testing _PdbServer without actual socket connections."""
|
||||
|
||||
def __init__(self):
|
||||
self.input_queue = []
|
||||
self.output_buffer = []
|
||||
|
||||
def write(self, data: bytes) -> None:
|
||||
"""Simulate write to socket."""
|
||||
self.output_buffer.append(data)
|
||||
|
||||
def flush(self) -> None:
|
||||
"""No-op flush implementation."""
|
||||
pass
|
||||
|
||||
def readline(self) -> bytes:
|
||||
"""Read a line from the prepared input queue."""
|
||||
if not self.input_queue:
|
||||
return b""
|
||||
return self.input_queue.pop(0)
|
||||
|
||||
def close(self) -> None:
|
||||
"""Close the mock socket file."""
|
||||
pass
|
||||
|
||||
def add_input(self, data: dict) -> None:
|
||||
"""Add input that will be returned by readline."""
|
||||
self.input_queue.append(json.dumps(data).encode() + b"\n")
|
||||
|
||||
def get_output(self) -> List[dict]:
|
||||
"""Get the output that was written by the object being tested."""
|
||||
results = []
|
||||
for data in self.output_buffer:
|
||||
if isinstance(data, bytes) and data.endswith(b"\n"):
|
||||
try:
|
||||
results.append(json.loads(data.decode().strip()))
|
||||
except json.JSONDecodeError:
|
||||
pass # Ignore non-JSON output
|
||||
self.output_buffer = []
|
||||
return results
|
||||
|
||||
|
||||
class RemotePdbTestCase(unittest.TestCase):
|
||||
"""Tests for the _PdbServer class."""
|
||||
|
||||
def setUp(self):
|
||||
self.sockfile = MockSocketFile()
|
||||
self.pdb = _PdbServer(self.sockfile)
|
||||
|
||||
# Mock some Bdb attributes that are lazily created when tracing starts
|
||||
self.pdb.botframe = None
|
||||
self.pdb.quitting = False
|
||||
|
||||
# Create a frame for testing
|
||||
self.test_globals = {'a': 1, 'b': 2, '__pdb_convenience_variables': {'x': 100}}
|
||||
self.test_locals = {'c': 3, 'd': 4}
|
||||
|
||||
# Create a simple test frame
|
||||
frame_info = unittest.mock.Mock()
|
||||
frame_info.f_globals = self.test_globals
|
||||
frame_info.f_locals = self.test_locals
|
||||
frame_info.f_lineno = 42
|
||||
frame_info.f_code = unittest.mock.Mock()
|
||||
frame_info.f_code.co_filename = "test_file.py"
|
||||
frame_info.f_code.co_name = "test_function"
|
||||
|
||||
self.pdb.curframe = frame_info
|
||||
|
||||
def test_message_and_error(self):
|
||||
"""Test message and error methods send correct JSON."""
|
||||
self.pdb.message("Test message")
|
||||
self.pdb.error("Test error")
|
||||
|
||||
outputs = self.sockfile.get_output()
|
||||
self.assertEqual(len(outputs), 2)
|
||||
self.assertEqual(outputs[0], {"message": "Test message\n", "type": "info"})
|
||||
self.assertEqual(outputs[1], {"message": "Test error", "type": "error"})
|
||||
|
||||
def test_read_command(self):
|
||||
"""Test reading commands from the socket."""
|
||||
# Add test input
|
||||
self.sockfile.add_input({"reply": "help"})
|
||||
|
||||
# Read the command
|
||||
cmd = self.pdb._read_reply()
|
||||
self.assertEqual(cmd, "help")
|
||||
|
||||
def test_read_command_EOF(self):
|
||||
"""Test reading EOF command."""
|
||||
# Simulate socket closure
|
||||
self.pdb._write_failed = True
|
||||
with self.assertRaises(EOFError):
|
||||
self.pdb._read_reply()
|
||||
|
||||
def test_completion(self):
|
||||
"""Test handling completion requests."""
|
||||
# Mock completenames to return specific values
|
||||
with unittest.mock.patch.object(self.pdb, 'completenames',
|
||||
return_value=["continue", "clear"]):
|
||||
|
||||
# Add a completion request
|
||||
self.sockfile.add_input({
|
||||
"complete": {
|
||||
"text": "c",
|
||||
"line": "c",
|
||||
"begidx": 0,
|
||||
"endidx": 1
|
||||
}
|
||||
})
|
||||
|
||||
# Add a regular command to break the loop
|
||||
self.sockfile.add_input({"reply": "help"})
|
||||
|
||||
# Read command - this should process the completion request first
|
||||
cmd = self.pdb._read_reply()
|
||||
|
||||
# Verify completion response was sent
|
||||
outputs = self.sockfile.get_output()
|
||||
self.assertEqual(len(outputs), 1)
|
||||
self.assertEqual(outputs[0], {"completions": ["continue", "clear"]})
|
||||
|
||||
# The actual command should be returned
|
||||
self.assertEqual(cmd, "help")
|
||||
|
||||
def test_do_help(self):
|
||||
"""Test that do_help sends the help message."""
|
||||
self.pdb.do_help("break")
|
||||
|
||||
outputs = self.sockfile.get_output()
|
||||
self.assertEqual(len(outputs), 1)
|
||||
self.assertEqual(outputs[0], {"help": "break"})
|
||||
|
||||
def test_interact_mode(self):
|
||||
"""Test interaction mode setup and execution."""
|
||||
# First set up interact mode
|
||||
self.pdb.do_interact("")
|
||||
|
||||
# Verify _interact_state is properly initialized
|
||||
self.assertIsNotNone(self.pdb._interact_state)
|
||||
self.assertIsInstance(self.pdb._interact_state, dict)
|
||||
|
||||
# Test running code in interact mode
|
||||
with unittest.mock.patch.object(self.pdb, '_error_exc') as mock_error:
|
||||
self.pdb._run_in_python_repl("print('test')")
|
||||
mock_error.assert_not_called()
|
||||
|
||||
# Test with syntax error
|
||||
self.pdb._run_in_python_repl("if:")
|
||||
mock_error.assert_called_once()
|
||||
|
||||
def test_registering_commands(self):
|
||||
"""Test registering breakpoint commands."""
|
||||
# Mock get_bpbynumber
|
||||
with unittest.mock.patch.object(self.pdb, 'get_bpbynumber'):
|
||||
# Queue up some input to send
|
||||
self.sockfile.add_input({"reply": "commands 1"})
|
||||
self.sockfile.add_input({"reply": "silent"})
|
||||
self.sockfile.add_input({"reply": "print('hi')"})
|
||||
self.sockfile.add_input({"reply": "end"})
|
||||
self.sockfile.add_input({"signal": "EOF"})
|
||||
|
||||
# Run the PDB command loop
|
||||
self.pdb.cmdloop()
|
||||
|
||||
outputs = self.sockfile.get_output()
|
||||
self.assertIn('command_list', outputs[0])
|
||||
self.assertEqual(outputs[1], {"prompt": "(Pdb) ", "state": "pdb"})
|
||||
self.assertEqual(outputs[2], {"prompt": "(com) ", "state": "commands"})
|
||||
self.assertEqual(outputs[3], {"prompt": "(com) ", "state": "commands"})
|
||||
self.assertEqual(outputs[4], {"prompt": "(com) ", "state": "commands"})
|
||||
self.assertEqual(outputs[5], {"prompt": "(Pdb) ", "state": "pdb"})
|
||||
self.assertEqual(outputs[6], {"message": "\n", "type": "info"})
|
||||
self.assertEqual(len(outputs), 7)
|
||||
|
||||
self.assertEqual(
|
||||
self.pdb.commands[1],
|
||||
["_pdbcmd_silence_frame_status", "print('hi')"],
|
||||
)
|
||||
|
||||
def test_detach(self):
|
||||
"""Test the detach method."""
|
||||
with unittest.mock.patch.object(self.sockfile, 'close') as mock_close:
|
||||
self.pdb.detach()
|
||||
mock_close.assert_called_once()
|
||||
self.assertFalse(self.pdb.quitting)
|
||||
|
||||
def test_cmdloop(self):
|
||||
"""Test the command loop with various commands."""
|
||||
# Mock onecmd to track command execution
|
||||
with unittest.mock.patch.object(self.pdb, 'onecmd', return_value=False) as mock_onecmd:
|
||||
# Add commands to the queue
|
||||
self.pdb.cmdqueue = ['help', 'list']
|
||||
|
||||
# Add a command from the socket for when cmdqueue is empty
|
||||
self.sockfile.add_input({"reply": "next"})
|
||||
|
||||
# Add a second command to break the loop
|
||||
self.sockfile.add_input({"reply": "quit"})
|
||||
|
||||
# Configure onecmd to exit the loop on "quit"
|
||||
def side_effect(line):
|
||||
return line == 'quit'
|
||||
mock_onecmd.side_effect = side_effect
|
||||
|
||||
# Run the command loop
|
||||
self.pdb.quitting = False # Set this by hand because we don't want to really call set_trace()
|
||||
self.pdb.cmdloop()
|
||||
|
||||
# Should have processed 4 commands: 2 from cmdqueue, 2 from socket
|
||||
self.assertEqual(mock_onecmd.call_count, 4)
|
||||
mock_onecmd.assert_any_call('help')
|
||||
mock_onecmd.assert_any_call('list')
|
||||
mock_onecmd.assert_any_call('next')
|
||||
mock_onecmd.assert_any_call('quit')
|
||||
|
||||
# Check if prompt was sent to client
|
||||
outputs = self.sockfile.get_output()
|
||||
prompts = [o for o in outputs if 'prompt' in o]
|
||||
self.assertEqual(len(prompts), 2) # Should have sent 2 prompts
|
||||
|
||||
|
||||
@unittest.skipIf(is_wasi, "WASI does not support TCP sockets")
|
||||
class PdbConnectTestCase(unittest.TestCase):
|
||||
"""Tests for the _connect mechanism using direct socket communication."""
|
||||
|
||||
def setUp(self):
|
||||
# Create a server socket that will wait for the debugger to connect
|
||||
self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.server_sock.bind(('127.0.0.1', 0)) # Let OS assign port
|
||||
self.server_sock.listen(1)
|
||||
self.port = self.server_sock.getsockname()[1]
|
||||
|
||||
def _create_script(self, script=None):
|
||||
# Create a file for subprocess script
|
||||
if script is None:
|
||||
script = textwrap.dedent(
|
||||
f"""
|
||||
import pdb
|
||||
import sys
|
||||
import time
|
||||
|
||||
def foo():
|
||||
x = 42
|
||||
return bar()
|
||||
|
||||
def bar():
|
||||
return 42
|
||||
|
||||
def connect_to_debugger():
|
||||
# Create a frame to debug
|
||||
def dummy_function():
|
||||
x = 42
|
||||
# Call connect to establish connection
|
||||
# with the test server
|
||||
frame = sys._getframe() # Get the current frame
|
||||
pdb._connect(
|
||||
host='127.0.0.1',
|
||||
port={self.port},
|
||||
frame=frame,
|
||||
commands="",
|
||||
version=pdb._PdbServer.protocol_version(),
|
||||
)
|
||||
return x # This line won't be reached in debugging
|
||||
|
||||
return dummy_function()
|
||||
|
||||
result = connect_to_debugger()
|
||||
foo()
|
||||
print(f"Function returned: {{result}}")
|
||||
""")
|
||||
|
||||
self.script_path = TESTFN + "_connect_test.py"
|
||||
with open(self.script_path, 'w') as f:
|
||||
f.write(script)
|
||||
|
||||
def tearDown(self):
|
||||
self.server_sock.close()
|
||||
try:
|
||||
unlink(self.script_path)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def _connect_and_get_client_file(self):
|
||||
"""Helper to start subprocess and get connected client file."""
|
||||
# Start the subprocess that will connect to our socket
|
||||
process = subprocess.Popen(
|
||||
[sys.executable, self.script_path],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
text=True
|
||||
)
|
||||
|
||||
# Accept the connection from the subprocess
|
||||
client_sock, _ = self.server_sock.accept()
|
||||
client_file = client_sock.makefile('rwb')
|
||||
self.addCleanup(client_file.close)
|
||||
self.addCleanup(client_sock.close)
|
||||
|
||||
return process, client_file
|
||||
|
||||
def _read_until_prompt(self, client_file):
|
||||
"""Helper to read messages until a prompt is received."""
|
||||
messages = []
|
||||
while True:
|
||||
data = client_file.readline()
|
||||
if not data:
|
||||
break
|
||||
msg = json.loads(data.decode())
|
||||
messages.append(msg)
|
||||
if 'prompt' in msg:
|
||||
break
|
||||
return messages
|
||||
|
||||
def _send_command(self, client_file, command):
|
||||
"""Helper to send a command to the debugger."""
|
||||
client_file.write(json.dumps({"reply": command}).encode() + b"\n")
|
||||
client_file.flush()
|
||||
|
||||
def _send_interrupt(self, pid):
|
||||
"""Helper to send an interrupt signal to the debugger."""
|
||||
# with tempfile.NamedTemporaryFile("w", delete_on_close=False) as interrupt_script:
|
||||
interrupt_script = TESTFN + "_interrupt_script.py"
|
||||
with open(interrupt_script, 'w') as f:
|
||||
f.write(
|
||||
'import pdb, sys\n'
|
||||
'print("Hello, world!")\n'
|
||||
'if inst := pdb.Pdb._last_pdb_instance:\n'
|
||||
' inst.set_trace(sys._getframe(1))\n'
|
||||
)
|
||||
self.addCleanup(unlink, interrupt_script)
|
||||
try:
|
||||
sys.remote_exec(pid, interrupt_script)
|
||||
except PermissionError:
|
||||
self.skipTest("Insufficient permissions to execute code in remote process")
|
||||
|
||||
def test_connect_and_basic_commands(self):
|
||||
"""Test connecting to a remote debugger and sending basic commands."""
|
||||
self._create_script()
|
||||
process, client_file = self._connect_and_get_client_file()
|
||||
|
||||
with process:
|
||||
# We should receive initial data from the debugger
|
||||
data = client_file.readline()
|
||||
initial_data = json.loads(data.decode())
|
||||
self.assertIn('message', initial_data)
|
||||
self.assertIn('pdb._connect', initial_data['message'])
|
||||
|
||||
# First, look for command_list message
|
||||
data = client_file.readline()
|
||||
command_list = json.loads(data.decode())
|
||||
self.assertIn('command_list', command_list)
|
||||
|
||||
# Then, look for the first prompt
|
||||
data = client_file.readline()
|
||||
prompt_data = json.loads(data.decode())
|
||||
self.assertIn('prompt', prompt_data)
|
||||
self.assertEqual(prompt_data['state'], 'pdb')
|
||||
|
||||
# Send 'bt' (backtrace) command
|
||||
self._send_command(client_file, "bt")
|
||||
|
||||
# Check for response - we should get some stack frames
|
||||
messages = self._read_until_prompt(client_file)
|
||||
|
||||
# Extract text messages containing stack info
|
||||
text_msg = [msg['message'] for msg in messages
|
||||
if 'message' in msg and 'connect_to_debugger' in msg['message']]
|
||||
got_stack_info = bool(text_msg)
|
||||
|
||||
expected_stacks = [
|
||||
"<module>",
|
||||
"connect_to_debugger",
|
||||
]
|
||||
|
||||
for stack, msg in zip(expected_stacks, text_msg, strict=True):
|
||||
self.assertIn(stack, msg)
|
||||
|
||||
self.assertTrue(got_stack_info, "Should have received stack trace information")
|
||||
|
||||
# Send 'c' (continue) command to let the program finish
|
||||
self._send_command(client_file, "c")
|
||||
|
||||
# Wait for process to finish
|
||||
stdout, _ = process.communicate(timeout=5)
|
||||
|
||||
# Check if we got the expected output
|
||||
self.assertIn("Function returned: 42", stdout)
|
||||
self.assertEqual(process.returncode, 0)
|
||||
|
||||
def test_breakpoints(self):
|
||||
"""Test setting and hitting breakpoints."""
|
||||
self._create_script()
|
||||
process, client_file = self._connect_and_get_client_file()
|
||||
with process:
|
||||
# Skip initial messages until we get to the prompt
|
||||
self._read_until_prompt(client_file)
|
||||
|
||||
# Set a breakpoint at the return statement
|
||||
self._send_command(client_file, "break bar")
|
||||
messages = self._read_until_prompt(client_file)
|
||||
bp_msg = next(msg['message'] for msg in messages if 'message' in msg)
|
||||
self.assertIn("Breakpoint", bp_msg)
|
||||
|
||||
# Continue execution until breakpoint
|
||||
self._send_command(client_file, "c")
|
||||
messages = self._read_until_prompt(client_file)
|
||||
|
||||
# Verify we hit the breakpoint
|
||||
hit_msg = next(msg['message'] for msg in messages if 'message' in msg)
|
||||
self.assertIn("bar()", hit_msg)
|
||||
|
||||
# Check breakpoint list
|
||||
self._send_command(client_file, "b")
|
||||
messages = self._read_until_prompt(client_file)
|
||||
list_msg = next(msg['message'] for msg in reversed(messages) if 'message' in msg)
|
||||
self.assertIn("1 breakpoint", list_msg)
|
||||
self.assertIn("breakpoint already hit 1 time", list_msg)
|
||||
|
||||
# Clear breakpoint
|
||||
self._send_command(client_file, "clear 1")
|
||||
messages = self._read_until_prompt(client_file)
|
||||
clear_msg = next(msg['message'] for msg in reversed(messages) if 'message' in msg)
|
||||
self.assertIn("Deleted breakpoint", clear_msg)
|
||||
|
||||
# Continue to end
|
||||
self._send_command(client_file, "c")
|
||||
stdout, _ = process.communicate(timeout=5)
|
||||
|
||||
self.assertIn("Function returned: 42", stdout)
|
||||
self.assertEqual(process.returncode, 0)
|
||||
|
||||
def test_keyboard_interrupt(self):
|
||||
"""Test that sending keyboard interrupt breaks into pdb."""
|
||||
synchronizer_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
synchronizer_sock.bind(('127.0.0.1', 0)) # Let OS assign port
|
||||
synchronizer_sock.settimeout(5)
|
||||
synchronizer_sock.listen(1)
|
||||
self.addCleanup(synchronizer_sock.close)
|
||||
sync_port = synchronizer_sock.getsockname()[1]
|
||||
|
||||
script = textwrap.dedent(f"""
|
||||
import time
|
||||
import sys
|
||||
import socket
|
||||
import pdb
|
||||
def bar():
|
||||
frame = sys._getframe() # Get the current frame
|
||||
pdb._connect(
|
||||
host='127.0.0.1',
|
||||
port={self.port},
|
||||
frame=frame,
|
||||
commands="",
|
||||
version=pdb._PdbServer.protocol_version(),
|
||||
)
|
||||
print("Connected to debugger")
|
||||
iterations = 10
|
||||
socket.create_connection(('127.0.0.1', {sync_port})).close()
|
||||
while iterations > 0:
|
||||
print("Iteration", iterations)
|
||||
time.sleep(1)
|
||||
iterations -= 1
|
||||
return 42
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("Function returned:", bar())
|
||||
""")
|
||||
self._create_script(script=script)
|
||||
process, client_file = self._connect_and_get_client_file()
|
||||
|
||||
with process:
|
||||
|
||||
# Skip initial messages until we get to the prompt
|
||||
self._read_until_prompt(client_file)
|
||||
|
||||
# Continue execution
|
||||
self._send_command(client_file, "c")
|
||||
|
||||
# Wait until execution has continued
|
||||
synchronizer_sock.accept()[0].close()
|
||||
|
||||
# Inject a script to interrupt the running process
|
||||
self._send_interrupt(process.pid)
|
||||
messages = self._read_until_prompt(client_file)
|
||||
|
||||
# Verify we got the keyboard interrupt message
|
||||
interrupt_msg = next(msg['message'] for msg in messages if 'message' in msg)
|
||||
self.assertIn("bar()", interrupt_msg)
|
||||
|
||||
# Continue to end
|
||||
self._send_command(client_file, "iterations = 0")
|
||||
self._send_command(client_file, "c")
|
||||
stdout, _ = process.communicate(timeout=5)
|
||||
self.assertIn("Function returned: 42", stdout)
|
||||
self.assertEqual(process.returncode, 0)
|
||||
|
||||
def test_handle_eof(self):
|
||||
"""Test that EOF signal properly exits the debugger."""
|
||||
self._create_script()
|
||||
process, client_file = self._connect_and_get_client_file()
|
||||
|
||||
with process:
|
||||
# Skip initial messages until we get to the prompt
|
||||
self._read_until_prompt(client_file)
|
||||
|
||||
# Send EOF signal to exit the debugger
|
||||
client_file.write(json.dumps({"signal": "EOF"}).encode() + b"\n")
|
||||
client_file.flush()
|
||||
|
||||
# The process should complete normally after receiving EOF
|
||||
stdout, stderr = process.communicate(timeout=5)
|
||||
|
||||
# Verify process completed correctly
|
||||
self.assertIn("Function returned: 42", stdout)
|
||||
self.assertEqual(process.returncode, 0)
|
||||
self.assertEqual(stderr, "")
|
||||
|
||||
def test_protocol_version(self):
|
||||
"""Test that incompatible protocol versions are properly detected."""
|
||||
# Create a script using an incompatible protocol version
|
||||
script = textwrap.dedent(f'''
|
||||
import sys
|
||||
import pdb
|
||||
|
||||
def run_test():
|
||||
frame = sys._getframe()
|
||||
|
||||
# Use a fake version number that's definitely incompatible
|
||||
fake_version = 0x01010101 # A fake version that doesn't match any real Python version
|
||||
|
||||
# Connect with the wrong version
|
||||
pdb._connect(
|
||||
host='127.0.0.1',
|
||||
port={self.port},
|
||||
frame=frame,
|
||||
commands="",
|
||||
version=fake_version,
|
||||
)
|
||||
|
||||
# This should print if the debugger detaches correctly
|
||||
print("Debugger properly detected version mismatch")
|
||||
return True
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("Test result:", run_test())
|
||||
''')
|
||||
self._create_script(script=script)
|
||||
process, client_file = self._connect_and_get_client_file()
|
||||
|
||||
with process:
|
||||
# First message should be an error about protocol version mismatch
|
||||
data = client_file.readline()
|
||||
message = json.loads(data.decode())
|
||||
|
||||
self.assertIn('message', message)
|
||||
self.assertEqual(message['type'], 'error')
|
||||
self.assertIn('incompatible', message['message'])
|
||||
self.assertIn('protocol version', message['message'])
|
||||
|
||||
# The process should complete normally
|
||||
stdout, stderr = process.communicate(timeout=5)
|
||||
|
||||
# Verify the process completed successfully
|
||||
self.assertIn("Test result: True", stdout)
|
||||
self.assertIn("Debugger properly detected version mismatch", stdout)
|
||||
self.assertEqual(process.returncode, 0)
|
||||
|
||||
def test_help_system(self):
|
||||
"""Test that the help system properly sends help text to the client."""
|
||||
self._create_script()
|
||||
process, client_file = self._connect_and_get_client_file()
|
||||
|
||||
with process:
|
||||
# Skip initial messages until we get to the prompt
|
||||
self._read_until_prompt(client_file)
|
||||
|
||||
# Request help for different commands
|
||||
help_commands = ["help", "help break", "help continue", "help pdb"]
|
||||
|
||||
for cmd in help_commands:
|
||||
self._send_command(client_file, cmd)
|
||||
|
||||
# Look for help message
|
||||
data = client_file.readline()
|
||||
message = json.loads(data.decode())
|
||||
|
||||
self.assertIn('help', message)
|
||||
|
||||
if cmd == "help":
|
||||
# Should just contain the command itself
|
||||
self.assertEqual(message['help'], "")
|
||||
else:
|
||||
# Should contain the specific command we asked for help with
|
||||
command = cmd.split()[1]
|
||||
self.assertEqual(message['help'], command)
|
||||
|
||||
# Skip to the next prompt
|
||||
self._read_until_prompt(client_file)
|
||||
|
||||
# Continue execution to finish the program
|
||||
self._send_command(client_file, "c")
|
||||
|
||||
stdout, stderr = process.communicate(timeout=5)
|
||||
self.assertIn("Function returned: 42", stdout)
|
||||
self.assertEqual(process.returncode, 0)
|
||||
|
||||
def test_multi_line_commands(self):
|
||||
"""Test that multi-line commands work properly over remote connection."""
|
||||
self._create_script()
|
||||
process, client_file = self._connect_and_get_client_file()
|
||||
|
||||
with process:
|
||||
# Skip initial messages until we get to the prompt
|
||||
self._read_until_prompt(client_file)
|
||||
|
||||
# Send a multi-line command
|
||||
multi_line_commands = [
|
||||
# Define a function
|
||||
"def test_func():\n return 42",
|
||||
|
||||
# For loop
|
||||
"for i in range(3):\n print(i)",
|
||||
|
||||
# If statement
|
||||
"if True:\n x = 42\nelse:\n x = 0",
|
||||
|
||||
# Try/except
|
||||
"try:\n result = 10/2\n print(result)\nexcept ZeroDivisionError:\n print('Error')",
|
||||
|
||||
# Class definition
|
||||
"class TestClass:\n def __init__(self):\n self.value = 100\n def get_value(self):\n return self.value"
|
||||
]
|
||||
|
||||
for cmd in multi_line_commands:
|
||||
self._send_command(client_file, cmd)
|
||||
self._read_until_prompt(client_file)
|
||||
|
||||
# Test executing the defined function
|
||||
self._send_command(client_file, "test_func()")
|
||||
messages = self._read_until_prompt(client_file)
|
||||
|
||||
# Find the result message
|
||||
result_msg = next(msg['message'] for msg in messages if 'message' in msg)
|
||||
self.assertIn("42", result_msg)
|
||||
|
||||
# Test creating an instance of the defined class
|
||||
self._send_command(client_file, "obj = TestClass()")
|
||||
self._read_until_prompt(client_file)
|
||||
|
||||
# Test calling a method on the instance
|
||||
self._send_command(client_file, "obj.get_value()")
|
||||
messages = self._read_until_prompt(client_file)
|
||||
|
||||
# Find the result message
|
||||
result_msg = next(msg['message'] for msg in messages if 'message' in msg)
|
||||
self.assertIn("100", result_msg)
|
||||
|
||||
# Continue execution to finish
|
||||
self._send_command(client_file, "c")
|
||||
|
||||
stdout, stderr = process.communicate(timeout=5)
|
||||
self.assertIn("Function returned: 42", stdout)
|
||||
self.assertEqual(process.returncode, 0)
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
|
@ -0,0 +1,3 @@
|
|||
The CLI for the PDB debugger now accepts a ``-p PID`` argument to allow
|
||||
attaching to a running process. The process must be running the same version
|
||||
of Python as the one running PDB.
|
Loading…
Add table
Add a link
Reference in a new issue