mirror of
https://github.com/python/cpython.git
synced 2025-08-03 16:39:00 +00:00

Previously every test was building an extension module and loading it into sys.modules. The tearDown function was thus not able to clean up correctly, resulting in memory leaks. With this commit, every test function now builds the extension module and runs the actual test code in a new process (using assert_python_ok), so that sys.modules stays intact and no memory gets leaked.
124 lines
3.9 KiB
Python
124 lines
3.9 KiB
Python
import importlib.util
|
|
import io
|
|
import os
|
|
import pathlib
|
|
import sys
|
|
import textwrap
|
|
import tokenize
|
|
|
|
from typing import Any, cast, Dict, IO, Type, Final
|
|
|
|
from pegen.build import compile_c_extension
|
|
from pegen.c_generator import CParserGenerator
|
|
from pegen.grammar import Grammar
|
|
from pegen.grammar_parser import GeneratedParser as GrammarParser
|
|
from pegen.parser import Parser
|
|
from pegen.python_generator import PythonParserGenerator
|
|
from pegen.tokenizer import Tokenizer
|
|
|
|
|
|
def generate_parser(grammar: Grammar) -> Type[Parser]:
|
|
# Generate a parser.
|
|
out = io.StringIO()
|
|
genr = PythonParserGenerator(grammar, out)
|
|
genr.generate("<string>")
|
|
|
|
# Load the generated parser class.
|
|
ns: Dict[str, Any] = {}
|
|
exec(out.getvalue(), ns)
|
|
return ns["GeneratedParser"]
|
|
|
|
|
|
def run_parser(file: IO[bytes], parser_class: Type[Parser], *, verbose: bool = False) -> Any:
|
|
# Run a parser on a file (stream).
|
|
tokenizer = Tokenizer(tokenize.generate_tokens(file.readline)) # type: ignore # typeshed issue #3515
|
|
parser = parser_class(tokenizer, verbose=verbose)
|
|
result = parser.start()
|
|
if result is None:
|
|
raise parser.make_syntax_error()
|
|
return result
|
|
|
|
|
|
def parse_string(
|
|
source: str, parser_class: Type[Parser], *, dedent: bool = True, verbose: bool = False
|
|
) -> Any:
|
|
# Run the parser on a string.
|
|
if dedent:
|
|
source = textwrap.dedent(source)
|
|
file = io.StringIO(source)
|
|
return run_parser(file, parser_class, verbose=verbose) # type: ignore # typeshed issue #3515
|
|
|
|
|
|
def make_parser(source: str) -> Type[Parser]:
|
|
# Combine parse_string() and generate_parser().
|
|
grammar = parse_string(source, GrammarParser)
|
|
return generate_parser(grammar)
|
|
|
|
|
|
def import_file(full_name: str, path: str) -> Any:
|
|
"""Import a python module from a path"""
|
|
|
|
spec = importlib.util.spec_from_file_location(full_name, path)
|
|
mod = importlib.util.module_from_spec(spec)
|
|
|
|
# We assume this is not None and has an exec_module() method.
|
|
# See https://docs.python.org/3/reference/import.html?highlight=exec_module#loading
|
|
loader = cast(Any, spec.loader)
|
|
loader.exec_module(mod)
|
|
return mod
|
|
|
|
|
|
def generate_c_parser_source(grammar: Grammar) -> str:
|
|
out = io.StringIO()
|
|
genr = CParserGenerator(grammar, out)
|
|
genr.generate("<string>")
|
|
return out.getvalue()
|
|
|
|
|
|
def generate_parser_c_extension(
|
|
grammar: Grammar, path: pathlib.PurePath, debug: bool = False
|
|
) -> Any:
|
|
"""Generate a parser c extension for the given grammar in the given path
|
|
|
|
Returns a module object with a parse_string() method.
|
|
TODO: express that using a Protocol.
|
|
"""
|
|
# Make sure that the working directory is empty: reusing non-empty temporary
|
|
# directories when generating extensions can lead to segmentation faults.
|
|
# Check issue #95 (https://github.com/gvanrossum/pegen/issues/95) for more
|
|
# context.
|
|
assert not os.listdir(path)
|
|
source = path / "parse.c"
|
|
with open(source, "w") as file:
|
|
genr = CParserGenerator(grammar, file, debug=debug)
|
|
genr.generate("parse.c")
|
|
compile_c_extension(str(source), build_dir=str(path))
|
|
|
|
|
|
def print_memstats() -> bool:
|
|
MiB: Final = 2 ** 20
|
|
try:
|
|
import psutil # type: ignore
|
|
except ImportError:
|
|
return False
|
|
print("Memory stats:")
|
|
process = psutil.Process()
|
|
meminfo = process.memory_info()
|
|
res = {}
|
|
res["rss"] = meminfo.rss / MiB
|
|
res["vms"] = meminfo.vms / MiB
|
|
if sys.platform == "win32":
|
|
res["maxrss"] = meminfo.peak_wset / MiB
|
|
else:
|
|
# See https://stackoverflow.com/questions/938733/total-memory-used-by-python-process
|
|
import resource # Since it doesn't exist on Windows.
|
|
|
|
rusage = resource.getrusage(resource.RUSAGE_SELF)
|
|
if sys.platform == "darwin":
|
|
factor = 1
|
|
else:
|
|
factor = 1024 # Linux
|
|
res["maxrss"] = rusage.ru_maxrss * factor / MiB
|
|
for key, value in res.items():
|
|
print(f" {key:12.12s}: {value:10.0f} MiB")
|
|
return True
|