NebulOS / src /hw_nats_fast_interface.py
fracapuano
add files via upload
81a5d0a
raw
history blame contribute delete
No virus
9.72 kB
import json
from itertools import chain
from typing import Dict, List, Optional, Set, Text, Tuple, Union

import numpy as np

from .utils import get_project_root
class HW_NATS_FastInterface:
    """In-memory lookup interface over precomputed per-architecture records.

    Records are loaded from a JSON file (or taken from a dictionary) and are
    keyed by an integer architecture index. A second JSON file maps the
    "/"-joined list of operations of an architecture to that index.

    As used by the methods of this class, every record exposes an
    ``"architecture_string"`` entry and one sub-dictionary per dataset name
    holding the scores for that dataset (e.g. ``"test_accuracy"``).

    Several methods (``__init__``, ``__next__``, ``generate_random_samples``)
    address records through ``range(len(self))``, i.e. they assume that the
    record keys are exactly ``0 .. len(self) - 1``.
    """

    # Canonical spellings accepted by the constructor and the `dataset` setter.
    NATS_datasets = ["cifar10", "cifar100", "ImageNet16-120"]
    NATS_devices = ["edgegpu", "eyeriss", "fpga"]

    def __init__(self,
                 datapath: Optional[Union[str, Dict]] = None,
                 indexpath: Optional[str] = None,
                 dataset: str = "cifar10",
                 device: Optional[Text] = "edgegpu",
                 scores_sample_size: int = 1000):
        """Load the records and the architecture <-> index map.

        Args:
            datapath: Path to the JSON file with the records, or the records
                themselves as a dictionary. ``None`` (default) resolves to
                ``<project root>/data/nebuloss.json``.
            indexpath: Path to the JSON file mapping "/"-joined operations to
                an architecture index. ``None`` (default) resolves to
                ``<project root>/data/nats_arch_index.json``.
            dataset: One of ``NATS_datasets`` (exact spelling).
            device: One of ``NATS_devices`` or ``None``. It is only stored in
                ``target_device`` by this class.
            scores_sample_size: Number of randomly chosen architectures used
                to estimate score mean/std. It is capped at the number of
                available records.

        Raises:
            ValueError: If `dataset` or `device` is not supported, or if
                `datapath` is neither a string nor a dictionary.
        """
        # catch input errors
        if dataset not in self.NATS_datasets:
            raise ValueError(f"Dataset {dataset} not in {self.NATS_datasets}!")
        if device is not None and device not in self.NATS_devices:
            raise ValueError(f"Device {device} not in {self.NATS_devices}!")

        # Default locations are resolved here rather than in the signature so
        # that merely defining the class does not touch the project layout.
        if datapath is None:
            datapath = str(get_project_root()) + "/data/nebuloss.json"
        if indexpath is None:
            indexpath = str(get_project_root()) + "/data/nats_arch_index.json"

        if isinstance(datapath, str):
            with open(datapath, "r", encoding="utf-8") as datafile:
                raw_records = json.load(datafile)
        elif isinstance(datapath, dict):
            raw_records = datapath
        else:
            raise ValueError(f"Datapath must be either a string or a dictionary, not {type(datapath)}")
        # JSON object keys are always strings: normalise them to integers
        self._data = {int(key): value for key, value in raw_records.items()}

        # importing the "/"-architecture <-> index from a json file
        with open(indexpath, "r", encoding="utf-8") as indexfile:
            self._architecture_to_index = json.load(indexfile)

        # store dataset field
        self._dataset = dataset
        self.target_device = device

        # cache of score statistics, keyed by (statistic, dataset, score name)
        self._score_stats = {}

        # Architectures used to estimate mean and std for scores normalization.
        # Sampling is without replacement, so the sample cannot be larger than
        # the number of records.
        sample_size = min(int(scores_sample_size), len(self))
        self.random_indices = np.random.choice(len(self), sample_size, replace=False)

    def __len__(self) -> int:
        """Number of architectures in considered search space."""
        return len(self._data)

    def __getitem__(self, idx: int) -> Dict:
        """Return the record stored under architecture index `idx`.

        Raises:
            KeyError: If no record is stored under `idx`.
        """
        return self._data[idx]

    def __iter__(self):
        """Restart iteration from index 0 and return the instance itself.

        The cursor lives on the instance, so nested or interleaved iterations
        over the same object share (and reset) one position.
        """
        self.iteration_index = 0
        return self

    def __next__(self):
        """Return the record at the current cursor and advance the cursor."""
        if self.iteration_index >= len(self):
            raise StopIteration
        # access current element
        net = self[self.iteration_index]
        # update the iteration index
        self.iteration_index += 1
        return net

    @property
    def data(self):
        """Dictionary mapping architecture index to its record."""
        return self._data

    @property
    def architecture_to_index(self):
        """Dictionary mapping "/"-joined operations to architecture index."""
        return self._architecture_to_index

    @property
    def name(self) -> Text:
        """Short identifier of this search space."""
        return "nats"

    @property
    def ordered_all_ops(self) -> List[Text]:
        """NATS Bench available operations, ordered (without any precise logic)"""
        return ['skip_connect', 'nor_conv_1x1', 'nor_conv_3x3', 'none', 'avg_pool_3x3']

    @property
    def architecture_len(self) -> int:
        """Returns the number of different operations that uniquely define a given architecture"""
        return 6

    @property
    def all_ops(self) -> Set[Text]:
        """NATS Bench available operations."""
        return set(self.ordered_all_ops)

    @property
    def dataset(self) -> Text:
        """Name of the dataset whose scores are currently being looked up."""
        return self._dataset

    @dataset.setter
    def dataset(self, new_dataset: Text) -> None:
        """
        Updates the current dataset with a new one.

        Matching is case-insensitive; the canonical spelling listed in
        `NATS_datasets` is the one that gets stored.

        Raises ValueError when new_dataset is not one of `NATS_datasets`.
        """
        canonical_names = {known.lower(): known for known in self.NATS_datasets}
        requested = new_dataset.lower()
        if requested not in canonical_names:
            raise ValueError(f"New dataset {new_dataset} not in {self.NATS_datasets}")
        self._dataset = canonical_names[requested]

    # Backward-compatible alias: the setter used to be (only) reachable through
    # a property with this name.
    change_dataset = dataset

    def _score_statistic(self, statistic: Text, score_name: Text) -> float:
        """Return the cached mean or std of `score_name` for the current dataset.

        The statistic is computed over the architectures in `random_indices`
        and cached per (statistic, dataset, score name), so that switching
        dataset never returns a value computed on another dataset.
        """
        cache_key = (statistic, self.dataset, score_name)
        if cache_key not in self._score_stats:
            sampled_scores = [self[i][self.dataset][score_name] for i in self.random_indices]
            reducer = np.mean if statistic == "mean" else np.std
            self._score_stats[cache_key] = reducer(sampled_scores)
        return self._score_stats[cache_key]

    def get_score_mean(self, score_name: Text) -> float:
        """
        Estimate the mean value of a score for the current dataset.

        Args:
            score_name (Text): The name of the score for which to calculate the mean.

        Returns:
            float: The mean score value.

        Note:
            The estimate uses only the architectures in `random_indices`
            (sampled once at construction) and is cached after the first call.
        """
        return self._score_statistic("mean", score_name)

    def get_score_std(self, score_name: Text) -> float:
        """
        Estimate the standard deviation of a score for the current dataset.

        Args:
            score_name (Text): The name of the score for which to calculate the standard deviation.

        Returns:
            float: The standard deviation of the score values.

        Note:
            The estimate uses only the architectures in `random_indices`
            (sampled once at construction) and is cached after the first call.
        """
        return self._score_statistic("std", score_name)

    def generate_random_samples(self, n_samples: int = 10) -> Tuple[List[Text], np.ndarray]:
        """Generate a group of architectures chosen at random.

        Returns:
            The architecture strings of the sampled records and the array of
            their (distinct) indices, in matching order.
        """
        idxs = np.random.choice(len(self), size=n_samples, replace=False)
        cell_structures = [self[i]["architecture_string"] for i in idxs]
        return cell_structures, idxs

    def list_to_architecture(self, input_list: List[str]) -> str:
        """
        Reformats genotype as architecture string.
        This function clearly is specific for this very search space.
        """
        return "|{}|+|{}|{}|+|{}|{}|{}|".format(*input_list)

    def architecture_to_list(self, architecture_string: Text) -> List[Text]:
        """Turn architectures string into genotype list

        Args:
            architecture_string(str): String characterising the cell structure only.

        Returns:
            List[str]: List containing the operations in the input cell structure.
                       In a genetic-algorithm setting, this description represents a genotype.
        """
        # divide the input string into different levels
        subcells = architecture_string.split("+")
        # every level looks like "|op|...|op|": drop the empty strings produced
        # by the leading and trailing "|" and flatten the levels
        return list(chain.from_iterable(subcell.split("|")[1:-1] for subcell in subcells))

    def list_to_accuracy(self, input_list: List[str]) -> float:
        """Returns the test accuracy of an input list representing the architecture.
        This list contains the operations.

        Args:
            input_list (List[str]): List of operations inside the architecture.

        Returns:
            float: The "test_accuracy" stored for the current dataset.

        Raises:
            KeyError: If the architecture is unknown or no test accuracy is stored.
        """
        # retrieving the index associated to this particular architecture
        arch_index = self.architecture_to_index["/".join(input_list)]
        return self[arch_index][self.dataset]["test_accuracy"]

    def architecture_to_accuracy(self, architecture_string: str) -> float:
        """Returns the test accuracy of an architecture string.
        The architecture <-> index map is keyed by "/"-joined operations, hence
        the string is first converted to its list of operations.

        Args:
            architecture_string (str): Architecture string.

        Returns:
            float: The "test_accuracy" stored for the current dataset.
        """
        return self.list_to_accuracy(self.architecture_to_list(architecture_string))

    def list_to_score(self, input_list: List[Text], score: Text) -> float:
        """Returns the value of `score` of an input list representing the architecture.
        This list contains the operations.

        Args:
            input_list (List[Text]): List of operations inside the architecture.
            score (Text): Score of interest.

        Returns:
            float: Score value for `input_list`, or None when `score` is not
                   stored for the current dataset.
        """
        arch_index = self.architecture_to_index["/".join(input_list)]
        return self[arch_index][self.dataset].get(score, None)

    def architecture_to_score(self, architecture_string: Text, score: Text) -> float:
        """Returns the value of `score` of an architecture string.
        The architecture <-> index map is keyed by "/"-joined operations, hence
        the string is first converted to its list of operations.

        Args:
            architecture_string (Text): Architecture string.
            score (Text): Score of interest.

        Returns:
            float: Score value for `architecture_string`, or None when `score`
                   is not stored for the current dataset.
        """
        return self.list_to_score(self.architecture_to_list(architecture_string), score)