Tweak response buffering (#224)

In my testing, we can both increase the number of concurrent requests
and remove the `ready_chunks`.
This commit is contained in:
Charlie Marsh 2023-10-29 18:07:46 -07:00 committed by GitHub
parent 1c5cdcd70a
commit 6da9c2f534
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -511,68 +511,62 @@ impl<'a, Context: BuildContext + Sync> Resolver<'a, Context> {
async fn fetch(&self, request_stream: UnboundedReceiver<Request>) -> Result<(), ResolveError> {
let mut response_stream = request_stream
.map(|request| self.process_request(request))
.buffer_unordered(32)
.ready_chunks(32);
.buffer_unordered(50);
while let Some(chunk) = response_stream.next().await {
for response in chunk {
match response? {
Response::Package(package_name, metadata) => {
trace!("Received package metadata for {}", package_name);
while let Some(response) = response_stream.next().await {
match response? {
Response::Package(package_name, metadata) => {
trace!("Received package metadata for {}", package_name);
// Group the distributions by version and kind, discarding any incompatible
// distributions.
let mut version_map: VersionMap = BTreeMap::new();
for file in metadata.files {
if let Ok(name) = WheelFilename::from_str(file.filename.as_str()) {
if name.is_compatible(self.tags) {
let version = PubGrubVersion::from(name.version);
// Group the distributions by version and kind, discarding any incompatible
// distributions.
let mut version_map: VersionMap = BTreeMap::new();
for file in metadata.files {
if let Ok(name) = WheelFilename::from_str(file.filename.as_str()) {
if name.is_compatible(self.tags) {
let version = PubGrubVersion::from(name.version);
match version_map.entry(version) {
std::collections::btree_map::Entry::Occupied(mut entry) => {
if let DistributionFile::Sdist(_) = entry.get() {
// Wheels get precedence over source distributions
entry.insert(DistributionFile::from(
WheelFile::from(file),
));
}
}
std::collections::btree_map::Entry::Vacant(entry) => {
match version_map.entry(version) {
std::collections::btree_map::Entry::Occupied(mut entry) => {
if let DistributionFile::Sdist(_) = entry.get() {
// Wheels get precedence over source distributions
entry.insert(DistributionFile::from(WheelFile::from(
file,
)));
}
}
}
} else if let Ok(name) = SourceDistributionFilename::parse(
file.filename.as_str(),
&package_name,
) {
let version = PubGrubVersion::from(name.version);
if let std::collections::btree_map::Entry::Vacant(entry) =
version_map.entry(version)
{
entry.insert(DistributionFile::from(SdistFile::from(file)));
std::collections::btree_map::Entry::Vacant(entry) => {
entry.insert(DistributionFile::from(WheelFile::from(file)));
}
}
}
} else if let Ok(name) =
SourceDistributionFilename::parse(file.filename.as_str(), &package_name)
{
let version = PubGrubVersion::from(name.version);
if let std::collections::btree_map::Entry::Vacant(entry) =
version_map.entry(version)
{
entry.insert(DistributionFile::from(SdistFile::from(file)));
}
}
}
self.index
.packages
.insert(package_name.clone(), version_map);
}
Response::Wheel(file, metadata) => {
trace!("Received file metadata for {}", file.filename);
self.index
.versions
.insert(file.hashes.sha256.clone(), metadata);
}
Response::Sdist(file, metadata) => {
trace!("Received sdist build metadata for {}", file.filename);
self.index
.versions
.insert(file.hashes.sha256.clone(), metadata);
}
self.index
.packages
.insert(package_name.clone(), version_map);
}
Response::Wheel(file, metadata) => {
trace!("Received file metadata for {}", file.filename);
self.index
.versions
.insert(file.hashes.sha256.clone(), metadata);
}
Response::Sdist(file, metadata) => {
trace!("Received sdist build metadata for {}", file.filename);
self.index
.versions
.insert(file.hashes.sha256.clone(), metadata);
}
}
}