|
import os |
|
import os.path |
|
|
|
|
|
working_directory = "auto_gpt_workspace" |
|
|
|
|
|
if not os.path.exists(working_directory): |
|
os.makedirs(working_directory) |
|
|
|
|
|
def safe_join(base, *paths): |
|
"""Join one or more path components intelligently.""" |
|
new_path = os.path.join(base, *paths) |
|
norm_new_path = os.path.normpath(new_path) |
|
|
|
if os.path.commonprefix([base, norm_new_path]) != base: |
|
raise ValueError("Attempted to access outside of working directory.") |
|
|
|
return norm_new_path |
|
|
|
|
|
def split_file(content, max_length=4000, overlap=0): |
|
""" |
|
Split text into chunks of a specified maximum length with a specified overlap |
|
between chunks. |
|
|
|
:param text: The input text to be split into chunks |
|
:param max_length: The maximum length of each chunk, |
|
default is 4000 (about 1k token) |
|
:param overlap: The number of overlapping characters between chunks, |
|
default is no overlap |
|
:return: A generator yielding chunks of text |
|
""" |
|
start = 0 |
|
content_length = len(content) |
|
|
|
while start < content_length: |
|
end = start + max_length |
|
if end + overlap < content_length: |
|
chunk = content[start : end + overlap] |
|
else: |
|
chunk = content[start:content_length] |
|
yield chunk |
|
start += max_length - overlap |
|
|
|
|
|
def read_file(filename) -> str: |
|
"""Read a file and return the contents""" |
|
try: |
|
filepath = safe_join(working_directory, filename) |
|
with open(filepath, "r", encoding="utf-8") as f: |
|
content = f.read() |
|
return content |
|
except Exception as e: |
|
return f"Error: {str(e)}" |
|
|
|
|
|
def ingest_file(filename, memory, max_length=4000, overlap=200): |
|
""" |
|
Ingest a file by reading its content, splitting it into chunks with a specified |
|
maximum length and overlap, and adding the chunks to the memory storage. |
|
|
|
:param filename: The name of the file to ingest |
|
:param memory: An object with an add() method to store the chunks in memory |
|
:param max_length: The maximum length of each chunk, default is 4000 |
|
:param overlap: The number of overlapping characters between chunks, default is 200 |
|
""" |
|
try: |
|
print(f"Working with file {filename}") |
|
content = read_file(filename) |
|
content_length = len(content) |
|
print(f"File length: {content_length} characters") |
|
|
|
chunks = list(split_file(content, max_length=max_length, overlap=overlap)) |
|
|
|
num_chunks = len(chunks) |
|
for i, chunk in enumerate(chunks): |
|
print(f"Ingesting chunk {i + 1} / {num_chunks} into memory") |
|
memory_to_add = ( |
|
f"Filename: {filename}\n" f"Content part#{i + 1}/{num_chunks}: {chunk}" |
|
) |
|
|
|
memory.add(memory_to_add) |
|
|
|
print(f"Done ingesting {num_chunks} chunks from {filename}.") |
|
except Exception as e: |
|
print(f"Error while ingesting file '{filename}': {str(e)}") |
|
|
|
|
|
def write_to_file(filename, text): |
|
"""Write text to a file""" |
|
try: |
|
filepath = safe_join(working_directory, filename) |
|
directory = os.path.dirname(filepath) |
|
if not os.path.exists(directory): |
|
os.makedirs(directory) |
|
with open(filepath, "w", encoding="utf-8") as f: |
|
f.write(text) |
|
return "File written to successfully." |
|
except Exception as e: |
|
return "Error: " + str(e) |
|
|
|
|
|
def append_to_file(filename, text): |
|
"""Append text to a file""" |
|
try: |
|
filepath = safe_join(working_directory, filename) |
|
with open(filepath, "a") as f: |
|
f.write(text) |
|
return "Text appended successfully." |
|
except Exception as e: |
|
return "Error: " + str(e) |
|
|
|
|
|
def delete_file(filename): |
|
"""Delete a file""" |
|
try: |
|
filepath = safe_join(working_directory, filename) |
|
os.remove(filepath) |
|
return "File deleted successfully." |
|
except Exception as e: |
|
return "Error: " + str(e) |
|
|
|
|
|
def search_files(directory): |
|
found_files = [] |
|
|
|
if directory == "" or directory == "/": |
|
search_directory = working_directory |
|
else: |
|
search_directory = safe_join(working_directory, directory) |
|
|
|
for root, _, files in os.walk(search_directory): |
|
for file in files: |
|
if file.startswith("."): |
|
continue |
|
relative_path = os.path.relpath(os.path.join(root, file), working_directory) |
|
found_files.append(relative_path) |
|
|
|
return found_files |
|
|