bpo-40334: PEP 617 implementation: New PEG parser for CPython (GH-19503)

Co-authored-by: Guido van Rossum <guido@python.org>
Co-authored-by: Lysandros Nikolaou <lisandrosnik@gmail.com>
Pablo Galindo 2020-04-22 23:29:27 +01:00 committed by GitHub
parent a81849b031
commit c5fc156852
91 changed files with 27057 additions and 146 deletions

@@ -0,0 +1,136 @@
#!/usr/bin/env python3.8
"""pegen -- PEG Generator.
Search the web for PEG Parsers for reference.
"""
import argparse
import sys
import time
import token
import traceback
from typing import Final
from pegen.build import build_parser_and_generator
from pegen.testutil import print_memstats
argparser = argparse.ArgumentParser(
prog="pegen", description="Experimental PEG-like parser generator"
)
argparser.add_argument("-q", "--quiet", action="store_true", help="Don't print the parsed grammar")
argparser.add_argument(
"-v",
"--verbose",
action="count",
default=0,
help="Print timing stats; repeat for more debug output",
)
argparser.add_argument(
"-c", "--cpython", action="store_true", help="Generate C code for inclusion into CPython"
)
argparser.add_argument(
"--compile-extension",
action="store_true",
help="Compile generated C code into an extension module",
)
argparser.add_argument(
"-o",
"--output",
metavar="OUT",
help="Where to write the generated parser (default parse.py or parse.c)",
)
argparser.add_argument("filename", help="Grammar description")
argparser.add_argument(
"--optimized", action="store_true", help="Compile the extension in optimized mode"
)
argparser.add_argument(
"--skip-actions", action="store_true", help="Suppress code emission for rule actions",
)
def main() -> None:
args = argparser.parse_args()
verbose = args.verbose
verbose_tokenizer = verbose >= 3
verbose_parser = verbose == 2 or verbose >= 4
t0 = time.time()
output_file = args.output
if not output_file:
if args.cpython:
output_file = "parse.c"
else:
output_file = "parse.py"
try:
grammar, parser, tokenizer, gen = build_parser_and_generator(
args.filename,
output_file,
args.compile_extension,
verbose_tokenizer,
verbose_parser,
args.verbose,
keep_asserts_in_extension=False if args.optimized else True,
skip_actions=args.skip_actions,
)
except Exception as err:
if args.verbose:
raise # Show traceback
traceback.print_exception(err.__class__, err, None)
sys.stderr.write("For full traceback, use -v\n")
sys.exit(1)
if not args.quiet:
if args.verbose:
print("Raw Grammar:")
for line in repr(grammar).splitlines():
print(" ", line)
print("Clean Grammar:")
for line in str(grammar).splitlines():
print(" ", line)
if args.verbose:
print("First Graph:")
for src, dsts in gen.first_graph.items():
print(f" {src} -> {', '.join(dsts)}")
print("First SCCS:")
for scc in gen.first_sccs:
print(" ", scc, end="")
if len(scc) > 1:
print(
" # Indirectly left-recursive; leaders:",
{name for name in scc if grammar.rules[name].leader},
)
else:
name = next(iter(scc))
if name in gen.first_graph[name]:
print(" # Left-recursive")
else:
print()
t1 = time.time()
if args.verbose:
dt = t1 - t0
diag = tokenizer.diagnose()
nlines = diag.end[0]
if diag.type == token.ENDMARKER:
nlines -= 1
print(f"Total time: {dt:.3f} sec; {nlines} lines", end="")
if dt:
print(f"; {nlines / dt:.0f} lines/sec")
else:
print()
print("Caches sizes:")
print(f" token array : {len(tokenizer._tokens):10}")
print(f" cache : {len(parser._cache):10}")
if not print_memstats():
print("(Can't find psutil; install it for memory stats.)")
if __name__ == "__main__":
main()
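# Illustrative sketch (hypothetical grammar path data/simple.gram): the generator
# can also be driven programmatically by reusing its own argparser, equivalent to
# running "pegen data/simple.gram -o parse.py -v" from a shell.
def _example_programmatic_run() -> None:  # illustrative only, never called here
    sys.argv[1:] = ["data/simple.gram", "-o", "parse.py", "-v"]
    main()  # writes parse.py and, because of -v, prints timing and cache stats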

@@ -0,0 +1,169 @@
import pathlib
import shutil
import tokenize
from typing import Optional, Tuple
import distutils.log
from distutils.core import Distribution, Extension
from distutils.command.clean import clean # type: ignore
from distutils.command.build_ext import build_ext # type: ignore
from pegen.c_generator import CParserGenerator
from pegen.grammar import Grammar
from pegen.grammar_parser import GeneratedParser as GrammarParser
from pegen.parser import Parser
from pegen.parser_generator import ParserGenerator
from pegen.python_generator import PythonParserGenerator
from pegen.tokenizer import Tokenizer
MOD_DIR = pathlib.Path(__file__).parent
def compile_c_extension(
generated_source_path: str,
build_dir: Optional[str] = None,
verbose: bool = False,
keep_asserts: bool = True,
) -> str:
"""Compile the generated source for a parser generator into an extension module.
The extension module will be generated in the same directory as the provided path
for the generated source, with the same basename (in addition to extension module
metadata). For example, for the source mydir/parser.c the generated extension
in a darwin system with python 3.8 will be mydir/parser.cpython-38-darwin.so.
If *build_dir* is provided, that path will be used as the temporary build directory
of distutils (this is useful in case you want to use a temporary directory).
"""
if verbose:
distutils.log.set_verbosity(distutils.log.DEBUG)
source_file_path = pathlib.Path(generated_source_path)
extension_name = source_file_path.stem
extra_compile_args = []
if keep_asserts:
extra_compile_args.append("-UNDEBUG")
extension = [
Extension(
extension_name,
sources=[
str(MOD_DIR.parent.parent.parent / "Python" / "Python-ast.c"),
str(MOD_DIR.parent.parent.parent / "Python" / "asdl.c"),
str(MOD_DIR.parent.parent.parent / "Parser" / "tokenizer.c"),
str(MOD_DIR.parent.parent.parent / "Parser" / "pegen" / "pegen.c"),
str(MOD_DIR.parent.parent.parent / "Parser" / "pegen" / "parse_string.c"),
str(MOD_DIR.parent / "peg_extension" / "peg_extension.c"),
generated_source_path,
],
include_dirs=[
str(MOD_DIR.parent.parent.parent / "Include" / "internal"),
str(MOD_DIR.parent.parent.parent / "Parser"),
str(MOD_DIR.parent.parent.parent / "Parser" / "pegen"),
],
extra_compile_args=extra_compile_args,
)
]
dist = Distribution({"name": extension_name, "ext_modules": extension})
cmd = build_ext(dist)
cmd.inplace = True
if build_dir:
cmd.build_temp = build_dir
cmd.ensure_finalized()
cmd.run()
extension_path = source_file_path.parent / cmd.get_ext_filename(extension_name)
shutil.move(cmd.get_ext_fullpath(extension_name), extension_path)
cmd = clean(dist)
cmd.finalize_options()
cmd.run()
return extension_path
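# Illustrative sketch (hypothetical paths): compile a generated C parser next to
# its source, keeping distutils' temporary files in a scratch directory.  On a
# Darwin build of Python 3.8 the result would be mydir/parser.cpython-38-darwin.so.
def _example_compile() -> str:  # illustrative only
    return compile_c_extension(
        "mydir/parser.c", build_dir="/tmp/pegen-build", verbose=True, keep_asserts=True
    )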
def build_parser(
grammar_file: str, verbose_tokenizer: bool = False, verbose_parser: bool = False
) -> Tuple[Grammar, Parser, Tokenizer]:
with open(grammar_file) as file:
tokenizer = Tokenizer(tokenize.generate_tokens(file.readline), verbose=verbose_tokenizer)
parser = GrammarParser(tokenizer, verbose=verbose_parser)
grammar = parser.start()
if not grammar:
raise parser.make_syntax_error(grammar_file)
return grammar, parser, tokenizer
def build_generator(
tokenizer: Tokenizer,
grammar: Grammar,
grammar_file: str,
output_file: str,
compile_extension: bool = False,
verbose_c_extension: bool = False,
keep_asserts_in_extension: bool = True,
skip_actions: bool = False,
) -> ParserGenerator:
# TODO: Allow other extensions; pass the output type as an argument.
if not output_file.endswith((".c", ".py")):
raise RuntimeError("Your output file must either be a .c or .py file")
with open(output_file, "w") as file:
gen: ParserGenerator
if output_file.endswith(".c"):
gen = CParserGenerator(grammar, file, skip_actions=skip_actions)
elif output_file.endswith(".py"):
gen = PythonParserGenerator(grammar, file) # TODO: skip_actions
else:
assert False # Should have been checked above
gen.generate(grammar_file)
if compile_extension and output_file.endswith(".c"):
compile_c_extension(
output_file, verbose=verbose_c_extension, keep_asserts=keep_asserts_in_extension
)
return gen
def build_parser_and_generator(
grammar_file: str,
output_file: str,
compile_extension: bool = False,
verbose_tokenizer: bool = False,
verbose_parser: bool = False,
verbose_c_extension: bool = False,
keep_asserts_in_extension: bool = True,
skip_actions: bool = False,
) -> Tuple[Grammar, Parser, Tokenizer, ParserGenerator]:
"""Generate rules, parser, tokenizer, parser generator for a given grammar
Args:
grammar_file (string): Path for the grammar file
output_file (string): Path for the output file
compile_extension (bool, optional): Whether to compile the C extension.
Defaults to False.
verbose_tokenizer (bool, optional): Whether to display additional output
when generating the tokenizer. Defaults to False.
verbose_parser (bool, optional): Whether to display additional output
when generating the parser. Defaults to False.
verbose_c_extension (bool, optional): Whether to display additional
output when compiling the C extension . Defaults to False.
keep_asserts_in_extension (bool, optional): Whether to keep the assert statements
when compiling the extension module. Defaults to True.
skip_actions (bool, optional): Whether to pretend no rule has any actions.
"""
grammar, parser, tokenizer = build_parser(grammar_file, verbose_tokenizer, verbose_parser)
gen = build_generator(
tokenizer,
grammar,
grammar_file,
output_file,
compile_extension,
verbose_c_extension,
keep_asserts_in_extension,
skip_actions=skip_actions,
)
return grammar, parser, tokenizer, gen
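# Illustrative sketch (hypothetical grammar path): generate a pure-Python parser
# module from a grammar file.  Passing a ".c" output path and compile_extension=True
# would build the C extension instead.
def _example_build() -> None:  # illustrative only
    grammar, parser, tokenizer, gen = build_parser_and_generator("data/simple.gram", "parse.py")
    print(f"wrote parse.py from {len(grammar.rules)} rules")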

@@ -0,0 +1,605 @@
import ast
import re
from typing import Any, cast, Dict, IO, Optional, List, Text, Tuple
from pegen.grammar import (
Cut,
GrammarVisitor,
Rhs,
Alt,
NamedItem,
NameLeaf,
StringLeaf,
Lookahead,
PositiveLookahead,
NegativeLookahead,
Opt,
Repeat0,
Repeat1,
Gather,
Group,
Rule,
)
from pegen import grammar
from pegen.parser_generator import dedupe, ParserGenerator
from pegen.tokenizer import exact_token_types
EXTENSION_PREFIX = """\
#include "pegen.h"
"""
EXTENSION_SUFFIX = """
void *
_PyPegen_parse(Parser *p)
{
// Initialize keywords
p->keywords = reserved_keywords;
p->n_keyword_lists = n_keyword_lists;
return start_rule(p);
}
"""
class CCallMakerVisitor(GrammarVisitor):
def __init__(self, parser_generator: ParserGenerator):
self.gen = parser_generator
self.cache: Dict[Any, Any] = {}
self.keyword_cache: Dict[str, int] = {}
def keyword_helper(self, keyword: str) -> Tuple[str, str]:
if keyword not in self.keyword_cache:
self.keyword_cache[keyword] = self.gen.keyword_type()
return "keyword", f"_PyPegen_expect_token(p, {self.keyword_cache[keyword]})"
def visit_NameLeaf(self, node: NameLeaf) -> Tuple[str, str]:
name = node.value
if name in ("NAME", "NUMBER", "STRING"):
name = name.lower()
return f"{name}_var", f"_PyPegen_{name}_token(p)"
if name in ("NEWLINE", "DEDENT", "INDENT", "ENDMARKER", "ASYNC", "AWAIT"):
name = name.lower()
return f"{name}_var", f"_PyPegen_{name}_token(p)"
return f"{name}_var", f"{name}_rule(p)"
def visit_StringLeaf(self, node: StringLeaf) -> Tuple[str, str]:
val = ast.literal_eval(node.value)
if re.match(r"[a-zA-Z_]\w*\Z", val): # This is a keyword
return self.keyword_helper(val)
else:
assert val in exact_token_types, f"{node.value} is not a known literal"
type = exact_token_types[val]
return "literal", f"_PyPegen_expect_token(p, {type})"
def visit_Rhs(self, node: Rhs) -> Tuple[Optional[str], str]:
if node in self.cache:
return self.cache[node]
if len(node.alts) == 1 and len(node.alts[0].items) == 1:
self.cache[node] = self.visit(node.alts[0].items[0])
else:
name = self.gen.name_node(node)
self.cache[node] = f"{name}_var", f"{name}_rule(p)"
return self.cache[node]
def visit_NamedItem(self, node: NamedItem) -> Tuple[Optional[str], str]:
name, call = self.visit(node.item)
if node.name:
name = node.name
return name, call
def lookahead_call_helper(self, node: Lookahead, positive: int) -> Tuple[None, str]:
name, call = self.visit(node.node)
func, args = call.split("(", 1)
assert args[-1] == ")"
args = args[:-1]
if not args.startswith("p,"):
return None, f"_PyPegen_lookahead({positive}, {func}, {args})"
elif args[2:].strip().isalnum():
return None, f"_PyPegen_lookahead_with_int({positive}, {func}, {args})"
else:
return None, f"_PyPegen_lookahead_with_string({positive}, {func}, {args})"
def visit_PositiveLookahead(self, node: PositiveLookahead) -> Tuple[None, str]:
return self.lookahead_call_helper(node, 1)
def visit_NegativeLookahead(self, node: NegativeLookahead) -> Tuple[None, str]:
return self.lookahead_call_helper(node, 0)
def visit_Opt(self, node: Opt) -> Tuple[str, str]:
name, call = self.visit(node.node)
return "opt_var", f"{call}, 1" # Using comma operator!
def visit_Repeat0(self, node: Repeat0) -> Tuple[str, str]:
if node in self.cache:
return self.cache[node]
name = self.gen.name_loop(node.node, False)
self.cache[node] = f"{name}_var", f"{name}_rule(p)"
return self.cache[node]
def visit_Repeat1(self, node: Repeat1) -> Tuple[str, str]:
if node in self.cache:
return self.cache[node]
name = self.gen.name_loop(node.node, True)
self.cache[node] = f"{name}_var", f"{name}_rule(p)"
return self.cache[node]
def visit_Gather(self, node: Gather) -> Tuple[str, str]:
if node in self.cache:
return self.cache[node]
name = self.gen.name_gather(node)
self.cache[node] = f"{name}_var", f"{name}_rule(p)"
return self.cache[node]
def visit_Group(self, node: Group) -> Tuple[Optional[str], str]:
return self.visit(node.rhs)
def visit_Cut(self, node: Cut) -> Tuple[str, str]:
return "cut_var", "1"
class CParserGenerator(ParserGenerator, GrammarVisitor):
def __init__(
self,
grammar: grammar.Grammar,
file: Optional[IO[Text]],
debug: bool = False,
skip_actions: bool = False,
):
super().__init__(grammar, file)
self.callmakervisitor: CCallMakerVisitor = CCallMakerVisitor(self)
self._varname_counter = 0
self.debug = debug
self.skip_actions = skip_actions
def unique_varname(self, name: str = "tmpvar") -> str:
new_var = name + "_" + str(self._varname_counter)
self._varname_counter += 1
return new_var
def call_with_errorcheck_return(self, call_text: str, returnval: str) -> None:
error_var = self.unique_varname()
self.print(f"int {error_var} = {call_text};")
self.print(f"if ({error_var}) {{")
with self.indent():
self.print(f"return {returnval};")
self.print(f"}}")
def call_with_errorcheck_goto(self, call_text: str, goto_target: str) -> None:
error_var = self.unique_varname()
self.print(f"int {error_var} = {call_text};")
self.print(f"if ({error_var}) {{")
with self.indent():
self.print(f"goto {goto_target};")
self.print(f"}}")
def out_of_memory_return(
self, expr: str, returnval: str, message: str = "Parser out of memory", cleanup_code=None
) -> None:
self.print(f"if ({expr}) {{")
with self.indent():
self.print(f'PyErr_Format(PyExc_MemoryError, "{message}");')
if cleanup_code is not None:
self.print(cleanup_code)
self.print(f"return {returnval};")
self.print(f"}}")
def out_of_memory_goto(
self, expr: str, goto_target: str, message: str = "Parser out of memory"
) -> None:
self.print(f"if ({expr}) {{")
with self.indent():
self.print(f'PyErr_Format(PyExc_MemoryError, "{message}");')
self.print(f"goto {goto_target};")
self.print(f"}}")
def generate(self, filename: str) -> None:
self.collect_todo()
self.print(f"// @generated by pegen.py from {filename}")
header = self.grammar.metas.get("header", EXTENSION_PREFIX)
if header:
self.print(header.rstrip("\n"))
subheader = self.grammar.metas.get("subheader", "")
if subheader:
self.print(subheader)
self._setup_keywords()
for i, (rulename, rule) in enumerate(self.todo.items(), 1000):
comment = " // Left-recursive" if rule.left_recursive else ""
self.print(f"#define {rulename}_type {i}{comment}")
self.print()
for rulename, rule in self.todo.items():
if rule.is_loop() or rule.is_gather():
type = "asdl_seq *"
elif rule.type:
type = rule.type + " "
else:
type = "void *"
self.print(f"static {type}{rulename}_rule(Parser *p);")
self.print()
while self.todo:
for rulename, rule in list(self.todo.items()):
del self.todo[rulename]
self.print()
if rule.left_recursive:
self.print("// Left-recursive")
self.visit(rule)
if self.skip_actions:
mode = 0
else:
mode = int(self.rules["start"].type == "mod_ty") if "start" in self.rules else 1
if mode == 1 and self.grammar.metas.get("bytecode"):
mode += 1
modulename = self.grammar.metas.get("modulename", "parse")
trailer = self.grammar.metas.get("trailer", EXTENSION_SUFFIX)
keyword_cache = self.callmakervisitor.keyword_cache
if trailer:
self.print(trailer.rstrip("\n") % dict(mode=mode, modulename=modulename))
def _group_keywords_by_length(self) -> Dict[int, List[Tuple[str, int]]]:
groups: Dict[int, List[Tuple[str, int]]] = {}
for keyword_str, keyword_type in self.callmakervisitor.keyword_cache.items():
length = len(keyword_str)
if length in groups:
groups[length].append((keyword_str, keyword_type))
else:
groups[length] = [(keyword_str, keyword_type)]
return groups
def _setup_keywords(self) -> None:
keyword_cache = self.callmakervisitor.keyword_cache
n_keyword_lists = (
len(max(keyword_cache.keys(), key=len)) + 1 if len(keyword_cache) > 0 else 0
)
self.print(f"static const int n_keyword_lists = {n_keyword_lists};")
groups = self._group_keywords_by_length()
self.print("static KeywordToken *reserved_keywords[] = {")
with self.indent():
num_groups = max(groups) + 1 if groups else 1
for keywords_length in range(num_groups):
if keywords_length not in groups.keys():
self.print("NULL,")
else:
self.print("(KeywordToken[]) {")
with self.indent():
for keyword_str, keyword_type in groups[keywords_length]:
self.print(f'{{"{keyword_str}", {keyword_type}}},')
self.print("{NULL, -1},")
self.print("},")
self.print("};")
def _set_up_token_start_metadata_extraction(self) -> None:
self.print("if (p->mark == p->fill && _PyPegen_fill_token(p) < 0) {")
with self.indent():
self.print("p->error_indicator = 1;")
self.print("return NULL;")
self.print("}")
self.print("int start_lineno = p->tokens[mark]->lineno;")
self.print("UNUSED(start_lineno); // Only used by EXTRA macro")
self.print("int start_col_offset = p->tokens[mark]->col_offset;")
self.print("UNUSED(start_col_offset); // Only used by EXTRA macro")
def _set_up_token_end_metadata_extraction(self) -> None:
self.print("Token *token = _PyPegen_get_last_nonnwhitespace_token(p);")
self.print("if (token == NULL) {")
with self.indent():
self.print("return NULL;")
self.print("}")
self.print(f"int end_lineno = token->end_lineno;")
self.print("UNUSED(end_lineno); // Only used by EXTRA macro")
self.print(f"int end_col_offset = token->end_col_offset;")
self.print("UNUSED(end_col_offset); // Only used by EXTRA macro")
def _set_up_rule_memoization(self, node: Rule, result_type: str) -> None:
self.print("{")
with self.indent():
self.print(f"{result_type} res = NULL;")
self.print(f"if (_PyPegen_is_memoized(p, {node.name}_type, &res))")
with self.indent():
self.print("return res;")
self.print("int mark = p->mark;")
self.print("int resmark = p->mark;")
self.print("while (1) {")
with self.indent():
self.call_with_errorcheck_return(
f"_PyPegen_update_memo(p, mark, {node.name}_type, res)", "res"
)
self.print("p->mark = mark;")
self.print(f"void *raw = {node.name}_raw(p);")
self.print("if (raw == NULL || p->mark <= resmark)")
with self.indent():
self.print("break;")
self.print("resmark = p->mark;")
self.print("res = raw;")
self.print("}")
self.print("p->mark = resmark;")
self.print("return res;")
self.print("}")
self.print(f"static {result_type}")
self.print(f"{node.name}_raw(Parser *p)")
def _should_memoize(self, node: Rule) -> bool:
return node.memo and not node.left_recursive
def _handle_default_rule_body(self, node: Rule, rhs: Rhs, result_type: str) -> None:
memoize = self._should_memoize(node)
with self.indent():
self.print("if (p->error_indicator) {")
with self.indent():
self.print("return NULL;")
self.print("}")
self.print(f"{result_type} res = NULL;")
if memoize:
self.print(f"if (_PyPegen_is_memoized(p, {node.name}_type, &res))")
with self.indent():
self.print("return res;")
self.print("int mark = p->mark;")
if any(alt.action and "EXTRA" in alt.action for alt in rhs.alts):
self._set_up_token_start_metadata_extraction()
self.visit(
rhs,
is_loop=False,
is_gather=node.is_gather(),
rulename=node.name if memoize else None,
)
if self.debug:
self.print(f'fprintf(stderr, "Fail at %d: {node.name}\\n", p->mark);')
self.print("res = NULL;")
self.print(" done:")
with self.indent():
if memoize:
self.print(f"_PyPegen_insert_memo(p, mark, {node.name}_type, res);")
self.print("return res;")
def _handle_loop_rule_body(self, node: Rule, rhs: Rhs) -> None:
memoize = self._should_memoize(node)
is_repeat1 = node.name.startswith("_loop1")
with self.indent():
self.print("if (p->error_indicator) {")
with self.indent():
self.print("return NULL;")
self.print("}")
self.print(f"void *res = NULL;")
if memoize:
self.print(f"if (_PyPegen_is_memoized(p, {node.name}_type, &res))")
with self.indent():
self.print("return res;")
self.print("int mark = p->mark;")
self.print("int start_mark = p->mark;")
self.print("void **children = PyMem_Malloc(sizeof(void *));")
self.out_of_memory_return(f"!children", "NULL")
self.print("ssize_t children_capacity = 1;")
self.print("ssize_t n = 0;")
if any(alt.action and "EXTRA" in alt.action for alt in rhs.alts):
self._set_up_token_start_metadata_extraction()
self.visit(
rhs,
is_loop=True,
is_gather=node.is_gather(),
rulename=node.name if memoize else None,
)
if is_repeat1:
self.print("if (n == 0) {")
with self.indent():
self.print("PyMem_Free(children);")
self.print("return NULL;")
self.print("}")
self.print("asdl_seq *seq = _Py_asdl_seq_new(n, p->arena);")
self.out_of_memory_return(
f"!seq",
"NULL",
message=f"asdl_seq_new {node.name}",
cleanup_code="PyMem_Free(children);",
)
self.print("for (int i = 0; i < n; i++) asdl_seq_SET(seq, i, children[i]);")
self.print("PyMem_Free(children);")
if node.name:
self.print(f"_PyPegen_insert_memo(p, start_mark, {node.name}_type, seq);")
self.print("return seq;")
def visit_Rule(self, node: Rule) -> None:
is_loop = node.is_loop()
is_gather = node.is_gather()
rhs = node.flatten()
if is_loop or is_gather:
result_type = "asdl_seq *"
elif node.type:
result_type = node.type
else:
result_type = "void *"
for line in str(node).splitlines():
self.print(f"// {line}")
if node.left_recursive and node.leader:
self.print(f"static {result_type} {node.name}_raw(Parser *);")
self.print(f"static {result_type}")
self.print(f"{node.name}_rule(Parser *p)")
if node.left_recursive and node.leader:
self._set_up_rule_memoization(node, result_type)
self.print("{")
if is_loop:
self._handle_loop_rule_body(node, rhs)
else:
self._handle_default_rule_body(node, rhs, result_type)
self.print("}")
def visit_NamedItem(self, node: NamedItem, names: List[str]) -> None:
name, call = self.callmakervisitor.visit(node)
if not name:
self.print(call)
else:
name = dedupe(name, names)
self.print(f"({name} = {call})")
def visit_Rhs(
self, node: Rhs, is_loop: bool, is_gather: bool, rulename: Optional[str]
) -> None:
if is_loop:
assert len(node.alts) == 1
for alt in node.alts:
self.visit(alt, is_loop=is_loop, is_gather=is_gather, rulename=rulename)
def join_conditions(self, keyword: str, node: Any, names: List[str]) -> None:
self.print(f"{keyword} (")
with self.indent():
first = True
for item in node.items:
if first:
first = False
else:
self.print("&&")
self.visit(item, names=names)
self.print(")")
def emit_action(self, node: Alt, cleanup_code=None) -> None:
self.print(f"res = {node.action};")
self.print("if (res == NULL && PyErr_Occurred()) {")
with self.indent():
self.print("p->error_indicator = 1;")
if cleanup_code:
self.print(cleanup_code)
self.print("return NULL;")
self.print("}")
if self.debug:
self.print(
f'fprintf(stderr, "Hit with action [%d-%d]: %s\\n", mark, p->mark, "{node}");'
)
def emit_default_action(self, is_gather: bool, names: List[str], node: Alt) -> None:
if len(names) > 1:
if is_gather:
assert len(names) == 2
self.print(f"res = _PyPegen_seq_insert_in_front(p, {names[0]}, {names[1]});")
else:
if self.debug:
self.print(
f'fprintf(stderr, "Hit without action [%d:%d]: %s\\n", mark, p->mark, "{node}");'
)
self.print(f"res = _PyPegen_dummy_name(p, {', '.join(names)});")
else:
if self.debug:
self.print(
f'fprintf(stderr, "Hit with default action [%d:%d]: %s\\n", mark, p->mark, "{node}");'
)
self.print(f"res = {names[0]};")
def emit_dummy_action(self) -> None:
self.print(f"res = _PyPegen_dummy_name(p);")
def handle_alt_normal(self, node: Alt, is_gather: bool, names: List[str]) -> None:
self.join_conditions(keyword="if", node=node, names=names)
self.print("{")
# We have parsed successfully all the conditions for the option.
with self.indent():
# Prepare to emit the rule action and do so
if node.action and "EXTRA" in node.action:
self._set_up_token_end_metadata_extraction()
if self.skip_actions:
self.emit_dummy_action()
elif node.action:
self.emit_action(node)
else:
self.emit_default_action(is_gather, names, node)
# As the current option has parsed correctly, do not continue with the rest.
self.print(f"goto done;")
self.print("}")
def handle_alt_loop(
self, node: Alt, is_gather: bool, rulename: Optional[str], names: List[str]
) -> None:
# Condition of the main body of the alternative
self.join_conditions(keyword="while", node=node, names=names)
self.print("{")
# We have parsed successfully one item!
with self.indent():
# Prepare to emit the rule action and do so
if node.action and "EXTRA" in node.action:
self._set_up_token_end_metadata_extraction()
if self.skip_actions:
self.emit_dummy_action()
elif node.action:
self.emit_action(node, cleanup_code="PyMem_Free(children);")
else:
self.emit_default_action(is_gather, names, node)
# Add the result of the rule to the temporary buffer of children. This buffer
# will later be used to populate an asdl_seq with all the elements to return.
self.print("if (n == children_capacity) {")
with self.indent():
self.print("children_capacity *= 2;")
self.print("children = PyMem_Realloc(children, children_capacity*sizeof(void *));")
self.out_of_memory_return(f"!children", "NULL", message=f"realloc {rulename}")
self.print("}")
self.print(f"children[n++] = res;")
self.print("mark = p->mark;")
self.print("}")
def visit_Alt(
self, node: Alt, is_loop: bool, is_gather: bool, rulename: Optional[str]
) -> None:
self.print(f"{{ // {node}")
with self.indent():
# Prepare variable declarations for the alternative
vars = self.collect_vars(node)
for v, var_type in sorted(item for item in vars.items() if item[0] is not None):
if not var_type:
var_type = "void *"
else:
var_type += " "
if v == "cut_var":
v += " = 0" # cut_var must be initialized
self.print(f"{var_type}{v};")
if v == "opt_var":
self.print("UNUSED(opt_var); // Silence compiler warnings")
names: List[str] = []
if is_loop:
self.handle_alt_loop(node, is_gather, rulename, names)
else:
self.handle_alt_normal(node, is_gather, names)
self.print("p->mark = mark;")
if "cut_var" in names:
self.print("if (cut_var) return NULL;")
self.print("}")
def collect_vars(self, node: Alt) -> Dict[str, Optional[str]]:
names: List[str] = []
types = {}
for item in node.items:
name, type = self.add_var(item, names)
types[name] = type
return types
def add_var(self, node: NamedItem, names: List[str]) -> Tuple[str, Optional[str]]:
name: str
call: str
name, call = self.callmakervisitor.visit(node.item)
type = None
if not name:
return name, type
if name.startswith("cut"):
return name, "int"
if name.endswith("_var"):
rulename = name[:-4]
rule = self.rules.get(rulename)
if rule is not None:
if rule.is_loop() or rule.is_gather():
type = "asdl_seq *"
else:
type = rule.type
elif name.startswith("_loop") or name.startswith("_gather"):
type = "asdl_seq *"
elif name in ("name_var", "string_var", "number_var"):
type = "expr_ty"
if node.name:
name = node.name
name = dedupe(name, names)
return name, type

@@ -0,0 +1,153 @@
#!/usr/bin/env python3.8
import argparse
import collections
import pprint
import sys
from typing import Optional, Set, Dict
from pegen.build import build_parser
from pegen.grammar import (
Alt,
Cut,
Gather,
Grammar,
GrammarVisitor,
Group,
Leaf,
Lookahead,
NamedItem,
NameLeaf,
NegativeLookahead,
Opt,
Repeat,
Repeat0,
Repeat1,
Rhs,
Rule,
StringLeaf,
PositiveLookahead,
)
argparser = argparse.ArgumentParser(
prog="calculate_first_sets", description="Calculate the first sets of a grammar",
)
argparser.add_argument("grammar_file", help="The grammar file")
class FirstSetCalculator(GrammarVisitor):
def __init__(self, rules: Dict[str, Rule]) -> None:
self.rules = rules
for rule in rules.values():
rule.nullable_visit(rules)
self.first_sets: Dict[str, Set[str]] = dict()
self.in_process: Set[str] = set()
def calculate(self) -> Dict[str, Set[str]]:
for name, rule in self.rules.items():
self.visit(rule)
return self.first_sets
def visit_Alt(self, item: Alt) -> Set[str]:
result: Set[str] = set()
to_remove: Set[str] = set()
for other in item.items:
new_terminals = self.visit(other)
if isinstance(other.item, NegativeLookahead):
to_remove |= new_terminals
result |= new_terminals
if to_remove:
result -= to_remove
# If the set of new terminals can start with the empty string,
# it means that the item is completely nullable and we should
# also consider at least the next item in case the current
# one fails to parse.
if "" in new_terminals:
continue
if not isinstance(other.item, (Opt, NegativeLookahead, Repeat0)):
break
# Do not allow the empty string to propagate.
result.discard("")
return result
def visit_Cut(self, item: Cut) -> Set[str]:
return set()
def visit_Group(self, item: Group) -> Set[str]:
return self.visit(item.rhs)
def visit_PositiveLookahead(self, item: Lookahead) -> Set[str]:
return self.visit(item.node)
def visit_NegativeLookahead(self, item: NegativeLookahead) -> Set[str]:
return self.visit(item.node)
def visit_NamedItem(self, item: NamedItem) -> Set[str]:
return self.visit(item.item)
def visit_Opt(self, item: Opt) -> Set[str]:
return self.visit(item.node)
def visit_Gather(self, item: Gather) -> Set[str]:
return self.visit(item.node)
def visit_Repeat0(self, item: Repeat0) -> Set[str]:
return self.visit(item.node)
def visit_Repeat1(self, item: Repeat1) -> Set[str]:
return self.visit(item.node)
def visit_NameLeaf(self, item: NameLeaf) -> Set[str]:
if item.value not in self.rules:
return {item.value}
if item.value not in self.first_sets:
self.first_sets[item.value] = self.visit(self.rules[item.value])
return self.first_sets[item.value]
elif item.value in self.in_process:
return set()
return self.first_sets[item.value]
def visit_StringLeaf(self, item: StringLeaf) -> Set[str]:
return {item.value}
def visit_Rhs(self, item: Rhs) -> Set[str]:
result: Set[str] = set()
for alt in item.alts:
result |= self.visit(alt)
return result
def visit_Rule(self, item: Rule) -> Set[str]:
if item.name in self.in_process:
return set()
elif item.name not in self.first_sets:
self.in_process.add(item.name)
terminals = self.visit(item.rhs)
if item.nullable:
terminals.add("")
self.first_sets[item.name] = terminals
self.in_process.remove(item.name)
return self.first_sets[item.name]
def main() -> None:
args = argparser.parse_args()
try:
grammar, parser, tokenizer = build_parser(args.grammar_file)
except Exception as err:
print("ERROR: Failed to parse grammar file", file=sys.stderr)
sys.exit(1)
first_sets = FirstSetCalculator(grammar.rules).calculate()
pprint.pprint(first_sets)
if __name__ == "__main__":
main()
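# Illustrative sketch (hypothetical grammar path): the calculator can also be used
# programmatically.  Per visit_Rule above, a nullable rule's first set additionally
# contains the empty string "".
def _example_first_sets() -> Dict[str, Set[str]]:  # illustrative only
    grammar, parser, tokenizer = build_parser("data/simple.gram")
    return FirstSetCalculator(grammar.rules).calculate()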

@@ -0,0 +1,470 @@
from __future__ import annotations
from abc import abstractmethod
from typing import (
AbstractSet,
Any,
Callable,
Dict,
Iterable,
Iterator,
List,
Optional,
Set,
Tuple,
TYPE_CHECKING,
TypeVar,
Union,
)
from pegen.parser import memoize, Parser
if TYPE_CHECKING:
from pegen.parser_generator import ParserGenerator
class GrammarError(Exception):
pass
class GrammarVisitor:
def visit(self, node: Any, *args: Any, **kwargs: Any) -> Any:
"""Visit a node."""
method = "visit_" + node.__class__.__name__
visitor = getattr(self, method, self.generic_visit)
return visitor(node, *args, **kwargs)
def generic_visit(self, node: Iterable[Any], *args: Any, **kwargs: Any) -> None:
"""Called if no explicit visitor function exists for a node."""
for value in node:
if isinstance(value, list):
for item in value:
self.visit(item, *args, **kwargs)
else:
self.visit(value, *args, **kwargs)
class Grammar:
def __init__(self, rules: Iterable[Rule], metas: Iterable[Tuple[str, Optional[str]]]):
self.rules = {rule.name: rule for rule in rules}
self.metas = dict(metas)
def __str__(self) -> str:
return "\n".join(str(rule) for name, rule in self.rules.items())
def __repr__(self) -> str:
lines = ["Grammar("]
lines.append(" [")
for rule in self.rules.values():
lines.append(f" {repr(rule)},")
lines.append(" ],")
lines.append(" {repr(list(self.metas.items()))}")
lines.append(")")
return "\n".join(lines)
def __iter__(self) -> Iterator[Rule]:
yield from self.rules.values()
# Global flag: when SIMPLE_STR is true (the default), __str__() omits rule actions.
SIMPLE_STR = True
class Rule:
def __init__(self, name: str, type: Optional[str], rhs: Rhs, memo: Optional[object] = None):
self.name = name
self.type = type
self.rhs = rhs
self.memo = bool(memo)
self.visited = False
self.nullable = False
self.left_recursive = False
self.leader = False
def is_loop(self) -> bool:
return self.name.startswith("_loop")
def is_gather(self) -> bool:
return self.name.startswith("_gather")
def __str__(self) -> str:
if SIMPLE_STR or self.type is None:
res = f"{self.name}: {self.rhs}"
else:
res = f"{self.name}[{self.type}]: {self.rhs}"
if len(res) < 88:
return res
lines = [res.split(":")[0] + ":"]
lines += [f" | {alt}" for alt in self.rhs.alts]
return "\n".join(lines)
def __repr__(self) -> str:
return f"Rule({self.name!r}, {self.type!r}, {self.rhs!r})"
def __iter__(self) -> Iterator[Rhs]:
yield self.rhs
def nullable_visit(self, rules: Dict[str, Rule]) -> bool:
if self.visited:
# A left-recursive rule is considered non-nullable.
return False
self.visited = True
self.nullable = self.rhs.nullable_visit(rules)
return self.nullable
def initial_names(self) -> AbstractSet[str]:
return self.rhs.initial_names()
def flatten(self) -> Rhs:
# If it's a single parenthesized group, flatten it.
rhs = self.rhs
if (
not self.is_loop()
and len(rhs.alts) == 1
and len(rhs.alts[0].items) == 1
and isinstance(rhs.alts[0].items[0].item, Group)
):
rhs = rhs.alts[0].items[0].item.rhs
return rhs
def collect_todo(self, gen: ParserGenerator) -> None:
rhs = self.flatten()
rhs.collect_todo(gen)
class Leaf:
def __init__(self, value: str):
self.value = value
def __str__(self) -> str:
return self.value
def __iter__(self) -> Iterable[str]:
if False:
yield
@abstractmethod
def nullable_visit(self, rules: Dict[str, Rule]) -> bool:
raise NotImplementedError
@abstractmethod
def initial_names(self) -> AbstractSet[str]:
raise NotImplementedError
class NameLeaf(Leaf):
"""The value is the name."""
def __str__(self) -> str:
if self.value == "ENDMARKER":
return "$"
return super().__str__()
def __repr__(self) -> str:
return f"NameLeaf({self.value!r})"
def nullable_visit(self, rules: Dict[str, Rule]) -> bool:
if self.value in rules:
return rules[self.value].nullable_visit(rules)
# Token or unknown; never empty.
return False
def initial_names(self) -> AbstractSet[str]:
return {self.value}
class StringLeaf(Leaf):
"""The value is a string literal, including quotes."""
def __repr__(self) -> str:
return f"StringLeaf({self.value!r})"
def nullable_visit(self, rules: Dict[str, Rule]) -> bool:
# The string token '' is considered empty.
return not self.value
def initial_names(self) -> AbstractSet[str]:
return set()
class Rhs:
def __init__(self, alts: List[Alt]):
self.alts = alts
self.memo: Optional[Tuple[Optional[str], str]] = None
def __str__(self) -> str:
return " | ".join(str(alt) for alt in self.alts)
def __repr__(self) -> str:
return f"Rhs({self.alts!r})"
def __iter__(self) -> Iterator[List[Alt]]:
yield self.alts
def nullable_visit(self, rules: Dict[str, Rule]) -> bool:
for alt in self.alts:
if alt.nullable_visit(rules):
return True
return False
def initial_names(self) -> AbstractSet[str]:
names: Set[str] = set()
for alt in self.alts:
names |= alt.initial_names()
return names
def collect_todo(self, gen: ParserGenerator) -> None:
for alt in self.alts:
alt.collect_todo(gen)
class Alt:
def __init__(self, items: List[NamedItem], *, icut: int = -1, action: Optional[str] = None):
self.items = items
self.icut = icut
self.action = action
def __str__(self) -> str:
core = " ".join(str(item) for item in self.items)
if not SIMPLE_STR and self.action:
return f"{core} {{ {self.action} }}"
else:
return core
def __repr__(self) -> str:
args = [repr(self.items)]
if self.icut >= 0:
args.append(f"icut={self.icut}")
if self.action:
args.append(f"action={self.action!r}")
return f"Alt({', '.join(args)})"
def __iter__(self) -> Iterator[List[NamedItem]]:
yield self.items
def nullable_visit(self, rules: Dict[str, Rule]) -> bool:
for item in self.items:
if not item.nullable_visit(rules):
return False
return True
def initial_names(self) -> AbstractSet[str]:
names: Set[str] = set()
for item in self.items:
names |= item.initial_names()
if not item.nullable:
break
return names
def collect_todo(self, gen: ParserGenerator) -> None:
for item in self.items:
item.collect_todo(gen)
class NamedItem:
def __init__(self, name: Optional[str], item: Item):
self.name = name
self.item = item
self.nullable = False
def __str__(self) -> str:
if not SIMPLE_STR and self.name:
return f"{self.name}={self.item}"
else:
return str(self.item)
def __repr__(self) -> str:
return f"NamedItem({self.name!r}, {self.item!r})"
def __iter__(self) -> Iterator[Item]:
yield self.item
def nullable_visit(self, rules: Dict[str, Rule]) -> bool:
self.nullable = self.item.nullable_visit(rules)
return self.nullable
def initial_names(self) -> AbstractSet[str]:
return self.item.initial_names()
def collect_todo(self, gen: ParserGenerator) -> None:
gen.callmakervisitor.visit(self.item)
class Lookahead:
def __init__(self, node: Plain, sign: str):
self.node = node
self.sign = sign
def __str__(self) -> str:
return f"{self.sign}{self.node}"
def __iter__(self) -> Iterator[Plain]:
yield self.node
def nullable_visit(self, rules: Dict[str, Rule]) -> bool:
return True
def initial_names(self) -> AbstractSet[str]:
return set()
class PositiveLookahead(Lookahead):
def __init__(self, node: Plain):
super().__init__(node, "&")
def __repr__(self) -> str:
return f"PositiveLookahead({self.node!r})"
class NegativeLookahead(Lookahead):
def __init__(self, node: Plain):
super().__init__(node, "!")
def __repr__(self) -> str:
return f"NegativeLookahead({self.node!r})"
class Opt:
def __init__(self, node: Item):
self.node = node
def __str__(self) -> str:
s = str(self.node)
# TODO: Decide whether to use [X] or X? based on type of X
if " " in s:
return f"[{s}]"
else:
return f"{s}?"
def __repr__(self) -> str:
return f"Opt({self.node!r})"
def __iter__(self) -> Iterator[Item]:
yield self.node
def nullable_visit(self, rules: Dict[str, Rule]) -> bool:
return True
def initial_names(self) -> AbstractSet[str]:
return self.node.initial_names()
class Repeat:
"""Shared base class for x* and x+."""
def __init__(self, node: Plain):
self.node = node
self.memo: Optional[Tuple[Optional[str], str]] = None
@abstractmethod
def nullable_visit(self, rules: Dict[str, Rule]) -> bool:
raise NotImplementedError
def __iter__(self) -> Iterator[Plain]:
yield self.node
def initial_names(self) -> AbstractSet[str]:
return self.node.initial_names()
class Repeat0(Repeat):
def __str__(self) -> str:
s = str(self.node)
# TODO: Decide whether to use (X)* or X* based on type of X
if " " in s:
return f"({s})*"
else:
return f"{s}*"
def __repr__(self) -> str:
return f"Repeat0({self.node!r})"
def nullable_visit(self, rules: Dict[str, Rule]) -> bool:
return True
class Repeat1(Repeat):
def __str__(self) -> str:
s = str(self.node)
# TODO: Decide whether to use (X)+ or X+ based on type of X
if " " in s:
return f"({s})+"
else:
return f"{s}+"
def __repr__(self) -> str:
return f"Repeat1({self.node!r})"
def nullable_visit(self, rules: Dict[str, Rule]) -> bool:
return False
class Gather(Repeat):
def __init__(self, separator: Plain, node: Plain):
self.separator = separator
self.node = node
def __str__(self) -> str:
return f"{self.separator!s}.{self.node!s}+"
def __repr__(self) -> str:
return f"Gather({self.separator!r}, {self.node!r})"
def nullable_visit(self, rules: Dict[str, Rule]) -> bool:
return False
class Group:
def __init__(self, rhs: Rhs):
self.rhs = rhs
def __str__(self) -> str:
return f"({self.rhs})"
def __repr__(self) -> str:
return f"Group({self.rhs!r})"
def __iter__(self) -> Iterator[Rhs]:
yield self.rhs
def nullable_visit(self, rules: Dict[str, Rule]) -> bool:
return self.rhs.nullable_visit(rules)
def initial_names(self) -> AbstractSet[str]:
return self.rhs.initial_names()
class Cut:
def __init__(self) -> None:
pass
def __repr__(self) -> str:
return f"Cut()"
def __str__(self) -> str:
return f"~"
def __iter__(self) -> Iterator[Tuple[str, str]]:
if False:
yield
def __eq__(self, other: object) -> bool:
if not isinstance(other, Cut):
return NotImplemented
return True
def nullable_visit(self, rules: Dict[str, Rule]) -> bool:
return True
def initial_names(self) -> AbstractSet[str]:
return set()
Plain = Union[Leaf, Group]
Item = Union[Plain, Opt, Repeat, Lookahead, Rhs, Cut]
RuleName = Tuple[str, str]
MetaTuple = Tuple[str, Optional[str]]
MetaList = List[MetaTuple]
RuleList = List[Rule]
NamedItemList = List[NamedItem]
LookaheadOrCut = Union[Lookahead, Cut]
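# Illustrative example: building a tiny grammar AST by hand with the node classes
# above; with SIMPLE_STR enabled, str() renders the rule without any action.
def _example_rule() -> Rule:  # illustrative only
    alt = Alt([NamedItem(None, StringLeaf("'hello'")), NamedItem(None, NameLeaf("NAME"))])
    rule = Rule("greeting", None, Rhs([alt]))
    assert str(rule) == "greeting: 'hello' NAME"
    return rule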

@@ -0,0 +1,677 @@
#!/usr/bin/env python3.8
# @generated by pegen from pegen/metagrammar.gram
import ast
import sys
import tokenize
from typing import Any, Optional
from pegen.parser import memoize, memoize_left_rec, logger, Parser
from ast import literal_eval
from pegen.grammar import (
Alt,
Cut,
Gather,
Group,
Item,
Lookahead,
LookaheadOrCut,
MetaTuple,
MetaList,
NameLeaf,
NamedItem,
NamedItemList,
NegativeLookahead,
Opt,
Plain,
PositiveLookahead,
Repeat0,
Repeat1,
Rhs,
Rule,
RuleList,
RuleName,
Grammar,
StringLeaf,
)
class GeneratedParser(Parser):
@memoize
def start(self) -> Optional[Grammar]:
# start: grammar $
mark = self.mark()
cut = False
if (
(grammar := self.grammar())
and
(endmarker := self.expect('ENDMARKER'))
):
return grammar
self.reset(mark)
if cut: return None
return None
@memoize
def grammar(self) -> Optional[Grammar]:
# grammar: metas rules | rules
mark = self.mark()
cut = False
if (
(metas := self.metas())
and
(rules := self.rules())
):
return Grammar ( rules , metas )
self.reset(mark)
if cut: return None
cut = False
if (
(rules := self.rules())
):
return Grammar ( rules , [ ] )
self.reset(mark)
if cut: return None
return None
@memoize
def metas(self) -> Optional[MetaList]:
# metas: meta metas | meta
mark = self.mark()
cut = False
if (
(meta := self.meta())
and
(metas := self.metas())
):
return [ meta ] + metas
self.reset(mark)
if cut: return None
cut = False
if (
(meta := self.meta())
):
return [ meta ]
self.reset(mark)
if cut: return None
return None
@memoize
def meta(self) -> Optional[MetaTuple]:
# meta: "@" NAME NEWLINE | "@" NAME NAME NEWLINE | "@" NAME STRING NEWLINE
mark = self.mark()
cut = False
if (
(literal := self.expect("@"))
and
(name := self.name())
and
(newline := self.expect('NEWLINE'))
):
return ( name . string , None )
self.reset(mark)
if cut: return None
cut = False
if (
(literal := self.expect("@"))
and
(a := self.name())
and
(b := self.name())
and
(newline := self.expect('NEWLINE'))
):
return ( a . string , b . string )
self.reset(mark)
if cut: return None
cut = False
if (
(literal := self.expect("@"))
and
(name := self.name())
and
(string := self.string())
and
(newline := self.expect('NEWLINE'))
):
return ( name . string , literal_eval ( string . string ) )
self.reset(mark)
if cut: return None
return None
@memoize
def rules(self) -> Optional[RuleList]:
# rules: rule rules | rule
mark = self.mark()
cut = False
if (
(rule := self.rule())
and
(rules := self.rules())
):
return [ rule ] + rules
self.reset(mark)
if cut: return None
cut = False
if (
(rule := self.rule())
):
return [ rule ]
self.reset(mark)
if cut: return None
return None
@memoize
def rule(self) -> Optional[Rule]:
# rule: rulename memoflag? ":" alts NEWLINE INDENT more_alts DEDENT | rulename memoflag? ":" NEWLINE INDENT more_alts DEDENT | rulename memoflag? ":" alts NEWLINE
mark = self.mark()
cut = False
if (
(rulename := self.rulename())
and
(opt := self.memoflag(),)
and
(literal := self.expect(":"))
and
(alts := self.alts())
and
(newline := self.expect('NEWLINE'))
and
(indent := self.expect('INDENT'))
and
(more_alts := self.more_alts())
and
(dedent := self.expect('DEDENT'))
):
return Rule ( rulename [ 0 ] , rulename [ 1 ] , Rhs ( alts . alts + more_alts . alts ) , memo = opt )
self.reset(mark)
if cut: return None
cut = False
if (
(rulename := self.rulename())
and
(opt := self.memoflag(),)
and
(literal := self.expect(":"))
and
(newline := self.expect('NEWLINE'))
and
(indent := self.expect('INDENT'))
and
(more_alts := self.more_alts())
and
(dedent := self.expect('DEDENT'))
):
return Rule ( rulename [ 0 ] , rulename [ 1 ] , more_alts , memo = opt )
self.reset(mark)
if cut: return None
cut = False
if (
(rulename := self.rulename())
and
(opt := self.memoflag(),)
and
(literal := self.expect(":"))
and
(alts := self.alts())
and
(newline := self.expect('NEWLINE'))
):
return Rule ( rulename [ 0 ] , rulename [ 1 ] , alts , memo = opt )
self.reset(mark)
if cut: return None
return None
@memoize
def rulename(self) -> Optional[RuleName]:
# rulename: NAME '[' NAME '*' ']' | NAME '[' NAME ']' | NAME
mark = self.mark()
cut = False
if (
(name := self.name())
and
(literal := self.expect('['))
and
(type := self.name())
and
(literal_1 := self.expect('*'))
and
(literal_2 := self.expect(']'))
):
return ( name . string , type . string + "*" )
self.reset(mark)
if cut: return None
cut = False
if (
(name := self.name())
and
(literal := self.expect('['))
and
(type := self.name())
and
(literal_1 := self.expect(']'))
):
return ( name . string , type . string )
self.reset(mark)
if cut: return None
cut = False
if (
(name := self.name())
):
return ( name . string , None )
self.reset(mark)
if cut: return None
return None
@memoize
def memoflag(self) -> Optional[str]:
# memoflag: '(' 'memo' ')'
mark = self.mark()
cut = False
if (
(literal := self.expect('('))
and
(literal_1 := self.expect('memo'))
and
(literal_2 := self.expect(')'))
):
return "memo"
self.reset(mark)
if cut: return None
return None
@memoize
def alts(self) -> Optional[Rhs]:
# alts: alt "|" alts | alt
mark = self.mark()
cut = False
if (
(alt := self.alt())
and
(literal := self.expect("|"))
and
(alts := self.alts())
):
return Rhs ( [ alt ] + alts . alts )
self.reset(mark)
if cut: return None
cut = False
if (
(alt := self.alt())
):
return Rhs ( [ alt ] )
self.reset(mark)
if cut: return None
return None
@memoize
def more_alts(self) -> Optional[Rhs]:
# more_alts: "|" alts NEWLINE more_alts | "|" alts NEWLINE
mark = self.mark()
cut = False
if (
(literal := self.expect("|"))
and
(alts := self.alts())
and
(newline := self.expect('NEWLINE'))
and
(more_alts := self.more_alts())
):
return Rhs ( alts . alts + more_alts . alts )
self.reset(mark)
if cut: return None
cut = False
if (
(literal := self.expect("|"))
and
(alts := self.alts())
and
(newline := self.expect('NEWLINE'))
):
return Rhs ( alts . alts )
self.reset(mark)
if cut: return None
return None
@memoize
def alt(self) -> Optional[Alt]:
# alt: items '$' action | items '$' | items action | items
mark = self.mark()
cut = False
if (
(items := self.items())
and
(literal := self.expect('$'))
and
(action := self.action())
):
return Alt ( items + [ NamedItem ( None , NameLeaf ( 'ENDMARKER' ) ) ] , action = action )
self.reset(mark)
if cut: return None
cut = False
if (
(items := self.items())
and
(literal := self.expect('$'))
):
return Alt ( items + [ NamedItem ( None , NameLeaf ( 'ENDMARKER' ) ) ] , action = None )
self.reset(mark)
if cut: return None
cut = False
if (
(items := self.items())
and
(action := self.action())
):
return Alt ( items , action = action )
self.reset(mark)
if cut: return None
cut = False
if (
(items := self.items())
):
return Alt ( items , action = None )
self.reset(mark)
if cut: return None
return None
@memoize
def items(self) -> Optional[NamedItemList]:
# items: named_item items | named_item
mark = self.mark()
cut = False
if (
(named_item := self.named_item())
and
(items := self.items())
):
return [ named_item ] + items
self.reset(mark)
if cut: return None
cut = False
if (
(named_item := self.named_item())
):
return [ named_item ]
self.reset(mark)
if cut: return None
return None
@memoize
def named_item(self) -> Optional[NamedItem]:
# named_item: NAME '=' ~ item | item | lookahead
mark = self.mark()
cut = False
if (
(name := self.name())
and
(literal := self.expect('='))
and
(cut := True)
and
(item := self.item())
):
return NamedItem ( name . string , item )
self.reset(mark)
if cut: return None
cut = False
if (
(item := self.item())
):
return NamedItem ( None , item )
self.reset(mark)
if cut: return None
cut = False
if (
(it := self.lookahead())
):
return NamedItem ( None , it )
self.reset(mark)
if cut: return None
return None
@memoize
def lookahead(self) -> Optional[LookaheadOrCut]:
# lookahead: '&' ~ atom | '!' ~ atom | '~'
mark = self.mark()
cut = False
if (
(literal := self.expect('&'))
and
(cut := True)
and
(atom := self.atom())
):
return PositiveLookahead ( atom )
self.reset(mark)
if cut: return None
cut = False
if (
(literal := self.expect('!'))
and
(cut := True)
and
(atom := self.atom())
):
return NegativeLookahead ( atom )
self.reset(mark)
if cut: return None
cut = False
if (
(literal := self.expect('~'))
):
return Cut ( )
self.reset(mark)
if cut: return None
return None
@memoize
def item(self) -> Optional[Item]:
# item: '[' ~ alts ']' | atom '?' | atom '*' | atom '+' | atom '.' atom '+' | atom
mark = self.mark()
cut = False
if (
(literal := self.expect('['))
and
(cut := True)
and
(alts := self.alts())
and
(literal_1 := self.expect(']'))
):
return Opt ( alts )
self.reset(mark)
if cut: return None
cut = False
if (
(atom := self.atom())
and
(literal := self.expect('?'))
):
return Opt ( atom )
self.reset(mark)
if cut: return None
cut = False
if (
(atom := self.atom())
and
(literal := self.expect('*'))
):
return Repeat0 ( atom )
self.reset(mark)
if cut: return None
cut = False
if (
(atom := self.atom())
and
(literal := self.expect('+'))
):
return Repeat1 ( atom )
self.reset(mark)
if cut: return None
cut = False
if (
(sep := self.atom())
and
(literal := self.expect('.'))
and
(node := self.atom())
and
(literal_1 := self.expect('+'))
):
return Gather ( sep , node )
self.reset(mark)
if cut: return None
cut = False
if (
(atom := self.atom())
):
return atom
self.reset(mark)
if cut: return None
return None
@memoize
def atom(self) -> Optional[Plain]:
# atom: '(' ~ alts ')' | NAME | STRING
mark = self.mark()
cut = False
if (
(literal := self.expect('('))
and
(cut := True)
and
(alts := self.alts())
and
(literal_1 := self.expect(')'))
):
return Group ( alts )
self.reset(mark)
if cut: return None
cut = False
if (
(name := self.name())
):
return NameLeaf ( name . string )
self.reset(mark)
if cut: return None
cut = False
if (
(string := self.string())
):
return StringLeaf ( string . string )
self.reset(mark)
if cut: return None
return None
@memoize
def action(self) -> Optional[str]:
# action: "{" ~ target_atoms "}"
mark = self.mark()
cut = False
if (
(literal := self.expect("{"))
and
(cut := True)
and
(target_atoms := self.target_atoms())
and
(literal_1 := self.expect("}"))
):
return target_atoms
self.reset(mark)
if cut: return None
return None
@memoize
def target_atoms(self) -> Optional[str]:
# target_atoms: target_atom target_atoms | target_atom
mark = self.mark()
cut = False
if (
(target_atom := self.target_atom())
and
(target_atoms := self.target_atoms())
):
return target_atom + " " + target_atoms
self.reset(mark)
if cut: return None
cut = False
if (
(target_atom := self.target_atom())
):
return target_atom
self.reset(mark)
if cut: return None
return None
@memoize
def target_atom(self) -> Optional[str]:
# target_atom: "{" ~ target_atoms "}" | NAME | NUMBER | STRING | "?" | ":" | !"}" OP
mark = self.mark()
cut = False
if (
(literal := self.expect("{"))
and
(cut := True)
and
(target_atoms := self.target_atoms())
and
(literal_1 := self.expect("}"))
):
return "{" + target_atoms + "}"
self.reset(mark)
if cut: return None
cut = False
if (
(name := self.name())
):
return name . string
self.reset(mark)
if cut: return None
cut = False
if (
(number := self.number())
):
return number . string
self.reset(mark)
if cut: return None
cut = False
if (
(string := self.string())
):
return string . string
self.reset(mark)
if cut: return None
cut = False
if (
(literal := self.expect("?"))
):
return "?"
self.reset(mark)
if cut: return None
cut = False
if (
(literal := self.expect(":"))
):
return ":"
self.reset(mark)
if cut: return None
cut = False
if (
self.negative_lookahead(self.expect, "}")
and
(op := self.op())
):
return op . string
self.reset(mark)
if cut: return None
return None
if __name__ == '__main__':
from pegen.parser import simple_parser_main
simple_parser_main(GeneratedParser)
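# Illustrative sketch (assuming the pegen.tokenizer module shipped with this parser):
# parse a one-rule grammar string with the generated parser above, producing a
# pegen.grammar.Grammar.
def _example_parse_grammar() -> Optional[Grammar]:  # illustrative only
    import io
    from pegen.tokenizer import Tokenizer
    source = "start: NAME NEWLINE\n"
    tok = Tokenizer(tokenize.generate_tokens(io.StringIO(source).readline), verbose=False)
    return GeneratedParser(tok, verbose=False).start()  # e.g. str(result) == "start: NAME NEWLINE"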

@@ -0,0 +1,65 @@
import argparse
import sys
from typing import Any, Iterator, Iterable, Callable
from pegen.build import build_parser
from pegen.grammar import Grammar, Rule
argparser = argparse.ArgumentParser(
prog="pegen", description="Pretty print the AST for a given PEG grammar"
)
argparser.add_argument("filename", help="Grammar description")
class ASTGrammarPrinter:
def children(self, node: Rule) -> Iterator[Any]:
for value in node:
if isinstance(value, list):
yield from value
else:
yield value
def name(self, node: Rule) -> str:
if not list(self.children(node)):
return repr(node)
return node.__class__.__name__
def print_grammar_ast(self, grammar: Grammar, printer: Callable[..., None] = print) -> None:
for rule in grammar.rules.values():
printer(self.print_nodes_recursively(rule))
def print_nodes_recursively(self, node: Rule, prefix: str = "", istail: bool = True) -> str:
children = list(self.children(node))
value = self.name(node)
line = prefix + ("└──" if istail else "├──") + value + "\n"
sufix = " " if istail else ""
if not children:
return line
*children, last = children
for child in children:
line += self.print_nodes_recursively(child, prefix + sufix, False)
line += self.print_nodes_recursively(last, prefix + sufix, True)
return line
def main() -> None:
args = argparser.parse_args()
try:
grammar, parser, tokenizer = build_parser(args.filename)
except Exception as err:
print("ERROR: Failed to parse grammar file", file=sys.stderr)
sys.exit(1)
visitor = ASTGrammarPrinter()
visitor.print_grammar_ast(grammar)
if __name__ == "__main__":
main()

@@ -0,0 +1,123 @@
@subheader """\
from ast import literal_eval
from pegen.grammar import (
Alt,
Cut,
Gather,
Group,
Item,
Lookahead,
LookaheadOrCut,
MetaTuple,
MetaList,
NameLeaf,
NamedItem,
NamedItemList,
NegativeLookahead,
Opt,
Plain,
PositiveLookahead,
Repeat0,
Repeat1,
Rhs,
Rule,
RuleList,
RuleName,
Grammar,
StringLeaf,
)
"""
start[Grammar]: grammar ENDMARKER { grammar }
grammar[Grammar]:
| metas rules { Grammar(rules, metas) }
| rules { Grammar(rules, []) }
metas[MetaList]:
| meta metas { [meta] + metas }
| meta { [meta] }
meta[MetaTuple]:
| "@" NAME NEWLINE { (name.string, None) }
| "@" a=NAME b=NAME NEWLINE { (a.string, b.string) }
| "@" NAME STRING NEWLINE { (name.string, literal_eval(string.string)) }
rules[RuleList]:
| rule rules { [rule] + rules }
| rule { [rule] }
rule[Rule]:
| rulename memoflag? ":" alts NEWLINE INDENT more_alts DEDENT {
Rule(rulename[0], rulename[1], Rhs(alts.alts + more_alts.alts), memo=opt) }
| rulename memoflag? ":" NEWLINE INDENT more_alts DEDENT {
Rule(rulename[0], rulename[1], more_alts, memo=opt) }
| rulename memoflag? ":" alts NEWLINE { Rule(rulename[0], rulename[1], alts, memo=opt) }
rulename[RuleName]:
| NAME '[' type=NAME '*' ']' { (name.string, type.string+"*") }
| NAME '[' type=NAME ']' { (name.string, type.string) }
| NAME { (name.string, None) }
# In the future this may return something more complicated
memoflag[str]:
| '(' 'memo' ')' { "memo" }
alts[Rhs]:
| alt "|" alts { Rhs([alt] + alts.alts)}
| alt { Rhs([alt]) }
more_alts[Rhs]:
| "|" alts NEWLINE more_alts { Rhs(alts.alts + more_alts.alts) }
| "|" alts NEWLINE { Rhs(alts.alts) }
alt[Alt]:
| items '$' action { Alt(items + [NamedItem(None, NameLeaf('ENDMARKER'))], action=action) }
| items '$' { Alt(items + [NamedItem(None, NameLeaf('ENDMARKER'))], action=None) }
| items action { Alt(items, action=action) }
| items { Alt(items, action=None) }
items[NamedItemList]:
| named_item items { [named_item] + items }
| named_item { [named_item] }
named_item[NamedItem]:
| NAME '=' ~ item {NamedItem(name.string, item)}
| item {NamedItem(None, item)}
| it=lookahead {NamedItem(None, it)}
lookahead[LookaheadOrCut]:
| '&' ~ atom {PositiveLookahead(atom)}
| '!' ~ atom {NegativeLookahead(atom)}
| '~' {Cut()}
item[Item]:
| '[' ~ alts ']' {Opt(alts)}
| atom '?' {Opt(atom)}
| atom '*' {Repeat0(atom)}
| atom '+' {Repeat1(atom)}
| sep=atom '.' node=atom '+' {Gather(sep, node)}
| atom {atom}
atom[Plain]:
| '(' ~ alts ')' {Group(alts)}
| NAME {NameLeaf(name.string) }
| STRING {StringLeaf(string.string)}
# Mini-grammar for the actions
action[str]: "{" ~ target_atoms "}" { target_atoms }
target_atoms[str]:
| target_atom target_atoms { target_atom + " " + target_atoms }
| target_atom { target_atom }
target_atom[str]:
| "{" ~ target_atoms "}" { "{" + target_atoms + "}" }
| NAME { name.string }
| NUMBER { number.string }
| STRING { string.string }
| "?" { "?" }
| ":" { ":" }
| !"}" OP { op.string }

View file

@@ -0,0 +1,310 @@
import argparse
import sys
import time
import token
import tokenize
import traceback
from abc import abstractmethod
from typing import Any, Callable, cast, Dict, Optional, Tuple, Type, TypeVar
from pegen.tokenizer import exact_token_types
from pegen.tokenizer import Mark
from pegen.tokenizer import Tokenizer
T = TypeVar("T")
P = TypeVar("P", bound="Parser")
F = TypeVar("F", bound=Callable[..., Any])
def logger(method: F) -> F:
"""For non-memoized functions that we want to be logged.
(In practice this is only used for non-leader left-recursive functions.)
"""
method_name = method.__name__
def logger_wrapper(self: P, *args: object) -> T:
if not self._verbose:
return method(self, *args)
argsr = ",".join(repr(arg) for arg in args)
fill = " " * self._level
print(f"{fill}{method_name}({argsr}) .... (looking at {self.showpeek()})")
self._level += 1
tree = method(self, *args)
self._level -= 1
print(f"{fill}... {method_name}({argsr}) --> {tree!s:.200}")
return tree
logger_wrapper.__wrapped__ = method # type: ignore
return cast(F, logger_wrapper)
def memoize(method: F) -> F:
"""Memoize a symbol method."""
method_name = method.__name__
def memoize_wrapper(self: P, *args: object) -> T:
mark = self.mark()
key = mark, method_name, args
# Fast path: cache hit, and not verbose.
if key in self._cache and not self._verbose:
tree, endmark = self._cache[key]
self.reset(endmark)
return tree
# Slow path: no cache hit, or verbose.
verbose = self._verbose
argsr = ",".join(repr(arg) for arg in args)
fill = " " * self._level
if key not in self._cache:
if verbose:
print(f"{fill}{method_name}({argsr}) ... (looking at {self.showpeek()})")
self._level += 1
tree = method(self, *args)
self._level -= 1
if verbose:
print(f"{fill}... {method_name}({argsr}) -> {tree!s:.200}")
endmark = self.mark()
self._cache[key] = tree, endmark
else:
tree, endmark = self._cache[key]
if verbose:
print(f"{fill}{method_name}({argsr}) -> {tree!s:.200}")
self.reset(endmark)
return tree
memoize_wrapper.__wrapped__ = method # type: ignore
return cast(F, memoize_wrapper)
def memoize_left_rec(method: Callable[[P], Optional[T]]) -> Callable[[P], Optional[T]]:
"""Memoize a left-recursive symbol method."""
method_name = method.__name__
def memoize_left_rec_wrapper(self: P) -> Optional[T]:
mark = self.mark()
key = mark, method_name, ()
# Fast path: cache hit, and not verbose.
if key in self._cache and not self._verbose:
tree, endmark = self._cache[key]
self.reset(endmark)
return tree
# Slow path: no cache hit, or verbose.
verbose = self._verbose
fill = " " * self._level
if key not in self._cache:
if verbose:
print(f"{fill}{method_name} ... (looking at {self.showpeek()})")
self._level += 1
# For left-recursive rules we manipulate the cache and
# loop until the rule shows no progress, then pick the
# previous result. For an explanation why this works, see
# https://github.com/PhilippeSigaud/Pegged/wiki/Left-Recursion
# (But we use the memoization cache instead of a static
# variable; perhaps this is similar to a paper by Warth et al.:
# http://web.cs.ucla.edu/~todd/research/pub.php?id=pepm08.)
# Prime the cache with a failure.
self._cache[key] = None, mark
lastresult, lastmark = None, mark
depth = 0
if verbose:
print(f"{fill}Recursive {method_name} at {mark} depth {depth}")
while True:
self.reset(mark)
result = method(self)
endmark = self.mark()
depth += 1
if verbose:
print(
f"{fill}Recursive {method_name} at {mark} depth {depth}: {result!s:.200} to {endmark}"
)
if not result:
if verbose:
print(f"{fill}Fail with {lastresult!s:.200} to {lastmark}")
break
if endmark <= lastmark:
if verbose:
print(f"{fill}Bailing with {lastresult!s:.200} to {lastmark}")
break
self._cache[key] = lastresult, lastmark = result, endmark
self.reset(lastmark)
tree = lastresult
self._level -= 1
if verbose:
print(f"{fill}{method_name}() -> {tree!s:.200} [cached]")
if tree:
endmark = self.mark()
else:
endmark = mark
self.reset(endmark)
self._cache[key] = tree, endmark
else:
tree, endmark = self._cache[key]
if verbose:
print(f"{fill}{method_name}() -> {tree!s:.200} [fresh]")
if tree:
self.reset(endmark)
return tree
memoize_left_rec_wrapper.__wrapped__ = method # type: ignore
return memoize_left_rec_wrapper
class Parser:
"""Parsing base class."""
def __init__(self, tokenizer: Tokenizer, *, verbose: bool = False):
self._tokenizer = tokenizer
self._verbose = verbose
self._level = 0
self._cache: Dict[Tuple[Mark, str, Tuple[Any, ...]], Tuple[Any, Mark]] = {}
# Pass through common tokenizer methods.
# TODO: Rename to _mark and _reset.
self.mark = self._tokenizer.mark
self.reset = self._tokenizer.reset
@abstractmethod
def start(self) -> Any:
pass
def showpeek(self) -> str:
tok = self._tokenizer.peek()
return f"{tok.start[0]}.{tok.start[1]}: {token.tok_name[tok.type]}:{tok.string!r}"
@memoize
def name(self) -> Optional[tokenize.TokenInfo]:
tok = self._tokenizer.peek()
if tok.type == token.NAME:
return self._tokenizer.getnext()
return None
@memoize
def number(self) -> Optional[tokenize.TokenInfo]:
tok = self._tokenizer.peek()
if tok.type == token.NUMBER:
return self._tokenizer.getnext()
return None
@memoize
def string(self) -> Optional[tokenize.TokenInfo]:
tok = self._tokenizer.peek()
if tok.type == token.STRING:
return self._tokenizer.getnext()
return None
@memoize
def op(self) -> Optional[tokenize.TokenInfo]:
tok = self._tokenizer.peek()
if tok.type == token.OP:
return self._tokenizer.getnext()
return None
@memoize
def expect(self, type: str) -> Optional[tokenize.TokenInfo]:
tok = self._tokenizer.peek()
if tok.string == type:
return self._tokenizer.getnext()
if type in exact_token_types:
if tok.type == exact_token_types[type]:
return self._tokenizer.getnext()
if type in token.__dict__:
if tok.type == token.__dict__[type]:
return self._tokenizer.getnext()
if tok.type == token.OP and tok.string == type:
return self._tokenizer.getnext()
return None
def positive_lookahead(self, func: Callable[..., T], *args: object) -> T:
mark = self.mark()
ok = func(*args)
self.reset(mark)
return ok
def negative_lookahead(self, func: Callable[..., object], *args: object) -> bool:
mark = self.mark()
ok = func(*args)
self.reset(mark)
return not ok
def make_syntax_error(self, filename: str = "<unknown>") -> SyntaxError:
tok = self._tokenizer.diagnose()
return SyntaxError(
"pegen parse failure", (filename, tok.start[0], 1 + tok.start[1], tok.line)
)
def simple_parser_main(parser_class: Type[Parser]) -> None:
argparser = argparse.ArgumentParser()
argparser.add_argument(
"-v",
"--verbose",
action="count",
default=0,
help="Print timing stats; repeat for more debug output",
)
argparser.add_argument(
"-q", "--quiet", action="store_true", help="Don't print the parsed program"
)
argparser.add_argument("filename", help="Input file ('-' to use stdin)")
args = argparser.parse_args()
verbose = args.verbose
verbose_tokenizer = verbose >= 3
verbose_parser = verbose == 2 or verbose >= 4
t0 = time.time()
filename = args.filename
if filename == "" or filename == "-":
filename = "<stdin>"
file = sys.stdin
else:
file = open(args.filename)
try:
tokengen = tokenize.generate_tokens(file.readline)
tokenizer = Tokenizer(tokengen, verbose=verbose_tokenizer)
parser = parser_class(tokenizer, verbose=verbose_parser)
tree = parser.start()
try:
if file.isatty():
endpos = 0
else:
endpos = file.tell()
except IOError:
endpos = 0
finally:
if file is not sys.stdin:
file.close()
t1 = time.time()
if not tree:
err = parser.make_syntax_error(filename)
traceback.print_exception(err.__class__, err, None)
sys.exit(1)
if not args.quiet:
print(tree)
if verbose:
dt = t1 - t0
diag = tokenizer.diagnose()
nlines = diag.end[0]
if diag.type == token.ENDMARKER:
nlines -= 1
print(f"Total time: {dt:.3f} sec; {nlines} lines", end="")
if endpos:
print(f" ({endpos} bytes)", end="")
if dt:
print(f"; {nlines / dt:.0f} lines/sec")
else:
print()
print("Caches sizes:")
print(f" token array : {len(tokenizer._tokens):10}")
print(f" cache : {len(parser._cache):10}")
## print_memstats()
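To make the contract concrete, here is a small hand-written sketch (not part of this commit) of a Parser subclass driven the same way pegen's generated parsers drive this base class: a rule method records a mark, tries one alternative via name()/expect()/number(), and resets on failure.

import io
import tokenize
from typing import Any, Optional

from pegen.parser import Parser, memoize
from pegen.tokenizer import Tokenizer

class ToyParser(Parser):
    @memoize
    def start(self) -> Optional[Any]:
        # start: NAME '=' NUMBER NEWLINE ENDMARKER
        mark = self.mark()
        if (
            (name := self.name())
            and self.expect("=")
            and (number := self.number())
            and self.expect("NEWLINE")
            and self.expect("ENDMARKER")
        ):
            return (name.string, number.string)
        self.reset(mark)
        return None

tokenizer = Tokenizer(tokenize.generate_tokens(io.StringIO("x = 42\n").readline))
print(ToyParser(tokenizer).start())   # ('x', '42')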

View file

@@ -0,0 +1,188 @@
import contextlib
import token
from abc import abstractmethod
from typing import AbstractSet, Dict, IO, Iterator, List, Optional, Set, Text, Tuple
from pegen import sccutils
from pegen.grammar import (
Grammar,
Rule,
Rhs,
Alt,
NamedItem,
Plain,
NameLeaf,
StringLeaf,
Gather,
)
from pegen.grammar import GrammarError, GrammarVisitor
class RuleCheckingVisitor(GrammarVisitor):
def __init__(self, rules: Dict[str, Rule]):
self.rules = rules
def visit_NameLeaf(self, node: NameLeaf) -> None:
if node.value not in self.rules and node.value not in token.tok_name.values():
# TODO: Add line/col info to (leaf) nodes
raise GrammarError(f"Dangling reference to rule {node.value!r}")
class ParserGenerator:
callmakervisitor: GrammarVisitor
def __init__(self, grammar: Grammar, file: Optional[IO[Text]]):
self.grammar = grammar
self.rules = grammar.rules
if "trailer" not in grammar.metas and "start" not in self.rules:
raise GrammarError("Grammar without a trailer must have a 'start' rule")
checker = RuleCheckingVisitor(self.rules)
for rule in self.rules.values():
checker.visit(rule)
self.file = file
self.level = 0
compute_nullables(self.rules)
self.first_graph, self.first_sccs = compute_left_recursives(self.rules)
self.todo = self.rules.copy() # Rules to generate
self.counter = 0 # For name_rule()/name_loop()
self.keyword_counter = 499 # For keyword_type()
@abstractmethod
def generate(self, filename: str) -> None:
raise NotImplementedError
@contextlib.contextmanager
def indent(self) -> Iterator[None]:
self.level += 1
try:
yield
finally:
self.level -= 1
def print(self, *args: object) -> None:
if not args:
print(file=self.file)
else:
print(" " * self.level, end="", file=self.file)
print(*args, file=self.file)
def printblock(self, lines: str) -> None:
for line in lines.splitlines():
self.print(line)
def collect_todo(self) -> None:
done: Set[str] = set()
while True:
alltodo = list(self.todo)
todo = [i for i in alltodo if i not in done]
if not todo:
break
for rulename in todo:
self.todo[rulename].collect_todo(self)
done = set(alltodo)
def keyword_type(self) -> int:
self.keyword_counter += 1
return self.keyword_counter
def name_node(self, rhs: Rhs) -> str:
self.counter += 1
name = f"_tmp_{self.counter}" # TODO: Pick a nicer name.
self.todo[name] = Rule(name, None, rhs)
return name
def name_loop(self, node: Plain, is_repeat1: bool) -> str:
self.counter += 1
if is_repeat1:
prefix = "_loop1_"
else:
prefix = "_loop0_"
name = f"{prefix}{self.counter}" # TODO: It's ugly to signal via the name.
self.todo[name] = Rule(name, None, Rhs([Alt([NamedItem(None, node)])]))
return name
def name_gather(self, node: Gather) -> str:
self.counter += 1
name = f"_gather_{self.counter}"
self.counter += 1
extra_function_name = f"_loop0_{self.counter}"
extra_function_alt = Alt(
[NamedItem(None, node.separator), NamedItem("elem", node.node),], action="elem",
)
self.todo[extra_function_name] = Rule(
extra_function_name, None, Rhs([extra_function_alt]),
)
alt = Alt(
[NamedItem("elem", node.node), NamedItem("seq", NameLeaf(extra_function_name)),],
)
self.todo[name] = Rule(name, None, Rhs([alt]),)
return name
def dedupe(name: str, names: List[str]) -> str:
origname = name
counter = 0
while name in names:
counter += 1
name = f"{origname}_{counter}"
names.append(name)
return name
def compute_nullables(rules: Dict[str, Rule]) -> None:
"""Compute which rules in a grammar are nullable.
Thanks to TatSu (tatsu/leftrec.py) for inspiration.
"""
for rule in rules.values():
rule.nullable_visit(rules)
def compute_left_recursives(
rules: Dict[str, Rule]
) -> Tuple[Dict[str, AbstractSet[str]], List[AbstractSet[str]]]:
graph = make_first_graph(rules)
sccs = list(sccutils.strongly_connected_components(graph.keys(), graph))
for scc in sccs:
if len(scc) > 1:
for name in scc:
rules[name].left_recursive = True
# Try to find a leader such that all cycles go through it.
leaders = set(scc)
for start in scc:
for cycle in sccutils.find_cycles_in_scc(graph, scc, start):
## print("Cycle:", " -> ".join(cycle))
leaders -= scc - set(cycle)
if not leaders:
raise ValueError(
f"SCC {scc} has no leadership candidate (no element is included in all cycles)"
)
## print("Leaders:", leaders)
leader = min(leaders) # Pick an arbitrary leader from the candidates.
rules[leader].leader = True
else:
name = min(scc) # The only element.
if name in graph[name]:
rules[name].left_recursive = True
rules[name].leader = True
return graph, sccs
def make_first_graph(rules: Dict[str, Rule]) -> Dict[str, AbstractSet[str]]:
"""Compute the graph of left-invocations.
There's an edge from A to B if A may invoke B at its initial
position.
Note that this requires the nullable flags to have been computed.
"""
graph = {}
vertices: Set[str] = set()
for rulename, rhs in rules.items():
graph[rulename] = names = rhs.initial_names()
vertices |= names
for vertex in vertices:
graph.setdefault(vertex, set())
return graph
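A rough sketch of how this analysis surfaces on a concrete grammar, assuming the PythonParserGenerator and testutil helpers defined elsewhere in this commit; the grammar itself is only illustrative.

import io

from pegen.grammar_parser import GeneratedParser as GrammarParser
from pegen.python_generator import PythonParserGenerator
from pegen.testutil import parse_string

grammar = parse_string("""
start: expr NEWLINE ENDMARKER { expr }
expr: expr '+' term { expr } | term { term }
term: NUMBER { number }
""", GrammarParser)

# Constructing the generator runs the nullable and left-recursion passes above.
gen = PythonParserGenerator(grammar, io.StringIO())
print(gen.first_graph)   # left-call graph, e.g. 'expr' -> {'expr', 'term'}
print(gen.first_sccs)    # SCCs of that graph
print(grammar.rules["expr"].left_recursive, grammar.rules["expr"].leader)   # True True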

View file

@@ -0,0 +1,224 @@
from typing import Any, Dict, List, Optional, IO, Text, Tuple
from pegen.grammar import (
Cut,
GrammarVisitor,
NameLeaf,
StringLeaf,
Rhs,
NamedItem,
Lookahead,
PositiveLookahead,
NegativeLookahead,
Opt,
Repeat0,
Repeat1,
Gather,
Group,
Rule,
Alt,
)
from pegen import grammar
from pegen.parser_generator import dedupe, ParserGenerator
MODULE_PREFIX = """\
#!/usr/bin/env python3.8
# @generated by pegen from {filename}
import ast
import sys
import tokenize
from typing import Any, Optional
from pegen.parser import memoize, memoize_left_rec, logger, Parser
"""
MODULE_SUFFIX = """
if __name__ == '__main__':
from pegen.parser import simple_parser_main
simple_parser_main(GeneratedParser)
"""
class PythonCallMakerVisitor(GrammarVisitor):
def __init__(self, parser_generator: ParserGenerator):
self.gen = parser_generator
self.cache: Dict[Any, Any] = {}
def visit_NameLeaf(self, node: NameLeaf) -> Tuple[Optional[str], str]:
name = node.value
if name in ("NAME", "NUMBER", "STRING", "OP"):
name = name.lower()
return name, f"self.{name}()"
if name in ("NEWLINE", "DEDENT", "INDENT", "ENDMARKER", "ASYNC", "AWAIT"):
return name.lower(), f"self.expect({name!r})"
return name, f"self.{name}()"
def visit_StringLeaf(self, node: StringLeaf) -> Tuple[str, str]:
return "literal", f"self.expect({node.value})"
def visit_Rhs(self, node: Rhs) -> Tuple[Optional[str], str]:
if node in self.cache:
return self.cache[node]
if len(node.alts) == 1 and len(node.alts[0].items) == 1:
self.cache[node] = self.visit(node.alts[0].items[0])
else:
name = self.gen.name_node(node)
self.cache[node] = name, f"self.{name}()"
return self.cache[node]
def visit_NamedItem(self, node: NamedItem) -> Tuple[Optional[str], str]:
name, call = self.visit(node.item)
if node.name:
name = node.name
return name, call
def lookahead_call_helper(self, node: Lookahead) -> Tuple[str, str]:
name, call = self.visit(node.node)
head, tail = call.split("(", 1)
assert tail[-1] == ")"
tail = tail[:-1]
return head, tail
def visit_PositiveLookahead(self, node: PositiveLookahead) -> Tuple[None, str]:
head, tail = self.lookahead_call_helper(node)
return None, f"self.positive_lookahead({head}, {tail})"
def visit_NegativeLookahead(self, node: NegativeLookahead) -> Tuple[None, str]:
head, tail = self.lookahead_call_helper(node)
return None, f"self.negative_lookahead({head}, {tail})"
def visit_Opt(self, node: Opt) -> Tuple[str, str]:
name, call = self.visit(node.node)
return "opt", f"{call}," # Note trailing comma!
def visit_Repeat0(self, node: Repeat0) -> Tuple[str, str]:
if node in self.cache:
return self.cache[node]
name = self.gen.name_loop(node.node, False)
self.cache[node] = name, f"self.{name}()," # Also a trailing comma!
return self.cache[node]
def visit_Repeat1(self, node: Repeat1) -> Tuple[str, str]:
if node in self.cache:
return self.cache[node]
name = self.gen.name_loop(node.node, True)
self.cache[node] = name, f"self.{name}()" # But no trailing comma here!
return self.cache[node]
def visit_Gather(self, node: Gather) -> Tuple[str, str]:
if node in self.cache:
return self.cache[node]
name = self.gen.name_gather(node)
self.cache[node] = name, f"self.{name}()" # No trailing comma here either!
return self.cache[node]
def visit_Group(self, node: Group) -> Tuple[Optional[str], str]:
return self.visit(node.rhs)
def visit_Cut(self, node: Cut) -> Tuple[str, str]:
return "cut", "True"
class PythonParserGenerator(ParserGenerator, GrammarVisitor):
def __init__(self, grammar: grammar.Grammar, file: Optional[IO[Text]]):
super().__init__(grammar, file)
self.callmakervisitor = PythonCallMakerVisitor(self)
def generate(self, filename: str) -> None:
header = self.grammar.metas.get("header", MODULE_PREFIX)
if header is not None:
self.print(header.rstrip("\n").format(filename=filename))
subheader = self.grammar.metas.get("subheader", "")
if subheader:
self.print(subheader.format(filename=filename))
self.print("class GeneratedParser(Parser):")
while self.todo:
for rulename, rule in list(self.todo.items()):
del self.todo[rulename]
self.print()
with self.indent():
self.visit(rule)
trailer = self.grammar.metas.get("trailer", MODULE_SUFFIX)
if trailer is not None:
self.print(trailer.rstrip("\n"))
def visit_Rule(self, node: Rule) -> None:
is_loop = node.is_loop()
is_gather = node.is_gather()
rhs = node.flatten()
if node.left_recursive:
if node.leader:
self.print("@memoize_left_rec")
else:
# Non-leader rules in a cycle are not memoized,
# but they must still be logged.
self.print("@logger")
else:
self.print("@memoize")
node_type = node.type or "Any"
self.print(f"def {node.name}(self) -> Optional[{node_type}]:")
with self.indent():
self.print(f"# {node.name}: {rhs}")
if node.nullable:
self.print(f"# nullable={node.nullable}")
self.print("mark = self.mark()")
if is_loop:
self.print("children = []")
self.visit(rhs, is_loop=is_loop, is_gather=is_gather)
if is_loop:
self.print("return children")
else:
self.print("return None")
def visit_NamedItem(self, node: NamedItem, names: List[str]) -> None:
name, call = self.callmakervisitor.visit(node.item)
if node.name:
name = node.name
if not name:
self.print(call)
else:
if name != "cut":
name = dedupe(name, names)
self.print(f"({name} := {call})")
def visit_Rhs(self, node: Rhs, is_loop: bool = False, is_gather: bool = False) -> None:
if is_loop:
assert len(node.alts) == 1
for alt in node.alts:
self.visit(alt, is_loop=is_loop, is_gather=is_gather)
def visit_Alt(self, node: Alt, is_loop: bool, is_gather: bool) -> None:
names: List[str] = []
self.print("cut = False") # TODO: Only if needed.
if is_loop:
self.print("while (")
else:
self.print("if (")
with self.indent():
first = True
for item in node.items:
if first:
first = False
else:
self.print("and")
self.visit(item, names=names)
self.print("):")
with self.indent():
action = node.action
if not action:
if is_gather:
assert len(names) == 2
action = f"[{names[0]}] + {names[1]}"
else:
action = f"[{', '.join(names)}]"
if is_loop:
self.print(f"children.append({action})")
self.print(f"mark = self.mark()")
else:
self.print(f"return {action}")
self.print("self.reset(mark)")
# Skip remaining alternatives if a cut was reached.
self.print("if cut: return None") # TODO: Only if needed.

View file

@@ -0,0 +1,128 @@
# Adapted from mypy (mypy/build.py) under the MIT license.
from typing import *
def strongly_connected_components(
vertices: AbstractSet[str], edges: Dict[str, AbstractSet[str]]
) -> Iterator[AbstractSet[str]]:
"""Compute Strongly Connected Components of a directed graph.
Args:
vertices: the labels for the vertices
edges: for each vertex, gives the target vertices of its outgoing edges
Returns:
An iterator yielding strongly connected components, each
represented as a set of vertices. Each input vertex will occur
exactly once; vertices not part of a SCC are returned as
singleton sets.
From http://code.activestate.com/recipes/578507/.
"""
identified: Set[str] = set()
stack: List[str] = []
index: Dict[str, int] = {}
boundaries: List[int] = []
def dfs(v: str) -> Iterator[Set[str]]:
index[v] = len(stack)
stack.append(v)
boundaries.append(index[v])
for w in edges[v]:
if w not in index:
yield from dfs(w)
elif w not in identified:
while index[w] < boundaries[-1]:
boundaries.pop()
if boundaries[-1] == index[v]:
boundaries.pop()
scc = set(stack[index[v] :])
del stack[index[v] :]
identified.update(scc)
yield scc
for v in vertices:
if v not in index:
yield from dfs(v)
def topsort(
data: Dict[AbstractSet[str], Set[AbstractSet[str]]]
) -> Iterable[AbstractSet[AbstractSet[str]]]:
"""Topological sort.
Args:
data: A map from SCCs (represented as frozen sets of strings) to
sets of SCCs, its dependencies. NOTE: This data structure
is modified in place -- for normalization purposes,
self-dependencies are removed and entries representing
orphans are added.
Returns:
An iterator yielding sets of SCCs that have an equivalent
ordering. NOTE: The algorithm doesn't care about the internal
structure of SCCs.
Example:
Suppose the input has the following structure:
{A: {B, C}, B: {D}, C: {D}}
This is normalized to:
{A: {B, C}, B: {D}, C: {D}, D: {}}
The algorithm will yield the following values:
{D}
{B, C}
{A}
From http://code.activestate.com/recipes/577413/.
"""
# TODO: Use a faster algorithm?
for k, v in data.items():
v.discard(k) # Ignore self dependencies.
for item in set.union(*data.values()) - set(data.keys()):
data[item] = set()
while True:
ready = {item for item, dep in data.items() if not dep}
if not ready:
break
yield ready
data = {item: (dep - ready) for item, dep in data.items() if item not in ready}
assert not data, "A cyclic dependency exists amongst %r" % data
def find_cycles_in_scc(
graph: Dict[str, AbstractSet[str]], scc: AbstractSet[str], start: str
) -> Iterable[List[str]]:
"""Find cycles in SCC emanating from start.
Yields lists of the form ['A', 'B', 'C', 'A'], which means there's
a path from A -> B -> C -> A. The first item is always the start
argument, but the last item may be another element, e.g. ['A',
'B', 'C', 'B'] means there's a path from A to B and there's a
cycle from B to C and back.
"""
# Basic input checks.
assert start in scc, (start, scc)
assert scc <= graph.keys(), scc - graph.keys()
# Reduce the graph to nodes in the SCC.
graph = {src: {dst for dst in dsts if dst in scc} for src, dsts in graph.items() if src in scc}
assert start in graph
# Recursive helper that yields cycles.
def dfs(node: str, path: List[str]) -> Iterator[List[str]]:
if node in path:
yield path + [node]
return
path = path + [node] # TODO: Make this not quadratic.
for child in graph[node]:
yield from dfs(child, path)
yield from dfs(start, [])
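A small sketch of these helpers on a hand-written graph; the node names are arbitrary.

from pegen.sccutils import find_cycles_in_scc, strongly_connected_components

edges = {"A": {"B"}, "B": {"C"}, "C": {"B"}, "D": set()}

for scc in strongly_connected_components(edges.keys(), edges):
    print(scc)      # {'B', 'C'} is one component; 'A' and 'D' come out as singletons

for cycle in find_cycles_in_scc(edges, {"B", "C"}, "B"):
    print(cycle)    # ['B', 'C', 'B']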

View file

@@ -0,0 +1,126 @@
import importlib.util
import io
import os
import pathlib
import sys
import textwrap
import tokenize
from typing import Any, cast, Dict, IO, Type, Final
from pegen.build import compile_c_extension
from pegen.c_generator import CParserGenerator
from pegen.grammar import Grammar
from pegen.grammar_parser import GeneratedParser as GrammarParser
from pegen.parser import Parser
from pegen.python_generator import PythonParserGenerator
from pegen.tokenizer import Tokenizer
def generate_parser(grammar: Grammar) -> Type[Parser]:
# Generate a parser.
out = io.StringIO()
genr = PythonParserGenerator(grammar, out)
genr.generate("<string>")
# Load the generated parser class.
ns: Dict[str, Any] = {}
exec(out.getvalue(), ns)
return ns["GeneratedParser"]
def run_parser(file: IO[bytes], parser_class: Type[Parser], *, verbose: bool = False) -> Any:
# Run a parser on a file (stream).
tokenizer = Tokenizer(tokenize.generate_tokens(file.readline)) # type: ignore # typeshed issue #3515
parser = parser_class(tokenizer, verbose=verbose)
result = parser.start()
if result is None:
raise parser.make_syntax_error()
return result
def parse_string(
source: str, parser_class: Type[Parser], *, dedent: bool = True, verbose: bool = False
) -> Any:
# Run the parser on a string.
if dedent:
source = textwrap.dedent(source)
file = io.StringIO(source)
return run_parser(file, parser_class, verbose=verbose) # type: ignore # typeshed issue #3515
def make_parser(source: str) -> Type[Parser]:
# Combine parse_string() and generate_parser().
grammar = parse_string(source, GrammarParser)
return generate_parser(grammar)
def import_file(full_name: str, path: str) -> Any:
"""Import a python module from a path"""
spec = importlib.util.spec_from_file_location(full_name, path)
mod = importlib.util.module_from_spec(spec)
# We assume this is not None and has an exec_module() method.
# See https://docs.python.org/3/reference/import.html?highlight=exec_module#loading
loader = cast(Any, spec.loader)
loader.exec_module(mod)
return mod
def generate_c_parser_source(grammar: Grammar) -> str:
out = io.StringIO()
genr = CParserGenerator(grammar, out)
genr.generate("<string>")
return out.getvalue()
def generate_parser_c_extension(
grammar: Grammar, path: pathlib.PurePath, debug: bool = False
) -> Any:
"""Generate a parser c extension for the given grammar in the given path
Returns a module object with a parse_string() method.
TODO: express that using a Protocol.
"""
# Make sure that the working directory is empty: reusing non-empty temporary
# directories when generating extensions can lead to segmentation faults.
# Check issue #95 (https://github.com/gvanrossum/pegen/issues/95) for more
# context.
assert not os.listdir(path)
source = path / "parse.c"
with open(source, "w") as file:
genr = CParserGenerator(grammar, file, debug=debug)
genr.generate("parse.c")
extension_path = compile_c_extension(str(source), build_dir=str(path / "build"))
extension = import_file("parse", extension_path)
return extension
def print_memstats() -> bool:
MiB: Final = 2 ** 20
try:
import psutil # type: ignore
except ImportError:
return False
print("Memory stats:")
process = psutil.Process()
meminfo = process.memory_info()
res = {}
res["rss"] = meminfo.rss / MiB
res["vms"] = meminfo.vms / MiB
if sys.platform == "win32":
res["maxrss"] = meminfo.peak_wset / MiB
else:
# See https://stackoverflow.com/questions/938733/total-memory-used-by-python-process
import resource # Since it doesn't exist on Windows.
rusage = resource.getrusage(resource.RUSAGE_SELF)
if sys.platform == "darwin":
factor = 1
else:
factor = 1024 # Linux
res["maxrss"] = rusage.ru_maxrss * factor / MiB
for key, value in res.items():
print(f" {key:12.12s}: {value:10.0f} MiB")
return True
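Most tests built on these helpers follow one pattern: make a throwaway parser from a grammar string, then feed it input. A minimal sketch with an illustrative grammar:

from pegen.testutil import make_parser, parse_string

parser_class = make_parser("start: NAME '=' NUMBER NEWLINE ENDMARKER { (name.string, number.string) }\n")
print(parse_string("x = 42\n", parser_class))   # ('x', '42')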

View file

@@ -0,0 +1,86 @@
import token
import tokenize
from typing import List, Iterator
Mark = int # NewType('Mark', int)
exact_token_types = token.EXACT_TOKEN_TYPES # type: ignore
def shorttok(tok: tokenize.TokenInfo) -> str:
return "%-25.25s" % f"{tok.start[0]}.{tok.start[1]}: {token.tok_name[tok.type]}:{tok.string!r}"
class Tokenizer:
"""Caching wrapper for the tokenize module.
This is pretty tied to Python's syntax.
"""
_tokens: List[tokenize.TokenInfo]
def __init__(self, tokengen: Iterator[tokenize.TokenInfo], *, verbose: bool = False):
self._tokengen = tokengen
self._tokens = []
self._index = 0
self._verbose = verbose
if verbose:
self.report(False, False)
def getnext(self) -> tokenize.TokenInfo:
"""Return the next token and updates the index."""
cached = True
while self._index == len(self._tokens):
tok = next(self._tokengen)
if tok.type in (tokenize.NL, tokenize.COMMENT):
continue
if tok.type == token.ERRORTOKEN and tok.string.isspace():
continue
self._tokens.append(tok)
cached = False
tok = self._tokens[self._index]
self._index += 1
if self._verbose:
self.report(cached, False)
return tok
def peek(self) -> tokenize.TokenInfo:
"""Return the next token *without* updating the index."""
while self._index == len(self._tokens):
tok = next(self._tokengen)
if tok.type in (tokenize.NL, tokenize.COMMENT):
continue
if tok.type == token.ERRORTOKEN and tok.string.isspace():
continue
self._tokens.append(tok)
return self._tokens[self._index]
def diagnose(self) -> tokenize.TokenInfo:
if not self._tokens:
self.getnext()
return self._tokens[-1]
def mark(self) -> Mark:
return self._index
def reset(self, index: Mark) -> None:
if index == self._index:
return
assert 0 <= index <= len(self._tokens), (index, len(self._tokens))
old_index = self._index
self._index = index
if self._verbose:
self.report(True, index < old_index)
def report(self, cached: bool, back: bool) -> None:
if back:
fill = "-" * self._index + "-"
elif cached:
fill = "-" * self._index + ">"
else:
fill = "-" * self._index + "*"
if self._index == 0:
print(f"{fill} (Bof)")
else:
tok = self._tokens[self._index - 1]
print(f"{fill} {shorttok(tok)}")