code-chunker / Chunker.py
CintraAI's picture
updated requirements
4644b40
from abc import ABC, abstractmethod
from CodeParser import CodeParser
from utils import count_tokens
class Chunker(ABC):
    """Abstract base for strategies that split content into numbered chunks.

    Subclasses must implement ``chunk`` and ``get_chunk``. The static helpers
    work on a mapping of chunk number -> chunk text.
    """

    def __init__(self, encoding_name="gpt-4"):
        # Only stored here; nothing in this base class reads it.
        self.encoding_name = encoding_name

    @abstractmethod
    def chunk(self, content, token_limit):
        """Split ``content`` into chunks; implemented by subclasses."""

    @abstractmethod
    def get_chunk(self, chunked_content, chunk_number):
        """Return one chunk out of ``chunked_content``; implemented by subclasses."""

    @staticmethod
    def print_chunks(chunks):
        """Print each chunk under a ``Chunk <n>:`` header, framed by ``=`` rules."""
        rule = "=" * 40
        for number, text in chunks.items():
            # One print call, four newline-terminated lines: header, rule,
            # chunk body, rule.
            print(f"Chunk {number}:", rule, text, rule, sep="\n")

    @staticmethod
    def consolidate_chunks_into_file(chunks):
        """Join all chunk texts, in mapping order, with a newline between them."""
        newline = "\n"
        return newline.join(chunks.values())

    @staticmethod
    def count_lines(consolidated_chunks):
        """Return the number of newline-separated lines (an empty string counts as 1)."""
        # N newline characters delimit N + 1 lines.
        return consolidated_chunks.count("\n") + 1
class CodeChunker(Chunker):
    """Chunker that cuts source code at parser-reported breakpoint lines.

    Breakpoints and comment lines come from ``CodeParser``; token counts come
    from ``count_tokens``. Line numbers are used as 0-based indices into
    ``code.split("\\n")``.
    """

    def __init__(self, file_extension, encoding_name="gpt-4"):
        super().__init__(encoding_name)
        # Passed to CodeParser on every chunk() call.
        self.file_extension = file_extension

    @staticmethod
    def _attach_leading_comments(breakpoints, comments):
        """Move each breakpoint up to the top of the comment run directly above it.

        Args:
            breakpoints: iterable of line indices where a chunk may start.
            comments: iterable of line indices that are comment lines.

        Returns:
            Sorted list of unique adjusted breakpoint indices.
        """
        comment_lines = set(comments)  # O(1) membership inside the walk below
        adjusted = set()
        for bp in breakpoints:
            top_comment_line = None
            candidate = bp - 1
            # Walk upwards while the preceding lines are contiguous comments.
            while candidate in comment_lines:
                top_comment_line = candidate
                candidate -= 1
            # Compare with None explicitly: line 0 is a valid comment line,
            # but a plain truthiness test would discard it.
            adjusted.add(bp if top_comment_line is None else top_comment_line)
        return sorted(adjusted)

    def chunk(self, code, token_limit) -> dict:
        """Split ``code`` into chunks of roughly at most ``token_limit`` tokens.

        Lines are accumulated until adding the next one would exceed
        ``token_limit``; the chunk is then cut at the current line if it is a
        breakpoint, otherwise at the last breakpoint before it. If no
        breakpoint exists after the chunk's start, the chunk keeps growing
        past the limit until one is found or the input ends.

        Args:
            code: full source text.
            token_limit: token budget per chunk, compared against the sum of
                per-line ``count_tokens`` results.

        Returns:
            Dict mapping consecutive chunk numbers (starting at 1) to chunk
            text. Whitespace-only chunks are omitted.
        """
        code_parser = CodeParser(self.file_extension)
        lines = code.split("\n")

        breakpoints = self._attach_leading_comments(
            code_parser.get_lines_for_points_of_interest(code, self.file_extension),
            code_parser.get_lines_for_comments(code, self.file_extension),
        )
        breakpoint_set = set(breakpoints)

        chunks = {}
        chunk_number = 1
        start_line = 0  # first line of the chunk currently being built
        token_count = 0  # tokens accumulated since start_line
        i = 0

        while i < len(lines):
            new_token_count = count_tokens(lines[i], self.encoding_name)

            if token_count + new_token_count > token_limit:
                # Cut at the current line if it is a breakpoint, otherwise at
                # the last breakpoint before it (never before start_line).
                if i in breakpoint_set:
                    stop_line = i
                else:
                    stop_line = max(
                        max((bp for bp in breakpoints if bp < i), default=start_line),
                        start_line,
                    )

                if stop_line == start_line:
                    # No usable breakpoint since the chunk began: keep
                    # extending the chunk even though it is over budget.
                    token_count += new_token_count
                    i += 1
                else:
                    current_chunk = "\n".join(lines[start_line:stop_line])
                    if current_chunk.strip():  # skip whitespace-only chunks
                        chunks[chunk_number] = current_chunk
                        chunk_number += 1
                    # Rewind to the cut point; lines from stop_line onward are
                    # re-counted as part of the next chunk.
                    i = stop_line
                    token_count = 0
                    start_line = stop_line
            else:
                # Still within budget: take the line into the current chunk.
                token_count += new_token_count
                i += 1

        # Whatever remains after the last cut forms the final chunk.
        current_chunk_code = "\n".join(lines[start_line:])
        if current_chunk_code.strip():
            chunks[chunk_number] = current_chunk_code

        return chunks

    def get_chunk(self, chunked_codebase, chunk_number):
        """Return the entry stored under ``chunk_number`` in ``chunked_codebase``."""
        return chunked_codebase[chunk_number]