/* Synchronous writing, asynchronous reading of pipes connected to a subprocess. Copyright (C) 2009 Free Software Foundation, Inc. Written by Paolo Bonzini , 2009. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see . */ #include #include "pipe-filter.h" #include #include #include #include #include #include #include #include #include "error.h" #include "gettext.h" #include "pipe.h" #include "wait-process.h" #include "xalloc.h" #define _(str) gettext (str) #ifndef SSIZE_MAX # define SSIZE_MAX ((ssize_t) (SIZE_MAX / 2)) #endif /* We use a child process, and communicate through a bidirectional pipe. To avoid deadlocks, let the child process decide when it wants to write, and let the parent read accordingly. The parent uses select() to know whether it must write or read. On platforms without select(), we use non-blocking I/O. (This means the parent is busy looping while waiting for the child. Not good. But hardly any platform lacks select() nowadays.) */ /* On BeOS select() works only on sockets, not on normal file descriptors. */ #ifdef __BEOS__ # undef HAVE_SELECT #endif #ifdef EINTR /* EINTR handling for close(), read(), write(), select(). These functions can return -1/EINTR even though we don't have any signal handlers set up, namely when we get interrupted via SIGSTOP. */ static inline int nonintr_close (int fd) { int retval; do retval = close (fd); while (retval < 0 && errno == EINTR); return retval; } #undef close /* avoid warning related to gnulib module unistd */ #define close nonintr_close static inline ssize_t nonintr_read (int fd, void *buf, size_t count) { ssize_t retval; do retval = read (fd, buf, count); while (retval < 0 && errno == EINTR); return retval; } #define read nonintr_read static inline ssize_t nonintr_write (int fd, const void *buf, size_t count) { ssize_t retval; do retval = write (fd, buf, count); while (retval < 0 && errno == EINTR); return retval; } #undef write /* avoid warning on VMS */ #define write nonintr_write # if HAVE_SELECT static inline int nonintr_select (int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout) { int retval; do retval = select (n, readfds, writefds, exceptfds, timeout); while (retval < 0 && errno == EINTR); return retval; } # undef select /* avoid warning on VMS */ # define select nonintr_select # endif #endif /* Non-blocking I/O. */ #ifndef O_NONBLOCK # define O_NONBLOCK O_NDELAY #endif #if HAVE_SELECT # define IS_EAGAIN(errcode) 0 #else # ifdef EWOULDBLOCK # define IS_EAGAIN(errcode) ((errcode) == EAGAIN || (errcode) == EWOULDBLOCK) # else # define IS_EAGAIN(errcode) ((errcode) == EAGAIN) # endif #endif struct filter { pid_t child; const char *progname; bool null_stderr; bool exit_on_error; char *read_buf; size_t read_bufsize; done_read_fn done_read; void *private_data; int fd[2]; int exit_code; volatile bool reader_terminated; volatile bool writer_terminated; #if (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__ volatile int writer_final_errno; volatile int reader_final_errno; const char *write_buf; size_t write_bufsize; HANDLE hReader, hWriter; /* thread synchronization yadda yadda */ #else fd_set readfds, writefds; #endif }; #if (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__ static unsigned int WINAPI writer_thread_func (void *thread_arg) { /* writing yadda yadda */ } static unsigned int WINAPI reader_thread_func (void *thread_arg) { /* reading yadda yadda */ } static int filter_init (struct filter *f) { f->writer_final_errno = 0; f->reader_final_errno = 0; f->write_buf = NULL; f->write_bufsize = 0; /* create synchronization objects yadda yadda */ f->hWriter = (HANDLE) _beginthreadex (NULL, 100000, writer_thread_func, f, 0, NULL); f->hReader = (HANDLE) _beginthreadex (NULL, 100000, reader_thread_func, f, 0, NULL); if (f->hWriter == NULL || f->hReader == NULL) { if (f->exit_on_error) error (EXIT_FAILURE, 0, _("creation of threads failed")); return -1; } else return 0; } static int filter_loop (struct filter *f, const char *buf, size_t size) { /* wake up threads yadda yadda */ if (f->writer_final_errno || f->reader_final_errno) { errno = (f->writer_final_errno ? f->writer_final_errno : f->reader_final_errno); return -1; } else return 0; } static void filter_cleanup (struct filter *f, bool try_io) { TerminateThread (f->hWriter, 1); close (f->fd[1]); f->writer_terminated = true; /* clean up synchronization objects yadda yadda */ f->reader_terminated = true; close (f->fd[0]); } #else static int filter_init (struct filter *f) { /* Enable non-blocking I/O. This permits the read() and write() calls to return -1/EAGAIN without blocking; this is important for polling if HAVE_SELECT is not defined. It also permits the read() and write() calls to return after partial reads/writes; this is important if HAVE_SELECT is defined, because select() only says that some data can be read or written, not how many. Without non-blocking I/O, Linux 2.2.17 and BSD systems prefer to block instead of returning with partial results. */ int fcntl_flags; if ((fcntl_flags = fcntl (f->fd[1], F_GETFL, 0)) < 0 || fcntl (f->fd[1], F_SETFL, fcntl_flags | O_NONBLOCK) < 0 || (fcntl_flags = fcntl (f->fd[0], F_GETFL, 0)) < 0 || fcntl (f->fd[0], F_SETFL, fcntl_flags | O_NONBLOCK) < 0) { if (f->exit_on_error) error (EXIT_FAILURE, errno, _("cannot set up nonblocking I/O to %s subprocess"), f->progname); return -1; } FD_ZERO (&f->readfds); FD_ZERO (&f->writefds); return 0; } static int filter_loop (struct filter *f, const char *buf, size_t bufsize) { static struct timeval tv0; for (;;) { #if HAVE_SELECT int n = f->fd[0] > f->fd[1] ? f->fd[0] + 1 : f->fd[1] + 1; if (!f->reader_terminated) FD_SET (f->fd[0], &f->readfds); if (!f->writer_terminated) FD_SET (f->fd[1], &f->writefds); n = select (n, &f->readfds, (bufsize ? &f->writefds : NULL), NULL, (!f->writer_terminated && !bufsize ? &tv0 : NULL)); if (n == 0) break; if (n < 0) { if (f->exit_on_error) error (EXIT_FAILURE, errno, _("communication with %s subprocess failed"), f->progname); f->writer_terminated = true; return -1; } if (bufsize && FD_ISSET (f->fd[1], &f->writefds)) goto try_write; if (FD_ISSET (f->fd[0], &f->readfds)) goto try_read; break; #endif /* Attempt to write. */ #if HAVE_SELECT try_write: #endif if (bufsize) { ssize_t nwritten = write (f->fd[1], buf, bufsize > SSIZE_MAX ? SSIZE_MAX : bufsize); if (nwritten < 0) { if (IS_EAGAIN (errno)) continue; if (f->exit_on_error) error (EXIT_FAILURE, errno, _("write to %s subprocess failed"), f->progname); f->writer_terminated = true; return -1; } else { bufsize -= nwritten; buf += nwritten; } } #if HAVE_SELECT continue; #endif /* Attempt to read. */ #if HAVE_SELECT try_read: #endif { ssize_t nread = read (f->fd[0], f->read_buf, f->read_bufsize); if (nread < 0) { if (IS_EAGAIN (errno)) continue; if (f->exit_on_error) error (EXIT_FAILURE, errno, _("read from %s subprocess failed"), f->progname); } if (nread <= 0) { f->reader_terminated = true; return 0; } else f->done_read (f->read_buf, nread, f->private_data); } #if HAVE_SELECT continue; #endif } } static void filter_cleanup (struct filter *f, bool try_io) { close (f->fd[1]); f->writer_terminated = true; if (try_io && !f->reader_terminated) filter_loop (f, NULL, 0); f->reader_terminated = true; close (f->fd[0]); } #endif static void filter_terminate (struct filter *f) { if (f->exit_code == -1) { filter_cleanup (f, !f->reader_terminated && !f->writer_terminated); f->exit_code = wait_subprocess (f->child, f->progname, true, f->null_stderr, true, f->exit_on_error, NULL); if (f->exit_on_error && f->exit_code) error (EXIT_FAILURE, 0, _("subprocess %s failed (exit status %d)"), f->progname, f->exit_code); } } /* Create a subprocess and pipe some data through it. progname is the program name used in error messages. prog_path is the file name of the program to invoke. prog_argv is a NULL terminated argument list, starting with prog_path as first element. If null_stderr is true, the subprocess' stderr will be redirected to /dev/null, and the usual error message to stderr will be omitted. This is suitable when the subprocess does not fulfill an important task. If exit_on_error is true, any error will cause the main process to exit with an error status. If the subprocess does not start correctly, exit if exit_on_error is true, otherwise return NULL and set errno. The caller will write to the subprocess through filter_write; during calls to filter_write, the done_read function may be called to process any data that the subprocess has written. done_read will receive at most read_bufsize bytes stored into buf, as well as a copy of private_data. */ struct filter * filter_create (const char *progname, const char *prog_path, const char **prog_argv, bool null_stderr, bool exit_on_error, char *read_buf, size_t read_bufsize, done_read_fn done_read, void *private_data) { struct filter *f = xmalloc (sizeof (struct filter)); pid_t child; int fd[2]; /* Open a bidirectional pipe to a subprocess. */ f->child = create_pipe_bidi (progname, prog_path, (char **) prog_argv, null_stderr, true, exit_on_error, fd); f->progname = progname; f->null_stderr = null_stderr; f->exit_on_error = exit_on_error; f->exit_code = -1; f->read_buf = read_buf; f->read_bufsize = read_bufsize; f->done_read = done_read; f->private_data = private_data; f->reader_terminated = false; f->writer_terminated = false; f->fd[0] = fd[0]; f->fd[1] = fd[1]; if (filter_init (f) < 0) filter_terminate (f); return f; } /* Write size bytes starting at buf into the pipe and in the meanwhile possibly call the done_read function specified in create_filter. The done_read function may be called in a different thread than the current thread, depending on the platform. However, it will always be called before filter_write has returned (or else will be delayed to the next call to filter_write or filter_close). Return only after all the entire buffer has been written to the pipe. If the subprocess exits early with zero status, subsequent writes will becomes no-ops and zero is returned. If there is a problem reading or writing, return -1 and set errno. If the subprocess exits early with nonzero status, return the status. (In either case, filter_write will instead exit if exit_on_error was passed as true). Otherwise return 0. */ int filter_write (struct filter *f, const char *buf, size_t size) { int rc, save_errno; assert (buf); if (f->exit_code != -1) return f->exit_code; if (!size) return 0; rc = filter_loop (f, buf, size); if (!f->reader_terminated && !f->writer_terminated) return 0; save_errno = errno; filter_terminate (f); errno = save_errno; return (rc < 0 ? rc : f->exit_code); } /* Finish reading the output via the done_read function specified in create_filter. The done_read function may be called in a different thread than. However, it will always be called before filter_close has returned. The write side of the pipe is closed as soon as filter_close starts, while the read side will be closed just before it finishes. If there is a problem reading or closing the pipe, return -1 and set errno. If the subprocess exits early with nonzero status, return the status. (In either case, filter_close will instead exit if exit_on_error was passed as true). Otherwise return 0. */ int filter_close (struct filter *f) { int rc, save_errno; filter_terminate (f); rc = f->exit_code; save_errno = errno; free (f); errno = save_errno; return rc; }