get it up and running

Folkert 2020-10-10 22:35:38 +02:00
parent d8a0760726
commit 5c558a9a87


@@ -306,18 +306,30 @@ fn start_phase<'a>(module_id: ModuleId, phase: Phase, state: &mut State<'a>) ->
             }
         }
         Phase::MakeSpecializations => {
-            // load
-            BuildTask::BuildSpecializations {
-                layout_cache,
+            let found_specializations = state
+                .module_cache
+                .found_specializations
+                .remove(&module_id)
+                .unwrap();
+
+            let FoundSpecializationsModule {
                 module_id,
-                module_timing,
-                solved_subs,
-                decls,
-                finished_info,
                 ident_ids,
+                subs,
+                procs,
+                layout_cache,
+                finished_info,
+            } = found_specializations;
+
+            BuildTask::MakeSpecializations {
+                module_id,
+                ident_ids,
+                subs,
+                procs,
+                layout_cache,
+                finished_info,
             }
         }
         _ => todo!(),
     }
 }
@@ -366,19 +378,23 @@ struct ConstrainedModule<'a> {
 
 #[derive(Debug)]
 pub struct TypeCheckedModule<'a> {
-    module_id: ModuleId,
-    layout_cache: LayoutCache<'a>,
-    module_timing: ModuleTiming,
-    solved_subs: Solved<Subs>,
-    decls: Vec<Declaration>,
-    ident_ids: IdentIds,
-    finished_info: FinishedInfo<'a>,
+    pub module_id: ModuleId,
+    pub layout_cache: LayoutCache<'a>,
+    pub module_timing: ModuleTiming,
+    pub solved_subs: Solved<Subs>,
+    pub decls: Vec<Declaration>,
+    pub ident_ids: IdentIds,
+    pub finished_info: FinishedInfo<'a>,
 }
 
 #[derive(Debug)]
 pub struct FoundSpecializationsModule<'a> {
     pub module_id: ModuleId,
+    pub ident_ids: IdentIds,
+    pub layout_cache: LayoutCache<'a>,
     pub procs: Procs<'a>,
+    pub subs: Subs,
+    pub finished_info: FinishedInfo<'a>,
 }
 
 #[derive(Debug)]
@@ -433,12 +449,28 @@ enum Msg<'a> {
         solved_subs: Solved<Subs>,
         finished_info: FinishedInfo<'a>,
     },
-    MadeSpecializations {},
-    FinishedAllSpecialization {},
+    MadeSpecializations {
+        module_id: ModuleId,
+        ident_ids: IdentIds,
+        layout_cache: LayoutCache<'a>,
+        procedures: MutMap<(Symbol, Layout<'a>), Proc<'a>>,
+        problems: Vec<roc_mono::ir::MonoProblem>,
+        subs: Subs,
+        finished_info: FinishedInfo<'a>,
+    },
+    /// The task is to only typecheck AND monomorphize modules
+    /// all modules are now monomorphized, we are done
+    FinishedAllSpecialization {
+        subs: Subs,
+        problems: Vec<MonoProblem>,
+        exposed_vars_by_symbol: Vec<(Symbol, Variable)>,
+        src: &'a str,
+    },
 }
 
 #[derive(Debug)]
-struct FinishedInfo<'a> {
+pub struct FinishedInfo<'a> {
     problems: Vec<solve::TypeError>,
     exposed_vars_by_symbol: Vec<(Symbol, Variable)>,
     src: &'a str,
@@ -458,6 +490,7 @@ struct State<'a> {
     pub module_cache: ModuleCache<'a>,
     pub dependencies: Dependencies,
+    pub procedures: MutMap<(Symbol, Layout<'a>), Proc<'a>>,
 
     /// This is the "final" list of IdentIds, after canonicalization and constraint gen
     /// have completed for a given module.
@@ -600,6 +633,14 @@ enum BuildTask<'a> {
         decls: Vec<Declaration>,
         finished_info: FinishedInfo<'a>,
     },
+    MakeSpecializations {
+        module_id: ModuleId,
+        ident_ids: IdentIds,
+        subs: Subs,
+        procs: Procs<'a>,
+        layout_cache: LayoutCache<'a>,
+        finished_info: FinishedInfo<'a>,
+    },
 }
 
 enum WorkerMsg {
@@ -658,6 +699,8 @@ pub fn load_and_typecheck(
 ) -> Result<LoadedModule, LoadingProblem> {
     use LoadResult::*;
 
+    // Reserve one CPU for the main thread, and let all the others be eligible
+
     match load(
         arena,
         filename,
@@ -748,12 +791,16 @@ fn load<'a>(
     src_dir: &Path,
     exposed_types: SubsByModule,
     goal_phase: Phase,
-) -> Result<LoadResult<'a>, LoadingProblem> {
+) -> Result<LoadResult<'a>, LoadingProblem>
+where
+{
     // Reserve one CPU for the main thread, and let all the others be eligible
     // to spawn workers.
     let num_workers = num_cpus::get() - 1;
-    let mut worker_arenas = bumpalo::collections::Vec::with_capacity_in(num_workers, &arena);
+
+    let worker_arenas = arena.alloc(bumpalo::collections::Vec::with_capacity_in(
+        num_workers,
+        arena,
+    ));
 
     for _ in 0..num_workers {
         worker_arenas.push(Bump::new());
@@ -769,7 +816,7 @@ fn load<'a>(
     let root_start_time = SystemTime::now();
 
     load_filename(
-        &arena,
+        arena,
         filename,
         Arc::clone(&arc_modules),
         Arc::clone(&ident_ids_by_module),
@@ -788,169 +835,205 @@ fn load<'a>(
     // into the worker threads, because those workers' stealers need to be
     // shared bet,een all threads, and this coordination work is much easier
     // on the main thread.
-    let mut worker_queues = bumpalo::collections::Vec::with_capacity_in(num_workers, &arena);
-    let mut stealers = bumpalo::collections::Vec::with_capacity_in(num_workers, &arena);
+    let mut worker_queues = bumpalo::collections::Vec::with_capacity_in(num_workers, arena);
+    let mut stealers = bumpalo::collections::Vec::with_capacity_in(num_workers, arena);
+
+    let mut it = worker_arenas.iter_mut();
 
+    {
         thread::scope(|thread_scope| {
             for _ in 0..num_workers {
                 let worker = Worker::new_lifo();
                 stealers.push(worker.stealer());
                 worker_queues.push(worker);
             }
 
             // Get a reference to the completed stealers, so we can send that
             // reference to each worker. (Slices are Sync, but bumpalo Vecs are not.)
             let stealers = stealers.into_bump_slice();
 
             let mut headers_parsed = MutSet::default();
 
             // We've already parsed the root's header. (But only its header, so far.)
             headers_parsed.insert(root_id);
 
             let mut loading_started = MutSet::default();
 
             // If the root module we're still processing happens to be an interface,
             // it's possible that something else will import it. That will
             // necessarily cause a cyclic import error, but in the meantime
             // we still shouldn't load it.
             loading_started.insert(root_id);
 
-            let mut worker_listeners = bumpalo::collections::Vec::with_capacity_in(num_workers, &arena);
+            let mut worker_listeners =
+                bumpalo::collections::Vec::with_capacity_in(num_workers, arena);
 
             let stdlib_mode = stdlib.mode;
 
-            for worker_arena in worker_arenas.iter_mut() {
+            for worker_arena in it {
                 let msg_tx = msg_tx.clone();
                 let worker = worker_queues.pop().unwrap();
                 let (worker_msg_tx, worker_msg_rx) = bounded(1024);
 
                 worker_listeners.push(worker_msg_tx);
 
                 // We only want to move a *reference* to the main task queue's
                 // injector in the thread, not the injector itself
                 // (since other threads need to reference it too).
                 let injector = &injector;
 
                 // Record this thread's handle so the main thread can join it later.
                 thread_scope.spawn(move |_| {
                     // Keep listening until we receive a Shutdown msg
                     for msg in worker_msg_rx.iter() {
                         match msg {
                             WorkerMsg::Shutdown => {
                                 // We've finished all our work. It's time to
                                 // shut down the thread, so when the main thread
                                 // blocks on joining with all the worker threads,
                                 // it can finally exit too!
                                 return;
                             }
                             WorkerMsg::TaskAdded => {
                                 // Find a task - either from this thread's queue,
                                 // or from the main queue, or from another worker's
                                 // queue - and run it.
                                 //
                                 // There might be no tasks to work on! That could
                                 // happen if another thread is working on a task
                                 // which will later result in more tasks being
                                 // added. In that case, do nothing, and keep waiting
                                 // until we receive a Shutdown message.
                                 if let Some(task) = find_task(&worker, injector, stealers) {
-                                    run_task(task, worker_arena, src_dir, msg_tx.clone(), stdlib_mode)
-                                        .expect("Msg channel closed unexpectedly.");
+                                    run_task(
+                                        task,
+                                        worker_arena,
+                                        src_dir,
+                                        msg_tx.clone(),
+                                        stdlib_mode,
+                                    )
+                                    .expect("Msg channel closed unexpectedly.");
                                 }
                             }
                         }
                     }
 
                     // Needed to prevent a borrow checker error about this closure
                     // outliving its enclosing function.
                     drop(worker_msg_rx);
                 });
             }
 
             let mut state = State {
                 root_id,
                 goal_phase,
                 stdlib,
                 module_cache: ModuleCache::default(),
                 dependencies: Dependencies::default(),
+                procedures: MutMap::default(),
                 exposed_types,
                 headers_parsed,
                 loading_started,
                 can_problems: std::vec::Vec::new(),
                 type_problems: std::vec::Vec::new(),
                 mono_problems: std::vec::Vec::new(),
                 arc_modules,
                 constrained_ident_ids: IdentIds::exposed_builtins(0),
                 ident_ids_by_module,
                 declarations_by_id: MutMap::default(),
                 exposed_symbols_by_module: MutMap::default(),
                 unsolved_modules: MutMap::default(),
                 timings: MutMap::default(),
                 needs_specialization: MutSet::default(),
                 all_pending_specializations: MutMap::default(),
                 specializations_in_flight: 0,
                 layout_caches: std::vec::Vec::with_capacity(num_cpus::get()),
                 procs: Procs::default(),
             };
 
             // We've now distributed one worker queue to each thread.
             // There should be no queues left to distribute!
             debug_assert!(worker_queues.is_empty());
             drop(worker_queues);
 
             // Grab a reference to these Senders outside the loop, so we can share
             // it across each iteration of the loop.
             let worker_listeners = worker_listeners.into_bump_slice();
             let msg_tx = msg_tx.clone();
 
             // The root module will have already queued up messages to process,
             // and processing those messages will in turn queue up more messages.
             for msg in msg_rx.iter() {
                 match msg {
                     Msg::FinishedAllTypeChecking {
                         solved_subs,
                         problems,
                         exposed_vars_by_symbol,
                         src,
                     } => {
                         // We're done! There should be no more messages pending.
                         debug_assert!(msg_rx.is_empty());
 
                         // Shut down all the worker threads.
                         for listener in worker_listeners {
                             listener
                                 .send(WorkerMsg::Shutdown)
                                 .map_err(|_| LoadingProblem::MsgChannelDied)?;
                         }
 
                         return Ok(LoadResult::TypeChecked(finish(
                             state,
                             solved_subs,
                             problems,
                             exposed_vars_by_symbol,
                             src,
                         )));
                     }
+                    Msg::FinishedAllSpecialization {
+                        subs,
+                        problems,
+                        exposed_vars_by_symbol,
+                        src,
+                    } => {
+                        // We're done! There should be no more messages pending.
+                        debug_assert!(msg_rx.is_empty());
+
+                        // Shut down all the worker threads.
+                        for listener in worker_listeners {
+                            listener
+                                .send(WorkerMsg::Shutdown)
+                                .map_err(|_| LoadingProblem::MsgChannelDied)?;
+                        }
+
+                        return Ok(LoadResult::Monomorphized(finish_specialization(
+                            state,
+                            subs,
+                            problems,
+                            exposed_vars_by_symbol,
+                            src,
+                        )));
+                    }
                     msg => {
                         // This is where most of the main thread's work gets done.
                         // Everything up to this point has been setting up the threading
                         // system which lets this logic work efficiently.
                         state = update(
                             state,
                             msg,
                             msg_tx.clone(),
                             &injector,
                             worker_listeners,
-                            &arena,
+                            arena,
                         )?;
                     }
                 }
             }
 
             // The msg_rx receiver closed unexpectedly before we finished solving everything
             Err(LoadingProblem::MsgChannelDied)
         })
+    }
     .unwrap()
 }
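The `WorkerMsg::TaskAdded` arm above describes the lookup order for a woken worker: its own queue first, then the main (injector) queue, then another worker's queue. For reference, a `find_task` helper in that style, following the crossbeam-deque documentation's work-stealing example rather than Roc's actual implementation (which this diff does not show), looks roughly like this:

    use crossbeam::deque::{Injector, Stealer, Worker};
    use std::iter;

    // Work-stealing lookup as described in the comments above: try the local
    // queue, then steal a batch from the global injector, then steal from the
    // other workers, retrying as long as any steal reports Steal::Retry.
    fn find_task<T>(local: &Worker<T>, global: &Injector<T>, stealers: &[Stealer<T>]) -> Option<T> {
        // Pop a task from the local queue, if not empty.
        local.pop().or_else(|| {
            // Otherwise, we need to look for a task elsewhere.
            iter::repeat_with(|| {
                // Try stealing a batch of tasks from the global queue.
                global
                    .steal_batch_and_pop(local)
                    // Or try stealing a task from one of the other threads.
                    .or_else(|| stealers.iter().map(|s| s.steal()).collect())
            })
            // Loop while no task was stolen and any steal operation needs to be retried.
            .find(|s| !s.is_retry())
            // Extract the stolen task, if there is one.
            .and_then(|s| s.success())
        })
    }

If every queue is empty, the worker simply goes back to waiting on its channel, which is why the `TaskAdded` arm above treats "no task found" as a no-op.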
@@ -1127,9 +1210,24 @@ fn update<'a>(
             Ok(state)
         }
         FoundSpecializations {
-            module_id, procs, ..
+            module_id,
+            procs,
+            finished_info,
+            solved_subs,
+            ident_ids,
+            layout_cache,
+            problems: _,
         } => {
-            let found_specializations_module = FoundSpecializationsModule { module_id, procs };
+            let subs = solved_subs.into_inner();
+
+            let found_specializations_module = FoundSpecializationsModule {
+                layout_cache,
+                module_id,
+                procs,
+                finished_info,
+                ident_ids,
+                subs,
+            };
 
             state
                 .module_cache
@@ -1145,11 +1243,56 @@ fn update<'a>(
                 enqueue_task(&injector, worker_listeners, task)?
             }
 
             Ok(state)
         }
-        MadeSpecializations { .. } => {
-            todo!();
+        MadeSpecializations {
+            module_id,
+            ident_ids,
+            subs,
+            finished_info,
+            procedures,
+            ..
+        } => {
+            println!("done specializing {:?}", module_id);
+
+            state.procedures.extend(procedures);
+
+            dbg!(&state.procedures);
+
+            let work = state
+                .dependencies
+                .notify(module_id, Phase::MakeSpecializations);
+
+            if work.is_empty()
+                && state.dependencies.solved_all()
+                && state.goal_phase == Phase::MakeSpecializations
+            {
+                // state.timings.insert(module_id, module_timing);
+
+                msg_tx
+                    .send(Msg::FinishedAllSpecialization {
+                        subs,
+                        // TODO thread through mono problems
+                        problems: vec![],
+                        exposed_vars_by_symbol: finished_info.exposed_vars_by_symbol,
+                        src: finished_info.src,
+                    })
+                    .map_err(|_| LoadingProblem::MsgChannelDied)?;
+
+                // bookkeeping
+                state.constrained_ident_ids.insert(module_id, ident_ids);
+
+                // As far as type-checking goes, once we've solved
+                // the originally requested module, we're all done!
+                return Ok(state);
+            } else {
+                for (module_id, phase) in work {
+                    let task = start_phase(module_id, phase, &mut state);
+                    enqueue_task(&injector, worker_listeners, task)?
+                }
+            }
+
+            Ok(state)
         }
         Msg::FinishedAllTypeChecking { .. } => {
             unreachable!();
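Both the FoundSpecializations and MadeSpecializations arms hand new work to the pool through `enqueue_task(&injector, worker_listeners, task)?`, whose definition is not part of this diff. Given the `TaskAdded`/`Shutdown` protocol set up in `load`, a plausible sketch (the signature and stand-in types here are assumptions, not the module's actual code) is to push the task onto the shared injector and then ping every worker listener:

    use crossbeam::channel::Sender;
    use crossbeam::deque::Injector;

    // Stand-ins so the sketch is self-contained; the real WorkerMsg, LoadingProblem
    // and BuildTask types are defined elsewhere in this module.
    enum WorkerMsg {
        TaskAdded,
        Shutdown,
    }

    enum LoadingProblem {
        MsgChannelDied,
    }

    // Hypothetical enqueue_task: publish the task on the global injector, then
    // notify every worker so one of them wakes up and calls find_task.
    fn enqueue_task<Task>(
        injector: &Injector<Task>,
        listeners: &[Sender<WorkerMsg>],
        task: Task,
    ) -> Result<(), LoadingProblem> {
        injector.push(task);

        for listener in listeners {
            listener
                .send(WorkerMsg::TaskAdded)
                .map_err(|_| LoadingProblem::MsgChannelDied)?;
        }

        Ok(())
    }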
@@ -1160,6 +1303,47 @@ fn update<'a>(
         }
     }
 }
 
+fn finish_specialization<'a>(
+    mut state: State<'a>,
+    subs: Subs,
+    problems: Vec<MonoProblem>,
+    exposed_vars_by_symbol: Vec<(Symbol, Variable)>,
+    src: &'a str,
+) -> MonomorphizedModule<'a> {
+    state.mono_problems.extend(problems);
+
+    let module_ids = Arc::try_unwrap(state.arc_modules)
+        .unwrap_or_else(|_| panic!("There were still outstanding Arc references to module_ids"))
+        .into_inner()
+        .expect("Unwrapping mutex for module_ids");
+
+    let interns = Interns {
+        module_ids,
+        all_ident_ids: state.constrained_ident_ids,
+    };
+
+    let State {
+        mono_problems,
+        type_problems,
+        can_problems,
+        procedures,
+        ..
+    } = state;
+
+    MonomorphizedModule {
+        can_problems,
+        mono_problems,
+        type_problems,
+        exposed_vars_by_symbol,
+        module_id: state.root_id,
+        subs,
+        interns,
+        procedures,
+        src: src.into(),
+        timings: state.timings,
+    }
+}
+
 fn finish<'a>(
     mut state: State<'a>,
     solved: Solved<Subs>,
@@ -1710,6 +1894,51 @@ fn ident_from_exposed(entry: &ExposesEntry<'_>) -> Ident {
     }
 }
 
+#[allow(clippy::too_many_arguments)]
+fn make_specializations<'a>(
+    arena: &'a Bump,
+    home: ModuleId,
+    mut ident_ids: IdentIds,
+    mut subs: Subs,
+    mut procs: Procs<'a>,
+    mut layout_cache: LayoutCache<'a>,
+    finished_info: FinishedInfo<'a>,
+) -> Msg<'a> {
+    let mut mono_problems = Vec::new();
+
+    // do the thing
+    let mut mono_env = roc_mono::ir::Env {
+        arena,
+        problems: &mut mono_problems,
+        subs: &mut subs,
+        home,
+        ident_ids: &mut ident_ids,
+    };
+
+    dbg!(&procs);
+
+    // TODO: for now this final specialization pass is sequential,
+    // with no parallelization at all. We should try to parallelize
+    // this, but doing so will require a redesign of Procs.
+    procs = roc_mono::ir::specialize_all(
+        &mut mono_env,
+        procs,
+        &mut layout_cache,
+        // &finished_info.vars_by_symbol,
+    );
+
+    let (procedures, _param_map) = procs.get_specialized_procs_help(mono_env.arena);
+
+    dbg!(&procedures);
+
+    Msg::MadeSpecializations {
+        module_id: home,
+        ident_ids,
+        layout_cache,
+        procedures,
+        problems: mono_problems,
+        subs,
+        finished_info,
+    }
+}
+
 #[allow(clippy::too_many_arguments)]
 fn build_pending_specializations<'a>(
     arena: &'a Bump,
@@ -1868,6 +2097,22 @@ fn run_task<'a>(
             layout_cache,
             finished_info,
         )),
+        MakeSpecializations {
+            module_id,
+            ident_ids,
+            subs,
+            procs,
+            layout_cache,
+            finished_info,
+        } => Ok(make_specializations(
+            arena,
+            module_id,
+            ident_ids,
+            subs,
+            procs,
+            layout_cache,
+            finished_info,
+        )),
     }?;
 
     msg_tx