guile-user

[8sync]: Initial <websocket> client actor support, with problems.


From: Jan Nieuwenhuizen
Subject: [8sync]: Initial <websocket> client actor support, with problems.
Date: Fri, 19 Apr 2019 15:27:06 +0200

Hi!

Find attached some initial web socket client support.  Finally I found
a good excuse to start playing with 8sync, it's great!

Current status: 8sync websocket server works with a node ws client,
8sync websocket client works with a node ws server; demos included.

I want(ed) the web socket server to work as a factory for <websocket>
(client) actors, so that once created by listen (server) or open
(client), <websocket> actors are identical.

Apart from the inexplicable error that I get when connecting the
8s-server with the 8s-client...

--8<---------------cut here---------------start------------->8---
15:15:25 address@hidden:~/src/8sync/demos/websocket [env]
$ ~/src/verum/branch/scheme/pre-inst-env ./8s-server.scm
listening: 1236
Zzzzzzzz....
Zzzzzzzz....

*** Caught exception with key 'goops-error and arguments: (#f "Unbound slot in object ~S" (#<<websocket-server> 1018d80>) ()) ***
          17 (apply-smob/1 #<catch-closure f37240>)
In ice-9/boot-9.scm:
    705:2 16 (call-with-prompt _ _ #<procedure default-prompt-handler (k proc)>)
In ice-9/eval.scm:
    619:8 15 (_ #(#(#<directory (guile-user) ffe140>)))
In ice-9/boot-9.scm:
   2312:4 14 (save-module-excursion _)
  3831:12 13 (_)
In 8sync/actors.scm:
    812:6 12 (run-hive #<<hive> 146c190> () #:cleanup _ #:handle-signals _)
In ice-9/control.scm:
    91:24 11 (call-with-escape-continuation _)
In 8sync/agenda.scm:
    568:6 10 (run-agenda #<<agenda> queue: (() . #f) prompt-tag: ("prompt") 
read-port-map: #<hash-table 146f0?> ?)
    631:5  9 (agenda-run-once! #<<agenda> queue: (() . #f) prompt-tag: 
("prompt") read-port-map: #<hash-table ?>)
In ice-9/boot-9.scm:
    829:9  8 (catch #t #<procedure 14a2e00 at 8sync/agenda.scm:638:11 ()> 
#<procedure 7f6c52beec30 at 8sync/a?> ?)
In 8sync/actors.scm:
    596:6  7 (call-catching-coroutine #<procedure 14a2dc0 at 
8sync/actors.scm:611:7 ()> _)
    576:6  6 (call-catching-errors)
In ice-9/boot-9.scm:
    829:9  5 (catch #t #<procedure 14a2dc0 at 8sync/actors.scm:611:7 ()> 
#<procedure 7f6c52b45958 at 8sync/ac?> ?)
In 8sync/actors.scm:
   616:18  4 (_)
In 8sync/systems/web.scm:
    215:7  3 (web-server-handle-request #<<websocket-server> 1018d80> 
#<<message> id: "281ff5d5da6059fe11c081?> ?)
In unknown file:
           2 (scm-error goops-error #f "Unbound slot in object ~S" 
(#<<websocket-server> 1018d80>) ())
In ice-9/boot-9.scm:
   751:25  1 (dispatch-exception 0 goops-error (#f "Unbound slot in object ~S" 
(#<<websocket-server> 1018d?>) ?))
In 8sync/agenda.scm:
   597:23  0 (print-error-and-continue _ . _)

While reading request:
Throw to key `hive-unresumable-coroutine' with args `("Won't resume coroutine; 
got an *error* as a reply" #:message #<<message> id: 
"281ff5d5da6059fe11c08187a167693d:7" to: 
#("<websocket-server>:9ed027e904d70fd0261ab485f583769e" 
"67ac79ac32e0bb3ac3f78eabb86a7a5") from: #("hive" 
"67ac79ac32e0bb3ac3f78eabb86a7a5") action: *error* body: (#:original-message 
#<<message> id: "281ff5d5da6059fe11c08187a167693d:6" to: 
#("<websocket-server>:9ed027e904d70fd0261ab485f583769e" 
"67ac79ac32e0bb3ac3f78eabb86a7a5") from: 
#("<websocket-server>:9ed027e904d70fd0261ab485f583769e" 
"67ac79ac32e0bb3ac3f78eabb86a7a5") action: handle-request body: (#<<request> 
method: GET uri: #<<uri> scheme: #f userinfo: #f host: #f port: #f path: "/" 
query: #f fragment: #f> version: (1 . 1) headers: ((host "localhost" . #f) 
(upgrade "WebSocket") (connection upgrade) (sec-websocket-key . 
"uQgASbCin4y9XYTgFuvArA==") (sec-websocket-version . "13")) meta: () port: 
#<input-output: socket 16>> #f) in-reply-to: #f wants-reply: #t replied: #t> 
#:error-key goops-error #:error-args (#f "Unbound slot in object ~S" 
(#<<websocket-server> 1018d80>) ())) in-reply-to: 
"281ff5d5da6059fe11c08187a167693d:6" wants-reply: #f replied: #f>)'.
Zzzzzzzz....
--8<---------------cut here---------------end--------------->8---

...which slot is unbound, and where is that unbound slot accessed?  Is
that the initial error, with the "Won't resume coroutine" as collateral
damage, or is it the other way around?  In that case, why won't we
resume the coroutine?  %-/
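
One way to narrow down the first question is to ask GOOPS directly
which slots of an instance are still unbound.  A minimal sketch, assuming
only plain GOOPS introspection (class-slots, slot-definition-name,
slot-bound?); unbound-slots and the <demo> class are hypothetical names
used for illustration and are not part of 8sync:

```scheme
(use-modules (oop goops))

(define (unbound-slots obj)
  "Return the names of the slots of OBJ that currently have no value."
  (filter (lambda (name) (not (slot-bound? obj name)))
          (map slot-definition-name
               (class-slots (class-of obj)))))

;; Hypothetical stand-in for an actor class: one slot has a default,
;; the other only an init keyword and therefore starts out unbound.
(define-class <demo> ()
  (with-default #:init-value 1)
  (without-default #:init-keyword #:without-default))

(display (unbound-slots (make <demo>)))
(newline)
```

Calling such a helper on the <websocket-server> instance, for example
from a temporary debugging action, would list the candidate slots.  A
slot declared with only #:init-keyword and no #:init-value stays unbound
unless that keyword is passed when the actor is created.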

I'm also not happy with the interface of <websocket> yet (url/uri,
declarative opening vs opening by bootstrap message), but it starts to
work.  The patch includes demos/debug scripts.
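
The two opening styles could look roughly as follows.  This is only a
sketch: it assumes the attached patch is applied and that a websocket
server listens on ws://localhost:1236, as in the demos.  The names
log-message, run-declarative and run-by-message are hypothetical;
bootstrap-actor, bootstrap-message and run-hive are used the way the
8sync manual shows them.

```scheme
(use-modules (oop goops)
             (8sync)
             (8sync systems websocket client))

(define %url "ws://localhost:1236")   ; assumed demo server address

(define (log-message ws msg)
  (format (current-error-port) "on-message: ~s\n" msg))

;; Declarative: the #:url init keyword makes the *init* action open
;; the connection as soon as the actor starts.
(define (run-declarative)
  (let ((hive (make-hive)))
    (bootstrap-actor hive <websocket>
                     #:url %url
                     #:on-message log-message)
    (run-hive hive '())))

;; By bootstrap message: the actor starts closed and is told to open
;; by sending it the `open' action with the URL as argument.
(define (run-by-message)
  (let* ((hive (make-hive))
         (ws-id (bootstrap-actor hive <websocket>
                                 #:on-message log-message)))
    (run-hive hive (list (bootstrap-message hive ws-id 'open %url)))))
```

Supporting both means <websocket> needs the url slot as well as the
open action, which is part of why the interface does not feel settled.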

Any help/guidance much appreciated!

Greetings,
janneke

From f10da4a41185fd2c15093e53b80abac80c5a7e4a Mon Sep 17 00:00:00 2001
From: Jan Nieuwenhuizen <address@hidden>
Date: Thu, 18 Apr 2019 07:10:41 +0200
Subject: [PATCH 1/2] doc: Fix initial sleeper.

* doc/8sync.texi (Writing our own actors): Add missing slot to initial
sleeper example.
---
 doc/8sync.texi | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/doc/8sync.texi b/doc/8sync.texi
index 67eba86..7412ea1 100644
--- a/doc/8sync.texi
+++ b/doc/8sync.texi
@@ -441,7 +441,8 @@ How about an actor that start sleeping, and keeps sleeping?
 (define-class <sleeper> (<actor>)
   (actions #:allocation #:each-subclass
            #:init-thunk (build-actions
-                         (*init* sleeper-loop))))
+                         (*init* sleeper-loop)))
+  (sleep-secs #:init-value 1 #:getter sleeper-sleep-secs))
 
 (define (sleeper-loop actor message)
   (while (actor-alive? actor)
-- 
2.20.1

From 3e068e9f89289310a967f3bd747f1b20125313b2 Mon Sep 17 00:00:00 2001
From: Jan Nieuwenhuizen <address@hidden>
Date: Thu, 18 Apr 2019 21:59:04 +0200
Subject: [PATCH 2/2] websocket: Initial <websocket> client actor support.

* 8sync/systems/websocket/client.scm: Initial actor support.
* 8sync/systems/websocket/server.scm (make-websocket-actor): New
function.
(<websocket-server>): Call it for websocket upgrade, create new
<websocket> actor.
(websocket-client-loop): Remove.
* demos/websocket/8s-client.scm: New file.
* demos/websocket/8s-server.scm: New file.
* demos/websocket/ws-client.js: New file, for bootstrapping/debugging.
* demos/websocket/ws-server.js: New file, for bootstrapping/debugging.
* Makefile.am (EXTRA_DIST): Add them.
---
 8sync/systems/websocket/client.scm | 328 +++++++++++++++++++++--------
 8sync/systems/websocket/server.scm | 155 ++++----------
 Makefile.am                        |   6 +-
 demos/websocket/8s-client.scm      |  61 ++++++
 demos/websocket/8s-server.scm      |  76 +++++++
 demos/websocket/node-client.js     |  26 +++
 demos/websocket/ws-client.js       |  44 ++++
 demos/websocket/ws-server.js       |  48 +++++
 8 files changed, 535 insertions(+), 209 deletions(-)
 create mode 100755 demos/websocket/8s-client.scm
 create mode 100755 demos/websocket/8s-server.scm
 create mode 100644 demos/websocket/node-client.js
 create mode 100755 demos/websocket/ws-client.js
 create mode 100755 demos/websocket/ws-server.js

diff --git a/8sync/systems/websocket/client.scm b/8sync/systems/websocket/client.scm
index 86f82ef..3cdf9c6 100644
--- a/8sync/systems/websocket/client.scm
+++ b/8sync/systems/websocket/client.scm
@@ -1,5 +1,7 @@
 ;;; guile-websocket --- WebSocket client/server
 ;;; Copyright © 2016 David Thompson <address@hidden>
+;;; Copyright © 2017 Christopher Allan Webber <address@hidden>
+;;; Copyright © 2019 Jan (janneke) Nieuwenhuizen <address@hidden>
 ;;;
 ;;; This file is part of guile-websocket.
 ;;;
@@ -25,27 +27,160 @@
 
 (define-module (8sync systems websocket client)
   #:use-module (ice-9 match)
+  #:use-module (srfi srfi-26)
   #:use-module (rnrs bytevectors)
   #:use-module (rnrs io ports)
-  #:use-module (srfi srfi-9)
-  #:use-module (srfi srfi-9 gnu)
   #:use-module (web request)
   #:use-module (web response)
   #:use-module (web uri)
+  #:use-module (oop goops)
+  #:use-module (8sync)
+  #:use-module (8sync ports)
   #:use-module (8sync contrib base64)
   #:use-module (8sync systems websocket frame)
   #:use-module (8sync systems websocket utils)
-  #:export (make-websocket
-            websocket?
-            websocket-uri
-            websocket-state
-            websocket-connecting?
-            websocket-open?
-            websocket-closing?
-            websocket-closed?
-            close-websocket
+  #:export (
+            <websocket>
+            ;; make-websocket
+            ;; websocket?
+            ;; websocket-uri
+            .url ;; hmm
+            ;; websocket-state
+            ;; websocket-connecting?
+            ;; websocket-open?
+            ;; websocket-closing?
+            ;; websocket-closed?
+            websocket-close
+            websocket-connect
             websocket-send
-            websocket-receive))
+            websocket-loop
+            .on-close
+            .on-error
+            .on-open
+            .on-message
+            ;; websocket-receive
+            ))
+
+(define no-op (const #f))
+
+(define-actor <websocket> (<actor>)
+  ((*init* websocket-init)
+   (close websocket-close)
+   (open websocket-open)
+   (send websocket-send))
+
+  (state #:accessor .state #:init-value 'closed #:init-keyword #:state)
+  (socket #:accessor .socket #:init-value #f #:init-keyword #:socket)
+  (url #:getter .url #:init-value #f #:init-keyword #:url)
+  (uri #:accessor .uri #:init-value #f #:init-keyword #:uri)
+  (entropy-port #:accessor .entropy-port #:init-form (open-entropy-port))
+
+  (on-close #:init-keyword #:on-close
+                 #:init-value no-op
+                 #:accessor .on-close)
+  (on-error #:init-keyword #:on-error
+            #:init-value no-op
+            #:accessor .on-error)
+  (on-message #:init-keyword #:on-message
+              #:accessor .on-message)
+  (on-open #:init-keyword #:on-open
+                #:init-value no-op
+                #:accessor .on-open))
+
+(define-method (websocket-close (websocket <websocket>) message)
+  (close-port (.socket websocket))
+  (set! (.socket websocket) #f)
+  (set! (.state websocket) 'closed)
+  ((.on-close websocket) websocket))
+
+(define-method (websocket-open (websocket <websocket>) message uri-or-string)
+  (if (eq? (.state websocket) 'closed)
+      (let ((uri (match uri-or-string
+                   ((? uri? uri) uri)
+                   ((? string? str) (string->uri str)))))
+        (if (websocket-uri? uri)
+            (catch 'system-error
+              (lambda _
+                (set! (.uri websocket) uri)
+                (let ((sock (make-client-socket uri)))
+                  (set! (.socket websocket) sock)
+                  (handshake websocket)
+                  (websocket-loop websocket message)))
+              (lambda (key . args)
+                ((.on-error websocket) websocket (format #f "open failed: ~s: ~s" uri-or-string args))))
+            ((.on-error websocket) websocket (format #f "not a websocket uri: ~s" uri-or-string))))
+      ((.on-error websocket) websocket (format #f "cannot open websocket in state: ~s" (.state websocket)))))
+
+(define-method (websocket-send (websocket <websocket>) message data)
+  (catch 'system-error
+    (lambda _
+      (write-frame
+       (cond ((string? data)
+              (make-text-frame data))
+             ((bytevector? data)
+              (make-binary-frame data)))
+       (.socket websocket)))
+    (lambda (key . args)
+      ((.on-error websocket) websocket (format #f "send failed: ~s" args)))))
+
+(define-method (websocket-init (websocket <websocket>) message)
+  (and=> (.url websocket) (cut websocket-open websocket message <>)))
+
+(define-method (websocket-loop (websocket <websocket>) message)
+
+  (define (handle-data-frame type data)
+    ((.on-message websocket)
+     websocket
+     (match type
+       ('text   (utf8->string data))
+       ('binary data))))
+
+  (define (read-frame-maybe)
+    (and (not (eof-object? (lookahead-u8 (.socket websocket))))
+         (read-frame (.socket websocket))))
+
+  (define (close-down)
+    (websocket-close websocket message))
+
+  ((.on-open websocket) websocket)
+
+  (let loop ((fragments '())
+             (type #f))
+    (let* ((socket (.socket websocket))
+           (frame (and (eq? (.state websocket) 'open)
+                       (read-frame-maybe))))
+      (cond
+       ;; EOF - port is closed.
+       ;; @@: Sometimes the eof object appears here as opposed to
+       ;;   at lookahead, but I'm not sure why
+       ((or (not frame) (eof-object? frame))
+        (close-down))
+       ;; Per section 5.4, control frames may appear interspersed
+       ;; along with a fragmented message.
+       ((close-frame? frame)
+        ;; Per section 5.5.1, echo the close frame back to the
+        ;; socket before closing the socket.  The socket may no
+        ;; longer be listening.
+        (false-if-exception
+         (write-frame (make-close-frame (frame-data frame)) socket))
+        (close-down))
+       ((ping-frame? frame)
+        ;; Per section 5.5.3, a pong frame must include the exact
+        ;; same data as the ping frame.
+        (write-frame (make-pong-frame (frame-data frame)) socket)
+        (loop fragments type))
+       ((pong-frame? frame)           ; silently ignore pongs
+        (loop fragments type))
+       ((first-fragment-frame? frame) ; begin accumulating fragments
+        (loop (list frame) (frame-type frame)))
+       ((final-fragment-frame? frame) ; concatenate all fragments
+        (handle-data-frame type (frame-concatenate (reverse fragments)))
+        (loop '() #f))
+       ((fragment-frame? frame)       ; add a fragment
+        (loop (cons frame fragments) type))
+       ((data-frame? frame)           ; unfragmented data frame
+        (handle-data-frame (frame-type frame) (frame-data frame))
+        (loop '() #f))))))
 
 ;; See Section 3 - WebSocket URIs
 (define (encrypted-websocket-scheme? uri)
@@ -64,6 +199,10 @@ scheme."
            (unencrypted-websocket-scheme? uri))
        (not (uri-fragment uri))))
 
+(define (set-nonblocking! port)
+  (fcntl port F_SETFL (logior O_NONBLOCK (fcntl port F_GETFL)))
+  (setvbuf port 'block 1024))
+
 (define (make-client-socket uri)
   "Connect a socket to the remote resource described by URI."
   (let* ((port (uri-port uri))
@@ -74,48 +213,53 @@ scheme."
                                  (if port
                                      AI_NUMERICSERV
                                      0))))
-         (s (with-fluids ((%default-port-encoding #f))
-              (socket (addrinfo:fam info) SOCK_STREAM IPPROTO_IP))))
+         (sock (with-fluids ((%default-port-encoding #f))
+                 (socket (addrinfo:fam info) SOCK_STREAM IPPROTO_IP))))
+
+    (set-nonblocking! sock)
+    ;; Disable buffering for websockets
+    (setvbuf sock 'none)
+
     ;; TODO: Configure I/O buffering?
-    (connect s (addrinfo:addr info))
-    s))
+    (connect sock (addrinfo:addr info))
+    sock))
 
-(define-record-type <websocket>
-  (%make-websocket uri socket entropy-port state)
-  websocket?
-  (uri websocket-uri)
-  (socket websocket-socket)
-  (entropy-port websocket-entropy-port)
-  (state websocket-state set-websocket-state!))
+;; (define-record-type <websocket>
+;;   (%make-websocket uri socket entropy-port state)
+;;   websocket?
+;;   (uri websocket-uri)
+;;   (socket websocket-socket)
+;;   (entropy-port websocket-entropy-port)
+;;   (state websocket-state set-websocket-state!))
 
-(define (display-websocket ws port)
-  (format port "#<websocket ~a ~a>"
-          (uri->string (websocket-uri ws))
-          (websocket-state ws)))
+;; (define (display-websocket ws port)
+;;   (format port "#<websocket ~a ~a>"
+;;           (uri->string (websocket-uri ws))
+;;           (websocket-state ws)))
 
-(set-record-type-printer! <websocket> display-websocket)
+;; (set-record-type-printer! <websocket> display-websocket)
 
-(define (websocket-connecting? ws)
-  "Return #t if the WebSocket WS is in the connecting state."
-  (eq? (websocket-state ws) 'connecting))
+;; (define (websocket-connecting? ws)
+;;   "Return #t if the WebSocket WS is in the connecting state."
+;;   (eq? (websocket-state ws) 'connecting))
 
-(define (websocket-open? ws)
-  "Return #t if the WebSocket WS is in the open state."
-  (eq? (websocket-state ws) 'open))
+;; (define (websocket-open? ws)
+;;   "Return #t if the WebSocket WS is in the open state."
+;;   (eq? (websocket-state ws) 'open))
 
-(define (websocket-closing? ws)
-  "Return #t if the WebSocket WS is in the closing state."
-  (eq? (websocket-state ws) 'closing))
+;; (define (websocket-closing? ws)
+;;   "Return #t if the WebSocket WS is in the closing state."
+;;   (eq? (websocket-state ws) 'closing))
 
-(define (websocket-closed? ws)
-  "Return #t if the WebSocket WS is in the closed state."
-  (eq? (websocket-state ws) 'closed))
+;; (define (websocket-closed? ws)
+;;   "Return #t if the WebSocket WS is in the closed state."
+;;   (eq? (websocket-state ws) 'closed))
 
-(define (generate-client-key ws)
+(define (generate-client-key websocket)
   "Return a random, base64 encoded nonce using the entropy source of
 WS."
   (base64-encode
-   (get-bytevector-n (websocket-entropy-port ws) 16)))
+   (get-bytevector-n (.entropy-port websocket) 16)))
 
 ;; See Section 4.1 - Client Requirements
 (define (make-handshake-request uri key)
@@ -129,12 +273,12 @@ KEY."
                    (sec-websocket-version . "13"))))
     (build-request uri #:method 'GET #:headers headers)))
 
-(define (handshake ws)
-  "Perform the WebSocket handshake for the client WS."
-  (let ((key (generate-client-key ws)))
-    (write-request (make-handshake-request (websocket-uri ws) key)
-                   (websocket-socket ws))
-    (let* ((response (read-response (websocket-socket ws)))
+(define (handshake websocket)
+  "Perform the WebSocket handshake for the client WEBSOCKET."
+  (let ((key (generate-client-key websocket)))
+    (write-request (make-handshake-request (.uri websocket) key)
+                   (.socket websocket))
+    (let* ((response (read-response (.socket websocket)))
            (headers (response-headers response))
            (upgrade (assoc-ref headers 'upgrade))
            (connection (assoc-ref headers 'connection))
@@ -144,10 +288,12 @@ KEY."
                (string-ci=? (car upgrade) "websocket")
                (equal? connection '(upgrade))
                (string=? (string-trim-both accept) (make-accept-key key)))
-          (set-websocket-state! ws 'open)
+          (set! (.state websocket) 'open)
           (begin
-            (close-websocket ws)
-            (error "websocket handshake failed" (websocket-uri ws)))))))
+            (websocket-close websocket)
+            ((.on-error websocket) websocket
+             (format #f "websocket handshake failed: ~s"
+                     (uri->string (.uri websocket)))))))))
 
 (define (open-entropy-port)
   "Return an open input port to a reliable source of entropy for the
@@ -156,25 +302,25 @@ current system."
   ;; exactly portable.
   (open-input-file "/dev/urandom"))
 
-(define (make-websocket uri-or-string)
-  "Create and establish a new WebSocket connection for the remote
-resource described by URI-OR-STRING."
-  (let ((uri (match uri-or-string
-               ((? uri? uri) uri)
-               ((? string? str) (string->uri str)))))
-    (if (websocket-uri? uri)
-        (let ((ws (%make-websocket uri
-                                   (make-client-socket uri)
-                                   (open-entropy-port)
-                                   'connecting)))
-          (handshake ws)
-          ws)
-        (error "not a websocket uri" uri))))
-
-(define (close-websocket ws)
-  "Close the WebSocket connection for the client WS."
-  (let ((socket (websocket-socket ws)))
-    (set-websocket-state! ws 'closing)
+;; (define (make-websocket uri-or-string)
+;;   "Create and establish a new WebSocket connection for the remote
+;; resource described by URI-OR-STRING."
+;;   (let ((uri (match uri-or-string
+;;                ((? uri? uri) uri)
+;;                ((? string? str) (string->uri str)))))
+;;     (if (websocket-uri? uri)
+;;         (let ((ws (%make-websocket uri
+;;                                    (make-client-socket uri)
+;;                                    (open-entropy-port)
+;;                                    'connecting)))
+;;           (handshake ws)
+;;           ws)
+;;         (error "not a websocket uri" uri))))
+
+(define-method (websocket-close (websocket <websocket>))
+  "Close the WebSocket connection for the client WEBSOCKET."
+  (let ((socket (.socket websocket)))
+    (set! (.state websocket) 'closing)
     (write-frame (make-close-frame (make-bytevector 0)) socket)
     ;; Per section 5.5.1 , wait for the server to close the connection
     ;; for a reasonable amount of time.
@@ -185,26 +331,26 @@ resource described by URI-OR-STRING."
            (read-frame socket) ; throw it away
            (loop)))))
     (close-port socket)
-    (close-port (websocket-entropy-port ws))
-    (set-websocket-state! ws 'closed)
+    (close-port (.entropy-port websocket))
+    (set! (.state websocket) 'closed)
     *unspecified*))
 
-(define (generate-masking-key ws)
-  "Create a new masking key using the entropy source of WS."
-  ;; Masking keys are 32 bits long.
-  (get-bytevector-n (websocket-entropy-port ws) 4))
-
-(define (websocket-send ws data)
-  "Send DATA, a string or bytevector, to the server that WS is
-connected to."
-  ;; TODO: Send frames over some threshold in fragments.
-  (write-frame (make-text-frame data (generate-masking-key ws))
-               (websocket-socket ws)))
-
-(define (websocket-receive ws)
-  "Read a response from the server that WS is connected to."
-  ;; TODO: Handle fragmented frames and control frames.
-  (let ((frame (read-frame (websocket-socket ws))))
-    (if (binary-frame? frame)
-        (frame-data frame)
-        (text-frame->string frame))))
+;; (define (generate-masking-key ws)
+;;   "Create a new masking key using the entropy source of WS."
+;;   ;; Masking keys are 32 bits long.
+;;   (get-bytevector-n (websocket-entropy-port ws) 4))
+
+;; (define (websocket-send ws data)
+;;   "Send DATA, a string or bytevector, to the server that WS is
+;; connected to."
+;;   ;; TODO: Send frames over some threshold in fragments.
+;;   (write-frame (make-text-frame data (generate-masking-key ws))
+;;                (websocket-socket ws)))
+
+;; (define (websocket-receive ws)
+;;   "Read a response from the server that WS is connected to."
+;;   ;; TODO: Handle fragmented frames and control frames.
+;;   (let ((frame (read-frame (websocket-socket ws))))
+;;     (if (binary-frame? frame)
+;;         (frame-data frame)
+;;         (text-frame->string frame))))
diff --git a/8sync/systems/websocket/server.scm b/8sync/systems/websocket/server.scm
index 6283255..a1da790 100644
--- a/8sync/systems/websocket/server.scm
+++ b/8sync/systems/websocket/server.scm
@@ -1,6 +1,7 @@
 ;;; guile-websocket --- WebSocket client/server
 ;;; Copyright © 2015 David Thompson <address@hidden>
 ;;; Copyright © 2017 Christopher Allan Webber <address@hidden>
+;;; Copyright © 2019 Jan (janneke) Nieuwenhuizen <address@hidden>
 ;;;
 ;;; This file is part of guile-websocket.
 ;;;
@@ -35,17 +36,10 @@
   #:use-module (8sync)
   #:use-module (8sync ports)
   #:use-module (8sync systems web)
+  #:use-module (8sync systems websocket client)
   #:use-module (8sync systems websocket frame)
   #:use-module (8sync systems websocket utils)
-  #:export (<websocket-server>
-            .websocket-handler))
-
-;; See section 4.2 for explanation of the handshake.
-(define (read-handshake-request client-socket)
-  "Read HTTP request from CLIENT-SOCKET that should contain the
-headers required for a WebSocket handshake."
-  ;; See section 4.2.1.
-  (read-request client-socket))
+  #:export (<websocket-server>))
 
 (define (make-handshake-response client-key)
   "Return an HTTP response object for upgrading to a WebSocket
@@ -60,123 +54,50 @@ string."
 
 (define no-op (const #f))
 
-(define (make-simple-counter)
-  (let ((count 0))
-    (lambda ()
-      (set! count (1+ count))
-      count)))
-
 (define-actor <websocket-server> (<web-server>)
-  ((ws-send websocket-server-send))
+  ()
   (upgrade-paths #:init-value `(("websocket" .
-                                 ,(wrap-apply websocket-client-loop)))
+                                 ,(wrap-apply make-websocket-actor)))
                  #:allocation #:each-subclass
                  #:accessor .upgrade-paths)
 
-  (gen-client-id #:init-thunk make-simple-counter)
-
-  ;; active websocket connections
-  (ws-clients #:init-thunk make-hash-table
-              #:accessor .ws-clients)
-
-  (on-ws-message #:init-keyword #:on-ws-message 
-                 #:getter .on-ws-message)
-  (on-ws-client-connect #:init-keyword #:on-ws-client-connect
-                        #:init-value no-op
-                        #:getter .on-ws-client-connect)
-  (on-ws-client-disconnect #:init-keyword #:on-ws-client-disconnect
-                           #:init-value no-op
-                           #:getter .on-ws-client-disconnect))
-
-(define (web-server-gen-client-id websocket-server)
-  ((slot-ref websocket-server 'gen-client-id)))
-
-(define (websocket-client-loop websocket-server client request body)
-  "Serve client connected via CLIENT by performing the HTTP
-handshake and listening for control and data frames.  HANDLER is
-called for each complete message that is received."
-  ;; TODO: We'll also want to handle stuff like the sub-protocol.
-  (define (handle-data-frame type data)
-    ((.on-ws-message websocket-server)
-     websocket-server client-id
-     (match type
-       ('text   (utf8->string data))
-       ('binary data))))
-
-  (define (read-frame-maybe)
-    (and (not (eof-object? (lookahead-u8 client)))
-         (read-frame client)))
-
-  ;; Allows other actors to send things to this specific client
-  ;; @@: We probably could just increment a counter...
-  (define client-id (web-server-gen-client-id websocket-server))
-
-  (define (close-down)
-    (close-port client)
-    (hash-remove! (.ws-clients websocket-server) client-id)
-    ((.on-ws-client-disconnect websocket-server)
-     websocket-server client-id))
-
-  (hash-set! (.ws-clients websocket-server) client-id client)
+  (on-ws-connection #:init-keyword #:on-ws-connection
+                    #:init-value no-op
+                    #:getter .on-ws-connection)
+
+  (on-ws-close #:init-keyword #:on-ws-close
+                    #:init-value no-op
+                    #:getter .on-ws-close)
+  (on-ws-error #:init-keyword #:on-ws-error
+                    #:init-value no-op
+                    #:getter .on-ws-error)
+  (on-ws-message #:init-keyword #:on-ws-message
+                    #:init-value no-op
+                    #:getter .on-ws-message)
+  (on-ws-open #:init-keyword #:on-ws-open
+                    #:init-value no-op
+                    #:getter .on-ws-open))
+
+(define (make-websocket-actor websocket-server client request body)
+  "Setup websocket actor connected via CLIENT by performing the HTTP
+handshake."
 
   ;; Disable buffering for websockets
   (setvbuf client 'none)
 
-  ((.on-ws-client-connect websocket-server)
-   websocket-server client-id)
-
   ;; Perform the HTTP handshake and upgrade to WebSocket protocol.
   (let* ((client-key (assoc-ref (request-headers request) 'sec-websocket-key))
          (response (make-handshake-response client-key)))
-    (write-response response client)
-    (let loop ((fragments '())
-               (type #f))
-      (let ((frame (read-frame-maybe)))
-        (cond
-         ;; EOF - port is closed.
-         ;; @@: Sometimes the eof object appears here as opposed to
-         ;;   at lookahead, but I'm not sure why
-         ((or (not frame) (eof-object? frame))
-          (close-down))
-         ;; Per section 5.4, control frames may appear interspersed
-         ;; along with a fragmented message.
-         ((close-frame? frame)
-          ;; Per section 5.5.1, echo the close frame back to the
-          ;; client before closing the socket.  The client may no
-          ;; longer be listening.
-          (false-if-exception
-           (write-frame (make-close-frame (frame-data frame)) client))
-          (close-down))
-         ((ping-frame? frame)
-          ;; Per section 5.5.3, a pong frame must include the exact
-          ;; same data as the ping frame.
-          (write-frame (make-pong-frame (frame-data frame)) client)
-          (loop fragments type))
-         ((pong-frame? frame) ; silently ignore pongs
-          (loop fragments type))
-         ((first-fragment-frame? frame) ; begin accumulating fragments
-          (loop (list frame) (frame-type frame)))
-         ((final-fragment-frame? frame) ; concatenate all fragments
-          (handle-data-frame type (frame-concatenate (reverse fragments)))
-          (loop '() #f))
-         ((fragment-frame? frame) ; add a fragment
-          (loop (cons frame fragments) type))
-         ((data-frame? frame) ; unfragmented data frame
-          (handle-data-frame (frame-type frame) (frame-data frame))
-          (loop '() #f)))))))
-
-(define (websocket-server-send websocket-server message client-id data)
-  (cond ((hash-ref (.ws-clients websocket-server) client-id) =>
-         (lambda (client)
-           (write-frame
-            (cond ((string? data)
-                   (make-text-frame data))
-                  ((bytevector? data)
-                   (make-binary-frame data)))
-            client)
-           ;; ok is like success, amirite
-           (<-reply message 'ok)))
-        (else
-         ;; No such client with that id.
-         ;; Either it closed, or it was never there.
-         (<-reply message 'client-gone))))
+    (write-response response client))
+
+  (let* ((websocket-id (create-actor websocket-server <websocket>
+                                     #:socket client
+                                     #:state 'open
+                                     #:on-close (.on-ws-close websocket-server)
+                                     #:on-error (.on-ws-error websocket-server)
+                                     #:on-message (.on-ws-message websocket-server)
+                                     #:on-open (.on-ws-open websocket-server)))
+         (hive ((@@ (8sync actors) actor-hive) websocket-server))
+         (websocket ((@@ (8sync actors) hive-resolve-local-actor) hive websocket-id)))
+    ((.on-ws-connection websocket-server) websocket)
+    (websocket-loop websocket 'message)))
diff --git a/Makefile.am b/Makefile.am
index 926d755..79cf325 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -91,7 +91,11 @@ EXTRA_DIST =                                  \
        demos/ircbot.scm                                \
        demos/actors/botherbotherbother.scm             \
        demos/actors/simplest-possible.scm              \
-       demos/actors/robotscanner.scm
+       demos/actors/robotscanner.scm                   \
+       demos/websocket/8s-client.scm                   \
+       demos/websocket/8s-server.scm                   \
+       demos/websocket/ws-client.js                    \
+       demos/websocket/ws-server.js
 
 
 ## Make changelog on demand
diff --git a/demos/websocket/8s-client.scm b/demos/websocket/8s-client.scm
new file mode 100755
index 0000000..09fe137
--- /dev/null
+++ b/demos/websocket/8s-client.scm
@@ -0,0 +1,61 @@
+#! /usr/bin/env guile
+# -*-scheme-*-
+!#
+
+;;; 8sync --- Asynchronous programming for Guile
+;;; Copyright © 2019 Jan (janneke) Nieuwenhuizen <address@hidden>
+;;;
+;;; This file is part of 8sync.
+;;;
+;;; 8sync is free software: you can redistribute it and/or modify it
+;;; under the terms of the GNU Lesser General Public License as
+;;; published by the Free Software Foundation, either version 3 of the
+;;; License, or (at your option) any later version.
+;;;
+;;; 8sync is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+;;; GNU Lesser General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU Lesser General Public
+;;; License along with 8sync.  If not, see <http://www.gnu.org/licenses/>.
+
+(define-module (8s-client)
+  #:use-module (oop goops)
+  #:use-module (8sync)
+  #:use-module (8sync systems websocket client)
+
+  #:export (main))
+
+(define %default-server "ws://localhost:1236")
+
+(define-actor <sleeper> (<actor>)
+  ((*init* sleeper-loop))
+  (sleep-secs #:init-value 3 #:getter sleeper-sleep-secs))
+
+(define (sleeper-loop actor message)
+  (while (actor-alive? actor)
+    (display "Zzzzzzzz....\n")
+    ;; Sleep for a bit
+    (8sleep (sleeper-sleep-secs actor))))
+
+(define (main . args)
+  (let* ((hive (make-hive))
+         (sleeper (bootstrap-actor hive <sleeper>))
+         (websocket-id (bootstrap-actor hive <websocket>
+                                        #:url %default-server ;; toggle open with url/with message
+                                        #:on-close (lambda (ws)
+                                                     (format (current-error-port) "on-close: ~s\n" ws))
+                                        #:on-error (lambda (ws e)
+                                                     (format (current-error-port) "on-error: ~s:~s\n" ws e))
+                                        #:on-message (lambda (ws msg)
+                                                       (format (current-error-port) "on-message: ~s:~s\n" ws msg))
+                                        #:on-open (lambda (ws)
+                                                    (format (current-error-port) "on-open: ~s\n" ws)
+                                                    (websocket-send ws 'message "Hello, Web Socket!"))))
+         (websocket ((@@ (8sync actors) hive-resolve-local-actor) hive websocket-id)))
+    (if (.url websocket)
+        (run-hive hive '())
+        (run-hive hive (list (bootstrap-message hive websocket-id 'open %default-server))))))
+
+(main (command-line))
diff --git a/demos/websocket/8s-server.scm b/demos/websocket/8s-server.scm
new file mode 100755
index 0000000..45e62ba
--- /dev/null
+++ b/demos/websocket/8s-server.scm
@@ -0,0 +1,76 @@
+#! /usr/bin/env guile
+# -*-scheme-*-
+!#
+
+;;; 8sync --- Asynchronous programming for Guile
+;;; Copyright © 2019 Jan (janneke) Nieuwenhuizen <address@hidden>
+;;;
+;;; This file is part of 8sync.
+;;;
+;;; 8sync is free software: you can redistribute it and/or modify it
+;;; under the terms of the GNU Lesser General Public License as
+;;; published by the Free Software Foundation, either version 3 of the
+;;; License, or (at your option) any later version.
+;;;
+;;; 8sync is distributed in the hope that it will be useful,
+;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+;;; GNU Lesser General Public License for more details.
+;;;
+;;; You should have received a copy of the GNU Lesser General Public
+;;; License along with 8sync.  If not, see <http://www.gnu.org/licenses/>.
+
+(define-module (8s-server)
+  #:use-module (oop goops)
+  #:use-module (8sync)
+  #:use-module (8sync systems web)
+  #:use-module (8sync systems websocket client)
+  #:use-module (8sync systems websocket server)
+  #:export (main))
+
+(define %server-port 1236)
+
+(define-actor <sleeper> (<actor>)
+  ((*init* sleeper-loop))
+  (sleep-secs #:init-value 3 #:getter sleeper-sleep-secs))
+
+(define (sleeper-loop actor message)
+  (while (actor-alive? actor)
+    (display "Zzzzzzzz....\n")
+    ;; Sleep for a bit
+    (8sleep (sleeper-sleep-secs actor))))
+
+(define (main . args)
+  (let* ((hive (make-hive))
+         (sleeper (bootstrap-actor hive <sleeper>))
+         (server (bootstrap-actor
+                  hive <websocket-server>
+                  #:port %server-port
+                  #:on-ws-connection
+                  (lambda (ws)
+                    (format (current-error-port) "on-ws-connection: ws=~s\n" ws)
+                    (when #f
+                      (set! (.on-close ws)
+                           (lambda (ws)
+                             (format (current-error-port) "on-close: ~s\n" ws)))
+                      (set! (.on-error ws)
+                            (lambda (ws e)
+                              (format (current-error-port) "on-error: ~s, ~s\n" ws e)))
+                      (set! (.on-message ws)
+                            (lambda (ws msg)
+                              (format (current-error-port) "on-message: ~s\n" msg)))
+                      (set! (.on-open ws)
+                            (lambda (ws)
+                              (format (current-error-port) "on-open: ~s\n" ws)))))
+                  #:on-ws-close (lambda (ws)
+                                 (format (current-error-port) "on-close: ~s\n" ws))
+                  #:on-ws-error (lambda (ws e)
+                                  (format (current-error-port) "on-error: ~s: ~s\n" ws e))
+                  #:on-ws-message (lambda (ws msg)
+                                    (format (current-error-port) "on-message: ~s: ~s\n" ws msg))
+                  #:on-ws-open (lambda (ws)
+                                 (format (current-error-port) "on-open: ~s\n" ws)))))
+    (format (current-error-port) "listening: ~s\n" %server-port)
+    (run-hive hive '())))
+
+(main (command-line))
diff --git a/demos/websocket/node-client.js b/demos/websocket/node-client.js
new file mode 100644
index 0000000..401d3b7
--- /dev/null
+++ b/demos/websocket/node-client.js
@@ -0,0 +1,26 @@
+#! /usr/bin/env node
+
+var default_server = 'ws://localhost:1236';
+
+// npm install -g ws
+function main () {
+  var ws = new (require ('ws')) (default_server);
+  ws.onopen = function () {
+    console.log ('ws.onopen');
+    process.nextTick (function () {
+      ws.send ('Hello Web Socket');
+    });
+  }
+  ws.onerror = function (e) {
+    console.log ('ws.onerror: e=%s', '' + e);
+  }
+  ws.onclose = function () {
+    console.log ('ws.onclose');
+  }
+  ws.onmessage = function (event) {
+    var msg = event.data;
+    console.log ('ws.onmessage: msg=%s', msg);
+  };
+}
+
+main ();
diff --git a/demos/websocket/ws-client.js b/demos/websocket/ws-client.js
new file mode 100755
index 0000000..134fa01
--- /dev/null
+++ b/demos/websocket/ws-client.js
@@ -0,0 +1,44 @@
+#! /usr/bin/env node
+
+/// 8sync --- Asynchronous programming for Guile
+/// Copyright © 2019 Jan (janneke) Nieuwenhuizen <address@hidden>
+///
+/// This file is part of 8sync.
+///
+/// 8sync is free software: you can redistribute it and/or modify it
+/// under the terms of the GNU Lesser General Public License as
+/// published by the Free Software Foundation, either version 3 of the
+/// License, or (at your option) any later version.
+///
+/// 8sync is distributed in the hope that it will be useful,
+/// but WITHOUT ANY WARRANTY; without even the implied warranty of
+/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+/// GNU Lesser General Public License for more details.
+///
+/// You should have received a copy of the GNU Lesser General Public
+/// License along with 8sync.  If not, see <http://www.gnu.org/licenses/>.
+
+var default_server = 'ws://localhost:1236';
+
+// npm install -g ws
+function main () {
+  var ws = new (require ('ws')) (default_server);
+  ws.onopen = function () {
+    console.log ('ws.onopen');
+    process.nextTick (function () {
+      ws.send ('Hello Web Socket');
+    });
+  }
+  ws.onerror = function (e) {
+    console.log ('ws.onerror: e=%s', '' + e);
+  }
+  ws.onclose = function () {
+    console.log ('ws.onclose');
+  }
+  ws.onmessage = function (event) {
+    var msg = event.data;
+    console.log ('ws.onmessage: msg=%s', msg);
+  };
+}
+
+main ();
diff --git a/demos/websocket/ws-server.js b/demos/websocket/ws-server.js
new file mode 100755
index 0000000..47a4209
--- /dev/null
+++ b/demos/websocket/ws-server.js
@@ -0,0 +1,48 @@
+#! /usr/bin/env node
+
+/// 8sync --- Asynchronous programming for Guile
+/// Copyright © 2019 Jan (janneke) Nieuwenhuizen <address@hidden>
+///
+/// This file is part of 8sync.
+///
+/// 8sync is free software: you can redistribute it and/or modify it
+/// under the terms of the GNU Lesser General Public License as
+/// published by the Free Software Foundation, either version 3 of the
+/// License, or (at your option) any later version.
+///
+/// 8sync is distributed in the hope that it will be useful,
+/// but WITHOUT ANY WARRANTY; without even the implied warranty of
+/// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+/// GNU Lesser General Public License for more details.
+///
+/// You should have received a copy of the GNU Lesser General Public
+/// License along with 8sync.  If not, see <http://www.gnu.org/licenses/>.
+
+var server_port = 1236;
+
+// npm install -g ws
+function main () {
+  var wss = new (require ('ws').Server) ({port:server_port});
+  console.log ('listening: %s', server_port);
+  wss.on ('connection', function (ws) {
+    ws.onopen = function () {
+      console.log ('ws.onopen');
+      // process.nextTick (function () {
+      //   ws.send ('Hello Web Socket');
+      //   ws.close ();
+      // });
+    }
+    ws.onerror = function (e) {
+      console.log ('ws.onerror: e=%s', '' + e);
+    }
+    ws.onclose = function () {
+      console.log ('ws.onclose');
+    }
+    ws.onmessage = function (event) {
+      var msg = event.data;
+      console.log ('ws.onmessage: msg=%s', msg);
+    };
+  });
+}
+
+main ();
-- 
2.20.1

-- 
Jan Nieuwenhuizen <address@hidden> | GNU LilyPond http://lilypond.org
Freelance IT http://JoyofSource.com | Avatar® http://AvatarAcademy.com
