# NOTE(review): the three lines below were web-scrape residue from a Hugging
# Face Spaces status banner ("Spaces: / Sleeping / Sleeping") — not source code.
# -*- coding: utf-8 -*-
"""encoder.ipynb

Automatically generated.

Original file is located at:
    /home/macu/work/nbs/encoder.ipynb
"""
#default_exp encoder | |
#hide | |
# %load_ext autoreload    (IPython magic — commented out so the exported script stays valid Python)
# %autoreload 2
#export | |
import pandas as pd | |
import numpy as np | |
from fastcore.all import * | |
from tsai.callback.MVP import * | |
from tsai.imports import * | |
from tsai.models.InceptionTimePlus import InceptionTimePlus | |
from tsai.models.explainability import get_acts_and_grads | |
from tsai.models.layers import * | |
from tsai.data.validation import combine_split_data | |
#hide | |
from tsai.all import * | |
#export | |
class DCAE_torch(Module):
    def __init__(self, c_in, seq_len, delta, nfs=(64, 32, 12), kss=(10, 5, 5),
                 pool_szs=(2, 2, 3), output_fsz=10):
        """
        Create a Deep Convolutional Autoencoder for multivariate time series.

        Input
        - `c_in`: number of input channels (dimensions) of the series
        - `seq_len`: window (slice) length of the input series
        - `delta`: number of latent features in the bottleneck Dense layer
        - `nfs`: number of feature maps (filters) per encoder conv layer
        - `kss`: kernel size of each encoder conv layer
        - `pool_szs`: pooling size of each encoder max-pool layer
        - `output_fsz`: kernel size of the final reconstruction conv layer

        Note: defaults are tuples (not lists) to avoid the shared mutable
        default-argument pitfall; any sequence of equal lengths is accepted.
        """
        assert all_equal([len(x) for x in [nfs, kss, pool_szs]], np.repeat(len(nfs), 3)), \
            'nfs, kss, and pool_szs must have the same length'
        assert np.prod(pool_szs) == nfs[-1], \
            'The number of filters in the last conv layer must be equal to the product of pool sizes'
        assert seq_len % np.prod(pool_szs) == 0, \
            'The product of pool sizes must be a divisor of the window size'
        # Encoder: conv + max-pool stages; the temporal dim shrinks by prod(pool_szs)
        layers = []
        for i in range_of(kss):
            layers += [Conv1d(ni=nfs[i-1] if i > 0 else c_in, nf=nfs[i], ks=kss[i]),
                       nn.MaxPool1d(kernel_size=pool_szs[i])]
        self.downsample = nn.Sequential(*layers)
        # Bottleneck: the flattened encoder output has
        # nfs[-1] * (seq_len // prod(pool_szs)) features, which equals seq_len
        # exactly because of the two asserts above
        self.bottleneck = nn.Sequential(OrderedDict([
            ('flatten', nn.Flatten()),
            ('latent_in', nn.Linear(seq_len, delta)),
            ('latent_out', nn.Linear(delta, seq_len)),
            ('reshape', Reshape(nfs[-1], seq_len // np.prod(pool_szs)))
        ]))
        # Decoder: mirror of the encoder (conv + upsample), then one final conv
        # back to the original number of channels
        layers = []
        for i in reversed(range_of(kss)):
            layers += [Conv1d(ni=nfs[i+1] if i != (len(nfs)-1) else nfs[-1],
                              nf=nfs[i], ks=kss[i]),
                       nn.Upsample(scale_factor=pool_szs[i])]
        # use `ks=` consistently with the other Conv1d calls above
        layers += [Conv1d(ni=nfs[0], nf=c_in, ks=output_fsz)]
        self.upsample = nn.Sequential(*layers)

    def forward(self, x):
        """Encode `x` and reconstruct it; returns a tensor shaped like `x`."""
        x = self.downsample(x)
        x = self.bottleneck(x)
        x = self.upsample(x)
        return x
#hide | |
# Smoke test: a DCAE built for a (batch=3, channels=1, len=48) input
# must reconstruct a tensor of the same shape.
sample = torch.rand(3, 1, 48)
n_channels, window = sample.shape[1], sample.shape[2]
autoencoder = DCAE_torch(c_in=n_channels, seq_len=window, delta=12)
autoencoder(sample).shape
#export | |
# Maps each supported encoder class to the (dotted) attribute path of the
# submodule whose activations are taken as embeddings; the path is resolved
# with `nested_attr` in `get_enc_embs` when no explicit `module` is given.
ENCODER_EMBS_MODULE_NAME = {
    InceptionTimePlus: 'backbone', # for mvp based models
    DCAE_torch: 'bottleneck.latent_in'
}
#export | |
def get_enc_embs(X, enc_learn, module=None, cpu=False, average_seq_dim=True, to_numpy=True):
    """
    Get the embeddings of `X` from an encoder, passed in `enc_learn` as a fastai
    learner. By default, the embeddings are obtained from the last layer
    before the model head, although any layer can be passed to `module`.
    Input
    - `cpu`: Whether to do the model inference in cpu or gpu (GPU recommended)
    - `average_seq_dim`: Whether to aggregate the embeddings in the sequence dimension
    - `to_numpy`: Whether to return the result as a numpy array (if false returns a tensor)
    """
    # Move both the dataloaders and the model to the requested device
    if cpu:
        print("--> Get enc embs CPU")
        enc_learn.dls.cpu()
        enc_learn.cpu()
    else:
        print("--> Use CUDA |Get enc embs GPU")
        enc_learn.dls.cuda()
        enc_learn.cuda()
        # nn.Module has no `.device` attribute, so the original
        # `enc_learn.model.device` would raise AttributeError; read the
        # device from the model parameters instead
        print("devices: ", enc_learn.dls.device, next(enc_learn.model.parameters()).device)
        print("Use CUDA -->")
    # Guard against a zero batch size (e.g. a learner exported with bs=0)
    if enc_learn.dls.bs == 0: enc_learn.dls.bs = 64
    print("--> Get enc embs bs: ", enc_learn.dls.bs)
    # Auxiliary dataloader over X, reusing the validation pipeline
    aux_dl = enc_learn.dls.valid.new_dl(X=X)
    aux_dl.bs = enc_learn.dls.bs if enc_learn.dls.bs > 0 else 64
    # Explicit `module` wins; otherwise look up the default embeddings
    # submodule registered for this encoder class
    module = nested_attr(enc_learn.model,
                         ENCODER_EMBS_MODULE_NAME[type(enc_learn.model)]) \
        if module is None else module
    # Batched forward passes; keep only the activations ([0]) of the hook
    embs = [get_acts_and_grads(model=enc_learn.model,
                               modules=module,
                               x=xb[0], cpu=cpu)[0] for xb in aux_dl]
    embs = to_concat(embs)
    # Average out the sequence dimension when the activations are 3-D
    # (assumes layout (batch, features, seq) — TODO confirm against encoder)
    if embs.ndim == 3 and average_seq_dim: embs = embs.mean(axis=2)
    if to_numpy: embs = embs.numpy() if cpu else embs.cpu().numpy()
    return embs
#hide | |
import wandb
from dvats.utils import *

# Pull the latest pretrained MVP encoder from Weights & Biases and build a
# random input batch of 9 univariate windows of length 48 to embed.
wandb_api = wandb.Api()
enc_artifact = wandb_api.artifact('deepvats/mvp:latest')
enc_learner = enc_artifact.to_obj()
X = torch.rand(9, 1, 48)
#hide | |
#slow | |
#%%time | |
# CPU inference path: one embedding row per input sample, returned as numpy
embs = get_enc_embs(X, enc_learner, cpu=True)
test_eq(embs.shape[0], X.shape[0])
embs.shape, embs.__class__
#hide | |
# %%time  (IPython cell magic — commented out; it is a SyntaxError in a plain .py file)
# GPU inference path, returning a torch tensor (to_numpy=False) so we can
# inspect the device it lives on
embs = get_enc_embs(X, enc_learner, cpu=False, to_numpy=False)
test_eq(embs.shape[0], X.shape[0])
embs.shape, embs.__class__, embs.device
#hide | |
# %%time  (IPython cell magic — commented out; it is a SyntaxError in a plain .py file)
# GPU inference with conversion to numpy on return (to_numpy=True)
embs = get_enc_embs(X, enc_learner, cpu=False, to_numpy=True)
test_eq(embs.shape[0], X.shape[0])
embs.shape, embs.__class__
#hide | |
#from nbdev.export import notebook2script | |
#notebook2script() | |
#from tsai import nb2py | |
#nb2py | |
#beep(1) |