discuss-gnuradio
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [Discuss-gnuradio] GR, WX, Omnithread, and Condition Variables


From: Krzysztof Kamieniecki
Subject: Re: [Discuss-gnuradio] GR, WX, Omnithread, and Condition Variables
Date: Tue, 12 Apr 2005 21:02:33 -0400
User-agent: Mozilla/5.0 (Windows; U; Windows NT 5.0; en-US; rv:1.7.5) Gecko/20041217

The code.

Krzysztof Kamieniecki wrote:
Is anybody using omnithread condition variables with GnuRadio and WX?

I will post my code when I get home later today, but I was hoping somebody had
already solved this type of problem before.

I have created a sink block that stores N number of samples after skipping K
samples into a std::vector<>. There is an omnithread mutex that is used to
protect various data members. There is also a function (wait_for_data(seconds))
that uses a condition variable to wait a finite amount of time for the N
samples to be acquired.
If I start a thread that calls wait_for_data before doing fg.start(), my block's
work function is never called (based on a cout << "DEBUG\n" and cout.flush) and
it looks like the flowgraph is locked up. However the thread continues to loop
and wait_for_data continues to indicate that there is no data available, so
I believe the mutex is being released, but I will have to double check that the
acquires and releases are balanced.

If I do fg.start(), sleep for 5 seconds, then create the thread everything works
as expected for a while, but things seem to lock up again.

It looks like something is deadlocking, but I'm not exactly certain how
omnithread, WX, and GnuRadio interact in the background.

Any ideas?



_______________________________________________
Discuss-gnuradio mailing list
address@hidden
http://lists.gnu.org/mailman/listinfo/discuss-gnuradio


--
Krzysztof Kamieniecki
callsign:KB1KLB
mailto:address@hidden
#include <kksdr_sampler_sink_c.h>
#include <gr_io_signature.h>
#include <assert.h>
#include <algorithm>
#include <iostream>
#include <limits>


/*!
 * \brief Build a sampler sink with one capture buffer per input channel.
 *
 * The block is registered as "sampler_sink_c" with exactly \p channels
 * gr_complex input streams and no output streams.
 *
 * \param channels          number of input streams (and capture buffers)
 * \param nstored_samples   number of samples to keep per channel
 * \param nskipped_samples  number of leading samples to discard before storing
 */
kksdr_sampler_sink_c::kksdr_sampler_sink_c (
   int channels,
   int nstored_samples,
   int nskipped_samples)
: gr_sync_block (
    "sampler_sink_c",
    gr_make_io_signature (channels, channels, sizeof (gr_complex)),
    gr_make_io_signature (0, 0, 0))
, d_mutex ()
, d_data_ready_condition (&d_mutex)
  // Spell out the per-channel buffer: "d_data (channels, nstored_samples)"
  // relied on the C++98 integral-iterator constructor dispatch and does not
  // compile from C++11 onwards (the inner vector's size constructor is
  // explicit).
, d_data ( channels, std::vector<gr_complex> ( nstored_samples ) )
, d_nstored_samples ( nstored_samples )
, d_nskipped_samples ( nskipped_samples )
, d_nsamples_processed ( 0 )
, d_samples ( 0 )
{
}

/*!
 * \brief Tell whether a complete capture (skip + store) has been collected.
 *
 * Takes no lock itself; it only reads the sample counters.
 *
 * \returns true when at least one sample has been processed and the number
 *          processed equals stored + skipped; false otherwise.
 */
bool
kksdr_sampler_sink_c::is_data_ready ()
const
{
  int const processed = d_nsamples_processed;

  // Nothing processed yet can never count as "ready", even for a zero-length
  // capture window.
  if (processed == 0)
    return false;

  int const capture_total = d_nstored_samples + d_nskipped_samples;
  return processed == capture_total;
}

//public:

int
kksdr_sampler_sink_c::work (
    int noutput_items,
    gr_vector_const_void_star &input_items,
    gr_vector_void_star &output_items)
{
  bool do_broadcast = false;
        
  {
    omni_mutex_lock lock(d_mutex);

    size_t const ninput_channels = input_items.size();

    assert (ninput_channels == d_data.size()); 
        
    int current_index = 0;

    bool const old_is_data_ready = is_data_ready();

    if ( (current_index + d_nsamples_processed) < (d_nskipped_samples + 
d_nstored_samples) )
    {
      if ((current_index + d_nsamples_processed) < d_nskipped_samples)
      {
        int const samples_to_drop = d_nskipped_samples - (current_index + 
d_nsamples_processed);
        int const delta_samples = std::min (samples_to_drop, noutput_items);
        current_index += delta_samples;
        d_nsamples_processed += delta_samples;
      }

      int const data_start_index = d_nsamples_processed - d_nskipped_samples;

      if (data_start_index >= 0)
      {
        int const samples_to_copy = std::min (noutput_items - current_index, 
d_nstored_samples - data_start_index);

        for (size_t ix_channel = 0; ix_channel < ninput_channels; ix_channel++)
          for (int ix_sample = 0; ix_sample < samples_to_copy; ix_sample++)
            d_data[ix_channel][data_start_index + ix_sample] =  
              *((gr_complex *)(input_items[ix_channel]) + current_index + 
ix_sample);
     
        if (samples_to_copy > 0)
          d_nsamples_processed += samples_to_copy;
      }   
    }     

    do_broadcast = !old_is_data_ready && is_data_ready();
 
    d_samples += noutput_items;
  }

  if ( do_broadcast )
  {
    d_data_ready_condition.broadcast();
  }

  return noutput_items;
}

/*!
 * \brief Return a copy of one channel's capture buffer.
 *
 * The copy is made while holding d_mutex.
 *
 * \param index  channel number
 * \returns a copy of the buffer for \p index, or an empty vector when
 *          \p index is negative or not less than the number of channels.
 */
std::vector<gr_complex>
kksdr_sampler_sink_c::data (
    int index) 
{
  omni_mutex_lock guard ( d_mutex );

  // Guard clause: anything outside [0, channel count) yields an empty result.
  bool const index_in_range =
    (index >= 0) && (static_cast<size_t> (index) < d_data.size());

  if (!index_in_range)
    return std::vector<gr_complex>();

  return d_data[index];
}

void
kksdr_sampler_sink_c::set_samples (
   int nstored_samples,
   int nskipped_samples)
{
  omni_mutex_lock lock ( d_mutex );

  d_nstored_samples = nstored_samples;
  d_nskipped_samples = nskipped_samples;

  for ( int i = 0; i < d_data.size(); i++ )
    d_data[i].resize (d_nstored_samples);
 
  int const nsamples_processed = d_nsamples_processed; 
  d_nsamples_processed = std::min ( nsamples_processed, d_nstored_samples + 
d_nskipped_samples );
}


/*!
 * \brief Restart the capture by zeroing the processed-sample counter.
 *
 * Done under d_mutex.  The stored buffer contents are not cleared here; only
 * the counter is reset.
 */
void
kksdr_sampler_sink_c::reset () 
{
  omni_mutex_lock guard ( d_mutex );

  d_nsamples_processed = 0;
}

/*!
 * \brief Block until a complete capture is available or a timeout expires.
 *
 * Holds d_mutex for the predicate checks; the mutex is handed to the
 * condition variable while waiting.
 *
 * \param wait_sec  maximum time to wait, in whole seconds
 * \returns true if the data is ready when the call returns, false if the
 *          deadline passed without the capture completing.
 */
bool
kksdr_sampler_sink_c::wait_for_data_ready (
    int wait_sec) 
{
  omni_mutex_lock lock ( d_mutex );

  // Fast path: nothing to wait for.
  if ( is_data_ready() )
    return true;

  // timedwait takes an absolute deadline, so compute it once and reuse it
  // across wakeups.
  unsigned long sec;
  unsigned long nsec;
  omni_thread::get_time(&sec,&nsec,wait_sec,0);

  // Re-check the predicate after every wakeup: condition variables may wake
  // spuriously, and a broadcast can be stale (e.g. reset() ran before this
  // thread reacquired the mutex).  A single wait would report failure early
  // in those cases instead of using the remaining time.
  while ( !is_data_ready() )
  {
    if ( !d_data_ready_condition.timedwait ( sec, nsec ) )
    {
      // Deadline reached; report the state as it is now rather than
      // assuming failure.
      return is_data_ready();
    }
  }

  return true;
}


/*!
 * \brief Factory: allocate a sampler sink and hand it back in a shared pointer.
 *
 * \param channels          forwarded to the constructor
 * \param nstored_samples   forwarded to the constructor
 * \param nskipped_samples  forwarded to the constructor
 * \returns shared pointer owning the newly created block
 */
kksdr_sampler_sink_c_sptr
kksdr_make_sampler_sink_c (
   int channels,
   int nstored_samples,
   int nskipped_samples)
{
  kksdr_sampler_sink_c_sptr sink (
    new kksdr_sampler_sink_c ( channels, nstored_samples, nskipped_samples ) );

  return sink;
}
#ifndef INCLUDED_KKSDR_SAMPLER_SINK_C_H
#define INCLUDED_KKSDR_SAMPLER_SINK_C_H

#include <gr_sync_block.h>
#include <omnithread.h>

// Forward declaration so the shared-pointer typedef and the factory prototype
// can be written before the class body.
class kksdr_sampler_sink_c;
// Shared-pointer handle type used by the factory to return the block.
typedef boost::shared_ptr<kksdr_sampler_sink_c> kksdr_sampler_sink_c_sptr;

/*!
 * \brief Create a sampler sink and return it in a shared pointer.
 *
 * \param channels          number of input channels
 * \param nstored_samples   number of samples to keep per channel
 * \param nskipped_samples  number of leading samples to discard first
 */
kksdr_sampler_sink_c_sptr
kksdr_make_sampler_sink_c (
  int channels,
  int nstored_samples,
  int nskipped_samples);

/*!
 * \brief complex sink that writes a fixed number of samples to a vector
 * \ingroup sink
 *
 * After discarding a configurable number of leading samples, the block
 * stores a configurable number of samples per input channel.  A mutex guards
 * the buffers and counters, and a condition variable lets another thread
 * wait (with a timeout) for the capture to complete.
 *
 * The constructor is private; instances are created through the friend
 * factory kksdr_make_sampler_sink_c().
 */

class kksdr_sampler_sink_c : public gr_sync_block {
  friend 
  kksdr_sampler_sink_c_sptr 
  kksdr_make_sampler_sink_c (
    int channels,
    int nstored_samples, 
    int nskipped_samples);

  // Guards every data member below; the condition is bound to this mutex.
  omni_mutex                             d_mutex;
  omni_condition                         d_data_ready_condition;
  
  // One capture buffer per channel.
  std::vector< std::vector<gr_complex> > d_data;
  // The counters are only accessed with d_mutex held, which already provides
  // the required visibility and ordering; volatile added nothing and is not
  // a synchronisation mechanism, so they are plain ints.
  int                                    d_nstored_samples;    // samples kept per channel
  int                                    d_nskipped_samples;   // leading samples discarded
  int                                    d_nsamples_processed; // skipped + stored so far
  int                                    d_samples;            // running count of items seen by work()
  
  kksdr_sampler_sink_c (
      int channels,
      int nstored_samples,
      int nskipped_samples);

  /*!
   * \brief true once a full skip + store window has been processed.
   * Does no locking of its own.
   */
  bool
  is_data_ready()
  const;

public:
  /*!
   * \brief Scheduler entry point: consume \p noutput_items items from each
   *        input stream.
   */
  virtual 
  int 
  work (
      int noutput_items,
      gr_vector_const_void_star &input_items,
      gr_vector_void_star &output_items);

  /*!
   * \brief Copy of the capture buffer for channel \p index (empty vector if
   *        the index is out of range).
   */
  std::vector<gr_complex>
  data (
      int index);
  
  /*!
   * \brief Change how many samples are stored and how many are skipped.
   */
  void
  set_samples (
      int nstored_samples,
      int nskipped_samples);
 
  /*!
   * \brief Restart the capture from the beginning of the skip window.
   */
  void 
  reset ();
  
  /*!
   * \brief Wait up to \p wait_sec seconds for the capture to complete.
   * \returns true if the data is ready, false otherwise.
   */
  bool 
  wait_for_data_ready (
      int wait_sec);
  
};

#endif

reply via email to

[Prev in Thread] Current Thread [Next in Thread]