pspp-dev
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [patch 07/19] casereader and casewriter implementation


From: John Darrington
Subject: Re: [patch 07/19] casereader and casewriter implementation
Date: Thu, 7 Jun 2007 08:27:38 +0800
User-agent: Mutt/1.5.13 (2006-08-11)

I suspect there may be some constness issues here.
But we can fix these as and when they manifest themselves.  So let's
commit this now.

On Tue, Jun 05, 2007 at 11:27:34PM -0700, address@hidden wrote:
     Casereaders and casewriters are the basis of the new data processing
     implementation.  A casereader is a uniform interface to reading cases
     from a data source; a casewriter is a uniform interface to writing
     cases to a data sink.  This patch adds the infrastructure for each.
     
     Index: merge/src/data/casereader.c
     ===================================================================
     --- /dev/null      1970-01-01 00:00:00.000000000 +0000
     +++ merge/src/data/casereader.c    2007-06-05 21:26:32.000000000 -0700
     @@ -0,0 +1,605 @@
     +/* PSPP - computes sample statistics.
     +   Copyright (C) 2007 Free Software Foundation, Inc.
     +
     +   This program is free software; you can redistribute it and/or
     +   modify it under the terms of the GNU General Public License as
     +   published by the Free Software Foundation; either version 2 of the
     +   License, or (at your option) any later version.
     +
     +   This program is distributed in the hope that it will be useful, but
     +   WITHOUT ANY WARRANTY; without even the implied warranty of
     +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     +   General Public License for more details.
     +
     +   You should have received a copy of the GNU General Public License
     +   along with this program; if not, write to the Free Software
     +   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
     +   02110-1301, USA. */
     +
     +#include <config.h>
     +
     +#include <data/casereader.h>
     +#include <data/casereader-provider.h>
     +
     +#include <stdlib.h>
     +
     +#include <data/casewindow.h>
     +#include <data/casewriter.h>
     +#include <data/settings.h>
     +#include <libpspp/assertion.h>
     +#include <libpspp/heap.h>
     +#include <libpspp/taint.h>
     +
     +#include "xalloc.h"
     +
     +/* A casereader. */
     +struct casereader 
     +  {
     +    struct taint *taint;                  /* Corrupted? */
     +    size_t value_cnt;                     /* Values per case. */
     +    casenumber case_cnt;                  /* Number of cases,
     +                                             CASENUMBER_MAX if unknown. */
     +    const struct casereader_class *class; /* Class. */
     +    void *aux;                            /* Auxiliary data for class. */
     +  };
     +
     +static void insert_shim (struct casereader *);
     +
     +/* Creates a new case in C and reads the next case from READER
     +   into it.  The caller owns C and must destroy C when its data
     +   is no longer needed.  Returns true if successful, false when
     +   cases have been exhausted or upon detection of an I/O error.
     +   On failure, C is set to the null case.
     +
     +   The case returned is effectively consumed: it can never be
     +   read again through READER.  If this is inconvenient, READER
     +   may be cloned in advance with casereader_clone, or
     +   casereader_peek may be used instead. */
     +bool
     +casereader_read (struct casereader *reader, struct ccase *c) 
     +{
     +  if (reader->case_cnt != 0 && reader->class->read (reader, reader->aux, 
c)) 
     +    {
     +      assert (case_get_value_cnt (c) == reader->value_cnt);
     +      if (reader->case_cnt != CASENUMBER_MAX)
     +        reader->case_cnt--;
     +      return true; 
     +    }
     +  else 
     +    {
     +      reader->case_cnt = 0;
     +      case_nullify (c);
     +      return false; 
     +    }
     +}
     +
     +/* Destroys READER.
     +   Returns false if an I/O error was detected on READER, true
     +   otherwise. */
     +bool
     +casereader_destroy (struct casereader *reader) 
     +{
     +  bool ok = true;
     +  if (reader != NULL) 
     +    {
     +      reader->class->destroy (reader, reader->aux);
     +      ok = taint_destroy (reader->taint);
     +      free (reader);
     +    }
     +  return ok;
     +}
     +
     +/* Returns a clone of READER.  READER and its clone may be used
     +   to read the same sequence of cases in the same order, barring
     +   I/O errors. */
     +struct casereader *
     +casereader_clone (const struct casereader *reader_) 
     +{
     +  struct casereader *reader = (struct casereader *) reader_;
     +  struct casereader *clone;
     +
     +  if (reader->class->clone == NULL)
     +    insert_shim (reader);
     +  clone = reader->class->clone (reader, reader->aux);
     +  assert (clone != NULL);
     +  assert (clone != reader);
     +  return clone;
     +}
     +
     +/* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
     +   *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
     +void
     +casereader_split (struct casereader *original,
     +                  struct casereader **new1, struct casereader **new2) 
     +{
     +  if (new1 != NULL && new2 != NULL) 
     +    {
     +      *new1 = casereader_rename (original);
     +      *new2 = casereader_clone (*new1);
     +    }
     +  else if (new1 != NULL)
     +    *new1 = casereader_rename (original);
     +  else if (new2 != NULL)
     +    *new2 = casereader_rename (original);
     +  else
     +    casereader_destroy (original);
     +}
     +
     +/* Returns a copy of READER, which is itself destroyed.
     +   Useful for taking over ownership of a casereader, so that
     +   the original owner can no longer access the casereader
     +   through its old pointer. */
     +struct casereader *
     +casereader_rename (struct casereader *reader) 
     +{
     +  struct casereader *new = xmemdup (reader, sizeof *reader);
     +  free (reader);
     +  return new;
     +}
     +
     +/* Exchanges the casereaders referred to by A and B. */
     +void
     +casereader_swap (struct casereader *a, struct casereader *b) 
     +{
     +  if (a != b) 
     +    {
     +      struct casereader tmp = *a;
     +      *a = *b;
     +      *b = tmp;
     +    }
     +}
     +
     +/* Creates a new case in C and reads the (IDX + 1)'th case from
     +   READER into it.  The caller owns C and must destroy C when its
     +   data is no longer needed.  Returns true if successful, false
     +   when cases have been exhausted or upon detection of an I/O
     +   error.  On failure, C is set to the null case. */
     +bool
     +casereader_peek (struct casereader *reader, casenumber idx, struct ccase 
*c)
     +{
     +  if (idx < reader->case_cnt)
     +    {
     +      if (reader->class->peek == NULL)
     +        insert_shim (reader);
     +      if (reader->class->peek (reader, reader->aux, idx, c))
     +        return true;
     +      else if (casereader_error (reader)) 
     +        reader->case_cnt = 0;
     +    }
     +  if (reader->case_cnt > idx)
     +    reader->case_cnt = idx;
     +  case_nullify (c);
     +  return false;
     +}
     +
     +/* Returns true if an I/O error or another hard error has
     +   occurred on READER, a clone of READER, or on some object on
     +   which READER's data has a dependency, false otherwise. */
     +bool
     +casereader_error (const struct casereader *reader) 
     +{
     +  return taint_is_tainted (reader->taint);
     +}
     +
     +/* Marks READER as having encountered an error.
     +
     +   Ordinarily, this function should be called by the
     +   implementation of a casereader, not by the casereader's
     +   client.  Instead, casereader clients should usually ensure
     +   that a casereader's error state is correct by using
     +   taint_propagate to propagate to the casereader's taint
     +   structure, which may be obtained via casereader_get_taint. */
     +void
     +casereader_force_error (struct casereader *reader) 
     +{
     +  taint_set_taint (reader->taint);
     +}
     +
     +/* Returns READER's associated taint object, for use with
     +   taint_propagate and other taint functions. */
     +const struct taint *
     +casereader_get_taint (const struct casereader *reader) 
     +{
     +  return reader->taint;
     +}
     +
     +/* Returns the number of cases that will be read by successive
     +   calls to casereader_read for READER, assuming that no errors
     +   occur.  Upon an error condition, the case count drops to 0, so
     +   that no more cases can be obtained.
     +
     +   Not all casereaders can predict the number of cases that they
     +   will produce without actually reading all of them.  In that
     +   case, this function returns CASENUMBER_MAX.  To obtain the
     +   actual number of cases in such a casereader, use
     +   casereader_count_cases. */
     +casenumber
     +casereader_get_case_cnt (struct casereader *reader) 
     +{
     +  return reader->case_cnt;
     +}
     +
     +/* Returns the number of cases that will be read by successive
     +   calls to casereader_read for READER, assuming that no errors
     +   occur.  Upon an error condition, the case count drops to 0, so
     +   that no more cases can be obtained.
     +
     +   For a casereader that cannot predict the number of cases it
     +   will produce, this function actually reads (and discards) all
     +   of the contents of a clone of READER.  Thus, the return value
     +   is always correct in the absence of I/O errors. */
     +casenumber
     +casereader_count_cases (struct casereader *reader)
     +{
     +  if (reader->case_cnt == CASENUMBER_MAX)
     +    {
     +      casenumber n_cases = 0;
     +      struct ccase c;
     +
     +      struct casereader *clone = casereader_clone (reader);
     +
     +      for (; casereader_read (clone, &c); case_destroy (&c))
     +        n_cases++;
     +
     +      casereader_destroy (clone);
     +      reader->case_cnt = n_cases;
     +    }
     +
     +  return reader->case_cnt;
     +}
     +
     +/* Returns the number of struct values in each case in READER. */
     +size_t
     +casereader_get_value_cnt (struct casereader *reader) 
     +{
     +  return reader->value_cnt;
     +}
     +
     +/* Copies all the cases in READER to WRITER, propagating errors
     +   appropriately. */
     +void
     +casereader_transfer (struct casereader *reader, struct casewriter *writer)
     +{
     +  struct ccase c;
     +
     +  taint_propagate (casereader_get_taint (reader),
     +                   casewriter_get_taint (writer));
     +  while (casereader_read (reader, &c))
     +    casewriter_write (writer, &c);
     +  casereader_destroy (reader);
     +}
     +
     +/* Creates and returns a new casereader.  This function is
     +   intended for use by casereader implementations, not by
     +   casereader clients.
     +
     +   This function is most suited for creating a casereader for a
     +   data source that is naturally sequential.
     +   casereader_create_random may be more appropriate for a data
     +   source that supports random access.
     +
     +   Ordinarily, specify a null pointer for TAINT, in which case
     +   the new casereader will have a new, unique taint object.  If
     +   the new casereader should have a clone of an existing taint
     +   object, specify that object as TAINT.  (This is most commonly
     +   useful in an implementation of the "clone" casereader_class
     +   function, in which case the cloned casereader should have the
     +   same taint object as the original casereader.)
     +
     +   VALUE_CNT must be the number of struct values per case read
     +   from the casereader.
     +
     +   CASE_CNT is an upper limit on the number of cases that
     +   casereader_read will return from the casereader in successive
     +   calls.  Ordinarily, this is the actual number of cases in the
     +   data source or CASENUMBER_MAX if the number of cases cannot be
     +   predicted in advance.
     +
     +   CLASS and AUX are a set of casereader implementation-specific
     +   member functions and auxiliary data to pass to those member
     +   functions, respectively. */
     +struct casereader *
     +casereader_create_sequential (const struct taint *taint,
     +                              size_t value_cnt, casenumber case_cnt,
     +                              const struct casereader_class *class, void 
*aux) 
     +{
     +  struct casereader *reader = xmalloc (sizeof *reader);
     +  reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
     +  reader->value_cnt = value_cnt;
     +  reader->case_cnt = case_cnt;
     +  reader->class = class;
     +  reader->aux = aux;
     +  return reader;
     +}
     +
     +/* Random-access casereader implementation.
     +
     +   This is a set of wrappers around casereader_create_sequential
     +   and struct casereader_class to make it easy to create
     +   efficient casereaders for data sources that natively support
     +   random access. */
     +
     +/* One clone of a random reader. */
     +struct random_reader
     +  {
     +    struct random_reader_shared *shared; /* Data shared among clones. */
     +    struct heap_node heap_node; /* Node in shared data's heap of readers. 
*/
     +    casenumber offset;          /* Number of cases already read. */
     +  };
     +
     +/* Returns the random_reader in which the given heap_node is
     +   embedded. */
     +static struct random_reader *
     +random_reader_from_heap_node (const struct heap_node *node) 
     +{
     +  return heap_data (node, struct random_reader, heap_node);
     +}
     +
     +/* Data shared among clones of a random reader. */
     +struct random_reader_shared 
     +  {
     +    struct heap *readers;       /* Heap of struct random_readers. */
     +    casenumber min_offset;      /* Smallest offset of any random_reader. 
*/
     +    const struct casereader_random_class *class;
     +    void *aux;
     +  };
     +
     +static struct casereader_class random_reader_casereader_class;
     +
     +/* Creates and returns a new random_reader with the given SHARED
     +   data and OFFSET.  Inserts the new random reader into the
     +   shared heap. */
     +static struct random_reader *
     +make_random_reader (struct random_reader_shared *shared, casenumber 
offset)
     +{
     +  struct random_reader *br = xmalloc (sizeof *br);
     +  br->offset = offset;
     +  br->shared = shared;
     +  heap_insert (shared->readers, &br->heap_node);
     +  return br;
     +}
     +
     +/* Compares random_readers A and B by offset and returns a
     +   strcmp()-like result. */
     +static int
     +compare_random_readers_by_offset (const struct heap_node *a_,
     +                                  const struct heap_node *b_,
     +                                  const void *aux UNUSED) 
     +{
     +  const struct random_reader *a = random_reader_from_heap_node (a_);
     +  const struct random_reader *b = random_reader_from_heap_node (b_);
     +  return a->offset < b->offset ? -1 : a->offset > b->offset;
     +}
     +
     +/* Creates and returns a new casereader.  This function is
     +   intended for use by casereader implementations, not by
     +   casereader clients.
     +
     +   This function is most suited for creating a casereader for a
     +   data source that supports random access.
     +   casereader_create_sequential is more appropriate for a data
     +   source that is naturally sequential.
     +
     +   VALUE_CNT must be the number of struct values per case read
     +   from the casereader.
     +
     +   CASE_CNT is an upper limit on the number of cases that
     +   casereader_read will return from the casereader in successive
     +   calls.  Ordinarily, this is the actual number of cases in the
     +   data source or CASENUMBER_MAX if the number of cases cannot be
     +   predicted in advance.
     +
     +   CLASS and AUX are a set of casereader implementation-specific
     +   member functions and auxiliary data to pass to those member
     +   functions, respectively. */
     +struct casereader *
     +casereader_create_random (size_t value_cnt, casenumber case_cnt,
     +                          const struct casereader_random_class *class,
     +                          void *aux) 
     +{
     +  struct random_reader_shared *shared = xmalloc (sizeof *shared);
     +  shared->readers = heap_create (compare_random_readers_by_offset, NULL);
     +  shared->class = class;
     +  shared->aux = aux;
     +  shared->min_offset = 0;
     +  return casereader_create_sequential (NULL, value_cnt, case_cnt,
     +                                       &random_reader_casereader_class,
     +                                       make_random_reader (shared, 0));
     +}
     +
     +/* Raises SHARED's min_offset to the minimum offset in the heap
     +   and passes any increase to the class's "advance" function. */
     +static void
     +advance_random_reader (struct casereader *reader,
     +                       struct random_reader_shared *shared) 
     +{
     +  casenumber old, new;
     +
     +  old = shared->min_offset;
     +  new = random_reader_from_heap_node (heap_minimum 
(shared->readers))->offset;
     +  assert (new >= old);
     +  if (new > old)
     +    {
     +      shared->min_offset = new;
     +      shared->class->advance (reader, shared->aux, new - old);
     +    }
     +}
     +
     +/* struct casereader_class "read" function for random reader. */
     +static bool
     +random_reader_read (struct casereader *reader, void *br_, struct ccase *c)
     +{
     +  struct random_reader *br = br_;
     +  struct random_reader_shared *shared = br->shared;
     +
     +  if (shared->class->read (reader, shared->aux,
     +                           br->offset - shared->min_offset, c)) 
     +    {
     +      br->offset++;
     +      heap_changed (shared->readers, &br->heap_node);
     +      advance_random_reader (reader, shared); 
     +      return true;
     +    }
     +  else
     +    return false; 
     +}
     +
     +/* struct casereader_class "destroy" function for random
     +   reader. */
     +static void
     +random_reader_destroy (struct casereader *reader, void *br_) 
     +{
     +  struct random_reader *br = br_;
     +  struct random_reader_shared *shared = br->shared;
     +
     +  heap_delete (shared->readers, &br->heap_node);
     +  if (heap_is_empty (shared->readers)) 
     +    {
     +      heap_destroy (shared->readers);
     +      shared->class->destroy (reader, shared->aux);
     +      free (shared);
     +    }
     +  else
     +    advance_random_reader (reader, shared);
     +
     +  free (br);
     +}
     +
     +/* struct casereader_class "clone" function for random reader. */
     +static struct casereader *
     +random_reader_clone (struct casereader *reader, void *br_) 
     +{
     +  struct random_reader *br = br_;
     +  struct random_reader_shared *shared = br->shared;
     +  return casereader_create_sequential (casereader_get_taint (reader),
     +                                       casereader_get_value_cnt (reader),
     +                                       casereader_get_case_cnt (reader),
     +                                       &random_reader_casereader_class,
     +                                       make_random_reader (shared,
     +                                                           br->offset));
     +}
     +
     +/* struct casereader_class "peek" function for random reader. */
     +static bool
     +random_reader_peek (struct casereader *reader, void *br_,
     +                    casenumber idx, struct ccase *c) 
     +{
     +  struct random_reader *br = br_;
     +  struct random_reader_shared *shared = br->shared;
     +
     +  return shared->class->read (reader, shared->aux,
     +                              br->offset - shared->min_offset + idx, c);
     +}
     +
     +/* Casereader class for random reader. */
     +static struct casereader_class random_reader_casereader_class = 
     +  {
     +    random_reader_read,
     +    random_reader_destroy,
     +    random_reader_clone,
     +    random_reader_peek,
     +  };
     +
     +/* Buffering shim for implementing clone and peek operations.
     +
     +   The "clone" and "peek" operations aren't implemented by all
     +   types of casereaders, but we have to expose a uniform
     +   interface anyhow.  We do this by interposing a buffering
     +   casereader on top of the existing casereader on the first call
     +   to "clone" or "peek".  The buffering casereader maintains a
     +   window of cases that spans the positions of the original
     +   casereader and all of its clones (the "clone set"), from the
     +   position of the casereader that has read the fewest cases to
     +   the position of the casereader that has read the most.  
     +
     +   Thus, if all of the casereaders in the clone set are at
     +   approximately the same position, only a few cases are buffered
     +   and there is little inefficiency.  If, on the other hand, one
     +   casereader is not used to read any cases at all, but another
     +   one is used to read all of the cases, the entire contents of
     +   the casereader are copied into the buffer.  This still might
     +   not be so inefficient, given that case data in memory is
     +   shared across multiple identical copies, but in the worst case
     +   the window implementation will write cases to disk instead of
     +   maintaining them in-memory. */
     +
     +/* A buffering shim for a non-clonable or non-peekable
     +   casereader. */
     +struct shim 
     +  {
     +    struct casewindow *window;          /* Window of buffered cases. */
     +    struct casereader *subreader;       /* Subordinate casereader. */
     +  };
     +
     +static struct casereader_random_class shim_class;
     +
     +/* Interposes a buffering shim atop READER. */
     +static void
     +insert_shim (struct casereader *reader) 
     +{
     +  size_t value_cnt = casereader_get_value_cnt (reader);
     +  casenumber case_cnt = casereader_get_case_cnt (reader);
     +  struct shim *b = xmalloc (sizeof *b);
     +  b->window = casewindow_create (value_cnt, get_workspace_cases 
(value_cnt));
     +  b->subreader = casereader_create_random (value_cnt, case_cnt,
     +                                           &shim_class, b);
     +  casereader_swap (reader, b->subreader);
     +  taint_propagate (casewindow_get_taint (b->window),
     +                   casereader_get_taint (reader));
     +  taint_propagate (casereader_get_taint (b->subreader),
     +                   casereader_get_taint (reader));
     +}
     +
     +/* Ensures that B's window contains at least CASE_CNT cases.
     +   Returns true if successful, false upon reaching the end of B's
     +   subreader or an I/O error. */
     +static bool
     +prime_buffer (struct shim *b, casenumber case_cnt) 
     +{
     +  while (casewindow_get_case_cnt (b->window) < case_cnt) 
     +    {
     +      struct ccase tmp;
     +      if (!casereader_read (b->subreader, &tmp))
     +        return false;
     +      casewindow_push_head (b->window, &tmp);
     +    }
     +  return true;
     +}
     +
     +/* Reads the case at the given 0-based OFFSET from the front of
     +   the window into C.  Returns true if successful, false if
     +   OFFSET is beyond the end of file or upon I/O error. */
     +static bool
     +shim_read (struct casereader *reader UNUSED, void *b_,
     +           casenumber offset, struct ccase *c) 
     +{
     +  struct shim *b = b_;
     +  return (prime_buffer (b, offset + 1)
     +          && casewindow_get_case (b->window, offset, c));
     +}
     +
     +/* Destroys B. */
     +static void
     +shim_destroy (struct casereader *reader UNUSED, void *b_) 
     +{
     +  struct shim *b = b_;
     +  casewindow_destroy (b->window);
     +  casereader_destroy (b->subreader);
     +  free (b);
     +}
     +
     +/* Discards CASE_CNT cases from the front of B's window. */
     +static void
     +shim_advance (struct casereader *reader UNUSED, void *b_, casenumber 
case_cnt)
     +{
     +  struct shim *b = b_;
     +  casewindow_pop_tail (b->window, case_cnt);
     +}
     +
     +/* Class for the buffered reader. */
     +static struct casereader_random_class shim_class = 
     +  {
     +    shim_read,
     +    shim_destroy,
     +    shim_advance,
     +  };
     Index: merge/src/data/casereader.h
     ===================================================================
     --- /dev/null      1970-01-01 00:00:00.000000000 +0000
     +++ merge/src/data/casereader.h    2007-06-05 21:26:32.000000000 -0700
     @@ -0,0 +1,116 @@
     +/* PSPP - computes sample statistics.
     +   Copyright (C) 2007 Free Software Foundation, Inc.
     +
     +   This program is free software; you can redistribute it and/or
     +   modify it under the terms of the GNU General Public License as
     +   published by the Free Software Foundation; either version 2 of the
     +   License, or (at your option) any later version.
     +
     +   This program is distributed in the hope that it will be useful, but
     +   WITHOUT ANY WARRANTY; without even the implied warranty of
     +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     +   General Public License for more details.
     +
     +   You should have received a copy of the GNU General Public License
     +   along with this program; if not, write to the Free Software
     +   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
     +   02110-1301, USA. */
     +
     +/* Casereader client interface.
     +
     +   A casereader abstracts interfaces through which cases may be
     +   read.  A casereader may be a front-end for a system file, a
     +   portable file, the active file in a data set, or anything else
     +   on which a casereader interface has been overlaid.  Casereader
     +   layering, in which a casereader acts as a filter or translator
     +   on top of another casereader, is also supported.
     +
     +   There is no central interface for obtaining casereaders: a
     +   casereader for reading a system file is obtained from the
     +   system file reading module, and so on.  Once a casereader has
     +   been obtained, by whatever means, the interface to it is
     +   uniform.  The most important functions for casereader usage
     +   are:
     +
     +     - casereader_read: Reads a case from the casereader.  The
     +       case is consumed and cannot be read again.  The caller is
     +       responsible for destroying the case.
     +
     +     - casereader_clone: Makes a copy of a casereader.  May be
     +       used to read one or a set of cases from a casereader
     +       repeatedly.
     +
     +     - casereader_destroy: Destroys a casereader.
     +
     +   Casereaders can encounter error conditions, such as I/O
     +   errors, as they read cases.  Error conditions prevent any more
     +   cases from being read from the casereader.  Error conditions
     +   are reported by casereader_error.  Error conditions may be
     +   propagated to or from a casereader with taint_propagate using
     +   the casereader's taint object, which may be obtained with
     +   casereader_get_taint. */
     +
     +#ifndef DATA_CASEREADER_H
     +#define DATA_CASEREADER_H 1
     +
     +#include <libpspp/compiler.h>
     +#include <data/case.h>
     +#include <data/missing-values.h>
     +
     +struct dictionary;
     +struct casereader;
     +struct casewriter;
     +
     +bool casereader_read (struct casereader *, struct ccase *);
     +bool casereader_destroy (struct casereader *);
     +
     +struct casereader *casereader_clone (const struct casereader *);
     +void casereader_split (struct casereader *,
     +                       struct casereader **, struct casereader **);
     +struct casereader *casereader_rename (struct casereader *);
     +void casereader_swap (struct casereader *, struct casereader *);
     +
     +bool casereader_peek (struct casereader *, casenumber, struct ccase *)
     +     WARN_UNUSED_RESULT;
     +
     +bool casereader_error (const struct casereader *);
     +void casereader_force_error (struct casereader *);
     +const struct taint *casereader_get_taint (const struct casereader *);
     +
     +casenumber casereader_get_case_cnt (struct casereader *);
     +casenumber casereader_count_cases (struct casereader *);
     +size_t casereader_get_value_cnt (struct casereader *);
     +
     +void casereader_transfer (struct casereader *, struct casewriter *);
     +
     +struct casereader *
     +casereader_create_filter_func (struct casereader *,
     +                               bool (*include) (const struct ccase *,
     +                                                void *aux),
     +                               bool (*destroy) (void *aux),
     +                               void *aux,
     +                               struct casewriter *exclude);
     +struct casereader *
     +casereader_create_filter_weight (struct casereader *,
     +                                 const struct dictionary *dict,
     +                                 bool *warn_on_invalid,
     +                                 struct casewriter *exclude);
     +struct casereader *
     +casereader_create_filter_missing (struct casereader *,
     +                                  struct variable **vars, size_t var_cnt,
     +                                  enum mv_class,
     +                                  struct casewriter *exclude);
     +
     +struct casereader *
     +casereader_create_counter (struct casereader *, casenumber *counter,
     +                           casenumber initial_value);
     +
     +struct casereader *
     +casereader_create_translator (struct casereader *, size_t 
output_value_cnt,
     +                              void (*translate) (const struct ccase 
*input,
     +                                                 struct ccase *output,
     +                                                 void *aux),
     +                              bool (*destroy) (void *aux),
     +                              void *aux);
     +
     +#endif /* data/casereader.h */
     Index: merge/src/data/casewriter.c
     ===================================================================
     --- /dev/null      1970-01-01 00:00:00.000000000 +0000
     +++ merge/src/data/casewriter.c    2007-06-05 21:27:49.000000000 -0700
     @@ -0,0 +1,287 @@
     +/* PSPP - computes sample statistics.
     +   Copyright (C) 2007 Free Software Foundation, Inc.
     +
     +   This program is free software; you can redistribute it and/or
     +   modify it under the terms of the GNU General Public License as
     +   published by the Free Software Foundation; either version 2 of the
     +   License, or (at your option) any later version.
     +
     +   This program is distributed in the hope that it will be useful, but
     +   WITHOUT ANY WARRANTY; without even the implied warranty of
     +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     +   General Public License for more details.
     +
     +   You should have received a copy of the GNU General Public License
     +   along with this program; if not, write to the Free Software
     +   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
     +   02110-1301, USA. */
     +
     +#include <config.h>
     +
     +#include <data/casewriter.h>
     +#include <data/casewriter-provider.h>
     +
     +#include <assert.h>
     +#include <stdlib.h>
     +
     +#include <data/casereader.h>
     +#include <data/casereader-provider.h>
     +#include <data/casewindow.h>
     +#include <data/settings.h>
     +#include <libpspp/compiler.h>
     +#include <libpspp/taint.h>
     +
     +#include "xalloc.h"
     +
     +/* A casewriter: a sink for cases whose behavior is supplied by
     +   the functions in CLASS. */
     +struct casewriter
     +  {
     +    struct taint *taint;        /* Error state; see casewriter_error. */
     +    casenumber case_cnt;        /* Set to 0 by casewriter_create. */
     +    const struct casewriter_class *class; /* Implementation functions. */
     +    void *aux;                  /* Passed to each function in CLASS. */
     +  };
     +
     +static struct casewriter *create_casewriter_window (size_t value_cnt,
     +                                                    casenumber 
max_in_core);
     +
     +/* Writes case C to WRITER by handing it, together with WRITER's
     +   auxiliary data, to the write function of WRITER's class. */
     +void
     +casewriter_write (struct casewriter *writer, struct ccase *c)
     +{
     +  const struct casewriter_class *class = writer->class;
     +  class->write (writer, writer->aux, c);
     +}
     +
     +/* Destroys WRITER.  A null WRITER is accepted and ignored.
     +   Returns true if successful (or if WRITER is null), false if
     +   an I/O error was encountered on WRITER or on some object on
     +   which WRITER has a dependency. */
     +bool
     +casewriter_destroy (struct casewriter *writer)
     +{
     +  bool ok;
     +
     +  if (writer == NULL)
     +    return true;
     +
     +  /* Let the class clean up first, then collect the final error
     +     state from the taint object as it is destroyed. */
     +  writer->class->destroy (writer, writer->aux);
     +  ok = taint_destroy (writer->taint);
     +  free (writer);
     +  return ok;
     +}
     +
     +/* Consumes WRITER and returns a casereader from which the data
     +   written to WRITER can be read back.  WRITER is freed here, so
     +   it must not be used again afterward, not even as an argument
     +   to casewriter_destroy.
     +
     +   The conversion is done by the convert_to_reader function of
     +   WRITER's class.  Not all casewriters implement it; behavior
     +   is undefined if this function is called on one that does
     +   not.
     +
     +   If an I/O error was encountered on WRITER or on some object on
     +   which WRITER has a dependency, then the error will be
     +   propagated to the new casereader. */
     +struct casereader *
     +casewriter_make_reader (struct casewriter *writer)
     +{
     +  struct casereader *reader
     +    = writer->class->convert_to_reader (writer, writer->aux);
     +
     +  /* Hand the writer's error state over to the reader before the
     +     writer's own taint object goes away. */
     +  taint_propagate (writer->taint, casereader_get_taint (reader));
     +  taint_destroy (writer->taint);
     +
     +  free (writer);
     +  return reader;
     +}
     +
     +/* Moves WRITER's contents into a newly allocated casewriter,
     +   frees WRITER itself, and returns the new casewriter.
     +   Useful when taking over ownership of a casewriter, because
     +   the previous owner's pointer can no longer be used to access
     +   it. */
     +struct casewriter *
     +casewriter_rename (struct casewriter *writer)
     +{
     +  struct casewriter *renamed = xmemdup (writer, sizeof *writer);
     +  free (writer);
     +  return renamed;
     +}
     +
     +/* Returns true if an I/O error or another hard error has
     +   occurred on WRITER or on some object on which WRITER's data
     +   has a dependency, false otherwise.  The answer is read from
     +   WRITER's taint object. */
     +bool
     +casewriter_error (const struct casewriter *writer) 
     +{
     +  return taint_is_tainted (writer->taint);
     +}
     +
     +/* Marks WRITER as having encountered an error, by setting the
     +   taint on WRITER's taint object.
     +
     +   Ordinarily, this function should be called by the
     +   implementation of a casewriter, not by the casewriter's
     +   client.  Casewriter clients should instead usually ensure
     +   that a casewriter's error state is correct by using
     +   taint_propagate to propagate to the casewriter's taint
     +   structure, which may be obtained via casewriter_get_taint. */
     +void
     +casewriter_force_error (struct casewriter *writer) 
     +{
     +  taint_set_taint (writer->taint);
     +}
     +
     +/* Returns WRITER's associated taint object, for use with
     +   taint_propagate and other taint functions. */
     +const struct taint *
     +casewriter_get_taint (const struct casewriter *writer) 
     +{
     +  return writer->taint;
     +}
     +
     +/* Allocates and returns a casewriter whose operations are
     +   supplied by CLASS and whose class functions receive AUX as
     +   auxiliary data.  The new casewriter starts out with a fresh
     +   taint object and a case count of zero. */
     +struct casewriter *
     +casewriter_create (const struct casewriter_class *class, void *aux)
     +{
     +  struct casewriter *new_writer = xmalloc (sizeof *new_writer);
     +
     +  new_writer->class = class;
     +  new_writer->aux = aux;
     +  new_writer->case_cnt = 0;
     +  new_writer->taint = taint_create ();
     +  return new_writer;
     +}
     +
     +/* Returns a casewriter for cases with VALUE_CNT struct values
     +   per case.  Cases are kept in memory up to the limit that
     +   get_workspace_cases reports for VALUE_CNT; past that limit
     +   they are written to disk.
     +
     +   A casewriter created with this function may be passed to
     +   casewriter_make_reader.
     +
     +   This is usually the right kind of casewriter to use. */
     +struct casewriter *
     +autopaging_writer_create (size_t value_cnt)
     +{
     +  casenumber max_in_core = get_workspace_cases (value_cnt);
     +  return create_casewriter_window (value_cnt, max_in_core);
     +}
     +
     +/* Returns a casewriter for cases with VALUE_CNT struct values
     +   per case.  The in-core limit is CASENUMBER_MAX, so the cases
     +   written to the casewriter are kept in memory.
     +
     +   A casewriter created with this function may be passed to
     +   casewriter_make_reader. */
     +struct casewriter *
     +mem_writer_create (size_t value_cnt)
     +{
     +  const casenumber max_in_core = CASENUMBER_MAX;
     +  return create_casewriter_window (value_cnt, max_in_core);
     +}
     +
     +/* Returns a casewriter for cases with VALUE_CNT struct values
     +   per case.  The in-core limit is zero, so the cases written to
     +   the casewriter are written to disk.
     +
     +   A casewriter created with this function may be passed to
     +   casewriter_make_reader. */
     +struct casewriter *
     +tmpfile_writer_create (size_t value_cnt)
     +{
     +  const casenumber max_in_core = 0;
     +  return create_casewriter_window (value_cnt, max_in_core);
     +}
     +
     +static const struct casewriter_class casewriter_window_class;
     +static const struct casereader_random_class casereader_window_class;
     +
     +/* Creates and returns a new casewriter backed by a casewindow.
     +   Each case is composed of VALUE_CNT struct values.  Cases are
     +   maintained in memory until MAX_IN_CORE_CASES of them have
     +   been written, at which point they will be written to disk. */
     +static struct casewriter *
     +create_casewriter_window (size_t value_cnt, casenumber max_in_core_cases)
     +{
     +  struct casewindow *cw = casewindow_create (value_cnt, max_in_core_cases);
     +  struct casewriter *w = casewriter_create (&casewriter_window_class, cw);
     +
     +  /* An error on the casewindow must show up on the casewriter. */
     +  taint_propagate (casewindow_get_taint (cw), casewriter_get_taint (w));
     +  return w;
     +}
     +
     +/* Writes case C to the casewindow WINDOW_ that backs a
     +   casewindow writer, by pushing it onto the window's head. */
     +static void
     +casewriter_window_write (struct casewriter *writer UNUSED, void *window_,
     +                         struct ccase *c)
     +{
     +  casewindow_push_head (window_, c);
     +}
     +
     +/* Destroys the casewindow WINDOW_ that backs a casewindow
     +   writer. */
     +static void
     +casewriter_window_destroy (struct casewriter *writer UNUSED, void *window_)
     +{
     +  casewindow_destroy (window_);
     +}
     +
     +/* Converts a casewindow writer into a casereader and returns
     +   the casereader.  The casewindow WINDOW_ becomes the new
     +   reader's auxiliary data, and its taint is propagated to the
     +   reader. */
     +static struct casereader *
     +casewriter_window_convert_to_reader (struct casewriter *writer UNUSED,
     +                                     void *window_)
     +{
     +  struct casewindow *cw = window_;
     +  struct casereader *reader
     +    = casereader_create_random (casewindow_get_value_cnt (cw),
     +                                casewindow_get_case_cnt (cw),
     +                                &casereader_window_class, cw);
     +
     +  taint_propagate (casewindow_get_taint (cw), casereader_get_taint (reader));
     +  return reader;
     +}
     +
     +/* Reads the case at the given 0-based OFFSET from the front of
     +   the casewindow WINDOW_ into C.  Returns false without
     +   touching the window's cases if OFFSET is at or beyond the
     +   window's case count; otherwise returns the result of
     +   casewindow_get_case. */
     +static bool
     +casereader_window_read (struct casereader *reader UNUSED, void *window_,
     +                        casenumber offset, struct ccase *c)
     +{
     +  struct casewindow *cw = window_;
     +
     +  if (offset < casewindow_get_case_cnt (cw))
     +    return casewindow_get_case (cw, offset, c);
     +  return false;
     +}
     +
     +/* Destroys the casewindow WINDOW_ that backs a casewindow
     +   reader. */
     +static void
     +casereader_window_destroy (struct casereader *reader UNUSED, void *window_)
     +{
     +  casewindow_destroy (window_);
     +}
     +
     +/* Discards CASE_CNT cases from the front of the casewindow
     +   WINDOW_ by popping them off the window's tail. */
     +static void
     +casereader_window_advance (struct casereader *reader UNUSED, void *window_,
     +                           casenumber case_cnt)
     +{
     +  casewindow_pop_tail (window_, case_cnt);
     +}
     +
     +/* Class for casewindow writer: the write, destroy, and
     +   convert-to-reader operations for casewriters created by
     +   create_casewriter_window. */
     +static const struct casewriter_class casewriter_window_class = 
     +  {
     +    casewriter_window_write,
     +    casewriter_window_destroy,
     +    casewriter_window_convert_to_reader,
     +  };
     +
     +/* Class for casewindow reader: the operations for casereaders
     +   produced by casewriter_window_convert_to_reader. */
     +static const struct casereader_random_class casereader_window_class = 
     +  {
     +    casereader_window_read,
     +    casereader_window_destroy,
     +    casereader_window_advance,
     +  };
     +
     Index: merge/src/data/casewriter.h
     ===================================================================
     --- /dev/null      1970-01-01 00:00:00.000000000 +0000
     +++ merge/src/data/casewriter.h    2007-06-05 21:26:32.000000000 -0700
     @@ -0,0 +1,52 @@
     +/* PSPP - computes sample statistics.
     +   Copyright (C) 2007 Free Software Foundation, Inc.
     +
     +   This program is free software; you can redistribute it and/or
     +   modify it under the terms of the GNU General Public License as
     +   published by the Free Software Foundation; either version 2 of the
     +   License, or (at your option) any later version.
     +
     +   This program is distributed in the hope that it will be useful, but
     +   WITHOUT ANY WARRANTY; without even the implied warranty of
     +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     +   General Public License for more details.
     +
     +   You should have received a copy of the GNU General Public License
     +   along with this program; if not, write to the Free Software
     +   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
     +   02110-1301, USA. */
     +
     +#ifndef DATA_CASEWRITER_H
     +#define DATA_CASEWRITER_H 1
     +
     +#include <stdbool.h>
     +#include <stddef.h>
     +#include <data/transformations.h>
     +#include <libpspp/compiler.h>
     +
     +struct casewriter;
     +
     +void casewriter_write (struct casewriter *, struct ccase *);
     +bool casewriter_destroy (struct casewriter *);
     +
     +struct casereader *casewriter_make_reader (struct casewriter *);
     +
     +struct casewriter *casewriter_rename (struct casewriter *);
     +
     +bool casewriter_error (const struct casewriter *);
     +void casewriter_force_error (struct casewriter *);
     +const struct taint *casewriter_get_taint (const struct casewriter *);
     +
     +struct casewriter *mem_writer_create (size_t value_cnt);
     +struct casewriter *tmpfile_writer_create (size_t value_cnt);
     +struct casewriter *autopaging_writer_create (size_t value_cnt);
     +
     +struct casewriter *
     +casewriter_create_translator (struct casewriter *, 
     +                              void (*translate) (const struct ccase 
*input,
     +                                                 struct ccase *output,
     +                                                 void *aux),
     +                              bool (*destroy) (void *aux),
     +                              void *aux);
     +
     +#endif /* data/casewriter.h */
     Index: merge/src/data/automake.mk
     ===================================================================
     --- merge.orig/src/data/automake.mk        2007-06-05 21:26:14.000000000 
-0700
     +++ merge/src/data/automake.mk     2007-06-05 22:14:22.000000000 -0700
     @@ -19,8 +21,17 @@
        src/data/casefile.c \
        src/data/casefile-factory.h \
        src/data/casefile-private.h \
     +  src/data/casereader-filter.c \
     +  src/data/casereader-provider.h \
     +  src/data/casereader-translator.c \
     +  src/data/casereader.c \
     +  src/data/casereader.h \
        src/data/casewindow.c \
        src/data/casewindow.h \
     +  src/data/casewriter-provider.h \
     +  src/data/casewriter-translator.c \
     +  src/data/casewriter.c \
     +  src/data/casewriter.h \
        src/data/fastfile.c \
        src/data/fastfile.h \
        src/data/fastfile-factory.h \
     Index: merge/src/data/casereader-filter.c
     ===================================================================
     --- /dev/null      1970-01-01 00:00:00.000000000 +0000
     +++ merge/src/data/casereader-filter.c     2007-06-05 21:26:32.000000000 
-0700
     @@ -0,0 +1,244 @@
     +/* PSPP - computes sample statistics.
     +   Copyright (C) 2007 Free Software Foundation, Inc.
     +
     +   This program is free software; you can redistribute it and/or
     +   modify it under the terms of the GNU General Public License as
     +   published by the Free Software Foundation; either version 2 of the
     +   License, or (at your option) any later version.
     +
     +   This program is distributed in the hope that it will be useful, but
     +   WITHOUT ANY WARRANTY; without even the implied warranty of
     +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     +   General Public License for more details.
     +
     +   You should have received a copy of the GNU General Public License
     +   along with this program; if not, write to the Free Software
     +   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
     +   02110-1301, USA. */
     +
     +#include <config.h>
     +
     +#include <data/casereader.h>
     +
     +#include <stdlib.h>
     +
     +#include <data/casereader-provider.h>
     +#include <data/casewriter.h>
     +#include <data/variable.h>
     +#include <data/dictionary.h>
     +#include <libpspp/taint.h>
     +#include <libpspp/message.h>
     +
     +#include "xalloc.h"
     +
     +#include "gettext.h"
     +#define _(msgid) gettext (msgid)
     +
     +/* State of a filtering casereader. */
     +struct casereader_filter 
     +  {
     +    struct casereader *subreader;       /* Source of cases to filter. */
     +    bool (*include) (const struct ccase *, void *aux); /* True to keep. */
     +    bool (*destroy) (void *aux);        /* Releases AUX; may be null. */
     +    void *aux;                          /* Passed to INCLUDE and DESTROY. */
     +    struct casewriter *exclude;         /* Gets rejected cases; may be null. */
     +  };
     +
     +static struct casereader_class casereader_filter_class;
     +
     +/* Creates and returns a casereader that reads cases from
     +   SUBREADER and passes on only those for which INCLUDE, called
     +   with the case and AUX, returns true.  A rejected case is
     +   written to EXCLUDE if EXCLUDE is nonnull and destroyed
     +   otherwise.
     +
     +   SUBREADER is taken over with casereader_rename and is
     +   destroyed along with the filter.  At that time DESTROY, if
     +   nonnull, is called with AUX; a false return marks the
     +   filtering casereader as having encountered an error. */
     +struct casereader *
     +casereader_create_filter_func (struct casereader *subreader,
     +                               bool (*include) (const struct ccase *,
     +                                                void *aux),
     +                               bool (*destroy) (void *aux),
     +                               void *aux,
     +                               struct casewriter *exclude)
     +{
     +  struct casereader_filter *filter = xmalloc (sizeof *filter);
     +  struct casereader *reader;
     +
     +  filter->include = include;
     +  filter->destroy = destroy;
     +  filter->aux = aux;
     +  filter->exclude = exclude;
     +  filter->subreader = casereader_rename (subreader);
     +
     +  /* The number of cases that will pass the filter is not known in
     +     advance, so CASENUMBER_MAX is given as the case count. */
     +  reader = casereader_create_sequential (
     +    NULL, casereader_get_value_cnt (filter->subreader), CASENUMBER_MAX,
     +    &casereader_filter_class, filter);
     +  taint_propagate (casereader_get_taint (filter->subreader),
     +                   casereader_get_taint (reader));
     +  return reader;
     +}
     +
     +/* Reads cases from the subreader of the filter FILTER_ until one
     +   is accepted by the include function, stores that case in C,
     +   and returns true.  Each rejected case is written to the
     +   exclude casewriter if there is one and destroyed otherwise.
     +   Returns false once the subreader has no more cases to
     +   offer. */
     +static bool
     +casereader_filter_read (struct casereader *reader UNUSED, void *filter_,
     +                        struct ccase *c)
     +{
     +  struct casereader_filter *filter = filter_;
     +
     +  while (casereader_read (filter->subreader, c))
     +    {
     +      if (filter->include (c, filter->aux))
     +        return true;
     +
     +      if (filter->exclude != NULL)
     +        casewriter_write (filter->exclude, c);
     +      else
     +        case_destroy (c);
     +    }
     +  return false;
     +}
     +
     +/* Destroys the filter FILTER_ behind READER: destroys the
     +   subreader, calls the destroy callback (if any) on the
     +   auxiliary data, and frees the filter.  If the callback
     +   returns false, READER is marked as having encountered an
     +   error. */
     +static void
     +casereader_filter_destroy (struct casereader *reader, void *filter_)
     +{
     +  struct casereader_filter *filter = filter_;
     +  bool aux_ok;
     +
     +  casereader_destroy (filter->subreader);
     +
     +  aux_ok = filter->destroy == NULL || filter->destroy (filter->aux);
     +  if (!aux_ok)
     +    casereader_force_error (reader);
     +
     +  free (filter);
     +}
     +
     +/* Casereader class for filtering casereaders created by
     +   casereader_create_filter_func.  Only the read and destroy
     +   operations are supplied. */
     +static struct casereader_class casereader_filter_class = 
     +  {
     +    casereader_filter_read,
     +    casereader_filter_destroy,
     +
     +    /* We could in fact delegate clone to the subreader, if the
     +       filter function is required to have no memory and if we
     +       added reference counting.  But it might be useful to have
     +       filter functions with memory and in any case this would
     +       require a little extra work. */
     +    NULL,
     +    NULL,
     +  };
     +
     +/* Auxiliary data for a filter on the weight variable. */
     +struct casereader_filter_weight 
     +  {
     +    const struct variable *weight_var;  /* Weight variable to test. */
     +    bool *warn_on_invalid;      /* Cleared once the warning is issued;
     +                                   points to the caller's flag or to
     +                                   local_warn_on_invalid. */
     +    bool local_warn_on_invalid; /* Used if the caller passed no flag. */
     +  };
     +
     +/* Include function for the weight filter whose auxiliary data
     +   is CFW_.  Returns true if case C's value for the weight
     +   variable is at least zero and not missing in class MV_ANY.
     +   Otherwise returns false and, while the warn_on_invalid flag
     +   is still set, issues a warning and clears the flag so that
     +   the warning is given only once.
     +
     +   NOTE(review): the message lists "zero" among the ignored
     +   weights, but the test below accepts a weight of 0 --
     +   confirm which is intended. */
     +static bool
     +casereader_filter_weight_include (const struct ccase *c, void *cfw_)
     +{
     +  struct casereader_filter_weight *cfw = cfw_;
     +  double weight = case_num (c, cfw->weight_var);
     +
     +  if (weight >= 0.0 && !var_is_num_missing (cfw->weight_var, weight, MV_ANY))
     +    return true;
     +
     +  if (*cfw->warn_on_invalid)
     +    {
     +      msg (SW, _("At least one case in the data read had a weight value "
     +                 "that was user-missing, system-missing, zero, or "
     +                 "negative.  These case(s) were ignored."));
     +      *cfw->warn_on_invalid = false;
     +    }
     +  return false;
     +}
     +
     +/* Destroy function for the weight filter: frees the auxiliary
     +   data CFW_.  Always returns true. */
     +static bool
     +casereader_filter_weight_destroy (void *cfw_)
     +{
     +  free (cfw_);
     +  return true;
     +}
     +
     +/* Creates and returns a casereader that filters the cases read
     +   from READER according to the weight variable of DICT, using
     +   casereader_filter_weight_include as the test.  If DICT has
     +   no weight variable, READER is simply renamed and returned.
     +
     +   If WARN_ON_INVALID is nonnull, it points to the flag that
     +   controls (and is cleared by) the one-time warning about
     +   rejected cases; if it is null, a private flag that starts
     +   out true is used instead.
     +
     +   EXCLUDE is passed on to casereader_create_filter_func. */
     +struct casereader *
     +casereader_create_filter_weight (struct casereader *reader,
     +                                 const struct dictionary *dict,
     +                                 bool *warn_on_invalid,
     +                                 struct casewriter *exclude)
     +{
     +  struct variable *weight_var = dict_get_weight (dict);
     +  struct casereader_filter_weight *cfw;
     +
     +  if (weight_var == NULL)
     +    return casereader_rename (reader);
     +
     +  cfw = xmalloc (sizeof *cfw);
     +  cfw->weight_var = weight_var;
     +  cfw->local_warn_on_invalid = true;
     +  if (warn_on_invalid != NULL)
     +    cfw->warn_on_invalid = warn_on_invalid;
     +  else
     +    cfw->warn_on_invalid = &cfw->local_warn_on_invalid;
     +
     +  return casereader_create_filter_func (reader,
     +                                        casereader_filter_weight_include,
     +                                        casereader_filter_weight_destroy,
     +                                        cfw, exclude);
     +}
     +
     +/* Auxiliary data for a filter on missing values. */
     +struct casereader_filter_missing 
     +  {
     +    struct variable **vars;     /* Variables to test; owned copy. */
     +    size_t var_cnt;             /* Number of elements in VARS. */
     +    enum mv_class class;        /* Class of missing values to test for. */
     +  };
     +
     +/* Include function for the missing-value filter whose auxiliary
     +   data is CFM_.  Returns false as soon as case C is found to
     +   have a value, for any of the filter's variables, that is
     +   missing in the filter's class; returns true if there is no
     +   such value. */
     +static bool
     +casereader_filter_missing_include (const struct ccase *c, void *cfm_)
     +{
     +  const struct casereader_filter_missing *cfm = cfm_;
     +  size_t idx;
     +
     +  for (idx = 0; idx < cfm->var_cnt; idx++)
     +    {
     +      struct variable *v = cfm->vars[idx];
     +      if (var_is_value_missing (v, case_data (c, v), cfm->class))
     +        return false;
     +    }
     +  return true;
     +}
     +
     +/* Destroy function for the missing-value filter: frees the
     +   variable array held by CFM_ and then CFM_ itself.  Always
     +   returns true. */
     +static bool
     +casereader_filter_missing_destroy (void *cfm_)
     +{
     +  struct casereader_filter_missing *missing = cfm_;
     +
     +  free (missing->vars);
     +  free (missing);
     +  return true;
     +}
     +
     +/* Creates and returns a casereader that reads cases from READER
     +   and passes on only those in which none of the VAR_CNT
     +   variables in VARS has a value that is missing in CLASS.  The
     +   VARS array is copied, so the caller keeps its own array.
     +   EXCLUDE is passed on to casereader_create_filter_func.
     +
     +   If VAR_CNT is zero or CLASS is MV_NEVER, no filter is created
     +   and READER is simply renamed and returned. */
     +struct casereader *
     +casereader_create_filter_missing (struct casereader *reader,
     +                                  struct variable **vars, size_t var_cnt,
     +                                  enum mv_class class,
     +                                  struct casewriter *exclude)
     +{
     +  struct casereader_filter_missing *cfm;
     +
     +  if (var_cnt == 0 || class == MV_NEVER)
     +    return casereader_rename (reader);
     +
     +  cfm = xmalloc (sizeof *cfm);
     +  cfm->vars = xmemdup (vars, sizeof *vars * var_cnt);
     +  cfm->var_cnt = var_cnt;
     +  cfm->class = class;
     +  return casereader_create_filter_func (reader,
     +                                        casereader_filter_missing_include,
     +                                        casereader_filter_missing_destroy,
     +                                        cfm,
     +                                        exclude);
     +}
     +
     +
     +/* Include function for the counting filter: adds one to the
     +   casenumber that COUNTER_ points to and accepts every case. */
     +static bool
     +casereader_counter_include (const struct ccase *c UNUSED, void *counter_)
     +{
     +  casenumber *tally = counter_;
     +  *tally += 1;
     +  return true;
     +}
     +
     +/* Creates and returns a casereader that passes on every case
     +   read from READER and adds one to *COUNTER for each of them.
     +   *COUNTER is set to INITIAL_VALUE before the new casereader is
     +   created.  No destroy callback is installed, so COUNTER stays
     +   under the caller's control. */
     +struct casereader *
     +casereader_create_counter (struct casereader *reader, casenumber *counter,
     +                           casenumber initial_value)
     +{
     +  struct casereader *counting_reader;
     +
     +  *counter = initial_value;
     +  counting_reader = casereader_create_filter_func (reader,
     +                                                   casereader_counter_include,
     +                                                   NULL, counter, NULL);
     +  return counting_reader;
     +}
     Index: merge/src/data/casereader-translator.c
     ===================================================================
     --- /dev/null      1970-01-01 00:00:00.000000000 +0000
     +++ merge/src/data/casereader-translator.c 2007-06-05 21:26:32.000000000 
-0700
     @@ -0,0 +1,96 @@
     +/* PSPP - computes sample statistics.
     +   Copyright (C) 2007 Free Software Foundation, Inc.
     +
     +   This program is free software; you can redistribute it and/or
     +   modify it under the terms of the GNU General Public License as
     +   published by the Free Software Foundation; either version 2 of the
     +   License, or (at your option) any later version.
     +
     +   This program is distributed in the hope that it will be useful, but
     +   WITHOUT ANY WARRANTY; without even the implied warranty of
     +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     +   General Public License for more details.
     +
     +   You should have received a copy of the GNU General Public License
     +   along with this program; if not, write to the Free Software
     +   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
     +   02110-1301, USA. */
     +
     +#include <config.h>
     +
     +#include <data/casereader.h>
     +
     +#include <stdlib.h>
     +
     +#include <data/casereader-provider.h>
     +#include <libpspp/taint.h>
     +
     +#include "xalloc.h"
     +
     +/* State of a translating casereader. */
     +struct casereader_translator
     +  {
     +    struct casereader *subreader;       /* Source of cases to translate. */
     +
     +    /* Called with each case read from SUBREADER as INPUT, the
     +       case to hand to the client as OUTPUT, and AUX. */
     +    void (*translate) (const struct ccase *input, struct ccase *output,
     +                       void *aux);
     +    bool (*destroy) (void *aux);        /* Called on AUX at destruction. */
     +    void *aux;                          /* Passed to TRANSLATE and DESTROY. */
     +  };
     +
     +static struct casereader_class casereader_translator_class;
     +
     +/* Creates and returns a casereader that reads each case from
     +   SUBREADER and hands it to TRANSLATE, along with AUX, to
     +   produce the case that the client sees.  The new casereader
     +   reports OUTPUT_VALUE_CNT values per case and the same number
     +   of cases as SUBREADER.
     +
     +   SUBREADER is taken over with casereader_rename and is
     +   destroyed along with the translator, at which time DESTROY
     +   is called with AUX. */
     +struct casereader *
     +casereader_create_translator (struct casereader *subreader,
     +                              size_t output_value_cnt,
     +                              void (*translate) (const struct ccase *input,
     +                                                 struct ccase *output,
     +                                                 void *aux),
     +                              bool (*destroy) (void *aux),
     +                              void *aux)
     +{
     +  struct casereader_translator *ct = xmalloc (sizeof *ct);
     +  struct casereader *reader;
     +
     +  ct->translate = translate;
     +  ct->destroy = destroy;
     +  ct->aux = aux;
     +  ct->subreader = casereader_rename (subreader);
     +
     +  reader = casereader_create_sequential (
     +    NULL, output_value_cnt, casereader_get_case_cnt (ct->subreader),
     +    &casereader_translator_class, ct);
     +  taint_propagate (casereader_get_taint (ct->subreader),
     +                   casereader_get_taint (reader));
     +  return reader;
     +}
     +
     +/* Reads one case from the subreader of the translator CT_ and
     +   passes it through the translate function to fill in C.
     +   Returns true if a case was read and translated, false if the
     +   subreader had no case to offer.
     +
     +   NOTE(review): the input case is not destroyed here, so the
     +   translate function presumably takes charge of it --
     +   confirm. */
     +static bool
     +casereader_translator_read (struct casereader *reader UNUSED,
     +                            void *ct_, struct ccase *c)
     +{
     +  struct casereader_translator *ct = ct_;
     +  struct ccase input;
     +
     +  if (!casereader_read (ct->subreader, &input))
     +    return false;
     +
     +  ct->translate (&input, c, ct->aux);
     +  return true;
     +}
     +
     +/* Destroys the translator CT_ behind READER: destroys the
     +   subreader, releases the client's auxiliary data through the
     +   destroy callback, and frees the translator.
     +
     +   A null destroy callback is skipped, and a false return from
     +   it marks READER as having encountered an error, in the same
     +   way as casereader_filter_destroy. */
     +static void
     +casereader_translator_destroy (struct casereader *reader, void *ct_)
     +{
     +  struct casereader_translator *ct = ct_;
     +  casereader_destroy (ct->subreader);
     +  if (ct->destroy != NULL && !ct->destroy (ct->aux))
     +    casereader_force_error (reader);
     +  free (ct);
     +}
     +
     +/* Casereader class for translating casereaders created by
     +   casereader_create_translator.  Only the read and destroy
     +   operations are supplied. */
     +static struct casereader_class casereader_translator_class = 
     +  {
     +    casereader_translator_read,
     +    casereader_translator_destroy,
     +    NULL,
     +    NULL,
     +  };
     Index: merge/src/data/casewriter-translator.c
     ===================================================================
     --- /dev/null      1970-01-01 00:00:00.000000000 +0000
     +++ merge/src/data/casewriter-translator.c 2007-06-05 21:26:32.000000000 
-0700
     @@ -0,0 +1,98 @@
     +/* PSPP - computes sample statistics.
     +   Copyright (C) 2007 Free Software Foundation, Inc.
     +
     +   This program is free software; you can redistribute it and/or
     +   modify it under the terms of the GNU General Public License as
     +   published by the Free Software Foundation; either version 2 of the
     +   License, or (at your option) any later version.
     +
     +   This program is distributed in the hope that it will be useful, but
     +   WITHOUT ANY WARRANTY; without even the implied warranty of
     +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     +   General Public License for more details.
     +
     +   You should have received a copy of the GNU General Public License
     +   along with this program; if not, write to the Free Software
     +   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
     +   02110-1301, USA. */
     +
     +#include <config.h>
     +
     +#include <data/casewriter.h>
     +#include <data/casewriter-provider.h>
     +
     +#include <stdlib.h>
     +
     +#include <libpspp/taint.h>
     +
     +#include "xalloc.h"
     +
     +/* State of a translating casewriter. */
     +struct casewriter_translator
     +  {
     +    struct casewriter *subwriter;       /* Receives the translated cases. */
     +
     +    /* Called with each case written by the client as INPUT, the
     +       case to pass to SUBWRITER as OUTPUT, and AUX. */
     +    void (*translate) (const struct ccase *input, struct ccase *output,
     +                       void *aux);
     +    bool (*destroy) (void *aux);        /* Called on AUX at destruction. */
     +    void *aux;                          /* Passed to TRANSLATE and DESTROY. */
     +  };
     +
     +static struct casewriter_class casewriter_translator_class;
     +
     +/* Creates and returns a casewriter that passes each case
     +   written to it through TRANSLATE, along with AUX, and writes
     +   the result to SUBWRITER.
     +
     +   SUBWRITER is taken over with casewriter_rename, so the caller
     +   must not use it again.  When the translator goes away,
     +   DESTROY is called with AUX. */
     +struct casewriter *
     +casewriter_create_translator (struct casewriter *subwriter,
     +                              void (*translate) (const struct ccase *input,
     +                                                 struct ccase *output,
     +                                                 void *aux),
     +                              bool (*destroy) (void *aux),
     +                              void *aux)
     +{
     +  struct casewriter_translator *ct = xmalloc (sizeof *ct);
     +  struct casewriter *writer;
     +
     +  ct->translate = translate;
     +  ct->destroy = destroy;
     +  ct->aux = aux;
     +  ct->subwriter = casewriter_rename (subwriter);
     +
     +  writer = casewriter_create (&casewriter_translator_class, ct);
     +  taint_propagate (casewriter_get_taint (ct->subwriter),
     +                   casewriter_get_taint (writer));
     +  return writer;
     +}
     +
     +/* Passes case C through the translate function of the
     +   translator CT_ and writes the resulting case to the
     +   subwriter.
     +
     +   NOTE(review): C is not destroyed here, so the translate
     +   function presumably takes charge of it -- confirm. */
     +static void
     +casewriter_translator_write (struct casewriter *writer UNUSED,
     +                             void *ct_, struct ccase *c)
     +{
     +  struct casewriter_translator *ct = ct_;
     +  struct ccase translated;
     +
     +  ct->translate (c, &translated, ct->aux);
     +  casewriter_write (ct->subwriter, &translated);
     +}
     +
     +/* Destroys the translator CT_ behind WRITER: destroys the
     +   subwriter, releases the client's auxiliary data through the
     +   destroy callback, and frees the translator.
     +
     +   A null destroy callback is skipped, and a false return from
     +   it marks WRITER as having encountered an error, so that
     +   casewriter_destroy reports the failure to its caller. */
     +static void
     +casewriter_translator_destroy (struct casewriter *writer, void *ct_)
     +{
     +  struct casewriter_translator *ct = ct_;
     +  casewriter_destroy (ct->subwriter);
     +  if (ct->destroy != NULL && !ct->destroy (ct->aux))
     +    casewriter_force_error (writer);
     +  free (ct);
     +}
     +
     +/* Converts the translating casewriter whose state is CT_ into a
     +   casereader, by converting its subwriter, and then releases
     +   the translator's own resources.
     +
     +   The destroy callback has to run before CT is freed, because
     +   both the callback pointer and its argument are stored in CT.
     +   A null callback is skipped.  A false return from it marks
     +   WRITER as having encountered an error, which
     +   casewriter_make_reader then propagates to the casereader
     +   that it returns. */
     +static struct casereader *
     +casewriter_translator_convert_to_reader (struct casewriter *writer,
     +                                         void *ct_)
     +{
     +  struct casewriter_translator *ct = ct_;
     +  struct casereader *reader = casewriter_make_reader (ct->subwriter);
     +  if (ct->destroy != NULL && !ct->destroy (ct->aux))
     +    casewriter_force_error (writer);
     +  free (ct);
     +  return reader;
     +}
     +
     +/* Casewriter class for translating casewriters created by
     +   casewriter_create_translator. */
     +static struct casewriter_class casewriter_translator_class = 
     +  {
     +    casewriter_translator_write,
     +    casewriter_translator_destroy,
     +    casewriter_translator_convert_to_reader,
     +  };
     Index: merge/src/data/casereader-provider.h
     ===================================================================
     --- /dev/null      1970-01-01 00:00:00.000000000 +0000
     +++ merge/src/data/casereader-provider.h   2007-06-05 21:26:32.000000000 
-0700
     @@ -0,0 +1,161 @@
     +/* PSPP - computes sample statistics.
     +   Copyright (C) 2007 Free Software Foundation, Inc.
     +
     +   This program is free software; you can redistribute it and/or
     +   modify it under the terms of the GNU General Public License as
     +   published by the Free Software Foundation; either version 2 of the
     +   License, or (at your option) any later version.
     +
     +   This program is distributed in the hope that it will be useful, but
     +   WITHOUT ANY WARRANTY; without even the implied warranty of
     +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     +   General Public License for more details.
     +
     +   You should have received a copy of the GNU General Public License
     +   along with this program; if not, write to the Free Software
     +   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
     +   02110-1301, USA. */
     +
     +/* Definitions needed to implement a new type of casereader.
     +   Code that only uses casereaders does not need this header.
     +
     +   Two functions to create casereaders are supplied:
     +
     +        - casereader_create_sequential, to create a casereader
     +          for a data source that is naturally sequential.  The
     +          casereader layer will automatically, as needed,
     +          simulate the ability to access cases randomly.
     +
     +        - casereader_create_random, to create a casereader for a
     +          data source that supports random access to cases.  (This
     +          function is in fact implemented as a set of wrappers
     +          around casereader_create_sequential.)
     +
     +   Which function is used has no effect on the set of operations
     +   that may be performed on the resulting casereader, only on how
     +   the casereader is implemented internally. */
     +
     +#ifndef DATA_CASEREADER_PROVIDER_H
     +#define DATA_CASEREADER_PROVIDER_H 1
     +
     +#include <data/casereader.h>
     +
     +/* Casereader class for sequential data sources. */
     +struct casereader_class 
     +  {
     +    /* Mandatory.
     +
     +       Reads the next case from READER into case C, which the
     +       casereader must create and which the client is responsible
     +       for destroying.  If successful, returns true and advances
     +       READER to the next case, so that the next call to this
     +       function will read the next case.  The case just read will
     +       never be read again by a call to this function for READER.
     +
     +       At end of file or upon an I/O error, returns false.  After
     +       false is returned once, this function will not be called
     +       again for the given READER.
     +
     +       If an I/O error occurs, this function should call
     +       casereader_force_error on READER. */
     +    bool (*read) (struct casereader *reader, void *aux, struct ccase *c);
     +
     +    /* Mandatory.
     +
     +       Destroys READER.
     +
     +       If an I/O error is detected during destruction, this
     +       function should call casereader_force_error on READER. */
     +    void (*destroy) (struct casereader *reader, void *aux);
     +
     +    /* Optional: if convenient and efficiently implementable,
     +       supply this function as an optimization for use by
     +       casereader_clone.  (But it might be easier to use the
     +       random-access casereader wrapper instead.)
     +
     +       Creates and returns a clone of READER.  The clone must
     +       read the same case data in the same sequence as READER,
     +       starting from the same position.  The only allowable
     +       exception to this rule is that I/O errors may force the
     +       clone or the original casereader to stop reading after
     +       differing numbers of cases.
     +
     +       The clone should have a clone of READER's taint object,
     +       accomplished by passing casereader_get_taint (READER) to
     +       casereader_create_sequential. */
     +    struct casereader *(*clone) (struct casereader *reader, void *aux);
     +
     +    /* Optional: if convenient and efficiently implementable,
     +       supply as an optimization for use by casereader_peek.
     +       (But it might be easier to use the random-access
     +       casereader wrapper instead.)
     +
     +       Reads the case at 0-based offset IDX from the beginning of
     +       READER into case C, which the casereader must create and
     +       which the client is responsible for destroying.
     +
     +       At end of file or upon an I/O error, returns false.  If
     +       this function returns false, then it will never be called
     +       again for an equal or greater value of IDX, and the "read"
     +       member function will never be called to advance as far as
     +       IDX cases further into the casereader.  That is, returning
     +       false indicates that the casereader has no more than IDX
     +       cases left.
     +
     +       If an I/O error occurs, this function should call
     +       casereader_force_error on READER. */
     +    bool (*peek) (struct casereader *reader, void *aux, casenumber idx,
     +                  struct ccase *c);
     +  };
     +
     +struct casereader *
     +casereader_create_sequential (const struct taint *,
     +                              size_t value_cnt, casenumber case_cnt,
     +                              const struct casereader_class *, void *);
     +
     +/* Casereader class for random-access data sources. */
     +struct casereader_random_class
     +  {
     +    /* Mandatory.
     +
     +       Reads the case at 0-based offset IDX from the beginning of
     +       READER into case C, which the casereader must create and
     +       which the client is responsible for destroying.
     +
     +       At end of file or upon an I/O error, returns false.  If
     +       this function returns false, then it will never be called
     +       again for an equal or greater value of IDX, and the "read"
     +       member function will never be called to advance as far as
     +       IDX cases further into the casereader.  That is, returning
     +       false indicates that the casereader has no more than IDX
     +       cases.
     +
     +       If an I/O error occurs, this function should call
     +       casereader_force_error on READER. */
     +    bool (*read) (struct casereader *reader, void *aux, casenumber idx,
     +                  struct ccase *c);
     +
     +    /* Mandatory.
     +
     +       Destroys READER.
     +
     +       If an I/O error is detected during destruction, this
     +       function should call casereader_force_error on READER. */
     +    void (*destroy) (struct casereader *reader, void *aux);
     +
     +    /* Mandatory.
     +       
     +       A call to this function tells the callee that the CNT
     +       cases at the beginning of READER will never be read again.
     +       The casereader implementation should free any resources
     +       associated with those cases.  After this function returns,
     +       the IDX argument in future calls to the "read" function
     +       will be relative to remaining cases. */
     +    void (*advance) (struct casereader *reader, void *aux, casenumber cnt);
     +  };
     +
     +struct casereader *
     +casereader_create_random (size_t value_cnt, casenumber case_cnt,
     +                          const struct casereader_random_class *, void *aux);
     +
     +#endif /* data/casereader-provider.h */
     Index: merge/src/data/casewriter-provider.h
     ===================================================================
     --- /dev/null      1970-01-01 00:00:00.000000000 +0000
     +++ merge/src/data/casewriter-provider.h   2007-06-05 21:26:32.000000000 -0700
     @@ -0,0 +1,63 @@
     +/* PSPP - computes sample statistics.
     +   Copyright (C) 2006 Free Software Foundation, Inc.
     +
     +   This program is free software; you can redistribute it and/or
     +   modify it under the terms of the GNU General Public License as
     +   published by the Free Software Foundation; either version 2 of the
     +   License, or (at your option) any later version.
     +
     +   This program is distributed in the hope that it will be useful, but
     +   WITHOUT ANY WARRANTY; without even the implied warranty of
     +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     +   General Public License for more details.
     +
     +   You should have received a copy of the GNU General Public License
     +   along with this program; if not, write to the Free Software
     +   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
     +   02110-1301, USA. */
     +
     +#ifndef DATA_CASEWRITER_PROVIDER_H
     +#define DATA_CASEWRITER_PROVIDER_H 1
     +
     +#include <data/casewriter.h>
     +
     +struct casewriter_class 
     +  {
     +    /* Mandatory.
     +
     +       Writes case C to WRITER.  Destroys C before returning.
     +
     +       If an I/O error occurs, this function should call
     +       casewriter_force_error on WRITER.  Some I/O error
     +       detection may be deferred to the "destroy" member function
     +       (e.g. writes to disk need not be flushed by "write"). */
     +    void (*write) (struct casewriter *writer, void *aux, struct ccase *c);
     +
     +    /* Mandatory.
     +
     +       Finalizes output and destroys WRITER.
     +
     +       If an I/O error is detected while finalizing output
     +       (e.g. while flushing output to disk), this function should
     +       call casewriter_force_error on WRITER. */
     +    void (*destroy) (struct casewriter *writer, void *aux);
     +
     +    /* Optional: supply if practical and desired by clients.
     +
     +       Finalizes output to WRITER, destroys WRITER, and in its
     +       place returns a casereader that can be used to read back
     +       the data written to WRITER.  WRITER will not be used again
     +       after calling this function, even as an argument to
     +       casewriter_destroy.
     +
     +       If an I/O error is detected while finalizing output
     +       (e.g. while flushing output to disk), this function should
     +       call casewriter_force_error on WRITER.  The caller will
     +       ensure that the error is propagated to the returned
     +       casereader. */
     +    struct casereader *(*convert_to_reader) (struct casewriter *, void *aux);
     +  };
     +
     +struct casewriter *casewriter_create (const struct casewriter_class *, void *);
     +
     +#endif /* data/casewriter-provider.h */
     
     --
     
     
     
     _______________________________________________
     pspp-dev mailing list
     address@hidden
     http://lists.gnu.org/mailman/listinfo/pspp-dev

-- 
PGP Public key ID: 1024D/2DE827B3 
fingerprint = 8797 A26D 0854 2EAB 0285  A290 8A67 719C 2DE8 27B3
See http://pgp.mit.edu or any PGP keyserver for public key.


Attachment: signature.asc
Description: Digital signature


reply via email to

[Prev in Thread] Current Thread [Next in Thread]