From 5e8ae76248ed31496dc6fef7855498a0479159ed Mon Sep 17 00:00:00 2001 From: Diego Nehab Date: Fri, 19 Aug 2005 01:35:26 +0000 Subject: [PATCH] Dispatcher working for check-links. Need to get it working with forwarder. --- etc/check-links.lua | 122 ++++++++++++-------- etc/dispatch.lua | 267 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 344 insertions(+), 45 deletions(-) create mode 100644 etc/dispatch.lua diff --git a/etc/check-links.lua b/etc/check-links.lua index 9d837e4..e06cc91 100644 --- a/etc/check-links.lua +++ b/etc/check-links.lua @@ -1,49 +1,84 @@ ----------------------------------------------------------------------------- --- Little program that checks links in HTML files +-- Little program that checks links in HTML files, using coroutines and +-- non-blocking I/O via the dispatcher module. -- LuaSocket sample files -- Author: Diego Nehab --- RCS ID: $Id$ +-- RCS ID: $$ ----------------------------------------------------------------------------- -local http = require("socket.http") -local url = require("socket.url") -http.TIMEOUT = 10 +local dispatch, url, http, handler + +arg = arg or {} +if table.getn(arg) < 1 then + print("Usage:\n luasocket check-links.lua [-n] {}") + exit() +end + +if arg[1] ~= "-n" then + -- if using blocking I/O, simulate dispatcher interface + url = require("socket.url") + http = require("socket.http") + handler = { + start = function(self, f) + f() + end, + tcp = socket.tcp + } + http.TIMEOUT = 10 +else + -- if non-blocking I/O was requested, use the real dispatcher + table.remove(arg, 1) + dispatch = require("dispatch") + dispatch.TIMEOUT = 10 + url = require("socket.url") + http = require("socket.http") + handler = dispatch.newhandler() +end + +local nthreads = 0 + +-- get the status of a URL using the dispatcher +function getstatus(link) + local parsed = url.parse(link, {scheme = "file"}) + if parsed.scheme == "http" then + nthreads = nthreads + 1 + handler:start(function() + local r, c, h, s = http.request{
method = "HEAD", + url = link, + create = handler.tcp + } + if r and c == 200 then io.write('\t', link, '\n') + else io.write('\t', link, ': ', tostring(c), '\n') end + nthreads = nthreads - 1 + end) + end +end function readfile(path) - path = url.unescape(path) - local file, error = io.open(path, "r") - if file then + path = url.unescape(path) + local file, error = io.open(path, "r") + if file then local body = file:read("*a") - file:close() + file:close() return body else return nil, error end end -function getstatus(u) - local parsed = url.parse(u, {scheme = "file"}) - if parsed.scheme == "http" then - local r, c, h, s = http.request{url = u, method = "HEAD"} - if c ~= 200 then return s or c end - elseif parsed.scheme == "file" then - local file, error = io.open(url.unescape(parsed.path), "r") - if file then file:close() - else return error end - else return string.format("unhandled scheme '%s'", parsed.scheme) end -end - -function retrieve(u) - local parsed = url.parse(u, { scheme = "file" }) +function load(u) + local parsed = url.parse(u, { scheme = "file" }) local body, headers, code, error local base = u - if parsed.scheme == "http" then + if parsed.scheme == "http" then body, code, headers = http.request(u) - if code == 200 then + if code == 200 then + -- if there was a redirect, update base to reflect it base = headers.location or base end - if not body then + if not body then error = code end - elseif parsed.scheme == "file" then - body, error = readfile(parsed.path) + elseif parsed.scheme == "file" then + body, error = readfile(parsed.path) else error = string.format("unhandled scheme '%s'", parsed.scheme) end return base, body, error end @@ -53,35 +88,32 @@ function getlinks(body, base) body = string.gsub(body, "%<%!%-%-.-%-%-%>", "") local links = {} -- extract links - body = string.gsub(body, '[Hh][Rr][Ee][Ff]%s*=%s*"([^"]*)"', function(href) + body = string.gsub(body, '[Hh][Rr][Ee][Ff]%s*=%s*"([^"]*)"', function(href) table.insert(links, 
url.absolute(base, href)) end) - body = string.gsub(body, "[Hh][Rr][Ee][Ff]%s*=%s*'([^']*)'", function(href) + body = string.gsub(body, "[Hh][Rr][Ee][Ff]%s*=%s*'([^']*)'", function(href) table.insert(links, url.absolute(base, href)) end) - string.gsub(body, "[Hh][Rr][Ee][Ff]%s*=%s*(.-)>", function(href) + string.gsub(body, "[Hh][Rr][Ee][Ff]%s*=%s*(.-)>", function(href) table.insert(links, url.absolute(base, href)) end) return links end -function checklinks(u) - local base, body, error = retrieve(u) +function checklinks(address) + local base, body, error = load(address) if not body then print(error) return end + print("Checking ", base) local links = getlinks(body, base) - for _, l in ipairs(links) do - io.stderr:write("\t", l, "\n") - local err = getstatus(l) - if err then io.stderr:write('\t', l, ": ", err, "\n") end + for _, link in ipairs(links) do + getstatus(link) end end -arg = arg or {} -if table.getn(arg) < 1 then - print("Usage:\n luasocket check-links.lua {}") - exit() -end -for _, a in ipairs(arg) do - print("Checking ", a) - checklinks(url.absolute("file:", a)) +for _, address in ipairs(arg) do + checklinks(url.absolute("file:", address)) end + +while nthreads > 0 do + handler:step() +end diff --git a/etc/dispatch.lua b/etc/dispatch.lua new file mode 100644 index 0000000..e6c14a6 --- /dev/null +++ b/etc/dispatch.lua @@ -0,0 +1,267 @@ +----------------------------------------------------------------------------- +-- A hacked dispatcher module +-- LuaSocket sample files +-- Author: Diego Nehab +-- RCS ID: $$ +----------------------------------------------------------------------------- +local base = _G +local socket = require("socket") +local coroutine = require("coroutine") +module("dispatch") + +-- if too much time goes by without any activity in one of our sockets, we +-- just kill it +TIMEOUT = 10 + +----------------------------------------------------------------------------- +-- Mega hack. Don't try to do this at home. 
+----------------------------------------------------------------------------- +-- Lua 5.1 has coroutine.running(). We need it here, so we use this terrible +-- hack to emulate it in Lua itself +-- This is very inefficient, but is very good for debugging. +local running +local resume = coroutine.resume +function coroutine.resume(co, ...) + running = co + return resume(co, unpack(arg)) +end + +function coroutine.running() + return running +end + +----------------------------------------------------------------------------- +-- Mega hack. Don't try to do this at home. +----------------------------------------------------------------------------- +-- we can't yield across calls to protect, so we rewrite it with coxpcall +-- make sure you don't require any module that uses socket.protect before +-- loading our hack +function socket.protect(f) + return f +end + +function socket.protect(f) + return function(...) + local co = coroutine.create(f) + while true do + local results = {resume(co, unpack(arg))} + local status = table.remove(results, 1) + if not status then + if type(results[1]) == 'table' then + return nil, results[1][1] + else error(results[1]) end + end + if coroutine.status(co) == "suspended" then + arg = {coroutine.yield(unpack(results))} + else + return unpack(results) + end + end + end +end + +----------------------------------------------------------------------------- +-- socket.tcp() replacement for non-blocking I/O +----------------------------------------------------------------------------- +local function newtrap(dispatcher) + -- try to create underlying socket + local tcp, error = socket.tcp() + if not tcp then return nil, error end + -- put it in non-blocking mode right away + tcp:settimeout(0) + -- metatable for trap produces new methods on demand for those that we + -- don't override explicitly. + local metat = { __index = function(table, key) + table[key] = function(...) 
+ return tcp[key](tcp, unpack(arg)) + end + end} + -- does user want to do his own non-blocking I/O? + local zero = false + -- create a trap object that will behave just like a real socket object + local trap = { } + -- we ignore settimeout to preserve our 0 timeout, but record whether + -- the user wants to do his own non-blocking I/O + function trap:settimeout(mode, value) + if value == 0 then + zero = true + else + zero = false + end + return 1 + end + -- send in non-blocking mode and yield on timeout + function trap:send(data, first, last) + first = (first or 1) - 1 + local result, error + while true do + -- tell dispatcher we want to keep sending before we yield + dispatcher.sending:insert(tcp) + -- mark time we started waiting + dispatcher.context[tcp].last = socket.gettime() + -- return control to dispatcher + -- if upon return the dispatcher tells us we timed out, + -- return an error to whoever called us + if coroutine.yield() == "timeout" then + return nil, "timeout" + end + -- try sending + result, error, first = tcp:send(data, first+1, last) + -- if we are done, or there was an unexpected error, + -- break away from loop + if error ~= "timeout" then return result, error, first end + end + end + -- receive in non-blocking mode and yield on timeout + -- or simply return partial read, if user requested timeout = 0 + function trap:receive(pattern, partial) + local error = "timeout" + local value + while true do + -- tell dispatcher we want to keep receiving before we yield + dispatcher.receiving:insert(tcp) + -- mark time we started waiting + dispatcher.context[tcp].last = socket.gettime() + -- return control to dispatcher + -- if upon return the dispatcher tells us we timed out, + -- return an error to whoever called us + if coroutine.yield() == "timeout" then + return nil, "timeout" + end + -- try receiving + value, error, partial = tcp:receive(pattern, partial) + -- if we are done, or there was an unexpected error, + -- break away from loop + if (error 
~= "timeout") or zero then + return value, error, partial + end + end + end + -- connect in non-blocking mode and yield on timeout + function trap:connect(host, port) + local result, error = tcp:connect(host, port) + -- mark time we started waiting + dispatcher.context[tcp].last = socket.gettime() + if error == "timeout" then + -- tell dispatcher we will be able to write upon connection + dispatcher.sending:insert(tcp) + -- return control to dispatcher + -- if upon return the dispatcher tells us we have a + -- timeout, just abort + if coroutine.yield() == "timeout" then + return nil, "timeout" + end + -- when we come back, check if connection was successful + result, error = tcp:connect(host, port) + if result or error == "already connected" then return 1 + else return nil, "non-blocking connect failed" end + else return result, error end + end + -- accept in non-blocking mode and yield on timeout + function trap:accept() + local result, error = tcp:accept() + while error == "timeout" do + -- mark time we started waiting + dispatcher.context[tcp].last = socket.gettime() + -- tell dispatcher we will be able to read upon connection + dispatcher.receiving:insert(tcp) + -- return control to dispatcher + -- if upon return the dispatcher tells us we have a + -- timeout, just abort + if coroutine.yield() == "timeout" then + return nil, "timeout" + end + end + return result, error + end + -- remove thread from context + function trap:close() + dispatcher.context[tcp] = nil + return tcp:close() + end + -- add newly created socket to context + dispatcher.context[tcp] = { + thread = coroutine.running() + } + return setmetatable(trap, metat) +end + +----------------------------------------------------------------------------- +-- Our set data structure +----------------------------------------------------------------------------- +local function newset() + local reverse = {} + local set = {} + return setmetatable(set, {__index = { + insert = function(set, value) + if not
reverse[value] then + table.insert(set, value) + reverse[value] = table.getn(set) + end + end, + remove = function(set, value) + local index = reverse[value] + if index then + reverse[value] = nil + local top = table.remove(set) + if top ~= value then + reverse[top] = index + set[index] = top + end + end + end + }}) +end + +----------------------------------------------------------------------------- +-- Our dispatcher API. +----------------------------------------------------------------------------- +local metat = { __index = {} } + +function metat.__index:start(func) + local co = coroutine.create(func) + assert(coroutine.resume(co)) +end + +function newhandler() + local dispatcher = { + context = {}, + sending = newset(), + receiving = newset() + } + function dispatcher.tcp() + return newtrap(dispatcher) + end + return setmetatable(dispatcher, metat) +end + +-- step through all active threads +function metat.__index:step() + -- check which sockets are interesting and act on them + local readable, writable = socket.select(self.receiving, + self.sending, 1) + -- for all readable connections, resume their threads + for _, who in ipairs(readable) do + if self.context[who] then + self.receiving:remove(who) + assert(coroutine.resume(self.context[who].thread)) + end + end + -- for all writable connections, do the same + for _, who in ipairs(writable) do + if self.context[who] then + self.sending:remove(who) + assert(coroutine.resume(self.context[who].thread)) + end + end + -- politely ask replacement I/O functions in idle threads to + -- return reporting a timeout + local now = socket.gettime() + for who, data in pairs(self.context) do + if data.last and now - data.last > TIMEOUT then + self.sending:remove(who) + self.receiving:remove(who) + assert(coroutine.resume(self.context[who].thread, "timeout")) + end + end +end