""" |
|
This code is adapted from:
|
https://github.com/wangyuxin87/VisionLAN |
|
""" |
|
|
|
from __future__ import absolute_import |
|
from __future__ import division |
|
from __future__ import print_function |
|
|
|
import paddle |
|
from paddle import ParamAttr |
|
import paddle.nn as nn |
|
import paddle.nn.functional as F |
|
from paddle.nn.initializer import Normal, XavierNormal |
|
import numpy as np |
|
|
|
|
|
class PositionalEncoding(nn.Layer): |
|
def __init__(self, d_hid, n_position=200): |
|
super(PositionalEncoding, self).__init__() |
|
self.register_buffer( |
|
'pos_table', self._get_sinusoid_encoding_table(n_position, d_hid)) |
|
|
|
def _get_sinusoid_encoding_table(self, n_position, d_hid): |
|
''' Sinusoid position encoding table ''' |
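        # PE[pos, 2i]     = sin(pos / 10000^(2i / d_hid))
        # PE[pos, 2i + 1] = cos(pos / 10000^(2i / d_hid))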
|
|
|
def get_position_angle_vec(position): |
|
return [ |
|
position / np.power(10000, 2 * (hid_j // 2) / d_hid) |
|
for hid_j in range(d_hid) |
|
] |
|
|
|
sinusoid_table = np.array( |
|
[get_position_angle_vec(pos_i) for pos_i in range(n_position)]) |
|
sinusoid_table[:, 0::2] = np.sin(sinusoid_table[:, 0::2]) |
|
sinusoid_table[:, 1::2] = np.cos(sinusoid_table[:, 1::2]) |
|
sinusoid_table = paddle.to_tensor(sinusoid_table, dtype='float32') |
|
sinusoid_table = paddle.unsqueeze(sinusoid_table, axis=0) |
|
return sinusoid_table |
|
|
|
def forward(self, x): |
|
return x + self.pos_table[:, :x.shape[1]].clone().detach() |
|
|
|
|
|
class ScaledDotProductAttention(nn.Layer): |
|
"Scaled Dot-Product Attention" |
|
|
|
def __init__(self, temperature, attn_dropout=0.1): |
|
super(ScaledDotProductAttention, self).__init__() |
|
self.temperature = temperature |
|
self.dropout = nn.Dropout(attn_dropout) |
|
self.softmax = nn.Softmax(axis=2) |
|
|
|
def forward(self, q, k, v, mask=None): |
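        # attention(q, k, v) = softmax(q k^T / temperature) v,
        # with temperature = sqrt(d_k).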
|
k = paddle.transpose(k, perm=[0, 2, 1]) |
|
attn = paddle.bmm(q, k) |
|
attn = attn / self.temperature |
|
        if mask is not None:
            # Suppress masked positions before the softmax. `attn` has shape
            # (n_head * B, len_q, len_k) and the mask is expected to be
            # broadcastable to it; every caller in this file passes mask=None,
            # so this branch is effectively unused.
            attn = attn.masked_fill(mask.astype('bool'), -1e9)
|
attn = self.softmax(attn) |
|
attn = self.dropout(attn) |
|
output = paddle.bmm(attn, v) |
|
return output |
|
|
|
|
|
class MultiHeadAttention(nn.Layer): |
|
" Multi-Head Attention module" |
|
|
|
def __init__(self, n_head, d_model, d_k, d_v, dropout=0.1): |
|
super(MultiHeadAttention, self).__init__() |
|
self.n_head = n_head |
|
self.d_k = d_k |
|
self.d_v = d_v |
|
self.w_qs = nn.Linear( |
|
d_model, |
|
n_head * d_k, |
|
weight_attr=ParamAttr(initializer=Normal( |
|
mean=0, std=np.sqrt(2.0 / (d_model + d_k))))) |
|
self.w_ks = nn.Linear( |
|
d_model, |
|
n_head * d_k, |
|
weight_attr=ParamAttr(initializer=Normal( |
|
mean=0, std=np.sqrt(2.0 / (d_model + d_k))))) |
|
self.w_vs = nn.Linear( |
|
d_model, |
|
n_head * d_v, |
|
weight_attr=ParamAttr(initializer=Normal( |
|
mean=0, std=np.sqrt(2.0 / (d_model + d_v))))) |
|
|
|
self.attention = ScaledDotProductAttention(temperature=np.power(d_k, |
|
0.5)) |
|
self.layer_norm = nn.LayerNorm(d_model) |
|
self.fc = nn.Linear( |
|
n_head * d_v, |
|
d_model, |
|
weight_attr=ParamAttr(initializer=XavierNormal())) |
|
self.dropout = nn.Dropout(dropout) |
|
|
|
def forward(self, q, k, v, mask=None): |
|
d_k, d_v, n_head = self.d_k, self.d_v, self.n_head |
|
sz_b, len_q, _ = q.shape |
|
sz_b, len_k, _ = k.shape |
|
sz_b, len_v, _ = v.shape |
|
residual = q |
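        # Project q/k/v to n_head heads, fold the head axis into the batch axis
        # for the shared attention, then merge the heads back afterwards.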
|
|
|
q = self.w_qs(q) |
|
q = paddle.reshape( |
|
q, shape=[-1, len_q, n_head, d_k]) |
|
k = self.w_ks(k) |
|
k = paddle.reshape(k, shape=[-1, len_k, n_head, d_k]) |
|
v = self.w_vs(v) |
|
v = paddle.reshape(v, shape=[-1, len_v, n_head, d_v]) |
|
|
|
q = paddle.transpose(q, perm=[2, 0, 1, 3]) |
|
q = paddle.reshape(q, shape=[-1, len_q, d_k]) |
|
k = paddle.transpose(k, perm=[2, 0, 1, 3]) |
|
k = paddle.reshape(k, shape=[-1, len_k, d_k]) |
|
v = paddle.transpose(v, perm=[2, 0, 1, 3]) |
|
v = paddle.reshape(v, shape=[-1, len_v, d_v]) |
|
|
|
mask = paddle.tile( |
|
mask, |
|
[n_head, 1, 1]) if mask is not None else None |
|
output = self.attention(q, k, v, mask=mask) |
|
output = paddle.reshape(output, shape=[n_head, -1, len_q, d_v]) |
|
output = paddle.transpose(output, perm=[1, 2, 0, 3]) |
|
output = paddle.reshape( |
|
output, shape=[-1, len_q, n_head * d_v]) |
|
output = self.dropout(self.fc(output)) |
|
output = self.layer_norm(output + residual) |
|
return output |
|
|
|
|
|
class PositionwiseFeedForward(nn.Layer): |
|
def __init__(self, d_in, d_hid, dropout=0.1): |
|
super(PositionwiseFeedForward, self).__init__() |
|
self.w_1 = nn.Conv1D(d_in, d_hid, 1) |
|
self.w_2 = nn.Conv1D(d_hid, d_in, 1) |
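        # Kernel-size-1 convolutions act as position-wise linear layers:
        # d_in -> d_hid -> d_in, applied independently at every position.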
|
self.layer_norm = nn.LayerNorm(d_in) |
|
self.dropout = nn.Dropout(dropout) |
|
|
|
def forward(self, x): |
|
residual = x |
|
x = paddle.transpose(x, perm=[0, 2, 1]) |
|
x = self.w_2(F.relu(self.w_1(x))) |
|
x = paddle.transpose(x, perm=[0, 2, 1]) |
|
x = self.dropout(x) |
|
x = self.layer_norm(x + residual) |
|
return x |
|
|
|
|
|
class EncoderLayer(nn.Layer): |
|
    ''' Compose with two sub-layers: multi-head self-attention and a position-wise feed-forward network '''
|
|
|
def __init__(self, d_model, d_inner, n_head, d_k, d_v, dropout=0.1): |
|
super(EncoderLayer, self).__init__() |
|
self.slf_attn = MultiHeadAttention( |
|
n_head, d_model, d_k, d_v, dropout=dropout) |
|
self.pos_ffn = PositionwiseFeedForward( |
|
d_model, d_inner, dropout=dropout) |
|
|
|
def forward(self, enc_input, slf_attn_mask=None): |
|
enc_output = self.slf_attn( |
|
enc_input, enc_input, enc_input, mask=slf_attn_mask) |
|
enc_output = self.pos_ffn(enc_output) |
|
return enc_output |
|
|
|
|
|
class Transformer_Encoder(nn.Layer): |
|
def __init__(self, |
|
n_layers=2, |
|
n_head=8, |
|
d_word_vec=512, |
|
d_k=64, |
|
d_v=64, |
|
d_model=512, |
|
d_inner=2048, |
|
dropout=0.1, |
|
n_position=256): |
|
super(Transformer_Encoder, self).__init__() |
|
self.position_enc = PositionalEncoding( |
|
d_word_vec, n_position=n_position) |
|
self.dropout = nn.Dropout(p=dropout) |
|
self.layer_stack = nn.LayerList([ |
|
EncoderLayer( |
|
d_model, d_inner, n_head, d_k, d_v, dropout=dropout) |
|
for _ in range(n_layers) |
|
]) |
|
self.layer_norm = nn.LayerNorm(d_model, epsilon=1e-6) |
|
|
|
def forward(self, enc_output, src_mask, return_attns=False): |
|
enc_output = self.dropout( |
|
self.position_enc(enc_output)) |
|
for enc_layer in self.layer_stack: |
|
enc_output = enc_layer(enc_output, slf_attn_mask=src_mask) |
|
enc_output = self.layer_norm(enc_output) |
|
return enc_output |
|
|
|
|
|
class PP_layer(nn.Layer): |
|
def __init__(self, n_dim=512, N_max_character=25, n_position=256): |
|
|
|
super(PP_layer, self).__init__() |
|
self.character_len = N_max_character |
|
self.f0_embedding = nn.Embedding(N_max_character, n_dim) |
|
self.w0 = nn.Linear(N_max_character, n_position) |
|
self.wv = nn.Linear(n_dim, n_dim) |
|
self.we = nn.Linear(n_dim, N_max_character) |
|
self.active = nn.Tanh() |
|
self.softmax = nn.Softmax(axis=2) |
|
|
|
def forward(self, enc_output): |
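        # enc_output: (B, n_position, n_dim) visual features. A learned
        # reading-order embedding attends over the positions and returns one
        # glimpse per character slot: (B, N_max_character, n_dim).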
|
|
|
reading_order = paddle.arange(self.character_len, dtype='int64') |
|
reading_order = reading_order.unsqueeze(0).expand( |
|
[enc_output.shape[0], self.character_len]) |
|
reading_order = self.f0_embedding(reading_order) |
|
|
|
|
|
reading_order = paddle.transpose(reading_order, perm=[0, 2, 1]) |
|
t = self.w0(reading_order) |
|
t = self.active( |
|
paddle.transpose( |
|
t, perm=[0, 2, 1]) + self.wv(enc_output)) |
|
t = self.we(t) |
|
t = self.softmax(paddle.transpose(t, perm=[0, 2, 1])) |
|
g_output = paddle.bmm(t, enc_output) |
|
return g_output |
|
|
|
|
|
class Prediction(nn.Layer): |
|
def __init__(self, |
|
n_dim=512, |
|
n_position=256, |
|
N_max_character=25, |
|
n_class=37): |
|
super(Prediction, self).__init__() |
|
self.pp = PP_layer( |
|
n_dim=n_dim, N_max_character=N_max_character, n_position=n_position) |
|
self.pp_share = PP_layer( |
|
n_dim=n_dim, N_max_character=N_max_character, n_position=n_position) |
|
self.w_vrm = nn.Linear(n_dim, n_class) |
|
self.w_share = nn.Linear(n_dim, n_class) |
|
self.nclass = n_class |
|
|
|
def forward(self, cnn_feature, f_res, f_sub, train_mode=False, |
|
use_mlm=True): |
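        # Three branches share the PP attention: the main VRM branch over
        # cnn_feature, and two MLM branches over the remaining / occluded
        # features (f_res / f_sub). use_mlm=False skips the MLM branches.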
|
if train_mode: |
|
if not use_mlm: |
|
g_output = self.pp(cnn_feature) |
|
g_output = self.w_vrm(g_output) |
|
f_res = 0 |
|
f_sub = 0 |
|
return g_output, f_res, f_sub |
|
g_output = self.pp(cnn_feature) |
|
f_res = self.pp_share(f_res) |
|
f_sub = self.pp_share(f_sub) |
|
g_output = self.w_vrm(g_output) |
|
f_res = self.w_share(f_res) |
|
f_sub = self.w_share(f_sub) |
|
return g_output, f_res, f_sub |
|
else: |
|
g_output = self.pp(cnn_feature) |
|
g_output = self.w_vrm(g_output) |
|
return g_output |
|
|
|
|
|
class MLM(nn.Layer):
    "Architecture of the Masked Language-aware Module (MLM)"
|
|
|
def __init__(self, n_dim=512, n_position=256, max_text_length=25): |
|
super(MLM, self).__init__() |
|
self.MLM_SequenceModeling_mask = Transformer_Encoder( |
|
n_layers=2, n_position=n_position) |
|
self.MLM_SequenceModeling_WCL = Transformer_Encoder( |
|
n_layers=1, n_position=n_position) |
|
self.pos_embedding = nn.Embedding(max_text_length, n_dim) |
|
self.w0_linear = nn.Linear(1, n_position) |
|
self.wv = nn.Linear(n_dim, n_dim) |
|
self.active = nn.Tanh() |
|
self.we = nn.Linear(n_dim, 1) |
|
self.sigmoid = nn.Sigmoid() |
|
|
|
def forward(self, x, label_pos): |
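        # x: (B, n_position, n_dim) visual features; label_pos: index of the
        # character to occlude per sample. Produces a sigmoid occlusion map
        # att_map_sub and splits x into the remaining part f_res and the
        # occluded part f_sub, each refined by a shared transformer encoder.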
|
|
|
feature_v_seq = self.MLM_SequenceModeling_mask(x, src_mask=None) |
|
|
|
label_pos = paddle.to_tensor(label_pos, dtype='int64') |
|
pos_emb = self.pos_embedding(label_pos) |
|
pos_emb = self.w0_linear(paddle.unsqueeze(pos_emb, axis=2)) |
|
pos_emb = paddle.transpose(pos_emb, perm=[0, 2, 1]) |
|
|
|
att_map_sub = self.active(pos_emb + self.wv(feature_v_seq)) |
|
att_map_sub = self.we(att_map_sub) |
|
att_map_sub = paddle.transpose(att_map_sub, perm=[0, 2, 1]) |
|
att_map_sub = self.sigmoid(att_map_sub) |
|
|
|
|
|
att_map_sub = paddle.transpose(att_map_sub, perm=[0, 2, 1]) |
|
f_res = x * (1 - att_map_sub) |
|
f_sub = x * att_map_sub |
|
|
|
f_res = self.MLM_SequenceModeling_WCL(f_res, src_mask=None) |
|
f_sub = self.MLM_SequenceModeling_WCL(f_sub, src_mask=None) |
|
return f_res, f_sub, att_map_sub |
|
|
|
|
|
def trans_1d_2d(x): |
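    # Reshape a flattened (B, 256, C) position sequence back into a
    # (B, C, 8, 32) feature map so the character mask can be visualized.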
|
b, w_h, c = x.shape |
|
x = paddle.transpose(x, perm=[0, 2, 1]) |
|
x = paddle.reshape(x, [-1, c, 32, 8]) |
|
x = paddle.transpose(x, perm=[0, 1, 3, 2]) |
|
return x |
|
|
|
|
|
class MLM_VRM(nn.Layer): |
|
""" |
|
    MLM + VRM. The MLM branch is only used during training, and `ratio`
    controls how many samples in a batch are occluded. At test time the
    VisionLAN pipeline is concise: backbone + sequence modeling
    (transformer units) + prediction layer (PP layer).
    Inputs:
        x: visual feature map from the backbone
        label_pos: index of the character to occlude
        training_step: training stage, 'LF_1', 'LF_2' or 'LA'
    Outputs:
        text_pre: prediction of VRM
        test_rem: prediction of the remaining string in MLM
        text_mas: prediction of the occluded character in MLM
        mask_c_show: visualization of the character mask Mask_c
|
""" |
|
|
|
def __init__(self, |
|
n_layers=3, |
|
n_position=256, |
|
n_dim=512, |
|
max_text_length=25, |
|
nclass=37): |
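        # NOTE (assumption): the head is laid out for backbone features of
        # shape (B, n_dim, 8, 32), i.e. h * w == n_position == 256, which is
        # also what the hard-coded reshape in trans_1d_2d expects.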
|
super(MLM_VRM, self).__init__() |
|
self.MLM = MLM(n_dim=n_dim, |
|
n_position=n_position, |
|
max_text_length=max_text_length) |
|
self.SequenceModeling = Transformer_Encoder( |
|
n_layers=n_layers, n_position=n_position) |
|
self.Prediction = Prediction( |
|
n_dim=n_dim, |
|
n_position=n_position, |
|
N_max_character=max_text_length + |
|
1, |
|
n_class=nclass) |
|
self.nclass = nclass |
|
self.max_text_length = max_text_length |
|
|
|
def forward(self, x, label_pos, training_step, train_mode=False): |
|
b, c, h, w = x.shape |
|
nT = self.max_text_length |
|
x = paddle.transpose(x, perm=[0, 1, 3, 2]) |
|
x = paddle.reshape(x, [-1, c, h * w]) |
|
x = paddle.transpose(x, perm=[0, 2, 1]) |
|
if train_mode: |
|
if training_step == 'LF_1': |
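                # LF_1: language-free step, train the visual reasoning
                # branch only (no MLM).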
|
f_res = 0 |
|
f_sub = 0 |
|
x = self.SequenceModeling(x, src_mask=None) |
|
text_pre, test_rem, text_mas = self.Prediction( |
|
x, f_res, f_sub, train_mode=True, use_mlm=False) |
|
return text_pre, text_pre, text_pre, text_pre |
|
elif training_step == 'LF_2': |
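                # LF_2: language-free step that also trains the MLM branch;
                # the visual features are not occluded yet.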
|
|
|
f_res, f_sub, mask_c = self.MLM(x, label_pos) |
|
x = self.SequenceModeling(x, src_mask=None) |
|
text_pre, test_rem, text_mas = self.Prediction( |
|
x, f_res, f_sub, train_mode=True) |
|
mask_c_show = trans_1d_2d(mask_c) |
|
return text_pre, test_rem, text_mas, mask_c_show |
|
elif training_step == 'LA': |
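                # LA: language-aware step, occlude part of the batch with the
                # MLM character mask before visual reasoning.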
|
|
|
f_res, f_sub, mask_c = self.MLM(x, label_pos) |
|
|
|
|
|
character_mask = paddle.zeros_like(mask_c) |
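                # Occlude only the first half of the batch (ratio = b // 2);
                # the rest keeps its original features.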
|
|
|
ratio = b // 2 |
|
if ratio >= 1: |
|
with paddle.no_grad(): |
|
character_mask[0:ratio, :, :] = mask_c[0:ratio, :, :] |
|
else: |
|
character_mask = mask_c |
|
x = x * (1 - character_mask) |
|
|
|
|
|
x = self.SequenceModeling(x, src_mask=None) |
|
|
|
text_pre, test_rem, text_mas = self.Prediction( |
|
x, f_res, f_sub, train_mode=True) |
|
mask_c_show = trans_1d_2d(mask_c) |
|
return text_pre, test_rem, text_mas, mask_c_show |
|
else: |
|
raise NotImplementedError |
|
else: |
|
f_res = 0 |
|
f_sub = 0 |
|
contextual_feature = self.SequenceModeling(x, src_mask=None) |
|
text_pre = self.Prediction( |
|
contextual_feature, |
|
f_res, |
|
f_sub, |
|
train_mode=False, |
|
use_mlm=False) |
|
text_pre = paddle.transpose( |
|
text_pre, perm=[1, 0, 2]) |
|
return text_pre, x |
|
|
|
|
|
class VLHead(nn.Layer): |
|
""" |
|
Architecture of VisionLAN |
|
""" |
|
|
|
def __init__(self, |
|
in_channels, |
|
out_channels=36, |
|
n_layers=3, |
|
n_position=256, |
|
n_dim=512, |
|
max_text_length=25, |
|
training_step='LA'): |
|
super(VLHead, self).__init__() |
|
self.MLM_VRM = MLM_VRM( |
|
n_layers=n_layers, |
|
n_position=n_position, |
|
n_dim=n_dim, |
|
max_text_length=max_text_length, |
|
nclass=out_channels + 1) |
|
self.training_step = training_step |
|
|
|
def forward(self, feat, targets=None): |
|
|
|
if self.training: |
|
label_pos = targets[-2] |
|
text_pre, test_rem, text_mas, mask_map = self.MLM_VRM( |
|
feat, label_pos, self.training_step, train_mode=True) |
|
return text_pre, test_rem, text_mas, mask_map |
|
else: |
|
text_pre, x = self.MLM_VRM( |
|
feat, targets, self.training_step, train_mode=False) |
|
return text_pre, x |
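
# Minimal smoke test: a sketch that assumes the backbone yields an 8 x 32
# feature map with 512 channels (so h * w == n_position == 256); it only
# checks that VLHead runs in eval mode and returns the expected shapes.
if __name__ == "__main__":
    head = VLHead(in_channels=512, out_channels=36)
    head.eval()
    # Fake backbone feature: (batch, channels, height, width)
    feat = paddle.randn([1, 512, 8, 32])
    with paddle.no_grad():
        text_pre, visual_feat = head(feat)
    # text_pre: (max_text_length + 1, batch, out_channels + 1) character logits
    # visual_feat: (batch, n_position, n_dim) flattened visual features
    print(text_pre.shape)     # [26, 1, 37]
    print(visual_feat.shape)  # [1, 256, 512]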
|
|