from typing import Union
from fastapi import FastAPI, Query, UploadFile, File, HTTPException, Request, Path, Form
from fastapi.responses import StreamingResponse, FileResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
import json
import time
import datetime
import json5
import shutil
import os  # used below (os.remove / os.makedirs); previously only available via `from init import *`

############################ Module code ########################
# Full-text search
from full_index_search import full_search, doc_search, c_or_u_baike_index, c_or_u_fs_index, one_doc_search, delete_index_by_docid, adjust_ctx_size, full_search_by_baike_catalog, full_search_by_doc_list
# Initialisation info
from init import *
from utils import regular_filename
# File repository
from ff import sftp_list_directory, smb_list_directory, linux_list_directory, ftp_list_directory, create_doc_cache
# Database
from k_database import QA, BaiKe, BaiKe_Catalog, Doc, KAgent
# from peewee import fn
from agent_chat import Agent_chat_build


def get_client_ip(request):
    """Return the best-guess client IP for *request*.

    Preference order: first hop of X-Forwarded-For, then X-Real-IP,
    finally the direct socket peer (request.client.host).
    """
    x_forwarded_for = request.headers.get("X-Forwarded-For")
    if x_forwarded_for:
        # X-Forwarded-For may be a comma-separated proxy chain; the first
        # entry is the originating client.
        client_ip = x_forwarded_for.split(",")[0]
    else:
        # No X-Forwarded-For: try X-Real-IP
        client_ip = request.headers.get("X-Real-IP")
    if not client_ip:
        client_ip = request.client.host
    return client_ip


# KAgent lookup by SN, exposed externally
# @app.get("/api/kagent_cfg/{kasn}")
def get_kagent_by_sn(kasn: str, request: Request):
    """Return the public configuration for the KAgent identified by *kasn*."""
    if kasn == 'superman':
        # Built-in "super assistant" agent, not stored in the database.
        title, demo, atype, guide, files = "超级个人助理", "AI革命数字永生", 3, "你好!我是你的超级个人助手!我可以调用我的发布和我的订阅智能体来帮你完成任务规划和执行,有事不决请问我!", "S"
    else:
        # Fetch the agent's details by SN.
        try:
            query = KAgent.select(KAgent.title, KAgent.demo, KAgent.atype, KAgent.guide, KAgent.files).where(KAgent.kasn == kasn)
            title, demo, atype, guide, files = query.scalar(as_tuple=True)
        except Exception:
            # Unknown SN: scalar() yields None and the unpack above raises.
            return {"title": "未知应用", "guide": "未能正确识别SN", 'atype': "", "files": "", "demo": "SN不正确,无法访问应用"}
    username = request.cookies.get("username", "匿名用户")
    username = decode_username(username)
    cfg = {"title": title, "guide": guide, 'atype': atype, "files": files, "demo": demo, "username": username}
    # DingTalk parameters
    if gcfg["dingtalk"]["app_key"] != "":
        dingtalk = {"client_id": gcfg["dingtalk"]["app_key"],
                    "close_login": gcfg["dingtalk"]["exclusiveLogin"],
                    "corpid": gcfg["dingtalk"]["exclusiveCorpId"],
                    }
        cfg["dingtalk"] = dingtalk
    # WeChat parameters
    if gcfg["wechat"]["app_key"] != "":
        cfg["wechat"] = gcfg["wechat"]["app_key"]
    # logo
    cfg["logo"] = gcfg["fs"]["logo"]
    # official site
    cfg["url"] = gcfg["fs"]["url"]
    # service code
    cfg["aicode"] = gcfg["fs"]["aicode"]
    # categories
    cfg["agent_catalogs"] = get_gcfg_mem()["agent_catalogs"]
    return cfg


# ---- KAgent conversation context, spooled as JSON under /tmp/k3gpt ----

def write_chat_context(chat_id, context):
    """Persist the conversation context for *chat_id* as a JSON file."""
    # Fix: ensure the spool directory exists — the first call after boot
    # previously failed with FileNotFoundError.
    os.makedirs("/tmp/k3gpt", exist_ok=True)
    with open(f"/tmp/k3gpt/{chat_id}", "w+") as f:
        json.dump(context, f, ensure_ascii=False)


def delete_chat_by_id(chat_id):
    """Best-effort removal of a spooled conversation context."""
    try:
        os.remove(f"/tmp/k3gpt/{chat_id}")
    except OSError:
        pass


def get_chat_context(chat_id):
    """Load a previously written conversation context; raise if unreadable."""
    try:
        with open(f"/tmp/k3gpt/{chat_id}", "r") as f:
            context = json.load(f)
        return context
    except Exception as e:
        raise Exception(f"上下问读取失败:{e}")


class Chat(BaseModel):
    """Request payload for one KAgent chat turn."""
    question: str = Field(..., example="你是谁?")
    ai_mode: Optional[str] = Field("", example="对话模式")
    base: Optional[str] = Field(None, example="知识库")
    file: Optional[str] = Field(None, example="文件名")
    path: Optional[str] = Field(None, example="路径")
    history: Optional[list] = Field(None, example="历史对话")
    kagent_sn: Optional[str] = Field(None, example="Agent的sn,识别码")
    mem: Optional[list] = Field([], example="可选的短期记忆")


def _append_detail_ctx(ctx, context):
    """Fill *ctx* with per-chunk detail rows (shown in 'Detailed' ai_mode)."""
    for i in range(len(context)):
        ctx.append({"name": i, "path": i, "base": i, "ctx": context[i], "size": len(context[i])})


# KAgent conversation: prepare context and spool it for the streaming endpoint
def chat_kagent(chat: Chat):
    """Resolve the agent, gather retrieval context and spool it to disk.

    Returns {"chat_id", "count", "ctx"}; the actual answer is produced
    later by do_chat_agent() reading the spooled context.
    """
    chat_id = f"chat_{time.time()}"
    count = 0
    ctx = []  # context detail rows returned to the caller
    # Resolve the agent's details by SN.
    if chat.kagent_sn == 'superman':
        title, atype, prompt, files = "超级个人助理", 3, "# 你是一个超级个人助理,优先使用100来回答问题,回答不好的再使用工具和其它智能体,你可以根据成功案例和失败案例进行参考", "S"
    else:
        query = KAgent.select(KAgent.title, KAgent.atype, KAgent.prompt, KAgent.files).where(KAgent.kasn == chat.kagent_sn)
        title, atype, prompt, files = query.scalar(as_tuple=True)
    # Uploaded file reference, if any.
    upload_files = []
    if chat.base and chat.path:
        upload_files.append([chat.base, chat.path])
    if not files:
        files = ""
    # Dispatch on the agent type.
    if atype in [0, 1]:
        doc_list = []
        if atype == 0:
            # Full-text retrieval over an explicit document list.
            doc_list = files.split(",")
        elif atype == 1:
            # Keyword retrieval over documents; multiple directories supported.
            from functools import reduce
            from operator import or_
            conditions = []
            for file in files.split(","):
                conditions.append(Doc.abs_path.contains(file))
            # OR-combine all directory conditions.
            combined_condition = reduce(or_, conditions)
            query = (Doc
                     .select(Doc.base, Doc.abs_path)
                     .where(combined_condition)
                     .order_by(Doc.f_up_at.desc())
                     .limit(10000)
                     )
            for r in query:
                doc_list.append(f"{r.base}_{r.abs_path}")
        print("相关文档数量", len(doc_list))
        # Check whether the wiki (baike) already answers the question.
        context0 = full_search_by_baike_catalog(chat.question, f"智能体数据/{title}")
        # Search the documents themselves.
        context = full_search_by_doc_list(chat.question, doc_list)
        if len(context0) == 0:
            # No wiki hits: answer from document context only.
            context = adjust_ctx_size(context)
            count = len(context)
            if chat.ai_mode.find("Detailed") == 0:
                _append_detail_ctx(ctx, context)
            write_chat_context(chat_id, {"title": title, "req": chat.question, "atype": atype, "ctx": context, "mem": chat.mem,
                                         "prompt": prompt, "history": chat.history, "upload_files": upload_files})
        elif len(context0) < 10 or len(context) < 10:
            # Few wiki hits: merge wiki and document context for one answer.
            context0.extend(context)
            context = adjust_ctx_size(context0)
            count = len(context)
            if chat.ai_mode.find("Detailed") == 0:
                _append_detail_ctx(ctx, context)
            write_chat_context(chat_id, {"title": title, "req": chat.question, "atype": atype, "ctx": context, "mem": chat.mem,
                                         "prompt": prompt, "history": chat.history, "upload_files": upload_files})
        else:
            # Rich wiki content: two-pass answer (slower).
            # Pass 1: answer from the retrieved document context.
            context = adjust_ctx_size(context)
            count = len(context)
            answer = do_chat_agent_nostream({"title": title, "req": chat.question, "atype": atype, "ctx": context, "mem": chat.mem,
                                             "prompt": prompt, "history": chat.history, "upload_files": upload_files})
            # Pass 2: combine that answer with the wiki context.
            context = adjust_ctx_size(context0)
            context.insert(0, f"搜索答案: {answer}")
            count = len(context)
            prompt = """
# 智能助手
你是一个问答机器人的助手,请使用提供的上下问来回答问题.
# 回答逻辑
1. 上下文中标有知识百科的数据可信度很高,可以优先采纳
2. 搜索答案是通过大模型RAG得到的答案
3. 如果知识百科的数据和问题关联性很高时,可以讲百科的数据和搜索答案进行融合来回答
4. 如果知识百科的数据和问题关联性不高时,可以直接使用搜索答案来回答
# 上下文
{context}
# 问题
{question}
"""
            if chat.ai_mode.find("Detailed") == 0:
                _append_detail_ctx(ctx, context)
            write_chat_context(chat_id, {"title": title, "req": chat.question, "atype": atype, "ctx": context,
                                         "prompt": prompt, "history": chat.history, "upload_files": upload_files, "mem": chat.mem})
    else:
        # Non-retrieval / fused-retrieval / uploaded-file modes.
        if chat.mem:
            ctx.append(chat.mem)
        write_chat_context(chat_id, {"title": title, "req": chat.question, "atype": atype, "ctx": ctx,
                                     "prompt": prompt, "ai_mode": chat.ai_mode,
                                     "history": chat.history, "fun_def": files, "upload_files": upload_files})
    return {"chat_id": chat_id, "count": count, "ctx": ctx}


def decode_username(username):
    """Decode a username cookie that was stored as a repr'd bytes literal.

    e.g. "b'\\xe9\\x83\\x9c'" -> the original UTF-8 string.
    """
    if username != "匿名用户":
        # Strip the surrounding b'...'
        byte_sequence = username[2:-1]
        # Turn the escaped byte string back into real bytes.
        byte_string = bytes(byte_sequence, 'utf-8').decode('unicode_escape').encode('latin1')
        # Decode to a normal string.
        username = byte_string.decode('utf-8')
    return username


# KAgent streaming endpoint: server-sent events, real-time output
# @app.get("/api/chat_kagent/{chat_id}")
def chat_event(chat_id: str, request: Request):
    """Stream the answer for a previously prepared chat via SSE."""
    userid = request.cookies.get("userid", "id-00001")
    if userid == "id-00001":
        # No authenticated user: stream a single error message.
        return StreamingResponse(sse_do_response(do_chat_error(chat_id), chat_id, "k3gpt"), media_type="text/event-stream")
    username = request.cookies.get("username", "匿名用户")
    username = decode_username(username)
    client_ip = get_client_ip(request)
    return StreamingResponse(
        sse_do_response(
            do_chat_agent(chat_id, userid, username, client_ip),
            chat_id,
            "k3gpt"),
        media_type="text/event-stream")


def do_chat_error(chat_id):
    """Generator yielding a single auth-failure message; drops the context."""
    delete_chat_by_id(chat_id)
    yield "用户认证信息丢失,请重新认证后再访问!"
# SSE wrapper: frame generator chunks according to the requested format
def sse_do_response(gen, chat_id, format="k3gpt"):
    """Wrap raw generator chunks into SSE frames ("k3gpt" or OpenAI-style).

    NOTE(review): the marker literals below are empty strings, which makes
    endswith("")/startswith("") always true; the [0:-5] / [6:...] offsets
    strongly suggest 5- and 6-char sentinel tags were lost (likely stripped
    as HTML-like markup). Recover the originals from version control.
    """
    if format == "k3gpt":
        for content in gen:
            if content.endswith("") and len(content) > 5:
                yield gen_k3gpt_result(content[0:-5])
                yield gen_k3gpt_result("")
            elif content.startswith(""):  # tool + data payload
                data_start = content.find("")
                yield gen_k3gpt_result2(content[6:data_start], content[data_start + 6:])
            else:
                yield gen_k3gpt_result(content)
    else:
        for content in gen:
            if content.endswith("") and len(content) > 5:
                yield gen_openai_result(chat_id, content[0:-5])
            else:
                yield gen_openai_result(chat_id, content)


# Build one OpenAI-style streaming chunk (SSE "data:" line, utf-8 bytes)
def gen_openai_result(chat_id, data):
    messages = []
    messages.append({"delta": {"role": "assistant", "content": data}})
    response = {"id": chat_id, "object": "chat", "choices": messages}
    json_str = "data: " + json.dumps(response) + "\n\n"
    return json_str.encode("utf-8")


# UI-compatible result frame; "rsp" carries the actual content
def gen_k3gpt_result(data):
    json_data = {}
    json_data["rsp"] = data
    json_str = "data: " + json.dumps(json_data) + "\n\n"
    return json_str.encode("utf-8")


# UI-compatible result frame with a tool name ("type"), used for
# chart/table JSON payloads
def gen_k3gpt_result2(tool_name, data):
    json_data = {}
    json_data["rsp"] = data
    json_data["type"] = tool_name
    json_str = "data: " + json.dumps(json_data) + "\n\n"
    return json_str.encode("utf-8")


def found_llm_think_data(data):
    """Split a model reply into (think, data); extract fenced JSON if present.

    NOTE(review): the search literals here are empty strings, so find("")
    is always 0 and both branches' conditions are degenerate; the [8:]
    offsets look like they once matched 8-char tags (e.g. "</think>")
    that were stripped. Confirm against version control before relying
    on the think-extraction path.
    """
    think = ""
    if data.find("") >= 0 and data.find("") == -1:
        # Thinking not yet finished.
        think = data[8:]
        data = ""
    elif data.find("") >= 0 and data.find("") > 0:
        # Thinking finished.
        end = data.find("")
        think = data[8:end]
        data = data[end + 8:]
    if data.find("```json") >= 0:
        # Found a ```json fence: return only the embedded JSON object.
        begin = data.find("{")
        end = data.rfind("}")
        data = data[begin:end + 1]
    return think, data


# Main KAgent chat driver.
# Generator: yields streamed chunks; its return value is the final answer.
def do_chat_agent(chat_id, userid="10000", username="Admin", client_ip="127.0.0.1", delta_stream=False):
    """Answer the spooled chat *chat_id*, log the Q/A, and clean up."""
    chat = get_chat_context(chat_id)
    chat["chat_id"] = chat_id
    title = chat["title"]
    question = chat["req"]
    atype = chat["atype"]  # agent type
    prompt = chat["prompt"]
    history = chat["history"]
    context = chat["ctx"]
    answer = ""
    if atype == 3:
        # Fused intelligent-retrieval mode: delegate to the agent loop.
        agent_chat = Agent_chat_build(chat, delta_stream, username, userid)
        answer = yield from agent_chat.do_chat_loop()
    else:
        if "mem" in chat:
            mem = chat["mem"]
        else:
            mem = []
        # Current date and weekday (weekday(): Mon=0..Sun=6 -> 星期1..7).
        today = datetime.datetime.now()
        weekday_number = today.weekday()
        now = f"{today.strftime('%Y-%m-%d %H:%M:%S')} 星期{weekday_number + 1}"
        # Append the standard skeleton unless the user prompt already has
        # the placeholders.
        if prompt.find("{context}") == -1 or prompt.find("{question}") == -1:
            prompt += """
### 当前时间
{now}
### 问题
{question}
### 上下文
{context}
"""
        upload_files = chat["upload_files"]
        file_content = ""
        if upload_files:
            print("上传文件", upload_files)
            # Only the first uploaded file is used.
            f1 = [f'{upload_files[0][0]}_{upload_files[0][1]}']
            filename = upload_files[0][1]
            file_contents = full_search_by_doc_list("", f1)
            # NOTE(review): "{filename}" restored below — the source showed a
            # redaction placeholder here, and format() supplies "filename".
            file_prompt = prompt + """
### 文件
{filename}
### 文件内容
{file_content}
"""
            if len(file_contents) > 1:
                # Large file split into chunks: answer per chunk, then summarise.
                answers = []
                for i, file_content in enumerate(file_contents):
                    print(f"分片[{i}], 大小: {len(file_content)}")
                    system_prompt = file_prompt.format(**{"query": question, "question": question, "context": context, "now": now, "file_content": file_content, "filename": filename})
                    answer = yield from llm_ask(system_prompt, delta_stream)
                    answers.append(answer)
                # Summarise the per-chunk answers.
                sum_prompt = prompt + """
# 上下文是文件的多个分片分别计算的到的结果,现在合并上下问,结合用户的问题,做最后的回答输出
"""
                system_prompt = sum_prompt.format(**{"query": question, "question": question, "context": answers, "now": now, "history": history})
                sum_answers = yield from llm_ask(system_prompt, delta_stream)
                # NOTE(review): the per-chunk answers (not the summary) become
                # the logged answer — looks deliberate, confirm.
                answer = answers
            elif len(file_contents) == 1:
                file_content = file_contents[0]
                system_prompt = file_prompt.format(**{"query": question, "question": question, "context": context, "now": now, "history": history, "file_content": file_content, "filename": filename})
                answer = yield from llm_ask(system_prompt, delta_stream)
            else:
                # {filename} restored (was a redaction placeholder).
                yield f"没有找到文件的数据分片:{filename}"
        else:
            prompt += """
### 上下文2
{mem}
"""
            system_prompt = prompt.format(**{"query": question, "question": question, "context": context, "now": now, "history": history, "mem": mem})
            answer = yield from llm_ask(system_prompt, delta_stream)
    # Log the Q/A and drop the spooled context file.
    QA.create(userid=userid, username=username,
              question=question, answer=answer, ref=f"知识体:{title}", client_ip=client_ip)
    delete_chat_by_id(chat_id)
    return answer


# Ask the LLM (generator: streams chunks, returns the final answer text)
def llm_ask(system_prompt, delta_stream):
    """Stream the model reply for *system_prompt*; return the final answer."""
    messages = [{'role': 'user', 'content': system_prompt}]
    responses = llm.chat(
        messages=messages,
        stream=True,
        delta_stream=delta_stream
    )
    answer = ""
    try:
        for response in responses:
            yield response[0]["content"]
            if delta_stream == True:
                # Delta stream: chunks are increments, accumulate them.
                answer += response[0]["content"]
            if not delta_stream:
                # Non-delta stream: each chunk is the full text so far.
                think, data = found_llm_think_data(response[0]["content"])
                answer = data
    except Exception as e:
        # Fix: was a plain string, so "{e}" was emitted literally instead of
        # the exception text.
        yield f"大模型运行出现错误:{e}, 请检查配置是否正确或者网络是否通畅!\n如果是本地大模型还请检查大模型是否正常启动!"
    return answer


# Internal non-streaming KAgent chat (used by the two-pass baike answer)
def do_chat_agent_nostream(chat):
    """Answer once, without streaming, from an in-memory chat dict."""
    question = chat["req"]
    # Fix: chat_kagent() builds dicts without "mode"/"doc_list"; plain
    # subscripts raised KeyError here.
    mode = chat.get("mode", "")
    prompt = chat["prompt"]
    history = chat["history"]
    context = chat["ctx"]
    doc_list = chat.get("doc_list", [])
    # Current date and weekday (weekday(): Mon=0..Sun=6 -> 星期1..7).
    today = datetime.datetime.now()
    weekday_number = today.weekday()
    now = f"{today.strftime('%Y-%m-%d %H:%M:%S')} 星期{weekday_number + 1}"
    system_prompt = prompt.format(**{"query": question, "question": question, "context": context, "now": now, "history": history, "doc_list": doc_list})
    messages = [{'role': 'user', 'content': system_prompt}]
    responses = llm.chat(
        messages=messages,
        stream=False,
    )
    return responses[0]["content"]


# File-center upload
# @app.post("/api/upload")
async def upload_file(request: Request, curpath: str = Form(''), file: UploadFile = File(...)):
    """Store an uploaded file in the user's personal folder and index it."""
    try:
        username = request.cookies.get("username", "匿名用户")
        username = decode_username(username)
        # Ensure the user's upload space exists.
        if not os.path.exists(f'{gcfg["fs"]["path"]}/个人文件夹/{username}'):
            os.makedirs(f'{gcfg["fs"]["path"]}/个人文件夹/{username}')
        # Normalise the file name (strips spaces etc.).
        filename = regular_filename(file.filename)
        # NOTE(review): "{filename}" restored in the three paths below — the
        # source showed redaction placeholders where the sanitised name
        # clearly belonged (filename was otherwise unused).
        with open(f"{gcfg['fs']['path']}/个人文件夹/{username}/{filename}", "wb") as buf:
            shutil.copyfileobj(file.file, buf)
            fsize = buf.tell()  # position after copy == file size
    except Exception as e:
        return {"errno": 1, "message": f"上传出错: {e}"}
    # Build the index and the file record.
    c_or_u_fs_index(filename, f"{gcfg['fs']['path']}/个人文件夹/{username}/{filename}", fsize, username)
    return {"errno": 0, "file": {"name": filename, "path": f"{gcfg['fs']['path']}/个人文件夹/{username}/{filename}", "base": "文件中心"}}


# OpenAI-compatible chat endpoint for third parties
def chat_completions(chat_req: dict, req: Request):
    """Authenticate a Bearer token ("sn:base64(username)") and answer."""
    # --- authentication ---
    auth_header = req.headers.get("Authorization")
    if not auth_header:
        raise HTTPException(status_code=401, detail="Authorization header missing")
    # Expected form: "Bearer <token>"
    if not auth_header.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Invalid authorization scheme")
    token = auth_header.split(" ")[1]  # extract the token
    try:
        import base64
        sn, username_base64 = token.split(":")
        username = base64.b64decode(username_base64.encode("utf-8")).decode("utf-8")
    except Exception:
        raise HTTPException(status_code=401, detail="api_key is error")
    # --- request body ---
    print(chat_req)
    if 'messages' not in chat_req or not chat_req['messages']:
        raise HTTPException(status_code=501, detail="message format error")
    # Flatten user/system messages (string or list-of-parts content) into
    # a single question string.
    question = ""
    for msg in chat_req['messages']:
        if msg["role"] == "user":
            if isinstance(msg["content"], list):
                for content in msg["content"]:
                    if content["type"] == "text":
                        question += content["text"]
            elif isinstance(msg["content"], str):
                question += msg["content"]
        elif msg["role"] == "system":
            if isinstance(msg["content"], list):
                for content in msg["content"]:
                    if content["type"] == "text":
                        question = content["text"]
            elif isinstance(msg["content"], str):
                question = msg["content"]
    chat = Chat(kagent_sn=sn, question=question)
    chat_info = chat_kagent(chat)
    userid = "50000"
    client_ip = get_client_ip(req)
    chat_id = chat_info["chat_id"]
    if "stream" in chat_req and chat_req["stream"]:
        # Streaming output.
        return StreamingResponse(sse_do_response(
            do_chat_agent(chat_id, userid, username, client_ip, True),
            chat_id,
            "openai"),
            media_type="text/event-stream")
    # Non-streaming: drain the generator; the answer arrives via
    # StopIteration.value.
    try:
        anwser = ""
        gen = do_chat_agent(chat_id, userid, username, client_ip, True)
        while True:
            next(gen)
    except StopIteration as e:
        anwser = e.value
    messages = []
    messages.append({"message": {"role": "assistant", "content": anwser}})
    response = {"id": chat_id, "object": "chat", "choices": messages}
    print(response)
    return response


# Current user's name and IP
def get_user_info(req):
    """Return (username, client_ip) derived from cookies and headers."""
    username = req.cookies.get("username", "匿名用户")
    username = decode_username(username)
    client_ip = get_client_ip(req)
    return username, client_ip


class New_BaiKe(BaseModel):
    """Payload for creating or updating a wiki (baike) entry."""
    id: int = Field(None, example="你好,世界!")
    title: str = Field(..., example="你好,世界!")
    catalog: str = Field(..., example="你好,世界!")
    html: str = Field(..., example="你好,世界!")
    style: Optional[Dict[str, Any]] = None  # generated style
    trans: Optional[bool] = True  # convert html to the internal format first


# Save a baike entry
# @app.post("/api/baike")
def new_baike(baike: New_BaiKe, req: Request):
    """Create or update a baike entry and refresh its full-text index."""
    username, client = get_user_info(req)
    # Personal-space entries live under the user's own sub-catalog.
    if baike.catalog == "个人空间":
        baike.catalog = f"个人空间/{username}"
    if baike.trans:
        from parse_html import trans_html_to_format
        baike.html = trans_html_to_format(baike.html)
    if baike.id and baike.id != 0:
        # Update: refresh the index, then the DB row.
        # (A previous ORM-instance variant doing the same was removed.)
        c_or_u_baike_index(baike.html, baike.title, baike.id, baike.catalog)
        BaiKe.update(title=baike.title, catalog=baike.catalog, html=baike.html, m_time=datetime.datetime.now(), modifier=username).where(BaiKe.id == baike.id).execute()
    else:
        # Create: insert first so the new row id is available for indexing.
        baike = BaiKe.create(title=baike.title, catalog=baike.catalog, html=baike.html, creator=username, modifier=username)
        c_or_u_baike_index(baike.html, baike.title, baike.id, baike.catalog)
    return {"msg": "OK", "id": baike.id}