"""Generate the main interpreter switch. Reads the instruction definitions from bytecodes.c. Writes the cases to generated_cases.c.h, which is #included in ceval.c. """ import argparse import contextlib import dataclasses import os import posixpath import re import sys import typing import lexer as lx import parser from parser import StackEffect HERE = os.path.dirname(__file__) ROOT = os.path.join(HERE, "../..") THIS = os.path.relpath(__file__, ROOT).replace(os.path.sep, posixpath.sep) DEFAULT_INPUT = os.path.relpath(os.path.join(ROOT, "Python/bytecodes.c")) DEFAULT_OUTPUT = os.path.relpath(os.path.join(ROOT, "Python/generated_cases.c.h")) DEFAULT_METADATA_OUTPUT = os.path.relpath( os.path.join(ROOT, "Python/opcode_metadata.h") ) BEGIN_MARKER = "// BEGIN BYTECODES //" END_MARKER = "// END BYTECODES //" RE_PREDICTED = ( r"^\s*(?:GO_TO_INSTRUCTION\(|DEOPT_IF\(.*?,\s*)(\w+)\);\s*(?://.*)?$" ) UNUSED = "unused" BITS_PER_CODE_UNIT = 16 RESERVED_WORDS = { "co_consts" : "Use FRAME_CO_CONSTS.", "co_names": "Use FRAME_CO_NAMES.", } arg_parser = argparse.ArgumentParser( description="Generate the code for the interpreter switch.", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) arg_parser.add_argument( "-o", "--output", type=str, help="Generated code", default=DEFAULT_OUTPUT ) arg_parser.add_argument( "-m", "--metadata", type=str, help="Generated metadata", default=DEFAULT_METADATA_OUTPUT ) arg_parser.add_argument( "-l", "--emit-line-directives", help="Emit #line directives", action="store_true" ) arg_parser.add_argument( "input", nargs=argparse.REMAINDER, help="Instruction definition file(s)" ) def effect_size(effect: StackEffect) -> tuple[int, str]: """Return the 'size' impact of a stack effect. Returns a tuple (numeric, symbolic) where: - numeric is an int giving the statically analyzable size of the effect - symbolic is a string representing a variable effect (e.g. 'oparg*2') At most one of these will be non-zero / non-empty. """ if effect.size: assert not effect.cond, "Array effects cannot have a condition" return 0, effect.size elif effect.cond: return 0, f"{maybe_parenthesize(effect.cond)} ? 1 : 0" else: return 1, "" def maybe_parenthesize(sym: str) -> str: """Add parentheses around a string if it contains an operator. An exception is made for '*' which is common and harmless in the context where the symbolic size is used. """ if re.match(r"^[\s\w*]+$", sym): return sym else: return f"({sym})" def list_effect_size(effects: list[StackEffect]) -> tuple[int, str]: numeric = 0 symbolic: list[str] = [] for effect in effects: diff, sym = effect_size(effect) numeric += diff if sym: symbolic.append(maybe_parenthesize(sym)) return numeric, " + ".join(symbolic) def string_effect_size(arg: tuple[int, str]) -> str: numeric, symbolic = arg if numeric and symbolic: return f"{numeric} + {symbolic}" elif symbolic: return symbolic else: return str(numeric) class Formatter: """Wraps an output stream with the ability to indent etc.""" stream: typing.TextIO prefix: str emit_line_directives: bool = False lineno: int # Next line number, 1-based filename: str # Slightly improved stream.filename nominal_lineno: int nominal_filename: str def __init__( self, stream: typing.TextIO, indent: int, emit_line_directives: bool = False ) -> None: self.stream = stream self.prefix = " " * indent self.emit_line_directives = emit_line_directives self.lineno = 1 filename = os.path.relpath(self.stream.name, ROOT) # Make filename more user-friendly and less platform-specific filename = filename.replace("\\", "/") if filename.startswith("./"): filename = filename[2:] if filename.endswith(".new"): filename = filename[:-4] self.filename = filename self.nominal_lineno = 1 self.nominal_filename = filename def write_raw(self, s: str) -> None: self.stream.write(s) newlines = s.count("\n") self.lineno += newlines self.nominal_lineno += newlines def emit(self, arg: str) -> None: if arg: self.write_raw(f"{self.prefix}{arg}\n") else: self.write_raw("\n") def set_lineno(self, lineno: int, filename: str) -> None: if self.emit_line_directives: if lineno != self.nominal_lineno or filename != self.nominal_filename: self.emit(f'#line {lineno} "{filename}"') self.nominal_lineno = lineno self.nominal_filename = filename def reset_lineno(self) -> None: if self.lineno != self.nominal_lineno or self.filename != self.nominal_filename: self.set_lineno(self.lineno + 1, self.filename) @contextlib.contextmanager def indent(self): self.prefix += " " yield self.prefix = self.prefix[:-4] @contextlib.contextmanager def block(self, head: str): if head: self.emit(head + " {") else: self.emit("{") with self.indent(): yield self.emit("}") def stack_adjust( self, input_effects: list[StackEffect], output_effects: list[StackEffect], ): shrink, isym = list_effect_size(input_effects) grow, osym = list_effect_size(output_effects) diff = grow - shrink if isym and isym != osym: self.emit(f"STACK_SHRINK({isym});") if diff < 0: self.emit(f"STACK_SHRINK({-diff});") if diff > 0: self.emit(f"STACK_GROW({diff});") if osym and osym != isym: self.emit(f"STACK_GROW({osym});") def declare(self, dst: StackEffect, src: StackEffect | None): if dst.name == UNUSED: return typ = f"{dst.type}" if dst.type else "PyObject *" if src: cast = self.cast(dst, src) init = f" = {cast}{src.name}" elif dst.cond: init = " = NULL" else: init = "" sepa = "" if typ.endswith("*") else " " self.emit(f"{typ}{sepa}{dst.name}{init};") def assign(self, dst: StackEffect, src: StackEffect): if src.name == UNUSED: return if src.size: # Don't write sized arrays -- it's up to the user code. return cast = self.cast(dst, src) if re.match(r"^REG\(oparg(\d+)\)$", dst.name): self.emit(f"Py_XSETREF({dst.name}, {cast}{src.name});") else: stmt = f"{dst.name} = {cast}{src.name};" if src.cond: stmt = f"if ({src.cond}) {{ {stmt} }}" self.emit(stmt) def cast(self, dst: StackEffect, src: StackEffect) -> str: return f"({dst.type or 'PyObject *'})" if src.type != dst.type else "" INSTRUCTION_FLAGS = ['HAS_ARG', 'HAS_CONST', 'HAS_NAME'] @dataclasses.dataclass class Instruction: """An instruction with additional data and code.""" # Parts of the underlying instruction definition inst: parser.InstDef kind: typing.Literal["inst", "op"] name: str block: parser.Block block_text: list[str] # Block.text, less curlies, less PREDICT() calls block_line: int # First line of block in original code # Computed by constructor always_exits: bool cache_offset: int cache_effects: list[parser.CacheEffect] input_effects: list[StackEffect] output_effects: list[StackEffect] unmoved_names: frozenset[str] instr_fmt: str flags: int # Set later family: parser.Family | None = None predicted: bool = False def __init__(self, inst: parser.InstDef): self.inst = inst self.kind = inst.kind self.name = inst.name self.block = inst.block self.block_text, self.check_eval_breaker, self.block_line = \ extract_block_text(self.block) self.always_exits = always_exits(self.block_text) self.cache_effects = [ effect for effect in inst.inputs if isinstance(effect, parser.CacheEffect) ] self.cache_offset = sum(c.size for c in self.cache_effects) self.input_effects = [ effect for effect in inst.inputs if isinstance(effect, StackEffect) ] self.output_effects = inst.outputs # For consistency/completeness unmoved_names: set[str] = set() for ieffect, oeffect in zip(self.input_effects, self.output_effects): if ieffect.name == oeffect.name: unmoved_names.add(ieffect.name) else: break self.unmoved_names = frozenset(unmoved_names) flag_data = { 'HAS_ARG' : variable_used(inst, "oparg"), 'HAS_CONST': variable_used(inst, "FRAME_CO_CONSTS"), 'HAS_NAME' : variable_used(inst, "FRAME_CO_NAMES"), } assert set(flag_data.keys()) == set(INSTRUCTION_FLAGS) self.flags = 0 for i, name in enumerate(INSTRUCTION_FLAGS): self.flags |= (1< None: """Write one instruction, sans prologue and epilogue.""" # Write a static assertion that a family's cache size is correct if family := self.family: if self.name == family.members[0]: if cache_size := family.size: out.emit( f"static_assert({cache_size} == " f'{self.cache_offset}, "incorrect cache size");' ) # Write input stack effect variable declarations and initializations ieffects = list(reversed(self.input_effects)) for i, ieffect in enumerate(ieffects): isize = string_effect_size( list_effect_size([ieff for ieff in ieffects[: i + 1]]) ) if ieffect.size: src = StackEffect(f"(stack_pointer - {maybe_parenthesize(isize)})", "PyObject **") elif ieffect.cond: src = StackEffect(f"({ieffect.cond}) ? stack_pointer[-{maybe_parenthesize(isize)}] : NULL", "") else: src = StackEffect(f"stack_pointer[-{maybe_parenthesize(isize)}]", "") out.declare(ieffect, src) # Write output stack effect variable declarations isize = string_effect_size(list_effect_size(self.input_effects)) input_names = {ieffect.name for ieffect in self.input_effects} for i, oeffect in enumerate(self.output_effects): if oeffect.name not in input_names: if oeffect.size: osize = string_effect_size( list_effect_size([oeff for oeff in self.output_effects[:i]]) ) offset = "stack_pointer" if isize != osize: if isize != "0": offset += f" - ({isize})" if osize != "0": offset += f" + {osize}" src = StackEffect(offset, "PyObject **") out.declare(oeffect, src) else: out.declare(oeffect, None) # out.emit(f"next_instr += OPSIZE({self.inst.name}) - 1;") self.write_body(out, 0) # Skip the rest if the block always exits if self.always_exits: return # Write net stack growth/shrinkage out.stack_adjust( [ieff for ieff in self.input_effects], [oeff for oeff in self.output_effects], ) # Write output stack effect assignments oeffects = list(reversed(self.output_effects)) for i, oeffect in enumerate(oeffects): if oeffect.name in self.unmoved_names: continue osize = string_effect_size( list_effect_size([oeff for oeff in oeffects[: i + 1]]) ) if oeffect.size: dst = StackEffect(f"stack_pointer - {maybe_parenthesize(osize)}", "PyObject **") else: dst = StackEffect(f"stack_pointer[-{maybe_parenthesize(osize)}]", "") out.assign(dst, oeffect) # Write cache effect if self.cache_offset: out.emit(f"next_instr += {self.cache_offset};") def write_body(self, out: Formatter, dedent: int, cache_adjust: int = 0) -> None: """Write the instruction body.""" # Write cache effect variable declarations and initializations cache_offset = cache_adjust for ceffect in self.cache_effects: if ceffect.name != UNUSED: bits = ceffect.size * BITS_PER_CODE_UNIT if bits == 64: # NOTE: We assume that 64-bit data in the cache # is always an object pointer. # If this becomes false, we need a way to specify # syntactically what type the cache data is. typ = "PyObject *" func = "read_obj" else: typ = f"uint{bits}_t " func = f"read_u{bits}" out.emit( f"{typ}{ceffect.name} = {func}(&next_instr[{cache_offset}].cache);" ) cache_offset += ceffect.size assert cache_offset == self.cache_offset + cache_adjust # Write the body, substituting a goto for ERROR_IF() and other stuff assert dedent <= 0 extra = " " * -dedent names_to_skip = self.unmoved_names | frozenset({UNUSED, "null"}) offset = 0 context = self.block.context assert context is not None and context.owner is not None filename = context.owner.filename for line in self.block_text: out.set_lineno(self.block_line + offset, filename) offset += 1 if m := re.match(r"(\s*)ERROR_IF\((.+), (\w+)\);\s*(?://.*)?$", line): space, cond, label = m.groups() space = extra + space # ERROR_IF() must pop the inputs from the stack. # The code block is responsible for DECREF()ing them. # NOTE: If the label doesn't exist, just add it to ceval.c. # Don't pop common input/output effects at the bottom! # These aren't DECREF'ed so they can stay. ieffs = list(self.input_effects) oeffs = list(self.output_effects) while ieffs and oeffs and ieffs[0] == oeffs[0]: ieffs.pop(0) oeffs.pop(0) ninputs, symbolic = list_effect_size(ieffs) if ninputs: label = f"pop_{ninputs}_{label}" if symbolic: out.write_raw( f"{space}if ({cond}) {{ STACK_SHRINK({symbolic}); goto {label}; }}\n" ) else: out.write_raw(f"{space}if ({cond}) goto {label};\n") elif m := re.match(r"(\s*)DECREF_INPUTS\(\);\s*(?://.*)?$", line): out.reset_lineno() space = extra + m.group(1) for ieff in self.input_effects: if ieff.name in names_to_skip: continue if ieff.size: out.write_raw( f"{space}for (int _i = {ieff.size}; --_i >= 0;) {{\n" ) out.write_raw(f"{space} Py_DECREF({ieff.name}[_i]);\n") out.write_raw(f"{space}}}\n") else: decref = "XDECREF" if ieff.cond else "DECREF" out.write_raw(f"{space}Py_{decref}({ieff.name});\n") else: out.write_raw(extra + line) out.reset_lineno() InstructionOrCacheEffect = Instruction | parser.CacheEffect StackEffectMapping = list[tuple[StackEffect, StackEffect]] @dataclasses.dataclass class Component: instr: Instruction input_mapping: StackEffectMapping output_mapping: StackEffectMapping def write_body(self, out: Formatter, cache_adjust: int) -> None: with out.block(""): input_names = {ieffect.name for _, ieffect in self.input_mapping} for var, ieffect in self.input_mapping: out.declare(ieffect, var) for _, oeffect in self.output_mapping: if oeffect.name not in input_names: out.declare(oeffect, None) self.instr.write_body(out, dedent=-4, cache_adjust=cache_adjust) for var, oeffect in self.output_mapping: out.assign(var, oeffect) @dataclasses.dataclass class MacroInstruction: """A macro instruction.""" name: str stack: list[StackEffect] initial_sp: int final_sp: int instr_fmt: str flags: int macro: parser.Macro parts: list[Component | parser.CacheEffect] predicted: bool = False @dataclasses.dataclass class PseudoInstruction: """A pseudo instruction.""" name: str targets: list[Instruction] instr_fmt: str flags: int @dataclasses.dataclass class OverriddenInstructionPlaceHolder: name: str AnyInstruction = Instruction | MacroInstruction | PseudoInstruction INSTR_FMT_PREFIX = "INSTR_FMT_" INSTR_FLAG_SUFFIX = "_FLAG" class Analyzer: """Parse input, analyze it, and write to output.""" input_filenames: list[str] output_filename: str metadata_filename: str errors: int = 0 emit_line_directives: bool = False def __init__(self, input_filenames: list[str], output_filename: str, metadata_filename: str): """Read the input file.""" self.input_filenames = input_filenames self.output_filename = output_filename self.metadata_filename = metadata_filename def error(self, msg: str, node: parser.Node) -> None: lineno = 0 filename = "" if context := node.context: filename = context.owner.filename # Use line number of first non-comment in the node for token in context.owner.tokens[context.begin : context.end]: lineno = token.line if token.kind != "COMMENT": break print(f"{filename}:{lineno}: {msg}", file=sys.stderr) self.errors += 1 everything: list[ parser.InstDef | parser.Macro | parser.Pseudo | OverriddenInstructionPlaceHolder ] instrs: dict[str, Instruction] # Includes ops macros: dict[str, parser.Macro] macro_instrs: dict[str, MacroInstruction] families: dict[str, parser.Family] pseudos: dict[str, parser.Pseudo] pseudo_instrs: dict[str, PseudoInstruction] def parse(self) -> None: """Parse the source text. We only want the parser to see the stuff between the begin and end markers. """ self.everything = [] self.instrs = {} self.macros = {} self.families = {} self.pseudos = {} instrs_idx: dict[str, int] = dict() for filename in self.input_filenames: self.parse_file(filename, instrs_idx) files = " + ".join(self.input_filenames) print( f"Read {len(self.instrs)} instructions/ops, " f"{len(self.macros)} macros, {len(self.pseudos)} pseudos, " f"and {len(self.families)} families from {files}", file=sys.stderr, ) def parse_file(self, filename: str, instrs_idx: dict[str, int]) -> None: with open(filename) as file: src = file.read() filename = os.path.relpath(filename, ROOT) # Make filename more user-friendly and less platform-specific filename = filename.replace("\\", "/") if filename.startswith("./"): filename = filename[2:] psr = parser.Parser(src, filename=filename) # Skip until begin marker while tkn := psr.next(raw=True): if tkn.text == BEGIN_MARKER: break else: raise psr.make_syntax_error( f"Couldn't find {BEGIN_MARKER!r} in {psr.filename}" ) start = psr.getpos() # Find end marker, then delete everything after it while tkn := psr.next(raw=True): if tkn.text == END_MARKER: break del psr.tokens[psr.getpos() - 1 :] # Parse from start psr.setpos(start) thing: parser.InstDef | parser.Macro | parser.Pseudo | parser.Family | None thing_first_token = psr.peek() while thing := psr.definition(): if ws := [w for w in RESERVED_WORDS if variable_used(thing, w)]: self.error(f"'{ws[0]}' is a reserved word. {RESERVED_WORDS[ws[0]]}", thing) match thing: case parser.InstDef(name=name): if name in self.instrs: if not thing.override: raise psr.make_syntax_error( f"Duplicate definition of '{name}' @ {thing.context} " f"previous definition @ {self.instrs[name].inst.context}", thing_first_token, ) self.everything[instrs_idx[name]] = OverriddenInstructionPlaceHolder(name=name) if name not in self.instrs and thing.override: raise psr.make_syntax_error( f"Definition of '{name}' @ {thing.context} is supposed to be " "an override but no previous definition exists.", thing_first_token, ) self.instrs[name] = Instruction(thing) instrs_idx[name] = len(self.everything) self.everything.append(thing) case parser.Macro(name): self.macros[name] = thing self.everything.append(thing) case parser.Family(name): self.families[name] = thing case parser.Pseudo(name): self.pseudos[name] = thing self.everything.append(thing) case _: typing.assert_never(thing) if not psr.eof(): raise psr.make_syntax_error(f"Extra stuff at the end of {filename}") def analyze(self) -> None: """Analyze the inputs. Raises SystemExit if there is an error. """ self.analyze_macros_and_pseudos() self.find_predictions() self.map_families() self.check_families() def find_predictions(self) -> None: """Find the instructions that need PREDICTED() labels.""" for instr in self.instrs.values(): targets: set[str] = set() for line in instr.block_text: if m := re.match(RE_PREDICTED, line): targets.add(m.group(1)) for target in targets: if target_instr := self.instrs.get(target): target_instr.predicted = True elif target_macro := self.macro_instrs.get(target): target_macro.predicted = True else: self.error( f"Unknown instruction {target!r} predicted in {instr.name!r}", instr.inst, # TODO: Use better location ) def map_families(self) -> None: """Link instruction names back to their family, if they have one.""" for family in self.families.values(): for member in family.members: if member_instr := self.instrs.get(member): if member_instr.family not in (family, None): self.error( f"Instruction {member} is a member of multiple families " f"({member_instr.family.name}, {family.name}).", family, ) else: member_instr.family = family elif member_macro := self.macro_instrs.get(member): for part in member_macro.parts: if isinstance(part, Component): if part.instr.family not in (family, None): self.error( f"Component {part.instr.name} of macro {member} " f"is a member of multiple families " f"({part.instr.family.name}, {family.name}).", family, ) else: part.instr.family = family else: self.error( f"Unknown instruction {member!r} referenced in family {family.name!r}", family, ) def check_families(self) -> None: """Check each family: - Must have at least 2 members - All members must be known instructions - All members must have the same cache, input and output effects """ for family in self.families.values(): if len(family.members) < 2: self.error(f"Family {family.name!r} has insufficient members", family) members = [ member for member in family.members if member in self.instrs or member in self.macro_instrs ] if members != family.members: unknown = set(family.members) - set(members) self.error( f"Family {family.name!r} has unknown members: {unknown}", family ) if len(members) < 2: continue expected_effects = self.effect_counts(members[0]) for member in members[1:]: member_effects = self.effect_counts(member) if member_effects != expected_effects: self.error( f"Family {family.name!r} has inconsistent " f"(cache, input, output) effects:\n" f" {family.members[0]} = {expected_effects}; " f"{member} = {member_effects}", family, ) def effect_counts(self, name: str) -> tuple[int, int, int]: if instr := self.instrs.get(name): cache = instr.cache_offset input = len(instr.input_effects) output = len(instr.output_effects) elif macro := self.macro_instrs.get(name): cache, input, output = 0, 0, 0 for part in macro.parts: if isinstance(part, Component): cache += part.instr.cache_offset # A component may pop what the previous component pushed, # so we offset the input/output counts by that. delta_i = len(part.instr.input_effects) delta_o = len(part.instr.output_effects) offset = min(delta_i, output) input += delta_i - offset output += delta_o - offset else: assert isinstance(part, parser.CacheEffect), part cache += part.size else: assert False, f"Unknown instruction {name!r}" return cache, input, output def analyze_macros_and_pseudos(self) -> None: """Analyze each macro and pseudo instruction.""" self.macro_instrs = {} self.pseudo_instrs = {} for name, macro in self.macros.items(): self.macro_instrs[name] = self.analyze_macro(macro) for name, pseudo in self.pseudos.items(): self.pseudo_instrs[name] = self.analyze_pseudo(pseudo) def analyze_macro(self, macro: parser.Macro) -> MacroInstruction: components = self.check_macro_components(macro) stack, initial_sp = self.stack_analysis(components) sp = initial_sp parts: list[Component | parser.CacheEffect] = [] format = "IB" flags = 0 cache = "C" for component in components: match component: case parser.CacheEffect() as ceffect: parts.append(ceffect) for _ in range(ceffect.size): format += cache cache = "0" case Instruction() as instr: part, sp = self.analyze_instruction(instr, stack, sp) parts.append(part) for ce in instr.cache_effects: for _ in range(ce.size): format += cache cache = "0" flags |= instr.flags case _: typing.assert_never(component) final_sp = sp return MacroInstruction( macro.name, stack, initial_sp, final_sp, format, flags, macro, parts ) def analyze_pseudo(self, pseudo: parser.Pseudo) -> PseudoInstruction: targets = [self.instrs[target] for target in pseudo.targets] assert targets # Make sure the targets have the same fmt fmts = list(set([t.instr_fmt for t in targets])) assert(len(fmts) == 1) flags_list = list(set([t.flags for t in targets])) assert(len(flags_list) == 1) return PseudoInstruction(pseudo.name, targets, fmts[0], flags_list[0]) def analyze_instruction( self, instr: Instruction, stack: list[StackEffect], sp: int ) -> tuple[Component, int]: input_mapping: StackEffectMapping = [] for ieffect in reversed(instr.input_effects): sp -= 1 input_mapping.append((stack[sp], ieffect)) output_mapping: StackEffectMapping = [] for oeffect in instr.output_effects: output_mapping.append((stack[sp], oeffect)) sp += 1 return Component(instr, input_mapping, output_mapping), sp def check_macro_components( self, macro: parser.Macro ) -> list[InstructionOrCacheEffect]: components: list[InstructionOrCacheEffect] = [] for uop in macro.uops: match uop: case parser.OpName(name): if name not in self.instrs: self.error(f"Unknown instruction {name!r}", macro) components.append(self.instrs[name]) case parser.CacheEffect(): components.append(uop) case _: typing.assert_never(uop) return components def stack_analysis( self, components: typing.Iterable[InstructionOrCacheEffect] ) -> tuple[list[StackEffect], int]: """Analyze a macro. Ignore cache effects. Return the list of variables (as StackEffects) and the initial stack pointer. """ lowest = current = highest = 0 conditions: dict[int, str] = {} # Indexed by 'current'. last_instr: Instruction | None = None for thing in components: if isinstance(thing, Instruction): last_instr = thing for thing in components: match thing: case Instruction() as instr: if any( eff.size for eff in instr.input_effects + instr.output_effects ): # TODO: Eventually this will be needed, at least for macros. self.error( f"Instruction {instr.name!r} has variable-sized stack effect, " "which are not supported in macro instructions", instr.inst, # TODO: Pass name+location of macro ) if any(eff.cond for eff in instr.input_effects): self.error( f"Instruction {instr.name!r} has conditional input stack effect, " "which are not supported in macro instructions", instr.inst, # TODO: Pass name+location of macro ) if any(eff.cond for eff in instr.output_effects) and instr is not last_instr: self.error( f"Instruction {instr.name!r} has conditional output stack effect, " "but is not the last instruction in a macro", instr.inst, # TODO: Pass name+location of macro ) current -= len(instr.input_effects) lowest = min(lowest, current) for eff in instr.output_effects: if eff.cond: conditions[current] = eff.cond current += 1 highest = max(highest, current) case parser.CacheEffect(): pass case _: typing.assert_never(thing) # At this point, 'current' is the net stack effect, # and 'lowest' and 'highest' are the extremes. # Note that 'lowest' may be negative. stack = [ StackEffect(f"_tmp_{i}", "", conditions.get(highest - i, "")) for i in reversed(range(1, highest - lowest + 1)) ] return stack, -lowest def get_stack_effect_info( self, thing: parser.InstDef | parser.Macro | parser.Pseudo ) -> tuple[AnyInstruction | None, str, str]: def effect_str(effects: list[StackEffect]) -> str: n_effect, sym_effect = list_effect_size(effects) if sym_effect: return f"{sym_effect} + {n_effect}" if n_effect else sym_effect return str(n_effect) instr: AnyInstruction | None match thing: case parser.InstDef(): if thing.kind != "op": instr = self.instrs[thing.name] popped = effect_str(instr.input_effects) pushed = effect_str(instr.output_effects) else: instr = None popped = "" pushed = "" case parser.Macro(): instr = self.macro_instrs[thing.name] parts = [comp for comp in instr.parts if isinstance(comp, Component)] # Note: stack_analysis() already verifies that macro components # have no variable-sized stack effects. low = 0 sp = 0 high = 0 pushed_symbolic: list[str] = [] for comp in parts: for effect in comp.instr.input_effects: assert not effect.cond, effect assert not effect.size, effect sp -= 1 low = min(low, sp) for effect in comp.instr.output_effects: assert not effect.size, effect if effect.cond: pushed_symbolic.append(maybe_parenthesize(f"{maybe_parenthesize(effect.cond)} ? 1 : 0")) sp += 1 high = max(sp, high) if high != max(0, sp): # If you get this, intermediate stack growth occurs, # and stack size calculations may go awry. # E.g. [push, pop]. The fix would be for stack size # calculations to use the micro ops. self.error("Macro has virtual stack growth", thing) popped = str(-low) pushed_symbolic.append(str(sp - low - len(pushed_symbolic))) pushed = " + ".join(pushed_symbolic) case parser.Pseudo(): instr = self.pseudo_instrs[thing.name] popped = pushed = None # Calculate stack effect, and check that it's the the same # for all targets. for target in self.pseudos[thing.name].targets: target_instr = self.instrs.get(target) # Currently target is always an instr. This could change # in the future, e.g., if we have a pseudo targetting a # macro instruction. assert target_instr target_popped = effect_str(target_instr.input_effects) target_pushed = effect_str(target_instr.output_effects) if popped is None and pushed is None: popped, pushed = target_popped, target_pushed else: assert popped == target_popped assert pushed == target_pushed case _: typing.assert_never(thing) return instr, popped, pushed def write_stack_effect_functions(self) -> None: popped_data: list[tuple[AnyInstruction, str]] = [] pushed_data: list[tuple[AnyInstruction, str]] = [] for thing in self.everything: if isinstance(thing, OverriddenInstructionPlaceHolder): continue instr, popped, pushed = self.get_stack_effect_info(thing) if instr is not None: popped_data.append((instr, popped)) pushed_data.append((instr, pushed)) def write_function( direction: str, data: list[tuple[AnyInstruction, str]] ) -> None: self.out.emit("") self.out.emit("#ifndef NEED_OPCODE_METADATA") self.out.emit(f"extern int _PyOpcode_num_{direction}(int opcode, int oparg, bool jump);") self.out.emit("#else") self.out.emit("int") self.out.emit(f"_PyOpcode_num_{direction}(int opcode, int oparg, bool jump) {{") self.out.emit(" switch(opcode) {") for instr, effect in data: self.out.emit(f" case {instr.name}:") self.out.emit(f" return {effect};") self.out.emit(" default:") self.out.emit(" return -1;") self.out.emit(" }") self.out.emit("}") self.out.emit("#endif") write_function("popped", popped_data) write_function("pushed", pushed_data) self.out.emit("") def from_source_files(self) -> str: paths = "\n// ".join( os.path.relpath(filename, ROOT).replace(os.path.sep, posixpath.sep) for filename in self.input_filenames ) return f"// from:\n// {paths}\n" def write_metadata(self) -> None: """Write instruction metadata to output file.""" # Compute the set of all instruction formats. all_formats: set[str] = set() for thing in self.everything: match thing: case OverriddenInstructionPlaceHolder(): continue case parser.InstDef(): format = self.instrs[thing.name].instr_fmt case parser.Macro(): format = self.macro_instrs[thing.name].instr_fmt case parser.Pseudo(): format = None for target in self.pseudos[thing.name].targets: target_instr = self.instrs.get(target) assert target_instr if format is None: format = target_instr.instr_fmt else: assert format == target_instr.instr_fmt case _: typing.assert_never(thing) all_formats.add(format) # Turn it into a list of enum definitions. format_enums = [INSTR_FMT_PREFIX + format for format in sorted(all_formats)] with open(self.metadata_filename, "w") as f: # Create formatter self.out = Formatter(f, 0) # Write provenance header self.out.write_raw(f"// This file is generated by {THIS}\n") self.out.write_raw(self.from_source_files()) self.out.write_raw(f"// Do not edit!\n") self.write_pseudo_instrs() self.write_stack_effect_functions() # Write type definitions self.out.emit(f"enum InstructionFormat {{ {', '.join(format_enums)} }};") for i, flag in enumerate(INSTRUCTION_FLAGS): self.out.emit(f"#define {flag}{INSTR_FLAG_SUFFIX} ({1 << i})"); for flag in INSTRUCTION_FLAGS: flag_name = f"{flag}{INSTR_FLAG_SUFFIX}" self.out.emit( f"#define OPCODE_{flag}(OP) " f"(_PyOpcode_opcode_metadata[(OP)].flags & ({flag_name}))") self.out.emit("struct opcode_metadata {") with self.out.indent(): self.out.emit("bool valid_entry;") self.out.emit("enum InstructionFormat instr_format;") self.out.emit("int flags;") self.out.emit("};") self.out.emit("") self.out.emit("#define OPCODE_METADATA_FMT(OP) " "(_PyOpcode_opcode_metadata[(OP)].instr_format)") self.out.emit("#define SAME_OPCODE_METADATA(OP1, OP2) \\") self.out.emit(" (OPCODE_METADATA_FMT(OP1) == OPCODE_METADATA_FMT(OP2))") self.out.emit("") # Write metadata array declaration self.out.emit("#ifndef NEED_OPCODE_METADATA") self.out.emit("extern const struct opcode_metadata _PyOpcode_opcode_metadata[512];") self.out.emit("#else") self.out.emit("const struct opcode_metadata _PyOpcode_opcode_metadata[512] = {") # Write metadata for each instruction for thing in self.everything: match thing: case OverriddenInstructionPlaceHolder(): continue case parser.InstDef(): if thing.kind != "op": self.write_metadata_for_inst(self.instrs[thing.name]) case parser.Macro(): self.write_metadata_for_macro(self.macro_instrs[thing.name]) case parser.Pseudo(): self.write_metadata_for_pseudo(self.pseudo_instrs[thing.name]) case _: typing.assert_never(thing) # Write end of array self.out.emit("};") self.out.emit("#endif") def write_pseudo_instrs(self) -> None: """Write the IS_PSEUDO_INSTR macro""" self.out.emit("\n\n#define IS_PSEUDO_INSTR(OP) \\") for op in self.pseudos: self.out.emit(f" ((OP) == {op}) || \\") self.out.emit(f" 0") def emit_metadata_entry(self, name: str, fmt: str, flags: int) -> None: flags_strs = [f"{name}{INSTR_FLAG_SUFFIX}" for i, name in enumerate(INSTRUCTION_FLAGS) if (flags & (1< None: """Write metadata for a single instruction.""" self.emit_metadata_entry(instr.name, instr.instr_fmt, instr.flags) def write_metadata_for_macro(self, mac: MacroInstruction) -> None: """Write metadata for a macro-instruction.""" self.emit_metadata_entry(mac.name, mac.instr_fmt, mac.flags) def write_metadata_for_pseudo(self, ps: PseudoInstruction) -> None: """Write metadata for a macro-instruction.""" self.emit_metadata_entry(ps.name, ps.instr_fmt, ps.flags) def write_instructions(self) -> None: """Write instructions to output file.""" with open(self.output_filename, "w") as f: # Create formatter self.out = Formatter(f, 8, self.emit_line_directives) # Write provenance header self.out.write_raw(f"// This file is generated by {THIS}\n") self.out.write_raw(self.from_source_files()) self.out.write_raw(f"// Do not edit!\n") # Write and count instructions of all kinds n_instrs = 0 n_macros = 0 n_pseudos = 0 for thing in self.everything: match thing: case OverriddenInstructionPlaceHolder(): self.write_overridden_instr_place_holder(thing) case parser.InstDef(): if thing.kind != "op": n_instrs += 1 self.write_instr(self.instrs[thing.name]) case parser.Macro(): n_macros += 1 self.write_macro(self.macro_instrs[thing.name]) case parser.Pseudo(): n_pseudos += 1 case _: typing.assert_never(thing) print( f"Wrote {n_instrs} instructions, {n_macros} macros, " f"and {n_pseudos} pseudos to {self.output_filename}", file=sys.stderr, ) def write_overridden_instr_place_holder(self, place_holder: OverriddenInstructionPlaceHolder) -> None: self.out.emit("") self.out.emit( f"// TARGET({place_holder.name}) overridden by later definition") def write_instr(self, instr: Instruction) -> None: name = instr.name self.out.emit("") if instr.inst.override: self.out.emit("// Override") with self.out.block(f"TARGET({name})"): if instr.predicted: self.out.emit(f"PREDICTED({name});") instr.write(self.out) if not instr.always_exits: if instr.check_eval_breaker: self.out.emit("CHECK_EVAL_BREAKER();") self.out.emit(f"DISPATCH();") def write_macro(self, mac: MacroInstruction) -> None: """Write code for a macro instruction.""" last_instr: Instruction | None = None with self.wrap_macro(mac): cache_adjust = 0 for part in mac.parts: match part: case parser.CacheEffect(size=size): cache_adjust += size case Component() as comp: last_instr = comp.instr comp.write_body(self.out, cache_adjust) cache_adjust += comp.instr.cache_offset if cache_adjust: self.out.emit(f"next_instr += {cache_adjust};") if ( last_instr and (family := last_instr.family) and mac.name == family.members[0] and (cache_size := family.size) ): self.out.emit( f"static_assert({cache_size} == " f'{cache_adjust}, "incorrect cache size");' ) @contextlib.contextmanager def wrap_macro(self, mac: MacroInstruction): """Boilerplate for macro instructions.""" # TODO: Somewhere (where?) make it so that if one instruction # has an output that is input to another, and the variable names # and types match and don't conflict with other instructions, # that variable is declared with the right name and type in the # outer block, rather than trusting the compiler to optimize it. self.out.emit("") with self.out.block(f"TARGET({mac.name})"): if mac.predicted: self.out.emit(f"PREDICTED({mac.name});") # The input effects should have no conditionals. # Only the output effects do (for now). ieffects = [ StackEffect(eff.name, eff.type) if eff.cond else eff for eff in mac.stack ] for i, var in reversed(list(enumerate(ieffects))): src = None if i < mac.initial_sp: src = StackEffect(f"stack_pointer[-{mac.initial_sp - i}]", "") self.out.declare(var, src) yield self.out.stack_adjust(ieffects[:mac.initial_sp], mac.stack[:mac.final_sp]) for i, var in enumerate(reversed(mac.stack[: mac.final_sp]), 1): dst = StackEffect(f"stack_pointer[-{i}]", "") self.out.assign(dst, var) self.out.emit(f"DISPATCH();") def extract_block_text(block: parser.Block) -> tuple[list[str], bool, int]: # Get lines of text with proper dedent blocklines = block.text.splitlines(True) first_token: lx.Token = block.tokens[0] # IndexError means the context is broken block_line = first_token.begin[0] # Remove blank lines from both ends while blocklines and not blocklines[0].strip(): blocklines.pop(0) block_line += 1 while blocklines and not blocklines[-1].strip(): blocklines.pop() # Remove leading and trailing braces assert blocklines and blocklines[0].strip() == "{" assert blocklines and blocklines[-1].strip() == "}" blocklines.pop() blocklines.pop(0) block_line += 1 # Remove trailing blank lines while blocklines and not blocklines[-1].strip(): blocklines.pop() # Separate CHECK_EVAL_BREAKER() macro from end check_eval_breaker = \ blocklines != [] and blocklines[-1].strip() == "CHECK_EVAL_BREAKER();" if check_eval_breaker: del blocklines[-1] return blocklines, check_eval_breaker, block_line def always_exits(lines: list[str]) -> bool: """Determine whether a block always ends in a return/goto/etc.""" if not lines: return False line = lines[-1].rstrip() # Indent must match exactly (TODO: Do something better) if line[:12] != " " * 12: return False line = line[12:] return line.startswith( ( "goto ", "return ", "DISPATCH", "GO_TO_", "Py_UNREACHABLE()", "ERROR_IF(true, ", ) ) def variable_used(node: parser.Node, name: str) -> bool: """Determine whether a variable with a given name is used in a node.""" return any( token.kind == "IDENTIFIER" and token.text == name for token in node.tokens ) def main(): """Parse command line, parse input, analyze, write output.""" args = arg_parser.parse_args() # Prints message and sys.exit(2) on error if len(args.input) == 0: args.input.append(DEFAULT_INPUT) a = Analyzer(args.input, args.output, args.metadata) # Raises OSError if input unreadable if args.emit_line_directives: a.emit_line_directives = True a.parse() # Raises SyntaxError on failure a.analyze() # Prints messages and sets a.errors on failure if a.errors: sys.exit(f"Found {a.errors} errors") a.write_instructions() # Raises OSError if output can't be written a.write_metadata() if __name__ == "__main__": main()