原始需求

udp发送数据包结构:

  1. 起始标志(0x5A,0xA5)2字节
  2. 报文长度(大端,2字节)
  3. 命令码0x66(1字节)
  4. HMI序号0x00(1字节)
  5. 保留0x00(1字节)
  6. 文件名长度n(strlen(文件名)+1,1字节)
  7. 文件名(绝对路径,包含'0',n字节)
  8. 保留0x01(1字节)
  9. 保留0x00(1字节)
  10. 校验和(1字节,从起始标志开始算)

udp应答报文结构:

  1. 起始标志0x5A,0xA5(2字节)
  2. 报文长度(2字节)
  3. 命令码(1字节)
  4. 状态码(0x00失败,0x01成功,1字节)
  5. 校验和(1字节,从起始标志开始算)

需求实现

=========================字节打包测试代码===============================

#!/usr/bin/env python3
# -*- coding: utf-8 -*
import struct
import os

def calculate_checksum(data):
    """计算校验和(所有字节累加和的低8位)"""
    return sum(data) & 0xFF

# ================= 发送固件更新数据包构造 =================
def build_request_packet(filename):
    """
    构造发送数据包
    :param filename: 绝对路径文件名(如"/data/test.txt")
    :return: 字节流数据包
    """
    # 固定头部字段
    start_flag = b'\x5A\xA5'
    command_code = 0x66
    hmi_id = 0x00
    reserved1 = 0x00
    reserved2 = 0x01
    reserved3 = 0x00

    # 处理文件名
    filename_bytes = filename.encode('utf-8') + b'\x00'  # 添加终止符
    filename_len = len(filename_bytes)  # 包含\0的长度

    # 构造可变部分
    variable_part = struct.pack('>B', filename_len) + \
                   filename_bytes + \
                   struct.pack('>B', reserved2) + \
                   struct.pack('>B', reserved3)

    # 计算总长度(从命令码开始到reserved3)
    """
    udp发送数据包结构:
    1. 起始标志(0x5A,0xA5)2字节
    2. 报文长度(大端, 2字节)
    3. 命令码0x66(1字节)
    4. HMI序号0x00(1字节)
    5. 保留0x00(1字节)
    6. 文件名长度n(strlen(文件名)+1, 1字节)
    7. 文件名(绝对路径, 包含'0', n字节)
    8. 保留0x01(1字节)
    9. 保留0x00(1字节)
    10. 校验和(1字节, 从起始标志开始算)
    """
    pkt_length = 2 + 2 + 1 + 1 + 1 + 1 + filename_len + 1 + 1 + 1
    
    # 构造完整数据包(先构造未校验部分)
    packet_without_checksum = struct.pack('>2sHBBBB', 
                                       start_flag,
                                       pkt_length,
                                       command_code,
                                       hmi_id,
                                       reserved1,
                                       filename_len) + \
                            filename_bytes + \
                            struct.pack('>BB', reserved2, reserved3)

    # 计算校验和
    checksum = calculate_checksum(packet_without_checksum)  # 从head开始算

    # 添加校验和
    return packet_without_checksum + struct.pack('>B', checksum)

# ================= 应答数据包解析 =================
def parse_response_packet(data):
    """
    解析应答数据包
    :param data: 字节流数据
    :return: 解析后的字典
    :raises: ValueError 当数据无效时
    """
    # 基础校验
    if len(data) < 7:
        raise ValueError("数据包长度不足")
    
    """
    1. 起始标志0x5A,0xA5(2字节)
    2. 报文长度(2字节) = 7
    3. 命令码(1字节)
    4. 状态码(0x00失败, 0x01成功, 1字节)
    5. 校验和(1字节, 从起始标志开始算)
    """
    # 解析固定部分
    start_flag, pkt_length, cmd_code, status, checksum = \
        struct.unpack('>2sHBBB', data[:7])

    # 校验起始标志
    if start_flag != b'\x5A\xA5':
        raise ValueError("无效的起始标志")

    # 校验数据包长度
    if pkt_length != len(data):
        raise ValueError(f"长度字段不匹配, 预期 {pkt_length} 字节, 实际 {len(data)} 字节")

    # 校验和验证
    calculated_cs = calculate_checksum(data[:-1])
    if checksum != calculated_cs:
        raise ValueError(f"校验和错误, 预期 {checksum:02X}, 实际 {calculated_cs:02X}")

    return status
    # return {
    #     'start_flag': start_flag.hex(),
    #     'packet_length': pkt_length,
    #     'command_code': f'{cmd_code:02X}',
    #     'status': '成功' if status == 0x01 else '失败',
    #     'checksum': f'{checksum:02X}'
    # }


def parse_request(data):
    """解析客户端请求"""
    if len(data) < 10:
        raise ValueError("数据包过短")
    
    # 解包固定部分
    start, length, cmd, hmi, reserved1, filename_len = \
        struct.unpack('>2sHBBBB', data[:8])
    
    # 验证起始标志
    if start != b'\x5A\xA5':
        raise ValueError("无效起始标志")
    
    # 验证校验和
    received_cs = data[-1]
    calculated_cs = sum(data[:-1]) & 0xFF
    if received_cs != calculated_cs:
        raise ValueError(f"校验和错误 {received_cs:02X} vs {calculated_cs:02X}")
    
    # 提取文件名
    filename_end = 8 + filename_len
    filename = data[8:filename_end].rstrip(b'\x00').decode('utf-8')
    
    return filename


def build_response(status):
    """构建应答数据包"""
    start_flag = b'\x5A\xA5'
    cmd_code = 0x66
    pkt_length = 7
    
    data_part = start_flag + struct.pack('>HBB', pkt_length, cmd_code, status)
    checksum = sum(data_part) & 0xFF
    return data_part + struct.pack('>B', checksum)

if __name__ == "__main__":
    req_pack = build_request_packet('/data/test.txt')
    print(' '.join(f'{b:02x}' for b in req_pack))
    read_path = parse_request(req_pack)
    print("read_path = " + read_path)
    res_pack = build_response(1)
    print(' '.join(f'{b:02x}' for b in res_pack))
    res = parse_response_packet(res_pack)
    print("res = " +  str(res))

部分内容解释

字节顺序控制

字符名称说明
@本地序 (默认)与平台相关
=本地标准序按原始数据类型大小对齐
<小端序低位在前 (Intel x86)
>大端序高位在前 (网络协议标准)
!网络序等同于 > (推荐用于网络传输)

数据格式

格式字符C 类型Python 类型字节数取值范围(无符号)
bsigned charint1-128 ~ 127
Bunsigned charint10 ~ 255
hshortint2-32768 ~ 32767
Hunsigned shortint20 ~ 65535
iintint4-2147483648 ~ 2147483647
Iunsigned intint40 ~ 4294967295
llongint4同 i
Lunsigned longint4同 I
qlong longint8极大范围
Qunsigned long longint80 ~ 2^64-1
ffloatfloat4单精度浮点数
ddoublefloat8双精度浮点数