qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[PATCH] block-jobs: add final flush


From: Vladimir Sementsov-Ogievskiy
Subject: [PATCH] block-jobs: add final flush
Date: Wed, 4 Oct 2023 16:56:32 +0300

From: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>

Actually block job is not completed without the final flush. It's
rather unexpected to have broken target when job was successfully
completed long ago and now we fail to flush or process just
crashed/killed.

Mirror job already has mirror_flush() for this. So, it's OK.

Add similar things for other jobs: backup, stream, commit.

Note, that stream has (documented) different treatment of IGNORE
action: it don't retry the operation, continue execution and report
error at last. We keep it for final flush too.

Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@yandex-team.ru>
---

Was: [PATCH v4] block-jobs: flush target at the end of .run()
  But now rewritten.
Supersedes: <20230725174008.1147467-1-vsementsov@yandex-team.ru>

 block/backup.c             |  2 +-
 block/block-copy.c         |  7 +++++++
 block/commit.c             | 16 ++++++++++++----
 block/stream.c             | 21 +++++++++++++++++----
 include/block/block-copy.h |  1 +
 5 files changed, 38 insertions(+), 9 deletions(-)

diff --git a/block/backup.c b/block/backup.c
index db3791f4d1..6a1321092a 100644
--- a/block/backup.c
+++ b/block/backup.c
@@ -156,7 +156,7 @@ static int coroutine_fn backup_loop(BackupBlockJob *job)
         job->bg_bcs_call = s = block_copy_async(job->bcs, 0,
                 QEMU_ALIGN_UP(job->len, job->cluster_size),
                 job->perf.max_workers, job->perf.max_chunk,
-                backup_block_copy_callback, job);
+                true, backup_block_copy_callback, job);
 
         while (!block_copy_call_finished(s) &&
                !job_is_cancelled(&job->common.job))
diff --git a/block/block-copy.c b/block/block-copy.c
index 1c60368d72..9b8672d4c8 100644
--- a/block/block-copy.c
+++ b/block/block-copy.c
@@ -54,6 +54,7 @@ typedef struct BlockCopyCallState {
     int max_workers;
     int64_t max_chunk;
     bool ignore_ratelimit;
+    bool need_final_flush;
     BlockCopyAsyncCallbackFunc cb;
     void *cb_opaque;
     /* Coroutine where async block-copy is running */
@@ -880,6 +881,10 @@ block_copy_common(BlockCopyCallState *call_state)
          */
     } while (ret > 0 && !qatomic_read(&call_state->cancelled));
 
+    if (ret == 0 && call_state->need_final_flush) {
+        ret = bdrv_co_flush(s->target->bs);
+    }
+
     qatomic_store_release(&call_state->finished, true);
 
     if (call_state->cb) {
@@ -935,6 +940,7 @@ int coroutine_fn block_copy(BlockCopyState *s, int64_t 
start, int64_t bytes,
 BlockCopyCallState *block_copy_async(BlockCopyState *s,
                                      int64_t offset, int64_t bytes,
                                      int max_workers, int64_t max_chunk,
+                                     bool need_final_flush,
                                      BlockCopyAsyncCallbackFunc cb,
                                      void *cb_opaque)
 {
@@ -946,6 +952,7 @@ BlockCopyCallState *block_copy_async(BlockCopyState *s,
         .bytes = bytes,
         .max_workers = max_workers,
         .max_chunk = max_chunk,
+        .need_final_flush = need_final_flush,
         .cb = cb,
         .cb_opaque = cb_opaque,
 
diff --git a/block/commit.c b/block/commit.c
index aa45beb0f0..5205c77ec9 100644
--- a/block/commit.c
+++ b/block/commit.c
@@ -120,6 +120,7 @@ static int coroutine_fn commit_run(Job *job, Error **errp)
     int64_t n = 0; /* bytes */
     QEMU_AUTO_VFREE void *buf = NULL;
     int64_t len, base_len;
+    BlockErrorAction action;
 
     len = blk_co_getlength(s->top);
     if (len < 0) {
@@ -169,9 +170,8 @@ static int coroutine_fn commit_run(Job *job, Error **errp)
             }
         }
         if (ret < 0) {
-            BlockErrorAction action =
-                block_job_error_action(&s->common, s->on_error,
-                                       error_in_source, -ret);
+            action = block_job_error_action(&s->common, s->on_error,
+                                            error_in_source, -ret);
             if (action == BLOCK_ERROR_ACTION_REPORT) {
                 return ret;
             } else {
@@ -187,7 +187,15 @@ static int coroutine_fn commit_run(Job *job, Error **errp)
         }
     }
 
-    return 0;
+    do {
+        ret = blk_co_flush(s->base);
+        if (ret < 0) {
+            action = block_job_error_action(&s->common, s->on_error,
+                                            false, -ret);
+        }
+    } while (ret < 0 && action != BLOCK_ERROR_ACTION_REPORT);
+
+    return ret;
 }
 
 static const BlockJobDriver commit_job_driver = {
diff --git a/block/stream.c b/block/stream.c
index 133cb72fb4..41eb536feb 100644
--- a/block/stream.c
+++ b/block/stream.c
@@ -143,6 +143,8 @@ static int coroutine_fn stream_run(Job *job, Error **errp)
     int64_t offset = 0;
     int error = 0;
     int64_t n = 0; /* bytes */
+    BlockErrorAction action;
+    int ret;
 
     if (unfiltered_bs == s->base_overlay) {
         /* Nothing to stream */
@@ -159,7 +161,6 @@ static int coroutine_fn stream_run(Job *job, Error **errp)
 
     for ( ; offset < len; offset += n) {
         bool copy;
-        int ret;
 
         /* Note that even when no rate limit is applied we need to yield
          * with no pending I/O here so that bdrv_drain_all() returns.
@@ -196,8 +197,8 @@ static int coroutine_fn stream_run(Job *job, Error **errp)
             ret = stream_populate(s->blk, offset, n);
         }
         if (ret < 0) {
-            BlockErrorAction action =
-                block_job_error_action(&s->common, s->on_error, true, -ret);
+            action = block_job_error_action(&s->common, s->on_error,
+                                            true, -ret);
             if (action == BLOCK_ERROR_ACTION_STOP) {
                 n = 0;
                 continue;
@@ -206,7 +207,7 @@ static int coroutine_fn stream_run(Job *job, Error **errp)
                 error = ret;
             }
             if (action == BLOCK_ERROR_ACTION_REPORT) {
-                break;
+                return error;
             }
         }
 
@@ -217,6 +218,18 @@ static int coroutine_fn stream_run(Job *job, Error **errp)
         }
     }
 
+    do {
+        ret = blk_co_flush(s->blk);
+        if (ret < 0) {
+            action = block_job_error_action(&s->common, s->on_error,
+                                            false, -ret);
+        }
+    } while (ret < 0 && action == BLOCK_ERROR_ACTION_STOP);
+
+    if (error == 0) {
+        error = ret;
+    }
+
     /* Do not remove the backing file if an error was there but ignored. */
     return error;
 }
diff --git a/include/block/block-copy.h b/include/block/block-copy.h
index 0700953ab8..6fe1e07aa3 100644
--- a/include/block/block-copy.h
+++ b/include/block/block-copy.h
@@ -60,6 +60,7 @@ int coroutine_fn block_copy(BlockCopyState *s, int64_t 
offset, int64_t bytes,
 BlockCopyCallState *block_copy_async(BlockCopyState *s,
                                      int64_t offset, int64_t bytes,
                                      int max_workers, int64_t max_chunk,
+                                     bool need_final_flush,
                                      BlockCopyAsyncCallbackFunc cb,
                                      void *cb_opaque);
 
-- 
2.34.1




reply via email to

[Prev in Thread] Current Thread [Next in Thread]