feat: add thread pool + allow multiple logging system (syslog + stderr)

This commit is contained in:
2026-04-01 05:36:18 +02:00
parent 034b840816
commit 6e81197419
7 changed files with 303 additions and 55 deletions

7
Cargo.lock generated
View File

@@ -90,6 +90,12 @@ dependencies = [
"libc",
]
[[package]]
name = "lazy_static"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
[[package]]
name = "libc"
version = "0.2.139"
@@ -110,6 +116,7 @@ name = "luad"
version = "0.1.1"
dependencies = [
"clap",
"lazy_static",
"libc",
"mlua",
"nix",

View File

@@ -15,10 +15,11 @@ toml = "0.5"
libc = "0.2"
rand = "0.8.5"
twoway = "0.2.2"
lazy_static = "1.5.0"
[profile.release]
opt-level = 3
# 's' for size
lto = true
# panic = 'abort'
codegen-units = 1
codegen-units = 1

View File

@@ -9,4 +9,8 @@ user = "dany"
# group name
group = "dany"
debug = false
debug = true
workers = 5
log_backend = "stderr" # or syslog (default)

View File

@@ -26,7 +26,7 @@ mod utils;
pub use utils::{is_unix_socket, on_exit, privdrop};
mod logs;
pub use logs::{LogLevel, LOG};
pub use logs::{LogLevel, LogManager};
mod fcgi;
pub use fcgi::{fcgi_send_slice, process_request};
@@ -36,3 +36,6 @@ pub use luapi::{
lua_new_bytes, lua_new_bytes_from_string, lua_new_from_slice, lua_slice_magic,
vec_from_variadic, LuaSlice, LuabyteArray,
};
mod pool;
pub use pool::ThreadPool;

View File

@@ -1,17 +1,19 @@
use crate::{APP_VERSION, DAEMON_NAME, ERR};
use lazy_static::lazy_static;
use crate::{APP_VERSION, DAEMON_NAME};
use std::ffi::CString;
use std::fmt::Arguments;
use std::fmt::Write;
use std::sync::{Arc, Mutex};
lazy_static! {
static ref G_LOG_BACKEND: Arc<Mutex<LogBackend>> = Arc::new(Mutex::new(LogBackend::None));
}
/// LOG_MASK is used to create the priority mask in setlogmask
/// For a mask UPTO specified
/// used with [Priority]
///
/// # Examples
///
/// ```
/// use luad::LOG_UPTO;
/// LOG_UPTO!(LogLevel::ERROR)
/// ```
#[macro_export]
macro_rules! LOG_UPTO
{
@@ -26,7 +28,7 @@ macro_rules! LOG_UPTO
macro_rules! INFO {
($($args:tt)*) => ({
let prefix = format!(":info@[{}:{}]: ",file!(), line!());
let _ = crate::LOG::log(&prefix[..], &crate::LogLevel::INFO, format_args!($($args)*));
let _ = crate::LogManager::log(&prefix[..], crate::LogLevel::INFO, format_args!($($args)*));
})
}
@@ -36,7 +38,7 @@ macro_rules! INFO {
macro_rules! WARN {
($($args:tt)*) => ({
let prefix = format!(":warning@[{}:{}]: ",file!(), line!());
let _ = crate::LOG::log(&prefix[..], &crate::LogLevel::WARN, format_args!($($args)*));
let _ = crate::LogManager::log(&prefix[..], crate::LogLevel::WARN, format_args!($($args)*));
})
}
@@ -46,7 +48,7 @@ macro_rules! WARN {
macro_rules! ERROR {
($($args:tt)*) => ({
let prefix = format!(":error@[{}:{}]: ",file!(), line!());
let _ = crate::LOG::log(&prefix[..], &crate::LogLevel::ERROR, format_args!($($args)*));
let _ = crate::LogManager::log(&prefix[..], crate::LogLevel::ERROR, format_args!($($args)*));
})
}
@@ -56,27 +58,44 @@ macro_rules! ERROR {
macro_rules! DEBUG {
($($args:tt)*) => ({
let prefix = format!(":debug@[{}:{}]: ",file!(), line!());
let _ = crate::LOG::log(&prefix[..], &crate::LogLevel::DEBUG, format_args!($($args)*));
let _ = crate::LogManager::log(&prefix[..], crate::LogLevel::DEBUG, format_args!($($args)*));
})
}
/// Different Logging levels for `LOG`
#[repr(i32)]
#[derive(Copy, Clone)]
pub enum LogLevel {
/// Error conditions
ERROR,
ERROR = libc::LOG_ERR,
/// Normal, but significant, condition
INFO,
INFO = libc::LOG_NOTICE,
/// Warning conditions
WARN,
WARN = libc::LOG_WARNING,
/// Debugs message
DEBUG,
DEBUG = libc::LOG_INFO,
}
trait Log {
fn set_level(&mut self, level: LogLevel);
fn log(
&self,
prefix: &str,
level: LogLevel,
args: Arguments<'_>,
) -> Result<(), Box<dyn std::error::Error>>;
}
enum LogBackend {
SysLog(SysLog),
Stderr(StderrLog),
None,
}
/// Log struct wrapper
///
pub struct LOG {}
struct SysLog {}
impl LOG {
impl SysLog {
/// Init the system log
///
/// This should be called only once in the entire lifetime
@@ -84,8 +103,7 @@ impl LOG {
/// be keep alive during the lifetime of the program (the main function).
/// When it is dropped, the connection to the system log will be
/// closed automatically
#[must_use]
pub fn init_log() -> Self {
fn new() -> Self {
// connect to the system log
unsafe {
libc::openlog(
@@ -97,11 +115,14 @@ impl LOG {
}
Self {}
}
/// Enable the Log debug
}
impl Log for SysLog {
/// Set log level
///
pub fn enable_debug() {
fn set_level(&mut self, level: LogLevel) {
unsafe {
let _ = libc::setlogmask(LOG_UPTO!(libc::LOG_INFO));
let _ = libc::setlogmask(LOG_UPTO!(level as i32));
}
}
@@ -117,29 +138,25 @@ impl LOG {
/// # Errors
///
/// * `std error` - All errors related to formatted and C string manipulation
pub fn log(prefix: &str, level: &LogLevel, args: Arguments<'_>) -> Result<(), std::io::Error> {
use std::fmt::Write;
let sysloglevel = match level {
LogLevel::ERROR => libc::LOG_ERR,
LogLevel::WARN => libc::LOG_WARNING,
LogLevel::INFO => libc::LOG_NOTICE,
_ => libc::LOG_INFO,
};
fn log(
&self,
prefix: &str,
level: LogLevel,
args: Arguments<'_>,
) -> Result<(), Box<dyn std::error::Error>> {
let mut output = String::new();
if output.write_fmt(args).is_err() {
return Err(ERR!("Unable to create format string from arguments"));
}
let log_fmt = format!("{}(v{}){}%s\n", DAEMON_NAME, APP_VERSION, prefix);
output.write_fmt(args)?;
let log_fmt = format!("{}(v{}){}%s", DAEMON_NAME, APP_VERSION, prefix);
let fmt = CString::new(log_fmt.as_bytes())?;
let c_msg = CString::new(output.as_bytes())?;
unsafe {
libc::syslog(sysloglevel, fmt.as_ptr(), c_msg.as_ptr());
libc::syslog(level as i32, fmt.as_ptr(), c_msg.as_ptr());
}
Ok(())
}
}
impl Drop for LOG {
impl Drop for SysLog {
/// The connection to the syslog will be closed
/// automatically when the log object is drop
fn drop(&mut self) {
@@ -149,3 +166,124 @@ impl Drop for LOG {
}
}
}
struct StderrLog {
level: LogLevel,
}
impl StderrLog {
pub fn new() -> Self {
Self {
level: LogLevel::INFO,
}
}
}
impl Log for StderrLog {
fn set_level(&mut self, level: LogLevel) {
self.level = level
}
/// Wrapper function that log error or info message to the
/// stderr
///
/// # Arguments
///
/// * `prefix` - Prefix of the log message
/// * `level` - Log level
/// * `args` - Arguments object representing a format string and its arguments
///
/// # Errors
///
/// * `std error` - All errors related to formatted and C string manipulation
fn log(
&self,
prefix: &str,
level: LogLevel,
args: Arguments<'_>,
) -> Result<(), Box<dyn std::error::Error>> {
let mut output = String::new();
if (self.level as i32) < (level as i32) {
return Ok(());
}
output.write_fmt(args)?;
eprintln!("{}(v{}){}: {}", DAEMON_NAME, APP_VERSION, prefix, output);
Ok(())
}
}
impl Log for Arc<Mutex<LogBackend>> {
fn set_level(&mut self, level: LogLevel) {
match self.lock().as_deref_mut().unwrap() {
LogBackend::SysLog(log) => log.set_level(level),
LogBackend::Stderr(log) => log.set_level(level),
LogBackend::None => (),
}
}
fn log(
&self,
prefix: &str,
level: LogLevel,
args: Arguments<'_>,
) -> Result<(), Box<dyn std::error::Error>> {
match self.lock().as_deref().unwrap() {
LogBackend::SysLog(log) => log.log(prefix, level, args),
LogBackend::Stderr(log) => log.log(prefix, level, args),
LogBackend::None => Ok(()),
}
}
}
/// Public log manager structure
pub struct LogManager {}
impl LogManager {
/// Init the logging system
///
/// # Arguments
///
/// - `backend` (`Option<String>`) - backend name
///
pub fn init_log(backend: &Option<String>) {
*LogManager::log_instance().lock().as_deref_mut().unwrap() = match backend {
Some(s) if s == "syslog" => LogBackend::SysLog(SysLog::new()),
Some(s) if s == "stderr" => LogBackend::Stderr(StderrLog::new()),
Some(_) => LogBackend::None,
None => LogBackend::None,
};
}
fn log_instance() -> Arc<Mutex<LogBackend>> {
G_LOG_BACKEND.clone()
}
/// Set the log level
///
/// # Arguments
///
/// - `level` (`LogLevel`) - log level
///
pub fn set_level(level: LogLevel) {
LogManager::log_instance().set_level(level);
}
/// Log a message
///
/// # Arguments
///
/// - `prefix` (`&str`) - Log prefix
/// - `level` (`LogLevel`) - Log level
/// - `args` (`Arguments<'_>`) - Arguments
///
/// # Returns
///
/// - `Result<(), Box<dyn std::error::Error>>`
pub fn log(
prefix: &str,
level: LogLevel,
args: Arguments<'_>,
) -> Result<(), Box<dyn std::error::Error>> {
LogManager::log_instance().log(prefix, level, args)
}
}

View File

@@ -25,7 +25,8 @@ use std::os::fd::FromRawFd;
use std::os::unix::io::AsRawFd;
use std::os::unix::net::UnixListener;
use std::path::Path;
use std::thread;
const DEFAULT_WORKER_NUMBER: usize = 4;
/// Callback: clean up function
///
@@ -65,6 +66,7 @@ fn handle_request<T: Read + Write + AsRawFd>(stream: &mut T) {
///
/// * `socket_opt` - The socket string that the server listens on
fn serve(config: &Config) {
let pool = ThreadPool::new(config.workers.unwrap_or(DEFAULT_WORKER_NUMBER));
// bind to a socket if any
if let Some(socket_name) = config.socket.as_deref() {
// test if the socket name is an unix domain socket
@@ -76,9 +78,7 @@ fn serve(config: &Config) {
let listener = UnixListener::bind(socket_name.replace("unix:", "")).unwrap();
for client in listener.incoming() {
let mut stream = client.unwrap();
let _ = thread::spawn(move || {
handle_request(&mut stream);
});
pool.execute(move || handle_request(&mut stream));
}
} else {
// TCP socket eg. 127.0.0.1:9000
@@ -86,9 +86,7 @@ fn serve(config: &Config) {
let listener = TcpListener::bind(socket_name).unwrap();
for client in listener.incoming() {
let mut stream = client.unwrap();
let _ = thread::spawn(move || {
handle_request(&mut stream);
});
pool.execute(move || handle_request(&mut stream));
}
}
} else {
@@ -103,9 +101,7 @@ fn serve(config: &Config) {
for client in listener.incoming() {
let mut stream = client.unwrap();
let _ = thread::spawn(move || {
handle_request(&mut stream);
});
pool.execute(move || handle_request(&mut stream));
}
} else {
DEBUG!("Stdin is used as TCP Socket");
@@ -113,9 +109,7 @@ fn serve(config: &Config) {
for client in listener.incoming() {
let mut stream = client.unwrap();
let _ = thread::spawn(move || {
handle_request(&mut stream);
});
pool.execute(move || handle_request(&mut stream));
}
}
}
@@ -128,6 +122,8 @@ struct Config {
user: Option<String>,
group: Option<String>,
debug: bool,
workers: Option<usize>,
log_backend: Option<String>,
}
/// Main application entry
@@ -135,7 +131,6 @@ struct Config {
/// Run a `fastCGI` server
fn main() {
on_exit(clean_up);
let _log = LOG::init_log();
let matches = clap::App::new(DAEMON_NAME)
.author(APP_AUTHOR)
.about("Lua FastCGI daemon")
@@ -156,6 +151,8 @@ fn main() {
user: None,
group: None,
debug: false,
workers: None,
log_backend: Some("syslog".to_string()),
};
if let Ok(val) = std::env::var("debug") {
if val == "1" || val == "true" {
@@ -183,9 +180,10 @@ fn main() {
}
None => {}
}
LogManager::init_log(&config.log_backend);
if config.debug {
INFO!("Debug enabled");
LOG::enable_debug();
LogManager::set_level(LogLevel::DEBUG);
DEBUG!("Configuration: {:?}", config);
}
serve(&config);
}

97
src/pool.rs Normal file
View File

@@ -0,0 +1,97 @@
use std::sync::mpsc::{self, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;
use crate::{INFO, WARN};
/// A thread pool of workers
///
/// # Fields
///
/// - `workers` (`Vec<Worker>`) - all workers
/// - `dispatcher` (`Option<mpsc::Sender<Job>>`) - job dispatcher
///
pub struct ThreadPool {
workers: Vec<Worker>,
dispatcher: Option<mpsc::Sender<Job>>,
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl Worker {
pub fn new(id: usize, rx: Arc<Mutex<Receiver<Job>>>) -> Self {
let thread = thread::spawn(move || loop {
let message = rx.lock().unwrap().recv();
match message {
Ok(job) => job(),
Err(error) => {
WARN!(
"Unable to fetch job from dispatcher: {}. Shut down worker {}",
error,
id
);
break;
}
}
});
INFO!("Start new worker: {}", id);
Self { id, thread }
}
}
impl ThreadPool {
/// Create a new threadpool of workers
///
/// # Arguments
///
/// - `size` (`usize`) - number of worker
///
/// # Returns
///
/// - `Self`
///
pub fn new(size: usize) -> Self {
assert!(size > 0);
let (dispatcher, receiver) = mpsc::channel();
let dispatcher = Some(dispatcher);
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver.clone()));
}
ThreadPool {
workers,
dispatcher,
}
}
/// Execute a job
///
/// # Arguments
///
/// - `&self` (`undefined`)
/// - `f` (`F: F: FnOnce() + Send + 'static,`) - Job handle
///
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.dispatcher.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.dispatcher.take());
for worker in self.workers.drain(..) {
INFO!("Dropping worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}