111 lines
2.7 KiB
Rust
111 lines
2.7 KiB
Rust
use std::sync::mpsc::{self, Receiver};
|
|
use std::sync::{Arc, Mutex};
|
|
use std::thread;
|
|
|
|
use crate::{ERR, ERROR, INFO, WARN};
|
|
|
|
/// A thread pool of workers
|
|
///
|
|
/// # Fields
|
|
///
|
|
/// - `workers` (`Vec<Worker>`) - all workers
|
|
/// - `dispatcher` (`Option<mpsc::Sender<Job>>`) - job dispatcher
|
|
///
|
|
pub struct ThreadPool {
|
|
workers: Vec<Worker>,
|
|
dispatcher: Option<mpsc::Sender<Job>>,
|
|
}
|
|
|
|
/// A single worker: an identifier paired with the handle of its thread.
struct Worker {
    /// Identifier used when logging about this worker.
    id: usize,
    /// Join handle of the thread backing this worker.
    thread: thread::JoinHandle<()>,
}
|
|
|
|
/// A boxed closure that is called at most once and can be sent to another thread.
type Job = Box<dyn FnOnce() + Send + 'static>;
|
|
|
|
impl Worker {
|
|
pub fn new(id: usize, rx: Arc<Mutex<Receiver<Job>>>) -> Self {
|
|
let thread = thread::spawn(move || loop {
|
|
let message = rx.lock().unwrap().recv();
|
|
match message {
|
|
Ok(job) => job(),
|
|
Err(error) => {
|
|
WARN!(
|
|
"Unable to fetch job from dispatcher: {}. Shut down worker {}",
|
|
error,
|
|
id
|
|
);
|
|
break;
|
|
}
|
|
}
|
|
});
|
|
INFO!("Start new worker: {}", id);
|
|
Self { id, thread }
|
|
}
|
|
}
|
|
|
|
impl ThreadPool {
|
|
/// Create a new threadpool of workers
|
|
///
|
|
/// # Arguments
|
|
///
|
|
/// - `size` (`usize`) - number of worker
|
|
///
|
|
/// # Returns
|
|
///
|
|
/// - `Self`
|
|
///
|
|
pub fn new(size: usize) -> Self {
|
|
assert!(size > 0);
|
|
let (dispatcher, receiver) = mpsc::channel();
|
|
let dispatcher = Some(dispatcher);
|
|
let receiver = Arc::new(Mutex::new(receiver));
|
|
let mut workers = Vec::with_capacity(size);
|
|
for id in 0..size {
|
|
workers.push(Worker::new(id, receiver.clone()));
|
|
}
|
|
ThreadPool {
|
|
workers,
|
|
dispatcher,
|
|
}
|
|
}
|
|
|
|
/// Execute a job
|
|
///
|
|
/// # Arguments
|
|
///
|
|
/// - `&self` (`undefined`)
|
|
/// - `f` (`F: F: FnOnce() + Send + 'static,`) - Job handle
|
|
///
|
|
pub fn execute<F>(&self, f: F)
|
|
where
|
|
F: FnOnce() + Send + 'static,
|
|
{
|
|
let job = Box::new(f);
|
|
if let Err(error) = self
|
|
.dispatcher
|
|
.as_ref()
|
|
.ok_or(ERR!("No sender found in dispatcher"))
|
|
.and_then(|sender| {
|
|
sender
|
|
.send(job)
|
|
.map_err(|_| ERR!("Unable to send job to worker"))
|
|
})
|
|
{
|
|
ERROR!("Error to dispatch request: {}", error);
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Drop for ThreadPool {
|
|
fn drop(&mut self) {
|
|
drop(self.dispatcher.take());
|
|
for worker in self.workers.drain(..) {
|
|
INFO!("Dropping worker {}", worker.id);
|
|
if let Err(_) = worker.thread.join() {
|
|
WARN!("Unable to join worker {}", worker.id);
|
|
}
|
|
}
|
|
}
|
|
}
|