qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [PATCH v3 1/3] QIOChannel: Add io_async_writev & io_async_flush call


From: Daniel P . Berrangé
Subject: Re: [PATCH v3 1/3] QIOChannel: Add io_async_writev & io_async_flush callbacks
Date: Fri, 24 Sep 2021 18:16:04 +0100
User-agent: Mutt/2.0.7 (2021-05-04)

On Wed, Sep 22, 2021 at 07:24:21PM -0300, Leonardo Bras wrote:
> Adds io_async_writev and io_async_flush as optional callback to 
> QIOChannelClass,
> allowing the implementation of asynchronous writes by subclasses.
> 
> How to use them:
> - Write data using qio_channel_async_writev(),
> - Wait write completion with qio_channel_async_flush().
> 
> Notes:
> Some asynchronous implementations may benefit from zerocopy mechanisms, so 
> it's
> recommended to keep the write buffer untouched until the return of
> qio_channel_async_flush().
> 
> As the new callbacks are optional, if a subclass does not implement them
> there will be a fallback to the mandatory synchronous implementation:
> - io_async_writev will fallback to io_writev,
> - io_async_flush will return without changing anything.
> This makes simpler for the user to make use of the asynchronous 
> implementation.
> 
> Also, some functions like qio_channel_writev_full_all() were adapted to
> offer an async version, and make better use of the new callbacks.
> 
> Signed-off-by: Leonardo Bras <leobras@redhat.com>
> ---
>  include/io/channel.h | 93 +++++++++++++++++++++++++++++++++++++-------
>  io/channel.c         | 66 ++++++++++++++++++++++++-------
>  2 files changed, 129 insertions(+), 30 deletions(-)
> 
> diff --git a/include/io/channel.h b/include/io/channel.h
> index 88988979f8..74f2e3ae8a 100644
> --- a/include/io/channel.h
> +++ b/include/io/channel.h
> @@ -136,6 +136,14 @@ struct QIOChannelClass {
>                                    IOHandler *io_read,
>                                    IOHandler *io_write,
>                                    void *opaque);
> +    ssize_t (*io_async_writev)(QIOChannel *ioc,
> +                               const struct iovec *iov,
> +                               size_t niov,
> +                               int *fds,
> +                               size_t nfds,
> +                               Error **errp);
> +   void (*io_async_flush)(QIOChannel *ioc,
> +                          Error **errp);
>  };
>  
>  /* General I/O handling functions */
> @@ -255,12 +263,17 @@ ssize_t qio_channel_readv_full(QIOChannel *ioc,
>   * or QIO_CHANNEL_ERR_BLOCK if no data is can be sent
>   * and the channel is non-blocking
>   */
> -ssize_t qio_channel_writev_full(QIOChannel *ioc,
> -                                const struct iovec *iov,
> -                                size_t niov,
> -                                int *fds,
> -                                size_t nfds,
> -                                Error **errp);
> +ssize_t __qio_channel_writev_full(QIOChannel *ioc,

Using "__" is undesirable as that namespace is reserved.

> +                                  const struct iovec *iov,
> +                                  size_t niov,
> +                                  int *fds,
> +                                  size_t nfds,
> +                                  bool async,
> +                                  Error **errp);
> +#define qio_channel_writev_full(ioc, iov, niov, fds, nfds, errp) \
> +    __qio_channel_writev_full(ioc, iov, niov, fds, nfds, false, errp)
> +#define qio_channel_async_writev_full(ioc, iov, niov, fds, nfds, errp) \
> +    __qio_channel_writev_full(ioc, iov, niov, fds, nfds, true, errp)

The API docs only cover the first function, not the second.


>  /**
>   * qio_channel_readv_all_eof:
> @@ -339,10 +352,15 @@ int qio_channel_readv_all(QIOChannel *ioc,
>   *
>   * Returns: 0 if all bytes were written, or -1 on error
>   */
> -int qio_channel_writev_all(QIOChannel *ioc,
> -                           const struct iovec *iov,
> -                           size_t niov,
> -                           Error **erp);
> +int __qio_channel_writev_all(QIOChannel *ioc,
> +                             const struct iovec *iov,
> +                             size_t niov,
> +                             bool async,
> +                             Error **erp);
> +#define qio_channel_writev_all(ioc, iov, niov, erp) \
> +    __qio_channel_writev_all(ioc, iov, niov, false, erp)
> +#define qio_channel_async_writev_all(ioc, iov, niov, erp) \
> +    __qio_channel_writev_all(ioc, iov, niov, true, erp)


>  
>  /**
>   * qio_channel_readv:
> @@ -849,10 +867,55 @@ int qio_channel_readv_full_all(QIOChannel *ioc,
>   * Returns: 0 if all bytes were written, or -1 on error
>   */
>  
> -int qio_channel_writev_full_all(QIOChannel *ioc,
> -                                const struct iovec *iov,
> -                                size_t niov,
> -                                int *fds, size_t nfds,
> -                                Error **errp);
> +int __qio_channel_writev_full_all(QIOChannel *ioc,
> +                                  const struct iovec *iov,
> +                                  size_t niov,
> +                                  int *fds, size_t nfds,
> +                                  bool async, Error **errp);
> +#define qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, errp) \
> +    __qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, false, errp)
> +#define qio_channel_async_writev_full_all(ioc, iov, niov, fds, nfds, errp) \
> +    __qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, true, errp)
> +
> +/**
> + * qio_channel_async_writev:
> + * @ioc: the channel object
> + * @iov: the array of memory regions to write data from
> + * @niov: the length of the @iov array
> + * @fds: an array of file handles to send
> + * @nfds: number of file handles in @fds
> + * @errp: pointer to a NULL-initialized error object
> + *
> + * Behaves like qio_channel_writev_full, but will send
> + * data asynchronously, this meaning this function
> + * may return before the data is actually sent.
> + *
> + * If at some point it's necessary wait for all data to be
> + * sent, use qio_channel_async_flush().
> + *
> + * If not implemented, falls back to the default writev
> + */

I'm not convinced by the fallback here. If you're
layering I/O channels this is not going to result in
desirable behaviour.

eg if QIOChannelTLS doesn't implement async, then when
you call async_writev, it'lll invoke sync writev on
the QIOChannelTLS, which will in turn invoke the sync
writev on QIOChannelSocket, despite the latter having
async writev support.  I think this is very misleading
behaviour

> +ssize_t qio_channel_async_writev(QIOChannel *ioc,
> +                                 const struct iovec *iov,
> +                                 size_t niov,
> +                                 int *fds,
> +                                 size_t nfds,
> +                                 Error **errp);

This is missing any flags. We need something like

   QIO_CHANNEL_WRITE_FLAG_ZEROCOPY

passed in an 'unsigned int flags' parameter. This in
turn makes me question whether we should have the
common helpers at all, as the api is going to be
different for sync vs async.

The QIOChannelFeature enum probably ought to be
extended with QIO_CHANNEL_FEATURE_WRITE_ZEROCOPY with
support for probing whether that's supported or not.

> +
> +/**
> + * qio_channel_async_flush:
> + * @ioc: the channel object
> + * @errp: pointer to a NULL-initialized error object
> + *
> + * Will lock until every packet queued with qio_channel_async_writev()

s/lock/block/ I presume.

> + * is sent.
> + *
> + * If not implemented, returns without changing anything.
> + */
> +
> +void qio_channel_async_flush(QIOChannel *ioc,
> +                             Error **errp);
> +
>  
>  #endif /* QIO_CHANNEL_H */
> diff --git a/io/channel.c b/io/channel.c
> index e8b019dc36..c4819b922f 100644
> --- a/io/channel.c
> +++ b/io/channel.c
> @@ -67,12 +67,13 @@ ssize_t qio_channel_readv_full(QIOChannel *ioc,
>  }
>  
>  
> -ssize_t qio_channel_writev_full(QIOChannel *ioc,
> -                                const struct iovec *iov,
> -                                size_t niov,
> -                                int *fds,
> -                                size_t nfds,
> -                                Error **errp)
> +ssize_t __qio_channel_writev_full(QIOChannel *ioc,
> +                                  const struct iovec *iov,
> +                                  size_t niov,
> +                                  int *fds,
> +                                  size_t nfds,
> +                                  bool async,
> +                                  Error **errp)
>  {
>      QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
>  
> @@ -83,6 +84,10 @@ ssize_t qio_channel_writev_full(QIOChannel *ioc,
>          return -1;
>      }
>  
> +    if (async && klass->io_async_writev) {
> +        return klass->io_async_writev(ioc, iov, niov, fds, nfds, errp);
> +    }
> +
>      return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
>  }
>  
> @@ -212,19 +217,20 @@ int qio_channel_readv_full_all(QIOChannel *ioc,
>      return ret;
>  }
>  
> -int qio_channel_writev_all(QIOChannel *ioc,
> -                           const struct iovec *iov,
> -                           size_t niov,
> -                           Error **errp)
> +int __qio_channel_writev_all(QIOChannel *ioc,
> +                             const struct iovec *iov,
> +                             size_t niov,
> +                             bool async,
> +                             Error **errp)
>  {
> -    return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, errp);
> +    return __qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, async, 
> errp);
>  }
>  
> -int qio_channel_writev_full_all(QIOChannel *ioc,
> +int __qio_channel_writev_full_all(QIOChannel *ioc,
>                                  const struct iovec *iov,
>                                  size_t niov,
>                                  int *fds, size_t nfds,
> -                                Error **errp)
> +                                bool async, Error **errp)
>  {
>      int ret = -1;
>      struct iovec *local_iov = g_new(struct iovec, niov);
> @@ -237,8 +243,8 @@ int qio_channel_writev_full_all(QIOChannel *ioc,
>  
>      while (nlocal_iov > 0) {
>          ssize_t len;
> -        len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds,
> -                                      errp);
> +        len = __qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, 
> nfds,
> +                                        async, errp);
>          if (len == QIO_CHANNEL_ERR_BLOCK) {
>              if (qemu_in_coroutine()) {
>                  qio_channel_yield(ioc, G_IO_OUT);
> @@ -474,6 +480,36 @@ off_t qio_channel_io_seek(QIOChannel *ioc,
>  }
>  
>  
> +ssize_t qio_channel_async_writev(QIOChannel *ioc,
> +                                 const struct iovec *iov,
> +                                 size_t niov,
> +                                 int *fds,
> +                                 size_t nfds,
> +                                 Error **errp)
> +{
> +     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
> +
> +    if (!klass->io_async_writev) {
> +        return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
> +    }
> +
> +     return klass->io_async_writev(ioc, iov, niov, fds, nfds, errp);
> +}
> +
> +
> +void qio_channel_async_flush(QIOChannel *ioc,
> +                             Error **errp)
> +{
> +     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
> +
> +    if (!klass->io_async_flush) {
> +        return;
> +    }
> +
> +     klass->io_async_flush(ioc, errp);
> +}
> +
> +
>  static void qio_channel_restart_read(void *opaque)
>  {
>      QIOChannel *ioc = opaque;
> -- 
> 2.33.0
> 

Regards,
Daniel
-- 
|: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org         -o-            https://fstop138.berrange.com :|
|: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|




reply via email to

[Prev in Thread] Current Thread [Next in Thread]