coreutils
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [PATCH] tee: Add --pipe-check to allow instantly detecting closed ou


From: Pádraig Brady
Subject: Re: [PATCH] tee: Add --pipe-check to allow instantly detecting closed outputs
Date: Sat, 19 Nov 2022 19:15:46 +0000
User-agent: Mozilla/5.0 (X11; Linux x86_64; rv:102.0) Gecko/20100101 Thunderbird/102.0

On 19/11/2022 08:23, Arsen Arsenović wrote:
This flag comes in useful in scripts in which tee is used to duplicate
output over process substitutions, so that dying processes and no stdin
won't lead to a hang.

Without this option, a command like ``tee >(compiler1) >(compiler2) |
compiler3'' would hang forever if stdin was a terminal and the compilers
didn't expect any data, but since tee -P would do polling, the compilers
dying could mean that tee terminates.

        * src/tee.c (long_options): Add -P, --pipe-check.
        (main): Parse out -P, --pipe-check.
        (usage): Add -P, --pipe-check.
        (tee_files): Enable polling inputs and outputs to drive the tee,
        rather than exclusively using read and write blocking to control
        data flow.
        * doc/coreutils.texi (tee invocation): Document -P, --pipecheck.
---
Hi there,

While working on some compiler hacks, I found it necessary to copy data in
parallel to multiple compilers to compare their results.  Currently, with
``tee'', that is only mostly possible, since tee always expects to read some
data on stdin before it can see whether it can terminate.  This breaks unless
stdin is /dev/null, since stdin would never get EOF or some data to wake tee up
and have it terminate.

The polling implementation provided below will wait on and remove terminating
outputs from the list of files ``tee'' watches when an error on them occurs,
except in the special case of stdin, which will never be handled by the poll
loop, and only serves to break out of the poll in the case of new data.

I couldn't figure out a decent test to write for this, the solutions I came up
all relied on PIPESTATUS, which I wasn't sure is permitted in tests, since they
all share a #!/bin/sh bang, but if that's allowed, I could add a test like the
following in:

   sleep 11 | timeout 10 tee -P | true
   [[ "${PIPESTATUS[1]}" -eq 0 ]] || fail=1

Note that the above does take 10-11 seconds in all cases, but the test needs
some long-enough-lived program that does not write data, and sleep fits the
bill.  The timeout could probably get knocked down a good bit.

Thanks in advance, have a great day.

Tested on x86_64-pc-linux-gnu.

  NEWS               |   3 ++
  doc/coreutils.texi |  11 +++++
  src/tee.c          | 103 ++++++++++++++++++++++++++++++++++++++++++---
  3 files changed, 110 insertions(+), 7 deletions(-)

diff --git a/NEWS b/NEWS
index b6b5201..3a6bbfe 100644
--- a/NEWS
+++ b/NEWS
@@ -62,6 +62,9 @@ GNU coreutils NEWS                                    -*- 
outline -*-
    wc now accepts the --total={auto,never,always,only} option
    to give explicit control over when the total is output.
+ tee now accepts the --pipe-check flag, to enable polling input and output
+  file descriptors, rather than only relying on stdin for notifications.
+
  ** Improvements
date --debug now diagnoses if multiple --date or --set options are
diff --git a/doc/coreutils.texi b/doc/coreutils.texi
index fca7f69..c372e2e 100644
--- a/doc/coreutils.texi
+++ b/doc/coreutils.texi
@@ -14075,6 +14075,17 @@ them.
  @opindex --ignore-interrupts
  Ignore interrupt signals.
+@item -P
+@itemx --pipe-check
+@opindex -P
+@opindex --pipe-check
+
+Polls file descriptors instead of just waiting on standard input, to
+allow dying pipes to be detected instantly, rather than waiting for
+standard input to write some data first.  This is especially useful to
+permit process substitutions to notify @command{tee} of completion, so
+that it stops waiting for input data when all outputs are closed.
+
  @item -p
  @itemx --output-error[=@var{mode}]
  @opindex -p
diff --git a/src/tee.c b/src/tee.c
index 971b768..25ab5b5 100644
--- a/src/tee.c
+++ b/src/tee.c
@@ -20,6 +20,7 @@
  #include <sys/types.h>
  #include <signal.h>
  #include <getopt.h>
+#include <poll.h>
#include "system.h"
  #include "argmatch.h"
@@ -37,7 +38,7 @@
    proper_name ("Richard M. Stallman"), \
    proper_name ("David MacKenzie")
-static bool tee_files (int nfiles, char **files);
+static bool tee_files (int nfiles, char **files, bool pipecheck);
/* If true, append to output files rather than truncating them. */
  static bool append;
@@ -61,6 +62,7 @@ static struct option const long_options[] =
    {"append", no_argument, NULL, 'a'},
    {"ignore-interrupts", no_argument, NULL, 'i'},
    {"output-error", optional_argument, NULL, 'p'},
+  {"pipe-check", no_argument, NULL, 'P'},
    {GETOPT_HELP_OPTION_DECL},
    {GETOPT_VERSION_OPTION_DECL},
    {NULL, 0, NULL, 0}
@@ -89,6 +91,7 @@ usage (int status)
  Copy standard input to each FILE, and also to standard output.\n\
  \n\
    -a, --append              append to the given FILEs, do not overwrite\n\
+  -P, --pipe-check          polls before reading, to detect closed pipes\n\
    -i, --ignore-interrupts   ignore interrupt signals\n\
  "), stdout);
        fputs (_("\
@@ -118,6 +121,7 @@ int
  main (int argc, char **argv)
  {
    bool ok;
+  bool pipecheck = false;
    int optc;
initialize_main (&argc, &argv);
@@ -131,7 +135,7 @@ main (int argc, char **argv)
    append = false;
    ignore_interrupts = false;
- while ((optc = getopt_long (argc, argv, "aip", long_options, NULL)) != -1)
+  while ((optc = getopt_long (argc, argv, "aiPp", long_options, NULL)) != -1)
      {
        switch (optc)
          {
@@ -143,6 +147,10 @@ main (int argc, char **argv)
            ignore_interrupts = true;
            break;
+ case 'P':
+          pipecheck = true;
+          break;
+
          case 'p':
            if (optarg)
              output_error = XARGMATCH ("--output-error", optarg,
@@ -169,22 +177,24 @@ main (int argc, char **argv)
    /* Do *not* warn if tee is given no file arguments.
       POSIX requires that it work when given no arguments.  */
- ok = tee_files (argc - optind, &argv[optind]);
+  ok = tee_files (argc - optind, &argv[optind], pipecheck);
    if (close (STDIN_FILENO) != 0)
      die (EXIT_FAILURE, errno, "%s", _("standard input"));
return ok ? EXIT_SUCCESS : EXIT_FAILURE;
  }
-/* Copy the standard input into each of the NFILES files in FILES
-   and into the standard output.  As a side effect, modify FILES[-1].
-   Return true if successful.  */
+/* Copy the standard input into each of the NFILES files in FILES and into the
+   standard output.  If PIPECHECK is true, polls files to ensure writability,
+   to exit, for instance, if process substitutions terminate.  As a side
+   effect, modify FILES[-1].  Return true if successful.  */
static bool
-tee_files (int nfiles, char **files)
+tee_files (int nfiles, char **files, bool pipecheck)
  {
    size_t n_outputs = 0;
    FILE **descriptors;
+  struct pollfd *pollfds = NULL;
    char buffer[BUFSIZ];
    ssize_t bytes_read = 0;
    int i;
@@ -226,8 +236,81 @@ tee_files (int nfiles, char **files)
          }
      }
+ if (pipecheck)
+    {
+      /* Allocate a poll struct for each output file, and stdin, to detect when
+         pipes are closed.  stdin needs to be watched to be able to stop
+         waiting and start reading.  */
+      pollfds = xnmalloc (nfiles + 2, sizeof *pollfds);
+      pollfds[0] = (struct pollfd) {
+        .fd = fileno(stdin),
+        .events = POLLIN | POLLPRI,
+      };
+
+      /* Convert all FILEs to pollfds.  */
+      for (i = 0; i <= nfiles; i++)
+        {
+          pollfds[i + 1] = (struct pollfd) {
+            .fd = fileno(descriptors[i]),
+            .events = POLLPRI,
+          };
+        }
+    }
+
    while (n_outputs)
      {
+      if (pollfds)
+        {
+          /* Time for the actual poll.  The pollfds array matches the layout of
+             the descriptors table, except shifted forward by one, to make room
+             for stdin.  */
+          bool found = false;
+          short revents;
+          int ret = poll (pollfds, nfiles + 2, -1);
+          if (ret < 0)
+            {
+              if (errno == EINTR)
+                continue;
+
+              error(1, errno, "poll");
+            }
+
+          /* Ok, now that we're done with the chores, let's see if any of the
+             members failed.  */
+          for (i = 0; i < nfiles + 2 && ret > 0; i++)
+            {
+              revents = pollfds[i].revents;
+
+              /* If 0, no events happened on this FD.  */
+              if (!revents)
+                continue;
+
+              /* Okay, we found one more.  */
+              ret--;
+
+              /* stdin is okay, we just have it polled to terminate poll () on
+                 data.  stdin is always 0 in our mapping above.  */
+              if (i == 0)
+                continue;
+
+              /* We want to be careful not to underflow this array when stdin
+                 closes before any outputs.  */
+              if (revents & POLLERR)
+                {
+                  /* OK, pipecheck did it's job, and found a failed pipe.
+                     Remove this file.  */
+                  pollfds[i].fd = -1;
+                  n_outputs--;
+                  fclose(descriptors[i - 1]);
+                  descriptors[i - 1] = NULL;
+                  found = true;
+                }
+            }
+
+          /* If we found any events, we want to repoll.  */
+          if (found)
+            continue;
+        }
        bytes_read = read (STDIN_FILENO, buffer, sizeof buffer);
        if (bytes_read < 0 && errno == EINTR)
          continue;
@@ -252,6 +335,12 @@ tee_files (int nfiles, char **files)
                         w_errno, "%s", quotef (files[i]));
                }
              descriptors[i] = NULL;
+
+            /* Since the state is uglily duplicated, we have to keep both lists
+               clear of file descriptors we're done with.  A negative fd value
+               will simply set revents to zero.  */
+            if (pollfds)
+              pollfds[i + 1].fd = -1;
              if (fail)
                ok = false;
              n_outputs--;

Thanks a lot for the patch.

There is similar functionality in tail.c (which should probably be reused
if we do decide to implement this in tee as it's tricky to do portably).
Anyway the tail(1) case makes sense considering:

  tail -f file.log | grep -q trigger && process_immediately

So a similar use case with tee might be:

  tee -p <file.in >(grep -q trigger_1) | grep -q trigger_2 && 
process_immediately

However tee wouldn't be waiting for more input in that case.
It would either consume the whole file, or exit when processing it.

So a more appropriate case is:

  intermittent_output |
  tee -p >(grep -q trigger_1) | grep -q trigger_2 && process_immediately

Where intermittent_output is stdin, the use case is a bit contrived
as any input will cause tee to exit.
The general more general non stdin case above has some merit,
though not as common as the tail example I think.

I'll think a bit more about it.

thanks!
Pádraig



reply via email to

[Prev in Thread] Current Thread [Next in Thread]