# Copyright (c) IMToolkit Development Team
# This toolkit is released under the MIT License, see LICENSE.txt
import os
from tqdm import trange
if os.getenv("USECUPY") == "1":
from cupy import *
else:
from numpy import *
from .Simulator import Simulator
from .Util import getXORtoErrorBitsArray, inv_dB, randn_c
[docs]class DifferentialMLDSimulator(Simulator):
"""A simulator that relies on the non-coherent maximum likelihood detector, that does not require precise estimates of channel state information at the receiver. The environment variable USECUPY determines whether to use cupy or not.
Args:
codes (ndarray): an input codebook, which is generated on the CPU memory and is transferred into the GPU memory.
channel (imtoolkit.Channel): a channel model used in simulation.
"""
def __init__(self, codes, channel):
super().__init__(codes, channel)
[docs] def simulateBERReference(self, params, outputFile=True, printValue=True):
"""Simulates BER values at multiple SNRs, where the straightforward reference algorithm is used. Note that this time complexity is unrealistically high.
Args:
params (imtoolkit.Parameter): simulation parameters.
outputFile (bool): a flag that determines whether to output the obtained results to the results/ directory.
printValue (bool): a flag that determines whether to print the simulated values.
Returns:
dict: a dict that has two keys: snr_dB and ber, and contains the corresponding results. All the results are transferred into the CPU memory.
"""
IT, M, N, Nc, B, codes = params.IT, params.M, params.N, self.Nc, self.B, self.codes
snr_dBs = linspace(params.snrfrom, params.to, params.len)
sigmav2s = 1.0 / inv_dB(snr_dBs)
xor2ebits = getXORtoErrorBitsArray(Nc)
bers = zeros(params.len)
for i in trange(params.len):
errorBits = 0
v0 = randn_c(N, M) * sqrt(sigmav2s[i]) # N \times M
s0 = eye(M, dtype=complex)
for _ in range(IT):
codei = random.randint(0, Nc)
s1 = matmul(s0, codes[codei]) # differential encoding
self.channel.randomize()
h = self.channel.getChannel() # N \times M
v1 = randn_c(N, M) * sqrt(sigmav2s[i]) # N \times M
y0 = matmul(h, s0) + v0 # N \times M
y1 = matmul(h, s1) + v1 # N \times M
# non-coherent detection that is free from the channel matrix h
p = square(abs(y1 - matmul(y0, codes))) # Nc \times N \times M
norms = p.sum(axis=(1, 2)) # summation over the (N,M) axes
mini = argmin(norms)
errorBits += xor2ebits[codei ^ mini].sum()
v0 = v1
s0 = s1
bers[i] = errorBits / (IT * B)
if printValue:
print("At SNR = %1.2f dB, BER = %d / %d = %1.10e" % (snr_dBs[i], errorBits, IT * B, bers[i]))
ret = self.dicToNumpy({"snr_dB": snr_dBs, "ber": bers})
if outputFile:
self.saveCSV(params.arg, ret)
print(ret)
return ret
[docs] def simulateBERParallel(self, params, outputFile=True, printValue=True):
"""Simulates BER values at multiple SNRs, where the massively parallel algorithm is used. This implementation is especially designed for cupy.
Args:
params (imtoolkit.Parameter): simulation parameters.
outputFile (bool): a flag that determines whether to output the obtained results to the results/ directory.
printValue (bool): a flag that determines whether to print the simulated values.
Returns:
dict: a dict that has two keys: snr_dB and ber, and contains the corresponding results. All the results are transferred into the CPU memory.
"""
M, N, ITo, ITi, Nc, B, codes = params.M, params.N, params.ITo, params.ITi, self.Nc, self.B, self.codes
if Nc > ITi:
print("ITi should be larger than Nc = %d." % Nc)
snr_dBs = linspace(params.snrfrom, params.to, params.len)
sigmav2s = 1.0 / inv_dB(snr_dBs)
xor2ebits = getXORtoErrorBitsArray(Nc)
codesmat = hstack(codes) # M \times M * Nc
eyes = tile(eye(M, dtype=complex), ITi).T.reshape(ITi, M, M) # ITi \times M \times M
indspermute = random.permutation(arange(ITi))
codei = tile(arange(Nc), int(ceil(ITi / Nc)))[0:ITi]
X1 = take(codes, codei, axis=0) # ITi \times M \times M very slow
V0 = randn_c(ITi, N, M) # ITi \times N \times M
S0 = eyes
bers = zeros(len(snr_dBs))
for ito in trange(ITo):
self.channel.randomize()
H = self.channel.getChannel().reshape(ITi, N, M) # ITi \times N \times M
V1 = randn_c(ITi, N, M) # ITi \times N \times M
S1 = matmul(S0, X1)
for i in range(len(snr_dBs)):
Y0 = matmul(H, S0) + V0 * sqrt(sigmav2s[i]) # ITi \times N \times M
Y1 = matmul(H, S1) + V1 * sqrt(sigmav2s[i]) # ITi \times N \times M
Y0X = matmul(Y0, codesmat) # ITi \times N \times M * Nc
Ydiff = tile(Y1, Nc) - Y0X # ITi \times N \times M * Nc
Ydifffro = square(abs(Ydiff)).reshape(ITi, N, Nc, M) # ITi \times N \times Nc \times M
norms = sum(Ydifffro, axis=(1, 3)) # ITi \times Nc
mini = argmin(norms, axis=1) # ITi
errorBits = sum(xor2ebits[codei ^ mini])
bers[i] += errorBits
nbits = (ito + 1) * ITi * B
if printValue:
print("At SNR = %1.2f dB, BER = %d / %d = %1.10e" % (snr_dBs[i], bers[i], nbits, bers[i] / nbits))
V0 = V1
S0 = S1
codei = codei[indspermute]
X1 = X1[indspermute]
bers /= ITo * ITi * B
ret = self.dicToNumpy({"snr_dB": snr_dBs, "ber": bers})
if outputFile:
self.saveCSV(params.arg, ret)
print(ret)
return ret