Spaces:
Running
Running
import copy | |
import numpy as np | |
import pandas as pd | |
from sklearn.model_selection import train_test_split | |
class DataFrame(object):
    """Minimal pd.DataFrame analog for handling n-dimensional numpy matrices.

    Adds support for shuffling, batching, and train/test splitting on top of a
    plain list of named matrices.

    Row order is tracked by ``self.idx``, an index array over the leading
    dimension. ``shuffle`` permutes ``self.idx`` only; the matrices themselves
    are never reordered in place.

    Args:
        columns: List of names corresponding to the matrices in data.
        data: List of n-dimensional data matrices ordered in correspondence
            with columns. All matrices must have the same leading dimension.
            Data can also be fed a list of instances of np.memmap, in which
            case RAM usage can be limited to the size of a single batch.
    """

    def __init__(self, columns, data):
        assert len(columns) == len(data), 'columns length does not match data length'
        lengths = [mat.shape[0] for mat in data]
        assert len(set(lengths)) == 1, 'all matrices in data must have same first dimension'

        self.length = lengths[0]
        # Keep private shallow copies of the containers so that __setitem__
        # appending a column never mutates the lists owned by the caller.
        self.columns = list(columns)
        self.data = list(data)
        self.dict = dict(zip(self.columns, self.data))
        self.idx = np.arange(self.length)

    def shapes(self):
        """Return a pd.Series mapping each column name to its matrix shape."""
        return pd.Series(dict(zip(self.columns, [mat.shape for mat in self.data])))

    def dtypes(self):
        """Return a pd.Series mapping each column name to its matrix dtype."""
        return pd.Series(dict(zip(self.columns, [mat.dtype for mat in self.data])))

    def shuffle(self):
        """Shuffle the row order in place (only ``self.idx`` is permuted)."""
        np.random.shuffle(self.idx)

    def train_test_split(self, train_size, random_state=None, stratify=None):
        """Split the rows into a train DataFrame and a test DataFrame.

        Args:
            train_size: Forwarded to sklearn's ``train_test_split``.
            random_state: Seed forwarded to sklearn. When None, a fresh seed
                is drawn with ``np.random.randint(1000)`` on every call.
            stratify: Forwarded unchanged to sklearn, which pairs it
                positionally with ``self.idx``.

        Returns:
            Tuple ``(train_df, test_df)`` of new DataFrame instances.
        """
        # The seed is drawn at call time. Drawing it in the signature would
        # evaluate it once at definition time and silently reuse the same
        # seed for every call in the process.
        if random_state is None:
            random_state = np.random.randint(1000)

        train_idx, test_idx = train_test_split(
            self.idx,
            train_size=train_size,
            random_state=random_state,
            stratify=stratify
        )
        train_df = DataFrame(copy.copy(self.columns), [mat[train_idx] for mat in self.data])
        test_df = DataFrame(copy.copy(self.columns), [mat[test_idx] for mat in self.data])
        return train_df, test_df

    def batch_generator(self, batch_size, shuffle=True, num_epochs=10000, allow_smaller_final_batch=False):
        """Yield consecutive batches of rows as new DataFrame instances.

        Args:
            batch_size: Number of rows per batch.
            shuffle: If True, reshuffle the row order at the start of each epoch.
            num_epochs: Number of passes over the data before the generator stops.
            allow_smaller_final_batch: If True, the last batch of an epoch may
                hold fewer than ``batch_size`` rows; otherwise it is dropped.

        Yields:
            DataFrame holding copies of the selected rows of every column.
        """
        epoch_num = 0
        while epoch_num < num_epochs:
            if shuffle:
                self.shuffle()

            # Stop at self.length (exclusive): starting a batch at exactly
            # self.length would produce an empty batch whenever the length is
            # a multiple of batch_size.
            for i in range(0, self.length, batch_size):
                batch_idx = self.idx[i: i + batch_size]
                if not allow_smaller_final_batch and len(batch_idx) != batch_size:
                    break
                yield DataFrame(
                    columns=copy.copy(self.columns),
                    data=[mat[batch_idx].copy() for mat in self.data]
                )

            epoch_num += 1

    def iterrows(self):
        """Yield each row, in the current ``self.idx`` order, as a pd.Series."""
        # Iterate over positions, not over the values of self.idx:
        # __getitem__ already maps a position through self.idx, so passing
        # index values would apply the permutation twice.
        for position in range(self.length):
            yield self[position]

    def mask(self, mask):
        """Return a new DataFrame with ``mask`` applied to every matrix."""
        return DataFrame(copy.copy(self.columns), [mat[mask] for mat in self.data])

    def concat(self, other_df):
        """Return a new DataFrame with the rows of ``other_df`` appended.

        Only the columns of this DataFrame are used; each must also be
        present in ``other_df``.
        """
        mats = [
            np.concatenate([self[column], other_df[column]], axis=0)
            for column in self.columns
        ]
        return DataFrame(copy.copy(self.columns), mats)

    def items(self):
        """Return the ``(column name, matrix)`` pairs."""
        return self.dict.items()

    def __iter__(self):
        """Iterate over ``(column name, matrix)`` pairs."""
        return iter(self.dict.items())

    def __len__(self):
        """Return the number of rows (the shared leading dimension)."""
        return self.length

    def __getitem__(self, key):
        """Look up a column by name or a row by position.

        Args:
            key: A str column name, or an integer position (Python int or
                numpy integer) into the current row order.

        Returns:
            The matrix stored under ``key`` for a str key; for an integer
            key, a pd.Series mapping each column name to that row's entry.

        Raises:
            TypeError: If ``key`` is neither a str nor an integer.
        """
        if isinstance(key, str):
            return self.dict[key]
        # numpy integer scalars are not instances of the built-in int on
        # Python 3, so they have to be accepted explicitly.
        if isinstance(key, (int, np.integer)):
            row = self.idx[key]
            return pd.Series(dict(zip(self.columns, [mat[row] for mat in self.data])))
        raise TypeError(
            'DataFrame keys must be str or int, got {}'.format(type(key).__name__)
        )

    def __setitem__(self, key, value):
        """Add a new column or replace an existing one.

        Args:
            key: Column name.
            value: Matrix whose leading dimension equals ``len(self)``.
        """
        assert value.shape[0] == len(self), 'matrix first dimension does not match'
        if key not in self.columns:
            self.columns.append(key)
            self.data.append(value)
        else:
            # Keep self.data in sync with self.dict when overwriting a column;
            # batching, masking and shapes()/dtypes() all read self.data.
            self.data[self.columns.index(key)] = value
        self.dict[key] = value