# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""Utilities for environment interface with agent / tensorflow."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np

try:
  from six.moves import xrange
except ImportError:  # six is not installed: the built-in range is sufficient.
  xrange = range


class spaces(object):
  """Integer tags identifying the two kinds of space handled here."""
  discrete = 0
  box = 1


def get_space(space):
  """Describe a single space as a `(dim, type, info)` triple.

  Args:
    space: an object exposing either `n` (treated as discrete) or
      `shape`, `low` and `high` (treated as a box).  `n` is checked first.

  Returns:
    `(space.n, spaces.discrete, None)` for a discrete space, or
    `(np.prod(space.shape), spaces.box, (space.low, space.high))` for a box.

  Raises:
    TypeError: if `space` has neither an `n` nor a `shape` attribute.
  """
  if hasattr(space, 'n'):
    return space.n, spaces.discrete, None
  if hasattr(space, 'shape'):
    return np.prod(space.shape), spaces.box, (space.low, space.high)
  # Previously this fell through and returned None, which only surfaced later
  # as an opaque unpacking error in the caller.
  raise TypeError('Unsupported space (no `n` or `shape` attribute): %r'
                  % (space,))


def get_spaces(spaces):
  """Describe a (possibly composite) space as parallel sequences.

  Args:
    spaces: either a composite space exposing a `spaces` attribute (an
      iterable of sub-spaces) or a single space accepted by `get_space`.
      Note that this parameter shadows the module-level `spaces` class inside
      this function.

  Returns:
    A list of three tuples `[dims, types, infos]`, each holding one entry per
    sub-space (or exactly one entry for a non-composite space).
  """
  if hasattr(spaces, 'spaces'):
    # Materialize the result so both branches return a list on Python 3 too.
    return list(zip(*[get_space(space) for space in spaces.spaces]))
  return [(ret,) for ret in get_space(spaces)]


class EnvSpec(object):
  """Describes an environment's observation and action spaces.

  Also converts actions between the representation used by the agent
  (optionally discretized and/or combined into one discrete action) and the
  representation expected by the environment.
  """

  def __init__(self, env, try_combining_actions=True,
               discretize_actions=None):
    """Builds the spec from `env.observation_space` and `env.action_space`.

    Args:
      env: object exposing `observation_space` and `action_space`.
      try_combining_actions: if true and every (possibly discretized) action
        component is discrete, all components are merged into a single
        discrete action whose size is the product of the component sizes.
      discretize_actions: if truthy, the number of discrete bins used to
        replace each dimension of every box action component.
    """
    self.discretize_actions = discretize_actions

    # figure out observation space
    self.obs_space = env.observation_space
    self.obs_dims, self.obs_types, self.obs_info = get_spaces(self.obs_space)

    # figure out action space
    self.act_space = env.action_space
    self.act_dims, self.act_types, self.act_info = get_spaces(self.act_space)

    if self.discretize_actions:
      # Keep the environment's own layout so actions can be mapped back.
      self._act_dims = self.act_dims[:]
      self._act_types = self.act_types[:]

      self.act_dims = []
      self.act_types = []
      for dim, typ in zip(self._act_dims, self._act_types):
        if typ == spaces.discrete:
          self.act_dims.append(dim)
          self.act_types.append(spaces.discrete)
        elif typ == spaces.box:
          # One discrete action per box dimension.
          for _ in xrange(dim):
            self.act_dims.append(self.discretize_actions)
            self.act_types.append(spaces.discrete)
    else:
      self._act_dims = None
      self._act_types = None

    if (try_combining_actions and
        all(typ == spaces.discrete for typ in self.act_types)):
      self.combine_actions = True
      self.orig_act_dims = self.act_dims[:]
      self.orig_act_types = self.act_types[:]
      total_act_dim = 1
      for dim in self.act_dims:
        total_act_dim *= dim
      self.act_dims = [total_act_dim]
      self.act_types = [spaces.discrete]
    else:
      self.combine_actions = False

    self.obs_dims_and_types = tuple(zip(self.obs_dims, self.obs_types))
    self.act_dims_and_types = tuple(zip(self.act_dims, self.act_types))
    self.total_obs_dim = sum(self.obs_dims)
    self.total_sampling_act_dim = sum(self.sampling_dim(dim, typ)
                                      for dim, typ in self.act_dims_and_types)
    self.total_sampled_act_dim = sum(self.act_dims)

  def sampling_dim(self, dim, typ):
    """Returns the number of values needed to sample a component.

    Args:
      dim: size of the component.
      typ: `spaces.discrete` or `spaces.box`.

    Returns:
      `dim` for a discrete component, `2 * dim` for a box component.

    Raises:
      ValueError: if `typ` is neither `spaces.discrete` nor `spaces.box`.
    """
    if typ == spaces.discrete:
      return dim
    if typ == spaces.box:
      return 2 * dim  # Gaussian mean and std
    # An explicit raise (rather than `assert False`) still fires under -O.
    raise ValueError('Unknown space type: %r' % (typ,))

  def convert_actions_to_env(self, actions):
    """Maps agent actions to the environment's action layout.

    Undoes, in order, the combining and the discretization set up in
    `__init__`.

    Args:
      actions: list of per-component actions in the agent's representation.
        When actions are combined, `actions[0]` must support `/` followed by
        `.astype` (e.g. a numpy array).

    Returns:
      List of per-component actions in the environment's representation.
    """
    if self.combine_actions:
      # Decode the single combined index, least-significant component first.
      new_actions = []
      actions = actions[0]
      for dim in self.orig_act_dims:
        new_actions.append(np.mod(actions, dim))
        actions = (actions / dim).astype('int32')
      actions = new_actions

    if self.discretize_actions:
      new_actions = []
      idx = 0
      for i, (dim, typ) in enumerate(zip(self._act_dims, self._act_types)):
        if typ == spaces.discrete:
          new_actions.append(actions[idx])
          idx += 1
        elif typ == spaces.box:
          low, high = self.act_info[i]
          cur_action = []
          for j in xrange(dim):
            # Bin k maps to low + (high - low) * k / num_bins.
            cur_action.append(
                low[j] + (high[j] - low[j]) * actions[idx] /
                float(self.discretize_actions))
            idx += 1
          new_actions.append(np.hstack(cur_action))
      actions = new_actions

    return actions

  def convert_env_actions_to_actions(self, actions):
    """Encodes per-component discrete actions into the combined action.

    Args:
      actions: sequence with one action per entry of `orig_act_dims`.

    Returns:
      `actions` unchanged when actions are not combined; otherwise a
      one-element list holding the mixed-radix encoding of `actions`.
    """
    if not self.combine_actions:
      return actions

    new_actions = 0
    base = 1
    for act, dim in zip(actions, self.orig_act_dims):
      new_actions = new_actions + base * act
      base *= dim

    return [new_actions]

  def convert_obs_to_list(self, obs):
    """Wraps a single-component observation in a list; else lists `obs`."""
    if len(self.obs_dims) == 1:
      return [obs]
    return list(obs)

  def convert_action_to_gym(self, action):
    """Unwraps a one-element action; otherwise returns it as a list.

    Args:
      action: sequence of per-component actions.

    Returns:
      `action[0]` when `action` has exactly one element, else `list(action)`.
    """
    # A second, unreachable if/else that followed this one was removed.
    if len(action) == 1:
      return action[0]
    return list(action)

  def _initial_values(self, dims_and_types, fill_value, batch_size):
    """Builds constant-filled arrays, one per (dim, type) component.

    Discrete components get shape `[batch_size]`, box components get shape
    `[batch_size, dim]`; components of any other type are skipped.  When
    `batch_size` is None a batch of one is built and the batch axis is
    stripped, yielding a tuple with one entry per component.
    """
    batched = batch_size is not None
    batch_size = batch_size or 1

    values = []
    for dim, typ in dims_and_types:
      if typ == spaces.discrete:
        values.append(np.full(batch_size, fill_value))
      elif typ == spaces.box:
        values.append(np.full([batch_size, dim], fill_value))

    if batched:
      return values
    # zip() is an iterator on Python 3, so materialize it before indexing.
    return list(zip(*values))[0]

  def initial_obs(self, batch_size=None):
    """Returns all-zero observations.

    Args:
      batch_size: number of rows, or None for a single unbatched observation.

    Returns:
      A list of arrays (one per observation component) when `batch_size` is
      given, else a tuple holding the first row of each component.
    """
    return self._initial_values(self.obs_dims_and_types, 0.0, batch_size)

  def initial_act(self, batch_size=None):
    """Returns actions filled with -1.

    Args:
      batch_size: number of rows, or None for a single unbatched action.

    Returns:
      A list of arrays (one per action component) when `batch_size` is
      given, else a tuple holding the first row of each component.
    """
    return self._initial_values(self.act_dims_and_types, -1.0, batch_size)

  def is_discrete(self, typ):
    """Returns whether `typ` is the discrete space tag."""
    return typ == spaces.discrete

  def is_box(self, typ):
    """Returns whether `typ` is the box space tag."""
    return typ == spaces.box