bug-gnulib

new module pipe-filter


From: Bruno Haible
Subject: new module pipe-filter
Date: Mon, 20 Jul 2009 01:22:56 +0200
User-agent: KMail/1.9.9

Eric Blake wrote:
> > Such piping, where you write from the current process and read into
> > the current process, may hang on BSD systems, because some data is
> > present in system buffers but the system wants the buffers to be full
> > before it continues. A fix for this hang is to enable non-blocking I/O
> > and use a loop with select() that alternately reads and writes. See
> > <http://git.savannah.gnu.org/gitweb/?p=gettext.git;a=blob;f=gettext-tools/src/msgfilter.c;hb=HEAD#l580>
> 
> Any chance of porting that over to gnulib?

I'm adding a new module 'pipe-filter', that does this.

> At any rate, my understanding 
> is that bison's use of a subpipe is to call m4, which, thanks to the way
> diversions are used, produces no output until after all the input has been
> consumed.  Thus, bison's usage pattern is immune to this particular
> deadlock.

Still, this does not sound very future-proof.

> But you are correct that to avoid deadlock in a generic filter 
> child application, portable applications cannot write more than PIPE_MAX
> bytes without checking whether the read end needs draining, and ...
> avoid problems with partial buffers.

Yes, these are the two possible deadlock types: when both processes are
writing to each other and the pipe buffers in the OS are full, or when
both processes are reading from each other and some data is stuck in
buffers and is not being returned because either an fflush is missing or
a partial read has been disallowed. There is no generic solution for
the missing fflush, but this module handles the other two cases.
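
For readers who want to see the technique in isolation, here is a minimal
sketch of such a select() loop using only POSIX calls. It is not the code
of the module below: the helper name filter_through and its in-memory
interface are made up for this illustration, error handling is reduced to
a single failure flag, and SIGPIPE (which is raised if the child exits
before reading all of its input) is not handled.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <sys/select.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical helper, not part of gnulib: feed IN[0..IN_LEN) to the stdin
   of the program ARGV and store up to OUT_CAP bytes of its stdout in OUT.
   Output beyond OUT_CAP is read and discarded, so the child never blocks.
   Returns the number of bytes stored, or -1 on failure.  */
static ssize_t
filter_through (char *const argv[], const char *in, size_t in_len,
                char *out, size_t out_cap)
{
  int to_child[2], from_child[2];
  int wfd, rfd, status, failed = 0, done_writing = 0;
  size_t in_off = 0, out_len = 0;
  pid_t pid;

  assert (argv != NULL && argv[0] != NULL);
  if (pipe (to_child) < 0)
    return -1;
  if (pipe (from_child) < 0 || (pid = fork ()) < 0)
    {
      close (to_child[0]);
      close (to_child[1]);
      return -1;
    }
  if (pid == 0)
    {
      /* Child: connect the pipes to stdin/stdout and run the filter.  */
      dup2 (to_child[0], STDIN_FILENO);
      dup2 (from_child[1], STDOUT_FILENO);
      close (to_child[0]);
      close (to_child[1]);
      close (from_child[0]);
      close (from_child[1]);
      execvp (argv[0], argv);
      _exit (127);
    }
  close (to_child[0]);
  close (from_child[1]);
  wfd = to_child[1];
  rfd = from_child[0];
  /* Non-blocking mode lets read() and write() return partial results.  */
  fcntl (wfd, F_SETFL, fcntl (wfd, F_GETFL, 0) | O_NONBLOCK);
  fcntl (rfd, F_SETFL, fcntl (rfd, F_GETFL, 0) | O_NONBLOCK);

  for (;;)
    {
      fd_set rfds, wfds;
      int nfds = rfd + 1;

      FD_ZERO (&rfds);
      FD_ZERO (&wfds);
      FD_SET (rfd, &rfds);
      if (!done_writing)
        {
          FD_SET (wfd, &wfds);
          if (wfd >= nfds)
            nfds = wfd + 1;
        }
      if (select (nfds, &rfds, &wfds, NULL, NULL) < 0)
        {
          if (errno == EINTR)
            continue;
          failed = 1;
          break;
        }
      if (!done_writing && FD_ISSET (wfd, &wfds))
        {
          ssize_t w = (in_off < in_len
                       ? write (wfd, in + in_off, in_len - in_off) : 0);
          if (w > 0)
            in_off += (size_t) w;
          else if (w < 0 && errno != EAGAIN && errno != EINTR)
            {
              failed = 1;
              break;
            }
          if (in_off == in_len)
            {
              /* Closing the write end signals end of input to the child.  */
              close (wfd);
              done_writing = 1;
            }
        }
      if (FD_ISSET (rfd, &rfds))
        {
          char buf[4096];
          ssize_t r = read (rfd, buf, sizeof buf);
          if (r == 0)
            break;              /* EOF: the child closed its stdout.  */
          if (r < 0)
            {
              if (errno == EAGAIN || errno == EINTR)
                continue;
              failed = 1;
              break;
            }
          {
            size_t room = out_cap - out_len;
            size_t take = (size_t) r < room ? (size_t) r : room;
            if (take > 0)
              memcpy (out + out_len, buf, take);
            out_len += take;
          }
        }
    }
  if (!done_writing)
    close (wfd);
  close (rfd);
  while (waitpid (pid, &status, 0) < 0 && errno == EINTR)
    ;
  return failed ? -1 : (ssize_t) out_len;
}
```

Calling it with an argv of { "cat", NULL } and an input much larger than
the pipe buffer exercises exactly the situation in which the naive
write-everything-then-read approach would hang.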


2009-07-19  Bruno Haible  <address@hidden>

        New module 'pipe-filter'.
        * lib/pipe-filter.h: New file.
        * lib/pipe-filter.c: New file.
        * modules/pipe-filter: New file.

============================== lib/pipe-filter.h ==============================
/* Filtering of data through a subprocess.
   Copyright (C) 2009 Free Software Foundation, Inc.
   Written by Bruno Haible <address@hidden>, 2009.

   This program is free software: you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.  */

#ifndef _PIPE_FILTER_H
#define _PIPE_FILTER_H

#include <stdbool.h>
#include <stddef.h>


#ifdef __cplusplus
extern "C" {
#endif


/* Piping data through a subprocess in the naïve way - write data to the
   subprocess and read from the subprocess when you expect it to have
   produced results - is subject to two kinds of deadlocks:
   1) If you write more than PIPE_MAX bytes or, more generally, if you write
      more bytes than the subprocess can handle at once, the subprocess
      may write its data and wait on you to read it, but you are currently
      busy writing.
   2) When you don't know ahead of time how many bytes the subprocess
      will produce, the usual technique of calling read (fd, buf, BUFSIZ)
      with a fixed BUFSIZ will, on Linux 2.2.17 and on BSD systems, cause
      the read() call to block until *all* of the buffer has been filled.
      But the subprocess cannot produce more data until you give it more
      input, and you are currently busy reading from it.
   This module provides a function that pipes data through the subprocess,
   without risking these deadlocks.  */


typedef const void * (*prepare_write_fn) (size_t *num_bytes_p,
                                          void *private_data);
typedef void (*done_write_fn) (void *data_written, size_t num_bytes_written,
                               void *private_data);
typedef void * (*prepare_read_fn) (size_t *num_bytes_p,
                                   void *private_data);
typedef void (*done_read_fn) (void *data_read, size_t num_bytes_read,
                              void *private_data);

/* Create a subprocess and pipe some data through it.
   Arguments:
   - progname is the program name used in error messages.
   - prog_path is the file name of the program to invoke.
   - prog_argv is a NULL terminated argument list, starting with prog_path as
     first element.
   - If null_stderr is true, the subprocess' stderr will be redirected to
     /dev/null, and the usual error message to stderr will be omitted.
     This is suitable when the subprocess does not fulfill an important task.
   - If exit_on_error is true, any error will cause the main process to exit
     with an error status.
   If the subprocess does not terminate correctly, exit if exit_on_error is
   true, otherwise return 127.
   Data is alternately written to the subprocess, through the functions
   prepare_write and done_write, and read from the subprocess, through the
   functions prepare_read and done_read.
   Callback arguments:
   - prepare_write (&num_bytes, p) must either return a pointer to data that
     is ready to be written and set num_bytes to the number of bytes ready to
     be written, or return NULL when no more bytes are to be written.
   - done_write (data_written, num_bytes_written, p) is called after
     num_bytes_written bytes were written.  It is guaranteed that
     num_bytes_written > 0.
   - prepare_read (&num_bytes, p) must return a pointer to a buffer for data
     that can be read and set num_bytes to the size of that buffer
     (must be > 0).
   - done_read (data_read, num_bytes_read, p) is called after num_bytes_read
     bytes were read into the buffer.
   Here p is always the private_data argument.
   Note that the prepare_write/done_write functions and the
   prepare_read/done_read functions may be called in different threads than
   the current thread (depending on platform). But they will not be called
   after the pipe_through_subprocess function has returned.
   Return 0 upon success, or (only if exit_on_error is false):
   - -1 with errno set upon failure,
   - the positive exit code of the subprocess if that failed.  */
extern int pipe_through_subprocess (const char *progname,
                                    const char *prog_path, char **prog_argv,
                                    bool null_stderr, bool exit_on_error,
                                    prepare_write_fn prepare_write,
                                    done_write_fn done_write,
                                    prepare_read_fn prepare_read,
                                    done_read_fn done_read,
                                    void *private_data);


#ifdef __cplusplus
}
#endif


#endif /* _PIPE_FILTER_H */
============================== lib/pipe-filter.c ==============================
/* Filtering of data through a subprocess.
   Copyright (C) 2001-2003, 2008-2009 Free Software Foundation, Inc.
   Written by Bruno Haible <address@hidden>, 2009.

   This program is free software: you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.  */

#include <config.h>

#include "pipe-filter.h"

#include <errno.h>
#include <fcntl.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/select.h>
#include <unistd.h>
#if (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__
# include <windows.h>
# include <process.h> /* _beginthreadex, _endthreadex */
#endif

#include "error.h"
#include "gettext.h"
#include "pipe.h"
#include "wait-process.h"

#define _(str) gettext (str)

#ifndef SSIZE_MAX
# define SSIZE_MAX ((ssize_t) (SIZE_MAX / 2))
#endif

/* We use a child process, and communicate through a bidirectional pipe.
   To avoid deadlocks, let the child process decide when it wants to read
   or to write, and let the parent behave accordingly.  The parent uses
   select() to know whether it must write or read.  On platforms without
   select(), we use non-blocking I/O.  (This means the parent is busy
   looping while waiting for the child.  Not good.  But hardly any platform
   lacks select() nowadays.)  */

/* On BeOS select() works only on sockets, not on normal file descriptors.  */
#ifdef __BEOS__
# undef HAVE_SELECT
#endif

#ifdef EINTR

/* EINTR handling for close(), read(), write(), select().
   These functions can return -1/EINTR even though we don't have any
   signal handlers set up, namely when we get interrupted via SIGSTOP.  */

static inline int
nonintr_close (int fd)
{
  int retval;

  do
    retval = close (fd);
  while (retval < 0 && errno == EINTR);

  return retval;
}
#undef close /* avoid warning related to gnulib module unistd */
#define close nonintr_close

static inline ssize_t
nonintr_read (int fd, void *buf, size_t count)
{
  ssize_t retval;

  do
    retval = read (fd, buf, count);
  while (retval < 0 && errno == EINTR);

  return retval;
}
#define read nonintr_read

static inline ssize_t
nonintr_write (int fd, const void *buf, size_t count)
{
  ssize_t retval;

  do
    retval = write (fd, buf, count);
  while (retval < 0 && errno == EINTR);

  return retval;
}
#undef write /* avoid warning on VMS */
#define write nonintr_write

# if HAVE_SELECT

static inline int
nonintr_select (int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,
                struct timeval *timeout)
{
  int retval;

  do
    retval = select (n, readfds, writefds, exceptfds, timeout);
  while (retval < 0 && errno == EINTR);

  return retval;
}
#  undef select /* avoid warning on VMS */
#  define select nonintr_select

# endif

#endif

/* Non-blocking I/O.  */
#ifndef O_NONBLOCK
# define O_NONBLOCK O_NDELAY
#endif
#if HAVE_SELECT
# define IS_EAGAIN(errcode) 0
#else
# ifdef EWOULDBLOCK
#  define IS_EAGAIN(errcode) ((errcode) == EAGAIN || (errcode) == EWOULDBLOCK)
# else
#  define IS_EAGAIN(errcode) ((errcode) == EAGAIN)
# endif
#endif

#if (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__

struct locals
{
  prepare_write_fn prepare_write;
  done_write_fn done_write;
  prepare_read_fn prepare_read;
  done_read_fn done_read;
  void *private_data;
  int fd[2];
  volatile bool writer_terminated;
  volatile int writer_final_errno;
  volatile bool reader_terminated;
  volatile int reader_final_errno;
};

static unsigned int WINAPI
writer_thread_func (void *thread_arg)
{
  struct locals *l = (struct locals *) thread_arg;

  for (;;)
    {
      size_t bufsize;
      const void *buf = l->prepare_write (&bufsize, l->private_data);
      if (buf != NULL)
        {
          ssize_t nwritten;

          if (bufsize > SSIZE_MAX)
            bufsize = SSIZE_MAX;
          nwritten = write (l->fd[1], buf, bufsize);
          if (nwritten < 0)
            {
              l->writer_final_errno = errno;
              break;
            }
          if (nwritten > 0)
            l->done_write ((void *) buf, nwritten, l->private_data);
        }
      else
        break;
    }

  l->writer_terminated = true;
  _endthreadex (0); /* calls ExitThread (0) */
  abort ();
}

static unsigned int WINAPI
reader_thread_func (void *thread_arg)
{
  struct locals *l = (struct locals *) thread_arg;

  for (;;)
    {
      size_t bufsize;
      void *buf = l->prepare_read (&bufsize, l->private_data);
      if (!(buf != NULL && bufsize > 0))
        /* prepare_read returned wrong values.  */
        abort ();
      if (bufsize > SSIZE_MAX)
        bufsize = SSIZE_MAX;
      {
        ssize_t nread = read (l->fd[0], buf, bufsize);
        if (nread < 0)
          {
            l->reader_final_errno = errno;
            break;
          }
        if (nread > 0)
          l->done_read (buf, nread, l->private_data);
        else
          break;
      }
    }

  l->reader_terminated = true;
  _endthreadex (0); /* calls ExitThread (0) */
  abort ();
}

#endif

int
pipe_through_subprocess (const char *progname,
                         const char *prog_path, char **prog_argv,
                         bool null_stderr, bool exit_on_error,
                         prepare_write_fn prepare_write,
                         done_write_fn done_write,
                         prepare_read_fn prepare_read,
                         done_read_fn done_read,
                         void *private_data)
{
  pid_t child;
  int fd[2];

  /* Open a bidirectional pipe to a subprocess.  */
  child = create_pipe_bidi (progname, prog_path, prog_argv,
                            null_stderr, true, exit_on_error,
                            fd);

#if (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__
  /* Native Woe32 API.  */
  /* Pipes have a non-blocking mode, see function SetNamedPipeHandleState and
     the article "Named Pipe Type, Read, and Wait Modes", but Microsoft's
     documentation discourages its use.  So don't use it.
     Asynchronous I/O is also not suitable because it notifies the caller only
     about completion of the I/O request, not about intermediate progress.
     So do the writing and the reading in separate threads.  */
  {
    struct locals l;
    HANDLE handles[2];
    #define writer_thread_handle handles[0]
    #define reader_thread_handle handles[1]
    bool writer_cleaned_up;
    bool reader_cleaned_up;

    l.prepare_write = prepare_write;
    l.done_write = done_write;
    l.prepare_read = prepare_read;
    l.done_read = done_read;
    l.private_data = private_data;
    l.fd[0] = fd[0];
    l.fd[1] = fd[1];
    l.writer_terminated = false;
    l.writer_final_errno = 0;
    l.reader_terminated = false;
    l.reader_final_errno = 0;

    writer_thread_handle =
      (HANDLE) _beginthreadex (NULL, 100000, writer_thread_func, &l, 0, NULL);
    reader_thread_handle =
      (HANDLE) _beginthreadex (NULL, 100000, reader_thread_func, &l, 0, NULL);
    if (writer_thread_handle == NULL || reader_thread_handle == NULL)
      {
        if (exit_on_error)
          error (EXIT_FAILURE, 0, _("creation of threads failed"));
        if (reader_thread_handle != NULL)
          CloseHandle (reader_thread_handle);
        if (writer_thread_handle != NULL)
          CloseHandle (writer_thread_handle);
        goto fail;
      }
    writer_cleaned_up = false;
    reader_cleaned_up = false;
    for (;;)
      {
        DWORD ret;

        /* Here !(writer_cleaned_up && reader_cleaned_up).  */
        if (writer_cleaned_up)
          ret = WaitForSingleObject (reader_thread_handle, INFINITE);
        else if (reader_cleaned_up)
          ret = WaitForSingleObject (writer_thread_handle, INFINITE);
        else
          ret = WaitForMultipleObjects (2, handles, FALSE, INFINITE);
        if (!(ret == WAIT_OBJECT_0 + 0 || ret == WAIT_OBJECT_0 + 1))
          abort ();

        if (l.writer_terminated)
          {
            /* The writer thread has just terminated.  */
            l.writer_terminated = false;
            CloseHandle (writer_thread_handle);
            if (l.writer_final_errno)
              {
                if (exit_on_error)
                  error (EXIT_FAILURE, l.writer_final_errno,
                         _("write to %s subprocess failed"), progname);
                if (!reader_cleaned_up)
                  {
                    TerminateThread (reader_thread_handle, 1);
                    CloseHandle (reader_thread_handle);
                  }
                goto fail;
              }
            /* Tell the child there is nothing more the parent will send.  */
            close (fd[1]);
            writer_cleaned_up = true;
          }
        if (l.reader_terminated)
          {
            /* The reader thread has just terminated.  */
            l.reader_terminated = false;
            CloseHandle (reader_thread_handle);
            if (l.reader_final_errno)
              {
                if (exit_on_error)
                  error (EXIT_FAILURE, l.reader_final_errno,
                         _("read from %s subprocess failed"), progname);
                if (!writer_cleaned_up)
                  {
                    TerminateThread (writer_thread_handle, 1);
                    CloseHandle (writer_thread_handle);
                  }
                goto fail;
              }
            reader_cleaned_up = true;
          }
        if (writer_cleaned_up && reader_cleaned_up)
          break;
      }
  }
#else
  {
    bool done_writing;

    /* Enable non-blocking I/O.  This permits the read() and write() calls
       to return -1/EAGAIN without blocking; this is important for polling
       if HAVE_SELECT is not defined.  It also permits the read() and write()
       calls to return after partial reads/writes; this is important if
       HAVE_SELECT is defined, because select() only says that some data
       can be read or written, not how many.  Without non-blocking I/O,
       Linux 2.2.17 and BSD systems prefer to block instead of returning
       with partial results.  */
    {
      int fcntl_flags;

      if ((fcntl_flags = fcntl (fd[1], F_GETFL, 0)) < 0
          || fcntl (fd[1], F_SETFL, fcntl_flags | O_NONBLOCK) < 0
          || (fcntl_flags = fcntl (fd[0], F_GETFL, 0)) < 0
          || fcntl (fd[0], F_SETFL, fcntl_flags | O_NONBLOCK) < 0)
        {
          if (exit_on_error)
            error (EXIT_FAILURE, errno,
                   _("cannot set up nonblocking I/O to %s subprocess"),
                   progname);
          goto fail;
        }
    }

    done_writing = false;
    for (;;)
      {
# if HAVE_SELECT
        int n;
        fd_set readfds;
        fd_set writefds;

        FD_ZERO (&readfds);
        FD_SET (fd[0], &readfds);
        n = fd[0] + 1;
        if (!done_writing)
          {
            FD_ZERO (&writefds);
            FD_SET (fd[1], &writefds);
            if (n <= fd[1])
              n = fd[1] + 1;
          }

        n = select (n, &readfds, (!done_writing ? &writefds : NULL), NULL,
                    NULL);
        if (n < 0)
          {
            if (exit_on_error)
              error (EXIT_FAILURE, errno,
                     _("communication with %s subprocess failed"), progname);
            goto fail;
          }
        if (!done_writing && FD_ISSET (fd[1], &writefds))
          goto try_write;
        if (FD_ISSET (fd[0], &readfds))
          goto try_read;
        /* How could select() return if none of the two descriptors is ready?  */
        abort ();
# endif

        /* Attempt to write.  */
# if HAVE_SELECT
      try_write:
# endif
        if (!done_writing)
          {
            size_t bufsize;
            const void *buf = prepare_write (&bufsize, private_data);
            if (buf != NULL)
              {
                ssize_t nwritten;

                if (bufsize > SSIZE_MAX)
                  bufsize = SSIZE_MAX;
                nwritten = write (fd[1], buf, bufsize);
                if (nwritten < 0 && !IS_EAGAIN (errno))
                  {
                    if (exit_on_error)
                      error (EXIT_FAILURE, errno,
                             _("write to %s subprocess failed"), progname);
                    goto fail;
                  }
                if (nwritten > 0)
                  done_write ((void *) buf, nwritten, private_data);
              }
            else
              {
                /* Tell the child there is nothing more the parent will send.  */
                close (fd[1]);
                done_writing = true;
              }
          }
# if HAVE_SELECT
        continue;
# endif

        /* Attempt to read.  */
# if HAVE_SELECT
      try_read:
# endif
        {
          size_t bufsize;
          void *buf = prepare_read (&bufsize, private_data);
          if (!(buf != NULL && bufsize > 0))
            /* prepare_read returned wrong values.  */
            abort ();
          {
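            ssize_t nread;

            /* Clamp the request, as the writer does, so that the result
               always fits in an ssize_t.  */
            if (bufsize > SSIZE_MAX)
              bufsize = SSIZE_MAX;
            nread = read (fd[0], buf, bufsize);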
            if (nread < 0 && !IS_EAGAIN (errno))
              {
                if (exit_on_error)
                  error (EXIT_FAILURE, errno,
                         _("read from %s subprocess failed"), progname);
                goto fail;
              }
            if (nread > 0)
              done_read (buf, nread, private_data);
            if (nread == 0 && done_writing)
              break;
          }
        }
# if HAVE_SELECT
        continue;
# endif
      }
  }
#endif

  close (fd[0]);

  /* Remove zombie process from process list.  */
  {
    int exitstatus =
      wait_subprocess (child, progname, false, null_stderr,
                       true, exit_on_error, NULL);
    if (exitstatus != 0 && exit_on_error)
      error (EXIT_FAILURE, 0, _("%s subprocess terminated with exit code %d"),
             progname, exitstatus);
    return exitstatus;
  }

 fail:
  {
    int saved_errno = errno;
    close (fd[1]);
    close (fd[0]);
    wait_subprocess (child, progname, true, true, true, false, NULL);
    errno = saved_errno;
    return -1;
  }
}
============================= modules/pipe-filter =============================
Description:
Filtering of data through a subprocess.

Files:
lib/pipe-filter.h
lib/pipe-filter.c

Depends-on:
pipe
wait-process
error
exit
gettext-h
stdbool
stdint
sys_select
unistd

configure.ac:
AC_REQUIRE([AC_C_INLINE])
AC_CHECK_FUNCS([select])

Makefile.am:
lib_SOURCES += pipe-filter.c

Include:
"pipe-filter.h"

License:
GPL

Maintainer:
Bruno Haible

===============================================================================
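
To illustrate the callback contract documented in pipe-filter.h, here is a
sketch of four callbacks that filter one in-memory buffer into another.
The struct mem_filter and the mem_* function names are made up for this
example and are not part of the module; the 4096-byte growth step is an
arbitrary choice.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical private_data layout for this example only.  */
struct mem_filter
{
  const char *in;       /* input not yet written to the subprocess */
  size_t in_left;       /* number of bytes remaining at IN */
  char *out;            /* malloc'ed buffer collecting the output */
  size_t out_len;       /* bytes of output collected so far */
  size_t out_alloc;     /* allocated size of OUT */
};

/* Offer all remaining input; return NULL once everything was written.  */
static const void *
mem_prepare_write (size_t *num_bytes_p, void *private_data)
{
  struct mem_filter *f = private_data;

  if (f->in_left == 0)
    return NULL;
  *num_bytes_p = f->in_left;
  return f->in;
}

/* A write may be partial: advance by what was actually written.  */
static void
mem_done_write (void *data_written, size_t num_bytes_written,
                void *private_data)
{
  struct mem_filter *f = private_data;

  (void) data_written;
  f->in += num_bytes_written;
  f->in_left -= num_bytes_written;
}

/* Always hand out a non-empty buffer, growing OUT when it runs low.  */
static void *
mem_prepare_read (size_t *num_bytes_p, void *private_data)
{
  struct mem_filter *f = private_data;

  if (f->out_alloc - f->out_len < 4096)
    {
      size_t new_alloc = 2 * f->out_alloc + 4096;
      char *p = realloc (f->out, new_alloc);
      if (p == NULL)
        abort ();
      f->out = p;
      f->out_alloc = new_alloc;
    }
  *num_bytes_p = f->out_alloc - f->out_len;
  return f->out + f->out_len;
}

/* Record how many bytes the read actually delivered.  */
static void
mem_done_read (void *data_read, size_t num_bytes_read, void *private_data)
{
  struct mem_filter *f = private_data;

  (void) data_read;
  f->out_len += num_bytes_read;
}
```

With "pipe-filter.h" included and a struct mem_filter f initialized to the
input, the four functions and &f would be passed as the last five
arguments of pipe_through_subprocess. Afterwards f.out holds f.out_len
bytes of filtered output.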



