mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 02:15:10 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			627 lines
		
	
	
	
		
			19 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			627 lines
		
	
	
	
		
			19 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import argparse
 | |
| import contextlib
 | |
| import logging
 | |
| import os
 | |
| import os.path
 | |
| import shutil
 | |
| import sys
 | |
| 
 | |
| from . import fsutil, strutil, iterutil, logging as loggingutil
 | |
| 
 | |
| 
 | |
# Sentinel default: lets a function tell "argument omitted" apart from an
# explicit None (see the "lines" parameter of iter_marks()).
_NOT_SET = object()
 | |
| 
 | |
| 
 | |
def get_prog(spec=None, *, absolute=False, allowsuffix=True):
    """Return the text to use as the program name (e.g. in usage output).

    *spec* may be None (inspect the running script), a filename string,
    or an object with "origin" and "name" attributes (a module spec).

    The result is one of:
     * the bare basename, if the script is executable and is the very
       file that a PATH lookup of that basename finds
     * the filename, if the script is executable but not found that way
     * "<sys.executable> -m <module>", if a spec is available
     * "<sys.executable> <filename>", otherwise

    If *absolute* is true then a returned filename is made absolute.
    If *allowsuffix* is false then a ".py" file is never reported by its
    bare basename.
    """
    if isinstance(spec, str):
        script = os.path.normpath(spec)
        spec = None
    elif spec is None:
        # This call must stay directly in this function's body, since
        # _find_script() counts stack frames.
        spec = _find_script()[1]
        # This is more natural for prog than __file__ would be.
        script = sys.argv[0]
    else:
        script = spec.origin

    if not _is_standalone(script):
        if spec is None:
            target = os.path.abspath(script) if absolute else script
            return f'{sys.executable} {target}'
        modname = spec.name
        suffix = '.__main__'
        if modname.endswith(suffix):
            modname = modname[:-len(suffix)]
        return f'{sys.executable} -m {modname}'

    # Check if "installed".
    if allowsuffix or not script.endswith('.py'):
        name = os.path.basename(script)
        onpath = shutil.which(name)
        if onpath:
            here = os.path.normcase(os.path.abspath(script))
            there = os.path.normcase(os.path.abspath(onpath))
            if here == there:
                return name

    # It is only "standalone".
    return os.path.abspath(script) if absolute else script
 | |
| 
 | |
| 
 | |
def _find_script():
    """Return (filename, spec) for the module that is running as __main__.

    The values are read from the "__file__" and "__spec__" globals of the
    first stack frame whose module "__name__" is "__main__".
    """
    # Start two frames up: frame 0 is this function and frame 1 is its
    # direct caller, so the search begins at the caller's caller.
    frame = sys._getframe(2)
    # Walk outward until we reach code that belongs to the __main__ module.
    while frame.f_globals['__name__'] != '__main__':
        frame = frame.f_back

    # This should match sys.argv[0].
    filename = frame.f_globals['__file__']
    # This will be None if -m wasn't used.
    spec = frame.f_globals['__spec__']
    return filename, spec
 | |
| 
 | |
| 
 | |
def is_installed(filename, *, allowsuffix=True):
    """Return True if *filename* is the script a PATH lookup would run.

    That means a PATH search for the file's basename finds this very file
    and the file passes the _is_standalone() check.

    If *allowsuffix* is false then a ".py" file is never considered
    installed.
    """
    if not allowsuffix and filename.endswith('.py'):
        return False
    # os.path has no normalize(); normpath() is the function that
    # collapses redundant separators and up-level references.
    filename = os.path.abspath(os.path.normpath(filename))
    found = shutil.which(os.path.basename(filename))
    if not found:
        return False
    # Compare the same way get_prog() does: shutil.which() may return a
    # relative path, and case is not significant on some platforms.
    found = os.path.abspath(found)
    if os.path.normcase(found) != os.path.normcase(filename):
        return False
    return _is_standalone(filename)
 | |
| 
 | |
| 
 | |
def is_standalone(filename):
    """Return True if *filename* passes the _is_standalone() check.

    The filename is normalized and made absolute before it is checked.
    """
    # os.path has no normalize(); normpath() is the function that
    # collapses redundant separators and up-level references.
    filename = os.path.abspath(os.path.normpath(filename))
    return _is_standalone(filename)
 | |
| 
 | |
| 
 | |
def _is_standalone(filename):
    """Return the result of fsutil.is_executable() for *filename*."""
    # NOTE(review): presumably this means the file can be run directly
    # rather than through the interpreter; confirm against fsutil.
    return fsutil.is_executable(filename)
 | |
| 
 | |
| 
 | |
| ##################################
 | |
| # logging
 | |
| 
 | |
# The baseline verbosity; add_verbosity_cli() adds one per -v and subtracts
# one per -q, never going below 0.
VERBOSITY = 3

# Whether tracebacks are shown by default, taken from the SHOW_TRACEBACK
# environment variable.  Unset, empty, "0", "false", and "no" (in any case)
# all mean "do not show"; any other value means "show".
TRACEBACK = os.environ.get('SHOW_TRACEBACK', '').strip()
TRACEBACK = bool(TRACEBACK and TRACEBACK.upper() not in ('0', 'FALSE', 'NO'))


# The logger for this module.
logger = logging.getLogger(__name__)
 | |
| 
 | |
| 
 | |
def configure_logger(verbosity, logger=None, **kwargs):
    """Configure *logger* for the given verbosity.

    The work is delegated to loggingutil.configure_logger(), which also
    receives any extra keyword arguments.  If no logger is given then the
    root logger is configured.
    """
    target = logging.getLogger() if logger is None else logger
    loggingutil.configure_logger(target, verbosity, **kwargs)
 | |
| 
 | |
| 
 | |
| ##################################
 | |
| # selections
 | |
| 
 | |
class UnsupportedSelectionError(Exception):
    """Raised when one or more selected values are not among those allowed.

    "values" holds the offending values as given (duplicates included) and
    "possible" holds the values that would have been accepted.
    """

    def __init__(self, values, possible):
        self.values = tuple(values)
        self.possible = tuple(possible)
        message = f'unsupported selections {self.unique}'
        super().__init__(message)

    @property
    def unique(self):
        """The offending values, sorted and without duplicates."""
        distinct = set(self.values)
        return tuple(sorted(distinct))
 | |
| 
 | |
| 
 | |
def normalize_selection(selected, *, possible=None):
    """Return the normalized form of a user-provided selection.

    *selected* may be None, True, or False (returned unchanged), a single
    string, or an iterable of strings.  Each string may hold several
    values separated by commas and/or whitespace.

    Returns:
     * *selected* itself, if it is None/True/False
     * an empty tuple, if *selected* is an empty non-string
     * True, if any of the values is "all"
     * otherwise a frozenset of the individual values

    Raises UnsupportedSelectionError if *possible* is provided (and not
    empty) and any value other than "all" is not in it.
    """
    if selected in (None, True, False):
        return selected
    elif isinstance(selected, str):
        selected = [selected]
    elif not selected:
        return ()

    unsupported = []
    values = set()
    for item in selected:
        if not item:
            continue
        for value in item.strip().replace(',', ' ').split():
            # XXX Handle subtraction (leading "-").
            if possible and value not in possible and value != 'all':
                unsupported.append(value)
            values.add(value)
    if unsupported:
        raise UnsupportedSelectionError(unsupported, tuple(possible))
    if 'all' in values:
        return True
    # Return the split-out values, not the raw items that were passed in
    # (which may each still hold several comma-separated values).
    return frozenset(values)
 | |
| 
 | |
| 
 | |
| ##################################
 | |
| # CLI parsing helpers
 | |
| 
 | |
class CLIArgSpec(tuple):
    """The arguments for a deferred parser.add_argument() call.

    An instance is an (args, kwargs) 2-tuple.  It is also callable with a
    parser, which lets it be used wherever an "add CLI" function is
    expected: calling it adds the argument and returns a processor that
    does nothing.
    """

    def __new__(cls, *args, **kwargs):
        pair = (args, kwargs)
        return super().__new__(cls, pair)

    def __repr__(self):
        positional, keywords = self
        parts = [repr(arg) for arg in positional]
        parts.extend(f'{name}={value!r}' for name, value in keywords.items())
        return f'{type(self).__name__}({", ".join(parts)})'

    def __call__(self, parser, *, _noop=(lambda a: None)):
        self.apply(parser)
        return _noop

    def apply(self, parser):
        """Add the described argument to *parser*."""
        positional, keywords = self
        parser.add_argument(*positional, **keywords)
 | |
| 
 | |
| 
 | |
def apply_cli_argspecs(parser, specs):
    """Apply each of the given argument specs to *parser*.

    A spec is either a callable, which is called with the parser and may
    return arg processors (a callable, or possibly nested iterables of
    them), or an (args, kwargs) pair that is passed on to
    parser.add_argument().

    Returns the flat list of processors collected from the callables.
    """
    processors = []
    for spec in specs:
        if callable(spec):
            procs = spec(parser)
            _add_procs(processors, procs)
        else:
            args, kwargs = spec
            # The pair must be unpacked; passing the tuple and dict as-is
            # would hand add_argument() two bogus positional arguments.
            parser.add_argument(*args, **kwargs)
    return processors
 | |
| 
 | |
| 
 | |
| def _add_procs(flattened, procs):
 | |
|     # XXX Fail on non-empty, non-callable procs?
 | |
|     if not procs:
 | |
|         return
 | |
|     if callable(procs):
 | |
|         flattened.append(procs)
 | |
|     else:
 | |
|         #processors.extend(p for p in procs if callable(p))
 | |
|         for proc in procs:
 | |
|             _add_procs(flattened, proc)
 | |
| 
 | |
| 
 | |
def add_verbosity_cli(parser):
    """Add the -q/--quiet and -v/--verbose counting options to *parser*.

    Returns an arg processor that replaces the parsed "verbose" and
    "quiet" counts with a single "verbosity" entry, computed from the
    VERBOSITY baseline and never less than 0.  The processor returns the
    name of that entry.
    """
    for short, full in (('-q', '--quiet'), ('-v', '--verbose')):
        parser.add_argument(short, full, action='count', default=0)

    def process_args(args, *, argv=None):
        ns = vars(args)
        key = 'verbosity'
        if key in ns:
            parser.error(f'duplicate arg {key!r}')
        louder = ns.pop('verbose')
        quieter = ns.pop('quiet')
        ns[key] = max(0, VERBOSITY + louder - quieter)
        return key
    return process_args
 | |
| 
 | |
| 
 | |
def add_traceback_cli(parser):
    """Add the --traceback/--tb and --no-traceback/--no-tb options.

    The default comes from the module-level TRACEBACK setting.

    Returns an arg processor that replaces the parsed "traceback" flag
    with a "traceback_cm" entry: a context manager under which the
    command should be run.  The processor returns the name of that entry.
    """
    parser.add_argument('--traceback', '--tb', action='store_true',
                        default=TRACEBACK)
    parser.add_argument('--no-traceback', '--no-tb', dest='traceback',
                        action='store_const', const=False)

    def process_args(args, *, argv=None):
        ns = vars(args)
        key = 'traceback_cm'
        if key in ns:
            parser.error(f'duplicate arg {key!r}')
        showtb = ns.pop('traceback')

        @contextlib.contextmanager
        def traceback_cm():
            # NOTE(review): presumably this silences errors raised while
            # emitting log records and returns the callable that undoes
            # that; confirm against loggingutil.
            restore = loggingutil.hide_emit_errors()
            # The order of the handlers below matters: the first matching
            # clause wins, so the specific cases must precede the general.
            try:
                yield
            except BrokenPipeError:
                # It was piped to "head" or something similar.
                pass
            except NotImplementedError:
                # Always let this one propagate, traceback and all.
                raise  # re-raise
            except Exception as exc:
                # Without tracebacks, exit with just a one-line message.
                if not showtb:
                    sys.exit(f'ERROR: {exc}')
                raise  # re-raise
            except KeyboardInterrupt:
                if not showtb:
                    sys.exit('\nINTERRUPTED')
                raise  # re-raise
            except BaseException as exc:
                # Anything else that is not an Exception subclass.
                if not showtb:
                    sys.exit(f'{type(exc).__name__}: {exc}')
                raise  # re-raise
            finally:
                restore()
        # Note that this stores an already-created context manager
        # object, not the factory function.
        ns[key] = traceback_cm()
        return key
    return process_args
 | |
| 
 | |
| 
 | |
def add_sepval_cli(parser, opt, dest, choices, *, sep=',', **kwargs):
    """Add an argument whose values may be given as *sep*-separated lists.

    If *opt* starts with "-" then it is added as an option that may be
    repeated.  Otherwise it is added as a positional argument (taking one
    or more values, unless "nargs" is passed) that is shown as *opt* in
    the usage text.  Either way the result is stored under *dest*.  Any
    extra keyword arguments are passed on to parser.add_argument().

    Returns an arg processor that splits every provided value on *sep*,
    reports values not in *choices* through parser.error(), and stores
    the flat list of values back under *dest*.
    """
    if not isinstance(opt, str):
        parser.error(f'opt must be a string, got {opt!r}')
    elif opt.startswith('-'):
        parser.add_argument(opt, dest=dest, action='append', **kwargs)
    else:
        kwargs.setdefault('nargs', '+')
        # argparse does not accept dest= for a positional argument (the
        # name itself is the dest), so register it under *dest* and use
        # *opt* only for display.
        kwargs.setdefault('metavar', opt)
        parser.add_argument(dest, action='append', **kwargs)

    def process_args(args, *, argv=None):
        ns = vars(args)

        # XXX Use normalize_selection()?
        provided = ns[dest]
        if isinstance(provided, str):
            provided = [provided]
        selections = []
        for many in provided or ():
            # When "nargs" is in effect, each appended entry is itself a
            # list of strings rather than a single string.
            group = [many] if isinstance(many, str) else many
            for text in group:
                for value in text.split(sep):
                    if value not in choices:
                        parser.error(f'unknown {dest} {value!r}')
                    selections.append(value)
        ns[dest] = selections
    return process_args
 | |
| 
 | |
| 
 | |
def add_files_cli(parser, *, excluded=None, nargs=None):
    """Add the file-filtering options and a "filenames" positional.

    The positional takes one or more values unless *nargs* says
    otherwise.  Returns the list of arg processors to run.
    """
    filtering = add_file_filtering_cli(parser, excluded=excluded)
    count = nargs if nargs else '+'
    parser.add_argument('filenames', nargs=count, metavar='FILENAME')
    return [filtering]
 | |
| 
 | |
| 
 | |
def add_file_filtering_cli(parser, *, excluded=None):
    """Add the --start, --include, and --exclude options to *parser*.

    *excluded* is an optional iterable of entries that are always
    excluded, in addition to any given with --exclude.

    Returns an arg processor that replaces the three parsed options with
    an "iter_filenames" entry: a function taking (filenames, relroot=None)
    that calls fsutil.process_filenames() with the parsed settings.
    """
    parser.add_argument('--start')
    for flag in ('--include', '--exclude'):
        parser.add_argument(flag, action='append')

    always_excluded = tuple(excluded or ())

    def process_args(args, *, argv=None):
        ns = vars(args)
        key = 'iter_filenames'
        if key in ns:
            parser.error(f'duplicate arg {key!r}')

        wanted = tuple(ns.pop('include') or ())
        unwanted = always_excluded + tuple(ns.pop('exclude') or ())
        # We use the default for "show_header"
        filters = {
            'start': ns.pop('start'),
            'include': tuple(_parse_files(wanted)),
            'exclude': tuple(_parse_files(unwanted)),
        }

        def process_filenames(filenames, relroot=None):
            return fsutil.process_filenames(filenames, relroot=relroot, **filters)
        ns[key] = process_filenames
    return process_args
 | |
| 
 | |
| 
 | |
def _parse_files(filenames):
    """Yield each filename produced by strutil.parse_entries(), stripped."""
    # Each parsed entry is a pair; only its first item (the filename) is
    # used here.
    for filename, _ in strutil.parse_entries(filenames):
        yield filename.strip()
 | |
| 
 | |
| 
 | |
def add_progress_cli(parser, *, threshold=VERBOSITY, **kwargs):
    """Add the --progress and --no-progress options (on by default).

    Returns an arg processor that, when progress tracking is enabled,
    replaces the "track_progress" flag with the tracking function to use:
    track_progress_compact() if the verbosity is at most *threshold*,
    otherwise track_progress_flat().  Extra keyword arguments are
    accepted but not used.
    """
    parser.add_argument('--progress', dest='track_progress', action='store_const', const=True)
    parser.add_argument('--no-progress', dest='track_progress', action='store_false')
    parser.set_defaults(track_progress=True)

    def process_args(args, *, argv=None):
        if not args.track_progress:
            return
        level = vars(args).get('verbosity', VERBOSITY)
        if level > threshold:
            tracker = track_progress_flat
        else:
            tracker = track_progress_compact
        args.track_progress = tracker
    return process_args
 | |
| 
 | |
| 
 | |
def add_failure_filtering_cli(parser, pool, *, default=False):
    """Add the --fail and --no-fail options to *parser*.

    *pool* maps each selectable name to an exception type.

    Returns an arg processor that replaces the parsed "fail" value with
    an "ignore_exc" function on the namespace.  That function takes an
    exception and returns True if it should be ignored, i.e. if its exact
    type was not selected with --fail.  If neither option was given then
    *default* is used as the selection (True: never ignore; False: always
    ignore).
    """
    names = "|".join(sorted(pool))
    parser.add_argument('--fail', action='append',
                        metavar=f'"{{all|{names}}},..."')
    parser.add_argument('--no-fail', dest='fail', action='store_const', const=())

    def process_args(args, *, argv=None):
        ns = vars(args)

        fail = ns.pop('fail')
        try:
            fail = normalize_selection(fail, possible=pool)
        except UnsupportedSelectionError as exc:
            parser.error(f'invalid --fail values: {", ".join(exc.unique)}')
            return

        if fail is None:
            fail = default

        if fail is True:
            def ignore_exc(_exc):
                return False
        elif fail is False:
            def ignore_exc(_exc):
                return True
        else:
            def ignore_exc(exc):
                # Only an exact type match counts as selected.
                return not any(type(exc) == pool[err] for err in fail)
        args.ignore_exc = ignore_exc
    return process_args
 | |
| 
 | |
| 
 | |
| def add_kind_filtering_cli(parser, *, default=None):
 | |
|     parser.add_argument('--kinds', action='append')
 | |
| 
 | |
|     def process_args(args, *, argv=None):
 | |
|         ns = vars(args)
 | |
| 
 | |
|         kinds = []
 | |
|         for kind in ns.pop('kinds') or default or ():
 | |
|             kinds.extend(kind.strip().replace(',', ' ').split())
 | |
| 
 | |
|         if not kinds:
 | |
|             match_kind = (lambda k: True)
 | |
|         else:
 | |
|             included = set()
 | |
|             excluded = set()
 | |
|             for kind in kinds:
 | |
|                 if kind.startswith('-'):
 | |
|                     kind = kind[1:]
 | |
|                     excluded.add(kind)
 | |
|                     if kind in included:
 | |
|                         included.remove(kind)
 | |
|                 else:
 | |
|                     included.add(kind)
 | |
|                     if kind in excluded:
 | |
|                         excluded.remove(kind)
 | |
|             if excluded:
 | |
|                 if included:
 | |
|                     ...  # XXX fail?
 | |
|                 def match_kind(kind, *, _excluded=excluded):
 | |
|                     return kind not in _excluded
 | |
|             else:
 | |
|                 def match_kind(kind, *, _included=included):
 | |
|                     return kind in _included
 | |
|         args.match_kind = match_kind
 | |
|     return process_args
 | |
| 
 | |
| 
 | |
# The "add CLI" functions applied to every command by default
# (see the "commonspecs" parameter of add_commands_cli()).
COMMON_CLI = [
    add_verbosity_cli,
    add_traceback_cli,
    #add_dryrun_cli,
]
 | |
| 
 | |
| 
 | |
def add_commands_cli(parser, commands, *, commonspecs=COMMON_CLI, subset=None):
    """Add the CLI for the given commands to *parser*.

    *commands* maps each command name to a 3-item entry of which the
    first item is the description and the second is the command's arg
    specs.  *commonspecs* are arg specs shared by all the commands.

    *subset* limits which commands are added:
     * None: all of them, each as a subcommand
     * a string: only that command, added directly to *parser* (no
       subcommands) with "cmd" defaulting to its name
     * a set or other collection of names: those commands, as subcommands

    Returns a dict mapping each added command name to its list of arg
    processors.  Raises ValueError for an unknown command name and
    NotImplementedError for an empty (non-None) subset.
    """
    if isinstance(subset, str):
        try:
            _, argspecs, _ = commands[subset]
        except KeyError:
            raise ValueError(f'unsupported subset {subset!r}')
        parser.set_defaults(cmd=subset)
        return {subset: _add_cmd_cli(parser, commonspecs, argspecs)}

    if subset is None:
        names = requested = list(commands)
    elif not subset:
        raise NotImplementedError
    elif isinstance(subset, set):
        # Follow the order of "commands" since a set has none to offer.
        names = [known for known in commands if known in subset]
        requested = sorted(subset)
    else:
        requested = subset
        names = [name for name in requested if name in commands]
    if len(names) < len(requested):
        bad = tuple(name for name in requested if name not in commands)
        raise ValueError(f'unsupported subset {bad}')

    # The common args live on a parent parser shared by every subcommand.
    shared = argparse.ArgumentParser(add_help=False)
    shared_processors = apply_cli_argspecs(shared, commonspecs)
    subparsers = parser.add_subparsers(dest='cmd')
    by_command = {}
    for name in names:
        description, argspecs, _ = commands[name]
        sub = subparsers.add_parser(
            name,
            description=description,
            parents=[shared],
        )
        own_processors = _add_cmd_cli(sub, (), argspecs)
        by_command[name] = shared_processors + own_processors
    return by_command
 | |
| 
 | |
| 
 | |
| def _add_cmd_cli(parser, commonspecs, argspecs):
 | |
|     processors = []
 | |
|     argspecs = list(commonspecs or ()) + list(argspecs or ())
 | |
|     for argspec in argspecs:
 | |
|         if callable(argspec):
 | |
|             procs = argspec(parser)
 | |
|             _add_procs(processors, procs)
 | |
|         else:
 | |
|             if not argspec:
 | |
|                 raise NotImplementedError
 | |
|             args = list(argspec)
 | |
|             if not isinstance(args[-1], str):
 | |
|                 kwargs = args.pop()
 | |
|                 if not isinstance(args[0], str):
 | |
|                     try:
 | |
|                         args, = args
 | |
|                     except (TypeError, ValueError):
 | |
|                         parser.error(f'invalid cmd args {argspec!r}')
 | |
|             else:
 | |
|                 kwargs = {}
 | |
|             parser.add_argument(*args, **kwargs)
 | |
|             # There will be nothing to process.
 | |
|     return processors
 | |
| 
 | |
| 
 | |
def _flatten_processors(processors):
    """Yield each callable in *processors*, in order.

    Non-callable entries are treated as nested iterables of processors.
    None entries are skipped.
    """
    for proc in processors:
        if proc is None:
            continue
        if callable(proc):
            yield proc
        else:
            yield from _flatten_processors(proc)


def process_args(args, argv, processors, *, keys=None):
    """Run the arg processors and pull out the entries they hand back.

    Each processor is called with (args, argv=argv).  It may return the
    name of a namespace entry, an iterable of such names, or nothing.
    Every returned name is removed from the *args* namespace.

    If *keys* is given then the processors must hand back exactly those
    names: NotImplementedError is raised for an unexpected name and for
    any names left over at the end.

    Returns a dict mapping each removed name to its value.
    """
    ns = vars(args)
    remainder = None if keys is None else set(keys)
    extracted = {}
    for proc in _flatten_processors(processors):
        hanging = proc(args, argv=argv)
        if isinstance(hanging, str):
            # A lone name; iterating it would split it into characters.
            hanging = [hanging]
        # Processors that hand nothing back return None.
        for key in hanging or ():
            if remainder is not None:
                if key not in remainder:
                    raise NotImplementedError(key)
                remainder.remove(key)
            extracted[key] = ns.pop(key)
    if remainder:
        raise NotImplementedError(sorted(remainder))
    return extracted
 | |
| 
 | |
| 
 | |
def process_args_by_key(args, argv, processors, keys):
    """Run the arg processors and return the extracted values as a list.

    The values are in the same order as *keys*.  See process_args() for
    how the processors are run and what may be raised.
    """
    by_name = process_args(args, argv, processors, keys=keys)
    return [by_name[name] for name in keys]
 | |
| 
 | |
| 
 | |
| ##################################
 | |
| # commands
 | |
| 
 | |
def set_command(name, add_cli):
    """Return a decorator that records CLI info on the decorated function.

    The decorator stores (name, add_cli) as the function's "__cli__"
    attribute and returns the function itself.  It raises Exception if
    the function already has that attribute.
    """
    info = (name, add_cli)

    def decorator(func):
        if hasattr(func, '__cli__'):
            raise Exception('already set')
        func.__cli__ = info
        return func
    return decorator
 | |
| 
 | |
| 
 | |
| ##################################
 | |
| # main() helpers
 | |
| 
 | |
def filter_filenames(filenames, process_filenames=None, relroot=fsutil.USE_CWD):
    """Yield each of the given filenames that should not be skipped.

    A file is skipped when its check reports a reason; the reason is
    logged at the debug level.
    """
    # We expect each filename to be a normalized, absolute path.
    entries = _iter_filenames(filenames, process_filenames, relroot)
    for filename, _, check, _ in entries:
        reason = check()
        if not reason:
            yield filename
        else:
            logger.debug(f'{filename}: {reason}')
 | |
| 
 | |
| 
 | |
def main_for_filenames(filenames, process_filenames=None, relroot=fsutil.USE_CWD):
    """Yield (filename, relfile) for each file that should not be skipped.

    The filenames are first passed through fsutil.fix_filenames().  For
    an entry flagged to be shown, a header with the relative filename is
    printed.  A file is skipped when its check reports a reason, in which
    case the reason is printed.
    """
    filenames, relroot = fsutil.fix_filenames(filenames, relroot=relroot)
    entries = _iter_filenames(filenames, process_filenames, relroot)
    for filename, relfile, check, show in entries:
        if show:
            print()
            print(relfile)
            print('-------------------------------------------')
        reason = check()
        if reason:
            print(reason)
        else:
            yield filename, relfile
 | |
| 
 | |
| 
 | |
def _iter_filenames(filenames, process, relroot):
    """Yield (filename, relfile, check, show) for each of the filenames.

    "check" is a function that returns the reason the file should be
    skipped (or a false value if it should not be), and "show" is the
    flag that main_for_filenames() uses to decide on printing a header.

    If *process* is None then fsutil.process_filenames() provides the
    items.  Otherwise *process* is called with (filenames,
    relroot=relroot) and must produce either such 4-item entries or
    plain filename strings.  Strings are never skipped.

    Raises Exception if *process* produces nothing and
    NotImplementedError if it produces anything else.
    """
    if process is None:
        yield from fsutil.process_filenames(filenames, relroot=relroot)
        return

    onempty = Exception('no filenames provided')
    items = process(filenames, relroot=relroot)
    # Look at the first item to learn what kind of items we were given.
    items, peeked = iterutil.peek_and_iter(items)
    if not items:
        raise onempty
    if isinstance(peeked, str):
        if relroot and relroot is not fsutil.USE_CWD:
            relroot = os.path.abspath(relroot)
        # Callers skip a file when check() returns something true (the
        # reason), so "no reason to skip" has to be a false value.
        check = (lambda: None)
        for filename, ismany in iterutil.iter_many(items, onempty):
            relfile = fsutil.format_filename(filename, relroot, fixroot=False)
            yield filename, relfile, check, ismany
    elif len(peeked) == 4:
        yield from items
    else:
        raise NotImplementedError
 | |
| 
 | |
| 
 | |
def track_progress_compact(items, *, groups=5, **mark_kwargs):
    """Yield each of the items, printing a progress mark before each one.

    The marks come from iter_marks(), which receives *groups* and any
    extra keyword arguments.  Once the items run out, the current line of
    marks is ended if it was left unfinished.
    """
    newline = os.linesep
    marks = iter_marks(groups=groups, **mark_kwargs)
    latest = newline
    for item in items:
        latest = next(marks)
        print(latest, end='', flush=True)
        yield item
    if not latest.endswith(newline):
        print()
 | |
| 
 | |
| 
 | |
def track_progress_flat(items, fmt='<{}>'):
    """Yield each of the items, printing it on its own line first.

    Each item is rendered with *fmt* (a str.format() template).
    """
    for item in items:
        rendered = fmt.format(item)
        print(rendered, flush=True)
        yield item
 | |
| 
 | |
| 
 | |
def iter_marks(mark='.', *, group=5, groups=2, lines=_NOT_SET, sep=' '):
    """Yield an endless series of strings to print as progress marks.

    Most of the yielded strings are just *mark*.  Every *group* marks the
    string is *mark* followed by *sep*, and every *group* x *groups*
    marks it is *mark* followed by a line separator.  After every *lines*
    such lines a bare line separator is also yielded; by default that is
    as many whole lines as fit in 100 marks, and a false or negative
    *lines* disables it.

    A *group* or *groups* that is missing or not greater than 1 is
    treated as 1.
    """
    if not mark:
        mark = ''
    if not (group and group > 1):
        group = 1
    if not (groups and groups > 1):
        groups = 1

    newline = os.linesep
    group_end = f'{mark}{sep}' if sep else mark
    line_end = f'{mark}{newline}'
    perline = group * groups
    if lines is _NOT_SET:
        # By default we try to put about 100 in each line group.
        perblock = 100 // perline * perline
    elif not lines or lines < 0:
        perblock = None
    else:
        perblock = perline * lines

    if perline == 1:
        yield line_end
    elif group == 1:
        yield group_end

    count = 0
    while True:
        count += 1
        if count % perline == 0:
            yield line_end
            if perblock and count % perblock == 0:
                yield newline
        elif count % group == 0:
            yield group_end
        else:
            yield mark
 |