Added on_timer callback event so that server can check regularly external state.

This is a basic solution to implement a way to check for time to time for events to notify websocket clients.
This commit is contained in:
Jocelyn Fiat
2017-10-24 17:43:06 +02:00
parent a0c1ab5232
commit e834b2b360
4 changed files with 227 additions and 103 deletions

View File

@@ -88,6 +88,23 @@ feature -- Websocket events
deferred deferred
end end
feature {WEB_SOCKET} -- Timeout.
timer_delay: INTEGER
-- Maximal duration in seconds between two `on_timeout` event.
-- Disable timeout event, by setting it to `0` (default).
set_timer_delay (nb_secs: INTEGER)
do
timer_delay := nb_secs
end
on_timer (ws: WEB_SOCKET)
-- Called every `timer_delay` seconds.
-- Note: redefine to use.
do
end
feature -- Websocket events: implemented feature -- Websocket events: implemented
on_pong (ws: WEB_SOCKET; a_message: READABLE_STRING_8) on_pong (ws: WEB_SOCKET; a_message: READABLE_STRING_8)
@@ -126,7 +143,7 @@ feature -- Websocket events: implemented
end end
note note
copyright: "2011-2016, Jocelyn Fiat, Javier Velilla, Eiffel Software and others" copyright: "2011-2017, Jocelyn Fiat, Javier Velilla, Eiffel Software and others"
license: "Eiffel Forum License v2 (see http://www.eiffel.com/licensing/forum.txt)" license: "Eiffel Forum License v2 (see http://www.eiffel.com/licensing/forum.txt)"
source: "[ source: "[
Eiffel Software Eiffel Software

View File

@@ -1,7 +1,7 @@
note note
description: "[ description: "[
Object representing the websocket connection. Object representing the websocket connection.
It contains the `request` and `response`, and more important the `socket` itself. It contains internally the `request` and `response`, and more important the `socket` itself.
]" ]"
date: "$Date$" date: "$Date$"
revision: "$Revision$" revision: "$Revision$"
@@ -10,16 +10,14 @@ class
WEB_SOCKET WEB_SOCKET
inherit inherit
WEB_SOCKET_WRITER
WGI_STANDALONE_CONNECTOR_EXPORTER WGI_STANDALONE_CONNECTOR_EXPORTER
WSF_RESPONSE_EXPORTER WSF_RESPONSE_EXPORTER
WGI_EXPORTER WGI_EXPORTER
HTTPD_LOGGER_CONSTANTS
WEB_SOCKET_CONSTANTS
SHARED_BASE64 SHARED_BASE64
create create
@@ -32,7 +30,7 @@ feature {NONE} -- Initialization
request := req request := req
response := res response := res
is_verbose := False is_verbose := False
verbose_level := notice_level verbose_level := {HTTPD_LOGGER_CONSTANTS}.notice_level
if if
attached {WGI_STANDALONE_INPUT_STREAM} req.input as r_input attached {WGI_STANDALONE_INPUT_STREAM} req.input as r_input
@@ -44,12 +42,7 @@ feature {NONE} -- Initialization
end end
end end
feature -- Access feature {NONE} -- Access
socket: HTTPD_STREAM_SOCKET
-- Underlying connected socket.
feature {NONE} -- Access
request: WSF_REQUEST request: WSF_REQUEST
-- Associated request. -- Associated request.
@@ -75,14 +68,7 @@ feature -- Status
has_error: BOOLEAN has_error: BOOLEAN
-- Error occured during processing? -- Error occured during processing?
feature -- Socket status feature -- Status report
is_ready_for_reading: BOOLEAN
-- Is `socket' ready for reading?
--| at this point, socket should be set to blocking.
do
Result := socket.ready_for_reading
end
is_open_read: BOOLEAN is_open_read: BOOLEAN
-- Is `socket' open for reading? -- Is `socket' open for reading?
@@ -96,12 +82,6 @@ feature -- Socket status
Result := socket.is_open_write Result := socket.is_open_write
end end
socket_descriptor: INTEGER
-- Descriptor for current `socket'.
do
Result := socket.descriptor
end
feature -- Element change feature -- Element change
set_is_verbose (b: BOOLEAN) set_is_verbose (b: BOOLEAN)
@@ -129,7 +109,7 @@ feature -- Basic operation
end end
end end
feature -- Basic Operation feature {WSF_WEBSOCKET_EXECUTION} -- Basic Operation
open_ws_handshake open_ws_handshake
-- The opening handshake is intended to be compatible with HTTP-based -- The opening handshake is intended to be compatible with HTTP-based
@@ -164,10 +144,10 @@ feature -- Basic Operation
-- TODO extract to a validator handshake or something like that. -- TODO extract to a validator handshake or something like that.
if is_verbose then if is_verbose then
log ("%NReceive <====================", debug_level) log ("%NReceive <====================", {HTTPD_LOGGER_CONSTANTS}.debug_level)
if attached req.raw_header_data as rhd then if attached req.raw_header_data as rhd then
check raw_header_is_valid_as_string_8: rhd.is_valid_as_string_8 end check raw_header_is_valid_as_string_8: rhd.is_valid_as_string_8 end
log (rhd.to_string_8, debug_level) log (rhd.to_string_8, {HTTPD_LOGGER_CONSTANTS}.debug_level)
end end
end end
if if
@@ -186,7 +166,7 @@ feature -- Basic Operation
attached req.http_host -- Host header must be present attached req.http_host -- Host header must be present
then then
if is_verbose then if is_verbose then
log ("key " + l_ws_key, debug_level) log ("key " + l_ws_key, {HTTPD_LOGGER_CONSTANTS}.debug_level)
end end
-- Sending the server's opening handshake -- Sending the server's opening handshake
create l_sha1.make create l_sha1.make
@@ -198,9 +178,9 @@ feature -- Basic Operation
res.header.add_header_key_value ("Sec-WebSocket-Accept", l_key) res.header.add_header_key_value ("Sec-WebSocket-Accept", l_key)
if is_verbose then if is_verbose then
log ("%N================> Send Handshake", debug_level) log ("%N================> Send Handshake", {HTTPD_LOGGER_CONSTANTS}.debug_level)
if attached {HTTP_HEADER} res.header as h then if attached {HTTP_HEADER} res.header as h then
log (h.string, debug_level) log (h.string, {HTTPD_LOGGER_CONSTANTS}.debug_level)
end end
end end
res.set_status_code_with_reason_phrase (101, "Switching Protocols") res.set_status_code_with_reason_phrase (101, "Switching Protocols")
@@ -208,7 +188,7 @@ feature -- Basic Operation
else else
has_error := True has_error := True
if is_verbose then if is_verbose then
log ("Error (opening_handshake)!!!", debug_level) log ("Error (opening_handshake)!!!", {HTTPD_LOGGER_CONSTANTS}.debug_level)
end end
-- If we cannot complete the handshake, then the server MUST stop processing the client's handshake and return an HTTP response with an -- If we cannot complete the handshake, then the server MUST stop processing the client's handshake and return an HTTP response with an
-- appropriate error code (such as 400 Bad Request). -- appropriate error code (such as 400 Bad Request).
@@ -219,80 +199,77 @@ feature -- Basic Operation
end end
end end
feature -- Response! feature {WEB_SOCKET_HANDLER} -- Networking
send (a_opcode:INTEGER; a_message: READABLE_STRING_8) socket: HTTPD_STREAM_SOCKET
-- Underlying connected socket.
has_input: BOOLEAN
-- Set by `wait_for_input`.
wait_for_input (cb: detachable WEB_SOCKET_EVENT_I)
local local
i: INTEGER l_timeout, nb: INTEGER
l_chunk_size: INTEGER l_cb_timeout: INTEGER
l_chunk: READABLE_STRING_8
l_header_message: STRING
l_message_count: INTEGER
n: NATURAL_64
retried: BOOLEAN
do do
debug ("ws") has_input := False
print (">>do_send (..., "+ opcode_name (a_opcode) +", ..)%N") if cb = Void then
end has_input := socket.ready_for_reading
if not retried then else
create l_header_message.make_empty l_cb_timeout := cb.timer_delay
l_header_message.append_code ((0x80 | a_opcode).to_natural_32) l_timeout := socket.timeout
l_message_count := a_message.count if l_cb_timeout = 0 then
n := l_message_count.to_natural_64 -- timeout event not enabled.
if l_message_count > 0xffff then has_input := socket.ready_for_reading
--! Improve. this code needs to be checked.
l_header_message.append_code ((0 | 127).to_natural_32)
l_header_message.append_character ((n |>> 56).to_character_8)
l_header_message.append_character ((n |>> 48).to_character_8)
l_header_message.append_character ((n |>> 40).to_character_8)
l_header_message.append_character ((n |>> 32).to_character_8)
l_header_message.append_character ((n |>> 24).to_character_8)
l_header_message.append_character ((n |>> 16).to_character_8)
l_header_message.append_character ((n |>> 8).to_character_8)
l_header_message.append_character ( n.to_character_8)
elseif l_message_count > 125 then
l_header_message.append_code ((0 | 126).to_natural_32)
l_header_message.append_code ((n |>> 8).as_natural_32)
l_header_message.append_character (n.to_character_8)
else else
l_header_message.append_code (n.as_natural_32) cb.on_timer (Current)
end if l_cb_timeout > l_timeout then
socket.put_string_8_noexception (l_header_message) -- event timeout duration is bigger than socket timeout
if not socket.was_error then -- thus, no on_timeout before next frame waiting
l_chunk_size := 16_384 -- 16K TODO: see if we should make it customizable. has_input := socket.ready_for_reading
if l_message_count < l_chunk_size then
socket.put_string_8_noexception (a_message)
else else
from from
i := 0 l_timeout := socket.timeout
nb := l_timeout
socket.set_timeout (l_cb_timeout) -- FIXME: for now 1 sec is the smaller timeout we can use.
until until
l_chunk_size = 0 or socket.was_error has_input or nb <= 0
loop loop
debug ("ws") has_input := socket.ready_for_reading
print ("Sending chunk " + (i + 1).out + " -> " + (i + l_chunk_size).out +" / " + l_message_count.out + "%N") if not has_input then
-- Call on_timeout only if there is no input,
-- otherwise it was called once before the initial wait.
socket.set_timeout (l_timeout)
cb.on_timer (Current)
socket.set_timeout (l_cb_timeout)
end end
l_chunk := a_message.substring (i + 1, l_message_count.min (i + l_chunk_size)) nb := nb - l_cb_timeout
socket.put_string_8_noexception (l_chunk)
if l_chunk.count < l_chunk_size then
l_chunk_size := 0
end
i := i + l_chunk_size
end
debug ("ws")
print ("Sending chunk done%N")
end end
socket.set_timeout (l_timeout)
end end
end end
else
-- FIXME: what should be done on rescue?
end end
rescue
retried := True
io.put_string ("Internal error in " + generator + ".do_send (conn, a_opcode=" + a_opcode.out + ", a_message) !%N")
retry
end end
socket_descriptor: INTEGER
-- Descriptor for current `socket'.
do
Result := socket.descriptor
end
socket_put_string (s: READABLE_STRING_8)
do
socket.put_string_8_noexception (s)
end
socket_was_error: BOOLEAN
do
Result := socket.was_error
end
feature {WEB_SOCKET_HANDLER} -- Frame
next_frame: detachable WEB_SOCKET_FRAME next_frame: detachable WEB_SOCKET_FRAME
-- TODO Binary messages -- TODO Binary messages
-- Handle error responses in a better way. -- Handle error responses in a better way.
@@ -402,7 +379,7 @@ feature -- Response!
if Result.is_valid then if Result.is_valid then
--| valid frame/fragment --| valid frame/fragment
if is_verbose then if is_verbose then
log ("+ frame " + opcode_name (l_opcode) + " (fin=" + l_fin.out + ")", debug_level) log ("+ frame " + opcode_name (l_opcode) + " (fin=" + l_fin.out + ")", {HTTPD_LOGGER_CONSTANTS}.debug_level)
end end
-- rsv validation -- rsv validation
@@ -420,7 +397,7 @@ feature -- Response!
end end
else else
if is_verbose then if is_verbose then
log ("+ INVALID frame " + opcode_name (l_opcode) + " (fin=" + l_fin.out + ")", debug_level) log ("+ INVALID frame " + opcode_name (l_opcode) + " (fin=" + l_fin.out + ")", {HTTPD_LOGGER_CONSTANTS}.debug_level)
end end
end end
@@ -548,7 +525,7 @@ feature -- Response!
end end
end end
if is_verbose then if is_verbose then
log (" Received " + l_fetch_count.out + " out of " + l_len.out + " bytes <===============", debug_level) log (" Received " + l_fetch_count.out + " out of " + l_len.out + " bytes <===============", {HTTPD_LOGGER_CONSTANTS}.debug_level)
end end
debug ("ws") debug ("ws")
print (" -> ") print (" -> ")
@@ -580,7 +557,7 @@ feature -- Response!
if Result /= Void then if Result /= Void then
if attached Result.error as err then if attached Result.error as err then
if is_verbose then if is_verbose then
log (" !Invalid frame: " + err.string, debug_level) log (" !Invalid frame: " + err.string, {HTTPD_LOGGER_CONSTANTS}.debug_level)
end end
end end
if Result.is_injected_control then if Result.is_injected_control then
@@ -624,8 +601,7 @@ feature -- Response!
retry retry
end end
feature {NONE} -- Encoding
feature -- Encoding
digest (a_sha1: SHA1): STRING digest (a_sha1: SHA1): STRING
-- Digest of `a_sha1'. -- Digest of `a_sha1'.
@@ -672,7 +648,7 @@ feature {NONE} -- Socket helpers
end end
end end
feature -- Masking Data Client - Server feature {NONE} -- Masking Data Client - Server
unmask (a_chunk: STRING_8; a_pos: INTEGER; a_key: READABLE_STRING_8) unmask (a_chunk: STRING_8; a_pos: INTEGER; a_key: READABLE_STRING_8)
local local
@@ -795,7 +771,6 @@ feature {NONE} -- Debug
end end
end end
note note
copyright: "2011-2017, Jocelyn Fiat, Javier Velilla, Eiffel Software and others" copyright: "2011-2017, Jocelyn Fiat, Javier Velilla, Eiffel Software and others"
license: "Eiffel Forum License v2 (see http://www.eiffel.com/licensing/forum.txt)" license: "Eiffel Forum License v2 (see http://www.eiffel.com/licensing/forum.txt)"

View File

@@ -59,7 +59,8 @@ feature -- Execution
debug ("dbglog") debug ("dbglog")
dbglog (generator + ".execute_websocket (loop) WS_REQUEST_HANDLER.process_request {" + ws.socket_descriptor.out + "}") dbglog (generator + ".execute_websocket (loop) WS_REQUEST_HANDLER.process_request {" + ws.socket_descriptor.out + "}")
end end
if ws.is_ready_for_reading then ws.wait_for_input (callbacks)
if ws.has_input then
l_frame := ws.next_frame l_frame := ws.next_frame
if l_frame /= Void and then l_frame.is_valid then if l_frame /= Void and then l_frame.is_valid then
if attached l_frame.injected_control_frames as l_injections then if attached l_frame.injected_control_frames as l_injections then
@@ -140,7 +141,7 @@ feature {NONE} -- Logging
end end
note note
copyright: "2011-2016, Jocelyn Fiat, Javier Velilla, Eiffel Software and others" copyright: "2011-2017, Jocelyn Fiat, Javier Velilla, Eiffel Software and others"
license: "Eiffel Forum License v2 (see http://www.eiffel.com/licensing/forum.txt)" license: "Eiffel Forum License v2 (see http://www.eiffel.com/licensing/forum.txt)"
source: "[ source: "[
Eiffel Software Eiffel Software

View File

@@ -0,0 +1,131 @@
note
description: "Summary description for {WEB_SOCKET_WRITER}."
date: "$Date$"
revision: "$Revision$"
deferred class
WEB_SOCKET_WRITER
inherit
WEB_SOCKET_CONSTANTS
feature -- Messages
send_text (a_message: READABLE_STRING_8)
-- Send text frame `a_message`.
do
send (text_frame, a_message)
end
send_connection_close (a_message: detachable READABLE_STRING_8)
-- Send connection close frame `a_message`.
do
send (connection_close_frame, a_message)
end
send_binary (a_data: READABLE_STRING_8)
-- Send binary frame `a_data`.
do
send (Binary_frame, a_data)
end
feature -- Custom Message
send (a_opcode: INTEGER; a_message: detachable READABLE_STRING_8)
local
i: INTEGER
l_chunk_size: INTEGER
l_chunk: READABLE_STRING_8
l_header_message: STRING
l_message_count: INTEGER
n: NATURAL_64
retried: BOOLEAN
do
debug ("ws")
print (">>do_send (..., "+ opcode_name (a_opcode) +", ..)%N")
end
if not retried then
create l_header_message.make_empty
l_header_message.append_code ((0x80 | a_opcode).to_natural_32)
if a_message /= Void then
l_message_count := a_message.count
else
l_message_count := 0
end
n := l_message_count.to_natural_64
if l_message_count > 0xffff then
--! Improve. this code needs to be checked.
l_header_message.append_code ((0 | 127).to_natural_32)
l_header_message.append_character ((n |>> 56).to_character_8)
l_header_message.append_character ((n |>> 48).to_character_8)
l_header_message.append_character ((n |>> 40).to_character_8)
l_header_message.append_character ((n |>> 32).to_character_8)
l_header_message.append_character ((n |>> 24).to_character_8)
l_header_message.append_character ((n |>> 16).to_character_8)
l_header_message.append_character ((n |>> 8).to_character_8)
l_header_message.append_character ( n.to_character_8)
elseif l_message_count > 125 then
l_header_message.append_code ((0 | 126).to_natural_32)
l_header_message.append_code ((n |>> 8).as_natural_32)
l_header_message.append_character (n.to_character_8)
else
l_header_message.append_code (n.as_natural_32)
end
socket_put_string (l_header_message)
if not socket_was_error then
l_chunk_size := 16_384 -- 16K TODO: see if we should make it customizable.
if a_message = Void or else l_message_count < l_chunk_size then
if a_message /= Void then
socket_put_string (a_message)
end
else
from
i := 0
until
l_chunk_size = 0 or socket_was_error
loop
debug ("ws")
print ("Sending chunk " + (i + 1).out + " -> " + (i + l_chunk_size).out +" / " + l_message_count.out + "%N")
end
l_chunk := a_message.substring (i + 1, l_message_count.min (i + l_chunk_size))
socket_put_string (l_chunk)
if l_chunk.count < l_chunk_size then
l_chunk_size := 0
end
i := i + l_chunk_size
end
debug ("ws")
print ("Sending chunk done%N")
end
end
end
else
-- FIXME: what should be done on rescue?
end
rescue
retried := True
io.put_string ("Internal error in " + generator + ".do_send (conn, a_opcode=" + a_opcode.out + ", a_message) !%N")
retry
end
feature {NONE} -- Networking
socket_put_string (s: READABLE_STRING_8)
deferred
end
socket_was_error: BOOLEAN
deferred
end
note
copyright: "2011-2017, Jocelyn Fiat, Javier Velilla, Eiffel Software and others"
license: "Eiffel Forum License v2 (see http://www.eiffel.com/licensing/forum.txt)"
source: "[
Eiffel Software
5949 Hollister Ave., Goleta, CA 93117 USA
Telephone 805-685-1006, Fax 805-685-6869
Website http://www.eiffel.com
Customer support http://support.eiffel.com
]"
end