From 79f005fe68e7f3cb623efcaaf17544590df403ed Mon Sep 17 00:00:00 2001 From: mkhoshnam Date: Tue, 26 Nov 2024 16:06:57 +0100 Subject: [PATCH 1/3] Add gymnasium_interface files --- src/gymnasium_interface/example.py | 54 +++++++ src/gymnasium_interface/pycram_gym_env.py | 111 ++++++++++++++ src/gymnasium_interface/task_executor.py | 175 ++++++++++++++++++++++ 3 files changed, 340 insertions(+) create mode 100644 src/gymnasium_interface/example.py create mode 100644 src/gymnasium_interface/pycram_gym_env.py create mode 100644 src/gymnasium_interface/task_executor.py diff --git a/src/gymnasium_interface/example.py b/src/gymnasium_interface/example.py new file mode 100644 index 000000000..8a3d5f95b --- /dev/null +++ b/src/gymnasium_interface/example.py @@ -0,0 +1,54 @@ +import logging +from gymnasium_interface.pycram_gym_env import PyCRAMGymEnv +from pycram.datastructures.enums import Arms, Grasp +from pycram.datastructures.pose import Pose + +# Configure logging +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") + +def custom_reward(state): + """ + Custom reward function. + + :param state: The current state of the environment. + :type state: dict + :return: Reward value based on the state. + :rtype: float + """ + return 10.0 if state else -1.0 + +# Define actions as a list of strings +actions = ["navigate", "pick_up"] + +# Define default parameters for each action +default_params = { + "navigate": {"target_pose": Pose(position=[1.0, 2.0, 0.0], orientation=[0.0, 0.0, 0.0, 1.0])}, + "pick_up": {"object_desig": "milk", "arm": Arms.RIGHT, "grasps": [Grasp.FRONT]}, +} + +# Define objects to initialize in the environment +objects = [ + { + "name": "milk", + "type": "object", + "urdf": "milk.stl", + "pose": Pose(position=[2.5, 2.10, 1.02]), + } +] + +# Initialize the Gymnasium environment +env = PyCRAMGymEnv(actions=actions, default_params=default_params, objects=objects, reward_function=custom_reward) + +# Reset the environment and retrieve the initial state +state, info = env.reset() +logging.info(f"State after reset: {state}") + +# Perform a step in the environment +try: + state, reward, done, truncated, info = env.step( + action=1, # Index of the action to execute + params={"object_desig": "milk", "arm": Arms.RIGHT, "grasps": [Grasp.FRONT]}, + ) + logging.info(f"State after step: {state}, Reward: {reward}, Done: {done}, Truncated: {truncated}") +except ValueError as e: + logging.error(f"Action failed: {e}") diff --git a/src/gymnasium_interface/pycram_gym_env.py b/src/gymnasium_interface/pycram_gym_env.py new file mode 100644 index 000000000..c64eb299c --- /dev/null +++ b/src/gymnasium_interface/pycram_gym_env.py @@ -0,0 +1,111 @@ +import gymnasium as gym +from gymnasium.spaces import Discrete +from gymnasium_interface.task_executor import PyCRAMTaskExecutor # Use absolute import +from pycram.process_module import simulated_robot + + +class PyCRAMGymEnv(gym.Env): + """ + A Gymnasium-compatible environment for integrating PyCRAM task execution. + + This environment allows users to execute PyCRAM tasks within a Gymnasium-compatible + framework. It supports dynamic task initialization, state tracking, and custom reward + calculations. + + :param actions: List of valid action classes or functions (e.g., [NavigateAction, PickUpAction]). + :type actions: list + :param default_params: Default parameters for each action, keyed by action class/function (optional). + :type default_params: dict + :param objects: List of objects to initialize in the environment (optional). + :type objects: list + :param reward_function: Custom user-defined function to compute rewards (optional). + :type reward_function: callable + """ + + def __init__(self, actions, default_params=None, objects=None, reward_function=None): + self.actions = actions + self.default_params = default_params or {} + self.objects = objects or [] + self.reward_function = reward_function + + # Dynamically define the action space + self.action_space = Discrete(len(actions)) + + # Initialize the task executor + self.executor = PyCRAMTaskExecutor() + + # Initialize the state + self.state = None + self.reset() + + def reset(self): + """ + Resets the environment. + + :return: The initial state of the environment. + :rtype: tuple + """ + with simulated_robot: + self.executor.reset_task(self.objects) + self.state = self.executor.get_current_state() + return self.state, {} + + def step(self, action, params=None): + """ + Executes a step in the environment. + + :param action: The action index to execute. + :type action: int + :param params: Additional parameters for the action. + :type params: dict, optional + :return: A tuple containing the next state, reward, done flag, truncated flag, and additional info. + :rtype: tuple + """ + with simulated_robot: + action_name = self.actions[action] + action_params = self.default_params.get(action_name, {}).copy() + if params: + action_params.update(params) + + # Execute the action + self.executor.execute_action(action_name, action_params) + + # Update the state + self.state = self._get_observation() + + # Calculate reward + reward = self._calculate_reward() + + # Placeholder: done logic can be updated later + done = self._is_done() + + return self.state, reward, done, False, {} + + def _get_observation(self): + """ + Fetches the current state of the environment. + + :return: The current state of the environment. + :rtype: dict + """ + return self.state + + def _calculate_reward(self): + """ + Calculates the reward using the user-defined reward function. + + :return: The calculated reward. + :rtype: float + """ + if self.reward_function: + return self.reward_function(self.state) + return 1.0 + + def _is_done(self): + """ + Checks if the task is complete. + + :return: True if the task is done, otherwise False. + :rtype: bool + """ + return False diff --git a/src/gymnasium_interface/task_executor.py b/src/gymnasium_interface/task_executor.py new file mode 100644 index 000000000..9beaa4b69 --- /dev/null +++ b/src/gymnasium_interface/task_executor.py @@ -0,0 +1,175 @@ +from pycram.worlds.bullet_world import BulletWorld +from pycram.world_concepts.world_object import Object +from pycram.datastructures.enums import ObjectType, WorldMode, Grasp +from pycram.datastructures.pose import Pose +from pycram.designators.action_designator import NavigateAction, PickUpAction, PlaceAction, OpenAction, CloseAction +from pycram.designators.object_designator import BelieveObject +from pycram.process_module import simulated_robot +import logging + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +class PyCRAMTaskExecutor: + """ + Handles task execution in a PyCRAM environment. This class integrates with BulletWorld for + managing objects and robot tasks in the simulation. + + Attributes: + world (BulletWorld): The BulletWorld instance managing the environment. + robot (Object): The robot object in the environment. + apartment (Object): The apartment or environment object in the simulation. + """ + + def __init__(self): + """ + Initializes the task executor for PyCRAM actions. + """ + self.world = BulletWorld(WorldMode.GUI) + self.robot = None + self.apartment = None + + def clear_world(self): + """ + Removes all objects from the BulletWorld. + """ + logging.info("Clearing all objects from BulletWorld...") + for obj in list(self.world.objects): + obj.remove() + logging.info("All objects removed from BulletWorld.") + + def reset_task(self, objects): + """ + Resets the simulation environment dynamically by clearing the world and adding new objects. + + :param objects: List of objects to be added to the environment. + :type objects: list[dict] + """ + self.clear_world() + + # Reload the apartment URDF + self.apartment = Object("apartment", "environment", "apartment.urdf") + + # Reinitialize the robot + self.robot = Object("pr2", ObjectType.ROBOT, "pr2.urdf", pose=Pose([1.2, 1, 0])) + self.world.robot = self.robot + + # Add dynamic objects + for obj in objects: + name = obj["name"] + obj_type = obj["type"] + urdf = obj["urdf"] + pose = obj["pose"] + + logging.info(f"Adding object: {name}, URDF path: {urdf}, Pose: {pose}") + + existing_object = self.world.get_object_by_name(name) + if existing_object: + logging.info(f"Reusing existing object: {name}") + else: + Object(name, obj_type, urdf, pose=pose) + + logging.info("Environment reset: Apartment, robot, and dynamic objects added.") + + def execute_action(self, action, params): + """ + Executes a PyCRAM action based on the provided parameters. + + :param action: The action to be executed (e.g., "navigate", "pick_up"). + :type action: str + :param params: Parameters required for the action. + :type params: dict + """ + if action == "navigate": + self._navigate(params) + elif action == "pick_up": + self._pick_up(params) + elif action == "place": + self._place(params) + elif action == "open": + self._open(params) + elif action == "close": + self._close(params) + else: + raise ValueError(f"Unknown action: {action}") + + def _navigate(self, params): + """ + Navigates the robot to a target location. + + :param params: Parameters for the navigate action, including "target_pose". + :type params: dict + """ + target_pose = params.get("target_pose") + if not target_pose: + raise ValueError("Missing parameter: target_pose") + NavigateAction(target_locations=[target_pose]).resolve().perform() + + def _pick_up(self, params): + """ + Picks up an object. + + :param params: Parameters for the pick-up action, including "object_desig" and "arm". + :type params: dict + """ + object_name = params.get("object_desig") + arm = params.get("arm") + grasps = params.get("grasps", [Grasp.RIGHT]) + if not object_name or not arm: + raise ValueError("Missing parameters: object_desig and arm are required") + object_desig = BelieveObject(names=[object_name]) + action = PickUpAction( + object_designator_description=object_desig, arms=[arm], grasps=grasps + ).resolve() + action.perform() + + def _place(self, params): + """ + Places an object at a target location. + + :param params: Parameters for the place action, including "object_desig", "target_pose", and "arm". + :type params: dict + """ + object_desig = params.get("object_desig") + target_pose = params.get("target_pose") + arm = params.get("arm") + if not object_desig or not target_pose or not arm: + raise ValueError("Missing parameters: object_desig, target_pose, and arm are required") + PlaceAction(object_designator_description=object_desig, target_locations=[target_pose], arms=[arm]).resolve().perform() + + def _open(self, params): + """ + Opens an object (e.g., a drawer or door). + + :param params: Parameters for the open action, including "handle_desig" and "arm". + :type params: dict + """ + handle_desig = params.get("handle_desig") + arm = params.get("arm") + if not handle_desig or not arm: + raise ValueError("Missing parameters: handle_desig and arm are required") + OpenAction(handle_desig, [arm]).resolve().perform() + + def _close(self, params): + """ + Closes an object (e.g., a drawer or door). + + :param params: Parameters for the close action, including "handle_desig" and "arm". + :type params: dict + """ + handle_desig = params.get("handle_desig") + arm = params.get("arm") + if not handle_desig or not arm: + raise ValueError("Missing parameters: handle_desig and arm are required") + CloseAction(handle_desig, [arm]).resolve().perform() + + def get_current_state(self): + """ + Fetches the current state of the environment, including the robot pose and objects. + + :return: Dictionary containing the robot pose and a list of objects with their poses. + :rtype: dict + """ + robot_pose = self.robot.get_pose() if self.robot else None + objects = [{"name": obj.name, "pose": obj.pose} for obj in self.world.objects] + return {"robot_pose": robot_pose, "objects": objects} From 46eb6a86a72bc5020586b163a6de852acdf1699d Mon Sep 17 00:00:00 2001 From: mkhoshnam Date: Tue, 26 Nov 2024 16:08:12 +0100 Subject: [PATCH 2/3] Add gymnasium_interface files --- src/gymnasium_interface/__init__.py | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 src/gymnasium_interface/__init__.py diff --git a/src/gymnasium_interface/__init__.py b/src/gymnasium_interface/__init__.py new file mode 100644 index 000000000..f5b7cbe7e --- /dev/null +++ b/src/gymnasium_interface/__init__.py @@ -0,0 +1,2 @@ +from gymnasium_interface.pycram_gym_env import PyCRAMGymEnv +from gymnasium_interface.task_executor import PyCRAMTaskExecutor From e04bc72acde4f814f2cd4c79f917e37d58d036af Mon Sep 17 00:00:00 2001 From: mkhoshnam Date: Wed, 27 Nov 2024 15:38:51 +0100 Subject: [PATCH 3/3] Updated task_executor.py --- src/gymnasium_interface/task_executor.py | 75 ++++++++++++------------ 1 file changed, 39 insertions(+), 36 deletions(-) diff --git a/src/gymnasium_interface/task_executor.py b/src/gymnasium_interface/task_executor.py index 9beaa4b69..bec81c537 100644 --- a/src/gymnasium_interface/task_executor.py +++ b/src/gymnasium_interface/task_executor.py @@ -2,25 +2,37 @@ from pycram.world_concepts.world_object import Object from pycram.datastructures.enums import ObjectType, WorldMode, Grasp from pycram.datastructures.pose import Pose -from pycram.designators.action_designator import NavigateAction, PickUpAction, PlaceAction, OpenAction, CloseAction +from pycram.designators.action_designator import ( + NavigateAction, + PickUpAction, + PlaceAction, + OpenAction, + CloseAction, +) from pycram.designators.object_designator import BelieveObject from pycram.process_module import simulated_robot import logging +from typing import Dict, List, Union # Configure logging -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") + class PyCRAMTaskExecutor: """ - Handles task execution in a PyCRAM environment. This class integrates with BulletWorld for - managing objects and robot tasks in the simulation. + Handles task execution in a PyCRAM environment. This class integrates with BulletWorld + for managing objects and robot tasks in the simulation. Attributes: - world (BulletWorld): The BulletWorld instance managing the environment. - robot (Object): The robot object in the environment. - apartment (Object): The apartment or environment object in the simulation. + world: The BulletWorld instance managing the environment. + robot: The robot object in the environment. + apartment: The apartment or environment object in the simulation. """ + world: BulletWorld + robot: Object + apartment: Object + def __init__(self): """ Initializes the task executor for PyCRAM actions. @@ -29,7 +41,7 @@ def __init__(self): self.robot = None self.apartment = None - def clear_world(self): + def clear_world(self) -> None: """ Removes all objects from the BulletWorld. """ @@ -38,12 +50,11 @@ def clear_world(self): obj.remove() logging.info("All objects removed from BulletWorld.") - def reset_task(self, objects): + def reset_task(self, objects: List[Dict[str, Union[str, Pose]]]) -> None: """ Resets the simulation environment dynamically by clearing the world and adding new objects. :param objects: List of objects to be added to the environment. - :type objects: list[dict] """ self.clear_world() @@ -71,46 +82,42 @@ def reset_task(self, objects): logging.info("Environment reset: Apartment, robot, and dynamic objects added.") - def execute_action(self, action, params): + def execute_action(self, action: str, params: Dict[str, Union[str, Pose, List[Grasp]]]) -> None: """ Executes a PyCRAM action based on the provided parameters. :param action: The action to be executed (e.g., "navigate", "pick_up"). - :type action: str :param params: Parameters required for the action. - :type params: dict - """ - if action == "navigate": - self._navigate(params) - elif action == "pick_up": - self._pick_up(params) - elif action == "place": - self._place(params) - elif action == "open": - self._open(params) - elif action == "close": - self._close(params) + """ + action_map = { + "navigate": self._navigate, + "pick_up": self._pick_up, + "place": self._place, + "open": self._open, + "close": self._close, + } + + if action in action_map: + action_map[action](params) else: raise ValueError(f"Unknown action: {action}") - def _navigate(self, params): + def _navigate(self, params: Dict[str, Pose]) -> None: """ Navigates the robot to a target location. :param params: Parameters for the navigate action, including "target_pose". - :type params: dict """ target_pose = params.get("target_pose") if not target_pose: raise ValueError("Missing parameter: target_pose") NavigateAction(target_locations=[target_pose]).resolve().perform() - def _pick_up(self, params): + def _pick_up(self, params: Dict[str, Union[str, List[Grasp]]]) -> None: """ Picks up an object. :param params: Parameters for the pick-up action, including "object_desig" and "arm". - :type params: dict """ object_name = params.get("object_desig") arm = params.get("arm") @@ -123,12 +130,11 @@ def _pick_up(self, params): ).resolve() action.perform() - def _place(self, params): + def _place(self, params: Dict[str, Union[str, Pose]]) -> None: """ Places an object at a target location. :param params: Parameters for the place action, including "object_desig", "target_pose", and "arm". - :type params: dict """ object_desig = params.get("object_desig") target_pose = params.get("target_pose") @@ -137,12 +143,11 @@ def _place(self, params): raise ValueError("Missing parameters: object_desig, target_pose, and arm are required") PlaceAction(object_designator_description=object_desig, target_locations=[target_pose], arms=[arm]).resolve().perform() - def _open(self, params): + def _open(self, params: Dict[str, str]) -> None: """ Opens an object (e.g., a drawer or door). :param params: Parameters for the open action, including "handle_desig" and "arm". - :type params: dict """ handle_desig = params.get("handle_desig") arm = params.get("arm") @@ -150,12 +155,11 @@ def _open(self, params): raise ValueError("Missing parameters: handle_desig and arm are required") OpenAction(handle_desig, [arm]).resolve().perform() - def _close(self, params): + def _close(self, params: Dict[str, str]) -> None: """ Closes an object (e.g., a drawer or door). :param params: Parameters for the close action, including "handle_desig" and "arm". - :type params: dict """ handle_desig = params.get("handle_desig") arm = params.get("arm") @@ -163,12 +167,11 @@ def _close(self, params): raise ValueError("Missing parameters: handle_desig and arm are required") CloseAction(handle_desig, [arm]).resolve().perform() - def get_current_state(self): + def get_current_state(self) -> Dict[str, Union[Pose, List[Dict[str, Pose]]]]: """ Fetches the current state of the environment, including the robot pose and objects. :return: Dictionary containing the robot pose and a list of objects with their poses. - :rtype: dict """ robot_pose = self.robot.get_pose() if self.robot else None objects = [{"name": obj.name, "pose": obj.pose} for obj in self.world.objects]