bpo-36876: Fix the C analyzer tool. (GH-22841)

The original tool wasn't working right and it was simpler to create a new one, partially re-using some of the old code. At this point the tool runs properly on the master branch. (Try: ./python Tools/c-analyzer/c-analyzer.py analyze.)  It takes ~40 seconds on my machine to analyze the full CPython code base.

Note that we'll need to iron out some OS-specific stuff (e.g. preprocessor). We're okay though since this tool isn't used yet in our workflow. We will also need to verify the analysis results in detail before activating the check in CI, though I'm pretty sure it's close.

https://bugs.python.org/issue36876
This commit is contained in:
Eric Snow 2020-10-22 18:42:51 -06:00 committed by GitHub
parent ec388cfb4e
commit 345cd37abe
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
92 changed files with 8868 additions and 10539 deletions

View file

@ -0,0 +1,2 @@
# A unique sentinel object, compared by identity, for "no value was given"
# in places where None must remain a distinguishable value.
NOT_SET = object()

View file

@ -0,0 +1,117 @@
_NOT_SET = object()


class Slot:
    """A descriptor that provides a slot.

    This is useful for types that can't have slots via __slots__,
    e.g. tuple subclasses.

    The per-instance values are stored on the descriptor itself, in
    the "instances" dict, keyed by id(instance).  The owning class
    gets a __del__ that drops those entries when an instance is
    finalized.

    "initial" is the value an instance starts out with.  "default" is
    used when there is no initial value and is also the value the
    attribute gets after "del obj.attr".  If the resulting value is
    unset then attribute access raises AttributeError.  A "readonly"
    slot rejects normal assignment and deletion; use set() instead.
    """

    __slots__ = ('initial', 'default', 'readonly', 'instances', 'name')

    def __init__(self, initial=_NOT_SET, *,
                 default=_NOT_SET,
                 readonly=False,
                 ):
        self.initial = initial
        self.default = default
        self.readonly = readonly

        # The instance cache is not inherently tied to the normal
        # lifetime of the instances.  So must do something in order to
        # avoid keeping the instances alive by holding a reference here.
        # Ideally we would use weakref.WeakValueDictionary to do this.
        # However, most builtin types do not support weakrefs.  So
        # instead we monkey-patch __del__ on the attached class to clear
        # the instance.
        self.instances = {}
        self.name = None

    def __set_name__(self, cls, name):
        if self.name is not None:
            raise TypeError('already used')
        self.name = name
        try:
            slotnames = cls.__slot_names__
        except AttributeError:
            slotnames = cls.__slot_names__ = []
        slotnames.append(name)
        self._ensure___del__(cls, slotnames)

    def __get__(self, obj, cls):
        if obj is None:  # called on the class
            return self
        try:
            value = self.instances[id(obj)]
        except KeyError:
            if self.initial is _NOT_SET:
                value = self.default
            else:
                value = self.initial
            self.instances[id(obj)] = value
        if value is _NOT_SET:
            raise AttributeError(self.name)
        # XXX Optionally make a copy?
        return value

    def __set__(self, obj, value):
        if self.readonly:
            raise AttributeError(f'{self.name} is readonly')
        # XXX Optionally coerce?
        self.instances[id(obj)] = value

    def __delete__(self, obj):
        if self.readonly:
            raise AttributeError(f'{self.name} is readonly')
        # Deleting resets the slot to its default.  The cache entry
        # itself is dropped when the object is finalized.
        self.instances[id(obj)] = self.default

    def _ensure___del__(self, cls, slotnames):  # See the comment in __init__().
        try:
            old___del__ = cls.__del__
        except AttributeError:
            old___del__ = (lambda s: None)
        else:
            if getattr(old___del__, '_slotted', False):
                return

        def __del__(_self):
            # Drop the cache entries directly instead of going through
            # delattr().  That would raise for read-only slots and
            # would leave an entry behind for every slot, which a later
            # object with the same id() would then pick up.
            key = id(_self)
            owner = type(_self)
            for name in slotnames:
                # "slotnames" may be shared with subclasses, so a name
                # is not guaranteed to be a slot on this exact type.
                slot = getattr(owner, name, None)
                if isinstance(slot, Slot):
                    slot.instances.pop(key, None)
            old___del__(_self)
        __del__._slotted = True
        cls.__del__ = __del__

    def set(self, obj, value):
        """Update the cached value for an object.

        This works even if the descriptor is read-only.  This is
        particularly useful when initializing the object (e.g. in
        its __new__ or __init__).
        """
        self.instances[id(obj)] = value
class classonly:
    """A non-data descriptor that makes a value only visible on the class.

    This is like the "classmethod" builtin, but does not show up on
    instances of the class.  It may be used as a decorator.
    """

    def __init__(self, value):
        self.value = value
        # Binding is delegated to a regular classmethod.
        self.getter = classmethod(value).__get__
        self.name = None

    def __set_name__(self, cls, name):
        # A descriptor object may be attached to only one attribute.
        if self.name is None:
            self.name = name
        else:
            raise TypeError('already used')

    def __get__(self, obj, cls):
        if obj is None:
            # called on the class
            return self.getter(None, cls)
        # Accessed through an instance: act as if it does not exist.
        raise AttributeError(self.name)

View file

@ -0,0 +1,388 @@
import fnmatch
import glob
import os
import os.path
import shutil
import stat
from .iterutil import iter_many
# The filename suffixes of C source and header files.
C_SOURCE_SUFFIXES = ('.c', '.h')
def create_backup(old, backup=None):
    """Copy a file to a backup file and return the backup's filename.

    "old" is either a filename or an object with a "name" attribute
    (e.g. an open file).  If "backup" is not provided (or is True)
    then "<filename>.bak" is used.

    None is returned if no filename could be determined or if the
    original file does not exist.
    """
    filename = old if isinstance(old, str) else getattr(old, 'name', None)
    if not filename:
        return None
    if backup is True or not backup:
        backup = f'{filename}.bak'
    try:
        shutil.copyfile(filename, backup)
    except FileNotFoundError as exc:
        # Only a missing *source* file is tolerated.
        if exc.filename == filename:
            return None
        raise  # re-raise
    return backup
##################################
# find files
def match_glob(filename, pattern):
    """Return True if the filename matches the glob pattern.

    This is fnmatch.fnmatch() plus limited support for a single "**/"
    matching zero directories.
    """
    if fnmatch.fnmatch(filename, pattern):
        return True

    # fnmatch doesn't handle ** quite right.  It will not match the
    # following:
    #
    #  ('x/spam.py', 'x/**/*.py')
    #  ('spam.py', '**/*.py')
    #
    # though it *will* match the following:
    #
    #  ('x/y/spam.py', 'x/**/*.py')
    #  ('x/spam.py', '**/*.py')
    if '**/' in pattern:
        # We only accommodate the single-"**" case.
        collapsed = pattern.replace('**/', '', 1)
        return fnmatch.fnmatch(filename, collapsed)
    return False
def iter_filenames(filenames, *,
                   start=None,
                   include=None,
                   exclude=None,
                   ):
    """Yield (filename, check, flag) for each of the given filenames.

    "check" is a callable that returns the reason the file should be
    skipped (or None).  "flag" is whatever iter_many() paired with the
    filename.  An exception is raised if no filenames were provided.
    """
    no_files = Exception('no filenames provided')
    for filename, solo in iter_many(filenames, no_files):
        # Once the "start" file is reached, _get_check() clears it.
        check, start = _get_check(filename, start, include, exclude)
        yield filename, check, solo
# filenames = iter(filenames or ())
# try:
# first = next(filenames)
# except StopIteration:
# raise Exception('no filenames provided')
# try:
# second = next(filenames)
# except StopIteration:
# check, _ = _get_check(first, start, include, exclude)
# yield first, check, False
# return
#
# check, start = _get_check(first, start, include, exclude)
# yield first, check, True
# check, start = _get_check(second, start, include, exclude)
# yield second, check, True
# for filename in filenames:
# check, start = _get_check(filename, start, include, exclude)
# yield filename, check, True
def expand_filenames(filenames):
    """Yield the existing files matched by each of the given glob patterns.

    For a pattern containing "**/", the matches of the pattern with
    every "**/" removed are yielded first.
    """
    for pattern in filenames:
        # XXX Do we need to use glob.escape (a la commit 9355868458, GH-20994)?
        if '**/' in pattern:
            for found in glob.glob(pattern.replace('**/', '')):
                yield found
        for found in glob.glob(pattern):
            yield found
def _get_check(filename, start, include, exclude):
if start and filename != start:
return (lambda: '<skipped>'), start
else:
def check():
if _is_excluded(filename, exclude, include):
return '<excluded>'
return None
return check, None
def _is_excluded(filename, exclude, include):
if include:
for included in include:
if match_glob(filename, included):
return False
return True
elif exclude:
for excluded in exclude:
if match_glob(filename, excluded):
return True
return False
else:
return False
def _walk_tree(root, *,
_walk=os.walk,
):
# A wrapper around os.walk that resolves the filenames.
for parent, _, names in _walk(root):
for name in names:
yield os.path.join(parent, name)
def walk_tree(root, *,
              suffix=None,
              walk=_walk_tree,
              ):
    """Yield each file in the tree under the given directory name.

    If "suffix" is provided then only files with that suffix will
    be included.
    """
    if suffix and not isinstance(suffix, str):
        raise ValueError('suffix must be a string')

    for filename in walk(root):
        if not suffix or filename.endswith(suffix):
            yield filename
def glob_tree(root, *,
              suffix=None,
              _glob=glob.iglob,
              ):
    """Yield each file in the tree under the given directory name.

    If "suffix" is provided then only files with that suffix will
    be included.
    """
    suffix = suffix or ''
    if not isinstance(suffix, str):
        raise ValueError('suffix must be a string')

    # First the files directly under the root, then the nested ones.
    for pattern in (f'{root}/*{suffix}', f'{root}/**/*{suffix}'):
        yield from _glob(pattern)
def iter_files(root, suffix=None, relparent=None, *,
               get_files=os.walk,
               _glob=glob_tree,
               _walk=walk_tree,
               ):
    """Yield each file in the tree under the given directory name.

    If "root" is a non-string iterable then do the same for each of
    those trees.

    If "suffix" is provided then only files with that suffix will
    be included.  It may be a single string or an iterable of strings.

    If "relparent" is provided then it is used to resolve each
    filename as a relative path.

    "get_files" selects how the tree is traversed.  The glob functions
    select "_glob".  Anything else goes through "_walk", with os.walk
    and walk_tree both mapped to _walk_tree().
    """
    if not isinstance(root, str):
        # Recurse once per root, passing everything else through as-is.
        roots = root
        for root in roots:
            yield from iter_files(root, suffix, relparent,
                                  get_files=get_files,
                                  _glob=_glob, _walk=_walk)
        return

    # Use the right "walk" function.
    if get_files in (glob.glob, glob.iglob, glob_tree):
        get_files = _glob
    else:
        # A custom "get_files" is used as the low-level walker.
        _files = _walk_tree if get_files in (os.walk, walk_tree) else get_files
        get_files = (lambda *a, **k: _walk(*a, walk=_files, **k))

    # Handle a single suffix.
    if suffix and not isinstance(suffix, str):
        # Multiple suffixes are filtered below rather than by get_files().
        filenames = get_files(root)
        suffix = tuple(suffix)
    else:
        # A single suffix (or none) is handled by get_files() itself.
        filenames = get_files(root, suffix=suffix)
        suffix = None

    for filename in filenames:
        if suffix and not isinstance(suffix, str):  # multiple suffixes
            if not filename.endswith(suffix):
                continue
        if relparent:
            filename = os.path.relpath(filename, relparent)
        yield filename
def iter_files_by_suffix(root, suffixes, relparent=None, *,
                         walk=walk_tree,
                         _iter_files=iter_files,
                         ):
    """Yield each file in the tree that has the given suffixes.

    Unlike iter_files(), the results are in the original suffix order.
    """
    # NOTE: "walk" is accepted but is not used by this function.
    if isinstance(suffixes, str):
        suffixes = [suffixes]
    # XXX Ignore repeated suffixes?
    for suffix in suffixes:
        for filename in _iter_files(root, suffix, relparent):
            yield filename
##################################
# file info
# XXX posix-only?
# Combined user/group/other permission bits for read, write and execute.
S_IRANY = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
S_IWANY = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
S_IXANY = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
def is_readable(file, *, user=None, check=False):
    """Return True if the file may be read by the user.

    "file" is anything supported by _get_file_info().  If "check" is
    true then the file is actually opened.  The permission bits are
    used otherwise, or if that check is not supported.
    """
    filename, st, mode = _get_file_info(file)
    if check:
        try:
            result = _check_file(filename, S_IRANY)
        except NotImplementedError:
            pass
        else:
            if result is not NotImplemented:
                return result
        # Fall back to checking the mode.
    return _check_mode(st, mode, S_IRANY, user)
def is_writable(file, *, user=None, check=False):
    """Return True if the file may be written by the user.

    "file" is anything supported by _get_file_info().  If "check" is
    true then the file is actually opened.  The permission bits are
    used otherwise, or if that check is not supported.
    """
    filename, st, mode = _get_file_info(file)
    if check:
        try:
            result = _check_file(filename, S_IWANY)
        except NotImplementedError:
            pass
        else:
            if result is not NotImplemented:
                return result
        # Fall back to checking the mode.
    return _check_mode(st, mode, S_IWANY, user)
def is_executable(file, *, user=None, check=False):
    """Return True if the file may be executed by the user.

    "file" is anything supported by _get_file_info().  If "check" is
    true then _check_file() is tried first.  The permission bits are
    used otherwise, or if that check is not supported.
    """
    filename, st, mode = _get_file_info(file)
    if check:
        try:
            result = _check_file(filename, S_IXANY)
        except NotImplementedError:
            pass
        else:
            if result is not NotImplemented:
                return result
        # Fall back to checking the mode.
    return _check_mode(st, mode, S_IXANY, user)
def _get_file_info(file):
filename = st = mode = None
if isinstance(file, int):
mode = file
elif isinstance(file, os.stat_result):
st = file
else:
if isinstance(file, str):
filename = file
elif hasattr(file, 'name') and os.path.exists(file.name):
filename = file.name
else:
raise NotImplementedError(file)
st = os.stat(filename)
return filename, st, mode or st.st_mode
def _check_file(filename, check):
    """Return True if the file can actually be opened for the given access.

    Only reading and writing are supported.  NotImplemented is
    returned for an execute check.
    """
    if not isinstance(filename, str):
        raise Exception(f'filename required to check file, got {filename}')

    if check & S_IRANY:
        flags = os.O_RDONLY
    elif check & S_IWANY:
        flags = os.O_WRONLY
    elif check & S_IXANY:
        # We can worry about S_IXANY later
        return NotImplemented
    else:
        raise NotImplementedError(check)

    # We do not ignore exceptions other than PermissionError.
    try:
        fd = os.open(filename, flags)
    except PermissionError:
        return False
    os.close(fd)
    return True
def _get_user_info(user):
import pwd
username = uid = gid = groups = None
if user is None:
uid = os.geteuid()
#username = os.getlogin()
username = pwd.getpwuid(uid)[0]
gid = os.getgid()
groups = os.getgroups()
else:
if isinstance(user, int):
uid = user
entry = pwd.getpwuid(uid)
username = entry.pw_name
elif isinstance(user, str):
username = user
entry = pwd.getpwnam(username)
uid = entry.pw_uid
else:
raise NotImplementedError(user)
gid = entry.pw_gid
os.getgrouplist(username, gid)
return username, uid, gid, groups
def _check_mode(st, mode, check, user):
    """Return True if the permission bits grant the user the access.

    "st" is the file's stat result, "mode" its permission bits and
    "check" a combination of S_IRANY, S_IWANY and S_IXANY.  Each
    requested kind of access must be granted through the user, group
    or "other" bits.  NotImplementedError is raised if "check" has
    any other bits set.
    """
    orig = check
    _, uid, gid, groups = _get_user_info(user)
    groups = groups or ()

    kinds = (
        (S_IRANY, stat.S_IRUSR, stat.S_IRGRP, stat.S_IROTH),
        (S_IWANY, stat.S_IWUSR, stat.S_IWGRP, stat.S_IWOTH),
        (S_IXANY, stat.S_IXUSR, stat.S_IXGRP, stat.S_IXOTH),
    )
    for anybits, usrbit, grpbit, othbit in kinds:
        if not check & anybits:
            continue
        # Clear the bits (rather than subtract) so a partial mask
        # cannot corrupt the remaining bits.
        check &= ~anybits
        if mode & usrbit and st.st_uid == uid:
            continue
        # The group bits apply to the file's group, not its owner.
        if mode & grpbit and (st.st_gid == gid or st.st_gid in groups):
            continue
        if mode & othbit:
            continue
        return False

    if check:
        raise NotImplementedError((orig, check))
    return True

View file

View file

@ -0,0 +1,48 @@
# A unique sentinel object for "no value was given".
_NOT_SET = object()
def peek_and_iter(items):
    """Return (iterator, first item) without losing the first item.

    The returned iterator still yields every item, including the
    peeked one.  (None, None) is returned if there are no items.
    """
    if not items:
        return None, None
    remaining = iter(items)
    try:
        peeked = next(remaining)
    except StopIteration:
        return None, None

    def replay():
        yield peeked
        for item in remaining:
            yield item
    return replay(), peeked
def iter_many(items, onempty=None):
    """Yield (item, ismany) for each of the given items.

    "ismany" is False if there is exactly one item and True otherwise.

    "onempty" controls what happens when there are no items.  If it
    is None then nothing is yielded.  If it is callable then it is
    called with the (empty) items and its result is iterated instead.
    Otherwise it must be an exception, which gets raised.
    """
    if items:
        items = iter(items)
        try:
            first = next(items)
        except StopIteration:
            pass  # Handled below, with the other "empty" case.
        else:
            try:
                second = next(items)
            except StopIteration:
                yield first, False
                return
            yield first, True
            yield second, True
            for item in items:
                yield item, True
            return

    # There were no items.
    if onempty is None:
        return
    if not callable(onempty):
        raise onempty
    # The replacement items get no further fallback.
    yield from iter_many(onempty(items), onempty=None)

View file

@ -0,0 +1,63 @@
import logging
import sys
# The default verbosity for configure_logger() and Printer.
VERBOSITY = 3

# The root logger for the whole top-level package:
_logger = logging.getLogger(__name__.rpartition('.')[0])
def configure_logger(logger, verbosity=VERBOSITY, *,
                     logfile=None,
                     maxlevel=logging.CRITICAL,
                     ):
    """Set the logger's level from the verbosity and give it a handler.

    Each step of verbosity lowers the level by 10 below "maxlevel".
    A handler (for "logfile", else stdout) is added only if the logger
    has none yet.  The package's root logger is configured as well.
    """
    # 0 disables it, so we use the next lowest.
    level = max(1, min(maxlevel, maxlevel - verbosity * 10))
    logger.setLevel(level)
    #logger.propagate = False

    if not logger.handlers:
        handler = (logging.FileHandler(logfile)
                   if logfile
                   else logging.StreamHandler(sys.stdout))
        handler.setLevel(level)
        #handler.setFormatter(logging.Formatter())
        logger.addHandler(handler)

    if logger is _logger:
        return
    # In case the provided logger is in a sub-package...
    configure_logger(_logger, verbosity, logfile=logfile, maxlevel=maxlevel)
def hide_emit_errors():
    """Ignore errors while emitting log entries.

    Rather than printing a message describing the error, we show nothing.

    The returned callable restores the previous behavior.
    """
    # For now we simply ignore all exceptions.  If we wanted to ignore
    # specific ones (e.g. BrokenPipeError) then we would need to use
    # a Handler subclass with a custom handleError() method.
    previous = logging.raiseExceptions
    logging.raiseExceptions = False

    def _restore():
        logging.raiseExceptions = previous
    return _restore
class Printer:
    """A print() wrapper that is silent below a verbosity of 3."""

    def __init__(self, verbosity=VERBOSITY):
        self.verbosity = verbosity

    def info(self, *args, **kwargs):
        """Print, using the print() signature, if verbose enough."""
        if not self.verbosity < 3:
            print(*args, **kwargs)

View file

@ -0,0 +1,7 @@
class Labeled:
    """An object whose repr is its label in angle brackets."""

    __slots__ = ('_label',)

    def __init__(self, label):
        self._label = label

    def __repr__(self):
        return '<{}>'.format(self._label)

View file

@ -0,0 +1,577 @@
import argparse
import contextlib
import fnmatch
import logging
import os
import os.path
import shutil
import sys
from . import fsutil, strutil, iterutil, logging as loggingutil
def get_prog(spec=None, *, absolute=False, allowsuffix=True):
    """Return the text to use as the program name (e.g. in CLI usage).

    "spec" may be None (look up the running script), a filename, or
    an object with "origin" and "name" attributes (e.g. a module spec).

    The result is one of the following:

    * the script's basename, if it is executable and found on PATH
    * the script's filename, if it is executable
    * "<python> -m <module>", if there is a spec
    * "<python> <filename>" otherwise

    If "absolute" is true then the filename is made absolute.  If
    "allowsuffix" is false then a ".py" script is never treated as
    installed.
    """
    if spec is None:
        _, spec = _find_script()
        # This is more natural for prog than __file__ would be.
        filename = sys.argv[0]
    elif isinstance(spec, str):
        filename = os.path.normpath(spec)
        spec = None
    else:
        filename = spec.origin
    if _is_standalone(filename):
        # Check if "installed".
        if allowsuffix or not filename.endswith('.py'):
            basename = os.path.basename(filename)
            found = shutil.which(basename)
            if found:
                # Only count it if PATH resolves to this very script.
                script = os.path.abspath(filename)
                found = os.path.abspath(found)
                if os.path.normcase(script) == os.path.normcase(found):
                    return basename
        # It is only "standalone".
        if absolute:
            filename = os.path.abspath(filename)
        return filename
    elif spec is not None:
        module = spec.name
        if module.endswith('.__main__'):
            # Drop the trailing ".__main__" (9 characters).
            module = module[:-9]
        return f'{sys.executable} -m {module}'
    else:
        if absolute:
            filename = os.path.abspath(filename)
        return f'{sys.executable} {filename}'
def _find_script():
    """Return (filename, spec) from the globals of the __main__ module.

    The module is found by walking up the call stack, so this must be
    called from a function that was itself called (directly or not)
    by the __main__ module.
    """
    # Start two frames up: past this function and its direct caller.
    frame = sys._getframe(2)
    while frame.f_globals['__name__'] != '__main__':
        frame = frame.f_back

    # This should match sys.argv[0].
    filename = frame.f_globals['__file__']
    # This will be None if -m wasn't used..
    spec = frame.f_globals['__spec__']
    return filename, spec
def is_installed(filename, *, allowsuffix=True):
    """Return True if the script is executable and found on PATH.

    The command found on PATH for the script's basename must be the
    very same file.  If "allowsuffix" is false then a ".py" file is
    never considered installed.
    """
    if not allowsuffix and filename.endswith('.py'):
        return False
    # os.path has no "normalize()"; normpath() is the right function.
    filename = os.path.abspath(os.path.normpath(filename))
    found = shutil.which(os.path.basename(filename))
    if not found:
        return False
    # Compare the same way get_prog() does.
    found = os.path.abspath(found)
    if os.path.normcase(found) != os.path.normcase(filename):
        return False
    return _is_standalone(filename)
def is_standalone(filename):
    """Return True if the script may be executed directly."""
    # os.path has no "normalize()"; normpath() is the right function.
    filename = os.path.abspath(os.path.normpath(filename))
    return _is_standalone(filename)
def _is_standalone(filename):
    """Return True if the file is executable, per fsutil.is_executable()."""
    return fsutil.is_executable(filename)
##################################
# logging
# The verbosity that the -v/-q CLI options adjust.
VERBOSITY = 3

# The default for --traceback comes from the SHOW_TRACEBACK environment
# variable.  It is on unless the value is empty or one of "0", "false"
# or "no" (in any case).
TRACEBACK = os.environ.get('SHOW_TRACEBACK', '').strip()
TRACEBACK = bool(TRACEBACK and TRACEBACK.upper() not in ('0', 'FALSE', 'NO'))

logger = logging.getLogger(__name__)
def configure_logger(verbosity, logger=None, **kwargs):
    """Configure the given logger (by default the root logger).

    The extra keyword arguments are passed through to
    loggingutil.configure_logger().
    """
    # Configure the root logger if none was given.
    target = logging.getLogger() if logger is None else logger
    loggingutil.configure_logger(target, verbosity, **kwargs)
##################################
# selections
class UnsupportedSelectionError(Exception):
    """Some of the selected values are not among the possible ones."""

    def __init__(self, values, possible):
        self.values = tuple(values)
        self.possible = tuple(possible)
        super().__init__(f'unsupported selections {self.unique}')

    @property
    def unique(self):
        """The unsupported values, sorted and without duplicates."""
        return tuple(sorted(frozenset(self.values)))
def normalize_selection(selected: str, *, possible=None):
    """Return the selection in a normalized form.

    "selected" may be None, True or False (returned as-is), a string,
    or an iterable of strings.  Each string may hold several values
    separated by commas and/or whitespace.

    The result is True if "all" was selected, an empty tuple if an
    empty (non-string) selection was given, and otherwise a frozenset
    of the individual values.

    If "possible" is provided then any value not in it (other than
    "all") causes UnsupportedSelectionError to be raised.
    """
    if selected in (None, True, False):
        return selected
    elif isinstance(selected, str):
        selected = [selected]
    elif not selected:
        return ()

    unsupported = []
    _selected = set()
    for item in selected:
        if not item:
            continue
        for value in item.strip().replace(',', ' ').split():
            # XXX Handle subtraction (leading "-").
            if possible and value not in possible and value != 'all':
                unsupported.append(value)
            _selected.add(value)
    if unsupported:
        raise UnsupportedSelectionError(unsupported, tuple(possible))
    if 'all' in _selected:
        return True
    # Return the parsed values rather than the raw input items.
    return frozenset(_selected)
##################################
# CLI parsing helpers
class CLIArgSpec(tuple):
    """The (args, kwargs) for one call to parser.add_argument().

    An instance may be called with a parser to apply itself.
    """

    def __new__(cls, *args, **kwargs):
        return super().__new__(cls, (args, kwargs))

    def __repr__(self):
        args, kwargs = self
        parts = [repr(arg) for arg in args]
        parts.extend(f'{name}={value!r}' for name, value in kwargs.items())
        return f'{type(self).__name__}({", ".join(parts)})'

    def __call__(self, parser, *, _noop=(lambda a: None)):
        # There is nothing to process afterward, thus the no-op processor.
        self.apply(parser)
        return _noop

    def apply(self, parser):
        """Add the argument to the parser."""
        args, kwargs = self
        parser.add_argument(*args, **kwargs)
def apply_cli_argspecs(parser, specs):
    """Add each of the arg specs to the parser.

    A spec is either a callable, which is called with the parser, or
    an (args, kwargs) pair for parser.add_argument().

    Return the flat list of arg processors returned by the callables.
    """
    processors = []
    for spec in specs:
        if callable(spec):
            procs = spec(parser)
            _add_procs(processors, procs)
        else:
            args, kwargs = spec
            # The pair must be unpacked into the call.
            parser.add_argument(*args, **kwargs)
    return processors
def _add_procs(flattened, procs):
# XXX Fail on non-empty, non-callable procs?
if not procs:
return
if callable(procs):
flattened.append(procs)
else:
#processors.extend(p for p in procs if callable(p))
for proc in procs:
_add_procs(flattened, proc)
def add_verbosity_cli(parser):
    """Add -q/--quiet and -v/--verbose and return their arg processor.

    The processor replaces the two counts with a single "verbosity"
    value (never below 0) and returns that key.
    """
    for short, long in (('-q', '--quiet'), ('-v', '--verbose')):
        parser.add_argument(short, long, action='count', default=0)

    def process_args(args):
        ns = vars(args)
        key = 'verbosity'
        if key in ns:
            parser.error(f'duplicate arg {key!r}')
        louder = ns.pop('verbose')
        quieter = ns.pop('quiet')
        ns[key] = max(0, VERBOSITY + louder - quieter)
        return key
    return process_args
def add_traceback_cli(parser):
    """Add --traceback/--no-traceback and return their arg processor.

    The processor replaces "traceback" with "traceback_cm", a context
    manager to run the command under, and returns that key.  Unless
    tracebacks were requested, the context manager turns exceptions
    into a short message passed to sys.exit().
    """
    parser.add_argument('--traceback', '--tb', action='store_true',
                        default=TRACEBACK)
    parser.add_argument('--no-traceback', '--no-tb', dest='traceback',
                        action='store_const', const=False)

    def process_args(args):
        ns = vars(args)
        key = 'traceback_cm'
        if key in ns:
            parser.error(f'duplicate arg {key!r}')
        showtb = ns.pop('traceback')

        @contextlib.contextmanager
        def traceback_cm():
            restore = loggingutil.hide_emit_errors()
            try:
                yield
            except BrokenPipeError:
                # It was piped to "head" or something similar.
                pass
            except NotImplementedError:
                # Always propagated, regardless of "showtb".
                raise  # re-raise
            except Exception as exc:
                if not showtb:
                    sys.exit(f'ERROR: {exc}')
                raise  # re-raise
            except KeyboardInterrupt:
                if not showtb:
                    sys.exit('\nINTERRUPTED')
                raise  # re-raise
            except BaseException as exc:
                if not showtb:
                    sys.exit(f'{type(exc).__name__}: {exc}')
                raise  # re-raise
            finally:
                restore()
        # Note that this is a single context manager object.
        ns[key] = traceback_cm()
        return key
    return process_args
def add_sepval_cli(parser, opt, dest, choices, *, sep=',', **kwargs):
    """Add an arg that takes "sep"-separated values and return its processor.

    "opt" is the option string (if it starts with "-") or else the
    name of a positional arg.  The extra keyword arguments are passed
    through to parser.add_argument().

    The processor splits each provided value on "sep", verifies the
    results against "choices" and stores the flat list under "dest".
    """
    if not isinstance(opt, str):
        parser.error(f'opt must be a string, got {opt!r}')
    elif opt.startswith('-'):
        parser.add_argument(opt, dest=dest, action='append', **kwargs)
    else:
        # NOTE(review): argparse rejects an explicit "dest" for
        # positional args, and nargs='+' with "append" would produce
        # nested lists, so this branch looks broken -- confirm.
        kwargs.setdefault('nargs', '+')
        #kwargs.setdefault('metavar', opt.upper())
        parser.add_argument(opt, dest=dest, action='append', **kwargs)

    def process_args(args):
        ns = vars(args)

        # XXX Use normalize_selection()?
        if isinstance(ns[dest], str):
            ns[dest] = [ns[dest]]
        selections = []
        for many in ns[dest] or ():
            for value in many.split(sep):
                if value not in choices:
                    parser.error(f'unknown {dest} {value!r}')
                selections.append(value)
        ns[dest] = selections
    return process_args
def add_files_cli(parser, *, excluded=None, nargs=None):
    """Add the "filenames" arg, plus the file filtering options.

    Return the list of corresponding arg processors.
    """
    # The filtering options are added ahead of the positional arg.
    process_files = add_file_filtering_cli(parser, excluded=excluded)
    parser.add_argument('filenames', nargs=nargs or '+', metavar='FILENAME')
    return [process_files]
def add_file_filtering_cli(parser, *, excluded=None):
    """Add --start, --include and --exclude and return their processor.

    The processor replaces the three values with "iter_filenames", a
    function that wraps fsutil.iter_filenames() with them bound.
    "excluded" holds patterns that are always excluded.
    """
    parser.add_argument('--start')
    for opt in ('--include', '--exclude'):
        parser.add_argument(opt, action='append')

    excluded = tuple(excluded or ())

    def process_args(args):
        ns = vars(args)
        key = 'iter_filenames'
        if key in ns:
            parser.error(f'duplicate arg {key!r}')

        included = tuple(ns.pop('include') or ())
        omitted = excluded + tuple(ns.pop('exclude') or ())
        kwargs = {
            'start': ns.pop('start'),
            'include': tuple(_parse_files(included)),
            'exclude': tuple(_parse_files(omitted)),
        }

        def iter_filenames(files):
            return fsutil.iter_filenames(files, **kwargs)
        ns[key] = iter_filenames
    return process_args
def _parse_files(filenames):
    """Yield each stripped filename that strutil.parse_entries() produces."""
    entries = strutil.parse_entries(filenames)
    yield from (filename.strip() for filename, _ in entries)
def add_failure_filtering_cli(parser, pool, *, default=False):
    """Add --fail and --no-fail and return their arg processor.

    "pool" maps each supported name to an exception type.  The
    processor replaces "fail" with "ignore_exc", a function that
    returns True for an exception that should not cause failure.
    """
    names = "|".join(sorted(pool))
    parser.add_argument('--fail', action='append',
                        metavar=f'"{{all|{names}}},..."')
    parser.add_argument('--no-fail', dest='fail', action='store_const', const=())

    def process_args(args):
        ns = vars(args)

        fail = ns.pop('fail')
        try:
            fail = normalize_selection(fail, possible=pool)
        except UnsupportedSelectionError as exc:
            parser.error(f'invalid --fail values: {", ".join(exc.unique)}')
        else:
            if fail is None:
                fail = default

            if fail is True:
                # Everything fails.
                def ignore_exc(_exc):
                    return False
            elif fail is False:
                # Nothing fails.
                def ignore_exc(_exc):
                    return True
            else:
                # Only the exact selected exception types fail.
                def ignore_exc(exc):
                    return not any(type(exc) == pool[err] for err in fail)
            args.ignore_exc = ignore_exc
    return process_args
def add_kind_filtering_cli(parser, *, default=None):
    """Add --kinds and return its arg processor.

    Each value may hold several kinds, separated by commas and/or
    whitespace.  A leading "-" excludes the kind.  The processor
    replaces "kinds" with "match_kind", a predicate for a kind.  If
    anything is excluded then everything else matches.  Otherwise
    only the included kinds match (or everything, if none were given).
    """
    parser.add_argument('--kinds', action='append')

    def process_args(args):
        ns = vars(args)

        kinds = []
        for raw in ns.pop('kinds') or default or ():
            kinds.extend(raw.strip().replace(',', ' ').split())

        # The last mention of a kind wins.
        included = set()
        excluded = set()
        for kind in kinds:
            if kind.startswith('-'):
                kind = kind[1:]
                excluded.add(kind)
                included.discard(kind)
            else:
                included.add(kind)
                excluded.discard(kind)

        if not kinds:
            match_kind = (lambda k: True)
        elif excluded:
            # XXX Fail if there are included kinds too?
            def match_kind(kind, *, _excluded=excluded):
                return kind not in _excluded
        else:
            def match_kind(kind, *, _included=included):
                return kind in _included
        args.match_kind = match_kind
    return process_args
# The arg specs that add_commands_cli() applies by default.
COMMON_CLI = [
    add_verbosity_cli,
    add_traceback_cli,
    #add_dryrun_cli,
]
def add_commands_cli(parser, commands, *, commonspecs=COMMON_CLI, subset=None):
    """Add the CLI for the given commands to the parser.

    "commands" maps each command name to a 3-tuple, of which the
    first item is the description and the second is the arg specs.

    If "subset" is a string then that one command is added directly
    to the parser.  Otherwise a sub-command is added for each command
    in "subset" (by default all of them).

    Return a dict mapping each command name to its arg processors.
    ValueError is raised for an unknown command in "subset".
    """
    arg_processors = {}
    if isinstance(subset, str):
        # A single command: no subparsers are involved.
        cmdname = subset
        try:
            _, argspecs, _ = commands[cmdname]
        except KeyError:
            raise ValueError(f'unsupported subset {subset!r}')
        parser.set_defaults(cmd=cmdname)
        arg_processors[cmdname] = _add_cmd_cli(parser, commonspecs, argspecs)
    else:
        if subset is None:
            cmdnames = subset = list(commands)
        elif not subset:
            raise NotImplementedError
        elif isinstance(subset, set):
            # Use the order of "commands", since a set has none.
            cmdnames = [k for k in commands if k in subset]
            subset = sorted(subset)
        else:
            cmdnames = [n for n in subset if n in commands]
        if len(cmdnames) < len(subset):
            bad = tuple(n for n in subset if n not in commands)
            raise ValueError(f'unsupported subset {bad}')

        # The common args are shared through a parent parser.
        common = argparse.ArgumentParser(add_help=False)
        common_processors = apply_cli_argspecs(common, commonspecs)
        subs = parser.add_subparsers(dest='cmd')
        for cmdname in cmdnames:
            description, argspecs, _ = commands[cmdname]
            sub = subs.add_parser(
                cmdname,
                description=description,
                parents=[common],
            )
            cmd_processors = _add_cmd_cli(sub, (), argspecs)
            arg_processors[cmdname] = common_processors + cmd_processors
    return arg_processors
def _add_cmd_cli(parser, commonspecs, argspecs):
    """Apply the arg specs to the parser and return the arg processors.

    A spec is either a callable, which is called with the parser, or
    a sequence of add_argument() args: the option strings, optionally
    followed by a kwargs object.  The option strings may also be
    grouped in a single sequence ahead of the kwargs.
    """
    processors = []
    argspecs = list(commonspecs or ()) + list(argspecs or ())
    for argspec in argspecs:
        if callable(argspec):
            procs = argspec(parser)
            _add_procs(processors, procs)
        else:
            if not argspec:
                raise NotImplementedError
            args = list(argspec)
            if not isinstance(args[-1], str):
                # A trailing non-string is the kwargs.
                kwargs = args.pop()
                if not isinstance(args[0], str):
                    # The option strings were given as one sequence.
                    try:
                        args, = args
                    except (TypeError, ValueError):
                        parser.error(f'invalid cmd args {argspec!r}')
            else:
                kwargs = {}
            parser.add_argument(*args, **kwargs)
            # There will be nothing to process.
    return processors
def _flatten_processors(processors):
    """Yield each callable in the arbitrarily nested processors."""
    for proc in processors:
        if proc is None:
            continue
        if callable(proc):
            yield proc
        else:
            yield from _flatten_processors(proc)


def process_args(args, processors, *, keys=None):
    """Run the arg processors and return the values they extracted.

    Each processor is called with "args".  It may return the key (or
    keys) of the value(s) it left in the namespace.  Those values are
    removed from the namespace and returned in a dict.

    If "keys" is provided then the processors must produce exactly
    those keys.  Otherwise NotImplementedError is raised.
    """
    ns = vars(args)
    extracted = {}
    remainder = None if keys is None else set(keys)
    for proc in _flatten_processors(processors):
        hanging = proc(args)
        # A processor may return a single key, several keys or nothing,
        # regardless of whether or not "keys" was provided.
        if isinstance(hanging, str):
            hanging = [hanging]
        for key in hanging or ():
            if remainder is not None and key not in remainder:
                raise NotImplementedError(key)
            extracted[key] = ns.pop(key)
            if remainder is not None:
                remainder.remove(key)
    if remainder:
        raise NotImplementedError(sorted(remainder))
    return extracted
def process_args_by_key(args, processors, keys):
    """Run the arg processors and return the values for the given keys.

    The values are in the same order as "keys".
    """
    by_key = process_args(args, processors, keys=keys)
    return list(map(by_key.__getitem__, keys))
##################################
# commands
def set_command(name, add_cli):
    """A decorator factory to set CLI info."""
    cli = (name, add_cli)

    def decorator(func):
        # The info may be set only once per function.
        if not hasattr(func, '__cli__'):
            func.__cli__ = cli
            return func
        raise Exception('already set')
    return decorator
##################################
# main() helpers
def filter_filenames(filenames, iter_filenames=None):
    """Yield each filename that should not be skipped.

    The reason for each skipped file is logged at the debug level.
    """
    for filename, check, _ in _iter_filenames(filenames, iter_filenames):
        reason = check()
        if reason:
            logger.debug(f'{filename}: {reason}')
        else:
            yield filename
def main_for_filenames(filenames, iter_filenames=None):
    """Yield each filename that should not be skipped.

    For a file flagged by the iterator, a header is printed first.
    The reason for each skipped file is printed as well.
    """
    for filename, check, show in _iter_filenames(filenames, iter_filenames):
        if show:
            for line in ('',
                         '-------------------------------------------',
                         filename):
                print(line)
        reason = check()
        if reason:
            print(reason)
        else:
            yield filename
def _iter_filenames(filenames, iter_files):
    """Yield (filename, check, ismany) for each of the given filenames.

    "iter_files" is called with the filenames and may yield either
    plain filenames or such 3-tuples.  If it is None then
    fsutil.iter_filenames() is used.

    "check" is a callable that returns the reason to skip the file,
    or None if the file should be used.
    """
    if iter_files is None:
        iter_files = fsutil.iter_filenames
        yield from iter_files(filenames)
        return

    onempty = Exception('no filenames provided')
    items = iter_files(filenames)
    items, peeked = iterutil.peek_and_iter(items)
    if not items:
        raise onempty
    if isinstance(peeked, str):
        # A truthy result is a reason to skip the file, so a check
        # that accepts every file must return None.
        def check():
            return None
        for filename, ismany in iterutil.iter_many(items, onempty):
            yield filename, check, ismany
    elif len(peeked) == 3:
        yield from items
    else:
        raise NotImplementedError
def iter_marks(mark='.', *, group=5, groups=2, lines=10, sep=' '):
    """Yield an endless series of progress marks.

    Every "group" marks are followed by "sep".  Every "groups" groups
    the line ends.  Every "lines" lines a blank line follows.
    """
    mark = mark or ''
    sep = f'{mark}{sep}' if sep else mark
    end = f'{mark}{os.linesep}'
    div = os.linesep
    perline = group * groups
    perlines = perline * lines

    if perline == 1:
        yield end
    elif group == 1:
        yield sep

    count = 0
    while True:
        count += 1
        if count % perline:
            # Somewhere in the middle of a line.
            yield sep if count % group == 0 else mark
            continue
        yield end
        if count % perlines == 0:
            yield div

View file

View file

@ -0,0 +1,42 @@
import logging
# The logger for this module.
logger = logging.getLogger(__name__)
def unrepr(value):
    """Not implemented; always raises NotImplementedError."""
    raise NotImplementedError
def parse_entries(entries, *, ignoresep=None):
    """Yield (item, filename) for each item found in the entries.

    Each entry may hold several items, separated by commas and/or
    whitespace.  An entry that contains "ignoresep" is not split.

    An item of the form "+<filename>" is replaced by the significant
    lines of that file, and "filename" is set for them.  For all
    other items "filename" is None.  A file that does not exist is
    ignored.
    """
    for entry in entries:
        if ignoresep and ignoresep in entry:
            subentries = [entry]
        else:
            subentries = entry.strip().replace(',', ' ').split()
        for item in subentries:
            if not item.startswith('+'):
                yield item, None
                continue

            filename = item[1:]
            try:
                infile = open(filename)
            except FileNotFoundError:
                logger.debug(f'ignored in parse_entries(): +{filename}')
                # Skip only this item.  A "return" here would silently
                # drop all of the remaining entries.
                continue
            with infile:
                # We read the entire file here to ensure the file
                # gets closed sooner rather than later.  Note that
                # the file would stay open if this iterator is never
                # exhausted.
                lines = infile.read().splitlines()
            for line in _iter_significant_lines(lines):
                yield line, filename
def _iter_significant_lines(lines):
for line in lines:
line = line.partition('#')[0]
if not line.strip():
continue
yield line

View file

@ -0,0 +1,213 @@
import csv
from . import NOT_SET, strutil, fsutil
# The markers that fix_row() substitutes for "empty" and "unknown" values.
EMPTY = '-'
UNKNOWN = '???'
def parse_markers(markers, default=None):
    """Return the markers in a normalized form.

    A string made up of one repeated character is a single marker.
    Any other string is split into its characters.  A non-string is
    returned as-is.  "default" is returned if "markers" is NOT_SET
    and None if it is otherwise empty.
    """
    if markers is NOT_SET:
        return default
    if not markers:
        return None
    if type(markers) is str:
        repeated = markers[0] * len(markers)
        return [markers] if markers == repeated else list(markers)
    return markers
def fix_row(row, **markers):
    """Return an iterator of the row's values with the markers normalized.

    A falsy value becomes None.  A value that is one of the "empty"
    markers becomes EMPTY and one of the "unknown" markers becomes
    UNKNOWN.  The markers are passed as the "empty" and "unknown"
    keyword arguments (see parse_markers()).
    """
    if isinstance(row, str):
        raise NotImplementedError(row)
    empty = parse_markers(markers.pop('empty', ('-',)))
    unknown = parse_markers(markers.pop('unknown', ('???',)))
    values = (val or None for val in row)
    if empty and unknown:
        return (EMPTY if v in empty else (UNKNOWN if v in unknown else v)
                for v in values)
    if empty:
        return (EMPTY if v in empty else v for v in values)
    if unknown:
        return (UNKNOWN if v in unknown else v for v in values)
    return values
def _fix_read_default(row):
for value in row:
yield value.strip()
def _fix_write_default(row, empty=''):
for value in row:
yield empty if value is None else str(value)
def _normalize_fix_read(fix):
if fix is None:
fix = ''
if callable(fix):
def fix_row(row):
values = fix(row)
return _fix_read_default(values)
elif isinstance(fix, str):
def fix_row(row):
values = _fix_read_default(row)
return (None if v == fix else v
for v in values)
else:
raise NotImplementedError(fix)
return fix_row
def _normalize_fix_write(fix, empty=''):
if fix is None:
fix = empty
if callable(fix):
def fix_row(row):
values = fix(row)
return _fix_write_default(values, empty)
elif isinstance(fix, str):
def fix_row(row):
return _fix_write_default(row, fix)
else:
raise NotImplementedError(fix)
return fix_row
def read_table(infile, header, *,
               sep='\t',
               fix=None,
               _open=open,
               _get_reader=csv.reader,
               ):
    """Yield each row of the given "sep"-separated (e.g. tab) file.

    "infile" is a filename or an iterable of lines (e.g. an open
    file).  Blank lines and "#" comments are ignored.  The first
    significant line must match "header" (a string or a sequence of
    column names).  Otherwise ValueError is raised.

    Each row is a tuple of the values after "fix" has been applied
    (see _normalize_fix_read()).
    """
    if isinstance(infile, str):
        # Open the file and start over with the file object.
        with _open(infile, newline='') as infile:
            yield from read_table(
                infile,
                header,
                sep=sep,
                fix=fix,
                _open=_open,
                _get_reader=_get_reader,
            )
            return
    lines = strutil._iter_significant_lines(infile)

    # Validate the header.
    if not isinstance(header, str):
        header = sep.join(header)
    try:
        actualheader = next(lines).strip()
    except StopIteration:
        # There were no significant lines at all.
        actualheader = ''
    if actualheader != header:
        raise ValueError(f'bad header {actualheader!r}')

    fix_row = _normalize_fix_read(fix)
    for row in _get_reader(lines, delimiter=sep or '\t'):
        yield tuple(fix_row(row))
def write_table(outfile, header, rows, *,
                sep='\t',
                fix=None,
                backup=True,
                _open=open,
                _get_writer=csv.writer,
                ):
    """Write each of the rows to the given "sep"-separated (e.g. tab) file.

    "outfile" is a filename or a writable file object.  "header" is
    a string or a sequence of column names.  "fix" is applied to each
    row (see _normalize_fix_write()).

    If "backup" is true then any existing file is first copied using
    fsutil.create_backup().  "backup" may also be the filename to use
    for the copy.
    """
    if backup:
        fsutil.create_backup(outfile, backup)
    if isinstance(outfile, str):
        with _open(outfile, 'w', newline='') as outfile:
            return write_table(
                outfile,
                header,
                rows,
                sep=sep,
                fix=fix,
                # The backup was already made above.  Making another
                # now would replace it with a copy of the file that
                # was just truncated.
                backup=False,
                _open=_open,
                _get_writer=_get_writer,
            )

    if isinstance(header, str):
        header = header.split(sep or '\t')
    fix_row = _normalize_fix_write(fix)
    writer = _get_writer(outfile, delimiter=sep or '\t')
    writer.writerow(header)
    for row in rows:
        writer.writerow(
            tuple(fix_row(row))
        )
def parse_table(entries, sep, header=None, rawsep=None, *,
                default=NOT_SET,
                strict=True,
                ):
    """Yield (row, filename) for each row found in the given entries.

    The entries are expanded by strutil.parse_entries(), so a row
    comes either directly from an entry ("filename" is None) or from
    a line of a "+<filename>" file.  If "header" is provided then
    each such file must start with it.

    A direct entry that does not contain "sep" is split on "rawsep"
    instead, if that is provided.

    If "strict" is true then every row must have the same number of
    columns as the header (or else as the first row).  A short row is
    padded with "default", if that is provided.
    """
    header, sep = _normalize_table_file_props(header, sep)
    if not sep:
        raise ValueError('missing "sep"')

    ncols = None
    if header:
        if strict:
            ncols = len(header.split(sep))
        # Tracks the file that the previous line came from.
        cur_file = None
    for line, filename in strutil.parse_entries(entries, ignoresep=sep):
        _sep = sep
        if filename:
            if header and cur_file != filename:
                # This is the first line of a new file.
                cur_file = filename
                # Skip the first line if it's the header.
                if line.strip() == header:
                    continue
                else:
                    # We expected the header.
                    raise NotImplementedError((header, line))
        elif rawsep and sep not in line:
            _sep = rawsep

        row = _parse_row(line, _sep, ncols, default)
        if strict and not ncols:
            # Without a header, the first row sets the column count.
            ncols = len(row)
        yield row, filename
def parse_row(line, sep, *, ncols=None, default=NOT_SET):
    """Return the tuple of stripped values in the "sep"-separated line.

    See _parse_row() regarding "ncols" and "default".
    """
    if sep:
        return _parse_row(line, sep, ncols, default)
    raise ValueError('missing "sep"')
def _parse_row(line, sep, ncols, default):
row = tuple(v.strip() for v in line.split(sep))
if (ncols or 0) > 0:
diff = ncols - len(row)
if diff:
if default is NOT_SET or diff < 0:
raise Exception(f'bad row (expected {ncols} columns, got {row!r})')
row += (default,) * diff
return row
def _normalize_table_file_props(header, sep):
if not header:
return None, sep
if not isinstance(header, str):
if not sep:
raise NotImplementedError(header)
header = sep.join(header)
elif not sep:
for sep in ('\t', ',', ' '):
if sep in header:
break
else:
sep = None
return header, sep