123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526 |
- # 导入基本ros和moveit库
- import os
- import moveit_msgs.msg
- import rospy, sys
- import moveit_commander
- import moveit_msgs
- import tf.transformations
- from logging.handlers import RotatingFileHandler
- from moveit_commander import MoveGroupCommander, PlanningSceneInterface, RobotCommander
- from moveit_msgs.msg import PlanningScene, ObjectColor,CollisionObject, AttachedCollisionObject,RobotTrajectory
- from moveit_msgs.msg import RobotState
- from moveit_msgs.msg import Constraints, PositionConstraint, OrientationConstraint
- from shape_msgs.msg import SolidPrimitive
- from sensor_msgs.msg import JointState
- from trajectory_msgs.msg import JointTrajectory,JointTrajectoryPoint
- from geometry_msgs.msg import PoseStamped, Pose,Point,Vector3
- from copy import deepcopy
- import numpy as np
- import tf
- from tf.transformations import euler_from_quaternion,quaternion_from_euler,quaternion_from_euler, quaternion_multiply
- import math
- import traceback
- from lst_robot_r.msg import JointTrajectoryPoint_ex,JointTrajectory_ex
- import json
- import termios
- from redis_publisher import Publisher
- pi = np.pi
- class MoveIt_Control:
- # 初始化程序
- def __init__(self, is_use_gripper=False):
- self.home_pose = []
- self.hf_num = 0#焊缝计数 从0开始
- self.hf_fail = []
- # Init ros configs
- moveit_commander.roscpp_initialize(sys.argv)
- self.robot = moveit_commander.RobotCommander()
-
- self.arm = moveit_commander.MoveGroupCommander('manipulator')
- self.arm.set_goal_joint_tolerance(0.0001)
- self.arm.set_goal_position_tolerance(0.0005)
- self.arm.set_goal_orientation_tolerance(0.1)
- #self.arm.set_planner_id("RRTConnet")#不知道能不能用
- self.end_effector_link = self.arm.get_end_effector_link()
- # 设置机械臂基座的参考系
- self.reference_frame = 'base_link'
- self.arm.set_pose_reference_frame(self.reference_frame)
- # # 设置最大规划时间和是否允许重新规划
- self.arm.set_planning_time(10.0)
- self.arm.allow_replanning(True)
- self.arm.set_max_acceleration_scaling_factor(1)
- self.arm.set_max_velocity_scaling_factor(1)
- self.arm.set_num_planning_attempts(10)
- # 机械臂初始姿态
- #self.move_j()
- self.go_home()
- self.home_pose = self.get_now_pose()
- #self.go_ready()
- #TODO plan_cartesian_path
- def plan_cartesian_path_LLL(self, point_s, point_e, waypoints, trajectory, trajectory_with_type, v=1):
- fraction = 0.0 # 路径规划覆盖率
- maxtries = 50 # 最大尝试规划次数
- attempts = 0 # 已经尝试规划次数
- #起点位置设置为规划组最后一个点
- start_pose = self.arm.get_current_pose(self.end_effector_link).pose
- wpose = deepcopy(start_pose)
- waypoint = []
- #print(waypoint)
- if waypoints:
- wpose.position.x = waypoints[-1].position.x
- wpose.position.y = waypoints[-1].position.y
- wpose.position.z = waypoints[-1].position.z
- wpose.orientation.x = waypoints[-1].orientation.x
- wpose.orientation.y = waypoints[-1].orientation.y
- wpose.orientation.z = waypoints[-1].orientation.z
- wpose.orientation.w = waypoints[-1].orientation.w
- waypoint.append(deepcopy(wpose))
- #wpose = deepcopy(pose)
- wpose.position.x = point_s[0]
- wpose.position.y = point_s[1]
- wpose.position.z = point_s[2]
- wpose.orientation.x = point_s[3]
- wpose.orientation.y = point_s[4]
- wpose.orientation.z = point_s[5]
- wpose.orientation.w = point_s[6]
- waypoint.append(deepcopy(wpose))
- waypoints.append(deepcopy(wpose))
- wpose.position.x = point_e[0]
- wpose.position.y = point_e[1]
- wpose.position.z = point_e[2]
- wpose.orientation.x = point_e[3]
- wpose.orientation.y = point_e[4]
- wpose.orientation.z = point_e[5]
- wpose.orientation.w = point_e[6]
- waypoint.append(deepcopy(wpose))
- waypoints.append(deepcopy(wpose))
- # 尝试规划一条笛卡尔空间下的路径,依次通过所有路点
- while fraction < 1.0 and attempts < maxtries:
- (plan, fraction) = self.arm.compute_cartesian_path(
- waypoint, # waypoint poses,路点列表
- 0.001, # eef_step,终端步进值
- 0.0, # jump_threshold,跳跃阈值
- True) # avoid_collisions,避障规划
- attempts += 1
- if fraction == 1.0 :
- rospy.loginfo("Path computed successfully.")
- #traj = plan
- trajj = plan #取出规划的信息
- retimed_planed = self.retime_plan(trajj, v)
- trajj_with_type = mark_the_traj(trajj, "during-l", welding_sequence)
- trajj_with_type.points[-len(trajj.joint_trajectory.points)].type = "start"
- trajj_with_type.points[-1].type = "end"
- trajectory.append(trajj)
- rospy.loginfo("开始执行")
- moveit_server.arm.execute(retimed_planed)
- rospy.loginfo("执行结束")
- trajectory_with_type.append(trajj_with_type)
- err = 0
- else:
- err = 1
- rospy.loginfo("fraction=" + str(fraction) + " failed after " + str(maxtries))
-
- return waypoints, trajectory, trajectory_with_type, err
-
-
- def plan_cartesian_path_point(self, point_ss, point_ee, waypoints, trajectory, trajectory_with_type, v=1):
- fraction = 0.0 # 路径规划覆盖率
- maxtries = 50 # 最大尝试规划次数
- attempts = 0 # 已经尝试规划次数
- #起点位置设置为规划组最后一个点
- start_pose = self.arm.get_current_pose(self.end_effector_link).pose
- wpose = deepcopy(start_pose)
- waypoint = []
- #print(waypoint)
- if waypoints:
- wpose.position.x = waypoints[-1].position.x
- wpose.position.y = waypoints[-1].position.y
- wpose.position.z = waypoints[-1].position.z
- wpose.orientation.x = waypoints[-1].orientation.x
- wpose.orientation.y = waypoints[-1].orientation.y
- wpose.orientation.z = waypoints[-1].orientation.z
- wpose.orientation.w = waypoints[-1].orientation.w
- waypoint.append(deepcopy(wpose))
- #wpose = deepcopy(pose)
- wpose.position.x = point_ss[0]
- wpose.position.y = point_ss[1]
- wpose.position.z = point_ss[2]
- wpose.orientation.x = point_ss[3]
- wpose.orientation.y = point_ss[4]
- wpose.orientation.z = point_ss[5]
- wpose.orientation.w = point_ss[6]
- waypoint.append(deepcopy(wpose))
- waypoints.append(deepcopy(wpose))
- wpose.position.x = point_ee[0]
- wpose.position.y = point_ee[1]
- wpose.position.z = point_ee[2]
- wpose.orientation.x = point_ee[3]
- wpose.orientation.y = point_ee[4]
- wpose.orientation.z = point_ee[5]
- wpose.orientation.w = point_ee[6]
- waypoint.append(deepcopy(wpose))
- waypoints.append(deepcopy(wpose))
- # 尝试规划一条笛卡尔空间下的路径,依次通过所有路点
- while fraction < 1.0 and attempts < maxtries:
- (plan, fraction) = self.arm.compute_cartesian_path(
- waypoint, # waypoint poses,路点列表
- 0.001, # eef_step,终端步进值
- 0.0, # jump_threshold,跳跃阈值
- True) # avoid_collisions,避障规划
- attempts += 1
- if fraction == 1.0 :
- rospy.loginfo("Path computed successfully.")
- #traj = plan
- trajj = plan #取出规划的信息
- retimed_planed = self.retime_plan(trajj, v)
- trajj_with_type = mark_the_traj(trajj, "during-l", welding_sequence)
- trajectory.append(trajj)
- rospy.loginfo("开始执行")
- moveit_server.arm.execute(retimed_planed)
- rospy.loginfo("执行结束")
- trajectory_with_type.append(trajj_with_type)
- err = 0
- else:
- err = 1
- rospy.loginfo("fraction=" + str(fraction) + " failed after " + str(maxtries))
-
- return waypoints, trajectory, trajectory_with_type, err
-
-
- #TODO retime_plan
- def retime_plan(self, traj, v):
- current_state = self.arm.get_current_state()
-
- retimed_traj = self.arm.retime_trajectory(
- current_state,
- traj,
- velocity_scaling_factor=0.03,
- acceleration_scaling_factor=1)
-
- return retimed_traj
-
-
- def get_now_pose(self):
- # 获取机械臂当前位姿
- current_pose = self.arm.get_current_pose(self.end_effector_link).pose
- pose = []
- pose.append(current_pose.position.x)
- pose.append(current_pose.position.y)
- pose.append(current_pose.position.z)
- pose.append(current_pose.orientation.x)
- pose.append(current_pose.orientation.y)
- pose.append(current_pose.orientation.z)
- pose.append(current_pose.orientation.w)
- # 打印位姿信息
- #rospy.lohome_poseinfo("Current Pose: {}".format(pose))
- return pose
- def go_home(self, a=1, v=1):
- rospy.loginfo("go_home start")
- self.arm.set_max_acceleration_scaling_factor(a)
- self.arm.set_max_velocity_scaling_factor(v)
- self.arm.set_named_target('home')
- rospy.loginfo("get home_point")
- self.arm.go()
- rospy.loginfo("go_home end")
- # rospy.sleep(1)
-
- def go_home_justplan(self, trajectory, trajectory_with_type, a=1, v=1):
- rospy.loginfo("go_home start")
- self.arm.set_max_acceleration_scaling_factor(a)
- self.arm.set_max_velocity_scaling_factor(v)
- state = self.arm.get_current_state()
- if trajectory:
- state.joint_state.position = trajectory[-1].joint_trajectory.points[-1].positions
- self.arm.set_start_state(state)
- self.arm.set_named_target('home')
- traj = self.arm.plan()
- trajj = traj[1]
- moveit_server.arm.execute(trajj)
- rospy.loginfo("go_home end")
- traj_with_type = mark_the_traj(trajj, "go-home", welding_sequence)
- trajectory.append(trajj)
- trajectory_with_type.append(traj_with_type)
- return trajectory,trajectory_with_type
- #TODO path_planning
- def path_planning(self, folder_path, gohome=True):
- file_path_result = os.path.join(folder_path, 'result.txt')
- all_data = process_welding_data(file_path_result)
- way_points,trajectory,trajectory_with_type = [], [], []
- for i in range(len(all_data)):
- rospy.loginfo("共读取到%d条焊缝,开始规划第%d条", len(all_data), i+1)
- start_point = all_data[i][0]
- end_point = all_data[i][1]
- q1 = all_data[i][2]
- q2 = all_data[i][3]
- point11 = [start_point[0]/1000, start_point[1]/1000, start_point[2]/1000, q1[0], q1[1], q1[2], q1[3]]
- point22 = [end_point[0]/1000, end_point[1]/1000, end_point[2]/1000, q2[0], q2[1], q2[2], q2[3]]
-
- up_value = -0.1
- point_up = [point11[0], point11[1], point11[2]-up_value, point11[3], point11[4], point11[5], point11[6]]
- # 点到点规划
- way_points, trajectory, trajectory_with_type, error = self.plan_cartesian_path_point(point_up, point11, way_points, trajectory, trajectory_with_type, v=speed_v)
- rospy.loginfo("末端执行器上移完毕")
- rospy.loginfo("*******************")
-
- if(error == 1):
- rospy.loginfo("焊缝规划出现失败 清空路径列表")
- rospy.loginfo("*******************")
- trajectory.clear()
- trajectory_with_type.clear()
- break
-
- # 焊缝规划
- way_points, trajectory, trajectory_with_type, error = self.plan_cartesian_path_LLL(point11, point22, way_points, trajectory, trajectory_with_type, v=speed_v)
-
- rospy.loginfo("第%d条焊缝规划完毕", i+1)
- rospy.loginfo("*******************")
- if(error == 1):
- rospy.loginfo("焊缝规划出现失败 清空路径列表")
- rospy.loginfo("*******************")
- trajectory.clear()
- trajectory_with_type.clear()
- break
- #向上移动末端执行器
- if i<len(all_data):
- up_value = -0.1
- point_up = [point22[0], point22[1], point22[2]-up_value, point22[3], point22[4], point22[5], point22[6]]
- # 点到点规划
- way_points, trajectory, trajectory_with_type, error = self.plan_cartesian_path_point(point22, point_up, way_points,trajectory,trajectory_with_type,v=speed_v)
- rospy.loginfo("末端执行器上移完毕")
- rospy.loginfo("*******************")
-
- if(error == 1):
- rospy.loginfo("焊缝规划出现失败 清空路径列表")
- rospy.loginfo("*******************")
- trajectory.clear()
- trajectory_with_type.clear()
- break
- #rospy.loginfo("All of The trajectory Plan successfully")
- #rospy.loginfo("*******************")
- if gohome:
- #trajectory,trajectory_with_type= self.move_p_flexible(trajectory,trajectory_with_type)
- trajectory, trajectory_with_type = self.go_home_justplan(trajectory, trajectory_with_type)
- traj_merge = merge_trajectories(trajectory)
- trajectory_with_type_merge = merge_trajectories_with_type(trajectory_with_type)
- return trajectory, traj_merge, trajectory_with_type_merge
- ###########################################################class end#############################################################
- def process_welding_data(filename):
- all_data = [] # 用来存储每一行处理后的数据
- with open(filename, 'r') as file:
- for line in file:
- parts = line.strip().split('/')
- coordinates_and_poses = [part.split(',') for part in parts[1:]]
-
- start_point = tuple(map(float, coordinates_and_poses[0][:3])) # 使用元组以避免修改
- end_point = tuple(map(float, coordinates_and_poses[1][:3]))
- q1 = tuple(map(float, coordinates_and_poses[2][:4]))
- q2 = tuple(map(float, coordinates_and_poses[3][:4]))
-
- # 收集每行处理后的数据
- all_data.append((start_point, end_point, q1, q2))
- return all_data
- def mark_the_traj(traj,TYPE,SEQUENCE):
- traj_with_type = JointTrajectory_ex()
- traj_with_type.header = traj.joint_trajectory.header
- traj_with_type.joint_names = traj.joint_trajectory.joint_names
- traj_with_type.points = [
- JointTrajectoryPoint_ex(
- positions = point.positions,
- velocities = point.velocities,
- accelerations = point.accelerations,
- effort = point.effort,
- type = TYPE,
- sequence = SEQUENCE
- ) for point in traj.joint_trajectory.points
- ]
- return traj_with_type
- def merge_trajectories(trajectories):
- if not trajectories:
- return None
-
- # 创建一个新的 JointTrajectory 对象
- merged_trajectory = JointTrajectory()
- merged_trajectory.header = trajectories[0].joint_trajectory.header
- merged_trajectory.joint_names = trajectories[0].joint_trajectory.joint_names
-
- # 初始化时间累加器
- last_time_from_start = rospy.Duration(0)
-
- # 合并所有 trajectories 的 points
- for traj in trajectories:
- for point in traj.joint_trajectory.points:
- # 创建新的轨迹点
- new_point = deepcopy(point)
- # 累加时间
- new_point.time_from_start += last_time_from_start
- merged_trajectory.points.append(new_point)
-
- # 更新时间累加器为当前轨迹的最后一个点的时间
- if traj.joint_trajectory.points:
- last_time_from_start = traj.joint_trajectory.points[-1].time_from_start + last_time_from_start
-
- return merged_trajectory
- def merge_trajectories_with_type(trajectory_with_type):
- if not trajectory_with_type:
- return None
-
- # 创建一个新的 JointTrajectory 对象
- merged_trajectory_with_type = JointTrajectory_ex()
- merged_trajectory_with_type.header = trajectory_with_type[0].header
- merged_trajectory_with_type.joint_names = trajectory_with_type[0].joint_names
-
- # 初始化时间累加器
- last_time_from_start = rospy.Duration(0)
-
- # 合并所有 trajectories 的 points
- for traj in trajectory_with_type:
- for point in traj.points:
- # 创建新的轨迹点
- new_point = deepcopy(point)
- # 累加时间
- new_point.time_from_start += last_time_from_start
- merged_trajectory_with_type.points.append(new_point)
-
- # 更新时间累加器为当前轨迹的最后一个点的时间
- if traj.points:
- last_time_from_start = traj.points[-1].time_from_start + last_time_from_start
-
- return merged_trajectory_with_type
- class py_msgs():
- fial = []
- shun_xv = []
- class point():
- xyz_list = []
- type = []
- #redis消息格式
- def ROS2PY_msgs(msgs, m_sr):
- for i in range(len(msgs.points)):
- py_msgs.point.xyz_list.append(msgs.points[i].positions)
- py_msgs.point.type.append(msgs.points[i].type)
- py_msgs.fail = m_sr.hf_fail.copy()
- py_msgs.shun_xv = msgs.points[0].sequence.copy()
- # 打印规划信息
- # for i in range(len(py_msgs.point.type)):
- # print(py_msgs.point.xyz_list[i])
- # print(py_msgs.point.type[i])
- message = {
- 'positions': py_msgs.point.xyz_list, # 规划路径点结果
- 'flags': py_msgs.point.type, # 规划路径点的类型,焊接点移动点
- 'weld_order': py_msgs.shun_xv, # 规划路径顺序
- 'failed': py_msgs.fail, # 规划路径失败的焊缝
- }
- return json.dumps(message)
- # for i in range(len(py_msgs.point.type)):
- # moveit_server.move_j(py_msgs.point.xyz_list[i])
- # #input("任意键继续")
- # def get_valid_input(factor, default):
- # while True:
- # user_input=input(factor)
- # if user_input=="":
- # return default
- # try:
- # value=float(user_input)
- # if 0<value<=1:
- # return value
- # else:
- # rospy.logerr("缩放因子必须在(0, 1]范围内,请重新输入!")
- # except ValueError:
- # rospy.logerr("输入值无效,请根据提示重新输入!")
- # def get_user_input():
- # """获取用户输入的速度和加速度缩放因子,并在控制台中打印信息和错误提示"""
- # DEFUALT_V = 1.0
- # DEFUALT_A = 1.0
-
- # try:
- # termios.tcflush(sys.stdin, termios.TCIFLUSH) #清空输入缓存
- # rospy.loginfo("输入缓存区已清空!")
-
- # vv=get_valid_input("请输入速度缩放因子(0-1, 默认为1): ", DEFUALT_V)
- # #a=get_valid_input("请输入加速度缩放因子(0-1, 默认为1): ", DEFUALT_A)
-
- # #rospy.loginfo(f"用户输入的速度缩放因子为{vv}")
- # return vv
- # except Exception as e:
- # rospy.logerr(f"发生错误:{e}")
-
- # def ROS2_msgs_write(msgs, m_sr):
- # for i in range(len(msgs.points)):
- # py_msgs.point.xyz_list.append(msgs.points[i].positions)
-
- # f_p = os.path.join(folder_path, 'outtt.txt')
- # with open(f_p,'w') as file:
- # for point in py_msgs.point.xyz_list:
- # file.write(' '.join(str(value) for value in point)+ "\n")
-
- if __name__ =="__main__":
- publisher = Publisher()
- folder_path = rospy.get_param("folder_path")
- rospy.init_node('moveit_control_server', anonymous=False)
- moveit_server = MoveIt_Control(is_use_gripper=False)
- welding_sequence = rospy.get_param('welding_sequence')
- speed_v = 0.001
- # speed_v=get_user_input()
- # rospy.loginfo(f"用户输入的速度缩放因子为{speed_v}")
- trajectory, trajectory_merge, trajectory_with_type_merge = moveit_server.path_planning(folder_path)
- #moveit_server.go_ready() #合并后的轨迹也需要从ready点开始执行
-
- # rospy.loginfo("开始执行合并后轨迹")
- # moveit_server.arm.execute(trajectory_merge)
- # #ROS2_msgs_write(trajectory_with_type_merge,moveit_server)
- # rospy.loginfo("合并后轨迹执行完毕")
-
- message = ROS2PY_msgs(trajectory_with_type_merge, moveit_server)
- publisher.pub_plan_result(message)
- print("路径规划信息已发布....")
|