# Commit header captured together with the file (not part of the program):
# zhengr's picture
# First version
# 19dc0f3
'''
This file has been 100% copied from this PR to the Transformers library:
https://github.com/huggingface/transformers/pull/27557
Author: Saibo-creator
Author GitHub: https://github.com/Saibo-creator
All credits go to the author.
'''
import math
import torch
from transformers.generation.logits_process import LogitsProcessor
from transformers.utils import add_start_docstrings
# Shared Args/Return documentation fragment for logits processors. In this file it is
# attached to GrammarConstrainedLogitsProcessor.__call__ through the
# add_start_docstrings decorator, so the text below ends up in that method's docstring.
LOGITS_PROCESSOR_INPUTS_DOCSTRING = r"""
Args:
input_ids (`torch.LongTensor` of shape `(batch_size, sequence_length)`):
Indices of input sequence tokens in the vocabulary. [What are input IDs?](../glossary#input-ids)
scores (`torch.FloatTensor` of shape `(batch_size, config.vocab_size)`):
Prediction scores of a language modeling head. These can be logits for each vocabulary when not using beam
search or log softmax for each vocabulary token when using beam search
Return:
`torch.FloatTensor` of shape `(batch_size, config.vocab_size)`: The processed prediction scores.
"""
class GrammarConstrainedLogitsProcessor(LogitsProcessor):
    """Logits processor that blocks every token the grammar constraint rejects.

    The processor is stateful: it keeps one parser-stack entry per row of
    ``input_ids`` and advances those entries as new tokens are appended. A
    given instance can therefore only follow a single generation run; a new
    instance is required for every new input sequence.
    """

    def __init__(self, grammar_constraint):
        """Store the grammar constraint; the parser stacks are created lazily.

        Args:
            grammar_constraint: Object providing ``init_stacks``,
                ``accept_token_ids``, ``accept_token_id`` and
                ``batch_filter_vocab`` (the only methods this class calls).
        """
        # Sequence length seen on the previous call; None until the first call.
        self.last_size = None
        self.grammar_constraint = grammar_constraint
        # One entry per row of input_ids; built on the first call, when the
        # batch size is known.
        self.batch_stacks = None

    def filter_logits(self, logits, device):
        """Set the logits of all grammar-rejected tokens to ``-inf`` in place.

        Args:
            logits: Score tensor to mask. It is modified in place.
            device: Device forwarded to ``grammar_constraint.batch_filter_vocab``.

        Returns:
            None. The result is visible through the mutated ``logits``.
        """
        # One True/False entry per token, True meaning "accepted by the grammar".
        # Assumes batch_filter_vocab returns a boolean tensor whose leading
        # dimensions match those of `logits` -- TODO confirm against the
        # grammar constraint implementation.
        acceptance = self.grammar_constraint.batch_filter_vocab(self.batch_stacks, device)

        # The logits can be wider than the acceptance mask (e.g. when a model's
        # output layer is larger than the vocabulary the grammar was built
        # for). Boolean indexing needs identical shapes, so extend the mask and
        # treat every extra position as rejected.
        missing = logits.shape[-1] - acceptance.shape[-1]
        if missing > 0:
            rejected_tail = torch.zeros(
                (*acceptance.shape[:-1], missing),
                dtype=acceptance.dtype,
                device=acceptance.device,
            )
            acceptance = torch.cat((acceptance, rejected_tail), dim=-1)

        # Logits to -inf where the token is not accepted.
        logits[~acceptance] = -math.inf

    def process_logits(self, input_ids, scores, parse_start_index=None):
        """Advance the parser state with the newest tokens and mask ``scores``.

        Args:
            input_ids: Batch of token-id sequences generated so far.
            scores: Next-token scores; masked in place and also returned.
            parse_start_index: Only used on the first call. ``None`` (default)
                means nothing from ``input_ids`` is parsed, i.e. generation
                starts from scratch. Otherwise each row is parsed from this
                index onwards; use ``0`` to parse all of ``input_ids``.

        Returns:
            ``scores`` with every grammar-rejected token set to ``-inf``.

        Raises:
            RuntimeError: If the batch size differs from the one the parser
                stacks were created for, or if the sequence length did not
                grow by exactly one token since the previous call.
        """
        batch_size = len(input_ids)

        # Stacks are created on the first call, because only then the batch
        # size (including beams) is known.
        if self.batch_stacks is None:
            self.batch_stacks = [self.grammar_constraint.init_stacks() for _ in range(batch_size)]
        elif len(self.batch_stacks) != batch_size:
            # zip() below would silently drop rows or stacks; fail loudly instead.
            raise RuntimeError(
                f"Batch size of input_ids ({batch_size}) does not match the number of "
                f"parser stacks ({len(self.batch_stacks)}) tracked by the "
                "GrammarConstrainedLogitsProcessor. If you want to process "
                "another input sequence, please instantiate a new "
                "GrammarConstrainedLogitsProcessor."
            )

        if self.last_size is None:
            # First call: optionally consume an already existing prefix.
            if parse_start_index is None:
                prefixes = [[] for _ in input_ids]
            else:
                prefixes = [row[parse_start_index:] for row in input_ids]
            self.batch_stacks = [
                self.grammar_constraint.accept_token_ids(prefix, stack)
                for prefix, stack in zip(prefixes, self.batch_stacks)
            ]
        elif len(input_ids[0]) == self.last_size + 1:
            # Incremental step: exactly one new token per row since the last call.
            self.batch_stacks = [
                self.grammar_constraint.accept_token_id(row[-1], stack)
                for row, stack in zip(input_ids, self.batch_stacks)
            ]
        else:
            # Only the length is checked, not the content: re-validating the
            # whole sequence on every call would mean re-parsing it each time.
            # Callers are trusted to pass the sequence this processor has been
            # following; if they do not, the constrained output is undefined.
            raise RuntimeError(
                "Input ID's length is inconsistent with the current state of "
                "the GrammarConstrainedLogitsProcessor. If you want to process "
                "another input sequence, please instantiate a new "
                "GrammarConstrainedLogitsProcessor."
            )

        self.filter_logits(scores, scores.device)
        self.last_size = len(input_ids[0])
        return scores

    @add_start_docstrings(LOGITS_PROCESSOR_INPUTS_DOCSTRING)
    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor) -> torch.FloatTensor:
        return self.process_logits(input_ids, scores)