qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[RFC PATCH 4/9] block/curl: Perform IO in fixed size chunks


From: David Edmondson
Subject: [RFC PATCH 4/9] block/curl: Perform IO in fixed size chunks
Date: Tue, 18 Aug 2020 12:08:40 +0100

Do all IO requests to the remote server in 256kB chunks.

Signed-off-by: David Edmondson <david.edmondson@oracle.com>
---
 block/curl.c       | 151 ++++++++++++++++++++++++++++++++-------------
 block/trace-events |   2 +
 2 files changed, 109 insertions(+), 44 deletions(-)

diff --git a/block/curl.c b/block/curl.c
index d2f4de46c9..cfc518efda 100644
--- a/block/curl.c
+++ b/block/curl.c
@@ -78,6 +78,14 @@ static CURLMcode __curl_multi_socket_action(CURLM 
*multi_handle,
 #define CURL_BLOCK_OPT_SSLVERIFY_DEFAULT true
 #define CURL_BLOCK_OPT_TIMEOUT_DEFAULT 5
 
+/* Must be a non-zero power of 2. */
+#define CURL_BLOCK_SIZE (256 * 1024)
+
+/* Align "n" to the start of the containing block. */
+#define CURL_BLOCK_ALIGN(n) ((n) & ~(CURL_BLOCK_SIZE - 1))
+/* The offset of "n" within its' block. */
+#define CURL_BLOCK_OFFSET(n) ((n) & (CURL_BLOCK_SIZE - 1))
+
 struct BDRVCURLState;
 struct CURLState;
 
@@ -86,11 +94,18 @@ static bool libcurl_initialized;
 typedef struct CURLAIOCB {
     Coroutine *co;
     QEMUIOVector *qiov;
+    uint64_t qiov_offset; /* Offset in qiov to place data. */
 
     uint64_t offset;
     uint64_t bytes;
     int ret;
 
+    /*
+     * start and end indicate the subset of the surrounding
+     * CURL_BLOCK_SIZE sized block that is the subject of this
+     * IOCB. They are offsets from the beginning of the underlying
+     * buffer.
+     */
     size_t start;
     size_t end;
 } CURLAIOCB;
@@ -110,7 +125,6 @@ typedef struct CURLState
     char *orig_buf;
     uint64_t buf_start;
     size_t buf_off;
-    size_t buf_len;
     char range[128];
     char errmsg[CURL_ERROR_SIZE];
     char in_use;
@@ -259,11 +273,11 @@ static size_t curl_read_cb(void *ptr, size_t size, size_t 
nmemb, void *opaque)
         goto read_end;
     }
 
-    if (s->buf_off >= s->buf_len) {
+    if (s->buf_off >= CURL_BLOCK_SIZE) {
         /* buffer full, read nothing */
         goto read_end;
     }
-    realsize = MIN(realsize, s->buf_len - s->buf_off);
+    realsize = MIN(realsize, CURL_BLOCK_SIZE - s->buf_off);
     memcpy(s->orig_buf + s->buf_off, ptr, realsize);
     s->buf_off += realsize;
 
@@ -281,35 +295,44 @@ static bool curl_find_buf(BDRVCURLState *s, uint64_t 
start, uint64_t len,
     uint64_t clamped_end = MIN(end, s->len);
     uint64_t clamped_len = clamped_end - start;
 
-    for (i=0; i<CURL_NUM_STATES; i++) {
+    for (i = 0; i < CURL_NUM_STATES; i++) {
         CURLState *state = &s->states[i];
-        uint64_t buf_end = (state->buf_start + state->buf_off);
-        uint64_t buf_fend = (state->buf_start + state->buf_len);
+        /* The end of the currently valid data. */
+        uint64_t buf_end = state->buf_start + state->buf_off;
+        /* The end of the valid data when the IO completes. */
+        uint64_t buf_fend = state->buf_start + CURL_BLOCK_SIZE;
 
         if (!state->orig_buf)
             continue;
         if (!state->buf_off)
             continue;
 
-        // Does the existing buffer cover our section?
+        /*
+         * Does the existing buffer cover our section?
+         */
         if ((start >= state->buf_start) &&
             (start <= buf_end) &&
             (clamped_end >= state->buf_start) &&
             (clamped_end <= buf_end))
         {
-            char *buf = state->orig_buf + (start - state->buf_start);
+            char *buf = state->orig_buf + CURL_BLOCK_OFFSET(start);
 
             trace_curl_pending_hit(qemu_coroutine_self(),
                                    start, len);
-            qemu_iovec_from_buf(acb->qiov, 0, buf, clamped_len);
+            qemu_iovec_from_buf(acb->qiov, acb->qiov_offset, buf, clamped_len);
             if (clamped_len < len) {
-                qemu_iovec_memset(acb->qiov, clamped_len, 0, len - 
clamped_len);
+                qemu_iovec_memset(acb->qiov, acb->qiov_offset + clamped_len,
+                                  0, len - clamped_len);
             }
             acb->ret = 0;
             return true;
         }
 
-        // Wait for unfinished chunks
+        /*
+         * If an in-progress IO will provide the required data, wait
+         * for it to complete - the initiator will complete this
+         * aiocb.
+         */
         if (state->in_use &&
             (start >= state->buf_start) &&
             (start <= buf_fend) &&
@@ -320,10 +343,10 @@ static bool curl_find_buf(BDRVCURLState *s, uint64_t 
start, uint64_t len,
 
             trace_curl_pending_piggyback(qemu_coroutine_self(),
                                          start, len);
-            acb->start = start - state->buf_start;
+            acb->start = CURL_BLOCK_OFFSET(start);
             acb->end = acb->start + clamped_len;
 
-            for (j=0; j<CURL_NUM_ACB; j++) {
+            for (j = 0; j < CURL_NUM_ACB; j++) {
                 if (!state->acb[j]) {
                     state->acb[j] = acb;
                     return true;
@@ -377,7 +400,7 @@ static void curl_multi_check_completion(BDRVCURLState *s)
             for (i = 0; i < CURL_NUM_ACB; i++) {
                 CURLAIOCB *acb = state->acb[i];
 
-                if (acb == NULL) {
+                if (!acb) {
                     continue;
                 }
 
@@ -385,14 +408,15 @@ static void curl_multi_check_completion(BDRVCURLState *s)
                     /* Assert that we have read all data */
                     assert(state->buf_off >= acb->end);
 
-                    qemu_iovec_from_buf(acb->qiov, 0,
+                    qemu_iovec_from_buf(acb->qiov, acb->qiov_offset,
                                         state->orig_buf + acb->start,
                                         acb->end - acb->start);
 
                     if (acb->end - acb->start < acb->bytes) {
                         size_t offset = acb->end - acb->start;
-                        qemu_iovec_memset(acb->qiov, offset, 0,
-                                          acb->bytes - offset);
+
+                        qemu_iovec_memset(acb->qiov, acb->qiov_offset + offset,
+                                          0, acb->bytes - offset);
                     }
                 }
 
@@ -539,6 +563,7 @@ static int curl_init_state(BDRVCURLState *s, CURLState 
*state)
 static void curl_clean_state(CURLState *s)
 {
     int j;
+
     for (j = 0; j < CURL_NUM_ACB; j++) {
         assert(!s->acb[j]);
     }
@@ -856,18 +881,26 @@ static void curl_setup_preadv(BlockDriverState *bs, 
CURLAIOCB *acb)
 
     BDRVCURLState *s = bs->opaque;
 
-    uint64_t start = acb->offset;
-    uint64_t end;
+    /*
+     * Our caller must ensure that this request does not span two
+     * blocks.
+     */
+    assert(CURL_BLOCK_ALIGN(acb->offset) ==
+           CURL_BLOCK_ALIGN(acb->offset + acb->bytes - 1));
 
     qemu_mutex_lock(&s->mutex);
 
-    // In case we have the requested data already (e.g. read-ahead),
-    // we can just call the callback and be done.
-    if (curl_find_buf(s, start, acb->bytes, acb)) {
+    /*
+     * Check whether the requested data can be found in an existing or
+     * pending IO request.
+     */
+    if (curl_find_buf(s, acb->offset, acb->bytes, acb)) {
         goto out;
     }
 
-    // No cache found, so let's start a new request
+    /*
+     * No cache found, so let's start a new request.
+     */
     for (;;) {
         state = curl_find_state(s);
         if (state) {
@@ -882,16 +915,15 @@ static void curl_setup_preadv(BlockDriverState *bs, 
CURLAIOCB *acb)
         goto out;
     }
 
-    acb->start = 0;
-    acb->end = MIN(acb->bytes, s->len - start);
+    acb->start = CURL_BLOCK_OFFSET(acb->offset);
+    acb->end = acb->start + MIN(acb->bytes, s->len - acb->offset);
 
     state->buf_off = 0;
-    g_free(state->orig_buf);
-    state->buf_start = start;
-    state->buf_len = MIN(acb->end, s->len - start);
-    end = start + state->buf_len - 1;
-    state->orig_buf = g_try_malloc(state->buf_len);
-    if (state->buf_len && state->orig_buf == NULL) {
+    state->buf_start = CURL_BLOCK_ALIGN(acb->offset);
+    if (!state->orig_buf) {
+        state->orig_buf = g_try_malloc(CURL_BLOCK_SIZE);
+    }
+    if (!state->orig_buf) {
         curl_clean_state(state);
         acb->ret = -ENOMEM;
         goto out;
@@ -899,8 +931,10 @@ static void curl_setup_preadv(BlockDriverState *bs, 
CURLAIOCB *acb)
     state->acb[0] = acb;
 
     snprintf(state->range, 127, "%" PRIu64 "-%" PRIu64,
-             s->offset + start, s->offset + end);
-    trace_curl_setup_preadv(qemu_coroutine_self(), start, acb->bytes);
+             s->offset + state->buf_start,
+             s->offset + state->buf_start + CURL_BLOCK_SIZE);
+    trace_curl_setup_preadv(qemu_coroutine_self(), state->buf_start,
+                            CURL_BLOCK_SIZE);
     curl_easy_setopt(state->curl, CURLOPT_RANGE, state->range);
 
     if (curl_multi_add_handle(s->multi, state->curl) != CURLM_OK) {
@@ -921,21 +955,50 @@ out:
 static int coroutine_fn curl_co_preadv(BlockDriverState *bs,
         uint64_t offset, uint64_t bytes, QEMUIOVector *qiov, int flags)
 {
-    CURLAIOCB acb = {
-        .co = qemu_coroutine_self(),
-        .ret = -EINPROGRESS,
-        .qiov = qiov,
-        .offset = offset,
-        .bytes = bytes
-    };
+    /*
+     * The lower layer does all IO in single CURL_BLOCK_SIZE sized and
+     * aligned chunks and cannot handle an IO that spans two blocks,
+     * so split the request here.
+     */
+    int ret = 0;
+    uint64_t qiov_offset = 0;
+    uint64_t off = offset;
 
     trace_curl_co_preadv(qemu_coroutine_self(), offset, bytes);
-    curl_setup_preadv(bs, &acb);
-    while (acb.ret == -EINPROGRESS) {
-        qemu_coroutine_yield();
+
+    while (bytes > 0) {
+        uint64_t len = MIN(bytes, CURL_BLOCK_SIZE - CURL_BLOCK_OFFSET(off));
+        CURLAIOCB acb = {
+            .co = qemu_coroutine_self(),
+            .ret = -EINPROGRESS,
+            .qiov = qiov,
+            .qiov_offset = qiov_offset,
+            .offset = off,
+            .bytes = len,
+        };
+
+        trace_curl_co_preadv_segment(qemu_coroutine_self(), off, len);
+
+        curl_setup_preadv(bs, &acb);
+        while (acb.ret == -EINPROGRESS) {
+            qemu_coroutine_yield();
+        }
+
+        ret = acb.ret;
+        if (ret != 0) {
+            return ret;
+        }
+
+        trace_curl_co_preadv_segment_done(qemu_coroutine_self());
+
+        qiov_offset += len;
+        off += len;
+        bytes -= len;
     }
+
     trace_curl_co_preadv_done(qemu_coroutine_self());
-    return acb.ret;
+
+    return ret;
 }
 
 static void curl_close(BlockDriverState *bs)
diff --git a/block/trace-events b/block/trace-events
index 0b52d2ca1d..72b1e927bf 100644
--- a/block/trace-events
+++ b/block/trace-events
@@ -200,6 +200,8 @@ curl_open(const char *file) "opening %s"
 curl_open_size(uint64_t size) "size = %" PRIu64
 curl_co_preadv(void *co, uint64_t offset, uint64_t bytes) "co %p requests 0x%" 
PRIx64 " + 0x%" PRIx64
 curl_co_preadv_done(void *co) "co %p done"
+curl_co_preadv_segment(void *co, uint64_t offset, uint64_t bytes) "co %p 
requests 0x%" PRIx64 " + 0x%" PRIx64
+curl_co_preadv_segment_done(void *co) "co %p done"
 curl_setup_preadv(void *co, uint64_t offset, uint64_t bytes) "co %p requests 
0x%" PRIx64 " + 0x%" PRIx64
 curl_pending_hit(void *co, uint64_t start, uint64_t len) "co %p finds 0x%" 
PRIx64 " + 0x%" PRIx64
 curl_pending_piggyback(void *co, uint64_t start, uint64_t len) "co %p pending 
0x%" PRIx64 " + 0x%" PRIx64
-- 
2.27.0




reply via email to

[Prev in Thread] Current Thread [Next in Thread]