[docs] Fix typo in docstring and add example to logging cookbook. (GH-117157)

This commit is contained in:
Vinay Sajip 2024-03-22 17:25:51 +00:00 committed by GitHub
parent 40d75c2b7f
commit 00baaa21de
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 127 additions and 15 deletions

View file

@ -1846,8 +1846,11 @@ the use of a :class:`Filter` does not provide the desired result.
.. _zeromq-handlers:
Subclassing QueueHandler - a ZeroMQ example
-------------------------------------------
Subclassing QueueHandler and QueueListener- a ZeroMQ example
------------------------------------------------------------
Subclass ``QueueHandler``
^^^^^^^^^^^^^^^^^^^^^^^^^
You can use a :class:`QueueHandler` subclass to send messages to other kinds
of queues, for example a ZeroMQ 'publish' socket. In the example below,the
@ -1885,8 +1888,8 @@ data needed by the handler to create the socket::
self.queue.close()
Subclassing QueueListener - a ZeroMQ example
--------------------------------------------
Subclass ``QueueListener``
^^^^^^^^^^^^^^^^^^^^^^^^^^
You can also subclass :class:`QueueListener` to get messages from other kinds
of queues, for example a ZeroMQ 'subscribe' socket. Here's an example::
@ -1903,25 +1906,134 @@ of queues, for example a ZeroMQ 'subscribe' socket. Here's an example::
msg = self.queue.recv_json()
return logging.makeLogRecord(msg)
.. _pynng-handlers:
.. seealso::
Subclassing QueueHandler and QueueListener- a ``pynng`` example
---------------------------------------------------------------
Module :mod:`logging`
API reference for the logging module.
In a similar way to the above section, we can implement a listener and handler
using `pynng <https://pypi.org/project/pynng/>`_, which is a Python binding to
`NNG <https://nng.nanomsg.org/>`_, billed as a spiritual successor to ZeroMQ.
The following snippets illustrate -- you can test them in an environment which has
``pynng`` installed. Juat for variety, we present the listener first.
Module :mod:`logging.config`
Configuration API for the logging module.
Module :mod:`logging.handlers`
Useful handlers included with the logging module.
Subclass ``QueueListener``
^^^^^^^^^^^^^^^^^^^^^^^^^^
:ref:`A basic logging tutorial <logging-basic-tutorial>`
.. code-block:: python
:ref:`A more advanced logging tutorial <logging-advanced-tutorial>`
import json
import logging
import logging.handlers
import pynng
DEFAULT_ADDR = "tcp://localhost:13232"
interrupted = False
class NNGSocketListener(logging.handlers.QueueListener):
def __init__(self, uri, /, *handlers, **kwargs):
# Have a timeout for interruptability, and open a
# subscriber socket
socket = pynng.Sub0(listen=uri, recv_timeout=500)
# The b'' subscription matches all topics
topics = kwargs.pop('topics', None) or b''
socket.subscribe(topics)
# We treat the socket as a queue
super().__init__(socket, *handlers, **kwargs)
def dequeue(self, block):
data = None
# Keep looping while not interrupted and no data received over the
# socket
while not interrupted:
try:
data = self.queue.recv(block=block)
break
except pynng.Timeout:
pass
except pynng.Closed: # sometimes hit when you hit Ctrl-C
break
if data is None:
return None
# Get the logging event sent from a publisher
event = json.loads(data.decode('utf-8'))
return logging.makeLogRecord(event)
def enqueue_sentinel(self):
# Not used in this implementation, as the socket isn't really a
# queue
pass
logging.getLogger('pynng').propagate = False
listener = NNGSocketListener(DEFAULT_ADDR, logging.StreamHandler(), topics=b'')
listener.start()
print('Press Ctrl-C to stop.')
try:
while True:
pass
except KeyboardInterrupt:
interrupted = True
finally:
listener.stop()
Subclass ``QueueHandler``
^^^^^^^^^^^^^^^^^^^^^^^^^
.. currentmodule:: logging
.. code-block:: python
import json
import logging
import logging.handlers
import time
import random
import pynng
DEFAULT_ADDR = "tcp://localhost:13232"
class NNGSocketHandler(logging.handlers.QueueHandler):
def __init__(self, uri):
socket = pynng.Pub0(dial=uri, send_timeout=500)
super().__init__(socket)
def enqueue(self, record):
# Send the record as UTF-8 encoded JSON
d = dict(record.__dict__)
data = json.dumps(d)
self.queue.send(data.encode('utf-8'))
def close(self):
self.queue.close()
logging.getLogger('pynng').propagate = False
handler = NNGSocketHandler(DEFAULT_ADDR)
logging.basicConfig(level=logging.DEBUG,
handlers=[logging.StreamHandler(), handler],
format='%(levelname)-8s %(name)10s %(message)s')
levels = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL)
logger_names = ('myapp', 'myapp.lib1', 'myapp.lib2')
msgno = 1
while True:
# Just randomly select some loggers and levels and log away
level = random.choice(levels)
logger = logging.getLogger(random.choice(logger_names))
logger.log(level, 'Message no. %5d' % msgno)
msgno += 1
delay = random.random() * 2 + 0.5
time.sleep(delay)
You can run the above two snippets in separate command shells.
An example dictionary-based configuration
-----------------------------------------
@ -3418,7 +3530,7 @@ The worker thread is implemented using Qt's ``QThread`` class rather than the
:mod:`threading` module, as there are circumstances where one has to use
``QThread``, which offers better integration with other ``Qt`` components.
The code should work with recent releases of either ``PySide6``, ``PyQt6``,
The code should work with recent releases of any of ``PySide6``, ``PyQt6``,
``PySide2`` or ``PyQt5``. You should be able to adapt the approach to earlier
versions of Qt. Please refer to the comments in the code snippet for more
detailed information.