diff --git a/robot_smach_states/examples/manipulation/example_top_grasp.py b/robot_smach_states/examples/manipulation/example_top_grasp.py new file mode 100644 index 0000000000..54409d61e1 --- /dev/null +++ b/robot_smach_states/examples/manipulation/example_top_grasp.py @@ -0,0 +1,46 @@ +# System +import argparse + +# ROS +import PyKDL as kdl +import rospy + +# TU/e Robotics +from ed.entity import Entity +from ed.shape import RightPrism +from pykdl_ros import FrameStamped +from robot_skills.get_robot import get_robot + +# Robot Smach States +import robot_smach_states.util.designators as ds +from robot_smach_states.manipulation.top_grasp import TopGrab + + +if __name__ == "__main__": + + parser = argparse.ArgumentParser(description="Put an imaginary object in the world model and grasp it using the " + "'topgrasp' smach state") + parser.add_argument("x", type=float, help="x-coordinate (in map) of the imaginary object") + parser.add_argument("y", type=float, help="y-coordinate (in map) of the imaginary object") + parser.add_argument("z", type=float, help="z-coordinate (in map) of the imaginary object") + parser.add_argument("--robot", default="hero", help="Robot name (amigo, hero, sergio)") + args = parser.parse_args() + + rospy.init_node("test_top_grasping") + + robot = get_robot(args.robot) + + entity_id = "test_item" + pose = FrameStamped(frame=kdl.Frame(kdl.Rotation.RPY(0.0, 0.0, 0.0), kdl.Vector(args.x, args.y, args.z)), + stamp=rospy.Time.now(), + frame_id="map") + robot.ed.update_entity(uuid=entity_id, frame_stamped=pose) + shape = RightPrism([kdl.Vector(0, 0, 0), kdl.Vector(0, 0.05, 0), kdl.Vector(0.05, 0.05, 0), kdl.Vector(0.05, 0, 0)], -0.1, 0.1) + item = Entity(entity_id, "test_type", pose.header.frame_id, pose.frame, shape, None, None, rospy.Time.now()) + + item = ds.Designator(item) + + arm = ds.UnoccupiedArmDesignator(robot).lockable() + + grab_state = TopGrab(robot, item, arm) + grab_state.execute() diff --git a/robot_smach_states/src/robot_smach_states/manipulation/cutlery_detector.py b/robot_smach_states/src/robot_smach_states/manipulation/cutlery_detector.py new file mode 100644 index 0000000000..c2c95062e8 --- /dev/null +++ b/robot_smach_states/src/robot_smach_states/manipulation/cutlery_detector.py @@ -0,0 +1,245 @@ +import rospy +import cv2 +import math +import numpy as np +from ultralytics import YOLO +from cv_bridge import CvBridge +from sensor_msgs.msg import Image + + + +class YoloSegmentor: + def __init__(self) -> None: + model_path = "~/MEGA/developers/Donal/yolov8x-seg.pt" + device = "cuda" + self.model = YOLO(model_path).to(device) + self.class_ids = [42, 43, 44] # See the COCO dataset for class id to label info (fork = 42, knife = 43, spoon = 44) + self.active = False + + self.intercept = 0 + self.slope = 0 + self.length = 0 + + self.publisher = rospy.Publisher('/hero/segmented_image', Image, queue_size=10) + self.subscriber = rospy.Subscriber('/hero/hand_camera/image_raw', Image, self.callback) + + # Initialize the attributes to store the values + self.x_center = None + self.y_center = None + self.slope = None + self.upwards = None + self.class_id = None + self.filtered_masks = None + self.time = rospy.Time.now() + + + def start(self): + self.active = True + + def stop(self): + self.active = False + + @staticmethod + def detect(model, frame): + results = model(frame) + global result + result = results[0] + + segmentation_contours_idx = [np.array(seg, dtype=np.int32) for seg in result.masks.xy] + class_ids = np.array(result.boxes.cls.cpu(), dtype="int") + + return class_ids, segmentation_contours_idx, result + + def extract_table_segment(self, image, class_ids, segmentation_contours_idx): + table_segment = np.zeros_like(image, dtype=np.uint8) + global width + height, width, channels = image.shape #obtain data on image size + + for class_id, seg in zip(class_ids, segmentation_contours_idx): + self.class_id = class_id + if class_id in self.class_ids: + cv2.fillPoly(table_segment, [seg], color=(255, 0, 255)) #If fork, knife or spoon a pink mask will be created + else: + cv2.fillPoly(table_segment, [seg], color=(255, 0, 0)) #If another object from the COCO dataset a blue mask will be created + + return table_segment + + def coordinates_center_point(self): + # Separating boxes and masks for cutlery from other objects + cutlery_boxes = [] + filtered_masks = [] + for i, box in enumerate(result.boxes): + if int(box.cls) in self.class_ids: + cutlery_boxes.append(box) + filtered_masks.append(result.masks.xy[i]) + self.filtered_masks = filtered_masks + x_center = None + y_center = None + + # Check if cutlery_boxes is empty + if not cutlery_boxes: + coordinates_box = None # No bounding boxes found + else: + # Access the xywh attribute of the first box in cutlery_boxes + coordinates_box_tensor = cutlery_boxes[0].xywh + if coordinates_box_tensor.numel() == 0: # emake sure that the tensor is not empty before accessing its elements + coordinates_box = None + else: + coordinates_box = coordinates_box_tensor[0].tolist() + x_center, y_center = coordinates_box[0], coordinates_box[1] # Center coordinates of the bounding box + + x_center, y_center = coordinates_box[0], coordinates_box[1] # Obtain the center coordinates of the bounding box = center coordinates object + print(f"x_center, y_center = {x_center, y_center}") #print center coordinates + + # Store the values + self.x_center = x_center + self.y_center = y_center + + return x_center, y_center + + def visualize_center_point(self, x_center, y_center, table_segment): + cv2.circle(table_segment, (int(x_center), int(y_center)), 5, (0, 255, 255), -1) # Draw yellow dot at the center point + return table_segment + + def calculate_slope_intercept(self): + + inner_array = self.filtered_masks[0] # result.masks.xy is an array in an array, so has to be unpacked first + x = inner_array[:,0] # first column = x coordinates of all points of the segmentation mask + y = inner_array[:,1] # second column = y coordinates of all points of the segmentation mask + + mean_x = np.mean(x) + mean_y = np.mean(y) + min_x = int(min(x)) + max_x = int(max(x)) + min_y = int(min(y)) + max_y = int(max(y)) + + length = math.sqrt((max_x - min_x)**2+(max_y - min_y)**2) + self.length = length + + #for nearly vertical cases: + if max_x - min_x < (1/5 * width): #if the object takes up a small width (less than 1/5 of the image) it is oriented (nearly) vertical in the camera coordinate frame + numerator = np.sum((y - mean_y) ** 2) + denominator = np.sum((x - mean_x) * (y - mean_y)) + slope = numerator / denominator + + #for all other orientations: + else: + numerator = np.sum((x - mean_x) * (y - mean_y)) + denominator = np.sum((x - mean_x) ** 2) + slope = numerator / denominator + intercept = mean_y - slope * mean_x + + print(f"Calculated slope: {slope}") + print(f"Calculated intercept: {intercept}") + + # Store the slope value + self.slope = slope + + return slope, intercept, min_x, max_x + + def predict(self, slope, intercept, min_x, max_x): + y_minx = slope * min_x + intercept #y-coordinate corresponding to min_x + y_maxx = slope * max_x + intercept #y-coordinate corresponding to max_x + rospy.loginfo("predict") + return y_minx, y_maxx + + def visualize_orientation(self, min_x, max_x, y_minx, y_maxx, table_segment): + cv2.line(table_segment, (min_x, int(y_minx)), (max_x, int(y_maxx)), (0, 255, 0), 2) # Draw the line created by the least squares method in green + return table_segment + + def object_direction(self, x_center, y_center, slope): + perpendicular_slope = -1/slope + perpendicular_intercept = y_center - perpendicular_slope * x_center + + inner_array = self.filtered_masks[0] + x = inner_array[:,0] + y = inner_array[:,1] + coordinates_upper = 0 + coordinates_lower = 0 + if 0 <= perpendicular_slope < math.inf: + for i in range(len(x)): + if y[i]<= perpendicular_slope*x[i] + perpendicular_intercept: + coordinates_upper += 1 + else: + coordinates_lower += 1 + if -math.inf < perpendicular_slope < 0: + for i in range(len(x)): + if y[i]>= perpendicular_slope*x[i] + perpendicular_intercept: + coordinates_upper += 1 + else: + coordinates_lower += 1 + print("Size outline upper half of the mask:", coordinates_upper) + print("Size outline lower half of the mask:", coordinates_lower) + + if coordinates_upper >= coordinates_lower: + upwards = True + elif coordinates_upper < coordinates_lower: + upwards = False + self.time = rospy.Time.now() + self.upwards = upwards + + return upwards + + def callback(self, data): + #rospy.loginfo("Received image data") + + if not self.active: + #rospy.loginfo("Callback inactive") + return + bridge = CvBridge() + cv_image = bridge.imgmsg_to_cv2(data, desired_encoding='passthrough') + cv_image = cv2.cvtColor(cv_image, cv2.COLOR_RGB2BGR) + rospy.loginfo("Image converted") + + class_ids, segmentation_contours_idx, result = self.detect(self.model, cv_image) + rospy.loginfo("Object detected") + + table_segment = self.extract_table_segment(cv_image, class_ids, segmentation_contours_idx) + + #makes sure that even when no cutlery is detected, the segmentation of objects is visualized + table_message = bridge.cv2_to_imgmsg(table_segment, encoding="passthrough") + self.publisher.publish(table_message) + + if any(class_id in self.class_ids for class_id in class_ids): + rospy.loginfo("Cutlery detected") + + x_center, y_center = self.coordinates_center_point() + + + # Visualize center point + table_segment_with_center = self.visualize_center_point(x_center, y_center, table_segment) + rospy.loginfo("Center point visualized") + + # Calculate slope and intercept + slope, intercept, min_x, max_x = self.calculate_slope_intercept() + rospy.loginfo("Slope and intercept calculated") + + y_minx, y_maxx = self.predict(slope, intercept, min_x, max_x) + + # Visualize orientation + table_segment_with_orientation = self.visualize_orientation(min_x, max_x, y_minx, y_maxx, table_segment_with_center) + rospy.loginfo("Orientation visualized") + + # Publish the table segment as a binary mask + table_message = bridge.cv2_to_imgmsg(table_segment_with_orientation, encoding="passthrough") + self.publisher.publish(table_message) + rospy.loginfo("Segmented image with orientation published") + + upwards = self.object_direction(x_center, y_center, slope) + rospy.loginfo("Direction determined") + + else: + rospy.loginfo("No cutlery detected") + + # Method to be able to access values in the top_grasp code + def data_object(self): + return self.x_center, self.y_center, self.length, self.slope, self. upwards, self.time + + +if __name__ == '__main__': + rospy.init_node("cutlery_detector") + ts = YoloSegmentor() + ts.start() + + rospy.spin() \ No newline at end of file diff --git a/robot_smach_states/src/robot_smach_states/manipulation/gripper_tf2_listener.py b/robot_smach_states/src/robot_smach_states/manipulation/gripper_tf2_listener.py new file mode 100755 index 0000000000..561cbff1a7 --- /dev/null +++ b/robot_smach_states/src/robot_smach_states/manipulation/gripper_tf2_listener.py @@ -0,0 +1,39 @@ +import rospy +import tf2_ros +import geometry_msgs.msg +from pykdl_ros import FrameStamped + +if __name__ == '__main__': + rospy.init_node('tf2_gripper_listener') + + tfBuffer = tf2_ros.Buffer() + listener = tf2_ros.TransformListener(tfBuffer) + + gripper_coordinates = rospy.Publisher('/gripper_coordinates', geometry_msgs.msg.PoseStamped, queue_size=1) + + rate = rospy.Rate(10.0) + + while not rospy.is_shutdown(): + try: + gripper_id = FrameStamped.from_xyz_rpy(0, 0, 0, 0, 0, 0, rospy.Time(), frame_id="hand_palm_link") #generates the frame of the gripper + gripper_bl = tfBuffer.lookup_transform("base_link", "hand_palm_link", rospy.Time()) #calculates the transform between the origin of the base frame and that of the gripper frame + + # PoseStamped coordinates + # obtains coordinates and orientation of the gripper transformed into the coordinate frame of the robot's base + msg = geometry_msgs.msg.PoseStamped() + msg.header = gripper_bl.header + msg.pose.position.x = gripper_bl.transform.translation.x + msg.pose.position.y = gripper_bl.transform.translation.y + msg.pose.position.z = gripper_bl.transform.translation.z + msg.pose.orientation = gripper_bl.transform.rotation + + # publishing coordinates + gripper_coordinates.publish(msg) + + + except (tf2_ros.LookupException, tf2_ros.ConnectivityException, tf2_ros.ExtrapolationException): + + rate.sleep() + continue + + rate.sleep() \ No newline at end of file diff --git a/robot_smach_states/src/robot_smach_states/manipulation/test_detection_imported_image.py b/robot_smach_states/src/robot_smach_states/manipulation/test_detection_imported_image.py new file mode 100644 index 0000000000..abe91d30c1 --- /dev/null +++ b/robot_smach_states/src/robot_smach_states/manipulation/test_detection_imported_image.py @@ -0,0 +1,169 @@ +import rospy +import cv2 +import numpy as np +import math +from ultralytics import YOLO +from cv_bridge import CvBridge +from sensor_msgs.msg import Image + +class YoloSegmentor: + def __init__(self) -> None: + model_path = "~/MEGA/developers/Donal/yolov8x-seg.pt" + device = "cuda" + self.model = YOLO(model_path).to(device) + self.class_ids = [42, 43, 44] # See the COCO dataset for class id to label info (fork = 42, knife = 43, spoon = 44) + + def detect(self, frame): + results = self.model(frame) + global result # Make 'result' global such that in can be accessed in the LeastSquaresMethod class + result = results[0] + segmentation_contours_idx = [np.array(seg, dtype=np.int32) for seg in result.masks.xy] + + + print(f"RESULTSS= {results}") + print(f"RESULT = {result}") + + class_ids = np.array(result.boxes.cls.cpu(), dtype="int") + + + # Collecting boxes and masks for the specified class IDs + global filtered_masks + cutlery_boxes = [] + filtered_masks = [] + for i, box in enumerate(result.boxes): + if int(box.cls) in self.class_ids: + cutlery_boxes.append(box) + filtered_masks.append(result.masks.xy[i]) + + x_center = None + y_center = None + + # Check if cutlery_boxes is empty + if not cutlery_boxes: + coordinates_box = None # No bounding boxes found + else: + # Access the xywh attribute of the first box in cutlery_boxes + coordinates_box_tensor = cutlery_boxes[0].xywh + if coordinates_box_tensor.numel() == 0: + coordinates_box = None + else: + coordinates_box = coordinates_box_tensor[0].tolist() + x_center, y_center = coordinates_box[0], coordinates_box[1] # Center coordinates of the bounding box + + + print(f"x_center, y_center = {x_center, y_center}") #print center coordinates + return class_ids, segmentation_contours_idx, x_center, y_center # outputs an integer with the name of the detected object as well as a segmentation of the object's contour + + def extract_table_segment(self, image, class_ids, segmentations): + table_mask = np.zeros_like(image, dtype=np.uint8) + for class_id, seg in zip(class_ids, segmentations): + if class_id in self.class_ids: + cv2.fillPoly(table_mask, [seg], color=(255, 0, 255)) #If fork, knife or spoon a pink/purple mask will be created + else: + cv2.fillPoly(table_mask, [seg], color=(255, 0, 0)) #If another object from the COCO dataset a red mask will be created + return table_mask + + def process_image(self, cv_image): + classes, segmentations, x_center, y_center = self.detect(cv_image) + table_segment = self.extract_table_segment(cv_image, classes, segmentations) + return table_segment, x_center, y_center + + +class LeastSquaresMethod: + def __init__(self): + self.intercept = 0 + self.slope = 0 + + def fit(self, x, y): + self.slope, self.intercept = self.calculate_slope_intercept(x, y) + + def calculate_slope_intercept(self, x, y): + mean_x = np.mean(x) + mean_y = np.mean(y) + numerator = np.sum((x - mean_x) * (y - mean_y)) + + min_x = int(min(x)) + max_x = int(max(x)) + + height, width, channels = image.shape + #for nearly vertical cases: + if max_x - min_x < (1/3 * width): + numerator = np.sum((y - mean_y) ** 2) + denominator = np.sum((x - mean_x) * (y - mean_y)) + slope = numerator / denominator + #for all other orientations: + else: + numerator = np.sum((x - mean_x) * (y - mean_y)) + denominator = np.sum((x - mean_x) ** 2) + slope = numerator / denominator + self.slope = slope + intercept = mean_y - slope * mean_x + return slope, intercept + + def predict(self, x): + return self.slope * x + self.intercept + + def object_direction(self, x_center, y_center): + perpendicular_slope = -1/self.slope + perpendicular_intercept = y_center - perpendicular_slope * x_center + + inner_array = filtered_masks[0] + x = inner_array[:,0] + y = inner_array[:,1] + coordinates_upper = 0 + coordinates_lower = 0 + if 0 <= perpendicular_slope < math.inf: + for i in range(len(x)): + if y[i]<= perpendicular_slope*x[i] + perpendicular_intercept: + coordinates_upper += 1 + else: + coordinates_lower += 1 + if -math.inf < perpendicular_slope < 0: + for i in range(len(x)): + if y[i]>= perpendicular_slope*x[i] + perpendicular_intercept: + coordinates_upper += 1 + else: + coordinates_lower += 1 + print("Size outline upper half of the mask:", coordinates_upper) + print("Size outline lower half of the mask:", coordinates_lower) + + if coordinates_upper >= coordinates_lower: + upwards = True + elif coordinates_upper < coordinates_lower: + upwards = False + self.upwards = upwards + + return upwards + +if __name__ == '__main__': + ts = YoloSegmentor() + image = cv2.imread('fork.jpeg') + table_segment, x_center, y_center = ts.process_image(image) + + model = LeastSquaresMethod() + + inner_array = result.masks.xy[0] # result.masks.xy is an array in an array, so has to be unpacked first + + x = inner_array[:,0] # first column = x coordinates of all points of the segmentation mask + y = inner_array[:,1] # second column = y coordinates of all points of the segmentation mask + + model.fit(x, y) #fit a line through all coordinates of the segmentation mask using the least squares method + print(f"Calculated slope: {model.slope}") + print(f"Calculated intercept: {model.intercept}") + + # Draw yellow dot at the center point + if x_center != None: + cv2.circle(table_segment, (int(x_center), int(y_center)), 5, (0, 255, 255), -1) + + # Draw the line created by the least squares method in green, this is only for visualization + min_x = int(min(x)) + max_x = int(max(x)) + + # this should be x,y starting point of line, x,y end point of line + cv2.line(table_segment, (min_x, int(model.predict(min_x))), (max_x, int(model.predict(max_x))), (0, 255, 0), 2) + + upwards = model.object_direction(x_center, y_center) + print("Object direction:", "upwards" if upwards else "downwards") + + cv2.imshow('Table Segment', table_segment) + cv2.waitKey(0) \ No newline at end of file diff --git a/robot_smach_states/src/robot_smach_states/manipulation/top_grasp.py b/robot_smach_states/src/robot_smach_states/manipulation/top_grasp.py new file mode 100644 index 0000000000..7806919d63 --- /dev/null +++ b/robot_smach_states/src/robot_smach_states/manipulation/top_grasp.py @@ -0,0 +1,697 @@ +from __future__ import absolute_import + +# ROS +import PyKDL as kdl +from pykdl_ros import VectorStamped, FrameStamped +import rospy +import smach +import tf2_ros +import math +from geometry_msgs.msg import Twist +import tf.transformations as tft +import numpy as np + +# TU/e Robotics +from ed.entity import Entity +from robot_skills.robot import Robot +from robot_skills.arm.arms import PublicArm, GripperTypes +from robot_smach_states.utility import check_arm_requirements, ResolveArm +from robot_smach_states.util.designators import check_type +from robot_smach_states.navigation.navigate_to_grasp import NavigateToGrasp + +from robot_smach_states.util.designators.arm import ArmDesignator +from robot_smach_states.util.designators.core import Designator + +from robot_smach_states.manipulation.cutlery_detector import YoloSegmentor +from robot_smach_states.manipulation.active_grasp_detector import ActiveGraspDetector +from robot_skills.util.exceptions import TimeOutException + + + +class PrepareGrasp(smach.State): + REQUIRED_ARM_PROPERTIES = {"required_gripper_types": [GripperTypes.GRASPING],} + + def __init__(self, robot: Robot, arm: ArmDesignator) -> None: + """ + Set the arm in the appropriate position before actually grabbing + + :param robot: robot to execute state with + :param arm: Designator that resolves to the arm to grasp with + """ + smach.State.__init__(self, outcomes=['succeeded', 'failed']) + + # Assign member variables + self.robot = robot + self.arm_designator = arm + + def execute(self, userdata=None) -> str: + arm = self.arm_designator.resolve() + if not arm: + rospy.logerr("Could not resolve arm") + return "failed" + + #pre-grasp position in two steps to limit possible collisions with the table + pre_grasp_joint_goal_upwards = [0.69, # arm lift joint. ranges from 0.0 to 0.7m + 0.0, # arm flex joint. lower values move the arm downwards ranges from -2 to 0.0 radians + 0.0, # arm roll joint + -1.57, # wrist flex joint. lower values move the hand down + 0.0] # wrist roll joint. + arm._arm._send_joint_trajectory([pre_grasp_joint_goal_upwards]) # send the command to the robot. + arm.wait_for_motion_done() + #only lifting the arm flex joint + pre_grasp_joint_goal_outwards = [0.69, + -1.57, + 0.0, + -1.57, + 0.0] + arm._arm._send_joint_trajectory([pre_grasp_joint_goal_outwards]) + # Open gripper + arm.gripper.send_goal('open', timeout=0.0) + arm.wait_for_motion_done() + return 'succeeded' + + +class TopGrasp(smach.State): + REQUIRED_ARM_PROPERTIES = {"required_gripper_types": [GripperTypes.GRASPING], + "required_goals": ["carrying_pose"], } + + def __init__(self, robot: Robot, arm: ArmDesignator, grab_entity: Designator) -> None: + """ + Pick up an item given an arm and an entity to be picked up + + :param robot: robot to execute this state with + :param arm: Designator that resolves to the arm to grasp with + """ + smach.State.__init__(self, outcomes=['succeeded', 'failed', 'grasp_failed']) + + # Assign member variables + self.robot = robot + self.arm_designator = arm + + assert self.robot.get_arm(**self.REQUIRED_ARM_PROPERTIES) is not None,\ + "None of the available arms meets all this class's requirements: {}".format(self.REQUIRED_ARM_PROPERTIES) + check_type(grab_entity, Entity) + self.grab_entity_designator = grab_entity + self.yolo_segmentor = YoloSegmentor() + + def execute(self, userdata=None) -> str: + grab_entity = self.grab_entity_designator.resolve() + if not grab_entity: + rospy.logerr("Could not resolve grab_entity") + return "failed" + + arm = self.arm_designator.resolve() + if not arm: + rospy.logerr("Could not resolve arm") + return "failed" + + grasp_succeeded=False + rate = rospy.Rate(10) # loop rate in hz + + +# start segmentation + self.yolo_segmentor.start() + upwards = None + time_difference = 5 + while upwards == None or time_difference > 1: #recency of data is important tehrefore the obtained data should be from within 1 seconds ago + x_cutlery, y_cutlery, length, slope, upwards, time = self.yolo_segmentor.data_object() #obtain cutlery's center point from the detection + time_difference = (rospy.Time.now()- time).to_sec() + #stop segmentation after desired data has been obtained + self.yolo_segmentor.stop() + print('x,y,slope obtained') + rospy.loginfo(f"direction upwards = {upwards}") +#rewrite pixel coordinate into the robot's frame + height_table = 0.735 #manual input of table height + height_gripper = 0.895 #value for gripper position in set pre-grasp-pose + distance_camera = height_gripper - height_table + 0.0045 + + #camera info + P = np.array([ + [205.46963709898583, 0.0, 320.5, -14.382874596929009], + [0.0, 205.46963709898583, 240.5, 0.0], + [0.0, 0.0, 1.0, 0.0] + ]) + + fx = P[0, 0] + fy = P[1, 1] + cx = P[0, 2] + cy = P[1, 2] + T_x = P[0, 3] #Translation along x-axis between camera frame and hand_palm_link + x_n = (x_cutlery - cx - T_x) / fx + y_n = (y_cutlery - cy) / fy + + X_c = distance_camera * x_n + Y_c = distance_camera * y_n + + cutlery_length = distance_camera*length /fx # this can be done since fx is equal to fy + + base_coordinates = np.array([X_c, Y_c, distance_camera, 1]) + x_cutlery_real, y_cutlery_real = base_coordinates[:2] + rospy.loginfo(f"X, Y, length = {x_cutlery_real, y_cutlery_real, cutlery_length}") + print('frame rewritten') + + +#move gripper towards object's determined grasping point + #move towards y coordinates with base + velocity = 0.02 # desired robot speed, relatively slow since ony short distances are to be covered + duration_y = abs(y_cutlery_real)/velocity + + v = Twist() + v.linear.x = 0 + if y_cutlery_real>0: + v.linear.y = -velocity + elif y_cutlery_real<0: + v.linear.y = velocity + v.angular.z = 0 # rotation speed to the left, none desired + start_time = rospy.Time.now() + while (rospy.Time.now() - start_time).to_sec() < duration_y: + self.robot.base._cmd_vel.publish(v) # send command to the robot + + duration_x = abs(x_cutlery_real)/velocity + if x_cutlery_real>0: + v.linear.x = velocity + elif x_cutlery_real<0: + v.linear.x = -velocity + v.linear.y = 0 + v.angular.z = 0 # rotation speed to the left, none desired + + start_time = rospy.Time.now() + while (rospy.Time.now() - start_time).to_sec() < duration_x: + self.robot.base._cmd_vel.publish(v) # send command to the robot + print('moved towards') +#rotate wrist according to orientation + + # Calculate the angle + angle = math.atan(slope) + + #Obtain the arm's current joint positions + joints_arm = arm._arm.get_joint_states() + arm_lift_joint = joints_arm['arm_lift_joint'] + arm_flex_joint = joints_arm['arm_flex_joint'] + arm_roll_joint = joints_arm['arm_roll_joint'] + wrist_flex_joint = joints_arm['wrist_flex_joint'] + wrist_roll_joint = angle + + + if not upwards: #rotate gripper 180 degrees to grasp cutlery in the correct direction + if wrist_roll_joint <= 0: + wrist_roll_joint = 3.14 - abs(wrist_roll_joint) + elif 0 < wrist_roll_joint <= 0.56: + wrist_roll_joint = 3.14 + wrist_roll_joint + elif 1.34 <= wrist_roll_joint < 1.57: + wrist_roll_joint = - 3.14 + wrist_roll_joint + #wrist roll joint values between 1.34 and 0.56 don't have an 180 degrees turned opposite since this is out of range. + #therefore the gripper will go to the closest possible value which has a max inaccuarcy of 15 degrees + elif 0.95 <= wrist_roll_joint < 1.34: + wrist_roll_joint = -1.8 + elif 0.56 < wrist_roll_joint < 0.95: + wrist_roll_joint = 3.7 + + #180 degree rotation of wrist_roll_joint if upwards = false leads to change in gripper coordinates. therefore the gripper should move backwards by 0.024 m (twice the offset between wrist_roll_link and hand_palm_link) + tfBuffer = tf2_ros.Buffer() + listener = tf2_ros.TransformListener(tfBuffer) + rospy.sleep(1) + + base_in_gripper_frame = tfBuffer.lookup_transform("hand_palm_link", "base_link", rospy.Time()) + rospy.loginfo(f"base_gripper_frame = {base_in_gripper_frame}") + + # Convert the original quaternion to a rotation matrix + rotation_matrix = tft.quaternion_matrix([ + base_in_gripper_frame.transform.rotation.x, + base_in_gripper_frame.transform.rotation.y, + base_in_gripper_frame.transform.rotation.z, + base_in_gripper_frame.transform.rotation.w + ]) + + # Desired movement direction in the gripper frame (positive direction) + direction_in_gripper_frame = [ -1, 0, 0, 1 ] + + # Transform the direction vector to the base frame + direction_in_base_frame = rotation_matrix.dot(direction_in_gripper_frame) + + v = Twist() + v.linear.x = direction_in_base_frame[1]/40 #velocity ranges from -0.025 to 0.025 + v.linear.y = direction_in_base_frame[0]/40 + v.angular.z = 0 # Assuming no rotation is needed + duration = 0.024/math.sqrt(v.linear.x**2 + v.linear.y**2) + start_time = rospy.Time.now() + while (rospy.Time.now() - start_time).to_sec() < duration: + self.robot.base._cmd_vel.publish(v) # send command to the robot + + #rotate wrist + cutlery_direction_wrist_joint_goal = [arm_lift_joint, + arm_flex_joint, + arm_roll_joint, + wrist_flex_joint, + wrist_roll_joint] + arm._arm._send_joint_trajectory([cutlery_direction_wrist_joint_goal]) # send the command to the robot. + arm.wait_for_motion_done() + print('wrist rotation done') + +#Moving arm downwards until force detection + + joints_arm = arm._arm.get_joint_states() + arm_flex_joint = joints_arm['arm_flex_joint'] + arm_roll_joint = joints_arm['arm_roll_joint'] + wrist_flex_joint = joints_arm['wrist_flex_joint'] + wrist_roll_joint = joints_arm['wrist_roll_joint'] + move_arm = True + + while not grasp_succeeded and not rospy.is_shutdown(): + # control loop + if (move_arm): + downward_joint_goal = [0, # arm lift joint. ranges from 0.0 to 0.7m + arm_flex_joint, # arm flex joint. lower values move the arm downwards ranges from -2 to 0.0 radians + arm_roll_joint, # arm roll joint + wrist_flex_joint, # wrist flex joint. lower values move the hand down + wrist_roll_joint] # wrist roll joint. + arm._arm._send_joint_trajectory([downward_joint_goal], max_joint_vel = 0.02) # send the command to the robot. + #Move arm downwards, don't wait until motion is done, but until a force is detected + try: + arm._arm.force_sensor.wait_for_edge_up(3.0) # wait 3 seconds for a force detection + except TimeOutException: + rospy.loginfo("No edge up detected within timeout") + + joints_arm = arm._arm.get_joint_states() + arm_lift_joint = joints_arm['arm_lift_joint'] + print(arm_lift_joint) + grasp_joint_goal = [(arm_lift_joint + 0.061), #change this in a position relative to obtained coordinates or table height + arm_flex_joint, + arm_roll_joint, + wrist_flex_joint, + wrist_roll_joint] + arm._arm._send_joint_trajectory([grasp_joint_goal]) # send the command to the robot. + arm.wait_for_motion_done() # wait until the motion is complete + move_arm = False # reset flag to move the arm. + continue # dont wait for the rest of the loop. + + #grasp object + arm.gripper.send_goal('close', timeout=0.0, max_torque = 0.2) + arm.wait_for_motion_done() + + rospy.sleep(5) #wait until gripper is closed before doing grasp detection + #detecting if grasp has succeeded + active_grasp_detector = ActiveGraspDetector(self.robot, self.arm_designator) + grasp_detection = active_grasp_detector.execute() + + if grasp_detection == 'true': + grasp_succeeded = True + + else: # for other options: false, cannot determine and failed, the grasp has not succeeded and grasp_succeeded is therefore false + grasp_succeeded = False + #move arm back + pre_grasp_joint_goal_outwards = [0.69, + -1.57, + 0.0, + -1.57, + 0.0] + arm._arm._send_joint_trajectory([pre_grasp_joint_goal_outwards]) + # Open gripper + arm.gripper.send_goal('open', timeout=0.0) + arm.wait_for_motion_done() + return "grasp_failed" + + + rospy.loginfo("The robot is holding something: {}!".format(grasp_succeeded)) + +# move arm up slightly to limit friction on table + joints_arm = arm._arm.get_joint_states() + arm_lift_joint = joints_arm['arm_lift_joint'] + arm_flex_joint = joints_arm['arm_flex_joint'] + arm_roll_joint = joints_arm['arm_roll_joint'] + wrist_flex_joint = joints_arm['wrist_flex_joint'] + wrist_roll_joint = joints_arm['wrist_roll_joint'] + + arm_upwards_joint_goal = [(arm_lift_joint + 0.005), # arm lift joint. ranges from 0.0 to 0.7m + arm_flex_joint, # arm flex joint. lower values move the arm downwards ranges from -2 to 0.0 radians + arm_roll_joint, # arm roll joint + wrist_flex_joint, # wrist flex joint. lower values move the hand down + wrist_roll_joint] # wrist roll joint. + arm._arm._send_joint_trajectory([arm_upwards_joint_goal]) # send the command to the robot. + +#determine on which side of the table Hero is + tfBuffer = tf2_ros.Buffer() + listener = tf2_ros.TransformListener(tfBuffer) + rospy.sleep(1) + + base_in_table_frame = tfBuffer.lookup_transform("dinner_table", "base_link", rospy.Time()) + rospy.loginfo(f"base_table_frame = {base_in_table_frame}") + + #x and y coordinates of the base in the coordinate frame of the table + x_base = base_in_table_frame.transform.translation.x + y_base = base_in_table_frame.transform.translation.y + +#manual input of table size + table_length = 1.645 + table_width = 0.845 + + #position around the table divided in four quarters: 1 = positive x side, 3 = negative x side, 2 = negative y side, 4 = positive y side + if x_base > 1/2*table_length and (abs(x_base) - 1/2*table_length) >= (abs(y_base) - 1/2*table_width): + position = 1 + elif y_base < -1/2*table_width and (abs(x_base) - 1/2*table_length) < (abs(y_base) - 1/2*table_width): + position = 2 + elif x_base < -1/2*table_length and (abs(x_base) - 1/2*table_length) >= (abs(y_base) - 1/2*table_width): + position = 3 + elif y_base > 1/2*table_width and (abs(x_base) - 1/2*table_length) < (abs(y_base) - 1/2*table_width): + position = 4 + +#Rotate wrist back in line with table edge + tfBuffer = tf2_ros.Buffer() + listener = tf2_ros.TransformListener(tfBuffer) + rospy.sleep(1) + + gripper_in_table_frame = tfBuffer.lookup_transform("dinner_table", "hand_palm_link", rospy.Time()) + rospy.loginfo(f"gripper_table_frame = {gripper_in_table_frame}") + + #Convert the original quaternion to a rotation matrix + rotation_matrix = tft.quaternion_matrix([ + gripper_in_table_frame.transform.rotation.x, + gripper_in_table_frame.transform.rotation.y, + gripper_in_table_frame.transform.rotation.z, + gripper_in_table_frame.transform.rotation.w + ]) + + x_direction_gripper = rotation_matrix[:3, 0] # Extract the x-axis direction vector of the gripper in the table frame + x_direction_gripper_xy = np.array([x_direction_gripper[0], x_direction_gripper[1], 0]) # Project the x-direction vector onto the x-y plane + x_direction_gripper_xy /= np.linalg.norm(x_direction_gripper_xy) # Normalize + + if position == 1: # x-axis gripper should align with negative x-axis table + table_axis = np.array([-1, 0, 0]) + + if position == 2: # x-axis gripper should align with y-axis table + table_axis = np.array([0, 1, 0]) + + if position == 3: # x-axis gripper should align with x-axis table + table_axis = np.array([1, 0, 0]) + + if position == 4: # x-axis gripper should align with negative y-axis table + table_axis= np.array([0, -1, 0]) + + # Calculate the cross product and dot product in the x-y plane + cross_prod = np.cross(table_axis, x_direction_gripper_xy) + dot_prod = np.dot(table_axis, x_direction_gripper_xy) + + # Calculate the angle using atan2 for a more stable solution + angle_to_align = np.arctan2(np.linalg.norm(cross_prod), dot_prod) + + # Determine the direction of rotation + if cross_prod[2] < 0: + angle_to_align = -angle_to_align + + joints_arm = arm._arm.get_joint_states() + arm_lift_joint = joints_arm['arm_lift_joint'] + arm_flex_joint = joints_arm['arm_flex_joint'] + arm_roll_joint = joints_arm['arm_roll_joint'] + wrist_flex_joint = joints_arm['wrist_flex_joint'] + wrist_roll_joint = joints_arm['wrist_roll_joint'] + angle_to_align + + wrist_rotation_back_goal = [arm_lift_joint, + arm_flex_joint, + arm_roll_joint, + wrist_flex_joint, + wrist_roll_joint] + arm._arm._send_joint_trajectory([wrist_rotation_back_goal]) # send the command to the robot. + arm.wait_for_motion_done() + rospy.sleep(5) #wait until the wrist has stopped turning + + print('rotated to align with table edge') +#Move towards table edge + #movement of base should be in negative x direction of the gripper frame + tfBuffer = tf2_ros.Buffer() + listener = tf2_ros.TransformListener(tfBuffer) + rospy.sleep(1) + + base_in_gripper_frame = tfBuffer.lookup_transform("hand_palm_link", "base_link", rospy.Time()) + rospy.loginfo(f"base_gripper_frame = {base_in_gripper_frame}") + + # Convert the original quaternion to a rotation matrix + rotation_matrix = tft.quaternion_matrix([ + base_in_gripper_frame.transform.rotation.x, + base_in_gripper_frame.transform.rotation.y, + base_in_gripper_frame.transform.rotation.z, + base_in_gripper_frame.transform.rotation.w + ]) + + # Desired movement direction in the gripper frame (negative x direction) + direction_in_gripper_frame = [ -1, 0, 0, 1 ] + + # Transform the direction vector to the base frame + direction_in_base_frame = rotation_matrix.dot(direction_in_gripper_frame) + + v = Twist() + v.linear.x = direction_in_base_frame[0]/40 #velocity ranges from -0.025 to 0.025 + v.linear.y = direction_in_base_frame[1]/40 + v.angular.z = 0 # Assuming no rotation is needed + + tfBuffer = tf2_ros.Buffer() + listener = tf2_ros.TransformListener(tfBuffer) + rospy.sleep(1) + + gripper_in_table_frame = tfBuffer.lookup_transform("dinner_table", "hand_palm_link", rospy.Time()) + rospy.loginfo(f"gripper_table_frame = {gripper_in_table_frame}") + + #this ensures 4 to 4.5 cm of the object sticks out over the edge. a margin of 5 mm is taken into account here for wheel slip + distance_gripper_to_edge = 0.015 + while abs(gripper_in_table_frame.transform.translation.x) < (1/2*table_length - distance_gripper_to_edge) and abs(gripper_in_table_frame.transform.translation.y) < (1/2*table_width - distance_gripper_to_edge): + self.robot.base._cmd_vel.publish(v) # send command to the robot + gripper_in_table_frame = tfBuffer.lookup_transform("dinner_table", "hand_palm_link", rospy.Time()) + + rospy.sleep(5) + + #open gripper + arm.gripper.send_goal('open', timeout=0.0) + arm.wait_for_motion_done() + + #move farther away until grasp distance from edge o grasp at 2cm from cutlery's outer end (half gripper width) + duration = 0.05/math.sqrt(v.linear.x**2 + v.linear.y**2) + start_time = rospy.Time.now() + while (rospy.Time.now() - start_time).to_sec() < duration: + self.robot.base._cmd_vel.publish(v) # send command to the robot + +#rotate wrist around the cutlery, towards pre-grasp pose for a sideways grasp + #determine towards which side hero will grasp the object + x_gripper = gripper_in_table_frame.transform.translation.x, + y_gripper = gripper_in_table_frame.transform.translation.y + side = None + if (position == 1 and y_gripper >= (y_base - 0.078)) or (position == 2 and x_gripper >= (x_base - 0.078)) or (position == 3 and y_gripper <= (y_base + 0.078)) or (position == 4 and x_gripper <= (x_base + 0.078)): + side = 'right' #gripper is right of base + else: + side = 'left' #gripper is left of base + print(side, position) + + tfBuffer = tf2_ros.Buffer() + listener = tf2_ros.TransformListener(tfBuffer) + rospy.sleep(1) + + base_in_gripper_frame = tfBuffer.lookup_transform("hand_palm_link", "base_link", rospy.Time()) + rospy.loginfo(f"base_in_gripper_frame = {base_in_gripper_frame}") + + #Convert the original quaternion to a rotation matrix + rotation_matrix = tft.quaternion_matrix([ + base_in_gripper_frame.transform.rotation.x, + base_in_gripper_frame.transform.rotation.y, + base_in_gripper_frame.transform.rotation.z, + base_in_gripper_frame.transform.rotation.w + ]) + + x_direction_base = rotation_matrix[:3, 0] # Extract the x-axis direction vector of the gripper in the table frame + negative_x_direction_base_xy = np.array([x_direction_base[0], x_direction_base[1], 0]) # Project the x-direction vector onto the x-y plane + negative_x_direction_base_xy /= np.linalg.norm(negative_x_direction_base_xy) # Normalize + x_axis_gripper_xy = np.array([1, 0, 0]) + + # x-axis base should align with x-axis gripper + cross_prod = np.cross(x_axis_gripper_xy, negative_x_direction_base_xy) + dot_prod = np.dot(x_axis_gripper_xy, negative_x_direction_base_xy) + angle_to_align = np.arctan2(np.linalg.norm(cross_prod), dot_prod) + + # Determine the direction of rotation + if cross_prod[2] < 0: + angle_to_align = -angle_to_align + rotation_y = 1 - abs(angle_to_align)/1.57 + rotation_x = abs(angle_to_align)/1.57 + tfBuffer = tf2_ros.Buffer() + listener = tf2_ros.TransformListener(tfBuffer) + rospy.sleep(1) + + base_in_gripper_frame = tfBuffer.lookup_transform("hand_palm_link", "base_link", rospy.Time()) + rospy.loginfo(f"base_gripper_frame = {base_in_gripper_frame}") + + # Convert the original quaternion to a rotation matrix + rotation_matrix = tft.quaternion_matrix([ + base_in_gripper_frame.transform.rotation.x, + base_in_gripper_frame.transform.rotation.y, + base_in_gripper_frame.transform.rotation.z, + base_in_gripper_frame.transform.rotation.w + ]) + + # Desired movement direction in the gripper frame (positive direction) + direction_in_gripper_frame = [ 0, 1, 0, 1 ] + + # Transform the direction vector to the base frame + direction_in_base_frame = rotation_matrix.dot(direction_in_gripper_frame) + + if side == 'right': + v = Twist() + v.linear.x = -direction_in_base_frame[0]/40 #velocity ranges from -0.025 to 0.025 + v.linear.y = -direction_in_base_frame[1]/40 + v.angular.z = 0 # Assuming no rotation is needed + duration = 0.2305/math.sqrt(v.linear.x**2 + v.linear.y**2) + start_time = rospy.Time.now() + while (rospy.Time.now() - start_time).to_sec() < duration: + self.robot.base._cmd_vel.publish(v) # send command to the robot + rospy.sleep(5) #wait until base is done moving + + tfBuffer = tf2_ros.Buffer() + listener = tf2_ros.TransformListener(tfBuffer) + rospy.sleep(1) + gripper_in_base_frame = tfBuffer.lookup_transform("base_link", "hand_palm_link",rospy.Time()) + rospy.loginfo(f"base_gripper_frame = {gripper_in_base_frame}") + + + base_to_gripper = self.frame_from_xyzrpy((gripper_in_base_frame.transform.translation.x + 0.1405*rotation_x), # x distance to the robot + (gripper_in_base_frame.transform.translation.y - 0.1405*rotation_y), # y distance off center from the robot (fixed if rpy=0) + (gripper_in_base_frame.transform.translation.z - 0.095), # z height of the gripper + 1.57, 0, (-1.57+angle_to_align)) + + pose_goal = FrameStamped(base_to_gripper, + rospy.Time.now(), #timestamp when this pose was created + "base_link" # the frame in which the pose is expressed. base link lies in the center of the robot at the height of the floor. + ) + arm.send_goal(pose_goal) # send the command to the robot. + arm.wait_for_motion_done() # wait until the motion is complete + + if side == 'left': + v = Twist() + v.linear.x = direction_in_base_frame[0]/40 #velocity ranges from -0.025 to 0.025 + v.linear.y = direction_in_base_frame[1]/40 + v.angular.z = 0 # Assuming no rotation is needed + duration = 0.2305/math.sqrt(v.linear.x**2 + v.linear.y**2) + start_time = rospy.Time.now() + while (rospy.Time.now() - start_time).to_sec() < duration: + self.robot.base._cmd_vel.publish(v) # send command to the robot + rospy.sleep(5) #wait until base is done moving + + tfBuffer = tf2_ros.Buffer() + listener = tf2_ros.TransformListener(tfBuffer) + rospy.sleep(1) + gripper_in_base_frame = tfBuffer.lookup_transform("base_link", "hand_palm_link",rospy.Time()) + rospy.loginfo(f"base_gripper_frame = {gripper_in_base_frame}") + + + base_to_gripper = self.frame_from_xyzrpy((gripper_in_base_frame.transform.translation.x+ 0.1405*rotation_x), # x distance to the robot + (gripper_in_base_frame.transform.translation.y + 0.1405*rotation_y), # y distance off center from the robot (fixed if rpy=0) + (gripper_in_base_frame.transform.translation.z -0.095), # z height of the gripper + -1.57, 0, (1.57 +angle_to_align)) + + pose_goal = FrameStamped(base_to_gripper, + rospy.Time.now(), #timestamp when this pose was created + "base_link" # the frame in which the pose is expressed. base link lies in the center of the robot at the height of the floor. + ) + arm.send_goal(pose_goal) # send the command to the robot. + arm.wait_for_motion_done() # wait until the motion is complete + + +#close gripper + arm.gripper.send_goal('close', timeout=0.0, max_torque = 0.3) # option given by max_torque to close the gripper with more force + arm.wait_for_motion_done() + + #detecting if grasp has succeeded + rospy.sleep(5) + active_grasp_detector = ActiveGraspDetector(self.robot, self.arm_designator) + grasp_detection = active_grasp_detector.execute() + + + if grasp_detection == 'true': + grasp_succeeded = True + else: # for other options: false, cannot determine and failed, the grasp has not succeeded and grasp_succeeded is therefore false + grasp_succeeded = False + return "failed" + + rospy.loginfo("The robot is holding something: {}!".format(grasp_succeeded)) + + +#lift up, go to original stance + + + + return "succeeded" + + @staticmethod + def frame_from_xyzrpy(x, y, z, roll, pitch, yaw): + """ + Helper function to create a kdl frame based on an x,y,z position and a RPY rotation + """ + return kdl.Frame(kdl.Rotation.RPY(roll, pitch, yaw), kdl.Vector(x, y, z)) + + +class ResetOnFailure(smach.State): + """ Class to reset the robot after a grab has failed """ + + REQUIRED_ARM_PROPERTIES = {"required_gripper_types": [GripperTypes.GRASPING], } + + def __init__(self, robot, arm): + """ + Constructor + + :param robot: robot object + :param arm: arm designator + """ + smach.State.__init__(self, outcomes=['done']) + + self._robot = robot + self.arm_designator = arm + + def execute(self, userdata=None): + """ Execute hook """ + arm = self.arm_designator.resolve() + arm.reset() + + if self._robot.robot_name == "hero": + self._robot.torso.reset() # Move up to make resetting of the arm safer. + if arm is not None: + arm.gripper.send_goal('open') + self._robot.head.reset() # Sends a goal + self._robot.head.cancel_goal() # And cancels it... + if arm is not None: + arm.reset() + self._robot.torso.reset() + return 'done' + + +class TopGrab(smach.StateMachine): + def __init__(self, robot: Robot, item: Designator, arm: ArmDesignator): + """ + Let the given robot move to an entity and grab that entity using some arm + + :param robot: Robot to use + :param item: Designator that resolves to the item to grab. E.g. EntityByIdDesignator + :param arm: Designator that resolves to the arm to use for grabbing. E.g. UnoccupiedArmDesignator + """ + smach.StateMachine.__init__(self, outcomes=['done', 'failed']) + + # Check types or designator resolve types + check_type(arm, PublicArm) + + with self: + smach.StateMachine.add('RESOLVE_ARM', ResolveArm(arm, self), + transitions={'succeeded': 'NAVIGATE_TO_GRAB', + 'failed': 'failed'}) + + smach.StateMachine.add('NAVIGATE_TO_GRAB', NavigateToGrasp(robot, arm, item), + transitions={'unreachable': 'RESET_FAILURE', + 'goal_not_defined': 'RESET_FAILURE', + 'arrived': 'PREPARE_GRASP'}) + + smach.StateMachine.add('PREPARE_GRASP', PrepareGrasp(robot, arm), + transitions={'succeeded': 'GRAB', + 'failed': 'RESET_FAILURE'}) + + smach.StateMachine.add('GRAB', TopGrasp(robot, arm, item), + transitions={'succeeded': 'done', + 'failed': 'RESET_FAILURE', + 'grasp_failed': 'GRAB'}) + + smach.StateMachine.add("RESET_FAILURE", ResetOnFailure(robot, arm), + transitions={'done': 'failed'}) + + check_arm_requirements(self, robot) \ No newline at end of file