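"""Prepare CACD2000 annotations for MiVOLO training.

Matches images in the CACD2000 folder against the json annotations from
https://github.com/paplhjak/Facial-Age-Estimation-Benchmark-Databases and writes
per-split csv annotation files, optionally refining face / person bboxes with a
YOLO detector.

Example invocation (paths are illustrative):
    python prepare_cacd.py --dataset_path data/CACD --detector_weights <detector weights>
"""
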
import argparse
import json
import os
import unicodedata
from collections import defaultdict
from typing import Dict, List, Optional

import cv2
import tqdm
from mivolo.data.data_reader import PictureInfo, get_all_files
from mivolo.model.yolo_detector import Detector, PersonAndFaceResult
from preparation_utils import get_additional_bboxes, get_main_face, save_annotations
from prepare_fairface import find_persons_on_image


def get_im_name(img_path):
    # CACD file names contain accented characters that can appear in composed (NFC)
    # or decomposed (NFD) unicode form depending on the filesystem; strip the accents
    # so that image names match the json annotation keys.
    im_name = os.path.basename(img_path)
    im_name = unicodedata.normalize("NFKD", im_name).encode("ascii", "ignore").decode("ascii")

    return im_name


def read_json_annotations(annotations: List[dict], splits: List[str]) -> Dict[str, dict]:
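    """Map image file names to their {age, gender, split} annotation dicts."""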
    print("Parsing annotations")
    annotations_per_image = {}
    stat_per_split: Dict[str, int] = defaultdict(int)

    missed = 0
    for face in tqdm.tqdm(annotations):
        im_name = get_im_name(face["img_path"])
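        # "folder" is an index into the split names passed by the caller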
        split = splits[int(face["folder"])]

        stat_per_split[split] += 1

        gender = face.get("gender")
        if face.get("alignment_source") == "file not found":
            missed += 1

        annotations_per_image[im_name] = {"age": str(face["age"]), "gender": gender, "split": split}

    print("missed annots: ", missed)

    print(f"Per split images: {stat_per_split}")
    print(f"Found {len(annotations_per_image)} annotations")
    return annotations_per_image


def read_data(images_dir: str, annotations: List[dict], splits: List[str]) -> Dict[str, List[PictureInfo]]:
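    """Match images on disk with their annotations and group them by split."""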
    dataset: Dict[str, List[PictureInfo]] = defaultdict(list)
    all_images = get_all_files(images_dir)
    print(f"Found {len(all_images)} images")

    annotations_per_file: Dict[str, dict] = read_json_annotations(annotations, splits)

    total, missed = 0, 0
    missed_gender_and_age = 0
    stat_per_ages: Dict[str, int] = defaultdict(int)
    stat_per_gender: Dict[str, int] = defaultdict(int)

    for image_path in all_images:
        total += 1
        image_name = get_im_name(image_path)

        if image_name not in annotations_per_file:
            missed += 1
            print(f"Can not find annotation for {image_name}")
        else:
            annot = annotations_per_file[image_name]
            age, gender, split = annot["age"], annot["gender"], annot["split"]

            if gender is None and age is None:
                missed_gender_and_age += 1
                # skip such image
                continue

            if age is not None:
                stat_per_ages[age] += 1
            if gender is not None:
                stat_per_gender[gender] += 1

            info = PictureInfo(image_path, age, gender)
            dataset[split].append(info)

    print(f"Missed annots for images: {missed}/{total}")
    print(f"Missed ages and gender: {missed_gender_and_age}")
    print(f"Per gender stat: {stat_per_gender}")
    # age labels may be plain ages or "(lo, hi)" range strings; sort by the lower bound
    ages = sorted(stat_per_ages.keys(), key=lambda x: int(x.split("(")[-1].split(",")[0].strip()))
    print(f"Per age category ({len(ages)} categories):")
    for age in ages:
        print(f"Age: {age} Count: {stat_per_ages[age]}")

    return dataset


def collect_faces(
    faces_dir: str,
    annotations: List[dict],
    data_dir: str,
    detector_cfg: Optional[dict] = None,
    padding: float = 0.1,
    splits: List[str] = [],
    db_name: str = "",
    use_coarse_persons: bool = False,
    find_persons: bool = False,
    person_padding: float = 0.0,
    use_coarse_faces: bool = False,
):
    """
    Generate train, val, test .txt annotation files with columns:
        ["img_name", "age", "gender",
        "face_x0", "face_y0", "face_x1", "face_y1",
        "person_x0", "person_y0", "person_x1", "person_y1"]

    All person bboxes here will be set to [-1, -1, -1, -1]

    If detector_cfg is set, for each face bbox will be refined using detector.
        Also, other detected faces wil be written to txt file (needed for further preprocessing)
    """

    # out directory for annotations
    out_dir = os.path.join(data_dir, "annotations")
    os.makedirs(out_dir, exist_ok=True)

    # load annotations
    images_per_split: Dict[str, List[PictureInfo]] = read_data(faces_dir, annotations, splits)

    for split_ind, (split, images) in enumerate(images_per_split.items()):
        print(f"Processing {split} split ({split_ind}/{len(images_per_split)})...")
        if detector_cfg:
            # detect faces with yolo detector
            faces_not_found, images_with_other_faces = 0, 0
            other_faces: List[PictureInfo] = []

            detector_weights, device = detector_cfg["weights"], detector_cfg["device"]
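            # low conf/iou thresholds so the detector keeps as many candidate boxes as possible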
            detector = Detector(detector_weights, device, verbose=False, conf_thresh=0.1, iou_thresh=0.2)
            for image_info in tqdm.tqdm(images, desc="Detecting faces: "):
                cv_im = cv2.imread(image_info.image_path)
                im_h, im_w = cv_im.shape[:2]

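                # CACD images are face-centered crops: shrink the frame by `padding`
                # on each side to form a coarse face box (also used as a fallback)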
                pad_x, pad_y = int(padding * im_w), int(padding * im_h)
                coarse_face_bbox = [pad_x, pad_y, im_w - pad_x, im_h - pad_y]  # xyxy

                detected_objects: PersonAndFaceResult = detector.predict(cv_im)
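                # keep the detected face that best matches the coarse box, remember the rest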
                main_bbox, other_faces_inds = get_main_face(detected_objects, coarse_face_bbox)

                if len(other_faces_inds):
                    images_with_other_faces += 1

                if main_bbox is None:
                    # no face detected: fall back to the coarse padded-image bbox
                    faces_not_found += 1
                    main_bbox = coarse_face_bbox
                elif use_coarse_faces:
                    main_bbox = coarse_face_bbox
                image_info.bbox = main_bbox

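                # with find_persons, match person boxes to faces; otherwise just
                # collect the remaining detected faces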
                if find_persons:
                    additional_faces, additional_persons = find_persons_on_image(
                        image_info, main_bbox, detected_objects, other_faces_inds, device
                    )
                    # add all additional faces
                    other_faces.extend(additional_faces)
                    # add persons with empty faces
                    other_faces.extend(additional_persons)
                else:
                    additional_faces = get_additional_bboxes(detected_objects, other_faces_inds, image_info.image_path)
                    other_faces.extend(additional_faces)
                    if use_coarse_persons:
                        # full image as a person bbox
                        image_info.person_bbox = [0, 0, im_w, im_h]  # xyxy

            print(f"Faces not detected: {faces_not_found}/{len(images)}")
            print(f"Images with other faces: {images_with_other_faces}/{len(images)}")
            print(f"Other faces: {len(other_faces)}")

            images = images + other_faces

        else:
            for image_info in tqdm.tqdm(images, desc="Collect face bboxes: "):
                cv_im = cv2.imread(image_info.image_path)
                im_h, im_w = cv_im.shape[:2]

                # no detector: use the padded full image as a coarse face bbox
                pad_x, pad_y = int(padding * im_w), int(padding * im_h)
                image_info.bbox = [pad_x, pad_y, im_w - pad_x, im_h - pad_y]  # xyxy

                if use_coarse_persons or find_persons:
                    # padded full image as a person bbox (the bottom edge is left unpadded)
                    pad_x_p, pad_y_p = int(person_padding * im_w), int(person_padding * im_h)
                    image_info.person_bbox = [pad_x_p, pad_y_p, im_w - pad_x_p, im_h]  # xyxy

        save_annotations(images, faces_dir, out_file=os.path.join(out_dir, f"{db_name}_{split}_annotations.csv"))


def get_parser():
    parser = argparse.ArgumentParser(description="CACD")
    parser.add_argument(
        "--dataset_path",
        default="data/CACD",
        type=str,
        required=True,
        help="path to dataset with CACD200 folder",
    )
    parser.add_argument(
        "--detector_weights", default=None, type=str, required=False, help="path to face and person detector"
    )
    parser.add_argument("--device", default="cuda:0", type=str, required=False, help="device to inference detector")

    return parser


if __name__ == "__main__":

    parser = get_parser()
    args = parser.parse_args()

    data_dir = args.dataset_path.rstrip("/")

    faces_dir = os.path.join(data_dir, "CACD2000")

    # https://github.com/paplhjak/Facial-Age-Estimation-Benchmark-Databases/tree/main
    json_path = os.path.join(data_dir, "CACD2000.json")
    with open(json_path, "r") as stream:
        annotations = json.load(stream)

    detector_cfg: Optional[Dict[str, str]] = None
    if args.detector_weights is not None:
        detector_cfg = {"weights": args.detector_weights, "device": args.device}

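    # split names in the order referenced by the "folder" field of the json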
    splits = ["train", "valid", "test"]
    collect_faces(
        faces_dir,
        annotations,
        data_dir,
        detector_cfg,
        padding=0.2,
        splits=splits,
        db_name="cacd",
        find_persons=True,
        use_coarse_faces=True,
    )