# 导入基本ros和moveit库 import os import moveit_msgs.msg import rospy, sys import moveit_commander import moveit_msgs import tf.transformations from logging.handlers import RotatingFileHandler from moveit_commander import MoveGroupCommander, PlanningSceneInterface, RobotCommander from moveit_msgs.msg import PlanningScene, ObjectColor,CollisionObject, AttachedCollisionObject,RobotTrajectory from moveit_msgs.msg import RobotState from moveit_msgs.msg import Constraints, PositionConstraint, OrientationConstraint from shape_msgs.msg import SolidPrimitive from sensor_msgs.msg import JointState from trajectory_msgs.msg import JointTrajectory,JointTrajectoryPoint from geometry_msgs.msg import PoseStamped, Pose,Point,Vector3 from copy import deepcopy import numpy as np import tf from tf.transformations import euler_from_quaternion,quaternion_from_euler,quaternion_from_euler, quaternion_multiply import math import traceback from lst_robot_r.msg import JointTrajectoryPoint_ex,JointTrajectory_ex import json import termios from redis_publisher import Publisher pi = np.pi class MoveIt_Control: # 初始化程序 def __init__(self, is_use_gripper=False): self.home_pose = [] self.hf_num = 0#焊缝计数 从0开始 self.hf_fail = [] # Init ros configs moveit_commander.roscpp_initialize(sys.argv) self.robot = moveit_commander.RobotCommander() self.arm = moveit_commander.MoveGroupCommander('manipulator') self.arm.set_goal_joint_tolerance(0.0001) self.arm.set_goal_position_tolerance(0.0005) self.arm.set_goal_orientation_tolerance(0.1) #self.arm.set_planner_id("RRTConnet")#不知道能不能用 self.end_effector_link = self.arm.get_end_effector_link() # 设置机械臂基座的参考系 self.reference_frame = 'base_link' self.arm.set_pose_reference_frame(self.reference_frame) # # 设置最大规划时间和是否允许重新规划 self.arm.set_planning_time(10.0) self.arm.allow_replanning(True) self.arm.set_max_acceleration_scaling_factor(1) self.arm.set_max_velocity_scaling_factor(1) self.arm.set_num_planning_attempts(10) # 机械臂初始姿态 #self.move_j() self.go_home() self.home_pose = self.get_now_pose() #self.go_ready() #TODO plan_cartesian_path def plan_cartesian_path_LLL(self, point_s, point_e, waypoints, trajectory, trajectory_with_type, v=1): fraction = 0.0 # 路径规划覆盖率 maxtries = 50 # 最大尝试规划次数 attempts = 0 # 已经尝试规划次数 #起点位置设置为规划组最后一个点 start_pose = self.arm.get_current_pose(self.end_effector_link).pose wpose = deepcopy(start_pose) waypoint = [] #print(waypoint) if waypoints: wpose.position.x = waypoints[-1].position.x wpose.position.y = waypoints[-1].position.y wpose.position.z = waypoints[-1].position.z wpose.orientation.x = waypoints[-1].orientation.x wpose.orientation.y = waypoints[-1].orientation.y wpose.orientation.z = waypoints[-1].orientation.z wpose.orientation.w = waypoints[-1].orientation.w waypoint.append(deepcopy(wpose)) #wpose = deepcopy(pose) wpose.position.x = point_s[0] wpose.position.y = point_s[1] wpose.position.z = point_s[2] wpose.orientation.x = point_s[3] wpose.orientation.y = point_s[4] wpose.orientation.z = point_s[5] wpose.orientation.w = point_s[6] waypoint.append(deepcopy(wpose)) waypoints.append(deepcopy(wpose)) wpose.position.x = point_e[0] wpose.position.y = point_e[1] wpose.position.z = point_e[2] wpose.orientation.x = point_e[3] wpose.orientation.y = point_e[4] wpose.orientation.z = point_e[5] wpose.orientation.w = point_e[6] waypoint.append(deepcopy(wpose)) waypoints.append(deepcopy(wpose)) # 尝试规划一条笛卡尔空间下的路径,依次通过所有路点 while fraction < 1.0 and attempts < maxtries: (plan, fraction) = self.arm.compute_cartesian_path( waypoint, # waypoint poses,路点列表 0.001, # eef_step,终端步进值 0.0, # jump_threshold,跳跃阈值 True) # avoid_collisions,避障规划 attempts += 1 if fraction == 1.0 : rospy.loginfo("Path computed successfully.") #traj = plan trajj = plan #取出规划的信息 retimed_planed = self.retime_plan(trajj, v) trajj_with_type = mark_the_traj(trajj, "during-l", welding_sequence) trajj_with_type.points[-len(trajj.joint_trajectory.points)].type = "start" trajj_with_type.points[-1].type = "end" trajectory.append(trajj) rospy.loginfo("开始执行") moveit_server.arm.execute(retimed_planed) rospy.loginfo("执行结束") trajectory_with_type.append(trajj_with_type) err = 0 else: err = 1 rospy.loginfo("fraction=" + str(fraction) + " failed after " + str(maxtries)) return waypoints, trajectory, trajectory_with_type, err def plan_cartesian_path_point(self, point_ss, point_ee, waypoints, trajectory, trajectory_with_type, v=1): fraction = 0.0 # 路径规划覆盖率 maxtries = 50 # 最大尝试规划次数 attempts = 0 # 已经尝试规划次数 #起点位置设置为规划组最后一个点 start_pose = self.arm.get_current_pose(self.end_effector_link).pose wpose = deepcopy(start_pose) waypoint = [] #print(waypoint) if waypoints: wpose.position.x = waypoints[-1].position.x wpose.position.y = waypoints[-1].position.y wpose.position.z = waypoints[-1].position.z wpose.orientation.x = waypoints[-1].orientation.x wpose.orientation.y = waypoints[-1].orientation.y wpose.orientation.z = waypoints[-1].orientation.z wpose.orientation.w = waypoints[-1].orientation.w waypoint.append(deepcopy(wpose)) #wpose = deepcopy(pose) wpose.position.x = point_ss[0] wpose.position.y = point_ss[1] wpose.position.z = point_ss[2] wpose.orientation.x = point_ss[3] wpose.orientation.y = point_ss[4] wpose.orientation.z = point_ss[5] wpose.orientation.w = point_ss[6] waypoint.append(deepcopy(wpose)) waypoints.append(deepcopy(wpose)) wpose.position.x = point_ee[0] wpose.position.y = point_ee[1] wpose.position.z = point_ee[2] wpose.orientation.x = point_ee[3] wpose.orientation.y = point_ee[4] wpose.orientation.z = point_ee[5] wpose.orientation.w = point_ee[6] waypoint.append(deepcopy(wpose)) waypoints.append(deepcopy(wpose)) # 尝试规划一条笛卡尔空间下的路径,依次通过所有路点 while fraction < 1.0 and attempts < maxtries: (plan, fraction) = self.arm.compute_cartesian_path( waypoint, # waypoint poses,路点列表 0.001, # eef_step,终端步进值 0.0, # jump_threshold,跳跃阈值 True) # avoid_collisions,避障规划 attempts += 1 if fraction == 1.0 : rospy.loginfo("Path computed successfully.") #traj = plan trajj = plan #取出规划的信息 retimed_planed = self.retime_plan(trajj, v) trajj_with_type = mark_the_traj(trajj, "during-l", welding_sequence) trajectory.append(trajj) rospy.loginfo("开始执行") moveit_server.arm.execute(retimed_planed) rospy.loginfo("执行结束") trajectory_with_type.append(trajj_with_type) err = 0 else: err = 1 rospy.loginfo("fraction=" + str(fraction) + " failed after " + str(maxtries)) return waypoints, trajectory, trajectory_with_type, err #TODO retime_plan def retime_plan(self, traj, v): current_state = self.arm.get_current_state() retimed_traj = self.arm.retime_trajectory( current_state, traj, velocity_scaling_factor=0.03, acceleration_scaling_factor=1) return retimed_traj def get_now_pose(self): # 获取机械臂当前位姿 current_pose = self.arm.get_current_pose(self.end_effector_link).pose pose = [] pose.append(current_pose.position.x) pose.append(current_pose.position.y) pose.append(current_pose.position.z) pose.append(current_pose.orientation.x) pose.append(current_pose.orientation.y) pose.append(current_pose.orientation.z) pose.append(current_pose.orientation.w) # 打印位姿信息 #rospy.lohome_poseinfo("Current Pose: {}".format(pose)) return pose def go_home(self, a=1, v=1): rospy.loginfo("go_home start") self.arm.set_max_acceleration_scaling_factor(a) self.arm.set_max_velocity_scaling_factor(v) self.arm.set_named_target('home') rospy.loginfo("get home_point") self.arm.go() rospy.loginfo("go_home end") # rospy.sleep(1) def go_home_justplan(self, trajectory, trajectory_with_type, a=1, v=1): rospy.loginfo("go_home start") self.arm.set_max_acceleration_scaling_factor(a) self.arm.set_max_velocity_scaling_factor(v) state = self.arm.get_current_state() if trajectory: state.joint_state.position = trajectory[-1].joint_trajectory.points[-1].positions self.arm.set_start_state(state) self.arm.set_named_target('home') traj = self.arm.plan() trajj = traj[1] moveit_server.arm.execute(trajj) rospy.loginfo("go_home end") traj_with_type = mark_the_traj(trajj, "go-home", welding_sequence) trajectory.append(trajj) trajectory_with_type.append(traj_with_type) return trajectory,trajectory_with_type #TODO path_planning def path_planning(self, folder_path, gohome=True): file_path_result = os.path.join(folder_path, 'result.txt') all_data = process_welding_data(file_path_result) way_points,trajectory,trajectory_with_type = [], [], [] for i in range(len(all_data)): rospy.loginfo("共读取到%d条焊缝,开始规划第%d条", len(all_data), i+1) start_point = all_data[i][0] end_point = all_data[i][1] q1 = all_data[i][2] q2 = all_data[i][3] point11 = [start_point[0]/1000, start_point[1]/1000, start_point[2]/1000, q1[0], q1[1], q1[2], q1[3]] point22 = [end_point[0]/1000, end_point[1]/1000, end_point[2]/1000, q2[0], q2[1], q2[2], q2[3]] up_value = -0.1 point_up = [point11[0], point11[1], point11[2]-up_value, point11[3], point11[4], point11[5], point11[6]] # 点到点规划 way_points, trajectory, trajectory_with_type, error = self.plan_cartesian_path_point(point_up, point11, way_points, trajectory, trajectory_with_type, v=speed_v) rospy.loginfo("末端执行器上移完毕") rospy.loginfo("*******************") if(error == 1): rospy.loginfo("焊缝规划出现失败 清空路径列表") rospy.loginfo("*******************") trajectory.clear() trajectory_with_type.clear() break # 焊缝规划 way_points, trajectory, trajectory_with_type, error = self.plan_cartesian_path_LLL(point11, point22, way_points, trajectory, trajectory_with_type, v=speed_v) rospy.loginfo("第%d条焊缝规划完毕", i+1) rospy.loginfo("*******************") if(error == 1): rospy.loginfo("焊缝规划出现失败 清空路径列表") rospy.loginfo("*******************") trajectory.clear() trajectory_with_type.clear() break #向上移动末端执行器 if i