Move lots of things into Env

This commit is contained in:
Richard Feldman 2020-07-28 21:43:00 -04:00
parent 4ebe36fa4a
commit 931c558b5a

View file

@ -1,4 +1,7 @@
use bumpalo::Bump;
use crossbeam::channel::{unbounded, Receiver, Sender};
use crossbeam::deque::{Injector, Steal, Stealer, Worker};
use crossbeam::thread::{self, Scope};
use roc_builtins::std::{Mode, StdLib};
use roc_can::constraint::Constraint;
use roc_can::def::Declaration;
@ -12,8 +15,6 @@ use roc_module::ident::{Ident, ModuleName};
use roc_module::symbol::{IdentIds, Interns, ModuleId, ModuleIds, Symbol};
use roc_parse::ast::{self, Attempting, ExposesEntry, ImportsEntry};
use roc_parse::module::module_defs;
use crossbeam::deque::{Injector, Steal, Stealer, Worker};
use std::iter;
use roc_parse::parser::{Fail, Parser, State};
use roc_region::all::{Located, Region};
use roc_solve::module::SolvedModule;
@ -23,11 +24,10 @@ use roc_types::subs::{Subs, VarStore, Variable};
use std::collections::{HashMap, HashSet};
use std::fs;
use std::io;
use std::iter;
use std::path::{Path, PathBuf};
use std::str::from_utf8_unchecked;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
use tokio::task::spawn_blocking;
/// Filename extension for normal Roc modules
const ROC_FILE_EXTENSION: &str = "roc";
@ -47,11 +47,6 @@ pub struct LoadedModule {
pub src: Box<str>,
}
/// Settings handed to `load_module` when a dependency is loaded.
#[derive(Debug, Clone)]
struct Env {
/// Directory that `load_module` uses as the first component of a module's
/// file path, before appending the parts of the module name.
pub src_dir: PathBuf,
}
#[derive(Debug)]
pub enum BuildProblem<'a> {
FileNotFound(&'a Path),
@ -112,8 +107,8 @@ enum MaybeShared<'a, 'b, A, B> {
type SharedModules<'a, 'b> = MaybeShared<'a, 'b, ModuleIds, IdentIdsByModule>;
type IdentIdsByModule = MutMap<ModuleId, IdentIds>;
type MsgSender = mpsc::Sender<Msg>;
type MsgReceiver = mpsc::Receiver<Msg>;
type MsgSender = Sender<Msg>;
type MsgReceiver = Receiver<Msg>;
/// The loading process works like this, starting from the given filename (e.g. "main.roc"):
///
@ -167,7 +162,7 @@ pub fn load<'a>(
) -> Result<LoadedModule, LoadingProblem> {
use self::MaybeShared::*;
let (msg_tx, msg_rx): (MsgSender, MsgReceiver) = mpsc::channel(1024);
let (msg_tx, msg_rx): (MsgSender, MsgReceiver) = unbounded();
let mut module_ids = ModuleIds::default();
let mut root_exposed_ident_ids: IdentIdsByModule = IdentIds::exposed_builtins(0);
@ -178,10 +173,80 @@ pub fn load<'a>(
Unique(&mut module_ids, &mut root_exposed_ident_ids),
)?;
load_deps(root_id, msg_tx, msg_rx, stdlib, src_dir, module_ids, root_exposed_ident_ids, exposed_types).await
load_deps(
root_id,
msg_tx,
msg_rx,
stdlib,
src_dir,
module_ids,
root_exposed_ident_ids,
exposed_types,
)
}
async fn load_deps<'a>(
/// State for one load run, gathered into a single struct.
///
/// `load_deps` builds one of these and passes it to `update`, which
/// destructures it while handling a `Msg`.
#[derive(Debug)]
struct Env {
/// The originally requested module. Once this module has been solved,
/// `update` returns the finished `LoadedModule`.
pub root_id: ModuleId,
/// Directory passed to `load_module`, which uses it as the first component
/// of each dependency's file path.
pub src_dir: PathBuf,
/// Exposed types of modules, keyed by module. A solved dependency is
/// recorded here as `ExposedModuleTypes::Valid(..)`.
pub exposed_types: SubsByModule,
/// Accumulated `roc_problem::can::Problem` values.
// NOTE(review): presumably filled in during canonicalization - confirm against `update`.
pub can_problems: Vec<roc_problem::can::Problem>,
/// Modules whose headers have already been parsed. `load_deps` seeds this
/// with the root module, whose header is parsed before `load_deps` runs.
pub headers_parsed: MutSet<ModuleId>,
/// Type errors accumulated so far; extended with each solved module's problems.
pub type_problems: Vec<solve::TypeError>,
/// This is the "final" list of IdentIds, after canonicalization and constraint gen
/// have completed for a given module.
pub constrained_ident_ids: MutMap<ModuleId, IdentIds>,
/// Module ids, wrapped in an `Arc<Mutex<_>>` because they are used by
/// multiple threads: a clone of this handle is given to every `load_module`
/// call so that modules can populate it as they load.
pub arc_modules: Arc<Mutex<ModuleIds>>,
/// IdentIds by module, shared with `load_module` in the same way as `arc_modules`.
pub ident_ids_by_module: Arc<Mutex<IdentIdsByModule>>,
/// All the dependent modules we've already begun loading -
/// meaning we should never kick off another load_module on them!
pub loading_started: MutSet<ModuleId>,
/// The declarations that are ultimately returned in the `LoadedModule`.
pub declarations_by_id: MutMap<ModuleId, Vec<Declaration>>,
// NOTE(review): presumably the symbols each module exposes - confirm against `update`.
pub exposed_symbols_by_module: MutMap<ModuleId, MutSet<Symbol>>,
/// Modules which are waiting for certain headers to be parsed
pub waiting_for_headers: MutMap<ModuleId, MutSet<ModuleId>>,
// When the key ModuleId's event fires, iterate through each of the given modules
// and remove that ModuleId from the appropriate waiting_for_headers entry.
// If the relevant module's waiting_for_headers entry is now empty, canonicalize the module.
// NOTE(review): this comment used to say "gets solved", which looks copied from
// `solve_listeners`; presumably the trigger is the key module's header being
// parsed - confirm.
pub header_listeners: MutMap<ModuleId, Vec<ModuleId>>,
// NOTE(review): presumably headers of modules whose bodies have not been parsed yet - confirm.
pub unparsed_modules: MutMap<ModuleId, ModuleHeader>,
// Modules which are waiting for certain deps to be solved
pub waiting_for_solve: MutMap<ModuleId, MutSet<ModuleId>>,
// When the key ModuleId gets solved, iterate through each of the given modules
// and remove that ModuleId from the appropriate waiting_for_solve entry.
// If the relevant module's waiting_for_solve entry is now empty, solve the module.
pub solve_listeners: MutMap<ModuleId, Vec<ModuleId>>,
// Clippy flags the tuple value type below as overly complex; the lint is
// silenced here instead of introducing a named type for the tuple.
#[allow(clippy::type_complexity)]
pub unsolved_modules: MutMap<
ModuleId,
(Module, Box<str>, MutSet<ModuleId>, Constraint, VarStore),
>,
/// Idle worker threads will try to take from this first, and if it's empty,
/// steal from each other.
pub main_task_queue: Injector<BuildTask>,
}
/// The task type stored in `Env::main_task_queue`.
///
/// Only the `NoOp` variant is defined here.
enum BuildTask {
NoOp
}
fn load_deps<'a>(
root_id: ModuleId,
msg_tx: MsgSender,
mut msg_rx: MsgReceiver,
@ -191,70 +256,92 @@ async fn load_deps<'a>(
root_exposed_ident_ids: IdentIdsByModule,
mut exposed_types: SubsByModule,
) -> Result<LoadedModule, LoadingProblem> {
use self::MaybeShared::*;
let mut type_problems = Vec::new();
let mut can_problems = Vec::new();
thread::scope(|scope| {
let mut headers_parsed = MutSet::default();
// We've already parsed the root's header. (But only its header, so far.)
headers_parsed.insert(root_id);
let env = Env {
src_dir: src_dir.clone(),
};
let mut loading_started = MutSet::default();
// This is the "final" list of IdentIds, after canonicalization and constraint gen
// have completed for a given module.
let mut constrained_ident_ids = IdentIds::exposed_builtins(0);
// From now on, these will be used by multiple threads; time to make an Arc<Mutex<_>>!
let arc_modules = Arc::new(Mutex::new(module_ids));
let ident_ids_by_module: Arc<Mutex<IdentIdsByModule>> =
Arc::new(Mutex::new(root_exposed_ident_ids));
// All the dependent modules we've already begun loading -
// meaning we should never kick off another load_module on them!
let mut loading_started: MutSet<ModuleId> = MutSet::default();
// If the root module we're compiling happens to be an interface,
// If the root module we're still processing happens to be an interface,
// it's possible that something else will import it. That will
// necessarily cause a cyclic import error, but in the meantime
// we still shouldn't load it.
loading_started.insert(root_id);
// The declarations we'll ultimately be returning
let mut declarations_by_id: MutMap<ModuleId, Vec<Declaration>> = MutMap::default();
let env = Env {
root_id,
src_dir,
exposed_types,
headers_parsed,
loading_started,
can_problems: Vec::new(),
type_problems: Vec::new(),
arc_modules: Arc::new(Mutex::new(module_ids)),
constrained_ident_ids: IdentIds::exposed_builtins(0),
ident_ids_by_module: Arc::new(Mutex::new(root_exposed_ident_ids)),
declarations_by_id: MutMap::default(),
exposed_symbols_by_module: MutMap::default(),
waiting_for_headers: MutMap::default(),
header_listeners: MutMap::default(),
unparsed_modules: MutMap::default(),
waiting_for_solve: MutMap::default(),
solve_listeners: MutMap::default(),
unsolved_modules: MutMap::default(),
main_task_queue: Injector::new()
};
let mut exposed_symbols_by_module: MutMap<ModuleId, MutSet<Symbol>> = MutMap::default();
// The root module will have already queued up messages to process,
// and processing those messages will in turn queue up more messages.
while let Ok(msg) = msg_rx.recv() {
match update(env, stdlib, msg_tx, msg_rx, msg) {
Ok(Some(loaded_module)) => {
// We're done!
return Ok(loaded_module);
}
Ok(None) => {
// We have more messages to process. Keep waiting!
}
Err(loading_problem) => {
// We hit an unrecoverable LoadingProblem. Bail out!
return Err(loading_problem);
}
}
}
// Modules which are waiting for certain headers to be parsed
let mut waiting_for_headers: MutMap<ModuleId, MutSet<ModuleId>> = MutMap::default();
// The msg_rx receiver closed unexpectedly before we finished solving everything
Err(LoadingProblem::MsgChannelDied)
})
.unwrap()
}
// When the key ModuleId gets solved, iterate through each of the given modules
// and remove that ModuleId from the appropriate waiting_for_headers entry.
// If the relevant module's waiting_for_headers entry is now empty, canonicalize the module.
let mut header_listeners: MutMap<ModuleId, Vec<ModuleId>> = MutMap::default();
let mut unparsed_modules: MutMap<ModuleId, ModuleHeader> = MutMap::default();
// Modules which are waiting for certain deps to be solved
let mut waiting_for_solve: MutMap<ModuleId, MutSet<ModuleId>> = MutMap::default();
// When the key ModuleId gets solved, iterate through each of the given modules
// and remove that ModuleId from the appropriate waiting_for_solve entry.
// If the relevant module's waiting_for_solve entry is now empty, solve the module.
let mut solve_listeners: MutMap<ModuleId, Vec<ModuleId>> = MutMap::default();
#[allow(clippy::type_complexity)]
let mut unsolved_modules: MutMap<
ModuleId,
(Module, Box<str>, MutSet<ModuleId>, Constraint, VarStore),
> = MutMap::default();
// Parse and canonicalize the module's deps
while let Some(msg) = msg_rx.recv().await {
fn update(mut env: Env, stdlib: &StdLib, msg_tx: MsgSender, mut msg_rx: MsgReceiver, msg: Msg) -> Result<Option<LoadedModule>, LoadingProblem> {
use self::MaybeShared::*;
use self::Msg::*;
let Env {
root_id,
src_dir,
exposed_types,
can_problems,
type_problems,
headers_parsed,
arc_modules,
constrained_ident_ids,
ident_ids_by_module,
loading_started,
declarations_by_id,
exposed_symbols_by_module,
waiting_for_headers,
header_listeners,
unparsed_modules,
waiting_for_solve,
solve_listeners,
unsolved_modules,
main_task_queue,
} = env;
match msg {
Header(header) => {
let home = header.module_id;
@ -324,17 +411,15 @@ async fn load_deps<'a>(
// we actually start loading it.
loading_started.insert(*dep_id);
let env = env.clone();
let msg_tx = msg_tx.clone();
let dep_name = dep_name.clone();
// Provide mutexes of ModuleIds and IdentIds by module,
// so other modules can populate them as they load.
let shared =
Shared(Arc::clone(&arc_modules), Arc::clone(&ident_ids_by_module));
let shared = Shared(Arc::clone(&arc_modules), Arc::clone(&ident_ids_by_module));
// Start loading this module in the background.
spawn_blocking(move || load_module(env, dep_name, msg_tx, shared));
spawn_blocking(move || load_module(src_dir.as_path(), dep_name, msg_tx, shared));
}
}
@ -443,9 +528,6 @@ async fn load_deps<'a>(
type_problems.extend(solved_module.problems);
if module_id == root_id {
// Once we've solved the originally requested module, we're done!
msg_rx.close();
let solved = Arc::try_unwrap(solved_subs).unwrap_or_else(|_| {
panic!("There were still outstanding Arc references to Solved<Subs>")
});
@ -462,7 +544,7 @@ async fn load_deps<'a>(
all_ident_ids: constrained_ident_ids,
};
return Ok(LoadedModule {
return Ok(Some(LoadedModule {
module_id: root_id,
interns,
solved,
@ -471,16 +553,13 @@ async fn load_deps<'a>(
declarations_by_id,
exposed_vars_by_symbol: solved_module.exposed_vars_by_symbol,
src,
});
}));
} else {
// This was a dependency. Write it down and keep processing messages.
debug_assert!(!exposed_types.contains_key(&module_id));
exposed_types.insert(
module_id,
ExposedModuleTypes::Valid(
solved_module.solved_types,
solved_module.aliases,
),
ExposedModuleTypes::Valid(solved_module.solved_types, solved_module.aliases),
);
// Notify all the listeners that this solved.
@ -517,22 +596,22 @@ async fn load_deps<'a>(
}
}
}
}
// The msg_rx receiver closed unexpectedly before we finished solving everything
Err(LoadingProblem::MsgChannelDied)
// We did some work but haven't finished loading the module yet.
// If we had, this would have been Ok(Some(loaded_module)) instead.
Ok(None)
}
/// Load a module by its module name, rather than by its filename
fn load_module(
env: Env,
src_dir: &Path,
module_name: ModuleName,
msg_tx: MsgSender,
module_ids: SharedModules<'_, '_>,
) -> Result<ModuleId, LoadingProblem> {
let mut filename = PathBuf::new();
filename.push(env.src_dir);
filename.push(src_dir);
// Convert dots in module name to directories
for part in module_name.as_str().split(MODULE_SEPARATOR) {
@ -553,17 +632,14 @@ fn load_module(
/// corresponding Stealer, which can steal from it. Stealers can be shared across threads.)
///
/// Based on https://docs.rs/crossbeam/0.7.3/crossbeam/deque/index.html#examples
fn find_task<T>(
local: &Worker<T>,
global: &Injector<T>,
stealers: &[Stealer<T>],
) -> Option<T> {
fn find_task<T>(local: &Worker<T>, global: &Injector<T>, stealers: &[Stealer<T>]) -> Option<T> {
// Pop a task from the local queue, if not empty.
local.pop().or_else(|| {
// Otherwise, we need to look for a task elsewhere.
iter::repeat_with(|| {
// Try stealing a task from the global queue.
global.steal()
global
.steal()
// Or try stealing a task from one of the other threads.
.or_else(|| stealers.iter().map(|s| s.steal()).collect())
})