diff --git a/Cargo.lock b/Cargo.lock index 1e53b5d..48fff88 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -90,6 +90,12 @@ dependencies = [ "libc", ] +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + [[package]] name = "libc" version = "0.2.139" @@ -110,6 +116,7 @@ name = "luad" version = "0.1.1" dependencies = [ "clap", + "lazy_static", "libc", "mlua", "nix", diff --git a/Cargo.toml b/Cargo.toml index 2b80dcf..5d03454 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,10 +15,11 @@ toml = "0.5" libc = "0.2" rand = "0.8.5" twoway = "0.2.2" +lazy_static = "1.5.0" [profile.release] opt-level = 3 # 's' for size lto = true # panic = 'abort' -codegen-units = 1 \ No newline at end of file +codegen-units = 1 diff --git a/config-example.toml b/config-example.toml index ea98fed..2c2e6af 100644 --- a/config-example.toml +++ b/config-example.toml @@ -9,4 +9,8 @@ user = "dany" # group name group = "dany" -debug = false \ No newline at end of file +debug = true + +workers = 5 + +log_backend = "stderr" # or syslog (default) \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index e8db6ce..2d6881b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,7 +26,7 @@ mod utils; pub use utils::{is_unix_socket, on_exit, privdrop}; mod logs; -pub use logs::{LogLevel, LOG}; +pub use logs::{LogLevel, LogManager}; mod fcgi; pub use fcgi::{fcgi_send_slice, process_request}; @@ -36,3 +36,6 @@ pub use luapi::{ lua_new_bytes, lua_new_bytes_from_string, lua_new_from_slice, lua_slice_magic, vec_from_variadic, LuaSlice, LuabyteArray, }; + +mod pool; +pub use pool::ThreadPool; diff --git a/src/logs.rs b/src/logs.rs index 4881d76..459ff1c 100644 --- a/src/logs.rs +++ b/src/logs.rs @@ -1,17 +1,19 @@ -use crate::{APP_VERSION, DAEMON_NAME, ERR}; +use lazy_static::lazy_static; + +use crate::{APP_VERSION, DAEMON_NAME}; use std::ffi::CString; use std::fmt::Arguments; +use std::fmt::Write; +use std::sync::{Arc, Mutex}; + +lazy_static! { + static ref G_LOG_BACKEND: Arc> = Arc::new(Mutex::new(LogBackend::None)); +} /// LOG_MASK is used to create the priority mask in setlogmask /// For a mask UPTO specified /// used with [Priority] /// -/// # Examples -/// -/// ``` -/// use luad::LOG_UPTO; -/// LOG_UPTO!(LogLevel::ERROR) -/// ``` #[macro_export] macro_rules! LOG_UPTO { @@ -26,7 +28,7 @@ macro_rules! LOG_UPTO macro_rules! INFO { ($($args:tt)*) => ({ let prefix = format!(":info@[{}:{}]: ",file!(), line!()); - let _ = crate::LOG::log(&prefix[..], &crate::LogLevel::INFO, format_args!($($args)*)); + let _ = crate::LogManager::log(&prefix[..], crate::LogLevel::INFO, format_args!($($args)*)); }) } @@ -36,7 +38,7 @@ macro_rules! INFO { macro_rules! WARN { ($($args:tt)*) => ({ let prefix = format!(":warning@[{}:{}]: ",file!(), line!()); - let _ = crate::LOG::log(&prefix[..], &crate::LogLevel::WARN, format_args!($($args)*)); + let _ = crate::LogManager::log(&prefix[..], crate::LogLevel::WARN, format_args!($($args)*)); }) } @@ -46,7 +48,7 @@ macro_rules! WARN { macro_rules! ERROR { ($($args:tt)*) => ({ let prefix = format!(":error@[{}:{}]: ",file!(), line!()); - let _ = crate::LOG::log(&prefix[..], &crate::LogLevel::ERROR, format_args!($($args)*)); + let _ = crate::LogManager::log(&prefix[..], crate::LogLevel::ERROR, format_args!($($args)*)); }) } @@ -56,27 +58,44 @@ macro_rules! ERROR { macro_rules! DEBUG { ($($args:tt)*) => ({ let prefix = format!(":debug@[{}:{}]: ",file!(), line!()); - let _ = crate::LOG::log(&prefix[..], &crate::LogLevel::DEBUG, format_args!($($args)*)); + let _ = crate::LogManager::log(&prefix[..], crate::LogLevel::DEBUG, format_args!($($args)*)); }) } /// Different Logging levels for `LOG` +#[repr(i32)] +#[derive(Copy, Clone)] pub enum LogLevel { /// Error conditions - ERROR, + ERROR = libc::LOG_ERR, /// Normal, but significant, condition - INFO, + INFO = libc::LOG_NOTICE, /// Warning conditions - WARN, + WARN = libc::LOG_WARNING, /// Debugs message - DEBUG, + DEBUG = libc::LOG_INFO, +} + +trait Log { + fn set_level(&mut self, level: LogLevel); + fn log( + &self, + prefix: &str, + level: LogLevel, + args: Arguments<'_>, + ) -> Result<(), Box>; +} +enum LogBackend { + SysLog(SysLog), + Stderr(StderrLog), + None, } /// Log struct wrapper /// -pub struct LOG {} +struct SysLog {} -impl LOG { +impl SysLog { /// Init the system log /// /// This should be called only once in the entire lifetime @@ -84,8 +103,7 @@ impl LOG { /// be keep alive during the lifetime of the program (the main function). /// When it is dropped, the connection to the system log will be /// closed automatically - #[must_use] - pub fn init_log() -> Self { + fn new() -> Self { // connect to the system log unsafe { libc::openlog( @@ -97,11 +115,14 @@ impl LOG { } Self {} } - /// Enable the Log debug +} + +impl Log for SysLog { + /// Set log level /// - pub fn enable_debug() { + fn set_level(&mut self, level: LogLevel) { unsafe { - let _ = libc::setlogmask(LOG_UPTO!(libc::LOG_INFO)); + let _ = libc::setlogmask(LOG_UPTO!(level as i32)); } } @@ -117,29 +138,25 @@ impl LOG { /// # Errors /// /// * `std error` - All errors related to formatted and C string manipulation - pub fn log(prefix: &str, level: &LogLevel, args: Arguments<'_>) -> Result<(), std::io::Error> { - use std::fmt::Write; - let sysloglevel = match level { - LogLevel::ERROR => libc::LOG_ERR, - LogLevel::WARN => libc::LOG_WARNING, - LogLevel::INFO => libc::LOG_NOTICE, - _ => libc::LOG_INFO, - }; + fn log( + &self, + prefix: &str, + level: LogLevel, + args: Arguments<'_>, + ) -> Result<(), Box> { let mut output = String::new(); - if output.write_fmt(args).is_err() { - return Err(ERR!("Unable to create format string from arguments")); - } - let log_fmt = format!("{}(v{}){}%s\n", DAEMON_NAME, APP_VERSION, prefix); + output.write_fmt(args)?; + let log_fmt = format!("{}(v{}){}%s", DAEMON_NAME, APP_VERSION, prefix); let fmt = CString::new(log_fmt.as_bytes())?; let c_msg = CString::new(output.as_bytes())?; unsafe { - libc::syslog(sysloglevel, fmt.as_ptr(), c_msg.as_ptr()); + libc::syslog(level as i32, fmt.as_ptr(), c_msg.as_ptr()); } Ok(()) } } -impl Drop for LOG { +impl Drop for SysLog { /// The connection to the syslog will be closed /// automatically when the log object is drop fn drop(&mut self) { @@ -149,3 +166,124 @@ impl Drop for LOG { } } } + +struct StderrLog { + level: LogLevel, +} + +impl StderrLog { + pub fn new() -> Self { + Self { + level: LogLevel::INFO, + } + } +} + +impl Log for StderrLog { + fn set_level(&mut self, level: LogLevel) { + self.level = level + } + + /// Wrapper function that log error or info message to the + /// stderr + /// + /// # Arguments + /// + /// * `prefix` - Prefix of the log message + /// * `level` - Log level + /// * `args` - Arguments object representing a format string and its arguments + /// + /// # Errors + /// + /// * `std error` - All errors related to formatted and C string manipulation + fn log( + &self, + prefix: &str, + level: LogLevel, + args: Arguments<'_>, + ) -> Result<(), Box> { + let mut output = String::new(); + if (self.level as i32) < (level as i32) { + return Ok(()); + } + output.write_fmt(args)?; + eprintln!("{}(v{}){}: {}", DAEMON_NAME, APP_VERSION, prefix, output); + Ok(()) + } +} + +impl Log for Arc> { + fn set_level(&mut self, level: LogLevel) { + match self.lock().as_deref_mut().unwrap() { + LogBackend::SysLog(log) => log.set_level(level), + LogBackend::Stderr(log) => log.set_level(level), + LogBackend::None => (), + } + } + + fn log( + &self, + prefix: &str, + level: LogLevel, + args: Arguments<'_>, + ) -> Result<(), Box> { + match self.lock().as_deref().unwrap() { + LogBackend::SysLog(log) => log.log(prefix, level, args), + LogBackend::Stderr(log) => log.log(prefix, level, args), + LogBackend::None => Ok(()), + } + } +} + +/// Public log manager structure +pub struct LogManager {} + +impl LogManager { + /// Init the logging system + /// + /// # Arguments + /// + /// - `backend` (`Option`) - backend name + /// + pub fn init_log(backend: &Option) { + *LogManager::log_instance().lock().as_deref_mut().unwrap() = match backend { + Some(s) if s == "syslog" => LogBackend::SysLog(SysLog::new()), + Some(s) if s == "stderr" => LogBackend::Stderr(StderrLog::new()), + Some(_) => LogBackend::None, + None => LogBackend::None, + }; + } + + fn log_instance() -> Arc> { + G_LOG_BACKEND.clone() + } + + /// Set the log level + /// + /// # Arguments + /// + /// - `level` (`LogLevel`) - log level + /// + pub fn set_level(level: LogLevel) { + LogManager::log_instance().set_level(level); + } + + /// Log a message + /// + /// # Arguments + /// + /// - `prefix` (`&str`) - Log prefix + /// - `level` (`LogLevel`) - Log level + /// - `args` (`Arguments<'_>`) - Arguments + /// + /// # Returns + /// + /// - `Result<(), Box>` + pub fn log( + prefix: &str, + level: LogLevel, + args: Arguments<'_>, + ) -> Result<(), Box> { + LogManager::log_instance().log(prefix, level, args) + } +} diff --git a/src/main.rs b/src/main.rs index dab3279..fa3798f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -25,7 +25,8 @@ use std::os::fd::FromRawFd; use std::os::unix::io::AsRawFd; use std::os::unix::net::UnixListener; use std::path::Path; -use std::thread; + +const DEFAULT_WORKER_NUMBER: usize = 4; /// Callback: clean up function /// @@ -65,6 +66,7 @@ fn handle_request(stream: &mut T) { /// /// * `socket_opt` - The socket string that the server listens on fn serve(config: &Config) { + let pool = ThreadPool::new(config.workers.unwrap_or(DEFAULT_WORKER_NUMBER)); // bind to a socket if any if let Some(socket_name) = config.socket.as_deref() { // test if the socket name is an unix domain socket @@ -76,9 +78,7 @@ fn serve(config: &Config) { let listener = UnixListener::bind(socket_name.replace("unix:", "")).unwrap(); for client in listener.incoming() { let mut stream = client.unwrap(); - let _ = thread::spawn(move || { - handle_request(&mut stream); - }); + pool.execute(move || handle_request(&mut stream)); } } else { // TCP socket eg. 127.0.0.1:9000 @@ -86,9 +86,7 @@ fn serve(config: &Config) { let listener = TcpListener::bind(socket_name).unwrap(); for client in listener.incoming() { let mut stream = client.unwrap(); - let _ = thread::spawn(move || { - handle_request(&mut stream); - }); + pool.execute(move || handle_request(&mut stream)); } } } else { @@ -103,9 +101,7 @@ fn serve(config: &Config) { for client in listener.incoming() { let mut stream = client.unwrap(); - let _ = thread::spawn(move || { - handle_request(&mut stream); - }); + pool.execute(move || handle_request(&mut stream)); } } else { DEBUG!("Stdin is used as TCP Socket"); @@ -113,9 +109,7 @@ fn serve(config: &Config) { for client in listener.incoming() { let mut stream = client.unwrap(); - let _ = thread::spawn(move || { - handle_request(&mut stream); - }); + pool.execute(move || handle_request(&mut stream)); } } } @@ -128,6 +122,8 @@ struct Config { user: Option, group: Option, debug: bool, + workers: Option, + log_backend: Option, } /// Main application entry @@ -135,7 +131,6 @@ struct Config { /// Run a `fastCGI` server fn main() { on_exit(clean_up); - let _log = LOG::init_log(); let matches = clap::App::new(DAEMON_NAME) .author(APP_AUTHOR) .about("Lua FastCGI daemon") @@ -156,6 +151,8 @@ fn main() { user: None, group: None, debug: false, + workers: None, + log_backend: Some("syslog".to_string()), }; if let Ok(val) = std::env::var("debug") { if val == "1" || val == "true" { @@ -183,9 +180,10 @@ fn main() { } None => {} } + LogManager::init_log(&config.log_backend); if config.debug { - INFO!("Debug enabled"); - LOG::enable_debug(); + LogManager::set_level(LogLevel::DEBUG); + DEBUG!("Configuration: {:?}", config); } serve(&config); } diff --git a/src/pool.rs b/src/pool.rs new file mode 100644 index 0000000..eec0727 --- /dev/null +++ b/src/pool.rs @@ -0,0 +1,97 @@ +use std::sync::mpsc::{self, Receiver}; +use std::sync::{Arc, Mutex}; +use std::thread; + +use crate::{INFO, WARN}; + +/// A thread pool of workers +/// +/// # Fields +/// +/// - `workers` (`Vec`) - all workers +/// - `dispatcher` (`Option>`) - job dispatcher +/// +pub struct ThreadPool { + workers: Vec, + dispatcher: Option>, +} + +struct Worker { + id: usize, + thread: thread::JoinHandle<()>, +} + +type Job = Box; + +impl Worker { + pub fn new(id: usize, rx: Arc>>) -> Self { + let thread = thread::spawn(move || loop { + let message = rx.lock().unwrap().recv(); + match message { + Ok(job) => job(), + Err(error) => { + WARN!( + "Unable to fetch job from dispatcher: {}. Shut down worker {}", + error, + id + ); + break; + } + } + }); + INFO!("Start new worker: {}", id); + Self { id, thread } + } +} + +impl ThreadPool { + /// Create a new threadpool of workers + /// + /// # Arguments + /// + /// - `size` (`usize`) - number of worker + /// + /// # Returns + /// + /// - `Self` + /// + pub fn new(size: usize) -> Self { + assert!(size > 0); + let (dispatcher, receiver) = mpsc::channel(); + let dispatcher = Some(dispatcher); + let receiver = Arc::new(Mutex::new(receiver)); + let mut workers = Vec::with_capacity(size); + for id in 0..size { + workers.push(Worker::new(id, receiver.clone())); + } + ThreadPool { + workers, + dispatcher, + } + } + + /// Execute a job + /// + /// # Arguments + /// + /// - `&self` (`undefined`) + /// - `f` (`F: F: FnOnce() + Send + 'static,`) - Job handle + /// + pub fn execute(&self, f: F) + where + F: FnOnce() + Send + 'static, + { + let job = Box::new(f); + self.dispatcher.as_ref().unwrap().send(job).unwrap(); + } +} + +impl Drop for ThreadPool { + fn drop(&mut self) { + drop(self.dispatcher.take()); + for worker in self.workers.drain(..) { + INFO!("Dropping worker {}", worker.id); + worker.thread.join().unwrap(); + } + } +}