From ed023a6e91ffc981c45503b782a49fbae24f70aa Mon Sep 17 00:00:00 2001 From: 13315423919 <13315423919@qq.com> Date: Wed, 19 Nov 2025 19:43:00 +0800 Subject: [PATCH] Add File --- main/agent_chat.py | 1152 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1152 insertions(+) create mode 100644 main/agent_chat.py diff --git a/main/agent_chat.py b/main/agent_chat.py new file mode 100644 index 0000000..f193cb9 --- /dev/null +++ b/main/agent_chat.py @@ -0,0 +1,1152 @@ +""" +融合智能agent +""" + + +import datetime +from init import llm,gcfg,generate_cfg,get_chat_model +import json +import json5 +from full_index_search import c_or_u_fs_index,adjust_ctx_size,jieba_fenci,c_or_u_baike_index,one_doc_search,full_search_by_doc_list,full_search_by_baike_catalog +from k_database import QA,KAgent,Doc,User,BaiKe,SuperManAgent +from web_search import search_web,get_detail_page2,get_single_page +from da_chat import Agent_DA_chat + +#融合智能agent的构建器,根据不同的功能,构建不同类型的Agent +def Agent_chat_build(chat,delta_stream,username,userid): + + #功能定义 + if "fun_def" in chat and chat["fun_def"]: + if chat["fun_def"] in ["A","B","0","1"]: + return Agent_chat(chat,delta_stream,username,userid) + elif chat["fun_def"] in ["C","D"]: + return Agent_chat0(chat,delta_stream,username,userid) + elif chat["fun_def"] =="S": + return Agent_superman(chat,delta_stream,username,userid) + else: + chat["fun_def"] = "0" + + return Agent_chat(chat,delta_stream,username,userid) + +#融合智能agent, 带推理功能的自响应式对话 +class Agent_chat: + def __init__(self,chat,delta_stream,username,userid): + #短期记忆 + self.mem=[] + #包含的其它知识体 + self.kagents={} + #运行信息的上下文 + self.run_context=Run_Context() + + #最后的结果 + self.answer="" + + #是否分段返回流 + self.delta_stream = delta_stream + + #最大思考轮次 + self.think_max_steps=5 + #思考轮次 + self.think_step=0 + + #用户名和id + self.username = username + self.userid = userid + + #对话的参数 + self.chat = chat + self.allow_doc_list=[] + + #是否是推理模型 + self.think_mode=False + + #功能定义 + self.fun_def = chat["fun_def"] + + #上传文件的情况,相当于指定了文件 + if 'upload_files' in chat and chat["upload_files"]: + doc = chat["upload_files"][0] + self.allow_doc_list.append([doc[0],doc[1]]) + self.run_context.put_env("用户上传文件",{"base":doc[0],"path":doc[1]}) + + #定位智能体列表" + self.kagent_data=[] + system_prompt = chat["prompt"] + + if system_prompt.find("") >0 and system_prompt.find("") >0: + + agents = system_prompt[system_prompt.find("")+8:system_prompt.find("")] + + agents = json5.loads(agents) + + agent_sns = [ item[0] for item in agents] + + self.fill_kagent_data(agent_sns) + #end + + #网络搜索 + self.ns_prompt=""" +## 联网搜索功能 +你可以生成一个400的任务,来启动联网搜索功能,根据搜索的结果来丰富问题的上下文信息。 +### 适用: + 1. 用户明确要求联网搜索的,如"联网搜索xxxx信息,回答问题","开启联网搜索" + 2. 全文检索知识库102后,相关上下文数量少或回答质量不能满足用户时,可以启动联网搜索来补充一些上下文 +### 联网搜索技能 + 1. 你善于使用互联网搜索引擎如百度、必应等来进行联网搜索,会根据用户的意图生成的关键词并能最大化的得到结果,不会乱添加其它无关的关键词。 + 2. 不要尝试通过一次搜索来回答用户问题,可以将问题拆解可以生成多个搜索任务,每次只生成一个主题关键词,这样更容易得到相关的上下文 + 3. 多轮任务时一定要根据运行信息上次任务的主题关键词进行优化,来搜索更多的信息以便回答最后的问题 + + 任务代码:{{"task":400,"关键字":"主题关键词"}} +""" + #不启用 + if self.fun_def in ["A","C"]: + self.ns_prompt="" + + #数据分析 + self.da_prompt=""" +## 数据分析功能 +你可以分析Excel的数据文件,对其进行求和、汇总、分组统计等计算。 +适用场景: + 1.用户明确要求分析数据文件,如"分析一下这个Excel文件,回答问题" + 2.数据分析自带问题回答综述能力,如果只有一个500任务,不需要再生成105做综述 + 3.文件绝对路径为必填,不需要104任务文件的内容 +任务代码: + {{"task":500,"文件名":"文件绝对路径,必填"}} + +""" + #工作流程 + self.workflow_prompt=f""" +## 工作流程 +1. 首先判断用户问题的复杂性,如果用户问题比较复杂:背景信息比较长(字数超过100个字以上)或者问题比较多(三个以上的问题或?号)的判定为复杂问题,按复杂问题处理流程处理。 +2. 不是复杂问题的则判断是否要改写问题,让问题表达清晰意图明确。如果问题改写后还是不清晰则回复用户让用户补充信息,结束任务。 +3. 分析问题理解用户的意图,并结合上次运行信息来考虑本次规划任务的生成,如果上次运行失败的考虑使用其它任务code +4. 优先根据提供的智能体功能列表来匹配问题,生成智能体调用任务,如果外部智能体功能不能满足用户意图,则根据内核技能来生成规划任务 +5. 对于复杂任务需要多个智能体或内核技能的,可以一次性生成一个按顺序调度任务列表,任务列表一定以json数组形式输出,使用[]包裹。格式如下: + [{{"id":"任务1","task":400,"关键字":"搜索的内容"}},{{"task":200,"SN":"智能体的标识","name":"智能体的名称","req":"问题信息"}},{{"id":"任务2","task":105}}..] +6. 运行信息是一个json数组结构,包括每个任务的任务代码和调用的后结果信息,如果没有出错或提示上下文数量为0,则表示上次任务成功,不用再重复生成相同任务。 +7. 根据运行信息来做任务规划,如果任务信息中只有工具调用和联网搜索,没有智能体调用的情景时,则需要生成105做最后的汇总回答;105一个就够了,不要重复规划105. +8. 如果用户的最后意图是生成海报、生成PPT、保存个人文件夹和保存个人空间等不需要做显式回答任务的,则输出100来汇报任务完成情况。 +9. Agent设置了{self.think_max_steps}轮的循环。如果问题没有得到有效回答时,要根据运行信息来调整工具的参数、关键词、重写问题和拆解成不同的子任务,不要傻傻的简单重复之前的任务,特别是105只能出现一次,要不然你就被其它大模型淘汰了。 +10. 第{self.think_max_steps}轮时,一定要让任务100结尾,做运行信息和上下文的总结,因为可能到最后都没有达到用户的意图,可以让用户知道问题出在什么地方。 + + +## 复杂问题处理流程 +1. 根据用户的意图将复杂问题按语义和?号的关系拆分成多个子问题,每次只回答一个子问题,直到所有子问题回答完成 +2. 针对每个子问题选择合适的工具和智能体,按正常工作流程处理 +3. 如果执行过程中给出的上下文信息或回答内容比较少或不准确时,可以自行补充一些上下文,再来选择工具和智能体执行 +4. 最后结合问题的背景和每个子任务的结果给出最后的答案 + + """ + #工具调用 + self.tools_prompt=f""" +## 内核技能 +### 语义判断直接回答 +- 回答并结束任务100 +1. 如果是一些常识性问题并且不需要上下文的,可以让大模型自己可以回答 +2. 需要使用100来总结任务规划和完成情况的汇报 +3. 任务描述如下: + {{"task":100,"回答":"..."}} +- 对于特定领域和特定产品的问题都应该通过调用工具和外部智能体来回答,不能直接回答 + +### 工具调用 +个人文件夹是存放个人文件的,个人空间是存放百科词条的,两者是不一样的 +1. 功能:根据历史问答和当前问题改写为一个表达更清晰,意图更明确问题 + 试用:用户的问题基本明确,但可能有错别字或上轮对话使用的代词等 + 任务代码:{{"task":101,"问题":"改写后的问题"}} +2. 功能:全文检索知识库,根据关键字返回符合条件的具体信息 + 适用:用于回答具体问题的细节内容,只是要文件不适用102,要使用103 + 任务代码:{{"task":102,"关键字":"检索关键字"}} +3. 功能:查找文件,根据文件名称或关键字查找相关文件,返回文件列表,包括文件的base和path + 适用:问题中提到需要相关文件或者《》号的场景 + 任务代码: {{"task":103,"关键字":"关键字可以多个,用空格分割,提升查找的成功率"}} +4. 功能:返回单个文件的完整详细内容 + 适用:上下文中需要整个文件内容进行后续问答的,已经上传的文件不适用104 + 任务代码:{{"task":104,"文件名":"完整文件名或关键字,关键字可以多个,用空格分割,提升查找的成功率"}} +5. 功能:根据短期记忆的上下文,来分析、研判、总结回答用户的问题 + 适用:回答用户提出的显式的问题,如果用户的意图是任务执行则调用100进行汇报 + 任务代码:{{"task":105,"req":"问题"}} +6. 功能:根据关键字或问题,检索个人空间的百科数据,返回百科的id和 + 适用:问题中明确提到'个人空间'的场景,优先使用此功能 + 任务代码:{{"task":106,"关键字":"检索关键字"}} +7. 功能:根据关键字检索个人文件夹的文件,返回文件列表,包含文件的base和path + 适用:问题中明确提到'个人文件夹'文件的场景, + 任务代码:{{"task":107,"关键字":"检索关键字"}} +8. 功能:根据url打开并下载一个网页 + 适用:问题中明确提到提到打开或下载网页或url的场景 + 任务代码:{{"task":108,"url":"一个可用的网页的url,不要臆想根据上下获得"}} +9. 功能:保存结果信息到个人空间中 + 适用:问题中明确提到保存结果信息到个人空间的场景 + 任务代码:{{"task":109,"title":"内容的标题"}} +10. 功能:保存结果信息到个人文件夹(生成一个新的文件) + 适用:问题中明确提到保存结果信息到个人文件夹的场景,文件后缀.md + 任务代码:{{"task":110,"filename":"文件名称"}} +11. 功能:数学计算器,完成+,-,*,/等数学计算 + 适用:需要数学计算的场景 + 任务代码:{{"task":111,"expr":"计算表达式"}} + + + """ + #end __init__ + + def fill_kagent_data(self,agent_sns): + query = KAgent.select(KAgent.title,KAgent.demo,KAgent.atype,KAgent.guide,KAgent.files,KAgent.kasn).where(KAgent.kasn.in_(agent_sns)) + + for r in query: + self.kagents[r.kasn]=r.atype + self.kagent_data.append({'智能体名称':r.title,'功能':r.demo,'SN':r.kasn,'介绍':r.guide}) + + #生成系统promot + def gen_system_prompt(self): + chat = self.chat + title = chat["title"] + question = chat["req"] + user_defined_prompt = chat["prompt"] + history = chat["history"] + context = self.run_context.output_json() + + # 获取当前日期和星期 + today = datetime.datetime.now() + # 获取今天是星期几,使用weekday()方法(周一=0, 周日=6) + weekday_number = today.weekday() + + now=f"{today.strftime('%Y-%m-%d %H:%M:%S')} 星期{weekday_number+1}" + + #智能体的会动态变化的,不要放在初始化中定义 + self.agent_prompt=f""" +### 智能体调用任务描述,其中task,SN,req为必填字段不能遗漏,根据运行信息和需不需要传递文件,确定base和path值,base和path出现时一定要同时出现。 +{{"task":200,"SN":"智能体的标识","name":"智能体的名称","req":"问题信息","path":"文件绝对路径,选填","base":"文件所属知识库,选填"}} + +### 智能体功能详细清单 +{self.kagent_data} + +### 短期记忆 +短期记忆中包括了知识检索、网络搜索、和各个任务执行完成后得到信息,用于最后105任务的输出 +短期记忆上下文数量:{len(self.mem)} + + """ + system_prompt = f""" +# 你是一个智能助手,你可以根据用户的问题和智能体功能列表来规划任务、推理结果、回答问题 +## 今天是{now},当前用户为{self.username},这是第{self.think_step}轮,总共{self.think_max_steps}轮 + +{user_defined_prompt} +{self.workflow_prompt} +{self.agent_prompt} +{self.tools_prompt} +{self.ns_prompt} +{self.da_prompt} + +## 注意: +1. 上述调用只输出json,不做解释,不输出其它信息 +2. 不要做任何先验假设和臆想,而是要根据用户的意图和运行信息的结果去做分析规划任务 + +## 上下文信息 + 当前问题: + {question} + 运行信息: + {context} +""" + + #print(system_prompt) + return system_prompt + + #带上下文的prompt + def gen_ctx_prompt(self,ctx): + chat = self.chat + title = chat["title"] + question = chat["req"] + prompt = chat["prompt"] + history = chat["history"] + context = adjust_ctx_size(ctx) + + # 获取当前日期和星期 + today = datetime.datetime.now() + # 获取今天是星期几,使用weekday()方法(周一=0, 周日=6) + weekday_number = today.weekday() + + now=f"{today.strftime('%Y-%m-%d %H:%M:%S')} 星期{weekday_number+1}" + + init_prompt = prompt.format(**{"query":question,"question":question,"context":context,"now":now,"history":history}) + return init_prompt + + #初始prompt + def gen_init_prompt(self): + chat = self.chat + title = chat["title"] + question = chat["req"] + prompt = chat["prompt"] + history = chat["history"] + context = adjust_ctx_size(chat["ctx"]) + + # 获取当前日期和星期 + today = datetime.datetime.now() + # 获取今天是星期几,使用weekday()方法(周一=0, 周日=6) + weekday_number = today.weekday() + + now=f"{today.strftime('%Y-%m-%d %H:%M:%S')} 星期{weekday_number+1}" + + init_prompt = prompt.format(**{"query":question,"question":question,"context":context,"now":now,"history":history}) + return init_prompt + + #开始对话,意图识别 + def begin_chat(self,system_prompt): + messages = [{'role': 'user', 'content': system_prompt}] + responses = llm.chat( + messages=messages, + stream=True, + ) # get a new response from the model where it can see the function response + + #print(system_prompt) + try: + for response in responses: + think,data = found_llm_think_data(response[0]["content"]) + if think: + self.think_mode = True + if not self.delta_stream: + yield gen_event_think(think,data,new=False) + except Exception as e: + yield f"大模型运行出现错误:{e}, 请检查配置是否正确或者网络通信是否通畅!\n如果是本地大模型还请检查大模型是否正常启动!" + return None + if self.delta_stream: #只发一次结果 + yield gen_event_think(think,data) + else: + yield gen_event_new() + return data + + #对话循环任务 + def do_chat_loop(self): + ret_any =0 # 轮次 + result={} + print(f'=====Begin Chat========{self.chat["req"]}==') + while True: + run_finished = True #本次任务正常运行结束 + ret_any +=1 + + if ret_any==self.think_max_steps:#最后一轮,上下文增加运行信息 + self.run_context.put_env("短期记忆",self.mem) + + if self.fun_def=="0": #基本智能体 + result = {"task":300,"关键字":self.chat["req"]} + else: + self.think_step = ret_any + system_prompt = self.gen_system_prompt() + data = yield from self.begin_chat(system_prompt) + #出错跳出循环 + if data==None: break + + try: + if data.count("{") >1 and data[0]!='[': #多个任务,没有使用数组 + data = f"[{data}]" + result = json5.loads(data) + except Exception as e : + if ret_any >5: + break + # 意图识别有出错 + yield gen_event_think(f"意图识别生成的结果出错,不是一个有效的json结构:{e}",data) + self.run_context.new_task(data) + self.run_context.append_error(f"意图识别生成的结果出错,不是一个有效的json结构:{e}, data:{data}") + print(f"意图识别生成的结果出错,不是一个有效的json结构:{e}, data:{data}") + continue + + # 规划任务 + if isinstance(result,dict): + results=[result] + elif isinstance(result,list): + results = result + else: + yield gen_event_think("任务格式不规范,非数组,非对象",result) + self.run_context.new_task(result) + self.run_context.append_error(f"任务格式不规范,非数组,非对象") + continue + + print(f"=============轮此:{ret_any}=============") + print("短期记忆",len(self.mem)) + print("运行信息",self.run_context.output_json()) + print("任务列表",results) + print(f"========================================") + #break 即出现错误,出现错误则需要重新开始执行 + for result in results: + self.run_context.new_task(result) + if "task" not in result: + self.run_context.append_error(f"任务缺少task代码, data:{result}") + continue + + if result["task"]==100: #直接回复结果 + yield gen_event_result(result["回答"]) + self.answer = result["回答"] + elif result["task"]==101: #改写问题 + yield gen_event_think("改写后的问题",result) + self.chat["req"] = result["问题"] + + elif result["task"]==102: #知识库检索 + context = yield from self.do_tool_102(result["关键字"]) + yield gen_event_think(f"上下文数量: {len(context)}","上下文进入短期记忆") + self.run_context.put_result(f"上下文数量: {len(context)}") + + #调整长度 + context = adjust_ctx_size(context) + self.mem.extend(context) + #yield gen_event_think(f"最后有效上下文数量: {len(result)}","检索完毕") + + elif result["task"]==103: #查找文件 + yield gen_event_think("查找文件",result) + doc_list = self.do_tool_103(result["关键字"]) + yield gen_event_think(f"相关文件数量: {len(doc_list)}",doc_list) + if len(doc_list)>0: + self.run_context.put_result({"相关文件数量":len(doc_list),"文件列表":doc_list}) + self.mem.append(f'文件列表格式[base,path]: {doc_list}') + else: + self.run_context.put_result(f'{result["关键字"]} 相关文件数量: {len(doc_list)}') + elif result["task"]==104: #单一文件 + yield from self.do_tool_104(result) + elif result["task"]==105: + if self.mem: + if "req" in result: + self.answer = yield from self.submit_agent_105(result["req"],think_mode=True) + else: + self.answer = yield from self.submit_agent_105("",think_mode=True) + self.run_context.put_result(self.answer) + else: + run_finished = False #没有得到正确的答案 + elif result["task"]==106: #个人空间百科列表 + yield from self.do_tool_106(result) + elif result["task"]==107: #个人文件夹 + doc_list = self.do_tool_107(result) + yield gen_event_think(f"相关文件数量: {len(doc_list)}",doc_list) + if len(doc_list)>0: + self.run_context.put_result({"相关文件数量":len(doc_list),"文件列表":doc_list}) + self.mem.append(f'文件列表格式[base,path]: {doc_list}') + else: + self.run_context.put_result(f'{result["关键字"]} 相关文件数量: {len(doc_list)}') + elif result["task"]==108: # 打开一个网页 + yield from self.do_tool_108(result) + elif result["task"]==109: # 保存到百科 + yield gen_event_think("保存数据到个人空间",result) + self.do_tool_109(result) + elif result["task"]==110: # 保存到文件 + yield gen_event_think("保存文件到个人文件夹",result) + self.do_tool_110(result) + elif result["task"]==111: # 数据计算 + ret = self.do_tool_111(result) + self.mem.append(f'{result["expr"]} 计算结果为: {ret}') + elif result["task"]==200: + yield gen_event_think("智能体任务",result) + if "SN" not in result: + yield gen_event_think("智能体任务出错,缺少智能体标识SN",result) + self.run_context.append_error("智能体任务出错,缺少智能体标识SN") + run_finished = False + break + if result["SN"] not in self.kagents: + yield gen_event_think(f'智能体任务出错,找不到对应的智能体SN:{result["SN"]}',self.kagents) + self.run_context.append_error(f'智能体任务出错,找不到对应的智能体,不要在生成此SN:{result["SN"]}的任务') + run_finished = False + break + + self.answer = yield from self.do_task_200(result) + if self.answer: + self.mem.append(self.answer) + elif result["task"]==300:#单一融合智能体,执行给出的回答 + self.answer = yield from self.do_task_300(result) + if self.answer: + self.mem.append(self.answer) + elif result["task"]==400: #联网搜索 + yield from self.do_task_400(result) + elif result["task"]==500: #文件分析功能 + if "文件名" not in result or result["文件名"]=="": + yield gen_event_think("数据分析功能异常,没有正确的文件名",result) + self.run_context.append_error("数据分析功能异常,没有正确的文件名") + run_finished = False + break + self.answer = yield from Agent_DA_chat(self.chat["req"],result["文件名"]) + yield "" + if self.answer: + self.mem.append(self.answer) + else: + yield gen_event_think("其它任务,暂时不能执行",result) + #end for + + #最多十次循环反思循环 + if ret_any > self.think_max_steps: + break + #正常运行结束 + if run_finished and self.is_run_finish(results): + break + + #end while + return result + + def do_tool_102(self,question,doc_list=[]): + yield gen_event_think("知识库检索",question) + #搜索文件 + format_list =[] + for doc in doc_list: + format_list.append(f"{doc[0]}_{doc[1]}") + context = full_search_by_doc_list(question,format_list) + return context + + def do_tool_103(self,files,split_flag=" "): + #查找文件,根据路径关键字和文件关键字,返回base+path + #支持多个目录 + from functools import reduce + from operator import or_ + conditions =[] + for file in files.split(split_flag): + conditions.append(Doc.abs_path.contains(file)) + + # 使用 reduce 将多个条件用 OR 合并 + combined_condition = reduce(or_, conditions) + + query= (Doc + .select(Doc.base,Doc.abs_path) + .where( combined_condition ) + .order_by(Doc.f_up_at.desc()) + .limit(100) + ) + doc_list=[] + for r in query: + doc_list.append([r.base,r.abs_path]) + return doc_list + + + #单一文件 + def do_tool_104(self,result): + yield gen_event_think("查找文件",result) + #查找文件 + doc_list = self.do_tool_103(result["文件名"]) + yield gen_event_think(f"文件数量: {len(doc_list)}",doc_list) + + if len(doc_list)==0: + self.run_context.append_error("未找到相关文件") + return + for i,doc in enumerate(doc_list): + yield gen_event_think(f"选择第{i+1}个文件: {doc}","开始内容扫描") + files=[doc] + #文件内容 + context = yield from self.do_tool_102(result["文件名"],files) + if len(context) >0: + self.run_context.put_result(f'文件:{doc}, 内容上下文数量: {len(context)},1个及以上就满足上下文要求') + yield gen_event_think(f"内容上下文数量:",len(context)) + self.mem.extend(context) + break + #end for + + #个人空间百科列表 + def do_tool_106(self,result): + context = full_search_by_baike_catalog(result["关键字"],f"个人空间/{self.username}") + yield gen_event_think(f"上下文数量: {len(context)}","上下文进入短期记忆") + self.run_context.put_result(f"上下文数量: {len(context)}") + self.mem.extend(context) + + #个人文件夹 + def do_tool_107(self,result): + doc_list = self.do_tool_103(f'个人文件夹/{self.username}/result["关键字"]') + return doc_list + # self.run_context.append(f'文件数量:{len(doc_list)} 上下文数量: {len(context)}') + # yield gen_event_think(f'文件数量:{len(doc_list)}',f"上下文数量: {len(context)}") + # self.mem.extend(context) + + #打开并下载单个网页 + def do_tool_108(self,result): + if result["url"].startswith("http"): + page=result["url"] + else: + page=f'https://{result["url"]}' + context,html = get_single_page(page,10240) + if context["code"]==200: + filename = f'{context["title"]}.html' + with open(f"{gcfg['fs']['path']}/个人文件夹/{self.username}/{filename}", "wb") as buf: + buf.write(html.encode("utf-8")) + fsize = len(html) + ret = c_or_u_fs_index(filename,f"{gcfg['fs']['path']}/个人文件夹/{self.username}/{filename}",fsize,self.username) + self.mem.append(context) + yield gen_event_think(f'打开网页:{page}',f'结果信息: {context["content"][0:50]}...') + self.run_context.put_result("OK!") + else: + self.run_context.append_error(f'打开网页:{page}出错,错误信息: {context["content"]}') + yield gen_event_think(f'打开网页:{page}出错',f'错误信息: {context["content"]}') + + #个人数据空间-新建百科 + def do_tool_109(self,result): + if self.answer: + username = self.username + try: + import markdown + html = markdown.markdown(self.answer, extensions=['tables', 'fenced_code']) + from parse_html import trans_html_to_format + + html = trans_html_to_format(html) + except: + html = self.answer + + #新建 + baike=BaiKe.create(title=result["title"],catalog=f"个人空间/{username}",html=html,creator=username,modifier=username) + c_or_u_baike_index(baike.html,baike.title,baike.id,baike.catalog) + self.run_context.put_result("OK!") + else: + self.run_context.append_error("数据为空无法保存") + + + #个人文件夹-新建文件 + def do_tool_110(self,result): + if self.answer: + username = self.username + filename = result["filename"] + with open(f"{gcfg['fs']['path']}/个人文件夹/{username}/{filename}", "wb") as buf: + buf.write(self.answer.encode("utf-8")) + fsize = len(self.answer) + #建立索引和文件记录 + c_or_u_fs_index(filename,f"{gcfg['fs']['path']}/个人文件夹/{username}/{filename}",fsize,username) + self.run_context.put_result("OK!") + else: + self.run_context.append_error("数据为空无法保存") + + + #数学计算 + def do_tool_111(self,result): + from sympy import sympify, pi, sin, cos, E, I,sqrt + from sympy.abc import x, y # 预定义符号 + try: + # 限制可用的符号/函数(白名单) + allowed_symbols = { + 'pi': pi, 'e': E, 'I': I, + 'sin': sin, 'cos': cos, 'tan': lambda x: sin(x)/cos(x), + 'sqrt': sqrt, + 'x': x, 'y': y, + # 可继续添加安全函数 + } + expr_parsed = sympify(result['expr'], locals=allowed_symbols, evaluate=True) + number = float(expr_parsed.evalf()) #转换成python的浮点数,自动去掉后面的0 + self.run_context.put_result(number) + return number + except Exception as e: + self.run_context.append_error(f"表达式不安全或语法错误: {e}") + return f"表达式不安全或语法错误: {e}" + + + #智能体任务 + def do_task_200(self,result): + from api import Chat, chat_kagent,do_chat_agent + base = path = "" + if 'base' in result and result["base"]: + base = result["base"] + + if 'path' in result and result['path']: + path = result["path"] + + if base and path: #优先传文件 + chat_info = Chat(question=result["req"],kagent_sn=result["SN"],base=base,path=path) + elif self.mem: #传入上下文 + chat_info = Chat(question=result["req"],kagent_sn=result["SN"],mem=self.mem) + else: + chat_info = Chat(question=result["req"],kagent_sn=result["SN"]) + + #生成智能体的上下文 + chat_ctx = chat_kagent(chat_info) + + yield gen_event_think(f"上下文信息{chat_ctx['count']} 个, 短期记忆{len(self.mem)}个","正在思考生成") + #执行智能体的回答 + result = yield from do_chat_agent(chat_ctx["chat_id"],delta_stream=self.delta_stream) + self.run_context.put_result(f'智能体给出了{len(result)}字的回答,内容前100:{result[0:100]}...') + yield gen_event_new() + return result + + + + #执行单一融合智能体 + def do_task_300(self,result): + if self.allow_doc_list: + context = yield from self.do_tool_102(result["关键字"],self.allow_doc_list) + self.run_context.append(f"检索关键字:{result['关键字']}, 上下文数量: {len(context)}") + yield gen_event_think(f"上下文数量: {len(context)}","尝试回答问题") + prompt_300 = self.gen_ctx_prompt(context) + else: + context=self.chat["ctx"] + prompt_300 = self.gen_init_prompt() + + result = yield from self.do_prompt_stream_agent(prompt_300,think_mode=True) + + self.run_context.append(f'智能体结合上下文数量:{len(context)},给出了{len(result)}字的回答,内容:{result[0:50]}...') + return result + #联网搜索 + def do_task_400(self,result): + pages = yield from self.do_task_400_1(result) + context = self.do_task_400_2(pages) + if context: + self.mem.extend(context) + self.run_context.put_result(f'联网搜索得到{len(context)}个上下文') + + def do_task_400_1(self,result): + #联网搜索 + yield gen_event_think("开启联网搜索任务",result) + pages = search_web(result["关键字"]) + yield gen_event_think(f"联网搜索到网页数量{len(pages)}",[page['title'] for page in pages]) + return pages + + def do_task_400_2(self,pages): + context = get_detail_page2(pages) + return context + + + def is_run_finish(self,results): + #是否运行结束,判断最后一个任务 + result=results[-1] + if result["task"]==105: #对结果是否满意的最后研判 + ret = self.judge_result() + print("最后研判",ret) + return ret + if not self.kagents and result["task"]==300:#单智能体,直接可以结束 + return True + if result["task"]==100:#直接回答 + return True + return False + + #研判做后的结果是否可以回答用户问题,可以默认是,其它子类可以自行实现研判算法 + def judge_result(self): + if str(self.answer).find("对不起") >=0: + return False + return True + + def submit_agent_105(self,new_req,think_mode=False): + context = adjust_ctx_size(self.mem) + #Relevance Judgment 召回判断Agent + # 获取当前日期和星期 + today = datetime.datetime.now() + # 获取今天是星期几,使用weekday()方法(周一=0, 周日=6) + weekday_number = today.weekday() + + now=f"{today.strftime('%Y-%m-%d %H:%M:%S')} 星期{weekday_number+1}" + submit_prompt=f""" +# 今天是{now},你是一个问题回答助手 +## 你可以根据提供的上下文信息来回答用户的问题或按新的指令完成工作,上下文是一个数组列表 +1. 当上下文只有一条时尽量保留上下文的原始信息进行回答 +2. 当上下文数量大于1时,可以围绕问题结合上下文信息进行回答 +3. 当上下文数量特别多时,上下文中排在前面的可信度越高,越应该优先采纳。 +4. 如果提供的上下文不足以回答问题时,你可以回答'对不起,暂时没有找到相关的信息,无法回答你的问题' +5. 如果是生成海报和PPT的智能体时,简要总结一下生成内容即可。 + +{self.chat['prompt']} + + 上下文: + ``` + {context} + ``` + + 用户问题: + {self.chat['req']} + 新的指令: + {new_req} + +""" + messages = [{'role': 'user', 'content': submit_prompt}] + responses = llm.chat( + messages=messages, + stream=True, + ) # get a new response from the model where it can see the function response + + try: + for response in responses: + if not self.delta_stream: + #yield response[0]["content"]+" **(AI生成)**" + yield response[0]["content"] + think,data = found_llm_think_data(response[0]["content"]) + + if self.delta_stream: #只发一次结果 + yield response[0]["content"] + yield gen_event_new() + return data + except Exception as e: + yield f"大模型运行出现错误:{e}, 请检查配置是否正确或者网络通信是否通畅!\n如果是本地大模型还请检查大模型是否正常启动!" + return "对不起" + + def do_prompt_stream_agent(self, prompt,think_mode=False): + messages = [{'role': 'user', 'content': prompt}] + responses = llm.chat( + messages=messages, + stream=True, + ) # get a new response from the model where it can see the function response + + try: + + for response in responses: + if not self.delta_stream: + #yield response[0]["content"]+" **(AI生成)**" + yield response[0]["content"] + think,data = found_llm_think_data(response[0]["content"]) + + if self.delta_stream: #只发一次结果 + yield response[0]["content"] + yield gen_event_new() + return data + except Exception as e: + yield f"大模型运行出现错误:{e}, 请检查配置是否正确或者网络通信是否通畅!\n如果是本地大模型还请检查大模型是否正常启动!" + return "对不起" + + def do_prompt_nostream_agent(self, prompt): + messages = [{'role': 'user', 'content': prompt}] + response = llm.chat( + messages=messages, + stream=False, + ) # get a new response from the model where it can see the function response + + try: + think,data = found_llm_think_data(response[0]["content"]) + return think,data + except Exception as e: + return "对不起",f"大模型运行出现错误:{e}, 请检查配置是否正确或者网络通信是否通畅!\n如果是本地大模型还请检查大模型是否正常启动!" + +#end class + +#运行的上下文信息 +class Run_Context(): + def __init__(self): + #任务列表 + self.task_list=[] + self.task={} + + #其它环境 + self.env={} + + def output_json(self): + run_context={"任务列表":self.task_list,"运行环境":self.env} + return json.dumps(run_context,ensure_ascii=False) + + def final_prompt(self): + plan=[] + for task in self.task_list: + del task["result"] + del task["errors"] + plan.append(task) + return json.dumps(plan,ensure_ascii=False) + + def new_task(self,task): + if not isinstance(task,dict): + self.task = {"task":f"{task}"} + else: + self.task=task + self.task["errors"] = [] + self.task["result"] = "" + self.task_list.append(self.task) + def append_error(self,error): + self.task["errors"].append(error) + + def put_result(self,result): + self.task["result"]=result + + def put_env(self,key,value): + self.env[key]=value + +#正式回答结果 +def gen_event_result(data): + data = f"{data}\n" + return data+gen_event_new() + + +#适用于行式输出 +def gen_event_think(tips,data,new=True): + if new: + return f'{tips} {data}\n'+gen_event_new() + else: + return f'{tips} {data}\n' + + +#适用于流式输出 +def gen_event_think_data(data): + return f'{data}' + +#新的空行 +def gen_event_new(): + return "" + + +def deal_llm_think_json_data(data): + #print("==============================") + #print(data) + #去掉思考过程 + if data.find("")>=0 and data.find("")>0: + end = data.find("") + data = data[end+8:] + if data.find("```json") >=0: + begin = data.find("{") + end = data.rfind("}") + data = data[begin:end+1] + return data + +def found_llm_think_data(data): + think="" + if data.find("")>=0 and data.find("")==-1: + #还未思考结束 + think = data[8:] + data = "" + elif data.find("")>=0 and data.find("")>0: + #思考结束 + end = data.find("") + think = data[8:end] + data = data[end+8:] + + + begin = data.find("{") + end = data.rfind("}") + if data.find("```json") >=0: + #找到json数据,只返回json数据 + data = data[begin:end+1] + elif begin>0 and end >0: + #含有json数据 + data = data[begin:end+1] + return think,data + +#根据小模型进行上下文质量评估的Agent +class Agent_chat0(Agent_chat): + #初始化 + def __init__(self,chat,delta_stream,username,userid): + super().__init__(chat,delta_stream,username,userid) + + self.rj_llm=None + #初始化llm,质量评估小模型 + if "rele_api" in gcfg["llm"] and "rele_key" in gcfg["llm"]: + if gcfg["llm"]["rele_api"]: + self.rj_llm = get_chat_model({ + # Use your own model service compatible with OpenAI API: + 'model': "Qwen3", + 'model_server': gcfg["llm"]["rele_api"], + 'api_key': gcfg["llm"]["rele_key"], + 'generate_cfg': generate_cfg + }) + #是否可用 + messages = [{'role': 'user', 'content': "你是谁"}] + try: + response = self.rj_llm.chat( + messages=messages, + stream=False, + ) # get a new response from the model where it can see the function response + except: + self.rj_llm=None + + + #LLM评估 + def rj_agent(self,context,req=""): + #Relevance Judgment 召回判断Agent + + if self.rj_llm is None: + yield gen_event_think("Relevance Judgment LLM 没有配置或没有启用","忽略相关性判断") + return context + + if req=="": + req = self.chat['req'] + # 获取当前日期和星期 + today = datetime.datetime.now() + # 获取今天是星期几,使用weekday()方法(周一=0, 周日=6) + weekday_number = today.weekday() + yield gen_event_think(f"{req}",f"数量:{len(context)}上下文需要研判") + now=f"{today.strftime('%Y-%m-%d %H:%M:%S')} 星期{weekday_number+1}" + context_rights=[] + for i,ctx in enumerate(context): + rj_prompt =f"""" + # 你是一个内容相关性判断的专家,请根据问题和上下文内容的相关性作出判断并给出结果。 + ## 满分是5, + 1. 强相关,得分4, 问题的关键词全部出现在上下文中,并且语义完全符合 + 2. 中等相关,得分3,问题的关键词大部分出现在上下文中,并且语义完全符合 + 3. 弱相关,得分2, 问题的关键词少量出现在上下文中,并且语义大部分符合 + 4. 很少相关,得分1, 问题的关键词少量出现在上下文中,并且语义有些符合 + 5. 完全不相关,得分0,问题的关键词少量出现在上下文中,语义完全不符合 + ## 如何是联网搜索的标题信息进行评估时,标准可以放低一些 + ## 结果格式按json输出 + {{"score":1}} + + ## 问题 + {req} + + ## 上下文 + ``` + {ctx} + ``` + + """ + messages = [{'role': 'user', 'content': rj_prompt}] + response = self.rj_llm.chat( + messages=messages, + stream=False, + ) # get a new response from the model where it can see the function response + try: + + think,data = found_llm_think_data(response[0]["content"]) + result = json5.loads(data) + yield gen_event_think(f"内容{i+1} 相关性得分: {result['score']}",str(ctx)[0:100]) + if result["score"] >=3: + context_rights.append(ctx) + except: + yield gen_event_think(f"异常得分: 3",str(ctx)[0:100]) + context_rights.append(ctx) + #end ror + return context_rights + + #联网搜索 + def do_task_400(self,result): + pages = yield from super().do_task_400_1(result) + + context = super().do_task_400_2(pages) + + #开启链接内容验证 + context = yield from self.rj_agent(context,result["关键字"]) + + if context: + self.mem.extend(context) + self.run_context.put_result(f'联网搜索得到{len(context)}个上下文') + + #知识检索 + def do_tool_102(self,question,doc_list=[]): + context = yield from super().do_tool_102(question,doc_list) + if len(doc_list) !=1: #单个文件不做相关性评估 + context = yield from self.rj_agent(context) + return context + + + #研判最后的结果 + def judge_result(self): + ret = super().judge_result() + if ret and self.rj_llm: + if isinstance(self.answer,str): + answer = [self.answer] + try: + gen = self.rj_agent(answer) + while True: + value = next(gen) + except StopIteration as e: + return_value = e.value # 这就是 return 的值 + if len(return_value) >=1: + return True + else: + return False + return ret + + +#超级个人助理,在评估模型的基础上,增加反思优化 +class Agent_superman(Agent_chat0): + #初始化 + def __init__(self,chat,delta_stream,username,userid): + super().__init__(chat,delta_stream,username,userid) + + #初始化智能体 + self.init_agents() + #few_shot的提示词 + self.gen_fewshot_prompt() + + #反思数据 + self.reflection="" + + #ai_mode + ai_mode = chat["ai_mode"].split(",") + if "ns" not in ai_mode: self.ns_prompt="" + if "da" not in ai_mode: self.da_prompt="" + if "ds" in ai_mode: + self.think_max_steps=10 #增强型+10轮 + else: + self.rj_llm=None #等同于反思型,没有评估 + + #初始化自己的智能体 + def init_agents(self): + #自己发布的 + query= (KAgent + .select(KAgent.kasn) + .where( (KAgent.creator==self.username) | (KAgent.modifier==self.username)) + ).tuples() + sns0 = list(query) + sns = [sn[0] for sn in sns0] + #自己订阅的 + query = User.select(User.id,User.city).where(User.userid == self.userid) + user = query.first() + if user and user.city: + sns2 = user.city.split(",") + sns +=sns2 + self.fill_kagent_data(sns) + + #每一轮的研判,并给出任务优化的提示词 + def is_run_finish(self,results): + ret,score = self.run_reflection() + if ret or self.think_step >=self.think_max_steps: + #研判结束,记录agent日志 + SuperManAgent.create(chatid=self.chat["chat_id"], + username=self.username,question=self.chat["req"], + answer=self.run_context.output_json(), + prompt=self.run_context.final_prompt(), + score = score, + ) + return ret + + def gen_fewshot_prompt(self): + + # 相似问题 + seg_list = jieba_fenci(self.chat["req"]) + head_count = len(seg_list)//4 + from functools import reduce + from operator import and_ + conditions =[] + for word in seg_list[0:head_count]: #头部几个词 + conditions.append(SuperManAgent.question.contains(word)) + + # 使用 reduce 将多个条件用 and 合并 + if len(conditions) >0: + combined_condition = reduce(and_, conditions) + else: + combined_condition = True + + #成功 (自动评分>85且没有被差评和好评点赞的 + query= (SuperManAgent + .select(SuperManAgent.question,SuperManAgent.prompt) + .where( (combined_condition) & (((SuperManAgent.score >85) & (SuperManAgent.human!=-100)) | (SuperManAgent.human==100))) + .order_by(SuperManAgent.score.desc()) + .limit(5) + ).tuples() + self.sucess_prompt = list(query) + print("成功案例",self.sucess_prompt) + + #失败,(自己亲自给的差评) + query= (SuperManAgent + .select(SuperManAgent.question,SuperManAgent.prompt) + .where( (SuperManAgent.username ==self.username) & (SuperManAgent.human==-100)) + .order_by(SuperManAgent.c_time.desc()) + .limit(3) + ).tuples() + self.failed_prompt = list(query) + + #系统提示词 + def gen_system_prompt(self): + plan_prompt = super().gen_system_prompt() + if len(self.sucess_prompt) >0: + plan_prompt += "\n# 成功案例\n" + json.dumps(self.sucess_prompt,ensure_ascii=False) + + if len(self.failed_prompt)>0: + plan_prompt += "\n# 失败案例\n" + json.dumps(self.failed_prompt,ensure_ascii=False) + + if self.think_step >1: + plan_prompt += "\n# 最后结合反思报告和规划建议来生成任务\n"+self.reflection + + return plan_prompt + + + def run_reflection(self): + plan_prompt = super().gen_system_prompt() + reflection_prompt=f""" +# 你是一个任务规划的评估助手 +## 工作技能 +1. 根据用户的问题和规划助手产生任务的运行信息,来认真反思和审查任务的合理性并给出反思报告,反思报告中还要给出每个任务的得分,满分100 +2. 来判断任务是否完成,是否可以终结is_finish +3. 如果任务没有终结,根据反思报告提出下一轮的规划建议给规划助手 +5. 如果任务终结的给整个任务规划打最终得分,满分100,80优秀,60及格, +5. 输出结果采用json格式 + {{"is_finish":ture,"反思报告":"","规划建议":"","得分":60}} + +# 当前运行信息,任务列表中已经执行的任务,其中errors表示有错误信息,result表示有显示的结果反馈 +{self.run_context.output_json()} + +# 规划助手的提示词: +{plan_prompt} + +""" + score=60 + think,data = super().do_prompt_nostream_agent(reflection_prompt) + self.reflection = data + print("轮词反思",data) + result = json5.loads(data) + if "得分" in result: + score = result["得分"] + return result["is_finish"],score + \ No newline at end of file