gh-132983: Remove leftovers from EndlessZstdDecompressor (#133856)

Co-authored-by: Adam Turner <9087854+AA-Turner@users.noreply.github.com>
Co-authored-by: Peter Bierma <zintensitydev@gmail.com>
This commit is contained in:
Rogdham 2025-05-11 04:04:25 +02:00 committed by GitHub
parent 1a87b6e9ae
commit 878e0fb8b4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -43,20 +43,11 @@ typedef struct {
PyObject *unused_data;
/* 0 if decompressor has (or may has) unconsumed input data, 0 or 1. */
char needs_input;
/* For decompress(), 0 or 1.
1 when both input and output streams are at a frame edge, means a
frame is completely decoded and fully flushed, or the decompressor
just be initialized. */
char at_frame_edge;
bool needs_input;
/* For ZstdDecompressor, 0 or 1.
1 means the end of the first frame has been reached. */
char eof;
/* Used for fast reset above three variables */
char _unused_char_for_align;
bool eof;
/* __init__ has been called, 0 or 1. */
bool initialized;
@ -258,19 +249,13 @@ load:
return 0;
}
typedef enum {
TYPE_DECOMPRESSOR, // <D>, ZstdDecompressor class
TYPE_ENDLESS_DECOMPRESSOR, // <E>, decompress() function
} decompress_type;
/*
Given the two types of decompressors (defined above),
decompress implementation for <D>, <E>, pseudo code:
Decompress implementation in pseudo code:
initialize_output_buffer
while True:
decompress_data
set_object_flag # .eof for <D>, .at_frame_edge for <E>.
set_object_flag # .eof
if output_buffer_exhausted:
if output_buffer_reached_max_length:
@ -287,64 +272,20 @@ typedef enum {
flushing to do to complete current frame.
Note, decompressing "an empty input" in any case will make it > 0.
<E> supports multiple frames, has an .at_frame_edge flag, it means both the
input and output streams are at a frame edge. The flag can be set by this
statement:
.at_frame_edge = (zstd_ret == 0) ? 1 : 0
But if decompressing "an empty input" at "a frame edge", zstd_ret will be
non-zero, then .at_frame_edge will be wrongly set to false. To solve this
problem, two AFE checks are needed to ensure that: when at "a frame edge",
empty input will not be decompressed.
// AFE check
if (self->at_frame_edge && in->pos == in->size) {
finish
}
In <E>, if .at_frame_edge is eventually set to true, but input stream has
unconsumed data (in->pos < in->size), then the outer function
stream_decompress() will set .at_frame_edge to false. In this case,
although the output stream is at a frame edge, for the caller, the input
stream is not at a frame edge, see below diagram. This behavior does not
affect the next AFE check, since (in->pos < in->size).
input stream: --------------|---
^
output stream: ====================|
^
*/
static PyObject *
decompress_impl(ZstdDecompressor *self, ZSTD_inBuffer *in,
Py_ssize_t max_length,
Py_ssize_t initial_size,
decompress_type type)
Py_ssize_t max_length)
{
size_t zstd_ret;
ZSTD_outBuffer out;
_BlocksOutputBuffer buffer = {.list = NULL};
PyObject *ret;
/* The first AFE check for setting .at_frame_edge flag */
if (type == TYPE_ENDLESS_DECOMPRESSOR) {
if (self->at_frame_edge && in->pos == in->size) {
return Py_GetConstant(Py_CONSTANT_EMPTY_BYTES);
}
}
/* Initialize the output buffer */
if (initial_size >= 0) {
if (_OutputBuffer_InitWithSize(&buffer, &out, max_length, initial_size) < 0) {
goto error;
}
}
else {
if (_OutputBuffer_InitAndGrow(&buffer, &out, max_length) < 0) {
goto error;
}
}
assert(out.pos == 0);
while (1) {
@ -362,23 +303,12 @@ decompress_impl(ZstdDecompressor *self, ZSTD_inBuffer *in,
goto error;
}
/* Set .eof/.af_frame_edge flag */
if (type == TYPE_DECOMPRESSOR) {
/* ZstdDecompressor class stops when a frame is decompressed */
/* Set .eof flag */
if (zstd_ret == 0) {
/* Stop when a frame is decompressed */
self->eof = 1;
break;
}
}
else if (type == TYPE_ENDLESS_DECOMPRESSOR) {
/* decompress() function supports multiple frames */
self->at_frame_edge = (zstd_ret == 0) ? 1 : 0;
/* The second AFE check for setting .at_frame_edge flag */
if (self->at_frame_edge && in->pos == in->size) {
break;
}
}
/* Need to check out before in. Maybe zstd's internal buffer still has
a few bytes can be output, grow the buffer and continue. */
@ -415,8 +345,7 @@ error:
}
static void
decompressor_reset_session(ZstdDecompressor *self,
decompress_type type)
decompressor_reset_session(ZstdDecompressor *self)
{
// TODO(emmatyping): use _Py_CRITICAL_SECTION_ASSERT_OBJECT_LOCKED here
// and ensure lock is always held
@ -425,56 +354,28 @@ decompressor_reset_session(ZstdDecompressor *self,
self->in_begin = 0;
self->in_end = 0;
if (type == TYPE_DECOMPRESSOR) {
Py_CLEAR(self->unused_data);
}
/* Reset variables in one operation */
self->needs_input = 1;
self->at_frame_edge = 1;
self->eof = 0;
self->_unused_char_for_align = 0;
/* Resetting session never fail */
/* Resetting session is guaranteed to never fail */
ZSTD_DCtx_reset(self->dctx, ZSTD_reset_session_only);
}
static PyObject *
stream_decompress(ZstdDecompressor *self, Py_buffer *data, Py_ssize_t max_length,
decompress_type type)
stream_decompress(ZstdDecompressor *self, Py_buffer *data, Py_ssize_t max_length)
{
Py_ssize_t initial_buffer_size = -1;
ZSTD_inBuffer in;
PyObject *ret = NULL;
int use_input_buffer;
if (type == TYPE_DECOMPRESSOR) {
/* Check .eof flag */
if (self->eof) {
PyErr_SetString(PyExc_EOFError, "Already at the end of a zstd frame.");
assert(ret == NULL);
goto success;
}
}
else if (type == TYPE_ENDLESS_DECOMPRESSOR) {
/* Fast path for the first frame */
if (self->at_frame_edge && self->in_begin == self->in_end) {
/* Read decompressed size */
uint64_t decompressed_size = ZSTD_getFrameContentSize(data->buf, data->len);
/* These two zstd constants always > PY_SSIZE_T_MAX:
ZSTD_CONTENTSIZE_UNKNOWN is (0ULL - 1)
ZSTD_CONTENTSIZE_ERROR is (0ULL - 2)
Use ZSTD_findFrameCompressedSize() to check complete frame,
prevent allocating too much memory for small input chunk. */
if (decompressed_size <= (uint64_t) PY_SSIZE_T_MAX &&
!ZSTD_isError(ZSTD_findFrameCompressedSize(data->buf, data->len)) )
{
initial_buffer_size = (Py_ssize_t) decompressed_size;
}
}
return NULL;
}
/* Prepare input buffer w/wo unconsumed data */
@ -561,31 +462,19 @@ stream_decompress(ZstdDecompressor *self, Py_buffer *data, Py_ssize_t max_length
assert(in.pos == 0);
/* Decompress */
ret = decompress_impl(self, &in,
max_length, initial_buffer_size,
type);
ret = decompress_impl(self, &in, max_length);
if (ret == NULL) {
goto error;
}
/* Unconsumed input data */
if (in.pos == in.size) {
if (type == TYPE_DECOMPRESSOR) {
if (Py_SIZE(ret) == max_length || self->eof) {
self->needs_input = 0;
}
else {
self->needs_input = 1;
}
}
else if (type == TYPE_ENDLESS_DECOMPRESSOR) {
if (Py_SIZE(ret) == max_length && !self->at_frame_edge) {
self->needs_input = 0;
}
else {
self->needs_input = 1;
}
}
if (use_input_buffer) {
/* Clear input_buffer */
@ -598,10 +487,6 @@ stream_decompress(ZstdDecompressor *self, Py_buffer *data, Py_ssize_t max_length
self->needs_input = 0;
if (type == TYPE_ENDLESS_DECOMPRESSOR) {
self->at_frame_edge = 0;
}
if (!use_input_buffer) {
/* Discard buffer if it's too small
(resizing it may needlessly copy the current contents) */
@ -634,16 +519,14 @@ stream_decompress(ZstdDecompressor *self, Py_buffer *data, Py_ssize_t max_length
}
}
goto success;
return ret;
error:
/* Reset decompressor's states/session */
decompressor_reset_session(self, type);
decompressor_reset_session(self);
Py_CLEAR(ret);
success:
return ret;
return NULL;
}
@ -668,9 +551,6 @@ _zstd_ZstdDecompressor_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
/* needs_input flag */
self->needs_input = 1;
/* at_frame_edge flag */
self->at_frame_edge = 1;
/* Decompression context */
self->dctx = ZSTD_createDCtx();
if (self->dctx == NULL) {
@ -837,7 +717,7 @@ _zstd_ZstdDecompressor_decompress_impl(ZstdDecompressor *self,
/* Thread-safe code */
Py_BEGIN_CRITICAL_SECTION(self);
ret = stream_decompress(self, data, max_length, TYPE_DECOMPRESSOR);
ret = stream_decompress(self, data, max_length);
Py_END_CRITICAL_SECTION();
return ret;
}