Add type annotations to public API.

This commit is contained in:
Pavel Minaev 2022-05-19 17:01:23 -07:00 committed by Pavel Minaev
parent 891bc7f88b
commit eb126910c0
5 changed files with 93 additions and 68 deletions

View file

@ -34,3 +34,5 @@ assert sys.version_info >= (3, 7), (
# SyntaxError on Python 2 and preventing the above version check from executing.
from debugpy.public_api import * # noqa
from debugpy.public_api import __version__
del sys

View file

@ -2,6 +2,12 @@
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
from __future__ import annotations
import typing
if typing.TYPE_CHECKING:
__all__: list[str]
__all__ = []
access_token = None

View file

@ -2,8 +2,12 @@
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
from __future__ import annotations
import os
import typing
if typing.TYPE_CHECKING:
__all__: list[str]
__all__ = []

View file

@ -2,6 +2,9 @@
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
from __future__ import annotations
import typing
from debugpy import _version
@ -13,19 +16,46 @@ from debugpy import _version
# than 72 characters per line! - and must be readable when retrieved via help().
def log_to(path):
# Type aliases and protocols must be guarded to avoid runtime errors due to unsupported
# syntax in Python <3.9; since they aren't annotations, they're eagerly evaluated!
if typing.TYPE_CHECKING:
Endpoint = tuple[str, int]
def _api(cancelable=False):
def apply(f):
def wrapper(*args, **kwargs):
from debugpy.server import api
wrapped = getattr(api, f.__name__)
wrapped(*args, **kwargs)
if cancelable:
def cancel(*args, **kwargs):
from debugpy.server import api
wrapped = getattr(api, f.__name__)
wrapped.cancel(*args, **kwargs)
wrapper.cancel = cancel
return wrapper
return apply
@_api()
def log_to(__path: str) -> None:
"""Generate detailed debugpy logs in the specified directory.
The directory must already exist. Several log files are generated,
one for every process involved in the debug session.
"""
from debugpy.server import api
return api.log_to(path)
def configure(properties=None, **kwargs):
@_api()
def configure(__properties: dict[str] = None, **kwargs) -> None:
"""Sets debug configuration properties that cannot be set in the
"attach" request, because they must be applied as early as possible
in the process being debugged.
@ -59,93 +89,75 @@ def configure(properties=None, **kwargs):
debugpy.configure({"subProcess": False})
"""
from debugpy.server import api
return api.configure(properties, **kwargs)
def listen(address):
@_api()
def listen(__endpoint: Endpoint | int) -> Endpoint:
"""Starts a debug adapter debugging this process, that listens for
incoming socket connections from clients on the specified address.
address must be either a (host, port) tuple, as defined by the
standard socket module for the AF_INET address family, or a port
`__endpoint` must be either a (host, port) tuple as defined by the
standard `socket` module for the `AF_INET` address family, or a port
number. If only the port is specified, host is "127.0.0.1".
Returns the interface and the port on which the debug adapter is
actually listening, in the same format as address. This may be
actually listening, in the same format as `__endpoint`. This may be
different from address if port was 0 in the latter, in which case
the adapter will pick some unused ephemeral port to listen on.
This function does't wait for a client to connect to the debug
adapter that it starts. Use wait_for_client() to block execution
adapter that it starts. Use `wait_for_client` to block execution
until the client connects.
"""
from debugpy.server import api
return api.listen(address)
def connect(address, *, access_token=None):
@_api()
def connect(__endpoint: Endpoint | int, *, access_token: str = None) -> Endpoint:
"""Tells an existing debug adapter instance that is listening on the
specified address to debug this process.
address must be either a (host, port) tuple, as defined by the
standard socket module for the AF_INET address family, or a port
`__endpoint` must be either a (host, port) tuple as defined by the
standard `socket` module for the `AF_INET` address family, or a port
number. If only the port is specified, host is "127.0.0.1".
access_token must be the same value that was passed to the adapter
via the --server-access-token command-line switch.
`access_token` must be the same value that was passed to the adapter
via the `--server-access-token` command-line switch.
This function does't wait for a client to connect to the debug
adapter that it connects to. Use wait_for_client() to block
adapter that it connects to. Use `wait_for_client` to block
execution until the client connects.
"""
from debugpy.server import api
return api.connect(address, access_token=access_token)
def wait_for_client():
@_api(cancelable=True)
def wait_for_client() -> None:
"""If there is a client connected to the debug adapter that is
debugging this process, returns immediately. Otherwise, blocks
until a client connects to the adapter.
While this function is waiting, it can be canceled by calling
wait_for_client.cancel() from another thread.
`wait_for_client.cancel()` from another thread.
"""
from debugpy.server import api
return api.wait_for_client()
def is_client_connected():
@_api()
def is_client_connected() -> bool:
"""True if a client is connected to the debug adapter that is
debugging this process.
"""
from debugpy.server import api
return api.is_client_connected()
def breakpoint():
@_api()
def breakpoint() -> None:
"""If a client is connected to the debug adapter that is debugging
this process, pauses execution of all threads, and simulates a
breakpoint being hit at the line following the call.
On Python 3.7 and above, this is the same as builtins.breakpoint().
It is also registered as the default handler for builtins.breakpoint().
"""
from debugpy.server import api
return api.breakpoint()
def debug_this_thread():
@_api()
def debug_this_thread() -> None:
"""Makes the debugger aware of the current thread.
Must be called on any background thread that is started by means
@ -153,12 +165,9 @@ def debug_this_thread():
in order for breakpoints to work on that thread.
"""
from debugpy.server import api
return api.debug_this_thread()
def trace_this_thread(should_trace):
@_api()
def trace_this_thread(__should_trace: bool):
"""Tells the debug adapter to enable or disable tracing on the
current thread.
@ -172,9 +181,5 @@ def trace_this_thread(should_trace):
client connected to the debug adapter.
"""
from debugpy.server import api
return api.trace_this_thread(should_trace)
__version__ = _version.get_versions()["version"]
__version__: str = _version.get_versions()["version"]

View file

@ -113,7 +113,7 @@ def _starts_debugging(func):
port.__index__() # ensure it's int-like
except Exception:
raise ValueError("expected port or (host, port)")
if not (0 <= port < 2**16):
if not (0 <= port < 2 ** 16):
raise ValueError("invalid port number")
ensure_logging()
@ -269,22 +269,30 @@ def listen(address, settrace_kwargs):
@_starts_debugging
def connect(address, settrace_kwargs, access_token):
def connect(address, settrace_kwargs, access_token=None):
host, port = address
_settrace(host=host, port=port, client_access_token=access_token, **settrace_kwargs)
def wait_for_client():
ensure_logging()
log.debug("wait_for_client()")
class wait_for_client:
def __call__(self):
ensure_logging()
log.debug("wait_for_client()")
pydb = get_global_debugger()
if pydb is None:
raise RuntimeError("listen() or connect() must be called first")
pydb = get_global_debugger()
if pydb is None:
raise RuntimeError("listen() or connect() must be called first")
cancel_event = threading.Event()
debugpy.wait_for_client.cancel = wait_for_client.cancel = cancel_event.set
pydevd._wait_for_attach(cancel=cancel_event)
cancel_event = threading.Event()
self.cancel = cancel_event.set
pydevd._wait_for_attach(cancel=cancel_event)
@staticmethod
def cancel():
raise RuntimeError("wait_for_client() must be called first")
wait_for_client = wait_for_client()
def is_client_connected():