From 5c558a9a8716ae2ca33e717047e4eb3d03398fc1 Mon Sep 17 00:00:00 2001
From: Folkert
Date: Sat, 10 Oct 2020 22:35:38 +0200
Subject: [PATCH] get it up and running

---
 compiler/load/src/file.rs | 567 +++++++++++++++++++++++++++-----------
 1 file changed, 406 insertions(+), 161 deletions(-)

diff --git a/compiler/load/src/file.rs b/compiler/load/src/file.rs
index f59fed9516..07307acdf2 100644
--- a/compiler/load/src/file.rs
+++ b/compiler/load/src/file.rs
@@ -306,18 +306,30 @@ fn start_phase<'a>(module_id: ModuleId, phase: Phase, state: &mut State<'a>) -> BuildTask<'a> {
             }
         }
         Phase::MakeSpecializations => {
-            // load
-            BuildTask::BuildSpecializations {
-                layout_cache,
+            let found_specializations = state
+                .module_cache
+                .found_specializations
+                .remove(&module_id)
+                .unwrap();
+
+            let FoundSpecializationsModule {
                 module_id,
-                module_timing,
-                solved_subs,
-                decls,
-                finished_info,
                 ident_ids,
+                subs,
+                procs,
+                layout_cache,
+                finished_info,
+            } = found_specializations;
+
+            BuildTask::MakeSpecializations {
+                module_id,
+                ident_ids,
+                subs,
+                procs,
+                layout_cache,
+                finished_info,
             }
         }
-        _ => todo!(),
     }
 }
@@ -366,19 +378,23 @@ struct ConstrainedModule<'a> {
 
 #[derive(Debug)]
 pub struct TypeCheckedModule<'a> {
-    module_id: ModuleId,
-    layout_cache: LayoutCache<'a>,
-    module_timing: ModuleTiming,
-    solved_subs: Solved<Subs>,
-    decls: Vec<Declaration>,
-    ident_ids: IdentIds,
-    finished_info: FinishedInfo<'a>,
+    pub module_id: ModuleId,
+    pub layout_cache: LayoutCache<'a>,
+    pub module_timing: ModuleTiming,
+    pub solved_subs: Solved<Subs>,
+    pub decls: Vec<Declaration>,
+    pub ident_ids: IdentIds,
+    pub finished_info: FinishedInfo<'a>,
 }
 
 #[derive(Debug)]
 pub struct FoundSpecializationsModule<'a> {
     pub module_id: ModuleId,
+    pub ident_ids: IdentIds,
+    pub layout_cache: LayoutCache<'a>,
     pub procs: Procs<'a>,
+    pub subs: Subs,
+    pub finished_info: FinishedInfo<'a>,
 }
 
 #[derive(Debug)]
@@ -433,12 +449,28 @@ enum Msg<'a> {
         solved_subs: Solved<Subs>,
         finished_info: FinishedInfo<'a>,
     },
-    MadeSpecializations {},
-    FinishedAllSpecialization {},
+    MadeSpecializations {
+        module_id: ModuleId,
+        ident_ids: IdentIds,
+        layout_cache: LayoutCache<'a>,
+        procedures: MutMap<(Symbol, Layout<'a>), Proc<'a>>,
+        problems: Vec<MonoProblem>,
+        subs: Subs,
+        finished_info: FinishedInfo<'a>,
+    },
+
+    /// Sent when the goal is to typecheck AND monomorphize modules,
+    /// and all modules have now been monomorphized: we are done.
+    FinishedAllSpecialization {
+        subs: Subs,
+        problems: Vec<MonoProblem>,
+        exposed_vars_by_symbol: Vec<(Symbol, Variable)>,
+        src: &'a str,
+    },
 }
 
 #[derive(Debug)]
-struct FinishedInfo<'a> {
+pub struct FinishedInfo<'a> {
     problems: Vec<solve::TypeError>,
     exposed_vars_by_symbol: Vec<(Symbol, Variable)>,
     src: &'a str,
@@ -458,6 +490,7 @@ struct State<'a> {
     pub module_cache: ModuleCache<'a>,
     pub dependencies: Dependencies,
+    pub procedures: MutMap<(Symbol, Layout<'a>), Proc<'a>>,
 
     /// This is the "final" list of IdentIds, after canonicalization and constraint gen
     /// have completed for a given module.
@@ -600,6 +633,14 @@ enum BuildTask<'a> {
         decls: Vec<Declaration>,
         finished_info: FinishedInfo<'a>,
     },
+    MakeSpecializations {
+        module_id: ModuleId,
+        ident_ids: IdentIds,
+        subs: Subs,
+        procs: Procs<'a>,
+        layout_cache: LayoutCache<'a>,
+        finished_info: FinishedInfo<'a>,
+    },
 }
 
 enum WorkerMsg {
@@ -658,6 +699,8 @@ pub fn load_and_typecheck(
 ) -> Result<LoadedModule, LoadingProblem> {
     use LoadResult::*;
 
+    // Reserve one CPU for the main thread, and let all the others be
+    // eligible to spawn workers (this split happens inside `load`).
     match load(
         arena,
         filename,
@@ -748,12 +791,16 @@ fn load<'a>(
     src_dir: &Path,
     exposed_types: SubsByModule,
     goal_phase: Phase,
-) -> Result<LoadResult<'a>, LoadingProblem> {
+) -> Result<LoadResult<'a>, LoadingProblem>
+where
+{
     // Reserve one CPU for the main thread, and let all the others be eligible
     // to spawn workers.
     let num_workers = num_cpus::get() - 1;
-
-    let mut worker_arenas = bumpalo::collections::Vec::with_capacity_in(num_workers, &arena);
+    let worker_arenas = arena.alloc(bumpalo::collections::Vec::with_capacity_in(
+        num_workers,
+        arena,
+    ));
 
     for _ in 0..num_workers {
         worker_arenas.push(Bump::new());
@@ -769,7 +816,7 @@ fn load<'a>(
     let root_start_time = SystemTime::now();
 
     load_filename(
-        &arena,
+        arena,
         filename,
         Arc::clone(&arc_modules),
         Arc::clone(&ident_ids_by_module),
@@ -788,169 +835,205 @@ fn load<'a>(
     // into the worker threads, because those workers' stealers need to be
     // shared between all threads, and this coordination work is much easier
     // on the main thread.
-    let mut worker_queues = bumpalo::collections::Vec::with_capacity_in(num_workers, &arena);
-    let mut stealers = bumpalo::collections::Vec::with_capacity_in(num_workers, &arena);
+    let mut worker_queues = bumpalo::collections::Vec::with_capacity_in(num_workers, arena);
+    let mut stealers = bumpalo::collections::Vec::with_capacity_in(num_workers, arena);
 
-    thread::scope(|thread_scope| {
-        for _ in 0..num_workers {
-            let worker = Worker::new_lifo();
+    let mut it = worker_arenas.iter_mut();
 
-            stealers.push(worker.stealer());
-            worker_queues.push(worker);
-        }
+    {
+        thread::scope(|thread_scope| {
+            for _ in 0..num_workers {
+                let worker = Worker::new_lifo();
 
-        // Get a reference to the completed stealers, so we can send that
-        // reference to each worker. (Slices are Sync, but bumpalo Vecs are not.)
-        let stealers = stealers.into_bump_slice();
+                stealers.push(worker.stealer());
+                worker_queues.push(worker);
+            }
 
-        let mut headers_parsed = MutSet::default();
+            // Get a reference to the completed stealers, so we can send that
+            // reference to each worker. (Slices are Sync, but bumpalo Vecs are not.)
+            let stealers = stealers.into_bump_slice();
 
-        // We've already parsed the root's header. (But only its header, so far.)
-        headers_parsed.insert(root_id);
+            let mut headers_parsed = MutSet::default();
 
-        let mut loading_started = MutSet::default();
+            // We've already parsed the root's header. (But only its header, so far.)
+            headers_parsed.insert(root_id);
 
-        // If the root module we're still processing happens to be an interface,
-        // it's possible that something else will import it. That will
-        // necessarily cause a cyclic import error, but in the meantime
-        // we still shouldn't load it.
-        loading_started.insert(root_id);
+            let mut loading_started = MutSet::default();
 
-        let mut worker_listeners = bumpalo::collections::Vec::with_capacity_in(num_workers, &arena);
+            // If the root module we're still processing happens to be an interface,
+            // it's possible that something else will import it. That will
+            // necessarily cause a cyclic import error, but in the meantime
+            // we still shouldn't load it.
+ loading_started.insert(root_id); - let stdlib_mode = stdlib.mode; + let mut worker_listeners = + bumpalo::collections::Vec::with_capacity_in(num_workers, arena); - for worker_arena in worker_arenas.iter_mut() { - let msg_tx = msg_tx.clone(); - let worker = worker_queues.pop().unwrap(); - let (worker_msg_tx, worker_msg_rx) = bounded(1024); + let stdlib_mode = stdlib.mode; - worker_listeners.push(worker_msg_tx); + for worker_arena in it { + let msg_tx = msg_tx.clone(); + let worker = worker_queues.pop().unwrap(); + let (worker_msg_tx, worker_msg_rx) = bounded(1024); - // We only want to move a *reference* to the main task queue's - // injector in the thread, not the injector itself - // (since other threads need to reference it too). - let injector = &injector; + worker_listeners.push(worker_msg_tx); - // Record this thread's handle so the main thread can join it later. - thread_scope.spawn(move |_| { - // Keep listening until we receive a Shutdown msg - for msg in worker_msg_rx.iter() { - match msg { - WorkerMsg::Shutdown => { - // We've finished all our work. It's time to - // shut down the thread, so when the main thread - // blocks on joining with all the worker threads, - // it can finally exit too! - return; - } - WorkerMsg::TaskAdded => { - // Find a task - either from this thread's queue, - // or from the main queue, or from another worker's - // queue - and run it. - // - // There might be no tasks to work on! That could - // happen if another thread is working on a task - // which will later result in more tasks being - // added. In that case, do nothing, and keep waiting - // until we receive a Shutdown message. - if let Some(task) = find_task(&worker, injector, stealers) { - run_task(task, worker_arena, src_dir, msg_tx.clone(), stdlib_mode) + // We only want to move a *reference* to the main task queue's + // injector in the thread, not the injector itself + // (since other threads need to reference it too). + let injector = &injector; + + // Record this thread's handle so the main thread can join it later. + thread_scope.spawn(move |_| { + // Keep listening until we receive a Shutdown msg + for msg in worker_msg_rx.iter() { + match msg { + WorkerMsg::Shutdown => { + // We've finished all our work. It's time to + // shut down the thread, so when the main thread + // blocks on joining with all the worker threads, + // it can finally exit too! + return; + } + WorkerMsg::TaskAdded => { + // Find a task - either from this thread's queue, + // or from the main queue, or from another worker's + // queue - and run it. + // + // There might be no tasks to work on! That could + // happen if another thread is working on a task + // which will later result in more tasks being + // added. In that case, do nothing, and keep waiting + // until we receive a Shutdown message. + if let Some(task) = find_task(&worker, injector, stealers) { + run_task( + task, + worker_arena, + src_dir, + msg_tx.clone(), + stdlib_mode, + ) .expect("Msg channel closed unexpectedly."); + } } } } - } - // Needed to prevent a borrow checker error about this closure - // outliving its enclosing function. - drop(worker_msg_rx); - }); - } + // Needed to prevent a borrow checker error about this closure + // outliving its enclosing function. 
+ drop(worker_msg_rx); + }); + } - let mut state = State { - root_id, - goal_phase, - stdlib, - module_cache: ModuleCache::default(), - dependencies: Dependencies::default(), - exposed_types, - headers_parsed, - loading_started, - can_problems: std::vec::Vec::new(), - type_problems: std::vec::Vec::new(), - mono_problems: std::vec::Vec::new(), - arc_modules, - constrained_ident_ids: IdentIds::exposed_builtins(0), - ident_ids_by_module, - declarations_by_id: MutMap::default(), - exposed_symbols_by_module: MutMap::default(), - unsolved_modules: MutMap::default(), - timings: MutMap::default(), - needs_specialization: MutSet::default(), - all_pending_specializations: MutMap::default(), - specializations_in_flight: 0, - layout_caches: std::vec::Vec::with_capacity(num_cpus::get()), - procs: Procs::default(), - }; + let mut state = State { + root_id, + goal_phase, + stdlib, + module_cache: ModuleCache::default(), + dependencies: Dependencies::default(), + procedures: MutMap::default(), + exposed_types, + headers_parsed, + loading_started, + can_problems: std::vec::Vec::new(), + type_problems: std::vec::Vec::new(), + mono_problems: std::vec::Vec::new(), + arc_modules, + constrained_ident_ids: IdentIds::exposed_builtins(0), + ident_ids_by_module, + declarations_by_id: MutMap::default(), + exposed_symbols_by_module: MutMap::default(), + unsolved_modules: MutMap::default(), + timings: MutMap::default(), + needs_specialization: MutSet::default(), + all_pending_specializations: MutMap::default(), + specializations_in_flight: 0, + layout_caches: std::vec::Vec::with_capacity(num_cpus::get()), + procs: Procs::default(), + }; - // We've now distributed one worker queue to each thread. - // There should be no queues left to distribute! - debug_assert!(worker_queues.is_empty()); - drop(worker_queues); + // We've now distributed one worker queue to each thread. + // There should be no queues left to distribute! + debug_assert!(worker_queues.is_empty()); + drop(worker_queues); - // Grab a reference to these Senders outside the loop, so we can share - // it across each iteration of the loop. - let worker_listeners = worker_listeners.into_bump_slice(); - let msg_tx = msg_tx.clone(); + // Grab a reference to these Senders outside the loop, so we can share + // it across each iteration of the loop. + let worker_listeners = worker_listeners.into_bump_slice(); + let msg_tx = msg_tx.clone(); - // The root module will have already queued up messages to process, - // and processing those messages will in turn queue up more messages. - for msg in msg_rx.iter() { - match msg { - Msg::FinishedAllTypeChecking { - solved_subs, - problems, - exposed_vars_by_symbol, - src, - } => { - // We're done! There should be no more messages pending. - debug_assert!(msg_rx.is_empty()); - - // Shut down all the worker threads. - for listener in worker_listeners { - listener - .send(WorkerMsg::Shutdown) - .map_err(|_| LoadingProblem::MsgChannelDied)?; - } - - return Ok(LoadResult::TypeChecked(finish( - state, + // The root module will have already queued up messages to process, + // and processing those messages will in turn queue up more messages. + for msg in msg_rx.iter() { + match msg { + Msg::FinishedAllTypeChecking { solved_subs, problems, exposed_vars_by_symbol, src, - ))); - } - msg => { - // This is where most of the main thread's work gets done. - // Everything up to this point has been setting up the threading - // system which lets this logic work efficiently. 
-                    state = update(
-                        state,
-                        msg,
-                        msg_tx.clone(),
-                        &injector,
-                        worker_listeners,
-                        &arena,
-                    )?;
+                    } => {
+                        // We're done! There should be no more messages pending.
+                        debug_assert!(msg_rx.is_empty());
+
+                        // Shut down all the worker threads.
+                        for listener in worker_listeners {
+                            listener
+                                .send(WorkerMsg::Shutdown)
+                                .map_err(|_| LoadingProblem::MsgChannelDied)?;
+                        }
+
+                        return Ok(LoadResult::TypeChecked(finish(
+                            state,
+                            solved_subs,
+                            problems,
+                            exposed_vars_by_symbol,
+                            src,
+                        )));
+                    }
+                    Msg::FinishedAllSpecialization {
+                        subs,
+                        problems,
+                        exposed_vars_by_symbol,
+                        src,
+                    } => {
+                        // We're done! There should be no more messages pending.
+                        debug_assert!(msg_rx.is_empty());
+
+                        // Shut down all the worker threads.
+                        for listener in worker_listeners {
+                            listener
+                                .send(WorkerMsg::Shutdown)
+                                .map_err(|_| LoadingProblem::MsgChannelDied)?;
+                        }
+
+                        return Ok(LoadResult::Monomorphized(finish_specialization(
+                            state,
+                            subs,
+                            problems,
+                            exposed_vars_by_symbol,
+                            src,
+                        )));
+                    }
+                    msg => {
+                        // This is where most of the main thread's work gets done.
+                        // Everything up to this point has been setting up the threading
+                        // system which lets this logic work efficiently.
+                        state = update(
+                            state,
+                            msg,
+                            msg_tx.clone(),
+                            &injector,
+                            worker_listeners,
+                            arena,
+                        )?;
+                    }
                 }
             }
 
-        // The msg_rx receiver closed unexpectedly before we finished solving everything
-        Err(LoadingProblem::MsgChannelDied)
-    })
+            // The msg_rx receiver closed unexpectedly before we finished solving everything
+            Err(LoadingProblem::MsgChannelDied)
+        })
+    }
     .unwrap()
 }
 
@@ -1127,9 +1210,24 @@ fn update<'a>(
             Ok(state)
         }
         FoundSpecializations {
-            module_id, procs, ..
+            module_id,
+            procs,
+            finished_info,
+            solved_subs,
+            ident_ids,
+            layout_cache,
+            problems: _,
         } => {
-            let found_specializations_module = FoundSpecializationsModule { module_id, procs };
+            let subs = solved_subs.into_inner();
+
+            let found_specializations_module = FoundSpecializationsModule {
+                layout_cache,
+                module_id,
+                procs,
+                finished_info,
+                ident_ids,
+                subs,
+            };
 
             state
                 .module_cache
@@ -1145,11 +1243,56 @@ fn update<'a>(
                 enqueue_task(&injector, worker_listeners, task)?
             }
-
             Ok(state)
         }
-        MadeSpecializations { .. } => {
-            todo!();
+        MadeSpecializations {
+            module_id,
+            ident_ids,
+            subs,
+            finished_info,
+            procedures,
+            ..
+        } => {
+            state.procedures.extend(procedures);
+
+            let work = state
+                .dependencies
+                .notify(module_id, Phase::MakeSpecializations);
+
+            if work.is_empty()
+                && state.dependencies.solved_all()
+                && state.goal_phase == Phase::MakeSpecializations
+            {
+                // state.timings.insert(module_id, module_timing);
+
+                msg_tx
+                    .send(Msg::FinishedAllSpecialization {
+                        subs,
+                        // TODO thread through mono problems
+                        problems: vec![],
+                        exposed_vars_by_symbol: finished_info.exposed_vars_by_symbol,
+                        src: finished_info.src,
+                    })
+                    .map_err(|_| LoadingProblem::MsgChannelDied)?;
+
+                // bookkeeping
+                state.constrained_ident_ids.insert(module_id, ident_ids);
+
+                // As far as making specializations goes, once we've specialized
+                // the originally requested module, we're all done!
+                return Ok(state);
+            } else {
+                for (module_id, phase) in work {
+                    let task = start_phase(module_id, phase, &mut state);
+
+                    enqueue_task(&injector, worker_listeners, task)?
+                }
+            }
+
+            Ok(state)
+        }
         Msg::FinishedAllTypeChecking {
             ..
         } => {
             unreachable!();
         }
@@ -1160,6 +1303,47 @@ fn update<'a>(
     }
 }
 
+fn finish_specialization<'a>(
+    mut state: State<'a>,
+    subs: Subs,
+    problems: Vec<MonoProblem>,
+    exposed_vars_by_symbol: Vec<(Symbol, Variable)>,
+    src: &'a str,
+) -> MonomorphizedModule<'a> {
+    state.mono_problems.extend(problems);
+
+    // Destructure everything we need in one pattern; moving fields out of
+    // `state` piecemeal and then destructuring it would be a partial-move error.
+    let State {
+        mono_problems,
+        type_problems,
+        can_problems,
+        procedures,
+        root_id,
+        timings,
+        arc_modules,
+        constrained_ident_ids,
+        ..
+    } = state;
+
+    let module_ids = Arc::try_unwrap(arc_modules)
+        .unwrap_or_else(|_| panic!("There were still outstanding Arc references to module_ids"))
+        .into_inner()
+        .expect("Unwrapping mutex for module_ids");
+
+    let interns = Interns {
+        module_ids,
+        all_ident_ids: constrained_ident_ids,
+    };
+
+    MonomorphizedModule {
+        can_problems,
+        mono_problems,
+        type_problems,
+        exposed_vars_by_symbol,
+        module_id: root_id,
+        subs,
+        interns,
+        procedures,
+        src: src.into(),
+        timings,
+    }
+}
+
 fn finish<'a>(
     mut state: State<'a>,
     solved: Solved<Subs>,
@@ -1710,6 +1894,51 @@ fn ident_from_exposed(entry: &ExposesEntry<'_>) -> Ident {
     }
 }
 
+#[allow(clippy::too_many_arguments)]
+fn make_specializations<'a>(
+    arena: &'a Bump,
+    home: ModuleId,
+    mut ident_ids: IdentIds,
+    mut subs: Subs,
+    mut procs: Procs<'a>,
+    mut layout_cache: LayoutCache<'a>,
+    finished_info: FinishedInfo<'a>,
+) -> Msg<'a> {
+    let mut mono_problems = Vec::new();
+
+    // Set up the environment that specialization runs in.
+    let mut mono_env = roc_mono::ir::Env {
+        arena,
+        problems: &mut mono_problems,
+        subs: &mut subs,
+        home,
+        ident_ids: &mut ident_ids,
+    };
+
+    // TODO: for now this final specialization pass is sequential,
+    // with no parallelization at all. We should try to parallelize
+    // this, but doing so will require a redesign of Procs.
+    procs = roc_mono::ir::specialize_all(
+        &mut mono_env,
+        procs,
+        &mut layout_cache,
+        // &finished_info.vars_by_symbol,
+    );
+
+    let (procedures, _param_map) = procs.get_specialized_procs_help(mono_env.arena);
+
+    Msg::MadeSpecializations {
+        module_id: home,
+        ident_ids,
+        layout_cache,
+        procedures,
+        problems: mono_problems,
+        subs,
+        finished_info,
+    }
+}
+
 #[allow(clippy::too_many_arguments)]
 fn build_pending_specializations<'a>(
     arena: &'a Bump,
@@ -1868,6 +2097,22 @@ fn run_task<'a>(
             layout_cache,
             finished_info,
         )),
+        MakeSpecializations {
+            module_id,
+            ident_ids,
+            subs,
+            procs,
+            layout_cache,
+            finished_info,
+        } => Ok(make_specializations(
+            arena,
+            module_id,
+            ident_ids,
+            subs,
+            procs,
+            layout_cache,
+            finished_info,
+        )),
     }?;
 
     msg_tx
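
A few notes on the machinery this patch leans on, with hedged sketches rather than authoritative implementations.

The whole load loop now sits inside crossbeam's `thread::scope`, wrapped in an extra block so the borrow of `worker_arenas` (through the `it` iterator) ends before the scope's result is unwrapped. Scoped threads are what let the workers borrow stack-local data such as the injector and the stealers slice: the scope joins every spawned thread before returning, and yields an `Err` if any thread panicked, hence the single `.unwrap()` at the end. A self-contained example of the pattern:

```rust
use crossbeam::thread;

fn main() {
    let data = vec![1, 2, 3, 4, 5];

    // The spawned closures may borrow `data` because the scope guarantees
    // every thread is joined before `scope` returns; the Result is an Err
    // if any spawned thread panicked.
    thread::scope(|s| {
        for chunk in data.chunks(2) {
            s.spawn(move |_| {
                let sum: i32 = chunk.iter().sum();
                println!("sum of {:?} = {}", chunk, sum);
            });
        }
    })
    .unwrap();
}
```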
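The `find_task` call in the worker loop is not part of this diff. In the crossbeam-deque model it is the standard three-step lookup: pop from the worker's own LIFO queue, otherwise steal a batch from the global `Injector`, otherwise steal a single task from a sibling's `Stealer`. A sketch following the pattern from the crossbeam-deque documentation:

```rust
use crossbeam_deque::{Injector, Stealer, Worker};
use std::iter;

fn find_task<T>(local: &Worker<T>, global: &Injector<T>, stealers: &[Stealer<T>]) -> Option<T> {
    // Pop a task from the local queue, if not empty.
    local.pop().or_else(|| {
        // Otherwise, look for a task elsewhere.
        iter::repeat_with(|| {
            // Try stealing a batch of tasks from the global queue...
            global
                .steal_batch_and_pop(local)
                // ...or a single task from one of the other workers.
                .or_else(|| stealers.iter().map(|s| s.steal()).collect())
        })
        // Keep retrying while every steal attempt came back `Steal::Retry`.
        .find(|s| !s.is_retry())
        // Extract the stolen task, if there is one.
        .and_then(|s| s.success())
    })
}

fn main() {
    let global = Injector::new();
    let local = Worker::new_lifo();
    let stealers = [local.stealer()];

    global.push(42);
    assert_eq!(find_task(&local, &global, &stealers), Some(42));
}
```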
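`enqueue_task` is also referenced but not shown in this diff. Presumably (an assumption, not code from the patch) it pushes the task onto the shared injector and then sends `WorkerMsg::TaskAdded` to every worker's listener channel, so that an idle worker wakes up and calls `find_task`. A sketch under that assumption, with a stand-in error type:

```rust
use crossbeam_channel::{bounded, Sender};
use crossbeam_deque::Injector;

#[derive(Debug)]
#[allow(dead_code)]
enum WorkerMsg {
    TaskAdded,
    Shutdown,
}

/// Hypothetical stand-in for the patch's LoadingProblem::MsgChannelDied.
#[derive(Debug)]
struct MsgChannelDied;

fn enqueue_task<T>(
    injector: &Injector<T>,
    listeners: &[Sender<WorkerMsg>],
    task: T,
) -> Result<(), MsgChannelDied> {
    injector.push(task);

    // Ping every worker; whichever is idle will steal the task.
    for listener in listeners {
        listener.send(WorkerMsg::TaskAdded).map_err(|_| MsgChannelDied)?;
    }

    Ok(())
}

fn main() {
    let injector = Injector::new();
    let (tx, rx) = bounded(16);

    enqueue_task(&injector, &[tx], "some task").unwrap();

    assert!(matches!(rx.recv().unwrap(), WorkerMsg::TaskAdded));
    assert_eq!(injector.steal().success(), Some("some task"));
}
```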
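The main loop's control flow is driven by `state.dependencies.notify(module_id, phase)`: finishing a phase for one module unblocks the (module, phase) pairs that were waiting on it, and each of those is turned into a task via `start_phase` and enqueued. The real `Dependencies` type is not in this diff; a toy model of the bookkeeping, with a stand-in `Job` key, shows the idea:

```rust
use std::collections::{HashMap, HashSet};

/// Stand-in key: (module id, phase index).
type Job = (u32, u8);

/// Toy dependency tracker: `waiting[job]` holds the prerequisites that
/// `job` is still blocked on.
#[derive(Default)]
struct Deps {
    waiting: HashMap<Job, HashSet<Job>>,
}

impl Deps {
    /// Record that `finished` completed; return every job whose last
    /// remaining prerequisite it was (those jobs are now runnable).
    fn notify(&mut self, finished: Job) -> Vec<Job> {
        let mut runnable = Vec::new();

        self.waiting.retain(|job, prereqs| {
            prereqs.remove(&finished);
            if prereqs.is_empty() {
                runnable.push(*job);
                false // drop the satisfied entry
            } else {
                true
            }
        });

        runnable
    }

    fn solved_all(&self) -> bool {
        self.waiting.is_empty()
    }
}

fn main() {
    let mut deps = Deps::default();

    // Module 2 can't make specializations (phase 1) until module 1 has.
    deps.waiting.entry((2, 1)).or_default().insert((1, 1));

    assert_eq!(deps.notify((1, 1)), vec![(2, 1)]);
    assert!(deps.solved_all());
}
```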
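Finally, the new `procedures: MutMap<(Symbol, Layout<'a>), Proc<'a>>` field on `State` is keyed by symbol *and* layout because monomorphization can emit several concrete procedures for a single source-level function. A toy illustration with a plain `HashMap` and hypothetical stand-in types (`MutMap` is Roc's `HashMap` alias):

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for the compiler's Symbol and Layout types.
type Symbol = &'static str;

#[derive(PartialEq, Eq, Hash, Debug, Clone, Copy)]
enum Layout {
    I64,
    F64,
}

fn main() {
    // Keyed by (symbol, layout): one source function, many specializations.
    let mut procedures: HashMap<(Symbol, Layout), String> = HashMap::new();

    procedures.insert(("double", Layout::I64), "double specialized at I64".to_string());
    procedures.insert(("double", Layout::F64), "double specialized at F64".to_string());

    assert_eq!(procedures.len(), 2);
}
```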