From e834b2b36065b03b78db46a5f941afb38d0cd775 Mon Sep 17 00:00:00 2001 From: Jocelyn Fiat Date: Tue, 24 Oct 2017 17:43:06 +0200 Subject: [PATCH] Added `on_timer` callback event so that server can check regularly external state. This is a basic solution to implement a way to check for time to time for events to notify websocket clients. --- .../websocket/event/web_socket_event_i.e | 19 +- .../websocket/web_socket.e | 175 ++++++++---------- .../websocket/web_socket_handler.e | 5 +- .../websocket/web_socket_writer.e | 131 +++++++++++++ 4 files changed, 227 insertions(+), 103 deletions(-) create mode 100644 library/server/wsf/connector/standalone_websocket/websocket/web_socket_writer.e diff --git a/library/server/wsf/connector/standalone_websocket/websocket/event/web_socket_event_i.e b/library/server/wsf/connector/standalone_websocket/websocket/event/web_socket_event_i.e index 500b266f..daff711c 100644 --- a/library/server/wsf/connector/standalone_websocket/websocket/event/web_socket_event_i.e +++ b/library/server/wsf/connector/standalone_websocket/websocket/event/web_socket_event_i.e @@ -88,6 +88,23 @@ feature -- Websocket events deferred end +feature {WEB_SOCKET} -- Timeout. + + timer_delay: INTEGER + -- Maximal duration in seconds between two `on_timeout` event. + -- Disable timeout event, by setting it to `0` (default). + + set_timer_delay (nb_secs: INTEGER) + do + timer_delay := nb_secs + end + + on_timer (ws: WEB_SOCKET) + -- Called every `timer_delay` seconds. + -- Note: redefine to use. + do + end + feature -- Websocket events: implemented on_pong (ws: WEB_SOCKET; a_message: READABLE_STRING_8) @@ -126,7 +143,7 @@ feature -- Websocket events: implemented end note - copyright: "2011-2016, Jocelyn Fiat, Javier Velilla, Eiffel Software and others" + copyright: "2011-2017, Jocelyn Fiat, Javier Velilla, Eiffel Software and others" license: "Eiffel Forum License v2 (see http://www.eiffel.com/licensing/forum.txt)" source: "[ Eiffel Software diff --git a/library/server/wsf/connector/standalone_websocket/websocket/web_socket.e b/library/server/wsf/connector/standalone_websocket/websocket/web_socket.e index 6264a365..603a01f4 100644 --- a/library/server/wsf/connector/standalone_websocket/websocket/web_socket.e +++ b/library/server/wsf/connector/standalone_websocket/websocket/web_socket.e @@ -1,7 +1,7 @@ note description: "[ Object representing the websocket connection. - It contains the `request` and `response`, and more important the `socket` itself. + It contains internally the `request` and `response`, and more important the `socket` itself. ]" date: "$Date$" revision: "$Revision$" @@ -10,16 +10,14 @@ class WEB_SOCKET inherit + WEB_SOCKET_WRITER + WGI_STANDALONE_CONNECTOR_EXPORTER WSF_RESPONSE_EXPORTER WGI_EXPORTER - HTTPD_LOGGER_CONSTANTS - - WEB_SOCKET_CONSTANTS - SHARED_BASE64 create @@ -32,7 +30,7 @@ feature {NONE} -- Initialization request := req response := res is_verbose := False - verbose_level := notice_level + verbose_level := {HTTPD_LOGGER_CONSTANTS}.notice_level if attached {WGI_STANDALONE_INPUT_STREAM} req.input as r_input @@ -44,12 +42,7 @@ feature {NONE} -- Initialization end end -feature -- Access - - socket: HTTPD_STREAM_SOCKET - -- Underlying connected socket. - -feature {NONE} -- Access +feature {NONE} -- Access request: WSF_REQUEST -- Associated request. @@ -75,14 +68,7 @@ feature -- Status has_error: BOOLEAN -- Error occured during processing? -feature -- Socket status - - is_ready_for_reading: BOOLEAN - -- Is `socket' ready for reading? - --| at this point, socket should be set to blocking. - do - Result := socket.ready_for_reading - end +feature -- Status report is_open_read: BOOLEAN -- Is `socket' open for reading? @@ -96,12 +82,6 @@ feature -- Socket status Result := socket.is_open_write end - socket_descriptor: INTEGER - -- Descriptor for current `socket'. - do - Result := socket.descriptor - end - feature -- Element change set_is_verbose (b: BOOLEAN) @@ -129,7 +109,7 @@ feature -- Basic operation end end -feature -- Basic Operation +feature {WSF_WEBSOCKET_EXECUTION} -- Basic Operation open_ws_handshake -- The opening handshake is intended to be compatible with HTTP-based @@ -164,10 +144,10 @@ feature -- Basic Operation -- TODO extract to a validator handshake or something like that. if is_verbose then - log ("%NReceive <====================", debug_level) + log ("%NReceive <====================", {HTTPD_LOGGER_CONSTANTS}.debug_level) if attached req.raw_header_data as rhd then check raw_header_is_valid_as_string_8: rhd.is_valid_as_string_8 end - log (rhd.to_string_8, debug_level) + log (rhd.to_string_8, {HTTPD_LOGGER_CONSTANTS}.debug_level) end end if @@ -186,7 +166,7 @@ feature -- Basic Operation attached req.http_host -- Host header must be present then if is_verbose then - log ("key " + l_ws_key, debug_level) + log ("key " + l_ws_key, {HTTPD_LOGGER_CONSTANTS}.debug_level) end -- Sending the server's opening handshake create l_sha1.make @@ -198,9 +178,9 @@ feature -- Basic Operation res.header.add_header_key_value ("Sec-WebSocket-Accept", l_key) if is_verbose then - log ("%N================> Send Handshake", debug_level) + log ("%N================> Send Handshake", {HTTPD_LOGGER_CONSTANTS}.debug_level) if attached {HTTP_HEADER} res.header as h then - log (h.string, debug_level) + log (h.string, {HTTPD_LOGGER_CONSTANTS}.debug_level) end end res.set_status_code_with_reason_phrase (101, "Switching Protocols") @@ -208,7 +188,7 @@ feature -- Basic Operation else has_error := True if is_verbose then - log ("Error (opening_handshake)!!!", debug_level) + log ("Error (opening_handshake)!!!", {HTTPD_LOGGER_CONSTANTS}.debug_level) end -- If we cannot complete the handshake, then the server MUST stop processing the client's handshake and return an HTTP response with an -- appropriate error code (such as 400 Bad Request). @@ -219,80 +199,77 @@ feature -- Basic Operation end end -feature -- Response! +feature {WEB_SOCKET_HANDLER} -- Networking - send (a_opcode:INTEGER; a_message: READABLE_STRING_8) + socket: HTTPD_STREAM_SOCKET + -- Underlying connected socket. + + has_input: BOOLEAN + -- Set by `wait_for_input`. + + wait_for_input (cb: detachable WEB_SOCKET_EVENT_I) local - i: INTEGER - l_chunk_size: INTEGER - l_chunk: READABLE_STRING_8 - l_header_message: STRING - l_message_count: INTEGER - n: NATURAL_64 - retried: BOOLEAN + l_timeout, nb: INTEGER + l_cb_timeout: INTEGER do - debug ("ws") - print (">>do_send (..., "+ opcode_name (a_opcode) +", ..)%N") - end - if not retried then - create l_header_message.make_empty - l_header_message.append_code ((0x80 | a_opcode).to_natural_32) - l_message_count := a_message.count - n := l_message_count.to_natural_64 - if l_message_count > 0xffff then - --! Improve. this code needs to be checked. - l_header_message.append_code ((0 | 127).to_natural_32) - l_header_message.append_character ((n |>> 56).to_character_8) - l_header_message.append_character ((n |>> 48).to_character_8) - l_header_message.append_character ((n |>> 40).to_character_8) - l_header_message.append_character ((n |>> 32).to_character_8) - l_header_message.append_character ((n |>> 24).to_character_8) - l_header_message.append_character ((n |>> 16).to_character_8) - l_header_message.append_character ((n |>> 8).to_character_8) - l_header_message.append_character ( n.to_character_8) - elseif l_message_count > 125 then - l_header_message.append_code ((0 | 126).to_natural_32) - l_header_message.append_code ((n |>> 8).as_natural_32) - l_header_message.append_character (n.to_character_8) + has_input := False + if cb = Void then + has_input := socket.ready_for_reading + else + l_cb_timeout := cb.timer_delay + l_timeout := socket.timeout + if l_cb_timeout = 0 then + -- timeout event not enabled. + has_input := socket.ready_for_reading else - l_header_message.append_code (n.as_natural_32) - end - socket.put_string_8_noexception (l_header_message) - if not socket.was_error then - l_chunk_size := 16_384 -- 16K TODO: see if we should make it customizable. - if l_message_count < l_chunk_size then - socket.put_string_8_noexception (a_message) + cb.on_timer (Current) + if l_cb_timeout > l_timeout then + -- event timeout duration is bigger than socket timeout + -- thus, no on_timeout before next frame waiting + has_input := socket.ready_for_reading else from - i := 0 + l_timeout := socket.timeout + nb := l_timeout + socket.set_timeout (l_cb_timeout) -- FIXME: for now 1 sec is the smaller timeout we can use. until - l_chunk_size = 0 or socket.was_error + has_input or nb <= 0 loop - debug ("ws") - print ("Sending chunk " + (i + 1).out + " -> " + (i + l_chunk_size).out +" / " + l_message_count.out + "%N") + has_input := socket.ready_for_reading + if not has_input then + -- Call on_timeout only if there is no input, + -- otherwise it was called once before the initial wait. + socket.set_timeout (l_timeout) + cb.on_timer (Current) + socket.set_timeout (l_cb_timeout) end - l_chunk := a_message.substring (i + 1, l_message_count.min (i + l_chunk_size)) - socket.put_string_8_noexception (l_chunk) - if l_chunk.count < l_chunk_size then - l_chunk_size := 0 - end - i := i + l_chunk_size - end - debug ("ws") - print ("Sending chunk done%N") + nb := nb - l_cb_timeout end + socket.set_timeout (l_timeout) end end - - else - -- FIXME: what should be done on rescue? end - rescue - retried := True - io.put_string ("Internal error in " + generator + ".do_send (conn, a_opcode=" + a_opcode.out + ", a_message) !%N") - retry end + + socket_descriptor: INTEGER + -- Descriptor for current `socket'. + do + Result := socket.descriptor + end + + socket_put_string (s: READABLE_STRING_8) + do + socket.put_string_8_noexception (s) + end + + socket_was_error: BOOLEAN + do + Result := socket.was_error + end + +feature {WEB_SOCKET_HANDLER} -- Frame + next_frame: detachable WEB_SOCKET_FRAME -- TODO Binary messages -- Handle error responses in a better way. @@ -402,7 +379,7 @@ feature -- Response! if Result.is_valid then --| valid frame/fragment if is_verbose then - log ("+ frame " + opcode_name (l_opcode) + " (fin=" + l_fin.out + ")", debug_level) + log ("+ frame " + opcode_name (l_opcode) + " (fin=" + l_fin.out + ")", {HTTPD_LOGGER_CONSTANTS}.debug_level) end -- rsv validation @@ -420,7 +397,7 @@ feature -- Response! end else if is_verbose then - log ("+ INVALID frame " + opcode_name (l_opcode) + " (fin=" + l_fin.out + ")", debug_level) + log ("+ INVALID frame " + opcode_name (l_opcode) + " (fin=" + l_fin.out + ")", {HTTPD_LOGGER_CONSTANTS}.debug_level) end end @@ -548,7 +525,7 @@ feature -- Response! end end if is_verbose then - log (" Received " + l_fetch_count.out + " out of " + l_len.out + " bytes <===============", debug_level) + log (" Received " + l_fetch_count.out + " out of " + l_len.out + " bytes <===============", {HTTPD_LOGGER_CONSTANTS}.debug_level) end debug ("ws") print (" -> ") @@ -580,7 +557,7 @@ feature -- Response! if Result /= Void then if attached Result.error as err then if is_verbose then - log (" !Invalid frame: " + err.string, debug_level) + log (" !Invalid frame: " + err.string, {HTTPD_LOGGER_CONSTANTS}.debug_level) end end if Result.is_injected_control then @@ -624,8 +601,7 @@ feature -- Response! retry end - -feature -- Encoding +feature {NONE} -- Encoding digest (a_sha1: SHA1): STRING -- Digest of `a_sha1'. @@ -672,7 +648,7 @@ feature {NONE} -- Socket helpers end end -feature -- Masking Data Client - Server +feature {NONE} -- Masking Data Client - Server unmask (a_chunk: STRING_8; a_pos: INTEGER; a_key: READABLE_STRING_8) local @@ -795,7 +771,6 @@ feature {NONE} -- Debug end end - note copyright: "2011-2017, Jocelyn Fiat, Javier Velilla, Eiffel Software and others" license: "Eiffel Forum License v2 (see http://www.eiffel.com/licensing/forum.txt)" diff --git a/library/server/wsf/connector/standalone_websocket/websocket/web_socket_handler.e b/library/server/wsf/connector/standalone_websocket/websocket/web_socket_handler.e index 5268c600..ce274021 100644 --- a/library/server/wsf/connector/standalone_websocket/websocket/web_socket_handler.e +++ b/library/server/wsf/connector/standalone_websocket/websocket/web_socket_handler.e @@ -59,7 +59,8 @@ feature -- Execution debug ("dbglog") dbglog (generator + ".execute_websocket (loop) WS_REQUEST_HANDLER.process_request {" + ws.socket_descriptor.out + "}") end - if ws.is_ready_for_reading then + ws.wait_for_input (callbacks) + if ws.has_input then l_frame := ws.next_frame if l_frame /= Void and then l_frame.is_valid then if attached l_frame.injected_control_frames as l_injections then @@ -140,7 +141,7 @@ feature {NONE} -- Logging end note - copyright: "2011-2016, Jocelyn Fiat, Javier Velilla, Eiffel Software and others" + copyright: "2011-2017, Jocelyn Fiat, Javier Velilla, Eiffel Software and others" license: "Eiffel Forum License v2 (see http://www.eiffel.com/licensing/forum.txt)" source: "[ Eiffel Software diff --git a/library/server/wsf/connector/standalone_websocket/websocket/web_socket_writer.e b/library/server/wsf/connector/standalone_websocket/websocket/web_socket_writer.e new file mode 100644 index 00000000..229afaa8 --- /dev/null +++ b/library/server/wsf/connector/standalone_websocket/websocket/web_socket_writer.e @@ -0,0 +1,131 @@ +note + description: "Summary description for {WEB_SOCKET_WRITER}." + date: "$Date$" + revision: "$Revision$" + +deferred class + WEB_SOCKET_WRITER + +inherit + WEB_SOCKET_CONSTANTS + +feature -- Messages + + send_text (a_message: READABLE_STRING_8) + -- Send text frame `a_message`. + do + send (text_frame, a_message) + end + + send_connection_close (a_message: detachable READABLE_STRING_8) + -- Send connection close frame `a_message`. + do + send (connection_close_frame, a_message) + end + + send_binary (a_data: READABLE_STRING_8) + -- Send binary frame `a_data`. + do + send (Binary_frame, a_data) + end + +feature -- Custom Message + + send (a_opcode: INTEGER; a_message: detachable READABLE_STRING_8) + local + i: INTEGER + l_chunk_size: INTEGER + l_chunk: READABLE_STRING_8 + l_header_message: STRING + l_message_count: INTEGER + n: NATURAL_64 + retried: BOOLEAN + do + debug ("ws") + print (">>do_send (..., "+ opcode_name (a_opcode) +", ..)%N") + end + if not retried then + create l_header_message.make_empty + l_header_message.append_code ((0x80 | a_opcode).to_natural_32) + if a_message /= Void then + l_message_count := a_message.count + else + l_message_count := 0 + end + n := l_message_count.to_natural_64 + if l_message_count > 0xffff then + --! Improve. this code needs to be checked. + l_header_message.append_code ((0 | 127).to_natural_32) + l_header_message.append_character ((n |>> 56).to_character_8) + l_header_message.append_character ((n |>> 48).to_character_8) + l_header_message.append_character ((n |>> 40).to_character_8) + l_header_message.append_character ((n |>> 32).to_character_8) + l_header_message.append_character ((n |>> 24).to_character_8) + l_header_message.append_character ((n |>> 16).to_character_8) + l_header_message.append_character ((n |>> 8).to_character_8) + l_header_message.append_character ( n.to_character_8) + elseif l_message_count > 125 then + l_header_message.append_code ((0 | 126).to_natural_32) + l_header_message.append_code ((n |>> 8).as_natural_32) + l_header_message.append_character (n.to_character_8) + else + l_header_message.append_code (n.as_natural_32) + end + socket_put_string (l_header_message) + if not socket_was_error then + l_chunk_size := 16_384 -- 16K TODO: see if we should make it customizable. + if a_message = Void or else l_message_count < l_chunk_size then + if a_message /= Void then + socket_put_string (a_message) + end + else + from + i := 0 + until + l_chunk_size = 0 or socket_was_error + loop + debug ("ws") + print ("Sending chunk " + (i + 1).out + " -> " + (i + l_chunk_size).out +" / " + l_message_count.out + "%N") + end + l_chunk := a_message.substring (i + 1, l_message_count.min (i + l_chunk_size)) + socket_put_string (l_chunk) + if l_chunk.count < l_chunk_size then + l_chunk_size := 0 + end + i := i + l_chunk_size + end + debug ("ws") + print ("Sending chunk done%N") + end + end + end + else + -- FIXME: what should be done on rescue? + end + rescue + retried := True + io.put_string ("Internal error in " + generator + ".do_send (conn, a_opcode=" + a_opcode.out + ", a_message) !%N") + retry + end + +feature {NONE} -- Networking + + socket_put_string (s: READABLE_STRING_8) + deferred + end + + socket_was_error: BOOLEAN + deferred + end + +note + copyright: "2011-2017, Jocelyn Fiat, Javier Velilla, Eiffel Software and others" + license: "Eiffel Forum License v2 (see http://www.eiffel.com/licensing/forum.txt)" + source: "[ + Eiffel Software + 5949 Hollister Ave., Goleta, CA 93117 USA + Telephone 805-685-1006, Fax 805-685-6869 + Website http://www.eiffel.com + Customer support http://support.eiffel.com + ]" +end