import math import random import hashlib import gym from enum import IntEnum import numpy as np from gym import error, spaces, utils from gym.utils import seeding from .rendering import * from abc import ABC, abstractmethod import warnings import astar import traceback import warnings from functools import wraps SocialAINPCActionsDict = { "go_forward": 0, "rotate_left": 1, "rotate_right": 2, "toggle_action": 3, "point_stop_point": 4, "point_E": 5, "point_S": 6, "point_W": 7, "point_N": 8, "stop_point": 9, "no_op": 10 } point_dir_encoding = { "point_E": 0, "point_S": 1, "point_W": 2, "point_N": 3, } def get_traceback(): tb = traceback.extract_stack() return "".join(traceback.format_list(tb)[:-1]) # Size in pixels of a tile in the full-scale human view TILE_PIXELS = 32 # Map of color names to RGB values COLORS = { 'red' : np.array([255, 0, 0]), 'green' : np.array([0, 255, 0]), 'blue' : np.array([0, 0, 255]), 'purple': np.array([112, 39, 195]), 'yellow': np.array([255, 255, 0]), 'grey' : np.array([100, 100, 100]), 'brown': np.array([82, 36, 19]) } COLOR_NAMES = sorted(list(COLORS.keys())) # Used to map colors to integers COLOR_TO_IDX = { 'red' : 0, 'green' : 1, 'blue' : 2, 'purple': 3, 'yellow': 4, 'grey' : 5, 'brown' : 6, } IDX_TO_COLOR = dict(zip(COLOR_TO_IDX.values(), COLOR_TO_IDX.keys())) # Map of object type to integers OBJECT_TO_IDX = { 'unseen' : 0, 'empty' : 1, 'wall' : 2, 'floor' : 3, 'door' : 4, 'key' : 5, 'ball' : 6, 'box' : 7, 'goal' : 8, 'lava' : 9, 'agent' : 10, 'npc' : 11, 'switch' : 12, 'lockablebox' : 13, 'apple' : 14, 'applegenerator' : 15, 'generatorplatform': 16, 'marble' : 17, 'marbletee' : 18, 'fence' : 19, 'remotedoor' : 20, 'lever' : 21, } IDX_TO_OBJECT = dict(zip(OBJECT_TO_IDX.values(), OBJECT_TO_IDX.keys())) # Map of state names to integers STATE_TO_IDX = { 'open' : 0, 'closed': 1, 'locked': 2, } # Map of agent direction indices to vectors DIR_TO_VEC = [ # Pointing right (positive X) np.array((1, 0)), # Down (positive Y) np.array((0, 1)), # Pointing left (negative X) np.array((-1, 0)), # Up (negative Y) np.array((0, -1)), ] class WorldObj: """ Base class for grid world objects """ def __init__(self, type, color): assert type in OBJECT_TO_IDX, type assert color in COLOR_TO_IDX, color self.type = type self.color = color self.contains = None # Initial position of the object self.init_pos = None # Current position of the object self.cur_pos = np.array((0, 0)) def can_overlap(self): """Can the agent overlap with this?""" return False def can_push(self): """Can the agent push the object?""" return False def can_pickup(self): """Can the agent pick this up?""" return False def can_contain(self): """Can this contain another object?""" return False def see_behind(self): """Can the agent see behind this object?""" return True def toggle(self, env, pos): """Method to trigger/toggle an action this object performs""" return False def encode(self, nb_dims=3, absolute_coordinates=False): """Encode the a description of this object as a nb_dims-tuple of integers""" if absolute_coordinates: core = (OBJECT_TO_IDX[self.type], *self.cur_pos, COLOR_TO_IDX[self.color]) else: core = (OBJECT_TO_IDX[self.type], COLOR_TO_IDX[self.color]) return core + (0,) * (nb_dims - len(core)) def cache(self, *args, **kwargs): """Used for cached rendering.""" return self.encode(*args, **kwargs) @staticmethod def decode(type_idx, color_idx, state): """Create an object from a 3-tuple state description""" obj_type = IDX_TO_OBJECT[type_idx] color = IDX_TO_COLOR[color_idx] if obj_type == 'empty' or obj_type == 'unseen': return None if obj_type == 'wall': v = Wall(color) elif obj_type == 'floor': v = Floor(color) elif obj_type == 'ball': v = Ball(color) elif obj_type == 'marble': v = Marble(color) elif obj_type == 'apple': eaten = state == 1 v = Apple(color, eaten=eaten) elif obj_type == 'apple_generator': is_pressed = state == 2 v = AppleGenerator(color, is_pressed=is_pressed) elif obj_type == 'key': v = Key(color) elif obj_type == 'box': v = Box(color) elif obj_type == 'lockablebox': is_locked = state == 2 v = LockableBox(color, is_locked=is_locked) elif obj_type == 'door': # State, 0: open, 1: closed, 2: locked is_open = state == 0 is_locked = state == 2 v = Door(color, is_open, is_locked) elif obj_type == 'remotedoor': # State, 0: open, 1: closed is_open = state == 0 v = RemoteDoor(color, is_open) elif obj_type == 'goal': v = Goal() elif obj_type == 'lava': v = Lava() elif obj_type == 'fence': v = Fence() elif obj_type == 'switch': v = Switch(color, is_on=state) elif obj_type == 'lever': v = Lever(color, is_on=state) elif obj_type == 'npc': warnings.warn("NPC's internal state cannot be decoded. Only the icon is shown.") v = NPC(color) v.npc_type=0 else: assert False, "unknown object type in decode '%s'" % obj_type return v def render(self, r): """Draw this object with the given renderer""" raise NotImplementedError class BlockableWorldObj(WorldObj): def __init__(self, type, color, block_set): super(BlockableWorldObj, self).__init__(type, color) self.block_set = block_set self.blocked = False def can_push(self): return True def push(self, *args, **kwargs): return self.block_block_set() def toggle(self, *args, **kwargs): return self.block_block_set() def block_block_set(self): """A function that blocks the block set""" if not self.blocked: if self.block_set is not None: # cprint("BLOCKED!", "red") for e in self.block_set: e.block() return True else: return False def block(self): self.blocked = True class Goal(WorldObj): def __init__(self): super().__init__('goal', 'green') def can_overlap(self): return True def render(self, img): fill_coords(img, point_in_rect(0, 1, 0, 1), COLORS[self.color]) class Floor(WorldObj): """ Colored floor tile the agent can walk over """ def __init__(self, color='blue'): super().__init__('floor', color) def can_overlap(self): return True def render(self, img): # Give the floor a pale color color = COLORS[self.color] / 2 fill_coords(img, point_in_rect(0.031, 1, 0.031, 1), color) class Lava(WorldObj): def __init__(self): super().__init__('lava', 'red') def can_overlap(self): return True def render(self, img): c = (255, 128, 0) # Background color fill_coords(img, point_in_rect(0, 1, 0, 1), c) # Little waves for i in range(3): ylo = 0.3 + 0.2 * i yhi = 0.4 + 0.2 * i fill_coords(img, point_in_line(0.1, ylo, 0.3, yhi, r=0.03), (0,0,0)) fill_coords(img, point_in_line(0.3, yhi, 0.5, ylo, r=0.03), (0,0,0)) fill_coords(img, point_in_line(0.5, ylo, 0.7, yhi, r=0.03), (0,0,0)) fill_coords(img, point_in_line(0.7, yhi, 0.9, ylo, r=0.03), (0,0,0)) class Fence(WorldObj): """ Same as Lava but can't overlap. """ def __init__(self): super().__init__('fence', 'grey') def can_overlap(self): return False def render(self, img): c = COLORS[self.color] # ugly fence fill_coords(img, point_in_rect( 0.1, 0.9, 0.5, 0.9 # (0.15, 0.9), # (0.10, 0.5), # (0.95, 0.9), # (0.90, 0.5), # (0.10, 0.9), # (0.10, 0.5), # (0.95, 0.9), # (0.95, 0.5), ), c) fill_coords(img, point_in_quadrangle( # (0.15, 0.9), # (0.10, 0.5), # (0.95, 0.9), # (0.90, 0.5), (0.10, 0.9), (0.10, 0.5), (0.95, 0.9), (0.95, 0.5), ), c) return # preety fence fill_coords(img, point_in_quadrangle( (0.15, 0.3125), (0.15, 0.4125), (0.85, 0.4875), (0.85, 0.5875), ), c) # h2 fill_coords(img, point_in_quadrangle( (0.15, 0.6125), (0.15, 0.7125), (0.85, 0.7875), (0.85, 0.8875), ), c) # vm fill_coords(img, point_in_quadrangle( (0.45, 0.2875), (0.45, 0.8875), (0.55, 0.3125), (0.55, 0.9125), ), c) fill_coords(img, point_in_triangle( (0.45, 0.2875), (0.55, 0.3125), (0.5, 0.25), ), c) # vl fill_coords(img, point_in_quadrangle( (0.25, 0.2375), (0.25, 0.8375), (0.35, 0.2625), (0.35, 0.8625), ), c) # vl fill_coords(img, point_in_triangle( (0.25, 0.2375), (0.35, 0.2625), (0.3, 0.2), ), c) # vr fill_coords(img, point_in_quadrangle( (0.65, 0.3375), (0.65, 0.9375), (0.75, 0.3625), (0.75, 0.9625), ), c) fill_coords(img, point_in_triangle( (0.65, 0.3375), (0.75, 0.3625), (0.7, 0.3), ), c) class Wall(WorldObj): def __init__(self, color='grey'): super().__init__('wall', color) def see_behind(self): return False def render(self, img): fill_coords(img, point_in_rect(0, 1, 0, 1), COLORS[self.color]) class Lever(BlockableWorldObj): def __init__(self, color, object=None, is_on=False, block_set=None, active_steps=None): super().__init__('lever', color, block_set) self.is_on = is_on self.object = object self.active_steps = active_steps self.countdown = None # countdown timer self.was_activated = False if self.block_set is not None: if self.is_on: raise ValueError("If using a block set, a Switch must be initialized as OFF") def can_overlap(self): """The agent can only walk over this cell when the door is open""" return False def see_behind(self): return True def step(self): if self.countdown is not None: self.countdown = self.countdown - 1 if self.countdown <= 0: # if nothing is on the door, close the door and deactivate timer self.toggle() self.countdown = None def toggle(self, env=None, pos=None): if self.blocked: return False if self.was_activated and not self.is_on: # cannot be activated twice return False self.is_on = not self.is_on if self.is_on: if self.active_steps is not None: # activate countdown to shutdown self.countdown = self.active_steps self.was_activated = True if self.object is not None: # open object self.object.open_close() if self.is_on: self.block_block_set() return True def block(self): self.blocked = True def encode(self, nb_dims=3, absolute_coordinates=False): """Encode the a description of this object as a 3-tuple of integers""" # State, 0: off, 1: on state = 1 if self.is_on else 0 count = self.countdown if self.countdown is not None else 255 if absolute_coordinates: v = (OBJECT_TO_IDX[self.type], *self.cur_pos, COLOR_TO_IDX[self.color], state, count) else: v = (OBJECT_TO_IDX[self.type], COLOR_TO_IDX[self.color], state, count) v += (0,) * (nb_dims-len(v)) return v def render(self, img): c = COLORS[self.color] black = (0, 0, 0) # off_angle = -math.pi/3 off_angle = -math.pi/2 on_angle = -math.pi/8 rotating_lever_shapes = [] rotating_lever_shapes.append((point_in_rect(0.5, 0.9, 0.77, 0.83), c)) rotating_lever_shapes.append((point_in_circle(0.9, 0.8, 0.1), c)) rotating_lever_shapes.append((point_in_circle(0.5, 0.8, 0.08), c)) if self.is_on: if self.countdown is None: angle = on_angle else: angle = (self.countdown/self.active_steps) * (on_angle-off_angle) + off_angle else: angle = off_angle fill_coords(img, point_in_circle_clip(0.5, 0.8, 0.12, theta_end=-math.pi), c) # fill_coords(img, point_in_circle_clip(0.5, 0.8, 0.08, theta_end=-math.pi), black) rotating_lever_shapes = [(rotate_fn(v, cx=0.5, cy=0.8, theta=angle), col) for v, col in rotating_lever_shapes] for v, col in rotating_lever_shapes: fill_coords(img, v, col) fill_coords(img, point_in_rect(0.2, 0.8, 0.78, 0.82), c) fill_coords(img, point_in_circle(0.5, 0.8, 0.03), (0, 0, 0)) class RemoteDoor(BlockableWorldObj): """Door that are unlocked by a lever""" def __init__(self, color, is_open=False, block_set=None): super().__init__('remotedoor', color, block_set) self.is_open = is_open def can_overlap(self): """The agent can only walk over this cell when the door is open""" return self.is_open def see_behind(self): return self.is_open # def toggle(self, env, pos=None): # return False def open_close(self): # If the player has the right key to open the door self.is_open = not self.is_open return True def encode(self, nb_dims=3, absolute_coordinates=False): """Encode the a description of this object as a 3-tuple of integers""" # State, 0: open, 1: closed state = 0 if self.is_open else 1 if absolute_coordinates: v = (OBJECT_TO_IDX[self.type], *self.cur_pos, COLOR_TO_IDX[self.color], state) else: v = (OBJECT_TO_IDX[self.type], COLOR_TO_IDX[self.color], state) v += (0,) * (nb_dims-len(v)) return v def block(self): self.blocked = True def render(self, img): c = COLORS[self.color] if self.is_open: fill_coords(img, point_in_rect(0.88, 1.00, 0.00, 1.00), c) fill_coords(img, point_in_rect(0.92, 0.96, 0.04, 0.96), (0,0,0)) else: fill_coords(img, point_in_rect(0.00, 1.00, 0.00, 1.00), c) fill_coords(img, point_in_rect(0.04, 0.96, 0.04, 0.96), (0,0,0)) fill_coords(img, point_in_rect(0.08, 0.92, 0.08, 0.92), c) fill_coords(img, point_in_rect(0.12, 0.88, 0.12, 0.88), (0,0,0)) # wifi symbol fill_coords(img, point_in_circle_clip(cx=0.5, cy=0.8, r=0.5, theta_start=-np.pi/3, theta_end=-2*np.pi/3), c) fill_coords(img, point_in_circle_clip(cx=0.5, cy=0.8, r=0.45, theta_start=-np.pi/3, theta_end=-2*np.pi/3), (0,0,0)) fill_coords(img, point_in_circle_clip(cx=0.5, cy=0.8, r=0.4, theta_start=-np.pi/3, theta_end=-2*np.pi/3), c) fill_coords(img, point_in_circle_clip(cx=0.5, cy=0.8, r=0.35, theta_start=-np.pi/3, theta_end=-2*np.pi/3), (0,0,0)) fill_coords(img, point_in_circle_clip(cx=0.5, cy=0.8, r=0.3, theta_start=-np.pi/3, theta_end=-2*np.pi/3), c) fill_coords(img, point_in_circle_clip(cx=0.5, cy=0.8, r=0.25, theta_start=-np.pi/3, theta_end=-2*np.pi/3), (0,0,0)) fill_coords(img, point_in_circle_clip(cx=0.5, cy=0.8, r=0.2, theta_start=-np.pi/3, theta_end=-2*np.pi/3), c) fill_coords(img, point_in_circle_clip(cx=0.5, cy=0.8, r=0.15, theta_start=-np.pi/3, theta_end=-2*np.pi/3), (0,0,0)) fill_coords(img, point_in_circle_clip(cx=0.5, cy=0.8, r=0.1, theta_start=-np.pi/3, theta_end=-2*np.pi/3), c) return class Door(BlockableWorldObj): def __init__(self, color, is_open=False, is_locked=False, block_set=None): super().__init__('door', color, block_set) self.is_open = is_open self.is_locked = is_locked def can_overlap(self): """The agent can only walk over this cell when the door is open""" return self.is_open def see_behind(self): return self.is_open def toggle(self, env, pos=None): if self.blocked: return False # If the player has the right key to open the door if self.is_locked: if isinstance(env.carrying, Key) and env.carrying.color == self.color: self.is_locked = False self.is_open = True ret = True ret = False else: self.is_open = not self.is_open ret = True self.block_block_set() return ret def encode(self, nb_dims=3, absolute_coordinates=False): """Encode the a description of this object as a 3-tuple of integers""" # State, 0: open, 1: closed, 2: locked if self.is_open: state = 0 elif self.is_locked: state = 2 elif not self.is_open: state = 1 if absolute_coordinates: v = (OBJECT_TO_IDX[self.type], *self.cur_pos, COLOR_TO_IDX[self.color], state) else: v = (OBJECT_TO_IDX[self.type], COLOR_TO_IDX[self.color], state) v += (0,) * (nb_dims-len(v)) return v def render(self, img): c = COLORS[self.color] if self.is_open: fill_coords(img, point_in_rect(0.88, 1.00, 0.00, 1.00), c) fill_coords(img, point_in_rect(0.92, 0.96, 0.04, 0.96), (0,0,0)) return # Door frame and door if self.is_locked: fill_coords(img, point_in_rect(0.00, 1.00, 0.00, 1.00), c) fill_coords(img, point_in_rect(0.06, 0.94, 0.06, 0.94), 0.45 * np.array(c)) # Draw key slot fill_coords(img, point_in_rect(0.52, 0.75, 0.50, 0.56), c) else: fill_coords(img, point_in_rect(0.00, 1.00, 0.00, 1.00), c) fill_coords(img, point_in_rect(0.04, 0.96, 0.04, 0.96), (0,0,0)) fill_coords(img, point_in_rect(0.08, 0.92, 0.08, 0.92), c) fill_coords(img, point_in_rect(0.12, 0.88, 0.12, 0.88), (0,0,0)) # Draw door handle fill_coords(img, point_in_circle(cx=0.75, cy=0.50, r=0.08), c) class Switch(BlockableWorldObj): def __init__(self, color, lockable_object=None, is_on=False, no_turn_off=True, no_light=True, locker_switch=False, block_set=None): super().__init__('switch', color, block_set) self.is_on = is_on self.lockable_object = lockable_object self.no_turn_off = no_turn_off self.no_light = no_light self.locker_switch = locker_switch if self.block_set is not None: if self.is_on: raise ValueError("If using a block set, a Switch must be initialized as OFF") if not self.no_turn_off: raise ValueError("If using a block set, a Switch must be initialized can't be turned off") def can_overlap(self): """The agent can only walk over this cell when the door is open""" return False def see_behind(self): return True def toggle(self, env, pos=None): if self.blocked: return False if self.is_on: if self.no_turn_off: return False self.is_on = not self.is_on if self.lockable_object is not None: if self.locker_switch: # locker/unlocker switch self.lockable_object.is_locked = not self.lockable_object.is_locked else: # opener switch self.lockable_object.toggle(env, pos) if self.is_on: self.block_block_set() if self.no_turn_off: # assert that obj is toggled only once assert not hasattr(self, "was_toggled") self.was_toggled = True return True def block(self): self.blocked = True def encode(self, nb_dims=3, absolute_coordinates=False): """Encode the a description of this object as a 3-tuple of integers""" # State, 0: off, 1: on state = 1 if self.is_on else 0 if self.no_light: state = 0 if absolute_coordinates: v = (OBJECT_TO_IDX[self.type], *self.cur_pos, COLOR_TO_IDX[self.color], state) else: v = (OBJECT_TO_IDX[self.type], COLOR_TO_IDX[self.color], state) v += (0,) * (nb_dims-len(v)) return v def render(self, img): c = COLORS[self.color] # Door frame and door if self.is_on and not self.no_light: fill_coords(img, point_in_rect(0.00, 1.00, 0.00, 1.00), c) fill_coords(img, point_in_rect(0.04, 0.96, 0.04, 0.96), (0,0,0)) fill_coords(img, point_in_rect(0.08, 0.92, 0.08, 0.92), c) fill_coords(img, point_in_rect(0.12, 0.88, 0.12, 0.88), 0.45 * np.array(c)) else: fill_coords(img, point_in_rect(0.00, 1.00, 0.00, 1.00), c) fill_coords(img, point_in_rect(0.04, 0.96, 0.04, 0.96), (0,0,0)) fill_coords(img, point_in_rect(0.08, 0.92, 0.08, 0.92), c) fill_coords(img, point_in_rect(0.12, 0.88, 0.12, 0.88), (0,0,0)) class Key(WorldObj): def __init__(self, color='blue'): super(Key, self).__init__('key', color) def can_pickup(self): return True def render(self, img): c = COLORS[self.color] # Vertical quad fill_coords(img, point_in_rect(0.50, 0.63, 0.31, 0.88), c) # Teeth fill_coords(img, point_in_rect(0.38, 0.50, 0.59, 0.66), c) fill_coords(img, point_in_rect(0.38, 0.50, 0.81, 0.88), c) # Ring fill_coords(img, point_in_circle(cx=0.56, cy=0.28, r=0.190), c) fill_coords(img, point_in_circle(cx=0.56, cy=0.28, r=0.064), (0,0,0)) class MarbleTee(WorldObj): def __init__(self, color="red"): super(MarbleTee, self).__init__('marbletee', color) def can_pickup(self): return False def can_push(self): return False def render(self, img): c = COLORS[self.color] fill_coords(img, point_in_quadrangle( (0.2, 0.5), (0.8, 0.5), (0.4, 0.6), (0.6, 0.6), ), c) fill_coords(img, point_in_triangle( (0.4, 0.6), (0.6, 0.6), (0.5, 0.9), ), c) class Marble(WorldObj): def __init__(self, color='blue', env=None): super(Marble, self).__init__('marble', color) self.is_tagged = False self.moving_dir = None self.env = env self.was_pushed = False self.tee = MarbleTee(color) self.tee_uncovered = False def can_pickup(self): return True def step(self): if self.moving_dir is not None: prev_pos = self.cur_pos self.go_forward() if any(prev_pos != self.cur_pos) and not self.tee_uncovered: assert self.was_pushed # if Marble was moved for the first time, uncover the Tee # self.env.grid.set(*prev_pos, self.tee) self.env.put_obj_np(self.tee, prev_pos) self.tee_uncovered = True @property def is_moving(self): return self.moving_dir is not None @property def dir_vec(self): """ Get the direction vector for the agent, pointing in the direction of forward movement. """ if self.moving_dir is not None: return DIR_TO_VEC[self.moving_dir] else: return np.array((0, 0)) @property def front_pos(self): """ Get the position of the cell that is right in front of the agent """ return self.cur_pos + self.dir_vec def go_forward(self): # Get the position in front of the agent fwd_pos = self.front_pos # Get the contents of the cell in front of the agent fwd_cell = self.env.grid.get(*fwd_pos) # Don't move if you are going to collide if fwd_pos.tolist() != self.env.agent_pos.tolist() and (fwd_cell is None or fwd_cell.can_overlap()): self.env.grid.set(*self.cur_pos, None) self.env.grid.set(*fwd_pos, self) self.cur_pos = fwd_pos return True # push object if pushable if fwd_pos.tolist() != self.env.agent_pos.tolist() and (fwd_cell is not None and fwd_cell.can_push()): fwd_cell.push(push_dir=self.moving_dir, pusher=self) self.moving_dir = None return True else: self.moving_dir = None return False def can_push(self): return True def push(self, push_dir, pusher=None): if type(push_dir) is not int: raise ValueError("Direction must be of type int and is of type {}".format(type(push_dir))) self.moving_dir = push_dir if self.moving_dir is not None: self.was_pushed = True def render(self, img): color = COLORS[self.color] if self.is_tagged: color = color / 2 fill_coords(img, point_in_circle(0.5, 0.5, 0.20), color) fill_coords(img, point_in_circle(0.55, 0.45, 0.07), (0, 0, 0)) def tag(self,): self.is_tagged = True def encode(self, nb_dims=3, absolute_coordinates=False): """Encode the a description of this object as a nb_dims-tuple of integers""" if absolute_coordinates: core = (OBJECT_TO_IDX[self.type], *self.cur_pos, COLOR_TO_IDX[self.color]) else: core = (OBJECT_TO_IDX[self.type], COLOR_TO_IDX[self.color]) return core + (1 if self.is_tagged else 0,) * (nb_dims - len(core)) class Ball(WorldObj): def __init__(self, color='blue'): super(Ball, self).__init__('ball', color) self.is_tagged = False def can_pickup(self): return True def render(self, img): color = COLORS[self.color] if self.is_tagged: color = color / 2 fill_coords(img, point_in_circle(0.5, 0.5, 0.31), color) def tag(self,): self.is_tagged = True def encode(self, nb_dims=3, absolute_coordinates=False): """Encode the a description of this object as a nb_dims-tuple of integers""" if absolute_coordinates: core = (OBJECT_TO_IDX[self.type], *self.cur_pos, COLOR_TO_IDX[self.color]) else: core = (OBJECT_TO_IDX[self.type], COLOR_TO_IDX[self.color]) return core + (1 if self.is_tagged else 0,) * (nb_dims - len(core)) class Apple(WorldObj): def __init__(self, color='red', eaten=False): super(Apple, self).__init__('apple', color) self.is_tagged = False self.eaten = eaten assert self.color != "yellow" def revert(self, color='red', eaten=False): self.color = color self.is_tagged = False self.eaten = eaten assert self.color != "yellow" def can_pickup(self): return False def render(self, img): color = COLORS[self.color] if self.is_tagged: color = color / 2 fill_coords(img, point_in_circle(0.5, 0.5, 0.31), color) fill_coords(img, point_in_rect(0.1, 0.9, 0.1, 0.55), (0, 0, 0)) fill_coords(img, point_in_circle(0.35, 0.5, 0.15), color) fill_coords(img, point_in_circle(0.65, 0.5, 0.15), color) fill_coords(img, point_in_rect(0.48, 0.52, 0.2, 0.45), COLORS["brown"]) # quadrangle fill_coords(img, point_in_quadrangle( (0.52, 0.25), (0.65, 0.1), (0.75, 0.3), (0.90, 0.15), ), COLORS["green"]) if self.eaten: assert self.color == "yellow" fill_coords(img, point_in_circle(0.74, 0.6, 0.23), (0,0,0)) fill_coords(img, point_in_circle(0.26, 0.6, 0.23), (0,0,0)) def toggle(self, env, pos): if not self.eaten: self.eaten = True assert self.color != "yellow" self.color = "yellow" return True else: assert self.color == "yellow" return False def tag(self,): self.is_tagged = True def encode(self, nb_dims=3, absolute_coordinates=False): """Encode the a description of this object as a nb_dims-tuple of integers""" # eaten <=> yellow assert self.eaten == (self.color == "yellow") if absolute_coordinates: core = (OBJECT_TO_IDX[self.type], *self.cur_pos, COLOR_TO_IDX[self.color]) else: core = (OBJECT_TO_IDX[self.type], COLOR_TO_IDX[self.color]) return core + (1 if self.is_tagged else 0,) * (nb_dims - len(core)) class GeneratorPlatform(WorldObj): def __init__(self, color="red"): super(GeneratorPlatform, self).__init__('generatorplatform', color) def can_pickup(self): return False def can_push(self): return False def render(self, img): c = COLORS[self.color] fill_coords(img, point_in_circle(0.5, 0.5, 0.2), c) fill_coords(img, point_in_circle(0.5, 0.5, 0.18), (0,0,0)) fill_coords(img, point_in_circle(0.5, 0.5, 0.16), c) fill_coords(img, point_in_circle(0.5, 0.5, 0.14), (0,0,0)) class AppleGenerator(BlockableWorldObj): def __init__(self, color="red", is_pressed=False, block_set=None, on_push=None, marble_activation=False): super(AppleGenerator, self).__init__('applegenerator', color, block_set) self.is_pressed = is_pressed self.on_push = on_push self.marble_activation = marble_activation def can_pickup(self): return False def block(self): self.blocked = True def can_push(self): return True def push(self, push_dir=None, pusher=None): if self.marble_activation: # check that it is marble that pushed the generator if type(pusher) != Marble: return self.block_block_set() if not self.blocked: self.is_pressed = True if self.on_push: self.on_push() return self.block_block_set() else: return False def render(self, img): c = COLORS[self.color] if not self.marble_activation: # Outline fill_coords(img, point_in_rect(0.15, 0.85, 0.15, 0.85), c) # fill_coords(img, point_in_rect(0.17, 0.83, 0.17, 0.83), (0, 0, 0)) fill_coords(img, point_in_rect(0.16, 0.84, 0.16, 0.84), (0, 0, 0)) # Outline fill_coords(img, point_in_rect(0.22, 0.78, 0.22, 0.78), c) fill_coords(img, point_in_rect(0.24, 0.76, 0.24, 0.76), (0, 0, 0)) else: # Outline fill_coords(img, point_in_circle(0.5, 0.5, 0.37), c) fill_coords(img, point_in_circle(0.5, 0.5, 0.35), (0, 0, 0)) # Outline fill_coords(img, point_in_circle(0.5, 0.5, 0.32), c) fill_coords(img, point_in_circle(0.5, 0.5, 0.30), (0, 0, 0)) # Apple inside fill_coords(img, point_in_circle(0.5, 0.5, 0.2), COLORS["red"]) # fill_coords(img, point_in_rect(0.18, 0.82, 0.18, 0.55), (0, 0, 0)) fill_coords(img, point_in_rect(0.30, 0.65, 0.30, 0.55), (0, 0, 0)) fill_coords(img, point_in_circle(0.42, 0.5, 0.12), COLORS["red"]) fill_coords(img, point_in_circle(0.58, 0.5, 0.12), COLORS["red"]) # peteljka fill_coords(img, point_in_rect(0.49, 0.50, 0.25, 0.45), COLORS["brown"]) # leaf fill_coords(img, point_in_quadrangle( (0.52, 0.32), (0.60, 0.21), (0.70, 0.34), (0.80, 0.23), ), COLORS["green"]) def encode(self, nb_dims=3, absolute_coordinates=False): """Encode the a description of this object as a 3-tuple of integers""" type = 2 if self.marble_activation else 1 if absolute_coordinates: v = (OBJECT_TO_IDX[self.type], *self.cur_pos, COLOR_TO_IDX[self.color], type) else: v = (OBJECT_TO_IDX[self.type], COLOR_TO_IDX[self.color], type) v += (0,) * (nb_dims - len(v)) return v class Box(WorldObj): def __init__(self, color="red", contains=None): super(Box, self).__init__('box', color) self.contains = contains def can_pickup(self): return True def render(self, img): c = COLORS[self.color] # Outline fill_coords(img, point_in_rect(0.12, 0.88, 0.12, 0.88), c) fill_coords(img, point_in_rect(0.18, 0.82, 0.18, 0.82), (0,0,0)) # Horizontal slit fill_coords(img, point_in_rect(0.16, 0.84, 0.47, 0.53), c) def toggle(self, env, pos): # Replace the box by its contents env.grid.set(*pos, self.contains) return True class LockableBox(BlockableWorldObj): def __init__(self, color="red", is_locked=False, contains=None, block_set=None): super(LockableBox, self).__init__('lockablebox', color, block_set) self.contains = contains self.is_locked = is_locked self.is_open = False def can_pickup(self): return True def encode(self, nb_dims=3, absolute_coordinates=False): """Encode the a description of this object as a 3-tuple of integers""" # State, 0: open, 1: closed, 2: locked # 2 and 1 to be consistent with doors if self.is_locked: state = 2 else: state = 1 if absolute_coordinates: v = (OBJECT_TO_IDX[self.type], *self.cur_pos, COLOR_TO_IDX[self.color], state) else: v = (OBJECT_TO_IDX[self.type], COLOR_TO_IDX[self.color], state) v += (0,) * (nb_dims - len(v)) return v def render(self, img): c = COLORS[self.color] # Outline fill_coords(img, point_in_rect(0.12, 0.88, 0.12, 0.88), c) if self.is_locked: fill_coords(img, point_in_rect(0.18, 0.82, 0.18, 0.82), 0.45 * np.array(c)) else: fill_coords(img, point_in_rect(0.18, 0.82, 0.18, 0.82), (0, 0, 0)) # Horizontal slit fill_coords(img, point_in_rect(0.16, 0.84, 0.47, 0.53), c) def toggle(self, env, pos): if self.blocked: return False if self.is_locked: if isinstance(env.carrying, Key) and env.carrying.color == self.color: self.is_locked = False self.is_open = True return True return False self.is_open = True # Replace the box by its contents env.grid.set(*pos, self.contains) self.block_block_set() # assert that obj is toggled only once assert not hasattr(self, "was_toggled") self.was_toggled = True return True def block(self): self.blocked = True class NPC(ABC, WorldObj): def __init__(self, color, view_size=7): super().__init__('npc', color) self.point_dir = 255 # initially no point self.introduction_statement = "Help please " self.list_of_possible_utterances = NPC.get_list_of_possible_utterances() self.view_size = view_size self.carrying = False self.prim_actions_dict = SocialAINPCActionsDict self.reset_last_action() @staticmethod def get_list_of_possible_utterances(): return ["no_op"] def _npc_action(func): """ Decorator that logs the last action """ @wraps(func) def func_wrapper(self, *args, **kwargs): if self.env.add_npc_last_prim_action: self.last_action = func.__name__ return func(self, *args, **kwargs) return func_wrapper def reset_last_action(self): self.last_action = "no_op" def step(self): self.reset_last_action() if self.env.hidden_npc: info = { "prim_action": "no_op", "utterance": "no_op", "was_introduced_to": self.was_introduced_to } return None, info else: return None, None def handle_introduction(self, utterance): reply, action = None, None # introduction and language if self.env.parameters.get("Pragmatic_frame_complexity", "No") == "No": # only once if not self.was_introduced_to: self.was_introduced_to = True elif self.env.parameters["Pragmatic_frame_complexity"] == "Eye_contact": # only first time at eye contact if self.is_eye_contact() and not self.was_introduced_to: self.was_introduced_to = True # if not self.was_introduced_to: # rotate to see the agent # action = self.look_at_action(self.env.agent_pos) elif self.env.parameters["Pragmatic_frame_complexity"] == "Ask": # every time asked if utterance == self.introduction_statement: self.was_introduced_to = True elif self.env.parameters["Pragmatic_frame_complexity"] == "Ask_Eye_contact": # only first time at eye contact with the introduction statement if (self.is_eye_contact() and utterance == self.introduction_statement) and not self.was_introduced_to: self.was_introduced_to = True # if not self.was_introduced_to: # # rotate to see the agent # action = self.look_at_action(self.env.agent_pos) else: raise NotImplementedError() return reply, action def look_at_action(self, target_pos): # rotate to see the target_pos wanted_dir = self.compute_wanted_dir(target_pos) action = self.compute_turn_action(wanted_dir) return action @_npc_action def rotate_left(self): self.npc_dir -= 1 if self.npc_dir < 0: self.npc_dir += 4 return True @_npc_action def rotate_right(self): self.npc_dir = (self.npc_dir + 1) % 4 return True def path_to_toggle_pos(self, goal_pos): """ Return the next action from the path to toggling an object at goal_pos """ if type(goal_pos) != np.ndarray or goal_pos.shape != (2,): raise ValueError(f"goal_pos must be a np.ndarray of shape (2,) and is {goal_pos}") assert type(self.front_pos) == np.ndarray and self.front_pos.shape == (2,) if all(self.front_pos == goal_pos): # in front of door return self.toggle_action else: return self.path_to_pos(goal_pos) def turn_to_see_agent(self): wanted_dir = self.compute_wanted_dir(self.env.agent_pos) action = self.compute_turn_action(wanted_dir) return action def relative_coords(self, x, y): """ Check if a grid position belongs to the npc's field of view, and returns the corresponding coordinates """ vx, vy = self.get_view_coords(x, y) if vx < 0 or vy < 0 or vx >= self.view_size or vy >= self.view_size: return None return vx, vy def get_view_coords(self, i, j): """ Translate and rotate absolute grid coordinates (i, j) into the npc's partially observable view (sub-grid). Note that the resulting coordinates may be negative or outside of the npc's view size. """ ax, ay = self.cur_pos dx, dy = self.dir_vec rx, ry = self.right_vec # Compute the absolute coordinates of the top-left view corner sz = self.view_size hs = self.view_size // 2 tx = ax + (dx * (sz-1)) - (rx * hs) ty = ay + (dy * (sz-1)) - (ry * hs) lx = i - tx ly = j - ty # Project the coordinates of the object relative to the top-left # corner onto the agent's own coordinate system vx = (rx*lx + ry*ly) vy = -(dx*lx + dy*ly) return vx, vy def is_pointing(self): return self.point_dir != 255 def path_to_pos(self, goal_pos): """ Return the next action from the path to goal_pos """ if type(goal_pos) != np.ndarray or goal_pos.shape != (2,): raise ValueError(f"goal_pos must be a np.ndarray of shape (2,) and is {goal_pos}") def neighbors(n): n_nd = np.array(n) adjacent_positions = [ n_nd + np.array([ 0, 1]), n_nd + np.array([ 0,-1]), n_nd + np.array([ 1, 0]), n_nd + np.array([-1, 0]), ] adjacent_cells = map(lambda pos: self.env.grid.get(*pos), adjacent_positions) # keep the positions that don't have anything on or can_overlap neighbors = [ tuple(pos) for pos, cell in zip(adjacent_positions, adjacent_cells) if ( all(pos == goal_pos) or cell is None or cell.can_overlap() ) and not all(pos == self.env.agent_pos) ] for n1 in neighbors: yield n1 def distance(n1, n2): return 1 def cost(n, goal): # manhattan return int(np.abs(np.array(n) - np.array(goal)).sum()) # def is_goal_reached(n, goal): # return all(n == goal) path = astar.find_path( # tuples because np.ndarray is not hashable tuple(self.cur_pos), tuple(goal_pos), neighbors_fnct=neighbors, heuristic_cost_estimate_fnct=cost, distance_between_fnct=distance, # is_goal_reached_fnct=is_goal_reached ) if path is None: # no possible path return None path = list(path) assert all(path[0] == self.cur_pos) next_step = path[1] wanted_dir = self.compute_wanted_dir(next_step) if self.npc_dir == wanted_dir: return self.go_forward else: return self.compute_turn_action(wanted_dir) def gen_obs_grid(self): """ Generate the sub-grid observed by the npc. This method also outputs a visibility mask telling us which grid cells the npc can actually see. """ view_size = self.view_size topX, topY, botX, botY = self.env.get_view_exts(dir=self.npc_dir, view_size=view_size, pos=self.cur_pos) grid = self.env.grid.slice(topX, topY, view_size, view_size) for i in range(self.npc_dir + 1): grid = grid.rotate_left() # Process ocluders and visibility # Note that this incurs some performance cost if not self.env.see_through_walls: vis_mask = grid.process_vis(agent_pos=(view_size // 2, view_size - 1)) else: vis_mask = np.ones(shape=(grid.width, grid.height), dtype=np.bool) # Make it so the npc sees what it's carrying # We do this by placing the carried object at the agent's position # in the agent's partially observable view npc_pos = grid.width // 2, grid.height - 1 if self.carrying: grid.set(*npc_pos, self.carrying) else: grid.set(*npc_pos, None) return grid, vis_mask def is_near_agent(self): ax, ay = self.env.agent_pos wx, wy = self.cur_pos if (ax == wx and abs(ay - wy) == 1) or (ay == wy and abs(ax - wx) == 1): return True return False def is_eye_contact(self): """ Returns true if the agent and the NPC are looking at each other """ if self.cur_pos[1] == self.env.agent_pos[1]: # same y if self.cur_pos[0] > self.env.agent_pos[0]: return self.npc_dir == 2 and self.env.agent_dir == 0 else: return self.npc_dir == 0 and self.env.agent_dir == 2 if self.cur_pos[0] == self.env.agent_pos[0]: # same x if self.cur_pos[1] > self.env.agent_pos[1]: return self.npc_dir == 3 and self.env.agent_dir == 1 else: return self.npc_dir == 1 and self.env.agent_dir == 3 return False def compute_wanted_dir(self, target_pos): """ Computes the direction in which the NPC should look to see target pos """ distance_vec = target_pos - self.cur_pos angle = np.degrees(np.arctan2(*distance_vec)) if angle < 0: angle += 360 if angle < 45: wanted_dir = 1 # S elif angle < 135: wanted_dir = 0 # E elif angle < 225: wanted_dir = 3 # N elif angle < 315: wanted_dir = 2 # W elif angle < 360: wanted_dir = 1 # S return wanted_dir def compute_wanted_point_dir(self, target_pos): point_dir = self.compute_wanted_dir(target_pos) return point_dir # dir = 0 # E # dir = 1 # S # dir = 2 # W # dir = 3 # N # dir = 255 # no point @_npc_action def stop_point(self): self.point_dir = 255 return True @_npc_action def point_E(self): self.point_dir = point_dir_encoding["point_E"] return True @_npc_action def point_S(self): self.point_dir = point_dir_encoding["point_S"] return True @_npc_action def point_W(self): self.point_dir = point_dir_encoding["point_W"] return True @_npc_action def point_N(self): self.point_dir = point_dir_encoding["point_N"] return True def compute_wanted_point_action(self, target_pos): point_dir = self.compute_wanted_dir(target_pos) if point_dir == point_dir_encoding["point_E"]: return self.point_E elif point_dir == point_dir_encoding["point_S"]: return self.point_S elif point_dir == point_dir_encoding["point_W"]: return self.point_W elif point_dir == point_dir_encoding["point_N"]: return self.point_N else: raise ValueError("Unknown direction {}".format(point_dir)) def compute_turn_action(self, wanted_dir): """ Return the action turning for in the direction of wanted_dir """ if self.npc_dir == wanted_dir: # return lambda *args: None return None if (wanted_dir - self.npc_dir) == 1 or (wanted_dir == 0 and self.npc_dir == 3): return self.rotate_right if (wanted_dir - self.npc_dir) == - 1 or (wanted_dir == 3 and self.npc_dir == 0): return self.rotate_left else: return self.env._rand_elem([self.rotate_left, self.rotate_right]) @_npc_action def go_forward(self): # Get the position in front of the agent fwd_pos = self.front_pos # Get the contents of the cell in front of the agent fwd_cell = self.env.grid.get(*fwd_pos) # Don't move if you are going to collide if fwd_pos.tolist() != self.env.agent_pos.tolist() and (fwd_cell is None or fwd_cell.can_overlap()): self.env.grid.set(*self.cur_pos, None) self.env.grid.set(*fwd_pos, self) self.cur_pos = fwd_pos return True # push object if pushable if fwd_pos.tolist() != self.env.agent_pos.tolist() and (fwd_cell is not None and fwd_cell.can_push()): fwd_cell.push(push_dir=self.npc_dir, pusher=self) else: return False @_npc_action def toggle_action(self): fwd_pos = self.front_pos fwd_cell = self.env.grid.get(*fwd_pos) if fwd_cell: return fwd_cell.toggle(self.env, fwd_pos) return False @property def dir_vec(self): """ Get the direction vector for the agent, pointing in the direction of forward movement. """ assert self.npc_dir >= 0 and self.npc_dir < 4 return DIR_TO_VEC[self.npc_dir] @property def right_vec(self): """ Get the vector pointing to the right of the agent. """ dx, dy = self.dir_vec return np.array((-dy, dx)) @property def front_pos(self): """ Get the position of the cell that is right in front of the agent """ return self.cur_pos + self.dir_vec @property def back_pos(self): """ Get the position of the cell that is right in front of the agent """ return self.cur_pos - self.dir_vec @property def right_pos(self): """ Get the position of the cell that is right in front of the agent """ return self.cur_pos + self.right_vec @property def left_pos(self): """ Get the position of the cell that is right in front of the agent """ return self.cur_pos - self.right_vec def draw_npc_face(self, c): assert self.npc_type == 0 assert all(COLORS[self.color] == c) shapes = [] shapes_colors = [] # Draw eyes shapes.append(point_in_circle(cx=0.70, cy=0.50, r=0.10)) shapes_colors.append(c) shapes.append(point_in_circle(cx=0.30, cy=0.50, r=0.10)) shapes_colors.append(c) # Draw mouth shapes.append(point_in_rect(0.20, 0.80, 0.72, 0.81)) shapes_colors.append(c) # Draw bottom hat shapes.append(point_in_triangle((0.15, 0.28), (0.85, 0.28), (0.50, 0.05))) shapes_colors.append(c) # Draw top hat shapes.append(point_in_rect(0.30, 0.70, 0.05, 0.28)) shapes_colors.append(c) return shapes, shapes_colors def render(self, img): c = COLORS[self.color] npc_shapes = [] npc_shapes_colors = [] npc_face_shapes, npc_face_shapes_colors = self.draw_npc_face(c=c) npc_shapes.extend(npc_face_shapes) npc_shapes_colors.extend(npc_face_shapes_colors) if hasattr(self, "npc_dir"): # Pre-rotation to ensure npc_dir = 1 means NPC looks downwards npc_shapes = [rotate_fn(v, cx=0.5, cy=0.5, theta=-1*(math.pi / 2)) for v in npc_shapes] # Rotate npc based on its direction npc_shapes = [rotate_fn(v, cx=0.5, cy=0.5, theta=(math.pi/2) * self.npc_dir) for v in npc_shapes] if hasattr(self, "point_dir"): if self.is_pointing(): # default points east finger = point_in_triangle((0.85, 0.1), (0.85, 0.3), (0.99, 0.2)) finger = rotate_fn(finger, cx=0.5, cy=0.5, theta=(math.pi/2) * self.point_dir) npc_shapes.append(finger) npc_shapes_colors.append(c) if self.last_action == self.toggle_action.__name__: # T symbol t_symbol = [point_in_rect(0.8, 0.84, 0.02, 0.18), point_in_rect(0.8, 0.95, 0.08, 0.12)] t_symbol = [rotate_fn(v, cx=0.5, cy=0.5, theta=(math.pi/2) * self.npc_dir) for v in t_symbol] npc_shapes.extend(t_symbol) npc_shapes_colors.extend([c, c]) elif self.last_action == self.go_forward.__name__: # symbol for Forward (ommited for speed) pass if self.env.hidden_npc: # crossed eye symbol dx, dy = 0.15, -0.2 # draw eye npc_shapes.append(point_in_circle(cx=0.70+dx, cy=0.48+dy, r=0.11)) npc_shapes_colors.append((128,128,128)) npc_shapes.append(point_in_circle(cx=0.30+dx, cy=0.52+dy, r=0.11)) npc_shapes_colors.append((128,128,128)) npc_shapes.append(point_in_circle(0.5+dx, 0.5+dy, 0.25)) npc_shapes_colors.append((128, 128, 128)) npc_shapes.append(point_in_circle(0.5+dx, 0.5+dy, 0.20)) npc_shapes_colors.append((0, 0, 0)) npc_shapes.append(point_in_circle(0.5+dx, 0.5+dy, 0.1)) npc_shapes_colors.append((128, 128, 128)) # cross it npc_shapes.append(point_in_line(0.2+dx, 0.7+dy, 0.8+dx, 0.3+dy, 0.04)) npc_shapes_colors.append((128, 128, 128)) # Draw shapes for v, c in zip(npc_shapes, npc_shapes_colors): fill_coords(img, v, c) def cache(self, *args, **kwargs): """Used for cached rendering.""" # adding npc_dir and point_dir because, when egocentric coordinates are used, # they can result in the same encoding but require new rendering return self.encode(*args, **kwargs) + (self.npc_dir, self.point_dir,) def can_overlap(self): # If the NPC is hidden, agent can overlap on it return self.env.hidden_npc def encode(self, nb_dims=3, absolute_coordinates=False): if not hasattr(self, "npc_type"): raise ValueError("An NPC class must implement the npc_type (int)") if not hasattr(self, "env"): raise ValueError("An NPC class must have the env") assert nb_dims == 6+2*bool(absolute_coordinates) if self.env.hidden_npc: return (1,) + (0,) * (nb_dims-1) assert self.env.egocentric_observation == (not absolute_coordinates) if absolute_coordinates: v = (OBJECT_TO_IDX[self.type], *self.cur_pos, COLOR_TO_IDX[self.color], self.npc_type) else: v = (OBJECT_TO_IDX[self.type], COLOR_TO_IDX[self.color], self.npc_type) if self.env.add_npc_direction: assert hasattr(self, "npc_dir"), "4D but there is no npc dir in NPC state" assert self.npc_dir >= 0 if self.env.egocentric_observation: assert self.env.agent_dir >= 0 # 0 - eye contact; 2 - gaze in same direction; 1 - to left; 3 - to right npc_dir_enc = (self.npc_dir - self.env.agent_dir + 2) % 4 v += (npc_dir_enc,) else: v += (self.npc_dir,) if self.env.add_npc_point_direction: assert hasattr(self, "point_dir"), "5D but there is no npc point dir in NPC state" if self.point_dir == 255: # no pointing v += (self.point_dir,) elif 0 <= self.point_dir <= 3: # pointing if self.env.egocentric_observation: assert self.env.agent_dir >= 0 # 0 - pointing at agent; 2 - point in direction of agent gaze; 1 - to left; 3 - to right point_enc = (self.point_dir - self.env.agent_dir + 2) % 4 v += (point_enc,) else: v += (self.point_dir,) else: raise ValueError(f"Undefined point direction {self.point_dir}") if self.env.add_npc_last_prim_action: assert hasattr(self, "last_action"), "6D but there is no last action in NPC state" if self.last_action in ["point_E", "point_S", "point_W", "point_N"] and self.env.egocentric_observation: # get the direction of the last point last_action_point_dir = point_dir_encoding[self.last_action] # convert to relative dir # 0 - pointing at agent; 2 - point in direction of agent gaze; 1 - to left; 3 - to right last_action_relative_point_dir = (last_action_point_dir - self.env.agent_dir + 2) % 4 # the point_X action ids are in range [point_E, ... , point_N] # id of point_E is the starting one, we use the same range [E, S, W ,N ] -> [at, left, same, right] last_action_id = self.prim_actions_dict["point_E"] + last_action_relative_point_dir else: last_action_id = self.prim_actions_dict[self.last_action] v += (last_action_id,) assert self.point_dir >= 0 assert len(v) == nb_dims return v class Grid: """ Represent a grid and operations on it """ # Static cache of pre-renderer tiles tile_cache = {} def __init__(self, width, height, nb_obj_dims): assert width >= 3 assert height >= 3 self.width = width self.height = height self.nb_obj_dims = nb_obj_dims self.grid = [None] * width * height def __contains__(self, key): if isinstance(key, WorldObj): for e in self.grid: if e is key: return True elif isinstance(key, tuple): for e in self.grid: if e is None: continue if (e.color, e.type) == key: return True if key[0] is None and key[1] == e.type: return True return False def __eq__(self, other): grid1 = self.encode() grid2 = other.encode() return np.array_equal(grid2, grid1) def __ne__(self, other): return not self == other def copy(self): from copy import deepcopy return deepcopy(self) def set(self, i, j, v): assert i >= 0 and i < self.width assert j >= 0 and j < self.height self.grid[j * self.width + i] = v def get(self, i, j): assert i >= 0 and i < self.width assert j >= 0 and j < self.height return self.grid[j * self.width + i] def horz_wall(self, x, y, length=None, obj_type=Wall): if length is None: length = self.width - x for i in range(0, length): o = obj_type() o.cur_pos = np.array((x+i, y)) self.set(x + i, y, o) def vert_wall(self, x, y, length=None, obj_type=Wall): if length is None: length = self.height - y for j in range(0, length): o = obj_type() o.cur_pos = np.array((x, y+j)) self.set(x, y + j, o) def wall_rect(self, x, y, w, h): self.horz_wall(x, y, w) self.horz_wall(x, y+h-1, w) self.vert_wall(x, y, h) self.vert_wall(x+w-1, y, h) def rotate_left(self): """ Rotate the grid to the left (counter-clockwise) """ grid = Grid(self.height, self.width, self.nb_obj_dims) for i in range(self.width): for j in range(self.height): v = self.get(i, j) grid.set(j, grid.height - 1 - i, v) return grid def slice(self, topX, topY, width, height): """ Get a subset of the grid """ grid = Grid(width, height, self.nb_obj_dims) for j in range(0, height): for i in range(0, width): x = topX + i y = topY + j if x >= 0 and x < self.width and \ y >= 0 and y < self.height: v = self.get(x, y) else: v = Wall() grid.set(i, j, v) return grid @classmethod def render_tile( cls, obj, agent_dir=None, highlight=False, tile_size=TILE_PIXELS, subdivs=3, nb_obj_dims=3, mask_unobserved=False ): """ Render a tile and cache the result """ # Hash map lookup key for the cache key = (agent_dir, highlight, tile_size, mask_unobserved) # key = obj.encode(nb_dims=nb_obj_dims) + key if obj else key key = obj.cache(nb_dims=nb_obj_dims) + key if obj else key if key in cls.tile_cache: return cls.tile_cache[key] img = np.zeros(shape=(tile_size * subdivs, tile_size * subdivs, 3), dtype=np.uint8) # 3D for rendering # Draw the grid lines (top and left edges) fill_coords(img, point_in_rect(0, 0.031, 0, 1), (100, 100, 100)) fill_coords(img, point_in_rect(0, 1, 0, 0.031), (100, 100, 100)) if obj != None: obj.render(img) # Overlay the agent on top if agent_dir is not None: tri_fn = point_in_triangle( (0.12, 0.19), (0.87, 0.50), (0.12, 0.81), ) # Rotate the agent based on its direction tri_fn = rotate_fn(tri_fn, cx=0.5, cy=0.5, theta=0.5*math.pi*agent_dir) fill_coords(img, tri_fn, (255, 0, 0)) # Highlight the cell if needed if highlight: highlight_img(img) elif mask_unobserved: # mask unobserved and not highlighted -> unobserved by the agent img *= 0 # Downsample the image to perform supersampling/anti-aliasing img = downsample(img, subdivs) # Cache the rendered tile cls.tile_cache[key] = img return img def render( self, tile_size, agent_pos=None, agent_dir=None, highlight_mask=None, mask_unobserved=False, ): """ Render this grid at a given scale :param r: target renderer object :param tile_size: tile size in pixels """ if highlight_mask is None: highlight_mask = np.zeros(shape=(self.width, self.height), dtype=np.bool) # Compute the total grid size width_px = self.width * tile_size height_px = self.height * tile_size img = np.zeros(shape=(height_px, width_px, 3), dtype=np.uint8) # Render the grid for j in range(0, self.height): for i in range(0, self.width): cell = self.get(i, j) agent_here = np.array_equal(agent_pos, (i, j)) tile_img = Grid.render_tile( cell, agent_dir=agent_dir if agent_here else None, highlight=highlight_mask[i, j], tile_size=tile_size, nb_obj_dims=self.nb_obj_dims, mask_unobserved=mask_unobserved ) ymin = j * tile_size ymax = (j+1) * tile_size xmin = i * tile_size xmax = (i+1) * tile_size img[ymin:ymax, xmin:xmax, :] = tile_img return img def encode(self, vis_mask=None, absolute_coordinates=False): """ Produce a compact numpy encoding of the grid """ if vis_mask is None: vis_mask = np.ones((self.width, self.height), dtype=bool) array = np.zeros((self.width, self.height, self.nb_obj_dims), dtype='uint8') for i in range(self.width): for j in range(self.height): if vis_mask[i, j]: v = self.get(i, j) if v is None: array[i, j, 0] = OBJECT_TO_IDX['empty'] array[i, j, 1:] = 0 else: array[i, j, :] = v.encode(nb_dims=self.nb_obj_dims, absolute_coordinates=absolute_coordinates) return array @staticmethod def decode(array): """ Decode an array grid encoding back into a grid """ width, height, channels = array.shape assert channels in [5, 4, 3] vis_mask = np.ones(shape=(width, height), dtype=np.bool) grid = Grid(width, height, nb_obj_dims=channels) for i in range(width): for j in range(height): if len(array[i, j]) == 3: type_idx, color_idx, state = array[i, j] else: type_idx, color_idx, state, orient = array[i, j] v = WorldObj.decode(type_idx, color_idx, state) grid.set(i, j, v) vis_mask[i, j] = (type_idx != OBJECT_TO_IDX['unseen']) return grid, vis_mask def process_vis(grid, agent_pos): # mask = np.zeros(shape=(grid.width, grid.height), dtype=np.bool) # # mask[agent_pos[0], agent_pos[1]] = True # # for j in reversed(range(0, grid.height)): # for i in range(0, grid.width-1): # if not mask[i, j]: # continue # # cell = grid.get(i, j) # if cell and not cell.see_behind(): # continue # # mask[i+1, j] = True # if j > 0: # mask[i+1, j-1] = True # mask[i, j-1] = True # # for i in reversed(range(1, grid.width)): # if not mask[i, j]: # continue # # cell = grid.get(i, j) # if cell and not cell.see_behind(): # continue # # mask[i-1, j] = True # if j > 0: # mask[i-1, j-1] = True # mask[i, j-1] = True mask = np.ones(shape=(grid.width, grid.height), dtype=np.bool) # handle frontal occlusions # 45 deg for j in reversed(range(0, agent_pos[1]+1)): dy = abs(agent_pos[1] - j) # in front of the agent i = agent_pos[0] cell = grid.get(i, j) if (cell and not cell.see_behind()) or mask[i, j] == False: if j < agent_pos[1] and j > 0: # 45 deg mask[i-1,j-1] = False mask[i,j-1] = False mask[i+1,j-1] = False # agent -> to the left for i in reversed(range(1, agent_pos[0])): dx = abs(agent_pos[0] - i) cell = grid.get(i, j) if (cell and not cell.see_behind()) or mask[i,j] == False: # angle if dx >= dy: mask[i - 1, j] = False if j > 0: mask[i - 1, j - 1] = False if dy >= dx: mask[i, j - 1] = False # agent -> to the right for i in range(agent_pos[0]+1, grid.width-1): dx = abs(agent_pos[0] - i) cell = grid.get(i, j) if (cell and not cell.see_behind()) or mask[i,j] == False: # angle if dx >= dy: mask[i + 1, j] = False if j > 0: mask[i + 1, j - 1] = False if dy >= dx: mask[i, j - 1] = False # for i in range(0, grid.width): # cell = grid.get(i, j) # if (cell and not cell.see_behind()) or mask[i,j] == False: # mask[i, j-1] = False # grid # for j in reversed(range(0, agent_pos[1]+1)): # # i = agent_pos[0] # cell = grid.get(i, j) # if (cell and not cell.see_behind()) or mask[i, j] == False: # if j < agent_pos[1]: # # grid # mask[i,j-1] = False # # for i in reversed(range(1, agent_pos[0])): # # agent -> to the left # cell = grid.get(i, j) # if (cell and not cell.see_behind()) or mask[i,j] == False: # # grid # mask[i-1, j] = False # if j < agent_pos[1] and j > 0: # mask[i, j-1] = False # # for i in range(agent_pos[0]+1, grid.width-1): # # agent -> to the right # cell = grid.get(i, j) # if (cell and not cell.see_behind()) or mask[i,j] == False: # # grid # mask[i+1, j] = False # if j < agent_pos[1] and j > 0: # mask[i, j-1] = False for j in range(0, grid.height): for i in range(0, grid.width): if not mask[i, j]: grid.set(i, j, None) return mask class MiniGridEnv(gym.Env): """ 2D grid world game environment """ metadata = { 'render.modes': ['human', 'rgb_array'], 'video.frames_per_second' : 10 } # Enumeration of possible actions class Actions(IntEnum): # Turn left, turn right, move forward left = 0 right = 1 forward = 2 # Pick up an object pickup = 3 # Drop an object drop = 4 # Toggle/activate an object toggle = 5 # Done completing task done = 6 def __init__( self, grid_size=None, width=None, height=None, max_steps=100, see_through_walls=False, full_obs=False, seed=None, agent_view_size=7, actions=None, action_space=None, add_npc_direction=False, add_npc_point_direction=False, add_npc_last_prim_action=False, reward_diminish_factor=0.9, egocentric_observation=True, ): # sanity check params for SocialAI experiments if "SocialAI" in type(self).__name__: assert egocentric_observation assert grid_size == 10 assert not see_through_walls assert max_steps == 80 assert agent_view_size == 7 assert not full_obs assert add_npc_direction and add_npc_point_direction and add_npc_last_prim_action self.egocentric_observation = egocentric_observation if hasattr(self, "lever_active_steps"): assert self.lever_active_steps == 10 # Can't set both grid_size and width/height if grid_size: assert width == None and height == None width = grid_size height = grid_size # Action enumeration for this environment if actions: self.actions = actions else: self.actions = MiniGridEnv.Actions # Actions are discrete integer values if action_space: self.action_space = action_space else: self.action_space = spaces.MultiDiscrete([len(self.actions)]) # Number of cells (width and height) in the agent view assert agent_view_size % 2 == 1 assert agent_view_size >= 3 self.agent_view_size = agent_view_size # Number of object dimensions (i.e. number of channels in symbolic image) self.add_npc_direction = add_npc_direction self.add_npc_point_direction = add_npc_point_direction self.add_npc_last_prim_action = add_npc_last_prim_action self.nb_obj_dims = 3 + 2*bool(not self.egocentric_observation) + int(self.add_npc_direction) + int(self.add_npc_point_direction) + int(self.add_npc_last_prim_action) # Observations are dictionaries containing an # encoding of the grid and a textual 'mission' string self.observation_space = spaces.Box( low=0, high=255, shape=(self.agent_view_size, self.agent_view_size, self.nb_obj_dims), dtype='uint8' ) self.observation_space = spaces.Dict({ 'image': self.observation_space }) # Range of possible rewards self.reward_range = (0, 1) # Window to use for human rendering mode self.window = None # Environment configuration self.width = width self.height = height self.max_steps = max_steps self.see_through_walls = see_through_walls self.full_obs = full_obs self.reward_diminish_factor = reward_diminish_factor # Current position and direction of the agent self.agent_pos = None self.agent_dir = None # Initialize the RNG self.seed(seed=seed) # Initialize the state self.reset() def reset(self): # Current position and direction of the agent self.agent_pos = None self.agent_dir = None # Generate a new random grid at the start of each episode # To keep the same grid for each episode, call env.seed() with # the same seed before calling env.reset() self._gen_grid(self.width, self.height) # These fields should be defined by _gen_grid assert self.agent_pos is not None assert self.agent_dir is not None # Check that the agent doesn't overlap with an object start_cell = self.grid.get(*self.agent_pos) assert start_cell is None or start_cell.can_overlap() # Item picked up, being carried, initially nothing self.carrying = None # Step count since episode start self.step_count = 0 # Return first observation obs = self.gen_obs(full_obs=self.full_obs) return obs def reset_with_info(self, *args, **kwargs): obs = self.reset(*args, **kwargs) info = self.generate_info(done=False, reward=0) return obs, info def seed(self, seed=1337): # Seed the random number generator self.np_random, _ = seeding.np_random(seed) return [seed] def hash(self, size=16): """Compute a hash that uniquely identifies the current state of the environment. :param size: Size of the hashing """ sample_hash = hashlib.sha256() to_encode = [self.grid.encode(), self.agent_pos, self.agent_dir] for item in to_encode: sample_hash.update(str(item).encode('utf8')) return sample_hash.hexdigest()[:size] @property def steps_remaining(self): return self.max_steps - self.step_count def is_near(self, pos1, pos2): ax, ay = pos1 wx, wy = pos2 if (ax == wx and abs(ay - wy) == 1) or (ay == wy and abs(ax - wx) == 1): return True return False def get_cell(self, x, y): return self.grid.get(x, y) def __str__(self): """ Produce a pretty string of the environment's grid along with the agent. A grid cell is represented by 2-character string, the first one for the object and the second one for the color. """ # Map of object types to short string OBJECT_TO_STR = { 'wall' : 'W', 'floor' : 'F', 'door' : 'D', 'key' : 'K', 'ball' : 'A', 'box' : 'B', 'goal' : 'G', 'lava' : 'V', } # Short string for opened door OPENDED_DOOR_IDS = '_' # Map agent's direction to short string AGENT_DIR_TO_STR = { 0: '>', 1: 'V', 2: '<', 3: '^' } str = '' for j in range(self.grid.height): for i in range(self.grid.width): if i == self.agent_pos[0] and j == self.agent_pos[1]: str += 2 * AGENT_DIR_TO_STR[self.agent_dir] continue c = self.grid.get(i, j) if c == None: str += ' ' continue if c.type == 'door': if c.is_open: str += '__' elif c.is_locked: str += 'L' + c.color[0].upper() else: str += 'D' + c.color[0].upper() continue str += OBJECT_TO_STR[c.type] + c.color[0].upper() if j < self.grid.height - 1: str += '\n' return str def _gen_grid(self, width, height): assert False, "_gen_grid needs to be implemented by each environment" def _reward(self): """ Compute the reward to be given upon success """ return 1 - self.reward_diminish_factor * (self.step_count / self.max_steps) def _rand_int(self, low, high): """ Generate random integer in [low,high[ """ return self.np_random.randint(low, high) def _rand_float(self, low, high): """ Generate random float in [low,high[ """ return self.np_random.uniform(low, high) def _rand_bool(self): """ Generate random boolean value """ return (self.np_random.randint(0, 2) == 0) def _rand_elem(self, iterable): """ Pick a random element in a list """ lst = list(iterable) idx = self._rand_int(0, len(lst)) return lst[idx] def _rand_subset(self, iterable, num_elems): """ Sample a random subset of distinct elements of a list """ lst = list(iterable) assert num_elems <= len(lst) out = [] while len(out) < num_elems: elem = self._rand_elem(lst) lst.remove(elem) out.append(elem) return out def _rand_color(self): """ Generate a random color name (string) """ return self._rand_elem(COLOR_NAMES) def _rand_pos(self, xLow, xHigh, yLow, yHigh): """ Generate a random (x,y) position tuple """ return ( self.np_random.randint(xLow, xHigh), self.np_random.randint(yLow, yHigh) ) def find_loc(self, top=None, size=None, reject_fn=None, max_tries=math.inf, reject_agent_pos=True, reject_taken_pos=True ): """ Place an object at an empty position in the grid :param top: top-left position of the rectangle where to place :param size: size of the rectangle where to place :param reject_fn: function to filter out potential positions """ if top is None: top = (0, 0) else: top = (max(top[0], 0), max(top[1], 0)) if size is None: size = (self.grid.width, self.grid.height) num_tries = 0 while True: # This is to handle with rare cases where rejection sampling # gets stuck in an infinite loop if num_tries > max_tries: raise RecursionError('rejection sampling failed in place_obj') if num_tries % 10000 == 0 and num_tries > 0: warnings.warn("num_tries = {}. This is probably an infinite loop. {}".format(num_tries, get_traceback())) # warnings.warn("num_tries = {}. This is probably an infinite loop.".format(num_tries)) exit() break num_tries += 1 pos = np.array(( self._rand_int(top[0], min(top[0] + size[0], self.grid.width)), self._rand_int(top[1], min(top[1] + size[1], self.grid.height)) )) # Don't place the object on top of another object if reject_taken_pos: if self.grid.get(*pos) != None: continue # Don't place the object where the agent is if reject_agent_pos and np.array_equal(pos, self.agent_pos): continue # Check if there is a filtering criterion if reject_fn and reject_fn(self, pos): continue break return pos def place_obj(self, obj, top=None, size=None, reject_fn=None, max_tries=math.inf ): """ Place an object at an empty position in the grid :param top: top-left position of the rectangle where to place :param size: size of the rectangle where to place :param reject_fn: function to filter out potential positions """ # if top is None: # top = (0, 0) # else: # top = (max(top[0], 0), max(top[1], 0)) # # if size is None: # size = (self.grid.width, self.grid.height) # # num_tries = 0 # # while True: # # This is to handle with rare cases where rejection sampling # # gets stuck in an infinite loop # if num_tries > max_tries: # raise RecursionError('rejection sampling failed in place_obj') # # num_tries += 1 # # pos = np.array(( # self._rand_int(top[0], min(top[0] + size[0], self.grid.width)), # self._rand_int(top[1], min(top[1] + size[1], self.grid.height)) # )) # # # Don't place the object on top of another object # if self.grid.get(*pos) != None: # continue # # # Don't place the object where the agent is # if np.array_equal(pos, self.agent_pos): # continue # # # Check if there is a filtering criterion # if reject_fn and reject_fn(self, pos): # continue # # break # # self.grid.set(*pos, obj) # # if obj is not None: # obj.init_pos = pos # obj.cur_pos = pos # # return pos pos = self.find_loc( top=top, size=size, reject_fn=reject_fn, max_tries=max_tries ) if obj is None: self.grid.set(*pos, obj) else: self.put_obj_np(obj, pos) return pos def put_obj_np(self, obj, pos): """ Put an object at a specific position in the grid """ assert isinstance(pos, np.ndarray) i, j = pos cell = self.grid.get(i, j) if cell is not None: raise ValueError("trying to put {} on {}".format(obj, cell)) self.grid.set(i, j, obj) obj.init_pos = np.array((i, j)) obj.cur_pos = np.array((i, j)) def put_obj(self, obj, i, j): """ Put an object at a specific position in the grid """ warnings.warn( "This function is kept for backwards compatiblity with minigrid. It is recommended to use put_object_np()." ) raise DeprecationWarning("Deprecated use put_obj_np. (or remove this Warning)") self.grid.set(i, j, obj) obj.init_pos = (i, j) obj.cur_pos = (i, j) def place_agent( self, top=None, size=None, rand_dir=True, max_tries=math.inf ): """ Set the agent's starting point at an empty position in the grid """ self.agent_pos = None pos = self.place_obj(None, top, size, max_tries=max_tries) self.agent_pos = pos if rand_dir: self.agent_dir = self._rand_int(0, 4) return pos @property def dir_vec(self): """ Get the direction vector for the agent, pointing in the direction of forward movement. """ assert self.agent_dir >= 0 and self.agent_dir < 4 return DIR_TO_VEC[self.agent_dir] @property def right_vec(self): """ Get the vector pointing to the right of the agent. """ dx, dy = self.dir_vec return np.array((-dy, dx)) @property def front_pos(self): """ Get the position of the cell that is right in front of the agent """ return self.agent_pos + self.dir_vec @property def back_pos(self): """ Get the position of the cell that is right in front of the agent """ return self.agent_pos - self.dir_vec @property def right_pos(self): """ Get the position of the cell that is right in front of the agent """ return self.agent_pos + self.right_vec @property def left_pos(self): """ Get the position of the cell that is right in front of the agent """ return self.agent_pos - self.right_vec def get_view_coords(self, i, j): """ Translate and rotate absolute grid coordinates (i, j) into the agent's partially observable view (sub-grid). Note that the resulting coordinates may be negative or outside of the agent's view size. """ ax, ay = self.agent_pos dx, dy = self.dir_vec rx, ry = self.right_vec # Compute the absolute coordinates of the top-left view corner sz = self.agent_view_size hs = self.agent_view_size // 2 tx = ax + (dx * (sz-1)) - (rx * hs) ty = ay + (dy * (sz-1)) - (ry * hs) lx = i - tx ly = j - ty # Project the coordinates of the object relative to the top-left # corner onto the agent's own coordinate system vx = (rx*lx + ry*ly) vy = -(dx*lx + dy*ly) return vx, vy def get_view_exts(self, dir=None, view_size=None, pos=None): """ Get the extents of the square set of tiles visible to the agent (or to an npc if specified Note: the bottom extent indices are not included in the set """ # by default compute view exts for agent if (dir is None) and (view_size is None) and (pos is None): dir = self.agent_dir view_size = self.agent_view_size pos = self.agent_pos # Facing right if dir == 0: topX = pos[0] topY = pos[1] - view_size // 2 # Facing down elif dir == 1: topX = pos[0] - view_size // 2 topY = pos[1] # Facing left elif dir == 2: topX = pos[0] - view_size + 1 topY = pos[1] - view_size // 2 # Facing up elif dir == 3: topX = pos[0] - view_size // 2 topY = pos[1] - view_size + 1 else: assert False, "invalid agent direction: {}".format(dir) botX = topX + view_size botY = topY + view_size return (topX, topY, botX, botY) def relative_coords(self, x, y): """ Check if a grid position belongs to the agent's field of view, and returns the corresponding coordinates """ vx, vy = self.get_view_coords(x, y) if vx < 0 or vy < 0 or vx >= self.agent_view_size or vy >= self.agent_view_size: return None return vx, vy def in_view(self, x, y): """ check if a grid position is visible to the agent """ return self.relative_coords(x, y) is not None def agent_sees(self, x, y): """ Check if a non-empty grid position is visible to the agent """ coordinates = self.relative_coords(x, y) if coordinates is None: return False vx, vy = coordinates assert not self.full_obs, "agent sees function not implemented with full_obs" obs = self.gen_obs() obs_grid, _ = Grid.decode(obs['image']) obs_cell = obs_grid.get(vx, vy) world_cell = self.grid.get(x, y) return obs_cell is not None and obs_cell.type == world_cell.type def step(self, action): self.step_count += 1 reward = 0 done = False # Get the position in front of the agent fwd_pos = self.front_pos # Get the contents of the cell in front of the agent fwd_cell = self.grid.get(*fwd_pos) # Rotate left if action == self.actions.left: self.agent_dir -= 1 if self.agent_dir < 0: self.agent_dir += 4 # Rotate right elif action == self.actions.right: self.agent_dir = (self.agent_dir + 1) % 4 # Move forward elif action == self.actions.forward: if fwd_cell != None and fwd_cell.can_push(): fwd_cell.push(push_dir=self.agent_dir, pusher="agent") if fwd_cell == None or fwd_cell.can_overlap(): self.agent_pos = fwd_pos if fwd_cell != None and fwd_cell.type == 'goal': done = True reward = self._reward() if fwd_cell != None and fwd_cell.type == 'lava': done = True # Pick up an object elif hasattr(self.actions, "pickup") and action == self.actions.pickup: if fwd_cell and fwd_cell.can_pickup(): if self.carrying is None: self.carrying = fwd_cell self.carrying.cur_pos = np.array([-1, -1]) self.grid.set(*fwd_pos, None) # Drop an object elif hasattr(self.actions, "drop") and action == self.actions.drop: if not fwd_cell and self.carrying: self.grid.set(*fwd_pos, self.carrying) self.carrying.cur_pos = fwd_pos self.carrying = None # Toggle/activate an object elif action == self.actions.toggle: if fwd_cell: fwd_cell.toggle(self, fwd_pos) # Done action (not used by default) elif action == self.actions.done: pass elif action in map(int, self.actions): # action that was added in an inheriting class (ex. talk action) pass elif np.isnan(action): # action skip pass else: assert False, f"unknown action {action}" if self.step_count >= self.max_steps: done = True obs = self.gen_obs(full_obs=self.full_obs) info = self.generate_info(done, reward) return obs, reward, done, info def generate_info(self, done, reward): success = done and reward > 0 info = {"success": success} gen_extra_info_dict = self.gen_extra_info() # add stuff needed for textual observations here assert not any(item in info for item in gen_extra_info_dict), "Duplicate keys found with gen_extra_info" info = { **info, **gen_extra_info_dict, } return info def gen_extra_info(self): grid, vis_mask = self.gen_obs_grid() carrying = self.carrying agent_pos_vx, agent_pos_vy = self.get_view_coords(self.agent_pos[0], self.agent_pos[1]) npc_actions_dict = SocialAINPCActionsDict extra_info = { "image": grid.encode(vis_mask), "vis_mask": vis_mask, "carrying": carrying, "agent_pos_vx": agent_pos_vx, "agent_pos_vy": agent_pos_vy, "npc_actions_dict": npc_actions_dict } return extra_info def gen_obs_grid(self): """ Generate the sub-grid observed by the agent. This method also outputs a visibility mask telling us which grid cells the agent can actually see. """ topX, topY, botX, botY = self.get_view_exts() grid = self.grid.slice(topX, topY, self.agent_view_size, self.agent_view_size) for i in range(self.agent_dir + 1): grid = grid.rotate_left() # Process occluders and visibility # Note that this incurs some performance cost if not self.see_through_walls: vis_mask = grid.process_vis(agent_pos=(self.agent_view_size // 2, self.agent_view_size - 1)) else: vis_mask = np.ones(shape=(grid.width, grid.height), dtype=np.bool) # Make it so the agent sees what it's carrying # We do this by placing the carried object at the agent's position # in the agent's partially observable view agent_pos = grid.width // 2, grid.height - 1 if self.carrying: grid.set(*agent_pos, self.carrying) else: grid.set(*agent_pos, None) return grid, vis_mask def add_agent_to_grid(self, image): """ Add agent to symbolic pixel image, used only for full observation """ ax, ay = self.agent_pos image[ax,ay] = [9,9,9,self.agent_dir] # could be made cleaner by creating an Agent_id (here we use Lava_id) return image def gen_obs(self, full_obs=False): """ Generate the agent's view (partially observable, low-resolution encoding) Fully observable view can be returned when full_obs is set to True """ if full_obs: image = self.add_agent_to_grid(self.grid.encode()) else: grid, vis_mask = self.gen_obs_grid() # Encode the partially observable view into a numpy array image = grid.encode(vis_mask, absolute_coordinates=not self.egocentric_observation) assert hasattr(self, 'mission'), "environments must define a textual mission string" # Observations are dictionaries containing: # - an image (partially observable view of the environment) # - the agent's direction/orientation (acting as a compass) # - a textual mission string (instructions for the agent) obs = { 'image': image, 'direction': self.agent_dir, 'mission': self.mission } return obs def get_obs_render(self, obs, tile_size=TILE_PIXELS//2): """ Render an agent observation for visualization """ grid, vis_mask = Grid.decode(obs) # Render the whole grid img = grid.render( tile_size, agent_pos=(self.agent_view_size // 2, self.agent_view_size - 1), agent_dir=3, highlight_mask=vis_mask ) return img def render(self, mode='human', close=False, highlight=True, tile_size=TILE_PIXELS, mask_unobserved=False): """ Render the whole-grid human view """ if mode == 'human' and close: if self.window: self.window.close() return if mode == 'human' and not self.window: import gym_minigrid.window self.window = gym_minigrid.window.Window('gym_minigrid') self.window.show(block=False) # Compute which cells are visible to the agent _, vis_mask = self.gen_obs_grid() # Compute the world coordinates of the bottom-left corner # of the agent's view area f_vec = self.dir_vec r_vec = self.right_vec top_left = self.agent_pos + f_vec * (self.agent_view_size-1) - r_vec * (self.agent_view_size // 2) # Mask of which cells to highlight highlight_mask = np.zeros(shape=(self.width, self.height), dtype=np.bool) # For each cell in the visibility mask for vis_j in range(0, self.agent_view_size): for vis_i in range(0, self.agent_view_size): # If this cell is not visible, don't highlight it if not vis_mask[vis_i, vis_j]: continue # Compute the world coordinates of this cell abs_i, abs_j = top_left - (f_vec * vis_j) + (r_vec * vis_i) if abs_i < 0 or abs_i >= self.width: continue if abs_j < 0 or abs_j >= self.height: continue # Mark this cell to be highlighted highlight_mask[abs_i, abs_j] = True # Render the whole grid img = self.grid.render( tile_size, self.agent_pos, self.agent_dir, highlight_mask=highlight_mask if highlight else None, mask_unobserved=mask_unobserved ) if mode == 'human': # self.window.set_caption(self.mission) self.window.show_img(img) return img def get_mission(self): return self.mission def close(self): if self.window: self.window.close() return def gen_text_obs(self): grid, vis_mask = self.gen_obs_grid() # Encode the partially observable view into a numpy array image = grid.encode(vis_mask) # (OBJECT_TO_IDX[self.type], COLOR_TO_IDX[self.color], state) # State, 0: open, 1: closed, 2: locked IDX_TO_COLOR = dict(zip(COLOR_TO_IDX.values(), COLOR_TO_IDX.keys())) IDX_TO_OBJECT = dict(zip(OBJECT_TO_IDX.values(), OBJECT_TO_IDX.keys())) list_textual_descriptions = [] if self.carrying is not None: list_textual_descriptions.append("You carry a {} {}".format(self.carrying.color, self.carrying.type)) agent_pos_vx, agent_pos_vy = self.get_view_coords(self.agent_pos[0], self.agent_pos[1]) view_field_dictionary = dict() for i in range(image.shape[0]): for j in range(image.shape[1]): if image[i][j][0] != 0 and image[i][j][0] != 1 and image[i][j][0] != 2: if i not in view_field_dictionary.keys(): view_field_dictionary[i] = dict() view_field_dictionary[i][j] = image[i][j] else: view_field_dictionary[i][j] = image[i][j] # Find the wall if any # We describe a wall only if there is no objects between the agent and the wall in straight line # Find wall in front add_wall_descr = False if add_wall_descr: j = agent_pos_vy - 1 object_seen = False while j >= 0 and not object_seen: if image[agent_pos_vx][j][0] != 0 and image[agent_pos_vx][j][0] != 1: if image[agent_pos_vx][j][0] == 2: list_textual_descriptions.append( f"A wall is {agent_pos_vy - j} steps in front of you. \n") # forward object_seen = True else: object_seen = True j -= 1 # Find wall left i = agent_pos_vx - 1 object_seen = False while i >= 0 and not object_seen: if image[i][agent_pos_vy][0] != 0 and image[i][agent_pos_vy][0] != 1: if image[i][agent_pos_vy][0] == 2: list_textual_descriptions.append( f"A wall is {agent_pos_vx - i} steps to the left. \n") # left object_seen = True else: object_seen = True i -= 1 # Find wall right i = agent_pos_vx + 1 object_seen = False while i < image.shape[0] and not object_seen: if image[i][agent_pos_vy][0] != 0 and image[i][agent_pos_vy][0] != 1: if image[i][agent_pos_vy][0] == 2: list_textual_descriptions.append( f"A wall is {i - agent_pos_vx} steps to the right. \n") # right object_seen = True else: object_seen = True i += 1 # list_textual_descriptions.append("You see the following objects: ") # returns the position of seen objects relative to you for i in view_field_dictionary.keys(): for j in view_field_dictionary[i].keys(): if i != agent_pos_vx or j != agent_pos_vy: object = view_field_dictionary[i][j] front_dist = agent_pos_vy - j left_right_dist = i-agent_pos_vx loc_descr = "" if front_dist == 1 and left_right_dist == 0: loc_descr += "Right in front of you " elif left_right_dist == 1 and front_dist == 0: loc_descr += "Just to the right of you" elif left_right_dist == -1 and front_dist == 0: loc_descr += "Just to the left of you" else: front_str = str(front_dist)+" steps in front of you " if front_dist > 0 else "" loc_descr += front_str suff = "s" if abs(left_right_dist) > 0 else "" and_ = "and" if loc_descr != "" else "" if left_right_dist < 0: left_right_str = f"{and_} {-left_right_dist} step{suff} to the left" loc_descr += left_right_str elif left_right_dist > 0: left_right_str = f"{and_} {left_right_dist} step{suff} to the right" loc_descr += left_right_str else: left_right_str = "" loc_descr += left_right_str loc_descr += f" there is a " obj_type = IDX_TO_OBJECT[object[0]] if obj_type == "npc": IDX_TO_STATE = {0: 'friendly', 1: 'antagonistic'} description = f"{IDX_TO_STATE[object[2]]} {IDX_TO_COLOR[object[1]]} peer. " # gaze gaze_dir = { 0: "towards you", 1: "to the left of you", 2: "in the same direction as you", 3: "to the right of you", } description += f"It is looking {gaze_dir[object[3]]}. " # point point_dir = { 0: "towards you", 1: "to the left of you", 2: "in the same direction as you", 3: "to the right of you", } if object[4] != 255: description += f"It is pointing {point_dir[object[4]]}. " # last action last_action = {v: k for k, v in SocialAINPCActionsDict.items()}[object[5]] last_action = { "go_forward": "foward", "rotate_left": "turn left", "rotate_right": "turn right", "toggle_action": "toggle", "point_stop_point": "stop pointing", "point_E": "", "point_S": "", "point_W": "", "point_N": "", "stop_point": "stop pointing", "no_op": "" }[last_action] if last_action not in ["no_op", ""]: description += f"It's last action is {last_action}. " elif obj_type in ["switch", "apple", "generatorplatform", "marble", "marbletee", "fence"]: if obj_type == "switch": # assumes that Switch.no_light == True assert object[-1] == 0 description = f"{IDX_TO_COLOR[object[1]]} {IDX_TO_OBJECT[object[0]]} " assert object[2:].mean() == 0 elif obj_type == "lockablebox": IDX_TO_STATE = {0: 'open', 1: 'closed', 2: 'locked'} description = f"{IDX_TO_STATE[object[2]]} {IDX_TO_COLOR[object[1]]} {IDX_TO_OBJECT[object[0]]} " assert object[3:].mean() == 0 elif obj_type == "applegenerator": IDX_TO_STATE = {1: 'square', 2: 'round'} description = f"{IDX_TO_STATE[object[2]]} {IDX_TO_COLOR[object[1]]} {IDX_TO_OBJECT[object[0]]} " assert object[3:].mean() == 0 elif obj_type == "remotedoor": IDX_TO_STATE = {0: 'open', 1: 'closed'} description = f"{IDX_TO_STATE[object[2]]} {IDX_TO_COLOR[object[1]]} {IDX_TO_OBJECT[object[0]]} " assert object[3:].mean() == 0 elif obj_type == "door": IDX_TO_STATE = {0: 'open', 1: 'closed', 2: 'locked'} description = f"{IDX_TO_STATE[object[2]]} {IDX_TO_COLOR[object[1]]} {IDX_TO_OBJECT[object[0]]} " assert object[3:].mean() == 0 elif obj_type == "lever": IDX_TO_STATE = {1: 'activated', 0: 'unactivated'} if object[3] == 255: countdown_txt = "" else: countdown_txt = f"with {object[3]} timesteps left. " description = f"{IDX_TO_STATE[object[2]]} {IDX_TO_COLOR[object[1]]} {IDX_TO_OBJECT[object[0]]} {countdown_txt}" assert object[4:].mean() == 0 else: raise ValueError(f"Undefined object type {obj_type}") full_destr = loc_descr + description + "\n" list_textual_descriptions.append(full_destr) if len(list_textual_descriptions) == 0: list_textual_descriptions.append("\n") return {'descriptions': list_textual_descriptions} class MultiModalMiniGridEnv(MiniGridEnv): grammar = None def reset(self, *args, **kwargs): obs = super().reset() self.append_existing_utterance_to_history() obs = self.add_utterance_to_observation(obs) self.reset_utterance() return obs def append_existing_utterance_to_history(self): if self.utterance != self.empty_symbol: if self.utterance.startswith(self.empty_symbol): self.utterance_history += self.utterance[len(self.empty_symbol):] else: assert self.utterance == self.beginning_string self.utterance_history += self.utterance def add_utterance_to_observation(self, obs): obs["utterance"] = self.utterance obs["utterance_history"] = self.utterance_history obs["mission"] = "Hidden" return obs def reset_utterance(self): # set utterance to empty indicator self.utterance = self.empty_symbol def render(self, *args, show_dialogue=True, **kwargs): obs = super().render(*args, **kwargs) if args and args[0] == 'human': # draw text to the side of the image self.window.clear_text() # erase previous text if show_dialogue: self.window.set_caption(self.full_conversation) # self.window.ax.set_title("correct color: {}".format(self.box.target_color), loc="left", fontsize=10) if self.outcome_info: color = None if "SUCCESS" in self.outcome_info: color = "lime" elif "FAILURE" in self.outcome_info: color = "red" self.window.add_text(*(0.01, 0.85, self.outcome_info), **{'fontsize': 15, 'color': color, 'weight': "bold"}) self.window.show_img(obs) # re-draw image to add changes to window return obs def add_obstacles(self): self.obstacles = self.parameters.get("Obstacles", "No") if self.parameters else "No" if self.obstacles != "No": n_stumps_range = { "A_bit": (1, 2), "Medium": (3, 4), "A_lot": (5, 6), }[self.obstacles] n_stumps = random.randint(*n_stumps_range) for _ in range(n_stumps): self.wall_start_x = self._rand_int(1, self.current_width - 2) self.wall_start_y = self._rand_int(1, self.current_height - 2) if random.choice([True, False]): self.grid.horz_wall( x=self.wall_start_x, y=self.wall_start_y, length=1 ) else: self.grid.horz_wall( x=self.wall_start_x, y=self.wall_start_y, length=1 )