perf: reduce size of the watch entry (#1190)

* dev: reduce size of the watch entry

* feat: watch.rs changes
This commit is contained in:
Myriad-Dreamin 2025-01-19 12:26:57 +08:00 committed by GitHub
parent 56714675b7
commit 884a4b50e7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 51 additions and 55 deletions

View file

@ -18,14 +18,13 @@ use typst::diag::{EcoString, FileError, FileResult};
use crate::vfs::{
notify::{FileChangeSet, FileSnapshot, FilesystemEvent, NotifyMessage, UpstreamUpdateEvent},
system::SystemAccessModel,
Bytes, PathAccessModel,
PathAccessModel,
};
use tinymist_std::ImmutPath;
type WatcherPair = (RecommendedWatcher, mpsc::UnboundedReceiver<NotifyEvent>);
type NotifyEvent = notify::Result<notify::Event>;
type FileEntry = (/* key */ ImmutPath, /* value */ FileSnapshot);
type NotifyFilePair = FileResult</* content */ Bytes>;
/// The state of a watched file.
///
@ -39,7 +38,7 @@ enum WatchState {
/// stable. So we need to recheck the file after a while.
EmptyOrRemoval {
recheck_at: usize,
payload: NotifyFilePair,
payload: FileSnapshot,
},
}
@ -65,7 +64,7 @@ struct WatchedEntry {
/// The state of the entry.
state: WatchState,
/// Previous content of the file.
prev: Option<NotifyFilePair>,
prev: Option<FileSnapshot>,
/// Previous metadata of the file.
prev_meta: FileResult<std::fs::Metadata>,
}
@ -85,7 +84,7 @@ struct UndeterminedNotifyEvent {
/// The actor that watches files.
/// It is used to watch files and send events to the consumers
#[derive(Debug)]
pub struct NotifyActor {
pub struct NotifyActor<F: FnMut(FilesystemEvent)> {
/// The access model of the actor.
/// We concrete the access model to `SystemAccessModel` for now.
inner: SystemAccessModel,
@ -95,10 +94,6 @@ pub struct NotifyActor {
/// The logical tick of the actor.
logical_tick: usize,
/// Output of the actor.
/// See [`FilesystemEvent`] for more information.
sender: mpsc::UnboundedSender<FilesystemEvent>,
/// Internal channel for recheck events.
undetermined_send: mpsc::UnboundedSender<UndeterminedNotifyEvent>,
undetermined_recv: mpsc::UnboundedReceiver<UndeterminedNotifyEvent>,
@ -106,13 +101,15 @@ pub struct NotifyActor {
/// The hold entries for watching, one entry for per file.
watched_entries: HashMap<ImmutPath, WatchedEntry>,
interrupted_by_events: F,
/// The builtin watcher object.
watcher: Option<WatcherPair>,
}
impl NotifyActor {
impl<F: FnMut(FilesystemEvent) + Send + Sync> NotifyActor<F> {
/// Create a new actor.
fn new(sender: mpsc::UnboundedSender<FilesystemEvent>) -> NotifyActor {
pub fn new(interrupted_by_events: F) -> Self {
let (undetermined_send, undetermined_recv) = mpsc::unbounded_channel();
let (watcher_sender, watcher_receiver) = mpsc::unbounded_channel();
let watcher = log_notify_error(
@ -134,7 +131,7 @@ impl NotifyActor {
lifetime: 1,
logical_tick: 1,
sender,
interrupted_by_events,
undetermined_send,
undetermined_recv,
@ -144,11 +141,6 @@ impl NotifyActor {
}
}
/// Send a filesystem event to remove.
fn send(&mut self, msg: FilesystemEvent) {
log_send_error("fs_event", self.sender.send(msg));
}
/// Get the notify event from the watcher.
async fn get_notify_event(watcher: &mut Option<WatcherPair>) -> Option<NotifyEvent> {
match watcher {
@ -158,14 +150,15 @@ impl NotifyActor {
}
/// Main loop of the actor.
async fn run(mut self, mut inbox: mpsc::UnboundedReceiver<NotifyMessage>) {
pub async fn run(mut self, mut inbox: mpsc::UnboundedReceiver<NotifyMessage>) {
use NotifyMessage::*;
/// The event of the actor.
#[derive(Debug)]
enum ActorEvent {
/// Recheck the notify event.
ReCheck(UndeterminedNotifyEvent),
/// external message to change notifier's state
Message(NotifyMessage),
Message(Option<NotifyMessage>),
/// notify event from builtin watcher
NotifyEvent(NotifyEvent),
}
@ -173,15 +166,9 @@ impl NotifyActor {
'event_loop: loop {
// Get the event from the inbox or the watcher.
let event = tokio::select! {
Some(it) = inbox.recv() => Some(ActorEvent::Message(it)),
Some(it) = Self::get_notify_event(&mut self.watcher) => Some(ActorEvent::NotifyEvent(it)),
Some(it) = self.undetermined_recv.recv() => Some(ActorEvent::ReCheck(it)),
};
// Failed to get the event.
let Some(event) = event else {
log::info!("failed to get event, exiting...");
return;
it = inbox.recv() => ActorEvent::Message(it),
Some(it) = Self::get_notify_event(&mut self.watcher) => ActorEvent::NotifyEvent(it),
Some(it) = self.undetermined_recv.recv() => ActorEvent::ReCheck(it),
};
// Increase the logical tick per event.
@ -190,16 +177,20 @@ impl NotifyActor {
// log::info!("vfs-notify event {event:?}");
// function entries to handle some event
match event {
ActorEvent::Message(NotifyMessage::Settle) => {
ActorEvent::Message(None) => {
log::info!("failed to get event, exiting...");
break 'event_loop;
}
ActorEvent::Message(Some(Settle)) => {
log::info!("NotifyActor: settle event received");
break 'event_loop;
}
ActorEvent::Message(NotifyMessage::UpstreamUpdate(event)) => {
ActorEvent::Message(Some(UpstreamUpdate(event))) => {
self.invalidate_upstream(event);
}
ActorEvent::Message(NotifyMessage::SyncDependency(paths)) => {
ActorEvent::Message(Some(SyncDependency(paths))) => {
if let Some(changeset) = self.update_watches(&paths) {
self.send(FilesystemEvent::Update(changeset));
(self.interrupted_by_events)(FilesystemEvent::Update(changeset));
}
}
ActorEvent::NotifyEvent(event) => {
@ -223,7 +214,7 @@ impl NotifyActor {
let changeset = self.update_watches(&event.invalidates).unwrap_or_default();
// Send the event to the consumer.
self.send(FilesystemEvent::UpstreamUpdate {
(self.interrupted_by_events)(FilesystemEvent::UpstreamUpdate {
changeset,
upstream_event: Some(event),
});
@ -353,7 +344,7 @@ impl NotifyActor {
// Send file updates.
if !changeset.is_empty() {
self.send(FilesystemEvent::Update(changeset));
(self.interrupted_by_events)(FilesystemEvent::Update(changeset));
}
}
@ -403,12 +394,13 @@ impl NotifyActor {
return None;
}
// Check path and content
let mut file = self.inner.content(&path);
// Check meta, path, and content
let mut file = FileSnapshot::from(self.inner.content(&path));
// Check state in fast path: compare state, return None on not sending
// the file change
match (&entry.prev, &mut file) {
match (entry.prev.as_deref(), file.as_mut()) {
// update the content of the entry in the following cases:
// + Case 1: previous content is clear
// + Case 2: previous content is not clear but some error, and the
@ -420,7 +412,7 @@ impl NotifyActor {
// or truncating the file. They are possibly flushing the file
// but not finished yet.
WatchState::Stable => {
if matches!(err, FileError::NotFound(..) | FileError::Other(..)) {
if matches!(err.as_ref(), FileError::NotFound(..) | FileError::Other(..)) {
entry.state = WatchState::EmptyOrRemoval {
recheck_at: self.logical_tick,
payload: file.clone(),
@ -487,7 +479,7 @@ impl NotifyActor {
entry.prev = Some(file.clone());
// Slow path: trigger the file change for consumer
Some((path, file.into()))
Some((path, file))
}
/// Recheck the notify event after a while.
@ -520,12 +512,12 @@ impl NotifyActor {
payload,
} => {
if recheck_at == event.at_logical_tick {
log::debug!("notify event real happened {event:?}, state: {:?}", payload);
log::debug!("notify event real happened {event:?}, state: {payload:?}");
// Send the underlying change to the consumer
let mut changeset = FileChangeSet::default();
changeset.inserts.push((event.path, payload.into()));
self.send(FilesystemEvent::Update(changeset));
changeset.inserts.push((event.path, payload));
(self.interrupted_by_events)(FilesystemEvent::Update(changeset));
}
}
};
@ -548,19 +540,9 @@ fn log_send_error<T>(chan: &'static str, res: Result<(), mpsc::error::SendError<
pub async fn watch_deps(
inbox: mpsc::UnboundedReceiver<NotifyMessage>,
mut interrupted_by_events: impl FnMut(FilesystemEvent),
interrupted_by_events: impl FnMut(FilesystemEvent) + Send + Sync + 'static,
) {
// Setup file watching.
let (tx, mut rx) = mpsc::unbounded_channel();
let actor = NotifyActor::new(tx);
// Watch messages to notify
tokio::spawn(actor.run(inbox));
// Handle events.
log::debug!("start watching files...");
while let Some(event) = rx.recv().await {
interrupted_by_events(event);
}
log::debug!("stop watching files...");
// Watch messages to notify
tokio::spawn(NotifyActor::new(interrupted_by_events).run(inbox));
}