new module pipe-filter
From: Bruno Haible
Subject: new module pipe-filter
Date: Mon, 20 Jul 2009 01:22:56 +0200
User-agent: KMail/1.9.9
Eric Blake wrote:
> > Such piping, where you write from the current process and read into
> > the current process, may hang on BSD systems, because some data is
> > present in system buffers but the system wants the buffers to be full
> > before it continues. A fix for this hang is to enable non-blocking I/O
> > and use a loop with select() that alternately reads and writes. See
> > <http://git.savannah.gnu.org/gitweb/?p=gettext.git;a=blob;f=gettext-tools/src/msgfilter.c;hb=HEAD#l580>
>
> Any chance of porting that over to gnulib?
I'm adding a new module, 'pipe-filter', that does this.
> At any rate, my understanding
> is that bison's use of a subpipe is to call m4, which, thanks to the way
> diversions are used, produces no output until after all the input has been
> consumed. Thus, bison's usage pattern is immune to this particular
> deadlock.
Still, this does not sound very future-proof.
> But you are correct that to avoid deadlock in a generic filter
> child application, portable applications cannot write more than PIPE_MAX
> bytes without checking whether the read end needs draining, and ...
> avoid problems with partial buffers.
Yes, these are the two possible deadlock types: when both processes are
writing to each other and the pipe buffers in the OS are full, or when
both processes are reading from each other and some data is stuck in
buffers and is not being returned because either an fflush is missing or
a partial read has been disallowed. There is no generic solution against
a missing fflush, but this module handles the other two cases.
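The first deadlock type comes from the bounded capacity of a pipe: once nobody drains it, a blocking write() simply waits. As a rough illustration (a sketch, not part of the module), the hypothetical function measure_pipe_capacity below fills an undrained pipe with non-blocking writes until the kernel reports EAGAIN. The value is system-dependent (64 KiB is common on current Linux), so portable code should not rely on any particular number.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Hypothetical helper: return how many bytes an empty pipe accepts before
   a non-blocking write() fails with EAGAIN, i.e. the point at which a
   blocking writer would hang because nobody reads the other end.
   Return -1 on unexpected errors.  */
long
measure_pipe_capacity (void)
{
  int fd[2];
  char chunk[512] = { 0 };   /* 512 <= PIPE_BUF everywhere: atomic writes */
  long total = 0;

  if (pipe (fd) < 0)
    return -1;
  /* Non-blocking write end: a full pipe now yields EAGAIN instead of
     blocking this process forever.  */
  int flags = fcntl (fd[1], F_GETFL, 0);
  if (flags < 0 || fcntl (fd[1], F_SETFL, flags | O_NONBLOCK) < 0)
    total = -1;
  while (total >= 0)
    {
      ssize_t n = write (fd[1], chunk, sizeof chunk);
      if (n > 0)
        total += n;
      else if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
        break;                  /* pipe full: a blocking write would hang */
      else if (!(n < 0 && errno == EINTR))
        total = -1;             /* unexpected error */
    }
  close (fd[0]);
  close (fd[1]);
  return total;
}
```

The module never needs this number: it writes only when select() reports the descriptor writable, and it uses non-blocking I/O so that a short write returns instead of blocking.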
2009-07-19 Bruno Haible <address@hidden>
New module 'pipe-filter'.
* lib/pipe-filter.h: New file.
* lib/pipe-filter.c: New file.
* modules/pipe-filter: New file.
============================== lib/pipe-filter.h ==============================
/* Filtering of data through a subprocess.
Copyright (C) 2009 Free Software Foundation, Inc.
Written by Bruno Haible <address@hidden>, 2009.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>. */
#ifndef _PIPE_FILTER_H
#define _PIPE_FILTER_H
#include <stdbool.h>
#include <stddef.h>
#ifdef __cplusplus
extern "C" {
#endif
/* Piping data through a subprocess in the naïve way - write data to the
subprocess and read from the subprocess when you expect it to have
produced results - is subject to two kinds of deadlocks:
1) If you write more than PIPE_MAX bytes or, more generally, if you write
more bytes than the subprocess can handle at once, the subprocess
may write its data and wait on you to read it, but you are currently
busy writing.
2) When you don't know ahead of time how many bytes the subprocess
will produce, the usual technique of calling read (fd, buf, BUFSIZ)
with a fixed BUFSIZ will, on Linux 2.2.17 and on BSD systems, cause
the read() call to block until *all* of the buffer has been filled.
But the subprocess cannot produce more data until you give it more
input, and you are currently busy reading from it.
This module provides a function that pipes data through the subprocess,
without risking these deadlocks. */
typedef const void * (*prepare_write_fn) (size_t *num_bytes_p,
void *private_data);
typedef void (*done_write_fn) (void *data_written, size_t num_bytes_written,
void *private_data);
typedef void * (*prepare_read_fn) (size_t *num_bytes_p,
void *private_data);
typedef void (*done_read_fn) (void *data_read, size_t num_bytes_read,
void *private_data);
/* Create a subprocess and pipe some data through it.
Arguments:
- progname is the program name used in error messages.
- prog_path is the file name of the program to invoke.
- prog_argv is a NULL-terminated argument list, starting with prog_path as
first element.
- If null_stderr is true, the subprocess' stderr will be redirected to
/dev/null, and the usual error message to stderr will be omitted.
This is suitable when the subprocess does not fulfill an important task.
- If exit_on_error is true, any error will cause the main process to exit
with an error status.
If the subprocess does not terminate correctly, exit if exit_on_error is
true, otherwise return 127.
Data is alternately written to the subprocess, through the functions
prepare_write and done_write, and read from the subprocess, through the
functions prepare_read and done_read.
Callback arguments:
- prepare_write (&num_bytes, p) must either return a pointer to data that
is ready to be written and set num_bytes to the number of bytes ready to
be written, or return NULL when no more bytes are to be written.
- done_write (data_written, num_bytes_written, p) is called after
num_bytes_written bytes were written. It is guaranteed that
num_bytes_written > 0.
- prepare_read (&num_bytes, p) must return a pointer to a buffer for data
that can be read and set num_bytes to the size of that buffer
(must be > 0).
- done_read (data_read, num_bytes_read, p) is called after num_bytes_read
bytes were read into the buffer.
Here p is always the private_data argument.
Note that the prepare_write/done_write functions and the
prepare_read/done_read functions may be called in different threads than
the current thread (depending on platform). But they will not be called
after the pipe_through_subprocess function has returned.
Return 0 upon success, or (only if exit_on_error is false):
- -1 with errno set upon failure,
- the positive exit code of the subprocess if that failed. */
extern int pipe_through_subprocess (const char *progname,
const char *prog_path, char **prog_argv,
bool null_stderr, bool exit_on_error,
prepare_write_fn prepare_write,
done_write_fn done_write,
prepare_read_fn prepare_read,
done_read_fn done_read,
void *private_data);
#ifdef __cplusplus
}
#endif
#endif /* _PIPE_FILTER_H */
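The callback protocol documented in this header can be illustrated with a minimal sketch, assuming the caller has its whole input in memory and wants the subprocess output collected in a growable buffer. The struct filter_ctx and the my_* function names are hypothetical; only the four typedefs are copied from pipe-filter.h. Allocation failure handling is deliberately crude.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Callback types, as declared in pipe-filter.h.  */
typedef const void * (*prepare_write_fn) (size_t *num_bytes_p,
                                          void *private_data);
typedef void (*done_write_fn) (void *data_written, size_t num_bytes_written,
                               void *private_data);
typedef void * (*prepare_read_fn) (size_t *num_bytes_p, void *private_data);
typedef void (*done_read_fn) (void *data_read, size_t num_bytes_read,
                              void *private_data);

/* Hypothetical private_data: input to send, output received so far.  */
struct filter_ctx
{
  const char *input;
  size_t input_len;
  size_t input_pos;     /* bytes of input already written */
  char *output;         /* malloc'ed, grows as needed */
  size_t output_len;
  size_t output_cap;
};

/* Offer all input not written yet; NULL means "no more input".  */
static const void *
my_prepare_write (size_t *num_bytes_p, void *private_data)
{
  struct filter_ctx *c = private_data;
  if (c->input_pos == c->input_len)
    return NULL;
  *num_bytes_p = c->input_len - c->input_pos;
  return c->input + c->input_pos;
}

/* Writes may be partial, so advance only by what was actually written.  */
static void
my_done_write (void *data_written, size_t num_bytes_written,
               void *private_data)
{
  struct filter_ctx *c = private_data;
  (void) data_written;
  c->input_pos += num_bytes_written;
}

/* Hand out the free tail of the output buffer, growing it when it is
   nearly full.  The size returned must be > 0.  */
static void *
my_prepare_read (size_t *num_bytes_p, void *private_data)
{
  struct filter_ctx *c = private_data;
  if (c->output_cap - c->output_len < 4096)
    {
      size_t new_cap = 2 * c->output_cap + 4096;
      char *p = realloc (c->output, new_cap);
      if (p == NULL)
        abort ();
      c->output = p;
      c->output_cap = new_cap;
    }
  *num_bytes_p = c->output_cap - c->output_len;
  assert (*num_bytes_p > 0);
  return c->output + c->output_len;
}

/* The bytes are already in place; just account for them.  */
static void
my_done_read (void *data_read, size_t num_bytes_read, void *private_data)
{
  struct filter_ctx *c = private_data;
  (void) data_read;
  c->output_len += num_bytes_read;
}
```

A caller would then pass these four functions, plus a pointer to an initialized struct filter_ctx as private_data, to pipe_through_subprocess, and find the filtered data in output/output_len after a 0 return. Offering all remaining input at once is fine: the implementation clamps the size to SSIZE_MAX and reports partial writes through done_write.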
============================== lib/pipe-filter.c ==============================
/* Filtering of data through a subprocess.
Copyright (C) 2001-2003, 2008-2009 Free Software Foundation, Inc.
Written by Bruno Haible <address@hidden>, 2009.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>. */
#include <config.h>
#include "pipe-filter.h"
#include <errno.h>
#include <fcntl.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/select.h>
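#include <unistd.h>
#if (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__
# include <windows.h>
# include <process.h> /* _beginthreadex, _endthreadex */
#endif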
#include "error.h"
#include "gettext.h"
#include "pipe.h"
#include "wait-process.h"
#define _(str) gettext (str)
#ifndef SSIZE_MAX
# define SSIZE_MAX ((ssize_t) (SIZE_MAX / 2))
#endif
/* We use a child process, and communicate through a bidirectional pipe.
To avoid deadlocks, let the child process decide when it wants to read
or to write, and let the parent behave accordingly. The parent uses
select() to know whether it must write or read. On platforms without
select(), we use non-blocking I/O. (This means the parent is busy
looping while waiting for the child. Not good. But hardly any platform
lacks select() nowadays.) */
/* On BeOS select() works only on sockets, not on normal file descriptors. */
#ifdef __BEOS__
# undef HAVE_SELECT
#endif
#ifdef EINTR
/* EINTR handling for close(), read(), write(), select().
These functions can return -1/EINTR even though we don't have any
signal handlers set up, namely when we get interrupted via SIGSTOP. */
static inline int
nonintr_close (int fd)
{
int retval;
do
retval = close (fd);
while (retval < 0 && errno == EINTR);
return retval;
}
#undef close /* avoid warning related to gnulib module unistd */
#define close nonintr_close
static inline ssize_t
nonintr_read (int fd, void *buf, size_t count)
{
ssize_t retval;
do
retval = read (fd, buf, count);
while (retval < 0 && errno == EINTR);
return retval;
}
#define read nonintr_read
static inline ssize_t
nonintr_write (int fd, const void *buf, size_t count)
{
ssize_t retval;
do
retval = write (fd, buf, count);
while (retval < 0 && errno == EINTR);
return retval;
}
#undef write /* avoid warning on VMS */
#define write nonintr_write
# if HAVE_SELECT
static inline int
nonintr_select (int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,
struct timeval *timeout)
{
int retval;
do
retval = select (n, readfds, writefds, exceptfds, timeout);
while (retval < 0 && errno == EINTR);
return retval;
}
# undef select /* avoid warning on VMS */
# define select nonintr_select
# endif
#endif
/* Non-blocking I/O. */
#ifndef O_NONBLOCK
# define O_NONBLOCK O_NDELAY
#endif
#if HAVE_SELECT
# define IS_EAGAIN(errcode) 0
#else
# ifdef EWOULDBLOCK
# define IS_EAGAIN(errcode) ((errcode) == EAGAIN || (errcode) == EWOULDBLOCK)
# else
# define IS_EAGAIN(errcode) ((errcode) == EAGAIN)
# endif
#endif
#if (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__
struct locals
{
prepare_write_fn prepare_write;
done_write_fn done_write;
prepare_read_fn prepare_read;
done_read_fn done_read;
void *private_data;
int fd[2];
volatile bool writer_terminated;
volatile int writer_final_errno;
volatile bool reader_terminated;
volatile int reader_final_errno;
};
static unsigned int WINAPI
writer_thread_func (void *thread_arg)
{
struct locals *l = (struct locals *) thread_arg;
for (;;)
{
size_t bufsize;
const void *buf = l->prepare_write (&bufsize, l->private_data);
if (buf != NULL)
{
ssize_t nwritten;
if (bufsize > SSIZE_MAX)
bufsize = SSIZE_MAX;
nwritten = write (l->fd[1], buf, bufsize);
if (nwritten < 0)
{
l->writer_final_errno = errno;
break;
}
if (nwritten > 0)
l->done_write ((void *) buf, nwritten, l->private_data);
}
else
break;
}
l->writer_terminated = true;
_endthreadex (0); /* calls ExitThread (0) */
abort ();
}
static unsigned int WINAPI
reader_thread_func (void *thread_arg)
{
struct locals *l = (struct locals *) thread_arg;
for (;;)
{
size_t bufsize;
void *buf = l->prepare_read (&bufsize, l->private_data);
if (!(buf != NULL && bufsize > 0))
/* prepare_read returned wrong values. */
abort ();
if (bufsize > SSIZE_MAX)
bufsize = SSIZE_MAX;
{
ssize_t nread = read (l->fd[0], buf, bufsize);
if (nread < 0)
{
l->reader_final_errno = errno;
break;
}
if (nread > 0)
l->done_read (buf, nread, l->private_data);
else
break;
}
}
l->reader_terminated = true;
_endthreadex (0); /* calls ExitThread (0) */
abort ();
}
#endif
int
pipe_through_subprocess (const char *progname,
const char *prog_path, char **prog_argv,
bool null_stderr, bool exit_on_error,
prepare_write_fn prepare_write,
done_write_fn done_write,
prepare_read_fn prepare_read,
done_read_fn done_read,
void *private_data)
{
pid_t child;
int fd[2];
/* Open a bidirectional pipe to a subprocess. */
child = create_pipe_bidi (progname, prog_path, prog_argv,
null_stderr, true, exit_on_error,
fd);
if (child == -1)
/* Can only happen when exit_on_error is false. */
return -1;
#if (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__
/* Native Woe32 API. */
/* Pipes have a non-blocking mode, see function SetNamedPipeHandleState and
the article "Named Pipe Type, Read, and Wait Modes", but Microsoft's
documentation discourages its use. So don't use it.
Asynchronous I/O is also not suitable because it notifies the caller only
about completion of the I/O request, not about intermediate progress.
So do the writing and the reading in separate threads. */
{
struct locals l;
HANDLE handles[2];
#define writer_thread_handle handles[0]
#define reader_thread_handle handles[1]
bool writer_cleaned_up;
bool reader_cleaned_up;
l.prepare_write = prepare_write;
l.done_write = done_write;
l.prepare_read = prepare_read;
l.done_read = done_read;
l.private_data = private_data;
l.fd[0] = fd[0];
l.fd[1] = fd[1];
l.writer_terminated = false;
l.writer_final_errno = 0;
l.reader_terminated = false;
l.reader_final_errno = 0;
writer_thread_handle =
(HANDLE) _beginthreadex (NULL, 100000, writer_thread_func, &l, 0, NULL);
reader_thread_handle =
(HANDLE) _beginthreadex (NULL, 100000, reader_thread_func, &l, 0, NULL);
if (writer_thread_handle == NULL || reader_thread_handle == NULL)
{
if (exit_on_error)
error (EXIT_FAILURE, 0, _("creation of threads failed"));
if (reader_thread_handle != NULL)
CloseHandle (reader_thread_handle);
if (writer_thread_handle != NULL)
CloseHandle (writer_thread_handle);
goto fail;
}
writer_cleaned_up = false;
reader_cleaned_up = false;
for (;;)
{
DWORD ret;
/* Here !(writer_cleaned_up && reader_cleaned_up). */
if (writer_cleaned_up)
ret = WaitForSingleObject (reader_thread_handle, INFINITE);
else if (reader_cleaned_up)
ret = WaitForSingleObject (writer_thread_handle, INFINITE);
else
ret = WaitForMultipleObjects (2, handles, FALSE, INFINITE);
if (!(ret == WAIT_OBJECT_0 + 0 || ret == WAIT_OBJECT_0 + 1))
abort ();
if (l.writer_terminated)
{
/* The writer thread has just terminated. */
l.writer_terminated = false;
CloseHandle (writer_thread_handle);
if (l.writer_final_errno)
{
if (exit_on_error)
error (EXIT_FAILURE, l.writer_final_errno,
_("write to %s subprocess failed"), progname);
if (!reader_cleaned_up)
{
TerminateThread (reader_thread_handle, 1);
CloseHandle (reader_thread_handle);
}
goto fail;
}
/* Tell the child there is nothing more the parent will send. */
close (fd[1]);
writer_cleaned_up = true;
}
if (l.reader_terminated)
{
/* The reader thread has just terminated. */
l.reader_terminated = false;
CloseHandle (reader_thread_handle);
if (l.reader_final_errno)
{
if (exit_on_error)
error (EXIT_FAILURE, l.reader_final_errno,
_("read from %s subprocess failed"), progname);
if (!writer_cleaned_up)
{
TerminateThread (writer_thread_handle, 1);
CloseHandle (writer_thread_handle);
}
goto fail;
}
reader_cleaned_up = true;
}
if (writer_cleaned_up && reader_cleaned_up)
break;
}
}
#else
{
bool done_writing;
/* Enable non-blocking I/O. This permits the read() and write() calls
to return -1/EAGAIN without blocking; this is important for polling
if HAVE_SELECT is not defined. It also permits the read() and write()
calls to return after partial reads/writes; this is important if
HAVE_SELECT is defined, because select() only says that some data
can be read or written, not how many. Without non-blocking I/O,
Linux 2.2.17 and BSD systems prefer to block instead of returning
with partial results. */
{
int fcntl_flags;
if ((fcntl_flags = fcntl (fd[1], F_GETFL, 0)) < 0
|| fcntl (fd[1], F_SETFL, fcntl_flags | O_NONBLOCK) < 0
|| (fcntl_flags = fcntl (fd[0], F_GETFL, 0)) < 0
|| fcntl (fd[0], F_SETFL, fcntl_flags | O_NONBLOCK) < 0)
{
if (exit_on_error)
error (EXIT_FAILURE, errno,
_("cannot set up nonblocking I/O to %s subprocess"),
progname);
goto fail;
}
}
done_writing = false;
for (;;)
{
# if HAVE_SELECT
int n;
fd_set readfds;
fd_set writefds;
FD_ZERO (&readfds);
FD_SET (fd[0], &readfds);
n = fd[0] + 1;
if (!done_writing)
{
FD_ZERO (&writefds);
FD_SET (fd[1], &writefds);
if (n <= fd[1])
n = fd[1] + 1;
}
n = select (n, &readfds, (!done_writing ? &writefds : NULL), NULL,
NULL);
if (n < 0)
{
if (exit_on_error)
error (EXIT_FAILURE, errno,
_("communication with %s subprocess failed"), progname);
goto fail;
}
if (!done_writing && FD_ISSET (fd[1], &writefds))
goto try_write;
if (FD_ISSET (fd[0], &readfds))
goto try_read;
/* How could select() return if neither of the two descriptors is ready?
*/
abort ();
# endif
/* Attempt to write. */
# if HAVE_SELECT
try_write:
# endif
if (!done_writing)
{
size_t bufsize;
const void *buf = prepare_write (&bufsize, private_data);
if (buf != NULL)
{
ssize_t nwritten;
if (bufsize > SSIZE_MAX)
bufsize = SSIZE_MAX;
nwritten = write (fd[1], buf, bufsize);
if (nwritten < 0 && !IS_EAGAIN (errno))
{
if (exit_on_error)
error (EXIT_FAILURE, errno,
_("write to %s subprocess failed"), progname);
goto fail;
}
if (nwritten > 0)
done_write ((void *) buf, nwritten, private_data);
}
else
{
/* Tell the child there is nothing more the parent will send.
*/
close (fd[1]);
done_writing = true;
}
}
# if HAVE_SELECT
continue;
# endif
/* Attempt to read. */
# if HAVE_SELECT
try_read:
# endif
{
size_t bufsize;
void *buf = prepare_read (&bufsize, private_data);
if (!(buf != NULL && bufsize > 0))
/* prepare_read returned wrong values. */
abort ();
{
ssize_t nread = read (fd[0], buf, bufsize);
if (nread < 0 && !IS_EAGAIN (errno))
{
if (exit_on_error)
error (EXIT_FAILURE, errno,
_("read from %s subprocess failed"), progname);
goto fail;
}
if (nread > 0)
done_read (buf, nread, private_data);
if (nread == 0 && done_writing)
break;
}
}
# if HAVE_SELECT
continue;
# endif
}
}
#endif
close (fd[0]);
/* Remove zombie process from process list. */
{
int exitstatus =
wait_subprocess (child, progname, false, null_stderr,
true, exit_on_error, NULL);
if (exitstatus != 0 && exit_on_error)
error (EXIT_FAILURE, 0, _("%s subprocess terminated with exit code %d"),
progname, exitstatus);
return exitstatus;
}
fail:
{
int saved_errno = errno;
close (fd[1]);
close (fd[0]);
wait_subprocess (child, progname, true, true, true, false, NULL);
errno = saved_errno;
return -1;
}
}
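For experimenting outside gnulib, the POSIX branch above (non-blocking descriptors plus a select() loop that writes when the child can accept input and reads when it has produced output) can be condensed into a standalone sketch. Assumptions: the child is a forked copy of the process running a cat(1)-like copy loop instead of an exec'd program, the whole input and an output buffer of known capacity are in memory, and the names pump_through_child and set_nonblocking are hypothetical. It omits the module's callbacks, its error messages and the fallback for platforms without select(), and it leaves SIGPIPE at its default disposition.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/select.h>
#include <sys/wait.h>
#include <unistd.h>

static int
set_nonblocking (int fd)
{
  int flags = fcntl (fd, F_GETFL, 0);
  return flags < 0 ? -1 : fcntl (fd, F_SETFL, flags | O_NONBLOCK);
}

/* Send IN through a forked echo child and collect its output in OUT.
   Return the byte count, or -1 on failure or if OUT_CAP is too small.  */
ssize_t
pump_through_child (const char *in, size_t in_len, char *out, size_t out_cap)
{
  int to_child[2], from_child[2];
  if (pipe (to_child) < 0)
    return -1;
  if (pipe (from_child) < 0)
    {
      close (to_child[0]);
      close (to_child[1]);
      return -1;
    }
  pid_t pid = fork ();
  if (pid == 0)
    {
      /* Child, standing in for the filter program: a blocking copy loop.  */
      char buf[4096];
      ssize_t n;
      close (to_child[1]);
      close (from_child[0]);
      while ((n = read (to_child[0], buf, sizeof buf)) > 0)
        for (ssize_t off = 0, w; off < n; off += w)
          if ((w = write (from_child[1], buf + off, n - off)) <= 0)
            _exit (1);
      _exit (n < 0);
    }
  close (to_child[0]);
  close (from_child[1]);
  int wfd = to_child[1], rfd = from_child[0];
  int ok = pid > 0 && set_nonblocking (wfd) == 0 && set_nonblocking (rfd) == 0;
  size_t written = 0, nread = 0;
  while (ok)
    {
      fd_set rfds, wfds;
      FD_ZERO (&rfds);
      FD_ZERO (&wfds);
      FD_SET (rfd, &rfds);
      if (wfd >= 0)
        FD_SET (wfd, &wfds);
      /* Sleep until the child can take more input or has output ready.  */
      if (select ((wfd > rfd ? wfd : rfd) + 1, &rfds, &wfds, NULL, NULL) < 0)
        {
          ok = (errno == EINTR);
          continue;
        }
      if (wfd >= 0 && FD_ISSET (wfd, &wfds))
        {
          ssize_t n = written < in_len
                      ? write (wfd, in + written, in_len - written) : 0;
          if (n > 0)
            written += n;
          else if (n < 0 && errno != EAGAIN && errno != EINTR)
            ok = 0;                       /* write error */
          if (written == in_len)
            {
              close (wfd);                /* the child now sees EOF */
              wfd = -1;
            }
        }
      if (FD_ISSET (rfd, &rfds))
        {
          /* When OUT is full, read one scratch byte so EOF is still seen.  */
          char scratch, *dst = nread < out_cap ? out + nread : &scratch;
          ssize_t n = read (rfd, dst, nread < out_cap ? out_cap - nread : 1);
          if (n == 0)
            break;                        /* child closed its stdout: done */
          if (n > 0 && dst != &scratch)
            nread += n;
          else if (n > 0 || (errno != EAGAIN && errno != EINTR))
            ok = 0;                       /* OUT too small, or read error */
        }
    }
  if (wfd >= 0)
    close (wfd);
  close (rfd);
  int status = 0;
  pid_t r = pid;
  while (pid > 0 && (r = waitpid (pid, &status, 0)) < 0 && errno == EINTR)
    continue;
  return ok && r > 0 && WIFEXITED (status) && WEXITSTATUS (status) == 0
         ? (ssize_t) nread : -1;
}
```

Pumping 1 MiB through the child this way completes, although that is many times a typical pipe capacity. Writing the whole buffer with blocking write() before reading anything back would instead hang once both pipes are full, which is the first deadlock type described in the header comment.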
============================= modules/pipe-filter =============================
Description:
Filtering of data through a subprocess.
Files:
lib/pipe-filter.h
lib/pipe-filter.c
Depends-on:
pipe
wait-process
error
exit
gettext-h
stdbool
stdint
sys_select
unistd
configure.ac:
AC_REQUIRE([AC_C_INLINE])
AC_CHECK_FUNCS([select])
Makefile.am:
lib_SOURCES += pipe-filter.c
Include:
"pipe-filter.h"
License:
GPL
Maintainer:
Bruno Haible
===============================================================================