limbo/core/vdbe/hash_table.rs

2410 lines
90 KiB
Rust
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use crate::{
error::LimboError,
io::{Buffer, Completion, TempFile, IO},
io_yield_one,
storage::sqlite3_ondisk::{read_varint, write_varint},
translate::collate::CollationSeq,
turso_assert,
types::{IOCompletions, IOResult, Value, ValueRef},
CompletionError, Result,
};
use parking_lot::RwLock;
use rapidhash::fast::RapidHasher;
use std::hash::Hasher;
use std::sync::{atomic, Arc};
use std::{cell::RefCell, collections::VecDeque};
use std::{
cmp::{Eq, Ordering},
sync::atomic::AtomicUsize,
};
use turso_macros::AtomicEnum;
/// Fixed seed for the rapidhash hasher so join-key hashes are stable for the
/// lifetime of a process.
const DEFAULT_SEED: u64 = 1337;
// set to a *very* small 32KB, intentionally to trigger frequent spilling during tests
#[cfg(debug_assertions)]
pub const DEFAULT_MEM_BUDGET: usize = 32 * 1024;
/// 64MB default memory budget for hash joins.
/// TODO: make configurable via PRAGMA
#[cfg(not(debug_assertions))]
pub const DEFAULT_MEM_BUDGET: usize = 64 * 1024 * 1024;
/// Initial number of buckets for the in-memory hash table.
const DEFAULT_BUCKETS: usize = 1024;
/// Number of partitions for grace hash join
pub const NUM_PARTITIONS: usize = 16;
/// Bits used for partition selection from hash
const PARTITION_BITS: u32 = 4;
// Per-type tags shared by the hashing code (domain separation between value
// types) and the on-disk serialization format (value type discriminants).
const NULL_HASH: u8 = 0;
const INT_HASH: u8 = 1;
const FLOAT_HASH: u8 = 2;
const TEXT_HASH: u8 = 3;
const BLOB_HASH: u8 = 4;
/// Hash function for join keys using rapidhash.
/// Takes collation into account when hashing text values, so strings that
/// compare equal under a collation also hash equally.
fn hash_join_key(key_values: &[ValueRef], collations: &[CollationSeq]) -> u64 {
    let mut hasher = RapidHasher::new(DEFAULT_SEED);
    for (idx, value) in key_values.iter().enumerate() {
        match value {
            ValueRef::Null => hasher.write_u8(NULL_HASH),
            ValueRef::Integer(i) => {
                // Integers whose f64 representation round-trips exactly are hashed
                // in the float domain so e.g. 10 and 10.0 share a hash bucket.
                let as_float = *i as f64;
                if as_float.is_finite() && (as_float as i64) == *i {
                    hasher.write_u8(FLOAT_HASH);
                    hasher.write(&normalized_f64_bits(as_float).to_le_bytes());
                } else {
                    // The float form would lose precision; stay in the integer domain.
                    hasher.write_u8(INT_HASH);
                    hasher.write_i64(*i);
                }
            }
            ValueRef::Float(f) => {
                hasher.write_u8(FLOAT_HASH);
                hasher.write(&normalized_f64_bits(*f).to_le_bytes());
            }
            ValueRef::Text(text) => {
                hasher.write_u8(TEXT_HASH);
                // Hash the collation-normalized form of the string.
                match collations.get(idx).unwrap_or(&CollationSeq::Binary) {
                    CollationSeq::NoCase => {
                        hasher.write(text.as_str().to_lowercase().as_bytes());
                    }
                    CollationSeq::Rtrim => {
                        hasher.write(text.as_str().trim_end().as_bytes());
                    }
                    CollationSeq::Binary | CollationSeq::Unset => {
                        hasher.write(text.as_bytes());
                    }
                }
            }
            ValueRef::Blob(blob) => {
                hasher.write_u8(BLOB_HASH);
                hasher.write(blob);
            }
        }
    }
    hasher.finish()
}
/// Normalize signed zero so 0.0 and -0.0 hash the same.
#[inline]
fn normalized_f64_bits(f: f64) -> u64 {
    // +0.0 and -0.0 compare equal yet differ in the sign bit, so collapse
    // both to the bit pattern of +0.0; every other value keeps its raw bits.
    if f != 0.0 {
        f.to_bits()
    } else {
        0.0f64.to_bits()
    }
}
/// Check if any of the key values is NULL.
/// Rows with NULL join keys should be skipped in hash joins since NULL != NULL in SQL.
fn has_null_key(key_values: &[Value]) -> bool {
    for value in key_values {
        if matches!(value, Value::Null) {
            return true;
        }
    }
    false
}
/// Check if any of the key value refs is NULL.
fn has_null_key_ref(key_values: &[ValueRef]) -> bool {
    key_values
        .iter()
        .position(|v| matches!(v, ValueRef::Null))
        .is_some()
}
/// Check if two key value arrays are equal, taking collation into account.
/// Arrays of different lengths are never equal; missing collations default to Binary.
fn keys_equal(key1: &[Value], key2: &[ValueRef], collations: &[CollationSeq]) -> bool {
    key1.len() == key2.len()
        && key1.iter().zip(key2.iter()).enumerate().all(|(idx, (a, b))| {
            let collation = collations.get(idx).copied().unwrap_or(CollationSeq::Binary);
            values_equal(a.as_ref(), *b, collation)
        })
}
/// Check if two values are equal, using the specified collation for text comparison.
/// NOTE: In SQL, NULL = NULL evaluates to NULL (falsy), so this returns false for NULL comparisons.
fn values_equal(v1: ValueRef, v2: ValueRef, collation: CollationSeq) -> bool {
    match (v1, v2) {
        // Any comparison involving NULL is falsy in SQL.
        (ValueRef::Null, _) | (_, ValueRef::Null) => false,
        (ValueRef::Integer(a), ValueRef::Integer(b)) => a == b,
        (ValueRef::Float(a), ValueRef::Float(b)) => a == b,
        // Mixed int/float: defer to ValueRef's own cross-type equality.
        (ValueRef::Integer(i), ValueRef::Float(f)) | (ValueRef::Float(f), ValueRef::Integer(i)) => {
            ValueRef::Integer(i) == ValueRef::Float(f)
        }
        (ValueRef::Text(a), ValueRef::Text(b)) => {
            // Text equality is collation-aware, matching the hashing logic.
            collation.compare_strings(a.as_str(), b.as_str()) == Ordering::Equal
        }
        (ValueRef::Blob(a), ValueRef::Blob(b)) => a == b,
        _ => false,
    }
}
/// State machine states for hash table operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HashTableState {
/// Build phase: inserts allowed, all data resident in memory.
Building,
/// Probe phase: build side is finalized and can be searched.
Probing,
/// Build phase after the memory budget was exceeded; inserts go to
/// partition buffers and whole partitions may spill to disk.
Spilled,
/// Table has been closed.
Closed,
}
/// A single entry in a hash table bucket.
#[derive(Debug, Clone)]
pub struct HashEntry {
/// Hash value of the join keys.
/// Precomputed at insert time so probes can compare hashes before doing a
/// full (collation-aware) key comparison.
pub hash: u64,
/// The join key values.
pub key_values: Vec<Value>,
/// The rowid of the row in the build table.
/// During probe phase, we'll use SeekRowid to fetch the full row
/// (unless payload_values contains all needed columns).
pub rowid: i64,
/// Optional payload values - columns from the build table that are stored
/// directly in the hash entry to avoid SeekRowid during probe phase.
/// When populated, these are the result columns needed from the build table,
/// stored in column index order as specified during hash table construction.
pub payload_values: Vec<Value>,
}
impl HashEntry {
    /// Create an entry with no payload columns.
    fn new(hash: u64, key_values: Vec<Value>, rowid: i64) -> Self {
        Self {
            hash,
            key_values,
            rowid,
            payload_values: Vec::new(),
        }
    }
    /// Create an entry that also carries payload columns from the build table,
    /// allowing the probe phase to skip SeekRowid.
    fn new_with_payload(
        hash: u64,
        key_values: Vec<Value>,
        rowid: i64,
        payload_values: Vec<Value>,
    ) -> Self {
        Self {
            hash,
            key_values,
            rowid,
            payload_values,
        }
    }
    /// Returns true if this entry has payload values stored.
    pub fn has_payload(&self) -> bool {
        !self.payload_values.is_empty()
    }
    /// Get the size of this entry in bytes (approximate).
    /// Counts value payload bytes plus 8 bytes each for `hash` and `rowid`;
    /// container/allocator overhead is deliberately ignored.
    fn size_bytes(&self) -> usize {
        let value_size = |v: &Value| match v {
            Value::Null => 1,
            Value::Integer(_) => 8,
            Value::Float(_) => 8,
            Value::Text(t) => t.as_str().len(),
            Value::Blob(b) => b.len(),
        };
        let key_size: usize = self.key_values.iter().map(value_size).sum();
        let payload_size: usize = self.payload_values.iter().map(value_size).sum();
        key_size + payload_size + 8 + 8 // +8 for hash, +8 for rowid
    }
    /// Serialize this entry to bytes for disk storage.
    /// Format: [hash:8][rowid:8][num_keys:varint][keys...][num_payload:varint][payload...]
    /// Each value is: [type:1][len:varint (for text/blob)][data]
    fn serialize(&self, buf: &mut Vec<u8>) {
        buf.extend_from_slice(&self.hash.to_le_bytes());
        buf.extend_from_slice(&self.rowid.to_le_bytes());
        // Write number of keys and key values
        let varint_buf = &mut [0u8; 9];
        let len = write_varint(varint_buf, self.key_values.len() as u64);
        buf.extend_from_slice(&varint_buf[..len]);
        for value in &self.key_values {
            Self::serialize_value(value, buf, varint_buf);
        }
        // Write number of payload values and payload values
        let len = write_varint(varint_buf, self.payload_values.len() as u64);
        buf.extend_from_slice(&varint_buf[..len]);
        for value in &self.payload_values {
            Self::serialize_value(value, buf, varint_buf);
        }
    }
    /// Helper to serialize a single Value to bytes.
    /// The leading type tag reuses the *_HASH constants so hashing and
    /// serialization agree on type discriminants.
    fn serialize_value(value: &Value, buf: &mut Vec<u8>, varint_buf: &mut [u8; 9]) {
        match value {
            Value::Null => {
                buf.push(NULL_HASH);
            }
            Value::Integer(i) => {
                buf.push(INT_HASH);
                buf.extend_from_slice(&i.to_le_bytes());
            }
            Value::Float(f) => {
                buf.push(FLOAT_HASH);
                buf.extend_from_slice(&f.to_le_bytes());
            }
            Value::Text(t) => {
                buf.push(TEXT_HASH);
                let bytes = t.as_str().as_bytes();
                let len = write_varint(varint_buf, bytes.len() as u64);
                buf.extend_from_slice(&varint_buf[..len]);
                buf.extend_from_slice(bytes);
            }
            Value::Blob(b) => {
                buf.push(BLOB_HASH);
                let len = write_varint(varint_buf, b.len() as u64);
                buf.extend_from_slice(&varint_buf[..len]);
                buf.extend_from_slice(b);
            }
        }
    }
    /// Deserialize an entry from bytes, returning (entry, bytes_consumed) or error.
    fn deserialize(buf: &[u8]) -> Result<(Self, usize)> {
        if buf.len() < 16 {
            return Err(LimboError::Corrupt(
                "HashEntry: buffer too small for header".to_string(),
            ));
        }
        // buffer len checked above
        let hash = u64::from_le_bytes(buf[0..8].try_into().expect("expect 8 bytes"));
        let rowid = i64::from_le_bytes(buf[8..16].try_into().expect("expect 8 bytes"));
        let mut offset = 16;
        // Read number of keys and key values
        let (num_keys, varint_len) = read_varint(&buf[offset..])?;
        offset += varint_len;
        // Cap the pre-allocation: the count is untrusted on-disk data, and a
        // corrupt varint must not trigger a huge up-front allocation. The Vec
        // still grows as needed for legitimately large counts.
        let mut key_values = Vec::with_capacity((num_keys as usize).min(128));
        for _ in 0..num_keys {
            let (value, consumed) = Self::deserialize_value(&buf[offset..])?;
            key_values.push(value);
            offset += consumed;
        }
        // Read number of payload values and payload values
        let (num_payload, varint_len) = read_varint(&buf[offset..])?;
        offset += varint_len;
        let mut payload_values = Vec::with_capacity((num_payload as usize).min(128));
        for _ in 0..num_payload {
            let (value, consumed) = Self::deserialize_value(&buf[offset..])?;
            payload_values.push(value);
            offset += consumed;
        }
        Ok((
            Self {
                hash,
                key_values,
                rowid,
                payload_values,
            },
            offset,
        ))
    }
    /// Helper to deserialize a single Value from bytes.
    /// Returns (Value, bytes_consumed).
    fn deserialize_value(buf: &[u8]) -> Result<(Value, usize)> {
        if buf.is_empty() {
            return Err(LimboError::Corrupt(
                "HashEntry: unexpected end of buffer".to_string(),
            ));
        }
        let value_type = buf[0];
        let mut offset = 1;
        let value = match value_type {
            NULL_HASH => Value::Null,
            INT_HASH => {
                if offset + 8 > buf.len() {
                    return Err(LimboError::Corrupt(
                        "HashEntry: buffer too small for integer".to_string(),
                    ));
                }
                let i =
                    i64::from_le_bytes(buf[offset..offset + 8].try_into().expect("expect 8 bytes"));
                offset += 8;
                Value::Integer(i)
            }
            FLOAT_HASH => {
                if offset + 8 > buf.len() {
                    return Err(LimboError::Corrupt(
                        "HashEntry: buffer too small for float".to_string(),
                    ));
                }
                let f =
                    f64::from_le_bytes(buf[offset..offset + 8].try_into().expect("expect 8 bytes"));
                offset += 8;
                Value::Float(f)
            }
            TEXT_HASH => {
                let (str_len, varint_len) = read_varint(&buf[offset..])?;
                offset += varint_len;
                // Validate with checked arithmetic: `str_len` is untrusted
                // on-disk data, so the old `offset + str_len as usize` form
                // could truncate (u64 -> usize on 32-bit targets) or wrap in
                // release builds and defeat the bounds check. The subtraction
                // below is safe because read_varint consumed at most the
                // remaining bytes, so `offset <= buf.len()` holds.
                let str_len = usize::try_from(str_len)
                    .ok()
                    .filter(|len| *len <= buf.len() - offset)
                    .ok_or_else(|| {
                        LimboError::Corrupt("HashEntry: buffer too small for text".to_string())
                    })?;
                let s = String::from_utf8(buf[offset..offset + str_len].to_vec())
                    .map_err(|_| LimboError::Corrupt("Invalid UTF-8 in text".to_string()))?;
                offset += str_len;
                Value::Text(s.into())
            }
            BLOB_HASH => {
                let (blob_len, varint_len) = read_varint(&buf[offset..])?;
                offset += varint_len;
                // Same checked-arithmetic validation as for text above.
                let blob_len = usize::try_from(blob_len)
                    .ok()
                    .filter(|len| *len <= buf.len() - offset)
                    .ok_or_else(|| {
                        LimboError::Corrupt("HashEntry: buffer too small for blob".to_string())
                    })?;
                let b = buf[offset..offset + blob_len].to_vec();
                offset += blob_len;
                Value::Blob(b)
            }
            _ => {
                return Err(LimboError::Corrupt(format!(
                    "HashEntry: unknown value type {value_type}",
                )));
            }
        };
        Ok((value, offset))
    }
}
/// Get partition index from hash value
#[inline(always)]
fn partition_from_hash(hash: u64) -> usize {
    // Use the top PARTITION_BITS bits: bucket selection uses the low bits of
    // the hash (via `%`), so taking the high bits keeps the two independent.
    let top = hash >> (u64::BITS - PARTITION_BITS);
    (top as usize) & (NUM_PARTITIONS - 1)
}
/// A bucket in the hash table. Uses chaining for collision resolution.
#[derive(Debug, Clone)]
pub struct HashBucket {
/// Chained entries whose hash maps to this bucket; scanned linearly on probe.
entries: Vec<HashEntry>,
}
impl HashBucket {
fn new() -> Self {
Self {
entries: Vec::new(),
}
}
fn insert(&mut self, entry: HashEntry) {
self.entries.push(entry);
}
fn find_matches<'a>(
&'a self,
hash: u64,
probe_keys: &[ValueRef],
collations: &[CollationSeq],
) -> Vec<&'a HashEntry> {
self.entries
.iter()
.filter(|entry| {
entry.hash == hash && keys_equal(&entry.key_values, probe_keys, collations)
})
.collect()
}
fn is_empty(&self) -> bool {
self.entries.is_empty()
}
fn size_bytes(&self) -> usize {
self.entries.iter().map(|e| e.size_bytes()).sum()
}
}
/// I/O state for spilled partition operations
#[derive(Debug, AtomicEnum, Clone, Copy, PartialEq, Eq)]
pub enum SpillIOState {
/// No spill I/O in flight.
None,
/// A partition write has been issued and has not completed yet.
WaitingForWrite,
/// The last partition write completed successfully.
WriteComplete,
/// A partition read has been issued and has not completed yet.
WaitingForRead,
/// The last partition read completed successfully.
ReadComplete,
/// The last spill I/O operation failed.
Error,
}
/// State of a partition in a spilled hash table
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PartitionState {
/// data is in partition_buffers
InMemory,
/// Has been written to disk, not yet loaded
OnDisk,
/// Is being loaded from disk (I/O in progress)
Loading,
/// Has been loaded from disk and is ready for probing
/// (resident buckets are tracked by the loaded-partitions LRU)
Loaded,
}
/// A chunk of partition data spilled to disk.
/// A partition may be spilled multiple times, creating multiple chunks.
/// Chunks are append-only: each spill of a partition adds exactly one chunk.
#[derive(Debug)]
struct SpillChunk {
/// File offset where this chunk's data starts
file_offset: u64,
/// Size in bytes of this chunk on disk
size_bytes: usize,
/// Number of entries in this chunk
num_entries: usize,
}
/// Tracks a partition that has been spilled to disk during grace hash join.
pub struct SpilledPartition {
/// Partition index (0 to NUM_PARTITIONS-1)
pub partition_idx: usize,
/// Chunks of data belonging to this partition (may have multiple spills)
chunks: Vec<SpillChunk>,
/// Current state of the partition
state: PartitionState,
/// I/O state for async operations
/// (shared with completion callbacks, hence the Arc)
io_state: Arc<AtomicSpillIOState>,
/// Read buffer for loading partition back
read_buffer: Arc<RwLock<Vec<u8>>>,
/// Length of data in read buffer
buffer_len: Arc<AtomicUsize>,
/// Hash buckets for this partition (populated after loading)
buckets: Vec<HashBucket>,
/// Current chunk being loaded (for multi-chunk reads)
/// — index into `chunks` of the next chunk to read.
current_chunk_idx: usize,
/// Approximate memory used by the resident buckets for this partition
resident_mem: usize,
}
impl SpilledPartition {
fn new(partition_idx: usize) -> Self {
Self {
partition_idx,
chunks: Vec::new(),
state: PartitionState::OnDisk,
io_state: Arc::new(AtomicSpillIOState::new(SpillIOState::None)),
read_buffer: Arc::new(RwLock::new(Vec::new())),
buffer_len: Arc::new(atomic::AtomicUsize::new(0)),
buckets: Vec::new(),
current_chunk_idx: 0,
resident_mem: 0,
}
}
/// Add a new chunk of data to this partition
fn add_chunk(&mut self, file_offset: u64, size_bytes: usize, num_entries: usize) {
self.chunks.push(SpillChunk {
file_offset,
size_bytes,
num_entries,
});
}
/// Get total size in bytes across all chunks
fn total_size_bytes(&self) -> usize {
self.chunks.iter().map(|c| c.size_bytes).sum()
}
/// Get total number of entries across all chunks
fn total_num_entries(&self) -> usize {
self.chunks.iter().map(|c| c.num_entries).sum()
}
fn buffer_len(&self) -> usize {
self.buffer_len.load(atomic::Ordering::SeqCst)
}
/// Check if partition is ready for probing
pub fn is_loaded(&self) -> bool {
matches!(
self.state,
PartitionState::Loaded | PartitionState::InMemory
)
}
/// Check if there are more chunks to load
fn has_more_chunks(&self) -> bool {
self.current_chunk_idx < self.chunks.len()
}
/// Get the current chunk to load, if any
fn current_chunk(&self) -> Option<&SpillChunk> {
self.chunks.get(self.current_chunk_idx)
}
}
/// In-memory partition buffer for grace hash join.
/// During build phase, entries are first accumulated here before spilling.
struct PartitionBuffer {
/// Entries in this partition
entries: Vec<HashEntry>,
/// Total memory used by entries in this partition
/// (sum of `HashEntry::size_bytes` over `entries`)
mem_used: usize,
}
impl PartitionBuffer {
fn new() -> Self {
Self {
entries: Vec::new(),
mem_used: 0,
}
}
fn insert(&mut self, entry: HashEntry) {
self.mem_used += entry.size_bytes();
self.entries.push(entry);
}
fn clear(&mut self) {
self.entries.clear();
self.mem_used = 0;
}
fn is_empty(&self) -> bool {
self.entries.is_empty()
}
}
/// Configuration for the hash table.
#[derive(Debug, Clone)]
pub struct HashTableConfig {
/// Initial number of buckets (must be power of 2).
/// NOTE(review): bucket selection uses `%`, so a non-power-of-two count
/// still functions — confirm whether the power-of-2 requirement is real.
pub initial_buckets: usize,
/// Maximum memory budget in bytes.
pub mem_budget: usize,
/// Number of keys in the join condition.
pub num_keys: usize,
/// Collation sequences for each join key.
pub collations: Vec<CollationSeq>,
}
impl Default for HashTableConfig {
fn default() -> Self {
Self {
initial_buckets: DEFAULT_BUCKETS,
mem_budget: DEFAULT_MEM_BUDGET,
num_keys: 1,
collations: vec![CollationSeq::Binary],
}
}
}
/// Bookkeeping for grace-hash-join spilling: per-partition in-memory buffers,
/// on-disk partition metadata, and the backing temp file.
struct SpillState {
/// In-memory partition buffers for grace hash join.
/// When spilling is triggered, entries are partitioned by hash before writing.
partition_buffers: Vec<PartitionBuffer>,
/// Spilled partitions metadata, tracks what's on disk
partitions: Vec<SpilledPartition>,
/// Current file offset for next spill write
/// (advanced by the written size after every spill)
next_spill_offset: u64,
/// Temporary file for spilled data.
temp_file: TempFile,
}
impl SpillState {
    /// Create spill state with one empty buffer per partition and a fresh temp file.
    fn new(io: &Arc<dyn IO>) -> Result<Self> {
        let temp_file = TempFile::new(io)?;
        let mut partition_buffers = Vec::with_capacity(NUM_PARTITIONS);
        for _ in 0..NUM_PARTITIONS {
            partition_buffers.push(PartitionBuffer::new());
        }
        Ok(SpillState {
            partition_buffers,
            partitions: Vec::new(),
            next_spill_offset: 0,
            temp_file,
        })
    }
    /// Mutable lookup of the spilled-partition record for a logical index.
    fn find_partition_mut(&mut self, logical_idx: usize) -> Option<&mut SpilledPartition> {
        self.partitions
            .iter_mut()
            .find(|p| p.partition_idx == logical_idx)
    }
    /// Shared lookup of the spilled-partition record for a logical index.
    fn find_partition(&self, logical_idx: usize) -> Option<&SpilledPartition> {
        self.partitions
            .iter()
            .find(|p| p.partition_idx == logical_idx)
    }
}
/// HashTable is the build-side data structure used for hash joins. It behaves like a
/// standard in-memory hash table until a configurable memory budget is exceeded, at
/// which point it transparently switches to a grace-hash-join style layout and spills
/// partitions to disk.
///
/// # Overview
///
/// The table is keyed by an N-column join key. Keys are hashed using a stable
/// rapidhash hasher that is aware of SQLite-style collations for text values.
/// Each entry stores:
///
/// - the precomputed hash value,
/// - an owned copy of the join key values, and
/// - the rowid of the build-side row, used later to SeekRowid into the build table.
///
/// Collisions within a hash bucket are resolved using simple chaining (a `Vec<HashEntry>`),
/// and equality is determined by comparing the stored key values against probe keys using
/// the same collation-aware comparison logic that was used when hashing.
///
/// - Construction:
/// - `mem_budget` is an approximate upper bound on memory consumed by the
/// build side for this join. It applies to:
/// * `mem_used`: the base in-memory structures (buckets in non-spilled
/// mode, plus any never-spilled partitions).
/// * `loaded_partitions_mem`: additional resident memory for partitions
/// that were spilled to disk and later reloaded for probing.
///
/// - Build phase ([HashTableState::Building] / `::Spilled`):
/// - `insert`:
/// * Inserts (key_values, rowid) into the in-memory hash table.
/// * Tracks per-entry size via [HashEntry::size_bytes] and increments
/// `mem_used`.
/// * If `mem_used + new_entry_size > mem_budget`:
/// - On first overflow, transitions into spilled mode:
/// * Allocates `SpillState`.
/// * Redistributes existing buckets into `NUM_PARTITIONS`
/// partition buffers via `partition_from_hash`.
/// * Sets `state` to [HashTableState::Spilled].
/// - Spills whole partition buffers to disk, always picking the largest
/// non-empty partition first, until the new entry fits.
/// - Spilling:
/// * Serializes entries in a partition buffer into a temp file,
/// appending as one `SpillChunk` (file_offset, size, #entries).
/// * Clears that partition buffer and reduces `mem_used` by
/// its `partition.mem_used`.
/// * Updates `SpilledPartition.chunks` and `next_spill_offset`.
/// - In spilled mode:
/// * New inserts are written into the corresponding `PartitionBuffer`.
/// * These entries are either:
/// - Spilled later (creating new chunks), or
/// - Materialized in-memory as never-spilled partitions if they never
/// required spilling at finalize time (see below).
///
/// - `finalize_build`:
/// - Completes pending writes for any partitions that were already spilled.
/// - For each `partition_buffers[i]`:
/// * If there is already a `SpilledPartition` for `i` (i.e. we have
/// existing chunks), we spill the buffer to disk (creating more chunks)
/// and clear it, reducing `mem_used` accordingly.
/// * Otherwise, the partition has never been spilled:
/// - We call `materialize_partition_in_memory(i)`:
/// * Takes owned entries out of the buffer.
/// * Builds in-memory buckets for that partition.
/// * Marks the resulting `SpilledPartition` as
/// `PartitionState::InMemory`.
/// * These InMemory partitions are not tracked in
/// `loaded_partitions_lru` and do not contribute to
/// `loaded_partitions_mem`.
/// - After this, the table transitions to `HashTableState::Probing`.
///
/// - Probe phase (`HashTableState::Probing`):
/// - Non-spilled case:
/// * If `spill_state.is_none()`, probing is directly over `buckets`.
/// * `probe()` / `next_match()` walk the bucket chain for the hash.
/// - Spilled case:
/// * All build-side data now lives in `SpilledPartition`s:
/// - Some partitions may be `InMemory` (never spilled, always
/// resident, counted only in `mem_used`).
/// - Some partitions may be `OnDisk` with one or more `chunks`
/// (spilled build side).
/// - Some partitions may be `Loaded` (disk-backed but currently
/// resident in memory as buckets; counted in `loaded_partitions_mem`
/// and tracked in `loaded_partitions_lru`).
/// * The VDBE is expected to:
/// - Compute the partition index for a probe key:
/// `partition_for_keys(probe_keys)`.
/// - Ensure the partition is resident:
/// `load_spilled_partition(partition_idx)`
/// (no-op for never-spilled / InMemory partitions).
/// - Then probe via:
/// `probe_partition(partition_idx, keys)`
/// or the top-level `probe()` / `next_match()` helpers.
///
/// - Probe-time cache / thrash behavior:
/// - `loaded_partitions_lru` and `loaded_partitions_mem` form an LRU cache of
/// *only spilled* partitions that are currently loaded.
/// - When loading or parsing a spilled partition:
/// * We estimate its memory footprint as:
/// `resident_mem = partition_bucket_mem(partition.buckets)`
/// and set `partition.resident_mem`.
/// * Before keeping it resident, we call:
/// `evict_partitions_to_fit(resident_mem, protect_idx)`
/// which:
/// - Repeatedly evicts the least-recently-used partition whose state
/// is `Loaded` and that has backing `chunks` (`!chunks.is_empty()`),
/// skipping `protect_idx`.
/// - Eviction clears the partitions buckets, resets its state to
/// `OnDisk`, zeros `resident_mem`, and decrements
/// `loaded_partitions_mem`.
/// - The loop stops once:
/// mem_used + loaded_partitions_mem + incoming_mem <= mem_budget
/// or there is no further evictable candidate, in which case the
/// hash join may temporarily exceed `mem_budget` (bounded by one
/// partition's `resident_mem`).
/// * After eviction, we call:
/// `record_partition_resident(partition_idx, resident_mem)`
/// which:
/// - Adjusts `loaded_partitions_mem` by replacing the prior
/// `resident_mem` for that partition.
/// - Marks the partition as most recently used in
/// `loaded_partitions_lru`.
/// - For partitions that are already loaded, `load_spilled_partition()`
/// simply updates their position in the LRU without changing the memory
/// accounting.
pub struct HashTable {
/// The hash buckets (used when not spilled).
buckets: Vec<HashBucket>,
/// Number of entries in the table.
num_entries: usize,
/// Current memory usage in bytes.
mem_used: usize,
/// Memory budget in bytes.
mem_budget: usize,
/// Number of join keys.
num_keys: usize,
/// Collation sequences for each join key.
collations: Vec<CollationSeq>,
/// Current state of the hash table.
state: HashTableState,
/// IO object for disk operations.
io: Arc<dyn IO>,
/// Current probe position bucket index.
probe_bucket_idx: usize,
/// Current probe entry index within bucket.
probe_entry_idx: usize,
/// Cached hash of current probe keys (to avoid recomputing)
current_probe_hash: Option<u64>,
/// Current probe key values being searched.
current_probe_keys: Option<Vec<Value>>,
/// Grace-hash-join spill bookkeeping; `None` until the memory budget is
/// first exceeded during the build phase.
spill_state: Option<SpillState>,
/// Index of current spilled partition being probed
current_spill_partition_idx: usize,
/// LRU of loaded partitions to cap probe-time memory
loaded_partitions_lru: RefCell<VecDeque<usize>>,
/// Memory used by resident (loaded or in-memory) partitions
loaded_partitions_mem: usize,
}
/// Next step decided when preparing a spilled partition for probing.
/// NOTE(review): variant semantics inferred from names and field types; the
/// consuming state machine is outside this section — confirm against it.
enum SpillAction {
/// Partition buckets are already resident; probing can proceed.
AlreadyLoaded,
/// Raw chunk bytes are in the read buffer and must be parsed into buckets.
NeedsParsing,
/// A read is already in flight; caller must wait and re-enter.
WaitingForIO,
/// Partition has no on-disk chunks to load.
NoChunks,
/// Issue a read of the next chunk using these parameters.
LoadChunk {
/// Number of bytes to read for this chunk.
read_size: usize,
/// Offset of the chunk in the temp file.
file_offset: u64,
/// Shared I/O state updated by the completion callback.
io_state: Arc<AtomicSpillIOState>,
/// Shared count of valid bytes in the read buffer.
buffer_len: Arc<AtomicUsize>,
/// Destination buffer for the read.
read_buffer_ref: Arc<RwLock<Vec<u8>>>,
},
/// Restart the load sequence from the beginning.
Restart,
/// No spilled-partition record exists for the requested index.
NotFound,
}
impl HashTable {
/// Create a new hash table.
pub fn new(config: HashTableConfig, io: Arc<dyn IO>) -> Self {
let buckets = (0..config.initial_buckets)
.map(|_| HashBucket::new())
.collect();
Self {
buckets,
num_entries: 0,
mem_used: 0,
mem_budget: config.mem_budget,
num_keys: config.num_keys,
collations: config.collations,
state: HashTableState::Building,
io,
probe_bucket_idx: 0,
probe_entry_idx: 0,
current_probe_keys: None,
current_probe_hash: None,
spill_state: None,
current_spill_partition_idx: 0,
loaded_partitions_lru: VecDeque::new().into(),
loaded_partitions_mem: 0,
}
}
/// Get the current state of the hash table.
/// Read-only accessor; valid in every state.
pub fn get_state(&self) -> &HashTableState {
&self.state
}
/// Insert a row into the hash table, returns IOResult because this may spill to disk.
/// When memory budget is exceeded, triggers grace hash join by partitioning and spilling.
/// Rows with NULL join keys are skipped since NULL != NULL in SQL.
///
/// Returns `IOResult::IO` when a spill write is still pending; in that case
/// the entry is NOT inserted and the caller must re-enter with the same
/// arguments once the I/O completes.
pub fn insert(
&mut self,
key_values: Vec<Value>,
rowid: i64,
payload_values: Vec<Value>,
) -> Result<IOResult<()>> {
turso_assert!(
self.state == HashTableState::Building || self.state == HashTableState::Spilled,
"Cannot insert into hash table in state {:?}",
self.state
);
// Skip rows with NULL join keys - they can never match anything since NULL != NULL in SQL
if has_null_key(&key_values) {
return Ok(IOResult::Done(()));
}
// Compute hash of the join keys using collations
let key_refs: Vec<ValueRef> = key_values.iter().map(|v| v.as_ref()).collect();
let hash = hash_join_key(&key_refs, &self.collations);
let entry = if payload_values.is_empty() {
HashEntry::new(hash, key_values, rowid)
} else {
HashEntry::new_with_payload(hash, key_values, rowid, payload_values)
};
let entry_size = entry.size_bytes();
// Check if we would exceed memory budget
if self.mem_used + entry_size > self.mem_budget {
if self.spill_state.is_none() {
tracing::debug!(
"Hash table memory budget exceeded (used: {}, budget: {}), spilling to disk",
self.mem_used,
self.mem_budget
);
// First time exceeding budget, trigger spill
// Move all existing bucket entries into partition buffers
self.spill_state = Some(SpillState::new(&self.io)?);
self.redistribute_to_partitions();
self.state = HashTableState::Spilled;
};
// Spill whole partitions until the new entry fits
if let Some(c) = self.spill_partitions_for_entry(entry_size)? {
// I/O pending, caller will re-enter after completion and retry the insert.
if !c.finished() {
return Ok(IOResult::IO(IOCompletions::Single(c)));
}
}
}
if let Some(spill_state) = &mut self.spill_state {
// In spilled mode, insert into partition buffer
let partition_idx = partition_from_hash(hash);
spill_state.partition_buffers[partition_idx].insert(entry);
} else {
// Normal mode, insert into hash bucket
let bucket_idx = (hash as usize) % self.buckets.len();
self.buckets[bucket_idx].insert(entry);
}
// mem_used accounts for the entry in either layout (bucket or partition buffer).
self.num_entries += 1;
self.mem_used += entry_size;
Ok(IOResult::Done(()))
}
/// Redistribute existing bucket entries into partition buffers for grace hash join.
/// Called when spilling is first triggered; drains `buckets` entirely.
fn redistribute_to_partitions(&mut self) {
    let spill_state = self
        .spill_state
        .as_mut()
        .expect("spill state must exist");
    // Drain every bucket and route each entry to its partition buffer by hash.
    for entry in self.buckets.drain(..).flat_map(|bucket| bucket.entries) {
        let partition_idx = partition_from_hash(entry.hash);
        spill_state.partition_buffers[partition_idx].insert(entry);
    }
}
/// Return the next partition which should be spilled to disk. For simplicity,
/// we always select the largest non-empty partition buffer; `_required_free`
/// is currently unused.
fn next_partition_to_spill(&self, _required_free: usize) -> Option<usize> {
    let spill_state = self.spill_state.as_ref()?;
    let mut best: Option<(usize, usize)> = None;
    for (idx, buffer) in spill_state.partition_buffers.iter().enumerate() {
        if buffer.is_empty() {
            continue;
        }
        // `>=` so that on ties the later index wins, matching max_by_key.
        match best {
            Some((_, best_mem)) if best_mem > buffer.mem_used => {}
            _ => best = Some((idx, buffer.mem_used)),
        }
    }
    best.map(|(idx, _)| idx)
}
/// Spill the given partition buffer to disk and return the pending completion.
/// On-disk layout: per entry, a varint length prefix followed by the bytes
/// produced by `HashEntry::serialize`. Each call appends one `SpillChunk` at
/// `next_spill_offset` and clears the in-memory buffer, reducing `mem_used`.
fn spill_partition(&mut self, partition_idx: usize) -> Result<Option<Completion>> {
let spill_state = self.spill_state.as_mut().expect("Spill state must exist");
let partition = &spill_state.partition_buffers[partition_idx];
if partition.is_empty() {
return Ok(None);
}
// Serialize entries from partition buffer
// (+9 per entry reserves space for the worst-case varint length prefix)
let estimated_size: usize = partition.entries.iter().map(|e| e.size_bytes() + 9).sum();
let mut serialized_data = Vec::with_capacity(estimated_size);
let mut len_buf = [0u8; 9];
let mut entry_buf = Vec::new();
for entry in &partition.entries {
entry.serialize(&mut entry_buf);
let len_varint_size = write_varint(&mut len_buf, entry_buf.len() as u64);
serialized_data.extend_from_slice(&len_buf[..len_varint_size]);
serialized_data.extend_from_slice(&entry_buf);
entry_buf.clear();
}
let file_offset = spill_state.next_spill_offset;
let data_size = serialized_data.len();
let num_entries = spill_state.partition_buffers[partition_idx].entries.len();
let mem_freed = spill_state.partition_buffers[partition_idx].mem_used;
spill_state.partition_buffers[partition_idx].clear();
// Find existing partition or create new one
let io_state = if let Some(existing) = spill_state.find_partition_mut(partition_idx) {
existing.add_chunk(file_offset, data_size, num_entries);
existing.io_state.clone()
} else {
let mut new_partition = SpilledPartition::new(partition_idx);
new_partition.add_chunk(file_offset, data_size, num_entries);
let io_state = new_partition.io_state.clone();
spill_state.partitions.push(new_partition);
io_state
};
io_state.set(SpillIOState::WaitingForWrite);
let buffer = Buffer::new_temporary(data_size);
buffer.as_mut_slice().copy_from_slice(&serialized_data);
let buffer_ref = Arc::new(buffer);
// The clone captured by the completion closure keeps the write buffer
// alive until the asynchronous write finishes.
let _buffer_ref_clone = buffer_ref.clone();
let write_complete = Box::new(move |res: Result<i32, crate::CompletionError>| match res {
Ok(_) => {
// keep buffer alive for async IO
let _buf = _buffer_ref_clone.clone();
tracing::trace!("Successfully wrote spilled partition to disk");
io_state.set(SpillIOState::WriteComplete);
}
Err(e) => {
tracing::error!("Error writing spilled partition to disk: {e:?}");
io_state.set(SpillIOState::Error);
}
});
let completion = Completion::new_write(write_complete);
let file = spill_state.temp_file.file.clone();
let completion = file.pwrite(file_offset, buffer_ref, completion)?;
// Update state
// (memory is released immediately: the data now lives in the write buffer)
self.mem_used -= mem_freed;
spill_state.next_spill_offset += data_size as u64;
Ok(Some(completion))
}
/// Spill as many whole partitions as needed to keep the incoming entry within budget.
/// Returns a pending completion if a spill write is in flight (caller re-enters),
/// or `None` once the entry fits or nothing spillable remains.
fn spill_partitions_for_entry(&mut self, entry_size: usize) -> Result<Option<Completion>> {
    while self.mem_used + entry_size > self.mem_budget {
        let required_free = self.mem_used + entry_size - self.mem_budget;
        let partition_idx = match self.next_partition_to_spill(required_free) {
            Some(idx) => idx,
            // Nothing left to spill; give up and let the entry exceed budget.
            None => break,
        };
        if let Some(completion) = self.spill_partition(partition_idx)? {
            if !completion.finished() {
                // Wait for caller to re-enter after the I/O completes.
                return Ok(Some(completion));
            }
        }
    }
    Ok(None)
}
/// Convert a never-spilled partition buffer into in-memory buckets for probing.
/// No-op when the partition already has a spilled record or holds no entries.
fn materialize_partition_in_memory(&mut self, partition_idx: usize) {
    let spill_state = self.spill_state.as_mut().expect("spill state must exist");
    if spill_state.find_partition(partition_idx).is_some() {
        return;
    }
    let buffer = &mut spill_state.partition_buffers[partition_idx];
    if buffer.is_empty() {
        return;
    }
    // self.mem_used stays unchanged on purpose: these entries were always
    // resident, we're only changing their layout from a flat buffer to buckets.
    let entries = std::mem::take(&mut buffer.entries);
    buffer.mem_used = 0;
    let bucket_count = entries.len().next_power_of_two().max(64);
    let mut buckets = vec![HashBucket::new(); bucket_count];
    for entry in entries {
        let slot = (entry.hash as usize) % bucket_count;
        buckets[slot].insert(entry);
    }
    let mut partition = SpilledPartition::new(partition_idx);
    partition.state = PartitionState::InMemory;
    partition.resident_mem = 0;
    partition.buckets = buckets;
    spill_state.partitions.push(partition);
}
/// Finalize the build phase and prepare for probing.
/// If spilled, flushes remaining in-memory partition entries to disk.
///
/// Re-entrant: yields via `io_yield_one!` whenever outstanding I/O must finish
/// first; the caller keeps re-invoking until `IOResult::Done(())` is returned,
/// at which point the table is in the `Probing` state.
pub fn finalize_build(&mut self) -> Result<IOResult<()>> {
    turso_assert!(
        self.state == HashTableState::Building || self.state == HashTableState::Spilled,
        "Cannot finalize build in state {:?}",
        self.state
    );
    if self.spill_state.is_some() {
        {
            // Check for pending writes from previous call: if any partition is
            // still waiting on a write completion, yield back to the caller.
            let spill_state = self.spill_state.as_ref().expect("spill state must exist");
            for spilled in &spill_state.partitions {
                if matches!(spilled.io_state.get(), SpillIOState::WaitingForWrite) {
                    io_yield_one!(Completion::new_yield());
                }
            }
        }
        // Determine which partitions need to spill vs stay in memory without holding
        // a mutable borrow across the spill/materialize calls.
        // Partitions with an existing SpilledPartition record flush their
        // remaining buffered entries to disk; never-spilled partitions are
        // converted into in-memory buckets instead.
        let mut spill_targets = Vec::new();
        let mut materialize_targets = Vec::new();
        {
            let spill_state = self.spill_state.as_ref().expect("spill state must exist");
            for partition_idx in 0..NUM_PARTITIONS {
                let partition = &spill_state.partition_buffers[partition_idx];
                if partition.is_empty() {
                    continue;
                }
                if spill_state.find_partition(partition_idx).is_some() {
                    spill_targets.push(partition_idx);
                } else {
                    materialize_targets.push(partition_idx);
                }
            }
        }
        for partition_idx in spill_targets {
            if let Some(completion) = self.spill_partition(partition_idx)? {
                // Return I/O completion to caller, they will re-enter after completion
                if !completion.finished() {
                    io_yield_one!(completion);
                }
            }
        }
        for partition_idx in materialize_targets {
            self.materialize_partition_in_memory(partition_idx);
        }
    }
    self.current_spill_partition_idx = 0;
    self.state = HashTableState::Probing;
    Ok(IOResult::Done(()))
}
/// Probe the hash table with the given keys, returns the first matching entry if found.
/// NOTE: Calling `probe` on a spilled table requires the relevant partition to be loaded.
/// Returns None immediately if any probe key is NULL since NULL != NULL in SQL.
///
/// Side effects: stores the probe keys/hash and resets the probe cursor
/// (`probe_bucket_idx` / `probe_entry_idx`) so that subsequent `next_match`
/// calls continue from the entry after the one returned here.
pub fn probe(&mut self, probe_keys: Vec<Value>) -> Option<&HashEntry> {
    turso_assert!(
        self.state == HashTableState::Probing,
        "Cannot probe hash table in state {:?}",
        self.state
    );
    // Skip probing if any key is NULL - NULL can never match anything in SQL
    if has_null_key(&probe_keys) {
        // Hash is cleared; next_match recomputes it lazily if ever needed.
        self.current_probe_keys = Some(probe_keys);
        self.current_probe_hash = None;
        return None;
    }
    // Store probe keys first
    self.current_probe_keys = Some(probe_keys);
    // Compute hash of probe keys using collations
    let probe_keys_ref = self
        .current_probe_keys
        .as_ref()
        .expect("prob keys were set");
    let key_refs: Vec<ValueRef> = probe_keys_ref.iter().map(|v| v.as_ref()).collect();
    let hash = hash_join_key(&key_refs, &self.collations);
    self.current_probe_hash = Some(hash);
    // Reset probe state
    self.probe_entry_idx = 0;
    if let Some(spill_state) = self.spill_state.as_ref() {
        // In spilled mode, search through loaded entries from spilled partitions
        // that match this probe key's partition
        let target_partition = partition_from_hash(hash);
        let partition = spill_state.find_partition(target_partition)?;
        if partition.buckets.is_empty() {
            return None;
        }
        // Mark the partition most-recently-used for the eviction policy.
        self.touch_partition_lru(target_partition);
        let bucket_idx = (hash as usize) % partition.buckets.len();
        self.probe_bucket_idx = bucket_idx;
        self.current_spill_partition_idx = target_partition;
        let bucket = &partition.buckets[bucket_idx];
        for (idx, entry) in bucket.entries.iter().enumerate() {
            if entry.hash == hash && keys_equal(&entry.key_values, &key_refs, &self.collations)
            {
                // Remember where to resume for next_match.
                self.probe_entry_idx = idx + 1;
                return Some(entry);
            }
        }
        None
    } else {
        // Normal mode - search in hash buckets
        let bucket_idx = (hash as usize) % self.buckets.len();
        self.probe_bucket_idx = bucket_idx;
        let bucket = &self.buckets[bucket_idx];
        for (idx, entry) in bucket.entries.iter().enumerate() {
            if entry.hash == hash && keys_equal(&entry.key_values, &key_refs, &self.collations)
            {
                self.probe_entry_idx = idx + 1;
                return Some(entry);
            }
        }
        None
    }
}
/// Get the next matching entry for the current probe keys.
///
/// Continues scanning the bucket selected by the most recent `probe` /
/// `probe_partition` call, starting at `probe_entry_idx`; returns `None` once
/// the bucket is exhausted. Only meaningful after a probe has set the cursor.
pub fn next_match(&mut self) -> Option<&HashEntry> {
    turso_assert!(
        self.state == HashTableState::Probing,
        "Cannot get next match in state {:?}",
        self.state
    );
    turso_assert!(self.current_probe_keys.is_some(), "probe keys must be set");
    let probe_keys = self.current_probe_keys.as_ref()?;
    let key_refs: Vec<ValueRef> = probe_keys.iter().map(|v| v.as_ref()).collect();
    // Reuse the cached hash when present (probe clears it after a NULL-key probe).
    let hash = match self.current_probe_hash {
        Some(h) => h,
        None => {
            let h = hash_join_key(&key_refs, &self.collations);
            self.current_probe_hash = Some(h);
            h
        }
    };
    if let Some(spill_state) = self.spill_state.as_ref() {
        let partition_idx = self.current_spill_partition_idx;
        // sanity check to ensure we cached the correct position
        debug_assert_eq!(partition_idx, partition_from_hash(hash));
        let partition = spill_state.find_partition(partition_idx)?;
        if partition.buckets.is_empty() {
            return None;
        }
        let bucket = &partition.buckets[self.probe_bucket_idx];
        // Continue from where we left off
        for idx in self.probe_entry_idx..bucket.entries.len() {
            let entry = &bucket.entries[idx];
            if entry.hash == hash && keys_equal(&entry.key_values, &key_refs, &self.collations)
            {
                self.probe_entry_idx = idx + 1;
                return Some(entry);
            }
        }
        None
    } else {
        // non-spilled case, search in main buckets
        let bucket = &self.buckets[self.probe_bucket_idx];
        for idx in self.probe_entry_idx..bucket.entries.len() {
            let entry = &bucket.entries[idx];
            if entry.hash == hash && keys_equal(&entry.key_values, &key_refs, &self.collations)
            {
                // update probe entry index for next call
                self.probe_entry_idx = idx + 1;
                return Some(entry);
            }
        }
        None
    }
}
/// Get the number of spilled partitions (0 when the table never spilled).
pub fn num_partitions(&self) -> usize {
    self.spill_state
        .as_ref()
        .map_or(0, |state| state.partitions.len())
}
/// Check if a specific partition is loaded and ready for probing.
pub fn is_partition_loaded(&self, partition_idx: usize) -> bool {
    match self.spill_state.as_ref() {
        Some(state) => state
            .find_partition(partition_idx)
            .map_or(false, |partition| partition.is_loaded()),
        None => false,
    }
}
/// Re-entrantly load spilled partitions from disk.
///
/// Drives a per-partition state machine: chunk reads are scheduled one at a
/// time (`LoadChunk`), their completions append bytes to the partition's
/// persistent read buffer and advance `current_chunk_idx`, and once every
/// chunk has been read the accumulated bytes are parsed into buckets
/// (`NeedsParsing`). Yields `IOResult::IO` whenever the caller must wait for
/// an outstanding read; keep re-invoking until `IOResult::Done(())`.
pub fn load_spilled_partition(&mut self, partition_idx: usize) -> Result<IOResult<()>> {
    loop {
        // to avoid holding mut borrows, split this into two phases:
        // first decide what to do while borrowing the partition, then act.
        let action = {
            let spill_state = match &mut self.spill_state {
                Some(s) => s,
                None => return Ok(IOResult::Done(())),
            };
            let spilled = match spill_state.find_partition_mut(partition_idx) {
                Some(p) => p,
                None => return Ok(IOResult::Done(())),
            };
            let io_state = spilled.io_state.get();
            if matches!(io_state, SpillIOState::Error) {
                return Err(LimboError::InternalError(
                    "hash join spill I/O failure".into(),
                ));
            }
            // Already fully loaded
            if spilled.is_loaded() {
                SpillAction::AlreadyLoaded
            } else if matches!(io_state, SpillIOState::WaitingForRead) {
                // We've scheduled a read, caller must wait for completion.
                SpillAction::WaitingForIO
            } else if matches!(io_state, SpillIOState::ReadComplete) {
                // A chunk finished reading: advance to next chunk.
                spilled.current_chunk_idx += 1;
                spilled.io_state.set(SpillIOState::None);
                if spilled.has_more_chunks() {
                    // We have more chunks to read, loop will re-enter and schedule next one.
                    SpillAction::Restart
                } else {
                    // All chunks have been read, we now need to parse everything.
                    SpillAction::NeedsParsing
                }
            } else {
                match spilled.current_chunk() {
                    Some(chunk) => {
                        let read_size = chunk.size_bytes;
                        let file_offset = chunk.file_offset;
                        if read_size == 0 {
                            // Empty chunk: skip it and move to the next.
                            spilled.current_chunk_idx += 1;
                            if spilled.has_more_chunks() {
                                SpillAction::Restart
                            } else {
                                // No more chunks, but nothing to read; mark loaded.
                                spilled.state = PartitionState::Loaded;
                                SpillAction::NoChunks
                            }
                        } else {
                            // Non-empty chunk, schedule a read for it.
                            // Clone the shared handles the completion callback needs.
                            let buffer_len = spilled.buffer_len.clone();
                            let read_buffer_ref = spilled.read_buffer.clone();
                            spilled.io_state.set(SpillIOState::WaitingForRead);
                            spilled.state = PartitionState::Loading;
                            SpillAction::LoadChunk {
                                read_size,
                                file_offset,
                                io_state: spilled.io_state.clone(),
                                buffer_len,
                                read_buffer_ref,
                            }
                        }
                    }
                    None => {
                        // No chunks at all: partition is logically empty, mark as loaded.
                        spilled.state = PartitionState::Loaded;
                        SpillAction::NoChunks
                    }
                }
            }
        };
        match action {
            SpillAction::AlreadyLoaded => {
                self.touch_partition_lru(partition_idx);
                return Ok(IOResult::Done(()));
            }
            SpillAction::NoChunks => {
                // Empty partition still flows through residency accounting so
                // LRU bookkeeping stays consistent.
                self.evict_partitions_to_fit(0, partition_idx);
                self.record_partition_resident(partition_idx, 0);
                return Ok(IOResult::Done(()));
            }
            SpillAction::NotFound => {
                return Ok(IOResult::Done(()));
            }
            SpillAction::NeedsParsing => {
                // All chunks are read, build buckets from the accumulated buffer.
                self.parse_partition_entries(partition_idx)?;
                return Ok(IOResult::Done(()));
            }
            SpillAction::WaitingForIO => {
                io_yield_one!(Completion::new_yield());
            }
            SpillAction::Restart => {
                // We advanced state (e.g., moved past an empty chunk or completed a chunk),
                // so just loop again and recompute the next action.
                continue;
            }
            SpillAction::LoadChunk {
                read_size,
                file_offset,
                io_state,
                buffer_len,
                read_buffer_ref,
            } => {
                // Scratch buffer for this chunk; the completion copies it into
                // the partition's persistent read buffer.
                let read_buffer = Arc::new(Buffer::new_temporary(read_size));
                let read_complete = Box::new(
                    move |res: Result<(Arc<Buffer>, i32), CompletionError>| match res {
                        Ok((buf, bytes_read)) => {
                            tracing::trace!(
                                "Completed read of spilled partition chunk: bytes_read={}",
                                bytes_read
                            );
                            // Append data to our persistent buffer (accumulating chunks).
                            let mut persistent_buf = read_buffer_ref.write();
                            persistent_buf
                                .extend_from_slice(&buf.as_slice()[..bytes_read as usize]);
                            buffer_len.fetch_add(bytes_read as usize, atomic::Ordering::SeqCst);
                            io_state.set(SpillIOState::ReadComplete);
                        }
                        Err(e) => {
                            tracing::error!("Error reading spilled partition chunk: {e:?}");
                            io_state.set(SpillIOState::Error);
                        }
                    },
                );
                let completion = Completion::new_read(read_buffer, read_complete);
                let spill_state = self.spill_state.as_ref().expect("spill state must exist");
                let c = spill_state.temp_file.file.pread(file_offset, completion)?;
                if !c.finished() {
                    io_yield_one!(c);
                }
            }
        }
    }
}
/// Parse entries from the read buffer into buckets for a partition.
///
/// Decodes the length-prefixed `HashEntry` stream accumulated by the chunk
/// reads, builds a power-of-two bucket array sized from the expected entry
/// count, marks the partition `Loaded`, releases the staging buffer, and
/// updates residency accounting (evicting other partitions if necessary).
fn parse_partition_entries(&mut self, partition_idx: usize) -> Result<()> {
    let resident_mem = {
        let spill_state = self.spill_state.as_mut().expect("spill state must exist");
        let partition = spill_state
            .find_partition_mut(partition_idx)
            .expect("partition must exist for parsing");
        let data_len = partition.buffer_len();
        let data_guard = partition.read_buffer.read();
        let data = &data_guard[..data_len];
        let mut entries = Vec::new();
        let mut offset = 0;
        // On-disk format: varint(entry length) followed by the serialized entry.
        while offset < data.len() {
            let (entry_len, varint_size) = read_varint(&data[offset..])?;
            offset += varint_size;
            if offset + entry_len as usize > data.len() {
                return Err(LimboError::Corrupt("HashEntry: truncated entry".into()));
            }
            let (entry, consumed) =
                HashEntry::deserialize(&data[offset..offset + entry_len as usize])?;
            turso_assert!(
                consumed == entry_len as usize,
                "expected to consume entire entry"
            );
            entries.push(entry);
            offset += entry_len as usize;
        }
        drop(data_guard);
        let total_num_entries = partition.total_num_entries();
        tracing::trace!(
            "parsing partition entries: partition_idx={}, data_len={}, parsed_entries={}, total_num_entries={}",
            partition_idx, data_len, entries.len(), total_num_entries
        );
        let bucket_count = total_num_entries.next_power_of_two().max(64);
        partition.buckets = (0..bucket_count).map(|_| HashBucket::new()).collect();
        for entry in entries {
            let bucket_idx = (entry.hash as usize) % bucket_count;
            partition.buckets[bucket_idx].insert(entry);
        }
        partition.state = PartitionState::Loaded;
        partition.resident_mem = Self::partition_bucket_mem(&partition.buckets);
        // Release staging buffer to free memory now that buckets are built.
        partition.buffer_len.store(0, atomic::Ordering::SeqCst);
        partition.read_buffer.write().clear();
        partition.resident_mem
    };
    // Evict other partitions if needed before keeping this one resident.
    self.evict_partitions_to_fit(resident_mem, partition_idx);
    self.record_partition_resident(partition_idx, resident_mem);
    Ok(())
}
/// Probe a specific partition with the given keys. The partition must be loaded first via `load_spilled_partition`.
/// VDBE *must* call load_spilled_partition(partition_idx) and get IOResult::Done before calling probe.
/// Returns None immediately if any probe key is NULL since NULL != NULL in SQL.
///
/// Side effects: stores the probe keys/hash and the probe cursor so that
/// `next_match` can continue from the entry after the one returned here.
///
/// Fix: previously the LRU queue was touched *before* checking that the
/// partition exists and is resident, so probes against unloaded or
/// nonexistent partitions polluted the LRU ordering. The touch now happens
/// only once residency is confirmed.
pub fn probe_partition(
    &mut self,
    partition_idx: usize,
    probe_keys: &[Value],
) -> Option<&HashEntry> {
    // Skip probing if any key is NULL - NULL can never match anything in SQL
    if has_null_key(probe_keys) {
        self.current_probe_keys = Some(probe_keys.to_vec());
        self.current_probe_hash = None;
        return None;
    }
    let key_refs: Vec<ValueRef> = probe_keys.iter().map(|v| v.as_ref()).collect();
    let hash = hash_join_key(&key_refs, &self.collations);
    // Store probe keys for subsequent next_match calls
    self.current_probe_keys = Some(probe_keys.to_vec());
    self.current_probe_hash = Some(hash);
    let spill_state = self.spill_state.as_ref()?;
    let partition = spill_state.find_partition(partition_idx)?;
    if !partition.is_loaded() || partition.buckets.is_empty() {
        return None;
    }
    // Mark as most-recently-used only now that the partition is known to be
    // resident; touching earlier would push non-resident partition indices
    // into the LRU queue.
    self.touch_partition_lru(partition_idx);
    let bucket_idx = (hash as usize) % partition.buckets.len();
    let bucket = &partition.buckets[bucket_idx];
    self.probe_bucket_idx = bucket_idx;
    self.current_spill_partition_idx = partition_idx;
    for (idx, entry) in bucket.entries.iter().enumerate() {
        if entry.hash == hash && keys_equal(&entry.key_values, &key_refs, &self.collations) {
            // Remember where to resume for next_match.
            self.probe_entry_idx = idx + 1;
            return Some(entry);
        }
    }
    None
}
/// Get the partition index for a given probe key hash.
pub fn partition_for_keys(&self, probe_keys: &[Value]) -> usize {
    let borrowed: Vec<ValueRef> = probe_keys.iter().map(|key| key.as_ref()).collect();
    partition_from_hash(hash_join_key(&borrowed, &self.collations))
}
/// Returns true if the hash table has spilled to disk.
pub fn has_spilled(&self) -> bool {
    matches!(&self.spill_state, Some(_))
}
/// Approximate memory used by a partition's buckets.
fn partition_bucket_mem(buckets: &[HashBucket]) -> usize {
    buckets
        .iter()
        .fold(0usize, |total, bucket| total + bucket.size_bytes())
}
/// Touch a partition for LRU ordering without changing its accounted memory.
fn touch_partition_lru(&self, partition_idx: usize) {
    let mut lru = self.loaded_partitions_lru.borrow_mut();
    // Remove any existing occurrence so the index appears exactly once,
    // then re-append it as the most recently used entry.
    if let Some(existing) = lru.iter().position(|&idx| idx == partition_idx) {
        lru.remove(existing);
    }
    lru.push_back(partition_idx);
}
/// Record that a partition is resident with the given memory footprint and update LRU.
fn record_partition_resident(&mut self, partition_idx: usize, mem_used: usize) {
    let Some(spill_state) = self.spill_state.as_mut() else {
        return;
    };
    let Some(partition) = spill_state.find_partition_mut(partition_idx) else {
        return;
    };
    // Replace the old accounting for this partition with the new figure.
    let previous = partition.resident_mem;
    partition.resident_mem = mem_used;
    self.loaded_partitions_mem = self.loaded_partitions_mem.saturating_sub(previous) + mem_used;
    self.touch_partition_lru(partition_idx);
}
/// Evict least-recently used spillable partitions until there is room for `incoming_mem`.
///
/// `protect_idx` is never evicted (callers pass the partition currently being
/// loaded). Stops early when no further eviction candidate exists, even if
/// the budget is still exceeded.
fn evict_partitions_to_fit(&mut self, incoming_mem: usize, protect_idx: usize) {
    while self.mem_used + self.loaded_partitions_mem + incoming_mem > self.mem_budget {
        let Some(victim_idx) = self.next_evictable(protect_idx) else {
            break;
        };
        let mut freed = 0;
        if let Some(spill_state) = self.spill_state.as_mut() {
            if let Some(victim) = spill_state.find_partition_mut(victim_idx) {
                if matches!(victim.state, PartitionState::Loaded) {
                    // Drop the resident buckets and reset all read-side state so
                    // the partition can be re-read from its on-disk chunks later.
                    freed = victim.resident_mem;
                    victim.buckets.clear();
                    victim.state = PartitionState::OnDisk;
                    victim.resident_mem = 0;
                    victim.current_chunk_idx = 0;
                    victim.buffer_len.store(0, atomic::Ordering::SeqCst);
                    victim.read_buffer.write().clear();
                    victim.io_state.set(SpillIOState::None);
                }
            }
        }
        // freed stays 0 if the victim was not actually Loaded.
        self.loaded_partitions_mem = self.loaded_partitions_mem.saturating_sub(freed);
    }
}
/// Find the next evictable partition (LRU) that is not protected and has backing spill data.
///
/// Scans the LRU queue front-to-back (least recently used first), skipping
/// `protect_idx`. A candidate qualifies only when it is `Loaded` and has at
/// least one on-disk chunk to reload from; the winner is removed from the
/// LRU queue before being returned.
fn next_evictable(&mut self, protect_idx: usize) -> Option<usize> {
    let spill_state = self.spill_state.as_ref()?;
    let len = self.loaded_partitions_lru.borrow().len();
    for i in 0..len {
        // Re-borrow each iteration so the RefCell guard never overlaps the
        // mutable re-borrow needed for removal below.
        let lru = self.loaded_partitions_lru.borrow();
        let candidate = lru[i];
        if candidate == protect_idx {
            continue;
        }
        if let Some(p) = spill_state.find_partition(candidate) {
            let has_disk = !p.chunks.is_empty();
            // Release the shared borrow before borrow_mut below.
            drop(lru);
            if matches!(p.state, PartitionState::Loaded) && has_disk {
                self.loaded_partitions_lru.borrow_mut().remove(i);
                return Some(candidate);
            }
        }
    }
    None
}
/// Close the hash table and free resources.
pub fn close(&mut self) {
    self.state = HashTableState::Closed;
    // Release in-memory buckets and reset the build-side accounting.
    self.buckets.clear();
    self.num_entries = 0;
    self.mem_used = 0;
    // Forget LRU bookkeeping for resident spilled partitions.
    self.loaded_partitions_lru.borrow_mut().clear();
    self.loaded_partitions_mem = 0;
    // Discard the spill state entirely (dropping it releases its buffers).
    self.spill_state = None;
}
}
#[cfg(test)]
mod hashtests {
use super::*;
use crate::MemoryIO;
#[test]
fn test_hash_function_consistency() {
    // Identical key tuples must hash identically; differing tuples must not.
    let make_keys = |n: i64| {
        vec![
            ValueRef::Integer(n),
            ValueRef::Text(crate::types::TextRef::new(
                "hello",
                crate::types::TextSubtype::Text,
            )),
        ]
    };
    let collations = vec![CollationSeq::Binary, CollationSeq::Binary];
    let first = hash_join_key(&make_keys(42), &collations);
    let second = hash_join_key(&make_keys(42), &collations);
    let different = hash_join_key(&make_keys(43), &collations);
    assert_eq!(first, second, "same keys must hash the same");
    assert_ne!(first, different, "different keys should hash differently");
}
#[test]
fn test_hash_function_numeric_equivalence() {
    let collations = vec![CollationSeq::Binary];
    let hash_of = |v| hash_join_key(&[v], &collations);
    // All zero representations collapse onto a single hash.
    assert_eq!(hash_of(ValueRef::Float(0.0)), hash_of(ValueRef::Float(-0.0)));
    assert_eq!(hash_of(ValueRef::Float(0.0)), hash_of(ValueRef::Integer(0)));
    // An integer and its numerically equal float share a hash.
    assert_eq!(hash_of(ValueRef::Integer(10)), hash_of(ValueRef::Float(10.0)));
    assert_eq!(
        hash_of(ValueRef::Integer(-10)),
        hash_of(ValueRef::Float(-10.0))
    );
    // Sign still matters.
    assert_ne!(
        hash_of(ValueRef::Integer(10)),
        hash_of(ValueRef::Integer(-10))
    );
}
#[test]
fn test_keys_equal() {
    // keys_equal compares owned build-side Values against borrowed
    // probe-side ValueRefs under the given collations.
    let build_side = vec![Value::Integer(42), Value::Text("hello".to_string().into())];
    let hello_ref = || {
        ValueRef::Text(crate::types::TextRef::new(
            "hello",
            crate::types::TextSubtype::Text,
        ))
    };
    let matching = vec![ValueRef::Integer(42), hello_ref()];
    let mismatching = vec![ValueRef::Integer(43), hello_ref()];
    let collations = vec![CollationSeq::Binary, CollationSeq::Binary];
    assert!(keys_equal(&build_side, &matching, &collations));
    assert!(!keys_equal(&build_side, &mismatching, &collations));
}
#[test]
fn test_hash_table_basic() {
    let io = Arc::new(MemoryIO::new());
    let config = HashTableConfig {
        initial_buckets: 4,
        mem_budget: 1024 * 1024,
        num_keys: 1,
        collations: vec![CollationSeq::Binary],
    };
    let mut ht = HashTable::new(config, io);
    // Insert some entries (late materialization - only store rowids)
    let key1 = vec![Value::Integer(1)];
    let _ = ht.insert(key1.clone(), 100, vec![]).unwrap();
    let key2 = vec![Value::Integer(2)];
    let _ = ht.insert(key2.clone(), 200, vec![]).unwrap();
    // Fix: the finalize_build result used to be silently discarded; with a
    // large budget nothing spills, so it must complete without yielding IO.
    match ht.finalize_build().unwrap() {
        IOResult::Done(()) => {}
        IOResult::IO(_) => panic!("finalize_build should not require IO without spilling"),
    }
    // Probe for key1
    let result = ht.probe(key1.clone());
    assert!(result.is_some());
    let entry1 = result.unwrap();
    assert_eq!(entry1.key_values[0].as_ref(), ValueRef::Integer(1));
    assert_eq!(entry1.rowid, 100);
    // Probe for key2
    let result = ht.probe(key2);
    assert!(result.is_some());
    let entry2 = result.unwrap();
    assert_eq!(entry2.key_values[0].as_ref(), ValueRef::Integer(2));
    assert_eq!(entry2.rowid, 200);
    // Probe for non-existent key
    let result = ht.probe(vec![Value::Integer(999)]);
    assert!(result.is_none());
}
#[test]
fn test_hash_table_collisions() {
    let io = Arc::new(MemoryIO::new());
    // Deliberately tiny bucket count so many keys share a bucket.
    let config = HashTableConfig {
        initial_buckets: 2,
        mem_budget: 1024 * 1024,
        num_keys: 1,
        collations: vec![CollationSeq::Binary],
    };
    let mut ht = HashTable::new(config, io);
    // Build side: keys 0..10, rowid = key * 100 (late materialization, no payload).
    for key in 0..10i64 {
        let _ = ht
            .insert(vec![Value::Integer(key)], key * 100, vec![])
            .unwrap();
    }
    let _ = ht.finalize_build();
    // Every key must still be reachable despite the bucket collisions.
    for key in 0..10i64 {
        let entry = ht
            .probe(vec![Value::Integer(key)])
            .expect("inserted key must be found");
        assert_eq!(entry.key_values[0].as_ref(), ValueRef::Integer(key));
        assert_eq!(entry.rowid, key * 100);
    }
}
#[test]
fn test_hash_table_duplicate_keys() {
    let io = Arc::new(MemoryIO::new());
    let config = HashTableConfig {
        initial_buckets: 4,
        mem_budget: 1024 * 1024,
        num_keys: 1,
        collations: vec![CollationSeq::Binary],
    };
    let mut ht = HashTable::new(config, io);
    // Three rows share the key 42, with rowids 1000, 1001, 1002.
    let dup_key = vec![Value::Integer(42)];
    for offset in 0..3 {
        let _ = ht.insert(dup_key.clone(), 1000 + offset, vec![]).unwrap();
    }
    let _ = ht.finalize_build();
    // probe yields the first match; next_match walks the remaining
    // duplicates and finally reports exhaustion with None.
    let first = ht.probe(dup_key.clone()).expect("first duplicate");
    assert_eq!(first.rowid, 1000);
    assert_eq!(ht.next_match().expect("second duplicate").rowid, 1001);
    assert_eq!(ht.next_match().expect("third duplicate").rowid, 1002);
    assert!(ht.next_match().is_none());
}
#[test]
fn test_hash_entry_serialization() {
    // Test that entries serialize and deserialize correctly:
    // round-trip across integer, text, NULL and float key values.
    let entry = HashEntry::new(
        12345,
        vec![
            Value::Integer(42),
            Value::Text("hello".to_string().into()),
            Value::Null,
            Value::Float(std::f64::consts::PI),
        ],
        100,
    );
    let mut buf = Vec::new();
    entry.serialize(&mut buf);
    let (deserialized, consumed) = HashEntry::deserialize(&buf).unwrap();
    // The deserializer must consume the whole buffer and preserve hash/rowid.
    assert_eq!(consumed, buf.len());
    assert_eq!(deserialized.hash, entry.hash);
    assert_eq!(deserialized.rowid, entry.rowid);
    assert_eq!(deserialized.key_values.len(), entry.key_values.len());
    for (v1, v2) in deserialized.key_values.iter().zip(entry.key_values.iter()) {
        match (v1, v2) {
            (Value::Integer(i1), Value::Integer(i2)) => assert_eq!(i1, i2),
            (Value::Text(t1), Value::Text(t2)) => assert_eq!(t1.as_str(), t2.as_str()),
            // Floats compared with tolerance in case the encoding is lossy.
            (Value::Float(f1), Value::Float(f2)) => assert!((f1 - f2).abs() < 1e-10),
            (Value::Null, Value::Null) => {}
            _ => panic!("Value type mismatch"),
        }
    }
}
#[test]
fn test_partition_from_hash() {
    // Spread 10k synthetic hashes over the partitions and verify that the
    // partition index is always valid and no partition stays empty.
    let mut counts = vec![0usize; NUM_PARTITIONS];
    for i in 0u64..10000 {
        let hash = i.wrapping_mul(0x9E3779B97F4A7C15); // Simple hash spreading
        let partition = partition_from_hash(hash);
        assert!(partition < NUM_PARTITIONS);
        counts[partition] += 1;
    }
    assert!(
        counts.iter().all(|&count| count > 0),
        "Each partition should have some entries"
    );
}
#[test]
fn test_spill_chunk_tracking() {
    // A SpilledPartition accumulates chunk metadata additively.
    let mut partition = SpilledPartition::new(5);
    assert_eq!(partition.partition_idx, 5);
    assert!(partition.chunks.is_empty());
    assert_eq!(partition.total_size_bytes(), 0);
    assert_eq!(partition.total_num_entries(), 0);
    // First chunk: 1000 bytes / 50 entries at file offset 0.
    partition.add_chunk(0, 1000, 50);
    assert_eq!(partition.chunks.len(), 1);
    assert_eq!(partition.total_size_bytes(), 1000);
    assert_eq!(partition.total_num_entries(), 50);
    // Second chunk appended right after the first.
    partition.add_chunk(1000, 500, 25);
    assert_eq!(partition.chunks.len(), 2);
    assert_eq!(partition.total_size_bytes(), 1500);
    assert_eq!(partition.total_num_entries(), 75);
    // Per-chunk bookkeeping is preserved verbatim.
    let first = &partition.chunks[0];
    assert_eq!(first.file_offset, 0);
    assert_eq!(first.size_bytes, 1000);
    let second = &partition.chunks[1];
    assert_eq!(second.file_offset, 1000);
    assert_eq!(second.size_bytes, 500);
}
#[test]
fn test_hash_function_respects_collation_nocase() {
    use crate::types::{TextRef, TextSubtype};
    let upper = vec![ValueRef::Text(TextRef::new("Hello", TextSubtype::Text))];
    let lower = vec![ValueRef::Text(TextRef::new("hello", TextSubtype::Text))];
    // BINARY is case-sensitive: hashes must diverge.
    assert_ne!(
        hash_join_key(&upper, &[CollationSeq::Binary]),
        hash_join_key(&lower, &[CollationSeq::Binary])
    );
    // NOCASE: case differences must not affect the hash.
    assert_eq!(
        hash_join_key(&upper, &[CollationSeq::NoCase]),
        hash_join_key(&lower, &[CollationSeq::NoCase])
    );
}
#[test]
fn test_values_equal_with_collations() {
    use crate::types::{TextRef, TextSubtype};
    // "Hello " (trailing space) vs "hello": differs in both case and padding.
    let h1 = ValueRef::Text(TextRef::new("Hello ", TextSubtype::Text));
    let h2 = ValueRef::Text(TextRef::new("hello", TextSubtype::Text));
    // Binary: case / trailing spaces matter
    assert!(!values_equal(h1, h2, CollationSeq::Binary));
    // NOCASE folds case only; the trailing space still makes them unequal
    assert!(!values_equal(h1, h2, CollationSeq::NoCase));
    // RTRIM: ignore trailing spaces, but case is still significant,
    // so "Hello " equals "Hello" (same case, padding ignored)
    let h3 = ValueRef::Text(TextRef::new("Hello", TextSubtype::Text));
    assert!(values_equal(h1, h3, CollationSeq::Rtrim));
}
#[test]
fn test_keys_equal_with_collations() {
    use crate::types::{TextRef, TextSubtype};
    // One owned build key vs one borrowed probe key differing only in case.
    let build = vec![Value::Text("Hello".into())];
    let probe = vec![ValueRef::Text(TextRef::new("hello", TextSubtype::Text))];
    // Case-sensitive comparison rejects the pair...
    assert!(!keys_equal(&build, &probe, &[CollationSeq::Binary]));
    // ...while NOCASE accepts it.
    assert!(keys_equal(&build, &probe, &[CollationSeq::NoCase]));
}
#[test]
fn test_hash_entry_deserialization_truncated() {
    // Serialize a valid entry, then chop off its tail; deserialization must
    // flag the truncation instead of reading past the end.
    let original = HashEntry::new(123, vec![Value::Integer(1), Value::Text("abc".into())], 42);
    let mut serialized = Vec::new();
    original.serialize(&mut serialized);
    serialized.truncate(serialized.len() - 2);
    assert!(
        HashEntry::deserialize(&serialized).is_err(),
        "truncated buffer should be rejected as corrupt"
    );
}
#[test]
fn test_hash_entry_deserialization_garbage_type_tag() {
    let entry = HashEntry::new(1, vec![Value::Integer(10)], 7);
    let mut buf = Vec::new();
    entry.serialize(&mut buf);
    // Compute the exact offset of the *first* type tag.
    // Layout: [0..8] hash | [8..16] rowid | varint(num_keys) | type | payload...
    let mut corrupted = buf.clone();
    let mut offset = 16;
    let (_num_keys, varint_len) = read_varint(&corrupted[offset..]).unwrap();
    offset += varint_len;
    // 0xFF is outside the valid type-tag range (NULL/INT/FLOAT/TEXT/BLOB).
    corrupted[offset] = 0xFF;
    let res = HashEntry::deserialize(&corrupted);
    assert!(
        res.is_err(),
        "invalid type tag should be rejected as corrupt"
    );
}
/// Insert `count` integer-keyed rows (rowids `start..start + count`) with no
/// payload, ignoring the insert results, to push the table past its budget.
fn insert_many_force_spill(ht: &mut HashTable, start: i64, count: i64) {
    for rowid in start..start + count {
        let _ = ht.insert(vec![Value::Integer(rowid)], rowid, vec![]);
    }
}
#[test]
fn test_hash_table_spill_and_load_partition_round_trip() {
    let io = Arc::new(MemoryIO::new());
    let config = HashTableConfig {
        initial_buckets: 4,
        // very small budget to force spill
        mem_budget: 1024,
        num_keys: 1,
        collations: vec![CollationSeq::Binary],
    };
    let mut ht = HashTable::new(config, io);
    // Insert enough rows to blow past the 1KB budget and force spills
    insert_many_force_spill(&mut ht, 0, 1024);
    let _ = ht.finalize_build().unwrap();
    assert!(ht.has_spilled(), "hash table should have spilled");
    // Pick a key and find its partition
    let probe_key = vec![Value::Integer(10)];
    let partition_idx = ht.partition_for_keys(&probe_key);
    // Load that partition into memory; the test expects MemoryIO to complete
    // reads synchronously, so no IO yield should be observed here.
    match ht.load_spilled_partition(partition_idx).unwrap() {
        IOResult::Done(()) => {}
        IOResult::IO(_) => panic!("test harness must drive IO completions here"),
    }
    assert!(
        ht.is_partition_loaded(partition_idx),
        "partition must be resident after load_spilled_partition"
    );
    // Probe via partition API and verify the original row round-tripped
    let entry = ht.probe_partition(partition_idx, &probe_key);
    assert!(entry.is_some());
    assert_eq!(entry.unwrap().rowid, 10);
}
#[test]
fn test_partition_lru_eviction() {
    let io = Arc::new(MemoryIO::new());
    // tiny mem_budget so only ~1 partition can stay resident
    let config = HashTableConfig {
        initial_buckets: 4,
        mem_budget: 8 * 1024,
        num_keys: 1,
        collations: vec![CollationSeq::Binary],
    };
    let mut ht = HashTable::new(config, io);
    // Insert enough rows (rowids 0..1280) that data lands in many partitions
    insert_many_force_spill(&mut ht, 0, 256);
    insert_many_force_spill(&mut ht, 256, 1024);
    let _ = ht.finalize_build().unwrap();
    assert!(ht.has_spilled());
    // NOTE: key_b (10_001) was never inserted; partition_for_keys only hashes
    // the key, so it merely selects a partition index different from pa.
    let key_a = vec![Value::Integer(1)];
    let key_b = vec![Value::Integer(10_001)];
    let pa = ht.partition_for_keys(&key_a);
    let pb = ht.partition_for_keys(&key_b);
    assert_ne!(pa, pb);
    // Load partition A, driving any pending IO to completion
    while let IOResult::IO(_) = ht.load_spilled_partition(pa).unwrap() {}
    assert!(ht.is_partition_loaded(pa));
    // Now load partition B, this should (under tight memory) evict A
    let _ = ht.load_spilled_partition(pb).unwrap();
    assert!(ht.is_partition_loaded(pb));
    // Depending on mem_budget and actual entry sizes, A should now be evicted
    // We can't *guarantee* that without knowing exact sizes, but in practice
    // this test will detect regressions in the LRU bookkeeping.
    assert!(
        !ht.is_partition_loaded(pa) || ht.loaded_partitions_mem <= ht.mem_budget,
        "either partition A is evicted, or loaded memory is within budget"
    );
}
#[test]
fn test_probe_partition_with_duplicate_keys() {
    let io = Arc::new(MemoryIO::new());
    // Small budget so 1024 duplicates of one key force a spill.
    let config = HashTableConfig {
        initial_buckets: 4,
        mem_budget: 8 * 1024,
        num_keys: 1,
        collations: vec![CollationSeq::Binary],
    };
    let mut ht = HashTable::new(config, io);
    let key = vec![Value::Integer(42)];
    for i in 0..1024 {
        match ht.insert(key.clone(), 1000 + i, vec![]).unwrap() {
            IOResult::Done(()) => {}
            IOResult::IO(_) => panic!("memory IO"),
        }
    }
    match ht.finalize_build().unwrap() {
        IOResult::Done(()) => {}
        IOResult::IO(_) => panic!("memory IO"),
    }
    assert!(ht.has_spilled());
    // All duplicates hash identically, so they live in a single partition.
    let partition_idx = ht.partition_for_keys(&key);
    match ht.load_spilled_partition(partition_idx).unwrap() {
        IOResult::Done(()) => {}
        IOResult::IO(_) => panic!("memory IO"),
    }
    assert!(ht.is_partition_loaded(partition_idx));
    // First probe should give us the first rowid
    let entry1 = ht.probe_partition(partition_idx, &key).unwrap();
    assert_eq!(entry1.rowid, 1000);
    // Then iterate through the rest with next_match
    for i in 0..1023 {
        let next = ht.next_match().unwrap();
        assert_eq!(next.rowid, 1001 + i);
    }
    assert!(ht.next_match().is_none());
}
#[test]
fn test_hash_table_with_payload() {
    let io = Arc::new(MemoryIO::new());
    let config = HashTableConfig {
        initial_buckets: 4,
        mem_budget: 1024 * 1024,
        num_keys: 1,
        collations: vec![CollationSeq::Binary],
    };
    let mut ht = HashTable::new(config, io);
    // Insert entries with payload values (simulating cached result columns)
    let key1 = vec![Value::Integer(1)];
    let payload1 = vec![
        Value::Text("Alice".into()),
        Value::Integer(30),
        Value::Float(1000.50),
    ];
    let _ = ht.insert(key1.clone(), 100, payload1.clone()).unwrap();
    let key2 = vec![Value::Integer(2)];
    let payload2 = vec![
        Value::Text("Bob".into()),
        Value::Integer(25),
        Value::Float(2000.75),
    ];
    let _ = ht.insert(key2.clone(), 200, payload2.clone()).unwrap();
    let _ = ht.finalize_build();
    // Probe and verify payload is returned correctly, value-for-value
    let result = ht.probe(key1);
    assert!(result.is_some());
    let entry1 = result.unwrap();
    assert_eq!(entry1.rowid, 100);
    assert!(entry1.has_payload());
    assert_eq!(entry1.payload_values.len(), 3);
    assert_eq!(entry1.payload_values[0], Value::Text("Alice".into()));
    assert_eq!(entry1.payload_values[1], Value::Integer(30));
    assert_eq!(entry1.payload_values[2], Value::Float(1000.50));
    // Second entry's payload must be independent of the first
    let result = ht.probe(key2);
    assert!(result.is_some());
    let entry2 = result.unwrap();
    assert_eq!(entry2.rowid, 200);
    assert!(entry2.has_payload());
    assert_eq!(entry2.payload_values[0], Value::Text("Bob".into()));
    assert_eq!(entry2.payload_values[1], Value::Integer(25));
    assert_eq!(entry2.payload_values[2], Value::Float(2000.75));
}
#[test]
fn test_hash_table_payload_with_nulls() {
let io = Arc::new(MemoryIO::new());
let config = HashTableConfig {
initial_buckets: 4,
mem_budget: 1024 * 1024,
num_keys: 1,
collations: vec![CollationSeq::Binary],
};
let mut ht = HashTable::new(config, io);
// Insert entry with NULL values in payload
let key = vec![Value::Integer(1)];
let payload = vec![Value::Null, Value::Text("test".into()), Value::Null];
let _ = ht.insert(key.clone(), 100, payload).unwrap();
let _ = ht.finalize_build();
let result = ht.probe(key);
assert!(result.is_some());
let entry = result.unwrap();
assert_eq!(entry.payload_values.len(), 3);
assert_eq!(entry.payload_values[0], Value::Null);
assert_eq!(entry.payload_values[1], Value::Text("test".into()));
assert_eq!(entry.payload_values[2], Value::Null);
}
#[test]
fn test_null_keys_are_skipped() {
// In SQL, NULL = NULL is false (actually NULL which is falsy).
// Hash joins should skip rows with NULL keys during both insert and probe.
let io = Arc::new(MemoryIO::new());
let config = HashTableConfig {
initial_buckets: 4,
mem_budget: 1024 * 1024,
num_keys: 2,
collations: vec![CollationSeq::Binary, CollationSeq::Binary],
};
let mut ht = HashTable::new(config, io);
// Insert entry with NULL key - should be silently skipped
let null_key = vec![Value::Null, Value::Integer(1)];
let _ = ht.insert(null_key.clone(), 100, vec![]).unwrap();
// Insert entry with non-NULL keys
let valid_key = vec![Value::Integer(1), Value::Integer(2)];
let _ = ht.insert(valid_key.clone(), 200, vec![]).unwrap();
// Insert another entry where second key is NULL
let null_key2 = vec![Value::Integer(1), Value::Null];
let _ = ht.insert(null_key2.clone(), 300, vec![]).unwrap();
let _ = ht.finalize_build();
// Only one entry should be in the table (the one with valid keys)
assert_eq!(ht.num_entries, 1);
// Probing with NULL key should return None
let result = ht.probe(null_key);
assert!(result.is_none());
// Probing with valid key should return the entry
let result = ht.probe(valid_key);
assert!(result.is_some());
assert_eq!(result.unwrap().rowid, 200);
// Probing with NULL in second position should also return None
let result = ht.probe(null_key2);
assert!(result.is_none());
}
#[test]
fn test_hash_table_payload_with_blobs() {
let io = Arc::new(MemoryIO::new());
let config = HashTableConfig {
initial_buckets: 4,
mem_budget: 1024 * 1024,
num_keys: 1,
collations: vec![CollationSeq::Binary],
};
let mut ht = HashTable::new(config, io);
// Insert entry with blob payload
let key = vec![Value::Integer(1)];
let blob_data = vec![0xDE, 0xAD, 0xBE, 0xEF];
let payload = vec![Value::Blob(blob_data.clone()), Value::Integer(42)];
let _ = ht.insert(key.clone(), 100, payload).unwrap();
let _ = ht.finalize_build();
let result = ht.probe(key);
assert!(result.is_some());
let entry = result.unwrap();
assert_eq!(entry.payload_values.len(), 2);
assert_eq!(entry.payload_values[0], Value::Blob(blob_data));
assert_eq!(entry.payload_values[1], Value::Integer(42));
}
#[test]
fn test_hash_table_payload_duplicate_keys() {
let io = Arc::new(MemoryIO::new());
let config = HashTableConfig {
initial_buckets: 4,
mem_budget: 1024 * 1024,
num_keys: 1,
collations: vec![CollationSeq::Binary],
};
let mut ht = HashTable::new(config, io);
// Insert multiple entries with the same key but different payloads
let key = vec![Value::Integer(42)];
let _ = ht
.insert(
key.clone(),
100,
vec![Value::Text("first".into()), Value::Integer(1)],
)
.unwrap();
let _ = ht
.insert(
key.clone(),
200,
vec![Value::Text("second".into()), Value::Integer(2)],
)
.unwrap();
let _ = ht
.insert(
key.clone(),
300,
vec![Value::Text("third".into()), Value::Integer(3)],
)
.unwrap();
let _ = ht.finalize_build();
// First probe should return first match
let result = ht.probe(key);
assert!(result.is_some());
let entry1 = result.unwrap();
assert_eq!(entry1.rowid, 100);
assert_eq!(entry1.payload_values[0], Value::Text("first".into()));
assert_eq!(entry1.payload_values[1], Value::Integer(1));
// next_match should return subsequent matches with their payloads
let entry2 = ht.next_match().unwrap();
assert_eq!(entry2.rowid, 200);
assert_eq!(entry2.payload_values[0], Value::Text("second".into()));
assert_eq!(entry2.payload_values[1], Value::Integer(2));
let entry3 = ht.next_match().unwrap();
assert_eq!(entry3.rowid, 300);
assert_eq!(entry3.payload_values[0], Value::Text("third".into()));
assert_eq!(entry3.payload_values[1], Value::Integer(3));
// No more matches
assert!(ht.next_match().is_none());
}
#[test]
fn test_hash_entry_payload_serialization() {
// Test that payload values survive serialization/deserialization
let entry = HashEntry::new_with_payload(
12345,
vec![Value::Integer(1), Value::Text("key".into())],
100,
vec![
Value::Text("payload_text".into()),
Value::Integer(999),
Value::Float(std::f64::consts::PI),
Value::Null,
Value::Blob(vec![1, 2, 3, 4]),
],
);
let mut buf = Vec::new();
entry.serialize(&mut buf);
let (deserialized, bytes_consumed) = HashEntry::deserialize(&buf).unwrap();
assert_eq!(bytes_consumed, buf.len());
// Verify key values
assert_eq!(deserialized.hash, entry.hash);
assert_eq!(deserialized.rowid, entry.rowid);
assert_eq!(deserialized.key_values.len(), 2);
assert_eq!(deserialized.key_values[0], Value::Integer(1));
assert_eq!(deserialized.key_values[1], Value::Text("key".into()));
// Verify payload values
assert_eq!(deserialized.payload_values.len(), 5);
assert_eq!(
deserialized.payload_values[0],
Value::Text("payload_text".into())
);
assert_eq!(deserialized.payload_values[1], Value::Integer(999));
assert_eq!(
deserialized.payload_values[2],
Value::Float(std::f64::consts::PI)
);
assert_eq!(deserialized.payload_values[3], Value::Null);
assert_eq!(
deserialized.payload_values[4],
Value::Blob(vec![1, 2, 3, 4])
);
}
#[test]
fn test_hash_entry_empty_payload() {
// Test that entries without payload work correctly
let entry = HashEntry::new(12345, vec![Value::Integer(1)], 100);
assert!(!entry.has_payload());
assert!(entry.payload_values.is_empty());
// Serialization should still work
let mut buf = Vec::new();
entry.serialize(&mut buf);
let (deserialized, _) = HashEntry::deserialize(&buf).unwrap();
assert!(!deserialized.has_payload());
assert!(deserialized.payload_values.is_empty());
assert_eq!(deserialized.rowid, 100);
}
#[test]
fn test_hash_entry_size_includes_payload() {
let entry_no_payload = HashEntry::new(12345, vec![Value::Integer(1)], 100);
let entry_with_payload = HashEntry::new_with_payload(
12345,
vec![Value::Integer(1)],
100,
vec![
Value::Text("a]long payload string".into()),
Value::Integer(42),
],
);
// Entry with payload should have larger size
assert!(entry_with_payload.size_bytes() > entry_no_payload.size_bytes());
}
}