Quantcast

Python block composed of other blocks

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Python block composed of other blocks

maiconkist
Hi list,

I'm trying to write a block in python.

What I want to do is create a block that passes the signal through some well-known blocks (such as an FFT), does its own processing, and outputs the result.

Basically, what I did so far is:

#####
class energy_detector(gr.hier_block2):
        """
        Hierarchical block meant to chain an FFT with a custom energy
        calculator (first attempt, as originally posted).

        NOTE(review): this is the version the thread reports as "does not
        work"; the notes below mark the problems visible in the code.
        """
        class energy_calculator(gr.sync_block):
                """
                Nested sync block declared with one float32 vector of fft_size
                elements as input and one float32 item as output.
                """
                def __init__(self, fft_size):
                        """
                        Register the block name and its input/output signatures.
                        """
                        gr.sync_block.__init__(
                                        self,
                                        name = "energy_detector",
                                        in_sig =  [np.dtype((np.float32, fft_size))],
                                        out_sig = [np.float32]
                                        )

                        # NOTE(review): work() is indented under __init__, so it is a
                        # function local to the constructor, not a method of
                        # energy_calculator.
                        def work(self, input_items, output_items):
                                """
                                Placeholder for the processing step.
                                """

                                # NOTE(review): `out` is never assigned in this function.
                                return len(out)
                       
                """
                Main block
                """
        def __init__(self, fft_size):
                """
                Build the internal blocks and wire them between this block's
                input and output.
                """

                """ Initialize our energy detector block
                        in_sig:  vector of fft_size elements
                        out_sig: one float for each input vector
                """
                # NOTE(review): a follow-up message in this thread reports
                # "TypeError: __init__() got an unexpected keyword argument
                # 'in_sig'" when this class is instantiated; the later revision
                # passes input_signature/output_signature to this call instead.
                gr.hier_block2.__init__(
                                self,
                                name = "energy_detector",
                                in_sig =  [np.dtype((np.float32, fft_size))],
                                out_sig = [np.float32]
                                )

                #
                # Blocks
                #
                # NOTE(review): fft_vcc presumably expects complex vectors while the
                # signature above declares float32 vectors -- confirm (the later
                # revision uses fft_vfc).
                self.fft  =  fft. fft_vcc(fft_size, True, [], False)
                self.v2s = gr.vector_to_stream(gr.sizeof_gr_complex, fft_size)
                self.c2f = gr.complex_to_float()
                self.s2v = gr.stream_to_vector(gr.sizeof_float, fft_size)
                self.ec  = self.energy_calculator(fft_size)

                # Flow graph: input -> FFT -> stream -> float -> vector -> energy -> output
                self.connect(self, self.fft, self.v2s, self.c2f, self.s2v, self.ec, self)
########

But it does not work. What can I do to achieve the result I need ?

Thanks
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Python block composed of other blocks

Tom Rondeau-2
On Wed, Jan 30, 2013 at 8:26 AM, maiconkist <[hidden email]> wrote:
Hi list,

I'm trying to write a block in python.

What I want to do is create a block that passes the signal through some
well-known blocks (such as an FFT), does its own processing, and outputs the result.

Basically, what I did so far is:

#####
class energy_detector(gr.hier_block2):
        """
        Hierarchical block meant to chain an FFT with a custom energy
        calculator (quoted copy of the first attempt).

        NOTE(review): this is the version the thread reports as "does not
        work"; the notes below mark the problems visible in the code.
        """
        class energy_calculator(gr.sync_block):
                """
                Nested sync block declared with one float32 vector of fft_size
                elements as input and one float32 item as output.
                """
                def __init__(self, fft_size):
                        """
                        Register the block name and its input/output signatures.
                        """
                        gr.sync_block.__init__(
                                        self,
                                        name = "energy_detector",
                                        in_sig =  [np.dtype((np.float32, fft_size))],
                                        out_sig = [np.float32]
                                        )

                        # NOTE(review): work() is indented under __init__, so it is a
                        # function local to the constructor, not a method of
                        # energy_calculator.
                        def work(self, input_items, output_items):
                                """
                                Placeholder for the processing step.
                                """

                                # NOTE(review): `out` is never assigned in this function.
                                return len(out)

                """
                Main block
                """
        def __init__(self, fft_size):
                """
                Build the internal blocks and wire them between this block's
                input and output.
                """

                """ Initialize our energy detector block
                        in_sig:  vector of fft_size elements
                        out_sig: one float for each input vector
                """
                # NOTE(review): a follow-up message in this thread reports
                # "TypeError: __init__() got an unexpected keyword argument
                # 'in_sig'" when this class is instantiated; the later revision
                # passes input_signature/output_signature to this call instead.
                gr.hier_block2.__init__(
                                self,
                                name = "energy_detector",
                                in_sig =  [np.dtype((np.float32, fft_size))],
                                out_sig = [np.float32]
                                )

                #
                # Blocks
                #
                # NOTE(review): fft_vcc presumably expects complex vectors while the
                # signature above declares float32 vectors -- confirm (the later
                # revision uses fft_vfc).
                self.fft  =  fft. fft_vcc(fft_size, True, [], False)
                self.v2s = gr.vector_to_stream(gr.sizeof_gr_complex, fft_size)
                self.c2f = gr.complex_to_float()
                self.s2v = gr.stream_to_vector(gr.sizeof_float, fft_size)
                self.ec  = self.energy_calculator(fft_size)

                # Flow graph: input -> FFT -> stream -> float -> vector -> energy -> output
                self.connect(self, self.fft, self.v2s, self.c2f, self.s2v, self.ec, self)
########

But it does not work. What can I do to achieve the result I need ?

Thanks

Sorry, but "it does not work" is not a very useful way to get us to help you. What about it does not work? What are the error messages, if any, that you are seeing? What steps have you taken after writing the code to test it? What version of GNU Radio are you using?

You need to help us help you.

Tom
 

_______________________________________________
Discuss-gnuradio mailing list
[hidden email]
https://lists.gnu.org/mailman/listinfo/discuss-gnuradio
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Python block composed of other blocks

maiconkist
Hi Tom,

I simply tried to instantiate an object of class energy_detector and got the following error:

TypeError: __init__() got an unexpected keyword argument 'in_sig'

I imagine that gr.hier_block2 doesn't accept such an argument. I don't know what the superclass of my "top block" should be.
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: Python block composed of other blocks

maiconkist
I fixed the problem I had by changing "in_sig" to "input_signature" (and "out_sig" to "output_signature") in the gr.hier_block2 constructor call.

But now my testbench never finishes executing, and it prints a value that is 2x the correct value.

My energy_detector code is as follows:

#####
import numpy as np

from gnuradio import gr, blks2

from gnuradio import fft
from gnuradio import uhd

class energy_detector(gr.hier_block2):
        """
        Hierarchical block that outputs the energy of each input vector.

        Input:  vectors of fft_size float32 samples.
        Output: one float32 per input vector, equal to the sum of the squared
                samples of that vector.

        The energy is measured in the frequency domain: the vector is
        transformed with a forward FFT, the squared magnitude of every bin is
        taken, and the bins are summed and divided by fft_size.  The division
        undoes the gain of the unnormalised FFT (Parseval's theorem), so the
        result matches the time-domain energy.
        """

        class energy_calculator(gr.sync_block):
                """
                Reduce each vector of squared FFT-bin magnitudes to one energy
                value: sum(vector) / fft_size.
                """
                def __init__(self, fft_size):
                        """
                        fft_size: number of float32 elements in each input vector.
                        """
                        gr.sync_block.__init__(
                                        self,
                                        name = "energy_detector",
                                        in_sig =  [np.dtype((np.float32, fft_size))],
                                        out_sig = [np.float32]
                                        )
                        self._fft_size = fft_size

                def work(self, input_items, output_items):
                        """
                        Energy calculator main function.

                        Writes one energy value per input vector and returns the
                        number of output items produced.
                        """
                        # View the input as (n_vectors, fft_size) so that every
                        # vector is reduced on its own.  Summing without an axis
                        # would collapse all vectors of this call into one total.
                        vectors = np.reshape(input_items[0], (-1, self._fft_size))
                        out = output_items[0]

                        out[:] = np.sum(vectors, axis=1) / self._fft_size

                        # Debug trace kept from the original; parenthesised so it is
                        # valid in both Python 2 and Python 3.
                        if len(out):
                                print("energy = %f" % out[0])

                        # No consume_each() here: a sync block consumes one input
                        # item per returned output item automatically, so an explicit
                        # call would consume the input twice.
                        return len(out)


        def __init__(self, fft_size):
                """
                Constructor.

                fft_size: length of the input vectors (and of the FFT).
                """
                gr.hier_block2.__init__(
                                self,
                                name = "energy_detector",
                                input_signature =  gr.io_signature(1, 1, fft_size * gr.sizeof_float),
                                output_signature = gr.io_signature(1, 1, gr.sizeof_float)
                                )

                #
                # Blocks
                #
                # NOTE(review): the last argument of fft_vfc is presumably the
                # thread count, so False would mean 0 threads -- confirm against
                # the installed GNU Radio version.
                self.fft = fft.fft_vfc(fft_size, True, [], False)
                # |X[k]|^2 per bin, still as a vector.  This keeps the imaginary
                # part, which complex_to_float with a single output would drop.
                self.c2m = gr.complex_to_mag_squared(fft_size)
                self.ec  = self.energy_calculator(fft_size)

                # Flow graph
                self.connect(self, self.fft, self.c2m, self.ec, self)

#####


And my testbench does this:

####
                src_data = (1, 1)
                expected_result = np.sum(np.square(src_data))

                # blocks
                src = gr.vector_source_f (src_data)
                s2v = gr.stream_to_vector(gr.sizeof_float, len(src_data))
                ed = energy_detector(len(src_data))
                v2s = gr.vector_to_stream(gr.sizeof_float, 1)
                dst = gr.vector_sink_f ()

                # flowgraph
                self.tb.connect (src, s2v, ed, v2s, dst)
                self.tb.run ()

                result_data = dst.data ()
                self.assertEqual (expected_result, result_data)

####

Any suggestions?
Loading...