Spaces:
Sleeping
Sleeping
import gradio as gr | |
import datetime | |
from typing import Dict, List, Union, Optional | |
import random | |
from huggingface_hub import InferenceClient | |
import logging | |
import json | |
# Set up logging | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(levelname)s - %(message)s' | |
) | |
logger = logging.getLogger(__name__) | |
class TravelPlanner: | |
def __init__(self): | |
self.accommodation_types = { | |
'luxury': (200, 500), | |
'mid_range': (100, 200), | |
'budget': (50, 100), | |
'hostel': (20, 50) | |
} | |
self.activity_costs = { | |
'sightseeing': (20, 50), | |
'museum': (15, 25), | |
'adventure_sport': (50, 200), | |
'local_tour': (30, 80), | |
'cultural_experience': (25, 60) | |
} | |
self.meal_costs = { | |
'luxury': (50, 100), | |
'mid_range': (20, 50), | |
'budget': (10, 20), | |
'street_food': (5, 10) | |
} | |
def validate_inputs(self, | |
destination: str, | |
num_days: int, | |
budget_level: str, | |
num_people: int) -> tuple[bool, str]: | |
""" | |
Validate input parameters | |
""" | |
if not destination or len(destination.strip()) == 0: | |
return False, "Destination cannot be empty" | |
if num_days < 1 or num_days > 30: | |
return False, "Number of days must be between 1 and 30" | |
if budget_level not in self.accommodation_types: | |
return False, f"Budget level must be one of: {', '.join(self.accommodation_types.keys())}" | |
if num_people < 1 or num_people > 10: | |
return False, "Number of people must be between 1 and 10" | |
return True, "" | |
def generate_itinerary(self, | |
destination: str, | |
num_days: int, | |
budget_level: str) -> List[Dict]: | |
""" | |
Generate a daily itinerary based on destination and budget level | |
""" | |
try: | |
activities = [ | |
'Morning sightseeing', | |
'Museum visit', | |
'Local market tour', | |
'Cultural workshop', | |
'Historical site visit', | |
'Nature walk', | |
'Local neighborhood exploration', | |
'Evening entertainment' | |
] | |
itinerary = [] | |
for day in range(1, num_days + 1): | |
# Ensure no duplicate activities in same day | |
day_activities = random.sample(activities, 3) | |
daily_schedule = { | |
'day': day, | |
'morning': day_activities[0], | |
'afternoon': day_activities[1], | |
'evening': day_activities[2], | |
'accommodation': f"{budget_level} accommodation" | |
} | |
itinerary.append(daily_schedule) | |
return itinerary | |
except Exception as e: | |
logger.error(f"Error generating itinerary: {str(e)}") | |
raise | |
def calculate_budget(self, | |
num_days: int, | |
budget_level: str, | |
num_people: int) -> Dict: | |
""" | |
Calculate estimated budget based on duration and comfort level | |
""" | |
try: | |
accommodation_range = self.accommodation_types[budget_level] | |
meal_range = self.meal_costs[budget_level] | |
# Use median values for more stable estimates | |
daily_accommodation = sum(accommodation_range) / 2 | |
daily_meals = (sum(meal_range) / 2) * 3 # 3 meals per day | |
daily_activities = 65 # Median activity cost | |
daily_transport = 30 # Median transport cost | |
total_accommodation = daily_accommodation * num_days | |
total_meals = daily_meals * num_days * num_people | |
total_activities = daily_activities * num_days * num_people | |
total_transport = daily_transport * num_days * num_people | |
# Add 15% buffer for unexpected expenses | |
subtotal = total_accommodation + total_meals + total_activities + total_transport | |
buffer = subtotal * 0.15 | |
total_cost = subtotal + buffer | |
return { | |
'accommodation': round(total_accommodation, 2), | |
'meals': round(total_meals, 2), | |
'activities': round(total_activities, 2), | |
'transport': round(total_transport, 2), | |
'buffer': round(buffer, 2), | |
'total': round(total_cost, 2), | |
'per_person': round(total_cost / num_people, 2) | |
} | |
except Exception as e: | |
logger.error(f"Error calculating budget: {str(e)}") | |
raise | |
def format_output(self, | |
destination: str, | |
itinerary: List[Dict], | |
budget: Dict) -> str: | |
""" | |
Format the itinerary and budget into a readable string | |
""" | |
try: | |
output = [f"π Travel Plan for {destination}", ""] | |
output.append("ποΈ ITINERARY") | |
output.append("=" * 20) | |
for day in itinerary: | |
output.extend([ | |
f"Day {day['day']}:", | |
f"π Morning: {day['morning']}", | |
f"βοΈ Afternoon: {day['afternoon']}", | |
f"π Evening: {day['evening']}", | |
f"π Accommodation: {day['accommodation']}", | |
"" | |
]) | |
output.extend([ | |
"π° BUDGET BREAKDOWN", | |
"=" * 20, | |
f"π¨ Accommodation: ${budget['accommodation']:,.2f}", | |
f"π½οΈ Meals: ${budget['meals']:,.2f}", | |
f"π« Activities: ${budget['activities']:,.2f}", | |
f"π Local Transport: ${budget['transport']:,.2f}", | |
f"β οΈ Buffer (15%): ${budget['buffer']:,.2f}", | |
"=" * 20, | |
f"π΅ Total Cost: ${budget['total']:,.2f}", | |
f"π₯ Cost per person: ${budget['per_person']:,.2f}" | |
]) | |
return "\n".join(output) | |
except Exception as e: | |
logger.error(f"Error formatting output: {str(e)}") | |
return "Error formatting travel plan. Please try again." | |
def parse_travel_request(message: str) -> Optional[Dict]: | |
""" | |
Parse travel request message and extract parameters | |
""" | |
try: | |
message = message.lower() | |
if "plan a trip" not in message: | |
return None | |
parts = message.split() | |
destination_idx = parts.index("to") + 1 | |
days_idx = parts.index("days") - 1 | |
budget_idx = parts.index("budget") - 1 | |
people_idx = parts.index("people") - 1 | |
return { | |
"destination": parts[destination_idx].capitalize(), | |
"num_days": int(parts[days_idx]), | |
"budget_level": parts[budget_idx], | |
"num_people": int(parts[people_idx]) | |
} | |
except (ValueError, IndexError) as e: | |
logger.warning(f"Error parsing travel request: {str(e)}") | |
return None | |
def respond( | |
message: str, | |
history: List[Dict[str, str]], | |
system_message: str, | |
max_tokens: int, | |
temperature: float, | |
top_p: float | |
) -> str: | |
""" | |
Process chat message and generate travel plan if requested | |
""" | |
try: | |
# Check if this is a travel planning request | |
travel_params = parse_travel_request(message) | |
if travel_params: | |
planner = TravelPlanner() | |
# Validate inputs | |
is_valid, error_msg = planner.validate_inputs( | |
travel_params["destination"], | |
travel_params["num_days"], | |
travel_params["budget_level"], | |
travel_params["num_people"] | |
) | |
if not is_valid: | |
return f"Error: {error_msg}\n\nPlease use the format: 'Plan a trip to [destination] for [X] days, [budget_level] budget, [X] people'" | |
try: | |
# Generate travel plan | |
itinerary = planner.generate_itinerary( | |
travel_params["destination"], | |
travel_params["num_days"], | |
travel_params["budget_level"] | |
) | |
budget = planner.calculate_budget( | |
travel_params["num_days"], | |
travel_params["budget_level"], | |
travel_params["num_people"] | |
) | |
return planner.format_output( | |
travel_params["destination"], | |
itinerary, | |
budget | |
) | |
except Exception as e: | |
logger.error(f"Error generating travel plan: {str(e)}") | |
return "Sorry, there was an error generating your travel plan. Please try again." | |
# If not a travel request, use the Hugging Face model | |
try: | |
client = InferenceClient("HuggingFaceH4/zephyr-7b-beta") | |
messages = [{"role": "system", "content": system_message}] | |
for msg in history: | |
if isinstance(msg, dict) and "role" in msg and "content" in msg: | |
messages.append(msg) | |
messages.append({"role": "user", "content": message}) | |
response = "" | |
for token in client.chat_completion( | |
messages, | |
max_tokens=max_tokens, | |
stream=True, | |
temperature=temperature, | |
top_p=top_p, | |
): | |
if hasattr(token.choices[0].delta, 'content'): | |
response += token.choices[0].delta.content or "" | |
return response | |
except Exception as e: | |
logger.error(f"Error with chat model: {str(e)}") | |
return "I apologize, but I'm having trouble connecting to the chat service. You can still use me for travel planning by saying 'Plan a trip to [destination]...'" | |
except Exception as e: | |
logger.error(f"Unexpected error in respond function: {str(e)}") | |
return "An unexpected error occurred. Please try again." | |
# Create the Gradio interface | |
demo = gr.ChatInterface( | |
respond, | |
additional_inputs=[ | |
gr.Textbox( | |
value="You are a travel planning assistant who can also chat about other topics.", | |
label="System message" | |
), | |
gr.Slider( | |
minimum=1, | |
maximum=2048, | |
value=512, | |
step=1, | |
label="Max new tokens" | |
), | |
gr.Slider( | |
minimum=0.1, | |
maximum=4.0, | |
value=0.7, | |
step=0.1, | |
label="Temperature" | |
), | |
gr.Slider( | |
minimum=0.1, | |
maximum=1.0, | |
value=0.95, | |
step=0.05, | |
label="Top-p (nucleus sampling)" | |
), | |
] | |
) | |
# Launch the application | |
if __name__ == "__main__": | |
try: | |
demo.launch() | |
except Exception as e: | |
logger.error(f"Error launching Gradio interface: {str(e)}") |