Add cache clearing to stop the memory leak (#1106)

* Add cache clearing

* Add TODO comment

---------

Co-authored-by: Keavon Chambers <keavon@keavon.com>
This commit is contained in:
0HyperCube 2023-04-09 23:30:57 +01:00 committed by Keavon Chambers
parent 9c4164291c
commit a58d51d685
4 changed files with 51 additions and 18 deletions

View file

@ -34,6 +34,7 @@ pub use raster::Color;
pub trait Node<'i, Input: 'i>: 'i {
type Output: 'i;
fn eval<'s: 'i>(&'s self, input: Input) -> Self::Output;
fn reset(self: Pin<&mut Self>) {}
}
#[cfg(feature = "alloc")]

View file

@ -34,7 +34,12 @@ where
Box::new(self.node.eval(*input))
}
}
fn reset(self: std::pin::Pin<&mut Self>) {
let wrapped_node = unsafe { self.map_unchecked_mut(|e| &mut e.node) };
Node::reset(wrapped_node);
}
}
impl<_I, _O, S0> DynAnyRefNode<_I, _O, S0> {
pub const fn new(node: S0) -> Self {
Self { node, _i: core::marker::PhantomData }

View file

@ -2,6 +2,8 @@ use graphene_core::Node;
use std::hash::{Hash, Hasher};
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::atomic::AtomicBool;
use xxhash_rust::xxh3::Xxh3;
/// Caches the output of a given Node and acts as a proxy
@ -9,7 +11,7 @@ use xxhash_rust::xxh3::Xxh3;
pub struct CacheNode<T, CachedNode> {
// We have to use an append only data structure to make sure the references
// to the cache entries are always valid
cache: boxcar::Vec<(u64, T)>,
cache: boxcar::Vec<(u64, T, AtomicBool)>,
node: CachedNode,
}
impl<'i, T: 'i, I: 'i + Hash, CachedNode: 'i> Node<'i, I> for CacheNode<T, CachedNode>
@ -22,17 +24,25 @@ where
input.hash(&mut hasher);
let hash = hasher.finish();
if let Some((_, cached_value)) = self.cache.iter().find(|(h, _)| *h == hash) {
if let Some((_, cached_value, keep)) = self.cache.iter().find(|(h, _, _)| *h == hash) {
keep.store(true, std::sync::atomic::Ordering::Relaxed);
return cached_value;
} else {
trace!("Cache miss");
let output = self.node.eval(input);
let index = self.cache.push((hash, output));
let index = self.cache.push((hash, output, AtomicBool::new(true)));
return &self.cache[index].1;
}
}
fn reset(mut self: Pin<&mut Self>) {
let old_cache = std::mem::take(&mut self.cache);
self.cache = old_cache.into_iter().filter(|(_, _, keep)| keep.swap(false, std::sync::atomic::Ordering::Relaxed)).collect();
}
}
impl<T, CachedNode> std::marker::Unpin for CacheNode<T, CachedNode> {}
impl<T, CachedNode> CacheNode<T, CachedNode> {
pub fn new(node: CachedNode) -> CacheNode<T, CachedNode> {
CacheNode { cache: boxcar::Vec::new(), node }
@ -72,8 +82,16 @@ impl<'i, T: 'i + Hash> Node<'i, Option<T>> for LetNode<T> {
None => &self.cache.iter().last().expect("Let node was not initialized").1,
}
}
fn reset(mut self: Pin<&mut Self>) {
if let Some(last) = std::mem::take(&mut self.cache).into_iter().last() {
self.cache = boxcar::vec![last];
}
}
}
impl<T> std::marker::Unpin for LetNode<T> {}
impl<T> LetNode<T> {
pub fn new() -> LetNode<T> {
LetNode { cache: boxcar::Vec::new() }

View file

@ -1,6 +1,6 @@
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::{collections::HashMap, sync::Arc};
use std::sync::{Arc, RwLock};
use dyn_any::StaticType;
use graph_craft::document::value::UpcastNode;
@ -59,7 +59,7 @@ impl Executor for DynamicExecutor {
pub struct NodeContainer<'n> {
pub node: TypeErasedPinned<'n>,
// the dependencies are only kept to ensure that the nodes are not dropped while still in use
_dependencies: Vec<Arc<NodeContainer<'static>>>,
_dependencies: Vec<Arc<RwLock<NodeContainer<'static>>>>,
}
impl<'a> core::fmt::Debug for NodeContainer<'a> {
@ -69,7 +69,7 @@ impl<'a> core::fmt::Debug for NodeContainer<'a> {
}
impl<'a> NodeContainer<'a> {
pub fn new(node: TypeErasedPinned<'a>, _dependencies: Vec<Arc<NodeContainer<'static>>>) -> Self {
pub fn new(node: TypeErasedPinned<'a>, _dependencies: Vec<Arc<RwLock<NodeContainer<'static>>>>) -> Self {
Self { node, _dependencies }
}
@ -89,7 +89,7 @@ impl NodeContainer<'static> {
#[derive(Default, Debug, Clone)]
pub struct BorrowTree {
nodes: HashMap<NodeId, Arc<NodeContainer<'static>>>,
nodes: HashMap<NodeId, Arc<RwLock<NodeContainer<'static>>>>,
}
impl BorrowTree {
@ -107,6 +107,11 @@ impl BorrowTree {
for (id, node) in proto_network.nodes {
if !self.nodes.contains_key(&id) {
self.push_node(id, node, typing_context)?;
} else {
let Some(node_container) = self.nodes.get_mut(&id) else { continue };
let mut node_container_writer = node_container.write().unwrap();
let node = node_container_writer.node.as_mut();
node.reset();
}
old_nodes.remove(&id);
}
@ -114,29 +119,33 @@ impl BorrowTree {
}
fn node_refs(&self, nodes: &[NodeId]) -> Vec<TypeErasedPinnedRef<'static>> {
self.node_deps(nodes).into_iter().map(|node| unsafe { node.as_ref().static_ref() }).collect()
self.node_deps(nodes).into_iter().map(|node| unsafe { node.read().unwrap().static_ref() }).collect()
}
fn node_deps(&self, nodes: &[NodeId]) -> Vec<Arc<NodeContainer<'static>>> {
fn node_deps(&self, nodes: &[NodeId]) -> Vec<Arc<RwLock<NodeContainer<'static>>>> {
nodes.iter().map(|node| self.nodes.get(node).unwrap().clone()).collect()
}
fn store_node(&mut self, node: Arc<NodeContainer<'static>>, id: NodeId) -> Arc<NodeContainer<'static>> {
fn store_node(&mut self, node: Arc<RwLock<NodeContainer<'static>>>, id: NodeId) -> Arc<RwLock<NodeContainer<'static>>> {
self.nodes.insert(id, node.clone());
node
}
pub fn get(&self, id: NodeId) -> Option<Arc<NodeContainer<'static>>> {
pub fn get(&self, id: NodeId) -> Option<Arc<RwLock<NodeContainer<'static>>>> {
self.nodes.get(&id).cloned()
}
pub fn eval<'i, I: StaticType + 'i, O: StaticType + 'i>(&self, id: NodeId, input: I) -> Option<O> {
pub fn eval<'i, I: StaticType + 'i, O: StaticType + 'i>(&'i self, id: NodeId, input: I) -> Option<O> {
let node = self.nodes.get(&id).cloned()?;
let output = node.node.eval(Box::new(input));
let reader = node.read().unwrap();
let output = reader.node.eval(Box::new(input));
dyn_any::downcast::<O>(output).ok().map(|o| *o)
}
pub fn eval_any<'i, 's: 'i>(&'s self, id: NodeId, input: Any<'i>) -> Option<Any<'i>> {
pub fn eval_any<'i>(&'i self, id: NodeId, input: Any<'i>) -> Option<Any<'i>> {
let node = self.nodes.get(&id)?;
Some(node.node.eval(input))
// TODO: Comments by @TrueDoctor before this was merged:
// TODO: Oof I dislike the evaluation being an unsafe operation but I guess its fine because it only is a lifetime extension
// TODO: We should ideally let miri run on a test that evaluates the nodegraph multiple times to check if this contains any subtle UB but this looks fine for now
Some(unsafe { (*((&*node.read().unwrap()) as *const NodeContainer)).node.eval(input) })
}
pub fn free_node(&mut self, id: NodeId) {
@ -152,7 +161,7 @@ impl BorrowTree {
let node = Box::pin(upcasted) as TypeErasedPinned<'_>;
let node = NodeContainer { node, _dependencies: vec![] };
let node = unsafe { node.erase_lifetime() };
self.store_node(Arc::new(node), id);
self.store_node(Arc::new(node.into()), id);
}
ConstructionArgs::Nodes(ids) => {
let ids: Vec<_> = ids.iter().map(|(id, _)| *id).collect();
@ -164,7 +173,7 @@ impl BorrowTree {
_dependencies: self.node_deps(&ids),
};
let node = unsafe { node.erase_lifetime() };
self.store_node(Arc::new(node), id);
self.store_node(Arc::new(node.into()), id);
}
};
Ok(())