Have file.rs use the new roc_worker crate

This commit is contained in:
Richard Feldman 2024-06-17 22:14:00 -04:00
parent 4042fd8d52
commit d1a002f49e
No known key found for this signature in database
GPG key ID: F1F21AA5B1D9E43B
6 changed files with 275 additions and 333 deletions

12
Cargo.lock generated
View file

@ -2802,6 +2802,7 @@ dependencies = [
"roc_types",
"roc_unify",
"roc_work",
"roc_worker",
"tempfile",
"ven_pretty",
]
@ -3226,6 +3227,17 @@ dependencies = [
"roc_module",
]
[[package]]
name = "roc_worker"
version = "0.0.1"
dependencies = [
"crossbeam",
"roc_collections",
"roc_error_macros",
"roc_module",
"roc_work",
]
[[package]]
name = "rustc-demangle"
version = "0.1.23"

View file

@ -32,6 +32,7 @@ roc_target = { path = "../roc_target" }
roc_tracing = { path = "../../tracing" }
roc_types = { path = "../types" }
roc_unify = { path = "../unify" }
roc_worker = { path = "../worker" }
ven_pretty = { path = "../../vendor/pretty" }

View file

@ -9,7 +9,7 @@ use crate::module::{
use crate::module_cache::ModuleCache;
use bumpalo::{collections::CollectIn, Bump};
use crossbeam::channel::{bounded, Sender};
use crossbeam::deque::{Injector, Stealer, Worker};
use crossbeam::deque::{Injector, Worker};
use crossbeam::thread;
use parking_lot::Mutex;
use roc_builtins::roc::module_source;
@ -66,10 +66,10 @@ use roc_solve_problem::TypeError;
use roc_target::Target;
use roc_types::subs::{CopiedImport, ExposedTypesStorageSubs, Subs, VarStore, Variable};
use roc_types::types::{Alias, Types};
use roc_worker::{ChannelProblem, WorkerMsg};
use std::collections::hash_map::Entry::{Occupied, Vacant};
use std::collections::HashMap;
use std::io;
use std::iter;
use std::ops::ControlFlow;
use std::path::{Path, PathBuf};
use std::str::from_utf8_unchecked;
@ -939,12 +939,6 @@ enum BuildTask<'a> {
},
}
/// Control messages delivered to a worker over its message channel.
#[derive(Debug)]
enum WorkerMsg {
/// Tells the worker that all work is finished and it should stop, so that
/// the thread joining on it can exit too.
Shutdown,
/// Sent to every listener after a task has been pushed onto the injector
/// queue. The receiving worker may still find no task to run.
TaskAdded,
}
#[derive(Debug)]
pub struct IncorrectModuleName<'a> {
pub module_id: ModuleId,
@ -1012,26 +1006,6 @@ impl<'a> AvailableShorthands<'a> {
}
}
/// Failures that can occur while sending or receiving over the channels used
/// during loading. Most variants are named after the send that failed.
// NOTE(review): only the construction sites of `FailedToEnqueueTask` and
// `FailedToSendTaskMsg` are visible here; confirm the others against their callers.
#[derive(Debug)]
pub enum ChannelProblem {
/// Sending `WorkerMsg::TaskAdded` to a worker listener failed. Carries a
/// snapshot of the problems, sources and interns recorded so far.
FailedToEnqueueTask(Box<PanicReportInfo>),
FailedToSendRootMsg,
FailedToSendWorkerShutdownMsg,
ChannelDisconnected,
FailedToSendManyMsg,
FailedToSendFinishedSpecializationsMsg,
/// Sending the message produced by a finished task failed.
FailedToSendTaskMsg,
FailedToSendFinishedTypeCheckingMsg,
}
/// Snapshot of loader state attached to `ChannelProblem::FailedToEnqueueTask`.
///
/// It is filled in when notifying a worker about a new task fails, by cloning
/// the relevant parts of the loader's state at that moment.
#[derive(Debug)]
pub struct PanicReportInfo {
// Canonicalization problems recorded per module (cloned from the module cache).
can_problems: MutMap<ModuleId, Vec<roc_problem::can::Problem>>,
// Type errors recorded per module (cloned from the module cache).
type_problems: MutMap<ModuleId, Vec<TypeError>>,
// Per module: the file path and an owned copy of its source text.
sources: MutMap<ModuleId, (PathBuf, Box<str>)>,
// Module ids plus the constrained ident ids known when the failure happened.
interns: Interns,
}
pub enum Phases {
/// Parse, canonicalize, check types
TypeCheck,
@ -1046,35 +1020,13 @@ fn enqueue_task<'a>(
injector: &Injector<BuildTask<'a>>,
listeners: &[Sender<WorkerMsg>],
task: BuildTask<'a>,
state: &State<'a>,
) -> Result<(), LoadingProblem<'a>> {
injector.push(task);
for listener in listeners {
listener.send(WorkerMsg::TaskAdded).map_err(|_| {
let module_ids = { (*state.arc_modules).lock().clone() }.into_module_ids();
let interns = Interns {
module_ids,
all_ident_ids: state.constrained_ident_ids.clone(),
};
LoadingProblem::ChannelProblem(ChannelProblem::FailedToEnqueueTask(Box::new(
PanicReportInfo {
can_problems: state.module_cache.can_problems.clone(),
type_problems: state.module_cache.type_problems.clone(),
interns,
sources: state
.module_cache
.sources
.iter()
.map(|(key, (path, str_ref))| {
(*key, (path.clone(), str_ref.to_string().into_boxed_str()))
})
.collect(),
},
)))
})?;
listener
.send(WorkerMsg::TaskAdded)
.map_err(|_| LoadingProblem::ChannelProblem(ChannelProblem::FailedToEnqueueTask))?;
}
Ok(())
@ -1625,24 +1577,17 @@ pub fn load_single_threaded<'a>(
}
// then check if the worker can step
let control_flow = worker_task_step(
arena,
&worker,
&injector,
stealers,
&worker_msg_rx,
&msg_tx,
&src_dir,
roc_cache_dir,
target,
);
let control_flow =
roc_worker::worker_task_step(&worker, &injector, stealers, &worker_msg_rx, |task| {
run_task(task, arena, &src_dir, msg_tx.clone(), roc_cache_dir, target)
});
match control_flow {
Ok(ControlFlow::Break(())) => panic!("the worker should not break!"),
Ok(ControlFlow::Continue(())) => {
// progress was made
}
Err(e) => return Err(e),
Err(e) => return Err(LoadingProblem::ChannelProblem(e)),
}
}
}
@ -2037,11 +1982,6 @@ fn load_multi_threaded<'a>(
let stealers = stealers.into_bump_slice();
let it = worker_arenas.iter_mut();
let mut can_problems_recorded = MutMap::default();
let mut type_problems_recorded = MutMap::default();
let mut sources_recorded = MutMap::default();
let mut interns_recorded = Interns::default();
{
let thread_result = thread::scope(|thread_scope| {
let mut worker_listeners =
@ -2066,17 +2006,16 @@ fn load_multi_threaded<'a>(
.stack_size(EXPANDED_STACK_SIZE)
.spawn(move |_| {
// will process messages until we run out
worker_task(
worker_arena,
worker,
injector,
stealers,
worker_msg_rx,
msg_tx,
src_dir,
roc_cache_dir,
target,
)
roc_worker::worker_task(worker, injector, stealers, worker_msg_rx, |task| {
run_task(
task,
worker_arena,
src_dir,
msg_tx.clone(),
roc_cache_dir,
target,
)
})
});
res_join_handle.unwrap_or_else(|_| {
@ -2129,36 +2068,6 @@ fn load_multi_threaded<'a>(
state = new_state;
continue;
}
Err(LoadingProblem::ChannelProblem(ChannelProblem::FailedToEnqueueTask(
info,
))) => {
let PanicReportInfo {
can_problems,
type_problems,
sources,
interns,
} = *info;
// Record these for later.
can_problems_recorded = can_problems;
type_problems_recorded = type_problems;
sources_recorded = sources;
interns_recorded = interns;
shut_down_worker_threads!();
return Err(LoadingProblem::ChannelProblem(
ChannelProblem::FailedToEnqueueTask(Box::new(PanicReportInfo {
// This return value never gets used, so don't bother
// cloning these in order to be able to return them.
// Really, anything could go here.
can_problems: Default::default(),
type_problems: Default::default(),
sources: Default::default(),
interns: Default::default(),
})),
));
}
Err(e) => {
shut_down_worker_threads!();
@ -2195,180 +2104,6 @@ fn load_multi_threaded<'a>(
}
}
fn worker_task_step<'a>(
worker_arena: &'a Bump,
worker: &Worker<BuildTask<'a>>,
injector: &Injector<BuildTask<'a>>,
stealers: &[Stealer<BuildTask<'a>>],
worker_msg_rx: &crossbeam::channel::Receiver<WorkerMsg>,
msg_tx: &MsgSender<'a>,
src_dir: &Path,
roc_cache_dir: RocCacheDir<'_>,
target: Target,
) -> Result<ControlFlow<(), ()>, LoadingProblem<'a>> {
match worker_msg_rx.try_recv() {
Ok(msg) => {
match msg {
WorkerMsg::Shutdown => {
// We've finished all our work. It's time to
// shut down the thread, so when the main thread
// blocks on joining with all the worker threads,
// it can finally exit too!
Ok(ControlFlow::Break(()))
}
WorkerMsg::TaskAdded => {
// Find a task - either from this thread's queue,
// or from the main queue, or from another worker's
// queue - and run it.
//
// There might be no tasks to work on! That could
// happen if another thread is working on a task
// which will later result in more tasks being
// added. In that case, do nothing, and keep waiting
// until we receive a Shutdown message.
if let Some(task) = find_task(worker, injector, stealers) {
let result = run_task(
task,
worker_arena,
src_dir,
msg_tx.clone(),
roc_cache_dir,
target,
);
match result {
Ok(()) => {}
Err(LoadingProblem::ChannelProblem(problem)) => {
panic!("Channel problem: {problem:?}");
}
Err(LoadingProblem::ParsingFailed(problem)) => {
msg_tx.send(Msg::FailedToParse(problem)).unwrap();
}
Err(LoadingProblem::FileProblem { filename, error }) => {
msg_tx
.send(Msg::FailedToReadFile { filename, error })
.unwrap();
}
Err(LoadingProblem::IncorrectModuleName(err)) => {
msg_tx.send(Msg::IncorrectModuleName(err)).unwrap();
}
Err(err @ LoadingProblem::UnrecognizedPackageShorthand { .. }) => {
msg_tx.send(Msg::FailedToLoad(err)).unwrap();
}
Err(other) => {
return Err(other);
}
}
}
Ok(ControlFlow::Continue(()))
}
}
}
Err(err) => match err {
crossbeam::channel::TryRecvError::Empty => Ok(ControlFlow::Continue(())),
crossbeam::channel::TryRecvError::Disconnected => Ok(ControlFlow::Break(())),
},
}
}
fn worker_task<'a>(
worker_arena: &'a Bump,
worker: Worker<BuildTask<'a>>,
injector: &Injector<BuildTask<'a>>,
stealers: &[Stealer<BuildTask<'a>>],
worker_msg_rx: crossbeam::channel::Receiver<WorkerMsg>,
msg_tx: MsgSender<'a>,
src_dir: &Path,
roc_cache_dir: RocCacheDir<'_>,
target: Target,
) -> Result<(), LoadingProblem<'a>> {
// Keep listening until we receive a Shutdown msg
for msg in worker_msg_rx.iter() {
match msg {
WorkerMsg::Shutdown => {
// We've finished all our work. It's time to
// shut down the thread, so when the main thread
// blocks on joining with all the worker threads,
// it can finally exit too!
return Ok(());
}
WorkerMsg::TaskAdded => {
// Find a task - either from this thread's queue,
// or from the main queue, or from another worker's
// queue - and run it.
//
// There might be no tasks to work on! That could
// happen if another thread is working on a task
// which will later result in more tasks being
// added. In that case, do nothing, and keep waiting
// until we receive a Shutdown message.
if let Some(task) = find_task(&worker, injector, stealers) {
log!(
">>> {}",
match &task {
BuildTask::LoadModule { module_name, .. } => {
format!("BuildTask::LoadModule({module_name:?})")
}
BuildTask::Parse { header, .. } => {
format!("BuildTask::Parse({})", header.module_path.display())
}
BuildTask::CanonicalizeAndConstrain { parsed, .. } => format!(
"BuildTask::CanonicalizeAndConstrain({})",
parsed.module_path.display()
),
BuildTask::Solve { module, .. } => {
format!("BuildTask::Solve({:?})", module.module_id)
}
BuildTask::BuildPendingSpecializations { module_id, .. } => {
format!("BuildTask::BuildPendingSpecializations({module_id:?})")
}
BuildTask::MakeSpecializations { module_id, .. } => {
format!("BuildTask::MakeSpecializations({module_id:?})")
}
}
);
let result = run_task(
task,
worker_arena,
src_dir,
msg_tx.clone(),
roc_cache_dir,
target,
);
match result {
Ok(()) => {}
Err(LoadingProblem::ChannelProblem(problem)) => {
panic!("Channel problem: {problem:?}");
}
Err(LoadingProblem::ParsingFailed(problem)) => {
msg_tx.send(Msg::FailedToParse(problem)).unwrap();
}
Err(LoadingProblem::FileProblem { filename, error }) => {
msg_tx
.send(Msg::FailedToReadFile { filename, error })
.unwrap();
}
Err(LoadingProblem::IncorrectModuleName(err)) => {
msg_tx.send(Msg::IncorrectModuleName(err)).unwrap();
}
Err(err @ LoadingProblem::UnrecognizedPackageShorthand { .. }) => {
msg_tx.send(Msg::FailedToLoad(err)).unwrap();
}
Err(other) => {
return Err(other);
}
}
}
}
}
}
Ok(())
}
fn start_tasks<'a>(
arena: &'a Bump,
state: &mut State<'a>,
@ -2380,7 +2115,7 @@ fn start_tasks<'a>(
let tasks = start_phase(module_id, phase, arena, state);
for task in tasks {
enqueue_task(injector, worker_listeners, task, state)?
enqueue_task(injector, worker_listeners, task)?
}
}
@ -3962,32 +3697,6 @@ fn module_name_to_path<'a>(
(filename, opt_shorthand)
}
/// Find a task according to the following algorithm:
///
/// 1. Look in a local Worker queue. If it has a task, pop it off the queue and return it.
/// 2. If that queue was empty, ask the global queue for a task.
/// 3. If the global queue is also empty, iterate through each Stealer (each Worker queue has a
/// corresponding Stealer, which can steal from it. Stealers can be shared across threads.)
///
/// Based on https://docs.rs/crossbeam/0.7.3/crossbeam/deque/index.html#examples
fn find_task<T>(local: &Worker<T>, global: &Injector<T>, stealers: &[Stealer<T>]) -> Option<T> {
// Pop a task from the local queue, if not empty.
local.pop().or_else(|| {
// Otherwise, we need to look for a task elsewhere.
iter::repeat_with(|| {
// Try stealing a task from the global queue.
global
.steal()
// Or try stealing a task from one of the other threads.
.or_else(|| stealers.iter().map(|s| s.steal()).collect())
})
// Loop while no task was stolen and any steal operation needs to be retried.
.find(|s| !s.is_retry())
// Extract the stolen task, if there is one.
.and_then(|s| s.success())
})
}
#[derive(Debug)]
struct HeaderOutput<'a> {
module_id: ModuleId,
@ -3996,14 +3705,19 @@ struct HeaderOutput<'a> {
opt_platform_shorthand: Option<&'a str>,
}
fn ensure_roc_file<'a>(filename: &Path, src_bytes: &[u8]) -> Result<(), LoadingProblem<'a>> {
/// Reasons a file is rejected as a Roc source file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RocFileErr {
    /// The file has an extension, but it is not the Roc file extension.
    InvalidExtension,
    /// The file has no extension, and its first line is not a `#!` line that mentions `roc`.
    NotDotRocAndNoHashbangOnFirstLine,
    /// The file has no extension, and its first line is not valid UTF-8.
    InvalidUtf8,
}
fn ensure_roc_file<'a>(filename: &Path, src_bytes: &[u8]) -> Result<(), RocFileErr> {
match filename.extension() {
Some(ext) => {
if ext != ROC_FILE_EXTENSION {
return Err(LoadingProblem::FileProblem {
filename: filename.to_path_buf(),
error: io::ErrorKind::Unsupported,
});
if ext == ROC_FILE_EXTENSION {
Ok(())
} else {
Err(RocFileErr::InvalidExtension)
}
}
None => {
@ -4011,18 +3725,19 @@ fn ensure_roc_file<'a>(filename: &Path, src_bytes: &[u8]) -> Result<(), LoadingP
.iter()
.position(|a| *a == b'\n')
.unwrap_or(src_bytes.len());
let frist_line_bytes = src_bytes[0..index].to_vec();
if let Ok(first_line) = String::from_utf8(frist_line_bytes) {
let first_line_bytes = &src_bytes[0..index];
if let Ok(first_line) = core::str::from_utf8(first_line_bytes) {
if !(first_line.starts_with("#!") && first_line.contains("roc")) {
return Err(LoadingProblem::FileProblem {
filename: filename.to_path_buf(),
error: std::io::ErrorKind::Unsupported,
});
Err(RocFileErr::NotDotRocAndNoHashbangOnFirstLine)
} else {
Ok(())
}
} else {
Err(RocFileErr::InvalidUtf8)
}
}
}
Ok(())
}
fn parse_header<'a>(
@ -4043,7 +3758,23 @@ fn parse_header<'a>(
let parsed = roc_parse::module::parse_header(arena, parse_state.clone());
let parse_header_duration = parse_start.elapsed();
ensure_roc_file(&filename, src_bytes)?;
if let Err(problem) = ensure_roc_file(&filename, src_bytes) {
let problem = match problem {
// TODO we should print separate error messages for these
RocFileErr::InvalidExtension | RocFileErr::NotDotRocAndNoHashbangOnFirstLine => {
LoadingProblem::FileProblem {
filename,
error: io::ErrorKind::Unsupported,
}
}
RocFileErr::InvalidUtf8 => LoadingProblem::FileProblem {
filename,
error: io::ErrorKind::InvalidData,
},
};
return Err(problem);
};
// Insert the first entries for this module's timings
let mut module_timing = ModuleTiming::new(start_time);
@ -6357,10 +6088,10 @@ fn run_task<'a>(
msg_tx: MsgSender<'a>,
roc_cache_dir: RocCacheDir<'_>,
target: Target,
) -> Result<(), LoadingProblem<'a>> {
) -> Result<(), ChannelProblem> {
use BuildTask::*;
let msg = match task {
let msg_result = match task {
LoadModule {
module_name,
module_ids,
@ -6509,13 +6240,37 @@ fn run_task<'a>(
derived_module,
expectations,
)),
}?;
};
msg_tx
.send(msg)
.map_err(|_| LoadingProblem::ChannelProblem(ChannelProblem::FailedToSendTaskMsg))?;
match msg_result {
Ok(msg) => {
msg_tx
.send(msg)
.map_err(|_| ChannelProblem::FailedToSendTaskMsg)?;
Ok(())
Ok(())
}
Err(loading_problem) => {
let result = match loading_problem {
LoadingProblem::ChannelProblem(problem) => {
return Err(problem);
}
LoadingProblem::ParsingFailed(problem) => msg_tx.send(Msg::FailedToParse(problem)),
LoadingProblem::FileProblem { filename, error } => {
msg_tx.send(Msg::FailedToReadFile { filename, error })
}
LoadingProblem::IncorrectModuleName(err) => {
msg_tx.send(Msg::IncorrectModuleName(err))
}
err => msg_tx.send(Msg::FailedToLoad(err)),
};
match result {
Ok(()) => Ok(()),
Err(_) => Err(ChannelProblem::FailedToSendTaskMsg),
}
}
}
}
fn to_import_cycle_report(

View file

@ -0,0 +1,16 @@
[package]
name = "roc_worker"
description = "Worker-thread task loop and work-stealing helpers used when loading modules."
authors.workspace = true
edition.workspace = true
license.workspace = true
version.workspace = true
[dependencies]
roc_collections = { path = "../collections" }
roc_module = { path = "../module" }
roc_work = { path = "../work" }
roc_error_macros = { path = "../../error_macros" }
crossbeam.workspace = true

View file

@ -0,0 +1,3 @@
mod worker;
pub use worker::*;

View file

@ -0,0 +1,155 @@
use crossbeam::{
channel::{Receiver, SendError, Sender},
deque::{Injector, Stealer, Worker},
};
use roc_collections::MutSet;
use roc_module::symbol::ModuleId;
use roc_work::Phase;
use std::ops::ControlFlow;
/// Control messages delivered to a worker over its message channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerMsg {
    /// Tells the worker that all work is finished and it should stop, so that
    /// the thread joining on it can exit too.
    Shutdown,
    /// Sent to every listener after a task has been pushed onto the injector
    /// queue. The receiving worker may still find no task to run.
    TaskAdded,
}
/// Failures that can occur while sending or receiving over the channels that
/// connect workers to the code driving them.
///
/// Most variants are named after the send that failed. It implements
/// [`std::error::Error`] so it can be boxed, wrapped, or reported like any
/// other error.
// NOTE(review): only `ChannelDisconnected` is constructed in this crate; confirm
// the meaning of the other variants against their callers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChannelProblem {
    FailedToSendRootMsg,
    FailedToSendWorkerShutdownMsg,
    /// A worker polled its message channel and found it disconnected.
    ChannelDisconnected,
    FailedToSendManyMsg,
    FailedToSendFinishedSpecializationsMsg,
    FailedToSendTaskMsg,
    FailedToSendFinishedTypeCheckingMsg,
    FailedToEnqueueTask,
}

impl std::fmt::Display for ChannelProblem {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive on purpose: adding a variant must force a message to be written.
        let description = match self {
            ChannelProblem::FailedToSendRootMsg => "failed to send root message",
            ChannelProblem::FailedToSendWorkerShutdownMsg => {
                "failed to send worker shutdown message"
            }
            ChannelProblem::ChannelDisconnected => "channel disconnected",
            ChannelProblem::FailedToSendManyMsg => "failed to send Many message",
            ChannelProblem::FailedToSendFinishedSpecializationsMsg => {
                "failed to send finished-specializations message"
            }
            ChannelProblem::FailedToSendTaskMsg => "failed to send task message",
            ChannelProblem::FailedToSendFinishedTypeCheckingMsg => {
                "failed to send finished-type-checking message"
            }
            ChannelProblem::FailedToEnqueueTask => "failed to enqueue task",
        };

        f.write_str(description)
    }
}

impl std::error::Error for ChannelProblem {}
/// Performs at most one unit of work for a worker, without blocking.
///
/// Polls `worker_msg_rx` once and reacts to what it finds:
///
/// - [`WorkerMsg::Shutdown`]: returns `ControlFlow::Break(())`.
/// - [`WorkerMsg::TaskAdded`]: looks for a task with [`find_task`] (this
///   worker's queue, then the main queue, then the other workers' queues) and,
///   if there is one, passes it to `run_task`. There might be no task: another
///   thread may be working on something that will add more tasks later. Either
///   way the result is `ControlFlow::Continue(())`.
/// - No message waiting: returns `ControlFlow::Continue(())`.
///
/// `run_task` is called at most once, so any `FnOnce` closure is accepted.
///
/// # Errors
///
/// Returns [`ChannelProblem::ChannelDisconnected`] when polling reports the
/// channel as disconnected, and propagates any error returned by `run_task`.
pub fn worker_task_step<BuildTask>(
    worker: &Worker<BuildTask>,
    injector: &Injector<BuildTask>,
    stealers: &[Stealer<BuildTask>],
    worker_msg_rx: &Receiver<WorkerMsg>,
    run_task: impl FnOnce(BuildTask) -> Result<(), ChannelProblem>,
) -> Result<ControlFlow<(), ()>, ChannelProblem> {
    match worker_msg_rx.try_recv() {
        // We've finished all our work; breaking lets the caller stop stepping.
        Ok(WorkerMsg::Shutdown) => Ok(ControlFlow::Break(())),
        Ok(WorkerMsg::TaskAdded) => {
            if let Some(task) = find_task(worker, injector, stealers) {
                run_task(task)?;
            }

            Ok(ControlFlow::Continue(()))
        }
        Err(crossbeam::channel::TryRecvError::Empty) => Ok(ControlFlow::Continue(())),
        Err(crossbeam::channel::TryRecvError::Disconnected) => {
            Err(ChannelProblem::ChannelDisconnected)
        }
    }
}
pub fn worker_task<'a, BuildTask>(
worker: Worker<BuildTask>,
injector: &Injector<BuildTask>,
stealers: &[Stealer<BuildTask>],
worker_msg_rx: crossbeam::channel::Receiver<WorkerMsg>,
run_task: impl Fn(BuildTask) -> Result<(), ChannelProblem>,
) -> Result<(), ChannelProblem> {
// Keep listening until we receive a Shutdown msg
for msg in worker_msg_rx.iter() {
match msg {
WorkerMsg::Shutdown => {
// We've finished all our work. It's time to
// shut down the thread, so when the main thread
// blocks on joining with all the worker threads,
// it can finally exit too!
return Ok(());
}
WorkerMsg::TaskAdded => {
// Find a task - either from this thread's queue,
// or from the main queue, or from another worker's
// queue - and run it.
//
// There might be no tasks to work on! That could
// happen if another thread is working on a task
// which will later result in more tasks being
// added. In that case, do nothing, and keep waiting
// until we receive a Shutdown message.
if let Some(task) = find_task(&worker, injector, stealers) {
run_task(task)?;
}
}
}
}
Ok(())
}
/// Starts every `(module, phase)` pair in `work` and notifies the workers.
///
/// For each pair, `start_phase` is called with the shared `state` and returns
/// the tasks to run. Each task is pushed onto `injector`, after which a
/// [`WorkerMsg::TaskAdded`] is sent to every listener in `worker_listeners`.
///
/// # Errors
///
/// Returns the [`SendError`] of the first notification that fails. The task
/// that triggered it has already been pushed onto `injector` at that point,
/// and the remaining work is not started.
pub fn start_tasks<State, Task, Tasks: IntoIterator<Item = Task>>(
    state: &mut State,
    work: MutSet<(ModuleId, Phase)>,
    injector: &Injector<Task>,
    worker_listeners: &[Sender<WorkerMsg>],
    mut start_phase: impl FnMut(ModuleId, Phase, &mut State) -> Tasks,
) -> Result<(), SendError<WorkerMsg>> {
    for (module_id, phase) in work {
        for task in start_phase(module_id, phase, state) {
            // Queue first, then notify, so a notified worker can find the task.
            injector.push(task);

            for listener in worker_listeners {
                listener.send(WorkerMsg::TaskAdded)?;
            }
        }
    }

    Ok(())
}
/// Looks for the next task to run, trying these sources in order:
///
/// 1. The `local` Worker queue. If it has a task, that task is popped and returned.
/// 2. The `global` queue.
/// 3. Each Stealer in `stealers` (each Worker queue has a corresponding Stealer,
///    which can steal from it; Stealers can be shared across threads).
///
/// Steps 2 and 3 are repeated for as long as a steal operation asks to be retried.
/// Returns `None` when no task was found.
///
/// Based on <https://docs.rs/crossbeam/0.7.3/crossbeam/deque/index.html#examples>
pub fn find_task<T>(local: &Worker<T>, global: &Injector<T>, stealers: &[Stealer<T>]) -> Option<T> {
    // Fast path: a task is already waiting in the local queue.
    if let Some(task) = local.pop() {
        return Some(task);
    }

    loop {
        // Try the global queue first, then the other threads' queues.
        let attempt = global
            .steal()
            .or_else(|| stealers.iter().map(|stealer| stealer.steal()).collect());

        // Stop as soon as no steal operation needs to be retried, and hand
        // back the stolen task if there is one.
        if !attempt.is_retry() {
            return attempt.success();
        }
    }
}