You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
lua-lib/websocket/client_ev.lua

248 lines
6.8 KiB

local socket = require'socket'
local tools = require'websocket.tools'
local frame = require'websocket.frame'
local handshake = require'websocket.handshake'
local debug = require'debug'
local tconcat = table.concat
local tinsert = table.insert
local ev = function(ws)
ws = ws or {}
local ev = require'ev'
local sock
local loop = ws.loop or ev.Loop.default
local fd
local message_io
local handshake_io
local send_io_stop
local async_send
local self = {}
self.state = 'CLOSED'
local close_timer
local user_on_message
local user_on_close
local user_on_open
local user_on_error
local cleanup = function()
if close_timer then
close_timer:stop(loop)
close_timer = nil
end
if handshake_io then
handshake_io:stop(loop)
handshake_io:clear_pending(loop)
handshake_io = nil
end
if send_io_stop then
send_io_stop()
send_io_stop = nil
end
if message_io then
message_io:stop(loop)
message_io:clear_pending(loop)
message_io = nil
end
if sock then
sock:shutdown()
sock:close()
sock = nil
end
end
local on_close = function(was_clean,code,reason)
cleanup()
self.state = 'CLOSED'
if user_on_close then
user_on_close(self,was_clean,code,reason or '')
end
end
local on_error = function(err,dont_cleanup)
if not dont_cleanup then
cleanup()
end
if user_on_error then
user_on_error(self,err)
else
print('Error',err)
end
end
local on_open = function(_,headers)
self.state = 'OPEN'
if user_on_open then
user_on_open(self,headers['sec-websocket-protocol'],headers)
end
end
local handle_socket_err = function(err,io,sock)
if self.state == 'OPEN' then
on_close(false,1006,err)
elseif self.state ~= 'CLOSED' then
on_error(err)
end
end
local on_message = function(message,opcode)
if opcode == frame.TEXT or opcode == frame.BINARY then
if user_on_message then
user_on_message(self,message,opcode)
end
elseif opcode == frame.CLOSE then
if self.state ~= 'CLOSING' then
self.state = 'CLOSING'
local code,reason = frame.decode_close(message)
local encoded = frame.encode_close(code)
encoded = frame.encode(encoded,frame.CLOSE,true)
async_send(encoded,
function()
on_close(true,code or 1005,reason)
end,handle_socket_err)
else
on_close(true,1005,'')
end
end
end
self.send = function(_,message,opcode)
local encoded = frame.encode(message,opcode or frame.TEXT,true)
async_send(encoded, nil, handle_socket_err)
end
self.connect = function(_,url,ws_protocol)
if self.state ~= 'CLOSED' then
on_error('wrong state',true)
return
end
local protocol,host,port,uri = tools.parse_url(url)
if protocol ~= 'ws' then
on_error('bad protocol')
return
end
local ws_protocols_tbl = {''}
if type(ws_protocol) == 'string' then
ws_protocols_tbl = {ws_protocol}
elseif type(ws_protocol) == 'table' then
ws_protocols_tbl = ws_protocol
end
self.state = 'CONNECTING'
assert(not sock)
sock = socket.tcp()
fd = sock:getfd()
assert(fd > -1)
-- set non blocking
sock:settimeout(0)
sock:setoption('tcp-nodelay',true)
async_send,send_io_stop = require'websocket.ev_common'.async_send(sock,loop)
handshake_io = ev.IO.new(
function(loop,connect_io)
connect_io:stop(loop)
local key = tools.generate_key()
local req = handshake.upgrade_request
{
key = key,
host = host,
port = port,
protocols = ws_protocols_tbl,
origin = ws.origin,
uri = uri
}
async_send(
req,
function()
local resp = {}
local response = ''
local read_upgrade = function(loop,read_io)
-- this seems to be possible, i don't understand why though :(
if not sock then
read_io:stop(loop)
handshake_io = nil
return
end
repeat
local byte,err,pp = sock:receive(1)
if byte then
response = response..byte
elseif err then
if err == 'timeout' then
return
else
read_io:stop(loop)
on_error('accept failed')
return
end
end
until response:sub(#response-3) == '\r\n\r\n'
read_io:stop(loop)
handshake_io = nil
local headers = handshake.http_headers(response)
local expected_accept = handshake.sec_websocket_accept(key)
if headers['sec-websocket-accept'] ~= expected_accept then
self.state = 'CLOSED'
on_error('accept failed')
return
end
message_io = require'websocket.ev_common'.message_io(
sock,loop,
on_message,
handle_socket_err)
on_open(self, headers)
end
handshake_io = ev.IO.new(read_upgrade,fd,ev.READ)
handshake_io:start(loop)-- handshake
end,
handle_socket_err)
end,fd,ev.WRITE)
local connected,err = sock:connect(host,port)
if connected then
handshake_io:callback()(loop,handshake_io)
elseif err == 'timeout' or err == 'Operation already in progress' then
handshake_io:start(loop)-- connect
else
self.state = 'CLOSED'
on_error(err)
end
end
self.on_close = function(_,on_close_arg)
user_on_close = on_close_arg
end
self.on_error = function(_,on_error_arg)
user_on_error = on_error_arg
end
self.on_open = function(_,on_open_arg)
user_on_open = on_open_arg
end
self.on_message = function(_,on_message_arg)
user_on_message = on_message_arg
end
self.close = function(_,code,reason,timeout)
if handshake_io then
handshake_io:stop(loop)
handshake_io:clear_pending(loop)
end
if self.state == 'CONNECTING' then
self.state = 'CLOSING'
on_close(false,1006,'')
return
elseif self.state == 'OPEN' then
self.state = 'CLOSING'
timeout = timeout or 3
local encoded = frame.encode_close(code or 1000,reason)
encoded = frame.encode(encoded,frame.CLOSE,true)
-- this should let the other peer confirm the CLOSE message
-- by 'echoing' the message.
async_send(encoded)
close_timer = ev.Timer.new(function()
close_timer = nil
on_close(false,1006,'timeout')
end,timeout)
close_timer:start(loop)
end
end
return self
end
return ev