# main/api.py — HTTP API layer for the KAgent chat service.
#
# NOTE(review): this file was recovered from a whitespace-mangled diff; the
# indentation inside multi-line prompt templates is a best-effort
# reconstruction and should be confirmed against the original.
from typing import Union, Optional, List, Dict, Any

from fastapi import FastAPI, Query, UploadFile, File, HTTPException, Request, Path, Form
from fastapi.responses import StreamingResponse, FileResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles

from pydantic import BaseModel, Field
import json
import os
import time
import datetime
import json5

import shutil

############################ module imports ########################
# Full-text retrieval helpers.
from full_index_search import (
    full_search, doc_search, c_or_u_baike_index, c_or_u_fs_index,
    one_doc_search, delete_index_by_docid, adjust_ctx_size,
    full_search_by_baike_catalog, full_search_by_doc_list,
)

# Global initialisation (presumably provides gcfg, get_gcfg_mem, llm — confirm).
from init import *

from utils import regular_filename

# File-repository backends.
from ff import (
    sftp_list_directory, smb_list_directory, linux_list_directory,
    ftp_list_directory, create_doc_cache,
)

# Database models.
from k_database import QA, BaiKe, BaiKe_Catalog, Doc, KAgent

from peewee import fn

from agent_chat import Agent_chat_build


def get_client_ip(request):
    """Return the client IP, honouring reverse-proxy headers.

    Precedence: first entry of ``X-Forwarded-For``, then ``X-Real-IP``,
    finally the direct socket peer address.
    """
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        client_ip = forwarded.split(",")[0]
    else:
        client_ip = request.headers.get("X-Real-IP")
    if not client_ip:
        client_ip = request.client.host
    return client_ip


# KAgent lookup by serial number (public endpoint).
# @app.get("/api/kagent_cfg/{kasn}")
def get_kagent_by_sn(kasn: str, request: Request):
    """Return the public configuration of the agent identified by *kasn*.

    ``superman`` is a built-in agent; any other SN is resolved from the
    KAgent table. Unknown SNs yield a placeholder config instead of a 500.
    """
    if kasn == 'superman':
        title, demo, atype, guide, files = (
            "超级个人助理",
            "AI革命数字永生",
            3,
            "你好!我是你的超级个人助手!我可以调用我的发布和我的订阅智能体来帮你完成任务规划和执行,有事不决请问我!",
            "S",
        )
    else:
        try:
            query = KAgent.select(
                KAgent.title, KAgent.demo, KAgent.atype, KAgent.guide, KAgent.files
            ).where(KAgent.kasn == kasn)
            title, demo, atype, guide, files = query.scalar(as_tuple=True)
        except Exception:  # fix: was a bare except; unknown SN / DB error
            return {"title": "未知应用", "guide": "未能正确识别SN", 'atype': "",
                    "files": "", "demo": "SN不正确,无法访问应用"}

    username = request.cookies.get("username", "匿名用户")
    username = decode_username(username)

    cfg = {"title": title, "guide": guide, 'atype': atype,
           "files": files, "demo": demo, "username": username}

    # DingTalk integration parameters (only when configured).
    if gcfg["dingtalk"]["app_key"] != "":
        cfg["dingtalk"] = {
            "client_id": gcfg["dingtalk"]["app_key"],
            "close_login": gcfg["dingtalk"]["exclusiveLogin"],
            "corpid": gcfg["dingtalk"]["exclusiveCorpId"],
        }
    # WeChat integration parameters.
    if gcfg["wechat"]["app_key"] != "":
        cfg["wechat"] = gcfg["wechat"]["app_key"]

    cfg["logo"] = gcfg["fs"]["logo"]      # site logo
    cfg["url"] = gcfg["fs"]["url"]        # official site URL
    cfg["aicode"] = gcfg["fs"]["aicode"]  # service code
    cfg["agent_catalogs"] = get_gcfg_mem()["agent_catalogs"]
    return cfg


# ---- chat-context persistence: one JSON file per live chat under /tmp/k3gpt ----

def write_chat_context(chat_id, context):
    """Persist the chat context so the streaming endpoint can pick it up later."""
    os.makedirs("/tmp/k3gpt", exist_ok=True)  # fix: directory may not exist yet
    with open(f"/tmp/k3gpt/{chat_id}", "w+") as f:
        json.dump(context, f, ensure_ascii=False)


def delete_chat_by_id(chat_id):
    """Best-effort removal of a chat-context file; missing files are ignored."""
    try:
        os.remove(f"/tmp/k3gpt/{chat_id}")
    except OSError:  # fix: was a bare except
        pass


def get_chat_context(chat_id):
    """Load a previously written chat context.

    Raises:
        Exception: with a readable message when the file is missing/corrupt.
    """
    try:
        with open(f"/tmp/k3gpt/{chat_id}", "r") as f:
            return json.load(f)
    except Exception as e:
        raise Exception(f"上下问读取失败:{e}")


class Chat(BaseModel):
    """Request payload for starting a KAgent chat."""
    question: str = Field(..., example="你是谁?")
    ai_mode: Optional[str] = Field("", example="对话模式")
    base: Optional[str] = Field(None, example="知识库")
    file: Optional[str] = Field(None, example="文件名")
    path: Optional[str] = Field(None, example="路径")
    history: Optional[list] = Field(None, example="历史对话")
    kagent_sn: Optional[str] = Field(None, example="Agent的sn,识别码")
    mem: Optional[list] = Field([], example="可选的短期记忆")


def chat_kagent(chat: Chat):
    """Prepare a chat: gather retrieval context and persist it under a new chat id.

    Returns ``{"chat_id", "count", "ctx"}``; the actual answer is streamed
    later by the ``/api/chat_kagent/{chat_id}`` endpoint.
    """
    chat_id = f"chat_{time.time()}"
    count = 0
    ctx = []  # context preview returned to the UI in "Detailed" mode

    # Resolve the agent referenced by the SN ('superman' is built in).
    if chat.kagent_sn == 'superman':
        title, atype, prompt, files = (
            "超级个人助理",
            3,
            "# 你是一个超级个人助理,优先使用100来回答问题,回答不好的再使用工具和其它智能体,你可以根据成功案例和失败案例进行参考",
            "S",
        )
    else:
        query = KAgent.select(
            KAgent.title, KAgent.atype, KAgent.prompt, KAgent.files
        ).where(KAgent.kasn == chat.kagent_sn)
        title, atype, prompt, files = query.scalar(as_tuple=True)

    # File uploaded alongside the question (base + path pair).
    upload_files = []
    if chat.base and chat.path:
        upload_files.append([chat.base, chat.path])

    if not files:
        files = ""

    if atype in [0, 1]:
        # Retrieval-based agents.
        doc_list = []
        if atype == 0:
            # Fixed, comma-separated document list stored on the agent.
            doc_list = files.split(",")
        elif atype == 1:
            # Directory-keyword based list; multiple directories are OR-ed.
            from functools import reduce
            from operator import or_
            conditions = [Doc.abs_path.contains(file) for file in files.split(",")]
            combined_condition = reduce(or_, conditions)
            query = (Doc
                     .select(Doc.base, Doc.abs_path)
                     .where(combined_condition)
                     .order_by(Doc.f_up_at.desc())
                     .limit(10000))
            for r in query:
                doc_list.append(f"{r.base}_{r.abs_path}")
        print("相关文档数量", len(doc_list))

        # Encyclopedia (BaiKe) entries matching the question.
        context0 = full_search_by_baike_catalog(chat.question, f"智能体数据/{title}")
        # Document chunks matching the question.
        context = full_search_by_doc_list(chat.question, doc_list)

        if len(context0) == 0:
            # No encyclopedia hits: answer from document context alone.
            context = adjust_ctx_size(context)
            count = len(context)
            if chat.ai_mode.find("Detailed") == 0:
                for i in range(count):
                    ctx.append({"name": i, "path": i, "base": i,
                                "ctx": context[i], "size": len(context[i])})
            write_chat_context(chat_id, {
                "title": title, "req": chat.question, "atype": atype,
                "ctx": context, "mem": chat.mem, "prompt": prompt,
                "history": chat.history, "upload_files": upload_files})
        elif len(context0) < 10 or len(context) < 10:
            # Few encyclopedia hits: merge them into the document context.
            context0.extend(context)
            context = adjust_ctx_size(context0)
            count = len(context)
            if chat.ai_mode.find("Detailed") == 0:
                for i in range(count):
                    ctx.append({"name": i, "path": i, "base": i,
                                "ctx": context[i], "size": len(context[i])})
            write_chat_context(chat_id, {
                "title": title, "req": chat.question, "atype": atype,
                "ctx": context, "mem": chat.mem, "prompt": prompt,
                "history": chat.history, "upload_files": upload_files})
        else:
            # Many encyclopedia hits: two-pass answering (slower).
            # Pass 1: non-streaming RAG answer from the document context.
            context = adjust_ctx_size(context)
            count = len(context)
            answer = do_chat_agent_nostream({
                "title": title, "req": chat.question, "atype": atype,
                "ctx": context, "mem": chat.mem, "prompt": prompt,
                "history": chat.history, "upload_files": upload_files})

            # Pass 2: combine encyclopedia data with the pass-1 answer.
            context = adjust_ctx_size(context0)
            context.insert(0, f"搜索答案: {answer}")
            count = len(context)

            prompt = """
# 智能助手
你是一个问答机器人的助手,请使用提供的上下问来回答问题.

# 回答逻辑
1. 上下文中标有知识百科的数据可信度很高,可以优先采纳
2. 搜索答案是通过大模型RAG得到的答案
3. 如果知识百科的数据和问题关联性很高时,可以讲百科的数据和搜索答案进行融合来回答
4. 如果知识百科的数据和问题关联性不高时,可以直接使用搜索答案来回答

# 上下文
{context}


# 问题
{question}

"""

            if chat.ai_mode.find("Detailed") == 0:
                for i in range(count):
                    ctx.append({"name": i, "path": i, "base": i,
                                "ctx": context[i], "size": len(context[i])})
            write_chat_context(chat_id, {
                "title": title, "req": chat.question, "atype": atype,
                "ctx": context, "prompt": prompt, "history": chat.history,
                "upload_files": upload_files, "mem": chat.mem})
    else:
        # Non-retrieval agents (tool/agent fusion, uploaded-file QA).
        if chat.mem:
            ctx.append(chat.mem)
        write_chat_context(chat_id, {
            "title": title, "req": chat.question, "atype": atype, "ctx": ctx,
            "prompt": prompt, "ai_mode": chat.ai_mode,
            "history": chat.history, "fun_def": files,
            "upload_files": upload_files})

    return {"chat_id": chat_id, "count": count, "ctx": ctx}


def decode_username(username):
    """Decode a username cookie stored as a Python ``bytes`` repr.

    Cookies arrive looking like ``"b'\\xe9\\x83\\x9c'"``: strip the ``b'...'``
    wrapper, undo the escape sequences, and re-decode the raw bytes as UTF-8.
    The sentinel value "匿名用户" is returned unchanged.
    """
    if username != "匿名用户":
        try:
            byte_sequence = username[2:-1]  # drop leading b' and trailing '
            byte_string = (bytes(byte_sequence, 'utf-8')
                           .decode('unicode_escape')
                           .encode('latin1'))
            username = byte_string.decode('utf-8')
        except Exception:
            # fix: a malformed cookie previously crashed every endpoint that
            # reads the username; fall back to the raw cookie value instead.
            pass
    return username


# Streaming chat endpoint (server-sent events).
# @app.get("/api/chat_kagent/{chat_id}")
def chat_event(chat_id: str, request: Request):
    """Stream the answer for a previously prepared chat id via SSE."""
    userid = request.cookies.get("userid", "id-00001")
    if userid == "id-00001":
        # No authenticated user: stream a single error message and clean up.
        return StreamingResponse(
            sse_do_response(do_chat_error(chat_id), chat_id, "k3gpt"),
            media_type="text/event-stream")

    username = decode_username(request.cookies.get("username", "匿名用户"))
    client_ip = get_client_ip(request)

    return StreamingResponse(
        sse_do_response(
            do_chat_agent(chat_id, userid, username, client_ip),
            chat_id,
            "k3gpt"),
        media_type="text/event-stream")


def do_chat_error(chat_id):
    """Single-shot generator used when authentication info is missing."""
    delete_chat_by_id(chat_id)
    yield "用户认证信息丢失,请重新认证后再访问!"


def sse_do_response(gen, chat_id, format="k3gpt"):
    """Wrap a token generator into SSE frames in k3gpt or OpenAI format.

    NOTE(review): the marker literals below are empty strings in this copy;
    the original sentinels (angle-bracket style tags marking end-of-stream
    and tool payload boundaries) appear to have been stripped — as written,
    ``endswith("")``/``startswith("")`` are always True and the slice offsets
    (-5, 6) only make sense with ~5/6-character tags. Restore the exact tags
    from the UI protocol before relying on this path.
    """
    if format == "k3gpt":
        for content in gen:
            if content.endswith("") and len(content) > 5:
                yield gen_k3gpt_result(content[0:-5])
                yield gen_k3gpt_result("")
            elif content.startswith(""):  # tool name + data payload
                data_start = content.find("")
                yield gen_k3gpt_result2(content[6:data_start], content[data_start + 6:])
            else:
                yield gen_k3gpt_result(content)
    else:
        for content in gen:
            if content.endswith("") and len(content) > 5:
                yield gen_openai_result(chat_id, content[0:-5])
            else:
                yield gen_openai_result(chat_id, content)


def gen_openai_result(chat_id, data):
    """Encode one OpenAI-style streaming delta as an SSE ``data:`` frame."""
    messages = [{"delta": {"role": "assistant", "content": data}}]
    response = {"id": chat_id, "object": "chat", "choices": messages}
    json_str = "data: " + json.dumps(response) + "\n\n"
    return json_str.encode("utf-8")


def gen_k3gpt_result(data):
    """Encode one k3gpt UI frame; ``rsp`` carries the actual content."""
    json_data = {"rsp": data}
    json_str = "data: " + json.dumps(json_data) + "\n\n"
    return json_str.encode("utf-8")


def gen_k3gpt_result2(tool_name, data):
    """Encode a k3gpt frame carrying tool output (charts/tables JSON).

    ``type`` tells the UI which renderer to use for ``rsp``.
    """
    json_data = {"rsp": data, "type": tool_name}
    json_str = "data: " + json.dumps(json_data) + "\n\n"
    return json_str.encode("utf-8")


def found_llm_think_data(data):
    """Split an LLM response into (think, data) parts and extract embedded JSON.

    NOTE(review): the tag literals are empty strings in this copy; the slice
    offsets of 8 suggest the original matched 7/8-character reasoning tags
    (e.g. ``<think>``/``</think>``) that were stripped — confirm and restore.
    As written, ``data.find("")`` is always 0.
    """
    think = ""
    if data.find("") >= 0 and data.find("") == -1:
        # Reasoning has not finished yet: everything after the opening tag
        # is "think" content and no answer data is available.
        think = data[8:]
        data = ""
    elif data.find("") >= 0 and data.find("") > 0:
        # Reasoning finished: split around the closing tag.
        end = data.find("")
        think = data[8:end]
        data = data[end + 8:]

    if data.find("```json") >= 0:
        # A fenced JSON payload is present: return only the JSON object.
        begin = data.find("{")
        end = data.rfind("}")
        data = data[begin:end + 1]
    return think, data


def do_chat_agent(chat_id, userid="10000", username="Admin", client_ip="127.0.0.1", delta_stream=False):
    """Main KAgent chat generator: yields streamed tokens, returns the answer.

    Loads the persisted chat context, runs the appropriate flow for the
    agent type, records the Q/A pair, and deletes the context file.
    """
    chat = get_chat_context(chat_id)
    chat["chat_id"] = chat_id
    title = chat["title"]
    question = chat["req"]
    atype = chat["atype"]      # agent type
    prompt = chat["prompt"]
    history = chat["history"]
    context = chat["ctx"]

    answer = ""
    if atype == 3:
        # Agent-fusion mode: delegate the whole loop to the agent runner.
        agent_chat = Agent_chat_build(chat, delta_stream, username, userid)
        answer = yield from agent_chat.do_chat_loop()
    else:
        mem = chat["mem"] if "mem" in chat else []

        # Current timestamp plus weekday (Monday=1 .. Sunday=7 for display).
        today = datetime.datetime.now()
        weekday_number = today.weekday()
        now = f"{today.strftime('%Y-%m-%d %H:%M:%S')} 星期{weekday_number+1}"

        # Append the standard skeleton unless the stored template already
        # provides both {context} and {question} placeholders.
        if prompt.find("{context}") == -1 or prompt.find("{question}") == -1:
            prompt += """
### 当前时间
{now}
### 问题
{question}
### 上下文
{context}
    """

        upload_files = chat["upload_files"]
        file_content = ""

        if upload_files:
            print("上传文件", upload_files)
            # Only the first uploaded file is considered.
            f1 = [f'{upload_files[0][0]}_{upload_files[0][1]}']
            filename = upload_files[0][1]
            file_contents = full_search_by_doc_list("", f1)
            # fix: restore the {filename} placeholder that was lost
            # (format() below supplies a "filename" key).
            file_prompt = prompt + """
    ### 文件
    {filename}
    ### 文件内容
    {file_content}
    """

            if len(file_contents) > 1:
                # Map step: answer each file chunk independently.
                answers = []
                for i, file_content in enumerate(file_contents):
                    print(f"分片[{i}], 大小: {len(file_content)}")
                    system_prompt = file_prompt.format(**{
                        "query": question, "question": question,
                        "context": context, "now": now, "history": history,
                        "file_content": file_content, "filename": filename})
                    answer = yield from llm_ask(system_prompt, delta_stream)
                    answers.append(answer)

                # Reduce step: merge the per-chunk answers into one reply.
                sum_prompt = prompt + """
# 上下文是文件的多个分片分别计算的到的结果,现在合并上下问,结合用户的问题,做最后的回答输出
"""
                system_prompt = sum_prompt.format(**{
                    "query": question, "question": question,
                    "context": answers, "now": now, "history": history})
                sum_answers = yield from llm_ask(system_prompt, delta_stream)
                # fix: record the merged summary; previously the raw chunk
                # list was stored and ``sum_answers`` was dead.
                answer = sum_answers
            elif len(file_contents) == 1:
                file_content = file_contents[0]
                system_prompt = file_prompt.format(**{
                    "query": question, "question": question,
                    "context": context, "now": now, "history": history,
                    "file_content": file_content, "filename": filename})
                answer = yield from llm_ask(system_prompt, delta_stream)
            else:
                # fix: interpolate the real filename (was the literal "(unknown)").
                yield f"没有找到文件的数据分片:{filename}"
        else:
            # Plain Q&A with optional short-term memory as extra context.
            prompt += """
### 上下文2
{mem}
"""
            system_prompt = prompt.format(**{
                "query": question, "question": question, "context": context,
                "now": now, "history": history, "mem": mem})
            answer = yield from llm_ask(system_prompt, delta_stream)

    # Record the exchange and drop the on-disk context.
    QA.create(userid=userid, username=username, question=question,
              answer=answer, ref=f"知识体:{title}", client_ip=client_ip)
    delete_chat_by_id(chat_id)
    return answer


def llm_ask(system_prompt, delta_stream):
    """Stream the LLM reply; the final answer text is the generator's return.

    With ``delta_stream`` the chunks are incremental and are accumulated;
    otherwise each chunk is cumulative and the last one (think-stripped)
    becomes the answer.
    """
    messages = [{'role': 'user', 'content': system_prompt}]
    responses = llm.chat(
        messages=messages,
        stream=True,
        delta_stream=delta_stream,
    )

    answer = ""
    try:
        for response in responses:
            yield response[0]["content"]
            if delta_stream == True:
                answer += response[0]["content"]
            if not delta_stream:
                think, data = found_llm_think_data(response[0]["content"])
                answer = data
    except Exception as e:
        # fix: was missing the f-prefix, so the user saw the literal "{e}".
        yield f"大模型运行出现错误:{e}, 请检查配置是否正确或者网络是否通畅!\n如果是本地大模型还请检查大模型是否正常启动!"
    return answer


def do_chat_agent_nostream(chat):
    """Internal non-streaming answer used by chat_kagent's two-pass flow."""
    question = chat["req"]
    prompt = chat["prompt"]
    history = chat["history"]
    context = chat["ctx"]
    # fix: chat_kagent never supplies "doc_list" (nor the unused "mode"),
    # so plain indexing raised KeyError; default to an empty list instead.
    doc_list = chat.get("doc_list", [])

    today = datetime.datetime.now()
    weekday_number = today.weekday()  # Monday=0 .. Sunday=6
    now = f"{today.strftime('%Y-%m-%d %H:%M:%S')} 星期{weekday_number+1}"

    system_prompt = prompt.format(**{
        "query": question, "question": question, "context": context,
        "now": now, "history": history, "doc_list": doc_list})

    messages = [{'role': 'user', 'content': system_prompt}]
    responses = llm.chat(
        messages=messages,
        stream=False,
    )
    return responses[0]["content"]


# File-center upload endpoint.
# @app.post("/api/upload")
async def upload_file(request: Request, curpath: str = Form(''), file: UploadFile = File(...)):
    """Save an uploaded file into the user's personal folder and index it."""
    try:
        username = decode_username(request.cookies.get("username", "匿名用户"))

        # Personal upload space for this user.
        user_dir = f'{gcfg["fs"]["path"]}/个人文件夹/{username}'
        os.makedirs(user_dir, exist_ok=True)

        # Sanitise the client-supplied name (strips spaces etc.).
        filename = regular_filename(file.filename)

        # fix: write under the sanitised filename — the path previously ended
        # in the literal "(unknown)", so every upload clobbered one file.
        dest = f"{user_dir}/{filename}"
        with open(dest, "wb") as buf:
            shutil.copyfileobj(file.file, buf)
            fsize = buf.tell()
    except Exception as e:
        return {"errno": 1, "message": f"上传出错: {e}"}

    # Build the full-text index and the file record.
    c_or_u_fs_index(filename, dest, fsize, username)

    return {"errno": 0, "file": {"name": filename, "path": dest, "base": "文件中心"}}


def chat_completions(chat_req: dict, req: Request):
    """OpenAI-compatible chat-completions endpoint backed by a KAgent.

    The bearer token has the form ``<kagent_sn>:<base64(username)>``.
    Supports both streaming (SSE) and non-streaming responses.
    """
    # --- authentication ---
    auth_header = req.headers.get("Authorization")
    if not auth_header:
        raise HTTPException(status_code=401, detail="Authorization header missing")
    if not auth_header.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Invalid authorization scheme")

    token = auth_header.split(" ")[1]
    try:
        import base64
        sn, username_base64 = token.split(":")
        username = base64.b64decode(username_base64.encode("utf-8")).decode("utf-8")
    except Exception:  # fix: was a bare except
        raise HTTPException(status_code=401, detail="api_key is error")

    # --- request body ---
    print(chat_req)
    if 'messages' not in chat_req or not chat_req['messages']:
        raise HTTPException(status_code=501, detail="message format error")

    # Flatten user (concatenated) and system (last wins) text parts.
    question = ""
    for msg in chat_req['messages']:
        if msg["role"] == "user":
            if isinstance(msg["content"], list):
                for content in msg["content"]:
                    if content["type"] == "text":
                        question += content["text"]
            elif isinstance(msg["content"], str):
                question += msg["content"]
        elif msg["role"] == "system":
            if isinstance(msg["content"], list):
                for content in msg["content"]:
                    if content["type"] == "text":
                        question = content["text"]
            elif isinstance(msg["content"], str):
                question = msg["content"]

    chat = Chat(kagent_sn=sn, question=question)
    chat_info = chat_kagent(chat)

    userid = "50000"  # fixed pseudo-user for API-key access
    client_ip = get_client_ip(req)
    chat_id = chat_info["chat_id"]
    if "stream" in chat_req and chat_req["stream"]:
        return StreamingResponse(
            sse_do_response(
                do_chat_agent(chat_id, userid, username, client_ip, True),
                chat_id,
                "openai"),
            media_type="text/event-stream")

    # Non-streaming: drain the generator; the answer is its return value.
    answer = ""
    gen = do_chat_agent(chat_id, userid, username, client_ip, True)
    try:
        while True:
            next(gen)
    except StopIteration as e:
        answer = e.value
    messages = [{"message": {"role": "assistant", "content": answer}}]
    response = {"id": chat_id, "object": "chat", "choices": messages}
    print(response)
    return response


def get_user_info(req):
    """Return (decoded username, client ip) for the current request."""
    username = decode_username(req.cookies.get("username", "匿名用户"))
    client_ip = get_client_ip(req)
    return username, client_ip


class New_BaiKe(BaseModel):
    """Payload for creating or updating an encyclopedia (BaiKe) entry."""
    id: int = Field(None, example="你好,世界!")   # falsy -> create new entry
    title: str = Field(..., example="你好,世界!")
    catalog: str = Field(..., example="你好,世界!")
    html: str = Field(..., example="你好,世界!")
    style: Optional[Dict[str, Any]] = None  # optional rendering style
    trans: Optional[bool] = True            # normalise HTML before saving


# @app.post("/api/baike")
def new_baike(baike: New_BaiKe, req: Request):
    """Create or update a BaiKe entry and (re)build its full-text index."""
    username, client = get_user_info(req)

    # Personal-space entries live under the user's own sub-catalog.
    if baike.catalog == "个人空间":
        baike.catalog = f"个人空间/{username}"

    if baike.trans:
        from parse_html import trans_html_to_format
        baike.html = trans_html_to_format(baike.html)

    if baike.id and baike.id != 0:
        # Update: refresh the index, then the DB row.
        c_or_u_baike_index(baike.html, baike.title, baike.id, baike.catalog)
        BaiKe.update(title=baike.title, catalog=baike.catalog, html=baike.html,
                     m_time=datetime.datetime.now(), modifier=username
                     ).where(BaiKe.id == baike.id).execute()
    else:
        # Create: insert first so the index can reference the new row id.
        # (Note: ``baike`` is rebound from the payload to the model instance.)
        baike = BaiKe.create(title=baike.title, catalog=baike.catalog,
                             html=baike.html, creator=username, modifier=username)
        c_or_u_baike_index(baike.html, baike.title, baike.id, baike.catalog)

    return {"msg": "OK", "id": baike.id}