diff --git a/compiler/load/src/file.rs b/compiler/load/src/file.rs index 710b5e9ab4..10a7353b30 100644 --- a/compiler/load/src/file.rs +++ b/compiler/load/src/file.rs @@ -1,4 +1,7 @@ use bumpalo::Bump; +use crossbeam::channel::{unbounded, Receiver, Sender}; +use crossbeam::deque::{Injector, Steal, Stealer, Worker}; +use crossbeam::thread::{self, Scope}; use roc_builtins::std::{Mode, StdLib}; use roc_can::constraint::Constraint; use roc_can::def::Declaration; @@ -12,8 +15,6 @@ use roc_module::ident::{Ident, ModuleName}; use roc_module::symbol::{IdentIds, Interns, ModuleId, ModuleIds, Symbol}; use roc_parse::ast::{self, Attempting, ExposesEntry, ImportsEntry}; use roc_parse::module::module_defs; -use crossbeam::deque::{Injector, Steal, Stealer, Worker}; -use std::iter; use roc_parse::parser::{Fail, Parser, State}; use roc_region::all::{Located, Region}; use roc_solve::module::SolvedModule; @@ -23,11 +24,10 @@ use roc_types::subs::{Subs, VarStore, Variable}; use std::collections::{HashMap, HashSet}; use std::fs; use std::io; +use std::iter; use std::path::{Path, PathBuf}; use std::str::from_utf8_unchecked; use std::sync::{Arc, Mutex}; -use tokio::sync::mpsc; -use tokio::task::spawn_blocking; /// Filename extension for normal Roc modules const ROC_FILE_EXTENSION: &str = "roc"; @@ -47,11 +47,6 @@ pub struct LoadedModule { pub src: Box, } -#[derive(Debug, Clone)] -struct Env { - pub src_dir: PathBuf, -} - #[derive(Debug)] pub enum BuildProblem<'a> { FileNotFound(&'a Path), @@ -112,8 +107,8 @@ enum MaybeShared<'a, 'b, A, B> { type SharedModules<'a, 'b> = MaybeShared<'a, 'b, ModuleIds, IdentIdsByModule>; type IdentIdsByModule = MutMap; -type MsgSender = mpsc::Sender; -type MsgReceiver = mpsc::Receiver; +type MsgSender = Sender; +type MsgReceiver = Receiver; /// The loading process works like this, starting from the given filename (e.g. "main.roc"): /// @@ -167,7 +162,7 @@ pub fn load<'a>( ) -> Result { use self::MaybeShared::*; - let (msg_tx, msg_rx): (MsgSender, MsgReceiver) = mpsc::channel(1024); + let (msg_tx, msg_rx): (MsgSender, MsgReceiver) = unbounded(); let mut module_ids = ModuleIds::default(); let mut root_exposed_ident_ids: IdentIdsByModule = IdentIds::exposed_builtins(0); @@ -178,10 +173,80 @@ pub fn load<'a>( Unique(&mut module_ids, &mut root_exposed_ident_ids), )?; - load_deps(root_id, msg_tx, msg_rx, stdlib, src_dir, module_ids, root_exposed_ident_ids, exposed_types).await + load_deps( + root_id, + msg_tx, + msg_rx, + stdlib, + src_dir, + module_ids, + root_exposed_ident_ids, + exposed_types, + ) } -async fn load_deps<'a>( +#[derive(Debug)] +struct Env { + pub root_id: ModuleId, + pub src_dir: PathBuf, + pub exposed_types: SubsByModule, + + pub can_problems: Vec, + pub headers_parsed: MutSet, + pub type_problems: Vec, + + /// This is the "final" list of IdentIds, after canonicalization and constraint gen + /// have completed for a given module. + pub constrained_ident_ids: MutMap, + + /// From now on, these will be used by multiple threads; time to make an Arc>! + pub arc_modules: Arc>, + + pub ident_ids_by_module: Arc>, + + /// All the dependent modules we've already begun loading - + /// meaning we should never kick off another load_module on them! + pub loading_started: MutSet, + + pub declarations_by_id: MutMap>, + + pub exposed_symbols_by_module: MutMap>, + + /// Modules which are waiting for certain headers to be parsed + pub waiting_for_headers: MutMap>, + + + // When the key ModuleId gets solved, iterate through each of the given modules + // and remove that ModuleId from the appropriate waiting_for_headers entry. + // If the relevant module's waiting_for_headers entry is now empty, canonicalize the module. + pub header_listeners: MutMap>, + + pub unparsed_modules: MutMap, + + // Modules which are waiting for certain deps to be solved + pub waiting_for_solve: MutMap>, + + // When the key ModuleId gets solved, iterate through each of the given modules + // and remove that ModuleId from the appropriate waiting_for_solve entry. + // If the relevant module's waiting_for_solve entry is now empty, solve the module. + pub solve_listeners: MutMap>, + + #[allow(clippy::type_complexity)] + pub unsolved_modules: MutMap< + ModuleId, + (Module, Box, MutSet, Constraint, VarStore), + >, + + /// Idle worker threads will try to take from this first, and if it's empty, + /// steal from each other. + pub main_task_queue: Injector, +} + +enum BuildTask { + NoOp +} + +fn load_deps<'a>( root_id: ModuleId, msg_tx: MsgSender, mut msg_rx: MsgReceiver, @@ -191,327 +256,340 @@ async fn load_deps<'a>( root_exposed_ident_ids: IdentIdsByModule, mut exposed_types: SubsByModule, ) -> Result { + thread::scope(|scope| { + let mut headers_parsed = MutSet::default(); + + // We've already parsed the root's header. (But only its header, so far.) + headers_parsed.insert(root_id); + + let mut loading_started = MutSet::default(); + + // If the root module we're still processing happens to be an interface, + // it's possible that something else will import it. That will + // necessarily cause a cyclic import error, but in the meantime + // we still shouldn't load it. + loading_started.insert(root_id); + + let env = Env { + root_id, + src_dir, + exposed_types, + headers_parsed, + loading_started, + can_problems: Vec::new(), + type_problems: Vec::new(), + arc_modules: Arc::new(Mutex::new(module_ids)), + constrained_ident_ids: IdentIds::exposed_builtins(0), + ident_ids_by_module: Arc::new(Mutex::new(root_exposed_ident_ids)), + declarations_by_id: MutMap::default(), + exposed_symbols_by_module: MutMap::default(), + waiting_for_headers: MutMap::default(), + header_listeners: MutMap::default(), + unparsed_modules: MutMap::default(), + waiting_for_solve: MutMap::default(), + solve_listeners: MutMap::default(), + unsolved_modules: MutMap::default(), + main_task_queue: Injector::new() + }; + + // The root module will have already queued up messages to process, + // and processing those messages will in turn queue up more messages. + while let Ok(msg) = msg_rx.recv() { + match update(env, stdlib, msg_tx, msg_rx, msg) { + Ok(Some(loaded_module)) => { + // We're done! + return Ok(loaded_module); + } + Ok(None) => { + // We have more messages to process. Keep waiting! + } + Err(loading_problem) => { + // We hit an unrecoverable LoadingProblem. Bail out! + return Err(loading_problem); + } + } + } + + // The msg_rx receiver closed unexpectedly before we finished solving everything + Err(LoadingProblem::MsgChannelDied) + }) + .unwrap() +} + +fn update(mut env: Env, stdlib: &StdLib, msg_tx: MsgSender, mut msg_rx: MsgReceiver, msg: Msg) -> Result, LoadingProblem> { use self::MaybeShared::*; + use self::Msg::*; - let mut type_problems = Vec::new(); - let mut can_problems = Vec::new(); - let mut headers_parsed = MutSet::default(); + let Env { + root_id, + src_dir, + exposed_types, + can_problems, + type_problems, + headers_parsed, + arc_modules, + constrained_ident_ids, + ident_ids_by_module, + loading_started, + declarations_by_id, + exposed_symbols_by_module, + waiting_for_headers, + header_listeners, + unparsed_modules, + waiting_for_solve, + solve_listeners, + unsolved_modules, + main_task_queue, + } = env; - headers_parsed.insert(root_id); + match msg { + Header(header) => { + let home = header.module_id; + let deps_by_name = &header.deps_by_name; + let mut headers_needed = + HashSet::with_capacity_and_hasher(deps_by_name.len(), default_hasher()); - let env = Env { - src_dir: src_dir.clone(), - }; + headers_parsed.insert(home); - // This is the "final" list of IdentIds, after canonicalization and constraint gen - // have completed for a given module. - let mut constrained_ident_ids = IdentIds::exposed_builtins(0); + for dep_id in deps_by_name.values() { + if !headers_parsed.contains(&dep_id) { + headers_needed.insert(*dep_id); + } + } - // From now on, these will be used by multiple threads; time to make an Arc>! - let arc_modules = Arc::new(Mutex::new(module_ids)); - let ident_ids_by_module: Arc> = - Arc::new(Mutex::new(root_exposed_ident_ids)); + // This was a dependency. Write it down and keep processing messaages. + let mut exposed_symbols: MutSet = + HashSet::with_capacity_and_hasher(header.exposes.len(), default_hasher()); - // All the dependent modules we've already begun loading - - // meaning we should never kick off another load_module on them! - let mut loading_started: MutSet = MutSet::default(); + // TODO can we avoid this loop by storing them as a Set in Header to begin with? + for symbol in header.exposes.iter() { + exposed_symbols.insert(*symbol); + } - // If the root module we're compiling happens to be an interface, - // it's possible that something else will import it. That will - // necessarily cause a cyclic import error, but in the meantime - // we still shouldn't load it. - loading_started.insert(root_id); + debug_assert!(!exposed_symbols_by_module.contains_key(&home)); + exposed_symbols_by_module.insert(home, exposed_symbols); - // The declarations we'll ultimately be returning - let mut declarations_by_id: MutMap> = MutMap::default(); + // Notify all the listeners that headers are now available for this module. + if let Some(listeners) = header_listeners.remove(&home) { + for listener_id in listeners { + // This listener is longer waiting for this module, + // because this module's headers are now available! + let waiting_for = waiting_for_headers + .get_mut(&listener_id) + .expect("Unable to find module ID in waiting_for_headers"); - let mut exposed_symbols_by_module: MutMap> = MutMap::default(); + waiting_for.remove(&home); - // Modules which are waiting for certain headers to be parsed - let mut waiting_for_headers: MutMap> = MutMap::default(); + // If it's no longer waiting for anything else, solve it. + if waiting_for.is_empty() { + let header = unparsed_modules + .remove(&listener_id) + .expect("Could not find listener ID in unparsed_modules"); - // When the key ModuleId gets solved, iterate through each of the given modules - // and remove that ModuleId from the appropriate waiting_for_headers entry. - // If the relevant module's waiting_for_headers entry is now empty, canonicalize the module. - let mut header_listeners: MutMap> = MutMap::default(); + let exposed_symbols = exposed_symbols_by_module + .remove(&listener_id) + .expect("Could not find listener ID in exposed_symbols_by_module"); - let mut unparsed_modules: MutMap = MutMap::default(); - - // Modules which are waiting for certain deps to be solved - let mut waiting_for_solve: MutMap> = MutMap::default(); - - // When the key ModuleId gets solved, iterate through each of the given modules - // and remove that ModuleId from the appropriate waiting_for_solve entry. - // If the relevant module's waiting_for_solve entry is now empty, solve the module. - let mut solve_listeners: MutMap> = MutMap::default(); - - #[allow(clippy::type_complexity)] - let mut unsolved_modules: MutMap< - ModuleId, - (Module, Box, MutSet, Constraint, VarStore), - > = MutMap::default(); - - // Parse and canonicalize the module's deps - while let Some(msg) = msg_rx.recv().await { - use self::Msg::*; - - match msg { - Header(header) => { - let home = header.module_id; - let deps_by_name = &header.deps_by_name; - let mut headers_needed = - HashSet::with_capacity_and_hasher(deps_by_name.len(), default_hasher()); - - headers_parsed.insert(home); - - for dep_id in deps_by_name.values() { - if !headers_parsed.contains(&dep_id) { - headers_needed.insert(*dep_id); + spawn_parse_and_constrain( + header, + stdlib.mode, + Arc::clone(&arc_modules), + Arc::clone(&ident_ids_by_module), + &exposed_types, + exposed_symbols.clone(), + &mut waiting_for_solve, + msg_tx.clone(), + ) } } + } - // This was a dependency. Write it down and keep processing messaages. - let mut exposed_symbols: MutSet = - HashSet::with_capacity_and_hasher(header.exposes.len(), default_hasher()); + // If any of our deps weren't loaded before, start loading them. + for (dep_name, dep_id) in deps_by_name.iter() { + if !loading_started.contains(&dep_id) { + // Record that we've started loading the module *before* + // we actually start loading it. + loading_started.insert(*dep_id); - // TODO can we avoid this loop by storing them as a Set in Header to begin with? - for symbol in header.exposes.iter() { - exposed_symbols.insert(*symbol); + let msg_tx = msg_tx.clone(); + let dep_name = dep_name.clone(); + + // Provide mutexes of ModuleIds and IdentIds by module, + // so other modules can populate them as they load. + let shared = Shared(Arc::clone(&arc_modules), Arc::clone(&ident_ids_by_module)); + + // Start loading this module in the background. + spawn_blocking(move || load_module(src_dir.as_path(), dep_name, msg_tx, shared)); + } + } + + if headers_needed.is_empty() { + let exposed_symbols = exposed_symbols_by_module + .remove(&home) + .expect("Could not find listener ID in exposed_symbols_by_module"); + + spawn_parse_and_constrain( + header, + stdlib.mode, + Arc::clone(&arc_modules), + Arc::clone(&ident_ids_by_module), + &exposed_types, + exposed_symbols, + &mut waiting_for_solve, + msg_tx.clone(), + ) + } else { + // We will have to wait for our deps' headers to be parsed, + // so we can access their IdentId, which we need for canonicalization. + debug_assert!(!unparsed_modules.contains_key(&home)); + unparsed_modules.insert(home, header); + + // Register a listener with each of these. + for dep_id in headers_needed.iter() { + let listeners = header_listeners + .entry(*dep_id) + .or_insert_with(|| Vec::with_capacity(1)); + + (*listeners).push(home); } - debug_assert!(!exposed_symbols_by_module.contains_key(&home)); - exposed_symbols_by_module.insert(home, exposed_symbols); + debug_assert!(!waiting_for_headers.contains_key(&home)); + waiting_for_headers.insert(home, headers_needed); + } + } + Constrained { + module, + declarations, + src, + ident_ids, + imported_modules, + constraint, + problems, + var_store, + } => { + can_problems.extend(problems); - // Notify all the listeners that headers are now available for this module. - if let Some(listeners) = header_listeners.remove(&home) { + let module_id = module.module_id; + let waiting_for = waiting_for_solve.get_mut(&module_id).unwrap_or_else(|| { + panic!( + "Could not find module ID {:?} in waiting_for_solve", + module_id + ) + }); + + // Record the final IdentIds + debug_assert!(!constrained_ident_ids.contains_key(&module_id)); + constrained_ident_ids.insert(module_id, ident_ids); + + // It's possible that some modules have been solved since + // we began waiting for them. Remove those from waiting_for, + // because we no longer need to wait for them! + waiting_for.retain(|id| !exposed_types.contains_key(id)); + + declarations_by_id.insert(module_id, declarations); + + if waiting_for.is_empty() { + // All of our dependencies have already been solved. Great! + // That means we can proceed directly to solving. + spawn_solve_module( + module, + src, + constraint, + var_store, + imported_modules, + msg_tx.clone(), + &mut exposed_types, + stdlib, + ); + } else { + // We will have to wait for our dependencies to be solved. + debug_assert!(!unsolved_modules.contains_key(&module_id)); + unsolved_modules.insert( + module_id, + (module, src, imported_modules, constraint, var_store), + ); + + // Register a listener with each of these. + for dep_id in waiting_for.iter() { + let listeners = solve_listeners + .entry(*dep_id) + .or_insert_with(|| Vec::with_capacity(1)); + + (*listeners).push(module_id); + } + } + } + Solved { + src, + module_id, + solved_module, + solved_subs, + } => { + type_problems.extend(solved_module.problems); + + if module_id == root_id { + let solved = Arc::try_unwrap(solved_subs).unwrap_or_else(|_| { + panic!("There were still outstanding Arc references to Solved") + }); + + let module_ids = Arc::try_unwrap(arc_modules) + .unwrap_or_else(|_| { + panic!("There were still outstanding Arc references to module_ids") + }) + .into_inner() + .expect("Unwrapping mutex for module_ids"); + + let interns = Interns { + module_ids, + all_ident_ids: constrained_ident_ids, + }; + + return Ok(Some(LoadedModule { + module_id: root_id, + interns, + solved, + can_problems, + type_problems, + declarations_by_id, + exposed_vars_by_symbol: solved_module.exposed_vars_by_symbol, + src, + })); + } else { + // This was a dependency. Write it down and keep processing messages. + debug_assert!(!exposed_types.contains_key(&module_id)); + exposed_types.insert( + module_id, + ExposedModuleTypes::Valid(solved_module.solved_types, solved_module.aliases), + ); + + // Notify all the listeners that this solved. + if let Some(listeners) = solve_listeners.remove(&module_id) { for listener_id in listeners { // This listener is longer waiting for this module, - // because this module's headers are now available! - let waiting_for = waiting_for_headers + // because this module has now been solved! + let waiting_for = waiting_for_solve .get_mut(&listener_id) - .expect("Unable to find module ID in waiting_for_headers"); + .expect("Unable to find module ID in waiting_for_solve"); - waiting_for.remove(&home); + waiting_for.remove(&module_id); // If it's no longer waiting for anything else, solve it. if waiting_for.is_empty() { - let header = unparsed_modules - .remove(&listener_id) - .expect("Could not find listener ID in unparsed_modules"); + let (module, src, imported_modules, constraint, var_store) = + unsolved_modules + .remove(&listener_id) + .expect("Could not find listener ID in unsolved_modules"); - let exposed_symbols = exposed_symbols_by_module - .remove(&listener_id) - .expect("Could not find listener ID in exposed_symbols_by_module"); - - spawn_parse_and_constrain( - header, - stdlib.mode, - Arc::clone(&arc_modules), - Arc::clone(&ident_ids_by_module), - &exposed_types, - exposed_symbols.clone(), - &mut waiting_for_solve, + spawn_solve_module( + module, + src, + constraint, + var_store, + imported_modules, msg_tx.clone(), - ) - } - } - } - - // If any of our deps weren't loaded before, start loading them. - for (dep_name, dep_id) in deps_by_name.iter() { - if !loading_started.contains(&dep_id) { - // Record that we've started loading the module *before* - // we actually start loading it. - loading_started.insert(*dep_id); - - let env = env.clone(); - let msg_tx = msg_tx.clone(); - let dep_name = dep_name.clone(); - - // Provide mutexes of ModuleIds and IdentIds by module, - // so other modules can populate them as they load. - let shared = - Shared(Arc::clone(&arc_modules), Arc::clone(&ident_ids_by_module)); - - // Start loading this module in the background. - spawn_blocking(move || load_module(env, dep_name, msg_tx, shared)); - } - } - - if headers_needed.is_empty() { - let exposed_symbols = exposed_symbols_by_module - .remove(&home) - .expect("Could not find listener ID in exposed_symbols_by_module"); - - spawn_parse_and_constrain( - header, - stdlib.mode, - Arc::clone(&arc_modules), - Arc::clone(&ident_ids_by_module), - &exposed_types, - exposed_symbols, - &mut waiting_for_solve, - msg_tx.clone(), - ) - } else { - // We will have to wait for our deps' headers to be parsed, - // so we can access their IdentId, which we need for canonicalization. - debug_assert!(!unparsed_modules.contains_key(&home)); - unparsed_modules.insert(home, header); - - // Register a listener with each of these. - for dep_id in headers_needed.iter() { - let listeners = header_listeners - .entry(*dep_id) - .or_insert_with(|| Vec::with_capacity(1)); - - (*listeners).push(home); - } - - debug_assert!(!waiting_for_headers.contains_key(&home)); - waiting_for_headers.insert(home, headers_needed); - } - } - Constrained { - module, - declarations, - src, - ident_ids, - imported_modules, - constraint, - problems, - var_store, - } => { - can_problems.extend(problems); - - let module_id = module.module_id; - let waiting_for = waiting_for_solve.get_mut(&module_id).unwrap_or_else(|| { - panic!( - "Could not find module ID {:?} in waiting_for_solve", - module_id - ) - }); - - // Record the final IdentIds - debug_assert!(!constrained_ident_ids.contains_key(&module_id)); - constrained_ident_ids.insert(module_id, ident_ids); - - // It's possible that some modules have been solved since - // we began waiting for them. Remove those from waiting_for, - // because we no longer need to wait for them! - waiting_for.retain(|id| !exposed_types.contains_key(id)); - - declarations_by_id.insert(module_id, declarations); - - if waiting_for.is_empty() { - // All of our dependencies have already been solved. Great! - // That means we can proceed directly to solving. - spawn_solve_module( - module, - src, - constraint, - var_store, - imported_modules, - msg_tx.clone(), - &mut exposed_types, - stdlib, - ); - } else { - // We will have to wait for our dependencies to be solved. - debug_assert!(!unsolved_modules.contains_key(&module_id)); - unsolved_modules.insert( - module_id, - (module, src, imported_modules, constraint, var_store), - ); - - // Register a listener with each of these. - for dep_id in waiting_for.iter() { - let listeners = solve_listeners - .entry(*dep_id) - .or_insert_with(|| Vec::with_capacity(1)); - - (*listeners).push(module_id); - } - } - } - Solved { - src, - module_id, - solved_module, - solved_subs, - } => { - type_problems.extend(solved_module.problems); - - if module_id == root_id { - // Once we've solved the originally requested module, we're done! - msg_rx.close(); - - let solved = Arc::try_unwrap(solved_subs).unwrap_or_else(|_| { - panic!("There were still outstanding Arc references to Solved") - }); - - let module_ids = Arc::try_unwrap(arc_modules) - .unwrap_or_else(|_| { - panic!("There were still outstanding Arc references to module_ids") - }) - .into_inner() - .expect("Unwrapping mutex for module_ids"); - - let interns = Interns { - module_ids, - all_ident_ids: constrained_ident_ids, - }; - - return Ok(LoadedModule { - module_id: root_id, - interns, - solved, - can_problems, - type_problems, - declarations_by_id, - exposed_vars_by_symbol: solved_module.exposed_vars_by_symbol, - src, - }); - } else { - // This was a dependency. Write it down and keep processing messages. - debug_assert!(!exposed_types.contains_key(&module_id)); - exposed_types.insert( - module_id, - ExposedModuleTypes::Valid( - solved_module.solved_types, - solved_module.aliases, - ), - ); - - // Notify all the listeners that this solved. - if let Some(listeners) = solve_listeners.remove(&module_id) { - for listener_id in listeners { - // This listener is longer waiting for this module, - // because this module has now been solved! - let waiting_for = waiting_for_solve - .get_mut(&listener_id) - .expect("Unable to find module ID in waiting_for_solve"); - - waiting_for.remove(&module_id); - - // If it's no longer waiting for anything else, solve it. - if waiting_for.is_empty() { - let (module, src, imported_modules, constraint, var_store) = - unsolved_modules - .remove(&listener_id) - .expect("Could not find listener ID in unsolved_modules"); - - spawn_solve_module( - module, - src, - constraint, - var_store, - imported_modules, - msg_tx.clone(), - &mut exposed_types, - stdlib, - ); - } + &mut exposed_types, + stdlib, + ); } } } @@ -519,20 +597,21 @@ async fn load_deps<'a>( } } - // The msg_rx receiver closed unexpectedly before we finished solving everything - Err(LoadingProblem::MsgChannelDied) + // We did some work but haven't finish loading the module yet. + // If we had, this would have been Ok(Some(loaded_module)) instead. + Ok(None) } /// Load a module by its module name, rather than by its filename fn load_module( - env: Env, + src_dir: &Path, module_name: ModuleName, msg_tx: MsgSender, module_ids: SharedModules<'_, '_>, ) -> Result { let mut filename = PathBuf::new(); - filename.push(env.src_dir); + filename.push(src_dir); // Convert dots in module name to directories for part in module_name.as_str().split(MODULE_SEPARATOR) { @@ -553,17 +632,14 @@ fn load_module( /// corresponding Stealer, which can steal from it. Stealers can be shared across threads.) /// /// Based on https://docs.rs/crossbeam/0.7.3/crossbeam/deque/index.html#examples -fn find_task( - local: &Worker, - global: &Injector, - stealers: &[Stealer], -) -> Option { +fn find_task(local: &Worker, global: &Injector, stealers: &[Stealer]) -> Option { // Pop a task from the local queue, if not empty. local.pop().or_else(|| { // Otherwise, we need to look for a task elsewhere. iter::repeat_with(|| { // Try stealing a task from the global queue. - global.steal() + global + .steal() // Or try stealing a task from one of the other threads. .or_else(|| stealers.iter().map(|s| s.steal()).collect()) })