from random import randrange
from itertools import combinations
from math import floor
# Superposed Sending
class Actor(object):
    """One participant in a superposed-sending round.

    Each actor shares a pair of random pads with every partner: one pad
    for the "sense" channel (counts senders, modulus ``collisions``) and
    one for the message channel (carries values, modulus ``symbols``).
    The two pads of a pair are additive inverses, so summing the public
    outputs of all actors cancels every pad and leaves only the number
    of senders and the sum of their messages.

    Attributes:
        name: Label used by ``__repr__`` and debug output.
        partners: Maps a partner actor to ``[sense_pad, msg_pad]``.
        symbols: Message-channel modulus, ``alphabet * collisions``.
        collisions: Sense-channel modulus.
        xMit: True while a stored message is still waiting to be sent.
        msg: The stored message (0 until ``StoreMsg`` is called).
        dbg: Debug level; values above 1 print the negotiated pads.
        senseChannel, msgChannel: Pad sums computed by ``Prepare``.
        public_senseChannel, public_msgChannel: Last values returned by
            ``BroadcastMsg``.
    """

    def __init__(self, name, alphabet=24, collisions=30, dbg=False):
        self.name = name
        self.partners = dict()
        self.symbols = alphabet * collisions
        self.collisions = collisions
        self.xMit = False
        self.msg = 0
        self.dbg = dbg
        # Defined up front so the attributes exist even before the first
        # Prepare()/BroadcastMsg() call.
        self.senseChannel = 0
        self.msgChannel = 0
        self.public_senseChannel = 0
        self.public_msgChannel = 0

    def __repr__(self):
        return self.name

    def AddPartner(self, otherActor):
        """Agree on fresh pads with ``otherActor``, replacing any old ones.

        The partner stores the random pads, this actor stores their
        additive inverses.  Both sides use this actor's moduli, so the
        two actors are assumed to share ``collisions`` and ``symbols``.
        """
        # NOTE(review): random.randrange is not a cryptographically
        # secure generator; fine for simulation only.
        randSymbol = randrange(self.symbols)
        randSense = randrange(self.collisions)
        if self.dbg > 1:
            print(
                "Negotiating secret: "
                + self.name
                + "->"
                + otherActor.name
            )
            print(" value:" + repr(randSymbol))
        otherActor.partners[self] = [randSense, randSymbol]
        # Reduce the inverses so stored pads stay inside [0, modulus).
        self.partners[otherActor] = [
            (self.collisions - randSense) % self.collisions,
            (self.symbols - randSymbol) % self.symbols,
        ]

    def Renegotiate(self):
        """Draw new pads with every current partner."""
        # Iterate over a snapshot: AddPartner writes into self.partners.
        for other in list(self.partners):
            self.AddPartner(other)

    def Prepare(self):
        """Sum the pads of all partners into the two private channels."""
        pads = list(self.partners.values())
        self.senseChannel = sum(pad[0] for pad in pads) % self.collisions
        self.msgChannel = sum(pad[1] for pad in pads) % self.symbols

    def StoreMsg(self, msg):
        """Queue ``msg`` for transmission."""
        self.msg = msg
        self.xMit = True

    def Notify(self, x):
        """Stop transmitting once ``x`` (our own message) was delivered."""
        if self.msg == x:
            self.xMit = False

    def BroadcastMsg(self, pivot="Nil", dir=+1):
        """Return this actor's public ``[sense, msg]`` contribution.

        Args:
            pivot: ``"Nil"`` lets every pending actor send; otherwise
                only actors on the selected side of ``pivot`` send.
            dir: ``+1`` selects ``msg < pivot``; any other value selects
                ``msg >= pivot``.

        Returns:
            ``[public_senseChannel, public_msgChannel]``: the pad sums,
            plus 1 and the message respectively when this actor sends.
        """
        self.Prepare()
        if pivot == "Nil":
            selected = True
        elif dir == +1:
            selected = self.msg < pivot
        else:
            selected = self.msg >= pivot
        # Logical `and` (not bitwise `&`): both operands are conditions.
        if self.xMit and selected:
            self.public_senseChannel = (
                self.senseChannel + 1
            ) % self.collisions
            self.public_msgChannel = (
                self.msgChannel + self.msg
            ) % self.symbols
        else:
            self.public_senseChannel = self.senseChannel % self.collisions
            self.public_msgChannel = self.msgChannel % self.symbols
        return [
            self.public_senseChannel,
            self.public_msgChannel,
        ]
class SuperposedSending(object):
    """Superposed sending with collision resolution.

    Holds a group of actors, sums their public channel outputs and, when
    several actors send at once, splits the senders around the mean of
    the collided values until every message has been recovered.

    Attributes:
        alphabet: Number of distinct message values used by ``SelfTest``.
        Actors: The participating actor objects.
        symbols: Message-channel modulus shared with the actors.
        collisions: Sense-channel modulus shared with the actors.
        dbg: Debug level; values above 0 print decoding progress.
    """

    def __init__(self, Actors, alphabet=26, dbg=False):
        """Build the group.

        Args:
            Actors: Either a number of actors to create, or a ready-made
                list of actor objects (assumed to share their moduli).
            alphabet: Size of the message alphabet.
            dbg: Debug level handed to newly created actors.
        """
        if isinstance(Actors, int):
            count = Actors
            # One more sense slot than actors, so that "everybody sends"
            # is not reduced to 0 and mistaken for "nobody sends".
            Actors = [
                Actor(
                    "Actor #" + repr(i + 1),
                    (alphabet + 1),
                    count + 1,
                    dbg,
                )
                for i in range(count)
            ]
        self.alphabet = alphabet
        self.Actors = Actors
        # The pads only cancel modulo the actors' own moduli, so the
        # decoder must reduce with exactly those values.
        if Actors:
            self.symbols = Actors[0].symbols
            self.collisions = Actors[0].collisions
        else:
            # Empty group: any positive modulus avoids a division by zero.
            self.symbols = 1
            self.collisions = 1
        self.dbg = dbg

    def __getitem__(self, i):
        return self.Actors[i]

    def negotiate(self):
        """Let every pair of actors agree on pads, then sum the pads."""
        for A, B in combinations(self.Actors, 2):
            A.AddPartner(B)
        for A in self.Actors:
            A.Prepare()

    def Renegotiate(self):
        """Ask every actor to draw fresh pads with its partners."""
        for A in self.Actors:
            A.Renegotiate()

    def decodeSense(self, thr="Nil", dir=+1):
        """Decode Sense Channel.

        Collects one broadcast from every actor (restricted by ``thr``
        and ``dir``) and returns ``[number_of_senders, sum_of_messages]``
        after reducing with the shared moduli.
        """
        senders = 0
        actualMsg = 0
        for A in self.Actors:
            Sense, Msg = A.BroadcastMsg(thr, dir)
            senders += Sense
            actualMsg += Msg
        return [
            int(senders) % self.collisions,
            int(actualMsg) % self.symbols,
        ]

    def NotifyAll(self, msg):
        """Tell every actor that ``msg`` has been delivered."""
        for A in self.Actors:
            A.Notify(msg)

    def decodeChannel(self, avg="Nil", dir=+1):
        """Recover the pending messages selected by ``avg`` and ``dir``.

        Args:
            avg: Pivot passed to the actors; ``"Nil"`` selects everyone.
            dir: ``+1`` for values below the pivot, ``-1`` for values at
                or above it.

        Returns:
            List of decoded values; a value held by several senders is
            listed once per successful decode, not once per sender.
        """
        sense, msg = self.decodeSense(avg, dir)
        if self.dbg > 0:
            if dir > 0:
                print("(I) pivot=", avg, " sense=", sense)
            if dir < 0:
                print("(D) pivot=", avg, " sense=", sense)
        if sense == 0:
            return []
        previous = avg
        avg = int(msg) / int(sense)
        # All selected senders hold the same value only if the mean is
        # an exact integer that matches the previous pivot.  Comparing
        # floors alone would also accept e.g. {5, 5, 6} against pivot 5
        # and silently drop the 6.
        if (
            previous != "Nil"
            and msg % sense == 0
            and floor(previous) == msg // sense
        ):
            value = msg // sense
            print("Got ", value, "*", sense)
            self.NotifyAll(value)
            return [value]
        if self.dbg > 0:
            print("sense =", sense, end=" ")
            print("value =", msg, end=" ")
            print("pivot =", avg)
        if sense == 1:
            print("Got ", msg)
            self.NotifyAll(msg)
            return [int(msg)]
        if self.dbg > 0:
            print("Renegotiate channel!")
        self.Renegotiate()
        # Resolve the upper half first, then the lower half.
        upper = self.decodeChannel(avg, -1)
        lower = self.decodeChannel(avg, +1)
        return upper + lower

    def SelfTest(self, r=3):
        """Let random actors send random values and decode them.

        Each actor sends with probability ``1 / r`` (never for ``r < 2``,
        since the draw must equal 1).  Returns True when the set of
        decoded values equals the set of values sent.
        """
        allmsg = []
        for A in self.Actors:
            rnd = randrange(0, self.alphabet)
            t = randrange(0, r)
            if t == 1:
                A.StoreMsg(rnd)
                allmsg.append(rnd)
                if self.dbg > 0:
                    print(
                        repr(A)
                        + " wishes to send "
                        + repr(rnd)
                    )
        print(self.decodeSense())
        res = self.decodeChannel()
        print(
            "Senders/Total :"
            + repr(len(allmsg))
            + "/"
            + repr(len(self.Actors))
        )
        print("Input :", allmsg)
        print("Output :", res)
        return set(allmsg) == set(res)
# Module-level demo network: 20 actors, a 26-value alphabet, debug level 1.
X = SuperposedSending(Actors=20, alphabet=26, dbg=1)