Spaces:
Sleeping
Sleeping
import os | |
import random | |
import sys | |
from functools import partial | |
import numpy | |
import numpy as np | |
import rasterio | |
import torch | |
import torch.nn.functional as F | |
from torch import nn | |
from torch.autograd import Variable | |
#@title Dataset Code | |
def print_stats(stats): | |
print_lst = list() | |
for k, v in zip(stats.keys(), stats.values()): | |
print_lst.append("{}:{}".format(k, v)) | |
print('\n', ", ".join(print_lst)) | |
def get_dates(path, n=None): | |
""" | |
extracts a list of unique dates from dataset sample | |
:param path: to dataset sample folder | |
:param n: choose n random samples from all available dates | |
:return: list of unique dates in YYYYMMDD format | |
""" | |
files = os.listdir(path) | |
dates = list() | |
for f in files: | |
f = f.split(".")[0] | |
if len(f) == 8: # 20160101 | |
dates.append(f) | |
dates = set(dates) | |
if n is not None: | |
dates = random.sample(dates, n) | |
dates = list(dates) | |
dates.sort() | |
return dates | |
def get_all_dates(path, num_max_dates): | |
""" | |
extracts a list of unique dates from dataset sample | |
:param path: to dataset sample folder | |
:param num_max_dates: choose num_max_dates random samples from all available dates | |
:return: list of unique dates in YYYYMMDD format | |
""" | |
files = os.listdir(path) | |
dates = list() | |
for f in files: | |
f = f.split(".")[0] | |
if len(f) == 8: # 20160101 | |
dates.append(f) | |
dates = set(dates) | |
if num_max_dates < len(dates): | |
dates = random.sample(dates, num_max_dates) | |
dates = list(dates) | |
dates.sort() | |
return dates | |
def get_sliding_window(pos, x_annual_time_series, win_size): | |
# x_annual_time_series to sliding window | |
sw_stop = pos + 1 | |
sw_start = sw_stop - win_size | |
if sw_start < 0: | |
# batch, channels, time_series, H, W = x_annual_time_series.shape | |
channels, time_series, H, W = x_annual_time_series.shape | |
# x_win = torch.zeros(batch, channels, win_size, H, W) | |
x_win = torch.zeros(channels, win_size, H, W) | |
# x_win[:, :, -sw_stop:, :, :] = x_annual_time_series[:, :, :sw_stop, :, :] | |
x_win[:, -sw_stop:, :, :] = x_annual_time_series[:, :sw_stop, :, :] | |
else: | |
# x_annual[batch, channels, time_series, H, W] | |
# x_win = x_annual_time_series[:, :, sw_start:sw_stop, :, :] | |
x_win = x_annual_time_series[:, sw_start:sw_stop, :, :] | |
return x_win | |
def read_classes(csv): | |
with open(csv, 'r') as f: | |
classes = f.readlines() | |
ids = list() | |
names = list() | |
reliable_start_grow = list() | |
reliable_end_grow = list() | |
unreliable_start_grow = list() | |
unreliable_end_grow = list() | |
for row in classes: | |
row = row.replace("\n", "") | |
if '|' in row: | |
cls_info = row.split('|') | |
# we can have multiple id | |
id_info = cls_info[0].split(',') | |
id_info = [int(x) for x in id_info] | |
# ids.append(int(cls_info[0])) | |
ids.append(id_info) | |
names.append(cls_info[1]) | |
if len(cls_info) > 2: | |
reliable_start_grow.append(cls_info[2]) | |
reliable_end_grow.append(cls_info[3]) | |
if len(cls_info) > 4: | |
unreliable_start_grow.append(cls_info[2]) | |
unreliable_end_grow.append(cls_info[3]) | |
return ids, names, reliable_start_grow, reliable_end_grow, \ | |
unreliable_start_grow, unreliable_end_grow | |
def get_patch_id(samples, idx_img): | |
_path = samples[idx_img] | |
if _path.endswith(os.sep): | |
_path = _path[:-1] | |
_id = os.path.basename(_path) | |
return _id, _path | |
class SentinelDailyAnnualDatasetNoLabel(torch.utils.data.Dataset): | |
''' | |
If the first label is for example "1|unknown" then this will be replaced with a 0 (zero). | |
If you want to ignore other labels, then remove them from the classes.txt file and | |
this class will assigne label 0 (zero). | |
''' | |
def __init__(self, root_dirs, years, classes_path, max_seq_length, win_size, tileids=None): | |
self.max_seq_length = max_seq_length | |
self.win_size = win_size | |
# labels read from groudtruth files (y.tif) | |
# useful field to check the available labels | |
self.unique_labels = np.array([], dtype=float) | |
self.reliable_start_grow = list() | |
self.reliable_stop_grow = list() | |
self.unreliable_start_grow = list() | |
self.unreliable_stop_grow = list() | |
cls_info = read_classes(classes_path) | |
self.classids = cls_info[0] | |
self.classes = cls_info[1] | |
if len(cls_info[2]) > 0: | |
self.reliable_start_grow = cls_info[2] | |
self.reliable_stop_grow = cls_info[3] | |
if len(cls_info[4]) > 0: | |
self.unreliable_start_grow = cls_info[4] | |
self.unreliable_stop_grow = cls_info[5] | |
if type(years) is not list: | |
years = [years] | |
self.data_dirs = years | |
if type(root_dirs) is not list: | |
root_dirs = [root_dirs] | |
self.root_dirs = [r.rstrip("/") for r in root_dirs] | |
self.name = "" | |
self.samples = list() | |
self.ndates = list() | |
for root_dir in self.root_dirs: | |
print("Reading dataset info:", root_dir) | |
self.name += os.path.basename(root_dir) + '_' | |
for d in self.data_dirs: | |
if not os.path.isdir(os.path.join(root_dir, d)): | |
sys.exit('The directory ' + os.path.join(root_dir, d) + " does not exist!") | |
stats = dict( | |
rejected_nopath=0, | |
rejected_length=0, | |
total_samples=0) | |
dirs = [] | |
if tileids is None: | |
# files = os.listdir(self.data_dirs) | |
for d in self.data_dirs: | |
dirs_name = os.listdir(os.path.join(root_dir, d)) | |
dirs_path = [os.path.join(root_dir, d, f) for f in dirs_name] | |
dirs.extend(dirs_path) | |
else: | |
# tileids e.g. "tileids/train_fold0.tileids" path of line separated tileids specifying | |
# with open(os.path.join(root_dir, tileids), 'r') as f: | |
# files = [el.replace("\n", "") for el in f.readlines()] | |
for d in self.data_dirs: | |
dirs_path = [os.path.join(root_dir, d, f) for f in tileids] | |
dirs.extend(dirs_path) | |
for path in dirs: | |
if not os.path.exists(path): | |
stats["rejected_nopath"] += 1 | |
continue | |
ndates = len(get_dates(path)) | |
stats["total_samples"] += 1 | |
self.samples.append(path) | |
self.ndates.append(ndates) | |
print_stats(stats) | |
def __len__(self): | |
return len(self.samples) | |
def __getitem__(self, idx_img): | |
patch_id, path = get_patch_id(self.samples, idx_img) | |
dates = get_all_dates(path, self.max_seq_length) | |
# print("idx_img:", idx_img) | |
# print("self.samples:", self.samples) | |
# print("path:", path) | |
# print("self.max_seq_length:", self.max_seq_length) | |
# print(dates) | |
x_annual = list() | |
for date in dates: | |
x10_img, profile = read(os.path.join(path, date + ".tif")) | |
x_annual.append(x10_img) | |
padding_size = max(0, self.max_seq_length - len(dates)) | |
for i in range(padding_size): | |
# y_dailies.append(np.zeros_like(y_dailies[0])) | |
x_annual.append(np.zeros_like(x_annual[0])) | |
dates.append(dates[-1][:4] + '1231') | |
# dates = np.pad(dates, (0, padding_size - 1), mode='edge') # padding with mirror | |
x_annual = np.array(x_annual) * 1e-4 | |
x_annual = torch.from_numpy(x_annual) | |
# permute channels with time_series (t x c x h x w) -> (c x t x h x w) | |
x_annual = x_annual.permute(1, 0, 2, 3) | |
x_annual = x_annual.float() | |
# create sliding windows from x_annual | |
x_dailies = list() | |
for i in range(len(dates)): | |
x_win = get_sliding_window(i, x_annual, self.win_size) | |
x_dailies.append(x_win) | |
x_dailies = torch.stack(x_dailies) | |
# return x_dailies, y_annual, y_dailies, dates, patch_id | |
return x_dailies, dates, path | |
#@title Models code | |
# annual model | |
class SimpleNN(nn.Module): | |
def __init__(self, opt): | |
super(SimpleNN, self).__init__() | |
self.num_classes = opt.n_classes | |
self.conv1 = nn.Conv3d( | |
opt.sample_duration, | |
# opt.sample_channels, | |
64, | |
kernel_size=(7, 3, 3), # orig: 7 | |
stride=(1, 1, 1), # orig: (1, 2, 2) | |
padding=(3, 1, 1), # orig: (3, 3, 3) | |
bias=False) | |
self.conv2 = nn.Conv3d( | |
64, | |
128, | |
# kernel_size=(opt.sample_channels-opt.n_classes+1, 3, 3), # orig: 7 | |
kernel_size=(3, 3, 3), # orig: 7 | |
stride=(1, 1, 1), # orig: (1, 2, 2) | |
padding=(1, 1, 1), # orig: (3, 3, 3) | |
bias=False) | |
self.conv3 = nn.Conv3d( | |
128, | |
1, | |
# kernel_size=(opt.sample_channels-opt.n_classes+1, 3, 3), # orig: 7 | |
kernel_size=(3, 3, 3), # orig: 7 | |
stride=(1, 1, 1), # orig: (1, 2, 2) | |
padding=(1, 1, 1), # orig: (3, 3, 3) | |
bias=False) | |
def upsample3d(x, d, h, w): | |
return F.interpolate(x, size=(d, h, w), mode='trilinear', align_corners=True) | |
def forward(self, x): | |
_, _, _, h, w = x.shape | |
out = torch.relu(self.conv1(x)) | |
out = self.upsample3d(out, self.num_classes, h, w) | |
out = torch.relu(self.conv2(out)) | |
out = self.conv3(out) | |
out = out.squeeze(1) | |
return out, out | |
# daily | |
def get_fine_tuning_parameters(model, ft_begin_index): | |
if ft_begin_index == 0: | |
return model.parameters() | |
ft_module_names = [] | |
for i in range(ft_begin_index, 5): | |
ft_module_names.append('layer{}'.format(i)) | |
ft_module_names.append('fc') | |
parameters = [] | |
for k, v in model.named_parameters(): | |
for ft_module in ft_module_names: | |
if ft_module in k: | |
parameters.append({'params': v}) | |
break | |
else: | |
parameters.append({'params': v, 'lr': 0.0}) | |
return parameters | |
def downsample_basic_block(x, planes, stride): | |
out = F.avg_pool3d(x, kernel_size=1, stride=stride) | |
zero_pads = torch.Tensor( | |
out.size(0), planes - out.size(1), out.size(2), out.size(3), | |
out.size(4)).zero_() | |
if isinstance(out.data, torch.cuda.FloatTensor): | |
zero_pads = zero_pads.cuda() | |
out = Variable(torch.cat([out.data, zero_pads], dim=1)) | |
return out | |
class Bottleneck(nn.Module): | |
expansion = 4 | |
def __init__(self, inplanes, planes, stride=1, downsample=None): | |
super(Bottleneck, self).__init__() | |
self.conv1 = nn.Conv3d(inplanes, planes, kernel_size=1, bias=False) | |
self.bn1 = nn.BatchNorm3d(planes) | |
self.conv2 = nn.Conv3d( | |
planes, planes, kernel_size=3, stride=stride, padding=1, bias=False) | |
self.bn2 = nn.BatchNorm3d(planes) | |
self.conv3 = nn.Conv3d(planes, planes * 4, kernel_size=1, bias=False) | |
self.bn3 = nn.BatchNorm3d(planes * 4) | |
self.relu = nn.ReLU(inplace=True) | |
self.downsample = downsample | |
self.stride = stride | |
def forward(self, x): | |
residual = x | |
out = self.conv1(x) | |
out = self.bn1(out) | |
out = self.relu(out) | |
out = self.conv2(out) | |
out = self.bn2(out) | |
out = self.relu(out) | |
out = self.conv3(out) | |
out = self.bn3(out) | |
if self.downsample is not None: | |
residual = self.downsample(x) | |
out += residual | |
out = self.relu(out) | |
return out | |
class ResNet(nn.Module): | |
def __init__(self, | |
block, | |
layers, | |
opt, | |
# sample_size, # Height and width of inputs es. 112 x 112 | |
sample_duration, # Temporal duration of inputs, es. 16 | |
# shortcut_type='B', | |
# num_classes=400 | |
): | |
super(ResNet, self).__init__() | |
self.inplanes = 64 | |
kernel0 = min(7, sample_duration) | |
padding0 = int(kernel0 / 2) | |
self.conv1 = nn.Conv3d( | |
# opt.sample_duration, | |
opt.sample_channels, | |
64, | |
kernel_size=(kernel0, 3, 3), # orig: 7 | |
stride=(1, 1, 1), # orig: (1, 2, 2) | |
padding=(padding0, 1, 1), # orig: (3, 3, 3) | |
bias=False) | |
self.bn1 = nn.BatchNorm3d(64) | |
self.relu = nn.ReLU(inplace=True) | |
self.maxpool = nn.MaxPool3d(kernel_size=(3, 3, 3), stride=2, padding=1) | |
self.layer1 = self._make_layer(block, 64, layers[0], opt.resnet_shortcut) | |
self.layer2 = self._make_layer(block, 128, layers[1], opt.resnet_shortcut, stride=2) | |
self.layer3 = self._make_layer(block, 256, layers[2], opt.resnet_shortcut, stride=2) | |
self.layer4 = self._make_layer(block, 512, layers[3], opt.resnet_shortcut, stride=2) | |
# last_duration = int(math.ceil(sample_duration / 16)) | |
# last_size = int(math.ceil(sample_size / 32)) | |
# self.avgpool = nn.AvgPool3d( | |
# (last_duration, last_size, last_size), stride=1) | |
# self.fc = nn.Linear(512 * block.expansion, num_classes) | |
for m in self.modules(): | |
if isinstance(m, nn.Conv3d): | |
m.weight = nn.init.kaiming_normal_(m.weight, mode='fan_out') | |
elif isinstance(m, nn.BatchNorm3d): | |
m.weight.data.fill_(1) | |
m.bias.data.zero_() | |
def _make_layer(self, block, planes, blocks, shortcut_type, stride=1): | |
downsample = None | |
if stride != 1 or self.inplanes != planes * block.expansion: | |
if shortcut_type == 'A': | |
downsample = partial( | |
downsample_basic_block, | |
planes=planes * block.expansion, | |
stride=stride) | |
else: | |
downsample = nn.Sequential( | |
nn.Conv3d( | |
self.inplanes, | |
planes * block.expansion, | |
kernel_size=1, | |
stride=stride, | |
bias=False), nn.BatchNorm3d(planes * block.expansion)) | |
layers = [] | |
layers.append(block(self.inplanes, planes, stride, downsample)) | |
self.inplanes = planes * block.expansion | |
for i in range(1, blocks): | |
layers.append(block(self.inplanes, planes)) | |
return nn.Sequential(*layers) | |
def forward(self, x): | |
x = self.conv1(x) | |
x = self.bn1(x) | |
x = self.relu(x) | |
x = self.maxpool(x) | |
low_level_feat1 = x | |
x = self.layer1(x) | |
low_level_feat2 = x | |
x = self.layer2(x) | |
low_level_feat3 = x | |
x = self.layer3(x) | |
low_level_feat4 = x | |
x = self.layer4(x) | |
low_level_feat5 = x | |
return [low_level_feat1, low_level_feat2, low_level_feat3, low_level_feat4, low_level_feat5] | |
# x = self.avgpool(x) | |
# x = x.view(x.size(0), -1) | |
# x = self.fc(x) | |
# return x | |
def resnet50(opt, sample_duration): | |
"""Constructs a ResNet-50 model. | |
""" | |
model = ResNet(Bottleneck, [3, 4, 6, 3], opt, sample_duration) | |
return model | |
class FPN(nn.Module): | |
def __init__(self, opt, first_batch, sample_duration): | |
super(FPN, self).__init__() | |
# self.first_run = True | |
self.in_planes = 64 | |
self.num_classes = opt.n_classes | |
model = resnet50(opt, sample_duration) | |
self.back_bone = nn.DataParallel(model, device_ids=None) | |
# if opt.pretrain_path: | |
# print('loading pretrained model {}'.format(opt.pretrain_path)) | |
# pretrain = torch.load(opt.pretrain_path) | |
# assert opt.arch == pretrain['arch'] | |
# | |
# model.load_state_dict(pretrain['state_dict']) | |
# | |
# if opt.model == 'densenet': | |
# model.module.classifier = nn.Linear( | |
# model.module.classifier.in_features, opt.n_finetune_classes) | |
# model.module.classifier = model.module.classifier.cuda() | |
# else: | |
# model.module.fc = nn.Linear(model.module.fc.in_features, | |
# opt.n_finetune_classes) | |
# model.module.fc = model.module.fc.cuda() | |
# | |
# parameters = get_fine_tuning_parameters(model, opt.ft_begin_index) | |
# self.back_bone, parameters = generate_model(opt, sample_duration) | |
# Top layer | |
self.toplayer = None # nn.Conv3d(512, 256, kernel_size=1, stride=1, padding=0) # Reduce channels | |
# Lateral layers | |
self.latlayer1 = None # nn.Conv3d(256, 256, kernel_size=1, stride=1, padding=0 | |
self.latlayer2 = None # nn.Conv3d(128, 256, kernel_size=1, stride=1, padding=0) | |
self.latlayer3 = None # nn.Conv3d(64, 256, kernel_size=1, stride=1, padding=0) | |
# Addendum layers to reduce channels before sum | |
self.sumlayer1 = None | |
self.sumlayer2 = None | |
self.sumlayer3 = None | |
# Semantic branch | |
self.conv2_3d_p5 = None | |
self.conv2_3d_p4 = None | |
self.conv2_3d_p3 = None | |
self.conv2_3d_p2 = None | |
self.iam_joking(first_batch, not opt.no_cuda) | |
self.semantic_branch_2d = nn.Conv2d(256, 128, kernel_size=3, stride=1, padding=1) | |
self.conv2_2d = nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1) | |
# self.conv3 = nn.Conv2d(128, self.num_classes, kernel_size=1, stride=1, padding=0) | |
self.conv3 = nn.Conv2d(128, 64, kernel_size=1, stride=1, padding=0) | |
# opt.sample_duration is the number of samples taken from the time series | |
self.conv4out = nn.Conv2d(64, opt.sample_duration, kernel_size=3, stride=1, padding=1) | |
self.conv5out = nn.Conv2d(opt.sample_duration, self.num_classes, kernel_size=3, stride=1, padding=1) | |
# num_groups, num_channels | |
self.gn1 = nn.GroupNorm(128, 128) | |
self.gn2 = nn.GroupNorm(256, 256) | |
def iam_joking(self, x, use_cuda): | |
low_level_features = self.back_bone(x) | |
c1 = low_level_features[0] | |
c2 = low_level_features[1] | |
c3 = low_level_features[2] | |
c4 = low_level_features[3] | |
c5 = low_level_features[4] | |
# Top layer | |
self.toplayer = nn.Conv3d(c5.size()[1], c5.size()[1], kernel_size=1, stride=1, padding=0) # Reduce channels | |
# Lateral layers | |
self.latlayer1 = nn.Conv3d(c4.size()[1], c4.size()[1], kernel_size=1, stride=1, padding=0) | |
self.latlayer2 = nn.Conv3d(c3.size()[1], c3.size()[1], kernel_size=1, stride=1, padding=0) | |
self.latlayer3 = nn.Conv3d(c2.size()[1], c2.size()[1], kernel_size=1, stride=1, padding=0) | |
# Addendum layers to reduce channels | |
self.sumlayer1 = nn.Conv3d(c5.size()[1], c4.size()[1], kernel_size=1, stride=1, padding=0) # Reduce channels | |
self.sumlayer2 = nn.Conv3d(c4.size()[1], c3.size()[1], kernel_size=1, stride=1, padding=0) | |
self.sumlayer3 = nn.Conv3d(c3.size()[1], c2.size()[1], kernel_size=1, stride=1, padding=0) | |
if use_cuda: | |
self.toplayer = self.toplayer.cuda() | |
self.latlayer1 = self.latlayer1.cuda() | |
self.latlayer2 = self.latlayer2.cuda() | |
self.latlayer3 = self.latlayer3.cuda() | |
self.sumlayer1 = self.sumlayer1.cuda() | |
self.sumlayer2 = self.sumlayer2.cuda() | |
self.sumlayer3 = self.sumlayer3.cuda() | |
# Top-down | |
p5 = self.toplayer(c5) | |
p4 = self._upsample_add(self.sumlayer1(p5), self.latlayer1(c4)) | |
p3 = self._upsample_add(self.sumlayer2(p4), self.latlayer2(c3)) | |
p2 = self._upsample_add(self.sumlayer3(p3), self.latlayer3(c2)) | |
# calculate the sizes so that dimension c becomes 1 | |
self.conv2_3d_p5 = nn.Conv3d(p5.size()[1], 256, kernel_size=(p5.size()[2] + 2, 3, 3), stride=1, padding=1) | |
self.conv2_3d_p4 = nn.Conv3d(p4.size()[1], 256, kernel_size=(p4.size()[2] + 2, 3, 3), stride=1, padding=1) | |
self.conv2_3d_p3 = nn.Conv3d(p3.size()[1], 128, kernel_size=(p3.size()[2] + 2, 3, 3), stride=1, padding=1) | |
self.conv2_3d_p2 = nn.Conv3d(p2.size()[1], 128, kernel_size=(p2.size()[2] + 2, 3, 3), stride=1, padding=1) | |
def _upsample3d(self, x, d, h, w): | |
return F.interpolate(x, size=(d, h, w), mode='trilinear', align_corners=True) | |
def _upsample2d(self, x, h, w): | |
return F.interpolate(x, size=(h, w), mode='bilinear', align_corners=True) | |
def _make_layer(self, Bottleneck, planes, num_blocks, stride): | |
strides = [stride] + [1] * (num_blocks - 1) | |
layers = [] | |
for stride in strides: | |
layers.append(Bottleneck(self.in_planes, planes, stride)) | |
self.in_planes = planes * Bottleneck.expansion | |
return nn.Sequential(*layers) | |
def _upsample_add(self, x, y): | |
'''Upsample and add two feature maps. | |
Args: | |
x: (Variable) top feature map to be upsampled. | |
y: (Variable) lateral feature map. | |
Returns: | |
(Variable) added feature map. | |
Note in PyTorch, when input size is odd, the upsampled feature map | |
with `F.upsample(..., scale_factor=2, mode='nearest')` | |
maybe not equal to the lateral feature map size. | |
e.g. | |
original input size: [N,_,15,15] -> | |
conv2d feature map size: [N,_,8,8] -> | |
upsampled feature map size: [N,_,16,16] | |
So we choose bilinear upsample which supports arbitrary output sizes. | |
''' | |
_, _, D, H, W = y.size() | |
return F.interpolate(x, size=(D, H, W), mode='trilinear', align_corners=True) + y | |
def forward(self, x): | |
# Bottom-up using backbone | |
low_level_features = self.back_bone(x) | |
# c1 = low_level_features[0] | |
c2 = low_level_features[1] | |
c3 = low_level_features[2] | |
c4 = low_level_features[3] | |
c5 = low_level_features[4] | |
# Top-down | |
p5 = self.toplayer(c5) | |
p4 = self._upsample_add( | |
torch.relu(self.sumlayer1(p5)), torch.relu(self.latlayer1(c4))) # p5 interpolation to the size of c4 | |
p3 = self._upsample_add( | |
torch.relu(self.sumlayer2(p4)), torch.relu(self.latlayer2(c3))) | |
p2 = self._upsample_add( | |
torch.relu(self.sumlayer3(p3)), torch.relu(self.latlayer3(c2))) | |
# Smooth | |
# p4 = F.relu(self.smooth1(p4)) | |
# p3 = F.relu(self.smooth2(p3)) | |
# p2 = F.relu(self.smooth3(p2)) | |
# Semantic | |
_, _, _, h, w = p2.size() | |
# 256->256 | |
s5 = self.conv2_3d_p5(p5) | |
s5 = torch.squeeze(s5, 2) # squeeze only dim 2 to avoid to remove the batch dimension | |
s5 = self._upsample2d(torch.relu(self.gn2(s5)), h, w) | |
# 256->256 [32, 256, 24, 24] | |
s5 = self._upsample2d(torch.relu(self.gn2(self.conv2_2d(s5))), h, w) | |
# 256->128 [32, 128, 24, 24] | |
s5 = self._upsample2d(torch.relu(self.gn1(self.semantic_branch_2d(s5))), h, w) | |
# 256->256 p4:[32, 256, 4, 6, 6] -> s4:[32, 256, 1, 6, 6] | |
s4 = self.conv2_3d_p4(p4) | |
s4 = torch.squeeze(s4, 2) # s4:[32, 256, 6, 6] | |
s4 = self._upsample2d(torch.relu(self.gn2(s4)), h, w) # s4:[32, 256, 24, 24] | |
# 256->128 s4:[32, 128, 24, 24] | |
s4 = self._upsample2d(torch.relu(self.gn1(self.semantic_branch_2d(s4))), h, w) | |
# 256->128 | |
s3 = self.conv2_3d_p3(p3) | |
s3 = torch.squeeze(s3, 2) | |
s3 = self._upsample2d(torch.relu(self.gn1(s3)), h, w) | |
s2 = self.conv2_3d_p2(p2) | |
s2 = torch.squeeze(s2, 2) | |
s2 = self._upsample2d(torch.relu(self.gn1(s2)), h, w) | |
out = self._upsample2d(self.conv3(s2 + s3 + s4 + s5), 2 * h, 2 * w) | |
# introducing MSELoss on NDVI signal | |
out_cai = torch.sigmoid(self.conv4out(out)) # for Class Activation Interval | |
out_cls = self.conv5out(out_cai) # for Classification | |
return out_cai, out_cls | |
def ids_to_labels(dataloader, pred_labels): | |
new = np.zeros(pred_labels.shape, int) | |
for cl, i in zip(dataloader.dataset.classids, range(len(dataloader.dataset.classids))): | |
if type(cl) is list: | |
new[pred_labels == i] = cl[0] | |
# for c in cl: | |
# new[pred_labels == c] = i | |
else: | |
new[pred_labels == i] = cl | |
return new | |
def resume(path, model, optimizer): | |
if torch.cuda.is_available(): | |
snapshot = torch.load(path) | |
else: | |
snapshot = torch.load(path, map_location="cpu") | |
print("Loaded snapshot from", path) | |
model_state = snapshot.pop('model_state', snapshot) | |
optimizer_state = snapshot.pop('optimizer_state', None) | |
if model is not None and model_state is not None: | |
print("loading model...") | |
model.load_state_dict(model_state) | |
if optimizer is not None and optimizer_state is not None: | |
optimizer.load_state_dict(optimizer_state) | |
return snapshot | |
def read(file): | |
with rasterio.open(file) as src: | |
return src.read(), src.profile | |
#@title Setup Parameters | |
opt = type('test', (), {})() | |
opt.gpu_id = '' | |
opt.no_cuda = True | |
opt.n_classes = 8 | |
opt.model_depth = 50 | |
opt.batch_size = 1 | |
opt.sample_duration = 71 | |
opt.sample_channels = 9 | |
opt.win_size = 5 | |
opt.model = 'resnet' | |
opt.resnet_shortcut = 'B' | |
# opt.n_epochs = 20 | |
# opt.learning_rate = 0.01 | |
# opt.loss = 'ce' | |
# opt.optimizer = 'sgd' | |
# opt.export_only | |
# opt.test_tile = 'test_small.tileids' | |
opt.result_path = 'results' | |
input_data_folder = 'demo_data/lombardia' #@param {type:"string"} | |
opt.root_path = [input_data_folder] | |
opt.years = ['2019'] | |
opt.classes_path = 'demo_data/classes-newmapping.txt' | |
opt.resume_path = 'demo_data' | |
opt.pretrain_path = '' | |
opt.workers = 1 | |
os.environ['CUDA_VISIBLE_DEVICES'] = opt.gpu_id | |
if opt.no_cuda: | |
os.environ['CUDA_VISIBLE_DEVICES'] = "" | |
classes_color_map = np.array([[255/255.0, 255/255.0, 255/255.0], | |
[150/255.0, 150/255.0, 150/255.0], # Gray - 1 - Unknow cropland | |
[255/255.0, 0, 0], # Red - 2 - Other cereals | |
[161/255.0, 0, 0], | |
[0., 255/255.0, 0], # Green - 4 - Woods and other tree crops | |
[255/255.0, 240/255.0, 0], | |
[130/255.0, 120/255.0, 240/255.0], | |
[255/255.0, 136/255.0, 0], # Orange - 7 - Forage | |
[250, 190, 190], | |
[255/255.0, 255/255.0, 0], # Yellow - 9 - Corn | |
[0, 250, 154], | |
[64, 224, 208], | |
[0/255.0, 255/255.0, 255/255.0], # Turchese - 12 - Rice | |
[0.58823529, 0.39215686, 0.39215686], | |
[139/255, 35/255, 35/255], | |
[139/255, 35/255, 35/255], | |
[139/255, 35/255, 35/255], | |
[0, 0, 0], # Black - 17 - No Arable land | |
]) | |
color_labels = [classes_color_map[0], classes_color_map[1], classes_color_map[2], classes_color_map[4], | |
classes_color_map[7], classes_color_map[9], classes_color_map[12], classes_color_map[17]] | |
labels_map = ['Unknown', 'Unknown cropland', 'Other cereals', 'Woods and other tree crops', 'Forage', 'Corn', 'Rice', | |
'No arable land'] | |