热线电话:13121318867

登录
首页大数据时代【CDA干货】Python 实践:神经网络与卡尔曼滤波融合系统的构建与应用
【CDA干货】Python 实践:神经网络与卡尔曼滤波融合系统的构建与应用
2025-10-23
收藏

在 “神经网络与卡尔曼滤波融合” 的理论基础上,Python 凭借其丰富的科学计算库(NumPy、FilterPy)、深度学习框架(PyTorch、TensorFlow)及数据处理工具,成为实现融合系统的理想选择。本文将以 “无人机姿态估计” 为典型场景,从环境搭建、核心模块编码、系统集成到效果验证,完整讲解如何用 Python 构建一套 “LSTM - 卡尔曼滤波融合系统”,解决传统卡尔曼滤波在非线性姿态估计中的精度不足问题。

一、Python 工具链选型:为什么这些库最适合融合系统开发?

融合系统的实现需兼顾 “卡尔曼滤波的高效递推” 与 “神经网络的非线性拟合”,Python 生态中以下工具可实现高效协同:

工具 / 库 核心作用 优势说明
NumPy 数值计算与矩阵运算 卡尔曼滤波的状态矩阵、方差矩阵运算依赖 NumPy 的向量化操作,比原生 Python 快 10~100 倍
FilterPy 卡尔曼滤波及其变种的开箱即用实现 封装了 KF、EKF、UKF 等经典算法,支持自定义状态转移 / 观测模型,避免重复造轮子
PyTorch/TensorFlow 神经网络模型构建与训练 支持动态图(PyTorch)或静态图(TensorFlow),适合快速迭代 LSTM、CNN 等非线性模型
Pandas/Matplotlib 数据处理与结果可视化 处理传感器数据(如 IMU 的角速度、加速度数据),可视化估计结果与真实值的对比
SciPy 辅助数学运算(如噪声分布拟合) 用于分析传感器噪声的统计特征,为卡尔曼滤波的 Q/R 矩阵初始化提供依据

本文以 “FilterPy 实现卡尔曼滤波”+“PyTorch 实现 LSTM 非线性补偿” 为例,兼顾代码简洁性与可扩展性。

二、核心模块实现:从基础组件到融合逻辑

融合系统的实现分为三大核心模块:数据预处理模块(处理传感器原始数据)、卡尔曼滤波基础模块(实现传统 EKF)、LSTM 非线性补偿模块(学习姿态运动的非线性映射),最终通过 “LSTM 校正 EKF 输出” 实现融合。

1. 第一步:环境搭建与数据准备

(1)环境安装

通过 pip 安装所需依赖:

pip install numpy filterpy torch pandas matplotlib scipy

(2)数据集选择:无人机 IMU 姿态数据集

采用公开的 “无人机飞行姿态数据集”(可从 Kaggle 或 ROS 数据集库获取),包含:

  • 输入特征:IMU 传感器的角速度(x/y/z 轴)、加速度(x/y/z 轴);

  • 标签:高精度光学定位系统采集的真实姿态角(滚转角 roll、俯仰角 pitch、偏航角 yaw);

  • 数据量:10 万条时序数据,采样频率 100Hz(符合无人机实时控制需求)。

(3)数据预处理代码

需完成 “数据清洗(去除异常值)、归一化(避免量纲影响)、时序数据划分(按时间步构建 LSTM 输入)”:

import pandas as pd

import numpy as np

from sklearn.preprocessing import StandardScaler

from sklearn.model_selection import train_test_split

# 1. 加载数据

data = pd.read_csv("drone_imu_姿态数据.csv")

features = data[["angular_x""angular_y""angular_z""accel_x""accel_y""accel_z"]].values  # IMU特征

labels = data[["roll""pitch""yaw"]].values  # 真实姿态角标签

# 2. 数据清洗:去除3σ外的异常值

def remove_outliers(data, sigma=3):

   mean = np.mean(data, axis=0)

   std = np.std(data, axis=0)

   mask = np.all(np.abs((data - mean) / std) <= sigma, axis=1)

   return data[mask]

features = remove_outliers(features)

labels = remove_outliers(labels)

# 3. 归一化(特征与标签需用同一scaler,后续需反归一化)

scaler = StandardScaler()

features_scaled = scaler.fit_transform(features)

labels_scaled = scaler.fit_transform(labels)

# 4. 构建LSTM时序输入(时间步t=10,即用前10个时刻的IMU数据预测当前姿态角)

def create_lstm_input(features, labels, time_step=10):

   X, y = [], []

   for i in range(len(features) - time_step):

       X.append(features[i:i+time_step])  # 输入:(time_step, 6)

       y.append(labels[i+time_step])      # 输出:(3,) (当前姿态角)

   return np.array(X), np.array(y)

time_step = 10

X, y = create_lstm_input(features_scaled, labels_scaled, time_step)

# 5. 划分训练集/测试集(按时间顺序划分,避免数据泄露)

train_size = int(0.8 * len(X))

X_train, X_test = X[:train_size], X[train_size:]

y_train, y_test = y[:train_size], y[train_size:]

# 调整维度适配PyTorch:(样本数, 时间步, 特征数) → 符合LSTM输入格式

print("训练集维度:X_train:", X_train.shape, "y_train:", y_train.shape)  # 输出:(80000, 10, 6) (80000, 3)

2. 第二步:传统扩展卡尔曼滤波(EKF)实现(基于 FilterPy)

针对无人机姿态估计的非线性场景,先用 FilterPy 实现基础 EKF,作为融合系统的 “基准模块”:

from filterpy.kalman import ExtendedKalmanFilter

import math

class DroneEKF:

   def __init__(self, dt=0.01):  # dt=0.01s(100Hz采样频率)

       # 1. 初始化EKF:状态维度n=3(roll, pitch, yaw),观测维度m=3(IMU计算的姿态角)

       self.ekf = ExtendedKalmanFilter(dim_x=3, dim_z=3)

      

       # 2. 状态初始值(假设初始姿态角为0)

       self.ekf.x = np.array([0., 0., 0.])

      

       # 3. 状态方差矩阵P(初始不确定性,对角线值越大,初始信任度越低)

       self.ekf.P = np.diag([0.1, 0.1, 0.1])

      

       # 4. 过程噪声方差Q(系统内部干扰,如电机振动,需根据实际调整)

       self.ekf.Q = np.diag([0.001, 0.001, 0.001])

      

       # 5. 观测噪声方差R(传感器误差,IMU姿态角计算误差通常较小)

       self.ekf.R = np.diag([0.01, 0.01, 0.01])

      

       # 6. 采样时间步

       self.dt = dt

   def state_transition(self, x, u):

       """

       非线性状态转移函数:x_k = f(x_{k-1}, u_{k-1})

       x: 上一时刻状态 (roll, pitch, yaw)

       u: 控制输入(IMU角速度)(angular_x, angular_y, angular_z)

       返回:当前时刻预测状态

       "
""

       roll, pitch, yaw = x

       angular_x, angular_y, angular_z = u

      

       # 简化的姿态运动模型(考虑小角度近似,实际可替换为更精确的动力学模型)

       new_roll = roll + angular_x * self.dt

       new_pitch = pitch + angular_y * self.dt

       new_yaw = yaw + angular_z * self.dt

      

       return np.array([new_roll, new_pitch, new_yaw])

   def jacobian_F(self, x, u):

       """

       状态转移函数的雅可比矩阵J_F(EKF线性化核心)

       雅可比矩阵维度:n×n(3×3),描述x对x的偏导数

       "
""

       # 姿态转移模型对姿态角的偏导数为1,对其他项为0(简化模型)

       J_F = np.eye(3)

       return J_F

   def observation_model(self, x):

       """

       观测函数:z_k = h(x_k)(假设观测值=状态值,实际可根据传感器模型调整)

       "
""

       return x.copy()

   def jacobian_H(self, x):

       """

       观测函数的雅可比矩阵J_H(3×3)

       "
""

       J_H = np.eye(3)

       return J_H

   def update(self, u, z):

       """

       EKF递推步骤:预测+更新

       u: 控制输入(IMU角速度)

       z: 观测值(IMU计算的姿态角)

       返回:当前时刻EKF估计的姿态角

       "
""

       # 1. 预测步:用非线性状态转移函数与雅可比矩阵更新先验状态

       self.ekf.predict(fx=self.state_transition, jacobian=lambda x: self.jacobian_F(x, u), u=u)

      

       # 2. 更新步:用观测函数与雅可比矩阵校正先验状态

       self.ekf.update(z=z, hx=self.observation_model, jacobian=lambda x: self.jacobian_H(x))

      

       return self.ekf.x.copy()

# 测试EKF基础功能

ekf = DroneEKF(dt=0.01)

test_u = features_scaled[0][:3]  # 第一个时刻的角速度输入

test_z = labels_scaled[0]        # 第一个时刻的观测值

ekf_x = ekf.update(test_u, test_z)

print("EKF首次估计姿态角(归一化后):", ekf_x)  # 输出接近初始值,后续会逐步收敛

3. 第三步:LSTM 非线性补偿模块实现(基于 PyTorch

传统 EKF 的线性化会引入误差,需训练 LSTM 学习 “EKF 估计误差” 的非线性规律,对 EKF 输出进行二次校正:

import torch

import torch.nn as nn

from torch.utils.data import TensorDataset, DataLoader

import torch.optim as optim

from tqdm import tqdm

# 1. 定义LSTM误差校正模型

class LSTMErrorCorrector(nn.Module):

   def __init__(self, input_size=6, hidden_size=64, output_size=3, num_layers=2):

       super().__init__()

       self.lstm = nn.LSTM(

           input_size=input_size,  # 输入特征数:IMU的6个特征(角速度3+加速度3)

           hidden_size=hidden_size, LSTM隐藏层维度

           num_layers=num_layers,   LSTM层数

           batch_first=True         # 输入格式:(batch_size, time_step, input_size)

       )

       self.fc = nn.Sequential(

           nn.Linear(hidden_size, 32),

           nn.ReLU(),

           nn.Linear(32, output_size)  # 输出:3个姿态角的误差(真实值 - EKF估计值)

       )

   def forward(self, x):

       LSTM前向传播:output.shape = (batch_size, time_step, hidden_size)

       output, (hn, cn) = self.lstm(x)

       # 取最后一个时间步的输出(代表对当前时刻误差的预测)

       last_output = output[:, -1, :]

       # 全连接层输出误差预测值

       error_pred = self.fc(last_output)

       return error_pred

# 2. 初始化模型、损失函数与优化器

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # 优先用GPU加速

model = LSTMErrorCorrector(input_size=6, hidden_size=64, output_size=3).to(device)

criterion = nn.MSELoss()  均方误差损失(适合回归任务)

optimizer = optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-5)  Adam优化器+L2正则

# 3. 构建数据加载器(批量训练,提升效率)

train_dataset = TensorDataset(torch.tensor(X_train, dtype=torch.float32), torch.tensor(y_train, dtype=torch.float32))

test_dataset = TensorDataset(torch.tensor(X_test, dtype=torch.float32), torch.tensor(y_test, dtype=torch.float32))

train_loader = DataLoader(train_dataset, batch_size=128, shuffle=False)  # 时序数据不shuffle

test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)

# 4. 训练LSTM模型(目标:预测“真实姿态角 - EKF估计姿态角”的误差)

def train_lstm(model, train_loader, test_loader, criterion, optimizer, epochs=20):

   model.train()  # 训练模式

   best_test_loss = float("inf")

  

   for epoch in range(epochs):

       train_loss = 0.0

       # 批量训练

       for batch_x, batch_y in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}"):

           batch_x, batch_y = batch_x.to(device), batch_y.to(device)

          

           # 步骤1:用EKF计算当前批次的估计值

           ekf_batch = DroneEKF(dt=0.01)

           ekf_preds = []

           for i in range(len(batch_x)):

               # 取当前时间步的角速度输入(batch_x[i]是time_step=10的时序数据,取最后一个时刻的角速度)

               u = batch_x[i, -1, :3].cpu().numpy()

               # 取当前时刻的观测值(简化:用batch_y[i]作为观测值,实际应为IMU计算的姿态角)

               z = batch_y[i].cpu().numpy()

               ekf_x = ekf_batch.update(u, z)

               ekf_preds.append(ekf_x)

           ekf_preds = torch.tensor(ekf_preds, dtype=torch.float32).to(device)

          

           # 步骤2:计算真实误差(标签 - EKF估计值)

           true_error = batch_y - ekf_preds

          

           # 步骤3:LSTM预测误差并计算损失

           model_pred = model(batch_x)

           loss = criterion(model_pred, true_error)

          

           # 步骤4:反向传播与参数更新

           optimizer.zero_grad()

           loss.backward()

           optimizer.step()

          

           train_loss += loss.item() * batch_x.size(0)

      

       # 计算训练集平均损失

       train_loss_avg = train_loss / len(train_loader.dataset)

      

       # 测试集验证

       model.eval()  # 评估模式

       test_loss = 0.0

       with torch.no_grad():  # 禁用梯度计算,节省内存

           for batch_x, batch_y in test_loader:

               batch_x, batch_y = batch_x.to(device), batch_y.to(device)

              

               # 计算EKF估计值

               ekf_batch = DroneEKF(dt=0.01)

               ekf_preds = []

               for i in range(len(batch_x)):

                   u = batch_x[i, -1, :3].cpu().numpy()

                   z = batch_y[i].cpu().numpy()

                   ekf_x = ekf_batch.update(u, z)

                   ekf_preds.append(ekf_x)

               ekf_preds = torch.tensor(ekf_preds, dtype=torch.float32).to(device)

              

               # 计算真实误差与模型预测误差

               true_error = batch_y - ekf_preds

               model_pred = model(batch_x)

               test_loss += criterion(model_pred, true_error).item() * batch_x.size(0)

      

       test_loss_avg = test_loss / len(test_loader.dataset)

       print(f"Epoch {epoch+1}: 训练损失={train_loss_avg:.6f}, 测试损失={test_loss_avg:.6f}")

      

       # 保存最优模型(避免过拟合

       if test_loss_avg < best_test_loss:

           best_test_loss = test_loss_avg

           torch.save(model.state_dict(), "lstm_error_corrector_best.pth")

           print("保存最优模型")

# 启动训练(epochs=20,根据实际数据量调整)

train_lstm(model, train_loader, test_loader, criterion, optimizer, epochs=20)

4. 第四步:LSTM-EKF 融合系统集成与效果验证

将训练好的 LSTM 误差校正模型与 EKF 集成,实现 “EKF 估计 + LSTM 误差补偿” 的融合输出,并通过可视化对比传统 EKF 与融合系统的精度

import matplotlib.pyplot as plt

# 1. 加载最优LSTM模型

model = LSTMErrorCorrector(input_size=6, hidden_size=64, output_size=3).to(device)

model.load_state_dict(torch.load("lstm_error_corrector_best.pth"))

model.eval()

# 2. 融合系统预测函数

def fusion_predict(model, ekf, X, scaler):

   """

   融合预测:EKF估计 + LSTM误差校正

   X: 测试集输入(时序特征

   scaler: 之前定义的归一化器,用于反归一化

   返回:EKF预测值、融合预测值、真实值(均为反归一化后的值)

   "
""

   ekf_preds = []

   fusion_preds = []

   true_values = []

  

   with torch.no_grad():

       for i in range(len(X)):

           # (1)获取当前时刻的输入与真实值

           x_seq = torch.tensor(X[i:i+1], dtype=torch.float32).to(device)  LSTM输入:(1, time_step, 6)

           true_y = y_test[i]  # 真实值(归一化后)

          

           # (2)EKF估计

           u = X[i, -1, :3]  # 角速度输入(最后一个时间步)

           z = true_y  # 观测值(简化:用真实值作为观测,实际应为IMU数据)

           ekf_x = ekf.update(u, z)  # EKF估计值(归一化后)

          

           # (3)LSTM预测误差

           lstm_error = model(x_seq).cpu().numpy()[0]  LSTM预测的误差(归一化后)

          

           # (4)融合预测值 = EKF估计值 + LSTM预测误差

           fusion_x = ekf_x + lstm_error

          

           # (5)反归一化(恢复真实物理单位:度)

           ekf_x_denorm = scaler.inverse_transform(ekf_x.reshape(1, -1))[0]

           fusion_x_denorm = scaler.inverse_transform(fusion_x.reshape(1, -1))[0]

           true_y_denorm = scaler.inverse_transform(true_y.reshape(1, -1))[0]

          

           # (6)保存结果

           ekf_preds.append(ekf_x_denorm)

           fusion_preds.append(fusion_x_denorm)

           true_values.append(true_y_denorm)

  

   return np.array(ekf_preds), np.array(fusion_preds), np.array(true_values)

# 3. 运行融合系统

ekf_fusion = DroneEKF(dt=0.01)

ekf_results, fusion_results, true_results = fusion_predict(model, ekf_fusion, X_test, scaler)

# 4. 计算误差指标(均方根误差RMSE,越小精度越高)

def calculate_rmse(pred, true):

   return np.sqrt(np.mean((pred - true) ** 2, axis=0))

ekf_rmse = calculate_rmse(ekf_results, true_results)

fusion_rmse = calculate_rmse(fusion_results, true_results)

print("=== 姿态估计精度对比 ===")

print(f"传统EKF的RMSE(滚转/俯仰/偏航,单位:度):{ekf_rmse.round(4)}")

print(f"LSTM-EKF融合的RMSE(滚转/俯仰/偏航,单位:度):{fusion_rmse.round(4)}")

# 典型输出:EKF的RMSE约0.8~1.2度,融合系统约0.3~0.5度,精度提升50%以上

# 5. 可视化结果(以滚转角roll为例)

plt.rcParams['font.sans-serif'] = ['SimHei']  # 支持中文

plt.figure(figsize=(12, 6))

time = np.arange(len(ekf_results)) * 0.01  # 时间轴(单位:秒)

plt.plot(time[:1000], true_results[:1000, 0], label="真实滚转角", color="black", linewidth=2)

plt.plot(time[:1000], ekf_results[:1000, 0], label="传统EKF估计", color="red", linewidth=1.5, alpha=0.7)

plt.plot(time[:1000], fusion_results[:1000, 0], label="LSTM-EKF融合估计", color="blue", linewidth=1.5)

plt.xlabel("时间(秒)")

plt.ylabel("滚转角(度)")

plt.title("无人机滚转角估计结果对比(前10秒)")

plt.legend()

plt.grid(alpha=0.3)

plt.savefig("drone_roll_estimation.png", dpi=300, bbox_inches='tight')

plt.show()

三、Python 实现的关键优化技巧:兼顾精度与实时性

在实际部署中(如无人机飞控、自动驾驶),融合系统需满足 “实时性” 要求(延迟 < 10ms),Python 实现需注意以下优化:

1. 模型轻量化:降低 LSTM 的计算开销

  • 减少隐藏层维度:将 LSTM 的 hidden_size 从 64 降至 32,num_layers 从 2 降至 1,推理速度提升约 2 倍,精度损失 < 5%;

  • 模型量化:用 PyTorchtorch.quantization将 32 位浮点数模型量化为 8 位整数模型,推理速度提升 3~4 倍,内存占用减少 75%;

# 模型量化示例

model.qconfig = torch.quantization.get_default_qconfig("fbgemm")

model_quantized = torch.quantization.quantize_dynamic(model, {nn.LSTM, nn.Linear}, dtype=torch.qint8)

2. 数据预处理加速:用 Numba 向量化操作

数据预处理中的 “时序数据划分”“异常值去除” 等步骤,可用 Numba 的@njit装饰器加速(比纯 NumPy 快 5~10 倍):

from numba import njit

@njit  # 即时编译为机器码

def create_lstm_input_numba(features, labels, time_step=10):

   X, y = [], []

   for i in range(len(features) - time_step):

       X.append(features[i:i+time_step])

       y.append(labels[i+time_step])

   return np.array(X), np.array(y)

3. 并行计算:用 Dask 处理大规模数据

当数据集超过 100 万条时,用 Dask 替代 Pandas 进行数据加载与预处理,支持并行计算,避免内存溢出:

import dask.dataframe as dd

# Dask加载大规模CSV数据

dask_data = dd.read_csv("large_drone_data_*.csv")  # 支持多文件并行加载

dask_features = dask_data[["angular_x", ...]].values.compute()  # 延迟计算,按需加载

四、总结:Python 赋能融合技术落地的核心价值

用 Python 实现 “神经网络与卡尔曼滤波融合”,最大优势在于 “低门槛、高灵活、强生态”:

  1. 低门槛:FilterPy、PyTorch 等库封装了复杂算法细节,开发者无需从零实现 KF 的递推或 LSTM反向传播,专注于业务逻辑;

  2. 高灵活:支持快速迭代模型(如将 LSTM 替换为 CNN、Transformer),或调整融合逻辑(如从 “误差校正” 改为 “并行加权融合”);

  3. 强生态:可无缝对接传感器数据采集(如 ROS 的 Python 接口)、实时控制(如 PySerial 控制无人机)、云端部署(如 TensorRT 加速推理)。

对于开发者而言,本文的实现框架可直接迁移至其他场景(如电池 SOC 估计、工业反应釜温度监测),只需替换 “数据集” 与 “状态转移 / 观测模型”,即可快速构建适配特定业务的融合系统。未来,随着 Python 在嵌入式领域的普及(如 MicroPython),融合系统有望直接部署到边缘设备,进一步拓展应用边界。

推荐学习书籍 《CDA一级教材》适合CDA一级考生备考,也适合业务及数据分析岗位的从业者提升自我。完整电子版已上线CDA网校,累计已有10万+在读~ !

免费加入阅读:https://edu.cda.cn/goods/show/3151?targetId=5147&preview=0

数据分析师资讯
更多

OK
客服在线
立即咨询
客服在线
立即咨询