# NOTE: import statements are kept in their original order (duplicates
# included) because the wildcard imports below make ordering significant.
import openai
import numpy as np
from tempfile import NamedTemporaryFile
import copy
import shapely
from shapely.geometry import *
from shapely.affinity import *
from omegaconf import OmegaConf
from moviepy.editor import ImageSequenceClip
import gradio as gr
from lmp import LMP, LMPFGen
from sim import PickPlaceEnv, LMP_wrapper
from consts import ALL_BLOCKS, ALL_BOWLS
from md_logger import MarkdownLogger
import numpy as np
import os
import hydra
import random
import re
import openai
import IPython
import time
import pybullet as p
import traceback
from datetime import datetime
from pprint import pprint
import cv2
import re
import random
import json
from gensim.agent import Agent
from gensim.critic import Critic
from gensim.sim_runner import SimulationRunner
from gensim.memory import Memory
from gensim.utils import set_gpt_model, clear_messages


class DemoRunner:
    """Per-session state for the interactive demo.

    Holds the ``SimulationRunner`` built by :meth:`setup` and drives one
    task-creation + simulation round per :meth:`run` call.
    """

    def __init__(self):
        # Nothing in this file ever assigns these to a real object; they are
        # initialised here so `run` can test them instead of crashing.
        self._env = None
        self._md_logger = None
        self.simulation_runner = None

    def setup(self, api_key):
        """Configure the OpenAI key and build the simulation runner.

        Args:
            api_key: OpenAI API key, stored on the ``openai`` module.

        Returns:
            A ``(info, img)`` tuple: a markdown status string and a
            placeholder image array for the "Current Simulation" panel.
        """
        openai.api_key = api_key

        # NOTE(review): `cfg` is not defined anywhere in this file; it must be
        # provided as a module-level global before `setup` is called,
        # otherwise the next line raises NameError. TODO confirm where the
        # configuration is meant to be loaded from.
        cfg['model_output_dir'] = 'temp'
        cfg['prompt_folder'] = 'topdown_task_generation_prompt_simple_singleprompt'
        set_gpt_model(cfg['gpt_model'])
        cfg['load_memory'] = True
        cfg['task_description_candidate_num'] = 10
        cfg['record']['save_video'] = True

        memory = Memory(cfg)
        agent = Agent(cfg, memory)
        critic = Critic(cfg, memory)
        self.simulation_runner = SimulationRunner(cfg, agent, critic, memory)

        info = '### Build'
        # Blank RGB placeholder. The previous shape (720, 640, 0) had zero
        # channels, i.e. an empty array that cannot be displayed as an image.
        img = np.zeros((720, 640, 3), dtype=np.uint8)
        return info, img

    def run(self, instruction):
        """Generate and simulate the task named by ``instruction``.

        Args:
            instruction: Task name, written to ``cfg['target_task_name']``.

        Returns:
            A 3-tuple for the Gradio outputs. On success:
            ``(chat_log, curr_video, video_file_name)`` where
            ``video_file_name`` is ``None`` unless cached frames were
            rendered. On failure: ``('Error: <message>', None, None)``.
        """
        cfg['target_task_name'] = instruction

        # `_env` and `_md_logger` may be unset (they are never assigned in
        # this file); only reset them when present.
        if self._env is not None:
            self._env.cache_video = []
        if self._md_logger is not None:
            self._md_logger.clear()

        try:
            self.simulation_runner.task_creation()
            self.simulation_runner.simulate_task()
        except Exception as e:
            # UI boundary: report the failure to the user, but keep the full
            # traceback on the console for debugging.
            traceback.print_exc()
            return f'Error: {e}', None, None

        video_file_name = None
        if self._env is not None and self._env.cache_video:
            rendered_clip = ImageSequenceClip(self._env.cache_video, fps=25)
            # delete=False reserves a path that survives closing the handle;
            # the handle is closed before the clip is written to that path.
            with NamedTemporaryFile(suffix='.mp4', delete=False) as tmp_file:
                video_file_name = tmp_file.name
            rendered_clip.write_videofile(video_file_name, fps=25)

        return (
            self.simulation_runner.chat_log,
            self.simulation_runner.env.curr_video,
            video_file_name,
        )


def setup(api_key):
    """Gradio callback: create and set up a fresh ``DemoRunner``.

    Returns:
        ``(info, img, demo_runner)``; when ``api_key`` is empty, a prompt
        message with ``None`` for the image and runner.
    """
    if not api_key:
        return 'Please enter your OpenAI API key!', None, None

    demo_runner = DemoRunner()
    info, img = demo_runner.setup(api_key)
    return info, img, demo_runner


def run(instruction, demo_runner):
    """Gradio callback: run ``instruction`` on the session's runner.

    Returns:
        The 3-tuple from ``DemoRunner.run``, or a prompt message with two
        ``None`` values when ``setup`` has not produced a runner yet.
    """
    if demo_runner is None:
        return 'Please run setup first!', None, None
    return demo_runner.run(instruction)


if __name__ == '__main__':
    # Show the README below its first 12 lines. Slicing (rather than calling
    # next() 12 times) yields an empty string for a shorter file instead of
    # raising StopIteration.
    with open('README.md', 'r', encoding='utf-8') as f:
        readme_text = ''.join(f.readlines()[12:])

    with gr.Blocks() as demo:
        # Per-session DemoRunner, produced by `setup` and consumed by `run`.
        state = gr.State(None)

        gr.Markdown(readme_text)
        gr.Markdown('# Interactive Demo')

        with gr.Row():
            with gr.Column():
                with gr.Row():
                    inp_api_key = gr.Textbox(
                        label='OpenAI API Key (this is not stored anywhere)',
                        lines=1,
                    )
                btn_setup = gr.Button("Setup/Reset Simulation")
                info_setup = gr.Markdown(label='Setup Info')
            with gr.Column():
                img_setup = gr.Image(label='Current Simulation')

        with gr.Row():
            with gr.Column():
                inp_instruction = gr.Textbox(label='Task Name', lines=1)
                btn_run = gr.Button("Run (this may take 30+ seconds)")
                info_run = gr.Markdown(label='Generated Code')
            with gr.Column():
                video_run = gr.Video(label='Video of Last Instruction')

        btn_setup.click(
            setup,
            inputs=[inp_api_key],
            outputs=[info_setup, img_setup, state],
        )
        btn_run.click(
            run,
            inputs=[inp_instruction, state],
            outputs=[info_run, img_setup, video_run],
        )

    demo.launch(debug=True)