ruff server sets worker thread pool size based on the user's available cores (#10399)

## Summary

Fixes #10369.

The server previously hard-coded four worker threads for its background pool. It now sizes the pool from the number of available cores (via `num_cpus`), never dropping below four.
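For context, the change boils down to deriving the worker count from the machine's core count and passing it through as a `NonZeroUsize`. Below is a minimal standalone sketch of that calculation, using the standard library's `std::thread::available_parallelism` as a stand-in for the `num_cpus` crate that the diff actually adds:

```rust
use std::num::NonZeroUsize;

fn default_worker_threads() -> NonZeroUsize {
    // Fall back to four workers if the core count cannot be determined,
    // and never go below four (mirrors `num_cpus::get().max(4)` in the diff).
    let four = NonZeroUsize::new(4).expect("4 is non-zero");
    std::thread::available_parallelism()
        .unwrap_or(four)
        .max(four)
}

fn main() {
    println!("worker threads: {}", default_worker_threads());
}
```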

## Test Plan

N/A
Jane Lewis 2024-03-18 14:06:59 -07:00 committed by GitHub
parent 1a2f9f082d
commit d9f1cdbea1
9 changed files with 62 additions and 17 deletions

Cargo.lock (generated): 11 lines changed

@@ -1520,6 +1520,16 @@ dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "number_prefix"
version = "0.4.0"
@@ -2031,6 +2041,7 @@ dependencies = [
"log",
"mimalloc",
"notify",
"num_cpus",
"path-absolutize",
"rayon",
"regex",

@@ -65,6 +65,7 @@ memchr = { version = "2.7.1" }
mimalloc = { version = "0.1.39" }
natord = { version = "1.0.9" }
notify = { version = "6.1.1" }
num_cpus = { version = "1.16.0" }
once_cell = { version = "1.19.0" }
path-absolutize = { version = "3.1.1" }
pathdiff = { version = "0.2.1" }

@@ -41,6 +41,7 @@ is-macro = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
notify = { workspace = true }
num_cpus = { workspace = true }
path-absolutize = { workspace = true, features = ["once_cell_cache"] }
rayon = { workspace = true }
regex = { workspace = true }
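
The new `num_cpus` dependency (pinned to 1.16.0 above) is what supplies the core count. A quick usage sketch, assuming the dependency is declared as in the manifests above:

```rust
fn main() {
    // Logical cores visible to the process; this is the value the server
    // uses to size its background worker pool.
    let logical = num_cpus::get();
    // Physical cores, ignoring SMT/hyper-threading.
    let physical = num_cpus::get_physical();
    println!("logical: {logical}, physical: {physical}");
}
```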

@@ -496,7 +496,7 @@ pub struct FormatCommand {
pub range: Option<FormatRange>,
}
#[derive(Clone, Debug, clap::Parser)]
#[derive(Copy, Clone, Debug, clap::Parser)]
pub struct ServerCommand {
/// Enable preview mode; required for regular operation
#[arg(long)]

@@ -1,3 +1,5 @@
use std::num::NonZeroUsize;
use crate::ExitStatus;
use anyhow::Result;
use ruff_linter::logging::LogLevel;
@@ -9,7 +11,11 @@ use tracing_subscriber::{
};
use tracing_tree::time::Uptime;
pub(crate) fn run_server(preview: bool, log_level: LogLevel) -> Result<ExitStatus> {
pub(crate) fn run_server(
preview: bool,
worker_threads: NonZeroUsize,
log_level: LogLevel,
) -> Result<ExitStatus> {
if !preview {
tracing::error!("--preview needs to be provided as a command line argument while the server is still unstable.\nFor example: `ruff server --preview`");
return Ok(ExitStatus::Error);
@@ -33,7 +39,7 @@ pub(crate) fn run_server(preview: bool, log_level: LogLevel) -> Result<ExitStatu
tracing::subscriber::set_global_default(subscriber)?;
let server = Server::new()?;
let server = Server::new(worker_threads)?;
server.run().map(|()| ExitStatus::Success)
}
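
`run_server` now takes the worker count as a `NonZeroUsize`, so an empty pool is unrepresentable at the type level. A small standalone sketch of the conversions callers rely on (the values here are illustrative):

```rust
use std::num::NonZeroUsize;

fn main() {
    // Fallible conversion from a runtime value; zero is rejected.
    assert!(NonZeroUsize::try_from(0usize).is_err());
    let workers = NonZeroUsize::try_from(8usize).expect("a non-zero worker thread count");

    // Infallible construction from a literal known to be non-zero.
    let fmt_threads = NonZeroUsize::new(1).unwrap();

    println!("{workers} workers, {fmt_threads} formatting thread");
}
```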

@@ -2,6 +2,7 @@
use std::fs::File;
use std::io::{self, stdout, BufWriter, Write};
use std::num::NonZeroUsize;
use std::path::{Path, PathBuf};
use std::process::ExitCode;
use std::sync::mpsc::channel;
@@ -204,10 +205,15 @@ fn format(args: FormatCommand, global_options: GlobalConfigArgs) -> Result<ExitS
}
}
#[allow(clippy::needless_pass_by_value)] // TODO: remove once we start taking arguments from here
fn server(args: ServerCommand, log_level: LogLevel) -> Result<ExitStatus> {
let ServerCommand { preview } = args;
commands::server::run_server(preview, log_level)
// By default, size the worker pool to `num_cpus`, with a minimum of 4 threads.
let worker_threads = num_cpus::get().max(4);
commands::server::run_server(
preview,
NonZeroUsize::try_from(worker_threads).expect("a non-zero worker thread count"),
log_level,
)
}
pub fn check(args: CheckCommand, global_options: GlobalConfigArgs) -> Result<ExitStatus> {
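
To make the `.max(4)` behavior above concrete: it is a floor on the pool size, not a cap. A standalone sketch (not code from the diff):

```rust
fn worker_threads(cpus: usize) -> usize {
    // `usize::max` returns the larger of the two values, so 4 acts as a
    // lower bound on the worker count.
    cpus.max(4)
}

fn main() {
    assert_eq!(worker_threads(2), 4);   // small machines still get 4 workers
    assert_eq!(worker_threads(16), 16); // larger machines use every core
}
```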

@@ -1,5 +1,7 @@
//! Scheduling, I/O, and API endpoints.
use std::num::NonZeroUsize;
use lsp::Connection;
use lsp_server as lsp;
use lsp_types as types;
@@ -27,11 +29,12 @@ pub(crate) type Result<T> = std::result::Result<T, api::Error>;
pub struct Server {
conn: lsp::Connection,
threads: lsp::IoThreads,
worker_threads: NonZeroUsize,
session: Session,
}
impl Server {
pub fn new() -> crate::Result<Self> {
pub fn new(worker_threads: NonZeroUsize) -> crate::Result<Self> {
let (conn, threads) = lsp::Connection::stdio();
let (id, params) = conn.initialize_start()?;
@@ -66,19 +69,27 @@ impl Server {
Ok(Self {
conn,
threads,
worker_threads,
session: Session::new(&server_capabilities, &workspaces)?,
})
}
pub fn run(self) -> crate::Result<()> {
let result = event_loop_thread(move || Self::event_loop(&self.conn, self.session))?.join();
let result = event_loop_thread(move || {
Self::event_loop(&self.conn, self.session, self.worker_threads)
})?
.join();
self.threads.join()?;
result
}
fn event_loop(connection: &Connection, session: Session) -> crate::Result<()> {
fn event_loop(
connection: &Connection,
session: Session,
worker_threads: NonZeroUsize,
) -> crate::Result<()> {
// TODO(jane): Make thread count configurable
let mut scheduler = schedule::Scheduler::new(session, 4, &connection.sender);
let mut scheduler = schedule::Scheduler::new(session, worker_threads, &connection.sender);
for msg in &connection.receiver {
let task = match msg {
lsp::Message::Request(req) => {
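
`event_loop_thread` (defined elsewhere in the crate) runs the loop on its own dedicated thread so the stdio transport threads stay responsive. A rough approximation of that helper built on `std::thread::Builder`; the name, signature, and thread label are assumptions for illustration, not the crate's actual API:

```rust
use std::thread;

fn event_loop_thread<F>(f: F) -> std::io::Result<thread::JoinHandle<()>>
where
    F: FnOnce() + Send + 'static,
{
    // A named thread makes panics and profiler output easier to attribute.
    thread::Builder::new()
        .name("server event loop".to_string())
        .spawn(f)
}

fn main() -> std::io::Result<()> {
    let handle = event_loop_thread(|| println!("event loop running"))?;
    handle.join().expect("event loop thread panicked");
    Ok(())
}
```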

@@ -1,3 +1,5 @@
use std::num::NonZeroUsize;
use crossbeam::channel::Sender;
use crate::session::Session;
@@ -42,13 +44,14 @@ pub(crate) struct Scheduler {
impl Scheduler {
pub(super) fn new(
session: Session,
thread_count: usize,
worker_threads: NonZeroUsize,
sender: &Sender<lsp_server::Message>,
) -> Self {
const FMT_THREADS: usize = 1;
Self {
session,
fmt_pool: thread::Pool::new(1),
background_pool: thread::Pool::new(thread_count),
fmt_pool: thread::Pool::new(NonZeroUsize::try_from(FMT_THREADS).unwrap()),
background_pool: thread::Pool::new(worker_threads),
client: Client::new(sender),
}
}
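
The scheduler keeps formatting on a dedicated single-thread pool while lint and other background work shares the `worker_threads`-sized pool. Below is a rough standalone sketch of that split using a toy pool built on `std::sync::mpsc`; it is not the crate's `thread::Pool` API, just an illustration of the shape:

```rust
use std::num::NonZeroUsize;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;

type Job = Box<dyn FnOnce() + Send>;

// Toy stand-in for a worker pool: threads pull boxed jobs off a shared
// channel until the sender is dropped.
struct Pool {
    sender: mpsc::Sender<Job>,
}

impl Pool {
    fn new(threads: NonZeroUsize) -> Pool {
        let (sender, receiver) = mpsc::channel::<Job>();
        let receiver = Arc::new(Mutex::new(receiver));
        for _ in 0..threads.get() {
            let receiver = Arc::clone(&receiver);
            thread::spawn(move || loop {
                // Hold the lock only long enough to pull one job.
                let job = match receiver.lock().unwrap().recv() {
                    Ok(job) => job,
                    Err(_) => break, // channel closed; shut the worker down
                };
                job();
            });
        }
        Pool { sender }
    }

    fn spawn(&self, job: impl FnOnce() + Send + 'static) {
        self.sender.send(Box::new(job)).unwrap();
    }
}

fn main() {
    // One dedicated formatting thread, several shared background workers.
    let fmt_pool = Pool::new(NonZeroUsize::new(1).unwrap());
    let background_pool = Pool::new(NonZeroUsize::new(4).unwrap());

    fmt_pool.spawn(|| println!("formatting request"));
    background_pool.spawn(|| println!("diagnostics for an open file"));

    // Crude wait so the demo jobs get a chance to run before exit.
    thread::sleep(Duration::from_millis(100));
}
```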

@@ -13,9 +13,12 @@
//! The thread pool is implemented entirely using
//! the threading utilities in [`crate::server::schedule::thread`].
use std::sync::{
use std::{
num::NonZeroUsize,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
use crossbeam::channel::{Receiver, Sender};
@@ -41,12 +44,15 @@ struct Job {
}
impl Pool {
pub(crate) fn new(threads: usize) -> Pool {
pub(crate) fn new(threads: NonZeroUsize) -> Pool {
// Override OS defaults to avoid stack overflows on platforms with low stack size defaults.
const STACK_SIZE: usize = 2 * 1024 * 1024;
const INITIAL_PRIORITY: ThreadPriority = ThreadPriority::Worker;
let (job_sender, job_receiver) = crossbeam::channel::bounded(threads);
let threads = usize::from(threads);
// Channel buffer capacity is between 2 and 4, depending on the pool size.
let (job_sender, job_receiver) = crossbeam::channel::bounded(std::cmp::min(threads * 2, 4));
let extant_tasks = Arc::new(AtomicUsize::new(0));
let mut handles = Vec::with_capacity(threads);
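
A quick standalone check of the new queue sizing, `min(threads * 2, 4)`: a one-thread pool gets a buffer of 2 and anything larger saturates at 4, and the bounded channel then applies backpressure to senders. This sketch assumes the `crossbeam` crate as a dependency (the file above already imports `crossbeam::channel`) and is not code from the crate:

```rust
use std::num::NonZeroUsize;

// Mirrors the sizing rule used for the job queue above.
fn queue_capacity(threads: NonZeroUsize) -> usize {
    std::cmp::min(usize::from(threads) * 2, 4)
}

fn main() {
    let one = NonZeroUsize::new(1).unwrap();
    let eight = NonZeroUsize::new(8).unwrap();
    assert_eq!(queue_capacity(one), 2);
    assert_eq!(queue_capacity(eight), 4);

    // With a bounded channel, `try_send` fails once the queue already
    // holds `capacity` pending jobs, which throttles task producers.
    let (tx, rx) = crossbeam::channel::bounded::<u32>(queue_capacity(one));
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    assert!(tx.try_send(3).is_err());
    assert_eq!(rx.recv().unwrap(), 1);
}
```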