mirror of
https://github.com/roc-lang/roc.git
synced 2025-09-27 05:49:08 +00:00
Prevent panics in worker threads from causing deadlocks
Occasionally, we'll run into panics during compilation which prior to this diff would cause the compiler to hang indefinitely after printing the initial panic message. Note that this simplified/renamed worker 'wakeup' system seems to leave around a bit of a code smell - that this perhaps indicates we should rework/refactor in order to avoid the necessity of having separate wakeup and message stealing systems... but one problem at a time!
This commit is contained in:
parent
157aaae7e4
commit
d6b7ee2039
2 changed files with 84 additions and 125 deletions
|
@ -68,7 +68,7 @@ use roc_solve_problem::TypeError;
|
|||
use roc_target::Target;
|
||||
use roc_types::subs::{CopiedImport, ExposedTypesStorageSubs, Subs, VarStore, Variable};
|
||||
use roc_types::types::{Alias, Types};
|
||||
use roc_worker::{ChannelProblem, WorkerMsg};
|
||||
use roc_worker::ChannelProblem;
|
||||
use std::collections::hash_map::Entry::{Occupied, Vacant};
|
||||
use std::collections::HashMap;
|
||||
use std::io;
|
||||
|
@ -1035,14 +1035,14 @@ type MsgSender<'a> = Sender<Msg<'a>>;
|
|||
/// Add a task to the queue, and notify all the listeners.
|
||||
fn enqueue_task<'a>(
|
||||
injector: &Injector<BuildTask<'a>>,
|
||||
listeners: &[Sender<WorkerMsg>],
|
||||
listeners: &[Sender<()>],
|
||||
task: BuildTask<'a>,
|
||||
) -> Result<(), LoadingProblem<'a>> {
|
||||
injector.push(task);
|
||||
|
||||
for listener in listeners {
|
||||
listener
|
||||
.send(WorkerMsg::TaskAdded)
|
||||
.send(())
|
||||
.map_err(|_| LoadingProblem::ChannelProblem(ChannelProblem::FailedToEnqueueTask))?;
|
||||
}
|
||||
|
||||
|
@ -1569,9 +1569,9 @@ pub fn load_single_threaded<'a>(
|
|||
// We'll add tasks to this, and then worker threads will take tasks from it.
|
||||
let injector = Injector::new();
|
||||
|
||||
let (worker_msg_tx, worker_msg_rx) = bounded(1024);
|
||||
let worker_listener = worker_msg_tx;
|
||||
let worker_listeners = arena.alloc([worker_listener]);
|
||||
let (worker_wakup_tx, worker_wakup_rx) = bounded(1024);
|
||||
let worker_waker = worker_wakup_tx;
|
||||
let worker_wakers = [worker_waker];
|
||||
|
||||
let worker = Worker::new_fifo();
|
||||
let stealer = worker.stealer();
|
||||
|
@ -1579,7 +1579,7 @@ pub fn load_single_threaded<'a>(
|
|||
|
||||
// now we just manually interleave stepping the state "thread" and the worker "thread"
|
||||
loop {
|
||||
match state_thread_step(arena, state, worker_listeners, &injector, &msg_tx, &msg_rx) {
|
||||
match state_thread_step(arena, state, &worker_wakers, &injector, &msg_tx, &msg_rx) {
|
||||
Ok(ControlFlow::Break(done)) => return Ok(done),
|
||||
Ok(ControlFlow::Continue(new_state)) => {
|
||||
state = new_state;
|
||||
|
@ -1589,7 +1589,7 @@ pub fn load_single_threaded<'a>(
|
|||
|
||||
// then check if the worker can step
|
||||
let control_flow =
|
||||
roc_worker::worker_task_step(&worker, &injector, stealers, &worker_msg_rx, |task| {
|
||||
roc_worker::worker_task_step(&worker, &injector, stealers, &worker_wakup_rx, |task| {
|
||||
run_task(task, arena, &src_dir, msg_tx.clone(), roc_cache_dir, target)
|
||||
});
|
||||
|
||||
|
@ -1606,7 +1606,7 @@ pub fn load_single_threaded<'a>(
|
|||
fn state_thread_step<'a>(
|
||||
arena: &'a Bump,
|
||||
state: State<'a>,
|
||||
worker_listeners: &'a [Sender<WorkerMsg>],
|
||||
worker_wakers: &[Sender<()>],
|
||||
injector: &Injector<BuildTask<'a>>,
|
||||
msg_tx: &crossbeam::channel::Sender<Msg<'a>>,
|
||||
msg_rx: &crossbeam::channel::Receiver<Msg<'a>>,
|
||||
|
@ -1712,14 +1712,8 @@ fn state_thread_step<'a>(
|
|||
let render = state.render;
|
||||
let palette = state.palette;
|
||||
|
||||
let res_state = update(
|
||||
state,
|
||||
msg,
|
||||
msg_tx.clone(),
|
||||
injector,
|
||||
worker_listeners,
|
||||
arena,
|
||||
);
|
||||
let res_state =
|
||||
update(state, msg, msg_tx.clone(), injector, worker_wakers, arena);
|
||||
|
||||
match res_state {
|
||||
Ok(new_state) => Ok(ControlFlow::Continue(new_state)),
|
||||
|
@ -1993,15 +1987,21 @@ fn load_multi_threaded<'a>(
|
|||
|
||||
{
|
||||
let thread_result = thread::scope(|thread_scope| {
|
||||
let mut worker_listeners =
|
||||
bumpalo::collections::Vec::with_capacity_in(num_workers, arena);
|
||||
// Careful! It's important that worker listeners aren't allocated in the arena,
|
||||
// since they need to be correctly dropped if we have a panic in this thread::scope code.
|
||||
// Making sure they're owned means they'll be dropped correctly on either normal exit
|
||||
// of this thread::scope block or on panicking. When they're dropped, the worker threads
|
||||
// will correctly exit their message processing loops.
|
||||
// If these were allocated in the arena, we might panic without shutting down the worker threads,
|
||||
// causing the thread::scope block to hang while it waits for the worker threads to exit.
|
||||
let mut worker_wakers = Vec::with_capacity(num_workers);
|
||||
|
||||
for worker_arena in it {
|
||||
let msg_tx = msg_tx.clone();
|
||||
let worker = worker_queues.pop().unwrap();
|
||||
|
||||
let (worker_msg_tx, worker_msg_rx) = bounded(1024);
|
||||
worker_listeners.push(worker_msg_tx);
|
||||
let (worker_wakup_tx, worker_wakup_rx) = bounded(1024);
|
||||
worker_wakers.push(worker_wakup_tx);
|
||||
|
||||
// We only want to move a *reference* to the main task queue's
|
||||
// injector in the thread, not the injector itself
|
||||
|
@ -2015,16 +2015,22 @@ fn load_multi_threaded<'a>(
|
|||
.stack_size(EXPANDED_STACK_SIZE)
|
||||
.spawn(move |_| {
|
||||
// will process messages until we run out
|
||||
roc_worker::worker_task(worker, injector, stealers, worker_msg_rx, |task| {
|
||||
run_task(
|
||||
task,
|
||||
worker_arena,
|
||||
src_dir,
|
||||
msg_tx.clone(),
|
||||
roc_cache_dir,
|
||||
target,
|
||||
)
|
||||
})
|
||||
roc_worker::worker_task(
|
||||
worker,
|
||||
injector,
|
||||
stealers,
|
||||
worker_wakup_rx,
|
||||
|task| {
|
||||
run_task(
|
||||
task,
|
||||
worker_arena,
|
||||
src_dir,
|
||||
msg_tx.clone(),
|
||||
roc_cache_dir,
|
||||
target,
|
||||
)
|
||||
},
|
||||
)
|
||||
});
|
||||
|
||||
res_join_handle.unwrap_or_else(|_| {
|
||||
|
@ -2039,31 +2045,13 @@ fn load_multi_threaded<'a>(
|
|||
|
||||
// Grab a reference to these Senders outside the loop, so we can share
|
||||
// it across each iteration of the loop.
|
||||
let worker_listeners = worker_listeners.into_bump_slice();
|
||||
let msg_tx = msg_tx.clone();
|
||||
|
||||
macro_rules! shut_down_worker_threads {
|
||||
() => {
|
||||
for listener in worker_listeners {
|
||||
// We intentionally don't propagate this Result, because even if
|
||||
// shutting down a worker failed (which can happen if a a panic
|
||||
// occurred on that thread), we want to continue shutting down
|
||||
// the others regardless.
|
||||
if listener.send(WorkerMsg::Shutdown).is_err() {
|
||||
log!("There was an error trying to shutdown a worker thread. One reason this can happen is if the thread panicked.");
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// The root module will have already queued up messages to process,
|
||||
// and processing those messages will in turn queue up more messages.
|
||||
loop {
|
||||
match state_thread_step(arena, state, worker_listeners, &injector, &msg_tx, &msg_rx)
|
||||
{
|
||||
match state_thread_step(arena, state, &worker_wakers, &injector, &msg_tx, &msg_rx) {
|
||||
Ok(ControlFlow::Break(load_result)) => {
|
||||
shut_down_worker_threads!();
|
||||
|
||||
return Ok(load_result);
|
||||
}
|
||||
Ok(ControlFlow::Continue(new_state)) => {
|
||||
|
@ -2071,8 +2059,6 @@ fn load_multi_threaded<'a>(
|
|||
continue;
|
||||
}
|
||||
Err(e) => {
|
||||
shut_down_worker_threads!();
|
||||
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
|
@ -2111,13 +2097,13 @@ fn start_tasks<'a>(
|
|||
state: &mut State<'a>,
|
||||
work: MutSet<(ModuleId, Phase)>,
|
||||
injector: &Injector<BuildTask<'a>>,
|
||||
worker_listeners: &'a [Sender<WorkerMsg>],
|
||||
worker_wakers: &[Sender<()>],
|
||||
) -> Result<(), LoadingProblem<'a>> {
|
||||
for (module_id, phase) in work {
|
||||
let tasks = start_phase(module_id, phase, arena, state);
|
||||
|
||||
for task in tasks {
|
||||
enqueue_task(injector, worker_listeners, task)?
|
||||
enqueue_task(injector, worker_wakers, task)?
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2179,7 +2165,7 @@ fn update<'a>(
|
|||
msg: Msg<'a>,
|
||||
msg_tx: MsgSender<'a>,
|
||||
injector: &Injector<BuildTask<'a>>,
|
||||
worker_listeners: &'a [Sender<WorkerMsg>],
|
||||
worker_wakers: &[Sender<()>],
|
||||
arena: &'a Bump,
|
||||
) -> Result<State<'a>, LoadingProblem<'a>> {
|
||||
use self::Msg::*;
|
||||
|
@ -2305,7 +2291,7 @@ fn update<'a>(
|
|||
work.extend(state.dependencies.notify(home, Phase::LoadHeader));
|
||||
work.insert((home, Phase::Parse));
|
||||
|
||||
start_tasks(arena, &mut state, work, injector, worker_listeners)?;
|
||||
start_tasks(arena, &mut state, work, injector, worker_wakers)?;
|
||||
|
||||
Ok(state)
|
||||
}
|
||||
|
@ -2382,7 +2368,7 @@ fn update<'a>(
|
|||
}
|
||||
};
|
||||
|
||||
start_tasks(arena, &mut state, work, injector, worker_listeners)?;
|
||||
start_tasks(arena, &mut state, work, injector, worker_wakers)?;
|
||||
|
||||
state
|
||||
.module_cache
|
||||
|
@ -2393,7 +2379,7 @@ fn update<'a>(
|
|||
|
||||
let work = state.dependencies.notify(module_id, Phase::Parse);
|
||||
|
||||
start_tasks(arena, &mut state, work, injector, worker_listeners)?;
|
||||
start_tasks(arena, &mut state, work, injector, worker_wakers)?;
|
||||
|
||||
Ok(state)
|
||||
}
|
||||
|
@ -2445,7 +2431,7 @@ fn update<'a>(
|
|||
.dependencies
|
||||
.notify(module_id, Phase::CanonicalizeAndConstrain);
|
||||
|
||||
start_tasks(arena, &mut state, work, injector, worker_listeners)?;
|
||||
start_tasks(arena, &mut state, work, injector, worker_wakers)?;
|
||||
|
||||
Ok(state)
|
||||
}
|
||||
|
@ -2652,7 +2638,7 @@ fn update<'a>(
|
|||
work
|
||||
};
|
||||
|
||||
start_tasks(arena, &mut state, work, injector, worker_listeners)?;
|
||||
start_tasks(arena, &mut state, work, injector, worker_wakers)?;
|
||||
}
|
||||
|
||||
Ok(state)
|
||||
|
@ -2700,7 +2686,7 @@ fn update<'a>(
|
|||
.dependencies
|
||||
.notify(module_id, Phase::FindSpecializations);
|
||||
|
||||
start_tasks(arena, &mut state, work, injector, worker_listeners)?;
|
||||
start_tasks(arena, &mut state, work, injector, worker_wakers)?;
|
||||
|
||||
Ok(state)
|
||||
}
|
||||
|
@ -2990,13 +2976,13 @@ fn update<'a>(
|
|||
|
||||
let work = state.dependencies.reload_make_specialization_pass();
|
||||
|
||||
start_tasks(arena, &mut state, work, injector, worker_listeners)?;
|
||||
start_tasks(arena, &mut state, work, injector, worker_wakers)?;
|
||||
|
||||
Ok(state)
|
||||
}
|
||||
|
||||
NextStep::MakingInPhase => {
|
||||
start_tasks(arena, &mut state, work, injector, worker_listeners)?;
|
||||
start_tasks(arena, &mut state, work, injector, worker_wakers)?;
|
||||
|
||||
Ok(state)
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue