File size: 9,116 Bytes
626eca0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 |
import logging
import os
from pathlib import Path
from typing import Any, Dict, List
import torch
import transformers as tr
from torch.utils.data import IterableDataset
from transformers import AutoConfig
from relik.common.log import get_console_logger, get_logger
from relik.common.utils import get_callable_from_string
from relik.reader.pytorch_modules.hf.modeling_relik import (
RelikReaderConfig,
RelikReaderSample,
)
# Module-level loggers: a console logger plus a logger named after this module,
# created at INFO level. Both are built by helpers from `relik.common.log`.
console_logger = get_console_logger()
logger = get_logger(__name__, level=logging.INFO)
class RelikReaderBase(torch.nn.Module):
    """
    Base class for ReLiK readers.

    It wraps a reader model (stored in ``self.relik_reader_model``), lazily
    provides the matching tokenizer, and exposes a :meth:`read` entry point that
    validates and normalises its inputs before delegating to :meth:`_read`, which
    concrete readers must implement.
    """

    # Reference to the model class used to build or load the reader. It is resolved
    # with `get_callable_from_string` in `__init__`; subclasses are expected to set
    # it (or callers must pass `default_reader_class` to the constructor).
    default_reader_class: str | None = None
    # Not used by this base class itself.
    default_data_class: str | None = None

    def __init__(
        self,
        transformer_model: str | tr.PreTrainedModel | None = None,
        additional_special_symbols: int = 0,
        num_layers: int | None = None,
        activation: str = "gelu",
        linears_hidden_size: int | None = 512,
        use_last_k_layers: int = 1,
        training: bool = False,
        device: str | torch.device | None = None,
        precision: int = 32,
        tokenizer: str | tr.PreTrainedTokenizer | None = None,
        dataset: IterableDataset | str | None = None,
        default_reader_class: tr.PreTrainedModel | str | None = None,
        **kwargs,
    ) -> None:
        """
        Builds the reader.

        Args:
            transformer_model (:obj:`str` or :obj:`tr.PreTrainedModel`):
                Either an already instantiated reader model, or a model name/path.
                When a string is given, its config is loaded first: if the config's
                ``model_type`` contains ``"relik-reader"`` the model is loaded with
                ``from_pretrained``; otherwise a new :obj:`RelikReaderConfig` is built
                around it and the reader class is instantiated from that config.
            additional_special_symbols (:obj:`int`, `optional`, defaults to 0):
                Forwarded to :obj:`RelikReaderConfig` (new-model branch only).
            num_layers (:obj:`int`, `optional`):
                Forwarded to :obj:`RelikReaderConfig` (new-model branch only).
            activation (:obj:`str`, `optional`, defaults to ``"gelu"``):
                Forwarded to :obj:`RelikReaderConfig` (new-model branch only).
            linears_hidden_size (:obj:`int`, `optional`, defaults to 512):
                Forwarded to :obj:`RelikReaderConfig` (new-model branch only).
            use_last_k_layers (:obj:`int`, `optional`, defaults to 1):
                Forwarded to :obj:`RelikReaderConfig` (new-model branch only).
            training (:obj:`bool`, `optional`, defaults to :obj:`False`):
                Forwarded to :obj:`RelikReaderConfig` (new-model branch only).
            device (:obj:`str` or :obj:`torch.device`, `optional`):
                The device to move the module to. Defaults to CPU.
            precision (:obj:`int`, `optional`, defaults to 32):
                The default precision, used by :meth:`read` when none is passed.
            tokenizer (:obj:`str` or :obj:`tr.PreTrainedTokenizer`, `optional`):
                The tokenizer, or the name/path to load it from. It is resolved
                lazily by the :attr:`tokenizer` property.
            dataset (:obj:`IterableDataset` or :obj:`str`, `optional`):
                Stored as ``self.dataset``; not used by this base class.
            default_reader_class (:obj:`tr.PreTrainedModel` or :obj:`str`, `optional`):
                Overrides the class-level ``default_reader_class``.
            **kwargs:
                Passed to ``from_pretrained`` when an existing reader is loaded.

        Raises:
            :obj:`ValueError`: If no default reader class or no transformer model
                is specified.
        """
        super().__init__()

        self.default_reader_class = default_reader_class or self.default_reader_class
        if self.default_reader_class is None:
            raise ValueError("You must specify a default reader class.")
        # get the callable for the default reader class
        self.default_reader_class: tr.PreTrainedModel = get_callable_from_string(
            self.default_reader_class
        )

        # Fail early with a clear message instead of an AttributeError on
        # `None.config` further down.
        if transformer_model is None:
            raise ValueError("You must specify a transformer model.")

        if isinstance(transformer_model, str):
            # NOTE(review): `trust_remote_code=True` lets the hub repository run its
            # own code while loading the config; only use trusted model names.
            config = AutoConfig.from_pretrained(
                transformer_model, trust_remote_code=True
            )
            if "relik-reader" in config.model_type:
                # an already trained reader: load it as is
                transformer_model = self.default_reader_class.from_pretrained(
                    transformer_model, **kwargs
                )
            else:
                # a plain encoder: build a fresh reader around it
                reader_config = RelikReaderConfig(
                    transformer_model=transformer_model,
                    additional_special_symbols=additional_special_symbols,
                    num_layers=num_layers,
                    activation=activation,
                    linears_hidden_size=linears_hidden_size,
                    use_last_k_layers=use_last_k_layers,
                    training=training,
                )
                transformer_model = self.default_reader_class(reader_config)

        self.relik_reader_model = transformer_model
        self.relik_reader_model_config = self.relik_reader_model.config

        # get the tokenizer (resolved lazily by the `tokenizer` property)
        self._tokenizer = tokenizer

        # and instantiate the dataset class
        self.dataset: IterableDataset | None = dataset

        # move the model to the device
        self.to(device or torch.device("cpu"))

        # set the precision
        self.precision = precision

    def forward(self, **kwargs) -> Dict[str, Any]:
        """
        Runs the wrapped reader model on the given keyword arguments.
        """
        return self.relik_reader_model(**kwargs)

    def _read(self, *args, **kwargs) -> Any:
        """
        Performs the actual reading. Must be implemented by subclasses;
        :meth:`read` calls it with the normalised inputs.
        """
        raise NotImplementedError

    @torch.no_grad()
    @torch.inference_mode()
    def read(
        self,
        text: List[str] | List[List[str]] | None = None,
        samples: List[RelikReaderSample] | None = None,
        input_ids: torch.Tensor | None = None,
        attention_mask: torch.Tensor | None = None,
        token_type_ids: torch.Tensor | None = None,
        prediction_mask: torch.Tensor | None = None,
        special_symbols_mask: torch.Tensor | None = None,
        candidates: List[List[str]] | None = None,
        max_length: int = 1000,
        max_batch_size: int = 128,
        token_batch_size: int = 2048,
        precision: int | str | None = None,
        progress_bar: bool = False,
        *args,
        **kwargs,
    ) -> List[RelikReaderSample] | List[List[RelikReaderSample]]:
        """
        Reads the given text.

        Args:
            text (:obj:`List[str]` or :obj:`List[List[str]]`, `optional`):
                The text to read in tokens. If a list of list of tokens is provided, each
                inner list is considered a sentence. A flat list of tokens is treated
                as a single sentence.
            samples (:obj:`List[RelikReaderSample]`, `optional`):
                The samples to read. If provided, `text` and `candidates` are ignored.
            input_ids (:obj:`torch.Tensor`, `optional`):
                The input ids of the text.
            attention_mask (:obj:`torch.Tensor`, `optional`):
                The attention mask of the text.
            token_type_ids (:obj:`torch.Tensor`, `optional`):
                The token type ids of the text.
            prediction_mask (:obj:`torch.Tensor`, `optional`):
                The prediction mask of the text.
            special_symbols_mask (:obj:`torch.Tensor`, `optional`):
                The special symbols mask of the text.
            candidates (:obj:`List[List[str]]`, `optional`):
                The candidates of the text, one entry per sentence. When `text` is a
                flat list of tokens, `candidates` is wrapped in the same way and is
                therefore taken as the candidates of that single sentence.
            max_length (:obj:`int`, `optional`, defaults to 1000):
                The maximum length of the text.
            max_batch_size (:obj:`int`, `optional`, defaults to 128):
                The maximum batch size.
            token_batch_size (:obj:`int`, `optional`, defaults to 2048):
                The maximum number of tokens per batch.
            precision (:obj:`int` or :obj:`str`, `optional`):
                The precision to use. If not provided, the precision given to the
                constructor is used.
            progress_bar (:obj:`bool`, `optional`, defaults to :obj:`False`):
                Whether to show a progress bar.

        Returns:
            The predicted labels for each sample.

        Raises:
            :obj:`ValueError`: If none of `text`, `input_ids` and `samples` is given,
                if `text` is given without `candidates` and there is nothing else to
                read from, or if `text` and `candidates` differ in length.
        """
        if text is None and input_ids is None and samples is None:
            raise ValueError(
                "Either `text` or `input_ids` or `samples` must be provided."
            )
        if (input_ids is None and samples is None) and (
            text is None or candidates is None
        ):
            raise ValueError(
                "`text` and `candidates` must be provided to return the predictions when "
                "`input_ids` and `samples` is not provided."
            )

        # Samples are built from `text` only when `candidates` is available too.
        # If `text` comes without candidates at this point, `input_ids` was given
        # (see the check above) and the tensor inputs are used instead.
        if samples is None and text is not None and candidates is not None:
            # A flat list of tokens is a single sentence: wrap both inputs so that
            # they are lists of sentences. `text` may be empty, hence the guard.
            if text and isinstance(text[0], str):  # change to list of text
                text = [text]
                candidates = [candidates]
            # Compare the lengths only after the wrapping above, so that the number
            # of sentences is compared rather than the number of tokens.
            if len(text) != len(candidates):
                raise ValueError("`text` and `candidates` must have the same length.")
            samples = [
                RelikReaderSample(tokens=t, candidates=c)
                for t, c in zip(text, candidates)
            ]

        return self._read(
            samples,
            input_ids,
            attention_mask,
            token_type_ids,
            prediction_mask,
            special_symbols_mask,
            max_length,
            max_batch_size,
            token_batch_size,
            precision or self.precision,
            progress_bar,
            *args,
            **kwargs,
        )

    @property
    def device(self) -> torch.device:
        """
        The device of the model, taken from its first parameter.
        """
        return next(self.parameters()).device

    @property
    def tokenizer(self) -> tr.PreTrainedTokenizer:
        """
        The tokenizer.

        It is loaded on first access and then cached: from the name/path given to
        the constructor if that was a string, otherwise from the ``name_or_path``
        of the reader model's config.
        """
        # An actual tokenizer object is already available: nothing to load.
        if self._tokenizer is not None and not isinstance(self._tokenizer, str):
            return self._tokenizer
        # `_tokenizer` is a name/path, an empty string, or None.
        name_or_path = self._tokenizer or self.relik_reader_model.config.name_or_path
        self._tokenizer = tr.AutoTokenizer.from_pretrained(name_or_path)
        return self._tokenizer

    def save_pretrained(
        self,
        output_dir: str | os.PathLike,
        model_name: str | None = None,
        push_to_hub: bool = False,
        **kwargs,
    ) -> None:
        """
        Saves the model to the given path.

        Args:
            output_dir (`str` or :obj:`os.PathLike`):
                The path to save the model to.
            model_name (`str`, `optional`):
                The name of the model. If not provided, the model will be saved as
                `default_reader_class.__name__`.
            push_to_hub (`bool`, `optional`, defaults to `False`):
                Whether to push the model to the HuggingFace Hub.
            **kwargs:
                Additional keyword arguments to pass to the `save_pretrained` method
                of both the model and the tokenizer.
        """
        # create the output directory
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        model_name = model_name or self.default_reader_class.__name__
        save_path = output_dir / model_name

        logger.info(f"Saving reader to {save_path}")

        # save the model
        self.relik_reader_model.register_for_auto_class()
        self.relik_reader_model.save_pretrained(
            save_path, push_to_hub=push_to_hub, **kwargs
        )

        # the tokenizer goes in the same directory as the model
        tokenizer = self.tokenizer
        if tokenizer is not None:
            logger.info("Saving also the tokenizer")
            tokenizer.save_pretrained(save_path, push_to_hub=push_to_hub, **kwargs)
|