Make node graph execution async

Make node macro generate async node implementations

Start propagating async through the node system

Async checkpoint

Make Any<'i> Send + Sync

Determine node io type using panic node

Fix types for raster_node macro

Finish porting node registry?

Fix lifetime errors

Remove Send + Sync requirements and start making node construction async

Async MVP

Fix tests

Clippy fix
This commit is contained in:
Dennis Kobert 2023-05-12 19:12:00 +02:00 committed by Keavon Chambers
parent 8d778e4848
commit 259078c847
30 changed files with 545 additions and 239 deletions

View file

@ -59,6 +59,8 @@ node-macro = { path = "../node-macro" }
boxcar = "0.1.0"
xxhash-rust = { workspace = true }
serde_json = "1.0.96"
reqwest = { version = "0.11.17", features = ["rustls", "rustls-tls"] }
futures = "0.3.28"
[dependencies.serde]
version = "1.0"

View file

@ -1,9 +1,12 @@
use dyn_any::StaticType;
pub use graph_craft::proto::{Any, TypeErasedNode, TypeErasedPinned, TypeErasedPinnedRef};
use graph_craft::proto::{DynFuture, FutureAny};
use graphene_core::NodeIO;
pub use graphene_core::{generic, ops, Node};
use std::marker::PhantomData;
use crate::http::EvalSyncNode;
pub struct DynAnyNode<I, O, Node> {
node: Node,
_i: PhantomData<I>,
@ -12,18 +15,20 @@ pub struct DynAnyNode<I, O, Node> {
impl<'input, _I: 'input + StaticType, _O: 'input + StaticType, N: 'input, S0: 'input> Node<'input, Any<'input>> for DynAnyNode<_I, _O, S0>
where
N: for<'any_input> Node<'any_input, _I, Output = _O>,
N: for<'any_input> Node<'any_input, _I, Output = DynFuture<'any_input, _O>>,
S0: for<'any_input> Node<'any_input, (), Output = &'any_input N>,
{
type Output = Any<'input>;
type Output = FutureAny<'input>;
#[inline]
fn eval(&'input self, input: Any<'input>) -> Self::Output {
let node = self.node.eval(());
{
let node_name = core::any::type_name::<N>();
let input: Box<_I> = dyn_any::downcast(input).unwrap_or_else(|e| panic!("DynAnyNode Input, {0} in:\n{1}", e, node_name));
Box::new(node.eval(*input))
}
let node_name = core::any::type_name::<N>();
let input: Box<_I> = dyn_any::downcast(input).unwrap_or_else(|e| panic!("DynAnyNode Input, {0} in:\n{1}", e, node_name));
let output = async move {
let result = node.eval(*input).await;
Box::new(result) as Any<'input>
};
Box::pin(output)
}
fn reset(self: std::pin::Pin<&mut Self>) {
@ -56,11 +61,13 @@ impl<'input, _I: 'input + StaticType, _O: 'input + StaticType, N: 'input> Node<'
where
N: for<'any_input> Node<'any_input, _I, Output = &'any_input _O>,
{
type Output = Any<'input>;
type Output = FutureAny<'input>;
fn eval(&'input self, input: Any<'input>) -> Self::Output {
let node_name = core::any::type_name::<N>();
let input: Box<_I> = dyn_any::downcast(input).unwrap_or_else(|e| panic!("DynAnyRefNode Input, {e} in:\n{node_name}"));
Box::new(self.node.eval(*input))
let result = self.node.eval(*input);
let output = async move { Box::new(result) as Any<'input> };
Box::pin(output)
}
fn reset(self: std::pin::Pin<&mut Self>) {
let wrapped_node = unsafe { self.map_unchecked_mut(|e| &mut e.node) };
@ -79,14 +86,15 @@ pub struct DynAnyInRefNode<I, O, Node> {
}
impl<'input, _I: 'input + StaticType, _O: 'input + StaticType, N: 'input> Node<'input, Any<'input>> for DynAnyInRefNode<_I, _O, N>
where
N: for<'any_input> Node<'any_input, &'any_input _I, Output = _O>,
N: for<'any_input> Node<'any_input, &'any_input _I, Output = DynFuture<'any_input, _O>>,
{
type Output = Any<'input>;
type Output = FutureAny<'input>;
fn eval(&'input self, input: Any<'input>) -> Self::Output {
{
let node_name = core::any::type_name::<N>();
let input: Box<&_I> = dyn_any::downcast(input).unwrap_or_else(|e| panic!("DynAnyInRefNode Input, {e} in:\n{node_name}"));
Box::new(self.node.eval(*input))
let result = self.node.eval(*input);
Box::pin(async move { Box::new(result.await) as Any<'_> })
}
}
}
@ -96,13 +104,37 @@ impl<_I, _O, S0> DynAnyInRefNode<_I, _O, S0> {
}
}
pub struct FutureWrapperNode<Node> {
node: Node,
}
impl<'i, T: 'i, N: Node<'i, T>> Node<'i, T> for FutureWrapperNode<N>
where
N: Node<'i, T>,
{
type Output = DynFuture<'i, N::Output>;
fn eval(&'i self, input: T) -> Self::Output {
Box::pin(async move { self.node.eval(input) })
}
fn reset(self: std::pin::Pin<&mut Self>) {
let wrapped_node = unsafe { self.map_unchecked_mut(|e| &mut e.node) };
Node::reset(wrapped_node);
}
}
impl<'i, N> FutureWrapperNode<N> {
pub const fn new(node: N) -> Self {
Self { node }
}
}
pub trait IntoTypeErasedNode<'n> {
fn into_type_erased(self) -> TypeErasedPinned<'n>;
}
impl<'n, N: 'n> IntoTypeErasedNode<'n> for N
where
N: for<'i> NodeIO<'i, Any<'i>, Output = Any<'i>> + Send + Sync + 'n,
N: for<'i> NodeIO<'i, Any<'i>, Output = FutureAny<'i>> + 'n,
{
fn into_type_erased(self) -> TypeErasedPinned<'n> {
Box::pin(self)
@ -139,17 +171,51 @@ pub struct DowncastBothNode<'a, I, O> {
_o: PhantomData<O>,
}
impl<'n: 'input, 'input, O: 'input + StaticType, I: 'input + StaticType> Node<'input, I> for DowncastBothNode<'n, I, O> {
type Output = DynFuture<'input, O>;
#[inline]
fn eval(&'input self, input: I) -> Self::Output {
{
let input = Box::new(input);
let future = self.node.eval(input);
Box::pin(async move {
let out = dyn_any::downcast(future.await).unwrap_or_else(|e| panic!("DowncastBothNode Input {e}"));
*out
})
}
}
}
impl<'n, I, O> DowncastBothNode<'n, I, O> {
pub const fn new(node: TypeErasedPinnedRef<'n>) -> Self {
Self {
node,
_i: core::marker::PhantomData,
_o: core::marker::PhantomData,
}
}
}
/// Boxes the input and downcasts the output.
/// Wraps around a node taking Box<dyn DynAny> and returning Box<dyn DynAny>
#[derive(Clone, Copy)]
pub struct DowncastBothSyncNode<'a, I, O> {
node: TypeErasedPinnedRef<'a>,
_i: PhantomData<I>,
_o: PhantomData<O>,
}
impl<'n: 'input, 'input, O: 'input + StaticType, I: 'input + StaticType> Node<'input, I> for DowncastBothSyncNode<'n, I, O> {
type Output = O;
#[inline]
fn eval(&'input self, input: I) -> Self::Output {
{
let input = Box::new(input);
let out = dyn_any::downcast(self.node.eval(input)).unwrap_or_else(|e| panic!("DowncastBothNode Input {e}"));
let future = self.node.eval(input);
let value = EvalSyncNode::new().eval(future);
let out = dyn_any::downcast(value).unwrap_or_else(|e| panic!("DowncastBothNode Input {e}"));
*out
}
}
}
impl<'n, I, O> DowncastBothNode<'n, I, O> {
impl<'n, I, O> DowncastBothSyncNode<'n, I, O> {
pub const fn new(node: TypeErasedPinnedRef<'n>) -> Self {
Self {
node,
@ -166,13 +232,15 @@ pub struct DowncastBothRefNode<'a, I, O> {
_i: PhantomData<(I, O)>,
}
impl<'n: 'input, 'input, O: 'input + StaticType, I: 'input + StaticType> Node<'input, I> for DowncastBothRefNode<'n, I, O> {
type Output = &'input O;
type Output = DynFuture<'input, &'input O>;
#[inline]
fn eval(&'input self, input: I) -> Self::Output {
{
let input = Box::new(input);
let out: Box<&_> = dyn_any::downcast::<&O>(self.node.eval(input)).unwrap_or_else(|e| panic!("DowncastBothRefNode Input {e}"));
*out
Box::pin(async move {
let out: Box<&_> = dyn_any::downcast::<&O>(self.node.eval(input).await).unwrap_or_else(|e| panic!("DowncastBothRefNode Input {e}"));
*out
})
}
}
}
@ -188,10 +256,12 @@ pub struct ComposeTypeErased<'a> {
}
impl<'i, 'a: 'i> Node<'i, Any<'i>> for ComposeTypeErased<'a> {
type Output = Any<'i>;
type Output = DynFuture<'i, Any<'i>>;
fn eval(&'i self, input: Any<'i>) -> Self::Output {
let arg = self.first.eval(input);
self.second.eval(arg)
Box::pin(async move {
let arg = self.first.eval(input).await;
self.second.eval(arg).await
})
}
}
@ -205,6 +275,21 @@ pub fn input_node<O: StaticType>(n: TypeErasedPinnedRef) -> DowncastBothNode<(),
DowncastBothNode::new(n)
}
pub struct PanicNode<I, O>(PhantomData<I>, PhantomData<O>);
impl<'i, I: 'i, O: 'i> Node<'i, I> for PanicNode<I, O> {
type Output = O;
fn eval(&'i self, _: I) -> Self::Output {
unimplemented!("This node should never be evaluated")
}
}
impl<I, O> PanicNode<I, O> {
pub const fn new() -> Self {
Self(PhantomData, PhantomData)
}
}
#[cfg(test)]
mod test {
use super::*;
@ -215,8 +300,8 @@ mod test {
pub fn dyn_input_invalid_eval_panic() {
//let add = DynAnyNode::new(AddNode::new()).into_type_erased();
//add.eval(Box::new(&("32", 32u32)));
let dyn_any = DynAnyNode::<(u32, u32), u32, _>::new(ValueNode::new(AddNode::new()));
let type_erased = dyn_any.into_type_erased();
let dyn_any = DynAnyNode::<(u32, u32), u32, _>::new(ValueNode::new(FutureWrapperNode { node: AddNode::new() }));
let type_erased = Box::pin(dyn_any) as TypeErasedPinned;
let _ref_type_erased = type_erased.as_ref();
//let type_erased = Box::pin(dyn_any) as TypeErasedPinned<'_>;
type_erased.eval(Box::new(&("32", 32u32)));
@ -226,10 +311,10 @@ mod test {
pub fn dyn_input_invalid_eval_panic_() {
//let add = DynAnyNode::new(AddNode::new()).into_type_erased();
//add.eval(Box::new(&("32", 32u32)));
let dyn_any = DynAnyNode::<(u32, u32), u32, _>::new(ValueNode::new(AddNode::new()));
let dyn_any = DynAnyNode::<(u32, u32), u32, _>::new(ValueNode::new(FutureWrapperNode { node: AddNode::new() }));
let type_erased = Box::pin(dyn_any) as TypeErasedPinned<'_>;
type_erased.eval(Box::new((4u32, 2u32)));
let id_node = IdNode::new();
let id_node = FutureWrapperNode::new(IdNode::new());
let type_erased_id = Box::pin(id_node) as TypeErasedPinned;
let type_erased = ComposeTypeErased::new(type_erased.as_ref(), type_erased_id.as_ref());
type_erased.eval(Box::new((4u32, 2u32)));

View file

@ -17,7 +17,7 @@ pub struct GpuCompiler<TypingContext, ShaderIO> {
// TODO: Move to graph-craft
#[node_macro::node_fn(GpuCompiler)]
fn compile_gpu(node: &'input DocumentNode, mut typing_context: TypingContext, io: ShaderIO) -> compilation_client::Shader {
async fn compile_gpu(node: &'input DocumentNode, mut typing_context: TypingContext, io: ShaderIO) -> compilation_client::Shader {
let compiler = graph_craft::executor::Compiler {};
let DocumentNodeImplementation::Network(network) = node.implementation;
let proto_network = compiler.compile_single(network, true).unwrap();
@ -25,7 +25,7 @@ fn compile_gpu(node: &'input DocumentNode, mut typing_context: TypingContext, io
let input_types = proto_network.inputs.iter().map(|id| typing_context.get_type(*id).unwrap()).map(|node_io| node_io.output).collect();
let output_type = typing_context.get_type(proto_network.output).unwrap().output;
let bytes = compilation_client::compile_sync(proto_network, input_types, output_type, io).unwrap();
let bytes = compilation_client::compile(proto_network, input_types, output_type, io).await.unwrap();
bytes
}
@ -34,7 +34,7 @@ pub struct MapGpuNode<Shader> {
}
#[node_macro::node_fn(MapGpuNode)]
fn map_gpu(inputs: Vec<ShaderInput<<NewExecutor as GpuExecutor>::BufferHandle>>, shader: &'any_input compilation_client::Shader) {
async fn map_gpu(inputs: Vec<ShaderInput<<NewExecutor as GpuExecutor>::BufferHandle>>, shader: &'any_input compilation_client::Shader) {
use graph_craft::executor::Executor;
let executor = NewExecutor::new().unwrap();
for input in shader.inputs.iter() {
@ -42,11 +42,13 @@ fn map_gpu(inputs: Vec<ShaderInput<<NewExecutor as GpuExecutor>::BufferHandle>>,
executor.write_buffer(buffer, input.data).unwrap();
}
todo!();
let executor: GpuExecutor = GpuExecutor::new(Context::new_sync().unwrap(), shader.into(), "gpu::eval".into()).unwrap();
/*
let executor: GpuExecutor = GpuExecutor::new(Context::new().await.unwrap(), shader.into(), "gpu::eval".into()).unwrap();
let data: Vec<_> = input.into_iter().collect();
let result = executor.execute(Box::new(data)).unwrap();
let result = dyn_any::downcast::<Vec<_O>>(result).unwrap();
*result
*/
}
pub struct MapGpuSingleImageNode<N> {

View file

@ -0,0 +1,32 @@
use std::future::Future;
use crate::Node;
pub struct GetNode;
#[node_macro::node_fn(GetNode)]
async fn get_node(url: String) -> reqwest::Response {
reqwest::get(url).await.unwrap()
}
pub struct PostNode<Body> {
body: Body,
}
#[node_macro::node_fn(PostNode)]
async fn post_node(url: String, body: String) -> reqwest::Response {
reqwest::Client::new().post(url).body(body).send().await.unwrap()
}
#[derive(Clone, Copy, Debug)]
pub struct EvalSyncNode {}
#[node_macro::node_fn(EvalSyncNode)]
fn eval_sync<F: Future + 'input>(future: F) -> F::Output {
let future = futures::future::maybe_done(future);
futures::pin_mut!(future);
match future.as_mut().take_output() {
Some(value) => value,
_ => panic!("Node construction future returned pending"),
}
}

View file

@ -10,6 +10,8 @@ pub mod memo;
pub mod raster;
pub mod http;
pub mod any;
#[cfg(feature = "gpu")]

View file

@ -12,7 +12,7 @@ pub struct GenerateQuantizationNode<N, M> {
}
#[node_macro::node_fn(GenerateQuantizationNode)]
fn generate_quantization_fn(image_frame: ImageFrame, samples: u32, function: u32) -> [Quantization; 4] {
fn generate_quantization_fn(image_frame: ImageFrame<Color>, samples: u32, function: u32) -> [Quantization; 4] {
let image = image_frame.image;
let len = image.data.len().min(10000);