DaFuc / app.py
Aluode's picture
Upload 6 files
9b3b253 verified
raw
history blame
21 kB
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import random
import json
import logging
import gradio as gr
import time
from openai import OpenAI
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# VAE Class for Latent Space Encoding
class VAE(nn.Module):
def __init__(self, input_dim, latent_dim):
super().__init__()
self.encoder = nn.Sequential(
nn.Linear(input_dim, 128),
nn.ReLU(),
nn.Linear(128, latent_dim * 2)
)
self.decoder = nn.Sequential(
nn.Linear(latent_dim, 128),
nn.ReLU(),
nn.Linear(128, input_dim)
)
self.latent_dim = latent_dim
def forward(self, x):
mu_logvar = self.encoder(x)
mu, logvar = torch.chunk(mu_logvar, 2, dim=-1)
z = self.sample_latent(mu, logvar)
recon = self.decoder(z)
return recon, mu, logvar, z
def sample_latent(self, mu, logvar):
std = torch.exp(0.5 * logvar)
eps = torch.randn_like(std)
return mu + eps * std
def vae_loss(self, recon_x, x, mu, logvar):
recon_loss = nn.MSELoss()(recon_x, x)
kld = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
return recon_loss + kld
# FractalNode class for FUCWM with Attention
class FractalNode(nn.Module):
def __init__(self, input_dim, output_dim, depth=0, max_depth=5, max_children=2):
super().__init__()
self.traditional_weight = nn.Linear(input_dim, output_dim)
nn.init.xavier_uniform_(self.traditional_weight.weight)
self.superweight = nn.Parameter(torch.eye(output_dim))
self.norm = nn.LayerNorm(output_dim)
self._children = []
self.is_active = True
self.max_children = max_children
self.complexity_threshold = 0.5
self.depth = depth
self.max_depth = max_depth
self.attention_weights = nn.Parameter(torch.ones(max_children))
def forward(self, x):
if x.dim() == 1:
x = x.unsqueeze(0)
base_output = self.traditional_weight(x)
base_output = self.norm(base_output)
complexity = self.calculate_complexity(base_output)
if complexity > self.complexity_threshold and len(self._children) < self.max_children and self.depth < self.max_depth:
new_child = FractalNode(self.traditional_weight.out_features, self.traditional_weight.out_features,
depth=self.depth+1, max_depth=self.max_depth)
self._children.append(new_child)
self.add_module(f'child_{len(self._children)}', new_child)
modulated_output = torch.matmul(self.superweight, base_output.unsqueeze(-1)).squeeze(-1)
for i, child in enumerate(self._children):
if child.is_active:
child_output = child(modulated_output)
modulated_output = modulated_output + child_output * F.softmax(self.attention_weights, dim=0)[i]
return modulated_output
def calculate_complexity(self, output):
return torch.log(1 + torch.norm(output))
def calculate_relevance(self, child_output):
return torch.sigmoid(torch.sum(child_output * self.superweight))
def update_superweights(self, context):
context_influence = torch.tanh(torch.matmul(self.superweight, context.unsqueeze(-1))).squeeze(-1)
self.superweight.data = self.superweight.data + 0.01 * context_influence
for child in self._children:
if child.is_active:
child.update_superweights(context)
def grow(self, complexity_threshold):
if self.calculate_complexity(self.traditional_weight.weight) > complexity_threshold and len(self._children) < self.max_children and self.depth < self.max_depth:
new_child = FractalNode(self.traditional_weight.out_features, self.traditional_weight.out_features,
depth=self.depth+1, max_depth=self.max_depth)
self._children.append(new_child)
self.add_module(f'child_{len(self._children)}', new_child)
for child in self._children:
child.grow(complexity_threshold)
def update_attention(self, co_activation_vector):
self.attention_weights.data += co_activation_vector[:len(self._children)]
self.attention_weights.data = F.softmax(self.attention_weights, dim=0)
@property
def complexity(self):
return torch.norm(self.superweight)
@property
def children(self):
return self._children
# FUCWM class with Attention
class FUCWM(nn.Module):
def __init__(self, vocab_size, embed_dim, output_dim, max_depth=5):
super().__init__()
self.word_embeddings = nn.Embedding(vocab_size, embed_dim)
self.root = FractalNode(embed_dim, output_dim, max_depth=max_depth)
self.max_depth = max_depth
self.co_activation_matrix = torch.zeros((max_depth, max_depth))
def forward(self, x):
if x.dtype == torch.long:
embedded = self.word_embeddings(x)
if embedded.dim() == 3:
embedded = embedded.mean(dim=1)
else:
embedded = x
output = self.root(embedded)
self.update_co_activations()
return output
def grow(self, complexity_threshold):
self.root.grow(complexity_threshold)
def update_superweights(self, context):
self.root.update_superweights(context)
def manage_padding(self):
def _manage_padding(node, depth):
if depth >= self.max_depth:
node.is_active = False
else:
activation = torch.norm(node.superweight)
if not node.is_active and activation > 0.5:
node.is_active = True
elif node.is_active and activation < 0.1:
node.is_active = False
for child in node.children:
_manage_padding(child, depth + 1)
_manage_padding(self.root, 0)
def update_co_activations(self):
for i in range(self.max_depth):
for j in range(self.max_depth):
if i != j:
self.co_activation_matrix[i][j] += 0.1 * random.random()
self.co_activation_matrix = F.softmax(self.co_activation_matrix, dim=1)
def update_attention_weights(self):
def update_node(node, depth):
node.update_attention(self.co_activation_matrix[depth])
for child in node.children:
update_node(child, depth+1)
update_node(self.root, 0)
class DynamicAI:
def __init__(self, vocab_size=10000, embed_dim=64, latent_dim=64, output_dim=64, max_depth=5):
self.vae = VAE(embed_dim, latent_dim)
self.model = FUCWM(vocab_size, embed_dim, output_dim, max_depth)
self.optimizer = optim.Adam(list(self.vae.parameters()) + list(self.model.parameters()), lr=0.0001)
self.scheduler = optim.lr_scheduler.StepLR(self.optimizer, step_size=5, gamma=0.1)
self.criterion = nn.MSELoss()
self.word_to_index = {}
self.index_to_word = {}
self.next_index = 0
self.lm_studio_client = OpenAI(base_url="http://localhost:1234/v1", api_key="lm-studio")
def tokenize(self, text):
words = text.lower().split()
indices = []
for word in words:
if word not in self.word_to_index:
self.word_to_index[word] = self.next_index
self.index_to_word[self.next_index] = word
self.next_index += 1
indices.append(self.word_to_index[word])
return torch.tensor(indices, dtype=torch.long).unsqueeze(0)
def chat(self, input_text, max_length=20, temperature=0.7):
input_tokens = self.tokenize(input_text)
thinking_process = []
with torch.no_grad():
embedded_q = self.model.word_embeddings(input_tokens)
_, _, _, z_q = self.vae(embedded_q.mean(dim=1))
output, node_info = self.fractal_thinking(z_q)
thinking_process.append(node_info)
response = []
for _ in range(max_length):
output = output / temperature
probs = torch.softmax(output, dim=-1)
next_word_index = torch.multinomial(probs, 1).item()
next_word = self.index_to_word.get(next_word_index, "")
if next_word:
response.append(next_word)
if next_word in ['.', '!', '?']:
break
next_token = self.tokenize(next_word)
_, _, _, next_latent = self.vae(self.model.word_embeddings(next_token).mean(dim=1))
output, node_info = self.fractal_thinking(next_latent)
thinking_process.append(node_info)
else:
break
thinking_str = "\n".join(thinking_process)
response_str = ' '.join(response)
return f"Thinking Process:\n{thinking_str}\n\nResponse: {response_str}"
def fractal_thinking(self, input_vector):
def traverse_node(node, x, depth):
node_info = f"Node depth: {depth}, Complexity: {node.complexity.item():.4f}, Children: {len(node.children)}"
output = node(x)
if depth < node.max_depth:
for child in node.children:
child_output, child_info = traverse_node(child, output, depth + 1)
output = output + child_output * node.calculate_relevance(child_output)
node_info += f"\n{child_info}"
return output, node_info
output, node_info = traverse_node(self.model.root, input_vector, 0)
return output, node_info
def talk_with_lm_studio(self, initial_message, conversation_duration=60, delay=2):
message = initial_message
start_time = time.time()
conversation_log = []
while time.time() - start_time < conversation_duration:
ai_response = self.chat(message)
logger.info(f"DynamicAI:\n{ai_response}")
conversation_log.append(f"DynamicAI:\n{ai_response}")
yield "\n\n".join(conversation_log)
ai_message = ai_response.split("Response: ")[-1].strip()
if not ai_message:
logger.info("DynamicAI generated an empty response. Skipping LM Studio turn.")
conversation_log.append("DynamicAI: [No response generated. Still learning...]")
yield "\n\n".join(conversation_log)
time.sleep(delay)
continue
lm_studio_response = self.send_to_lm_studio(ai_message)
if lm_studio_response:
logger.info(f"LM Studio: {lm_studio_response}")
conversation_log.append(f"LM Studio: {lm_studio_response}")
message = lm_studio_response
yield "\n\n".join(conversation_log)
else:
logger.warning("No response from LM Studio. Ending conversation.")
break
time.sleep(delay)
def send_to_lm_studio(self, message):
if not message.strip():
logger.warning("Attempted to send an empty message to LM Studio. Skipping.")
return None
try:
completion = self.lm_studio_client.chat.completions.create(
model="unsloth/Llama-3.2-3B-Instruct-GGUF",
messages=[
{"role": "system", "content": "You're talking to an experimental fractal AI that is still learning to communicate. If it doesn't respond or sends empty messages, please be patient and continue the conversation."},
{"role": "user", "content": message}
],
temperature=0.7,
)
response = completion.choices[0].message.content
return response
except Exception as e:
logger.error(f"Error sending to LM Studio: {str(e)}")
return None
def train_on_qa_pairs(self, qa_pairs, epochs=10):
if not isinstance(qa_pairs, list) or len(qa_pairs) == 0:
raise ValueError("qa_pairs must be a non-empty list")
logger.info(f"Training on {len(qa_pairs)} Q&A pairs for {epochs} epochs...")
for epoch in range(epochs):
total_loss = 0
errors = 0
random.shuffle(qa_pairs)
for i, (question, answer) in enumerate(qa_pairs):
self.optimizer.zero_grad()
try:
q_tokens = self.tokenize(question)
a_tokens = self.tokenize(answer)
q_embedded = self.model.word_embeddings(q_tokens)
_, _, _, q_latent = self.vae(q_embedded.mean(dim=1))
a_embedded = self.model.word_embeddings(a_tokens)
_, _, _, a_latent = self.vae(a_embedded.mean(dim=1))
q_output = self.model(q_latent)
a_output = self.model(a_latent)
loss = self.criterion(q_output, a_output)
loss.backward()
torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
self.optimizer.step()
total_loss += loss.item()
self.model.grow(complexity_threshold=0.5)
self.model.update_superweights(q_output.detach())
self.model.manage_padding()
self.model.update_attention_weights()
if i % 10 == 0:
logger.info(f"Epoch {epoch+1}, Pair {i+1}/{len(qa_pairs)}, Loss: {loss.item():.4f}")
except Exception as e:
logger.error(f"Error processing pair: {question} | {answer}")
logger.error(f"Error details: {str(e)}")
errors += 1
continue
avg_loss = total_loss / (len(qa_pairs) - errors) if len(qa_pairs) > errors else 0
logger.info(f"Epoch {epoch+1}/{epochs}, Average Loss: {avg_loss:.4f}, Errors: {errors}")
self.scheduler.step()
self.save_state(f"model_state_epoch_{epoch+1}.pth")
yield epoch + 1, avg_loss, errors
def save_state(self, filename):
state = {
'model_state': self.model.state_dict(),
'vae_state': self.vae.state_dict(),
'optimizer_state': self.optimizer.state_dict(),
'scheduler_state': self.scheduler.state_dict(),
'word_to_index': self.word_to_index,
'index_to_word': self.index_to_word,
'next_index': self.next_index
}
torch.save(state, filename)
logger.info(f"Model state saved to {filename}")
def load_state(self, filename):
state = torch.load(filename)
self.word_to_index = state['word_to_index']
self.index_to_word = state['index_to_word']
self.next_index = state['next_index']
self.rebuild_model_structure(state['model_state'])
self.model.load_state_dict(state['model_state'])
self.vae.load_state_dict(state['vae_state'])
self.optimizer.load_state_dict(state['optimizer_state'])
self.scheduler.load_state_dict(state['scheduler_state'])
logger.info(f"Model state loaded from {filename}")
def rebuild_model_structure(self, state_dict):
def rebuild_node(node, prefix):
child_indices = set()
for name in state_dict.keys():
if name.startswith(prefix):
parts = name[len(prefix):].split('.')
if parts[0].startswith('child_'):
child_index = int(parts[0].split('_')[1])
child_indices.add(child_index)
for index in sorted(child_indices):
while len(node._children) < index:
new_child = FractalNode(node.traditional_weight.out_features,
node.traditional_weight.out_features,
depth=node.depth+1,
max_depth=node.max_depth)
node._children.append(new_child)
node.add_module(f'child_{len(node._children)}', new_child)
child_prefix = f"{prefix}child_{index}."
rebuild_node(node._children[index-1], child_prefix)
rebuild_node(self.model.root, "root.")
def grow(self, complexity_threshold):
self.model.grow(complexity_threshold)
def update_superweights(self, context):
self.model.update_superweights(context)
def manage_padding(self):
self.model.manage_padding()
# Gradio Interface for DynamicAI
def create_gradio_interface(ai):
def handle_chat(message, temperature):
return ai.chat(message, temperature=float(temperature))
def handle_save(filename):
ai.save_state(filename)
return f"State saved to {filename}"
def handle_load(filename):
ai.load_state(filename)
return f"State loaded from {filename}"
def handle_train_qa(qa_pairs_file, epochs):
try:
with open(qa_pairs_file.name, 'r', encoding='utf-8') as f:
qa_pairs = json.load(f)
output = ["Starting training..."]
for epoch, loss, errors in ai.train_on_qa_pairs(qa_pairs, epochs=int(epochs)):
output.append(f"Epoch {epoch}/{epochs}, Loss: {loss:.4f}, Errors: {errors}")
output.append("Training completed successfully")
return "\n".join(output)
except Exception as e:
return f"Error during training: {str(e)}"
def handle_lm_studio_chat(initial_message, duration, delay):
conversation_log = gr.Textbox()
for log in ai.talk_with_lm_studio(initial_message, conversation_duration=float(duration), delay=float(delay)):
conversation_log = log
yield conversation_log
with gr.Blocks() as interface:
gr.Markdown("# Dynamic AI with Fractal Universe Chocolate Wafer Model and Attention Mechanism")
with gr.Tab("Chat"):
chat_input = gr.Textbox(label="Your message")
temperature = gr.Slider(minimum=0.1, maximum=2.0, value=0.7, label="Temperature")
chat_output = gr.Textbox(label="AI response")
chat_button = gr.Button("Chat")
chat_button.click(handle_chat, inputs=[chat_input, temperature], outputs=chat_output)
with gr.Tab("LM Studio Conversation"):
initial_message = gr.Textbox(label="Initial Message")
duration = gr.Number(label="Conversation Duration (seconds)", value=60)
delay = gr.Number(label="Delay between messages (seconds)", value=2)
conversation_log = gr.Textbox(label="Conversation Log", lines=20)
start_conversation = gr.Button("Start Conversation")
start_conversation.click(handle_lm_studio_chat, inputs=[initial_message, duration, delay], outputs=conversation_log)
with gr.Tab("Train on Q&A"):
qa_file = gr.File(label="Q&A Pairs JSON File")
epochs_input = gr.Number(label="Number of Epochs", value=10)
train_button = gr.Button("Train on Q&A Pairs")
train_output = gr.Textbox(label="Training status")
train_button.click(handle_train_qa, inputs=[qa_file, epochs_input], outputs=train_output)
with gr.Tab("Save/Load State"):
filename_input = gr.Textbox(label="Filename")
save_button = gr.Button("Save State")
load_button = gr.Button("Load State")
state_output = gr.Textbox(label="Operation result")
save_button.click(handle_save, inputs=filename_input, outputs=state_output)
load_button.click(handle_load, inputs=filename_input, outputs=state_output)
return interface
# Main execution
if __name__ == "__main__":
dynamic_ai = DynamicAI(vocab_size=50000, embed_dim=256, latent_dim=256, output_dim=256, max_depth=7)
iface = create_gradio_interface(dynamic_ai)
iface.launch()