Dear All,
I have been trying to implement a complete Pi/4 DQPSK transmitter and receiver for the past 3 weeks. I started by basing my work on CQPSK.py available on Osmocom's Tetra project, but a few of the classes and functions used in that version were defunct so I started from scratch by creating a 2D constellation and implementing other required stages of a modulator/demodulator (packing,unpacking,gray coding, diff. coding and chunks_to_symbols()). It worked fine, but when I added an RRC filter in both the modulator and the demodulator my output was severely distorted. After a week of struggling with the issue, I have gone back to CQPSK.py. I replaced the defunct functions with the replacements (psk.constellatio[8] to digital.constellation_8psk.points() ), there are
no function or syntax errors, but the output of the modulation and demodulation is still severely distorted. No matter what the input bit string is, It always starts with a long string of 1s, then comes a string of seemingly random 0s and 1s.
My current test input is a 432 bit file based on the 0000000011111111 pattern.
I would greatly appreciate it if someone could take a look at my codes below and advise me on the matter.
Regards,
Vahid
-----------------------------------------------------------------------------------------------------------------
MODEM code (modified CQPSK.py):
#
# Copyright 2005,2006,2007 Free Software Foundation, Inc.
#
# cqpsk.py (C) Copyright 2009, KA1RBI
#
# This file is part of GNU
Radio
#
# GNU Radio is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3, or (at your option)
# any later version.
#
# GNU Radio is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GNU Radio; see the file COPYING. If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
#
# See gnuradio-examples/python/digital for examples
"""
differential PI/4 CQPSK modulation and demodulation.
"""
from gnuradio import gr, gru
from math import pi,
sqrt
#import psk
import cmath
from pprint import pprint
_def_has_gr_digital = False
# address gnuradio 3.5.x changes
try:
from gnuradio import modulation_utils
except ImportError:
from gnuradio import digital
_def_has_gr_digital = True
# default values (used in __init__ and add_options)
_def_samples_per_symbol = 10
_def_excess_bw = 0.35
_def_gray_code = True
_def_verbose = False
_def_log = False
_def_costas_alpha = 0.15
_def_gain_mu = None
_def_mu = 0.5
_def_omega_relative_limit = 0.005
# /////////////////////////////////////////////////////////////////////////////
# CQPSK modulator
#
/////////////////////////////////////////////////////////////////////////////
class cqpsk_mod(gr.hier_block2):
def __init__(self,
samples_per_symbol=_def_samples_per_symbol,
excess_bw=_def_excess_bw,
verbose=_def_verbose,
log=_def_log):
"""
Hierarchical block for RRC-filtered QPSK modulation.
The input is a byte stream (unsigned char) and the
output is the complex modulated signal at
baseband.
@param samples_per_symbol: samples per symbol >= 2
@type samples_per_symbol: integer
@param excess_bw: Root-raised cosine filter excess bandwidth
@type excess_bw: float
@param verbose: Print information about modulator?
@type verbose: bool
@param debug: Print modualtion data to files?
@type debug: bool
"""
gr.hier_block2.__init__(self, "cqpsk_mod",
gr.io_signature(1, 1, gr.sizeof_char), # Input signature
gr.io_signature(1, 1,
gr.sizeof_gr_complex)) # Output signature
self._samples_per_symbol = samples_per_symbol
self._excess_bw = excess_bw
if not isinstance(samples_per_symbol, int) or samples_per_symbol < 2:
raise TypeError, ("sbp must be an integer >= 2, is %d" % samples_per_symbol)
ntaps = 11 * samples_per_symbol
arity = 8
# turn bytes into k-bit vectors
self.bytes2chunks = \
gr.packed_to_unpacked_bb(self.bits_per_symbol(), gr.GR_MSB_FIRST)
#
0 +45 1 [+1]
# 1 +135 3 [+3]
# 2 -45 7 [-1]
# 3 -135 5 [-3]
self.pi4map = [1, 3, 7, 5]
self.symbol_mapper = gr.map_bb(self.pi4map)
self.diffenc = gr.diff_encoder_bb(arity)
self.constel = digital.constellation_8psk()
self.chunks2symbols = gr.chunks_to_symbols_bc(self.constel.points())
# pulse shaping
filter
self.rrc_taps = gr.firdes.root_raised_cosine(
self._samples_per_symbol, # gain (sps since we're interpolating by sps)
self._samples_per_symbol, # sampling rate
1.0, # symbol rate
self._excess_bw, # excess bandwidth (roll-off factor)
ntaps)
self.rrc_filter = gr.interp_fir_filter_ccf(self._samples_per_symbol, self.rrc_taps)
if verbose:
self._print_verbage()
if log:
self._setup_logging()
# Connect & Initialize base class
self.connect(self, self.bytes2chunks, self.symbol_mapper, self.diffenc,
self.chunks2symbols, self.rrc_filter, self)
def samples_per_symbol(self):
return self._samples_per_symbol
def bits_per_symbol(self=None): # staticmethod that's also callable on an instance
return 2
bits_per_symbol = staticmethod(bits_per_symbol) # make it a static method. RTFM
def _print_verbage(self):
print "\nModulator:"
print "bits per symbol: %d" % self.bits_per_symbol()
print "Gray code: %s" % self._gray_code
print "RRS roll-off factor: %f" % self._excess_bw
def _setup_logging(self):
print "Modulation logging turned on."
self.connect(self.bytes2chunks,
gr.file_sink(gr.sizeof_char,
"tx_bytes2chunks.dat"))
self.connect(self.symbol_mapper,
gr.file_sink(gr.sizeof_char, "tx_graycoder.dat"))
self.connect(self.diffenc,
gr.file_sink(gr.sizeof_char, "tx_diffenc.dat"))
self.connect(self.chunks2symbols,
gr.file_sink(gr.sizeof_gr_complex, "tx_chunks2symbols.dat"))
self.connect(self.rrc_filter,
gr.file_sink(gr.sizeof_gr_complex, "tx_rrc_filter.dat"))
def add_options(parser):
"""
Adds QPSK modulation-specific options to the standard parser
"""
parser.add_option("", "--excess-bw", type="float", default=_def_excess_bw,
help="set RRC excess bandwith factor [default=%default] (PSK)")
parser.add_option("", "--no-gray-code",
dest="gray_code",
action="", default=_def_gray_code,
help="disable gray coding on modulated bits (PSK)")
add_options=staticmethod(add_options)
def extract_kwargs_from_options(options):
"""
Given command line options, create dictionary suitable for passing to __init__
"""
return
modulation_utils.extract_kwargs_from_options(dqpsk_mod.__init__,
('self',), options)
extract_kwargs_from_options=staticmethod(extract_kwargs_from_options)
# /////////////////////////////////////////////////////////////////////////////
# CQPSK demodulator
#
# /////////////////////////////////////////////////////////////////////////////
class cqpsk_demod(gr.hier_block2):
def
__init__(self,
samples_per_symbol=_def_samples_per_symbol,
excess_bw=_def_excess_bw,
costas_alpha=_def_costas_alpha,
gain_mu=_def_gain_mu,
mu=_def_mu,
omega_relative_limit=_def_omega_relative_limit,
gray_code=_def_gray_code,
verbose=_def_verbose,
log=_def_log):
"""
Hierarchical block for RRC-filtered CQPSK demodulation
The input is the complex modulated signal at baseband.
The output is a stream of floats in [ -3 / -1 / +1 / +3 ]
@param samples_per_symbol: samples per symbol >= 2
@type samples_per_symbol: float
@param excess_bw: Root-raised cosine filter excess bandwidth
@type excess_bw: float
@param costas_alpha: loop filter gain
@type costas_alphas:
float
@param gain_mu: for M&M block
@type gain_mu: float
@param mu: for M&M block
@type mu: float
@param omega_relative_limit: for M&M block
@type omega_relative_limit: float
@param gray_code: Tell modulator to Gray code the bits
@type gray_code: bool
@param verbose: Print information about modulator?
@type verbose: bool
@param debug: Print modualtion data to files?
@type debug:
bool
"""
gr.hier_block2.__init__(self, "cqpsk_demod",
gr.io_signature(1, 1, gr.sizeof_gr_complex), # Input signature
gr.io_signature(1, 1, gr.sizeof_float)) # Output signature
self._samples_per_symbol = samples_per_symbol
self._excess_bw = excess_bw
self._costas_alpha = costas_alpha
self._mm_gain_mu = gain_mu
self._mm_mu = mu
self._mm_omega_relative_limit =
omega_relative_limit
self._gray_code = gray_code
if samples_per_symbol < 2:
raise TypeError, "sbp must be >= 2, is %d" % samples_per_symbol
arity = pow(2,self.bits_per_symbol())
# Automatic gain control
scale = (1.0/16384.0)
self.pre_scaler = gr.multiply_const_cc(scale) # scale the signal from full-range to +-1
#self.agc = gr.agc2_cc(0.6e-1, 1e-3, 1, 1, 100)
self.agc = gr.feedforward_agc_cc(16, 2.0)
#
RRC data filter
ntaps = 11 * samples_per_symbol
self.rrc_taps = gr.firdes.root_raised_cosine(
1.0, # gain
self._samples_per_symbol, # sampling rate
1.0, # symbol rate
self._excess_bw, # excess bandwidth (roll-off
factor)
ntaps)
self.rrc_filter=gr.interp_fir_filter_ccf(1, self.rrc_taps)
if not self._mm_gain_mu:
sbs_to_mm = {2: 0.050, 3: 0.075, 4: 0.11, 5: 0.125, 6: 0.15, 7: 0.15}
self._mm_gain_mu = sbs_to_mm[samples_per_symbol]
self._mm_omega = self._samples_per_symbol
self._mm_gain_omega = .25 * self._mm_gain_mu * self._mm_gain_mu
self._costas_beta = 0.25 * self._costas_alpha * self._costas_alpha
fmin =
-0.025
fmax = 0.025
if not _def_has_gr_digital:
self.receiver=gr.mpsk_receiver_cc(arity, pi/4.0,
self._costas_alpha, self._costas_beta,
fmin,
fmax,
self._mm_mu, self._mm_gain_mu,
self._mm_omega, self._mm_gain_omega,
self._mm_omega_relative_limit)
else:
self.receiver=digital.mpsk_receiver_cc(arity, pi/4.0,
2*pi/150,
fmin, fmax,
self._mm_mu,
self._mm_gain_mu,
self._mm_omega, self._mm_gain_omega,
self._mm_omega_relative_limit)
self.receiver.set_alpha(self._costas_alpha)
self.receiver.set_beta(self._costas_beta)
# Perform Differential decoding on the constellation
self.diffdec =
gr.diff_phasor_cc()
# take angle of the difference (in radians)
self.to_float = gr.complex_to_arg()
# convert from radians such that signal is in -3/-1/+1/+3
self.rescale = gr.multiply_const_ff( 1 / (pi / 4) )
if verbose:
self._print_verbage()
if log:
self._setup_logging()
# Connect & Initialize base class
self.connect(self, self.pre_scaler, self.agc, self.rrc_filter,
self.receiver,
self.diffdec, self.to_float, self.rescale, self)
def samples_per_symbol(self):
return self._samples_per_symbol
def bits_per_symbol(self=None): # staticmethod that's also callable on an instance
return 2
bits_per_symbol = staticmethod(bits_per_symbol) # make it a static method. RTFM
def _print_verbage(self):
print "\nDemodulator:"
print "bits per symbol: %d" % self.bits_per_symbol()
print "Gray
code: %s" % self._gray_code
print "RRC roll-off factor: %.2f" % self._excess_bw
print "Costas Loop alpha: %.2e" % self._costas_alpha
print "Costas Loop beta: %.2e" % self._costas_beta
print "M&M mu: %.2f" % self._mm_mu
print "M&M mu gain: %.2e" % self._mm_gain_mu
print "M&M omega: %.2f" % self._mm_omega
print "M&M omega
gain: %.2e" % self._mm_gain_omega
print "M&M omega limit: %.2f" % self._mm_omega_relative_limit
def _setup_logging(self):
print "Modulation logging turned on."
self.connect(self.pre_scaler,
gr.file_sink(gr.sizeof_gr_complex, "rx_prescaler.dat"))
self.connect(self.agc,
gr.file_sink(gr.sizeof_gr_complex, "rx_agc.dat"))
self.connect(self.rrc_filter,
gr.file_sink(gr.sizeof_gr_complex, "rx_rrc_filter.dat"))
self.connect(self.receiver,
gr.file_sink(gr.sizeof_gr_complex, "rx_receiver.dat"))
self.connect(self.diffdec,
gr.file_sink(gr.sizeof_gr_complex, "rx_diffdec.dat"))
self.connect(self.to_float,
gr.file_sink(gr.sizeof_float, "rx_to_float.dat"))
self.connect(self.rescale,
gr.file_sink(gr.sizeof_float, "rx_rescale.dat"))
def add_options(parser):
"""
Adds modulation-specific options to the standard parser
"""
parser.add_option("", "--excess-bw", type="float", default=_def_excess_bw,
help="set RRC excess bandwith factor [default=%default] (PSK)")
parser.add_option("",
"--no-gray-code", dest="gray_code",
action="", default=_def_gray_code,
help="disable gray coding on modulated bits (PSK)")
parser.add_option("", "--costas-alpha", type="float", default=_def_costas_alpha,
help="set Costas loop alpha value [default=%default] (PSK)")
parser.add_option("", "--gain-mu", type="float",
default=_def_gain_mu,
help="set M&M symbol sync loop gain mu value [default=%default] (PSK)")
parser.add_option("", "--mu", type="float", default=_def_mu,
help="set M&M symbol sync loop mu value [default=%default] (PSK)")
add_options=staticmethod(add_options)
def extract_kwargs_from_options(options):
"""
Given command line options, create dictionary suitable for passing to __init__
"""
return modulation_utils.extract_kwargs_from_options(
cqpsk_demod.__init__, ('self',), options)
extract_kwargs_from_options=staticmethod(extract_kwargs_from_options)
#
# Add these to the mod/demod registry
#
#modulation_utils.add_type_1_mod('cqpsk', cqpsk_mod)
#modulation_utils.add_type_1_demod('cqpsk', cqpsk_demod)
--------------------------------------------------------------------------------------------------------------
Modulator:
#!/usr/bin/env python
import sys
import math
from gnuradio import gr, gru, audio, eng_notation, blks2, optfir
from gnuradio.eng_option import eng_option
from optparse import OptionParser
# Load it locally or from the module
#try:
import cqpsk
#except:
# from tetra_mod import cqpsk
# accepts an input file in unsigned char format
# applies frequency translation, resampling (interpolation/decimation)
class my_top_block(gr.top_block):
def __init__(self):
gr.top_block.__init__(self)
parser = OptionParser(option_class=eng_option)
parser.add_option("-c", "--calibration", type="eng_float", default=0, help="freq offset")
parser.add_option("-i", "--input-file", type="string", default="in.dat", help="specify the input file")
parser.add_option("-l", "--log", action="", default=False, help="dump debug .dat files")
parser.add_option("-L", "--low-pass", type="eng_float", default=25e3, help="low pass cut-off", metavar="Hz")
parser.add_option("-o", "--output-file",
type="string", default="out.dat", help="specify the output file")
parser.add_option("-s", "--sample-rate", type="int", default=100000000/512, help="input sample rate")
parser.add_option("-v", "--verbose", action="", default=False, help="dump demodulation data")
(options, args) = parser.parse_args()
sample_rate = options.sample_rate
symbol_rate = 18000
sps = sample_rate // symbol_rate
IN = gr.file_source(gr.sizeof_char, options.input_file, repeat = False)
MOD = cqpsk.cqpsk_mod( samples_per_symbol = 2,
excess_bw=0.35,
log=options.log,
verbose=options.verbose)
OUT = gr.file_sink(gr.sizeof_gr_complex, options.output_file)
# r = float(sample_rate) / float(new_sample_rate)
# INTERPOLATOR = gr.fractional_interpolator_cc(0, r)
self.connect(IN,MOD, OUT)
if __name__ == "__main__":
try:
my_top_block().run()
except KeyboardInterrupt:
tb.stop()
------------------------------------------------------------------------------------------------------------------------------
Demodulator:
#!/usr/bin/env python
import sys
import math
from gnuradio import gr, gru, audio, eng_notation, blks2, optfir,
digital
from gnuradio.eng_option import eng_option
from optparse import OptionParser
# Load it locally or from the module
try:
import cqpsk
except:
from tetra_demod import cqpsk
# accepts an input file in complex format
# applies frequency translation, resampling (interpolation/decimation)
class my_top_block(gr.top_block):
def __init__(self):
gr.top_block.__init__(self)
parser = OptionParser(option_class=eng_option)
parser.add_option("-c", "--calibration", type="eng_float", default=0, help="freq offset")
parser.add_option("-i", "--input-file", type="string", default="in.dat", help="specify the input file")
parser.add_option("-l", "--log", action="", default=False, help="dump debug .dat files")
parser.add_option("-L", "--low-pass", type="eng_float", default=25e3, help="low pass cut-off", metavar="Hz")
parser.add_option("-o", "--output-file", type="string", default="out.dat", help="specify the output file")
parser.add_option("-s",
"--sample-rate", type="int", default=100000000/512, help="input sample rate")
parser.add_option("-v", "--verbose", action="", default=False, help="dump demodulation data")
(options, args) = parser.parse_args()
sample_rate = options.sample_rate
symbol_rate = 18000
sps = sample_rate //
symbol_rate
# output rate will be 36,000
ntaps = 11 * sps
new_sample_rate = symbol_rate * sps
channel_taps = gr.firdes.low_pass(1.0, sample_rate, options.low_pass, options.low_pass * 0.1, gr.firdes.WIN_HANN)
FILTER = gr.freq_xlating_fir_filter_ccf(1, channel_taps, options.calibration, sample_rate)
sys.stderr.write("sample rate: %d\n" %(sample_rate))
IN = gr.file_source(gr.sizeof_gr_complex, options.input_file, repeat = False)
DEMOD = cqpsk.cqpsk_demod( samples_per_symbol = sps,
excess_bw=0.35,
costas_alpha=0.03,
gain_mu=0.05,
mu=0.05,
omega_relative_limit=0.05,
log=options.log,
verbose=options.verbose)
self.convert = digital.binary_slicer_fb()
self.convert2 = digital.binary_slicer_fb()
OUT = gr.file_sink(gr.sizeof_float, options.output_file)
self.sink1 = gr.vector_sink_b()
r = float(sample_rate) / float(new_sample_rate)
INTERPOLATOR = gr.fractional_interpolator_cc(0, r)
self.connect(IN, FILTER, INTERPOLATOR, DEMOD, OUT)
self.connect (DEMOD,self.convert, self.sink1)
def print_data(self):
print "data in sink1 is: ",self.sink1.data()
#print "data in sink2 is: ",self.sink2.data()
if __name__ ==
"__main__":
try:
tb = my_top_block()
tb.run()
tb.print_data()
except KeyboardInterrupt:
tb.stop()