File size: 5,266 Bytes
920ac9e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
import numpy as np
import pandas as pd
import h5py
import torch
from trimesh.voxel.runlength import dense_to_brle
from pathlib import Path
from matplotlib.colors import ListedColormap
from collections import defaultdict
from scipy import ndimage as ski
from typing import Any, Union, Dict, Literal
from numpy.typing import NDArray
import matplotlib as plt
import chabud
from pathlib import Path
# Location of the `train_eval.hdf5` file (hard-coded absolute path on the author's machine).
dataset = Path("A:/CodingProjekte/DataMining/src/train_eval.hdf5")
# There are 15 pretrained models on the server in the directory
# /global/public/chabud-ecml-pkdd2023/checkpoints/.
# You can list the available checkpoints as follows:
# NOTE(review): the line below does not list anything; it pins one specific local checkpoint file.
ckpt = Path('A:/CodingProjekte/DataMining/src/lightning_logs/version_30/checkpoints/model-epoch=25-val_iou=0.00.ckpt')
# You can load an arbitrary checkpoint as follows (weights are mapped to the CPU):
mdl = chabud.FireModel.load_from_checkpoint(ckpt, map_location="cpu")
# Extract the channels required by the model `mdl`
# channels = np.stack([c(bands) for c in mdl.channels])
# with torch.set_grad_enabled(False):
# # Apply the model to a tensor of size 1 x len(channels) x 512 x 512,
# # i.e. we use a batch size of 1 (inefficient but simple).
# print(channels)
# # channels = channels.astype(float)
# pred = mdl.forward(torch.Tensor(channels[np.newaxis, ...])).sigmoid() > 0.5
# # Drop the first two dimensions (batch and channel) and convert to a numpy array
# pred = pred[0, 0, ...].detach().numpy()
def _mask_outline(mask):
    """Return the one-pixel-wide inner boundary of a boolean mask as uint8."""
    mask = np.asarray(mask, dtype=bool)
    # A pixel is on the outline if it belongs to the mask but is removed by
    # a single erosion step (``ski`` is this file's alias of scipy.ndimage).
    return (mask & ~ski.binary_erosion(mask)).astype("uint8")


def process_dataset(scene, bands, true, fold=None):
    """Save a figure comparing the prediction of ``mdl`` with the ground truth.

    The left panel colour-codes every pixel: white = neither mask,
    brown = in both masks, orange = predicted only, blue = ground truth only.
    The right panel shows a gamma-corrected RGB composite overlaid with the
    outline of the ground-truth mask (blue) and of the predicted mask
    (orange).  The figure is written to ``masks/<scene>-f_<fold>.png``.

    Args:
        scene: Identifier of the scene; used in the output file name.
        bands: Array with the spectral bands on the last axis; indices
            3, 2, 1 are displayed as R, G, B after clipping to [0, 1].
            It is also passed to every callable in ``mdl.channels``.
        true: Ground-truth mask; converted to boolean.  Presumably it has
            the same 2-D shape as the prediction -- verify against caller.
        fold: Optional fold label for the file name.  When omitted the
            label ``unknown`` is used (the module-level ``dataset`` is a
            ``Path`` and carries no ``attrs`` to read the fold from).

    Relies on the module-level model ``mdl``.
    """
    # Imported locally: the module-level ``plt`` alias is bound to the
    # top-level ``matplotlib`` package, which offers neither ``subplots``
    # nor ``close``.
    import matplotlib.pyplot as plt

    true = np.asarray(true).astype(bool)

    # Gamma correction with exponent 0.4 brightens the clipped RGB composite.
    # scipy.ndimage has no ``exposure`` module, so the power law is applied
    # directly.
    rgb = np.clip(bands[..., [3, 2, 1]], 0, 1) ** 0.4

    channels = np.stack([c(bands) for c in mdl.channels])
    with torch.no_grad():
        # Add a leading batch axis of size 1 before running the model.
        pred = mdl.forward(torch.Tensor(channels[np.newaxis, ...])).sigmoid() > 0.5
    # Drop the batch and channel axes and convert to a numpy array.
    pred = pred[0, 0, ...].detach().numpy()

    cmap = ListedColormap(["white", "tab:brown", "tab:orange", "tab:blue"])
    mask = np.zeros_like(pred, dtype=int)
    mask = np.where(true & pred, 1, mask)    # true positive
    mask = np.where(~true & pred, 2, mask)   # false positive
    mask = np.where(true & ~pred, 3, mask)   # false negative

    # scipy.ndimage has no ``feature.canny``; for binary masks a
    # morphological outline serves the same purpose in the overlay.
    true_edge = _mask_outline(true)
    pred_edge = _mask_outline(pred)

    fig, (axm, axi) = plt.subplots(ncols=2, figsize=(20, 10))
    # Fixed vmin/vmax keep the category -> colour mapping stable even when
    # one of the categories does not occur in the image.
    axm.imshow(mask, cmap=cmap, interpolation="nearest", vmin=0, vmax=3)
    axi.imshow(rgb, interpolation="nearest")
    axi.imshow(
        true_edge,
        cmap=ListedColormap(["#00000000", "tab:blue"]),
        interpolation="nearest",
        vmin=0,
        vmax=1,
    )
    axi.imshow(
        pred_edge,
        cmap=ListedColormap(["#00000000", "tab:orange"]),
        interpolation="nearest",
        vmin=0,
        vmax=1,
    )
    for ax in [axm, axi]:
        ax.axes.xaxis.set_ticklabels([])
        ax.axes.yaxis.set_ticklabels([])
    fig.tight_layout()

    out_dir = Path("masks")
    # savefig does not create missing directories.
    out_dir.mkdir(parents=True, exist_ok=True)
    fold_label = "unknown" if fold is None else fold
    fig.savefig(out_dir / f"{scene}-f_{fold_label}.png")
    # Close this specific figure so repeated calls do not accumulate figures.
    plt.close(fig)
# class RandomModel:
# def __init__(self, shape):
# self.shape = shape
# return
# def __call__(self, input):
# # input is ignored, just generate some random predictions
# return np.random.randint(0, 2, size=self.shape, dtype=bool)
class FixedModel:
    """Dummy model that always predicts the same square region.

    The returned mask has the shape given at construction time; it is False
    everywhere except for rows 100..249 and columns 100..249.
    """

    def __init__(self, shape) -> None:
        # Shape of the boolean mask produced by every call.
        self.shape = shape

    def __call__(self, input) -> Any:
        """Return the fixed mask; ``input`` is not looked at."""
        square = slice(100, 250)
        fixed = np.full(self.shape, False, dtype=bool)
        fixed[square, square] = True
        return fixed
def retrieve_validation_fold(
    path: Union[str, Path], fold: int = 0
) -> Dict[str, Dict[str, NDArray]]:
    """Load the post-fire imagery of every scene belonging to one fold.

    Args:
        path: Location of the HDF5 file.  Every top-level entry is expected
            to carry a ``fold`` attribute and a ``post_fire`` dataset.
        fold: Value of the ``fold`` attribute to select.  Defaults to 0,
            the fold this function has always returned.

    Returns:
        A plain dict mapping each selected top-level key to
        ``{'post': <post_fire data read into memory>}``.  Pre-fire imagery
        is not loaded.
    """
    result: Dict[str, Dict[str, NDArray]] = {}
    with h5py.File(path, 'r') as fp:
        for uuid, values in fp.items():
            if values.attrs['fold'] != fold:
                continue
            # ``[...]`` reads the whole dataset into memory, so the data
            # stays usable after the file has been closed.
            result[uuid] = {'post': values['post_fire'][...]}
    return result
def compute_submission_mask(id: str, mask: NDArray):
    """Encode a mask as binary run-length data for the submission table.

    The mask is cast to bool, flattened and passed to ``dense_to_brle``.

    Returns:
        A dict with the keys ``id`` (the identifier passed in), ``rle_mask``
        (the encoded run lengths) and ``index`` (0 .. number of runs - 1).
    """
    flat_mask = mask.astype(bool).ravel()
    run_lengths = dense_to_brle(flat_mask)
    positions = np.arange(len(run_lengths))
    return dict(id=id, rle_mask=run_lengths, index=positions)
# The code from the last workshop
class PPModel:
    """Callable wrapper that adds pre- and post-processing around a model.

    The wrapped model is put into evaluation mode on construction.  It must
    provide a ``channels`` iterable of callables and a ``forward`` method.
    """

    def __init__(self,model):
        self._model = model
        model.eval()

    def __call__(self,bands) -> Any:
        """Return the boolean mask predicted for ``bands``.

        ``bands`` is divided by 10000, each callable in the model's
        ``channels`` is applied to the result, and the stacked channels are
        fed to the model as a batch of size one.  The sigmoid of the model
        output is thresholded at 0.5; the batch and channel axes are dropped
        and the mask is returned as a numpy array.
        """
        # Pre-processing: rescale and derive the model's input channels.
        scaled = bands / 10000
        extracted = []
        for extract in self._model.channels:
            extracted.append(extract(scaled))
        batch = torch.Tensor(np.expand_dims(np.stack(extracted), axis=0))

        # Inference without gradient tracking.
        with torch.no_grad():
            logits = self._model.forward(batch)
            burned = logits.sigmoid() > 0.5

        # Post-processing: keep only the first batch element and channel.
        return burned[0, 0, ...].detach().numpy()
if __name__ == '__main__':
    # Load the post-fire imagery of all scenes in the validation fold.
    validation_fold = retrieve_validation_fold('train_eval.hdf5')

    # Wrap the loaded checkpoint with pre-/post-processing.
    # FixedModel(shape=(512, 512)) can be substituted here as a trivial baseline.
    model = PPModel(mdl)

    # One data frame per scene: predict, then convert the mask to RLE format.
    result = [
        pd.DataFrame(compute_submission_mask(uuid, model(scene['post'])))
        for uuid, scene in validation_fold.items()
    ]

    # Concatenate all per-scene frames and write the submission file.
    submission_df = pd.concat(result)
    submission_df.to_csv('predictions.csv', index=False)