MoMask / data /t2m_dataset.py
MeYourHint's picture
first demo version
c0eac48
raw
history blame
14.6 kB
from os.path import join as pjoin
import torch
from torch.utils import data
import numpy as np
from tqdm import tqdm
from torch.utils.data._utils.collate import default_collate
import random
import codecs as cs
def collate_fn(batch):
batch.sort(key=lambda x: x[3], reverse=True)
return default_collate(batch)
class MotionDataset(data.Dataset):
def __init__(self, opt, mean, std, split_file):
self.opt = opt
joints_num = opt.joints_num
self.data = []
self.lengths = []
id_list = []
with open(split_file, 'r') as f:
for line in f.readlines():
id_list.append(line.strip())
for name in tqdm(id_list):
try:
motion = np.load(pjoin(opt.motion_dir, name + '.npy'))
if motion.shape[0] < opt.window_size:
continue
self.lengths.append(motion.shape[0] - opt.window_size)
self.data.append(motion)
except Exception as e:
# Some motion may not exist in KIT dataset
print(e)
pass
self.cumsum = np.cumsum([0] + self.lengths)
if opt.is_train:
# root_rot_velocity (B, seq_len, 1)
std[0:1] = std[0:1] / opt.feat_bias
# root_linear_velocity (B, seq_len, 2)
std[1:3] = std[1:3] / opt.feat_bias
# root_y (B, seq_len, 1)
std[3:4] = std[3:4] / opt.feat_bias
# ric_data (B, seq_len, (joint_num - 1)*3)
std[4: 4 + (joints_num - 1) * 3] = std[4: 4 + (joints_num - 1) * 3] / 1.0
# rot_data (B, seq_len, (joint_num - 1)*6)
std[4 + (joints_num - 1) * 3: 4 + (joints_num - 1) * 9] = std[4 + (joints_num - 1) * 3: 4 + (
joints_num - 1) * 9] / 1.0
# local_velocity (B, seq_len, joint_num*3)
std[4 + (joints_num - 1) * 9: 4 + (joints_num - 1) * 9 + joints_num * 3] = std[
4 + (joints_num - 1) * 9: 4 + (
joints_num - 1) * 9 + joints_num * 3] / 1.0
# foot contact (B, seq_len, 4)
std[4 + (joints_num - 1) * 9 + joints_num * 3:] = std[
4 + (
joints_num - 1) * 9 + joints_num * 3:] / opt.feat_bias
assert 4 + (joints_num - 1) * 9 + joints_num * 3 + 4 == mean.shape[-1]
np.save(pjoin(opt.meta_dir, 'mean.npy'), mean)
np.save(pjoin(opt.meta_dir, 'std.npy'), std)
self.mean = mean
self.std = std
print("Total number of motions {}, snippets {}".format(len(self.data), self.cumsum[-1]))
def inv_transform(self, data):
return data * self.std + self.mean
def __len__(self):
return self.cumsum[-1]
def __getitem__(self, item):
if item != 0:
motion_id = np.searchsorted(self.cumsum, item) - 1
idx = item - self.cumsum[motion_id] - 1
else:
motion_id = 0
idx = 0
motion = self.data[motion_id][idx:idx + self.opt.window_size]
"Z Normalization"
motion = (motion - self.mean) / self.std
return motion
class Text2MotionDatasetEval(data.Dataset):
def __init__(self, opt, mean, std, split_file, w_vectorizer):
self.opt = opt
self.w_vectorizer = w_vectorizer
self.max_length = 20
self.pointer = 0
self.max_motion_length = opt.max_motion_length
min_motion_len = 40 if self.opt.dataset_name =='t2m' else 24
data_dict = {}
id_list = []
with cs.open(split_file, 'r') as f:
for line in f.readlines():
id_list.append(line.strip())
# id_list = id_list[:250]
new_name_list = []
length_list = []
for name in tqdm(id_list):
try:
motion = np.load(pjoin(opt.motion_dir, name + '.npy'))
if (len(motion)) < min_motion_len or (len(motion) >= 200):
continue
text_data = []
flag = False
with cs.open(pjoin(opt.text_dir, name + '.txt')) as f:
for line in f.readlines():
text_dict = {}
line_split = line.strip().split('#')
caption = line_split[0]
tokens = line_split[1].split(' ')
f_tag = float(line_split[2])
to_tag = float(line_split[3])
f_tag = 0.0 if np.isnan(f_tag) else f_tag
to_tag = 0.0 if np.isnan(to_tag) else to_tag
text_dict['caption'] = caption
text_dict['tokens'] = tokens
if f_tag == 0.0 and to_tag == 0.0:
flag = True
text_data.append(text_dict)
else:
try:
n_motion = motion[int(f_tag*20) : int(to_tag*20)]
if (len(n_motion)) < min_motion_len or (len(n_motion) >= 200):
continue
new_name = random.choice('ABCDEFGHIJKLMNOPQRSTUVW') + '_' + name
while new_name in data_dict:
new_name = random.choice('ABCDEFGHIJKLMNOPQRSTUVW') + '_' + name
data_dict[new_name] = {'motion': n_motion,
'length': len(n_motion),
'text':[text_dict]}
new_name_list.append(new_name)
length_list.append(len(n_motion))
except:
print(line_split)
print(line_split[2], line_split[3], f_tag, to_tag, name)
# break
if flag:
data_dict[name] = {'motion': motion,
'length': len(motion),
'text': text_data}
new_name_list.append(name)
length_list.append(len(motion))
except:
pass
name_list, length_list = zip(*sorted(zip(new_name_list, length_list), key=lambda x: x[1]))
self.mean = mean
self.std = std
self.length_arr = np.array(length_list)
self.data_dict = data_dict
self.name_list = name_list
self.reset_max_len(self.max_length)
def reset_max_len(self, length):
assert length <= self.max_motion_length
self.pointer = np.searchsorted(self.length_arr, length)
print("Pointer Pointing at %d"%self.pointer)
self.max_length = length
def inv_transform(self, data):
return data * self.std + self.mean
def __len__(self):
return len(self.data_dict) - self.pointer
def __getitem__(self, item):
idx = self.pointer + item
data = self.data_dict[self.name_list[idx]]
motion, m_length, text_list = data['motion'], data['length'], data['text']
# Randomly select a caption
text_data = random.choice(text_list)
caption, tokens = text_data['caption'], text_data['tokens']
if len(tokens) < self.opt.max_text_len:
# pad with "unk"
tokens = ['sos/OTHER'] + tokens + ['eos/OTHER']
sent_len = len(tokens)
tokens = tokens + ['unk/OTHER'] * (self.opt.max_text_len + 2 - sent_len)
else:
# crop
tokens = tokens[:self.opt.max_text_len]
tokens = ['sos/OTHER'] + tokens + ['eos/OTHER']
sent_len = len(tokens)
pos_one_hots = []
word_embeddings = []
for token in tokens:
word_emb, pos_oh = self.w_vectorizer[token]
pos_one_hots.append(pos_oh[None, :])
word_embeddings.append(word_emb[None, :])
pos_one_hots = np.concatenate(pos_one_hots, axis=0)
word_embeddings = np.concatenate(word_embeddings, axis=0)
if self.opt.unit_length < 10:
coin2 = np.random.choice(['single', 'single', 'double'])
else:
coin2 = 'single'
if coin2 == 'double':
m_length = (m_length // self.opt.unit_length - 1) * self.opt.unit_length
elif coin2 == 'single':
m_length = (m_length // self.opt.unit_length) * self.opt.unit_length
idx = random.randint(0, len(motion) - m_length)
motion = motion[idx:idx+m_length]
"Z Normalization"
motion = (motion - self.mean) / self.std
if m_length < self.max_motion_length:
motion = np.concatenate([motion,
np.zeros((self.max_motion_length - m_length, motion.shape[1]))
], axis=0)
# print(word_embeddings.shape, motion.shape)
# print(tokens)
return word_embeddings, pos_one_hots, caption, sent_len, motion, m_length, '_'.join(tokens)
class Text2MotionDataset(data.Dataset):
def __init__(self, opt, mean, std, split_file):
self.opt = opt
self.max_length = 20
self.pointer = 0
self.max_motion_length = opt.max_motion_length
min_motion_len = 40 if self.opt.dataset_name =='t2m' else 24
data_dict = {}
id_list = []
with cs.open(split_file, 'r') as f:
for line in f.readlines():
id_list.append(line.strip())
# id_list = id_list[:250]
new_name_list = []
length_list = []
for name in tqdm(id_list):
try:
motion = np.load(pjoin(opt.motion_dir, name + '.npy'))
if (len(motion)) < min_motion_len or (len(motion) >= 200):
continue
text_data = []
flag = False
with cs.open(pjoin(opt.text_dir, name + '.txt')) as f:
for line in f.readlines():
text_dict = {}
line_split = line.strip().split('#')
# print(line)
caption = line_split[0]
tokens = line_split[1].split(' ')
f_tag = float(line_split[2])
to_tag = float(line_split[3])
f_tag = 0.0 if np.isnan(f_tag) else f_tag
to_tag = 0.0 if np.isnan(to_tag) else to_tag
text_dict['caption'] = caption
text_dict['tokens'] = tokens
if f_tag == 0.0 and to_tag == 0.0:
flag = True
text_data.append(text_dict)
else:
try:
n_motion = motion[int(f_tag*20) : int(to_tag*20)]
if (len(n_motion)) < min_motion_len or (len(n_motion) >= 200):
continue
new_name = random.choice('ABCDEFGHIJKLMNOPQRSTUVW') + '_' + name
while new_name in data_dict:
new_name = random.choice('ABCDEFGHIJKLMNOPQRSTUVW') + '_' + name
data_dict[new_name] = {'motion': n_motion,
'length': len(n_motion),
'text':[text_dict]}
new_name_list.append(new_name)
length_list.append(len(n_motion))
except:
print(line_split)
print(line_split[2], line_split[3], f_tag, to_tag, name)
# break
if flag:
data_dict[name] = {'motion': motion,
'length': len(motion),
'text': text_data}
new_name_list.append(name)
length_list.append(len(motion))
except Exception as e:
# print(e)
pass
# name_list, length_list = zip(*sorted(zip(new_name_list, length_list), key=lambda x: x[1]))
name_list, length_list = new_name_list, length_list
self.mean = mean
self.std = std
self.length_arr = np.array(length_list)
self.data_dict = data_dict
self.name_list = name_list
def inv_transform(self, data):
return data * self.std + self.mean
def __len__(self):
return len(self.data_dict) - self.pointer
def __getitem__(self, item):
idx = self.pointer + item
data = self.data_dict[self.name_list[idx]]
motion, m_length, text_list = data['motion'], data['length'], data['text']
# Randomly select a caption
text_data = random.choice(text_list)
caption, tokens = text_data['caption'], text_data['tokens']
if self.opt.unit_length < 10:
coin2 = np.random.choice(['single', 'single', 'double'])
else:
coin2 = 'single'
if coin2 == 'double':
m_length = (m_length // self.opt.unit_length - 1) * self.opt.unit_length
elif coin2 == 'single':
m_length = (m_length // self.opt.unit_length) * self.opt.unit_length
idx = random.randint(0, len(motion) - m_length)
motion = motion[idx:idx+m_length]
"Z Normalization"
motion = (motion - self.mean) / self.std
if m_length < self.max_motion_length:
motion = np.concatenate([motion,
np.zeros((self.max_motion_length - m_length, motion.shape[1]))
], axis=0)
# print(word_embeddings.shape, motion.shape)
# print(tokens)
return caption, motion, m_length
def reset_min_len(self, length):
assert length <= self.max_motion_length
self.pointer = np.searchsorted(self.length_arr, length)
print("Pointer Pointing at %d" % self.pointer)