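"""Asynchronous Advantage Actor-Critic (A3C) on the CartPole-v0 gym environment.

A single global ActorCriticModel is shared by one Worker thread per CPU core.
Each worker interacts with its own copy of the environment, computes gradients
for its local model, applies them to the global model through a shared
optimizer, and then copies the global weights back into its local model.
Pass --train to train (optionally --algorithm random for a random baseline);
without --train the script loads the saved weights and plays an episode.
"""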
import os
os.environ["CUDA_VISIBLE_DEVICES"] = ""
import threading
import gym
import multiprocessing
import numpy as np
from queue import Queue
import argparse
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
parser = argparse.ArgumentParser(description='Run A3C algorithm on the game '
                                             'Cartpole.')
parser.add_argument('--algorithm', default='a3c', type=str,
                    help='Choose between \'a3c\' and \'random\'.')
parser.add_argument('--train', dest='train', action='store_true',
                    help='Train our model.')
parser.add_argument('--lr', default=0.001, type=float,
                    help='Learning rate for the shared optimizer.')
parser.add_argument('--update-freq', default=20, type=int,
                    help='How often to update the global model.')
parser.add_argument('--max-eps', default=1000, type=int,
                    help='Global maximum number of episodes to run.')
parser.add_argument('--gamma', default=0.99, type=float,
                    help='Discount factor of rewards.')
parser.add_argument('--save-dir', default='/tmp/', type=str,
                    help='Directory in which you desire to save the model.')
args = parser.parse_args()
class ActorCriticModel(keras.Model):
    def __init__(self, state_size, action_size):
        super(ActorCriticModel, self).__init__()
        self.state_size = state_size
        self.action_size = action_size
        # Actor branch: maps a state to unnormalized action preferences (logits)
        self.dense1 = layers.Dense(100, activation='relu')
        self.policy_logits = layers.Dense(action_size)
        # Critic branch: maps a state to a scalar state-value estimate
        self.dense2 = layers.Dense(100, activation='relu')
        self.values = layers.Dense(1)

    def call(self, inputs):
        # Forward pass through both branches
        x = self.dense1(inputs)
        logits = self.policy_logits(x)
        v1 = self.dense2(inputs)
        values = self.values(v1)
        return logits, values
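# Example (hypothetical) shape check for the two heads. For CartPole-v0 the
# observation has 4 dimensions and there are 2 discrete actions, so:
#   model = ActorCriticModel(4, 2)
#   logits, values = model(tf.convert_to_tensor(np.zeros((1, 4)), dtype=tf.float32))
#   # logits has shape (1, 2); values has shape (1, 1)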
def record(episode,
           episode_reward,
           worker_idx,
           global_ep_reward,
           result_queue,
           total_loss,
           num_steps):
    """Helper function to store score and print statistics.

    Arguments:
      episode: Current episode
      episode_reward: Reward accumulated over the current episode
      worker_idx: Which thread (worker)
      global_ep_reward: The moving average of the global reward
      result_queue: Queue storing the moving average of the scores
      total_loss: The total loss accumulated over the current episode
      num_steps: The number of steps the episode took to complete
    """
    if global_ep_reward == 0:
        global_ep_reward = episode_reward
    else:
        global_ep_reward = global_ep_reward * 0.99 + episode_reward * 0.01
    print(
        f"Episode: {episode} | "
        f"Moving Average Reward: {int(global_ep_reward)} | "
        f"Episode Reward: {int(episode_reward)} | "
        f"Loss: {int(total_loss / float(num_steps) * 1000) / 1000} | "
        f"Steps: {num_steps} | "
        f"Worker: {worker_idx}"
    )
    result_queue.put(global_ep_reward)
    return global_ep_reward
class RandomAgent:
    """Random Agent that will play the specified game.

    Arguments:
      env_name: Name of the environment to be played
      max_eps: Maximum number of episodes to run agent for.
    """

    def __init__(self, env_name, max_eps):
        self.env = gym.make(env_name)
        self.max_episodes = max_eps
        self.global_moving_average_reward = 0
        self.res_queue = Queue()

    def run(self):
        reward_avg = 0
        for episode in range(self.max_episodes):
            done = False
            self.env.reset()
            reward_sum = 0.0
            steps = 0
            while not done:
                # Sample randomly from the action space and step
                _, reward, done, _ = self.env.step(self.env.action_space.sample())
                steps += 1
                reward_sum += reward
            # Record statistics
            self.global_moving_average_reward = record(episode,
                                                       reward_sum,
                                                       0,
                                                       self.global_moving_average_reward,
                                                       self.res_queue, 0, steps)
            reward_avg += reward_sum
        final_avg = reward_avg / float(self.max_episodes)
        print("Average score across {} episodes: {}".format(self.max_episodes, final_avg))
        return final_avg
class MasterAgent:
    def __init__(self):
        self.game_name = 'CartPole-v0'
        save_dir = args.save_dir
        self.save_dir = save_dir
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)

        env = gym.make(self.game_name)
        self.state_size = env.observation_space.shape[0]
        self.action_size = env.action_space.n
        self.opt = tf.compat.v1.train.AdamOptimizer(args.lr, use_locking=True)
        print(self.state_size, self.action_size)

        self.global_model = ActorCriticModel(self.state_size, self.action_size)  # global network
        self.global_model(tf.convert_to_tensor(np.random.random((1, self.state_size)), dtype=tf.float32))
    def train(self):
        if args.algorithm == 'random':
            random_agent = RandomAgent(self.game_name, args.max_eps)
            random_agent.run()
            return

        res_queue = Queue()

        workers = [Worker(self.state_size,
                          self.action_size,
                          self.global_model,
                          self.opt, res_queue,
                          i, game_name=self.game_name,
                          save_dir=self.save_dir) for i in range(multiprocessing.cpu_count())]

        for i, worker in enumerate(workers):
            print("Starting worker {}".format(i))
            worker.start()

        moving_average_rewards = []  # record episode reward to plot
        while True:
            reward = res_queue.get()
            if reward is not None:
                moving_average_rewards.append(reward)
            else:
                break
        for w in workers:
            w.join()

        plt.plot(moving_average_rewards)
        plt.ylabel('Moving average ep reward')
        plt.xlabel('Step')
        plt.savefig(os.path.join(self.save_dir,
                                 '{} Moving Average.png'.format(self.game_name)))
        plt.show()
    def play(self):
        env = gym.make(self.game_name).unwrapped
        state = env.reset()
        model = self.global_model
        model_path = os.path.join(self.save_dir, 'model_{}.h5'.format(self.game_name))
        print('Loading model from: {}'.format(model_path))
        model.load_weights(model_path)
        done = False
        step_counter = 0
        reward_sum = 0

        try:
            while not done:
                env.render(mode='rgb_array')
                policy, value = model(tf.convert_to_tensor(state[None, :], dtype=tf.float32))
                policy = tf.nn.softmax(policy)
                action = np.argmax(policy)
                state, reward, done, _ = env.step(action)
                reward_sum += reward
                print("{}. Reward: {}, action: {}".format(step_counter, reward_sum, action))
                step_counter += 1
        except KeyboardInterrupt:
            print("Received Keyboard Interrupt. Shutting down.")
        finally:
            env.close()
class Memory:
    def __init__(self):
        self.states = []
        self.actions = []
        self.rewards = []

    def store(self, state, action, reward):
        self.states.append(state)
        self.actions.append(action)
        self.rewards.append(reward)

    def clear(self):
        self.states = []
        self.actions = []
        self.rewards = []
class Worker(threading.Thread):
    # Set up global variables shared across the different worker threads
    global_episode = 0
    # Moving average reward
    global_moving_average_reward = 0
    best_score = 0
    save_lock = threading.Lock()

    def __init__(self,
                 state_size,
                 action_size,
                 global_model,
                 opt,
                 result_queue,
                 idx,
                 game_name='CartPole-v0',
                 save_dir='/tmp'):
        super(Worker, self).__init__()
        self.state_size = state_size
        self.action_size = action_size
        self.result_queue = result_queue
        self.global_model = global_model
        self.opt = opt
        self.local_model = ActorCriticModel(self.state_size, self.action_size)
        self.worker_idx = idx
        self.game_name = game_name
        self.env = gym.make(self.game_name).unwrapped
        self.save_dir = save_dir
        self.ep_loss = 0.0
    def run(self):
        total_step = 1
        mem = Memory()
        while Worker.global_episode < args.max_eps:
            current_state = self.env.reset()
            mem.clear()
            ep_reward = 0.
            ep_steps = 0
            self.ep_loss = 0

            time_count = 0
            done = False
            while not done:
                logits, _ = self.local_model(
                    tf.convert_to_tensor(current_state[None, :],
                                         dtype=tf.float32))
                probs = tf.nn.softmax(logits)

                action = np.random.choice(self.action_size, p=probs.numpy()[0])
                new_state, reward, done, _ = self.env.step(action)
                if done:
                    reward = -1
                ep_reward += reward
                mem.store(current_state, action, reward)

                if time_count == args.update_freq or done:
                    # Calculate the gradient with respect to the local model by tracking
                    # the variables involved in computing the loss with tf.GradientTape.
                    with tf.GradientTape() as tape:
                        total_loss = self.compute_loss(done,
                                                       new_state,
                                                       mem,
                                                       args.gamma)
                    self.ep_loss += total_loss
                    # Calculate local gradients
                    grads = tape.gradient(total_loss, self.local_model.trainable_weights)
                    # Push local gradients to the global model
                    self.opt.apply_gradients(zip(grads,
                                                 self.global_model.trainable_weights))
                    # Update the local model with the new global weights
                    self.local_model.set_weights(self.global_model.get_weights())

                    mem.clear()
                    time_count = 0

                    if done:  # episode finished: record statistics and maybe save
                        Worker.global_moving_average_reward = \
                            record(Worker.global_episode, ep_reward, self.worker_idx,
                                   Worker.global_moving_average_reward, self.result_queue,
                                   self.ep_loss, ep_steps)
                        # We must use a lock to save the model and to print to prevent data races.
                        if ep_reward > Worker.best_score:
                            with Worker.save_lock:
                                print("Saving best model to {}, "
                                      "episode score: {}".format(self.save_dir, ep_reward))
                                self.global_model.save_weights(
                                    os.path.join(self.save_dir,
                                                 'model_{}.h5'.format(self.game_name))
                                )
                                Worker.best_score = ep_reward
                        Worker.global_episode += 1
                ep_steps += 1

                time_count += 1
                current_state = new_state
                total_step += 1
        self.result_queue.put(None)
    def compute_loss(self,
                     done,
                     new_state,
                     memory,
                     gamma=0.99):
        if done:
            reward_sum = 0.  # terminal state has zero value
        else:
            # Bootstrap from the critic's estimate of the last state's value;
            # pull the scalar out of the (1, 1) output so the discounted
            # returns below stay plain floats.
            reward_sum = self.local_model(
                tf.convert_to_tensor(new_state[None, :],
                                     dtype=tf.float32))[-1].numpy()[0, 0]

        # Get discounted rewards
        discounted_rewards = []
        for reward in memory.rewards[::-1]:  # reverse buffer r
            reward_sum = reward + gamma * reward_sum
            discounted_rewards.append(reward_sum)
        discounted_rewards.reverse()

        logits, values = self.local_model(
            tf.convert_to_tensor(np.vstack(memory.states),
                                 dtype=tf.float32))
        values = tf.squeeze(values, axis=1)  # (N, 1) -> (N,) to match the per-step losses
        # Get our advantages
        advantage = tf.convert_to_tensor(np.array(discounted_rewards),
                                         dtype=tf.float32) - values
        # Value loss
        value_loss = advantage ** 2
        # Calculate our policy loss
        policy = tf.nn.softmax(logits)
        entropy = tf.nn.softmax_cross_entropy_with_logits(labels=policy, logits=logits)

        policy_loss = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=memory.actions,
                                                                     logits=logits)
        # stop_gradient keeps the advantage as a fixed weight: the critic is trained
        # only through the value loss and the actor only through the policy term.
        policy_loss *= tf.stop_gradient(advantage)
        policy_loss -= 0.01 * entropy
        total_loss = tf.reduce_mean(0.5 * value_loss + policy_loss)
        return total_loss
if __name__ == '__main__':
    print(args)
    master = MasterAgent()
    if args.train:
        master.train()
    else:
        master.play()
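# Example invocations (the filename a3c_cartpole.py is only an assumption about
# how this script is saved; adjust it to the actual file name):
#   python a3c_cartpole.py --train                      # train A3C with the defaults above
#   python a3c_cartpole.py --train --algorithm random   # run the RandomAgent baseline
#   python a3c_cartpole.py                              # load saved weights and play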