|
from fastapi import FastAPI, HTTPException, Header, Depends, Request, Response, Query |
|
from fastapi.responses import JSONResponse |
|
from fastapi.security import HTTPBasic, HTTPBasicCredentials |
|
from fastapi.exceptions import RequestValidationError |
|
import asyncio |
|
from typing import Optional, List |
|
from pydantic import BaseModel, ValidationError |
|
import pandas as pd |
|
import numpy as np |
|
import os |
|
from filesplit.merge import Merge |
|
import tensorflow as tf |
|
import string |
|
import re |
|
import json |
|
import csv |
|
import tiktoken |
|
from sklearn.preprocessing import LabelEncoder |
|
from tensorflow import keras |
|
from keras_nlp.layers import TransformerEncoder |
|
from tensorflow.keras import layers |
|
from tensorflow.keras.preprocessing.sequence import pad_sequences |
|
from tensorflow.keras.utils import plot_model |
|
|
|
# FastAPI application object; the @api.get(...) decorators below register
# their routes on it.
api = FastAPI()

# Relative directory names: dataPath is the prefix for model, vocabulary and
# language-id files; imagePath is where the model plot PNG is written.
dataPath, imagePath = "data", "images"
|
|
|
|
|
# Characters deleted by custom_standardization: ASCII punctuation plus "¿",
# with the square brackets left out (presumably so the "[start]" / "[end]"
# markers used by the decoders survive standardization).
strip_chars = "".join(
    char for char in string.punctuation + "¿" if char not in "[]"
)
|
|
|
def custom_standardization(input_string):
    """Normalise text before tokenisation.

    Lower-cases the input with ``tf.strings.lower``, replaces "à" with "a",
    then deletes every character listed in the module-level ``strip_chars``.
    Used as the ``standardize`` callable of the TextVectorization layers.
    """
    # Character class built from strip_chars, escaped for use in a regex.
    removal_pattern = "[" + re.escape(strip_chars) + "]"
    text = tf.strings.lower(input_string)
    text = tf.strings.regex_replace(text, "[à]", "a")
    return tf.strings.regex_replace(text, removal_pattern, "")
|
|
|
def load_vocab(file_path):
    """Read a vocabulary file holding one token per line.

    Args:
        file_path: Path of a UTF-8 text file.

    Returns:
        The list of lines in file order. Empty lines are kept because an
        empty string can be a legitimate vocabulary entry. An empty file
        yields an empty list.
    """
    with open(file_path, "r", encoding="utf-8") as file:
        content = file.read()

    if not content:
        return []
    # Only drop the terminating newline when it is really there; slicing off
    # the last element unconditionally lost the final token of files that do
    # not end with "\n".
    if content.endswith("\n"):
        content = content[:-1]
    return content.split("\n")
|
|
|
|
|
def decode_sequence_rnn(input_sentence, src, tgt):
    """Greedily decode a translation with the model in the global ``translation_model``.

    The caller must set ``translation_model`` before calling this function.

    Args:
        input_sentence: Source sentence as a plain string.
        src: Source language code; selects ``<dataPath>/vocab_<src>.txt``.
        tgt: Target language code; selects ``<dataPath>/vocab_<tgt>.txt``.

    Returns:
        The decoded tokens joined by single spaces, without the "[start]"
        and "[end]" markers.
    """
    global translation_model

    vocab_size = 15000
    sequence_length = 50
    max_decoded_sentence_length = 50

    # NOTE(review): both vectorization layers (and their vocabulary files) are
    # rebuilt on every call; caching them per language would avoid that cost.
    source_vectorization = layers.TextVectorization(
        max_tokens=vocab_size,
        output_mode="int",
        output_sequence_length=sequence_length,
        standardize=custom_standardization,
        vocabulary=load_vocab(dataPath+"/vocab_"+src+".txt"),
    )
    target_vectorization = layers.TextVectorization(
        max_tokens=vocab_size,
        output_mode="int",
        output_sequence_length=sequence_length + 1,
        standardize=custom_standardization,
        vocabulary=load_vocab(dataPath+"/vocab_"+tgt+".txt"),
    )

    # Maps a predicted class index back to its token string.
    tgt_index_lookup = dict(enumerate(target_vectorization.get_vocabulary()))
    tokenized_input_sentence = source_vectorization([input_sentence])

    # Tokens are collected in a list so the result never has to be recovered
    # by slicing fixed character counts off a string: the old
    # decoded_sentence[8:-6] cut 6 real characters whenever the loop ended
    # without producing "[end]".
    decoded_tokens = []
    for i in range(max_decoded_sentence_length):
        decoded_sentence = " ".join(["[start]", *decoded_tokens])
        tokenized_target_sentence = target_vectorization([decoded_sentence])
        next_token_predictions = translation_model.predict(
            [tokenized_input_sentence, tokenized_target_sentence], verbose=0)
        # Position i holds the prediction for the token following the i
        # tokens decoded so far (plus "[start]").
        sampled_token_index = int(np.argmax(next_token_predictions[0, i, :]))
        sampled_token = tgt_index_lookup[sampled_token_index]
        if sampled_token == "[end]":
            break
        decoded_tokens.append(sampled_token)
    return " ".join(decoded_tokens)
|
|
|
|
|
|
|
|
|
|
|
class TransformerDecoder(layers.Layer):
    """Decoder block: masked self-attention, attention over the encoder output, dense projection.

    Each of the three stages is followed by a residual addition and its own
    LayerNormalization. The class is passed as a custom object when the saved
    transformer models are loaded.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        # Constructor arguments are stored so get_config() can return them.
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        # Sub-layers keep their original attribute names and creation order.
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential([
            layers.Dense(dense_dim, activation="relu"),
            layers.Dense(embed_dim),
        ])
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        config = super().get_config()
        config.update(
            embed_dim=self.embed_dim,
            num_heads=self.num_heads,
            dense_dim=self.dense_dim,
        )
        return config

    def get_causal_attention_mask(self, inputs):
        """Build an int32 lower-triangular mask tiled along the first dimension of ``inputs``.

        Entry (i, j) is 1 when i >= j, else 0. The result has shape
        (dim0, dim1, dim1), where dim0 and dim1 are the first two dimensions
        of ``inputs``.
        """
        dims = tf.shape(inputs)
        n_batch, n_steps = dims[0], dims[1]
        rows = tf.range(n_steps)[:, tf.newaxis]
        cols = tf.range(n_steps)
        lower_triangle = tf.cast(rows >= cols, dtype="int32")
        lower_triangle = tf.reshape(lower_triangle, (1, dims[1], dims[1]))
        # Tile multiples: [n_batch, 1, 1].
        repeats = tf.concat(
            [tf.expand_dims(n_batch, -1),
             tf.constant([1, 1], dtype=tf.int32)],
            axis=0)
        return tf.tile(lower_triangle, repeats)

    def call(self, inputs, encoder_outputs, mask=None):
        """Run the block on ``inputs`` while attending to ``encoder_outputs``.

        ``mask``, when given, is combined with the causal mask (element-wise
        minimum) and used for the attention over the encoder output; the
        self-attention always uses the causal mask alone.
        """
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is None:
            padding_mask = None
        else:
            padding_mask = tf.minimum(
                tf.cast(mask[:, tf.newaxis, :], dtype="int32"),
                causal_mask)

        self_attended = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=causal_mask)
        self_attended = self.layernorm_1(inputs + self_attended)

        cross_attended = self.attention_2(
            query=self_attended,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        cross_attended = self.layernorm_2(self_attended + cross_attended)

        projected = self.dense_proj(cross_attended)
        return self.layernorm_3(cross_attended + projected)
|
|
|
class PositionalEmbedding(layers.Layer):
    """Sum of a token embedding and a learned embedding of each token's position.

    The class is passed as a custom object when the saved transformer models
    are loaded.
    """

    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        # Sub-layers keep their original attribute names and creation order.
        self.token_embeddings = layers.Embedding(
            input_dim=input_dim, output_dim=output_dim)
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=output_dim)
        # Constructor arguments are stored so get_config() can return them.
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim

    def call(self, inputs):
        """Embed ``inputs`` and add the embedding of positions 0..length-1 (last axis)."""
        n_positions = tf.shape(inputs)[-1]
        position_ids = tf.range(start=0, limit=n_positions, delta=1)
        return (self.token_embeddings(inputs)
                + self.position_embeddings(position_ids))

    def compute_mask(self, inputs, mask=None):
        """Return a boolean mask that is False wherever ``inputs`` equals 0."""
        return tf.math.not_equal(inputs, 0)

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        config = super().get_config()
        config.update(
            output_dim=self.output_dim,
            sequence_length=self.sequence_length,
            input_dim=self.input_dim,
        )
        return config
|
|
|
def decode_sequence_transf(input_sentence, src, tgt):
    """Greedily decode a translation with the Transformer in the global ``translation_model``.

    The caller must set ``translation_model`` before calling this function.

    Args:
        input_sentence: Source sentence as a plain string.
        src: Source language code; selects ``<dataPath>/vocab_<src>.txt``.
        tgt: Target language code; selects ``<dataPath>/vocab_<tgt>.txt``.

    Returns:
        The decoded tokens joined by single spaces, without the "[start]"
        and "[end]" markers.
    """
    global translation_model

    vocab_size = 15000
    sequence_length = 30
    # The decoder input is cut to ``sequence_length`` positions below, so the
    # predictions only have that many time steps. Looping further (the limit
    # used to be a flat 50) indexed past the end of the prediction tensor as
    # soon as a sentence did not finish within 30 tokens.
    max_decoded_sentence_length = min(50, sequence_length)

    # NOTE(review): both vectorization layers (and their vocabulary files) are
    # rebuilt on every call; caching them per language would avoid that cost.
    source_vectorization = layers.TextVectorization(
        max_tokens=vocab_size,
        output_mode="int",
        output_sequence_length=sequence_length,
        standardize=custom_standardization,
        vocabulary=load_vocab(dataPath+"/vocab_"+src+".txt"),
    )
    target_vectorization = layers.TextVectorization(
        max_tokens=vocab_size,
        output_mode="int",
        output_sequence_length=sequence_length + 1,
        standardize=custom_standardization,
        vocabulary=load_vocab(dataPath+"/vocab_"+tgt+".txt"),
    )

    # Maps a predicted class index back to its token string.
    tgt_index_lookup = dict(enumerate(target_vectorization.get_vocabulary()))
    tokenized_input_sentence = source_vectorization([input_sentence])

    # Tokens are collected in a list so the result never has to be recovered
    # by slicing fixed character counts off a string: the old
    # decoded_sentence[8:-6] cut 6 real characters whenever the loop ended
    # without producing "[end]".
    decoded_tokens = []
    for i in range(max_decoded_sentence_length):
        decoded_sentence = " ".join(["[start]", *decoded_tokens])
        # Drop the last position so the decoder input has sequence_length steps.
        tokenized_target_sentence = target_vectorization(
            [decoded_sentence])[:, :-1]
        predictions = translation_model(
            [tokenized_input_sentence, tokenized_target_sentence])
        sampled_token_index = int(np.argmax(predictions[0, i, :]))
        sampled_token = tgt_index_lookup[sampled_token_index]
        if sampled_token == "[end]":
            break
        decoded_tokens.append(sampled_token)
    return " ".join(decoded_tokens)
|
|
|
|
|
|
|
def load_rnn():
    """Rebuild, load and compile the two RNN translation models.

    For each direction the split parts under ``dataPath`` are merged back into
    a single ``.h5`` file (parts are kept, ``cleanup=False``), the model is
    loaded from that file, and then compiled.

    Returns:
        Tuple ``(rnn_en_fr, rnn_fr_en)``.
    """
    # (directory holding the split parts, name of the merged model file)
    sources = (
        ("/rnn_en-fr_split", "seq2seq_rnn-model-en-fr.h5"),
        ("/rnn_fr-en_split", "seq2seq_rnn-model-fr-en.h5"),
    )

    for split_dir, model_file in sources:
        Merge(dataPath+split_dir, dataPath, model_file).merge(cleanup=False)

    models = [
        keras.models.load_model(dataPath+"/"+model_file)
        for _, model_file in sources
    ]
    for model in models:
        model.compile(
            optimizer="rmsprop",
            loss="sparse_categorical_crossentropy",
            metrics=["accuracy"],
        )

    return models[0], models[1]
|
|
|
def load_transformer():
    """Load and compile the two Transformer translation models.

    The models are read from ``<dataPath>/transformer-model-*.h5`` with the
    custom layers registered, the split weight files are merged back into
    ``*.weights.h5`` files, and both models are compiled.

    Returns:
        Tuple ``(transformer_en_fr, transformer_fr_en)``.
    """
    custom_objects = {
        "TransformerDecoder": TransformerDecoder,
        "PositionalEmbedding": PositionalEmbedding,
    }
    # Paths are built from dataPath, like load_rnn does, instead of repeating
    # the literal directory name.
    with keras.saving.custom_object_scope(custom_objects):
        transformer_en_fr = keras.models.load_model(
            dataPath+"/transformer-model-en-fr.h5")
        transformer_fr_en = keras.models.load_model(
            dataPath+"/transformer-model-fr-en.h5")

    # NOTE(review): the merged *.weights.h5 files are produced here but never
    # loaded into the models in this function — confirm whether a
    # load_weights() call is missing or the merge is consumed elsewhere.
    Merge(dataPath+"/transf_en-fr_weight_split", dataPath,
          "transformer-model-en-fr.weights.h5").merge(cleanup=False)
    Merge(dataPath+"/transf_fr-en_weight_split", dataPath,
          "transformer-model-fr-en.weights.h5").merge(cleanup=False)

    for model in (transformer_en_fr, transformer_fr_en):
        model.compile(
            optimizer="rmsprop",
            loss="sparse_categorical_crossentropy",
            metrics=["accuracy"],
        )

    return transformer_en_fr, transformer_fr_en
|
|
|
# All four translation models are loaded once, at import time, and kept as
# module-level names that the route handlers below read.
(rnn_en_fr, rnn_fr_en) = load_rnn()
(transformer_en_fr, transformer_fr_en) = load_transformer()
|
|
|
|
|
|
|
def encode_text(textes):
    """Convert a batch of texts into fixed-length sequences of token ids.

    Args:
        textes: List of strings.

    Returns:
        The result of ``pad_sequences`` on the ids produced by the global
        ``tokenizer``, with ``maxlen=250`` and padding added at the end.
    """
    token_ids = tokenizer.encode_batch(textes)
    return pad_sequences(token_ids, maxlen=250, padding='post')
|
|
|
def read_list_lan():
    """Return the first row of ``<dataPath>/multilingue/lan_code.csv`` as a list of strings."""
    csv_path = dataPath+'/multilingue/lan_code.csv'
    with open(csv_path, 'r') as fichier_csv:
        # Only the first CSV record is read; the rest of the file is ignored.
        return next(csv.reader(fichier_csv))
|
|
|
def init_dl_identifier():
    """Initialise the globals used by the deep-learning language identifier.

    Sets ``tokenizer`` (tiktoken "cl100k_base"), ``lan_to_language`` (JSON
    mapping read from disk), ``label_encoder`` (fitted on the language codes
    returned by ``read_list_lan``) and ``dl_model`` (Keras model rebuilt from
    its split parts and loaded).
    """
    global tokenizer, dl_model, label_encoder, lan_to_language

    tokenizer = tiktoken.get_encoding("cl100k_base")

    mapping_path = dataPath+'/multilingue/lan_to_language.json'
    with open(mapping_path, 'r') as fichier:
        lan_to_language = json.load(fichier)

    label_encoder = LabelEncoder()
    codes = read_list_lan()
    # The looked-up names are not used afterwards; the lookup still raises
    # KeyError if a code is missing from lan_to_language.
    _names = [lan_to_language[code] for code in codes]
    label_encoder.fit(codes)

    Merge(dataPath+"/dl_id_lang_split", dataPath,
          "dl_tiktoken_id_language_model.h5").merge(cleanup=False)
    dl_model = keras.models.load_model(
        dataPath+"/dl_tiktoken_id_language_model.h5")

    status = "dl_model OK" if dl_model is not None else "dl_model vide"
    print(status)


# Initialise the language identifier once, at import time.
init_dl_identifier()
|
|
|
def lang_id_dl(sentences):
    """Identify the language of one sentence or of a list of sentences.

    Args:
        sentences: A single string, or a list of strings.

    Returns:
        For a single string: the entry of ``lan_to_language`` for the
        predicted code. For a list: the list of predicted language codes
        (the entries of ``label_encoder.classes_``).
        NOTE(review): the list branch returns raw codes while the string
        branch returns the mapped name — confirm this asymmetry is intended.
    """
    # Lazy initialisation in case the module-level setup has not run.
    if 'dl_model' not in globals():
        init_dl_identifier()

    # isinstance replaces a substring test on the type's repr, which matched
    # any type whose name merely contains "str".
    is_single = isinstance(sentences, str)
    batch = [sentences] if is_single else sentences

    predictions = dl_model.predict(encode_text(batch))
    predicted_labels_encoded = np.argmax(predictions, axis=1)
    predicted_languages = label_encoder.classes_[predicted_labels_encoded]

    if is_single:
        return lan_to_language[predicted_languages[0]]
    return list(predicted_languages)
|
|
|
|
|
|
|
@api.get('/', name="Vérification que l'API fonctionne")
def check_api():
    # Health check. (Documented with comments, not a docstring, because
    # FastAPI publishes handler docstrings as the endpoint description.)
    #
    # The models and the language identifier are already loaded at import
    # time. This handler used to call load_rnn(), load_transformer() and
    # init_dl_identifier() again on every probe, re-merging and re-reading
    # every model file from disk and then discarding the translation models
    # it had just loaded. Reaching this line already proves the module
    # imported, so the probe simply answers.
    return {'message': "L'API fonctionne"}
|
|
|
@api.get('/small_vocab/rnn', name="Traduction par RNN")
async def trad_rnn(lang_tgt: str,
                   texte: str):
    # Translate `texte` with the RNN model for the requested target language.
    # lang_tgt == 'en' translates French -> English; any other value
    # translates English -> French. Returns the decoded sentence.
    # (Comments are used instead of a docstring because FastAPI publishes
    # handler docstrings as the endpoint description.)
    #
    # decode_sequence_rnn reads the model from this global, so it is set
    # right before the call.
    global translation_model

    # The former "'translation_model' not in globals()" guard was removed: it
    # called load_rnn() and load_transformer() without keeping their return
    # values, so the first request reloaded every model from disk for nothing.
    # The models used below are the ones loaded at import time.
    if lang_tgt == 'en':
        translation_model = rnn_fr_en
        return decode_sequence_rnn(texte, "fr", "en")

    translation_model = rnn_en_fr
    return decode_sequence_rnn(texte, "en", "fr")
|
|
|
@api.get('/small_vocab/transformer', name="Traduction par Transformer")
async def trad_transformer(lang_tgt: str,
                           texte: str):
    # Translate `texte` with the Transformer model for the requested target
    # language. lang_tgt == 'en' translates French -> English; any other
    # value translates English -> French. Returns the decoded sentence.
    # (Comments are used instead of a docstring because FastAPI publishes
    # handler docstrings as the endpoint description.)
    #
    # decode_sequence_transf reads the model from this global, so it is set
    # right before the call.
    global translation_model

    # The former "'translation_model' not in globals()" guard was removed: it
    # called load_rnn() and load_transformer() without keeping their return
    # values, so the first request reloaded every model from disk for nothing.
    # The models used below are the ones loaded at import time.
    if lang_tgt == 'en':
        translation_model = transformer_fr_en
        return decode_sequence_transf(texte, "fr", "en")

    translation_model = transformer_en_fr
    return decode_sequence_transf(texte, "en", "fr")
|
|
|
@api.get('/small_vocab/plot_model', name="Affiche le modèle")
def affiche_modele(lang_tgt: str,
                   model_type: str):
    # Render the architecture of the selected model and return it as a PNG.
    # lang_tgt == 'en' selects the French -> English models, anything else
    # the English -> French ones; model_type == "rnn" selects the RNN,
    # anything else the Transformer.
    # (Comments are used instead of a docstring because FastAPI publishes
    # handler docstrings as the endpoint description.)
    #
    # The model is held in a local variable. This handler is a plain `def`,
    # so it can run concurrently with the translation handlers; it used to
    # overwrite the global `translation_model`, which the decoding loops
    # re-read on every step, so a plot request could swap the model in the
    # middle of a translation.
    if lang_tgt == 'en':
        selected_model = rnn_fr_en if model_type == "rnn" else transformer_fr_en
    else:
        selected_model = rnn_en_fr if model_type == "rnn" else transformer_en_fr

    plot_path = imagePath+'/model_plot.png'
    plot_model(
        selected_model,
        show_shapes=True,
        show_layer_names=True,
        show_layer_activations=True,
        rankdir='TB',
        to_file=plot_path,
    )

    # NOTE(review): the plot is written to one fixed file, so two concurrent
    # requests can still overwrite each other's image before it is read back.
    with open(plot_path, "rb") as image_file:
        image_data = image_file.read()

    return Response(content=image_data, media_type="image/png")
|
|
|
@api.get('/lang_id_dl', name="Id de langue par DL")
async def language_id_dl(
    sentence: List[str] = Query(..., min_length=1),
):
    # Thin HTTP wrapper: forwards the required `sentence` query values to
    # lang_id_dl and returns its result unchanged.
    # (Comments are used instead of a docstring because FastAPI publishes
    # handler docstrings as the endpoint description.)
    identified = lang_id_dl(sentence)
    return identified
|
|