progress column base class

This commit is contained in:
Will McGugan 2020-03-10 18:16:55 +00:00
parent 16455b6a5c
commit 9aba5ddd9f
4 changed files with 200 additions and 40 deletions

View file

@ -6,10 +6,10 @@ from typing import Iterable
from urllib.request import urlopen
from rich.progress import (
bar_widget,
data_speed_widget,
file_size_widget,
remaining_widget,
BarColumn,
TransferSpeedColumn,
FileSizeColumn,
TimeRemainingColumn,
Progress,
TaskID,
)
@ -17,14 +17,14 @@ from rich.progress import (
progress = Progress(
"[bold blue]{task.fields[filename]}",
bar_widget,
BarColumn(),
"[progress.percentage]{task.percentage:>3.0f}%",
"",
file_size_widget,
FileSizeColumn(),
"",
data_speed_widget,
TransferSpeedColumn(),
"",
remaining_widget,
TimeRemainingColumn(),
)

View file

@ -81,7 +81,7 @@ DEFAULT_STYLES: Dict[str, Style] = {
"traceback.exc_type": Style(color="bright_red", bold=True),
"traceback.exc_value": Style(),
"traceback.offset": Style(color="bright_red", bold=True),
"bar.back": Style(color="grey50"),
"bar.back": Style(color="grey23"),
"bar.complete": Style(color="bright_magenta"),
"bar.done": Style(color="bright_green"),
"progress.data": Style(color="green"),

108
rich/filesize.py Normal file
View file

@ -0,0 +1,108 @@
# coding: utf-8
"""Functions for reporting filesizes.
The functions declared in this module should cover the different
usecases needed to generate a string representation of a file size
using several different units. Since there are many standards regarding
file size units, three different functions have been implemented.
See Also:
* `Wikipedia: Binary prefix <https://en.wikipedia.org/wiki/Binary_prefix>`_
"""
__all__ = ["traditional", "decimal", "binary"]
from typing import Iterable
def _to_str(size: int, suffixes: Iterable[str], base: int) -> int:
try:
size = int(size)
except ValueError:
raise TypeError("filesize requires a numeric value, not {!r}".format(size))
if size == 1:
return "1 byte"
elif size < base:
return "{:,} bytes".format(size)
for i, suffix in enumerate(suffixes, 2): # noqa: B007
unit = base ** i
if size < unit:
break
return "{:,.1f} {}".format((base * size / unit), suffix)
def traditional(size: int) -> str:
# type: (SupportsInt) -> Text
"""Convert a filesize in to a string (powers of 1024, JDEC prefixes).
In this convention, ``1024 B = 1 KB``.
This is the format that was used to display the size of DVDs
(*700 MB* meaning actually about *734 003 200 bytes*) before
standardisation of IEC units among manufacturers, and still
used by **Windows** to report the storage capacity of hard
drives (*279.4 GB* meaning *279.4 × 1024³ bytes*).
Arguments:
size (int): A file size.
Returns:
`str`: A string containing an abbreviated file size and units.
Example:
>>> filesize.traditional(30000)
'29.3 KB'
"""
return _to_str(size, ("KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"), 1024)
def binary(size: int) -> str:
"""Convert a filesize in to a string (powers of 1024, IEC prefixes).
In this convention, ``1024 B = 1 KiB``.
This is the format that has gained adoption among manufacturers
to avoid ambiguity regarding size units, since it explicitly states
using a binary base (*KiB = kibi bytes = kilo binary bytes*).
This format is notably being used by the **Linux** kernel (see
``man 7 units``).
Arguments:
int (size): A file size.
Returns:
`str`: A string containing a abbreviated file size and units.
Example:
>>> filesize.binary(30000)
'29.3 KiB'
"""
return _to_str(size, ("KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"), 1024)
def decimal(size: int) -> str:
"""Convert a filesize in to a string (powers of 1000, SI prefixes).
In this convention, ``1000 B = 1 kB``.
This is typically the format used to advertise the storage
capacity of USB flash drives and the like (*256 MB* meaning
actually a storage capacity of more than *256 000 000 B*),
or used by **Mac OS X** since v10.6 to report file sizes.
Arguments:
int (size): A file size.
Returns:
`str`: A string containing a abbreviated file size and units.
Example:
>>> filesize.decimal(30000)
'30.0 kB'
"""
return _to_str(size, ("kB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"), 1000)

View file

@ -1,3 +1,4 @@
from abc import ABC, abstractmethod
from collections import deque
from contextlib import contextmanager
from dataclasses import dataclass, field
@ -14,6 +15,7 @@ from typing import (
List,
Optional,
NamedTuple,
Tuple,
NewType,
Union,
)
@ -27,44 +29,91 @@ from .text import Text
TaskID = NewType("TaskID", int)
WidgetCallable = Callable[["Task"], RenderableType]
class ProgressError(Exception):
pass
class ProgressColumn(ABC):
"""Base class for a widget to use in progress display."""
max_refresh: Optional[float] = None
def __init__(self) -> None:
self._renderable_cache: Dict[TaskID, Tuple[float, RenderableType]] = {}
self._update_time: Optional[float] = None
def __call__(self, task: "Task") -> RenderableType:
"""Called by the Progress object to return a renderable for the given task.
Args:
task (Task): An object containing information regarding the task.
Returns:
RenderableType: Anything renderable (including str).
"""
current_time = monotonic()
if self.max_refresh is not None:
try:
timestamp, renderable = self._renderable_cache[task.id]
except KeyError:
pass
else:
if timestamp + self.max_refresh > current_time:
return renderable
renderable = self.render(task)
self._renderable_cache[task.id] = (current_time, renderable)
return renderable
@abstractmethod
def render(self, task: "Task") -> RenderableType:
"""Should return a renderable object."""
class MissingWidget(ProgressError):
pass
class BarColumn(ProgressColumn):
"""Renders a progress bar."""
def __init__(self, bar_width: Optional[int] = 40) -> None:
self.bar_width = bar_width
super().__init__()
def render(self, task: "Task") -> Bar:
"""Gets a progress bar widget for a task."""
return Bar(total=task.total, completed=task.completed, width=self.bar_width)
def bar_widget(task: "Task") -> Bar:
"""Gets a progress bar widget for a task."""
return Bar(total=task.total, completed=task.completed, width=40)
class TimeRemainingColumn(ProgressColumn):
"""Renders estimated time remaining."""
# Only refresh twice a second to prevent jitter
max_refresh = 0.5
def render(self, task: "Task") -> str:
"""Show time remaining."""
remaining = task.time_remaining
if remaining is None:
return Text("?", style="progress.remaining")
remaining_delta = timedelta(seconds=floor(remaining))
return Text(str(remaining_delta), style="progress.remaining")
def remaining_widget(task: "Task") -> str:
"""Show time remaining."""
remaining = task.time_remaining
if remaining is None:
return Text("?", style="progress.remaining")
remaining_delta = timedelta(seconds=floor(remaining))
return Text(str(remaining_delta), style="progress.remaining")
class FileSizeColumn(ProgressColumn):
"""Renders human readable filesize."""
def render(self, task: "Task") -> str:
"""Show data completed."""
data_size = filesize.decimal(task.completed)
return Text(data_size, style="progress.data")
def file_size_widget(task: "Task") -> str:
"""Show data completed."""
data_size = filesize.decimal(task.completed)
return Text(data_size, style="progress.data")
class TransferSpeedColumn(ProgressColumn):
"""Renders human readable transfer speed."""
def data_speed_widget(task: "Task") -> str:
"""Show data transfer speed."""
speed = task.speed
if speed is None:
return Text("?", style="progress.data.speed")
data_speed = filesize.decimal(int(speed))
return Text(f"{data_speed}/s", style="progress.data.speed")
def render(self, task: "Task") -> str:
"""Show data transfer speed."""
speed = task.speed
if speed is None:
return Text("?", style="progress.data.speed")
data_speed = filesize.decimal(int(speed))
return Text(f"{data_speed}/s", style="progress.data.speed")
class _ProgressSample(NamedTuple):
@ -78,6 +127,7 @@ class _ProgressSample(NamedTuple):
class Task:
"""Stores information regarding a progress task."""
id: TaskID
name: str
total: float
completed: float
@ -173,7 +223,7 @@ class Progress:
def __init__(
self,
*columns: Union[str, WidgetCallable],
*columns: Union[str, ProgressColumn],
console: Console = None,
auto_refresh: bool = True,
refresh_per_second: int = 15,
@ -181,9 +231,9 @@ class Progress:
) -> None:
self.columns = columns or (
"{task.name}",
bar_widget,
BarColumn(),
"[progress.percentage]{task.percentage:>3.0f}%",
remaining_widget,
TimeRemainingColumn(),
)
self.console = console or Console(file=sys.stderr)
self.auto_refresh = auto_refresh
@ -349,7 +399,9 @@ class Progress:
TaskID: An ID you can use when calling `update`.
"""
with self._lock:
task = Task(name, total, completed, visible=visible, fields=fields)
task = Task(
self._task_index, name, total, completed, visible=visible, fields=fields
)
self._tasks[self._task_index] = task
if start:
self.start(self._task_index)