原始需求
udp发送数据包结构:
- 起始标志(0x5A,0xA5)2字节
- 报文长度(大端,2字节)
- 命令码0x66(1字节)
- HMI序号0x00(1字节)
- 保留0x00(1字节)
- 文件名长度n(strlen(文件名)+1,1字节)
- 文件名(绝对路径,包含'0',n字节)
- 保留0x01(1字节)
- 保留0x00(1字节)
- 校验和(1字节,从起始标志开始算)
udp应答报文结构:
- 起始标志0x5A,0xA5(2字节)
- 报文长度(2字节)
- 命令码(1字节)
- 状态码(0x00失败,0x01成功,1字节)
- 校验和(1字节,从起始标志开始算)
需求实现
=========================字节打包测试代码===============================
#!/usr/bin/env python3
# -*- coding: utf-8 -*
import struct
import os
def calculate_checksum(data):
"""计算校验和(所有字节累加和的低8位)"""
return sum(data) & 0xFF
# ================= 发送固件更新数据包构造 =================
def build_request_packet(filename):
"""
构造发送数据包
:param filename: 绝对路径文件名(如"/data/test.txt")
:return: 字节流数据包
"""
# 固定头部字段
start_flag = b'\x5A\xA5'
command_code = 0x66
hmi_id = 0x00
reserved1 = 0x00
reserved2 = 0x01
reserved3 = 0x00
# 处理文件名
filename_bytes = filename.encode('utf-8') + b'\x00' # 添加终止符
filename_len = len(filename_bytes) # 包含\0的长度
# 构造可变部分
variable_part = struct.pack('>B', filename_len) + \
filename_bytes + \
struct.pack('>B', reserved2) + \
struct.pack('>B', reserved3)
# 计算总长度(从命令码开始到reserved3)
"""
udp发送数据包结构:
1. 起始标志(0x5A,0xA5)2字节
2. 报文长度(大端, 2字节)
3. 命令码0x66(1字节)
4. HMI序号0x00(1字节)
5. 保留0x00(1字节)
6. 文件名长度n(strlen(文件名)+1, 1字节)
7. 文件名(绝对路径, 包含'0', n字节)
8. 保留0x01(1字节)
9. 保留0x00(1字节)
10. 校验和(1字节, 从起始标志开始算)
"""
pkt_length = 2 + 2 + 1 + 1 + 1 + 1 + filename_len + 1 + 1 + 1
# 构造完整数据包(先构造未校验部分)
packet_without_checksum = struct.pack('>2sHBBBB',
start_flag,
pkt_length,
command_code,
hmi_id,
reserved1,
filename_len) + \
filename_bytes + \
struct.pack('>BB', reserved2, reserved3)
# 计算校验和
checksum = calculate_checksum(packet_without_checksum) # 从head开始算
# 添加校验和
return packet_without_checksum + struct.pack('>B', checksum)
# ================= 应答数据包解析 =================
def parse_response_packet(data):
"""
解析应答数据包
:param data: 字节流数据
:return: 解析后的字典
:raises: ValueError 当数据无效时
"""
# 基础校验
if len(data) < 7:
raise ValueError("数据包长度不足")
"""
1. 起始标志0x5A,0xA5(2字节)
2. 报文长度(2字节) = 7
3. 命令码(1字节)
4. 状态码(0x00失败, 0x01成功, 1字节)
5. 校验和(1字节, 从起始标志开始算)
"""
# 解析固定部分
start_flag, pkt_length, cmd_code, status, checksum = \
struct.unpack('>2sHBBB', data[:7])
# 校验起始标志
if start_flag != b'\x5A\xA5':
raise ValueError("无效的起始标志")
# 校验数据包长度
if pkt_length != len(data):
raise ValueError(f"长度字段不匹配, 预期 {pkt_length} 字节, 实际 {len(data)} 字节")
# 校验和验证
calculated_cs = calculate_checksum(data[:-1])
if checksum != calculated_cs:
raise ValueError(f"校验和错误, 预期 {checksum:02X}, 实际 {calculated_cs:02X}")
return status
# return {
# 'start_flag': start_flag.hex(),
# 'packet_length': pkt_length,
# 'command_code': f'{cmd_code:02X}',
# 'status': '成功' if status == 0x01 else '失败',
# 'checksum': f'{checksum:02X}'
# }
def parse_request(data):
"""解析客户端请求"""
if len(data) < 10:
raise ValueError("数据包过短")
# 解包固定部分
start, length, cmd, hmi, reserved1, filename_len = \
struct.unpack('>2sHBBBB', data[:8])
# 验证起始标志
if start != b'\x5A\xA5':
raise ValueError("无效起始标志")
# 验证校验和
received_cs = data[-1]
calculated_cs = sum(data[:-1]) & 0xFF
if received_cs != calculated_cs:
raise ValueError(f"校验和错误 {received_cs:02X} vs {calculated_cs:02X}")
# 提取文件名
filename_end = 8 + filename_len
filename = data[8:filename_end].rstrip(b'\x00').decode('utf-8')
return filename
def build_response(status):
"""构建应答数据包"""
start_flag = b'\x5A\xA5'
cmd_code = 0x66
pkt_length = 7
data_part = start_flag + struct.pack('>HBB', pkt_length, cmd_code, status)
checksum = sum(data_part) & 0xFF
return data_part + struct.pack('>B', checksum)
if __name__ == "__main__":
req_pack = build_request_packet('/data/test.txt')
print(' '.join(f'{b:02x}' for b in req_pack))
read_path = parse_request(req_pack)
print("read_path = " + read_path)
res_pack = build_response(1)
print(' '.join(f'{b:02x}' for b in res_pack))
res = parse_response_packet(res_pack)
print("res = " + str(res))
部分内容解释
字节顺序控制
字符 | 名称 | 说明 |
---|---|---|
@ | 本地序 (默认) | 与平台相关 |
= | 本地标准序 | 按原始数据类型大小对齐 |
< | 小端序 | 低位在前 (Intel x86) |
> | 大端序 | 高位在前 (网络协议标准) |
! | 网络序 | 等同于 > (推荐用于网络传输) |
数据格式
格式字符 | C 类型 | Python 类型 | 字节数 | 取值范围(无符号) |
---|---|---|---|---|
b | signed char | int | 1 | -128 ~ 127 |
B | unsigned char | int | 1 | 0 ~ 255 |
h | short | int | 2 | -32768 ~ 32767 |
H | unsigned short | int | 2 | 0 ~ 65535 |
i | int | int | 4 | -2147483648 ~ 2147483647 |
I | unsigned int | int | 4 | 0 ~ 4294967295 |
l | long | int | 4 | 同 i |
L | unsigned long | int | 4 | 同 I |
q | long long | int | 8 | 极大范围 |
Q | unsigned long long | int | 8 | 0 ~ 2^64-1 |
f | float | float | 4 | 单精度浮点数 |
d | double | float | 8 | 双精度浮点数 |