sgoodfriend's picture
A2C playing PongNoFrameskip-v4 from https://github.com/sgoodfriend/rl-algo-impls/tree/983cb75e43e51cf4ef57f177194ab9a4a1a8808b
05b94c0
from typing import Dict, Optional, Tuple, Type
import numpy as np
import torch
import torch.nn as nn
from numpy.typing import NDArray
from torch.distributions import Distribution, constraints
from rl_algo_impls.shared.actor.actor import Actor, PiForward, pi_forward
from rl_algo_impls.shared.actor.categorical import MaskedCategorical
from rl_algo_impls.shared.encoder import EncoderOutDim
from rl_algo_impls.shared.module.utils import mlp
class MultiCategorical(Distribution):
def __init__(
self,
nvec: NDArray[np.int64],
probs=None,
logits=None,
validate_args=None,
masks: Optional[torch.Tensor] = None,
):
# Either probs or logits should be set
assert (probs is None) != (logits is None)
masks_split = (
torch.split(masks, nvec.tolist(), dim=1)
if masks is not None
else [None] * len(nvec)
)
if probs:
self.dists = [
MaskedCategorical(probs=p, validate_args=validate_args, mask=m)
for p, m in zip(torch.split(probs, nvec.tolist(), dim=1), masks_split)
]
param = probs
else:
assert logits is not None
self.dists = [
MaskedCategorical(logits=lg, validate_args=validate_args, mask=m)
for lg, m in zip(torch.split(logits, nvec.tolist(), dim=1), masks_split)
]
param = logits
batch_shape = param.size()[:-1] if param.ndimension() > 1 else torch.Size()
super().__init__(batch_shape=batch_shape, validate_args=validate_args)
def log_prob(self, action: torch.Tensor) -> torch.Tensor:
prob_stack = torch.stack(
[c.log_prob(a) for a, c in zip(action.T, self.dists)], dim=-1
)
return prob_stack.sum(dim=-1)
def entropy(self) -> torch.Tensor:
return torch.stack([c.entropy() for c in self.dists], dim=-1).sum(dim=-1)
def sample(self, sample_shape: torch.Size = torch.Size()) -> torch.Tensor:
return torch.stack([c.sample(sample_shape) for c in self.dists], dim=-1)
@property
def mode(self) -> torch.Tensor:
return torch.stack([c.mode for c in self.dists], dim=-1)
@property
def arg_constraints(self) -> Dict[str, constraints.Constraint]:
# Constraints handled by child distributions in dist
return {}
class MultiDiscreteActorHead(Actor):
def __init__(
self,
nvec: NDArray[np.int64],
in_dim: EncoderOutDim,
hidden_sizes: Tuple[int, ...] = (32,),
activation: Type[nn.Module] = nn.ReLU,
init_layers_orthogonal: bool = True,
) -> None:
super().__init__()
self.nvec = nvec
assert isinstance(in_dim, int)
layer_sizes = (in_dim,) + hidden_sizes + (nvec.sum(),)
self._fc = mlp(
layer_sizes,
activation,
init_layers_orthogonal=init_layers_orthogonal,
final_layer_gain=0.01,
)
def forward(
self,
obs: torch.Tensor,
actions: Optional[torch.Tensor] = None,
action_masks: Optional[torch.Tensor] = None,
) -> PiForward:
logits = self._fc(obs)
pi = MultiCategorical(self.nvec, logits=logits, masks=action_masks)
return pi_forward(pi, actions)
@property
def action_shape(self) -> Tuple[int, ...]:
return (len(self.nvec),)