|
|
|
|
|
''' |
|
@File : DatasetAnalyzer.py |
|
@Time : 2022/04/08 10:10:12 |
|
@Author : zzubqh |
|
@Version : 1.0 |
|
@Contact : baiqh@microport.com |
|
@License : (C)Copyright 2017-2018, Liugroup-NLPR-CASIA |
|
@Desc : None |
|
''' |
|
|
|
|
|
|
|
import numpy as np |
|
import os |
|
import SimpleITK as sitk |
|
from multiprocessing import Pool |
|
|
|
|
|
class DatasetAnalyzer(object): |
|
""" |
|
接收一个类似train.md的文件 |
|
格式:**/ct_file.nii.gz, */seg_file.nii.gz |
|
""" |
|
def __init__(self, annotation_file, num_processes=4): |
|
self.dataset = [] |
|
self.num_processes = num_processes |
|
with open(annotation_file, 'r', encoding='utf-8') as rf: |
|
for line_item in rf: |
|
items = line_item.strip().split(',') |
|
self.dataset.append({'ct': items[0], 'mask': items[1]}) |
|
|
|
print('total load {0} ct files'.format(len(self.dataset))) |
|
|
|
def _get_effective_data(self, dataset_item: dict): |
|
itk_img = sitk.ReadImage(dataset_item['ct']) |
|
itk_mask = sitk.ReadImage(dataset_item['mask']) |
|
|
|
img_np = sitk.GetArrayFromImage(itk_img) |
|
mask_np = sitk.GetArrayFromImage(itk_mask) |
|
|
|
mask_index = mask_np > 0 |
|
effective_data = img_np[mask_index][::10] |
|
return list(effective_data) |
|
|
|
def compute_stats(self): |
|
if len(self.dataset) == 0: |
|
return np.nan, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan |
|
|
|
process_pool = Pool(self.num_processes) |
|
data_value = process_pool.map(self._get_effective_data, self.dataset) |
|
|
|
print('sub process end, get {0} case data'.format(len(data_value))) |
|
voxels = [] |
|
for value in data_value: |
|
voxels += value |
|
|
|
median = np.median(voxels) |
|
mean = np.mean(voxels) |
|
sd = np.std(voxels) |
|
mn = np.min(voxels) |
|
mx = np.max(voxels) |
|
percentile_99_5 = np.percentile(voxels, 99.5) |
|
percentile_00_5 = np.percentile(voxels, 00.5) |
|
|
|
process_pool.close() |
|
process_pool.join() |
|
return median, mean, sd, mn, mx, percentile_99_5, percentile_00_5 |
|
|
|
|
|
if __name__ == '__main__': |
|
import tqdm |
|
annotation = r'/home/code/Dental/Segmentation/dataset/tooth_label.md' |
|
analyzer = DatasetAnalyzer(annotation, num_processes=8) |
|
out_dir = r'/data/Dental/SegTrainingClipdata' |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for item in tqdm.tqdm(analyzer.dataset): |
|
ct_file = item['ct'] |
|
out_name = os.path.basename(ct_file) |
|
out_path = os.path.join(out_dir, out_name) |
|
itk_img = sitk.ReadImage(item['ct']) |
|
img_np = sitk.GetArrayFromImage(itk_img) |
|
data = np.clip(img_np, 181.0, 7578.0) |
|
clip_img = sitk.GetImageFromArray(data) |
|
clip_img.CopyInformation(itk_img) |
|
sitk.WriteImage(clip_img, out_path) |
|
|
|
|
|
|
|
|