[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Discuss-gnuradio] GR, WX, Omnithread, and Condition Variables
From: |
Krzysztof Kamieniecki |
Subject: |
Re: [Discuss-gnuradio] GR, WX, Omnithread, and Condition Variables |
Date: |
Tue, 12 Apr 2005 21:02:33 -0400 |
User-agent: |
Mozilla/5.0 (Windows; U; Windows NT 5.0; en-US; rv:1.7.5) Gecko/20041217 |
The code.
Krzysztof Kamieniecki wrote:
Is anybody using omnithread condition variables with GnuRadio and WX?
I will post my code when I get home later today, but I was hoping somebody had
already solved this type of problem before.
I have created a sink block that stores N number of samples after skipping K
samples into a std::vector<>. There is an omnithread mutex that is used to
protect various data members. There is also a function (wait_for_data(seconds))
that uses a condition variable to wait a finite amount of time for the N
samples to be acquired.
If I start a thread that calls wait_for_data before doing fg.start(), my block's
work function is never called (based on a cout << "DEBUG\n" and cout.flush) and
it looks like the flowgraph is locked up. However, the thread continues to loop
and wait_for_data continues to indicate that no data is available, so
I believe the mutex is being released, but I will have to double-check that the
acquires and releases are balanced.
If I do fg.start(), sleep for 5 seconds, and then create the thread, everything
works as expected for a while, but then things seem to lock up again.
It looks like something is deadlocking, but I'm not exactly certain how
omnithread, WX, and GnuRadio interact in the background.
Any ideas?
_______________________________________________
Discuss-gnuradio mailing list
address@hidden
http://lists.gnu.org/mailman/listinfo/discuss-gnuradio
--
Krzysztof Kamieniecki
callsign:KB1KLB
mailto:address@hidden
#include <kksdr_sampler_sink_c.h>
#include <gr_io_signature.h>
#include <assert.h>
#include <algorithm>
#include <iostream>
#include <vector>
/*!
 * Builds a sink with \p channels complex input streams and no outputs.
 *
 * \param channels          number of input streams; one capture buffer is
 *                          allocated per stream
 * \param nstored_samples   number of samples kept per stream
 * \param nskipped_samples  number of samples discarded before storing begins
 */
kksdr_sampler_sink_c::kksdr_sampler_sink_c (
    int channels,
    int nstored_samples,
    int nskipped_samples)
  : gr_sync_block (
      "sampler_sink_c",
      gr_make_io_signature (channels, channels, sizeof (gr_complex)),
      gr_make_io_signature (0, 0, 0))
  , d_mutex ()
  , d_data_ready_condition (&d_mutex)
    // Spell out the per-channel buffer.  Passing two plain ints only worked
    // through the pre-C++11 integral-dispatch rule of the iterator-pair
    // constructor; newer standards would require an implicit
    // int -> std::vector conversion, which does not exist.
  , d_data (channels, std::vector<gr_complex> (nstored_samples))
  , d_nstored_samples (nstored_samples)
  , d_nskipped_samples (nskipped_samples)
  , d_nsamples_processed (0)
  , d_samples (0)
{
}
/*!
 * True once the skip-then-store window has been filled completely, i.e. the
 * processed count is non-zero and equals stored + skipped.
 * Does no locking of its own.
 */
bool
kksdr_sampler_sink_c::is_data_ready () const
{
  // A zero count is never "ready", even when the window itself is empty.
  if (d_nsamples_processed == 0)
    return false;

  return d_nsamples_processed == (d_nstored_samples + d_nskipped_samples);
}
//public:
int
kksdr_sampler_sink_c::work (
int noutput_items,
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items)
{
bool do_broadcast = false;
{
omni_mutex_lock lock(d_mutex);
size_t const ninput_channels = input_items.size();
assert (ninput_channels == d_data.size());
int current_index = 0;
bool const old_is_data_ready = is_data_ready();
if ( (current_index + d_nsamples_processed) < (d_nskipped_samples +
d_nstored_samples) )
{
if ((current_index + d_nsamples_processed) < d_nskipped_samples)
{
int const samples_to_drop = d_nskipped_samples - (current_index +
d_nsamples_processed);
int const delta_samples = std::min (samples_to_drop, noutput_items);
current_index += delta_samples;
d_nsamples_processed += delta_samples;
}
int const data_start_index = d_nsamples_processed - d_nskipped_samples;
if (data_start_index >= 0)
{
int const samples_to_copy = std::min (noutput_items - current_index,
d_nstored_samples - data_start_index);
for (size_t ix_channel = 0; ix_channel < ninput_channels; ix_channel++)
for (int ix_sample = 0; ix_sample < samples_to_copy; ix_sample++)
d_data[ix_channel][data_start_index + ix_sample] =
*((gr_complex *)(input_items[ix_channel]) + current_index +
ix_sample);
if (samples_to_copy > 0)
d_nsamples_processed += samples_to_copy;
}
}
do_broadcast = !old_is_data_ready && is_data_ready();
d_samples += noutput_items;
}
if ( do_broadcast )
{
d_data_ready_condition.broadcast();
}
return noutput_items;
}
/*!
 * Returns a copy of the capture buffer of channel \p index, taken while
 * holding d_mutex.
 *
 * \param index  channel number
 * \return the channel's buffer, or an empty vector when \p index is out of
 *         range
 */
std::vector<gr_complex>
kksdr_sampler_sink_c::data (
    int index)
{
  omni_mutex_lock guard (d_mutex);

  // Reject out-of-range channels up front.
  if (index < 0 || static_cast<size_t> (index) >= d_data.size ())
    return std::vector<gr_complex> ();

  return d_data[index];
}
void
kksdr_sampler_sink_c::set_samples (
int nstored_samples,
int nskipped_samples)
{
omni_mutex_lock lock ( d_mutex );
d_nstored_samples = nstored_samples;
d_nskipped_samples = nskipped_samples;
for ( int i = 0; i < d_data.size(); i++ )
d_data[i].resize (d_nstored_samples);
int const nsamples_processed = d_nsamples_processed;
d_nsamples_processed = std::min ( nsamples_processed, d_nstored_samples +
d_nskipped_samples );
}
/*!
 * Restarts the capture: zeroes the processed-sample count under d_mutex so
 * that is_data_ready() reports false again.  The stored samples themselves
 * are not cleared.
 */
void
kksdr_sampler_sink_c::reset ()
{
  omni_mutex_lock guard (d_mutex);
  d_nsamples_processed = 0;
}
/*!
 * Blocks until the capture window is full or \p wait_sec seconds have
 * elapsed, whichever comes first.
 *
 * \param wait_sec  maximum time to wait, in seconds
 * \return true when the data is ready on return, false otherwise
 *
 * NOTE(review): wait_sec is handed to omni_thread::get_time unchanged;
 * negative values are not checked here.
 */
bool
kksdr_sampler_sink_c::wait_for_data_ready (
    int wait_sec)
{
  omni_mutex_lock guard (d_mutex);

  if (is_data_ready ())
    return true;

  // Compute the deadline once so that re-waiting after a wakeup does not
  // extend the total timeout.
  unsigned long deadline_sec;
  unsigned long deadline_nsec;
  omni_thread::get_time (&deadline_sec, &deadline_nsec, wait_sec, 0);

  // Re-check the predicate after every wakeup: a wakeup alone does not
  // prove the data is (still) ready, e.g. after a reset().
  while (!is_data_ready ())
    {
      if (!d_data_ready_condition.timedwait (deadline_sec, deadline_nsec))
        {
          // Deadline reached; report the state as it is now.
          return is_data_ready ();
        }
    }

  return true;
}
/*!
 * Factory for kksdr_sampler_sink_c: allocates a new block with the given
 * parameters and returns it wrapped in a kksdr_sampler_sink_c_sptr.
 */
kksdr_sampler_sink_c_sptr
kksdr_make_sampler_sink_c (
    int channels,
    int nstored_samples,
    int nskipped_samples)
{
  kksdr_sampler_sink_c *const block =
    new kksdr_sampler_sink_c (channels, nstored_samples, nskipped_samples);
  return kksdr_sampler_sink_c_sptr (block);
}
#ifndef INCLUDED_KKSDR_SAMPLER_SINK_C_H
#define INCLUDED_KKSDR_SAMPLER_SINK_C_H
#include <gr_sync_block.h>
#include <omnithread.h>
// Forward declaration so the shared-pointer typedef can name the class.
class kksdr_sampler_sink_c;

// Handle type returned by the factory below.
typedef boost::shared_ptr<kksdr_sampler_sink_c> kksdr_sampler_sink_c_sptr;

// Factory function; the class declares it a friend and keeps its
// constructor private.
kksdr_sampler_sink_c_sptr
kksdr_make_sampler_sink_c (int channels,
                           int nstored_samples,
                           int nskipped_samples);
/*!
 * \brief complex sink that writes a fixed number of samples to a vector
 * \ingroup sink
 *
 * After discarding a configurable number of leading samples, the block
 * stores a configurable number of samples per input channel.  The public
 * member functions take d_mutex themselves; is_data_ready() does not.
 */
class kksdr_sampler_sink_c : public gr_sync_block {
  friend
  kksdr_sampler_sink_c_sptr
  kksdr_make_sampler_sink_c (
      int channels,
      int nstored_samples,
      int nskipped_samples);

  // Guards every data member below.
  omni_mutex d_mutex;
  // Broadcast when the capture window becomes full; bound to d_mutex.
  omni_condition d_data_ready_condition;
  // One capture buffer per input channel.
  std::vector< std::vector<gr_complex> > d_data;

  // The counters are only touched with d_mutex held, so they are plain
  // ints: volatile adds no thread safety and is not needed here.
  int d_nstored_samples;     // samples kept per channel
  int d_nskipped_samples;    // samples discarded before storing starts
  int d_nsamples_processed;  // skipped + stored so far in this capture
  int d_samples;             // running total of items passed to work()

  // Private: instances are created through kksdr_make_sampler_sink_c().
  kksdr_sampler_sink_c (
      int channels,
      int nstored_samples,
      int nskipped_samples);

  // True once the whole window has been captured.  Does no locking itself.
  bool
  is_data_ready ()
  const;

public:
  // Consumes input items, skipping and then storing them as configured.
  virtual
  int
  work (
      int noutput_items,
      gr_vector_const_void_star &input_items,
      gr_vector_void_star &output_items);

  // Copy of the buffer for channel `index`; empty if index is out of range.
  std::vector<gr_complex>
  data (
      int index);

  // Changes the number of stored / skipped samples.
  void
  set_samples (
      int nstored_samples,
      int nskipped_samples);

  // Starts a new capture by zeroing the processed-sample count.
  void
  reset ();

  // Waits up to wait_sec seconds for the window to fill; true when ready.
  bool
  wait_for_data_ready (
      int wait_sec);
};
#endif