mio 2026-01-21 11:05:41 +08:00
parent b62ee8223e
commit 68cc018828
9 changed files with 250152 additions and 169 deletions

View File

@ -3,7 +3,6 @@ import os
import numpy as np
import time
# --- Automatic path fix ---
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.abspath(os.path.join(current_dir, "../../"))
if project_root not in sys.path:
@ -11,79 +10,67 @@ if project_root not in sys.path:
from soft_arm_sim.model.pcc_kinematics import SoftArmKinematics
def generate_dataset(num_samples=50000, save_path=None):
"""
Generate PCC arm forward-kinematics data and save it as CSV
"""
print(f"🚀 Generating dataset, target count: {num_samples} ...")
def generate_dataset(num_samples=150000, save_path=None): # increased to 150k
print(f"🚀 Generating dataset (smoothness-constrained version), target: {num_samples} ...")
start_time = time.time()
# 1. Initialize the kinematics model
kinematics = SoftArmKinematics(
num_sections=3,
section_length=0.24,
disks_per_section=3,
disk_radius=0.033
)
kinematics = SoftArmKinematics(3, 0.24, 3, 0.033)
# 2. Sampling ranges
theta_min, theta_max = 0.0, np.pi / 2.5
phi_min, phi_max = -np.pi, np.pi
# 1. Generate coordinated poses
# Instead of letting the three sections bend fully independently, bias them toward the same direction
# This mimics the natural "minimum-energy" configuration
# 3. Randomly sample the joint space (theta, phi)
# Shape: (N, 6) -> [theta1, phi1, theta2, phi2, theta3, phi3]
q_data = np.zeros((num_samples, 6))
for i in range(3):
q_data[:, i*2] = np.random.uniform(theta_min, theta_max, num_samples)
q_data[:, i*2+1] = np.random.uniform(phi_min, phi_max, num_samples)
y_data = np.zeros((num_samples, 9)) # [t1, c1, s1, t2, c2, s2, t3, c3, s3]
# 4. Compute the task space (x, y, z)
# Shape: (N, 3)
# --- Strategy: global trend + local perturbation ---
# Sample a "main bend angle" and a "main bend direction"
main_theta = np.random.uniform(0.0, np.pi/2.2, num_samples)
main_phi = np.random.uniform(-np.pi, np.pi, num_samples)
for i in range(3): # 3 sections
# Each section's bend fluctuates around the main bend angle
# The resulting configurations are smooth C-shapes or mild S-shapes, not wildly tangled ones
# This "smoothness prior" gives the inverse kinematics a unique solution
noise_theta = np.random.normal(0, 0.1, num_samples) # small noise
noise_phi = np.random.normal(0, 0.2, num_samples) # small noise
thetas = np.clip(main_theta + noise_theta, 0, np.pi/2)
phis = main_phi + noise_phi # allow phi to deviate slightly
# Fill in the labels
y_data[:, i*3 + 0] = thetas
y_data[:, i*3 + 1] = np.cos(phis)
y_data[:, i*3 + 2] = np.sin(phis)
# Stash for the FK pass
if i == 0: t1, p1 = thetas, phis
if i == 1: t2, p2 = thetas, phis
if i == 2: t3, p3 = thetas, phis
# 2. Compute FK
pos_data = np.zeros((num_samples, 3))
print("🔄 Computing forward kinematics (FK)...")
print("🔄 Computing forward kinematics...")
for i in range(num_samples):
# Build the FK input
raw_q = q_data[i]
q_input = [
(raw_q[0], raw_q[1], 0.24),
(raw_q[2], raw_q[3], 0.24),
(raw_q[4], raw_q[5], 0.24)
(t1[i], p1[i], 0.24),
(t2[i], p2[i], 0.24),
(t3[i], p3[i], 0.24)
]
# Solve the forward kinematics
transforms, _ = kinematics.forward(q_input)
# Take the end-effector position
pos_data[i] = transforms[-1][:3, 3]
if (i + 1) % 10000 == 0:
print(f" Progress: {i + 1}/{num_samples}")
if (i+1) % 10000 == 0: print(f" Progress: {i+1}/{num_samples}")
# 5. Save as CSV
# 3. Save
if save_path is None:
save_path = os.path.join(current_dir, "pcc_dataset_3sec.csv")
save_path = os.path.join(current_dir, "pcc_dataset_smooth.csv") # renamed to tell the datasets apart
print(f"💾 Saving CSV file...")
combined_data = np.hstack((pos_data, y_data))
header = "x,y,z,t1,c1,s1,t2,c2,s2,t3,c3,s3"
np.savetxt(save_path, combined_data, delimiter=',', header=header, comments='', fmt='%.6f')
# Concatenate positions (3 cols) and angles (6 cols) -> 9 columns in total
# Column order: x, y, z, theta1, phi1, theta2, phi2, theta3, phi3
combined_data = np.hstack((pos_data, q_data))
# Define the header
header_str = "x,y,z,theta1,phi1,theta2,phi2,theta3,phi3"
# Save
np.savetxt(save_path, combined_data, delimiter=',', header=header_str, comments='', fmt='%.6f')
duration = time.time() - start_time
print(f"✅ Done!")
print(f" Elapsed: {duration:.2f} s")
print(f" Saved to: {save_path}")
def main():
generate_dataset(num_samples=50000)
print(f"✅ Done! Elapsed: {time.time()-start_time:.2f}s")
if __name__ == "__main__":
main()
generate_dataset()
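For reference, a minimal standalone sketch of the sampling strategy used above: each section's (theta, phi) is the shared global trend plus a small Gaussian perturbation, and phi is stored as (cos, sin) so the ±pi wrap never appears in the labels. Function and variable names below are illustrative, not part of the commit.

import numpy as np

def sample_smooth_labels(n, rng=np.random.default_rng(0)):
    # one global bend angle / direction per sample, shared by all three sections
    main_theta = rng.uniform(0.0, np.pi / 2.2, n)
    main_phi = rng.uniform(-np.pi, np.pi, n)
    y = np.zeros((n, 9))  # [t1, c1, s1, t2, c2, s2, t3, c3, s3]
    for i in range(3):
        thetas = np.clip(main_theta + rng.normal(0, 0.1, n), 0, np.pi / 2)
        phis = main_phi + rng.normal(0, 0.2, n)
        y[:, i * 3 + 0] = thetas
        y[:, i * 3 + 1] = np.cos(phis)
        y[:, i * 3 + 2] = np.sin(phis)
    return y

labels = sample_smooth_labels(5)
phi1 = np.arctan2(labels[:, 2], labels[:, 1])  # (cos, sin) decodes back to phi without ambiguity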

View File

@ -25,63 +25,80 @@ class PinnController(Node):
def __init__(self):
super().__init__('pinn_controller')
# 1. Load the model and statistics
# 1. Load the model
self.device = torch.device("cpu")
self.model = PINN_IK().to(self.device)
self.stats = None # stores the normalization parameters
# Note: output_dim must be 9 here (trig-encoded version)
self.model = PINN_IK(output_dim=9).to(self.device)
self.stats = None
model_path = os.path.join(current_dir, "best_model_v2.pth")
# Make sure this filename matches whatever train.py saved last
# If the last run produced best_model_smooth.pth, use that
model_path = os.path.join(current_dir, "best_model_smooth.pth")
if os.path.exists(model_path):
checkpoint = torch.load(model_path, map_location=self.device)
# =======================================================
# 🔧 Fix: add weights_only=False
# =======================================================
checkpoint = torch.load(model_path, map_location=self.device, weights_only=False)
# Load the weights
self.model.load_state_dict(checkpoint['model_state'])
# Load the normalization parameters
self.stats = checkpoint['stats']
self.model.eval()
self.get_logger().info(f"✅ Model V2 (with normalization) loaded successfully")
self.get_logger().info(f"✅ Model loaded successfully: {model_path}")
else:
self.get_logger().error(f"❌ Model not found: {model_path}, please run the new train.py")
self.get_logger().error(f"❌ Model not found: {model_path}")
self.fk_solver = SoftArmKinematics(
num_sections=3, section_length=0.24, disks_per_section=3, disk_radius=0.033
)
# FK solver used for verification
self.fk_solver = SoftArmKinematics(3, 0.24, 3, 0.033)
# 2. Communication interfaces
self.cmd_pub = self.create_publisher(Float64MultiArray, 'soft_arm/command', 10)
self.traj_pub = self.create_publisher(Marker, 'planned_trajectory', 10)
self.target_marker_pub = self.create_publisher(Marker, 'target_marker', 10)
self.create_subscription(PoseStamped, '/goal_pose', self.goal_callback, 10)
self.current_pos = np.array([0.0, 0.0, 0.72])
self.get_logger().info("Waiting for a target command...")
self.get_logger().info("Waiting for a target command... (click a goal in Rviz)")
def decode_output(self, raw_output):
"""
Decode: [t1, cos1, sin1, ...] -> [t1, phi1, ...]
"""
# De-normalize
y_mean = self.stats['y_mean']
y_std = self.stats['y_std']
real_vals = raw_output * y_std + y_mean
final_angles = []
for i in range(3):
theta = real_vals[i*3 + 0]
cos_v = real_vals[i*3 + 1]
sin_v = real_vals[i*3 + 2]
# Trig decoding: phi = atan2(sin, cos)
phi = np.arctan2(sin_v, cos_v)
final_angles.append(theta)
final_angles.append(phi)
return np.array(final_angles)
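As an aside, a quick illustrative check of why the atan2 decoding in decode_output is convenient: the network's (cos, sin) pair does not have to land exactly on the unit circle, because any positive common scaling cancels inside atan2. The numbers below are made up purely for the check.

import numpy as np

phi_true = 2.5
cos_hat, sin_hat = 0.9 * np.cos(phi_true), 0.9 * np.sin(phi_true)  # off-scale but same direction
phi_dec = np.arctan2(sin_hat, cos_hat)
assert np.isclose(phi_dec, phi_true)  # the common factor 0.9 cancels in atan2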
def goal_callback(self, msg):
target_pos = np.array([msg.pose.position.x, msg.pose.position.y, 0.5]) # force Z=0.5 for testing
# Force Z=0.5 for testing, so the point never lands on the ground plane
target_pos = np.array([msg.pose.position.x, msg.pose.position.y, 0.5])
dist = np.linalg.norm(target_pos)
if dist > 0.72:
self.get_logger().warn(f"⚠️ Target too far ({dist:.2f} m), may be unreachable")
if dist > 0.75:
self.get_logger().warn(f"Target too far ({dist:.2f} m), ignoring")
return
self.get_logger().info(f"Received target: {target_pos}, planning...")
self.get_logger().info(f"Target: {target_pos}, planning")
self.visualize_target(target_pos)
threading.Thread(target=self.execute_move, args=(target_pos,)).start()
def preprocess_input(self, pos):
""" Normalize a real-world position (x, y, z) """
if self.stats is None: return pos
X_mean = self.stats['X_mean']
X_std = self.stats['X_std']
return (pos - X_mean) / X_std
def postprocess_output(self, norm_angles):
""" Convert normalized angles back to real angles """
if self.stats is None: return norm_angles
y_mean = self.stats['y_mean']
y_std = self.stats['y_std']
return norm_angles * y_std + y_mean
def execute_move(self, target_pos):
# Interpolated path planning
steps = 50
trajectory = []
for t in np.linspace(0, 1, steps):
@ -89,29 +106,33 @@ class PinnController(Node):
trajectory.append(pt)
self.visualize_trajectory(trajectory)
final_angles = None
final_angles_decoded = None
for pt in trajectory:
with torch.no_grad():
# 1. Preprocess (normalize)
norm_input = self.preprocess_input(pt)
# Preprocess the input (normalize)
X_mean = self.stats['X_mean']; X_std = self.stats['X_std']
norm_input = (pt - X_mean) / X_std
tensor_input = torch.FloatTensor(norm_input).to(self.device)
# 2. Inference
# Inference
norm_output = self.model(tensor_input).numpy()
# 3. Postprocess (de-normalize)
real_angles = self.postprocess_output(norm_output)
final_angles = real_angles
# Decode the output
real_angles = self.decode_output(norm_output)
final_angles_decoded = real_angles
# Publish the command
msg = Float64MultiArray()
msg.data = real_angles.tolist()
self.cmd_pub.publish(msg)
time.sleep(0.04)
time.sleep(0.04) # throttle the motion speed
self.current_pos = target_pos
self.get_logger().info("Motion complete")
if final_angles is not None:
self.check_accuracy(target_pos, final_angles)
# Verify accuracy
if final_angles_decoded is not None:
self.check_accuracy(target_pos, final_angles_decoded)
def check_accuracy(self, target_pos, angles):
q_input = []
@ -121,13 +142,8 @@ class PinnController(Node):
actual_pos = transforms[-1][:3, 3]
error = np.linalg.norm(target_pos - actual_pos)
print("\n" + "="*30)
print(f"🎯 Target position: {target_pos}")
print(f"🤖 Actual position: {actual_pos}")
print(f"❌ Error: {error:.4f} m")
print("="*30 + "\n")
print(f"\n>>> Accuracy check <<<\nTarget: {target_pos}\nActual: {actual_pos}\nError: {error:.4f} m (should be small when the loss is low)\n")
# ... visualize functions unchanged ...
def visualize_target(self, pos):
marker = Marker()
marker.header.frame_id = "base_link"

File diff suppressed because it is too large

File diff suppressed because it is too large

View File

@ -2,19 +2,19 @@ import torch
import torch.nn as nn
class PINN_IK(nn.Module):
def __init__(self, input_dim=3, output_dim=6):
def __init__(self, input_dim=3, output_dim=9): # output changed to 9
super(PINN_IK, self).__init__()
# Upgrade to a deeper, wider network
# Increase network depth and width for more fitting capacity
self.net = nn.Sequential(
nn.Linear(input_dim, 128),
nn.Tanh(),
nn.Linear(128, 256),
nn.Tanh(),
nn.Linear(256, 256),
nn.Tanh(),
nn.Linear(256, 128),
nn.Tanh(),
nn.Linear(128, output_dim)
nn.Linear(input_dim, 256),
nn.LeakyReLU(0.1), # LeakyReLU to avoid vanishing gradients
nn.Linear(256, 512),
nn.LeakyReLU(0.1),
nn.Linear(512, 512),
nn.LeakyReLU(0.1),
nn.Linear(512, 256),
nn.LeakyReLU(0.1),
nn.Linear(256, output_dim)
)
def forward(self, x):

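A small shape and size check of the widened network, assuming forward simply applies self.net (the body is cut off by the diff context): inputs are normalized (x, y, z) targets, outputs are the 9 trig-encoded labels, and the new layer widths come to roughly 0.53M parameters versus about 0.13M for the old Tanh network.

import torch
from pinn_model import PINN_IK

model = PINN_IK(input_dim=3, output_dim=9)
x = torch.randn(4, 3)                               # a batch of 4 normalized targets
print(model(x).shape)                               # torch.Size([4, 9])
print(sum(p.numel() for p in model.parameters()))   # roughly 0.53M trainable parameters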
View File

@ -7,72 +7,55 @@ import os
import sys
from torch.utils.data import DataLoader, TensorDataset
# --- Path fix ---
current_dir = os.path.dirname(os.path.abspath(__file__))
if current_dir not in sys.path:
sys.path.append(current_dir)
from pinn_model import PINN_IK
def train_model():
# 1. Paths
csv_path = os.path.join(current_dir, "pcc_dataset_3sec.csv")
model_save_path = os.path.join(current_dir, "best_model_v2.pth")
csv_path = os.path.join(current_dir, "pcc_dataset_smooth.csv") # new filename
model_save_path = os.path.join(current_dir, "best_model_smooth.pth")
if not os.path.exists(csv_path):
print("Error: dataset not found")
print("Please run the new dataset_generator.py first")
return
# 2. Load the data
print("Loading data...")
print("Loading the data...")
df = pd.read_csv(csv_path)
# Split inputs and outputs
X_raw = df.iloc[:, 0:3].values.astype(np.float32) # x, y, z
y_raw = df.iloc[:, 3:].values.astype(np.float32) # angles
# First 3 columns are the inputs (x, y, z)
X_raw = df.iloc[:, 0:3].values.astype(np.float32)
# Remaining 9 columns are the labels (t1, c1, s1, ...)
y_raw = df.iloc[:, 3:].values.astype(np.float32)
# --- Key step: data standardization ---
# Compute mean and standard deviation
X_mean = X_raw.mean(axis=0)
X_std = X_raw.std(axis=0) + 1e-6 # tiny epsilon to avoid division by zero
# --- Normalization ---
# The cos/sin labels already lie in [-1, 1], so they don't strictly need normalization,
# but theta does. For simplicity, normalize everything and de-normalize at inference time.
X_mean = X_raw.mean(axis=0); X_std = X_raw.std(axis=0) + 1e-6
y_mean = y_raw.mean(axis=0); y_std = y_raw.std(axis=0) + 1e-6
y_mean = y_raw.mean(axis=0)
y_std = y_raw.std(axis=0) + 1e-6
# Normalization: (x - mean) / std
X_norm = (X_raw - X_mean) / X_std
y_norm = (y_raw - y_mean) / y_std
print("Data normalization complete.")
print(f"Input Mean: {X_mean}, Std: {X_std}")
# 3. Prepare tensors
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Training on: {device}")
print(f"Device: {device}")
dataset = TensorDataset(torch.from_numpy(X_norm), torch.from_numpy(y_norm))
# Larger batch size for more stable gradients
loader = DataLoader(dataset, batch_size=256, shuffle=True)
loader = DataLoader(dataset, batch_size=512, shuffle=True) # large batch size
# 4. Initialize the model
model = PINN_IK().to(device)
model = PINN_IK(output_dim=9).to(device)
criterion = nn.MSELoss()
# Use the AdamW optimizer, which usually works better
optimizer = optim.AdamW(model.parameters(), lr=0.001)
# LR decay schedule: multiply the LR by 0.5 every 100 epochs
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=100, gamma=0.5)
optimizer = optim.AdamW(model.parameters(), lr=0.0005) # slightly lower initial LR
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=10, factor=0.5)
# 5. Training loop (epochs increased to 300)
epochs = 300
print(f"Training for {epochs} epochs...")
epochs = 200
print("Starting training...")
for epoch in range(epochs):
total_loss = 0
model.train()
for batch_x, batch_y in loader:
batch_x, batch_y = batch_x.to(device), batch_y.to(device)
optimizer.zero_grad()
outputs = model(batch_x)
loss = criterion(outputs, batch_y)
@ -80,24 +63,19 @@ def train_model():
optimizer.step()
total_loss += loss.item()
scheduler.step()
avg_loss = total_loss / len(loader)
scheduler.step(avg_loss) # adjust the LR based on the loss
if (epoch+1) % 10 == 0:
avg_loss = total_loss / len(loader)
current_lr = optimizer.param_groups[0]['lr']
print(f"Epoch [{epoch+1}/{epochs}] | Loss: {avg_loss:.6f} | LR: {current_lr:.6f}")
lr = optimizer.param_groups[0]['lr']
print(f"Epoch [{epoch+1}/{epochs}] | Loss: {avg_loss:.6f} | LR: {lr:.6f}")
# 6. Save the model + normalization parameters
# Save a dict containing the model weights and the dataset statistics
save_dict = {
'model_state': model.state_dict(),
'stats': {
'X_mean': X_mean, 'X_std': X_std,
'y_mean': y_mean, 'y_std': y_std
}
'stats': {'X_mean': X_mean, 'X_std': X_std, 'y_mean': y_mean, 'y_std': y_std}
}
torch.save(save_dict, model_save_path)
print(f"✅ Model and metadata saved to: {model_save_path}")
print("✅ Training complete")
if __name__ == "__main__":
train_model()
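For context on the normalization convention the training loop relies on: inputs and labels are z-scored with the dataset statistics, and those same statistics travel inside the checkpoint so the controller can invert them. A minimal round-trip sketch with placeholder data and illustrative names:

import numpy as np

X_raw = np.random.uniform(-0.5, 0.7, size=(1000, 3)).astype(np.float32)  # placeholder positions
X_mean, X_std = X_raw.mean(axis=0), X_raw.std(axis=0) + 1e-6
X_norm = (X_raw - X_mean) / X_std          # what the network sees during training
X_back = X_norm * X_std + X_mean           # what the controller reconstructs at inference time
assert np.allclose(X_back, X_raw, atol=1e-4)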