;; Copyright (C) 2015 Amirouche Boubekki

;; This library is free software; you can redistribute it and/or
;; modify it under the terms of the GNU Lesser General Public
;; License as published by the Free Software Foundation; either
;; version 3 of the License, or (at your option) any later version.
;;
;; This library is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
;; Lesser General Public License for more details.
;;
;; You should have received a copy of the GNU Lesser General Public
;; License along with this library; if not, write to the Free Software
;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
;; 02110-1301 USA
(define-module (async))

(use-modules (rnrs bytevectors))
(use-modules (srfi srfi-69)) ;; hash-table
(use-modules (ice-9 q))
(use-modules (ice-9 match))
(use-modules (ice-9 rdelim))
(use-modules (ice-9 binary-ports))
(use-modules (srfi srfi-9))

;; srfi-queue :o)
;; Queue vocabulary used throughout this module, backed by (ice-9 q).
(define make-queue make-q)
(define push! enq!)
(define pop! deq!)
(define empty? q-empty?)

;;; event loop state

;; RUNNING is the flag tested by `loop-run-forever'.
;; TASKS is a queue of thunks executed by `loop-run-once'.
;; READERS and WRITERS are hash tables mapping a port to a queue of
;; zero-argument callbacks.
(define-record-type <loop>
  (make-loop running tasks readers writers)
  loop?
  (running loop-running loop-running!)
  (tasks loop-tasks loop-tasks!)
  (readers loop-readers loop-readers!)
  (writers loop-writers loop-writers!))

;; Default loop instance; it is the initial value of the `*loop*' fluid.
(define loop (make-loop #false (make-queue) (make-hash-table) (make-hash-table)))

;; Fluid holding the loop that every procedure below operates on.
(define-public *loop* (make-fluid loop))

;; Append THUNK to the task queue of the current loop.  Returns #true.
(define-public (loop-call-later thunk)
  ;; TODO: support delay
  (let ((loop (fluid-ref *loop*)))
    (push! (loop-tasks loop) thunk)
    #true))

;; Append CALLBACK to the queue stored under PORT in TABLE, creating
;; the queue when PORT has no entry yet.
(define (register-callback! table port callback)
  (let ((queue (hash-table-ref table port make-queue)))
    (push! queue callback)
    (hash-table-set! table port queue)))

;; Pop and call the oldest callback stored under PORT in TABLE.  The
;; entry is removed before the callback runs when the pop drained the
;; queue.  Does nothing when PORT has no entry or its queue is empty.
(define (dispatch-callback! table port)
  ;; `hash-table-ref' without a thunk signals an error on a missing
  ;; key; the /default variant lets the guard below handle that case.
  (let ((queue (hash-table-ref/default table port #f)))
    (when (and queue (not (empty? queue)))
      (let ((callback (pop! queue)))
        (when (empty? queue)
          (hash-table-delete! table port))
        (callback)))))

;; Register CALLBACK to be called by the current loop when `select'
;; reports PORT as ready for reading.
(define (loop-add-reader port callback)
  (register-callback! (loop-readers (fluid-ref *loop*)) port callback))

;; Register CALLBACK to be called by the current loop when `select'
;; reports PORT as ready for writing.
(define (loop-add-writer port callback)
  (register-callback! (loop-writers (fluid-ref *loop*)) port callback))

;; Run the next pending read callback of PORT, if any.
(define (call-read-callback port)
  (dispatch-callback! (loop-readers (fluid-ref *loop*)) port))

;; Run the next pending write callback of PORT, if any.
(define (call-write-callback port)
  (dispatch-callback! (loop-writers (fluid-ref *loop*)) port))

;; Perform one iteration of the current loop: poll the registered
;; ports, run one callback per ready port, then drain the task queue.
(define (loop-run-once)
  ;; `let*' is required: READERS and WRITERS must be taken from the
  ;; loop bound to the fluid, not from the module-level `loop'.
  (let* ((loop (fluid-ref *loop*))
         (readers (hash-table-keys (loop-readers loop)))
         (writers (hash-table-keys (loop-writers loop))))
    ;; first select ready ports
    (match (select readers writers '() 0) ;; FIXME: replace 0 with time for next task
      ((to-read to-write _)
       (for-each call-read-callback to-read)
       (for-each call-write-callback to-write)))
    ;; execute tasks
    (while (not (empty? (loop-tasks loop)))
      ((pop! (loop-tasks loop))))))

;; Mark the current loop as running and iterate until the running flag
;; is cleared (see `loop-stop!').  Each iteration runs inside a prompt
;; tagged 'loop; an abort to that prompt carries a procedure which is
;; called with the captured continuation.
(define-public (loop-run-forever)
  (let ((loop (fluid-ref *loop*)))
    (loop-running! loop #true)
    (while (loop-running loop)
      (call-with-prompt 'loop
        loop-run-once
        (lambda (cc callback) (callback cc))))))

;; Return the running flag of the current loop.
(define-public (loop-running?)
  (loop-running (fluid-ref *loop*)))

;; Clear the running flag of the current loop.
(define-public (loop-stop!)
  (loop-running! (fluid-ref *loop*) #false))

;; non blocking accept
;; Abort to the loop and resume with the result of `(accept port)'
;; once PORT is reported readable.
(define-public (accept/ port)
  (abort-to-prompt 'loop
    (lambda (cc)
      (loop-add-reader port (lambda () (cc (accept port)))))))

;;; recv/ and send/

;; recv/
;; Read bytes from PORT for as long as `char-ready?' holds and return
;; them as a list in arrival order.  Reading stops at end of file.
(define (%recv/ port)
  (let next ((out '()))
    (if (char-ready? port)
        (let ((byte (get-u8 port)))
          (if (eof-object? byte)
              (reverse out)
              (next (cons byte out))))
        (reverse out))))

;; Abort to the loop and resume with the list returned by `%recv/'
;; once PORT is reported readable.
(define-public (recv/ port)
  (abort-to-prompt 'loop
    (lambda (cc)
      (loop-add-reader port (lambda () (cc (%recv/ port)))))))

;; Return a fresh bytevector holding BV without its first COUNT bytes.
(define (bv-drop bv count)
  (let* ((remaining (- (bytevector-length bv) count))
         (out (make-bytevector remaining)))
    (bytevector-copy! bv count out 0 remaining)
    out))

;; send/
;; Send MESSAGE (a bytevector) on PORT.  After a partial send the rest
;; is rescheduled as a writer callback; CC is called with no argument
;; once nothing is left to send.
(define (%send/ port message cc)
  (let retry ((message message))
    (let* ((count (send port message))
           (message (bv-drop message count)))
      (if (zero? (bytevector-length message))
          (cc)
          (loop-add-writer port (lambda () (retry message)))))))

;; Abort to the loop and resume once MESSAGE has been sent on PORT.
(define-public (send/ port message)
  (abort-to-prompt 'loop
    (lambda (cc)
      (loop-add-writer port (lambda () (%send/ port message cc))))))

;; XXX: write replacement for those blocking procedures

;; Abort to the loop and resume with the result of `(read socket)'
;; once SOCKET is reported readable.
(define-public (read/ socket)
  (abort-to-prompt 'loop
    (lambda (cc)
      (loop-add-reader socket (lambda () (cc (read socket)))))))

;; Abort to the loop and resume with the result of
;; `(write message socket)' once SOCKET is reported writable.
(define-public (write/ message socket)
  (abort-to-prompt 'loop   ;; Abort to the event loop
    (lambda (cc)           ;; and call this with the continuation `cc`
      (loop-add-writer socket ;; register this callback to be called
                              ;; when the socket is ready.
        ;; When the socket is ready write message
        ;; and return using the continuation cc
        ;; given by the abort
        (lambda () (cc (write message socket)))))))

;; (define-public (display/ message socket)
;;   (abort-to-prompt 'loop (lambda (cc) (loop-add-writer socket (lambda () (cc (display message socket)))))))

;; (define-public (read-line/ socket)
;;   (abort-to-prompt 'loop (lambda (cc) (loop-add-reader socket (lambda () (cc (read-line socket)))))))

;;; basic TCP sockets

(define make-socket socket)

;; Create a TCP socket, connect it to PORT on the loopback address and
;; then set O_NONBLOCK on it.  Returns the socket.
(define-public (make-client-socket port)
  (let ((socket (make-socket PF_INET SOCK_STREAM 0)))
    (connect socket AF_INET INADDR_LOOPBACK port)
    (fcntl socket F_SETFL (logior O_NONBLOCK (fcntl socket F_GETFL)))
    socket))

;; Create a TCP socket bound to PORT on INADDR_ANY, listening with a
;; backlog of 128, with O_NONBLOCK set.  Returns the socket.
(define-public (make-server-socket port)
  (let ((socket (make-socket PF_INET SOCK_STREAM 0)))
    (bind socket (make-socket-address AF_INET INADDR_ANY port))
    (listen socket 128)
    (fcntl socket F_SETFL (logior O_NONBLOCK (fcntl socket F_GETFL)))
    socket))