Files
OpenXNAV/3_timing+event_generation/sdr_io.py
2023-10-19 12:58:15 -04:00

295 lines
9.7 KiB
Python

import numpy as np
import adi
import matplotlib.pyplot as plt
def sinusoid(N=10000,sample_rate=10000,freq=2500):
    '''
    Generate a complex exponential tone for SDR transmission.

    Parameters
    ----------
    N : int, optional
        Number of samples in the returned array. The default is 10000.
    sample_rate : int, optional
        Sampling frequency used to build the time axis. The default is
        10000 Hz.
    freq : int, optional
        Frequency of the tone. The default is 2500 Hz.

    Returns
    -------
    samples : array_like
        Complex array of length N holding ``0.5*exp(2j*pi*freq*t)`` with
        ``t = k/sample_rate`` for ``k = 0 .. N-1``.
    '''
    # Sample instants, spaced 1/sample_rate apart.
    time_axis = np.arange(N) / sample_rate
    # Complex phase of the tone at each sample instant.
    phase = 2.0j * np.pi * freq * time_axis
    # Constant magnitude of 0.5 for every sample.
    return 0.5 * np.exp(phase)
def stretch_pulse_train(pulse_train,stretch_factor):
    '''
    Repeat every point of a pulse train a fixed number of times, in place
    order, so the sequence becomes ``stretch_factor`` times longer.

    E.g. stretch_pulse_train([1,0],3) returns [1,1,1,0,0,0]

    Parameters
    ----------
    pulse_train : array_like
        Pulse sequence (amplitude vs. time) to be stretched.
    stretch_factor : int
        Number of consecutive copies of each point in the output.

    Returns
    -------
    new_pulse_train : array_like
        Stretched pulse train of length ``len(pulse_train)*stretch_factor``,
        stored as floating point values.
    '''
    # The result is always a float array, whatever the input dtype.
    as_float = np.asarray(pulse_train, dtype=float)
    # Element-wise repetition: [a, b] -> [a, a, ..., b, b, ...].
    return np.repeat(as_float, stretch_factor)
def pulse_train_to_tx(pulse_train,num_periods=1,bits_per_period=4):
    '''
    Modulate a pulse train (amplitude vs. time) onto a complex sinusoid so
    that it can be transmitted by an SDR.

    Each pulse is first stretched with stretch_pulse_train to
    ``num_periods*bits_per_period`` samples, then multiplied sample by
    sample with a carrier produced by sinusoid.

    Parameters
    ----------
    pulse_train : array_like
        Pulse sequence to be converted to an SDR transmittable waveform.
    num_periods : int, optional
        Number of sinusoidal periods spanned by each pulse.
        The default is 1.
    bits_per_period : int, optional
        Number of samples in each sinusoidal period. The default is 4.

    Returns
    -------
    tx_pulse_train : array_like
        Complex waveform: the stretched pulse train multiplied by the
        carrier.
    '''
    samples_per_pulse = num_periods * bits_per_period
    envelope = stretch_pulse_train(pulse_train, samples_per_pulse)
    total_samples = len(envelope)

    # The carrier is generated with sample_rate == total_samples, so the whole
    # waveform spans one time unit. Requesting len(pulse_train)*num_periods
    # cycles over that span yields num_periods cycles per pulse, i.e. one
    # cycle every bits_per_period samples.
    carrier_cycles = len(pulse_train) * num_periods
    carrier = sinusoid(total_samples, total_samples, carrier_cycles)

    return envelope * carrier
def rx_to_pulse_train(rx_samples,num_periods,bits_per_period):
    '''
    Convert a received SDR signal, such as one generated by
    pulse_train_to_tx, back into a binary pulse train.

    The magnitude of the signal is sampled once per symbol (the first
    sample of every ``num_periods*bits_per_period`` samples) and compared
    against a threshold halfway between the smallest and largest magnitude
    in the buffer.

    Parameters
    ----------
    rx_samples : array_like
        Received sample sequence such as would be generated by
        pulse_train_to_tx.
    num_periods : int
        Number of sinusoidal periods each pulse signal has been stretched to.
    bits_per_period : int
        Number of samples in each sinusoidal period.

    Returns
    -------
    rx_pulse_train : array_like
        Integer array of 1s and 0s, one entry per symbol. An empty input
        yields an empty array.
    '''
    rx_abs = np.abs(rx_samples)
    # Nothing to threshold: max()/min() are undefined on an empty buffer.
    if rx_abs.size == 0:
        return np.zeros(0, dtype=int)

    samples_per_symbol = num_periods*bits_per_period
    # Decision threshold at the midpoint of the observed magnitude range.
    # The array methods run vectorised rather than iterating in Python.
    check_value = 0.5 * (rx_abs.max() + rx_abs.min())
    # Keep the first sample of each symbol and binarise it.
    rx_pulse_train = (rx_abs[::samples_per_symbol] > check_value).astype(int)
    return rx_pulse_train
def check_fidelity(tx_pulse_train,rx_pulse_train):
    '''
    Decide whether the received pulse train is a circular shift of the
    transmitted one.

    Every circular shift of the received train is tried; a shift matches
    when the dot product of the shifted received train with the
    transmitted train equals the number of ones in the transmitted train,
    which means every one lines up with a one.

    Assumptions:
        - Both pulse trains must be binary (1s and 0s only).
        - The dot-product test is only meaningful when both trains contain
          the same number of 1s; this is verified before any shift is
          tried.

    Parameters
    ----------
    tx_pulse_train : array_like
        Transmitted binary pulse sequence.
    rx_pulse_train : array_like
        Received binary pulse sequence.

    Returns
    -------
    bool
        True if the sequences match for some circular shift (the matching
        shift is printed), False otherwise, including when the lengths or
        the counts of ones differ.
    '''
    n_rx = len(rx_pulse_train)
    if len(tx_pulse_train) != n_rx:
        print('Error: Pulse train lengths do not match.')
        return False

    ones_tx = sum(tx_pulse_train)
    if ones_tx != sum(rx_pulse_train):
        return False

    # Try left-rotations from n_rx down to 0; the first match wins.
    for shift in range(n_rx, -1, -1):
        rotated = np.roll(rx_pulse_train, -shift)
        if np.dot(rotated, tx_pulse_train) == ones_tx:
            print('Out of phase by', shift)
            return True

    return False
def _configure_pluto(sdr, sample_rate, center_freq, num_samps):
    '''Apply the shared sample-rate, Tx and Rx settings to one Pluto object.'''
    sdr.sample_rate = int(sample_rate)
    # Config Tx
    sdr.tx_rf_bandwidth = int(sample_rate) # filter cutoff, just set it to the same as sample rate
    sdr.tx_lo = int(center_freq)
    sdr.tx_hardwaregain_chan0 = -50 # Increase to increase tx power, valid range is -90 to 0 dB
    # Config Rx
    sdr.rx_lo = int(center_freq)
    sdr.rx_rf_bandwidth = int(sample_rate)
    sdr.rx_buffer_size = num_samps
    sdr.gain_control_mode_chan0 = 'manual'
    sdr.rx_hardwaregain_chan0 = 70.0 # dB, increase to increase the receive gain, but be careful not to saturate the ADC


def sdr_tx_rx(sample_rate,center_freq,
              tx_sdr,rx_sdr,
              tx_data,tx_length,
              plot_rx_pulse_train = True,plot_rx_samples = True,plot_fft = True,check_fid = True):
    '''
    Transmit and receive a binary pulse sequence through SDR(s) to verify
    signal fidelity.

    The pulse train is modulated with pulse_train_to_tx, transmitted
    cyclically from one Pluto SDR, captured on the other, demodulated with
    rx_to_pulse_train and optionally plotted and compared against what was
    sent.

    Parameters
    ----------
    sample_rate : int
        Sampling frequency of the signal. Applied to both SDRs.
    center_freq : int
        Transmission frequency of SDR.
    tx_sdr : str
        Address of the transmitting SDR, passed to ``adi.Pluto``.
    rx_sdr : str
        Address of the receiving SDR, passed to ``adi.Pluto``. Can be the
        same as tx_sdr, as long as that SDR is connected to itself.
    tx_data : array_like
        Test pulse sequence to be transmitted and received.
    tx_length : int
        Length of transmitted pulse sequence. Necessary to specify in case you
        want to transmit only a portion of tx_data.
    plot_rx_pulse_train : bool, optional
        Specify whether you want to generate a plot of the received pulse
        sequence. The default is True.
    plot_rx_samples : bool, optional
        Specify whether you want to generate a plot of the received
        non-processed sinusoidal waveform. The default is True.
    plot_fft : bool, optional
        Specify whether you want to plot the Fourier transform of the received
        waveform. The default is True.
    check_fid : bool, optional
        Specify whether you want to compare the transmitted and received pulse
        sequences for transmission fidelity. The default is True.

    Returns
    -------
    None.
    '''
    tx_pulse_train = tx_data[:tx_length] # only the first tx_length pulses are sent
    num_periods = 1
    bits_per_period = 3
    num_samps = len(tx_pulse_train)*num_periods*bits_per_period # number of samples per call to rx()

    sdr1 = adi.Pluto(tx_sdr)
    sdr2 = adi.Pluto(rx_sdr)
    # Both devices get identical settings, including the sample rate, so that
    # a separate receive SDR samples at the same rate as the transmitter.
    for sdr in (sdr1, sdr2):
        _configure_pluto(sdr, sample_rate, center_freq, num_samps)

    # Create transmit waveform (defined by function in pulse train module)
    samples = pulse_train_to_tx(tx_pulse_train,num_periods,bits_per_period)
    samples *= 2**14 # The PlutoSDR expects samples to be between -2^14 and +2^14, not -1 and +1 like some SDRs

    # Start the transmitter
    sdr1.tx_cyclic_buffer = True # Enable cyclic buffers
    sdr1.tx(samples) # start transmitting
    try:
        # Clear buffer just to be safe: discard a few captures first
        for _ in range(10):
            sdr2.rx()
        # Receive samples
        rx_samples = sdr2.rx()
    finally:
        # Stop transmitting, even if a receive call failed
        sdr1.tx_destroy_buffer()

    # Plot received samples in time domain. The magnitude is shown; swap in
    # np.real / np.imag to inspect the individual components.
    if plot_rx_samples:
        plt.figure()
        plt.plot(np.abs(rx_samples))
        plt.xlabel("Time")

    # Plot processed received pulse train
    rx_pulse_train = rx_to_pulse_train(rx_samples,num_periods,bits_per_period)
    if plot_rx_pulse_train:
        plt.figure()
        plt.plot(rx_pulse_train)
        plt.xlabel("Pulse Index")

    # Plot freq domain
    if plot_fft:
        # Calculate power spectral density (frequency domain version of signal)
        psd = np.abs(np.fft.fftshift(np.fft.fft(rx_samples)))**2
        psd_dB = 10*np.log10(psd)
        f = np.linspace(sample_rate/-2, sample_rate/2, len(psd))
        plt.figure()
        plt.plot(f/1e6, psd_dB)
        plt.xlabel("Frequency [MHz]")
        plt.ylabel("PSD")

    # Display whatever figures were requested
    if plot_rx_samples or plot_rx_pulse_train or plot_fft:
        plt.show()

    # Verify fidelity of pulse train transmission
    if check_fid:
        fidelity_check = check_fidelity(tx_pulse_train, rx_pulse_train)
        if fidelity_check:
            print('Success! Perfect Transmission!')
        else:
            print('Uh oh. Something was lost in translation.')