Source code for imtoolkit.CoherentMLDSimulator

# Copyright (c) IMToolkit Development Team
# This toolkit is released under the MIT License, see LICENSE.txt

import os
from tqdm import trange

if os.getenv("USECUPY") == "1":
    from cupy import *
else:
    from numpy import *

from .Simulator import Simulator
from .Util import getXORtoErrorBitsArray, inv_dB, randn_c


[docs]class CoherentMLDSimulator(Simulator): """A simulator that relies on the coherent maximum likelihood detector, that assumes perfect channel state information at the receiver. The environment variable USECUPY determines whether to use cupy or not.""" def __init__(self, codes, channel): """ Args: codes (ndarray): an input codebook, which is generated on the CPU memory and is transferred into the GPU memory. channel (imtoolkit.Channel): a channel model used in simulation. """ super().__init__(codes, channel)
[docs] def simulateBERReference(self, params, outputFile=True, printValue=True): """Simulates BER values at multiple SNRs, where the straightforward reference algorithm is used. Note that this time complexity is unrealistically high. Args: params (imtoolkit.Parameter): simulation parameters. outputFile (bool): a flag that determines whether to output the obtained results to the results/ directory. printValue (bool): a flag that determines whether to print the simulated values. Returns: dict: a dict that has two keys: snr_dB and ber, and contains the corresponding results. All the results are transferred into the CPU memory. """ IT, N, T, Nc, B, codes = params.IT, params.N, params.T, self.Nc, self.B, self.codes snr_dBs = linspace(params.snrfrom, params.to, params.len) sigmav2s = 1.0 / inv_dB(snr_dBs) xor2ebits = getXORtoErrorBitsArray(Nc) bers = zeros(len(snr_dBs)) for i in trange(len(snr_dBs)): errorBits = 0 for _ in range(IT): codei = random.randint(0, Nc) self.channel.randomize() h = self.channel.getChannel() # N \times M v = randn_c(N, T) * sqrt(sigmav2s[i]) # N \times T y = matmul(h, codes[codei]) + v # N \times T p = square(abs(y - matmul(h, codes))) # Nc \times N \times T norms = sum(p, axis=(1, 2)) # summation over the (N,T) axes mini = argmin(norms) errorBits += sum(xor2ebits[codei ^ mini]) bers[i] = errorBits / (IT * B) if printValue: print("At SNR = %1.2f dB, BER = %d / %d = %1.10e" % (snr_dBs[i], errorBits, IT * B, bers[i])) ret = self.dicToNumpy({"snr_dB": snr_dBs, "ber": bers}) if outputFile: self.saveCSV(params.arg, ret) print(ret) return ret
[docs] def simulateBERParallel(self, params, outputFile=True, printValue=True): """Simulates BER values at multiple SNRs, where the massively parallel algorithm is used. This implementation is especially designed for cupy. Args: params (imtoolkit.Parameter): simulation parameters. outputFile (bool): a flag that determines whether to output the obtained results to the results/ directory. printValue (bool): a flag that determines whether to print the simulated values. Returns: dict: a dict that has two keys: snr_dB and ber, and contains the corresponding results. All the results are transferred into the CPU memory. """ M, N, T, ITo, ITi, Nc, B, codes = params.M, params.N, params.T, params.ITo, params.ITi, self.Nc, self.B, self.codes snr_dBs = linspace(params.snrfrom, params.to, params.len) sigmav2s = 1.0 / inv_dB(snr_dBs) codei = tile(arange(Nc), ITi) xor2ebits = getXORtoErrorBitsArray(Nc) # The followings are old implementation proposed in [ishikawa2019im], which only support the T=1 case # x = tile(codes, Nc) # Nc \times M \times T * Nc # y = x.T # Nc \times M \times T * Nc # diffxy = hstack(x-y) # M \times Nc * Nc x = hstack(tile(codes, Nc)) # M \times T * Nc^2 # x = [codes[0] codes[0] ... codes[0] codes[1] ...] y = tile(hstack(codes), Nc) # M \times T * Nc^2 # y = [codes[0] codes[1] ... codes[Nc-1] codes[0] ...] diffxy = x - y # M \times T * Nc^2 bers = zeros(len(snr_dBs)) for ito in trange(ITo): self.channel.randomize() bigh = self.channel.getChannel() # ITi * N \times M bigv = tile(randn_c(ITi * N, T), Nc * Nc) # ITi * N \times T * Nc^2 for i in range(len(snr_dBs)): ydiff = matmul(bigh, diffxy) + bigv * sqrt(sigmav2s[i]) # ITi * N \times T * Nc^2 # The followings are old implementation of [ishikawa2019im] # ydifffro = power(abs(ydiff.reshape(ITi, N, T * Nc * Nc)), 2) # ITi \times N \times T * Nc * Nc # ydifffrosum = sum(ydifffro, axis = 1) # ITi \times T * Nc * Nc ydifffro = square(abs(ydiff)).reshape(ITi, N, Nc * Nc, T) # ITi \times N \times Nc * Nc \times T ydifffrosum = sum(ydifffro, axis=(1, 3)) # ITi \times Nc * Nc norms = ydifffrosum.reshape(ITi, Nc, Nc) # ITi \times Nc \times Nc mini = argmin(norms, axis=2).reshape(ITi * Nc) errorBits = sum(xor2ebits[codei ^ mini]) bers[i] += errorBits if printValue: nbits = (ito + 1) * ITi * B * Nc print("At SNR = %1.2f dB, BER = %d / %d = %1.10e" % (snr_dBs[i], bers[i], nbits, bers[i] / nbits)) bers /= ITo * ITi * B * Nc ret = self.dicToNumpy({"snr_dB": snr_dBs, "ber": bers}) if outputFile: self.saveCSV(params.arg, ret) print(ret) return ret
[docs] def simulateAMIReference(self, params, outputFile=True, printValue=True): """Simulates AMI values at multiple SNRs, where the straightforward reference algorithm is used. Note that this time complexity is unrealistically high. Args: params (imtoolkit.Parameter): simulation parameters. outputFile (bool): a flag that determines whether to output the obtained results to the results/ directory. printValue (bool): a flag that determines whether to print the simulated values. Returns: dict: a dict that has two keys: snr_dB and ami, and contains the corresponding results. All the results are transferred into the CPU memory. """ IT, N, T, Nc, B, codes = params.IT, params.N, params.T, self.Nc, self.B, self.codes snr_dBs = linspace(params.snrfrom, params.to, params.len) sigmav2s = 1.0 / inv_dB(snr_dBs) amis = zeros(len(snr_dBs)) for i in trange(len(snr_dBs)): sum_outer = 0.0 for _ in range(IT): V = sqrt(sigmav2s[i]) * randn_c(N, T) # V = sqrt(sigmav2) * seedv.reshape(M, 1) # for debug self.channel.randomize() H = self.channel.getChannel() # N \times M for outer in range(Nc): sum_inner = 0.0 for inner in range(Nc): hxy = matmul(H, codes[outer] - codes[inner]) head = hxy + V tail = V coeff = (-square(linalg.norm(head)) + square(linalg.norm(tail))) / sigmav2s[i] sum_inner += exp(coeff) sum_outer += log2(sum_inner) # print("bminus = " + str(sum_outer / Nc / IT)) amis[i] = (B - sum_outer / Nc / IT) / T if printValue: print("At SNR = %1.2f dB, AMI = %1.10f" % (snr_dBs[i], amis[i])) ret = self.dicToNumpy({"snr_dB": snr_dBs, "ami": amis}) if outputFile: print(ret) self.saveCSV(params.arg, ret) return ret
[docs] def simulateAMIParallel(self, params, outputFile=True, printValue=True): """Simulates AMI values at multiple SNRs, where the massively parallel algorithm is used. This implementation is especially designed for cupy. Args: params (imtoolkit.Parameter): simulation parameters. outputFile (bool): a flag that determines whether to output the obtained results to the results/ directory. printValue (bool): a flag that determines whether to print the simulated values. Returns: dict: a dict that has two keys: snr_dB and ami, and contains the corresponding results. All the results are transferred into the CPU memory. """ M, N, T, ITo, ITi, Nc, B, codes = params.M, params.N, params.T, params.ITo, params.ITi, self.Nc, self.B, self.codes snr_dBs = linspace(params.snrfrom, params.to, params.len) sigmav2s = 1.0 / inv_dB(snr_dBs) # The following three variables are the same as those used in simulateBERParallel x = hstack(tile(codes, Nc)) # M \times T * Nc^2 y = tile(hstack(codes), Nc) # M \times T * Nc^2 diffxy = x - y # M \times T * Nc^2 amis = zeros(len(snr_dBs)) for ito in trange(ITo): self.channel.randomize() bigh = self.channel.getChannel() # ITi * N \times M bigv = tile(randn_c(ITi * N, T), Nc * Nc) # ITi * N \times T * Nc^2 bigvfro = square(abs(bigv)).reshape(ITi, N, Nc * Nc, T) # ITi \times N \times Nc^2 \times T frov = sum(bigvfro, axis=(1, 3)).reshape(ITi, Nc, Nc) # ITi \times Nc \times Nc for i in range(len(snr_dBs)): hsplusv = matmul(bigh, diffxy) + bigv * sqrt(sigmav2s[i]) # ITi * N \times T * Nc^2 hsvfro = square(abs(hsplusv)).reshape(ITi, N, Nc * Nc, T) # ITi \times N \times Nc^2 \times T froy = sum(hsvfro, axis=(1, 3)) # ITi \times Nc^2 reds = froy.reshape(ITi, Nc, Nc) # ITi \times Nc \times Nc ecoffs = -reds / sigmav2s[i] + frov # diagonal elements must be zero bminus = mean(log2(sum(exp(ecoffs), axis=2))) amis[i] += (B - bminus) / T if printValue: print("At SNR = %1.2f dB, AMI = %1.10f" % (snr_dBs[i], amis[i] / (ito + 1))) # amis /= ITo ret = self.dicToNumpy({"snr_dB": snr_dBs, "ami": amis}) if outputFile: print(ret) self.saveCSV(params.arg, ret) return ret