File size: 4,803 Bytes
545c208
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""output parser for code interpreter"""

import ast
from typing import Tuple


def parse_code_action(
    output: str,
    mode: str = "prompt",
    code_start_token: str = "```\npython\n",
    code_end_token: str = "```",
    tool_call_token: str = "<|tool_call|>",
) -> Tuple[str, str]:
    """parse output from code interpreter

    Args:
        output (str): the output from code interpreter
        mode: the mode of the output, could be prompt, functioncall, assistant
        code_start_token: the token code script starts with, only used in prompt mode
        code_end_token: the token code script ends with, only used in prompt mode
        tool_call_token: the token for tool call, only used in prompt mode

    Returns:
        Tuple[str, str]: reasoning and code action
    """
    if mode == "prompt":
        return extract_code(output, code_start_token, code_end_token)
    elif mode == "functioncall":
        rsp = fc2dict(output, tool_call_token)
        if "tool_calls" in rsp and len(rsp["tool_calls"]) > 0:
            return rsp["content"], rsp["tool_calls"][0]["arguments"]["code"]
        else:
            return rsp["content"], ""
    elif mode == "assistant":
        raise NotImplementedError("assistant mode is not implemented yet")
    else:
        raise ValueError(f"mode {mode} is not supported")


def extract_code(
    rsp: str, code_start_token: str = "```\npython\n", code_end_token: str = "```"
) -> Tuple[str, str]:
    """extract code from assistant content

    Args:
        rsp (str): the response content from assistant
        code_start_token (str, optional): the token code script starts with. Defaults to "```\npython".
        code_end_token (str, optional): the token code script ends with. Defaults to "```".

    Returns:
        Tuple[str, str]: reasoning and code action
    """
    # TODO: implement the code extraction logic using different code_start_token and code_end_token
    rsp = str(rsp)

    start_index = rsp.find(code_start_token)
    if start_index == -1:
        return rsp, ""

    start_index += len(code_start_token)
    end_index = rsp.find(code_end_token, start_index)
    if end_index == -1:
        return rsp, ""

    return rsp[:start_index].replace(code_start_token, "").strip(), rsp[
        start_index:end_index
    ].strip()


import ast
import re
import json

def convert_function_call_to_json(string):
    try:
        tool_calls = []
        x = ast.parse(string)
        for tool in x.body:
            function_name = tool.value.func.id
            function_args = {}
            for keyword in tool.value.keywords:
                function_args[keyword.arg] = ast.literal_eval(keyword.value)
            this_one = {"name": function_name, "arguments": function_args}
            tool_calls.append(this_one)
        return tool_calls
    except Exception:
        return []
import json

def extract_code_from_arguments(arguments_str):
    try:
        arguments_dict = json.loads(arguments_str)
        return arguments_dict.get("code", "")
    except json.JSONDecodeError:
        return ""

def fc2dict(sequence: str, spliter="<|tool_call|>"):
    if spliter in sequence:
        content, tool_call_string = sequence.split(spliter, 1)
        try:
            # 找到第一个 { 和最后一个 }
            start_idx = tool_call_string.find('{')
            end_idx = tool_call_string.rfind('}')
            if start_idx != -1 and end_idx != -1:
                arguments_str = tool_call_string[start_idx:end_idx + 1]
                print("Arg:",arguments_str)
                arguments_str=arguments_str.replace("\n","\\n")
                #code_content = extract_code_from_arguments(arguments_str)
                tool_call_dict = {
                    "name": "execute_python",
                    "arguments": json.loads(arguments_str)
                }
                tool_calls = [tool_call_dict]
            else:
                tool_calls = []
            return {
                "content": content.strip(),
                "tool_calls": tool_calls,
                "role": "assistant",
            }
        except Exception as e:
            print(f"Error: {e}")
            return {"content": content.strip(), "role": "assistant"}
    else:
        return {"content": sequence.strip(), "role": "assistant"}

# 示例用法
sequence = '''To fulfill your request, I will perform the following steps:

1. Read the dataset from the provided path.
2. Extract the necessary data for the radar chart.
3. Create a radar chart using the extracted data.

Let's start by reading the dataset.

Action:


<|tool_call|>execute_python({"code":"import pandas as pd\n\n# Read the dataset\ndata_path = './data/radar.csv'\ndf = pd.read_csv(data_path)\ndf.head()"})\n'''
result = fc2dict(sequence)
print(result)