|
@@ -64,56 +64,62 @@ class MoveIt_Control:
|
|
|
self.go_home()
|
|
|
self.home_pose=self.get_now_pose()
|
|
|
|
|
|
- # 关节规划,输入6个关节角度(单位:弧度)
|
|
|
- def move_j(self, joint_configuration=None,a=1,v=1):
|
|
|
- # 设置机械臂的目标位置,使用六轴的位置数据进行描述(单位:弧度)
|
|
|
- if joint_configuration==None:
|
|
|
- joint_configuration = [0, 0, 0, 0, 0, 0]
|
|
|
- self.arm.set_max_acceleration_scaling_factor(a)
|
|
|
- self.arm.set_max_velocity_scaling_factor(v)
|
|
|
- self.arm.set_joint_value_target(joint_configuration)
|
|
|
- rospy.loginfo("move_j:"+str(joint_configuration))
|
|
|
- self.arm.go()
|
|
|
- #rospy.sleep(1)
|
|
|
-
|
|
|
|
|
|
- def plan_cartesian_path_LLL(self,point,points,trajectory,trajectory_with_type,a=1,v=1):
|
|
|
+ def plan_cartesian_path_LLL(self,point_s,point_e,waypoints,trajectory,trajectory_with_type,a=1,v=1):
|
|
|
fraction = 0.0 # 路径规划覆盖率
|
|
|
- maxtries = 100 # 最大尝试规划次数
|
|
|
+ maxtries = 50 # 最大尝试规划次数
|
|
|
attempts = 0 # 已经尝试规划次数
|
|
|
|
|
|
- self.arm.set_max_acceleration_scaling_factor(a)
|
|
|
- self.arm.set_max_velocity_scaling_factor(v)
|
|
|
-
|
|
|
- waypoints=[]
|
|
|
- if points:
|
|
|
#起点位置设置为规划组最后一个点
|
|
|
- pose = self.arm.get_current_pose(self.end_effector_link).pose
|
|
|
- wpose = deepcopy(pose)
|
|
|
- wpose.position.x=points[-1][0]
|
|
|
- wpose.position.y=points[-1][1]
|
|
|
- wpose.position.z=points[-1][2]
|
|
|
- wpose.orientation.x=points[-1][3]
|
|
|
- wpose.orientation.y=points[-1][4]
|
|
|
- wpose.orientation.z=points[-1][5]
|
|
|
- wpose.orientation.w=points[-1][6]
|
|
|
- waypoints.append(deepcopy(wpose))
|
|
|
-
|
|
|
- wpose.position.x=point[0]
|
|
|
- wpose.position.y=point[1]
|
|
|
- wpose.position.z=point[2]
|
|
|
- wpose.orientation.x=point[3]
|
|
|
- wpose.orientation.y=point[4]
|
|
|
- wpose.orientation.z=point[5]
|
|
|
- wpose.orientation.w=point[6]
|
|
|
- waypoints.append(deepcopy(wpose))
|
|
|
- else:
|
|
|
- rospy.loginfo("error empty trajectory")
|
|
|
- return
|
|
|
+ pose = self.arm.get_current_pose(self.end_effector_link).pose
|
|
|
+ wpose = deepcopy(pose)
|
|
|
+ waypoint=[]
|
|
|
+ print(waypoint)
|
|
|
+ if waypoints:
|
|
|
+ wpose.position.x=waypoints[-1].position.x
|
|
|
+ wpose.position.y=waypoints[-1].position.y
|
|
|
+ wpose.position.z=waypoints[-1].position.z
|
|
|
+ wpose.orientation.x=waypoints[-1].orientation.x
|
|
|
+ wpose.orientation.y=waypoints[-1].orientation.y
|
|
|
+ wpose.orientation.z=waypoints[-1].orientation.z
|
|
|
+ wpose.orientation.w=waypoints[-1].orientation.w
|
|
|
+ waypoint.append(deepcopy(wpose))
|
|
|
+ # else:
|
|
|
+ # wpose.position.x=self.home_pose[0]
|
|
|
+ # wpose.position.y=self.home_pose[1]
|
|
|
+ # wpose.position.z=self.home_pose[2]
|
|
|
+ # wpose.orientation.x=self.home_pose[3]
|
|
|
+ # wpose.orientation.y=self.home_pose[4]
|
|
|
+ # wpose.orientation.z=self.home_pose[5]
|
|
|
+ # wpose.orientation.w=self.home_pose[6]
|
|
|
+ # waypoint.append(deepcopy(wpose))
|
|
|
+ #waypoints.append(deepcopy(wpose))
|
|
|
+
|
|
|
+ #wpose = deepcopy(pose)
|
|
|
+ wpose.position.x=point_s[0]
|
|
|
+ wpose.position.y=point_s[1]
|
|
|
+ wpose.position.z=point_s[2]
|
|
|
+ wpose.orientation.x=point_s[3]
|
|
|
+ wpose.orientation.y=point_s[4]
|
|
|
+ wpose.orientation.z=point_s[5]
|
|
|
+ wpose.orientation.w=point_s[6]
|
|
|
+ waypoint.append(deepcopy(wpose))
|
|
|
+ waypoints.append(deepcopy(wpose))
|
|
|
+
|
|
|
+ wpose.position.x=point_e[0]
|
|
|
+ wpose.position.y=point_e[1]
|
|
|
+ wpose.position.z=point_e[2]
|
|
|
+ wpose.orientation.x=point_e[3]
|
|
|
+ wpose.orientation.y=point_e[4]
|
|
|
+ wpose.orientation.z=point_e[5]
|
|
|
+ wpose.orientation.w=point_e[6]
|
|
|
+ waypoint.append(deepcopy(wpose))
|
|
|
+ waypoints.append(deepcopy(wpose))
|
|
|
+
|
|
|
# 尝试规划一条笛卡尔空间下的路径,依次通过所有路点
|
|
|
while fraction < 1.0 and attempts < maxtries:
|
|
|
(plan, fraction) = self.arm.compute_cartesian_path(
|
|
|
- waypoints, # waypoint poses,路点列表
|
|
|
+ waypoint, # waypoint poses,路点列表
|
|
|
0.001, # eef_step,终端步进值
|
|
|
0.0, # jump_threshold,跳跃阈值
|
|
|
True) # avoid_collisions,避障规划
|
|
@@ -126,11 +132,10 @@ class MoveIt_Control:
|
|
|
trajj_with_type = mark_the_traj(trajj,"during-l",welding_sequence)
|
|
|
trajj_with_type.points[-len(trajj.joint_trajectory.points)].type = "start"
|
|
|
trajj_with_type.points[-1].type = "end"
|
|
|
- points.append(point)
|
|
|
trajectory.append(trajj)
|
|
|
+ moveit_server.arm.execute(trajj)
|
|
|
trajectory_with_type.append(trajj_with_type)
|
|
|
else:
|
|
|
- rospy.loginfo(plan[3].val)
|
|
|
rospy.loginfo(
|
|
|
"fraction=" + str(fraction) + " success after " + str(maxtries))
|
|
|
|
|
@@ -140,7 +145,7 @@ class MoveIt_Control:
|
|
|
# trajectory_with_type.pop()
|
|
|
# self.hf_fail.append(welding_sequence[self.hf_num])#将焊缝添加到失败列表
|
|
|
#self.hf_num=self.hf_num+1#焊缝计数
|
|
|
- return points,trajectory,trajectory_with_type
|
|
|
+ return waypoints,trajectory,trajectory_with_type
|
|
|
|
|
|
|
|
|
def get_now_pose(self):
|
|
@@ -266,154 +271,9 @@ class MoveIt_Control:
|
|
|
|
|
|
return points,trajectory,trajectory_with_type,er
|
|
|
|
|
|
- def create_path_constraints3(self,start_point,end_point):#创建路径约束
|
|
|
- ocm=moveit_msgs.msg.OrientationConstraint()
|
|
|
- ocm_constraint=moveit_msgs.msg.Constraints()
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- #计算起点指向终点的向量
|
|
|
- vector = np.array([end_point[0]- start_point[0], end_point[1]- start_point[1], end_point[2]- start_point[2]])
|
|
|
- height = np.linalg.norm(vector)+0.002
|
|
|
- radius = 0.01
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- # 位置约束
|
|
|
- position_constraint = PositionConstraint()
|
|
|
- position_constraint.header.frame_id = "base_link"
|
|
|
- position_constraint.link_name = self.end_effector_link
|
|
|
- position_constraint.target_point_offset = Vector3(0, 0, 0)
|
|
|
- #构建 shape_msgs/SolidPrimitive 消息
|
|
|
- bounding_volume = SolidPrimitive()
|
|
|
- bounding_volume.type = SolidPrimitive.CYLINDER
|
|
|
- # bounding_volume.dimensions = [0.003,1]
|
|
|
- bounding_volume.dimensions = [height,radius]
|
|
|
- #构建 geometry_msgs/Pose 消息,用于指定圆柱体在空间中的位置和姿态
|
|
|
- pose = Pose()
|
|
|
- pose.position.x = start_point[0] + vector[0] / 2
|
|
|
- pose.position.y = start_point[1] + vector[1] / 2
|
|
|
- pose.position.z = start_point[2] + vector[2] / 2
|
|
|
-
|
|
|
- # 计算圆柱体的旋转矩阵
|
|
|
- z_axis = np.array([0, 0, 1])
|
|
|
- axis = np.cross(z_axis, vector)
|
|
|
- angle = np.arccos(np.dot(z_axis, vector) / np.linalg.norm(vector))
|
|
|
- q = tf.transformations.quaternion_about_axis(angle, axis)
|
|
|
- pose.orientation.x = q[0]
|
|
|
- pose.orientation.y = q[1]
|
|
|
- pose.orientation.z = q[2]
|
|
|
- pose.orientation.w = q[3]
|
|
|
-
|
|
|
- position_constraint.constraint_region.primitives.append(bounding_volume)
|
|
|
- position_constraint.constraint_region.primitive_poses.append(pose)
|
|
|
- position_constraint.weight = 1.0
|
|
|
-
|
|
|
- ocm_constraint.position_constraints.append(position_constraint)
|
|
|
-
|
|
|
- ocm.link_name = "link_end_now" #or whatever your end effector link is
|
|
|
- ocm.header.frame_id = "link_end_now" # or whatever your planning frame is
|
|
|
- ocm.orientation.x=end_point[3]
|
|
|
- ocm.orientation.y=end_point[4]
|
|
|
- ocm.orientation.z=end_point[5]
|
|
|
- ocm.orientation.w=end_point[6]
|
|
|
-# ocm.orientation.w = 1.0
|
|
|
-
|
|
|
- ocm.absolute_x_axis_tolerance = 0.001
|
|
|
- ocm.absolute_y_axis_tolerance = 0.001
|
|
|
- ocm.absolute_z_axis_tolerance = 0.001
|
|
|
- ocm.weight = 1.0
|
|
|
-
|
|
|
-
|
|
|
- ocm_constraint.orientation_constraints.append(ocm)
|
|
|
- return ocm_constraint
|
|
|
-
|
|
|
-
|
|
|
- def move_pl(self,point,points,trajectory,trajectory_with_type,a=1,v=1):#焊缝的规划 只规划当前的点到上一个点
|
|
|
-
|
|
|
- self.arm.set_max_acceleration_scaling_factor(a)
|
|
|
- self.arm.set_max_velocity_scaling_factor(v)
|
|
|
-
|
|
|
- #设定约束
|
|
|
- if trajectory:
|
|
|
- #起点位置设置为规划组最后一个点
|
|
|
- state = self.arm.get_current_state()
|
|
|
- state.joint_state.position = trajectory[-1].joint_trajectory.points[-1].positions
|
|
|
- self.arm.set_start_state(state)
|
|
|
-
|
|
|
- self.arm.set_pose_target(point, self.end_effector_link)#设定目标点为传入的点
|
|
|
-
|
|
|
- # 创建路径约束
|
|
|
- path_constraints = self.create_path_constraints(points[-1],point)#将最后一个点和输入的点(焊缝的起始点)做圆柱轨迹约束
|
|
|
- self.arm.set_path_constraints(path_constraints)#设定约束
|
|
|
- traj = self.arm.plan()#调用plan进行规划
|
|
|
- error=traj[3]
|
|
|
- if error.val == moveit_msgs.msg._MoveItErrorCodes.MoveItErrorCodes.SUCCESS:
|
|
|
- rospy.loginfo("本次焊缝规划 OK")
|
|
|
- rospy.loginfo("*******************")
|
|
|
- trajj = traj[1] #取出规划的信息
|
|
|
- trajj_with_type = mark_the_traj(trajj,"during-l",welding_sequence)
|
|
|
- trajj_with_type.points[-len(trajj.joint_trajectory.points)].type = "start"
|
|
|
- trajj_with_type.points[-1].type = "end"
|
|
|
- points.append(point)
|
|
|
- trajectory.append(trajj)
|
|
|
- trajectory_with_type.append(trajj_with_type)
|
|
|
- else:
|
|
|
- rospy.loginfo("本次焊缝规划失败")
|
|
|
- points.pop()#将本条焊缝的起点从规划中删除
|
|
|
- trajectory.pop()
|
|
|
- trajectory_with_type.pop()
|
|
|
- self.hf_fail.append(welding_sequence[self.hf_num])#将焊缝添加到失败列表
|
|
|
- #清除约束
|
|
|
- self.arm.clear_path_constraints()
|
|
|
- self.hf_num=self.hf_num+1#焊缝计数
|
|
|
- return points,trajectory,trajectory_with_type
|
|
|
|
|
|
|
|
|
- #TODO move_pl_path_constraints
|
|
|
- def create_path_constraints(self,start_point,end_point):
|
|
|
- #计算起点指向终点的向量
|
|
|
- vector = np.array([end_point[0]- start_point[0], end_point[1]- start_point[1], end_point[2]- start_point[2]])
|
|
|
- height = np.linalg.norm(vector)+0.002
|
|
|
- radius = 0.01
|
|
|
|
|
|
- constraints = Constraints()
|
|
|
-
|
|
|
- # 位置约束
|
|
|
- position_constraint = PositionConstraint()
|
|
|
- position_constraint.header.frame_id = "base_link"
|
|
|
- position_constraint.link_name = self.end_effector_link
|
|
|
- position_constraint.target_point_offset = Vector3(0, 0, 0)
|
|
|
- #构建 shape_msgs/SolidPrimitive 消息
|
|
|
- bounding_volume = SolidPrimitive()
|
|
|
- bounding_volume.type = SolidPrimitive.CYLINDER
|
|
|
- # bounding_volume.dimensions = [0.003,1]
|
|
|
- bounding_volume.dimensions = [height,radius]
|
|
|
- #构建 geometry_msgs/Pose 消息,用于指定圆柱体在空间中的位置和姿态
|
|
|
- pose = Pose()
|
|
|
- pose.position.x = start_point[0] + vector[0] / 2
|
|
|
- pose.position.y = start_point[1] + vector[1] / 2
|
|
|
- pose.position.z = start_point[2] + vector[2] / 2
|
|
|
-
|
|
|
- # 计算圆柱体的旋转矩阵
|
|
|
- z_axis = np.array([0, 0, 1])
|
|
|
- axis = np.cross(z_axis, vector)
|
|
|
- angle = np.arccos(np.dot(z_axis, vector) / np.linalg.norm(vector))
|
|
|
- q = tf.transformations.quaternion_about_axis(angle, axis)
|
|
|
- pose.orientation.x = q[0]
|
|
|
- pose.orientation.y = q[1]
|
|
|
- pose.orientation.z = q[2]
|
|
|
- pose.orientation.w = q[3]
|
|
|
-
|
|
|
- position_constraint.constraint_region.primitives.append(bounding_volume)
|
|
|
- position_constraint.constraint_region.primitive_poses.append(pose)
|
|
|
- position_constraint.weight = 1.0
|
|
|
-
|
|
|
- constraints.position_constraints.append(position_constraint)
|
|
|
-
|
|
|
- return constraints
|
|
|
-
|
|
|
#TODO go_home
|
|
|
@decorator_time
|
|
|
def go_home(self,a=1,v=1):
|
|
@@ -446,8 +306,8 @@ class MoveIt_Control:
|
|
|
def path_planning(self,folder_path,gohome=True):
|
|
|
file_path_result = os.path.join(folder_path, 'result.txt')
|
|
|
all_data = process_welding_data(file_path_result)
|
|
|
- err=0
|
|
|
- points,trajectory,trajectory_with_type = [],[],[]
|
|
|
+
|
|
|
+ way_points,trajectory,trajectory_with_type = [],[],[]
|
|
|
for i in range(len(all_data)):
|
|
|
rospy.loginfo("共读取到%d条焊缝,开始规划第%d条",len(all_data),i+1)
|
|
|
start_point = all_data[i][0]
|
|
@@ -456,20 +316,15 @@ class MoveIt_Control:
|
|
|
q2 = all_data[i][3]
|
|
|
|
|
|
point11 = [start_point[0]/1000, start_point[1]/1000, start_point[2]/1000, q1[0],q1[1],q1[2],q1[3]]
|
|
|
- points,trajectory,trajectory_with_type,err = self.move_p_flexible(point11,points,trajectory,trajectory_with_type)
|
|
|
- if err==1:#焊缝起点不可达 则跳过此次焊缝和移动的规划 并将焊缝添加到失败列表
|
|
|
- self.hf_fail.append(welding_sequence[self.hf_num])
|
|
|
- self.hf_num=self.hf_num+1#焊缝计数
|
|
|
- continue
|
|
|
-
|
|
|
point22 = [end_point[0]/1000, end_point[1]/1000, end_point[2]/1000, q2[0],q2[1],q2[2],q2[3]]
|
|
|
- points,trajectory,trajectory_with_type = self.move_pl(point22,points,trajectory,trajectory_with_type,v=speed_v)
|
|
|
+ way_points,trajectory,trajectory_with_type = self.plan_cartesian_path_LLL(point11,point22,way_points,trajectory,trajectory_with_type,v=speed_v)
|
|
|
|
|
|
rospy.loginfo("第%d条焊缝规划完毕",i+1)
|
|
|
rospy.loginfo("*******************")
|
|
|
- if gohome:
|
|
|
+ #if gohome:
|
|
|
#points,trajectory,trajectory_with_type,err= self.move_p_flexible(self.home_pose,points,trajectory,trajectory_with_type,v=speed_v)
|
|
|
- trajectory,trajectory_with_type = self.go_home_justplan(trajectory,trajectory_with_type)
|
|
|
+ #trajectory,trajectory_with_type = self.go_home_justplan(trajectory,trajectory_with_type)
|
|
|
+
|
|
|
traj_merge= merge_trajectories(trajectory)
|
|
|
trajectory_with_type_merge= merge_trajectories_with_type(trajectory_with_type)
|
|
|
rospy.loginfo("All of The trajectory Plan successfully")
|
|
@@ -568,24 +423,6 @@ def merge_trajectories_with_type(trajectory_with_type):
|
|
|
|
|
|
return merged_trajectory_with_type
|
|
|
|
|
|
-def pub_trajectories(trajectory_with_type_merge):
|
|
|
-
|
|
|
- pub = rospy.Publisher("/joint_path", JointTrajectory_ex, queue_size=5)
|
|
|
- count = 1
|
|
|
- rate = rospy.Rate(1)
|
|
|
- rospy.loginfo("正在发布轨迹信息..")
|
|
|
- while not rospy.is_shutdown():
|
|
|
- #轨迹接受完毕,关闭当前发布
|
|
|
- sign_traj_accepted = str(rospy.get_param("sign_traj_accepted"))
|
|
|
- if sign_traj_accepted == "1":
|
|
|
- rospy.set_param("sign_traj_accepted",0)
|
|
|
- break
|
|
|
-
|
|
|
- pub.publish(trajectory_with_type_merge)
|
|
|
- # rospy.loginfo("发布计数,count = %d",count)
|
|
|
- # count += 1
|
|
|
- rate.sleep()
|
|
|
-
|
|
|
|
|
|
class py_msgs():
|
|
|
fial=[]
|
|
@@ -647,39 +484,9 @@ def get_user_input():
|
|
|
except Exception as e:
|
|
|
rospy.logerr(f"发生错误:{e}")
|
|
|
|
|
|
-def ROS2_msgs(msgs, m_sr):
|
|
|
- for i in range(len(msgs.points)):
|
|
|
- py_msgs.point.xyz_list.append(msgs.points[i].positions)
|
|
|
-
|
|
|
- f_p = os.path.join(folder_path, 'outtt.txt')
|
|
|
- with open(f_p,'w') as file:
|
|
|
- for point in py_msgs.point.xyz_list:
|
|
|
- file.write(' '.join(str(value) for value in point)+ "\n")
|
|
|
-
|
|
|
-# def write_ee(msgs, m_sr):
|
|
|
-# for i in range(len(msgs.points)):
|
|
|
-# py_msgs.point.xyz_list.append(msgs.points[i].positions)
|
|
|
-
|
|
|
-# f_p1 = os.path.join(folder_path, 'ee_pos.txt')
|
|
|
-# with open(f_p1,'w') as file:
|
|
|
-# for point in py_msgs.point.xyz_list:
|
|
|
-# #更新机械臂状态
|
|
|
-# state=m_sr.arm.get_current_state()
|
|
|
-# #state.joint_state.position=point
|
|
|
-
|
|
|
-# #获取末端执行器的位姿
|
|
|
-# fk_result=m_sr.arm.get_current_pose(moveit_server.end_effector_link)
|
|
|
-
|
|
|
-# #获取位置信息
|
|
|
-# position=fk_result.pose.position
|
|
|
-# orientation=fk_result.pose.orientation
|
|
|
-
|
|
|
-# #格式化数据
|
|
|
-# pose_data = f"Position: x={position.x}, y={position.y}, z={position.z}\n"
|
|
|
-# pose_data += f"Orientation (Quaternion): x={orientation.x}, y={orientation.y}, z={orientation.z}, w={orientation.w}\n"
|
|
|
-# pose_data += "-" * 50 + "\n"
|
|
|
+
|
|
|
|
|
|
-# file.write(pose_data)
|
|
|
+
|
|
|
|
|
|
if __name__ =="__main__":
|
|
|
# from redis_publisher import Publisher
|
|
@@ -697,7 +504,7 @@ if __name__ =="__main__":
|
|
|
trajectory,trajectory_merge,trajectory_with_type_merge = moveit_server.path_planning(folder_path)
|
|
|
#ROS2_msgs(trajectory_with_type_merge,moveit_server)
|
|
|
#write_ee(trajectory_with_type_merge, moveit_server)
|
|
|
-
|
|
|
- moveit_server.arm.execute(trajectory_merge)
|
|
|
+ # moveit_server.go_home()
|
|
|
+ # moveit_server.arm.execute(trajectory_merge)
|
|
|
|
|
|
rospy.set_param("sign_control",0)
|