import textwrap from fastapi import FastAPI from pydantic import BaseModel from dotenv import load_dotenv import asyncio from api.external_services import InitiazlizeGithubService, InitiazlizeActiveloopService # Load environment variables load_dotenv() github_service = InitiazlizeGithubService() activeloop_service = InitiazlizeActiveloopService() async def process_file(owner, repo, file_type): docs = github_service.load_repo_data(owner, repo, file_type) activeloop_service.upload_to_activeloop(docs) return docs async def main(): repo_url = "https://github.com/facebookresearch/segment-anything" owner, repo = github_service.parse_github_url(repo_url) file_types = [".py", ".js", ".ts", ".md", "ipynb"] tasks = [] for file_type in file_types: task = process_file(owner, repo, file_type) tasks.append(task) results = await asyncio.gather(*tasks) # docs = github_service.load_repo_data(owner, repo, file_type) # activeloop_service.upload_to_activeloop(docs) print(results) # activeloop_service.upload_to_activeloop(docs1) if __name__ == "__main__": asyncio.run(main()) # # upload # owner, repo = github_service.parse_github_url(repo_url) # docs = github_service.load_repo_data(owner, repo) # activeloop_service.upload_to_activeloop(docs) # # retrieve # intro_question = "what this code is talking about?" # answer = activeloop_service.query_engine.query(intro_question) # print(answer.__dict__)