Write newlines after reading the file

This makes the prefetches much more effective, at the cost of one copy_nonoverlapping.
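
In outline: the padding used to be written into the allocation's last chunk before the file read; now the file is read first, and the trailing newlines are copied in afterward from a scratch buffer. A minimal sketch of the new padding step (hypothetical names, not the committed code):

unsafe fn pad_with_newlines(dest: *mut u8, file_size: usize, capacity: usize) {
    // capacity is file_size rounded up to a multiple of 64, so the tail is at most 64 bytes.
    let trailing_newlines_needed = capacity - file_size;
    let newlines = [b'\n'; 64]; // one full 64-byte chunk of newlines

    // The one extra copy_nonoverlapping the commit message mentions.
    core::ptr::copy_nonoverlapping(newlines.as_ptr(), dest.add(file_size), trailing_newlines_needed);
}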
Richard Feldman 2023-09-11 15:52:35 -04:00
parent 061d2e5df6
commit af863f54b9


@@ -12,6 +12,7 @@
 use bumpalo::{self, Bump};
 use core::{
     alloc::Layout,
+    mem::{align_of, MaybeUninit},
     ptr::{self, NonNull},
 };
@@ -82,7 +83,15 @@ impl<'a> Src64<'a> {
         }

         // Safety: we got capacity by rounding up to the nearest 64B
-        let dest = unsafe { allocate_and_pad_with_newlines(arena, capacity)? }.as_ptr() as *mut u8;
+        let dest = unsafe { allocate_chunks(arena, capacity)? }.as_ptr() as *mut u8;
+
+        // Safety: `dest` has a length of `capacity`, which has been rounded up to a multiple of 64.
+        unsafe {
+            let trailing_newlines_needed = capacity - src_len;
+
+            // Start writing newlines right after the last of the bytes we got from the file.
+            write_newlines(dest.add(src_len), trailing_newlines_needed);
+        };

         // Safety: we just allocated `dest` to have len >= src.len(), and they're both u8 arrays.
         unsafe {
@@ -128,7 +137,7 @@ impl<'a> Src64<'a> {
         // Safety: round_up_to_nearest_64 will give us a capacity that is
         // at least 64, and also a multiple of 64.
-        match unsafe { allocate_and_pad_with_newlines(arena, capacity) } {
+        match unsafe { allocate_chunks(arena, capacity) } {
             Some(buf) => {
                 // Read bytes equal to file_size into the arena allocation.
                 //
@@ -205,6 +214,46 @@ impl<'a> Src64<'a> {
                     return Err(FileErr::ReadErr);
                 }

+                // Before we write newlines to the last chunk, branchlessly prefetch the first four 64-byte chunks.
+                // We're about to have a cache miss due to loading the last chunk from main memory (DMA will have
+                // written it there without having gone through the CPU), and if we don't prefetch here, then we'll
+                // immediately get a second cache miss when we start traversing the loaded file. The prefetch means
+                // that by the time we finish resolving the first cache miss on the last chunk, continuing with the
+                // first chunk(s) won't be a cache miss anymore, because they'll already be in cache.
+                //
+                // We can do further prefetches in the actual tokenization loop.
+                {
+                    // We know capacity >= 64, so this will never wrap.
+                    let last_chunk_offset = capacity - 64;
+
+                    // Prefetch the first 64-byte chunk.
+                    prefetch_read(buf, 0);
+
+                    // Prefetch the second 64-byte chunk, using min() to branchlessly avoid prefetching an address we might not own.
+                    prefetch_read(buf, 64.min(last_chunk_offset));
+
+                    // Prefetch the third 64-byte chunk, using min() to branchlessly avoid prefetching an address we might not own.
+                    prefetch_read(buf, 128.min(last_chunk_offset));
+
+                    // Prefetch the fourth 64-byte chunk, using min() to branchlessly avoid prefetching an address we might not own.
+                    prefetch_read(buf, 192.min(last_chunk_offset));
+
+                    // Further prefetching can happen in the tokenization loop. Now that we've prefetched the first
+                    // four chunks, we should be able to prefetch the others in the loop with enough time before the
+                    // tokenizer arrives there.
+                }
+
+                if capacity > file_size {
+                    debug_assert!(capacity - file_size < 64);
+                    debug_assert!(capacity - file_size > 0);
+
+                    let trailing_newlines_needed = capacity - file_size;
+
+                    // Safety: `buf` has a length of `capacity`, which has been rounded up to a multiple of 64.
+                    unsafe {
+                        // Start writing newlines right after the last of the bytes we got from the file.
+                        write_newlines(buf.as_ptr().add(file_size), trailing_newlines_needed);
+                    };
+                }
+
                 // Safety: bytes_ptr came from an allocation of `capacity` bytes, it's had
                 // newlines filled at the end, and `file_size` bytes written over the rest.
                 let bytes =
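
A worked example of the min() clamping in the prefetch block above (hypothetical helper, not part of the commit): for a buffer of exactly one chunk, last_chunk_offset is 0, so all four prefetches collapse onto offset 0 instead of branching or reading past the allocation.

fn clamped_prefetch_offsets(capacity: usize) -> [usize; 4] {
    // We know capacity >= 64, so this will never wrap.
    let last_chunk_offset = capacity - 64;
    [
        0,
        64.min(last_chunk_offset),
        128.min(last_chunk_offset),
        192.min(last_chunk_offset),
    ]
}

// e.g. clamped_prefetch_offsets(64)  == [0, 0, 0, 0]
//      clamped_prefetch_offsets(256) == [0, 64, 128, 192]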
@@ -225,7 +274,7 @@ fn round_up_to_nearest_64(num: usize) -> usize {
 }

 /// Safety: capacity must be a multiple of 64, and must be at least 64.
-unsafe fn allocate_and_pad_with_newlines(arena: &Bump, capacity: usize) -> Option<NonNull<u8>> {
+unsafe fn allocate_chunks(arena: &Bump, capacity: usize) -> Option<NonNull<u8>> {
     // Compare capacity here instead of size because this file limit is based on what we can record row and column
     // numbers for, and those can theoretically overflow on the trailing newlines we may have added.
     // This distinction will most likely come up in practice zero times ever, but it could come up in fuzzing.
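
The body of round_up_to_nearest_64 is outside this diff; per the comments above it must return a multiple of 64 that is at least 64, so presumably something along these lines (a guess, not the committed code):

fn round_up_to_nearest_64(num: usize) -> usize {
    // Next multiple of 64, clamped to at least 64.
    ((num + 63) & !63).max(64)
}

// e.g. round_up_to_nearest_64(0)   == 64
//      round_up_to_nearest_64(100) == 128
//      round_up_to_nearest_64(128) == 128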
@@ -242,85 +291,60 @@ unsafe fn allocate_and_pad_with_newlines(arena: &Bump, capacity: usize) -> Option<NonNull<u8>> {
     let layout = unsafe { Layout::from_size_align_unchecked(capacity, Src64::BYTES_ALIGNMENT) };

     // We have to use alloc_layout here because we have stricter alignment requirements than normal slices.
-    let buf_ptr: NonNull<u8> = arena.alloc_layout(layout);
-
-    // Branchlessly prefetch the first three 64-byte chunks, and the last chunk. This prevents a double cache miss:
-    // first a cache miss when we write the newlines to the end, and then a second cache miss when we start
-    // working on the beginning of the allocation. We can do further prefetches in the actual tokenization loop.
-    {
-        // We know capacity >= 64, so this will never wrap.
-        let last_chunk_offset = capacity - 64;
-
-        // Prefetch the last 64-byte chunk. (We do this one first because we'll be writing newlines to it first.)
-        prefetch_readwrite(buf_ptr, last_chunk_offset);
-
-        // Prefetch the first 64-byte chunk. The rest of these only need reading, since we never write to them.
-        prefetch_read(buf_ptr, 0);
-
-        // Prefetch the second 64-byte chunk, using min() to branchlessly avoid prefetching an address we might not own.
-        prefetch_read(buf_ptr, 64.min(last_chunk_offset));
-
-        // Prefetch the third 64-byte chunk, using min() to branchlessly avoid prefetching an address we might not own.
-        prefetch_read(buf_ptr, 128.min(last_chunk_offset));
-
-        // Further prefetching can happen in the tokenization loop. Now that we've prefetched the first 3 chunks,
-        // we should be able to prefetch the others in the loop with enough time before the tokenizer arrives there.
-    }
-
-    // Safety: `buf_ptr` has a length of `capacity`, which has been rounded up to a multiple of 64.
-    unsafe { fill_last_64_bytes_with_newlines(buf_ptr, capacity) };
-
-    Some(buf_ptr)
+    Some(arena.alloc_layout(layout))
 }

-/// This is branchless so there can't be mispredictions. We know the buffer's length is a multiple of 64,
-/// so we can just always do four SIMD writes and call it a day. (Eight if we don't have SIMD.)
+/// This is branchless so there can't be mispredictions.
 ///
-/// Safety: this pointer must have an alignment of at least 64,
-/// and the length must be both at least 64 and also a multiple of 64.
-unsafe fn fill_last_64_bytes_with_newlines(ptr: NonNull<u8>, len: usize) {
-    debug_assert_eq!(
-        ptr.as_ptr() as usize % 16,
-        0,
-        "The pointer's alignment must be at least 16."
-    );
-    debug_assert_eq!(len % 64, 0, "The buffer's length must be a multiple of 64.");
-    debug_assert!(len >= 64, "The buffer's length must be at least 64.");
-
-    // Safety: this function's docs note that it must be given a slice with at least 64 bytes in it.
-    let last_64_bytes = ptr.as_ptr().add(len - 64);
+unsafe fn write_newlines(dest: *mut u8, len: usize) {
+    debug_assert!(len <= 64);
#[cfg(target_feature = "sse2")]
{
use core::arch::x86_64::{__m128i, _mm_set1_epi8, _mm_storeu_si128};
let mut buf: MaybeUninit<[__m128i; 4]> = MaybeUninit::uninit();
let newline = _mm_set1_epi8(b'\n' as i8);
let ptr = last_64_bytes as *mut __m128i;
let ptr = buf.as_mut_ptr() as *mut __m128i;
debug_assert_eq!(ptr.align_offset(align_of::<__m128i>()), 0);
_mm_storeu_si128(ptr.add(0), newline);
_mm_storeu_si128(ptr.add(1), newline);
_mm_storeu_si128(ptr.add(2), newline);
_mm_storeu_si128(ptr.add(3), newline);
core::ptr::copy_nonoverlapping(ptr as *const u8, dest.as_ptr(), len);
}
#[cfg(target_feature = "neon")]
{
use core::arch::aarch64::{vdupq_n_s8, vst1q_s8};
use core::arch::aarch64::{int8x16_t, vdupq_n_s8, vst1q_s8};
let mut buf: MaybeUninit<[int8x16_t; 4]> = MaybeUninit::uninit();
let newline = vdupq_n_s8(b'\n' as i8);
let ptr = last_64_bytes as *mut i8;
let ptr = buf.as_mut_ptr() as *mut i8;
debug_assert_eq!(ptr.align_offset(align_of::<int8x16_t>()), 0);
vst1q_s8(ptr.add(0), newline);
vst1q_s8(ptr.add(16), newline);
vst1q_s8(ptr.add(32), newline);
vst1q_s8(ptr.add(48), newline);
core::ptr::copy_nonoverlapping(ptr as *const u8, dest, len);
}

     #[cfg(not(any(target_feature = "sse2", target_feature = "neon")))]
     {
         // We don't have access to SIMD, so do eight 64-bit writes instead of four 128-bit writes.
+        let mut buf: MaybeUninit<[u64; 8]> = MaybeUninit::uninit();
         let newline_repeated = (b'\n' as u64) * 0x0101010101010101;
-        let ptr = last_64_bytes as *mut u64;
+        let ptr = buf.as_mut_ptr() as *mut u64;
+
+        debug_assert_eq!(ptr.align_offset(align_of::<u64>()), 0);

         *ptr.add(0) = newline_repeated;
         *ptr.add(1) = newline_repeated;
@@ -330,6 +354,8 @@ unsafe fn fill_last_64_bytes_with_newlines(ptr: NonNull<u8>, len: usize) {
         *ptr.add(5) = newline_repeated;
         *ptr.add(6) = newline_repeated;
         *ptr.add(7) = newline_repeated;
+
+        core::ptr::copy_nonoverlapping(ptr as *const u8, dest, len);
     }
 }
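
Behaviorally, write_newlines is a bounded memset of newline bytes; the SIMD and u64 paths just keep it branchless. (In the fallback, multiplying b'\n' by 0x0101010101010101 broadcasts 0x0A into every byte, yielding 0x0A0A0A0A0A0A0A0A.) A simpler but branchier reference version, for comparison only:

/// Same contract as write_newlines above, without the SIMD scratch buffer.
///
/// Safety: `dest` must be valid for writing `len` bytes.
unsafe fn write_newlines_reference(dest: *mut u8, len: usize) {
    debug_assert!(len <= 64);
    core::ptr::write_bytes(dest, b'\n', len); // compiles down to a memset
}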
@@ -357,30 +383,6 @@ fn prefetch_read<T>(non_null_ptr: NonNull<T>, offset: usize) {
     // If we're not on x64 or aarch64, just do nothing!
 }

-#[inline(always)]
-fn prefetch_readwrite<T>(non_null_ptr: NonNull<T>, offset: usize) {
-    // Use inline asm until this is stabilized:
-    // https://doc.rust-lang.org/std/intrinsics/fn.prefetch_write_data.html
-    #[cfg(target_arch = "x86_64")]
-    unsafe {
-        core::arch::asm!(
-            "prefetchw [{}]",
-            in(reg) non_null_ptr.as_ptr().add(offset)
-        );
-    }
-
-    #[cfg(target_arch = "aarch64")]
-    unsafe {
-        core::arch::asm!(
-            "prfm PSTL1KEEP, [{}]",
-            in(reg) non_null_ptr.as_ptr().add(offset)
-        );
-    }
-
-    // If we're not on x64 or aarch64, just do nothing!
-}
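
prefetch_read itself (only its tail appears above) presumably mirrors the removed prefetch_readwrite with the read-prefetch instructions, i.e. prefetcht0 on x86_64 and prfm PLDL1KEEP on aarch64. A sketch under that assumption:

#[inline(always)]
fn prefetch_read<T>(non_null_ptr: NonNull<T>, offset: usize) {
    // Use inline asm until core's prefetch_read_data intrinsic is stabilized.
    #[cfg(target_arch = "x86_64")]
    unsafe {
        core::arch::asm!(
            "prefetcht0 [{}]",
            in(reg) non_null_ptr.as_ptr().add(offset)
        );
    }

    #[cfg(target_arch = "aarch64")]
    unsafe {
        core::arch::asm!(
            "prfm PLDL1KEEP, [{}]",
            in(reg) non_null_ptr.as_ptr().add(offset)
        );
    }

    // If we're not on x64 or aarch64, just do nothing!
}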

 #[cfg(test)]
 mod src64_tests {
     use super::{FileErr, Src64, MAX_ROC_SOURCE_FILE_SIZE};