diff --git a/core/incremental/dbsp.rs b/core/incremental/dbsp.rs index d4862b70a..eeab315d3 100644 --- a/core/incremental/dbsp.rs +++ b/core/incremental/dbsp.rs @@ -404,4 +404,57 @@ mod tests { let weight = zset.iter().find(|(k, _)| **k == 1).map(|(_, w)| w); assert_eq!(weight, Some(1)); } + + #[test] + fn test_hashable_row_delta_operations() { + let mut delta = Delta::new(); + + // Test INSERT + delta.insert(1, vec![Value::Integer(1), Value::Integer(100)]); + assert_eq!(delta.len(), 1); + + // Test UPDATE (DELETE + INSERT) - order matters! + delta.delete(1, vec![Value::Integer(1), Value::Integer(100)]); + delta.insert(1, vec![Value::Integer(1), Value::Integer(200)]); + assert_eq!(delta.len(), 3); // Should have 3 operations before consolidation + + // Verify order is preserved + let ops: Vec<_> = delta.changes.iter().collect(); + assert_eq!(ops[0].1, 1); // First insert + assert_eq!(ops[1].1, -1); // Delete + assert_eq!(ops[2].1, 1); // Second insert + + // Test consolidation + delta.consolidate(); + // After consolidation, the first insert and delete should cancel out + // leaving only the second insert + assert_eq!(delta.len(), 1); + + let final_row = &delta.changes[0]; + assert_eq!(final_row.0.rowid, 1); + assert_eq!( + final_row.0.values, + vec![Value::Integer(1), Value::Integer(200)] + ); + assert_eq!(final_row.1, 1); + } + + #[test] + fn test_duplicate_row_consolidation() { + let mut delta = Delta::new(); + + // Insert same row twice + delta.insert(2, vec![Value::Integer(2), Value::Integer(300)]); + delta.insert(2, vec![Value::Integer(2), Value::Integer(300)]); + + assert_eq!(delta.len(), 2); + + delta.consolidate(); + assert_eq!(delta.len(), 1); + + // Weight should be 2 (sum of both inserts) + let final_row = &delta.changes[0]; + assert_eq!(final_row.0.rowid, 2); + assert_eq!(final_row.1, 2); + } } diff --git a/core/incremental/operator.rs b/core/incremental/operator.rs index 278ce4ef9..54cd7e0a0 100644 --- a/core/incremental/operator.rs +++ b/core/incremental/operator.rs @@ -178,65 +178,6 @@ impl ComputationTracker { } } -#[cfg(test)] -mod dbsp_types_tests { - use super::*; - use crate::Value; - - #[test] - fn test_hashable_row_delta_operations() { - let mut delta = Delta::new(); - - // Test INSERT - delta.insert(1, vec![Value::Integer(1), Value::Integer(100)]); - assert_eq!(delta.len(), 1); - - // Test UPDATE (DELETE + INSERT) - order matters! - delta.delete(1, vec![Value::Integer(1), Value::Integer(100)]); - delta.insert(1, vec![Value::Integer(1), Value::Integer(200)]); - assert_eq!(delta.len(), 3); // Should have 3 operations before consolidation - - // Verify order is preserved - let ops: Vec<_> = delta.changes.iter().collect(); - assert_eq!(ops[0].1, 1); // First insert - assert_eq!(ops[1].1, -1); // Delete - assert_eq!(ops[2].1, 1); // Second insert - - // Test consolidation - delta.consolidate(); - // After consolidation, the first insert and delete should cancel out - // leaving only the second insert - assert_eq!(delta.len(), 1); - - let final_row = &delta.changes[0]; - assert_eq!(final_row.0.rowid, 1); - assert_eq!( - final_row.0.values, - vec![Value::Integer(1), Value::Integer(200)] - ); - assert_eq!(final_row.1, 1); - } - - #[test] - fn test_duplicate_row_consolidation() { - let mut delta = Delta::new(); - - // Insert same row twice - delta.insert(2, vec![Value::Integer(2), Value::Integer(300)]); - delta.insert(2, vec![Value::Integer(2), Value::Integer(300)]); - - assert_eq!(delta.len(), 2); - - delta.consolidate(); - assert_eq!(delta.len(), 1); - - // Weight should be 2 (sum of both inserts) - let final_row = &delta.changes[0]; - assert_eq!(final_row.0.rowid, 2); - assert_eq!(final_row.1, 2); - } -} - /// Represents an operator in the dataflow graph #[derive(Debug, Clone)] pub enum QueryOperator {