Add files via upload

Python examples of Phaser radar operation
This commit is contained in:
Jon Kraft
2023-12-05 09:55:55 -07:00
committed by GitHub
parent 2ee34ff179
commit 266b3b0058
5 changed files with 1706 additions and 0 deletions

549
CFAR_RADAR_Waterfall.py Normal file
View File

@@ -0,0 +1,549 @@
#!/usr/bin/env python3
# Must use Python 3
# Copyright (C) 2022 Analog Devices, Inc.
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
# - Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# - Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
# - Neither the name of Analog Devices, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
# - The use of this software may or may not infringe the patent rights
# of one or more patent holders. This license does not release you
# from the requirement that you obtain separate licenses from these
# patent holders to use this software.
# - Use of the software either in source or binary form, must be run
# on or directly connected to an Analog Devices Inc. component.
#
# THIS SOFTWARE IS PROVIDED BY ANALOG DEVICES "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
# INCLUDING, BUT NOT LIMITED TO, NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED.
#
# IN NO EVENT SHALL ANALOG DEVICES BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, INTELLECTUAL PROPERTY
# RIGHTS, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''FMCW Radar Demo with Phaser (CN0566)
Jon Kraft, Nov 19 2023'''
# Imports
import adi
from target_detection_dbfs import cfar
import sys
import time
import matplotlib.pyplot as plt
import numpy as np
import pyqtgraph as pg
from PyQt5.QtCore import Qt
from PyQt5.QtWidgets import *
from pyqtgraph.Qt import QtCore, QtGui
# Instantiate all the Devices
rpi_ip = "ip:phaser.local" # IP address of the Raspberry Pi
sdr_ip = "ip:192.168.2.1" # "192.168.2.1, or pluto.local" # IP address of the Transceiver Block
my_sdr = adi.ad9361(uri=sdr_ip)
my_phaser = adi.CN0566(uri=rpi_ip, sdr=my_sdr)
# Initialize both ADAR1000s, set gains to max, and all phases to 0
my_phaser.configure(device_mode="rx")
my_phaser.load_gain_cal()
my_phaser.load_phase_cal()
for i in range(0, 8):
my_phaser.set_chan_phase(i, 0)
gain_list = [8, 34, 84, 127, 127, 84, 34, 8] # Blackman taper
for i in range(0, len(gain_list)):
my_phaser.set_chan_gain(i, gain_list[i], apply_cal=True)
# Setup Raspberry Pi GPIO states
try:
my_phaser._gpios.gpio_tx_sw = 0 # 0 = TX_OUT_2, 1 = TX_OUT_1
my_phaser._gpios.gpio_vctrl_1 = 1 # 1=Use onboard PLL/LO source (0=disable PLL and VCO, and set switch to use external LO input)
my_phaser._gpios.gpio_vctrl_2 = 1 # 1=Send LO to transmit circuitry (0=disable Tx path, and send LO to LO_OUT)
except:
my_phaser.gpios.gpio_tx_sw = 0 # 0 = TX_OUT_2, 1 = TX_OUT_1
my_phaser.gpios.gpio_vctrl_1 = 1 # 1=Use onboard PLL/LO source (0=disable PLL and VCO, and set switch to use external LO input)
my_phaser.gpios.gpio_vctrl_2 = 1 # 1=Send LO to transmit circuitry (0=disable Tx path, and send LO to LO_OUT)
sample_rate = 0.6e6
center_freq = 2.1e9
signal_freq = 100e3
num_slices = 50
fft_size = 1024 * 16
img_array = np.ones((num_slices, fft_size))*(-100)
# Configure SDR Rx
my_sdr.sample_rate = int(sample_rate)
my_sdr.rx_lo = int(center_freq) # set this to output_freq - (the freq of the HB100)
my_sdr.rx_enabled_channels = [0, 1] # enable Rx1 (voltage0) and Rx2 (voltage1)
my_sdr.rx_buffer_size = int(fft_size)
my_sdr.gain_control_mode_chan0 = "manual" # manual or slow_attack
my_sdr.gain_control_mode_chan1 = "manual" # manual or slow_attack
my_sdr.rx_hardwaregain_chan0 = int(30) # must be between -3 and 70
my_sdr.rx_hardwaregain_chan1 = int(30) # must be between -3 and 70
# Configure SDR Tx
my_sdr.tx_lo = int(center_freq)
my_sdr.tx_enabled_channels = [0, 1]
my_sdr.tx_cyclic_buffer = True # must set cyclic buffer to true for the tdd burst mode. Otherwise Tx will turn on and off randomly
my_sdr.tx_hardwaregain_chan0 = -88 # must be between 0 and -88
my_sdr.tx_hardwaregain_chan1 = -0 # must be between 0 and -88
# Configure the ADF4159 Rampling PLL
output_freq = 12.145e9
BW = 500e6
num_steps = 1000
ramp_time = 1.2e3 # us
ramp_time_s = ramp_time / 1e6
my_phaser.frequency = int(output_freq / 4) # Output frequency divided by 4
my_phaser.freq_dev_range = int(
BW / 4
) # frequency deviation range in Hz. This is the total freq deviation of the complete freq ramp
my_phaser.freq_dev_step = int(
(BW/4) / num_steps
) # frequency deviation step in Hz. This is fDEV, in Hz. Can be positive or negative
my_phaser.freq_dev_time = int(
ramp_time
) # total time (in us) of the complete frequency ramp
print("requested freq dev time = ", ramp_time)
print("actual freq dev time = ", my_phaser.freq_dev_time)
my_phaser.delay_word = 4095 # 12 bit delay word. 4095*PFD = 40.95 us. For sawtooth ramps, this is also the length of the Ramp_complete signal
my_phaser.delay_clk = "PFD" # can be 'PFD' or 'PFD*CLK1'
my_phaser.delay_start_en = 0 # delay start
my_phaser.ramp_delay_en = 0 # delay between ramps.
my_phaser.trig_delay_en = 0 # triangle delay
my_phaser.ramp_mode = "continuous_triangular" # ramp_mode can be: "disabled", "continuous_sawtooth", "continuous_triangular", "single_sawtooth_burst", "single_ramp_burst"
my_phaser.sing_ful_tri = (
0 # full triangle enable/disable -- this is used with the single_ramp_burst mode
)
my_phaser.tx_trig_en = 0 # start a ramp with TXdata
my_phaser.enable = 0 # 0 = PLL enable. Write this last to update all the registers
# Print config
print(
"""
CONFIG:
Sample rate: {sample_rate}MHz
Num samples: 2^{Nlog2}
Bandwidth: {BW}MHz
Ramp time: {ramp_time}ms
Output frequency: {output_freq}MHz
IF: {signal_freq}kHz
""".format(
sample_rate=sample_rate / 1e6,
Nlog2=int(np.log2(fft_size)),
BW=BW / 1e6,
ramp_time=ramp_time / 1e3,
output_freq=output_freq / 1e6,
signal_freq=signal_freq / 1e3,
)
)
# Create a sinewave waveform
fs = int(my_sdr.sample_rate)
print("sample_rate:", fs)
N = int(my_sdr.rx_buffer_size)
fc = int(signal_freq / (fs / N)) * (fs / N)
ts = 1 / float(fs)
t = np.arange(0, N * ts, ts)
i = np.cos(2 * np.pi * t * fc) * 2 ** 14
q = np.sin(2 * np.pi * t * fc) * 2 ** 14
iq = 1 * (i + 1j * q)
# Send data
my_sdr._ctx.set_timeout(0)
my_sdr.tx([iq * 0.5, iq]) # only send data to the 2nd channel (that's all we need)
c = 3e8
default_rf_bw = 500e6
N_frame = fft_size
freq = np.linspace(-fs / 2, fs / 2, int(N_frame))
slope = BW / ramp_time_s
dist = (freq - signal_freq) * c / (4 * slope)
plot_threshold = False
cfar_toggle = False
class Window(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("Interactive FFT")
self.setGeometry(100, 100, 800, 800) # (x,y, width, height)
self.setFixedWidth(1600)
self.num_rows = 12
self.setWindowFlag(QtCore.Qt.WindowCloseButtonHint, False) #remove the window's close button
self.UiComponents()
# showing all the widgets
self.show()
# method for components
def UiComponents(self):
widget = QWidget()
global layout
layout = QGridLayout()
# Control Panel
control_label = QLabel("PHASER Simple FMCW Radar")
font = control_label.font()
font.setPointSize(24)
control_label.setFont(font)
font.setPointSize(12)
control_label.setAlignment(Qt.AlignHCenter) # | Qt.AlignVCenter)
layout.addWidget(control_label, 0, 0, 1, 2)
# Check boxes
self.thresh_check = QCheckBox("Plot CFAR Threshold")
font = self.thresh_check.font()
font.setPointSize(10)
self.thresh_check.setFont(font)
self.thresh_check.stateChanged.connect(self.change_thresh)
layout.addWidget(self.thresh_check, 2, 0)
self.cfar_check = QCheckBox("Apply CFAR Threshold")
font = self.cfar_check.font()
self.cfar_check.setFont(font)
self.cfar_check.stateChanged.connect(self.change_cfar)
layout.addWidget(self.cfar_check, 2, 1)
# RF bandwidth slider
self.bw_slider = QSlider(Qt.Horizontal)
self.bw_slider.setMinimum(100)
self.bw_slider.setMaximum(500)
self.bw_slider.setValue(int(default_rf_bw / 1e6))
self.bw_slider.setTickInterval(50)
self.bw_slider.setMaximumWidth(200)
self.bw_slider.setTickPosition(QSlider.TicksBelow)
self.bw_slider.valueChanged.connect(self.get_range_res)
layout.addWidget(self.bw_slider, 4, 0)
self.set_bw = QPushButton("Set RF Bandwidth")
self.set_bw.setMaximumWidth(200)
self.set_bw.pressed.connect(self.set_range_res)
layout.addWidget(self.set_bw, 5, 0, 1, 1)
self.quit_button = QPushButton("Quit")
self.quit_button.pressed.connect(self.end_program)
layout.addWidget(self.quit_button, 30, 0, 4, 4)
#CFAR Sliders
self.cfar_bias = QSlider(Qt.Horizontal)
self.cfar_bias.setMinimum(0)
self.cfar_bias.setMaximum(50)
self.cfar_bias.setValue(36)
self.cfar_bias.setTickInterval(2)
self.cfar_bias.setMaximumWidth(200)
self.cfar_bias.setTickPosition(QSlider.TicksBelow)
self.cfar_bias.valueChanged.connect(self.get_cfar_values)
layout.addWidget(self.cfar_bias, 8, 0)
self.cfar_bias_label = QLabel("CFAR Bias (dB): %0.0f" % (self.cfar_bias.value()))
self.cfar_bias_label.setFont(font)
self.cfar_bias_label.setAlignment(Qt.AlignLeft)
self.cfar_bias_label.setMinimumWidth(100)
self.cfar_bias_label.setMaximumWidth(200)
layout.addWidget(self.cfar_bias_label, 8, 1)
self.cfar_guard = QSlider(Qt.Horizontal)
self.cfar_guard.setMinimum(1)
self.cfar_guard.setMaximum(40)
self.cfar_guard.setValue(27)
self.cfar_guard.setTickInterval(4)
self.cfar_guard.setMaximumWidth(200)
self.cfar_guard.setTickPosition(QSlider.TicksBelow)
self.cfar_guard.valueChanged.connect(self.get_cfar_values)
layout.addWidget(self.cfar_guard, 10, 0)
self.cfar_guard_label = QLabel("Num Guard Cells: %0.0f" % (self.cfar_guard.value()))
self.cfar_guard_label.setFont(font)
self.cfar_guard_label.setAlignment(Qt.AlignLeft)
self.cfar_guard_label.setMinimumWidth(100)
self.cfar_guard_label.setMaximumWidth(200)
layout.addWidget(self.cfar_guard_label, 10, 1)
self.cfar_ref = QSlider(Qt.Horizontal)
self.cfar_ref.setMinimum(1)
self.cfar_ref.setMaximum(100)
self.cfar_ref.setValue(16)
self.cfar_ref.setTickInterval(10)
self.cfar_ref.setMaximumWidth(200)
self.cfar_ref.setTickPosition(QSlider.TicksBelow)
self.cfar_ref.valueChanged.connect(self.get_cfar_values)
layout.addWidget(self.cfar_ref, 12, 0)
self.cfar_ref_label = QLabel("Num Ref Cells: %0.0f" % (self.cfar_ref.value()))
self.cfar_ref_label.setFont(font)
self.cfar_ref_label.setAlignment(Qt.AlignLeft)
self.cfar_ref_label.setMinimumWidth(100)
self.cfar_ref_label.setMaximumWidth(200)
layout.addWidget(self.cfar_ref_label, 12, 1)
# waterfall level slider
self.low_slider = QSlider(Qt.Horizontal)
self.low_slider.setMinimum(-100)
self.low_slider.setMaximum(0)
self.low_slider.setValue(-100)
self.low_slider.setTickInterval(20)
self.low_slider.setMaximumWidth(200)
self.low_slider.setTickPosition(QSlider.TicksBelow)
self.low_slider.valueChanged.connect(self.get_water_levels)
layout.addWidget(self.low_slider, 16, 0)
self.high_slider = QSlider(Qt.Horizontal)
self.high_slider.setMinimum(-100)
self.high_slider.setMaximum(0)
self.high_slider.setValue(0)
self.high_slider.setTickInterval(20)
self.high_slider.setMaximumWidth(200)
self.high_slider.setTickPosition(QSlider.TicksBelow)
self.high_slider.valueChanged.connect(self.get_water_levels)
layout.addWidget(self.high_slider, 18, 0)
self.water_label = QLabel("Waterfall Intensity Levels")
self.water_label.setFont(font)
self.water_label.setAlignment(Qt.AlignCenter)
self.water_label.setMinimumWidth(100)
self.water_label.setMaximumWidth(200)
layout.addWidget(self.water_label, 15, 0,1,1)
self.low_label = QLabel("LOW LEVEL: %0.0f" % (self.low_slider.value()))
self.low_label.setFont(font)
self.low_label.setAlignment(Qt.AlignLeft)
self.low_label.setMinimumWidth(100)
self.low_label.setMaximumWidth(200)
layout.addWidget(self.low_label, 16, 1)
self.high_label = QLabel("HIGH LEVEL: %0.0f" % (self.high_slider.value()))
self.high_label.setFont(font)
self.high_label.setAlignment(Qt.AlignLeft)
self.high_label.setMinimumWidth(100)
self.high_label.setMaximumWidth(200)
layout.addWidget(self.high_label, 18, 1)
self.steer_slider = QSlider(Qt.Horizontal)
self.steer_slider.setMinimum(-80)
self.steer_slider.setMaximum(80)
self.steer_slider.setValue(0)
self.steer_slider.setTickInterval(20)
self.steer_slider.setMaximumWidth(200)
self.steer_slider.setTickPosition(QSlider.TicksBelow)
self.steer_slider.valueChanged.connect(self.get_steer_angle)
layout.addWidget(self.steer_slider, 22, 0)
self.steer_title = QLabel("Receive Steering Angle")
self.steer_title.setFont(font)
self.steer_title.setAlignment(Qt.AlignCenter)
self.steer_title.setMinimumWidth(100)
self.steer_title.setMaximumWidth(200)
layout.addWidget(self.steer_title, 21, 0)
self.steer_label = QLabel("%0.0f DEG" % (self.steer_slider.value()))
self.steer_label.setFont(font)
self.steer_label.setAlignment(Qt.AlignLeft)
self.steer_label.setMinimumWidth(100)
self.steer_label.setMaximumWidth(200)
layout.addWidget(self.steer_label, 22, 1,1,2)
# FFT plot
self.fft_plot = pg.plot()
self.fft_plot.setMinimumWidth(600)
self.fft_curve = self.fft_plot.plot(freq, pen={'color':'y', 'width':2})
self.fft_threshold = self.fft_plot.plot(freq, pen={'color':'r', 'width':8})
title_style = {"size": "20pt"}
label_style = {"color": "#FFF", "font-size": "14pt"}
self.fft_plot.setLabel("bottom", text="Frequency", units="Hz", **label_style)
self.fft_plot.setLabel("left", text="Magnitude", units="dB", **label_style)
self.fft_plot.setTitle("Received Signal - Frequency Spectrum", **title_style)
layout.addWidget(self.fft_plot, 0, 2, self.num_rows, 1)
self.fft_plot.setYRange(-100, -20)
self.fft_plot.setXRange(100e3, 130e3)
# Waterfall plot
self.waterfall = pg.PlotWidget()
self.imageitem = pg.ImageItem()
self.waterfall.addItem(self.imageitem)
# Use a viridis colormap
pos = np.array([0.0, 0.25, 0.5, 0.75, 1.0])
color = np.array([[68, 1, 84,255], [59, 82, 139,255], [33, 145, 140,255], [94, 201, 98,255], [253, 231, 37,255]], dtype=np.ubyte)
lut = pg.ColorMap(pos, color).getLookupTable(0.0, 1.0, 256)
self.imageitem.setLookupTable(lut)
self.imageitem.setLevels([0,1])
# self.imageitem.scale(0.35, sample_rate / (N)) # this is deprecated -- we have to use setTransform instead
tr = QtGui.QTransform()
tr.translate(0,-sample_rate/2)
tr.scale(0.35, sample_rate / (N))
self.imageitem.setTransform(tr)
zoom_freq = 20e3
self.waterfall.setRange(yRange=(signal_freq, signal_freq + zoom_freq))
self.waterfall.setTitle("Waterfall Spectrum", **title_style)
self.waterfall.setLabel("left", "Frequency", units="Hz", **label_style)
self.waterfall.setLabel("bottom", "Time", units="sec", **label_style)
layout.addWidget(self.waterfall, 0 + self.num_rows + 1, 2, self.num_rows, 1)
self.img_array = np.ones((num_slices, fft_size))*(-100)
widget.setLayout(layout)
# setting this widget as central widget of the main window
self.setCentralWidget(widget)
def get_range_res(self):
""" Updates the slider bar label with RF bandwidth and range resolution
Returns:
None
"""
bw = self.bw_slider.value() * 1e6
range_res = c / (2 * bw)
def get_cfar_values(self):
""" Updates the cfar values
Returns:
None
"""
self.cfar_bias_label.setText("CFAR Bias (dB): %0.0f" % (self.cfar_bias.value()))
self.cfar_guard_label.setText("Num Guard Cells: %0.0f" % (self.cfar_guard.value()))
self.cfar_ref_label.setText("Num Ref Cells: %0.0f" % (self.cfar_ref.value()))
def get_water_levels(self):
""" Updates the waterfall intensity levels
Returns:
None
"""
if self.low_slider.value() > self.high_slider.value():
self.low_slider.setValue(self.high_slider.value())
self.low_label.setText("LOW LEVEL: %0.0f" % (self.low_slider.value()))
self.high_label.setText("HIGH LEVEL: %0.0f" % (self.high_slider.value()))
def get_steer_angle(self):
""" Updates the steering angle readout
Returns:
None
"""
self.steer_label.setText("%0.0f DEG" % (self.steer_slider.value()))
phase_delta = (
2
* 3.14159
* 10.25e9
* 0.014
* np.sin(np.radians(self.steer_slider.value()))
/ (3e8)
)
my_phaser.set_beam_phase_diff(np.degrees(phase_delta))
def set_range_res(self):
""" Sets the RF bandwidth
Returns:
None
"""
global dist, slope
bw = self.bw_slider.value() * 1e6
slope = bw / ramp_time_s
dist = (freq - signal_freq) * c / (4 * slope)
my_phaser.freq_dev_range = int(bw / 4) # frequency deviation range in Hz
my_phaser.enable = 0
def end_program(self):
""" Gracefully shutsdown the program and Pluto
Returns:
None
"""
my_sdr.tx_destroy_buffer()
self.close()
def change_thresh(self, state):
""" Toggles between showing cfar threshold values
Args:
state (QtCore.Qt.Checked) : State of check box
Returns:
None
"""
global plot_threshold
plot_state = win.fft_plot.getViewBox().state
if state == QtCore.Qt.Checked:
plot_threshold = True
else:
plot_threshold = False
def change_cfar(self, state):
""" Toggles between enabling/disabling CFAR
Args:
state (QtCore.Qt.Checked) : State of check box
Returns:
None
"""
global cfar_toggle
if state == QtCore.Qt.Checked:
cfar_toggle = True
else:
cfar_toggle = False
# create pyqt5 app
App = QApplication(sys.argv)
# create the instance of our Window
win = Window()
index = 0
def update():
""" Updates the FFT in the window
Returns:
None
"""
global index, plot_threshold, freq, dist
label_style = {"color": "#FFF", "font-size": "14pt"}
data = my_sdr.rx()
data = data[0] + data[1]
win_funct = np.blackman(len(data))
y = data * win_funct
data_fft = np.fft.fft(y, n=fft_size)
sp = np.absolute(data_fft)
sp = np.fft.fftshift(sp)
s_mag = np.abs(sp) / np.sum(win_funct)
s_mag = np.maximum(s_mag, 10 ** (-15))
s_dbfs = 20 * np.log10(s_mag / (2 ** 11))
bias = win.cfar_bias.value()
num_guard_cells = win.cfar_guard.value()
num_ref_cells = win.cfar_ref.value()
cfar_method = 'average'
if (True):
threshold, targets = cfar(s_dbfs, num_guard_cells, num_ref_cells, bias, cfar_method)
s_dbfs_cfar = targets.filled(-200) # fill the values below the threshold with -200 dBFS
s_dbfs_threshold = threshold
win.fft_threshold.setData(freq, s_dbfs_threshold)
if plot_threshold:
win.fft_threshold.setVisible(True)
else:
win.fft_threshold.setVisible(False)
win.img_array = np.roll(win.img_array, 1, axis=0)
if cfar_toggle:
win.fft_curve.setData(freq, s_dbfs_cfar)
win.img_array[0] = s_dbfs_cfar
else:
win.fft_curve.setData(freq, s_dbfs)
win.img_array[0] = s_dbfs
win.imageitem.setLevels([win.low_slider.value(), win.high_slider.value()])
win.imageitem.setImage(win.img_array, autoLevels=False)
if index == 1:
win.fft_plot.enableAutoRange("xy", False)
index = index + 1
timer = QtCore.QTimer()
timer.timeout.connect(update)
timer.start(0)
# start the app
sys.exit(App.exec())

302
CW_RADAR_Waterfall.py Normal file
View File

@@ -0,0 +1,302 @@
#!/usr/bin/env python3
# Must use Python 3
# Copyright (C) 2022 Analog Devices, Inc.
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
# - Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# - Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
# - Neither the name of Analog Devices, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
# - The use of this software may or may not infringe the patent rights
# of one or more patent holders. This license does not release you
# from the requirement that you obtain separate licenses from these
# patent holders to use this software.
# - Use of the software either in source or binary form, must be run
# on or directly connected to an Analog Devices Inc. component.
#
# THIS SOFTWARE IS PROVIDED BY ANALOG DEVICES "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
# INCLUDING, BUT NOT LIMITED TO, NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED.
#
# IN NO EVENT SHALL ANALOG DEVICES BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, INTELLECTUAL PROPERTY
# RIGHTS, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''CW Radar Demo with Phaser (CN0566)
Jon Kraft, Nov 19 2023'''
# Imports
import adi
import sys
import time
import matplotlib.pyplot as plt
import numpy as np
import pyqtgraph as pg
from PyQt5.QtCore import Qt
from PyQt5.QtWidgets import *
from pyqtgraph.Qt import QtCore, QtGui
# Instantiate all the Devices
rpi_ip = "ip:phaser.local" # IP address of the Raspberry Pi
sdr_ip = "ip:192.168.2.1" # "192.168.2.1, or pluto.local" # IP address of the Transceiver Block
my_sdr = adi.ad9361(uri=sdr_ip)
my_phaser = adi.CN0566(uri=rpi_ip, sdr=my_sdr)
# Initialize both ADAR1000s, set gains to max, and all phases to 0
my_phaser.configure(device_mode="rx")
my_phaser.load_gain_cal()
my_phaser.load_phase_cal()
for i in range(0, 8):
my_phaser.set_chan_phase(i, 0)
gain_list = [8, 34, 84, 127, 127, 84, 34, 8] # Blackman taper
for i in range(0, len(gain_list)):
my_phaser.set_chan_gain(i, gain_list[i], apply_cal=True)
# Setup Raspberry Pi GPIO states
try:
my_phaser._gpios.gpio_tx_sw = 0 # 0 = TX_OUT_2, 1 = TX_OUT_1
my_phaser._gpios.gpio_vctrl_1 = 1 # 1=Use onboard PLL/LO source (0=disable PLL and VCO, and set switch to use external LO input)
my_phaser._gpios.gpio_vctrl_2 = 1 # 1=Send LO to transmit circuitry (0=disable Tx path, and send LO to LO_OUT)
except:
my_phaser.gpios.gpio_tx_sw = 0 # 0 = TX_OUT_2, 1 = TX_OUT_1
my_phaser.gpios.gpio_vctrl_1 = 1 # 1=Use onboard PLL/LO source (0=disable PLL and VCO, and set switch to use external LO input)
my_phaser.gpios.gpio_vctrl_2 = 1 # 1=Send LO to transmit circuitry (0=disable Tx path, and send LO to LO_OUT)
sample_rate = 0.6e6
center_freq = 2.1e9
signal_freq = 100e3
num_slices = 50
fft_size = 1024 * 64
img_array = np.ones((num_slices, fft_size))*(-100)
# Configure SDR Rx
my_sdr.sample_rate = int(sample_rate)
my_sdr.rx_lo = int(center_freq) # set this to output_freq - (the freq of the HB100)
my_sdr.rx_enabled_channels = [0, 1] # enable Rx1 (voltage0) and Rx2 (voltage1)
my_sdr.rx_buffer_size = int(fft_size)
my_sdr.gain_control_mode_chan0 = "manual" # manual or slow_attack
my_sdr.gain_control_mode_chan1 = "manual" # manual or slow_attack
my_sdr.rx_hardwaregain_chan0 = int(30) # must be between -3 and 70
my_sdr.rx_hardwaregain_chan1 = int(30) # must be between -3 and 70
# Configure SDR Tx
my_sdr.tx_lo = int(center_freq)
my_sdr.tx_enabled_channels = [0, 1]
my_sdr.tx_cyclic_buffer = True # must set cyclic buffer to true for the tdd burst mode. Otherwise Tx will turn on and off randomly
my_sdr.tx_hardwaregain_chan0 = -88 # must be between 0 and -88
my_sdr.tx_hardwaregain_chan1 = -0 # must be between 0 and -88
# Configure the ADF4159 Rampling PLL
output_freq = 12.145e9
my_phaser.frequency = int(output_freq / 4) # Output frequency divided by 4
my_phaser.ramp_mode = "disabled" # ramp_mode can be: "disabled", "continuous_sawtooth", "continuous_triangular", "single_sawtooth_burst", "single_ramp_burst"
my_phaser.enable = 0 # 0 = PLL enable. Write this last to update all the registers
# Create a sinewave waveform
fs = int(my_sdr.sample_rate)
N = int(my_sdr.rx_buffer_size)
fc = int(signal_freq / (fs / N)) * (fs / N)
ts = 1 / float(fs)
t = np.arange(0, N * ts, ts)
i = np.cos(2 * np.pi * t * fc) * 2 ** 14
q = np.sin(2 * np.pi * t * fc) * 2 ** 14
iq = 1 * (i + 1j * q)
# Send data
my_sdr._ctx.set_timeout(0)
my_sdr.tx([iq * 0.5, iq]) # only send data to the 2nd channel (that's all we need)
c = 3e8
N_frame = fft_size
freq = np.linspace(-fs / 2, fs / 2, int(N_frame))
class Window(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("Interactive FFT")
self.setGeometry(100, 100, 800, 800) # (x,y, width, height)
self.setFixedWidth(1600)
self.num_rows = 12
self.setWindowFlag(QtCore.Qt.WindowCloseButtonHint, False) #remove the window's close button
self.UiComponents()
# showing all the widgets
self.show()
# method for components
def UiComponents(self):
widget = QWidget()
global layout
layout = QGridLayout()
# Control Panel
control_label = QLabel("PHASER Simple CW Radar")
font = control_label.font()
font.setPointSize(24)
control_label.setFont(font)
font.setPointSize(12)
control_label.setAlignment(Qt.AlignHCenter) # | Qt.AlignVCenter)
layout.addWidget(control_label, 0, 0, 1, 2)
# Buttons
self.quit_button = QPushButton("Quit")
self.quit_button.pressed.connect(self.end_program)
layout.addWidget(self.quit_button, 30, 0, 4, 4)
# waterfall level slider
self.low_slider = QSlider(Qt.Horizontal)
self.low_slider.setMinimum(-100)
self.low_slider.setMaximum(0)
self.low_slider.setValue(-40)
self.low_slider.setTickInterval(20)
self.low_slider.setMaximumWidth(200)
self.low_slider.setTickPosition(QSlider.TicksBelow)
self.low_slider.valueChanged.connect(self.get_water_levels)
layout.addWidget(self.low_slider, 8, 0)
self.high_slider = QSlider(Qt.Horizontal)
self.high_slider.setMinimum(-100)
self.high_slider.setMaximum(0)
self.high_slider.setValue(-28)
self.high_slider.setTickInterval(20)
self.high_slider.setMaximumWidth(200)
self.high_slider.setTickPosition(QSlider.TicksBelow)
self.high_slider.valueChanged.connect(self.get_water_levels)
layout.addWidget(self.high_slider, 10, 0)
self.water_label = QLabel("Waterfall Intensity Levels")
self.water_label.setFont(font)
self.water_label.setAlignment(Qt.AlignCenter)
self.water_label.setMinimumWidth(300)
layout.addWidget(self.water_label, 7, 0)
self.low_label = QLabel("LOW LEVEL: %0.0f" % (self.low_slider.value()))
self.low_label.setFont(font)
self.low_label.setAlignment(Qt.AlignLeft)
self.low_label.setMinimumWidth(100)
layout.addWidget(self.low_label, 8, 1)
self.high_label = QLabel("HIGH LEVEL: %0.0f" % (self.high_slider.value()))
self.high_label.setFont(font)
self.high_label.setAlignment(Qt.AlignLeft)
self.high_label.setMinimumWidth(100)
layout.addWidget(self.high_label, 10, 1)
# FFT plot
self.fft_plot = pg.plot()
self.fft_plot.setMinimumWidth(600)
self.fft_curve = self.fft_plot.plot(freq, pen={'color':'y', 'width':2})
title_style = {"size": "20pt"}
label_style = {"color": "#FFF", "font-size": "14pt"}
self.fft_plot.setLabel("bottom", text="Frequency", units="Hz", **label_style)
self.fft_plot.setLabel("left", text="Magnitude", units="dB", **label_style)
self.fft_plot.setTitle("Received Signal - Frequency Spectrum", **title_style)
layout.addWidget(self.fft_plot, 0, 2, self.num_rows, 1)
self.fft_plot.setYRange(-60, 0)
self.fft_plot.setXRange(99e3, 101e3)
# Waterfall plot
self.waterfall = pg.PlotWidget()
self.imageitem = pg.ImageItem()
self.waterfall.addItem(self.imageitem)
# Use a viridis colormap
pos = np.array([0.0, 0.25, 0.5, 0.75, 1.0])
color = np.array([[68, 1, 84,255], [59, 82, 139,255], [33, 145, 140,255], [94, 201, 98,255], [253, 231, 37,255]], dtype=np.ubyte)
lut = pg.ColorMap(pos, color).getLookupTable(0.0, 1.0, 256)
self.imageitem.setLookupTable(lut)
self.imageitem.setLevels([0,1])
# self.imageitem.scale(0.35, sample_rate / (N)) # this is deprecated -- we have to use setTransform instead
tr = QtGui.QTransform()
tr.translate(0,-sample_rate/2)
tr.scale(0.35, sample_rate / (N))
self.imageitem.setTransform(tr)
zoom_freq = 0.3e3
self.waterfall.setRange(yRange=(signal_freq - zoom_freq, signal_freq + zoom_freq))
self.waterfall.setTitle("Waterfall Spectrum", **title_style)
self.waterfall.setLabel("left", "Frequency", units="Hz", **label_style)
self.waterfall.setLabel("bottom", "Time", units="sec", **label_style)
layout.addWidget(self.waterfall, 0 + self.num_rows + 1, 2, self.num_rows, 1)
self.img_array = np.ones((num_slices, fft_size))*(-100)
widget.setLayout(layout)
# setting this widget as central widget of the main window
self.setCentralWidget(widget)
def get_water_levels(self):
""" Updates the waterfall intensity levels
Returns:
None
"""
if self.low_slider.value() > self.high_slider.value():
self.low_slider.setValue(self.high_slider.value())
self.low_label.setText("LOW LEVEL: %0.0f" % (self.low_slider.value()))
self.high_label.setText("HIGH LEVEL: %0.0f" % (self.high_slider.value()))
def end_program(self):
""" Gracefully shutsdown the program and Pluto
Returns:
None
"""
my_sdr.tx_destroy_buffer()
self.close()
# create pyqt5 app
App = QApplication(sys.argv)
# create the instance of our Window
win = Window()
index = 0
def update():
""" Updates the FFT in the window
Returns:
None
"""
global index, freq, dist
label_style = {"color": "#FFF", "font-size": "14pt"}
data = my_sdr.rx()
data = data[0] + data[1]
win_funct = np.blackman(len(data))
y = data * win_funct
sp = np.absolute(np.fft.fft(y))
sp = np.fft.fftshift(sp)
s_mag = np.abs(sp) / np.sum(win_funct)
s_mag = np.maximum(s_mag, 10 ** (-15))
s_dbfs = 20 * np.log10(s_mag / (2 ** 11))
win.fft_curve.setData(freq, s_dbfs)
win.fft_plot.setLabel("bottom", text="Frequency", units="Hz", **label_style)
win.img_array = np.roll(win.img_array, 1, axis=0)
win.img_array[0] = s_dbfs
win.imageitem.setLevels([win.low_slider.value(), win.high_slider.value()])
win.imageitem.setImage(win.img_array, autoLevels=False)
if index == 1:
win.fft_plot.enableAutoRange("xy", False)
index = index + 1
timer = QtCore.QTimer()
timer.timeout.connect(update)
timer.start(0)
# start the app
sys.exit(App.exec())

486
FMCW_RADAR_Waterfall.py Normal file
View File

@@ -0,0 +1,486 @@
#!/usr/bin/env python3
# Must use Python 3
# Copyright (C) 2022 Analog Devices, Inc.
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
# - Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# - Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
# - Neither the name of Analog Devices, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
# - The use of this software may or may not infringe the patent rights
# of one or more patent holders. This license does not release you
# from the requirement that you obtain separate licenses from these
# patent holders to use this software.
# - Use of the software either in source or binary form, must be run
# on or directly connected to an Analog Devices Inc. component.
#
# THIS SOFTWARE IS PROVIDED BY ANALOG DEVICES "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
# INCLUDING, BUT NOT LIMITED TO, NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED.
#
# IN NO EVENT SHALL ANALOG DEVICES BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, INTELLECTUAL PROPERTY
# RIGHTS, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''FMCW Radar Demo with Phaser (CN0566)
Jon Kraft, Nov 19 2023'''
# Imports
import adi
import sys
import time
import matplotlib.pyplot as plt
import numpy as np
import pyqtgraph as pg
from PyQt5.QtCore import Qt
from PyQt5.QtWidgets import *
from pyqtgraph.Qt import QtCore, QtGui
# Instantiate all the Devices
rpi_ip = "ip:phaser.local" # IP address of the Raspberry Pi
sdr_ip = "ip:192.168.2.1" # "192.168.2.1, or pluto.local" # IP address of the Transceiver Block
my_sdr = adi.ad9361(uri=sdr_ip)
my_phaser = adi.CN0566(uri=rpi_ip, sdr=my_sdr)
# Initialize both ADAR1000s, set gains to max, and all phases to 0
my_phaser.configure(device_mode="rx")
my_phaser.load_gain_cal()
my_phaser.load_phase_cal()
for i in range(0, 8):
my_phaser.set_chan_phase(i, 0)
gain_list = [8, 34, 84, 127, 127, 84, 34, 8] # Blackman taper
for i in range(0, len(gain_list)):
my_phaser.set_chan_gain(i, gain_list[i], apply_cal=True)
# Setup Raspberry Pi GPIO states
try:
my_phaser._gpios.gpio_tx_sw = 0 # 0 = TX_OUT_2, 1 = TX_OUT_1
my_phaser._gpios.gpio_vctrl_1 = 1 # 1=Use onboard PLL/LO source (0=disable PLL and VCO, and set switch to use external LO input)
my_phaser._gpios.gpio_vctrl_2 = 1 # 1=Send LO to transmit circuitry (0=disable Tx path, and send LO to LO_OUT)
except:
my_phaser.gpios.gpio_tx_sw = 0 # 0 = TX_OUT_2, 1 = TX_OUT_1
my_phaser.gpios.gpio_vctrl_1 = 1 # 1=Use onboard PLL/LO source (0=disable PLL and VCO, and set switch to use external LO input)
my_phaser.gpios.gpio_vctrl_2 = 1 # 1=Send LO to transmit circuitry (0=disable Tx path, and send LO to LO_OUT)
sample_rate = 0.6e6
center_freq = 2.1e9
signal_freq = 100e3
num_slices = 400
fft_size = 1024 * 4
img_array = np.ones((num_slices, fft_size))*(-100)
# Configure SDR Rx
my_sdr.sample_rate = int(sample_rate)
my_sdr.rx_lo = int(center_freq) # set this to output_freq - (the freq of the HB100)
my_sdr.rx_enabled_channels = [0, 1] # enable Rx1 (voltage0) and Rx2 (voltage1)
my_sdr.rx_buffer_size = int(fft_size)
my_sdr.gain_control_mode_chan0 = "manual" # manual or slow_attack
my_sdr.gain_control_mode_chan1 = "manual" # manual or slow_attack
my_sdr.rx_hardwaregain_chan0 = int(30) # must be between -3 and 70
my_sdr.rx_hardwaregain_chan1 = int(30) # must be between -3 and 70
# Configure SDR Tx
my_sdr.tx_lo = int(center_freq)
my_sdr.tx_enabled_channels = [0, 1]
my_sdr.tx_cyclic_buffer = True # must set cyclic buffer to true for the tdd burst mode. Otherwise Tx will turn on and off randomly
my_sdr.tx_hardwaregain_chan0 = -88 # must be between 0 and -88
my_sdr.tx_hardwaregain_chan1 = -0 # must be between 0 and -88
# Configure the ADF4159 Rampling PLL
output_freq = 12.145e9
BW = 500e6
num_steps = 1000
ramp_time = 1.2e3 # us
ramp_time_s = ramp_time / 1e6
my_phaser.frequency = int(output_freq / 4) # Output frequency divided by 4
my_phaser.freq_dev_range = int(
BW / 4
) # frequency deviation range in Hz. This is the total freq deviation of the complete freq ramp
my_phaser.freq_dev_step = int(
(BW/4) / num_steps
) # frequency deviation step in Hz. This is fDEV, in Hz. Can be positive or negative
my_phaser.freq_dev_time = int(
ramp_time
) # total time (in us) of the complete frequency ramp
print("requested freq dev time = ", ramp_time)
print("actual freq dev time = ", my_phaser.freq_dev_time)
my_phaser.delay_word = 4095 # 12 bit delay word. 4095*PFD = 40.95 us. For sawtooth ramps, this is also the length of the Ramp_complete signal
my_phaser.delay_clk = "PFD" # can be 'PFD' or 'PFD*CLK1'
my_phaser.delay_start_en = 0 # delay start
my_phaser.ramp_delay_en = 0 # delay between ramps.
my_phaser.trig_delay_en = 0 # triangle delay
my_phaser.ramp_mode = "continuous_triangular" # ramp_mode can be: "disabled", "continuous_sawtooth", "continuous_triangular", "single_sawtooth_burst", "single_ramp_burst"
my_phaser.sing_ful_tri = (
0 # full triangle enable/disable -- this is used with the single_ramp_burst mode
)
my_phaser.tx_trig_en = 0 # start a ramp with TXdata
my_phaser.enable = 0 # 0 = PLL enable. Write this last to update all the registers
# Print config
print(
"""
CONFIG:
Sample rate: {sample_rate}MHz
Num samples: 2^{Nlog2}
Bandwidth: {BW}MHz
Ramp time: {ramp_time}ms
Output frequency: {output_freq}MHz
IF: {signal_freq}kHz
""".format(
sample_rate=sample_rate / 1e6,
Nlog2=int(np.log2(fft_size)),
BW=BW / 1e6,
ramp_time=ramp_time / 1e3,
output_freq=output_freq / 1e6,
signal_freq=signal_freq / 1e3,
)
)
# Create a sinewave waveform
fs = int(my_sdr.sample_rate)
print("sample_rate:", fs)
N = int(my_sdr.rx_buffer_size)
fc = int(signal_freq / (fs / N)) * (fs / N)
ts = 1 / float(fs)
t = np.arange(0, N * ts, ts)
i = np.cos(2 * np.pi * t * fc) * 2 ** 14
q = np.sin(2 * np.pi * t * fc) * 2 ** 14
iq = 1 * (i + 1j * q)
# Send data
my_sdr._ctx.set_timeout(0)
my_sdr.tx([iq * 0.5, iq]) # only send data to the 2nd channel (that's all we need)
c = 3e8
default_rf_bw = 500e6
N_frame = fft_size
freq = np.linspace(-fs / 2, fs / 2, int(N_frame))
slope = BW / ramp_time_s
dist = (freq - signal_freq) * c / (4 * slope)
plot_dist = False
class Window(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("Interactive FFT")
self.setGeometry(100, 100, 800, 800) # (x,y, width, height)
self.setFixedWidth(1600)
self.num_rows = 12
self.setWindowFlag(QtCore.Qt.WindowCloseButtonHint, False) #remove the window's close button
self.UiComponents()
# showing all the widgets
self.show()
# method for components
def UiComponents(self):
widget = QWidget()
global layout
layout = QGridLayout()
# Control Panel
control_label = QLabel("PHASER Simple FMCW Radar")
font = control_label.font()
font.setPointSize(24)
control_label.setFont(font)
font.setPointSize(12)
control_label.setAlignment(Qt.AlignHCenter) # | Qt.AlignVCenter)
layout.addWidget(control_label, 0, 0, 1, 2)
# Check boxes
self.x_axis_check = QCheckBox("Toggle Range/Frequency x-axis")
font = self.x_axis_check.font()
font.setPointSize(10)
self.x_axis_check.setFont(font)
self.x_axis_check.stateChanged.connect(self.change_x_axis)
layout.addWidget(self.x_axis_check, 2, 0)
# Range resolution
# Changes with the RF BW slider
self.range_res_label = QLabel(
"B<sub>RF</sub>: %0.2f MHz - R<sub>res</sub>: %0.2f m"
% (default_rf_bw / 1e6, c / (2 * default_rf_bw))
)
font = self.range_res_label.font()
font.setPointSize(10)
self.range_res_label.setFont(font)
self.range_res_label.setAlignment(Qt.AlignLeft)
self.range_res_label.setMaximumWidth(200)
self.range_res_label.setMinimumWidth(100)
layout.addWidget(self.range_res_label, 4, 1)
# RF bandwidth slider
self.bw_slider = QSlider(Qt.Horizontal)
self.bw_slider.setMinimum(100)
self.bw_slider.setMaximum(500)
self.bw_slider.setValue(int(default_rf_bw / 1e6))
self.bw_slider.setTickInterval(50)
self.bw_slider.setMaximumWidth(200)
self.bw_slider.setTickPosition(QSlider.TicksBelow)
self.bw_slider.valueChanged.connect(self.get_range_res)
layout.addWidget(self.bw_slider, 4, 0)
self.set_bw = QPushButton("Set RF Bandwidth")
self.set_bw.setMaximumWidth(200)
self.set_bw.pressed.connect(self.set_range_res)
layout.addWidget(self.set_bw, 5, 0, 1, 1)
self.quit_button = QPushButton("Quit")
self.quit_button.pressed.connect(self.end_program)
layout.addWidget(self.quit_button, 30, 0, 4, 4)
# waterfall level slider
self.low_slider = QSlider(Qt.Horizontal)
self.low_slider.setMinimum(-100)
self.low_slider.setMaximum(0)
self.low_slider.setValue(-43)
self.low_slider.setTickInterval(20)
self.low_slider.setMaximumWidth(200)
self.low_slider.setTickPosition(QSlider.TicksBelow)
self.low_slider.valueChanged.connect(self.get_water_levels)
layout.addWidget(self.low_slider, 8, 0)
self.high_slider = QSlider(Qt.Horizontal)
self.high_slider.setMinimum(-100)
self.high_slider.setMaximum(0)
self.high_slider.setValue(-23)
self.high_slider.setTickInterval(20)
self.high_slider.setMaximumWidth(200)
self.high_slider.setTickPosition(QSlider.TicksBelow)
self.high_slider.valueChanged.connect(self.get_water_levels)
layout.addWidget(self.high_slider, 10, 0)
self.water_label = QLabel("Waterfall Intensity Levels")
self.water_label.setFont(font)
self.water_label.setAlignment(Qt.AlignCenter)
self.water_label.setMinimumWidth(100)
self.water_label.setMaximumWidth(200)
layout.addWidget(self.water_label, 7, 0,1,1)
self.low_label = QLabel("LOW LEVEL: %0.0f" % (self.low_slider.value()))
self.low_label.setFont(font)
self.low_label.setAlignment(Qt.AlignLeft)
self.low_label.setMinimumWidth(100)
self.low_label.setMaximumWidth(200)
layout.addWidget(self.low_label, 8, 1)
self.high_label = QLabel("HIGH LEVEL: %0.0f" % (self.high_slider.value()))
self.high_label.setFont(font)
self.high_label.setAlignment(Qt.AlignLeft)
self.high_label.setMinimumWidth(100)
self.high_label.setMaximumWidth(200)
layout.addWidget(self.high_label, 10, 1)
self.steer_slider = QSlider(Qt.Horizontal)
self.steer_slider.setMinimum(-80)
self.steer_slider.setMaximum(80)
self.steer_slider.setValue(0)
self.steer_slider.setTickInterval(20)
self.steer_slider.setMaximumWidth(200)
self.steer_slider.setTickPosition(QSlider.TicksBelow)
self.steer_slider.valueChanged.connect(self.get_steer_angle)
layout.addWidget(self.steer_slider, 14, 0)
self.steer_title = QLabel("Receive Steering Angle")
self.steer_title.setFont(font)
self.steer_title.setAlignment(Qt.AlignCenter)
self.steer_title.setMinimumWidth(100)
self.steer_title.setMaximumWidth(200)
layout.addWidget(self.steer_title, 13, 0)
self.steer_label = QLabel("%0.0f DEG" % (self.steer_slider.value()))
self.steer_label.setFont(font)
self.steer_label.setAlignment(Qt.AlignLeft)
self.steer_label.setMinimumWidth(100)
self.steer_label.setMaximumWidth(200)
layout.addWidget(self.steer_label, 14, 1,1,2)
# FFT plot
self.fft_plot = pg.plot()
self.fft_plot.setMinimumWidth(600)
self.fft_curve = self.fft_plot.plot(freq, pen={'color':'y', 'width':2})
title_style = {"size": "20pt"}
label_style = {"color": "#FFF", "font-size": "14pt"}
self.fft_plot.setLabel("bottom", text="Frequency", units="Hz", **label_style)
self.fft_plot.setLabel("left", text="Magnitude", units="dB", **label_style)
self.fft_plot.setTitle("Received Signal - Frequency Spectrum", **title_style)
layout.addWidget(self.fft_plot, 0, 2, self.num_rows, 1)
self.fft_plot.setYRange(-60, 0)
self.fft_plot.setXRange(100e3, 200e3)
# Waterfall plot
self.waterfall = pg.PlotWidget()
self.imageitem = pg.ImageItem()
self.waterfall.addItem(self.imageitem)
# Use a viridis colormap
pos = np.array([0.0, 0.25, 0.5, 0.75, 1.0])
color = np.array([[68, 1, 84,255], [59, 82, 139,255], [33, 145, 140,255], [94, 201, 98,255], [253, 231, 37,255]], dtype=np.ubyte)
lut = pg.ColorMap(pos, color).getLookupTable(0.0, 1.0, 256)
self.imageitem.setLookupTable(lut)
self.imageitem.setLevels([0,1])
# self.imageitem.scale(0.35, sample_rate / (N)) # this is deprecated -- we have to use setTransform instead
tr = QtGui.QTransform()
tr.translate(0,-sample_rate/2)
tr.scale(0.35, sample_rate / (N))
self.imageitem.setTransform(tr)
zoom_freq = 20e3
self.waterfall.setRange(yRange=(signal_freq, signal_freq + zoom_freq))
self.waterfall.setTitle("Waterfall Spectrum", **title_style)
self.waterfall.setLabel("left", "Frequency", units="Hz", **label_style)
self.waterfall.setLabel("bottom", "Time", units="sec", **label_style)
layout.addWidget(self.waterfall, 0 + self.num_rows + 1, 2, self.num_rows, 1)
self.img_array = np.ones((num_slices, fft_size))*(-100)
widget.setLayout(layout)
# setting this widget as central widget of the main window
self.setCentralWidget(widget)
def get_range_res(self):
""" Updates the slider bar label with RF bandwidth and range resolution
Returns:
None
"""
bw = self.bw_slider.value() * 1e6
range_res = c / (2 * bw)
self.range_res_label.setText(
"B<sub>RF</sub>: %0.2f MHz - R<sub>res</sub>: %0.2f m"
% (bw / 1e6, c / (2 * bw))
)
def get_water_levels(self):
""" Updates the waterfall intensity levels
Returns:
None
"""
if self.low_slider.value() > self.high_slider.value():
self.low_slider.setValue(self.high_slider.value())
self.low_label.setText("LOW LEVEL: %0.0f" % (self.low_slider.value()))
self.high_label.setText("HIGH LEVEL: %0.0f" % (self.high_slider.value()))
def get_steer_angle(self):
""" Updates the steering angle readout
Returns:
None
"""
self.steer_label.setText("%0.0f DEG" % (self.steer_slider.value()))
phase_delta = (
2
* 3.14159
* 10.25e9
* 0.014
* np.sin(np.radians(self.steer_slider.value()))
/ (3e8)
)
my_phaser.set_beam_phase_diff(np.degrees(phase_delta))
def set_range_res(self):
""" Sets the RF bandwidth
Returns:
None
"""
global dist, slope
bw = self.bw_slider.value() * 1e6
slope = bw / ramp_time_s
dist = (freq - signal_freq) * c / (4 * slope)
print("New slope: %0.2fMHz/s" % (slope / 1e6))
if self.x_axis_check.isChecked() == True:
print("Range axis")
plot_dist = True
range_x = (100e3) * c / (4 * slope)
self.fft_plot.setXRange(0, range_x)
else:
print("Frequency axis")
plot_dist = False
self.fft_plot.setXRange(100e3, 140e3)
my_phaser.freq_dev_range = int(bw / 4) # frequency deviation range in Hz
my_phaser.enable = 0
def end_program(self):
""" Gracefully shutsdown the program and Pluto
Returns:
None
"""
my_sdr.tx_destroy_buffer()
self.close()
def change_x_axis(self, state):
""" Toggles between showing frequency and range for the x-axis
Args:
state (QtCore.Qt.Checked) : State of check box
Returns:
None
"""
global plot_dist, slope
plot_state = win.fft_plot.getViewBox().state
if state == QtCore.Qt.Checked:
print("Range axis")
plot_dist = True
range_x = (100e3) * c / (4 * slope)
self.fft_plot.setXRange(0, range_x)
else:
print("Frequency axis")
plot_dist = False
self.fft_plot.setXRange(100e3, 200e3)
# create pyqt5 app
App = QApplication(sys.argv)
# create the instance of our Window
win = Window()
index = 0
def update():
""" Updates the FFT in the window
Returns:
None
"""
global index, plot_dist, freq, dist
label_style = {"color": "#FFF", "font-size": "14pt"}
data = my_sdr.rx()
data = data[0] + data[1]
win_funct = np.blackman(len(data))
y = data * win_funct
sp = np.absolute(np.fft.fft(y))
sp = np.fft.fftshift(sp)
s_mag = np.abs(sp) / np.sum(win_funct)
s_mag = np.maximum(s_mag, 10 ** (-15))
s_dbfs = 20 * np.log10(s_mag / (2 ** 11))
if plot_dist:
win.fft_curve.setData(dist, s_dbfs)
win.fft_plot.setLabel("bottom", text="Distance", units="m", **label_style)
else:
win.fft_curve.setData(freq, s_dbfs)
win.fft_plot.setLabel("bottom", text="Frequency", units="Hz", **label_style)
win.img_array = np.roll(win.img_array, 1, axis=0)
win.img_array[0] = s_dbfs
win.imageitem.setLevels([win.low_slider.value(), win.high_slider.value()])
win.imageitem.setImage(win.img_array, autoLevels=False)
if index == 1:
win.fft_plot.enableAutoRange("xy", False)
index = index + 1
timer = QtCore.QTimer()
timer.timeout.connect(update)
timer.start(0)
# start the app
sys.exit(App.exec())

314
Range_Doppler_Plot.py Normal file
View File

@@ -0,0 +1,314 @@
# %%
# Copyright (C) 2019 Analog Devices, Inc.
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
# - Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# - Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
# - Neither the name of Analog Devices, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
# - The use of this software may or may not infringe the patent rights
# of one or more patent holders. This license does not release you
# from the requirement that you obtain separate licenses from these
# patent holders to use this software.
# - Use of the software either in source or binary form, must be run
# on or directly connected to an Analog Devices Inc. component.
#
# THIS SOFTWARE IS PROVIDED BY ANALOG DEVICES "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
# INCLUDING, BUT NOT LIMITED TO, NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED.
#
# IN NO EVENT SHALL ANALOG DEVICES BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, INTELLECTUAL PROPERTY
# RIGHTS, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''This script uses the new Pluto TDD engine
As of Nov 2023, this is in the "dev_phaser_merge" branch of https://github.com/analogdevicesinc/pyadi-iio
Also, make sure your Pluto firmware is updated to rev 0.38 (or later)
To view the plot properly in Spyder, select Tools->Preferences, IPython Console, and change Graphics backend from Inline to Automatic
'''
# Imports
import adi
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import numpy as np
import time
# Instantiate all the Devices
rpi_ip = "ip:phaser.local" # IP address of the Raspberry Pi
sdr_ip = "ip:192.168.2.1" # "192.168.2.1, or pluto.local" # IP address of the Transceiver Block
my_sdr = adi.ad9361(uri=sdr_ip)
my_phaser = adi.CN0566(uri=rpi_ip, sdr=my_sdr)
# Initialize both ADAR1000s, set gains to max, and all phases to 0
my_phaser.configure(device_mode="rx")
my_phaser.load_gain_cal()
my_phaser.load_phase_cal()
for i in range(0, 8):
my_phaser.set_chan_phase(i, 0)
gain_list = [8, 34, 84, 127, 127, 84, 34, 8] # Blackman taper
for i in range(0, len(gain_list)):
my_phaser.set_chan_gain(i, gain_list[i], apply_cal=True)
# Setup Raspberry Pi GPIO states
try:
my_phaser._gpios.gpio_tx_sw = 0 # 0 = TX_OUT_2, 1 = TX_OUT_1
my_phaser._gpios.gpio_vctrl_1 = 1 # 1=Use onboard PLL/LO source (0=disable PLL and VCO, and set switch to use external LO input)
my_phaser._gpios.gpio_vctrl_2 = 1 # 1=Send LO to transmit circuitry (0=disable Tx path, and send LO to LO_OUT)
except:
my_phaser.gpios.gpio_tx_sw = 0 # 0 = TX_OUT_2, 1 = TX_OUT_1
my_phaser.gpios.gpio_vctrl_1 = 1 # 1=Use onboard PLL/LO source (0=disable PLL and VCO, and set switch to use external LO input)
my_phaser.gpios.gpio_vctrl_2 = 1 # 1=Send LO to transmit circuitry (0=disable Tx path, and send LO to LO_OUT)
# Parameters
sample_rate = 20e6
center_freq = 2.1e9
signal_freq = 0.1e6
# Configure SDR Rx
my_sdr.sample_rate = int(sample_rate)
my_sdr.rx_lo = int(center_freq) # set this to output_freq - (the freq of the HB100)
my_sdr.rx_enabled_channels = [0, 1] # enable Rx1 (voltage0) and Rx2 (voltage1)
my_sdr.gain_control_mode_chan0 = 'manual' # manual or slow_attack
my_sdr.gain_control_mode_chan1 = 'manual' # manual or slow_attack
my_sdr.rx_hardwaregain_chan0 = int(30) # must be between -3 and 70
my_sdr.rx_hardwaregain_chan1 = int(30) # must be between -3 and 70
# Configure SDR Tx
my_sdr.tx_lo = int(center_freq)
my_sdr.tx_enabled_channels = [0, 1]
my_sdr.tx_cyclic_buffer = True # must set cyclic buffer to true for the tdd burst mode. Otherwise Tx will turn on and off randomly
my_sdr.tx_hardwaregain_chan0 = -88 # must be between 0 and -88
my_sdr.tx_hardwaregain_chan1 = -0 # must be between 0 and -88
# Read properties
print("RX LO %s" % (my_sdr.rx_lo))
# Configure the ADF4159 Rampling PLL
output_freq = 12.145e9
BW = 500e6
num_steps = 200
ramp_time = 0.2e3 # us
my_phaser.frequency = int(output_freq / 4) # Output frequency divided by 4
my_phaser.freq_dev_range = int(
BW / 4
) # frequency deviation range in Hz. This is the total freq deviation of the complete freq ramp
my_phaser.freq_dev_step = int(
(BW/4) / num_steps
) # frequency deviation step in Hz. This is fDEV, in Hz. Can be positive or negative
my_phaser.freq_dev_time = int(
ramp_time
) # total time (in us) of the complete frequency ramp
print("requested freq dev time (us) = ", ramp_time)
ramp_time = my_phaser.freq_dev_time
print("actual freq dev time (us) = ", ramp_time)
my_phaser.delay_word = 4095 # 12 bit delay word. 4095*PFD = 40.95 us. For sawtooth ramps, this is also the length of the Ramp_complete signal
my_phaser.delay_clk = "PFD" # can be 'PFD' or 'PFD*CLK1'
my_phaser.delay_start_en = 0 # delay start
my_phaser.ramp_delay_en = 0 # delay between ramps.
my_phaser.trig_delay_en = 0 # triangle delay
my_phaser.ramp_mode = "single_sawtooth_burst" # ramp_mode can be: "disabled", "continuous_sawtooth", "continuous_triangular", "single_sawtooth_burst", "single_ramp_burst"
my_phaser.sing_ful_tri = (
0 # full triangle enable/disable -- this is used with the single_ramp_burst mode
)
my_phaser.tx_trig_en = 1 # start a ramp with TXdata
my_phaser.enable = 0 # 0 = PLL enable. Write this last to update all the registers
# %%
# Configure TDD controller
sdr_pins = adi.one_bit_adc_dac(sdr_ip)
sdr_pins.gpio_tdd_ext_sync = True # If set to True, this enables external capture triggering using the L24N GPIO on the Pluto. When set to false, an internal trigger pulse will be generated every second
tdd = adi.tddn(sdr_ip)
sdr_pins.gpio_phaser_enable = True
tdd.enable = False # disable TDD to configure the registers
tdd.sync_external = True
tdd.startup_delay_ms = 1
tdd.frame_length_ms = ramp_time/1e3 + 0.2 # each GPIO toggle is spaced this far apart
tdd.burst_count = 128 # number of chirps in one continuous receive buffer
tdd.out_channel0_enable = True
tdd.out_channel0_polarity = False
tdd.out_channel0_on_ms = 0.01 # each GPIO pulse will be 100us (0.6ms - 0.5ms). And the first trigger will happen 0.5ms into the buffer
tdd.out_channel0_off_ms = 0.2
tdd.out_channel1_enable = True
tdd.out_channel1_polarity = False
tdd.out_channel1_on_ms = 0
tdd.out_channel1_off_ms = 0.1
tdd.out_channel2_enable = False
tdd.enable = True
# buffer size needs to be greater than the frame_time
frame_time = tdd.frame_length_ms*tdd.burst_count # time in ms
print("frame_time: ", frame_time, "ms")
buffer_time = 0
power=12
while frame_time > buffer_time:
power=power+1
buffer_size = int(2**power)
buffer_time = buffer_size/my_sdr.sample_rate*1000 # buffer time in ms
if power==23:
break # max pluto buffer size is 2**23, but for tdd burst mode, set to 2**22
print("buffer_size:", buffer_size)
my_sdr.rx_buffer_size = buffer_size
print("buffer_time:", buffer_time, " ms")
# Create a sinewave waveform
#fs = int(my_sdr.sample_rate)
fs = sample_rate
print("sample_rate:", fs)
N = buffer_size
fc = int(signal_freq / (fs / N)) * (fs / N)
ts = 1 / float(fs)
t = np.arange(0, N * ts, ts)
i = np.cos(2 * np.pi * t * fc) * 2 ** 14
q = np.sin(2 * np.pi * t * fc) * 2 ** 14
iq = 0.9* (i + 1j * q)
my_sdr._ctx.set_timeout(30000)
my_sdr._rx_init_channels()
# Send data
my_sdr.tx([iq, iq])
# %%
PRI = tdd.frame_length_ms / 1e3
PRF = 1 / PRI
num_bursts = tdd.burst_count
# Split into frames
N_frame = int(PRI / ts)
# Obtain range-FFT x-axis
c = 3e8
wavelength = c / (output_freq - center_freq)
ramp_time_s = ramp_time / 1e6
slope = BW / ramp_time_s
freq = np.linspace(-fs / 2, fs / 2, N_frame)
dist = (freq - signal_freq) * c / (2 * slope)
# Resolutions
R_res = c / (2 * BW)
v_res = wavelength / (2 * num_bursts * PRI)
# Doppler spectrum limits
max_doppler_freq = PRF / 2
max_doppler_vel = max_doppler_freq * wavelength / 2
# First ramp starts with some offset (as defined in the TDD section above)
start_offset_time = tdd.out_channel0_on_ms/1e3
# From start of each ramp, how many "good" points do we want?
# For best freq linearity, stay away from the start of the ramps
begin_offset_time = 0.02e-3
good_ramp_time = ramp_time_s - begin_offset_time
good_ramp_samples = int(good_ramp_time * fs)
start_offset_samples = int((start_offset_time+begin_offset_time)*fs)
# %%
range_doppler_fig, ax = plt.subplots(figsize=(16, 16))
extent = [-max_doppler_vel, max_doppler_vel, dist.min(), dist.max()]
# %%
# Collect data
my_phaser.gpios.gpio_burst = 0
my_phaser.gpios.gpio_burst = 1
my_phaser.gpios.gpio_burst = 0
data = my_sdr.rx()
chan1 = data[0]
chan2 = data[1]
sum_data = chan1+chan2
# Process data
# Make a 2D array of the chirps for each burst
rx_bursts = np.zeros((num_bursts, good_ramp_samples), dtype=complex)
for burst in range(num_bursts):
start_index = start_offset_samples + (burst) * N_frame
stop_index = start_index + good_ramp_samples
rx_bursts[burst] = sum_data[start_index:stop_index]
rx_bursts_fft = np.fft.fftshift(abs(np.fft.fft2(rx_bursts)))
# %%
i = 0
cmn = ''
def get_radar_data(frame):
global range_doppler
# Collect data
my_phaser.gpios.gpio_burst = 0
my_phaser.gpios.gpio_burst = 1
my_phaser.gpios.gpio_burst = 0
data = my_sdr.rx()
chan1 = data[0]
chan2 = data[1]
sum_data = chan1+chan2
# Process data
# Make a 2D array of the chirps for each burst
rx_bursts = np.zeros((num_bursts, good_ramp_samples), dtype=complex)
for burst in range(num_bursts):
start_index = start_offset_samples + (burst) * N_frame
stop_index = start_index + good_ramp_samples
rx_bursts[burst] = sum_data[start_index:stop_index]
rx_bursts_fft = np.fft.fftshift(abs(np.fft.fft2(rx_bursts)))
range_doppler.set_data(np.log10(rx_bursts_fft).T)
return [range_doppler]
# %%
cmaps = ['inferno', 'plasma']
cmn = cmaps[0]
plot_data = np.log10(rx_bursts_fft).T
plot_data = np.clip(plot_data, 0, 6)
range_doppler = ax.imshow(plot_data, aspect='auto',
extent=extent, origin='lower', cmap=matplotlib.colormaps.get_cmap(cmn),
)
ax.set_title('Range Doppler Spectrum', fontsize=24)
ax.set_xlabel('Velocity [m/s]', fontsize=22)
ax.set_ylabel('Range [m]', fontsize=22)
max_range = 40
ax.set_xlim([-15, 15])
ax.set_ylim([0, max_range])
ax.set_yticks(np.arange(0, max_range, 4))
plt.xticks(fontsize=20)
plt.yticks(fontsize=20)
print("Press CTRL+C to stop the loop")
try:
while True:
FuncAnimation(range_doppler_fig, get_radar_data, blit=True, frames=1, repeat=False)
plt.show()
plt.pause(.1)
except KeyboardInterrupt: # press ctrl-c to stop the loop
pass
# %%
# Pluto transmit shutdown
my_sdr.tx_destroy_buffer()
print("Buffer Destroyed!")
# # To disable TDD and revert to non-TDD (standard) mode
tdd.enable = False
sdr_pins.gpio_phaser_enable = False
tdd.out_channel1_polarity = not(sdr_pins.gpio_phaser_enable)
tdd.out_channel2_polarity = sdr_pins.gpio_phaser_enable
tdd.enable = True
tdd.enable = False

55
target_detection_dbfs.py Normal file
View File

@@ -0,0 +1,55 @@
'''
target_detection_dbfs.py
Original code from Marshall Bruner, Colorado State University
https://github.com/brunerm99/ADI_Radar_DSP
Modified by Jon Kraft to use dBFS values
'''
import numpy as np
from scipy.interpolate import interp1d
def cfar(X_k, num_guard_cells, num_ref_cells, bias=1, cfar_method='average',
fa_rate=0.2):
N = X_k.size
cfar_values = np.ma.masked_all(X_k.shape)
for center_index in range(num_guard_cells + num_ref_cells, N - (num_guard_cells + num_ref_cells)):
min_index = center_index - (num_guard_cells + num_ref_cells)
min_guard = center_index - num_guard_cells
max_index = center_index + (num_guard_cells + num_ref_cells) + 1
max_guard = center_index + num_guard_cells + 1
lower_nearby = X_k[min_index:min_guard]
upper_nearby = X_k[max_guard:max_index]
lower_mean = np.mean(lower_nearby)
upper_mean = np.mean(upper_nearby)
if (cfar_method == 'average'):
mean = np.mean(np.concatenate((lower_nearby, upper_nearby)))
output = mean + bias
elif (cfar_method == 'greatest'):
mean = max(lower_mean, upper_mean)
output = mean + bias
elif (cfar_method == 'smallest'):
mean = min(lower_mean, upper_mean)
output = mean + bias
elif (cfar_method == 'false_alarm'):
refs = np.concatenate((lower_nearby, upper_nearby))
noise_variance = np.sum(refs**2 / refs.size)
output = (noise_variance * -2 * np.log(fa_rate))**0.5
else:
raise Exception('No CFAR method received')
cfar_values[center_index] = output
cfar_values[np.where(cfar_values == np.ma.masked)] = np.min(cfar_values)
targets_only = np.ma.masked_array(np.copy(X_k))
targets_only[np.where(abs(X_k) > abs(cfar_values))] = np.ma.masked
if (cfar_method == 'false_alarm'):
return cfar_values, targets_only, noise_variance
else:
return cfar_values, targets_only