AI大模型探索之路-实战篇15: Agent智能数据分析平台之整合封装Tools和Memory功能代码
CSDN 2024-06-28 13:31:05 阅读 95
系列篇章💥
AI大模型探索之路-实战篇4:深入DB-GPT数据应用开发框架调研
AI大模型探索之路-实战篇5:探索Open Interpreter开放代码解释器调研
AI大模型探索之路-实战篇6:掌握Function Calling的详细流程
AI大模型探索之路-实战篇7:Function Calling技术实战自动生成函数
AI大模型探索之路-实战篇8:多轮对话与Function Calling技术应用
AI大模型探索之路-实战篇9:探究Agent智能数据分析平台的架构与功能
AI大模型探索之路-实战篇10:数据预处理的艺术:构建Agent智能数据分析平台的基础
AI大模型探索之路-实战篇11: Function Calling技术整合:强化Agent智能数据分析平台功能
AI大模型探索之路-实战篇12: 构建互动式Agent智能数据分析平台:实现多轮对话控制
AI大模型探索之路-实战篇13: 从对话到报告:打造能记录和分析的Agent智能数据分析平台
AI大模型探索之路-实战篇14: 集成本地Python代码解释器:强化Agent智能数据分析平台
目录
系列篇章💥一、前言二、Memory功能实现之在线云盘类封装1、创建OpenAi客户端2、定义本地云盘文件目录创建方法3、定义doc文档创建方法4、定义文件内容追加方法5、定义获取文件内容的方法6、定义清理文件内容的方法7、定义获取文件列表的方法8、定义文件重命名的方法9、定义删除文件的方法10、定义追加图片的方法11、定义一个云盘文件操作类
三、Memory功能实现之消息工具类封装1、定义消息管理类2、测试-查看消息管理器3、测试-追加消息4、测试-删除消息5、添加背景知识
四、Tools功能之函数封装1、获取表结构基本信息(工具函数)2、提取SQL数据到python变量(辅助函数)3、python代码解释器(工具函数)4、function函数信息生成器(辅助函数)5、function函数调用辅助类
三、结语
一、前言
在前面篇章中我们实现了Agent智能数据分析平台中的Tools和Memory相关代码落地实践,本文中我们将对这两大块功能代码进行整合封装。
二、Memory功能实现之在线云盘类封装
1、创建OpenAi客户端
## 导入依赖
import openai
import os
import numpy as np
import pandas as pd
import json
import io
from openai import OpenAI
import inspect
import pymysql
import tiktoken
from docx import Document
import matplotlib.pyplot as plt
import seaborn as sns
import tempfile
import ast
from IPython.display import display, Markdown, Code
import shutil
import copy
from openai import APIConnectionError,AuthenticationError
openai.api_key = os.getenv("OPENAI_API_KEY")
client = OpenAI(api_key=openai.api_key)
2、定义本地云盘文件目录创建方法
import os
def create_or_get_folder(folder_name):
"""
根据项目创建云盘目录
"""
base_path = "/root/autodl-tmp/iquery项目/iquery云盘"
full_path = os.path.join(base_path, folder_name)
# 如果目录不存在,则创建它
if not os.path.exists(full_path):
os.makedirs(full_path)
print(f"目录 { folder_name} 创建成功")
else:
print(f"目录 { folder_name} 已存在")
return full_path
print(create_or_get_folder(folder_name = "测试函数"))
3、定义doc文档创建方法
def create_or_get_doc(folder_name, doc_name):
"""
创建或获取文件路径
"""
base_path = "/root/autodl-tmp/iquery项目/iquery云盘"
full_path_folder=os.path.join(base_path,folder_name)
file_path_doc = os.path.join(base_path+"/"+folder_name, f'{ doc_name}.doc')
# 检查目录是否存在,如果不存在则创建
if not os.path.exists(full_path_folder):
os.makedirs(full_path_folder)
# 检查文件是否存在
if os.path.exists(file_path_doc):
# 文件存在,打开并追加内容
document = Document(file_path_doc)
else:
# 文件不存在,创建一个新的文档对象
document = Document()
# 保存文档
document.save(file_path_doc)
return file_path_doc
create_or_get_doc(folder_name="测试函数",doc_name="数据分析")
4、定义文件内容追加方法
def append_content_in_doc(folder_name, doc_name, qa_string):
""""
往文件里追加内容
@param folder_name=目录名,doc_name=文件名,qa_string=追加的内容
"""
base_path = "/root/autodl-tmp/iquery项目/iquery云盘"
## 目录地址
full_path_folder=base_path+"/"+folder_name
## 文件地址
full_path_doc = os.path.join(full_path_folder, doc_name)+".doc"
# 检查目录是否存在,如果不存在则创建
if not os.path.exists(full_path_folder):
os.makedirs(full_path_folder)
# 检查文件是否存在
if os.path.exists(full_path_doc):
# 文件存在,打开并追加内容
document = Document(full_path_doc)
else:
# 文件不存在,创建一个新的文档对象
document = Document()
# 追加内容
document.add_paragraph(qa_string)
# 保存文档
document.save(full_path_doc)
print(f"内容已追加到 { doc_name}")
测试
my_dict = "天青色等烟雨,而我在等你"
append_content_in_doc(folder_name="测试函数",doc_name="数据分析",qa_string=my_dict)
5、定义获取文件内容的方法
## 实现根据项目和文件获取文件内容的方法
from docx import Document
import os
def get_file_content(folder_name, doc_name):
"""
实现根据项目名和文件名获取文件内容的方法
@param project_name:项目名,file_name:文件名
@return 文件内容
"""
# 构建文件的完整路径
base_path = "/root/autodl-tmp/iquery项目/iquery云盘"
file_path = os.path.join(folder_name, doc_name)
full_path = os.path.join(base_path, file_path)+".doc"
# 确保文件存在
if not os.path.exists(full_path):
return "文件不存在"
try:
# 加载文档
doc = Document(full_path)
content = []
# 遍历文档中的每个段落,并收集文本
for para in doc.paragraphs:
content.append(para.text)
# 将所有段落文本合并成一个字符串返回
return '\n'.join(content)
except Exception as e:
return f"读取文件时发生错误: { e}"
测试
get_file_content(folder_name="测试函数",doc_name="数据分析")
6、定义清理文件内容的方法
from docx import Document
def clear_content_in_doc(folder_name, doc_name):
# 打开文档
base_path = "/root/autodl-tmp/iquery项目/iquery云盘"
file_path = os.path.join(base_path+"/"+folder_name, f'{ doc_name}.doc')
doc = Document(file_path)
# 遍历每一个段落,设置其文本为空字符串
for p in doc.paragraphs:
for run in p.runs:
run.text = ''
# 保存修改后的文档
doc.save(file_path)
print("文档内容清除完毕")
测试
clear_content_in_doc(folder_name="测试函数",doc_name="数据分析")
7、定义获取文件列表的方法
def list_files_in_folder(folder_name):
"""
列举当前文件夹的全部文件
"""
base_path = "/root/autodl-tmp/iquery项目/iquery云盘"
full_path = os.path.join(base_path,folder_name )
file_names = [f for f in os.listdir(full_path) if os.path.isfile(os.path.join(full_path, f))]
return file_names
测试
list_files_in_folder(folder_name="测试函数")
8、定义文件重命名的方法
def rename_doc(folder_name, doc_name, new_name):
"""
修改指定的文档名称
"""
base_path = "/root/autodl-tmp/iquery项目/iquery云盘"
file_path = os.path.join(base_path+"/"+folder_name, f'{ doc_name}.doc')
new_file_path = os.path.join(base_path+"/"+folder_name, f'{ new_name}.doc')
# 重命名文件
os.rename(file_path, new_file_path)
return new_name
测试
rename_doc(folder_name="测试函数",doc_name="数据分析",new_name="数据可视化分析报告")
9、定义删除文件的方法
def delete_all_files_in_folder(folder_name):
"""
删除某文件夹内全部文件
"""
# 定义要删除的目录路径
base_path = "/root/autodl-tmp/iquery项目/iquery云盘"
full_path = os.path.join(base_path,folder_name)
# 遍历整个目录
for filename in os.listdir(full_path):
# 构造文件或者文件夹的绝对路径
file_path = os.path.join(full_path, filename)
try:
# 如果是文件,则删除文件
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
# 如果是文件夹,则删除文件夹
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
print("文件已清除完毕")
except Exception as e:
print('Failed to delete %s. Reason: %s' % (file_path, e))
测试
delete_all_files_in_folder(folder_name = "测试函数")
10、定义追加图片的方法
from docx import Document
import matplotlib.pyplot as plt
import os
import tempfile
def append_img_in_doc(folder_name, doc_name, img):
""""
往文件里追加图片
@param folder_name=目录名,doc_name=文件名,img=图片对象,数据类型为matplotlib.figure.Figure对象
"""
base_path = "/root/autodl-tmp/iquery项目/iquery云盘"
## 目录地址
full_path_folder=base_path+"/"+folder_name
## 文件地址
full_path_doc = os.path.join(full_path_folder, doc_name)+".doc"
# 检查目录是否存在,如果不存在则创建
if not os.path.exists(full_path_folder):
os.makedirs(full_path_folder)
# 检查文件是否存在
if os.path.exists(full_path_doc):
print(full_path_doc)
# 文件存在,打开并追加内容
document = Document(full_path_doc)
else:
# 文件不存在,创建一个新的文档对象
document = Document()
# 追加图片
# 将matplotlib的Figure对象保存为临时图片文件
with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as tmpfile:
img.savefig(tmpfile.name, format='png')
# 将图片插入到.docx文档中
document.add_picture(tmpfile.name)
# 保存文档
document.save(full_path_doc)
print(f"图片已追加到 { doc_name}")
import matplotlib.pyplot as plt
# 创建一个图形
fig, ax = plt.subplots()
ax.plot([1, 2, 3, 4, 5])
append_img_in_doc(folder_name="测试函数",doc_name="数据分析",img=fig)
11、定义一个云盘文件操作类
class CloudFile():
"""
用于操作云盘文件
"""
def __init__(self,
project_name,
part_name,
doc_content = None):
# 项目名称,即项目文件夹名称
self.project_name = project_name
# 项目某部分名称,即项目文件名称
self.part_name = part_name
# 项目文件夹ID
# 若项目文件夹ID为空,则获取项目文件夹ID
folder_path=create_or_get_folder(folder_name=project_name)
# 创建时获取当前项目中其他文件名称列表
self.doc_list = list_files_in_folder(folder_name=project_name)
file_path = create_or_get_doc(folder_name=project_name,
doc_name=part_name)
# 项目文件具体内容,相当于多轮对话内容
self.doc_content = doc_content
# 若初始content不为空,则将其追加入文档内
if doc_content != None:
append_content_in_doc(folder_name=project_name,
doc_name=part_name,
dict_list=doc_content)
def get_doc_content(self):
"""
根据项目某文件的文件ID,获取对应的文件内容
"""
self.doc_content = get_file_content(folder_name=self.project_name, doc_name=self.part_name)
return self.doc_content
def append_doc_content(self, content):
"""
根据项目某文件的文件ID,追加文件内容
"""
append_content_in_doc(folder_name=self.project_name,
doc_name=self.part_name,
dict_list=content)
def clear_content(self):
"""
清空某文件内的全部内容
"""
clear_content_in_doc(folder_name=self.project_name, doc_name=self.part_name)
def delete_all_files(self):
"""
删除当前项目文件夹内的全部文件
"""
delete_all_files_in_folder(folder_name=self.project_name)
def update_doc_list(self):
"""
更新当前项目文件夹内的全部文件名称
"""
self.doc_list = list_files_in_folder(folder_name=self.project_name)
def rename_doc(self, new_name):
"""
修改当前文件名称
"""
self.part_name = rename_doc_in_drive(folder_name=self.project_name, doc_name=self.part_name,
new_name=new_name)
本地存储测试
c1 = CloudFile(project_name='测试项目', part_name='测试文档1')
三、Memory功能实现之消息工具类封装
1、定义消息管理类
class MessageManager():
"""
MessageManager,用于创建Chat模型能够接收和解读的messages对象。该对象是原始Chat模型接收的\
messages对象的更高级表现形式,MessageManager类对象将字典类型的list作为其属性之一,同时还能\
能区分系统消息和历史对话消息,并且能够自行计算当前对话的token量,并执能够在append的同时删\
减最早对话消息,从而能够更加顺畅的输入大模型并完成多轮对话需求。
"""
def __init__(self,
system_content_list=[],
question='你好。',
tokens_thr=None,
project=None):
self.system_content_list = system_content_list
# 系统消息文档列表,相当于外部输入文档列表
system_messages = []
# 除系统消息外历史对话消息
history_messages = []
# 用于保存全部消息的list
messages_all = []
# 系统消息字符串
system_content = ''
# 历史消息字符串,此时为用户输入信息
history_content = question
# 系统消息+历史消息字符串
content_all = ''
# 输入到messages中系统消息个数,初始情况为0
num_of_system_messages = 0
# 全部信息的token数量
all_tokens_count = 0
encoding = tiktoken.encoding_for_model("gpt-3.5-turbo")
# 将外部输入文档列表依次保存为系统消息
if system_content_list != []:
for content in system_content_list:
system_messages.append({ "role": "system", "content": content})
# 同时进行全文档拼接
system_content += content
# 计算系统消息token
system_tokens_count = len(encoding.encode(system_content))
# 拼接系统消息
messages_all += system_messages
# 计算系统消息个数
num_of_system_messages = len(system_content_list)
# 若存在最大token数量限制
if tokens_thr != None:
# 若系统消息超出限制
if system_tokens_count >= tokens_thr:
print("system_messages的tokens数量超出限制,当前系统消息将不会被输入模型")
# 删除系统消息
system_messages = []
messages_all = []
# 系统消息个数清零
num_of_system_messages = 0
# 系统消息token数清零
system_tokens_count = 0
all_tokens_count += system_tokens_count
# 创建首次对话消息
history_messages = [{ "role": "user", "content": question}]
# 创建全部消息列表
messages_all += history_messages
# 计算用户问题token
user_tokens_count = len(encoding.encode(question))
# 计算总token数
all_tokens_count += user_tokens_count
# 若存在最大token限制
if tokens_thr != None:
# 若超出最大token限制
if all_tokens_count >= tokens_thr:
print("当前用户问题的tokens数量超出限制,该消息无法被输入到模型中")
# 同时清空系统消息和用户消息
history_messages = []
system_messages = []
messages_all = []
num_of_system_messages = 0
all_tokens_count = 0
# 全部messages信息
self.messages = messages_all
# system_messages信息
self.system_messages = system_messages
# user_messages信息
self.history_messages = history_messages
# messages信息中全部content的token数量
self.tokens_count = all_tokens_count
# 系统信息数量
self.num_of_system_messages = num_of_system_messages
# 最大token数量阈值
self.tokens_thr = tokens_thr
# token数计算编码方式
self.encoding = tiktoken.encoding_for_model("gpt-3.5-turbo")
# message挂靠的项目
self.project = project
# 删除部分对话信息
def messages_pop(self, manual=False, index=None):
def reduce_tokens(index):
drop_message = self.history_messages.pop(index)
self.tokens_count -= len(self.encoding.encode(str(drop_message)))
if self.tokens_thr is not None:
while self.tokens_count >= self.tokens_thr:
reduce_tokens(-1)
if manual:
if index is None:
reduce_tokens(-1)
elif 0 <= index < len(self.history_messages) or index == -1:
reduce_tokens(index)
else:
raise ValueError("Invalid index value: {}".format(index))
# 更新messages
self.messages = self.system_messages + self.history_messages
# 增加部分对话信息
def messages_append(self, new_messages):
# 若是单独一个字典,或JSON格式字典
if type(new_messages) is dict or type(new_messages) is openai.types.chat.chat_completion_message.ChatCompletionMessage:
self.messages.append(new_messages)
self.tokens_count += len(self.encoding.encode(str(new_messages)))
# 若新消息也是MessageManager对象
elif isinstance(new_messages, MessageManager):
self.messages += new_messages.messages
self.tokens_count += new_messages.tokens_count
# 重新更新history_messages
self.history_messages = self.messages[self.num_of_system_messages: ]
# 再执行pop,若有需要,则会删除部分历史消息
self.messages_pop()
# 复制信息
def copy(self):
# 创建一个新的 MessageManager 对象,复制所有重要的属性
system_content_str_list = [message["content"] for message in self.system_messages]
new_obj = MessageManager(
system_content_list=copy.deepcopy(system_content_str_list), # 使用深复制来复制系统消息
question=self.history_messages[0]["content"] if self.history_messages else '',
tokens_thr=self.tokens_thr
)
# 复制任何其他需要复制的属性
new_obj.history_messages = copy.deepcopy(self.history_messages) # 使用深复制来复制历史消息
new_obj.messages = copy.deepcopy(self.messages) # 使用深复制来复制所有消息
new_obj.tokens_count = self.tokens_count
new_obj.num_of_system_messages = self.num_of_system_messages
return new_obj
# 增加系统消息
def add_system_messages(self, new_system_content):
system_content_list = self.system_content_list
system_messages = []
# 若是字符串,则将其转化为list
if type(new_system_content) == str:
new_system_content = [new_system_content]
system_content_list.extend(new_system_content)
new_system_content_str = ''
for content in new_system_content:
new_system_content_str += content
new_token_count = len(self.encoding.encode(str(new_system_content_str)))
self.tokens_count += new_token_count
self.system_content_list = system_content_list
for message in system_content_list:
system_messages.append({ "role": "system", "content": message})
self.system_messages = system_messages
self.num_of_system_messages = len(system_content_list)
self.messages = system_messages + self.history_messages
# 再执行pop,若有需要,则会删除部分历史消息
self.messages_pop()
# 删除系统消息
def delete_system_messages(self):
system_content_list = self.system_content_list
if system_content_list != []:
system_content_str = ''
for content in system_content_list:
system_content_str += content
delete_token_count = len(self.encoding.encode(str(system_content_str)))
self.tokens_count -= delete_token_count
self.num_of_system_messages = 0
self.system_content_list = []
self.system_messages = []
self.messages = self.history_messages
# 清除对话消息中的function消息
def delete_function_messages(self):
# 用于删除外部函数消息
history_messages = self.history_messages
# 从后向前迭代列表
for index in range(len(history_messages) - 1, -1, -1):
message = history_messages[index]
## 这儿估计有问题
if message.get("function_call") or message.get("role") == "function":
self.messages_pop(manual=True, index=index)
2、测试-查看消息管理器
msg1 = MessageManager()
msg1.system_messages
3、测试-追加消息
msg1.history_messages
msg1.messages_append({ "role": "user", "content": "你好,有什么可以帮你?"})
msg1.history_messages
4、测试-删除消息
msg1.messages_pop(manual=True, index=-1)
msg1.history_messages
5、添加背景知识
# 数据字典文件
with open('/root/autodl-tmp/iquery项目/data/数据字典/iquery数据字典.md', 'r', encoding='utf-8') as f:
data_dictionary = f.read()
# 数据分析报告编写专家文档
with open('/root/autodl-tmp/iquery项目/data/业务知识/本公司数据分析师业务介绍.md', 'r', encoding='utf-8') as f:
da_instruct = f.read()
msg2 = MessageManager(system_content_list=[data_dictionary, da_instruct])
msg2.system_messages
输出
四、Tools功能之函数封装
1、获取表结构基本信息(工具函数)
定义一个SQL工具函数,用于获取表结构信息,作为大模型生成SQL的背景知识
## mysql hive sparksql
def sql_inter(sql_query, g='globals()'):
"""
用于获取iquery数据库中各张表的有关相关信息,\
核心功能是将输入的SQL代码传输至iquery数据库所在的MySQL环境中进行运行,\
并最终返回SQL代码运行结果。需要注意的是,本函数是借助pymysql来连接MySQL数据库。
:param sql_query: 字符串形式的SQL查询语句,用于执行对MySQL中iquery数据库中各张表进行查询,并获得各表中的各类相关信息
:param g: g,字符串形式变量,表示环境变量,无需设置,保持默认参数即可
:return:sql_query在MySQL中的运行结果。
"""
mysql_pw = "iquery_agent"
connection = pymysql.connect(
host='localhost', # 数据库地址
user='iquery_agent', # 数据库用户名
passwd=mysql_pw, # 数据库密码
db='iquery', # 数据库名
charset='utf8' # 字符集选择utf8
)
try:
with connection.cursor() as cursor:
# SQL查询语句
sql = sql_query
cursor.execute(sql)
# 获取查询结果
results = cursor.fetchall()
finally:
connection.close()
return json.dumps(results)
sql_inter(sql_query='SELECT COUNT(*) FROM user_demographics;', g=globals())
2、提取SQL数据到python变量(辅助函数)
定义一个辅助函数,用于将查询到的记录提取保存到本地python变量中
def extract_data(sql_query,df_name,g='globals()'):
"""
用于借助pymysql,将MySQL中的iquery数据库中的表读取并保存到本地Python环境中。
:param sql_query: 字符串形式的SQL查询语句,用于提取MySQL中iquery数据库中的某张表。
:param df_name: 将MySQL数据库中提取的表格进行本地保存时的变量名,以字符串形式表示。
:param g: g,字符串形式变量,表示环境变量,无需设置,保持默认参数即可
:return:表格读取和保存结果
"""
mysql_pw = "iquery_agent"
connection = pymysql.connect(
host='localhost', # 数据库地址
user='iquery_agent', # 数据库用户名
passwd=mysql_pw, # 数据库密码
db='iquery', # 数据库名
charset='utf8' # 字符集选择utf8
)
globals()[df_name] = pd.read_sql(sql_query, connection)
return "已成功完成%s变量创建" % df_name
extract_data(sql_query = 'SELECT * FROM user_demographics;',
df_name = 'user_demographics_df',
g = globals())
从python变量中取出数据查看
user_demographics_df.head()
3、python代码解释器(工具函数)
def python_inter(py_code,g='globals()'):
"""
用于对iquery数据库中各张数据表进行查询和处理,并获取最终查询或处理结果。
:param py_code: 字符串形式的Python代码,用于执行对iquery数据库中各张数据表进行操作
:param g: g,字符串形式变量,表示环境变量,无需设置,保持默认参数即可
:return:代码运行的最终结果
"""
# 添加图片对象,如果存在绘图代码,则创建fig对象
py_code = insert_fig_object(py_code)
global_vars_before = set(globals().keys())
try:
exec(py_code, globals())
except Exception as e:
return str(e)
global_vars_after = set(globals().keys())
new_vars = global_vars_after - global_vars_before
if new_vars:
result = { var: globals()[var] for var in new_vars}
return str(result)
else:
try:
return str(eval(py_code, globals()))
except Exception as e:
return "已经顺利执行代码"
检查图形对象,赋值给fig(方便通过全局变量fig进行统一的绘图对象操作)
def insert_fig_object(code_str,g='globals()'):
"""
为图片创建fig对象
:param g: g,字符串形式变量,表示环境变量,无需设置,保持默认参数即可
"""
#print("开始画图了")
global fig
# 检查是否已存在 fig 对象的创建
if 'fig = plt.figure' in code_str or 'fig, ax = plt.subplots()' in code_str:
return code_str # 如果存在,则返回原始代码字符串
# 定义可能的库别名和全名
plot_aliases = ['plt.', 'matplotlib.pyplot.','plot']
sns_aliases = ['sns.', 'seaborn.']
# 寻找第一次出现绘图相关代码的位置
first_plot_occurrence = min((code_str.find(alias) for alias in plot_aliases + sns_aliases if code_str.find(alias) >= 0), default=-1)
# 如果找到绘图代码,则在该位置之前插入 fig 对象的创建
if first_plot_occurrence != -1:
plt_figure_index = code_str.find('plt.figure')
if plt_figure_index != -1:
# 寻找 plt.figure 后的括号位置,以确定是否有参数
closing_bracket_index = code_str.find(')', plt_figure_index)
# 如果找到了 plt.figure(),则替换为 fig = plt.figure()
modified_str = code_str[:plt_figure_index] + 'fig = ' + code_str[plt_figure_index:closing_bracket_index + 1] + code_str[closing_bracket_index + 1:]
else:
modified_str = code_str[:first_plot_occurrence] + 'fig = plt.figure()\n' + code_str[first_plot_occurrence:]
return modified_str
else:
return code_str # 如果没有找到绘图代码,则返回原始代码字符串
图形绘制代码测试
import matplotlib.pyplot as plt
# 数据
categories = ['Category 1', 'Category 2', 'Category 3', 'Category 4']
values = [4, 7, 1, 8]
# 创建 Figure 对象
fig, ax = plt.subplots()
# 在 Axes 对象 ax 上创建条形图
ax.bar(categories, values)
# 添加标题和标签
ax.set_title('Bar Chart Example')
ax.set_xlabel('Categories')
ax.set_ylabel('Values')
# 显示图表
plt.show()
insert_fig_object方法测试
code_string = """
import matplotlib.pyplot as plt
# 数据
categories = ['Category 1', 'Category 2', 'Category 3', 'Category 4']
values = [4, 7, 1, 8]
# 创建 Figure 对象
fig, ax = plt.subplots()
# 在 Axes 对象 ax 上创建条形图
ax.bar(categories, values)
# 添加标题和标签
ax.set_title('Bar Chart Example')
ax.set_xlabel('Categories')
ax.set_ylabel('Values')
# 显示图表
plt.show()
"""
print(insert_fig_object(code_str = code_string, g=globals()))
python_inter(py_code = code_string, g=globals())
查看fig对象
4、function函数信息生成器(辅助函数)
定义一个用于生成function calling 函数信息的,辅助函数(为了保证稳定性,实践使用是最好手工编写,避免大模型生成的结构体不太稳定)
def auto_functions(functions_list):
"""
Chat模型的functions参数编写函数
:param functions_list: 包含一个或者多个函数对象的列表;
:return:满足Chat模型functions参数要求的functions对象
"""
def functions_generate(functions_list):
# 创建空列表,用于保存每个函数的描述字典
functions = []
# 对每个外部函数进行循环
for function in functions_list:
# 读取函数对象的函数说明
function_description = inspect.getdoc(function)
# 读取函数的函数名字符串
function_name = function.__name__
system_prompt = '以下是某的函数说明:%s' % function_description
user_prompt = '根据这个函数的函数说明,请帮我创建一个JSON格式的字典,这个字典有如下5点要求:\
1.字典总共有三个键值对;\
2.第一个键值对的Key是字符串name,value是该函数的名字:%s,也是字符串;\
3.第二个键值对的Key是字符串description,value是该函数的函数的功能说明,也是字符串;\
4.第三个键值对的Key是字符串parameters,value是一个JSON Schema对象,用于说明该函数的参数输入规范。\
5.输出结果必须是一个JSON格式的字典,只输出这个字典即可,前后不需要任何前后修饰或说明的语句' % function_name
response = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{ "role": "system", "content": system_prompt},
{ "role": "user", "content": user_prompt}
]
)
json_function_description=json.loads(response.choices[0].message.content.replace("```","").replace("json",""))
json_str={ "type": "function","function":json_function_description}
functions.append(json_str)
return functions
max_attempts = 4
attempts = 0
while attempts < max_attempts:
try:
functions = functions_generate(functions_list)
break # 如果代码成功执行,跳出循环
except Exception as e:
attempts += 1 # 增加尝试次数
print("发生错误:", e)
if attempts == max_attempts:
print("已达到最大尝试次数,程序终止。")
raise # 重新引发最后一个异常
else:
print("正在重新运行...")
return functions
5、function函数调用辅助类
负责承接外部函数调用时相关功能支持。
类属性包括外部函数列表、外部函数参数说明列表、以及调用方式说明三项。
class AvailableFunctions():
"""
外部函数类,主要负责承接外部函数调用时相关功能支持。类属性包括外部函数列表、外部函数参数说明列表、以及调用方式说明三项。
"""
def __init__(self, functions_list=[], functions=[], function_call="auto"):
self.functions_list = functions_list
self.functions = functions
self.functions_dic = None
self.function_call = None
# 当外部函数列表不为空、且外部函数参数解释为空时,调用auto_functions创建外部函数解释列表
if functions_list != []:
self.functions_dic = { func.__name__: func for func in functions_list}
self.function_call = function_call
if functions == []:
self.functions = auto_functions(functions_list)
# 增加外部函数方法,并且同时可以更换外部函数调用规则
def add_function(self, new_function, function_description=None, function_call_update=None):
self.functions_list.append(new_function)
self.functions_dic[new_function.__name__] = new_function
if function_description == None:
new_function_description = auto_functions([new_function])
self.functions.append(new_function_description)
else:
self.functions.append(function_description)
if function_call_update != None:
self.function_call = function_call_update
af = AvailableFunctions(functions_list=[sql_inter, extract_data, python_inter])
af.functions_list
af.functions_dic
af.function_call
af.functions
输出:
[{ 'type': 'function',
'function': { 'name': 'sql_inter',
'description': '用于获取iquery数据库中各张表的有关相关信息,核心功能是将输入的SQL代码传输至iquery数据库所在的MySQL环境中进行运行,并最终返回SQL代码运行结果。',
'parameters': { 'type': 'object',
'properties': { 'sql_query': { 'type': 'string',
'description': '字符串形式的SQL查询语句,用于执行对MySQL中iquery数据库中各张表进行查询,并获得各表中的各类相关信息。'},
'g': { 'type': 'string',
'description': 'g,字符串形式变量,表示环境变量,无需设置,保持默认参数即可。'}},
'required': ['sql_query', 'g']}}},
{ 'type': 'function',
'function': { 'name': 'extract_data',
'description': '用于借助pymysql,将MySQL中的iquery数据库中的表读取并保存到本地Python环境中。',
'parameters': { 'type': 'object',
'properties': { 'sql_query': { 'type': 'string',
'description': '字符串形式的SQL查询语句,用于提取MySQL中iquery数据库中的某张表。'},
'df_name': { 'type': 'string',
'description': '将MySQL数据库中提取的表格进行本地保存时的变量名,以字符串形式表示。'},
'g': { 'type': 'string', 'description': 'g,字符串形式变量,表示环境变量,无需设置,保持默认参数即可'}},
'required': ['sql_query', 'df_name']}}},
{ 'type': 'function',
'function': { 'name': 'python_inter',
'description': '用于对iquery数据库中各张数据表进行查询和处理,并获取最终查询或处理结果。',
'parameters': { '$schema': 'http://-schema.org/draft-07/schema#',
'type': 'object',
'properties': { 'py_code': { 'type': 'string',
'description': '字符串形式的Python代码,用于执行对iquery数据库中各张数据表进行操作'},
'g': { 'type': 'string', 'description': 'g,字符串形式变量,表示环境变量,无需设置,保持默认参数即可'}},
'required': ['py_code', 'g']}}}]
三、结语
本文中我们封装落地了Agent智能数据分析平台中的Tools和Memory相关代码,下一篇章中我将落地实践Agent智能数据分析平台的核心模块Plan,探索发掘人类意图,优化整个决策流程。
🎯🔖更多专栏系列文章:AIGC-AI大模型探索之路
😎 作者介绍:我是寻道AI小兵,资深程序老猿,从业10年+、互联网系统架构师,目前专注于AIGC的探索。
📖 技术交流:建立有技术交流群,可以扫码👇 加入社群,500本各类编程书籍、AI教程、AI工具等你领取!
如果文章内容对您有所触动,别忘了点赞、⭐关注,收藏!加入我,让我们携手同行AI的探索之旅,一起开启智能时代的大门!
声明
本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。