Notify listeners after adding tasks

This commit is contained in:
Richard Feldman 2020-07-30 19:21:18 -04:00
parent f8590182f5
commit ccdc2ea89a

View file

@ -199,6 +199,23 @@ pub fn load<'a>(
) )
} }
/// Add a task to the queue, and notify all the listeners.
fn enqueue_task<'a, 'b>(
injector: &Injector<BuildTask<'a, 'b>>,
listeners: &[Sender<WorkerMsg>],
task: BuildTask<'a, 'b>,
) -> Result<(), LoadingProblem> {
injector.push(task);
for listener in listeners {
listener
.send(WorkerMsg::TaskAdded)
.map_err(|_| LoadingProblem::MsgChannelDied)?;
}
Ok(())
}
#[derive(Debug)] #[derive(Debug)]
struct State<'a> { struct State<'a> {
pub root_id: ModuleId, pub root_id: ModuleId,
@ -285,7 +302,7 @@ fn load_deps<'a>(
let num_workers = num_cpus::get() - 1; let num_workers = num_cpus::get() - 1;
// We'll add tasks to this, and then worker threads will take tasks from it. // We'll add tasks to this, and then worker threads will take tasks from it.
let main_task_queue = Injector::new(); let injector = Injector::new();
// We need to allocate worker *queues* on the main thread and then move them // We need to allocate worker *queues* on the main thread and then move them
// into the worker threads, because those workers' stealers need to be // into the worker threads, because those workers' stealers need to be
@ -348,10 +365,10 @@ fn load_deps<'a>(
worker_listeners.push(worker_msg_tx); worker_listeners.push(worker_msg_tx);
// We only want to move a *reference* to the main task queue // We only want to move a *reference* to the main task queue's
// into the thread, not the entire queue itself (since other threads // injector in the thread, not the injector itself
// need to reference it too). // (since other threads need to reference it too).
let main_task_queue = &main_task_queue; let injector = &injector;
// Record this thread's handle so the main thread can join it later. // Record this thread's handle so the main thread can join it later.
thread_scope.spawn(move |_| { thread_scope.spawn(move |_| {
@ -369,7 +386,7 @@ fn load_deps<'a>(
// Find a task - either from this thread's queue, // Find a task - either from this thread's queue,
// or from the main queue, or from another worker's // or from the main queue, or from another worker's
// queue - and run it. // queue - and run it.
match find_task(&worker, main_task_queue, stealers) { match find_task(&worker, injector, stealers) {
Some(task) => { Some(task) => {
let t2: BuildTask<'a, '_> = task; let t2: BuildTask<'a, '_> = task;
println!("run this task: {:?}", t2); println!("run this task: {:?}", t2);
@ -429,14 +446,7 @@ fn load_deps<'a>(
// This is where most of the main thread's work gets done. // This is where most of the main thread's work gets done.
// Everything up to this point has been setting up the threading // Everything up to this point has been setting up the threading
// system which lets this logic work efficiently. // system which lets this logic work efficiently.
state = update( state = update(state, msg, stdlib, &msg_tx, &injector, worker_listeners)?;
state,
msg,
stdlib,
&msg_tx,
&main_task_queue,
worker_listeners,
)?;
} }
} }
} }
@ -452,7 +462,7 @@ fn update<'a>(
msg: Msg<'a>, msg: Msg<'a>,
stdlib: &StdLib, stdlib: &StdLib,
msg_tx: &MsgSender<'a>, msg_tx: &MsgSender<'a>,
main_task_queue: &Injector<BuildTask<'a, '_>>, injector: &Injector<BuildTask<'a, '_>>,
worker_listeners: &'a [Sender<WorkerMsg>], worker_listeners: &'a [Sender<WorkerMsg>],
) -> Result<State<'a>, LoadingProblem> { ) -> Result<State<'a>, LoadingProblem> {
use self::MaybeShared::*; use self::MaybeShared::*;
@ -511,15 +521,19 @@ fn update<'a>(
.remove(&listener_id) .remove(&listener_id)
.expect("Could not find listener ID in exposed_symbols_by_module"); .expect("Could not find listener ID in exposed_symbols_by_module");
main_task_queue.push(build_parse_and_constrain_task( enqueue_task(
header, injector,
stdlib.mode, worker_listeners,
Arc::clone(&state.arc_modules), build_parse_and_constrain_task(
Arc::clone(&state.ident_ids_by_module), header,
&state.exposed_types, stdlib.mode,
exposed_symbols.clone(), Arc::clone(&state.arc_modules),
&mut state.waiting_for_solve, Arc::clone(&state.ident_ids_by_module),
)); &state.exposed_types,
exposed_symbols.clone(),
&mut state.waiting_for_solve,
),
)?;
for tx in worker_listeners { for tx in worker_listeners {
match tx.send(WorkerMsg::TaskAdded) { match tx.send(WorkerMsg::TaskAdded) {
@ -551,10 +565,14 @@ fn update<'a>(
); );
// Start loading this module in the background. // Start loading this module in the background.
main_task_queue.push(BuildTask::LoadModule { enqueue_task(
module_name: dep_name, injector,
module_ids: shared, worker_listeners,
}); BuildTask::LoadModule {
module_name: dep_name,
module_ids: shared,
},
)?;
} }
} }
@ -564,15 +582,19 @@ fn update<'a>(
.remove(&home) .remove(&home)
.expect("Could not find listener ID in exposed_symbols_by_module"); .expect("Could not find listener ID in exposed_symbols_by_module");
main_task_queue.push(build_parse_and_constrain_task( enqueue_task(
header, injector,
stdlib.mode, worker_listeners,
Arc::clone(&state.arc_modules), build_parse_and_constrain_task(
Arc::clone(&state.ident_ids_by_module), header,
&state.exposed_types, stdlib.mode,
exposed_symbols, Arc::clone(&state.arc_modules),
&mut state.waiting_for_solve, Arc::clone(&state.ident_ids_by_module),
)); &state.exposed_types,
exposed_symbols,
&mut state.waiting_for_solve,
),
)?;
} else { } else {
// We will have to wait for our deps' headers to be parsed, // We will have to wait for our deps' headers to be parsed,
// so we can access their IdentId, which we need for canonicalization. // so we can access their IdentId, which we need for canonicalization.