diff --git a/compiler/load/src/file.rs b/compiler/load/src/file.rs index 95c3618140..b0b69907af 100644 --- a/compiler/load/src/file.rs +++ b/compiler/load/src/file.rs @@ -199,6 +199,23 @@ pub fn load<'a>( ) } +/// Add a task to the queue, and notify all the listeners. +fn enqueue_task<'a, 'b>( + injector: &Injector>, + listeners: &[Sender], + task: BuildTask<'a, 'b>, +) -> Result<(), LoadingProblem> { + injector.push(task); + + for listener in listeners { + listener + .send(WorkerMsg::TaskAdded) + .map_err(|_| LoadingProblem::MsgChannelDied)?; + } + + Ok(()) +} + #[derive(Debug)] struct State<'a> { pub root_id: ModuleId, @@ -285,7 +302,7 @@ fn load_deps<'a>( let num_workers = num_cpus::get() - 1; // We'll add tasks to this, and then worker threads will take tasks from it. - let main_task_queue = Injector::new(); + let injector = Injector::new(); // We need to allocate worker *queues* on the main thread and then move them // into the worker threads, because those workers' stealers need to be @@ -348,10 +365,10 @@ fn load_deps<'a>( worker_listeners.push(worker_msg_tx); - // We only want to move a *reference* to the main task queue - // into the thread, not the entire queue itself (since other threads - // need to reference it too). - let main_task_queue = &main_task_queue; + // We only want to move a *reference* to the main task queue's + // injector in the thread, not the injector itself + // (since other threads need to reference it too). + let injector = &injector; // Record this thread's handle so the main thread can join it later. thread_scope.spawn(move |_| { @@ -369,7 +386,7 @@ fn load_deps<'a>( // Find a task - either from this thread's queue, // or from the main queue, or from another worker's // queue - and run it. - match find_task(&worker, main_task_queue, stealers) { + match find_task(&worker, injector, stealers) { Some(task) => { let t2: BuildTask<'a, '_> = task; println!("run this task: {:?}", t2); @@ -429,14 +446,7 @@ fn load_deps<'a>( // This is where most of the main thread's work gets done. // Everything up to this point has been setting up the threading // system which lets this logic work efficiently. - state = update( - state, - msg, - stdlib, - &msg_tx, - &main_task_queue, - worker_listeners, - )?; + state = update(state, msg, stdlib, &msg_tx, &injector, worker_listeners)?; } } } @@ -452,7 +462,7 @@ fn update<'a>( msg: Msg<'a>, stdlib: &StdLib, msg_tx: &MsgSender<'a>, - main_task_queue: &Injector>, + injector: &Injector>, worker_listeners: &'a [Sender], ) -> Result, LoadingProblem> { use self::MaybeShared::*; @@ -511,15 +521,19 @@ fn update<'a>( .remove(&listener_id) .expect("Could not find listener ID in exposed_symbols_by_module"); - main_task_queue.push(build_parse_and_constrain_task( - header, - stdlib.mode, - Arc::clone(&state.arc_modules), - Arc::clone(&state.ident_ids_by_module), - &state.exposed_types, - exposed_symbols.clone(), - &mut state.waiting_for_solve, - )); + enqueue_task( + injector, + worker_listeners, + build_parse_and_constrain_task( + header, + stdlib.mode, + Arc::clone(&state.arc_modules), + Arc::clone(&state.ident_ids_by_module), + &state.exposed_types, + exposed_symbols.clone(), + &mut state.waiting_for_solve, + ), + )?; for tx in worker_listeners { match tx.send(WorkerMsg::TaskAdded) { @@ -551,10 +565,14 @@ fn update<'a>( ); // Start loading this module in the background. - main_task_queue.push(BuildTask::LoadModule { - module_name: dep_name, - module_ids: shared, - }); + enqueue_task( + injector, + worker_listeners, + BuildTask::LoadModule { + module_name: dep_name, + module_ids: shared, + }, + )?; } } @@ -564,15 +582,19 @@ fn update<'a>( .remove(&home) .expect("Could not find listener ID in exposed_symbols_by_module"); - main_task_queue.push(build_parse_and_constrain_task( - header, - stdlib.mode, - Arc::clone(&state.arc_modules), - Arc::clone(&state.ident_ids_by_module), - &state.exposed_types, - exposed_symbols, - &mut state.waiting_for_solve, - )); + enqueue_task( + injector, + worker_listeners, + build_parse_and_constrain_task( + header, + stdlib.mode, + Arc::clone(&state.arc_modules), + Arc::clone(&state.ident_ids_by_module), + &state.exposed_types, + exposed_symbols, + &mut state.waiting_for_solve, + ), + )?; } else { // We will have to wait for our deps' headers to be parsed, // so we can access their IdentId, which we need for canonicalization.