Use the faulthandler module's infrastructure to write a GIL-less memory watchdog for timely stats collection.
Antoine Pitrou 2011-10-04 11:51:23 +02:00
parent 031487eb3b
commit 75e78b6c77
2 changed files with 249 additions and 43 deletions
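Note: the watchdog samples /proc/<pid>/statm; field index 5 there is the process' data + stack segment size, counted in pages, which is why the consumer code below multiplies by the page size before printing gigabytes. A minimal, Linux-only sketch of that calculation, separate from the commit itself (the helper name is made up):

    import os

    def data_segment_gb(pid=None):
        # Hypothetical helper mirroring the watchdog's arithmetic: the sixth
        # field of /proc/<pid>/statm is the data + stack size in pages.
        pid = os.getpid() if pid is None else pid
        with open('/proc/{}/statm'.format(pid), 'rb') as f:
            fields = f.read().decode('ascii').split()
        page_size = os.sysconf('SC_PAGESIZE')
        return int(fields[5]) * page_size / (1024 ** 3)

    print(" ... process data size: {:.1f}G".format(data_segment_gb()))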

Lib/test/support.py

@@ -23,6 +23,7 @@ import time
 import sysconfig
 import fnmatch
 import logging.handlers
+import struct
 
 try:
     import _thread, threading
@@ -34,6 +35,10 @@ try:
 except ImportError:
     multiprocessing = None
 try:
+    import faulthandler
+except ImportError:
+    faulthandler = None
+try:
     import zlib
 except ImportError:
     zlib = None
@@ -1133,41 +1138,66 @@ def set_memlimit(limit):
         raise ValueError('Memory limit %r too low to be useful' % (limit,))
     max_memuse = memlimit
 
-def _memory_watchdog(start_evt, finish_evt, period=10.0):
-    """A function which periodically watches the process' memory consumption
+class _MemoryWatchdog:
+    """An object which periodically watches the process' memory consumption
     and prints it out.
     """
-    # XXX: because of the GIL, and because the very long operations tested
-    # in most bigmem tests are uninterruptible, the loop below gets woken up
-    # much less often than expected.
-    # The polling code should be rewritten in raw C, without holding the GIL,
-    # and push results onto an anonymous pipe.
-    try:
-        page_size = os.sysconf('SC_PAGESIZE')
-    except (ValueError, AttributeError):
+
+    def __init__(self):
+        self.procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
+        self.started = False
+        self.thread = None
         try:
-            page_size = os.sysconf('SC_PAGE_SIZE')
+            self.page_size = os.sysconf('SC_PAGESIZE')
         except (ValueError, AttributeError):
-            page_size = 4096
-    procfile = '/proc/{pid}/statm'.format(pid=os.getpid())
-    try:
-        f = open(procfile, 'rb')
-    except IOError as e:
-        warnings.warn('/proc not available for stats: {}'.format(e),
-                      RuntimeWarning)
-        sys.stderr.flush()
-        return
-    with f:
-        start_evt.set()
-        old_data = -1
-        while not finish_evt.wait(period):
-            f.seek(0)
-            statm = f.read().decode('ascii')
-            data = int(statm.split()[5])
-            if data != old_data:
-                old_data = data
+            try:
+                self.page_size = os.sysconf('SC_PAGE_SIZE')
+            except (ValueError, AttributeError):
+                self.page_size = 4096
+
+    def consumer(self, fd):
+        HEADER = "l"
+        header_size = struct.calcsize(HEADER)
+        try:
+            while True:
+                header = os.read(fd, header_size)
+                if len(header) < header_size:
+                    # Pipe closed on other end
+                    break
+                data_len, = struct.unpack(HEADER, header)
+                data = os.read(fd, data_len)
+                statm = data.decode('ascii')
+                data = int(statm.split()[5])
                 print(" ... process data size: {data:.1f}G"
-                      .format(data=data * page_size / (1024 ** 3)))
+                      .format(data=data * self.page_size / (1024 ** 3)))
+        finally:
+            os.close(fd)
+
+    def start(self):
+        if not faulthandler or not hasattr(faulthandler, '_file_watchdog'):
+            return
+
+        try:
+            rfd = os.open(self.procfile, os.O_RDONLY)
+        except OSError as e:
+            warnings.warn('/proc not available for stats: {}'.format(e),
+                          RuntimeWarning)
+            sys.stderr.flush()
+            return
+
+        pipe_fd, wfd = os.pipe()
+        # _file_watchdog() doesn't take the GIL in its child thread, and
+        # therefore collects statistics timely
+        faulthandler._file_watchdog(rfd, wfd, 3.0)
+        self.started = True
+        self.thread = threading.Thread(target=self.consumer, args=(pipe_fd,))
+        self.thread.daemon = True
+        self.thread.start()
+
+    def stop(self):
+        if not self.started:
+            return
+        faulthandler._cancel_file_watchdog()
+        self.thread.join()
 
 def bigmemtest(size, memuse, dry_run=True):
     """Decorator for bigmem tests.
@@ -1194,27 +1224,20 @@ def bigmemtest(size, memuse, dry_run=True):
                     "not enough memory: %.1fG minimum needed"
                     % (size * memuse / (1024 ** 3)))
 
-            if real_max_memuse and verbose and threading:
+            if real_max_memuse and verbose and faulthandler and threading:
                 print()
                 print(" ... expected peak memory use: {peak:.1f}G"
                       .format(peak=size * memuse / (1024 ** 3)))
                 sys.stdout.flush()
-                start_evt = threading.Event()
-                finish_evt = threading.Event()
-                t = threading.Thread(target=_memory_watchdog,
-                                     args=(start_evt, finish_evt, 0.5))
-                t.daemon = True
-                t.start()
-                start_evt.set()
+                watchdog = _MemoryWatchdog()
+                watchdog.start()
             else:
-                t = None
+                watchdog = None
 
             try:
                 return f(self, maxsize)
             finally:
-                if t:
-                    finish_evt.set()
-                    t.join()
+                if watchdog:
+                    watchdog.stop()
 
         wrapper.size = size
         wrapper.memuse = memuse
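Note: nothing changes for the tests themselves; bigmem tests keep using the decorator as before, and the watchdog only kicks in when a memory limit is set (regrtest -M), verbose output is on, and faulthandler is importable. A sketch of a typical caller, with an illustrative test body and size/memuse values:

    import unittest
    from test.support import bigmemtest, _2G

    class ExampleStrTest(unittest.TestCase):
        # The decorator passes the (possibly scaled-down) size into the test;
        # memuse declares roughly how many bytes each unit of size will cost.
        @bigmemtest(size=_2G, memuse=2)
        def test_concat(self, size):
            s = 'x' * size
            self.assertEqual(len(s), size)

    if __name__ == '__main__':
        unittest.main()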