# Spaces: Sleeping
import re | |
from fastapi import FastAPI, HTTPException | |
from fastapi.responses import StreamingResponse | |
from fastapi.responses import JSONResponse | |
from pydantic import BaseModel | |
from huggingface_hub import InferenceClient | |
import uvicorn | |
from typing import Generator | |
import json # Asegúrate de que esta línea esté al principio del archivo | |
import nltk | |
import os | |
import google.protobuf # This line should execute without errors if protobuf is installed correctly | |
import sentencepiece | |
from transformers import pipeline, AutoTokenizer,AutoModelForSeq2SeqLM | |
import spacy | |
# Register the directory named by the NLTK_DATA environment variable as an
# extra NLTK resource search path. os.getenv returns None when the variable
# is unset; appending None would put a non-string entry on the search path,
# so the append is skipped in that case.
_nltk_data_dir = os.getenv('NLTK_DATA')
if _nltk_data_dir is not None:
    nltk.data.path.append(_nltk_data_dir)

# FastAPI application object; passed to uvicorn.run when this file is
# executed as a script.
app = FastAPI()

# Initialize the InferenceClient with your model
client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1")
class Item(BaseModel):
    """Request payload describing one text-generation call.

    The sampling fields are forwarded as keyword arguments to
    ``client.text_generation`` by ``generate_stream``.
    """

    # Text of the current user turn.
    prompt: str
    # Earlier conversation turns; ``format_prompt`` reads each entry's
    # "role" and "content" keys.
    history: list
    # Joined in front of ``prompt`` (separated by ", ") before formatting.
    system_prompt: str
    # Sampling temperature.
    temperature: float = 0.8
    # Requested upper bound on generated tokens; ``generate_stream`` may
    # lower it to fit its total token budget.
    max_new_tokens: int = 4000
    # Nucleus-sampling parameter.
    top_p: float = 0.15
    # Repetition penalty passed to the generation call.
    repetition_penalty: float = 1.0
def format_prompt(current_prompt, history):
    """Build a Mixtral-Instruct prompt from the history and the current turn.

    The inference client in this file targets
    ``mistralai/Mixtral-8x7B-Instruct-v0.1``, whose instruction format is
    ``<s>[INST] user [/INST] answer</s>[INST] user [/INST]``. User turns are
    wrapped in ``[INST] ... [/INST]``; assistant turns are followed by the
    end-of-sequence marker ``</s>``. The prompt ends on an open instruction
    so the model continues with its answer; no ``</s>`` is appended after
    the final user turn.

    Args:
        current_prompt: Text of the new user turn.
        history: Iterable of mappings with ``"role"`` and ``"content"`` keys.
            Entries whose role is neither ``"user"`` nor ``"assistant"`` are
            ignored.

    Returns:
        The formatted prompt string.

    Raises:
        KeyError: If a history entry lacks ``"role"``, or a user/assistant
            entry lacks ``"content"``.
    """
    # Collect the pieces and join once instead of repeated string +=.
    parts = ["<s>"]
    for entry in history:
        role = entry["role"]
        if role == "user":
            parts.append(f"[INST] {entry['content']} [/INST]")
        elif role == "assistant":
            parts.append(f" {entry['content']}</s>")
    parts.append(f"[INST] {current_prompt} [/INST]")
    return "".join(parts)
def generate_stream(item: Item) -> Generator[bytes, None, None]:
    """Stream generated tokens for ``item`` as newline-delimited JSON bytes.

    The system prompt and user prompt are joined with ", " and formatted
    together with the history. The number of new tokens requested is capped
    so that the estimated prompt length plus the new tokens stays within a
    32768-token budget, and is never below 1.

    Args:
        item: Generation request (prompts, history and sampling settings).

    Yields:
        One UTF-8 encoded JSON object per streamed response, terminated by
        ``\\n``, with keys ``"text"`` (the token text) and ``"complete"``
        (true once ``generated_text`` is set on the response).
    """
    token_budget = 32768

    prompt_text = format_prompt(f"{item.system_prompt}, {item.prompt}", item.history)

    # NLTK word tokenization is used as an estimate of the prompt length.
    # NOTE(review): this is presumably not the model's own tokenizer count.
    estimated_prompt_tokens = len(nltk.word_tokenize(prompt_text))
    remaining_tokens = token_budget - estimated_prompt_tokens
    new_token_limit = max(1, min(item.max_new_tokens, remaining_tokens))

    token_stream = client.text_generation(
        prompt_text,
        temperature=item.temperature,
        max_new_tokens=new_token_limit,
        top_p=item.top_p,
        repetition_penalty=item.repetition_penalty,
        do_sample=True,
        seed=42,
        stream=True,
        details=True,
    )

    for response in token_stream:
        line = json.dumps(
            {
                "text": response.token.text,
                "complete": response.generated_text is not None,
            }
        )
        yield line.encode("utf-8") + b"\n"
class SummarizeRequest(BaseModel):
    """Request model with a single ``text`` field.

    NOTE(review): not referenced by any other code visible in this file.
    """

    # Input text.
    text: str
async def generate_text(item: Item):
    """Return a streaming NDJSON response for a generation request.

    NOTE(review): no route decorator is visible on this handler — confirm
    how it is registered with ``app``.
    """
    ndjson_chunks = generate_stream(item)
    return StreamingResponse(ndjson_chunks, media_type="application/x-ndjson")
# spaCy pipeline shared by reduce_tokens and segment_text; loaded once at
# import time.
_SPACY_MODEL_NAME = "en_core_web_sm"
nlp = spacy.load(_SPACY_MODEL_NAME)
class TextRequest(BaseModel):
    """Request body accepted by ``summarize``."""

    # Raw text to preprocess, segment, classify and reduce.
    text: str
def preprocess_text(text: str) -> str:
    """Strip punctuation from ``text`` and normalize its whitespace.

    Every character that is neither a word character (letters, digits,
    underscore) nor whitespace is removed. Runs of whitespace are then
    collapsed to a single space and the ends are trimmed.

    Punctuation is removed before whitespace is collapsed: removing a
    free-standing symbol such as the "-" in "a - b" leaves two adjacent
    spaces, which the whitespace pass then merges.

    Args:
        text: Input string.

    Returns:
        The cleaned string, with no leading, trailing or repeated spaces.
    """
    without_punctuation = re.sub(r'[^\w\s]', '', text)
    return re.sub(r'\s+', ' ', without_punctuation).strip()
def reduce_tokens(text: str):
    """Keep the sentences of ``text`` selected by a simple heuristic.

    A sentence is kept when at least one of its tokens has the dependency
    label ``'ROOT'``. The kept sentences are joined with single spaces and
    the result is parsed again to count its tokens.

    NOTE(review): a parsed sentence presumably always has a ROOT token, in
    which case this keeps every sentence — confirm the intended filter.

    Args:
        text: Text to reduce.

    Returns:
        A ``(reduced_text, token_count)`` tuple, where ``token_count`` is the
        number of spaCy tokens in ``reduced_text``.
    """
    parsed = nlp(text)
    kept_sentences = [
        sentence.text
        for sentence in parsed.sents
        if any(token.dep_ == 'ROOT' for token in sentence)
    ]
    reduced_text = ' '.join(kept_sentences)

    # Re-parse the joined text so the count reflects the reduced text itself.
    return reduced_text, len(nlp(reduced_text))
def segment_text(text: str, max_tokens=500):  # Slightly less than 512 for safety
    """Split ``text`` into segments of whole sentences.

    Sentences come from spaCy's sentence iterator and are packed greedily
    in order. Length is measured as a whitespace-separated word count. A
    sentence that would push the running count past ``max_tokens`` starts a
    new segment, so a single sentence longer than ``max_tokens`` becomes a
    segment of its own.

    Args:
        text: Text to split.
        max_tokens: Maximum word count per segment.

    Returns:
        List of segment strings, each made of sentences joined by a space.
    """
    segments = []
    pending = []
    pending_words = 0

    for sent in nlp(text).sents:
        sentence = sent.text.strip()
        word_count = len(sentence.split())  # Simple word count

        if pending_words + word_count <= max_tokens:
            pending.append(sentence)
            pending_words += word_count
            continue

        # Sentence does not fit: flush what is pending and start a new
        # segment with this sentence.
        if pending:
            segments.append(' '.join(pending))
        pending = [sentence]
        pending_words = word_count

    # Flush the trailing segment, if any.
    if pending:
        segments.append(' '.join(pending))
    return segments
# Text-classification pipeline shared by classify_segments and summarize;
# built once at import time.
_CLASSIFIER_MODEL_NAME = "distilbert-base-uncased-finetuned-sst-2-english"
classifier = pipeline("text-classification", model=_CLASSIFIER_MODEL_NAME)
def classify_segments(segments):
    """Run the module-level ``classifier`` on each segment.

    Args:
        segments: Iterable of text segments.

    Returns:
        List with one classifier result per segment, in input order.
    """
    results = []
    for segment in segments:
        results.append(classifier(segment))
    return results
async def summarize(request: TextRequest):
    """Preprocess, segment, classify and reduce the request text.

    The text is cleaned with ``preprocess_text`` and split with
    ``segment_text``. Each segment is then classified and passed through
    ``reduce_tokens``. A failure on one segment is logged and recorded in
    the output instead of aborting the whole request.

    NOTE(review): no route decorator is visible on this handler — confirm
    how it is registered with ``app``.

    Args:
        request: Body carrying the text to process.

    Returns:
        Dict with two keys:
        ``"classified_segments"``: one classifier result per segment, or
        ``{"error": <message>}`` where classification failed;
        ``"reduced_texts"``: one ``(reduced_text, token_count)`` pair per
        segment, or ``("Error", 0)`` where reduction failed.

    Raises:
        HTTPException: Status 500 with the error text as detail when
            preprocessing, segmentation or any other step outside the
            per-segment handlers fails.
    """
    try:
        # Preprocess and segment the text
        processed_text = preprocess_text(request.text)
        segments = segment_text(processed_text)

        # Classify each segment; a failure is recorded per segment.
        classified_segments = []
        for segment in segments:
            try:
                classified_segments.append(classifier(segment))
            except Exception as e:
                print(f"Error classifying segment: {e}")
                classified_segments.append({"error": str(e)})

        # Reduce each segment; a failure is recorded per segment.
        reduced_texts = []
        for segment in segments:
            try:
                reduced_text, token_count = reduce_tokens(segment)
                reduced_texts.append((reduced_text, token_count))
            except Exception as e:
                print(f"Error during token reduction: {e}")
                reduced_texts.append(("Error", 0))

        return {
            "classified_segments": classified_segments,
            "reduced_texts": reduced_texts,
        }
    except Exception as e:
        # This handler covers the whole request, not only token reduction,
        # so the log message names the request rather than a single step.
        print(f"Error during summarization: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
if __name__ == "__main__":
    # When executed as a script, serve the app on all interfaces, port 8000.
    uvicorn.run(app, port=8000, host="0.0.0.0")