Spaces:
Running
Running
from abc import ABC, abstractmethod | |
from CodeParser import CodeParser | |
from utils import count_tokens | |
class Chunker(ABC):
    """Base class for chunkers that split content into numbered chunks.

    Subclasses are expected to override :meth:`chunk` and :meth:`get_chunk`.
    The remaining helpers operate on the ``{chunk_number: chunk_text}``
    mapping and do not depend on instance state, so they are static methods
    and can be called on the class or on any instance.
    """

    def __init__(self, encoding_name="gpt-4"):
        """Store the encoding name that subclasses use when counting tokens."""
        self.encoding_name = encoding_name

    # NOTE(review): this class derives from ABC and the file imports
    # ``abstractmethod``, so ``chunk``/``get_chunk`` look intended to be
    # abstract. They are left concrete here so that instantiating ``Chunker``
    # directly keeps working - confirm before tightening.
    def chunk(self, content, token_limit):
        """Split *content* into chunks; the base implementation does nothing."""
        pass

    def get_chunk(self, chunked_content, chunk_number):
        """Return one chunk; the base implementation does nothing."""
        pass

    @staticmethod
    def print_chunks(chunks):
        """Print every chunk, framed by separator lines, in mapping order."""
        separator = "=" * 40
        for chunk_number, chunk_code in chunks.items():
            print(f"Chunk {chunk_number}:")
            print(separator)
            print(chunk_code)
            print(separator)

    @staticmethod
    def consolidate_chunks_into_file(chunks):
        """Join all chunk texts, in mapping order, with newlines."""
        return "\n".join(chunks.values())

    @staticmethod
    def count_lines(consolidated_chunks):
        """Return the number of newline-separated lines in the given text.

        An empty string counts as one (empty) line, matching ``str.split``.
        """
        return len(consolidated_chunks.split("\n"))
class CodeChunker(Chunker):
    """Chunker that splits source code at syntactic points of interest.

    Split positions come from ``CodeParser``; a run of comment lines directly
    above a point of interest is kept together with the code that follows it.
    """

    def __init__(self, file_extension, encoding_name="gpt-4"):
        """Remember the file extension handed to ``CodeParser`` in ``chunk``."""
        super().__init__(encoding_name)
        self.file_extension = file_extension

    @staticmethod
    def _attach_leading_comments(breakpoints, comment_lines):
        """Move each breakpoint up to the first line of its leading comments.

        Args:
            breakpoints: iterable of line indices where a chunk may start.
            comment_lines: set of line indices that are comments.

        Returns:
            Sorted list of unique, adjusted breakpoints.
        """
        adjusted = set()
        for bp in breakpoints:
            first_comment_line = None
            candidate = bp - 1
            # Walk upwards through the contiguous comment lines above ``bp``.
            while candidate in comment_lines:
                first_comment_line = candidate
                candidate -= 1
            # Compare against None explicitly: line index 0 is a valid
            # comment line and must not be mistaken for "nothing found".
            adjusted.add(bp if first_comment_line is None else first_comment_line)
        return sorted(adjusted)

    def chunk(self, code, token_limit) -> dict:
        """Split *code* into chunks of roughly at most *token_limit* tokens.

        Lines are accumulated until adding the next one would exceed the
        limit; the chunk is then cut at the current line if it is a
        breakpoint, otherwise at the closest breakpoint before it. When no
        breakpoint lies after the chunk's first line, the chunk keeps growing
        past the limit until one is reached, so the limit is a soft target.

        Args:
            code: full source text; it is split on ``"\\n"``.
            token_limit: token budget per chunk, measured with
                ``count_tokens(line, self.encoding_name)`` line by line.

        Returns:
            Dict mapping consecutive chunk numbers (starting at 1) to chunk
            text. Chunks consisting only of whitespace are dropped.
        """
        code_parser = CodeParser(self.file_extension)
        lines = code.split("\n")

        # The parser's line numbers are compared directly with indices into
        # ``lines`` below, i.e. they are treated as 0-based.
        points_of_interest = code_parser.get_lines_for_points_of_interest(
            code, self.file_extension
        )
        comment_lines = set(
            code_parser.get_lines_for_comments(code, self.file_extension)
        )
        breakpoints = self._attach_leading_comments(points_of_interest, comment_lines)
        breakpoint_set = set(breakpoints)  # O(1) membership inside the loop

        chunks = {}
        chunk_number = 1
        token_count = 0
        start_line = 0
        i = 0

        while i < len(lines):
            line_tokens = count_tokens(lines[i], self.encoding_name)

            if token_count + line_tokens <= token_limit:
                # Still within budget: the line joins the current chunk.
                token_count += line_tokens
                i += 1
                continue

            # Over budget: cut at this line if it is a breakpoint, otherwise
            # at the last breakpoint before it (never before the chunk start).
            if i in breakpoint_set:
                stop_line = i
            else:
                previous = max(
                    (bp for bp in breakpoints if bp < i), default=start_line
                )
                stop_line = max(previous, start_line)

            if stop_line == start_line:
                # No breakpoint since the chunk began (or the chunk begins on
                # this very line): cutting here would emit an empty chunk, so
                # keep accumulating until a later breakpoint is reached.
                token_count += line_tokens
                i += 1
            else:
                chunk_text = "\n".join(lines[start_line:stop_line])
                if chunk_text.strip():
                    chunks[chunk_number] = chunk_text
                    chunk_number += 1
                # Restart counting from the cut so that lines between the cut
                # and the overflow point are re-measured for the next chunk.
                i = stop_line
                token_count = 0
                start_line = stop_line

        # Whatever is left after the last cut forms the final chunk.
        remaining = "\n".join(lines[start_line:])
        if remaining.strip():
            chunks[chunk_number] = remaining

        return chunks

    def get_chunk(self, chunked_codebase, chunk_number):
        """Return the chunk stored under *chunk_number* (``KeyError`` if absent)."""
        return chunked_codebase[chunk_number]