Demosthene-OR's picture
Update main_dl.py
1157ef0
raw
history blame
13.7 kB
from fastapi import FastAPI, HTTPException, Header, Depends, Request, Response, Query
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from fastapi.exceptions import RequestValidationError
import asyncio
from typing import Optional, List
from pydantic import BaseModel, ValidationError
import pandas as pd
import numpy as np
import os
from filesplit.merge import Merge
import tensorflow as tf
import string
import re
import json
import csv
import tiktoken
from sklearn.preprocessing import LabelEncoder
from tensorflow import keras
# import keras
from keras_nlp.layers import TransformerEncoder
from tensorflow.keras import layers
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import plot_model
api = FastAPI()
dataPath = "data"
imagePath = "images"
# ===== Keras ====
strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[", "")
strip_chars = strip_chars.replace("]", "")
def custom_standardization(input_string):
lowercase = tf.strings.lower(input_string)
lowercase=tf.strings.regex_replace(lowercase, "[à]", "a")
return tf.strings.regex_replace(
lowercase, f"[{re.escape(strip_chars)}]", "")
def load_vocab(file_path):
with open(file_path, "r", encoding="utf-8") as file:
return file.read().split('\n')[:-1]
def decode_sequence_rnn(input_sentence, src, tgt):
global translation_model
vocab_size = 15000
sequence_length = 50
source_vectorization = layers.TextVectorization(
max_tokens=vocab_size,
output_mode="int",
output_sequence_length=sequence_length,
standardize=custom_standardization,
vocabulary = load_vocab(dataPath+"/vocab_"+src+".txt"),
)
target_vectorization = layers.TextVectorization(
max_tokens=vocab_size,
output_mode="int",
output_sequence_length=sequence_length + 1,
standardize=custom_standardization,
vocabulary = load_vocab(dataPath+"/vocab_"+tgt+".txt"),
)
tgt_vocab = target_vectorization.get_vocabulary()
tgt_index_lookup = dict(zip(range(len(tgt_vocab)), tgt_vocab))
max_decoded_sentence_length = 50
tokenized_input_sentence = source_vectorization([input_sentence])
decoded_sentence = "[start]"
for i in range(max_decoded_sentence_length):
tokenized_target_sentence = target_vectorization([decoded_sentence])
next_token_predictions = translation_model.predict(
[tokenized_input_sentence, tokenized_target_sentence], verbose=0)
sampled_token_index = np.argmax(next_token_predictions[0, i, :])
sampled_token = tgt_index_lookup[sampled_token_index]
decoded_sentence += " " + sampled_token
if sampled_token == "[end]":
break
return decoded_sentence[8:-6]
# ===== Enf of Keras ====
# ===== Transformer section ====
class TransformerDecoder(layers.Layer):
def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
super().__init__(**kwargs)
self.embed_dim = embed_dim
self.dense_dim = dense_dim
self.num_heads = num_heads
self.attention_1 = layers.MultiHeadAttention(
num_heads=num_heads, key_dim=embed_dim)
self.attention_2 = layers.MultiHeadAttention(
num_heads=num_heads, key_dim=embed_dim)
self.dense_proj = keras.Sequential(
[layers.Dense(dense_dim, activation="relu"),
layers.Dense(embed_dim),]
)
self.layernorm_1 = layers.LayerNormalization()
self.layernorm_2 = layers.LayerNormalization()
self.layernorm_3 = layers.LayerNormalization()
self.supports_masking = True
def get_config(self):
config = super().get_config()
config.update({
"embed_dim": self.embed_dim,
"num_heads": self.num_heads,
"dense_dim": self.dense_dim,
})
return config
def get_causal_attention_mask(self, inputs):
input_shape = tf.shape(inputs)
batch_size, sequence_length = input_shape[0], input_shape[1]
i = tf.range(sequence_length)[:, tf.newaxis]
j = tf.range(sequence_length)
mask = tf.cast(i >= j, dtype="int32")
mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
mult = tf.concat(
[tf.expand_dims(batch_size, -1),
tf.constant([1, 1], dtype=tf.int32)], axis=0)
return tf.tile(mask, mult)
def call(self, inputs, encoder_outputs, mask=None):
causal_mask = self.get_causal_attention_mask(inputs)
if mask is not None:
padding_mask = tf.cast(
mask[:, tf.newaxis, :], dtype="int32")
padding_mask = tf.minimum(padding_mask, causal_mask)
else:
padding_mask = mask
attention_output_1 = self.attention_1(
query=inputs,
value=inputs,
key=inputs,
attention_mask=causal_mask)
attention_output_1 = self.layernorm_1(inputs + attention_output_1)
attention_output_2 = self.attention_2(
query=attention_output_1,
value=encoder_outputs,
key=encoder_outputs,
attention_mask=padding_mask,
)
attention_output_2 = self.layernorm_2(
attention_output_1 + attention_output_2)
proj_output = self.dense_proj(attention_output_2)
return self.layernorm_3(attention_output_2 + proj_output)
class PositionalEmbedding(layers.Layer):
def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
super().__init__(**kwargs)
self.token_embeddings = layers.Embedding(
input_dim=input_dim, output_dim=output_dim)
self.position_embeddings = layers.Embedding(
input_dim=sequence_length, output_dim=output_dim)
self.sequence_length = sequence_length
self.input_dim = input_dim
self.output_dim = output_dim
def call(self, inputs):
length = tf.shape(inputs)[-1]
positions = tf.range(start=0, limit=length, delta=1)
embedded_tokens = self.token_embeddings(inputs)
embedded_positions = self.position_embeddings(positions)
return embedded_tokens + embedded_positions
def compute_mask(self, inputs, mask=None):
return tf.math.not_equal(inputs, 0)
def get_config(self):
config = super(PositionalEmbedding, self).get_config()
config.update({
"output_dim": self.output_dim,
"sequence_length": self.sequence_length,
"input_dim": self.input_dim,
})
return config
def decode_sequence_tranf(input_sentence, src, tgt):
global translation_model
vocab_size = 15000
sequence_length = 30
source_vectorization = layers.TextVectorization(
max_tokens=vocab_size,
output_mode="int",
output_sequence_length=sequence_length,
standardize=custom_standardization,
vocabulary = load_vocab(dataPath+"/vocab_"+src+".txt"),
)
target_vectorization = layers.TextVectorization(
max_tokens=vocab_size,
output_mode="int",
output_sequence_length=sequence_length + 1,
standardize=custom_standardization,
vocabulary = load_vocab(dataPath+"/vocab_"+tgt+".txt"),
)
tgt_vocab = target_vectorization.get_vocabulary()
tgt_index_lookup = dict(zip(range(len(tgt_vocab)), tgt_vocab))
max_decoded_sentence_length = 50
tokenized_input_sentence = source_vectorization([input_sentence])
decoded_sentence = "[start]"
for i in range(max_decoded_sentence_length):
tokenized_target_sentence = target_vectorization(
[decoded_sentence])[:, :-1]
predictions = translation_model(
[tokenized_input_sentence, tokenized_target_sentence])
sampled_token_index = np.argmax(predictions[0, i, :])
sampled_token = tgt_index_lookup[sampled_token_index]
decoded_sentence += " " + sampled_token
if sampled_token == "[end]":
break
return decoded_sentence[8:-6]
# ==== End Transforformer section ====
def load_all_data():
merge = Merge( dataPath+"/rnn_en-fr_split", dataPath, "seq2seq_rnn-model-en-fr.h5").merge(cleanup=False)
merge = Merge( dataPath+"/rnn_fr-en_split", dataPath, "seq2seq_rnn-model-fr-en.h5").merge(cleanup=False)
rnn_en_fr = keras.models.load_model(dataPath+"/seq2seq_rnn-model-en-fr.h5") # , compile=False)
rnn_fr_en = keras.models.load_model(dataPath+"/seq2seq_rnn-model-fr-en.h5") # , compile=False)
rnn_en_fr.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
rnn_fr_en.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
custom_objects = {"TransformerDecoder": TransformerDecoder, "PositionalEmbedding": PositionalEmbedding}
with keras.saving.custom_object_scope(custom_objects):
transformer_en_fr = keras.models.load_model( "data/transformer-model-en-fr.h5")
transformer_fr_en = keras.models.load_model( "data/transformer-model-fr-en.h5")
merge = Merge( "data/transf_en-fr_weight_split", "data", "transformer-model-en-fr.weights.h5").merge(cleanup=False)
merge = Merge( "data/transf_fr-en_weight_split", "data", "transformer-model-fr-en.weights.h5").merge(cleanup=False)
transformer_en_fr.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
transformer_fr_en.compile(optimizer="rmsprop", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
return rnn_en_fr, rnn_fr_en, transformer_en_fr, transformer_fr_en
rnn_en_fr, rnn_fr_en, transformer_en_fr, transformer_fr_en = load_all_data()
# ==== Language identifier ====
def encode_text(textes):
global tokenizer
max_length=250
sequences = tokenizer.encode_batch(textes)
return pad_sequences(sequences, maxlen=max_length, padding='post')
def read_list_lan():
with open(dataPath+'/multilingue/lan_code.csv', 'r') as fichier_csv:
reader = csv.reader(fichier_csv)
lan_code = next(reader)
return lan_code
def init_dl_identifier():
global tokenizer, dl_model, label_encoder, lan_to_language
tokenizer = tiktoken.get_encoding("cl100k_base")
# Lisez le contenu du fichier JSON
with open(dataPath+'/multilingue/lan_to_language.json', 'r') as fichier:
lan_to_language = json.load(fichier)
label_encoder = LabelEncoder()
list_lan = read_list_lan()
lan_identified = [lan_to_language[l] for l in list_lan]
label_encoder.fit(list_lan)
merge = Merge(dataPath+"/dl_id_lang_split", dataPath, "dl_tiktoken_id_language_model.h5").merge(cleanup=False)
dl_model = keras.models.load_model(dataPath+"/dl_tiktoken_id_language_model.h5") #, compile=False)
if (dl_model is not None): print("dl_model OK")
else: print("dl_model vide")
return
def lang_id_dl(sentences):
global dl_model, label_encoder, lan_to_language
print("sentences:",sentences)
if "str" in str(type(sentences)): predictions = dl_model.predict(encode_text([sentences]))
else: predictions = dl_model.predict(encode_text(sentences))
# Décodage des prédictions en langues
predicted_labels_encoded = np.argmax(predictions, axis=1)
predicted_languages = label_encoder.classes_[predicted_labels_encoded]
if "str" in str(type(sentences)): return lan_to_language[predicted_languages[0]]
else: return [l for l in predicted_languages]
# ==== Endpoints ====
@api.get('/', name="Vérification que l'API fonctionne")
def check_api():
load_all_data()
init_dl_identifier()
return {'message': "L'API fonctionne"}
@api.get('/small_vocab/rnn', name="Traduction par RNN")
async def trad_rnn(lang_tgt:str,
texte: str):
global translation_model
if (lang_tgt=='en'):
translation_model = rnn_fr_en
return decode_sequence_rnn(texte, "fr", "en")
else:
translation_model = rnn_en_fr
return decode_sequence_rnn(texte, "en", "fr")
@api.get('/small_vocab/transformer', name="Traduction par Transformer")
async def trad_transformer(lang_tgt:str,
texte: str):
global translation_model
if (lang_tgt=='en'):
translation_model = transformer_fr_en
return decode_sequence_tranf(texte, "fr", "en")
else:
translation_model = transformer_en_fr
return decode_sequence_tranf(texte, "en", "fr")
@api.get('/small_vocab/plot_model', name="Affiche le modèle")
def affiche_modele(lang_tgt:str,
model_type: str):
global translation_model
if (lang_tgt=='en'):
if model_type=="rnn":
translation_model = rnn_fr_en
else:
translation_model = transformer_fr_en
else:
if model_type=="rnn":
translation_model = rnn_en_fr
else:
translation_model = transformer_en_fr
plot_model(translation_model, show_shapes=True, show_layer_names=True, show_layer_activations=True,rankdir='TB',to_file=imagePath+'/model_plot.png')
with open(imagePath+'/model_plot.png', "rb") as image_file:
# Lire les données de l'image
image_data = image_file.read()
# Retourner l'image en tant que réponse HTTP avec le type de contenu approprié
return Response(content=image_data, media_type="image/png")
@api.get('/lang_id_dl', name="Id de langue par DL")
def language_id_dl(sentence:List[str] = Query(..., min_length=1)):
return lang_id_dl(sentence)