|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
""" |
|
This code is refer from: |
|
https://github.com/JiaquanYe/TableMASTER-mmocr/blob/master/mmocr/models/textrecog/decoders/master_decoder.py |
|
""" |
|
|
|
import copy |
|
import math |
|
import paddle |
|
from paddle import nn |
|
from paddle.nn import functional as F |
|
|
|
|
|
class TableMasterHead(nn.Layer): |
|
""" |
|
Split to two transformer header at the last layer. |
|
Cls_layer is used to structure token classification. |
|
Bbox_layer is used to regress bbox coord. |
|
""" |
|
|
|
def __init__(self, |
|
in_channels, |
|
out_channels=30, |
|
headers=8, |
|
d_ff=2048, |
|
dropout=0, |
|
max_text_length=500, |
|
loc_reg_num=4, |
|
**kwargs): |
|
super(TableMasterHead, self).__init__() |
|
hidden_size = in_channels[-1] |
|
self.layers = clones( |
|
DecoderLayer(headers, hidden_size, dropout, d_ff), 2) |
|
self.cls_layer = clones( |
|
DecoderLayer(headers, hidden_size, dropout, d_ff), 1) |
|
self.bbox_layer = clones( |
|
DecoderLayer(headers, hidden_size, dropout, d_ff), 1) |
|
self.cls_fc = nn.Linear(hidden_size, out_channels) |
|
self.bbox_fc = nn.Sequential( |
|
|
|
nn.Linear(hidden_size, loc_reg_num), |
|
nn.Sigmoid()) |
|
self.norm = nn.LayerNorm(hidden_size) |
|
self.embedding = Embeddings(d_model=hidden_size, vocab=out_channels) |
|
self.positional_encoding = PositionalEncoding(d_model=hidden_size) |
|
|
|
self.SOS = out_channels - 3 |
|
self.PAD = out_channels - 1 |
|
self.out_channels = out_channels |
|
self.loc_reg_num = loc_reg_num |
|
self.max_text_length = max_text_length |
|
|
|
def make_mask(self, tgt): |
|
""" |
|
Make mask for self attention. |
|
:param src: [b, c, h, l_src] |
|
:param tgt: [b, l_tgt] |
|
:return: |
|
""" |
|
trg_pad_mask = (tgt != self.PAD).unsqueeze(1).unsqueeze(3) |
|
|
|
tgt_len = paddle.shape(tgt)[1] |
|
trg_sub_mask = paddle.tril( |
|
paddle.ones( |
|
([tgt_len, tgt_len]), dtype=paddle.float32)) |
|
|
|
tgt_mask = paddle.logical_and( |
|
trg_pad_mask.astype(paddle.float32), trg_sub_mask) |
|
return tgt_mask.astype(paddle.float32) |
|
|
|
def decode(self, input, feature, src_mask, tgt_mask): |
|
|
|
x = self.embedding(input) |
|
x = self.positional_encoding(x) |
|
|
|
|
|
for i, layer in enumerate(self.layers): |
|
x = layer(x, feature, src_mask, tgt_mask) |
|
|
|
|
|
for layer in self.cls_layer: |
|
cls_x = layer(x, feature, src_mask, tgt_mask) |
|
cls_x = self.norm(cls_x) |
|
|
|
|
|
for layer in self.bbox_layer: |
|
bbox_x = layer(x, feature, src_mask, tgt_mask) |
|
bbox_x = self.norm(bbox_x) |
|
return self.cls_fc(cls_x), self.bbox_fc(bbox_x) |
|
|
|
def greedy_forward(self, SOS, feature): |
|
input = SOS |
|
output = paddle.zeros( |
|
[input.shape[0], self.max_text_length + 1, self.out_channels]) |
|
bbox_output = paddle.zeros( |
|
[input.shape[0], self.max_text_length + 1, self.loc_reg_num]) |
|
max_text_length = paddle.to_tensor(self.max_text_length) |
|
for i in range(max_text_length + 1): |
|
target_mask = self.make_mask(input) |
|
out_step, bbox_output_step = self.decode(input, feature, None, |
|
target_mask) |
|
prob = F.softmax(out_step, axis=-1) |
|
next_word = prob.argmax(axis=2, dtype="int64") |
|
input = paddle.concat( |
|
[input, next_word[:, -1].unsqueeze(-1)], axis=1) |
|
if i == self.max_text_length: |
|
output = out_step |
|
bbox_output = bbox_output_step |
|
return output, bbox_output |
|
|
|
def forward_train(self, out_enc, targets): |
|
|
|
|
|
|
|
padded_targets = targets[0] |
|
src_mask = None |
|
tgt_mask = self.make_mask(padded_targets[:, :-1]) |
|
output, bbox_output = self.decode(padded_targets[:, :-1], out_enc, |
|
src_mask, tgt_mask) |
|
return {'structure_probs': output, 'loc_preds': bbox_output} |
|
|
|
def forward_test(self, out_enc): |
|
batch_size = out_enc.shape[0] |
|
SOS = paddle.zeros([batch_size, 1], dtype='int64') + self.SOS |
|
output, bbox_output = self.greedy_forward(SOS, out_enc) |
|
output = F.softmax(output) |
|
return {'structure_probs': output, 'loc_preds': bbox_output} |
|
|
|
def forward(self, feat, targets=None): |
|
feat = feat[-1] |
|
b, c, h, w = feat.shape |
|
feat = feat.reshape([b, c, h * w]) |
|
feat = feat.transpose((0, 2, 1)) |
|
out_enc = self.positional_encoding(feat) |
|
if self.training: |
|
return self.forward_train(out_enc, targets) |
|
|
|
return self.forward_test(out_enc) |
|
|
|
|
|
class DecoderLayer(nn.Layer): |
|
""" |
|
Decoder is made of self attention, srouce attention and feed forward. |
|
""" |
|
|
|
def __init__(self, headers, d_model, dropout, d_ff): |
|
super(DecoderLayer, self).__init__() |
|
self.self_attn = MultiHeadAttention(headers, d_model, dropout) |
|
self.src_attn = MultiHeadAttention(headers, d_model, dropout) |
|
self.feed_forward = FeedForward(d_model, d_ff, dropout) |
|
self.sublayer = clones(SubLayerConnection(d_model, dropout), 3) |
|
|
|
def forward(self, x, feature, src_mask, tgt_mask): |
|
x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, tgt_mask)) |
|
x = self.sublayer[1]( |
|
x, lambda x: self.src_attn(x, feature, feature, src_mask)) |
|
return self.sublayer[2](x, self.feed_forward) |
|
|
|
|
|
class MultiHeadAttention(nn.Layer): |
|
def __init__(self, headers, d_model, dropout): |
|
super(MultiHeadAttention, self).__init__() |
|
|
|
assert d_model % headers == 0 |
|
self.d_k = int(d_model / headers) |
|
self.headers = headers |
|
self.linears = clones(nn.Linear(d_model, d_model), 4) |
|
self.attn = None |
|
self.dropout = nn.Dropout(dropout) |
|
|
|
def forward(self, query, key, value, mask=None): |
|
B = query.shape[0] |
|
|
|
|
|
query, key, value = \ |
|
[l(x).reshape([B, 0, self.headers, self.d_k]).transpose([0, 2, 1, 3]) |
|
for l, x in zip(self.linears, (query, key, value))] |
|
|
|
x, self.attn = self_attention( |
|
query, key, value, mask=mask, dropout=self.dropout) |
|
x = x.transpose([0, 2, 1, 3]).reshape([B, 0, self.headers * self.d_k]) |
|
return self.linears[-1](x) |
|
|
|
|
|
class FeedForward(nn.Layer): |
|
def __init__(self, d_model, d_ff, dropout): |
|
super(FeedForward, self).__init__() |
|
self.w_1 = nn.Linear(d_model, d_ff) |
|
self.w_2 = nn.Linear(d_ff, d_model) |
|
self.dropout = nn.Dropout(dropout) |
|
|
|
def forward(self, x): |
|
return self.w_2(self.dropout(F.relu(self.w_1(x)))) |
|
|
|
|
|
class SubLayerConnection(nn.Layer): |
|
""" |
|
A residual connection followed by a layer norm. |
|
Note for code simplicity the norm is first as opposed to last. |
|
""" |
|
|
|
def __init__(self, size, dropout): |
|
super(SubLayerConnection, self).__init__() |
|
self.norm = nn.LayerNorm(size) |
|
self.dropout = nn.Dropout(dropout) |
|
|
|
def forward(self, x, sublayer): |
|
return x + self.dropout(sublayer(self.norm(x))) |
|
|
|
|
|
def masked_fill(x, mask, value): |
|
mask = mask.astype(x.dtype) |
|
return x * paddle.logical_not(mask).astype(x.dtype) + mask * value |
|
|
|
|
|
def self_attention(query, key, value, mask=None, dropout=None): |
|
""" |
|
Compute 'Scale Dot Product Attention' |
|
""" |
|
d_k = value.shape[-1] |
|
|
|
score = paddle.matmul(query, key.transpose([0, 1, 3, 2]) / math.sqrt(d_k)) |
|
if mask is not None: |
|
|
|
score = masked_fill(score, mask == 0, -6.55e4) |
|
|
|
p_attn = F.softmax(score, axis=-1) |
|
|
|
if dropout is not None: |
|
p_attn = dropout(p_attn) |
|
return paddle.matmul(p_attn, value), p_attn |
|
|
|
|
|
def clones(module, N): |
|
""" Produce N identical layers """ |
|
return nn.LayerList([copy.deepcopy(module) for _ in range(N)]) |
|
|
|
|
|
class Embeddings(nn.Layer): |
|
def __init__(self, d_model, vocab): |
|
super(Embeddings, self).__init__() |
|
self.lut = nn.Embedding(vocab, d_model) |
|
self.d_model = d_model |
|
|
|
def forward(self, *input): |
|
x = input[0] |
|
return self.lut(x) * math.sqrt(self.d_model) |
|
|
|
|
|
class PositionalEncoding(nn.Layer): |
|
""" Implement the PE function. """ |
|
|
|
def __init__(self, d_model, dropout=0., max_len=5000): |
|
super(PositionalEncoding, self).__init__() |
|
self.dropout = nn.Dropout(p=dropout) |
|
|
|
|
|
pe = paddle.zeros([max_len, d_model]) |
|
position = paddle.arange(0, max_len).unsqueeze(1).astype('float32') |
|
div_term = paddle.exp( |
|
paddle.arange(0, d_model, 2) * -math.log(10000.0) / d_model) |
|
pe[:, 0::2] = paddle.sin(position * div_term) |
|
pe[:, 1::2] = paddle.cos(position * div_term) |
|
pe = pe.unsqueeze(0) |
|
self.register_buffer('pe', pe) |
|
|
|
def forward(self, feat, **kwargs): |
|
feat = feat + self.pe[:, :paddle.shape(feat)[1]] |
|
return self.dropout(feat) |
|
|