You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							248 lines
						
					
					
						
							6.8 KiB
						
					
					
				
			
		
		
	
	
							248 lines
						
					
					
						
							6.8 KiB
						
					
					
				| 
 | |
| local socket = require'socket'
 | |
| local tools = require'websocket.tools'
 | |
| local frame = require'websocket.frame'
 | |
| local handshake = require'websocket.handshake'
 | |
| local debug = require'debug'
 | |
| local tconcat = table.concat
 | |
| local tinsert = table.insert
 | |
| 
 | |
| local ev = function(ws)
 | |
|   ws = ws or {}
 | |
|   local ev = require'ev'
 | |
|   local sock
 | |
|   local loop = ws.loop or ev.Loop.default
 | |
|   local fd
 | |
|   local message_io
 | |
|   local handshake_io
 | |
|   local send_io_stop
 | |
|   local async_send
 | |
|   local self = {}
 | |
|   self.state = 'CLOSED'
 | |
|   local close_timer
 | |
|   local user_on_message
 | |
|   local user_on_close
 | |
|   local user_on_open
 | |
|   local user_on_error
 | |
|   local cleanup = function()
 | |
|     if close_timer then
 | |
|       close_timer:stop(loop)
 | |
|       close_timer = nil
 | |
|     end
 | |
|     if handshake_io then
 | |
|       handshake_io:stop(loop)
 | |
|       handshake_io:clear_pending(loop)
 | |
|       handshake_io = nil
 | |
|     end
 | |
|     if send_io_stop then
 | |
|       send_io_stop()
 | |
|       send_io_stop = nil
 | |
|     end
 | |
|     if message_io then
 | |
|       message_io:stop(loop)
 | |
|       message_io:clear_pending(loop)
 | |
|       message_io = nil
 | |
|     end
 | |
|     if sock then
 | |
|       sock:shutdown()
 | |
|       sock:close()
 | |
|       sock = nil
 | |
|     end
 | |
|   end
 | |
| 
 | |
|   local on_close = function(was_clean,code,reason)
 | |
|     cleanup()
 | |
|     self.state = 'CLOSED'
 | |
|     if user_on_close then
 | |
|       user_on_close(self,was_clean,code,reason or '')
 | |
|     end
 | |
|   end
 | |
|   local on_error = function(err,dont_cleanup)
 | |
|     if not dont_cleanup then
 | |
|       cleanup()
 | |
|     end
 | |
|     if user_on_error then
 | |
|       user_on_error(self,err)
 | |
|     else
 | |
|       print('Error',err)
 | |
|     end
 | |
|   end
 | |
|   local on_open = function(_,headers)
 | |
|     self.state = 'OPEN'
 | |
|     if user_on_open then
 | |
|       user_on_open(self,headers['sec-websocket-protocol'],headers)
 | |
|     end
 | |
|   end
 | |
|   local handle_socket_err = function(err,io,sock)
 | |
|     if self.state == 'OPEN' then
 | |
|       on_close(false,1006,err)
 | |
|     elseif self.state ~= 'CLOSED' then
 | |
|       on_error(err)
 | |
|     end
 | |
|   end
 | |
|   local on_message = function(message,opcode)
 | |
|     if opcode == frame.TEXT or opcode == frame.BINARY then
 | |
|       if user_on_message then
 | |
|         user_on_message(self,message,opcode)
 | |
|       end
 | |
|     elseif opcode == frame.CLOSE then
 | |
|       if self.state ~= 'CLOSING' then
 | |
|         self.state = 'CLOSING'
 | |
|         local code,reason = frame.decode_close(message)
 | |
|         local encoded = frame.encode_close(code)
 | |
|         encoded = frame.encode(encoded,frame.CLOSE,true)
 | |
|         async_send(encoded,
 | |
|           function()
 | |
|             on_close(true,code or 1005,reason)
 | |
|           end,handle_socket_err)
 | |
|       else
 | |
|         on_close(true,1005,'')
 | |
|       end
 | |
|     end
 | |
|   end
 | |
| 
 | |
|   self.send = function(_,message,opcode)
 | |
|     local encoded = frame.encode(message,opcode or frame.TEXT,true)
 | |
|     async_send(encoded, nil, handle_socket_err)
 | |
|   end
 | |
| 
 | |
|   self.connect = function(_,url,ws_protocol)
 | |
|     if self.state ~= 'CLOSED' then
 | |
|       on_error('wrong state',true)
 | |
|       return
 | |
|     end
 | |
|     local protocol,host,port,uri = tools.parse_url(url)
 | |
|     if protocol ~= 'ws' then
 | |
|       on_error('bad protocol')
 | |
|       return
 | |
|     end
 | |
|     local ws_protocols_tbl = {''}
 | |
|     if type(ws_protocol) == 'string' then
 | |
|         ws_protocols_tbl = {ws_protocol}
 | |
|     elseif type(ws_protocol) == 'table' then
 | |
|         ws_protocols_tbl = ws_protocol
 | |
|     end
 | |
|     self.state = 'CONNECTING'
 | |
|     assert(not sock)
 | |
|     sock = socket.tcp()
 | |
|     fd = sock:getfd()
 | |
|     assert(fd > -1)
 | |
|     -- set non blocking
 | |
|     sock:settimeout(0)
 | |
|     sock:setoption('tcp-nodelay',true)
 | |
|     async_send,send_io_stop = require'websocket.ev_common'.async_send(sock,loop)
 | |
|     handshake_io = ev.IO.new(
 | |
|       function(loop,connect_io)
 | |
|         connect_io:stop(loop)
 | |
|         local key = tools.generate_key()
 | |
|         local req = handshake.upgrade_request
 | |
|         {
 | |
|           key = key,
 | |
|           host = host,
 | |
|           port = port,
 | |
|           protocols = ws_protocols_tbl,
 | |
|           origin = ws.origin,
 | |
|           uri = uri
 | |
|         }
 | |
|         async_send(
 | |
|           req,
 | |
|           function()
 | |
|             local resp = {}
 | |
|             local response = ''
 | |
|             local read_upgrade = function(loop,read_io)
 | |
|               -- this seems to be possible, i don't understand why though :(
 | |
|               if not sock then
 | |
|                 read_io:stop(loop)
 | |
|                 handshake_io = nil
 | |
|                 return
 | |
|               end
 | |
|               repeat
 | |
|                 local byte,err,pp = sock:receive(1)
 | |
|                 if byte then
 | |
|                   response = response..byte
 | |
|                 elseif err then
 | |
|                   if err == 'timeout' then
 | |
|                     return
 | |
|                   else
 | |
|                     read_io:stop(loop)
 | |
|                     on_error('accept failed')
 | |
|                     return
 | |
|                   end
 | |
|                 end
 | |
|               until response:sub(#response-3) == '\r\n\r\n'
 | |
|               read_io:stop(loop)
 | |
|               handshake_io = nil
 | |
|               local headers = handshake.http_headers(response)
 | |
|               local expected_accept = handshake.sec_websocket_accept(key)
 | |
|               if headers['sec-websocket-accept'] ~= expected_accept then
 | |
|                 self.state = 'CLOSED'
 | |
|                 on_error('accept failed')
 | |
|                 return
 | |
|               end
 | |
|               message_io = require'websocket.ev_common'.message_io(
 | |
|                 sock,loop,
 | |
|                 on_message,
 | |
|               handle_socket_err)
 | |
|               on_open(self, headers)
 | |
|             end
 | |
|             handshake_io = ev.IO.new(read_upgrade,fd,ev.READ)
 | |
|             handshake_io:start(loop)-- handshake
 | |
|           end,
 | |
|         handle_socket_err)
 | |
|       end,fd,ev.WRITE)
 | |
|     local connected,err = sock:connect(host,port)
 | |
|     if connected then
 | |
|       handshake_io:callback()(loop,handshake_io)
 | |
|     elseif err == 'timeout' or err == 'Operation already in progress' then
 | |
|       handshake_io:start(loop)-- connect
 | |
|     else
 | |
|       self.state = 'CLOSED'
 | |
|       on_error(err)
 | |
|     end
 | |
|   end
 | |
| 
 | |
|   self.on_close = function(_,on_close_arg)
 | |
|     user_on_close = on_close_arg
 | |
|   end
 | |
| 
 | |
|   self.on_error = function(_,on_error_arg)
 | |
|     user_on_error = on_error_arg
 | |
|   end
 | |
| 
 | |
|   self.on_open = function(_,on_open_arg)
 | |
|     user_on_open = on_open_arg
 | |
|   end
 | |
| 
 | |
|   self.on_message = function(_,on_message_arg)
 | |
|     user_on_message = on_message_arg
 | |
|   end
 | |
| 
 | |
|   self.close = function(_,code,reason,timeout)
 | |
|     if handshake_io then
 | |
|       handshake_io:stop(loop)
 | |
|       handshake_io:clear_pending(loop)
 | |
|     end
 | |
|     if self.state == 'CONNECTING' then
 | |
|       self.state = 'CLOSING'
 | |
|       on_close(false,1006,'')
 | |
|       return
 | |
|     elseif self.state == 'OPEN' then
 | |
|       self.state = 'CLOSING'
 | |
|       timeout = timeout or 3
 | |
|       local encoded = frame.encode_close(code or 1000,reason)
 | |
|       encoded = frame.encode(encoded,frame.CLOSE,true)
 | |
|       -- this should let the other peer confirm the CLOSE message
 | |
|       -- by 'echoing' the message.
 | |
|       async_send(encoded)
 | |
|       close_timer = ev.Timer.new(function()
 | |
|           close_timer = nil
 | |
|           on_close(false,1006,'timeout')
 | |
|         end,timeout)
 | |
|       close_timer:start(loop)
 | |
|     end
 | |
|   end
 | |
| 
 | |
|   return self
 | |
| end
 | |
| 
 | |
| return ev
 | |
| 
 |