You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
lua-lib/websocket/server_ev.lua

266 lines
7.2 KiB

local socket = require'socket'
local tools = require'websocket.tools'
local frame = require'websocket.frame'
local handshake = require'websocket.handshake'
local tconcat = table.concat
local tinsert = table.insert
-- Both are left unset at load time; listen() assigns the 'ev' module and
-- the event loop to use.
local ev, loop
-- Registry of connected clients: clients[protocol][client_object].
-- The key `true` holds the clients served by the default handler.
local clients = { [true] = {} }
--- Build the server-side object for one accepted websocket connection.
--
-- The returned table exposes:
--   state                 'OPEN', 'CLOSING' or 'CLOSED'
--   sock                  the underlying socket
--   send(_,message,opcode)        encode and send a frame (opcode defaults to frame.TEXT)
--   on_message(_,fn)      fn(client,message,opcode) for TEXT/BINARY frames
--   on_close(_,fn)        fn(client,was_clean,code,reason)
--   on_error(_,fn)        fn(client,err)
--   broadcast(_,...)      send(...) to every OPEN client registered under the same protocol
--   close(_,code,reason,timeout)  start the closing handshake
--   start()               begin reading frames from the socket
--
-- @param sock      connected socket (must support setoption/shutdown/close)
-- @param protocol  key into the module-level `clients` registry
-- @return the client object
local client = function(sock,protocol)
  assert(sock)
  sock:setoption('tcp-nodelay',true)
  local ev_common = require'websocket.ev_common'
  local message_io
  local close_timer
  local async_send = ev_common.async_send(sock,loop)
  local self = {}
  self.state = 'OPEN'
  self.sock = sock

  -- Remove this client from the registry if it is still listed there.
  local unregister = function()
    local peers = clients[protocol]
    if peers ~= nil and peers[self] ~= nil then
      peers[self] = nil
    end
  end

  local user_on_error
  -- Internal error sink. Takes the error message as its only argument so
  -- that the message actually reaches the user handler (previously the
  -- message was bound to an ignored first parameter and `nil` was reported).
  local on_error = function(err)
    unregister()
    if user_on_error then
      user_on_error(self,err)
    else
      print('Websocket server error',err)
    end
  end

  local user_on_close
  -- Tear the connection down: cancel the close timer, stop reading,
  -- notify the user handler and finally shut down and close the socket.
  local on_close = function(was_clean,code,reason)
    unregister()
    if close_timer then
      close_timer:stop(loop)
      close_timer = nil
    end
    message_io:stop(loop)
    self.state = 'CLOSED'
    if user_on_close then
      user_on_close(self,was_clean,code,reason or '')
    end
    sock:shutdown()
    sock:close()
  end

  -- Socket-level failure handler handed to the send/receive helpers.
  -- A 'closed' error is reported as an unclean close (1006) unless the
  -- client is already CLOSED; anything else goes to the error handler.
  local handle_sock_err = function(err)
    if err == 'closed' then
      if self.state ~= 'CLOSED' then
        on_close(false,1006,'')
      end
    else
      on_error(err)
    end
  end

  local user_on_message = function() end
  local TEXT = frame.TEXT
  local BINARY = frame.BINARY
  -- Dispatch one decoded message: data frames go to the user handler,
  -- CLOSE frames drive the closing handshake, other opcodes are ignored.
  local on_message = function(message,opcode)
    if opcode == TEXT or opcode == BINARY then
      user_on_message(self,message,opcode)
    elseif opcode == frame.CLOSE then
      if self.state ~= 'CLOSING' then
        -- Peer initiated the close: echo a CLOSE frame carrying the
        -- received code, then finish once it has been sent.
        self.state = 'CLOSING'
        local code,reason = frame.decode_close(message)
        local encoded = frame.encode_close(code)
        encoded = frame.encode(encoded,frame.CLOSE)
        async_send(encoded,
          function()
            on_close(true,code or 1006,reason)
          end,handle_sock_err)
      else
        -- We initiated the close and this is the peer's reply.
        -- NOTE(review): the peer's code/reason are not decoded here and
        -- 1006 is reported although the close was clean -- confirm intent.
        on_close(true,1006,'')
      end
    end
  end

  self.send = function(_,message,opcode)
    local encoded = frame.encode(message,opcode or frame.TEXT)
    return async_send(encoded)
  end

  self.on_close = function(_,on_close_arg)
    user_on_close = on_close_arg
  end

  self.on_error = function(_,on_error_arg)
    user_on_error = on_error_arg
  end

  self.on_message = function(_,on_message_arg)
    user_on_message = on_message_arg
  end

  self.broadcast = function(_,...)
    for peer in pairs(clients[protocol]) do
      if peer.state == 'OPEN' then
        peer:send(...)
      end
    end
  end

  -- Start the closing handshake. The client is unregistered immediately;
  -- if the peer has not answered when the timer (default 3) fires, the
  -- connection is closed as unclean with reason 'timeout'.
  self.close = function(_,code,reason,timeout)
    unregister()
    if not message_io then
      -- Reading must be active to receive the peer's CLOSE reply.
      self:start()
    end
    if self.state == 'OPEN' then
      self.state = 'CLOSING'
      assert(message_io)
      timeout = timeout or 3
      local encoded = frame.encode_close(code or 1000,reason or '')
      encoded = frame.encode(encoded,frame.CLOSE)
      async_send(encoded)
      close_timer = ev.Timer.new(function()
        close_timer = nil
        on_close(false,1006,'timeout')
      end,timeout)
      close_timer:start(loop)
    end
  end

  -- Create the frame reader for this socket; arguments are ignored.
  self.start = function()
    message_io = ev_common.message_io(
      sock,loop,
      on_message,
      handle_sock_err)
  end

  return self
end
--- Start a websocket server driven by a libev loop.
--
-- @param opts table with the fields
--   protocols  optional map: protocol name -> handler(client)
--   default    optional handler(client) used when no protocol was negotiated
--              (at least one of protocols/default is required)
--   loop       optional ev loop, defaults to ev.Loop.default
--   interface  optional bind address, defaults to '*'
--   port       optional port, defaults to 80
-- @return server object with
--   on_error(self,fn)      install fn(server,err) as error handler
--   sock()                 the listening socket (nil after close)
--   close([keep_clients])  stop listening; callable as server.close(keep)
--                          or server:close(keep). Unless keep_clients is
--                          truthy, every registered client is closed too.
-- Raises an error if the listening socket cannot be bound.
local listen = function(opts)
  assert(opts and (opts.protocols or opts.default))
  ev = require'ev'
  loop = opts.loop or ev.Loop.default
  local user_on_error
  -- Report an error to the user handler, or print it if none is set.
  local on_error = function(s,err)
    if user_on_error then
      user_on_error(s,err)
    else
      print(err)
    end
  end
  local protocols = {}
  if opts.protocols then
    for protocol in pairs(opts.protocols) do
      clients[protocol] = {}
      tinsert(protocols,protocol)
    end
  end
  local self = {}
  self.on_error = function(_,handler)
    user_on_error = handler
  end
  local listener,err = socket.bind(opts.interface or '*',opts.port or 80)
  if not listener then
    error(err)
  end
  listener:settimeout(0)
  self.sock = function()
    return listener
  end

  -- Once the handshake response is fully written, pick the handler for
  -- the negotiated protocol, register the client and start reading frames.
  local start_client = function(client_sock,protocol)
    local protocol_handler
    local protocol_index
    if protocol and opts.protocols and opts.protocols[protocol] then
      protocol_index = protocol
      protocol_handler = opts.protocols[protocol]
    elseif opts.default then
      -- true is the 'magic' index for the default handler
      protocol_index = true
      protocol_handler = opts.default
    else
      client_sock:close()
      on_error(self,'bad protocol')
      return
    end
    local new_client = client(client_sock,protocol_index)
    clients[protocol_index][new_client] = true
    protocol_handler(new_client)
    new_client:start(loop)
  end

  -- Write the handshake response without blocking, resuming after
  -- partial writes, then hand the socket over to start_client.
  local send_response = function(client_sock,response,protocol)
    local index
    ev.IO.new(
      function(loop,write_io)
        local len = #response
        local sent,send_err,last_sent = client_sock:send(response,index)
        if not sent then
          if send_err == 'timeout' then
            -- Non-blocking socket could not take everything: continue
            -- after the last byte that went out on the next WRITE event.
            if last_sent then
              index = last_sent + 1
            end
            return
          end
          write_io:stop(loop)
          print('Websocket client closed while handshake',send_err)
          client_sock:close()
        elseif sent == len then
          write_io:stop(loop)
          start_client(client_sock,protocol)
        else
          assert(sent < len)
          -- `sent` is the index of the last byte written; resume after it.
          index = sent + 1
        end
      end,client_sock:getfd(),ev.WRITE):start(loop)
  end

  local listen_io = ev.IO.new(
    function()
      local client_sock,accept_err = listener:accept()
      if not client_sock then
        -- The listener is non-blocking, so accept() can come back empty.
        if accept_err ~= 'timeout' then
          on_error(self,'Websocket accept failed due to socket err:'..tostring(accept_err))
        end
        return
      end
      client_sock:settimeout(0)
      local request = {}
      local last
      ev.IO.new(
        function(loop,read_io)
          -- Collect header lines until the empty line ending the request.
          repeat
            local line,recv_err,part = client_sock:receive('*l')
            if line then
              if last then
                line = last..line
                last = nil
              end
              request[#request+1] = line
            elseif recv_err ~= 'timeout' then
              on_error(self,'Websocket Handshake failed due to socket err:'..recv_err)
              read_io:stop(loop)
              client_sock:close()
              return
            else
              -- Keep every fragment of an incomplete line; several
              -- timeouts may occur before the line terminator arrives.
              last = (last or '')..(part or '')
              return
            end
          until line == ''
          read_io:stop(loop)
          local upgrade_request = tconcat(request,'\r\n')
          local response,protocol = handshake.accept_upgrade(upgrade_request,protocols)
          if not response then
            print('Handshake failed, Request:')
            print(upgrade_request)
            client_sock:close()
            return
          end
          send_response(client_sock,response,protocol)
        end,client_sock:getfd(),ev.READ):start(loop)
    end,listener:getfd(),ev.READ)

  self.close = function(keep_clients,...)
    -- Support the method-call form server:close(keep_clients) as well:
    -- there the first argument is the server object itself.
    if keep_clients == self then
      keep_clients = ...
    end
    listen_io:stop(loop)
    if listener then
      listener:close()
      listener = nil
    end
    if not keep_clients then
      for _,peers in pairs(clients) do
        for peer in pairs(peers) do
          peer:close()
        end
      end
    end
  end
  listen_io:start(loop)
  return self
end
-- Module interface: only the server constructor is exported.
return { listen = listen }