您的当前位置:首页正文

【联邦学习】联邦学习的量化传输仿真(一)

2024-12-12 来源:个人技术集锦

项目背景

联邦学习虽然已经能够极大地降低在网络上所需要传输的数据量,但是在实际传输过程中,很容易受到“落后者”的影响。而如果抛弃“落后者”,那么神经网络受到的影响究竟有多大?
采用量化传输,就是在原有torch的float16基础上,进行8bit甚至4bit量化,能否在不损失太多模型精度的条件下,减少通信所需的比特数。

量化方式

在模型中,小于1的数比大于1的数更多,因此先从小数的量化开始讨论。
简单的说,小数点后部分不同比特量化结果如下:

1bit量化:0,.5,小数点后只有这两种情况

2bit: 0 .25 .5 .75
编码00 01 10 11

3bit: 0 .125 .25 .375 .5 .625 .75 .875
编码000 001 010 011 100 101 110 111

4bit: 0 0.0625 …

举例:要量化的数是0.825
1bit量化 值:0.5 bias:0.325
编码:1
2bit量化 值:0.75 bias:0.075
编码:11
3bit量化 值:0.75 bias:0.075
编码:110
4bit量化 值:0.8125 bias:0.0125

因此传输时,先传一个2bit量化值11作为基础值,再传一个8bit量化值0001作为补充的精确值,最终误差仅为1.5%。如果补充的值无法到达,只能采用基础值进行训练,也只有9%的误差。反观原本float格式需要一次传32bit。

听起来很不错?
但是数据的动态范围很大,10~10^-5,我们可以科学计数法之后进行再量化,模型数据小数后都是4位,初步发现对模型大小可以减少很多。

仿真中:
并不需要真的实现量化后的编/解码和传输,只需要计算量化后的bit数,得出整体模型的大小,用于传输时延的仿真即可。后续模型计算时采用相应量化后的值。

#量化前数据'fc.2.weight'
tensor([[ 0.0648, -0.0585,  0.0516,  ..., -0.0800, -0.0462, -0.0774],
        [ 0.0770, -0.0377, -0.0328,  ..., -0.0056,  0.0144, -0.0698],
        [-0.0782,  0.0057, -0.0630,  ...,  0.0109,  0.0307,  0.0323]])

#量化后数据'fc.2.weight'
tensor([[ 0.0625, -0.0562,  0.0500,  ..., -0.0750, -0.0437, -0.0750],
        [ 0.0750, -0.0375, -0.0312,  ..., -0.0050,  0.0125, -0.0688],
        [-0.0750,  0.0056, -0.0625,  ...,  0.0063,  0.0250,  0.0312]])

出现0.0063这样的数字是因为tensor的float32精度对数值进行了四舍五入。在函数应用时需注意,GPU上的tensor不能直接转为numpy,需要先放到CPU上(已解决)。

具体的量化操作

from collections import OrderedDict
import torch


def float_to_str(num: float):  # Convert float scientific notation to string
    d = str(num)
    neg = True if d[0] == '-' else False  # determine if it is negative
    point_pos = d.find('.')

    # No need for scientific notation
    neednt = False if abs(num) > 1 or 'e' in d else True
    for i in range(len(d)-point_pos):
        if d[i+point_pos] > '0':
            num_pos = i+point_pos
            break
    if not neednt:
        return(d)
    else:
        d = d[num_pos:num_pos+1]+'.' + \
            d[num_pos+1:]+'e-'+str(num_pos-point_pos)
        if neg:
            d = '-'+d
        return(d)


def float_split(num: float):  # Split out the base value
    if abs(num) > 0.01:
        return(round(num, 2))
    num_str = float_to_str(num)
    if num_str.find('e')-num_str.find('.') < 3 and abs(num) < 1:
        return(num_str)
    if 'e' in num_str:
        first_piece = num_str[:num_str.find(
            '.')+2]+num_str[num_str.find('e'):]
    else:
        first_piece = num_str[:-2]
        second_piece = num_str[4:6]
    return(first_piece)


def Encode(array):  # Encode numbers in model as strings
    if not isinstance(array[0], list):
        for i, num in enumerate(array):
            array[i] = float_split(num)
    else:
        for subarray in array:
            Encode(subarray)


def Param_compression(dict: OrderedDict):  # Inplace change model parameters
    for key, value in dict.items():
        temp = value.tolist()
        Encode(temp)
        dict[key] = temp


def str_to_float(string: str):  # Convert string to float
    return(float(string))


def Decode(array: list):  # Encode strings in model as numbers
    if not isinstance(array[0], list):
        for i, num in enumerate(array):
            array[i] = str_to_float(num)
    else:
        for subarray in array:
            Decode(subarray)


def Param_recovery(dict: OrderedDict):  # Model parameter recovery
    for key, value in dict.items():
        temp = value
        Decode(value)
        dict[key] = torch.tensor(temp, dtype=torch.float32)

信道模拟、传输时延模拟

首先建立一个独立于主线程之外的线程函数Channel_rate(),去计算时刻变化的信道传输速率(每个ue即意味着一个这样的线程),供后面的时延计算函数使用。
还需要这些函数来完成信道传输和接收部分:
2. 时延计算函数Trans_delay(ue,)
3. 基站接受+聚合策略函数BS_receive()
4. 训练函数


def Channel_rate(UE_list, train_args):
    # 作为一个独立的后台线程,职责是为主线程提供信道速度
    while True:
        for ue in UE_list:
            ue.channel_rate += train_args['rate_change_low'] +\
                (train_args['rate_change_high'] -
                 train_args['rate_change_low'])*np.random.rand()
        time.sleep(2)


def Trans_delay(ue, size):
    delay = size/ue.channel_rate
    return(delay)


'''How they used?
t = Thread(target=Channel_rate, args=(UE_list,), daemon=True)
t.start()
'''

这里再提供一个方便测试自己写的信道代码能否正常运行的程序,能够动态的观察信道速率,可视化观看,非常的好用:

#看信道速率的动态变化
model = Net()

UE_list = []

init_ue(10)

t = Thread(target=Channel_rate, args=(UE_list, train_args,), daemon=True)

t.start()

x = []

y = []

for i in range(10):

    y.append([])

plt.ion()

for i in range(20):

    plt.clf()

    x.append(i)

    for num, ue in enumerate(UE_list):

        y[num].append(ue.channel_rate)

    for i, yn in enumerate(y):

        plt.plot(x, yn, label='ue'+str(i+1))

        plt.legend(loc=2)

    plt.pause(0.5)

plt.ioff()

plt.show()

所遇到问题

仿真过程中具体的数值设置,比如初始速率范围、变化范围,这些参数该到哪里去寻找呢?我感觉是不是得参考一些论文有根据的去设置比较好。
开始漫漫的寻找参考文献之路。。。。
(10.19)或许我先不着急寻找最后的参数设置,先用变量来代替!这应该是正确的想法,先把流程跑通吧。

显示全文