mirror of
https://github.com/lxsang/luafcgi.git
synced 2025-02-23 12:22:49 +01:00
Support for websocket connection
This commit is contained in:
parent
b5acb19bc7
commit
2654fec94a
54
Cargo.lock
generated
54
Cargo.lock
generated
@ -70,6 +70,17 @@ dependencies = [
|
|||||||
"vec_map",
|
"vec_map",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "getrandom"
|
||||||
|
version = "0.2.8"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
"libc",
|
||||||
|
"wasi",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hermit-abi"
|
name = "hermit-abi"
|
||||||
version = "0.1.19"
|
version = "0.1.19"
|
||||||
@ -102,6 +113,7 @@ dependencies = [
|
|||||||
"libc",
|
"libc",
|
||||||
"mlua",
|
"mlua",
|
||||||
"nix",
|
"nix",
|
||||||
|
"rand",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_derive",
|
"serde_derive",
|
||||||
"toml",
|
"toml",
|
||||||
@ -188,6 +200,12 @@ version = "0.3.26"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160"
|
checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "ppv-lite86"
|
||||||
|
version = "0.2.17"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "proc-macro2"
|
name = "proc-macro2"
|
||||||
version = "1.0.49"
|
version = "1.0.49"
|
||||||
@ -206,6 +224,36 @@ dependencies = [
|
|||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "rand"
|
||||||
|
version = "0.8.5"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
|
||||||
|
dependencies = [
|
||||||
|
"libc",
|
||||||
|
"rand_chacha",
|
||||||
|
"rand_core",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "rand_chacha"
|
||||||
|
version = "0.3.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
|
||||||
|
dependencies = [
|
||||||
|
"ppv-lite86",
|
||||||
|
"rand_core",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "rand_core"
|
||||||
|
version = "0.6.4"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
|
||||||
|
dependencies = [
|
||||||
|
"getrandom",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "rustc-hash"
|
name = "rustc-hash"
|
||||||
version = "1.1.0"
|
version = "1.1.0"
|
||||||
@ -291,6 +339,12 @@ version = "0.8.2"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"
|
checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "wasi"
|
||||||
|
version = "0.11.0+wasi-snapshot-preview1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "winapi"
|
name = "winapi"
|
||||||
version = "0.3.9"
|
version = "0.3.9"
|
||||||
|
@ -13,6 +13,7 @@ serde = {version = "1.0", features = ["derive"]}
|
|||||||
serde_derive = "1.0"
|
serde_derive = "1.0"
|
||||||
toml = "0.5"
|
toml = "0.5"
|
||||||
libc = "0.2"
|
libc = "0.2"
|
||||||
|
rand = "0.8.5"
|
||||||
|
|
||||||
[profile.release]
|
[profile.release]
|
||||||
opt-level = 's'
|
opt-level = 's'
|
||||||
|
599
src/lib.rs
599
src/lib.rs
@ -1,12 +1,12 @@
|
|||||||
use mlua::prelude::*;
|
use mlua::{prelude::*, Variadic};
|
||||||
use nix;
|
use nix;
|
||||||
|
use rand::Rng;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::ffi::CString;
|
use std::ffi::CString;
|
||||||
use std::fmt::Arguments;
|
use std::fmt::Arguments;
|
||||||
use std::io::{BufRead, BufReader, Error, ErrorKind, Read, Write};
|
use std::io::{BufRead, BufReader, Error, ErrorKind, Read, Write};
|
||||||
use std::os::fd::RawFd;
|
use std::os::fd::RawFd;
|
||||||
use std::os::unix::io::AsRawFd;
|
use std::os::unix::io::AsRawFd;
|
||||||
|
|
||||||
/// app author
|
/// app author
|
||||||
pub const APP_AUTHOR: &str = "Dany LE <mrsang@iohub.dev>";
|
pub const APP_AUTHOR: &str = "Dany LE <mrsang@iohub.dev>";
|
||||||
|
|
||||||
@ -107,6 +107,13 @@ macro_rules! ERR {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[macro_export]
|
||||||
|
macro_rules! BITV {
|
||||||
|
($v:expr,$i:expr) => {
|
||||||
|
($v & (1 << $i)) >> $i
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
/// Macro for error log helper
|
/// Macro for error log helper
|
||||||
///
|
///
|
||||||
#[macro_export]
|
#[macro_export]
|
||||||
@ -439,7 +446,7 @@ impl std::fmt::Display for FcgiHeader {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, PartialEq)]
|
#[derive(Debug)]
|
||||||
enum FCGIRequestState {
|
enum FCGIRequestState {
|
||||||
WaitForParams,
|
WaitForParams,
|
||||||
WaitForStdin,
|
WaitForStdin,
|
||||||
@ -464,60 +471,301 @@ struct FGCIRequest {
|
|||||||
data: Option<Vec<u8>>,
|
data: Option<Vec<u8>>,
|
||||||
state: FCGIRequestState,
|
state: FCGIRequestState,
|
||||||
}
|
}
|
||||||
#[repr(C)]
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
enum WSHeaderOpcode {
|
||||||
|
Data,
|
||||||
|
Text,
|
||||||
|
Bin,
|
||||||
|
Close,
|
||||||
|
Ping,
|
||||||
|
Pong,
|
||||||
|
Unknown,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for WSHeaderOpcode {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||||
|
let s = match self {
|
||||||
|
WSHeaderOpcode::Text => "WSHeaderOpcode::Text",
|
||||||
|
WSHeaderOpcode::Bin => "WSHeaderOpcode::Bin",
|
||||||
|
WSHeaderOpcode::Close => "WSHeaderOpcode::Close",
|
||||||
|
WSHeaderOpcode::Ping => "WSHeaderOpcode::Ping",
|
||||||
|
WSHeaderOpcode::Pong => "WSHeaderOpcode::Pong",
|
||||||
|
WSHeaderOpcode::Unknown => "WSHeaderOpcode::Unknown",
|
||||||
|
WSHeaderOpcode::Data => "WSHeaderOpcode::Data",
|
||||||
|
};
|
||||||
|
write!(f, "{}", s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WSHeaderOpcode {
|
||||||
|
fn from_u8(v: u8) -> WSHeaderOpcode {
|
||||||
|
match v {
|
||||||
|
0x0 => WSHeaderOpcode::Data,
|
||||||
|
0x1 => WSHeaderOpcode::Text,
|
||||||
|
0x2 => WSHeaderOpcode::Bin,
|
||||||
|
0x8 => WSHeaderOpcode::Close,
|
||||||
|
0x9 => WSHeaderOpcode::Ping,
|
||||||
|
0xA => WSHeaderOpcode::Pong,
|
||||||
|
_ => WSHeaderOpcode::Unknown,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fn as_u8(&self) -> u8 {
|
||||||
|
match self {
|
||||||
|
WSHeaderOpcode::Text => 0x1,
|
||||||
|
WSHeaderOpcode::Bin => 0x2,
|
||||||
|
WSHeaderOpcode::Close => 0x8,
|
||||||
|
WSHeaderOpcode::Ping => 0x9,
|
||||||
|
WSHeaderOpcode::Pong => 0xA,
|
||||||
|
WSHeaderOpcode::Unknown => 0xFF,
|
||||||
|
WSHeaderOpcode::Data => 0x0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct WSHeader {
|
||||||
|
fin: u8,
|
||||||
|
opcode: WSHeaderOpcode,
|
||||||
|
len: usize,
|
||||||
|
mask: u8,
|
||||||
|
mask_key: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl mlua::UserData for WSHeader {
|
||||||
|
fn add_fields<'lua, F: mlua::UserDataFields<'lua, Self>>(fields: &mut F) {
|
||||||
|
fields.add_field_method_get("fin", |_, this| Ok(this.fin));
|
||||||
|
fields.add_field_method_get("opcode", |_, this| Ok(this.opcode.as_u8()));
|
||||||
|
fields.add_field_method_get("len", |_, this| Ok(this.len));
|
||||||
|
fields.add_field_method_get("mask", |_, this| Ok(this.mask));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
impl WSHeader {
|
||||||
|
fn read_from(stream: &mut FCGIOStream) -> Result<WSHeader, Box<dyn std::error::Error>> {
|
||||||
|
let mut header = WSHeader {
|
||||||
|
fin: 0,
|
||||||
|
opcode: WSHeaderOpcode::Close,
|
||||||
|
len: 0,
|
||||||
|
mask: 0,
|
||||||
|
mask_key: vec![0; 4],
|
||||||
|
};
|
||||||
|
let mut bytes = stream.stdin_read_exact(2)?;
|
||||||
|
if BITV!(bytes[0], 6) == 1 || BITV!(bytes[0], 5) == 1 || BITV!(bytes[0], 4) == 1 {
|
||||||
|
return Err(Box::new(ERR!("Reserved bits 4,5,6 must be 0")));
|
||||||
|
}
|
||||||
|
header.fin = BITV!(bytes[0], 7);
|
||||||
|
header.opcode = WSHeaderOpcode::from_u8(bytes[0] & 0x0F);
|
||||||
|
header.mask = BITV!(bytes[1], 7);
|
||||||
|
let len = bytes[1] & 0x7F;
|
||||||
|
if len <= 125 {
|
||||||
|
header.len = len as usize;
|
||||||
|
} else if len == 126 {
|
||||||
|
bytes = stream.stdin_read_exact(2)?;
|
||||||
|
header.len = ((bytes[0] as usize) << 8) + (bytes[1] as usize);
|
||||||
|
} else {
|
||||||
|
bytes = stream.stdin_read_exact(8)?;
|
||||||
|
// TODO we only support up to 4 bytes len
|
||||||
|
header.len = ((bytes[4] as usize) << 24)
|
||||||
|
+ ((bytes[5] as usize) << 16)
|
||||||
|
+ ((bytes[6] as usize) << 8)
|
||||||
|
+ (bytes[7] as usize);
|
||||||
|
}
|
||||||
|
header.mask_key = stream.stdin_read_exact(4)?;
|
||||||
|
DEBUG!("Read WS header: {:?}", header);
|
||||||
|
match header.opcode {
|
||||||
|
WSHeaderOpcode::Ping => {
|
||||||
|
DEBUG!("Receive PING from client, send PONG");
|
||||||
|
let data = header.read_data_from(stream)?;
|
||||||
|
let mut respond_header = WSHeader {
|
||||||
|
fin: 1,
|
||||||
|
opcode: WSHeaderOpcode::Pong,
|
||||||
|
len: data.len(),
|
||||||
|
mask: !header.mask,
|
||||||
|
mask_key: Vec::new(),
|
||||||
|
};
|
||||||
|
respond_header.send_to(stream, &data)?;
|
||||||
|
}
|
||||||
|
WSHeaderOpcode::Pong => {}
|
||||||
|
_ => {}
|
||||||
|
};
|
||||||
|
Ok(header)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read_data_from(
|
||||||
|
&mut self,
|
||||||
|
stream: &mut FCGIOStream,
|
||||||
|
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
|
||||||
|
let mut vec = stream.stdin_read_exact(self.len)?;
|
||||||
|
if self.mask == 1 {
|
||||||
|
for i in 0..vec.len() {
|
||||||
|
vec[i] = vec[i] ^ self.mask_key[i % 4];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(vec)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_to(
|
||||||
|
&mut self,
|
||||||
|
stream: &mut FCGIOStream,
|
||||||
|
data: &[u8],
|
||||||
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let mut frame: Vec<u8>;
|
||||||
|
if self.mask == 1 {
|
||||||
|
let mut rng = rand::thread_rng();
|
||||||
|
let r = rng.gen::<u32>();
|
||||||
|
self.mask_key = vec![0, 4];
|
||||||
|
self.mask_key[0] = ((r >> 24) & 0xFF) as u8;
|
||||||
|
self.mask_key[1] = ((r >> 16) & 0xFF) as u8;
|
||||||
|
self.mask_key[2] = ((r >> 8) & 0xFF) as u8;
|
||||||
|
self.mask_key[3] = (r & 0xFF) as u8;
|
||||||
|
let mut masked_data = data.to_vec();
|
||||||
|
for i in 0..data.len() {
|
||||||
|
masked_data[i] = masked_data[i] ^ self.mask_key[i % 4];
|
||||||
|
}
|
||||||
|
// send out header + data
|
||||||
|
frame = self.as_bytes();
|
||||||
|
if masked_data.len() > 0 {
|
||||||
|
frame.append(&mut masked_data);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
frame = self.as_bytes();
|
||||||
|
if data.len() > 0 {
|
||||||
|
frame.extend(data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
stream.write_record(frame)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn as_bytes(&self) -> Vec<u8> {
|
||||||
|
let mut vec: Vec<u8> = Vec::new();
|
||||||
|
vec.push((self.fin << 7) | self.opcode.as_u8());
|
||||||
|
if self.len <= 125 {
|
||||||
|
vec.push((self.mask << 7) | (self.len as u8));
|
||||||
|
} else if self.len < 65536 {
|
||||||
|
vec.extend([
|
||||||
|
(self.mask << 7) | 126,
|
||||||
|
((self.len) >> 8) as u8,
|
||||||
|
((self.len) & 0x00FF) as u8,
|
||||||
|
]);
|
||||||
|
} else {
|
||||||
|
vec.extend([
|
||||||
|
(self.mask << 7) | 127,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
((self.len) >> 24) as u8,
|
||||||
|
(((self.len) >> 16) & 0x00FF) as u8,
|
||||||
|
(((self.len) >> 8) & 0x00FF) as u8,
|
||||||
|
((self.len) & 0x00FF) as u8,
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
if self.mask == 1 {
|
||||||
|
vec.extend(&self.mask_key);
|
||||||
|
}
|
||||||
|
return vec;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct FCGIOStream {
|
struct FCGIOStream {
|
||||||
fd: RawFd,
|
fd: RawFd,
|
||||||
id: u16,
|
id: u16,
|
||||||
|
ws: bool,
|
||||||
|
stdin_buffer: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FCGIOStream {
|
impl FCGIOStream {
|
||||||
|
fn read_stdin_record(&mut self) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
if !self.ws {
|
||||||
|
WARN!("read_stdin_record is only active when the current connection is websocket");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
let header = fcgi_read_header(self)?;
|
||||||
|
match header.kind {
|
||||||
|
FCGIHeaderType::Stdin => {
|
||||||
|
let body = fcgi_read_body(self, &header)?;
|
||||||
|
self.stdin_buffer.extend(body);
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
WARN!(
|
||||||
|
"Expect FCGIHeaderType::Stdin record, received {}. Ignore it",
|
||||||
|
header.kind
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn stdin_read_exact(&mut self, len: usize) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
|
||||||
|
while self.stdin_buffer.len() < len {
|
||||||
|
self.read_stdin_record()?;
|
||||||
|
}
|
||||||
|
// consume first n bytes of the buffer vector
|
||||||
|
Ok(self.stdin_buffer.drain(0..len).collect::<Vec<u8>>())
|
||||||
|
}
|
||||||
|
|
||||||
fn write_record(&mut self, buf: Vec<u8>) -> Result<(), Box<dyn std::error::Error>> {
|
fn write_record(&mut self, buf: Vec<u8>) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
fcgi_send_stdout(self, self.id, Some(buf))?;
|
fcgi_send_stdout(self, self.id, Some(buf))?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
fn write_variadic(
|
}
|
||||||
&mut self,
|
|
||||||
values: mlua::Variadic<LuaValue>,
|
|
||||||
) -> Result<(), Box<dyn std::error::Error>> {
|
|
||||||
let mut output: Vec<u8> = Vec::new();
|
|
||||||
|
|
||||||
for value in values {
|
fn vec_from_variadic(
|
||||||
match &value {
|
values: mlua::Variadic<LuaValue>,
|
||||||
LuaNil => {}
|
bin_only: bool,
|
||||||
LuaValue::Boolean(v) => output.extend(v.to_string().as_bytes()),
|
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
|
||||||
LuaValue::Integer(v) => output.extend(v.to_string().as_bytes()),
|
let mut output: Vec<u8> = Vec::new();
|
||||||
LuaValue::Number(v) => output.extend(v.to_string().as_bytes()),
|
let error = Box::new(ERR!("Unsupported data type"));
|
||||||
LuaValue::String(v) => output.extend(v.as_bytes()),
|
for value in values {
|
||||||
LuaValue::LightUserData(_)
|
match &value {
|
||||||
| LuaValue::Table(_)
|
LuaNil => {}
|
||||||
| LuaValue::Function(_)
|
LuaValue::Boolean(v) => {
|
||||||
| LuaValue::Thread(_) => {
|
if bin_only {
|
||||||
return Err(Box::new(ERR!("Unsupported data type")));
|
return Err(error);
|
||||||
}
|
}
|
||||||
LuaValue::UserData(v) => {
|
output.extend(v.to_string().as_bytes());
|
||||||
if v.is::<LuabyteArray>() {
|
}
|
||||||
let arr = v.borrow::<LuabyteArray>()?;
|
LuaValue::Integer(v) => {
|
||||||
output.extend(&arr.0);
|
if bin_only {
|
||||||
} else {
|
return Err(error);
|
||||||
let st = value.to_pointer() as *const LuaSlice;
|
}
|
||||||
if unsafe { (*st).magic } != LUA_SLICE_MAGIC {
|
output.extend(v.to_string().as_bytes());
|
||||||
return Err(Box::new(ERR!("Unsupported data type")));
|
}
|
||||||
}
|
LuaValue::Number(v) => {
|
||||||
let data_slice =
|
if bin_only {
|
||||||
unsafe { std::slice::from_raw_parts((*st).data, (*st).len) };
|
return Err(error);
|
||||||
output.extend(data_slice);
|
}
|
||||||
|
output.extend(v.to_string().as_bytes());
|
||||||
|
}
|
||||||
|
LuaValue::String(v) => {
|
||||||
|
output.extend(v.as_bytes());
|
||||||
|
}
|
||||||
|
LuaValue::LightUserData(_)
|
||||||
|
| LuaValue::Table(_)
|
||||||
|
| LuaValue::Function(_)
|
||||||
|
| LuaValue::Thread(_) => {
|
||||||
|
return Err(error);
|
||||||
|
}
|
||||||
|
LuaValue::UserData(v) => {
|
||||||
|
if v.is::<LuabyteArray>() {
|
||||||
|
let arr = v.borrow::<LuabyteArray>()?;
|
||||||
|
output.extend(&arr.0);
|
||||||
|
} else {
|
||||||
|
let st = value.to_pointer() as *const LuaSlice;
|
||||||
|
if unsafe { (*st).magic } != LUA_SLICE_MAGIC {
|
||||||
|
return Err(error);
|
||||||
}
|
}
|
||||||
}
|
let data_slice = unsafe { std::slice::from_raw_parts((*st).data, (*st).len) };
|
||||||
LuaValue::Error(e) => {
|
output.extend(data_slice);
|
||||||
fcgi_send_stderr(self, self.id, Some(e.to_string().into()))?;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
LuaValue::Error(e) => {
|
||||||
|
return Err(Box::new(ERR!(e.to_string())));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if output.len() > 0 {
|
|
||||||
self.write_record(output)?;
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
Ok(output)
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Write for FCGIOStream {
|
impl Write for FCGIOStream {
|
||||||
@ -540,13 +788,29 @@ impl Write for FCGIOStream {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Read for FCGIOStream {
|
||||||
|
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||||
|
let ret = unsafe { libc::read(self.fd, buf.as_ptr() as *mut libc::c_void, buf.len()) };
|
||||||
|
if ret < 0 {
|
||||||
|
let msg = format!("Unable to read data from {}: return {}", self.fd, ret);
|
||||||
|
return Err(ERR!(msg));
|
||||||
|
}
|
||||||
|
Ok(ret as usize)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl mlua::UserData for FCGIOStream {
|
impl mlua::UserData for FCGIOStream {
|
||||||
fn add_methods<'lua, M: mlua::UserDataMethods<'lua, Self>>(methods: &mut M) {
|
fn add_methods<'lua, M: mlua::UserDataMethods<'lua, Self>>(methods: &mut M) {
|
||||||
methods.add_method_mut(
|
methods.add_method_mut(
|
||||||
"echo",
|
"echo",
|
||||||
|_, this: &mut FCGIOStream, values: mlua::Variadic<_>| {
|
|_, this: &mut FCGIOStream, values: mlua::Variadic<_>| {
|
||||||
this.write_variadic(values)
|
let output = vec_from_variadic(values, false)
|
||||||
.map_err(|e| mlua::Error::external(ERR!(e.to_string())))
|
.map_err(|e| mlua::Error::external(ERR!(e.to_string())))?;
|
||||||
|
if output.len() > 0 {
|
||||||
|
this.write_record(output)
|
||||||
|
.map_err(|e| mlua::Error::external(ERR!(e.to_string())))?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
methods.add_method_mut("send_file", |_, this: &mut FCGIOStream, path: String| {
|
methods.add_method_mut("send_file", |_, this: &mut FCGIOStream, path: String| {
|
||||||
@ -570,12 +834,113 @@ impl mlua::UserData for FCGIOStream {
|
|||||||
methods.add_method_mut(
|
methods.add_method_mut(
|
||||||
"print",
|
"print",
|
||||||
|_, this: &mut FCGIOStream, values: mlua::Variadic<_>| {
|
|_, this: &mut FCGIOStream, values: mlua::Variadic<_>| {
|
||||||
this.write_variadic(values)
|
let output = vec_from_variadic(values, false)
|
||||||
.map_err(|e| mlua::Error::external(ERR!(e.to_string())))
|
.map_err(|e| mlua::Error::external(ERR!(e.to_string())))?;
|
||||||
|
if output.len() > 0 {
|
||||||
|
this.write_record(output)
|
||||||
|
.map_err(|e| mlua::Error::external(ERR!(e.to_string())))?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
methods.add_method("is_ws", |_, this: &FCGIOStream, ()| Ok(this.ws));
|
||||||
methods.add_method("fd", |_, this: &FCGIOStream, ()| Ok(this.fd));
|
methods.add_method("fd", |_, this: &FCGIOStream, ()| Ok(this.fd));
|
||||||
methods.add_method("id", |_, this: &FCGIOStream, ()| Ok(this.id));
|
methods.add_method("id", |_, this: &FCGIOStream, ()| Ok(this.id));
|
||||||
|
|
||||||
|
// websocket specific methods
|
||||||
|
methods.add_method_mut("ws_header", |_, this: &mut FCGIOStream, ()| {
|
||||||
|
let header = WSHeader::read_from(this)
|
||||||
|
.map_err(|e| mlua::Error::external(ERR!(e.to_string())))?;
|
||||||
|
Ok(header)
|
||||||
|
});
|
||||||
|
|
||||||
|
methods.add_method_mut(
|
||||||
|
"ws_read",
|
||||||
|
|_, this: &mut FCGIOStream, value: mlua::Value| match value {
|
||||||
|
LuaValue::UserData(v) => {
|
||||||
|
if v.is::<WSHeader>() {
|
||||||
|
let mut header = v.borrow_mut::<WSHeader>()?;
|
||||||
|
let vec = header
|
||||||
|
.read_data_from(this)
|
||||||
|
.map_err(|e| mlua::Error::external(ERR!(e.to_string())))?;
|
||||||
|
return Ok(LuabyteArray(vec));
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(mlua::Error::external(ERR!(
|
||||||
|
"Invalid user-data used as websocket header"
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
_ => Err(mlua::Error::external(ERR!(
|
||||||
|
"Invalid data used as websocket header"
|
||||||
|
))),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
methods.add_method_mut(
|
||||||
|
"ws_send",
|
||||||
|
|_, this: &mut FCGIOStream, (is_bin, values): (bool, Variadic<mlua::Value>)| {
|
||||||
|
let output = vec_from_variadic(values, is_bin)
|
||||||
|
.map_err(|e| mlua::Error::external(ERR!(e.to_string())))?;
|
||||||
|
if output.len() > 0 {
|
||||||
|
let mut header = WSHeader {
|
||||||
|
fin: 1,
|
||||||
|
opcode: WSHeaderOpcode::Text,
|
||||||
|
len: output.len(),
|
||||||
|
mask: 0,
|
||||||
|
mask_key: Vec::new(),
|
||||||
|
};
|
||||||
|
header
|
||||||
|
.send_to(this, &output)
|
||||||
|
.map_err(|e| mlua::Error::external(ERR!(e.to_string())))?
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
methods.add_method_mut("ws_close", |_, this: &mut FCGIOStream, code: u32| {
|
||||||
|
let mut header = WSHeader {
|
||||||
|
fin: 1,
|
||||||
|
opcode: WSHeaderOpcode::Close,
|
||||||
|
len: 2,
|
||||||
|
mask: 0,
|
||||||
|
mask_key: Vec::new(),
|
||||||
|
};
|
||||||
|
header
|
||||||
|
.send_to(this, &[(code >> 8) as u8, (code & 0xFF) as u8])
|
||||||
|
.map_err(|e| mlua::Error::external(ERR!(e.to_string())))?;
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
|
||||||
|
methods.add_method_mut("ws_send_file", |_, this: &mut FCGIOStream, path: String| {
|
||||||
|
let file = std::fs::File::open(path)?;
|
||||||
|
let mut buf_reader = BufReader::with_capacity(2048, file);
|
||||||
|
let mut is_first = true;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let buffer = buf_reader.fill_buf()?;
|
||||||
|
let length = buffer.len();
|
||||||
|
let mut header = WSHeader {
|
||||||
|
fin: if length == 0 { 1 } else { 0 },
|
||||||
|
opcode: WSHeaderOpcode::Data,
|
||||||
|
len: length,
|
||||||
|
mask: 0,
|
||||||
|
mask_key: Vec::new(),
|
||||||
|
};
|
||||||
|
if is_first {
|
||||||
|
header.opcode = WSHeaderOpcode::Bin;
|
||||||
|
is_first = false;
|
||||||
|
}
|
||||||
|
header
|
||||||
|
.send_to(this, &buffer)
|
||||||
|
.map_err(|e| mlua::Error::external(ERR!(e.to_string())))?;
|
||||||
|
if length == 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
buf_reader.consume(length);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
|
||||||
methods.add_method("log_info", |_, _: &FCGIOStream, string: String| {
|
methods.add_method("log_info", |_, _: &FCGIOStream, string: String| {
|
||||||
INFO!("{}", string);
|
INFO!("{}", string);
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -612,10 +977,21 @@ fn fcgi_execute_request_handle(rq: &mut FGCIRequest) -> Result<(), Box<dyn std::
|
|||||||
global.set("_SERVER", request)?;
|
global.set("_SERVER", request)?;
|
||||||
|
|
||||||
// put the fcgio object
|
// put the fcgio object
|
||||||
let fcgio = FCGIOStream {
|
let mut fcgio = FCGIOStream {
|
||||||
fd: rq.fd,
|
fd: rq.fd,
|
||||||
id: rq.id,
|
id: rq.id,
|
||||||
|
ws: false,
|
||||||
|
stdin_buffer: Vec::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// check if the connection is upgraded as websocket
|
||||||
|
if let Some(header) = rq.params.get("HTTP_UPGRADE") {
|
||||||
|
if header == "websocket" {
|
||||||
|
INFO!("Websocket is enabled on the current connection {}", rq.id);
|
||||||
|
fcgio.ws = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
global.set("fcgio", fcgio)?;
|
global.set("fcgio", fcgio)?;
|
||||||
|
|
||||||
// support for byte array
|
// support for byte array
|
||||||
@ -747,62 +1123,72 @@ pub fn process_request<T: Read + Write + AsRawFd>(
|
|||||||
}
|
}
|
||||||
FCGIHeaderType::Params => {
|
FCGIHeaderType::Params => {
|
||||||
if let Some(rq) = requests.get_mut(&header.id) {
|
if let Some(rq) = requests.get_mut(&header.id) {
|
||||||
if rq.state != FCGIRequestState::WaitForParams {
|
match &rq.state {
|
||||||
WARN!(
|
FCGIRequestState::WaitForParams => {
|
||||||
"Should not receive a param record as the request is in {} state",
|
if header.length == 0 {
|
||||||
rq.state
|
DEBUG!(
|
||||||
);
|
"All param records read, now wait for stdin data on request: {}",
|
||||||
} else {
|
header.id
|
||||||
if header.length == 0 {
|
);
|
||||||
DEBUG!(
|
rq.state = FCGIRequestState::WaitForStdin;
|
||||||
"All param records read, now wait for stdin data on request: {}",
|
} else {
|
||||||
header.id
|
fcgi_decode_params(rq, &fcgi_read_body(stream, &header)?)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
__ => {
|
||||||
|
WARN!(
|
||||||
|
"Should not receive a param record as the request is in {} state",
|
||||||
|
rq.state
|
||||||
);
|
);
|
||||||
rq.state = FCGIRequestState::WaitForStdin;
|
|
||||||
} else {
|
|
||||||
fcgi_decode_params(rq, &fcgi_read_body(stream, &header)?)?;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
WARN!("Uknow request {}, ignore param record", header.id);
|
WARN!("Uknown request {}, ignore param record", header.id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
FCGIHeaderType::Stdin => {
|
FCGIHeaderType::Stdin => {
|
||||||
if let Some(rq) = requests.get_mut(&header.id) {
|
if let Some(rq) = requests.get_mut(&header.id) {
|
||||||
if rq.state != FCGIRequestState::WaitForStdin {
|
match &rq.state {
|
||||||
WARN!(
|
FCGIRequestState::WaitForStdin => {
|
||||||
"Should not receive a stdin record as the request is in {} state",
|
if header.length == 0 {
|
||||||
rq.state
|
DEBUG!(
|
||||||
);
|
"All stdin records read, now wait for stdout data on request: {}",
|
||||||
} else {
|
header.id
|
||||||
if header.length == 0 {
|
);
|
||||||
DEBUG!(
|
rq.state = FCGIRequestState::WaitForStdout;
|
||||||
"All stdin records read, now wait for stdout data on request: {}",
|
if let Err(error) = fcgi_execute_request_handle(rq) {
|
||||||
header.id
|
// send stderror
|
||||||
);
|
fcgi_send_stderr(stream, header.id, Some(error))?;
|
||||||
rq.state = FCGIRequestState::WaitForStdout;
|
}
|
||||||
if let Err(error) = fcgi_execute_request_handle(rq) {
|
fcgi_send_stderr(stream, header.id, None)?;
|
||||||
// send stderror
|
fcgi_send_stdout(stream, header.id, None)?;
|
||||||
fcgi_send_stderr(stream, header.id, Some(error))?;
|
// send end connection
|
||||||
}
|
fcgi_send_end_request(
|
||||||
fcgi_send_stderr(stream, header.id, None)?;
|
stream,
|
||||||
fcgi_send_stdout(stream, header.id, None)?;
|
header.id,
|
||||||
// send end connection
|
EndRequestStatus::Complete,
|
||||||
fcgi_send_end_request(stream, header.id, EndRequestStatus::Complete)?;
|
)?;
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
let body = fcgi_read_body(stream, &header)?;
|
let body = fcgi_read_body(stream, &header)?;
|
||||||
if let None = rq.data {
|
if let None = rq.data {
|
||||||
rq.data = Some(Vec::new())
|
rq.data = Some(Vec::new())
|
||||||
}
|
}
|
||||||
match rq.data.take() {
|
match rq.data.take() {
|
||||||
Some(mut data) => {
|
Some(mut data) => {
|
||||||
data.extend(body);
|
data.extend(body);
|
||||||
rq.data = Some(data);
|
rq.data = Some(data);
|
||||||
|
}
|
||||||
|
None => {}
|
||||||
}
|
}
|
||||||
None => {}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
_ => {
|
||||||
|
WARN!(
|
||||||
|
"Should not receive a stdin record as the request is in {} state",
|
||||||
|
rq.state
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
WARN!("Uknow request {}, ignore stdin record", header.id);
|
WARN!("Uknow request {}, ignore stdin record", header.id);
|
||||||
@ -820,17 +1206,14 @@ pub fn process_request<T: Read + Write + AsRawFd>(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn fcgi_read_header<T: Read + Write + AsRawFd>(stream: &mut T) -> Result<FcgiHeader, Error> {
|
fn fcgi_read_header<T: Read + Write>(stream: &mut T) -> Result<FcgiHeader, Error> {
|
||||||
let mut buf = vec![0; FCGI_HEADER_LEN];
|
let mut buf = vec![0; FCGI_HEADER_LEN];
|
||||||
stream.read_exact(&mut buf)?;
|
stream.read_exact(&mut buf)?;
|
||||||
let header: FcgiHeader = FcgiHeader::from_bytes(&buf);
|
let header: FcgiHeader = FcgiHeader::from_bytes(&buf);
|
||||||
Ok(header)
|
Ok(header)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn fcgi_read_body<T: Read + Write + AsRawFd>(
|
fn fcgi_read_body<T: Read + Write>(stream: &mut T, header: &FcgiHeader) -> Result<Vec<u8>, Error> {
|
||||||
stream: &mut T,
|
|
||||||
header: &FcgiHeader,
|
|
||||||
) -> Result<Vec<u8>, Error> {
|
|
||||||
let mut buf = vec![0; header.length as usize];
|
let mut buf = vec![0; header.length as usize];
|
||||||
stream.read_exact(&mut buf)?;
|
stream.read_exact(&mut buf)?;
|
||||||
let mut pad: Vec<u8> = vec![0; header.padding as usize];
|
let mut pad: Vec<u8> = vec![0; header.padding as usize];
|
||||||
@ -903,8 +1286,16 @@ impl mlua::UserData for LuabyteArray {
|
|||||||
fn add_methods<'lua, M: mlua::UserDataMethods<'lua, Self>>(methods: &mut M) {
|
fn add_methods<'lua, M: mlua::UserDataMethods<'lua, Self>>(methods: &mut M) {
|
||||||
methods.add_method("size", |_, this: &LuabyteArray, ()| Ok(this.0.len()));
|
methods.add_method("size", |_, this: &LuabyteArray, ()| Ok(this.0.len()));
|
||||||
|
|
||||||
methods.add_method("ptr", |_, this: &LuabyteArray, ()| {
|
methods.add_method("into", |_, this: &LuabyteArray, value: mlua::Value| {
|
||||||
Ok(this.0.as_ptr() as usize)
|
let st = value.to_pointer() as *mut LuaSlice;
|
||||||
|
if unsafe { (*st).magic } != LUA_SLICE_MAGIC {
|
||||||
|
return Err(mlua::Error::external(ERR!("Unsupported data type")));
|
||||||
|
}
|
||||||
|
unsafe {
|
||||||
|
(*st).data = this.0.as_ptr() as *const u8;
|
||||||
|
(*st).len = this.0.len();
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
});
|
});
|
||||||
|
|
||||||
methods.add_method("fileout", |_, this: &LuabyteArray, path: String| {
|
methods.add_method("fileout", |_, this: &LuabyteArray, path: String| {
|
||||||
@ -946,6 +1337,13 @@ impl mlua::UserData for LuabyteArray {
|
|||||||
Ok(string) => Ok(Some(string)),
|
Ok(string) => Ok(Some(string)),
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
methods.add_method_mut("extend", |_, this, values: Variadic<mlua::Value>| {
|
||||||
|
let mut output = vec_from_variadic(values, true)
|
||||||
|
.map_err(|e| mlua::Error::external(ERR!(e.to_string())))?;
|
||||||
|
this.0.append(&mut output);
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
|
||||||
methods.add_meta_method_mut(
|
methods.add_meta_method_mut(
|
||||||
mlua::MetaMethod::NewIndex,
|
mlua::MetaMethod::NewIndex,
|
||||||
|_, this, (index, value): (isize, u8)| {
|
|_, this, (index, value): (isize, u8)| {
|
||||||
@ -976,7 +1374,12 @@ pub struct LuaSlice {
|
|||||||
#[no_mangle]
|
#[no_mangle]
|
||||||
pub extern "C" fn fcgi_send_slice(fd: RawFd, id: u16, ptr: *const u8, size: usize) -> isize {
|
pub extern "C" fn fcgi_send_slice(fd: RawFd, id: u16, ptr: *const u8, size: usize) -> isize {
|
||||||
let data_slice = unsafe { std::slice::from_raw_parts(ptr, size) }.to_vec();
|
let data_slice = unsafe { std::slice::from_raw_parts(ptr, size) }.to_vec();
|
||||||
let mut stream = FCGIOStream { fd, id };
|
let mut stream = FCGIOStream {
|
||||||
|
fd,
|
||||||
|
id,
|
||||||
|
ws: false,
|
||||||
|
stdin_buffer: Vec::new(),
|
||||||
|
};
|
||||||
|
|
||||||
if let Err(error) = fcgi_send_stdout(&mut stream, id, Some(data_slice)) {
|
if let Err(error) = fcgi_send_stdout(&mut stream, id, Some(data_slice)) {
|
||||||
ERROR!("Unable to send data slice: {}", error);
|
ERROR!("Unable to send data slice: {}", error);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user