mirror of
https://github.com/astral-sh/uv.git
synced 2025-10-29 11:07:59 +00:00
Remove WaitMap dependency (#1183)
## Summary This is an attempt to https://github.com/astral-sh/puffin/pull/1163 by removing the `WaitMap` and gaining more granular control over the values that we hold over `await` boundaries.
This commit is contained in:
parent
c129717b41
commit
3f5e7306a5
13 changed files with 130 additions and 218 deletions
|
|
@ -1,11 +1,9 @@
|
|||
use std::borrow::Borrow;
|
||||
use std::collections::hash_map::RandomState;
|
||||
|
||||
use std::hash::Hash;
|
||||
use std::sync::Mutex;
|
||||
use std::sync::Arc;
|
||||
|
||||
use rustc_hash::FxHashSet;
|
||||
use waitmap::{Ref, WaitMap};
|
||||
use dashmap::DashMap;
|
||||
use tokio::sync::Notify;
|
||||
|
||||
/// Run tasks only once and store the results in a parallel hash map.
|
||||
///
|
||||
|
|
@ -14,9 +12,7 @@ use waitmap::{Ref, WaitMap};
|
|||
/// dist builds, we want to wait until the other task is done and get a reference to the same
|
||||
/// result.
|
||||
pub struct OnceMap<K: Eq + Hash, V> {
|
||||
/// Computations that were started, including those that were finished.
|
||||
started: Mutex<FxHashSet<K>>,
|
||||
wait_map: WaitMap<K, V>,
|
||||
items: DashMap<K, Value<V>>,
|
||||
}
|
||||
|
||||
impl<K: Eq + Hash, V> OnceMap<K, V> {
|
||||
|
|
@ -25,72 +21,68 @@ impl<K: Eq + Hash, V> OnceMap<K, V> {
|
|||
/// If this method returns `true`, you need to start a job and call [`OnceMap::done`] eventually
|
||||
/// or other tasks will hang. If it returns `false`, this job is already in progress and you
|
||||
/// can [`OnceMap::wait`] for the result.
|
||||
pub fn register<Q>(&self, key: &Q) -> bool
|
||||
where
|
||||
K: Borrow<Q>,
|
||||
Q: ?Sized + Hash + Eq + ToOwned<Owned = K>,
|
||||
{
|
||||
let mut lock = self.started.lock().unwrap();
|
||||
if lock.contains(key) {
|
||||
return false;
|
||||
pub fn register(&self, key: K) -> bool {
|
||||
let entry = self.items.entry(key);
|
||||
match entry {
|
||||
dashmap::mapref::entry::Entry::Occupied(_) => false,
|
||||
dashmap::mapref::entry::Entry::Vacant(entry) => {
|
||||
entry.insert(Value::Waiting(Arc::new(Notify::new())));
|
||||
true
|
||||
}
|
||||
}
|
||||
lock.insert(key.to_owned())
|
||||
}
|
||||
|
||||
/// Like [`OnceMap::register`], but takes ownership of the key.
|
||||
pub fn register_owned(&self, key: K) -> bool {
|
||||
let mut lock = self.started.lock().unwrap();
|
||||
if lock.contains(&key) {
|
||||
return false;
|
||||
}
|
||||
lock.insert(key)
|
||||
}
|
||||
|
||||
/// Submit the result of a job you registered.
|
||||
pub fn done(&self, key: K, value: V) {
|
||||
self.wait_map.insert(key, value);
|
||||
if let Some(Value::Waiting(notify)) = self.items.insert(key, Value::Filled(Arc::new(value)))
|
||||
{
|
||||
notify.notify_waiters();
|
||||
}
|
||||
}
|
||||
|
||||
/// Wait for the result of a job that is running.
|
||||
///
|
||||
/// Will hang if [`OnceMap::done`] isn't called for this key.
|
||||
pub async fn wait<Q: ?Sized + Hash + Eq>(
|
||||
&self,
|
||||
key: &Q,
|
||||
) -> Result<Ref<'_, K, V, RandomState>, Error>
|
||||
where
|
||||
K: Borrow<Q> + for<'a> From<&'a Q>,
|
||||
{
|
||||
self.wait_map.wait(key).await.ok_or(Error::Canceled)
|
||||
pub async fn wait(&self, key: &K) -> Option<Arc<V>> {
|
||||
let entry = self.items.get(key)?;
|
||||
match entry.value() {
|
||||
Value::Filled(value) => Some(value.clone()),
|
||||
Value::Waiting(notify) => {
|
||||
let notify = notify.clone();
|
||||
drop(entry);
|
||||
notify.notified().await;
|
||||
|
||||
let entry = self.items.get(key).expect("map is append-only");
|
||||
match entry.value() {
|
||||
Value::Filled(value) => Some(value.clone()),
|
||||
Value::Waiting(_) => unreachable!("notify was called"),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the result of a previous job, if any.
|
||||
pub fn get<Q: ?Sized + Hash + Eq>(&self, key: &Q) -> Option<Ref<'_, K, V, RandomState>>
|
||||
pub fn get<Q: ?Sized + Hash + Eq>(&self, key: &Q) -> Option<Arc<V>>
|
||||
where
|
||||
K: Borrow<Q>,
|
||||
{
|
||||
self.wait_map.get(key)
|
||||
}
|
||||
|
||||
/// Cancel all waiting tasks.
|
||||
///
|
||||
/// Warning: waiting on tasks that have been canceled will cause the map to hang.
|
||||
pub fn cancel_all(&self) {
|
||||
self.wait_map.cancel_all();
|
||||
let entry = self.items.get(key)?;
|
||||
match entry.value() {
|
||||
Value::Filled(value) => Some(value.clone()),
|
||||
Value::Waiting(_) => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<K: Eq + Hash + Clone, V> Default for OnceMap<K, V> {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
started: Mutex::new(FxHashSet::default()),
|
||||
wait_map: WaitMap::new(),
|
||||
items: DashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum Error {
|
||||
#[error("The operation was canceled")]
|
||||
Canceled,
|
||||
enum Value<V> {
|
||||
Waiting(Arc<Notify>),
|
||||
Filled(Arc<V>),
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue