From: Vladimir Sementsov-Ogievskiy
Subject: [PATCH v3 33/33] block/nbd: drop connection_co
Date: Fri, 16 Apr 2021 11:09:11 +0300

OK, this is a big rewrite of the logic.

Before this patch we have an always-running coroutine - connection_co.
It handles both reply receiving and reconnecting, which leads to a lot
of difficult and unobvious code around drained sections and context
switching. We also abuse the bs->in_flight counter: it is increased for
connection_co and temporarily decreased at the points where we want to
allow a drained section to begin. One of these places is even in
another file: in nbd_read_eof() in nbd/client.c.

We also cancel the reconnect attempt, and the requests waiting for
reconnect, on drained begin, which is not correct.

Let's finally drop this always-running coroutine and take another
approach:

1. nbd_reconnect_attempt() moves into nbd_co_send_request() and is
   called under send_mutex.

2. We now receive reply headers in the request coroutine. But we must
   also dispatch replies for the other pending requests. So
   nbd_connection_entry() is turned into nbd_receive_replies(), which
   dispatches replies until it receives the header of the requested
   reply, and then returns. (A sketch of this scheme follows the list.)

3. All the old machinery around drained sections and context switching
   is dropped.
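
To make the new flow easier to follow, here is a simplified sketch of
nbd_receive_replies(), distilled from the corresponding hunk in the
diff below. It reuses the patch's own identifiers; error handling,
reply validation and the receive_co hand-off on exit (the "out:" label
in the real code) are omitted, so this is illustrative rather than
standalone-compilable:

static coroutine_fn void nbd_receive_replies(BDRVNBDState *s, uint64_t handle)
{
    if (s->receive_co) {
        /* Another request coroutine owns the socket: sleep until woken. */
        s->requests[HANDLE_TO_INDEX(s, handle)].receiving = true;
        qemu_coroutine_yield();
        if (s->receive_co != qemu_coroutine_self()) {
            /* Our reply was dispatched to us, or the channel failed. */
            return;
        }
        /* We were promoted to be the next receiver; fall through. */
    }

    s->receive_co = qemu_coroutine_self();

    while (nbd_client_connected(s)) {
        nbd_receive_reply(s->bs, s->ioc, &s->reply, NULL);

        if (s->reply.handle == handle) {
            /* The header we were waiting for has arrived. */
            return;
        }

        /*
         * The header belongs to another pending request: wake its
         * coroutine and yield; it wakes us again once it has read its
         * payload (see nbd_co_receive_one_chunk() below).
         */
        uint64_t i = HANDLE_TO_INDEX(s, s->reply.handle);
        s->requests[i].receiving = false;
        aio_co_wake(s->requests[i].coroutine);
        qemu_coroutine_yield();
    }
}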

Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
---
 block/nbd.c  | 376 ++++++++++++++++-----------------------------------
 nbd/client.c |   2 -
 2 files changed, 119 insertions(+), 259 deletions(-)

diff --git a/block/nbd.c b/block/nbd.c
index 03391bb231..3a7b532790 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -59,7 +59,7 @@
 typedef struct {
     Coroutine *coroutine;
     uint64_t offset;        /* original offset of the request */
-    bool receiving;         /* waiting for connection_co? */
+    bool receiving; /* waiting in first yield of nbd_receive_replies() */
 } NBDClientRequest;
 
 typedef enum NBDClientState {
@@ -75,14 +75,10 @@ typedef struct BDRVNBDState {
 
     CoMutex send_mutex;
     CoQueue free_sema;
-    Coroutine *connection_co;
-    Coroutine *teardown_co;
-    QemuCoSleepState *connection_co_sleep_ns_state;
-    bool drained;
-    bool wait_drained_end;
+    Coroutine *receive_co;
+    Coroutine *in_flight_waiter;
     int in_flight;
     NBDClientState state;
-    bool wait_in_flight;
 
     QEMUTimer *reconnect_delay_timer;
 
@@ -131,33 +127,20 @@ static bool nbd_client_connected(BDRVNBDState *s)
 
 static void nbd_channel_error(BDRVNBDState *s, int ret)
 {
+    if (nbd_client_connected(s)) {
+        qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+    }
+
     if (ret == -EIO) {
         if (nbd_client_connected(s)) {
             s->state = s->reconnect_delay ? NBD_CLIENT_CONNECTING_WAIT :
                                             NBD_CLIENT_CONNECTING_NOWAIT;
         }
     } else {
-        if (nbd_client_connected(s)) {
-            qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
-        }
         s->state = NBD_CLIENT_QUIT;
     }
 }
 
-static void nbd_recv_coroutines_wake_all(BDRVNBDState *s)
-{
-    int i;
-
-    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
-        NBDClientRequest *req = &s->requests[i];
-
-        if (req->coroutine && req->receiving) {
-            req->receiving = false;
-            aio_co_wake(req->coroutine);
-        }
-    }
-}
-
 static void reconnect_delay_timer_del(BDRVNBDState *s)
 {
     if (s->reconnect_delay_timer) {
@@ -194,117 +177,23 @@ static void reconnect_delay_timer_init(BDRVNBDState *s, uint64_t expire_time_ns)
     timer_mod(s->reconnect_delay_timer, expire_time_ns);
 }
 
-static void nbd_client_detach_aio_context(BlockDriverState *bs)
-{
-    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-
-    /* Timer is deleted in nbd_client_co_drain_begin() */
-    assert(!s->reconnect_delay_timer);
-    /*
-     * If reconnect is in progress we may have no ->ioc.  It will be
-     * re-instantiated in the proper aio context once the connection is
-     * reestablished.
-     */
-    if (s->ioc) {
-        qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
-    }
-}
-
-static void nbd_client_attach_aio_context_bh(void *opaque)
-{
-    BlockDriverState *bs = opaque;
-    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-
-    if (s->connection_co) {
-        /*
-         * The node is still drained, so we know the coroutine has yielded in
-         * nbd_read_eof(), the only place where bs->in_flight can reach 0, or
-         * it is entered for the first time. Both places are safe for entering
-         * the coroutine.
-         */
-        qemu_aio_coroutine_enter(bs->aio_context, s->connection_co);
-    }
-    bdrv_dec_in_flight(bs);
-}
-
-static void nbd_client_attach_aio_context(BlockDriverState *bs,
-                                          AioContext *new_context)
-{
-    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-
-    /*
-     * s->connection_co is either yielded from nbd_receive_reply or from
-     * nbd_co_reconnect_loop()
-     */
-    if (nbd_client_connected(s)) {
-        qio_channel_attach_aio_context(QIO_CHANNEL(s->ioc), new_context);
-    }
-
-    bdrv_inc_in_flight(bs);
-
-    /*
-     * Need to wait here for the BH to run because the BH must run while the
-     * node is still drained.
-     */
-    aio_wait_bh_oneshot(new_context, nbd_client_attach_aio_context_bh, bs);
-}
-
-static void coroutine_fn nbd_client_co_drain_begin(BlockDriverState *bs)
-{
-    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-
-    s->drained = true;
-    if (s->connection_co_sleep_ns_state) {
-        qemu_co_sleep_wake(s->connection_co_sleep_ns_state);
-    }
-
-    nbd_co_establish_connection_cancel(s->conn);
-
-    reconnect_delay_timer_del(s);
-
-    if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
-        s->state = NBD_CLIENT_CONNECTING_NOWAIT;
-        qemu_co_queue_restart_all(&s->free_sema);
-    }
-}
-
-static void coroutine_fn nbd_client_co_drain_end(BlockDriverState *bs)
-{
-    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-
-    s->drained = false;
-    if (s->wait_drained_end) {
-        s->wait_drained_end = false;
-        aio_co_wake(s->connection_co);
-    }
-}
-
-
 static void nbd_teardown_connection(BlockDriverState *bs)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
 
+    assert(!s->in_flight);
+    assert(!s->receive_co);
+
     if (s->ioc) {
         /* finish any pending coroutines */
         qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+        yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
+                                 nbd_yank, s->bs);
+        object_unref(OBJECT(s->ioc));
+        s->ioc = NULL;
     }
 
     s->state = NBD_CLIENT_QUIT;
-    if (s->connection_co) {
-        if (s->connection_co_sleep_ns_state) {
-            qemu_co_sleep_wake(s->connection_co_sleep_ns_state);
-        }
-        nbd_co_establish_connection_cancel(s->conn);
-    }
-    if (qemu_in_coroutine()) {
-        s->teardown_co = qemu_coroutine_self();
-        /* connection_co resumes us when it terminates */
-        qemu_coroutine_yield();
-        s->teardown_co = NULL;
-    } else {
-        BDRV_POLL_WHILE(bs, s->connection_co);
-    }
-    assert(!s->connection_co);
 }
 
 static bool nbd_client_connecting(BDRVNBDState *s)
@@ -367,10 +256,11 @@ int nbd_co_do_establish_connection(BlockDriverState *bs, Error **errp)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     int ret;
+    bool blocking = nbd_client_connecting_wait(s);
 
     assert(!s->ioc);
 
-    s->ioc = nbd_co_establish_connection(s->conn, &s->info, true, errp);
+    s->ioc = nbd_co_establish_connection(s->conn, &s->info, blocking, errp);
     if (!s->ioc) {
         return -ECONNREFUSED;
     }
@@ -404,6 +294,7 @@ int nbd_co_do_establish_connection(BlockDriverState *bs, Error **errp)
     return 0;
 }
 
+/* called under s->send_mutex */
 static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
 {
     if (!nbd_client_connecting(s)) {
@@ -412,23 +303,29 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
 
     /* Wait for completion of all in-flight requests */
 
-    qemu_co_mutex_lock(&s->send_mutex);
-
-    while (s->in_flight > 0) {
-        qemu_co_mutex_unlock(&s->send_mutex);
-        nbd_recv_coroutines_wake_all(s);
-        s->wait_in_flight = true;
+    if (s->in_flight) {
+        s->in_flight_waiter = qemu_coroutine_self();
         qemu_coroutine_yield();
-        s->wait_in_flight = false;
-        qemu_co_mutex_lock(&s->send_mutex);
+        assert(!s->in_flight_waiter);
+        assert(!s->in_flight);
     }
 
-    qemu_co_mutex_unlock(&s->send_mutex);
-
     if (!nbd_client_connecting(s)) {
         return;
     }
 
+    if (nbd_client_connecting_wait(s) && s->reconnect_delay &&
+        !s->reconnect_delay_timer)
+    {
+        /*
+         * This is the first reconnect attempt after switching to
+         * NBD_CLIENT_CONNECTING_WAIT
+         */
+        reconnect_delay_timer_init(s,
+            qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
+            s->reconnect_delay * NANOSECONDS_PER_SECOND);
+    }
+
     /*
      * Now we are sure that nobody is accessing the channel, and no one will
      * try until we set the state to CONNECTED.
@@ -446,73 +343,34 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
     nbd_co_do_establish_connection(s->bs, NULL);
 }
 
-static coroutine_fn void nbd_co_reconnect_loop(BDRVNBDState *s)
-{
-    uint64_t timeout = 1 * NANOSECONDS_PER_SECOND;
-    uint64_t max_timeout = 16 * NANOSECONDS_PER_SECOND;
-
-    if (qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT) {
-        reconnect_delay_timer_init(s, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
-                                   s->reconnect_delay * NANOSECONDS_PER_SECOND);
-    }
-
-    nbd_reconnect_attempt(s);
-
-    while (nbd_client_connecting(s)) {
-        if (s->drained) {
-            bdrv_dec_in_flight(s->bs);
-            s->wait_drained_end = true;
-            while (s->drained) {
-                /*
-                 * We may be entered once from nbd_client_attach_aio_context_bh
-                 * and then from nbd_client_co_drain_end. So here is a loop.
-                 */
-                qemu_coroutine_yield();
-            }
-            bdrv_inc_in_flight(s->bs);
-        } else {
-            qemu_co_sleep_ns_wakeable(QEMU_CLOCK_REALTIME, timeout,
-                                      &s->connection_co_sleep_ns_state);
-            if (s->drained) {
-                continue;
-            }
-            if (timeout < max_timeout) {
-                timeout *= 2;
-            }
-        }
-
-        nbd_reconnect_attempt(s);
-    }
-
-    reconnect_delay_timer_del(s);
-}
-
-static coroutine_fn void nbd_connection_entry(void *opaque)
+static coroutine_fn void nbd_receive_replies(BDRVNBDState *s, uint64_t handle)
 {
-    BDRVNBDState *s = opaque;
     uint64_t i;
     int ret = 0;
     Error *local_err = NULL;
 
-    while (qatomic_load_acquire(&s->state) != NBD_CLIENT_QUIT) {
-        /*
-         * The NBD client can only really be considered idle when it has
-         * yielded from qio_channel_readv_all_eof(), waiting for data. This is
-         * the point where the additional scheduled coroutine entry happens
-         * after nbd_client_attach_aio_context().
-         *
-         * Therefore we keep an additional in_flight reference all the time and
-         * only drop it temporarily here.
-         */
+    i = HANDLE_TO_INDEX(s, handle);
+    if (s->receive_co) {
+        assert(s->receive_co != qemu_coroutine_self());
 
-        if (nbd_client_connecting(s)) {
-            nbd_co_reconnect_loop(s);
-        }
+        /* Another request coroutine is receiving now */
+        s->requests[i].receiving = true;
+        qemu_coroutine_yield();
+        assert(!s->requests[i].receiving);
 
-        if (!nbd_client_connected(s)) {
-            continue;
+        if (s->receive_co != qemu_coroutine_self()) {
+            /*
+             * We have either failed or finished; the caller uses
+             * nbd_client_connected() to distinguish.
+             */
+            return;
         }
+    }
+
+    assert(s->receive_co == 0 || s->receive_co == qemu_coroutine_self());
+    s->receive_co = qemu_coroutine_self();
 
+    while (nbd_client_connected(s)) {
         assert(s->reply.handle == 0);
         ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err);
 
@@ -522,8 +380,21 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
             local_err = NULL;
         }
         if (ret <= 0) {
-            nbd_channel_error(s, ret ? ret : -EIO);
-            continue;
+            ret = ret ? ret : -EIO;
+            nbd_channel_error(s, ret);
+            goto out;
+        }
+
+        if (!nbd_client_connected(s)) {
+            ret = -EIO;
+            goto out;
+        }
+
+        i = HANDLE_TO_INDEX(s, s->reply.handle);
+
+        if (s->reply.handle == handle) {
+            ret = 0;
+            goto out;
         }
 
         /*
@@ -531,50 +402,49 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
          * handler acts as a synchronization point and ensures that only
          * one coroutine is called until the reply finishes.
          */
-        i = HANDLE_TO_INDEX(s, s->reply.handle);
         if (i >= MAX_NBD_REQUESTS ||
             !s->requests[i].coroutine ||
             !s->requests[i].receiving ||
             (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply))
         {
             nbd_channel_error(s, -EINVAL);
-            continue;
+            ret = -EINVAL;
+            goto out;
         }
 
-        /*
-         * We're woken up again by the request itself.  Note that there
-         * is no race between yielding and reentering connection_co.  This
-         * is because:
-         *
-         * - if the request runs on the same AioContext, it is only
-         *   entered after we yield
-         *
-         * - if the request runs on a different AioContext, reentering
-         *   connection_co happens through a bottom half, which can only
-         *   run after we yield.
-         */
         s->requests[i].receiving = false;
         aio_co_wake(s->requests[i].coroutine);
         qemu_coroutine_yield();
     }
 
-    qemu_co_queue_restart_all(&s->free_sema);
-    nbd_recv_coroutines_wake_all(s);
-    bdrv_dec_in_flight(s->bs);
-
-    s->connection_co = NULL;
-    if (s->ioc) {
-        qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
-        yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
-                                 nbd_yank, s->bs);
-        object_unref(OBJECT(s->ioc));
-        s->ioc = NULL;
-    }
+out:
+    if (ret < 0) {
+        s->receive_co = NULL;
+        for (i = 0; i < MAX_NBD_REQUESTS; i++) {
+            NBDClientRequest *req = &s->requests[i];
 
-    if (s->teardown_co) {
-        aio_co_wake(s->teardown_co);
+            if (req->coroutine && req->receiving) {
+                req->receiving = false;
+                aio_co_wake(req->coroutine);
+            }
+        }
+    } else {
+        /*
+         * If some request is still waiting for its reply, its
+         * coroutine should become the next "receive_co"
+         */
+        for (i = 0; i < MAX_NBD_REQUESTS; i++) {
+            NBDClientRequest *req = &s->requests[i];
+
+            if (req->coroutine && req->receiving) {
+                req->receiving = false;
+                s->receive_co = req->coroutine;
+                aio_co_wake(req->coroutine);
+                return;
+            }
+        }
+        s->receive_co = NULL;
     }
-    aio_wait_kick();
 }
 
 static int nbd_co_send_request(BlockDriverState *bs,
@@ -585,7 +455,15 @@ static int nbd_co_send_request(BlockDriverState *bs,
     int rc, i = -1;
 
     qemu_co_mutex_lock(&s->send_mutex);
-    while (s->in_flight == MAX_NBD_REQUESTS || nbd_client_connecting_wait(s)) {
+
+    nbd_reconnect_attempt(s);
+
+    if (!nbd_client_connected(s)) {
+        qemu_co_mutex_unlock(&s->send_mutex);
+        return -EIO;
+    }
+
+    while (s->in_flight == MAX_NBD_REQUESTS) {
         qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
     }
 
@@ -636,10 +514,10 @@ err:
             s->requests[i].coroutine = NULL;
             s->in_flight--;
         }
-        if (s->in_flight == 0 && s->wait_in_flight) {
-            aio_co_wake(s->connection_co);
-        } else {
-            qemu_co_queue_next(&s->free_sema);
+        if (s->in_flight == 0 && s->in_flight_waiter) {
+            Coroutine *co = s->in_flight_waiter;
+            s->in_flight_waiter = NULL;
+            aio_co_wake(co);
         }
     }
     qemu_co_mutex_unlock(&s->send_mutex);
@@ -938,9 +816,7 @@ static coroutine_fn int nbd_co_do_receive_one_chunk(
     *request_ret = 0;
 
     /* Wait until we're woken up by nbd_connection_entry.  */
-    s->requests[i].receiving = true;
-    qemu_coroutine_yield();
-    assert(!s->requests[i].receiving);
+    nbd_receive_replies(s, handle);
     if (!nbd_client_connected(s)) {
         error_setg(errp, "Connection closed");
         return -EIO;
@@ -1033,13 +909,8 @@ static coroutine_fn int nbd_co_receive_one_chunk(
     }
     s->reply.handle = 0;
 
-    if (s->connection_co && !s->wait_in_flight) {
-        /*
-         * We must check s->wait_in_flight, because we may entered by
-         * nbd_recv_coroutines_wake_all(), in this case we should not
-         * wake connection_co here, it will woken by last request.
-         */
-        aio_co_wake(s->connection_co);
+    if (s->receive_co) {
+        aio_co_wake(s->receive_co);
     }
 
     return ret;
@@ -1151,8 +1022,10 @@ break_loop:
 
     qemu_co_mutex_lock(&s->send_mutex);
     s->in_flight--;
-    if (s->in_flight == 0 && s->wait_in_flight) {
-        aio_co_wake(s->connection_co);
+    if (s->in_flight == 0 && s->in_flight_waiter) {
+        Coroutine *co = s->in_flight_waiter;
+        s->in_flight_waiter = NULL;
+        aio_co_wake(co);
     } else {
         qemu_co_queue_next(&s->free_sema);
     }
@@ -1980,14 +1853,13 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
                                         monitor_cur());
 
     /* TODO: Configurable retry-until-timeout behaviour.*/
+    s->state = NBD_CLIENT_CONNECTING_WAIT;
     ret = nbd_do_establish_connection(bs, errp);
     if (ret < 0) {
         goto fail;
     }
 
-    s->connection_co = qemu_coroutine_create(nbd_connection_entry, s);
-    bdrv_inc_in_flight(bs);
-    aio_co_schedule(bdrv_get_aio_context(bs), s->connection_co);
+    nbd_client_connection_enable_retry(s->conn);
 
     return 0;
 
@@ -2141,6 +2013,8 @@ static void nbd_cancel_in_flight(BlockDriverState *bs)
         s->state = NBD_CLIENT_CONNECTING_NOWAIT;
         qemu_co_queue_restart_all(&s->free_sema);
     }
+
+    nbd_co_establish_connection_cancel(s->conn);
 }
 
 static BlockDriver bdrv_nbd = {
@@ -2161,10 +2035,6 @@ static BlockDriver bdrv_nbd = {
     .bdrv_refresh_limits        = nbd_refresh_limits,
     .bdrv_co_truncate           = nbd_co_truncate,
     .bdrv_getlength             = nbd_getlength,
-    .bdrv_detach_aio_context    = nbd_client_detach_aio_context,
-    .bdrv_attach_aio_context    = nbd_client_attach_aio_context,
-    .bdrv_co_drain_begin        = nbd_client_co_drain_begin,
-    .bdrv_co_drain_end          = nbd_client_co_drain_end,
     .bdrv_refresh_filename      = nbd_refresh_filename,
     .bdrv_co_block_status       = nbd_client_co_block_status,
     .bdrv_dirname               = nbd_dirname,
@@ -2190,10 +2060,6 @@ static BlockDriver bdrv_nbd_tcp = {
     .bdrv_refresh_limits        = nbd_refresh_limits,
     .bdrv_co_truncate           = nbd_co_truncate,
     .bdrv_getlength             = nbd_getlength,
-    .bdrv_detach_aio_context    = nbd_client_detach_aio_context,
-    .bdrv_attach_aio_context    = nbd_client_attach_aio_context,
-    .bdrv_co_drain_begin        = nbd_client_co_drain_begin,
-    .bdrv_co_drain_end          = nbd_client_co_drain_end,
     .bdrv_refresh_filename      = nbd_refresh_filename,
     .bdrv_co_block_status       = nbd_client_co_block_status,
     .bdrv_dirname               = nbd_dirname,
@@ -2219,10 +2085,6 @@ static BlockDriver bdrv_nbd_unix = {
     .bdrv_refresh_limits        = nbd_refresh_limits,
     .bdrv_co_truncate           = nbd_co_truncate,
     .bdrv_getlength             = nbd_getlength,
-    .bdrv_detach_aio_context    = nbd_client_detach_aio_context,
-    .bdrv_attach_aio_context    = nbd_client_attach_aio_context,
-    .bdrv_co_drain_begin        = nbd_client_co_drain_begin,
-    .bdrv_co_drain_end          = nbd_client_co_drain_end,
     .bdrv_refresh_filename      = nbd_refresh_filename,
     .bdrv_co_block_status       = nbd_client_co_block_status,
     .bdrv_dirname               = nbd_dirname,
diff --git a/nbd/client.c b/nbd/client.c
index 0c2db4bcba..30d5383cb1 100644
--- a/nbd/client.c
+++ b/nbd/client.c
@@ -1434,9 +1434,7 @@ nbd_read_eof(BlockDriverState *bs, QIOChannel *ioc, void *buffer, size_t size,
 
         len = qio_channel_readv(ioc, &iov, 1, errp);
         if (len == QIO_CHANNEL_ERR_BLOCK) {
-            bdrv_dec_in_flight(bs);
             qio_channel_yield(ioc, G_IO_IN);
-            bdrv_inc_in_flight(bs);
             continue;
         } else if (len < 0) {
             return -EIO;
-- 
2.29.2



