PPO playing starpilot from https://github.com/sgoodfriend/rl-algo-impls/tree/227aa2fbde36e688a09d8ad309b0947721eef160
80e385e
import numpy as np | |
from dataclasses import dataclass | |
from torch.utils.tensorboard.writer import SummaryWriter | |
from typing import Dict, List, Optional, Sequence, TypeVar | |
class Episode: | |
score: float = 0 | |
length: int = 0 | |
StatisticSelf = TypeVar("StatisticSelf", bound="Statistic") | |
class Statistic: | |
values: np.ndarray | |
round_digits: int = 2 | |
def mean(self) -> float: | |
return np.mean(self.values).item() | |
def std(self) -> float: | |
return np.std(self.values).item() | |
def min(self) -> float: | |
return np.min(self.values).item() | |
def max(self) -> float: | |
return np.max(self.values).item() | |
def sum(self) -> float: | |
return np.sum(self.values).item() | |
def __len__(self) -> int: | |
return len(self.values) | |
def _diff(self: StatisticSelf, o: StatisticSelf) -> float: | |
return (self.mean - self.std) - (o.mean - o.std) | |
def __gt__(self: StatisticSelf, o: StatisticSelf) -> bool: | |
return self._diff(o) > 0 | |
def __ge__(self: StatisticSelf, o: StatisticSelf) -> bool: | |
return self._diff(o) >= 0 | |
def __repr__(self) -> str: | |
mean = round(self.mean, self.round_digits) | |
std = round(self.std, self.round_digits) | |
if self.round_digits == 0: | |
mean = int(mean) | |
std = int(std) | |
return f"{mean} +/- {std}" | |
def to_dict(self) -> Dict[str, float]: | |
return { | |
"mean": self.mean, | |
"std": self.std, | |
"min": self.min, | |
"max": self.max, | |
} | |
EpisodesStatsSelf = TypeVar("EpisodesStatsSelf", bound="EpisodesStats") | |
class EpisodesStats: | |
episodes: Sequence[Episode] | |
simple: bool | |
score: Statistic | |
length: Statistic | |
def __init__(self, episodes: Sequence[Episode], simple: bool = False) -> None: | |
self.episodes = episodes | |
self.simple = simple | |
self.score = Statistic(np.array([e.score for e in episodes])) | |
self.length = Statistic(np.array([e.length for e in episodes]), round_digits=0) | |
def __gt__(self: EpisodesStatsSelf, o: EpisodesStatsSelf) -> bool: | |
return self.score > o.score | |
def __ge__(self: EpisodesStatsSelf, o: EpisodesStatsSelf) -> bool: | |
return self.score >= o.score | |
def __repr__(self) -> str: | |
return ( | |
f"Score: {self.score} ({round(self.score.mean - self.score.std, 2)}) | " | |
f"Length: {self.length}" | |
) | |
def __len__(self) -> int: | |
return len(self.episodes) | |
def _asdict(self) -> dict: | |
return { | |
"n_episodes": len(self.episodes), | |
"score": self.score.to_dict(), | |
"length": self.length.to_dict(), | |
} | |
def write_to_tensorboard( | |
self, tb_writer: SummaryWriter, main_tag: str, global_step: Optional[int] = None | |
) -> None: | |
stats = {"mean": self.score.mean} | |
if not self.simple: | |
stats.update( | |
{ | |
"min": self.score.min, | |
"max": self.score.max, | |
"result": self.score.mean - self.score.std, | |
"n_episodes": len(self.episodes), | |
"length": self.length.mean, | |
} | |
) | |
tb_writer.add_scalars( | |
main_tag, | |
stats, | |
global_step=global_step, | |
) | |
class EpisodeAccumulator: | |
def __init__(self, num_envs: int): | |
self._episodes = [] | |
self.current_episodes = [Episode() for _ in range(num_envs)] | |
def episodes(self) -> List[Episode]: | |
return self._episodes | |
def step(self, reward: np.ndarray, done: np.ndarray) -> None: | |
for idx, current in enumerate(self.current_episodes): | |
current.score += reward[idx] | |
current.length += 1 | |
if done[idx]: | |
self._episodes.append(current) | |
self.current_episodes[idx] = Episode() | |
self.on_done(idx, current) | |
def __len__(self) -> int: | |
return len(self.episodes) | |
def on_done(self, ep_idx: int, episode: Episode) -> None: | |
pass | |
def stats(self) -> EpisodesStats: | |
return EpisodesStats(self.episodes) | |