Apparently, non-blocking connect doesn't work on windows if you use 0

timeout in the select call...
This commit is contained in:
Diego Nehab 2005-03-11 00:20:21 +00:00
parent 63e3d7c5b0
commit e57f9e9964
7 changed files with 106 additions and 100 deletions

View File

@ -33,7 +33,7 @@
<Tool
Name="VCLinkerTool"
AdditionalDependencies="ws2_32.lib"
OutputFile="$(OutDir)/lsocket.dll"
OutputFile="$(OutDir)/csocket.dll"
LinkIncremental="2"
GenerateDebugInformation="TRUE"
ProgramDatabaseFile="$(OutDir)/luasocket.pdb"
@ -81,7 +81,7 @@
<Tool
Name="VCLinkerTool"
AdditionalDependencies="ws2_32.lib"
OutputFile="$(OutDir)/lsocket.dll"
OutputFile="$(OutDir)/csocket.dll"
LinkIncremental="1"
GenerateDebugInformation="TRUE"
SubSystem="2"

View File

@ -32,7 +32,7 @@
Name="VCCustomBuildTool"/>
<Tool
Name="VCLinkerTool"
OutputFile="$(OutDir)/lmime.dll"
OutputFile="$(OutDir)/cmime.dll"
LinkIncremental="2"
GenerateDebugInformation="TRUE"
ProgramDatabaseFile="$(OutDir)/mime.pdb"
@ -79,7 +79,7 @@
Name="VCCustomBuildTool"/>
<Tool
Name="VCLinkerTool"
OutputFile="$(OutDir)/lmime.dll"
OutputFile="$(OutDir)/cmime.dll"
LinkIncremental="1"
GenerateDebugInformation="TRUE"
SubSystem="2"

View File

@ -35,6 +35,13 @@ local sending = newset()
-- context for connections and servers
local context = {}
function wait(who, what)
if what == "input" then receiving:insert(who)
else sending:insert(who) end
context[who].last = socket.gettime()
coroutine.yield()
end
-- initializes the forward server
function init()
if table.getn(arg) < 1 then
@ -63,145 +70,142 @@ function init()
end
-- starts a connection in a non-blocking way
function nbkcon(host, port)
local peer, err = socket.tcp()
if not peer then return nil, err end
peer:settimeout(0)
local ret, err = peer:connect(host, port)
if ret then return peer end
if err ~= "timeout" then
peer:close()
return nil, err
function connect(who, host, port)
who:settimeout(0.1)
print("trying to connect peer", who, host, port)
local ret, err = who:connect(host, port)
if not ret and err == "timeout" then
print("got timeout, will wait", who)
wait(who, "output")
ret, err = who:connected()
print("connection results arrived", who, ret, err)
end
if not ret then
print("connection failed", who)
kick(who)
kick(context[who].peer)
else
return forward(who)
end
return peer
end
-- gets rid of a client
-- gets rid of a client and its peer
function kick(who)
if context[who] then
if who and context[who] then
sending:remove(who)
receiving:remove(who)
local peer = context[who].peer
context[who] = nil
who:close()
end
end
-- decides what to do with a thread based on coroutine return
function route(who, status, what)
if status and what then
if what == "receiving" then receiving:insert(who) end
if what == "sending" then sending:insert(who) end
else kick(who) end
end
-- loops accepting connections and creating new threads to deal with them
function accept(server)
while true do
-- accept a new connection and start a new coroutine to deal with it
local client = server:accept()
print("accepted ", client)
if client then
-- start a new connection, non-blockingly, to the forwarding address
local ohost = context[server].ohost
local oport = context[server].oport
local peer = nbkcon(ohost, oport)
-- create contexts for client and peer.
local peer, err = socket.tcp()
if peer then
context[client] = {
last = socket.gettime(),
-- client goes straight to forwarding loop
thread = coroutine.create(forward),
peer = peer,
}
-- make sure peer will be tested for writing in the next select
-- round, which means the connection attempt has finished
sending:insert(peer)
context[peer] = {
last = socket.gettime(),
peer = client,
thread = coroutine.create(chkcon),
-- peer first tries to connect to forwarding address
thread = coroutine.create(connect),
last = socket.gettime()
}
-- put both in non-blocking mode
client:settimeout(0)
peer:settimeout(0)
-- resume peer and client so they can do their thing
local ohost = context[server].ohost
local oport = context[server].oport
coroutine.resume(context[peer].thread, peer, ohost, oport)
coroutine.resume(context[client].thread, client)
else
-- otherwise just dump the client
client:close()
print(err)
client:close()
end
end
-- tell scheduler we are done for now
coroutine.yield("receiving")
wait(server, "input")
end
end
-- forwards all data arriving to the appropriate peer
function forward(who)
print("starting to foward", who)
who:settimeout(0)
while true do
-- wait until we have something to read
wait(who, "input")
-- try to read as much as possible
local data, rec_err, partial = who:receive("*a")
-- if we had an error other than timeout, abort
if rec_err and rec_err ~= "timeout" then return error(rec_err) end
if rec_err and rec_err ~= "timeout" then return kick(who) end
-- if we got a timeout, we probably have partial results to send
data = data or partial
-- renew our timestamp so scheduler sees we are active
context[who].last = socket.gettime()
-- forward what we got right away
local peer = context[who].peer
while true do
-- tell scheduler we need to wait until we can send something
coroutine.yield("sending")
wait(who, "output")
local ret, snd_err
local start = 0
ret, snd_err, start = peer:send(data, start+1)
if ret then break
elseif snd_err ~= "timeout" then return error(snd_err) end
-- renew our timestamp so scheduler sees we are active
context[who].last = socket.gettime()
elseif snd_err ~= "timeout" then return kick(who) end
end
-- if we are done receiving, we are done with this side of the
-- connection
if not rec_err then return nil end
-- otherwise tell schedule we have to wait for more data to arrive
coroutine.yield("receiving")
-- if we are done receiving, we are done
if not rec_err then return kick(who) end
end
end
-- checks if a connection completed successfully and if it did, starts
-- forwarding all data
function chkcon(who)
local ret, err = who:connected()
if ret then
receiving:insert(context[who].peer)
context[who].last = socket.gettime()
coroutine.yield("receiving")
return forward(who)
else return error(err) end
end
-- loop waiting until something happens, restarting the thread to deal with
-- what happened, and routing it to wait until something else happens
function go()
while true do
print("will select for reading")
for i,v in ipairs(receiving) do
print(i, v)
end
print("will select for sending")
for i,v in ipairs(sending) do
print(i, v)
end
-- check which sockets are interesting and act on them
readable, writable = socket.select(receiving, sending, 3)
-- for all readable connections, resume its thread and route it
print("was readable")
for i,v in ipairs(readable) do
print(i, v)
end
print("was writable")
for i,v in ipairs(writable) do
print(i, v)
end
-- for all readable connections, resume its thread
for _, who in ipairs(readable) do
receiving:remove(who)
if context[who] then
route(who, coroutine.resume(context[who].thread, who))
end
coroutine.resume(context[who].thread, who)
end
-- for all writable connections, do the same
for _, who in ipairs(writable) do
sending:remove(who)
if context[who] then
route(who, coroutine.resume(context[who].thread, who))
end
coroutine.resume(context[who].thread, who)
end
-- put all inactive threads in death row
local now = socket.gettime()
local deathrow
for who, data in pairs(context) do
if data.last then
if data.peer then
if now - data.last > TIMEOUT then
-- only create table if someone is doomed
-- only create table if at least one is doomed
deathrow = deathrow or {}
deathrow[who] = true
end

View File

@ -49,7 +49,7 @@ int sock_recvfrom(p_sock ps, char *data, size_t count,
void sock_setnonblocking(p_sock ps);
void sock_setblocking(p_sock ps);
int sock_waitfd(int fd, int sw, p_tm tm);
int sock_waitfd(p_sock ps, int sw, p_tm tm);
int sock_select(int n, fd_set *rfds, fd_set *wfds, fd_set *efds, p_tm tm);
int sock_connect(p_sock ps, SA *addr, socklen_t addr_len, p_tm tm);

View File

@ -228,8 +228,10 @@ static int meth_connect(lua_State *L)
static int meth_connected(lua_State *L)
{
p_tcp tcp = (p_tcp) aux_checkclass(L, "tcp{master}", 1);
int err = sock_connected(&tcp->sock, &tcp->tm);
p_tcp tcp;
int err;
tcp = (p_tcp) aux_checkclass(L, "tcp{master}", 1);
err = sock_connected(&tcp->sock, &tcp->tm);
if (err != IO_DONE) {
lua_pushnil(L);
lua_pushstring(L, sock_strerror(err));

View File

@ -22,10 +22,10 @@
#define WAITFD_R POLLIN
#define WAITFD_W POLLOUT
#define WAITFD_C (POLLIN|POLLOUT)
int sock_waitfd(int fd, int sw, p_tm tm) {
int sock_waitfd(p_sock ps, int sw, p_tm tm) {
int ret;
struct pollfd pfd;
pfd.fd = fd;
pfd.fd = *ps;
pfd.events = sw;
pfd.revents = 0;
if (tm_iszero(tm)) return IO_TIMEOUT; /* optimize timeout == 0 case */
@ -44,7 +44,7 @@ int sock_waitfd(int fd, int sw, p_tm tm) {
#define WAITFD_W 2
#define WAITFD_C (WAITFD_R|WAITFD_W)
int sock_waitfd(int fd, int sw, p_tm tm) {
int sock_waitfd(p_sock ps, int sw, p_tm tm) {
int ret;
fd_set rfds, wfds, *rp, *wp;
struct timeval tv, *tp;
@ -53,8 +53,8 @@ int sock_waitfd(int fd, int sw, p_tm tm) {
do {
/* must set bits within loop, because select may have modifed them */
rp = wp = NULL;
if (sw & WAITFD_R) { FD_ZERO(&rfds); FD_SET(fd, &rfds); rp = &rfds; }
if (sw & WAITFD_W) { FD_ZERO(&wfds); FD_SET(fd, &wfds); wp = &wfds; }
if (sw & WAITFD_R) { FD_ZERO(&rfds); FD_SET(*ps, &rfds); rp = &rfds; }
if (sw & WAITFD_W) { FD_ZERO(&wfds); FD_SET(*ps, &wfds); wp = &wfds; }
t = tm_getretry(tm);
tp = NULL;
if (t >= 0.0) {
@ -62,11 +62,11 @@ int sock_waitfd(int fd, int sw, p_tm tm) {
tv.tv_usec = (int)((t-tv.tv_sec)*1.0e6);
tp = &tv;
}
ret = select(fd+1, rp, wp, NULL, tp);
ret = select(*ps+1, rp, wp, NULL, tp);
} while (ret == -1 && errno == EINTR);
if (ret == -1) return errno;
if (ret == 0) return IO_TIMEOUT;
if (sw == WAITFD_C && FD_ISSET(fd, &rfds)) return IO_CLOSED;
if (sw == WAITFD_C && FD_ISSET(*ps, &rfds)) return IO_CLOSED;
return IO_DONE;
}
#endif
@ -177,7 +177,7 @@ int sock_connect(p_sock ps, SA *addr, socklen_t len, p_tm tm) {
\*-------------------------------------------------------------------------*/
int sock_connected(p_sock ps, p_tm tm) {
int err;
if ((err = sock_waitfd(*ps, WAITFD_C, tm) == IO_CLOSED)) {
if ((err = sock_waitfd(ps, WAITFD_C, tm) == IO_CLOSED)) {
if (recv(*ps, (char *) &err, 0, 0) == 0) return IO_DONE;
else return errno;
} else return err;
@ -198,7 +198,7 @@ int sock_accept(p_sock ps, p_sock pa, SA *addr, socklen_t *len, p_tm tm) {
err = errno;
if (err == EINTR) continue;
if (err != EAGAIN && err != ECONNABORTED) return err;
if ((err = sock_waitfd(*ps, WAITFD_R, tm)) != IO_DONE) return err;
if ((err = sock_waitfd(ps, WAITFD_R, tm)) != IO_DONE) return err;
}
/* can't reach here */
return IO_UNKNOWN;
@ -230,7 +230,7 @@ int sock_send(p_sock ps, const char *data, size_t count, size_t *sent, p_tm tm)
/* if failed fatal reason, report error */
if (err != EAGAIN) return err;
/* wait until we can send something or we timeout */
if ((err = sock_waitfd(*ps, WAITFD_W, tm)) != IO_DONE) return err;
if ((err = sock_waitfd(ps, WAITFD_W, tm)) != IO_DONE) return err;
}
/* can't reach here */
return IO_UNKNOWN;
@ -255,7 +255,7 @@ int sock_sendto(p_sock ps, const char *data, size_t count, size_t *sent,
if (put == 0 || err == EPIPE) return IO_CLOSED;
if (err == EINTR) continue;
if (err != EAGAIN) return err;
if ((err = sock_waitfd(*ps, WAITFD_W, tm)) != IO_DONE) return err;
if ((err = sock_waitfd(ps, WAITFD_W, tm)) != IO_DONE) return err;
}
return IO_UNKNOWN;
}
@ -277,7 +277,7 @@ int sock_recv(p_sock ps, char *data, size_t count, size_t *got, p_tm tm) {
if (taken == 0) return IO_CLOSED;
if (err == EINTR) continue;
if (err != EAGAIN) return err;
if ((err = sock_waitfd(*ps, WAITFD_R, tm)) != IO_DONE) return err;
if ((err = sock_waitfd(ps, WAITFD_R, tm)) != IO_DONE) return err;
}
return IO_UNKNOWN;
}
@ -300,7 +300,7 @@ int sock_recvfrom(p_sock ps, char *data, size_t count, size_t *got,
if (taken == 0) return IO_CLOSED;
if (err == EINTR) continue;
if (err != EAGAIN) return err;
if ((err = sock_waitfd(*ps, WAITFD_R, tm)) != IO_DONE) return err;
if ((err = sock_waitfd(ps, WAITFD_R, tm)) != IO_DONE) return err;
}
return IO_UNKNOWN;
}

View File

@ -45,15 +45,15 @@ int sock_close(void) {
#define WAITFD_E 4
#define WAITFD_C (WAITFD_E|WAITFD_W)
int sock_waitfd(t_sock fd, int sw, p_tm tm) {
int sock_waitfd(p_sock ps, int sw, p_tm tm) {
int ret;
fd_set rfds, wfds, efds, *rp = NULL, *wp = NULL, *ep = NULL;
struct timeval tv, *tp = NULL;
double t;
if (tm_iszero(tm)) return IO_TIMEOUT; /* optimize timeout == 0 case */
if (sw & WAITFD_R) { FD_ZERO(&rfds); FD_SET(fd, &rfds); rp = &rfds; }
if (sw & WAITFD_W) { FD_ZERO(&wfds); FD_SET(fd, &wfds); wp = &wfds; }
if (sw & WAITFD_C) { FD_ZERO(&efds); FD_SET(fd, &efds); ep = &efds; }
if (sw & WAITFD_R) { FD_ZERO(&rfds); FD_SET(*ps, &rfds); rp = &rfds; }
if (sw & WAITFD_W) { FD_ZERO(&wfds); FD_SET(*ps, &wfds); wp = &wfds; }
if (sw & WAITFD_C) { FD_ZERO(&efds); FD_SET(*ps, &efds); ep = &efds; }
if ((t = tm_get(tm)) >= 0.0) {
tv.tv_sec = (int) t;
tv.tv_usec = (int) ((t-tv.tv_sec)*1.0e6);
@ -62,7 +62,7 @@ int sock_waitfd(t_sock fd, int sw, p_tm tm) {
ret = select(0, rp, wp, ep, tp);
if (ret == -1) return WSAGetLastError();
if (ret == 0) return IO_TIMEOUT;
if (sw == WAITFD_C && FD_ISSET(fd, &efds)) return IO_CLOSED;
if (sw == WAITFD_C && FD_ISSET(*ps, &efds)) return IO_CLOSED;
return IO_DONE;
}
@ -127,15 +127,15 @@ int sock_connect(p_sock ps, SA *addr, socklen_t len, p_tm tm) {
/*-------------------------------------------------------------------------*\
* Check if socket is connected
\*-------------------------------------------------------------------------*/
int sock_connected(p_sock ps) {
int sock_connected(p_sock ps, p_tm tm) {
int err;
if ((err = sock_waitfd(*ps, WAITFD_C, tm)) == IO_CLOSED) {
if ((err = sock_waitfd(ps, WAITFD_C, tm)) == IO_CLOSED) {
int len = sizeof(err);
/* give windows time to set the error (yes, disgusting) */
Sleep(0);
/* find out why we failed */
getsockopt(*ps, SOL_SOCKET, SO_ERROR, (char *)&err, &len);
/* we KNOW there was an error. if why is 0, we will return
/* we KNOW there was an error. if 'why' is 0, we will return
* "unknown error", but it's not really our fault */
return err > 0? err: IO_UNKNOWN;
} else return err;
@ -181,7 +181,7 @@ int sock_accept(p_sock ps, p_sock pa, SA *addr, socklen_t *len, p_tm tm) {
/* if we failed because there was no connectoin, keep trying */
if (err != WSAEWOULDBLOCK && err != WSAECONNABORTED) return err;
/* call select to avoid busy wait */
if ((err = sock_waitfd(*ps, WAITFD_R, tm)) != IO_DONE) return err;
if ((err = sock_waitfd(ps, WAITFD_R, tm)) != IO_DONE) return err;
}
/* can't reach here */
return IO_UNKNOWN;
@ -213,7 +213,7 @@ int sock_send(p_sock ps, const char *data, size_t count, size_t *sent, p_tm tm)
/* we can only proceed if there was no serious error */
if (err != WSAEWOULDBLOCK) return err;
/* avoid busy wait */
if ((err = sock_waitfd(*ps, WAITFD_W, tm)) != IO_DONE) return err;
if ((err = sock_waitfd(ps, WAITFD_W, tm)) != IO_DONE) return err;
}
/* can't reach here */
return IO_UNKNOWN;
@ -236,7 +236,7 @@ int sock_sendto(p_sock ps, const char *data, size_t count, size_t *sent,
}
err = WSAGetLastError();
if (err != WSAEWOULDBLOCK) return err;
if ((err = sock_waitfd(*ps, WAITFD_W, tm)) != IO_DONE) return err;
if ((err = sock_waitfd(ps, WAITFD_W, tm)) != IO_DONE) return err;
}
return IO_UNKNOWN;
}
@ -257,7 +257,7 @@ int sock_recv(p_sock ps, char *data, size_t count, size_t *got, p_tm tm) {
if (taken == 0) return IO_CLOSED;
err = WSAGetLastError();
if (err != WSAEWOULDBLOCK) return err;
if ((err = sock_waitfd(*ps, WAITFD_R, tm)) != IO_DONE) return err;
if ((err = sock_waitfd(ps, WAITFD_R, tm)) != IO_DONE) return err;
}
return IO_UNKNOWN;
}
@ -279,7 +279,7 @@ int sock_recvfrom(p_sock ps, char *data, size_t count, size_t *got,
if (taken == 0) return IO_CLOSED;
err = WSAGetLastError();
if (err != WSAEWOULDBLOCK) return err;
if ((err = sock_waitfd(*ps, WAITFD_R, tm)) != IO_DONE) return err;
if ((err = sock_waitfd(ps, WAITFD_R, tm)) != IO_DONE) return err;
}
return IO_UNKNOWN;
}