This commit is contained in:
mio 2026-01-21 10:42:40 +08:00
parent c44bbd9270
commit b62ee8223e
10 changed files with 50381 additions and 0 deletions

View File

@@ -39,6 +39,15 @@ setup(
        # 测试控制器 (对应 control/test_controller.py 中的 main 函数)
        'sine_controller = soft_arm_sim.control.test_controller:main',
        # 数据生成
        'generate_data = soft_arm_sim.deeplearning.dataset_generator:main',
        # 训练脚本 (可选,一般直接用 python 跑也行)
        'train_model = soft_arm_sim.deeplearning.train:train_model',
        # 推理控制节点
        'pinn_controller = soft_arm_sim.deeplearning.inference_node:main',
    ],
},
)

Binary file not shown.

View File

@ -0,0 +1,89 @@
import sys
import os
import numpy as np
import time
# --- Automatic path fix: add the project root to sys.path so the
# soft_arm_sim package resolves when this file is run as a plain script ---
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.abspath(os.path.join(current_dir, "../../"))
if project_root not in sys.path:
    sys.path.append(project_root)
from soft_arm_sim.model.pcc_kinematics import SoftArmKinematics
def generate_dataset(num_samples=50000, save_path=None, section_length=0.24, seed=None):
    """Generate a PCC soft-arm forward-kinematics dataset and save it as CSV.

    Samples random joint-space configurations (theta, phi) for a 3-section
    arm, computes each end-effector position via forward kinematics, and
    writes a 9-column table (x, y, z, theta1, phi1, theta2, phi2, theta3,
    phi3) to ``save_path``.

    Args:
        num_samples: Number of random configurations to generate.
        save_path: Output CSV path; defaults to ``pcc_dataset_3sec.csv``
            next to this file.
        section_length: Length of each arm section in meters (previously
            hard-coded as 0.24 in four places).
        seed: Optional NumPy RNG seed for reproducible datasets.
    """
    if seed is not None:
        np.random.seed(seed)  # reproducible sampling on request
    print(f"🚀 开始生成数据集,目标数量: {num_samples} ...")
    start_time = time.time()
    # 1. Kinematics model (3 sections, 3 disks per section)
    kinematics = SoftArmKinematics(
        num_sections=3,
        section_length=section_length,
        disks_per_section=3,
        disk_radius=0.033
    )
    # 2. Sampling ranges: bending angle theta and rotation phi
    theta_min, theta_max = 0.0, np.pi / 2.5
    phi_min, phi_max = -np.pi, np.pi
    # 3. Random joint-space samples.
    # Shape: (N, 6) -> [theta1, phi1, theta2, phi2, theta3, phi3]
    q_data = np.zeros((num_samples, 6))
    for i in range(3):
        q_data[:, i * 2] = np.random.uniform(theta_min, theta_max, num_samples)
        q_data[:, i * 2 + 1] = np.random.uniform(phi_min, phi_max, num_samples)
    # 4. Task-space end-effector positions.
    # Shape: (N, 3)
    pos_data = np.zeros((num_samples, 3))
    print("🔄 正在计算正运动学 (FK)...")
    for i in range(num_samples):
        # Build the per-section (theta, phi, length) input tuple list
        raw_q = q_data[i]
        q_input = [
            (raw_q[0], raw_q[1], section_length),
            (raw_q[2], raw_q[3], section_length),
            (raw_q[4], raw_q[5], section_length)
        ]
        # Forward kinematics
        transforms, _ = kinematics.forward(q_input)
        # End-effector position = translation column of the last transform
        pos_data[i] = transforms[-1][:3, 3]
        if (i + 1) % 10000 == 0:
            print(f" 进度: {i + 1}/{num_samples}")
    # 5. Save as CSV
    if save_path is None:
        save_path = os.path.join(current_dir, "pcc_dataset_3sec.csv")
    print("💾 正在保存 CSV 文件...")
    # Concatenate positions (3 cols) and angles (6 cols) -> 9 columns total.
    # Column order: x, y, z, theta1, phi1, theta2, phi2, theta3, phi3
    combined_data = np.hstack((pos_data, q_data))
    header_str = "x,y,z,theta1,phi1,theta2,phi2,theta3,phi3"
    np.savetxt(save_path, combined_data, delimiter=',', header=header_str, comments='', fmt='%.6f')
    duration = time.time() - start_time
    print("✅ 完成!")
    print(f" 耗时: {duration:.2f}")
    print(f" 保存路径: {save_path}")
def main():
    """Console entry point: generate the default 50k-sample dataset."""
    sample_count = 50000
    generate_dataset(num_samples=sample_count)


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,158 @@
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PoseStamped, Point
from std_msgs.msg import Float64MultiArray
from visualization_msgs.msg import Marker
import torch
import numpy as np
import time
import threading
import os
import sys
# --- Path fix: make the sibling pinn_model module and the project root
# importable when this node is launched outside the package context ---
current_dir = os.path.dirname(os.path.abspath(__file__))
if current_dir not in sys.path:
    sys.path.append(current_dir)
project_root = os.path.abspath(os.path.join(current_dir, "../../"))
if project_root not in sys.path:
    sys.path.append(project_root)
from pinn_model import PINN_IK
from soft_arm_sim.model.pcc_kinematics import SoftArmKinematics
class PinnController(Node):
    """ROS 2 node that drives the soft arm to a target pose via a PINN IK model.

    Subscribes to /goal_pose, linearly interpolates a Cartesian path from the
    current position to the target, runs the network per waypoint to obtain
    joint angles, and publishes them on soft_arm/command. RViz markers
    visualize the target point and the planned trajectory.
    """

    def __init__(self):
        super().__init__('pinn_controller')
        # 1. Load the model and the normalization statistics
        self.device = torch.device("cpu")
        self.model = PINN_IK().to(self.device)
        self.stats = None  # normalization parameters (mean/std), filled from the checkpoint
        model_path = os.path.join(current_dir, "best_model_v2.pth")
        if os.path.exists(model_path):
            checkpoint = torch.load(model_path, map_location=self.device)
            # model weights
            self.model.load_state_dict(checkpoint['model_state'])
            # normalization statistics saved alongside the weights by train.py
            self.stats = checkpoint['stats']
            self.model.eval()
            self.get_logger().info(f"✅ 模型 V2 (带归一化) 加载成功")
        else:
            self.get_logger().error(f"❌ 找不到模型: {model_path},请运行新的 train.py")
        # Analytic FK solver, used only to verify the reached position
        self.fk_solver = SoftArmKinematics(
            num_sections=3, section_length=0.24, disks_per_section=3, disk_radius=0.033
        )
        self.cmd_pub = self.create_publisher(Float64MultiArray, 'soft_arm/command', 10)
        self.traj_pub = self.create_publisher(Marker, 'planned_trajectory', 10)
        self.target_marker_pub = self.create_publisher(Marker, 'target_marker', 10)
        self.create_subscription(PoseStamped, '/goal_pose', self.goal_callback, 10)
        # assumed initial end-effector position (arm fully extended) — TODO confirm
        self.current_pos = np.array([0.0, 0.0, 0.72])
        self.get_logger().info("等待目标指令...")

    def goal_callback(self, msg):
        """Handle a new /goal_pose target: visualize it and start the move."""
        # NOTE(review): z is forced to 0.5 for testing; msg.pose.position.z is ignored
        target_pos = np.array([msg.pose.position.x, msg.pose.position.y, 0.5])
        dist = np.linalg.norm(target_pos)
        if dist > 0.72:
            self.get_logger().warn(f"⚠️ 目标点太远 ({dist:.2f}m),可能无法到达")
        self.get_logger().info(f"收到目标: {target_pos},开始规划...")
        self.visualize_target(target_pos)
        # Run the motion in a background thread so the callback returns quickly
        threading.Thread(target=self.execute_move, args=(target_pos,)).start()

    def preprocess_input(self, pos):
        """Normalize a real (x, y, z) position using the training statistics."""
        if self.stats is None: return pos
        X_mean = self.stats['X_mean']
        X_std = self.stats['X_std']
        return (pos - X_mean) / X_std

    def postprocess_output(self, norm_angles):
        """Denormalize network output back to real joint angles."""
        if self.stats is None: return norm_angles
        y_mean = self.stats['y_mean']
        y_std = self.stats['y_std']
        return norm_angles * y_std + y_mean

    def execute_move(self, target_pos):
        """Interpolate to target_pos in 50 steps, publishing IK angles per step."""
        steps = 50
        trajectory = []
        for t in np.linspace(0, 1, steps):
            pt = self.current_pos + t * (target_pos - self.current_pos)
            trajectory.append(pt)
        self.visualize_trajectory(trajectory)
        final_angles = None
        for pt in trajectory:
            with torch.no_grad():
                # 1. preprocess (normalize)
                norm_input = self.preprocess_input(pt)
                tensor_input = torch.FloatTensor(norm_input).to(self.device)
                # 2. inference
                norm_output = self.model(tensor_input).numpy()
                # 3. postprocess (denormalize)
                real_angles = self.postprocess_output(norm_output)
                final_angles = real_angles
            msg = Float64MultiArray()
            msg.data = real_angles.tolist()
            self.cmd_pub.publish(msg)
            time.sleep(0.04)  # ~25 Hz command rate
        self.current_pos = target_pos
        self.get_logger().info("运动完成")
        if final_angles is not None:
            self.check_accuracy(target_pos, final_angles)

    def check_accuracy(self, target_pos, angles):
        """Compare FK of the final commanded angles against the requested target."""
        q_input = []
        for i in range(3):
            q_input.append((angles[i*2], angles[i*2+1], 0.24))
        transforms, _ = self.fk_solver.forward(q_input)
        actual_pos = transforms[-1][:3, 3]
        error = np.linalg.norm(target_pos - actual_pos)
        print("\n" + "="*30)
        print(f"🎯 目标位置: {target_pos}")
        print(f"🤖 实际位置: {actual_pos}")
        print(f"❌ 误差: {error:.4f} m")
        print("="*30 + "\n")

    # ... visualization helpers unchanged ...
    def visualize_target(self, pos):
        """Publish a red sphere marker at the target position."""
        marker = Marker()
        marker.header.frame_id = "base_link"
        marker.type = Marker.SPHERE; marker.action = Marker.ADD
        marker.scale.x = 0.05; marker.scale.y = 0.05; marker.scale.z = 0.05
        marker.color.a = 1.0; marker.color.r = 1.0; marker.color.g = 0.0; marker.color.b = 0.0
        marker.pose.position.x = float(pos[0]); marker.pose.position.y = float(pos[1]); marker.pose.position.z = float(pos[2])
        self.target_marker_pub.publish(marker)

    def visualize_trajectory(self, points):
        """Publish the planned waypoints as a yellow SPHERE_LIST marker."""
        marker = Marker()
        marker.header.frame_id = "base_link"
        marker.type = Marker.SPHERE_LIST; marker.action = Marker.ADD
        marker.scale.x = 0.01; marker.scale.y = 0.01; marker.scale.z = 0.01
        marker.color.a = 1.0; marker.color.r = 1.0; marker.color.g = 1.0
        for p in points:
            pt = Point(); pt.x, pt.y, pt.z = float(p[0]), float(p[1]), float(p[2]); marker.points.append(pt)
        self.traj_pub.publish(marker)
def main(args=None):
    """Initialize rclpy, spin the PinnController node, and clean up on exit."""
    rclpy.init(args=args)
    controller = PinnController()
    rclpy.spin(controller)
    controller.destroy_node()
    rclpy.shutdown()


if __name__ == "__main__":
    main()

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,21 @@
import torch
import torch.nn as nn
class PINN_IK(nn.Module):
    """Fully connected network mapping a task-space position to joint angles.

    Used as the inverse-kinematics approximator: the input is a (normalized)
    (x, y, z) position and the output is the 6 (normalized) joint parameters
    (theta/phi per section).

    Args:
        input_dim: Size of the input vector (default 3: x, y, z).
        output_dim: Size of the output vector (default 6 joint parameters).
        hidden_dims: Widths of the hidden layers. The default reproduces the
            original hard-coded 128-256-256-128 architecture, so existing
            checkpoints load unchanged (same ``net.N`` state-dict keys).
    """

    def __init__(self, input_dim=3, output_dim=6, hidden_dims=(128, 256, 256, 128)):
        super(PINN_IK, self).__init__()
        # Build Linear+Tanh pairs for every hidden width, then a linear head.
        layers = []
        prev = input_dim
        for width in hidden_dims:
            layers.append(nn.Linear(prev, width))
            layers.append(nn.Tanh())
            prev = width
        layers.append(nn.Linear(prev, output_dim))
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Run the MLP; accepts (..., input_dim), returns (..., output_dim)."""
        return self.net(x)

View File

@ -0,0 +1,103 @@
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import numpy as np
import os
import sys
from torch.utils.data import DataLoader, TensorDataset
# --- Path fix: make the sibling pinn_model module importable when this
# file is run directly as a script ---
current_dir = os.path.dirname(os.path.abspath(__file__))
if current_dir not in sys.path:
    sys.path.append(current_dir)
from pinn_model import PINN_IK
def train_model():
    """Train the PINN_IK network on the generated FK dataset.

    Loads pcc_dataset_3sec.csv (columns: x, y, z -> 6 joint angles),
    standardizes inputs and targets, trains for 300 epochs with AdamW and a
    StepLR schedule, and saves the weights together with the normalization
    statistics to best_model_v2.pth so inference can apply the same scaling.
    """
    # 1. Paths (dataset and checkpoint live next to this file)
    csv_path = os.path.join(current_dir, "pcc_dataset_3sec.csv")
    model_save_path = os.path.join(current_dir, "best_model_v2.pth")
    if not os.path.exists(csv_path):
        print("❌ 错误:找不到数据集")
        return
    # 2. Load the data
    print("正在加载数据...")
    df = pd.read_csv(csv_path)
    # Split features / targets
    X_raw = df.iloc[:, 0:3].values.astype(np.float32)  # x, y, z
    y_raw = df.iloc[:, 3:].values.astype(np.float32)  # angles
    # --- Key step: standardization ---
    # Per-column mean and standard deviation
    X_mean = X_raw.mean(axis=0)
    X_std = X_raw.std(axis=0) + 1e-6  # small epsilon guards against divide-by-zero
    y_mean = y_raw.mean(axis=0)
    y_std = y_raw.std(axis=0) + 1e-6
    # Normalization: (x - mean) / std
    X_norm = (X_raw - X_mean) / X_std
    y_norm = (y_raw - y_mean) / y_std
    print("数据归一化完成。")
    print(f"输入 Mean: {X_mean}, Std: {X_std}")
    # 3. Tensors and DataLoader
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Training on: {device}")
    dataset = TensorDataset(torch.from_numpy(X_norm), torch.from_numpy(y_norm))
    # Larger batch size for more stable gradients
    loader = DataLoader(dataset, batch_size=256, shuffle=True)
    # 4. Model, loss, optimizer
    model = PINN_IK().to(device)
    criterion = nn.MSELoss()
    # AdamW usually generalizes a bit better than plain Adam here
    optimizer = optim.AdamW(model.parameters(), lr=0.001)
    # LR decay: halve the learning rate every 100 epochs
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=100, gamma=0.5)
    # 5. Training loop (300 epochs)
    epochs = 300
    print(f"开始训练 {epochs} 个 Epoch...")
    for epoch in range(epochs):
        total_loss = 0
        model.train()
        for batch_x, batch_y in loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            optimizer.zero_grad()
            outputs = model(batch_x)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        scheduler.step()
        if (epoch+1) % 10 == 0:
            avg_loss = total_loss / len(loader)
            current_lr = optimizer.param_groups[0]['lr']
            print(f"Epoch [{epoch+1}/{epochs}] | Loss: {avg_loss:.6f} | LR: {current_lr:.6f}")
    # 6. Save model weights + normalization stats in a single dict so the
    #    inference node can reproduce exactly the same scaling.
    # NOTE(review): despite the "best_model" filename, this saves the FINAL
    # epoch's weights — no validation-based model selection is performed.
    save_dict = {
        'model_state': model.state_dict(),
        'stats': {
            'X_mean': X_mean, 'X_std': X_std,
            'y_mean': y_mean, 'y_std': y_std
        }
    }
    torch.save(save_dict, model_save_path)
    print(f"✅ 模型及元数据已保存至: {model_save_path}")
if __name__ == "__main__":
    # Allow running this module directly as a training script.
    train_model()