-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Receptionist find empty seat2 #1316
base: master
Are you sure you want to change the base?
Changes from all commits
d881ca7
d1b9645
513055b
652f568
f1d0a1c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
#! /usr/bin/env python | ||
|
||
from __future__ import print_function | ||
|
||
from typing import List | ||
|
||
import rospy | ||
from ed.entity import Entity | ||
|
||
from robot_smach_states.human_interaction import Say | ||
from robot_smach_states.designator_iterator import IterateDesignator | ||
from robot_smach_states.util.designators import LockingDesignator | ||
from robot_smach_states.world_model import CheckSeatEmpty | ||
from robot_smach_states.reset import ResetArms | ||
from robot_smach_states.utility import LockDesignator | ||
from challenge_receptionist.point_at_receptionist import PointAtReception | ||
import robot_smach_states.util.designators as ds | ||
import smach | ||
|
||
|
||
class SeatsInRoomDesignator(ds.Designator): | ||
def __init__(self, robot, seat_ids, room, name=None): | ||
super(SeatsInRoomDesignator, self).__init__(resolve_type=[Entity], name=name) | ||
|
||
self.robot = robot | ||
|
||
ds.check_type(seat_ids, [str]) | ||
ds.check_type(room, Entity) | ||
|
||
self.room = room | ||
self.seat_ids = seat_ids | ||
|
||
def _resolve(self): | ||
room = self.room.resolve() if hasattr(self.room, 'resolve') else self.room # type: Entity | ||
if not room: | ||
rospy.logwarn("Room is None, so cannot find seats there") | ||
return None | ||
seats = [self.robot.ed.get_entity(seat_id) for seat_id in self.seat_ids] # type: List[Entity] | ||
|
||
true_seats = [seat for seat in seats if seat is not None] # get_entity returns None if entity does not exist | ||
|
||
seats_in_room = room.entities_in_volume(true_seats, "in") | ||
|
||
return seats_in_room | ||
|
||
def __repr__(self): | ||
return "SeatsInRoomDesignator({}, {}, {}, {})".format(self.robot, self.seat_ids, self.room, self.name) | ||
|
||
|
||
class FindEmptySeat(smach.StateMachine): | ||
""" | ||
Iterate over all seat-type objects and check that their 'on-top-of' volume is empty | ||
That can be done with an Inspect and then query for any Entities inside that volume. | ||
If there are none, then the seat is empty | ||
""" | ||
|
||
Comment on lines
+51
to
+56
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add information on the arguments in the docstring. What do the arguments mean? |
||
def __init__(self, robot, seats_to_inspect: dict, room, fit_supporting_entity=False, seat_is_for=None): | ||
smach.StateMachine.__init__(self, outcomes=['succeeded', 'failed']) | ||
|
||
seats_volumes_des = ds.VariableDesignator(seats_to_inspect) | ||
seats = LockingDesignator(SeatsInRoomDesignator(robot, list(seats_to_inspect.keys()), room, "seats_in_room")) | ||
seat_ent_des = ds.VariableDesignator(resolve_type=Entity) | ||
seat_ent_uuid_des = ds.AttrDesignator(seat_ent_des, 'uuid', resolve_type=str) | ||
volumes_des = ds.ValueByKeyDesignator(seats_volumes_des, seat_ent_uuid_des, resolve_type=[str], | ||
name="volumes_des") | ||
volume_des = ds.VariableDesignator(resolve_type=str) | ||
|
||
if seat_is_for: | ||
ds.check_type(seat_is_for, str) | ||
else: | ||
seat_is_for = ds.Designator(' ') | ||
|
||
with self: | ||
smach.StateMachine.add('SAY_LETS_FIND_SEAT', | ||
Say(robot, | ||
["Let me find a place for {name} to sit. Please be patient while I check " | ||
"out where there's place to sit"], | ||
name=seat_is_for, | ||
block=False), | ||
transitions={'spoken': 'LOCK_DESIGNATOR'}) | ||
|
||
smach.StateMachine.add('LOCK_DESIGNATOR', LockDesignator(seats), | ||
transitions={'locked': 'ITERATE_NEXT_SEAT'}) | ||
|
||
smach.StateMachine.add('ITERATE_NEXT_SEAT', | ||
IterateDesignator(seats, seat_ent_des.writeable), | ||
transitions={'next': 'ITERATE_NEXT_VOLUME', | ||
'stop_iteration': 'SAY_NO_EMPTY_SEATS'}) | ||
|
||
smach.StateMachine.add('ITERATE_NEXT_VOLUME', | ||
IterateDesignator(volumes_des, volume_des.writeable), | ||
transitions={'next': 'CHECK_SEAT_EMPTY', | ||
'stop_iteration': 'ITERATE_NEXT_SEAT'}) | ||
|
||
# ToDO: THis has to be modified once the CheckSeatEmpty state is ready! | ||
smach.StateMachine.add('CHECK_SEAT_EMPTY', | ||
CheckSeatEmpty(robot, seat_ent_des, volume_des, None), | ||
transitions={'occupied': 'ITERATE_NEXT_VOLUME', | ||
'empty': 'POINT_AT_EMPTY_SEAT', | ||
'partially_occupied': 'POINT_AT_PARTIALLY_OCCUPIED_SEAT', | ||
'failed': 'ITERATE_NEXT_VOLUME'}) | ||
|
||
smach.StateMachine.add('POINT_AT_EMPTY_SEAT', | ||
PointAtReception(robot=robot, | ||
arm_designator=ds.UnoccupiedArmDesignator(robot, { | ||
'required_goals': ['point_at']}), | ||
point_at_designator=seat_ent_des, | ||
volume=volume_des, | ||
look_at_designator=seat_ent_des), | ||
transitions={"succeeded": "SAY_SEAT_EMPTY", | ||
"failed": "SAY_SEAT_EMPTY"}) | ||
|
||
smach.StateMachine.add('SAY_SEAT_EMPTY', | ||
Say(robot, | ||
["Please sit on the {seat}, {name}!"], | ||
name=seat_is_for, | ||
seat=ds.AttrDesignator(seat_ent_des, 'uuid', resolve_type=str), | ||
block=True), | ||
transitions={'spoken': 'RESET_SUCCESS'}) | ||
|
||
smach.StateMachine.add('POINT_AT_PARTIALLY_OCCUPIED_SEAT', | ||
PointAtReception(robot=robot, | ||
arm_designator=ds.UnoccupiedArmDesignator(robot, { | ||
'required_goals': ['point_at']}), | ||
point_at_designator=seat_ent_des, | ||
volume=volume_des, | ||
look_at_designator=seat_ent_des), | ||
transitions={"succeeded": "SAY_SEAT_PARTIALLY_OCCUPIED", | ||
"failed": "SAY_SEAT_PARTIALLY_OCCUPIED"}) | ||
|
||
smach.StateMachine.add('SAY_SEAT_PARTIALLY_OCCUPIED', | ||
Say(robot, | ||
["I think there's some space left here where you can sit {name}," | ||
"Please leave some space for any potential future guests!"], | ||
name=seat_is_for, | ||
block=True), | ||
transitions={'spoken': 'RESET_SUCCESS'}) | ||
|
||
smach.StateMachine.add('SAY_NO_EMPTY_SEATS', | ||
Say(robot, | ||
["Sorry, there are no empty seats. I guess you just have to stand {name}"], | ||
name=seat_is_for, | ||
block=True), | ||
transitions={'spoken': 'RESET_FAIL'}) | ||
|
||
smach.StateMachine.add('RESET_FAIL', | ||
ResetArms(robot), | ||
transitions={'done': 'failed'}) | ||
|
||
smach.StateMachine.add('RESET_SUCCESS', | ||
ResetArms(robot), | ||
transitions={'done': 'succeeded'}) | ||
|
||
|
||
if __name__ == "__main__": | ||
import sys | ||
from robot_skills import get_robot | ||
|
||
if len(sys.argv) < 4: | ||
print( | ||
"Please provide robot_name, room and seats_to_inspect as arguments. Eg. 'hero livingroom dinner_table bar " | ||
"dinnertable", ) | ||
sys.exit(1) | ||
|
||
robot_name = sys.argv[1] | ||
room = sys.argv[2] | ||
seats_to_inspect = sys.argv[3:] | ||
|
||
seats_to_inspect = {seat: ['on_top_of'] for seat in seats_to_inspect} | ||
|
||
rospy.init_node('test_find_emtpy_seat') | ||
robot = get_robot(robot_name) | ||
|
||
sm = FindEmptySeat(robot, | ||
seats_to_inspect=seats_to_inspect, | ||
room=ds.EntityByIdDesignator(robot, room)) | ||
sm.execute() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,6 @@ | ||
# System | ||
import time | ||
|
||
from typing import List | ||
|
||
import rospy | ||
|
@@ -8,27 +11,53 @@ | |
from ed.entity import Entity | ||
from robot_smach_states.util import designators as ds | ||
from robot_smach_states.human_interaction import FindPeople | ||
from .world_model import look_at_segmentation_area | ||
|
||
|
||
class LookAtEntity(smach.State): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this function is more at place in world_model.py |
||
""" Look at an entity. This assumes the robot is already in front of the object """ | ||
|
||
def __init__(self, robot, entity, volume=None): | ||
""" Constructor | ||
|
||
:param robot: robot object | ||
:param entity: Ed Entity indicating the entity to look at | ||
:param volume: Volume string indicating the specific volume to look at (e.g., 'on_top_of') | ||
""" | ||
smach.State.__init__(self, outcomes=["done"]) | ||
self.robot = robot | ||
ds.check_type(entity, Entity) | ||
self.entity = ds.value_or_resolve(entity) | ||
if volume is not None: | ||
ds.check_type(volume, Entity) | ||
self.volume = ds.value_or_resolve(volume) | ||
|
||
def execute(self): | ||
""" Looks at the entity and updates its pose using the update kinect service """ | ||
if self.volume: | ||
look_at_segmentation_area(self.robot, self.entity, self.volume) | ||
else: | ||
look_at_segmentation_area(self.robot, self.entity) | ||
# This is needed because the head is not entirely still when the look_at_point function finishes | ||
time.sleep(0.5) | ||
Comment on lines
+41
to
+42
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is no longer true for Hero. It was a hack for Amigo |
||
# Return | ||
return "done" | ||
|
||
|
||
class PeopleInSeatDesignator(ds.Designator): | ||
def __init__(self, robot, seat: Entity, room: Entity = None, name=None): | ||
def __init__(self, robot, seat: Entity, volume: Entity = None, name=None): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
super(PeopleInSeatDesignator, self).__init__(resolve_type=[Entity], name=name) | ||
|
||
self.robot = robot | ||
|
||
ds.check_type(seat, Entity) | ||
if room is not None: | ||
ds.check_type(room, Entity) | ||
if volume is not None: | ||
ds.check_type(volume, Entity) | ||
|
||
self.room = room | ||
self.volume = volume | ||
self.seat = seat | ||
|
||
def _resolve(self) -> Entity: | ||
if self.room: | ||
room = ds.value_or_resolve(self.room) | ||
if not room: | ||
rospy.logwarn("Room is None, ignoring room constraints") | ||
|
||
seat = ds.value_or_resolve(self.seat) | ||
if not seat: | ||
rospy.logwarn("Seat is None, so cannot find seats there") | ||
|
@@ -43,13 +72,17 @@ def _resolve(self) -> Entity: | |
people: List[Entity] = self.robot.ed.get_entities( | ||
etype="person", center_point=seat_point, radius=1.0, ignore_z=True | ||
) | ||
|
||
# TODO ignore people not within the specified room. i.e spectators | ||
people = [person for person in people if person.pose.frame.p] | ||
return people | ||
if self.volume: | ||
volume = ds.value_or_resolve(self.volume) | ||
if not volume: | ||
rospy.logwarn("Volume is None, ignoring volume constraints") | ||
return people | ||
people = [person for person in people if | ||
volume.in_volume(VectorStamped.from_framestamped(person.pose), 'in')] | ||
return people | ||
|
||
def __repr__(self): | ||
return "PersonInSeatDesignator({}, {}, {}, {})".format(self.robot, self.seat, self.room, self.name) | ||
return "PersonInSeatDesignator({}, {}, {}, {})".format(self.robot, self.seat, self.volume, self.name) | ||
|
||
|
||
class _CheckPeople(smach.State): | ||
|
@@ -83,12 +116,13 @@ def execute(self, ud=None): | |
|
||
|
||
class CheckSeatEmpty(smach.StateMachine): | ||
def __init__(self, robot, entity_des, number_of_people_des=None): | ||
def __init__(self, robot, seat_ent_des, volume_des, number_of_people_des=None): | ||
""" | ||
Constructor | ||
|
||
:param robot: robot object | ||
:param entity_des: EdEntityDesignator indicating the (furniture) object to check | ||
:param seat_ent_des: EdEntityDesignator indicating the (furniture) object to check | ||
:param volume_des: EdEntityDesignator indicating the (room) location to constraint the search | ||
:param number_of_people_des: designator which will result in the number of people near the entity, or None in case of state fails. | ||
""" | ||
# TODO implement logic for percent vs volume check in state machine rather than in the states themselves | ||
|
@@ -99,23 +133,26 @@ def __init__(self, robot, entity_des, number_of_people_des=None): | |
else: | ||
number_of_people_des = ds.VariableDesignator(resolve_type=int) | ||
|
||
people_in_seat_des = PeopleInSeatDesignator(robot, entity_des) | ||
people_in_seat_des = PeopleInSeatDesignator(robot, seat_ent_des, volume_des) | ||
|
||
with self: | ||
# TODO add state to turn head towards the entity. | ||
smach.StateMachine.add( | ||
"LOOK_AT_SEAT", | ||
LookAtEntity(robot=robot, entity=seat_ent_des, volume=volume_des), | ||
transitions={"done": "INSPECT_SEAT"} | ||
) | ||
smach.StateMachine.add( | ||
"INSPECT_SEAT", | ||
FindPeople( | ||
robot=robot, | ||
# properties=None, | ||
# query_entity_designator=None, | ||
# found_people_designator=None, | ||
# look_distance=10.0, | ||
properties=None, | ||
query_entity_designator=volume_des, | ||
found_people_designator=people_in_seat_des, | ||
look_distance=1.0, | ||
# speak=False, | ||
# strict=True, | ||
# nearest=False, | ||
attempts=1, | ||
# search_timeout=60, | ||
look_range=(0.0, 0.0), | ||
look_steps=1, | ||
), | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file is not needed unless you want to test this version and the original side by side. I think it was originally created as a tournament hack. But now you may feel free to make changes to find_empty_seat.py