NCTC / models /research /pcl_rl /env_spec.py
NCTCMumbai's picture
Upload 2571 files
0b8359d
raw
history blame
6.01 kB
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Utilities for environment interface with agent / tensorflow."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
from six.moves import xrange
class spaces(object):
discrete = 0
box = 1
def get_space(space):
if hasattr(space, 'n'):
return space.n, spaces.discrete, None
elif hasattr(space, 'shape'):
return np.prod(space.shape), spaces.box, (space.low, space.high)
def get_spaces(spaces):
if hasattr(spaces, 'spaces'):
return zip(*[get_space(space) for space in spaces.spaces])
else:
return [(ret,) for ret in get_space(spaces)]
class EnvSpec(object):
def __init__(self, env, try_combining_actions=True,
discretize_actions=None):
self.discretize_actions = discretize_actions
# figure out observation space
self.obs_space = env.observation_space
self.obs_dims, self.obs_types, self.obs_info = get_spaces(self.obs_space)
# figure out action space
self.act_space = env.action_space
self.act_dims, self.act_types, self.act_info = get_spaces(self.act_space)
if self.discretize_actions:
self._act_dims = self.act_dims[:]
self._act_types = self.act_types[:]
self.act_dims = []
self.act_types = []
for i, (dim, typ) in enumerate(zip(self._act_dims, self._act_types)):
if typ == spaces.discrete:
self.act_dims.append(dim)
self.act_types.append(spaces.discrete)
elif typ == spaces.box:
for _ in xrange(dim):
self.act_dims.append(self.discretize_actions)
self.act_types.append(spaces.discrete)
else:
self._act_dims = None
self._act_types = None
if (try_combining_actions and
all(typ == spaces.discrete for typ in self.act_types)):
self.combine_actions = True
self.orig_act_dims = self.act_dims[:]
self.orig_act_types = self.act_types[:]
total_act_dim = 1
for dim in self.act_dims:
total_act_dim *= dim
self.act_dims = [total_act_dim]
self.act_types = [spaces.discrete]
else:
self.combine_actions = False
self.obs_dims_and_types = tuple(zip(self.obs_dims, self.obs_types))
self.act_dims_and_types = tuple(zip(self.act_dims, self.act_types))
self.total_obs_dim = sum(self.obs_dims)
self.total_sampling_act_dim = sum(self.sampling_dim(dim, typ)
for dim, typ in self.act_dims_and_types)
self.total_sampled_act_dim = sum(self.act_dims)
def sampling_dim(self, dim, typ):
if typ == spaces.discrete:
return dim
elif typ == spaces.box:
return 2 * dim # Gaussian mean and std
else:
assert False
def convert_actions_to_env(self, actions):
if self.combine_actions:
new_actions = []
actions = actions[0]
for dim in self.orig_act_dims:
new_actions.append(np.mod(actions, dim))
actions = (actions / dim).astype('int32')
actions = new_actions
if self.discretize_actions:
new_actions = []
idx = 0
for i, (dim, typ) in enumerate(zip(self._act_dims, self._act_types)):
if typ == spaces.discrete:
new_actions.append(actions[idx])
idx += 1
elif typ == spaces.box:
low, high = self.act_info[i]
cur_action = []
for j in xrange(dim):
cur_action.append(
low[j] + (high[j] - low[j]) * actions[idx] /
float(self.discretize_actions))
idx += 1
new_actions.append(np.hstack(cur_action))
actions = new_actions
return actions
def convert_env_actions_to_actions(self, actions):
if not self.combine_actions:
return actions
new_actions = 0
base = 1
for act, dim in zip(actions, self.orig_act_dims):
new_actions = new_actions + base * act
base *= dim
return [new_actions]
def convert_obs_to_list(self, obs):
if len(self.obs_dims) == 1:
return [obs]
else:
return list(obs)
def convert_action_to_gym(self, action):
if len(action) == 1:
return action[0]
else:
return list(action)
if ((not self.combine_actions or len(self.orig_act_dims) == 1) and
(len(self.act_dims) == 1 or
(self.discretize_actions and len(self._act_dims) == 1))):
return action[0]
else:
return list(action)
def initial_obs(self, batch_size):
batched = batch_size is not None
batch_size = batch_size or 1
obs = []
for dim, typ in self.obs_dims_and_types:
if typ == spaces.discrete:
obs.append(np.zeros(batch_size))
elif typ == spaces.box:
obs.append(np.zeros([batch_size, dim]))
if batched:
return obs
else:
return zip(*obs)[0]
def initial_act(self, batch_size=None):
batched = batch_size is not None
batch_size = batch_size or 1
act = []
for dim, typ in self.act_dims_and_types:
if typ == spaces.discrete:
act.append(-np.ones(batch_size))
elif typ == spaces.box:
act.append(-np.ones([batch_size, dim]))
if batched:
return act
else:
return zip(*act)[0]
def is_discrete(self, typ):
return typ == spaces.discrete
def is_box(self, typ):
return typ == spaces.box