Get parallel loading working

This commit is contained in:
Richard Feldman 2019-11-30 17:33:09 -05:00
parent 997b6ec4ad
commit 787d76b36a
3 changed files with 307 additions and 177 deletions

View file

@ -13,6 +13,7 @@ use self::problem::RuntimeError::*;
use self::procedure::References;
use self::scope::Scope;
use self::symbol::Symbol;
use crate::can::pattern::PatternType;
use crate::collections::{ImMap, ImSet, MutMap, MutSet};
use crate::constrain::{self, exists};
use crate::graph::{strongly_connected_component, topological_sort};
@ -26,10 +27,12 @@ use crate::types::Expected::{self, *};
use crate::types::Type::{self, *};
use crate::types::{LetConstraint, PExpected, PReason, Reason};
use bumpalo::Bump;
use im::Vector;
use std::fmt::Debug;
pub mod env;
pub mod expr;
pub mod module;
pub mod num;
pub mod operator;
pub mod pattern;
@ -44,6 +47,117 @@ pub mod symbol;
/// map so that expressions within that annotation can share these vars.
type Rigids = ImMap<Box<str>, Type>;
pub fn canonicalize_module_defs<'a>(
arena: &Bump,
loc_defs: bumpalo::collections::Vec<'a, Located<Def<'a>>>,
home: Box<str>,
scope: &mut ImMap<Box<str>, (Symbol, Region)>,
) -> Vector<(Located<Pattern>, Located<Expr>)> {
// TODO FIXME need to remove Subs from this - distribute Variables but don't make Subs yet!
let mut subs = Subs::new();
let mut buf = Vector::new();
for loc_def in loc_defs {
buf.push_back(canonicalize_def(
arena,
loc_def.value,
loc_def.region,
home.clone(),
scope,
&mut subs,
));
}
buf
}
fn canonicalize_def<'a>(
arena: &Bump,
def: Def<'a>,
region: Region,
home: Box<str>,
scope: &mut ImMap<Box<str>, (Symbol, Region)>,
// TODO FIXME need to remove Subs from this - distribute Variables but don't make Subs yet!
subs: &mut Subs,
) -> (Located<Pattern>, Located<Expr>) {
match def {
Def::Annotation(_loc_pattern, _loc_ann) => {
panic!("TODO canonicalize top-level annotations");
}
Def::Body(loc_pattern, loc_expr) => {
let variable = subs.mk_flex_var();
let expected = Expected::NoExpectation(Type::Variable(variable));
let declared_idents = ImMap::default(); // TODO FIXME infer this from scope arg
let declared_variants = ImMap::default(); // TODO get rid of this
let name: Box<str> = "TODOfixme".into();
// Desugar operators (convert them to Apply calls, taking into account
// operator precedence and associativity rules), before doing other canonicalization.
//
// If we did this *during* canonicalization, then each time we
// visited a BinOp node we'd recursively try to apply this to each of its nested
// operators, and then again on *their* nested operators, ultimately applying the
// rules multiple times unnecessarily.
let loc_expr = operator::desugar(arena, &loc_expr);
// If we're canonicalizing the declaration `foo = ...` inside the `Main` module,
// scope_prefix will be "Main.foo$" and its first closure will be named "Main.foo$0"
let scope_prefix = format!("{}.{}$", home, name).into();
let mut scope = Scope::new(scope_prefix, declared_idents.clone());
let mut env = Env::new(home, declared_variants.clone());
let (loc_expr, _) = canonicalize_expr(
&ImMap::default(),
&mut env,
subs,
&mut scope,
region,
&loc_expr.value,
expected,
);
// Exclude the current ident from shadowable_idents; you can't shadow yourself!
// (However, still include it in scope, because you *can* recursively refer to yourself.)
let mut shadowable_idents = scope.idents.clone();
remove_idents(&loc_pattern.value, &mut shadowable_idents);
let pattern_var = subs.mk_flex_var();
let pattern_type = Type::Variable(pattern_var);
let pattern_expected = PExpected::NoExpectation(pattern_type.clone());
let mut pattern_state = PatternState {
headers: ImMap::default(),
vars: Vec::with_capacity(1),
constraints: Vec::with_capacity(1),
};
let loc_pattern = canonicalize_pattern(
&mut env,
&mut pattern_state,
subs,
&mut scope,
PatternType::TopLevelDef,
&loc_pattern.value,
loc_pattern.region,
&mut shadowable_idents,
pattern_expected,
);
(loc_pattern, loc_expr)
}
Def::CustomType(_, _) => {
panic!("TODO remove CustomType syntax");
}
Def::TypeAlias(_, _) => {
panic!("TODO remove TypeAlias syntax");
}
// Ignore spaces
Def::SpaceBefore(def, _) | Def::SpaceAfter(def, _) => {
// TODO FIXME performance disaster!!!
canonicalize_def(arena, def.clone(), region, home, scope, subs)
}
}
}
// TODO trim down these arguments
#[allow(clippy::too_many_arguments)]
pub fn canonicalize_declaration<'a>(

View file

@ -1,25 +1,28 @@
use crate::can::symbol::Symbol;
use crate::collections::{SendMap, SendSet};
use crate::ident::UnqualifiedIdent;
use crate::can::expr::Expr;
use crate::can::pattern::Pattern;
use crate::can::canonicalize_module_defs;
use crate::collections::{SendSet, ImMap};
use crate::module::ModuleName;
use crate::parse::ast::{Attempting, Def, ExposesEntry, ImportsEntry, Module};
use crate::parse::module;
use crate::parse::ast::{self, Attempting, ExposesEntry, ImportsEntry};
use crate::parse::module::{self, module_defs};
use crate::parse::parser::{Fail, Parser, State};
use crate::region::{Located, Region};
use im::Vector;
use bumpalo::Bump;
use tokio::fs::read_to_string;
use tokio::sync::mpsc::{self, Sender, Receiver};
use std::io;
use std::path::{Path, PathBuf};
use tokio::prelude::*;
use crate::can::module::Module;
use futures::future::join_all;
pub struct Loaded<'a> {
pub requested_header: LoadedHeader,
pub dependent_headers: SendMap<ModuleName<'a>, LoadedHeader>,
pub defs: SendMap<ModuleName<'a>, Result<Vector<Located<Def<'a>>>, Fail>>,
pub struct Loaded {
pub requested_module: LoadedModule,
pub deps: Deps,
}
#[derive(Debug, Clone)]
struct Env {
pub src_dir: PathBuf
}
@ -29,54 +32,62 @@ pub enum BuildProblem<'a> {
FileNotFound(&'a Path),
}
#[derive(Debug, PartialEq, Eq)]
pub enum LoadedHeader {
Valid {
declared_name: Option<Box<str>>,
deps: SendSet<Box<str>>,
scope: SendMap<Box<str>, (Symbol, Region)>,
bytes_parsed: usize
},
type Deps = SendSet<Box<str>>;
#[derive(Debug, PartialEq)]
pub enum LoadedModule {
Valid(Module),
FileProblem(io::ErrorKind),
ParsingFailed(Fail),
}
pub async fn load<'a>(src_dir: PathBuf, filename: PathBuf) -> Loaded<'a> {
let handle = tokio::spawn(async move {
let mut env = Env {
src_dir
};
pub async fn load<'a>(src_dir: PathBuf, filename: PathBuf) -> Loaded {
let env = Env { src_dir: src_dir.clone() };
let (tx, mut rx): (Sender<Deps>, Receiver<Deps>) = mpsc::channel(1024);
load_filename(&mut env, &filename).await
let main_tx = tx.clone();
let handle = tokio::spawn(async move {
load_filename(&env, &filename, main_tx).await
});
let requested_header = handle.await;
let requested_module = handle.await.expect("Unable to load requested module.");
let mut other_modules = Vec::new();
let mut all_deps = SendSet::default();
panic!("TODO");
// Get a fresh env, since the previous one has been consumed
let env = Env { src_dir };
// At first, 1 module is pending (namely the `filename` one).
let mut pending = 1;
while let Some(module_deps) = rx.recv().await {
let deps_to_load = module_deps.relative_complement(all_deps.clone());
// // TODO parse defs on a different thread, and parse them
// // directly into a Vector rather than into a Vec, so
// // we don't have to reallocate here.
// let defs = match module::module_defs().parse(&arena, state) {
// Ok((defs, _)) => {
// let mut send_vec = Vector::new();
// We just loaded 1 module, and gained deps_to_load more
pending = pending + deps_to_load.len() - 1;
// for def in defs {
// send_vec.push_back(def);
// }
// Record that these are loaded *before* spawning threads to load them.
all_deps = all_deps.union(deps_to_load.clone());
// Ok(send_vec)
// },
// Err((fail, _)) => Err(fail),
// };
let loaded_modules = join_all(deps_to_load.into_iter().map(|dep|{
let env = env.clone();
let tx = tx.clone();
// Loaded {
// requested_header,
// dependent_headers: env.loaded_headers,
// defs,
// problems: env.problems,
// }
tokio::spawn(async move {
load_module(&env, dep, tx).await
})
})).await;
for module in loaded_modules {
other_modules.push(module.expect("Unable to load dependent module"));
}
// Once we've run out of pending modules to process, we're done!
if pending == 0 {
break;
}
}
Loaded { requested_module, deps: all_deps }
}
/// The long-term plan is for the loading process to work like this, starting from main.roc:
@ -124,31 +135,31 @@ pub async fn load<'a>(src_dir: PathBuf, filename: PathBuf) -> Loaded<'a> {
/// module's canonicalization.
///
/// If a given import has not been loaded yet, load it too.
// fn load_module<'a, 'p>(env: &mut Env<'a, 'p>, module_name: &ModuleName<'a>) -> LoadedHeader<'a> {
// // 1. Convert module_name to filename, using src_dir.
// // 2. Open that file for reading. (If there's a problem, record it and bail.)
// // 3. Read the whole file into a string. (In the future, we can read just the header.)
// // 4. Parse the header.
// // 5. Use the parsed header to load more modules as necessary.
// // 6. Now that all the headers have been parsed, parse the bodies too.
// // 7. Once all the bodies have been parsed, canonicalize beginning with the leaves.
async fn load_module(env: &Env, module_name: Box<str>, tx: Sender<Deps>) -> LoadedModule {
// 1. Convert module_name to filename, using src_dir.
// 2. Open that file for reading. (If there's a problem, record it and bail.)
// 3. Read the whole file into a string. (In the future, we can read just the header.)
// 4. Parse the header.
// 5. Use the parsed header to load more modules as necessary.
// 6. Now that all the headers have been parsed, parse the bodies too.
// 7. Once all the bodies have been parsed, canonicalize beginning with the leaves.
// let mut filename = PathBuf::new();
let mut filename = PathBuf::new();
// filename.push(env.src_dir);
filename.push(env.src_dir.clone());
// // Convert dots in module name to directories
// for part in module_name.as_str().split('.') {
// filename.push(part);
// }
// Convert dots in module name to directories
for part in module_name.split('.') {
filename.push(part);
}
// // End with .roc
// filename.set_extension("roc");
// End with .roc
filename.set_extension("roc");
// load_filename(env, &filename)
// }
load_filename(env, &filename, tx).await
}
async fn load_filename(env: &mut Env, filename: &Path) -> LoadedHeader {
async fn load_filename(env: &Env, filename: &Path, tx: Sender<Deps>) -> LoadedModule {
match read_to_string(filename).await {
Ok(src) => {
let arena = Bump::new();
@ -161,49 +172,74 @@ async fn load_filename(env: &mut Env, filename: &Path) -> LoadedHeader {
let state = State::new(&src, Attempting::Module);
let answer = match module::module().parse(&arena, state) {
Ok((Module::Interface { header }, state)) => {
let declared_name = Some(header.name.value.as_str().into());
Ok((ast::Module::Interface { header }, state)) => {
let declared_name: Box<str> = header.name.value.as_str().into();
let mut scope = SendMap::default();
// TODO check to see if declared_name is consistent with filename.
// If it isn't, report a problem!
let mut scope = ImMap::default();
let mut deps = SendSet::default();
for loc_entry in header.imports {
deps.insert(load_import(env, loc_entry.region, &loc_entry.value, &mut scope));
}
let bytes_parsed = state.bytes_consumed();
tokio::spawn(async move {
let mut tx = tx;
LoadedHeader::Valid { scope, declared_name, deps, bytes_parsed }
// Send the deps to the main thread for processing,
// then continue on to parsing and canonicalizing defs.
tx.send(deps).await.unwrap();
});
let defs = parse_and_canonicalize_defs(&arena, state, declared_name.clone(), &mut scope);
let module = Module { name: Some(declared_name), defs };
LoadedModule::Valid(module)
}
Ok((Module::App { header }, state)) => {
// The app module has no declared name.
let declared_name = None;
let mut scope = SendMap::default();
Ok((ast::Module::App { header }, state)) => {
let mut scope = ImMap::default();
let mut deps = SendSet::default();
for loc_entry in header.imports {
deps.insert(load_import(env, loc_entry.region, &loc_entry.value, &mut scope));
}
let bytes_parsed = state.bytes_consumed();
tokio::spawn(async move {
let mut tx = tx;
LoadedHeader::Valid { scope, declared_name, deps, bytes_parsed }
// Send the deps to the main thread for processing,
// then continue on to parsing and canonicalizing defs.
tx.send(deps).await.unwrap();
});
// The app module has no declared name. Pass it as "".
let defs = parse_and_canonicalize_defs(&arena, state, "".into(), &mut scope);
let module = Module { name: None, defs };
LoadedModule::Valid(module)
}
Err((fail, _)) => LoadedHeader::ParsingFailed(fail),
Err((fail, _)) => LoadedModule::ParsingFailed(fail),
};
answer
}
Err(err) => LoadedHeader::FileProblem(err.kind()),
Err(err) => LoadedModule::FileProblem(err.kind()),
}
}
fn load_import<'a, 'p, 'out>(
env: &mut Env,
fn parse_and_canonicalize_defs(arena: &Bump, state: State<'_>, home: Box<str>, scope: &mut ImMap<Box<str>, (Symbol, Region)>) -> Vector<(Located<Pattern>, Located<Expr>)> {
let (parsed_defs, _) = module_defs().parse(arena, state).expect("TODO gracefully handle parse error on module defs");
canonicalize_module_defs(arena, parsed_defs, home, scope)
}
fn load_import(
env: &Env,
region: Region,
entry: &ImportsEntry<'_>,
scope: &mut SendMap<Box<str>, (Symbol, Region)>,
scope: &mut ImMap<Box<str>, (Symbol, Region)>,
) -> Box<str> {
use crate::parse::ast::ImportsEntry::*;
@ -225,7 +261,7 @@ fn load_import<'a, 'p, 'out>(
}
}
fn expose<'out>(
fn expose(
module_name: ModuleName<'_>,
entry: &ExposesEntry<'_>,
region: Region,
@ -245,30 +281,3 @@ fn expose<'out>(
}
}
}
#[test]
fn test_tokio() {
test_async(async {
let handle = tokio::spawn(async {
println!("doing some work, asynchronously");
// Return a value for the example
"result of the computation"
});
// Wait for the spawned task to finish
let res = handle.await;
println!("got {:?}", res);
})
}
fn test_async<F: std::future::Future>(future: F) -> F::Output {
use tokio::runtime::Runtime;
// Create the runtime
let mut rt = Runtime::new().expect("Error initializing Tokio runtime.");
// Spawn the root task
rt.block_on(future)
}

View file

@ -12,86 +12,93 @@ mod helpers;
#[cfg(test)]
mod test_load {
use crate::helpers::{fixtures_dir, im_map_from_pairs, mut_map_from_pairs};
use bumpalo::Bump;
use roc::can::symbol::Symbol;
use roc::ident::UnqualifiedIdent;
use roc::load::LoadedHeader::*;
use roc::load::{load, LoadedHeader};
use roc::module::ModuleName;
use roc::region::Region;
use roc::load::load;
// use roc::region::Region;
fn test_async<F: std::future::Future>(future: F) -> F::Output {
use tokio::runtime::Runtime;
// Create the runtime
let mut rt = Runtime::new().expect("Error initializing Tokio runtime.");
// Spawn the root task
rt.block_on(future)
}
#[test]
fn interface_with_deps() {
let src_dir = fixtures_dir().join("interface_with_deps");
let filename = src_dir.join("Primary.roc");
let arena = Bump::new();
let loaded = load(&arena, &src_dir, &filename);
assert!(loaded.problems.is_empty());
test_async(async {
let loaded = load(src_dir, filename).await;
});
let dep1_scope = im_map_from_pairs(vec![(
UnqualifiedIdent::new("foo"),
(Symbol::new("Dep3.Blah.", "foo"), Region::new(2, 2, 26, 29)),
)]);
let dep2_scope = im_map_from_pairs(vec![
(
UnqualifiedIdent::new("bar"),
(Symbol::new("Dep3.Blah.", "bar"), Region::new(2, 2, 31, 34)),
),
(
UnqualifiedIdent::new("foo"),
(Symbol::new("Dep3.Blah.", "foo"), Region::new(2, 2, 26, 29)),
),
]);
let dep3_scope = im_map_from_pairs(vec![]);
// assert!(loaded.problems.is_empty());
assert_eq!(
loaded.dependent_headers,
mut_map_from_pairs(vec![
(ModuleName::new("Dep1"), Valid { scope: dep1_scope }),
(ModuleName::new("Dep3.Blah"), Valid { scope: dep3_scope }),
(ModuleName::new("Dep2"), Valid { scope: dep2_scope }),
])
);
// let dep1_scope = im_map_from_pairs(vec![(
// UnqualifiedIdent::new("foo"),
// (Symbol::new("Dep3.Blah.", "foo"), Region::new(2, 2, 26, 29)),
// )]);
// let dep2_scope = im_map_from_pairs(vec![
// (
// UnqualifiedIdent::new("bar"),
// (Symbol::new("Dep3.Blah.", "bar"), Region::new(2, 2, 31, 34)),
// ),
// (
// UnqualifiedIdent::new("foo"),
// (Symbol::new("Dep3.Blah.", "foo"), Region::new(2, 2, 26, 29)),
// ),
// ]);
// let dep3_scope = im_map_from_pairs(vec![]);
assert_eq!(loaded.defs.len(), 4);
// assert_eq!(
// loaded.dependent_headers,
// mut_map_from_pairs(vec![
// (ModuleName::new("Dep1"), Valid { scope: dep1_scope }),
// (ModuleName::new("Dep3.Blah"), Valid { scope: dep3_scope }),
// (ModuleName::new("Dep2"), Valid { scope: dep2_scope }),
// ])
// );
let defs = loaded
.defs
.get(&ModuleName::new("Primary"))
.expect("No defs found for `Primary` module")
.clone()
.expect("Defs failed to parse for `Primary` module");
// assert_eq!(loaded.defs.len(), 4);
assert_eq!(
dbg!(/* problem: module_defs() only parses 1 module - TODO add parsing unit test for it!*/ defs)
.len(),
6
);
// let defs = loaded
// .defs
// .get(&ModuleName::new("Primary"))
// .expect("No defs found for `Primary` module")
// .clone()
// .expect("Defs failed to parse for `Primary` module");
match loaded.requested_header {
LoadedHeader::Valid { scope } => assert_eq!(
scope,
im_map_from_pairs(vec![
(
UnqualifiedIdent::new("bar"),
(Symbol::new("Dep3.Blah.", "bar"), Region::new(2, 2, 51, 54)),
),
(
UnqualifiedIdent::new("foo"),
(Symbol::new("Dep2.", "foo"), Region::new(2, 2, 32, 35)),
),
(
UnqualifiedIdent::new("two"),
(Symbol::new("Dep2.", "two"), Region::new(2, 2, 27, 30)),
),
])
),
// assert_eq!(
// dbg!(/* problem: module_defs() only parses 1 module - TODO add parsing unit test for it!*/ defs)
// .len(),
// 6
// );
other => panic!(
"app_header should have been Valid, but instead was: {:?}",
other
),
};
// match loaded.requested_header {
// LoadedHeader::Valid { scope } => assert_eq!(
// scope,
// im_map_from_pairs(vec![
// (
// UnqualifiedIdent::new("bar"),
// (Symbol::new("Dep3.Blah.", "bar"), Region::new(2, 2, 51, 54)),
// ),
// (
// UnqualifiedIdent::new("foo"),
// (Symbol::new("Dep2.", "foo"), Region::new(2, 2, 32, 35)),
// ),
// (
// UnqualifiedIdent::new("two"),
// (Symbol::new("Dep2.", "two"), Region::new(2, 2, 27, 30)),
// ),
// ])
// ),
// other => panic!(
// "app_header should have been Valid, but instead was: {:?}",
// other
// ),
// };
}
}