refactor: reorganize tools/test module (#30590)

There are no functional changes in the code; it's just a refactor that
moves code around to make it easier to land
https://github.com/denoland/deno/pull/30504
Bartek Iwańczuk 2025-09-01 23:13:50 +02:00 committed by GitHub
parent 1ef0d0838e
commit ada1580897
GPG key ID: B5690EEEBB952194
4 changed files with 449 additions and 358 deletions

View file

@@ -127,8 +127,9 @@ fn op_register_test(
column_number,
},
};
let container = state.borrow_mut::<TestContainer>();
container.register(description, function);
state
.borrow_mut::<TestContainer>()
.register(description, function);
ret_buf.copy_from_slice(&(id as u32).to_le_bytes());
Ok(())
}

View file

@@ -53,14 +53,13 @@ use crate::colors;
use crate::lsp::ReplLanguageServer;
use crate::npm::CliNpmInstaller;
use crate::resolver::CliResolver;
use crate::tools::test::TestEvent;
use crate::tools::test::TestEventReceiver;
use crate::tools::test::TestEventTracker;
use crate::tools::test::TestFailureFormatOptions;
use crate::tools::test::report_tests;
use crate::tools::test::reporters::PrettyTestReporter;
use crate::tools::test::reporters::TestReporter;
use crate::tools::test::run_tests_for_worker;
use crate::tools::test::send_test_event;
use crate::tools::test::worker_has_tests;
fn comment_source_to_position_range(
@@ -462,19 +461,18 @@ impl ReplSession {
self.test_event_receiver.take().unwrap(),
(self.test_reporter_factory)(),
));
let event_tracker =
TestEventTracker::new(self.worker.js_runtime.op_state());
run_tests_for_worker(
&mut self.worker,
&self.main_module,
&Default::default(),
&Default::default(),
&event_tracker,
)
.await
.unwrap();
send_test_event(
&self.worker.js_runtime.op_state(),
TestEvent::ForceEndReport,
)
.unwrap();
event_tracker.force_end_report().unwrap();
self.test_event_receiver = Some(report_tests_handle.await.unwrap().1);
}
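
The pattern adopted here, a small tracker that hides the raw TestEvent channel behind one method per event kind, is defined further down in the tools/test diff as TestEventTracker. Below is a trimmed-down, self-contained sketch of that shape, not the real API: a std mpsc sender stands in for the real TestEventSender (which the actual tracker reaches through the worker's OpState), and the event enum is reduced to three variants.

use std::sync::mpsc::{channel, SendError, Sender};
use std::time::Duration;

// Trimmed-down stand-in for the real TestEvent enum.
#[derive(Debug)]
enum TestEventSketch {
    Wait(usize),
    Slow(usize, u128),
    ForceEndReport,
}

// Stand-in for TestEventTracker: one method per event, so call sites never
// construct TestEvent values or touch the sender directly.
#[derive(Clone)]
struct TrackerSketch {
    sender: Sender<TestEventSketch>,
}

impl TrackerSketch {
    fn wait(&self, id: usize) -> Result<(), SendError<TestEventSketch>> {
        self.sender.send(TestEventSketch::Wait(id))
    }
    fn slow(&self, id: usize, d: Duration) -> Result<(), SendError<TestEventSketch>> {
        self.sender.send(TestEventSketch::Slow(id, d.as_millis()))
    }
    fn force_end_report(&self) -> Result<(), SendError<TestEventSketch>> {
        self.sender.send(TestEventSketch::ForceEndReport)
    }
}

fn main() {
    let (tx, rx) = channel();
    let tracker = TrackerSketch { sender: tx };
    tracker.wait(0).unwrap();
    tracker.slow(0, Duration::from_secs(60)).unwrap();
    tracker.force_end_report().unwrap();
    for event in rx.try_iter() {
        println!("{event:?}");
    }
}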

View file

@@ -13,7 +13,9 @@ use std::io::Write;
use std::num::NonZeroUsize;
use std::path::Path;
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::Arc;
use std::sync::LazyLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
@@ -40,12 +42,6 @@ use deno_core::futures::future;
use deno_core::futures::stream;
use deno_core::located_script_name;
use deno_core::serde_v8;
use deno_core::stats::RuntimeActivity;
use deno_core::stats::RuntimeActivityDiff;
use deno_core::stats::RuntimeActivityStats;
use deno_core::stats::RuntimeActivityStatsFactory;
use deno_core::stats::RuntimeActivityStatsFilter;
use deno_core::stats::RuntimeActivityType;
use deno_core::unsync::spawn;
use deno_core::unsync::spawn_blocking;
use deno_core::url::Url;
@@ -92,6 +88,7 @@ use crate::worker::CreateCustomWorkerError;
mod channel;
pub mod fmt;
pub mod reporters;
mod sanitizers;
pub use channel::TestEventReceiver;
pub use channel::TestEventSender;
@@ -112,37 +109,10 @@ use crate::tools::coverage::cover_files;
use crate::tools::coverage::reporter;
use crate::tools::test::channel::ChannelClosedError;
/// How many times we're allowed to spin the event loop before considering something a leak.
const MAX_SANITIZER_LOOP_SPINS: usize = 16;
#[derive(Default)]
struct TopLevelSanitizerStats {
map: HashMap<(RuntimeActivityType, Cow<'static, str>), usize>,
}
fn get_sanitizer_item(
activity: RuntimeActivity,
) -> (RuntimeActivityType, Cow<'static, str>) {
let activity_type = activity.activity();
match activity {
RuntimeActivity::AsyncOp(_, _, name) => (activity_type, name.into()),
RuntimeActivity::Resource(_, _, name) => (activity_type, name.into()),
RuntimeActivity::Interval(_, _) => (activity_type, "".into()),
RuntimeActivity::Timer(_, _) => (activity_type, "".into()),
}
}
fn get_sanitizer_item_ref(
activity: &RuntimeActivity,
) -> (RuntimeActivityType, Cow<'_, str>) {
let activity_type = activity.activity();
match activity {
RuntimeActivity::AsyncOp(_, _, name) => (activity_type, (*name).into()),
RuntimeActivity::Resource(_, _, name) => (activity_type, name.into()),
RuntimeActivity::Interval(_, _) => (activity_type, "".into()),
RuntimeActivity::Timer(_, _) => (activity_type, "".into()),
}
}
static SLOW_TEST_TIMEOUT: LazyLock<u64> = LazyLock::new(|| {
let base_timeout = env::var("DENO_SLOW_TEST_TIMEOUT").unwrap_or_default();
base_timeout.parse().unwrap_or(60).max(1)
});
/// The test mode is used to determine how a specifier is to be tested.
#[derive(Debug, Clone, Eq, PartialEq)]
@@ -229,11 +199,12 @@ pub struct TestLocation {
pub column_number: u32,
}
// TODO(Bartlomieju): use named fields instead of tuple
#[derive(Default)]
pub(crate) struct TestContainer(
TestDescriptions,
Vec<v8::Global<v8::Function>>,
);
pub(crate) struct TestContainer {
descriptions: TestDescriptions,
test_functions: Vec<v8::Global<v8::Function>>,
}
impl TestContainer {
pub fn register(
@@ -241,12 +212,12 @@ impl TestContainer {
description: TestDescription,
function: v8::Global<v8::Function>,
) {
self.0.tests.insert(description.id, description);
self.1.push(function)
self.descriptions.tests.insert(description.id, description);
self.test_functions.push(function)
}
pub fn is_empty(&self) -> bool {
self.1.is_empty()
self.test_functions.is_empty()
}
}
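
For orientation, here is a minimal, self-contained stand-in mirroring the named-field shape introduced above. It is only an illustration: String stands in for v8::Global<v8::Function>, a Vec of pairs stands in for the real TestDescriptions map, and the name ContainerSketch is invented, so none of these types match the actual crate.

// Illustrative stand-in only; the real container stores v8 function handles
// and a TestDescriptions map keyed by test id.
#[derive(Default)]
struct ContainerSketch {
    descriptions: Vec<(usize, String)>, // (test id, test name)
    test_functions: Vec<String>,        // stand-in for v8::Global<v8::Function>
}

impl ContainerSketch {
    fn register(&mut self, id: usize, name: String, function: String) {
        self.descriptions.push((id, name));
        self.test_functions.push(function);
    }

    fn is_empty(&self) -> bool {
        self.test_functions.is_empty()
    }
}

fn main() {
    let mut container = ContainerSketch::default();
    container.register(0, "my test".to_string(), "<js fn>".to_string());

    // Mirrors run_tests_for_worker below: take the registered tests out of the
    // container, leaving an empty one behind.
    let taken = std::mem::take(&mut container);
    assert!(container.is_empty());
    assert_eq!(taken.test_functions.len(), 1);
}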
@@ -687,11 +658,9 @@ async fn configure_main_worker(
let check_res =
|res: Result<(), CoreError>| match res.map_err(|err| err.into_kind()) {
Ok(()) => Ok(()),
Err(CoreErrorKind::Js(err)) => send_test_event(
&op_state,
TestEvent::UncaughtError(specifier.to_string(), Box::new(err)),
)
.map_err(|e| CoreErrorKind::JsBox(JsErrorBox::from_err(e)).into_box()),
Err(CoreErrorKind::Js(err)) => TestEventTracker::new(op_state.clone())
.uncaught_error(specifier.to_string(), err)
.map_err(|e| CoreErrorKind::JsBox(JsErrorBox::from_err(e)).into_box()),
Err(err) => Err(err.into_box()),
};
@@ -728,12 +697,14 @@ pub async fn test_specifier(
jupyter_channel.0,
)
.await?;
let event_tracker = TestEventTracker::new(worker.js_runtime.op_state());
match test_specifier_inner(
&mut worker,
coverage_collector,
specifier.clone(),
fail_fast_tracker,
&event_tracker,
options,
)
.await
@@ -741,10 +712,7 @@
Ok(()) => Ok(()),
Err(TestSpecifierError::Core(err)) => match err.into_kind() {
CoreErrorKind::Js(err) => {
send_test_event(
&worker.js_runtime.op_state(),
TestEvent::UncaughtError(specifier.to_string(), Box::new(err)),
)?;
event_tracker.uncaught_error(specifier.to_string(), err)?;
Ok(())
}
err => Err(err.into_box().into()),
@@ -765,12 +733,12 @@ pub enum TestSpecifierError {
/// Test a single specifier as documentation containing test programs, an executable test module or
/// both.
#[allow(clippy::too_many_arguments)]
async fn test_specifier_inner(
worker: &mut MainWorker,
mut coverage_collector: Option<CoverageCollector>,
specifier: ModuleSpecifier,
fail_fast_tracker: FailFastTracker,
event_tracker: &TestEventTracker,
options: TestSpecifierOptions,
) -> Result<(), TestSpecifierError> {
// Ensure that there are no pending exceptions before we start running tests
@@ -780,8 +748,14 @@ async fn test_specifier_inner(
.dispatch_load_event()
.map_err(|e| CoreErrorKind::Js(e).into_box())?;
run_tests_for_worker(worker, &specifier, &options, &fail_fast_tracker)
.await?;
run_tests_for_worker(
worker,
&specifier,
&options,
&fail_fast_tracker,
event_tracker,
)
.await?;
// Ignore `defaultPrevented` of the `beforeunload` event. We don't allow the
// event loop to continue beyond what's needed to await results.
@@ -822,6 +796,18 @@ pub fn worker_has_tests(worker: &mut MainWorker) -> bool {
!state.borrow::<TestContainer>().is_empty()
}
// Each test needs a fresh reqwest connection pool to avoid inter-test weirdness with connections
// failing. If we don't do this, a connection to a test server we just tore down might be re-used in
// the next test.
// TODO(mmastrac): this should be some sort of callback that we can implement for any subsystem
pub fn worker_prepare_for_test(worker: &mut MainWorker) {
worker
.js_runtime
.op_state()
.borrow_mut()
.try_take::<deno_runtime::deno_fetch::Client>();
}
/// Yields to tokio to allow async work to process, and then polls
/// the event loop once.
#[must_use = "The event loop result should be checked"]
@@ -841,16 +827,6 @@ pub async fn poll_event_loop(worker: &mut MainWorker) -> Result<(), CoreError> {
.await
}
pub fn send_test_event(
op_state: &RefCell<OpState>,
event: TestEvent,
) -> Result<(), ChannelClosedError> {
op_state
.borrow_mut()
.borrow_mut::<TestEventSender>()
.send(event)
}
#[derive(Debug, thiserror::Error, deno_error::JsError)]
pub enum RunTestsForWorkerErr {
#[class(inherit)]
@@ -864,49 +840,68 @@ pub enum RunTestsForWorkerErr {
SerdeV8(#[from] serde_v8::Error),
}
async fn slow_test_watchdog(event_tracker: TestEventTracker, test_id: usize) {
// The slow test warning should pop up every DENO_SLOW_TEST_TIMEOUT*(2**n) seconds,
// with a duration that is doubling each time. So for a warning time of 60s,
// we should get a warning at 60s, 120s, 240s, etc.
let base_timeout = *SLOW_TEST_TIMEOUT;
let mut multiplier = 1;
let mut elapsed = 0;
loop {
tokio::time::sleep(Duration::from_secs(
base_timeout * (multiplier - elapsed),
))
.await;
if event_tracker
.slow(test_id, Duration::from_secs(base_timeout * multiplier))
.is_err()
{
break;
}
multiplier *= 2;
elapsed += 1;
}
}
pub async fn run_tests_for_worker(
worker: &mut MainWorker,
specifier: &ModuleSpecifier,
options: &TestSpecifierOptions,
fail_fast_tracker: &FailFastTracker,
event_tracker: &TestEventTracker,
) -> Result<(), RunTestsForWorkerErr> {
let state_rc = worker.js_runtime.op_state();
// Take whatever tests have been registered
let TestContainer(tests, test_functions) =
let container =
std::mem::take(&mut *state_rc.borrow_mut().borrow_mut::<TestContainer>());
let tests: Arc<TestDescriptions> = tests.into();
send_test_event(&state_rc, TestEvent::Register(tests.clone()))?;
let descriptions = Arc::new(container.descriptions);
event_tracker.register(descriptions.clone())?;
let res = run_tests_for_worker_inner(
worker,
specifier,
tests,
test_functions,
descriptions,
container.test_functions,
options,
event_tracker,
fail_fast_tracker,
)
.await;
_ = send_test_event(&state_rc, TestEvent::Completed);
_ = event_tracker.completed();
res
}
async fn run_tests_for_worker_inner(
worker: &mut MainWorker,
specifier: &ModuleSpecifier,
tests: Arc<TestDescriptions>,
fn compute_tests_to_run(
descs: &TestDescriptions,
test_functions: Vec<v8::Global<v8::Function>>,
options: &TestSpecifierOptions,
fail_fast_tracker: &FailFastTracker,
) -> Result<(), RunTestsForWorkerErr> {
let unfiltered = tests.len();
let state_rc = worker.js_runtime.op_state();
// Build the test plan in a single pass
let mut tests_to_run = Vec::with_capacity(tests.len());
filter: TestFilter,
) -> (Vec<(&TestDescription, v8::Global<v8::Function>)>, bool) {
let mut tests_to_run = Vec::with_capacity(descs.len());
let mut used_only = false;
for ((_, d), f) in tests.tests.iter().zip(test_functions) {
if !options.filter.includes(&d.name) {
for ((_, d), f) in descs.tests.iter().zip(test_functions) {
if !filter.includes(&d.name) {
continue;
}
@@ -923,88 +918,53 @@ async fn run_tests_for_worker_inner(
}
tests_to_run.push((d, f));
}
(tests_to_run, used_only)
}
async fn run_tests_for_worker_inner(
worker: &mut MainWorker,
specifier: &ModuleSpecifier,
descs: Arc<TestDescriptions>,
test_functions: Vec<v8::Global<v8::Function>>,
options: &TestSpecifierOptions,
event_tracker: &TestEventTracker,
fail_fast_tracker: &FailFastTracker,
) -> Result<(), RunTestsForWorkerErr> {
let unfiltered = descs.len();
let (mut tests_to_run, used_only) =
compute_tests_to_run(&descs, test_functions, options.filter.clone());
if let Some(seed) = options.shuffle {
tests_to_run.shuffle(&mut SmallRng::seed_from_u64(seed));
}
send_test_event(
&state_rc,
TestEvent::Plan(TestPlan {
origin: specifier.to_string(),
total: tests_to_run.len(),
filtered_out: unfiltered - tests_to_run.len(),
used_only,
}),
)?;
event_tracker.plan(TestPlan {
origin: specifier.to_string(),
total: tests_to_run.len(),
filtered_out: unfiltered - tests_to_run.len(),
used_only,
})?;
let mut had_uncaught_error = false;
let stats = worker.js_runtime.runtime_activity_stats_factory();
let ops = worker.js_runtime.op_names();
// These particular ops may start and stop independently of tests, so we just filter them out
// completely.
let op_id_host_recv_message = ops
.iter()
.position(|op| *op == "op_host_recv_message")
.unwrap();
let op_id_host_recv_ctrl = ops
.iter()
.position(|op| *op == "op_host_recv_ctrl")
.unwrap();
// For consistency between tests with and without sanitizers, we _always_ include
// the actual sanitizer capture before and after a test, but a test that ignores resource
// or op sanitization simply doesn't throw if one of these constraints is violated.
let mut filter = RuntimeActivityStatsFilter::default();
filter = filter.with_resources();
filter = filter.with_ops();
filter = filter.with_timers();
filter = filter.omit_op(op_id_host_recv_ctrl as _);
filter = filter.omit_op(op_id_host_recv_message as _);
// Count the top-level stats so we can filter them out if they complete and restart within
// a test.
let top_level_stats = stats.clone().capture(&filter);
let mut top_level = TopLevelSanitizerStats::default();
for activity in top_level_stats.dump().active {
top_level
.map
.entry(get_sanitizer_item(activity))
.and_modify(|n| *n += 1)
.or_insert(1);
}
let sanitizer_helper = sanitizers::create_test_sanitizer_helper(worker);
for (desc, function) in tests_to_run.into_iter() {
worker_prepare_for_test(worker);
if fail_fast_tracker.should_stop() {
break;
}
// Each test needs a fresh reqwest connection pool to avoid inter-test weirdness with connections
// failing. If we don't do this, a connection to a test server we just tore down might be re-used in
// the next test.
// TODO(mmastrac): this should be some sort of callback that we can implement for any subsystem
worker
.js_runtime
.op_state()
.borrow_mut()
.try_take::<deno_runtime::deno_fetch::Client>();
if desc.ignore {
send_test_event(
&state_rc,
TestEvent::Result(desc.id, TestResult::Ignored, 0),
)?;
event_tracker.ignored(desc)?;
continue;
}
if had_uncaught_error {
send_test_event(
&state_rc,
TestEvent::Result(desc.id, TestResult::Cancelled, 0),
)?;
event_tracker.cancelled(desc)?;
continue;
}
send_test_event(&state_rc, TestEvent::Wait(desc.id))?;
event_tracker.wait(desc)?;
// Poll event loop once, to allow all ops that are already resolved, but haven't
// responded to settle.
@@ -1013,41 +973,13 @@ async fn run_tests_for_worker_inner(
poll_event_loop(worker).await?;
// We always capture stats, regardless of sanitization state
let before = stats.clone().capture(&filter);
let before_test_stats = sanitizer_helper.capture_stats();
let earlier = Instant::now();
let call = worker.js_runtime.call(&function);
let slow_state_rc = state_rc.clone();
let slow_test_id = desc.id;
let slow_test_warning = spawn(async move {
// The slow test warning should pop up every DENO_SLOW_TEST_TIMEOUT*(2**n) seconds,
// with a duration that is doubling each time. So for a warning time of 60s,
// we should get a warning at 60s, 120s, 240s, etc.
let base_timeout = env::var("DENO_SLOW_TEST_TIMEOUT").unwrap_or_default();
let base_timeout = base_timeout.parse().unwrap_or(60).max(1);
let mut multiplier = 1;
let mut elapsed = 0;
loop {
tokio::time::sleep(Duration::from_secs(
base_timeout * (multiplier - elapsed),
))
.await;
if send_test_event(
&slow_state_rc,
TestEvent::Slow(
slow_test_id,
Duration::from_secs(base_timeout * multiplier).as_millis() as _,
),
)
.is_err()
{
break;
}
multiplier *= 2;
elapsed += 1;
}
});
let slow_test_warning =
spawn(slow_test_watchdog(event_tracker.clone(), desc.id));
let result = worker
.js_runtime
@@ -1058,15 +990,9 @@
Ok(r) => r,
Err(error) => match error.into_kind() {
CoreErrorKind::Js(js_error) => {
send_test_event(
&state_rc,
TestEvent::UncaughtError(specifier.to_string(), Box::new(js_error)),
)?;
event_tracker.uncaught_error(specifier.to_string(), js_error)?;
fail_fast_tracker.add_failure();
send_test_event(
&state_rc,
TestEvent::Result(desc.id, TestResult::Cancelled, 0),
)?;
event_tracker.cancelled(desc)?;
had_uncaught_error = true;
continue;
}
@@ -1082,21 +1008,15 @@
};
if matches!(result, TestResult::Failed(_)) {
fail_fast_tracker.add_failure();
let elapsed = earlier.elapsed().as_millis();
send_test_event(
&state_rc,
TestEvent::Result(desc.id, result, elapsed as u64),
)?;
event_tracker.result(desc, result, earlier.elapsed())?;
continue;
}
// Await activity stabilization
if let Some(diff) = wait_for_activity_to_stabilize(
if let Some(diff) = sanitizers::wait_for_activity_to_stabilize(
worker,
&stats,
&filter,
&top_level,
before,
&sanitizer_helper,
before_test_stats,
desc.sanitize_ops,
desc.sanitize_resources,
)
@@ -1106,157 +1026,20 @@ async fn run_tests_for_worker_inner(
if !formatted.is_empty() {
let failure = TestFailure::Leaked(formatted, trailer_notes);
fail_fast_tracker.add_failure();
let elapsed = earlier.elapsed().as_millis();
send_test_event(
&state_rc,
TestEvent::Result(
desc.id,
TestResult::Failed(failure),
elapsed as u64,
),
event_tracker.result(
desc,
TestResult::Failed(failure),
earlier.elapsed(),
)?;
continue;
}
}
let elapsed = earlier.elapsed().as_millis();
send_test_event(
&state_rc,
TestEvent::Result(desc.id, result, elapsed as u64),
)?;
event_tracker.result(desc, result, earlier.elapsed())?;
}
Ok(())
}
/// The sanitizer must ignore ops, resources and timers that were started at the top-level, but
/// completed and restarted, replacing themselves with the same "thing". For example, if you run a
/// `Deno.serve` server at the top level and make fetch requests to it during the test, those ops
/// should not count as completed during the test because they are immediately replaced.
fn is_empty(
top_level: &TopLevelSanitizerStats,
diff: &RuntimeActivityDiff,
) -> bool {
// If the diff is empty, return empty
if diff.is_empty() {
return true;
}
// If the # of appeared != # of disappeared, we can exit fast with not empty
if diff.appeared.len() != diff.disappeared.len() {
return false;
}
// If there are no top-level ops and !diff.is_empty(), we can exit fast with not empty
if top_level.map.is_empty() {
return false;
}
// Otherwise we need to calculate replacement for top-level stats. Sanitizers will not fire
// if an op, resource or timer is replaced and has a corresponding top-level op.
let mut map = HashMap::new();
for item in &diff.appeared {
let item = get_sanitizer_item_ref(item);
let Some(n1) = top_level.map.get(&item) else {
return false;
};
let n2 = map.entry(item).and_modify(|n| *n += 1).or_insert(1);
// If more ops appeared than were created at the top-level, return false
if *n2 > *n1 {
return false;
}
}
// We know that we replaced no more things than were created at the top-level. So now we just want
// to make sure that whatever thing was created has a corresponding disappearance record.
for item in &diff.disappeared {
let item = get_sanitizer_item_ref(item);
// If more things of this type disappeared than appeared, return false
let Some(n1) = map.get_mut(&item) else {
return false;
};
*n1 -= 1;
if *n1 == 0 {
map.remove(&item);
}
}
// If everything is accounted for, we are empty
map.is_empty()
}
async fn wait_for_activity_to_stabilize(
worker: &mut MainWorker,
stats: &RuntimeActivityStatsFactory,
filter: &RuntimeActivityStatsFilter,
top_level: &TopLevelSanitizerStats,
before: RuntimeActivityStats,
sanitize_ops: bool,
sanitize_resources: bool,
) -> Result<Option<RuntimeActivityDiff>, CoreError> {
// First, check to see if there's any diff at all. If not, just continue.
let after = stats.clone().capture(filter);
let mut diff = RuntimeActivityStats::diff(&before, &after);
if is_empty(top_level, &diff) {
// No activity, so we return early
return Ok(None);
}
// We allow for up to MAX_SANITIZER_LOOP_SPINS to get to a point where there is no difference.
// TODO(mmastrac): We could be much smarter about this if we had the concept of "progress" in
// an event loop tick. Ideally we'd be able to tell if we were spinning and doing nothing, or
// spinning and resolving ops.
for _ in 0..MAX_SANITIZER_LOOP_SPINS {
// There was a diff, so let the event loop run once
poll_event_loop(worker).await?;
let after = stats.clone().capture(filter);
diff = RuntimeActivityStats::diff(&before, &after);
if is_empty(top_level, &diff) {
return Ok(None);
}
}
if !sanitize_ops {
diff
.appeared
.retain(|activity| !matches!(activity, RuntimeActivity::AsyncOp(..)));
diff
.disappeared
.retain(|activity| !matches!(activity, RuntimeActivity::AsyncOp(..)));
}
if !sanitize_resources {
diff
.appeared
.retain(|activity| !matches!(activity, RuntimeActivity::Resource(..)));
diff
.disappeared
.retain(|activity| !matches!(activity, RuntimeActivity::Resource(..)));
}
// Since we don't have an option to disable timer sanitization, we use sanitize_ops == false &&
// sanitize_resources == false to disable those.
if !sanitize_ops && !sanitize_resources {
diff.appeared.retain(|activity| {
!matches!(
activity,
RuntimeActivity::Timer(..) | RuntimeActivity::Interval(..)
)
});
diff.disappeared.retain(|activity| {
!matches!(
activity,
RuntimeActivity::Timer(..) | RuntimeActivity::Interval(..)
)
});
}
Ok(if is_empty(top_level, &diff) {
None
} else {
Some(diff)
})
}
static HAS_TEST_RUN_SIGINT_HANDLER: AtomicBool = AtomicBool::new(false);
/// Test a collection of specifiers with test modes concurrently.
@@ -1950,6 +1733,88 @@ fn get_target_specifiers(
.collect()
}
#[derive(Clone)]
pub struct TestEventTracker {
op_state: Rc<RefCell<OpState>>,
}
impl TestEventTracker {
pub fn new(op_state: Rc<RefCell<OpState>>) -> Self {
Self { op_state }
}
fn send_event(&self, event: TestEvent) -> Result<(), ChannelClosedError> {
self
.op_state
.borrow_mut()
.borrow_mut::<TestEventSender>()
.send(event)
}
fn slow(
&self,
test_id: usize,
duration: Duration,
) -> Result<(), ChannelClosedError> {
self.send_event(TestEvent::Slow(test_id, duration.as_millis() as _))
}
fn wait(&self, desc: &TestDescription) -> Result<(), ChannelClosedError> {
self.send_event(TestEvent::Wait(desc.id))
}
fn ignored(&self, desc: &TestDescription) -> Result<(), ChannelClosedError> {
self.send_event(TestEvent::Result(desc.id, TestResult::Ignored, 0))
}
fn cancelled(
&self,
desc: &TestDescription,
) -> Result<(), ChannelClosedError> {
self.send_event(TestEvent::Result(desc.id, TestResult::Cancelled, 0))
}
fn register(
&self,
descriptions: Arc<TestDescriptions>,
) -> Result<(), ChannelClosedError> {
self.send_event(TestEvent::Register(descriptions))
}
fn completed(&self) -> Result<(), ChannelClosedError> {
self.send_event(TestEvent::Completed)
}
fn uncaught_error(
&self,
specifier: String,
error: JsError,
) -> Result<(), ChannelClosedError> {
self.send_event(TestEvent::UncaughtError(specifier, Box::new(error)))
}
fn plan(&self, plan: TestPlan) -> Result<(), ChannelClosedError> {
self.send_event(TestEvent::Plan(plan))
}
fn result(
&self,
desc: &TestDescription,
test_result: TestResult,
duration: Duration,
) -> Result<(), ChannelClosedError> {
self.send_event(TestEvent::Result(
desc.id,
test_result,
duration.as_millis() as u64,
))
}
pub(crate) fn force_end_report(&self) -> Result<(), ChannelClosedError> {
self.send_event(TestEvent::ForceEndReport)
}
}
/// Tracks failures for the `--fail-fast` argument in
/// order to tell when to stop running tests.
#[derive(Clone, Default)]
@@ -1966,15 +1831,10 @@ impl FailFastTracker {
}
}
pub fn add_failure(&self) -> bool {
if let Some(max_count) = &self.max_count {
self
.failure_count
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
>= *max_count
} else {
false
}
pub fn add_failure(&self) {
self
.failure_count
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
}
pub fn should_stop(&self) -> bool {
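
The FailFastTracker change above splits responsibilities: add_failure now only increments the counter, and should_stop (whose body is cut off above) is left as the single place that decides whether to stop. A self-contained toy with that same two-method surface might look like the following; the field names and the exact comparison in should_stop are assumptions for illustration, not the real implementation.

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

// Toy fail-fast tracker: `None` means "never stop early".
#[derive(Clone, Default)]
struct FailFastSketch {
    max_count: Option<usize>,
    failure_count: Arc<AtomicUsize>,
}

impl FailFastSketch {
    fn add_failure(&self) {
        // Only record the failure; the stop decision lives in should_stop().
        self.failure_count.fetch_add(1, Ordering::SeqCst);
    }

    fn should_stop(&self) -> bool {
        match self.max_count {
            Some(max) => self.failure_count.load(Ordering::SeqCst) >= max,
            None => false,
        }
    }
}

fn main() {
    let tracker = FailFastSketch {
        max_count: Some(2),
        ..Default::default()
    };
    tracker.add_failure();
    assert!(!tracker.should_stop());
    tracker.add_failure();
    assert!(tracker.should_stop());
}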

View file

@@ -0,0 +1,232 @@
// Copyright 2018-2025 the Deno authors. MIT license.
use std::borrow::Cow;
use std::collections::HashMap;
use deno_core::error::CoreError;
use deno_core::stats::RuntimeActivity;
use deno_core::stats::RuntimeActivityDiff;
use deno_core::stats::RuntimeActivityStats;
use deno_core::stats::RuntimeActivityStatsFactory;
use deno_core::stats::RuntimeActivityStatsFilter;
use deno_core::stats::RuntimeActivityType;
use deno_runtime::worker::MainWorker;
use super::poll_event_loop;
/// How many times we're allowed to spin the event loop before considering something a leak.
const MAX_SANITIZER_LOOP_SPINS: usize = 16;
#[derive(Default)]
struct TopLevelSanitizerStats {
map: HashMap<(RuntimeActivityType, Cow<'static, str>), usize>,
}
fn get_sanitizer_item(
activity: RuntimeActivity,
) -> (RuntimeActivityType, Cow<'static, str>) {
let activity_type = activity.activity();
match activity {
RuntimeActivity::AsyncOp(_, _, name) => (activity_type, name.into()),
RuntimeActivity::Resource(_, _, name) => (activity_type, name.into()),
RuntimeActivity::Interval(_, _) => (activity_type, "".into()),
RuntimeActivity::Timer(_, _) => (activity_type, "".into()),
}
}
fn get_sanitizer_item_ref(
activity: &RuntimeActivity,
) -> (RuntimeActivityType, Cow<'_, str>) {
let activity_type = activity.activity();
match activity {
RuntimeActivity::AsyncOp(_, _, name) => (activity_type, (*name).into()),
RuntimeActivity::Resource(_, _, name) => (activity_type, name.into()),
RuntimeActivity::Interval(_, _) => (activity_type, "".into()),
RuntimeActivity::Timer(_, _) => (activity_type, "".into()),
}
}
pub struct TestSanitizerHelper {
activity_stats: RuntimeActivityStatsFactory,
activity_filter: RuntimeActivityStatsFilter,
top_level_sanitizer_stats: TopLevelSanitizerStats,
}
impl TestSanitizerHelper {
pub fn capture_stats(&self) -> RuntimeActivityStats {
self.activity_stats.clone().capture(&self.activity_filter)
}
}
pub fn create_test_sanitizer_helper(
worker: &mut MainWorker,
) -> TestSanitizerHelper {
let stats = worker.js_runtime.runtime_activity_stats_factory();
let ops = worker.js_runtime.op_names();
// These particular ops may start and stop independently of tests, so we just filter them out
// completely.
let op_id_host_recv_message = ops
.iter()
.position(|op| *op == "op_host_recv_message")
.unwrap();
let op_id_host_recv_ctrl = ops
.iter()
.position(|op| *op == "op_host_recv_ctrl")
.unwrap();
// For consistency between tests with and without sanitizers, we _always_ include
// the actual sanitizer capture before and after a test, but a test that ignores resource
// or op sanitization simply doesn't throw if one of these constraints is violated.
let mut filter = RuntimeActivityStatsFilter::default();
filter = filter.with_resources();
filter = filter.with_ops();
filter = filter.with_timers();
filter = filter.omit_op(op_id_host_recv_ctrl as _);
filter = filter.omit_op(op_id_host_recv_message as _);
// Count the top-level stats so we can filter them out if they complete and restart within
// a test.
let top_level_stats = stats.clone().capture(&filter);
let mut top_level = TopLevelSanitizerStats::default();
for activity in top_level_stats.dump().active {
top_level
.map
.entry(get_sanitizer_item(activity))
.and_modify(|n| *n += 1)
.or_insert(1);
}
TestSanitizerHelper {
activity_stats: stats,
activity_filter: filter,
top_level_sanitizer_stats: top_level,
}
}
/// The sanitizer must ignore ops, resources and timers that were started at the top-level, but
/// completed and restarted, replacing themselves with the same "thing". For example, if you run a
/// `Deno.serve` server at the top level and make fetch requests to it during the test, those ops
/// should not count as completed during the test because they are immediately replaced.
fn is_empty(
top_level: &TopLevelSanitizerStats,
diff: &RuntimeActivityDiff,
) -> bool {
// If the diff is empty, return empty
if diff.is_empty() {
return true;
}
// If the # of appeared != # of disappeared, we can exit fast with not empty
if diff.appeared.len() != diff.disappeared.len() {
return false;
}
// If there are no top-level ops and !diff.is_empty(), we can exit fast with not empty
if top_level.map.is_empty() {
return false;
}
// Otherwise we need to calculate replacement for top-level stats. Sanitizers will not fire
// if an op, resource or timer is replaced and has a corresponding top-level op.
let mut map = HashMap::new();
for item in &diff.appeared {
let item = get_sanitizer_item_ref(item);
let Some(n1) = top_level.map.get(&item) else {
return false;
};
let n2 = map.entry(item).and_modify(|n| *n += 1).or_insert(1);
// If more ops appeared than were created at the top-level, return false
if *n2 > *n1 {
return false;
}
}
// We know that we replaced no more things than were created at the top-level. So now we just want
// to make sure that whatever thing was created has a corresponding disappearance record.
for item in &diff.disappeared {
let item = get_sanitizer_item_ref(item);
// If more things of this type disappeared than appeared, return false
let Some(n1) = map.get_mut(&item) else {
return false;
};
*n1 -= 1;
if *n1 == 0 {
map.remove(&item);
}
}
// If everything is accounted for, we are empty
map.is_empty()
}
pub async fn wait_for_activity_to_stabilize(
worker: &mut MainWorker,
helper: &TestSanitizerHelper,
before_test_stats: RuntimeActivityStats,
sanitize_ops: bool,
sanitize_resources: bool,
) -> Result<Option<RuntimeActivityDiff>, CoreError> {
// First, check to see if there's any diff at all. If not, just continue.
let after_test_stats = helper.capture_stats();
let mut diff =
RuntimeActivityStats::diff(&before_test_stats, &after_test_stats);
if is_empty(&helper.top_level_sanitizer_stats, &diff) {
// No activity, so we return early
return Ok(None);
}
// We allow for up to MAX_SANITIZER_LOOP_SPINS to get to a point where there is no difference.
// TODO(mmastrac): We could be much smarter about this if we had the concept of "progress" in
// an event loop tick. Ideally we'd be able to tell if we were spinning and doing nothing, or
// spinning and resolving ops.
for _ in 0..MAX_SANITIZER_LOOP_SPINS {
// There was a diff, so let the event loop run once
poll_event_loop(worker).await?;
let after_test_stats = helper.capture_stats();
diff = RuntimeActivityStats::diff(&before_test_stats, &after_test_stats);
if is_empty(&helper.top_level_sanitizer_stats, &diff) {
return Ok(None);
}
}
if !sanitize_ops {
diff
.appeared
.retain(|activity| !matches!(activity, RuntimeActivity::AsyncOp(..)));
diff
.disappeared
.retain(|activity| !matches!(activity, RuntimeActivity::AsyncOp(..)));
}
if !sanitize_resources {
diff
.appeared
.retain(|activity| !matches!(activity, RuntimeActivity::Resource(..)));
diff
.disappeared
.retain(|activity| !matches!(activity, RuntimeActivity::Resource(..)));
}
// Since we don't have an option to disable timer sanitization, we use sanitize_ops == false &&
// sanitize_resources == false to disable those.
if !sanitize_ops && !sanitize_resources {
diff.appeared.retain(|activity| {
!matches!(
activity,
RuntimeActivity::Timer(..) | RuntimeActivity::Interval(..)
)
});
diff.disappeared.retain(|activity| {
!matches!(
activity,
RuntimeActivity::Timer(..) | RuntimeActivity::Interval(..)
)
});
}
Ok(if is_empty(&helper.top_level_sanitizer_stats, &diff) {
None
} else {
Some(diff)
})
}
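
To tie the new module together, here is a sketch (not real crate code) of how the test loop in mod.rs, shown earlier in this diff, is expected to drive these helpers around a single test. The wrapper name sanitize_one_test is hypothetical and the actual test invocation is elided; the real loop lives in run_tests_for_worker_inner.

// Sketch only: mirrors the call sequence in run_tests_for_worker_inner above.
// `sanitize_one_test` is a hypothetical wrapper; the test invocation itself
// (worker.js_runtime.call(...) plus event-loop polling) is elided.
async fn sanitize_one_test(
    worker: &mut MainWorker,
    helper: &TestSanitizerHelper,
    sanitize_ops: bool,
    sanitize_resources: bool,
) -> Result<(), CoreError> {
    // Snapshot ops, resources and timers before the test body runs.
    let before_test_stats = helper.capture_stats();

    // ... run the test function here ...

    // Poll the event loop until activity settles, then diff against the snapshot.
    if let Some(diff) = wait_for_activity_to_stabilize(
        worker,
        helper,
        before_test_stats,
        sanitize_ops,
        sanitize_resources,
    )
    .await?
    {
        // Whatever remains in `diff` is what mod.rs formats into TestFailure::Leaked.
        let _ = diff;
    }
    Ok(())
}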