# k3GPT/main/api.py
from typing import Union, Optional, List, Dict, Any
from fastapi import FastAPI, Query, UploadFile, File, HTTPException, Request, Path, Form
from fastapi.responses import StreamingResponse, FileResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
import os  # used for the file-system operations below
import json
import time
import datetime
import json5
import shutil
############################ Module code ############################
# Full-text search
from full_index_search import full_search, doc_search, c_or_u_baike_index, c_or_u_fs_index, one_doc_search, delete_index_by_docid, adjust_ctx_size, full_search_by_baike_catalog, full_search_by_doc_list
# Initialization (globals such as gcfg, llm, get_gcfg_mem)
from init import *
from utils import regular_filename
# File repositories
from ff import sftp_list_directory, smb_list_directory, linux_list_directory, ftp_list_directory, create_doc_cache
# Database models
from k_database import QA, BaiKe, BaiKe_Catalog, Doc, KAgent
from peewee import fn
from agent_chat import Agent_chat_build
# Get the client's IP address
def get_client_ip(request):
    x_forwarded_for = request.headers.get("X-Forwarded-For")
    if x_forwarded_for:
        client_ip = x_forwarded_for.split(",")[0]
    else:
        # No X-Forwarded-For; try X-Real-IP
        client_ip = request.headers.get("X-Real-IP")
    # If neither header is present, use request.client.host
    if not client_ip:
        client_ip = request.client.host
    return client_ip
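# Behavior sketch (illustrative values): behind a proxy that sends
# "X-Forwarded-For: 203.0.113.7, 10.0.0.2" this returns "203.0.113.7",
# the left-most (original client) address; otherwise it tries X-Real-IP
# and finally falls back to request.client.host.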
# Look up a KAgent's configuration by sn; exposed externally
#@app.get("/api/kagent_cfg/{kasn}")
def get_kagent_by_sn(kasn: str, request: Request):
    if kasn == 'superman':
        title, demo, atype, guide, files = "超级个人助理", "AI革命数字永生", 3, "你好!我是你的超级个人助手!我可以调用我的发布和我的订阅智能体来帮你完成任务规划和执行,有事不决请问我!", "S"
    else:
        # Fetch the agent's details by sn
        try:
            query = KAgent.select(KAgent.title, KAgent.demo, KAgent.atype, KAgent.guide, KAgent.files).where(KAgent.kasn == kasn)
            title, demo, atype, guide, files = query.scalar(as_tuple=True)
        except Exception:
            return {"title": "未知应用", "guide": "未能正确识别SN", 'atype': "", "files": "", "demo": "SN不正确,无法访问应用"}
    username = request.cookies.get("username", "匿名用户")
    username = decode_username(username)
    cfg = {"title": title, "guide": guide, 'atype': atype, "files": files, "demo": demo, "username": username}
    # DingTalk parameters
    if gcfg["dingtalk"]["app_key"] != "":
        dingtalk = {"client_id": gcfg["dingtalk"]["app_key"],
                    "close_login": gcfg["dingtalk"]["exclusiveLogin"],
                    "corpid": gcfg["dingtalk"]["exclusiveCorpId"],
                    }
        cfg["dingtalk"] = dingtalk
    # WeChat parameters
    if gcfg["wechat"]["app_key"] != "":
        cfg["wechat"] = gcfg["wechat"]["app_key"]
    # Logo
    cfg["logo"] = gcfg["fs"]["logo"]
    # Official site
    cfg["url"] = gcfg["fs"]["url"]
    # Service code
    cfg["aicode"] = gcfg["fs"]["aicode"]
    # Agent catalogs
    cfg["agent_catalogs"] = get_gcfg_mem()["agent_catalogs"]
    return cfg
# KAgent chat
# Persist a chat context to disk
def write_chat_context(chat_id, context):
    os.makedirs("/tmp/k3gpt", exist_ok=True)  # the directory may not exist yet
    with open(f"/tmp/k3gpt/{chat_id}", "w+") as f:
        json.dump(context, f, ensure_ascii=False)
# Delete a chat context
def delete_chat_by_id(chat_id):
    try:
        os.remove(f"/tmp/k3gpt/{chat_id}")
    except OSError:
        pass
# Load a chat context
def get_chat_context(chat_id):
    try:
        with open(f"/tmp/k3gpt/{chat_id}", "r") as f:
            context = json.load(f)
        return context
    except Exception as e:
        raise Exception(f"上下文读取失败:{e}")
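# Lifecycle sketch (illustrative id): chat_kagent() below stages the pending
# request at /tmp/k3gpt/<chat_id>, the SSE endpoint reloads it by id, and
# do_chat_agent() deletes it once the answer has been streamed, e.g.
#   write_chat_context("chat_1700000000.0", {"req": "你好", "ctx": []})
#   get_chat_context("chat_1700000000.0")   # -> {"req": "你好", "ctx": []}
#   delete_chat_by_id("chat_1700000000.0")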
class Chat(BaseModel):
    question: str = Field(..., example="你是谁?")
    ai_mode: Optional[str] = Field("", example="对话模式")
    base: Optional[str] = Field(None, example="知识库")
    file: Optional[str] = Field(None, example="文件名")
    path: Optional[str] = Field(None, example="路径")
    history: Optional[list] = Field(None, example="历史对话")
    kagent_sn: Optional[str] = Field(None, example="Agent的sn,识别码")
    mem: Optional[list] = Field([], example="可选的短期记忆")
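# Request body sketch (illustrative, "abc123" is a placeholder sn): a minimal
# payload for chat_kagent() with one uploaded file attached:
#   {"question": "你是谁?", "kagent_sn": "abc123",
#    "base": "文件中心", "path": "/个人文件夹/alice/报告.docx"}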
# KAgent chat: assemble the context and stage it under a new chat_id
def chat_kagent(chat: Chat):
    chat_id = f"chat_{time.time()}"
    count = 0
    ctx = []  # context entries returned to the UI
    # Fetch the agent's details by sn
    if chat.kagent_sn == 'superman':
        title, atype, prompt, files = "超级个人助理", 3, "# 你是一个超级个人助理,优先使用100来回答问题,回答不好的再使用工具和其它智能体,你可以根据成功案例和失败案例进行参考", "S"
    else:
        query = KAgent.select(KAgent.title, KAgent.atype, KAgent.prompt, KAgent.files).where(KAgent.kasn == chat.kagent_sn)
        title, atype, prompt, files = query.scalar(as_tuple=True)
    # Uploaded files
    upload_files = []
    if chat.base and chat.path:
        upload_files.append([chat.base, chat.path])
    if not files:
        files = ""
    # Dispatch on the agent type
    if atype in [0, 1]:
        doc_list = []
        # atype 0: full-text search over an explicit document list
        if atype == 0:
            doc_list = files.split(",")
        elif atype == 1:  # atype 1: keyword search builds the document list
            # Multiple directories are supported
            from functools import reduce
            from operator import or_
            conditions = []
            for file in files.split(","):
                conditions.append(Doc.abs_path.contains(file))
            # Merge the conditions with OR via reduce
            combined_condition = reduce(or_, conditions)
            query = (Doc
                     .select(Doc.base, Doc.abs_path)
                     .where(combined_condition)
                     .order_by(Doc.f_up_at.desc())
                     .limit(10000)
                     )
            for r in query:
                doc_list.append(f"{r.base}_{r.abs_path}")
        # end if
        print("相关文档数量", len(doc_list))
        # Search the related BaiKe (encyclopedia) entries
        context0 = []
        # Check whether any BaiKe data answers the question
        context0 = full_search_by_baike_catalog(chat.question, f"智能体数据/{title}")
        # Search the documents
        context = full_search_by_doc_list(chat.question, doc_list)
        if len(context0) == 0:  # no BaiKe hits: answer from the documents alone
            context = adjust_ctx_size(context)
            count = len(context)
            if chat.ai_mode.find("Detailed") == 0:
                for i in range(count):
                    ctx.append({"name": i, "path": i, "base": i, "ctx": context[i], "size": len(context[i])})
            write_chat_context(chat_id, {"title": title, "req": chat.question, "atype": atype, "ctx": context, "mem": chat.mem,
                                         "prompt": prompt, "history": chat.history, "upload_files": upload_files})
        elif len(context0) < 10 or len(context) < 10:
            # Few BaiKe hits: merge them into the document context and answer once
            context0.extend(context)
            context = adjust_ctx_size(context0)
            count = len(context)
            if chat.ai_mode.find("Detailed") == 0:
                for i in range(count):
                    ctx.append({"name": i, "path": i, "base": i, "ctx": context[i], "size": len(context[i])})
            write_chat_context(chat_id, {"title": title, "req": chat.question, "atype": atype, "ctx": context, "mem": chat.mem,
                                         "prompt": prompt, "history": chat.history, "upload_files": upload_files})
        else:
            # Many BaiKe hits: a slower two-pass answer that leans on the BaiKe data
            # Pass 1: let the LLM answer from the document search results
            context = adjust_ctx_size(context)
            count = len(context)
            answer = do_chat_agent_nostream({"title": title, "req": chat.question, "atype": atype, "ctx": context, "mem": chat.mem,
                                             "prompt": prompt, "history": chat.history, "upload_files": upload_files})
            # Pass 2: combine the BaiKe data with the first-pass answer
            context = adjust_ctx_size(context0)
            context.insert(0, f"搜索答案: {answer}")
            count = len(context)
            prompt = """
# 智能助手
你是一个问答机器人的助手,请使用提供的上下文来回答问题.
# 回答逻辑
1. 上下文中标有知识百科的数据可信度很高可以优先采纳
2. 搜索答案是通过大模型RAG得到的答案
3. 如果知识百科的数据和问题关联性很高时可以将百科的数据和搜索答案进行融合来回答
4. 如果知识百科的数据和问题关联性不高时可以直接使用搜索答案来回答
# 上下文
{context}
# 问题
{question}
"""
            if chat.ai_mode.find("Detailed") == 0:
                for i in range(count):
                    ctx.append({"name": i, "path": i, "base": i, "ctx": context[i], "size": len(context[i])})
            write_chat_context(chat_id, {"title": title, "req": chat.question, "atype": atype, "ctx": context,
                                         "prompt": prompt, "history": chat.history, "upload_files": upload_files, "mem": chat.mem})
    else:
        # Non-retrieval, fused-retrieval, or uploaded-file agents
        if chat.mem:
            ctx.append(chat.mem)
        write_chat_context(chat_id, {"title": title, "req": chat.question, "atype": atype, "ctx": ctx,
                                     "prompt": prompt, "ai_mode": chat.ai_mode,
                                     "history": chat.history, "fun_def": files, "upload_files": upload_files})
    return {"chat_id": chat_id, "count": count, "ctx": ctx}
# Decode a username cookie stored as the repr() of its UTF-8 bytes
def decode_username(username):
    if username != "匿名用户":
        # Raw string, e.g. "b'\\xe9\\x83\\x9c'"
        # Strip the surrounding `b'` and `'` to get the escaped byte text
        byte_sequence = username[2:-1]
        # Turn the escaped text back into the actual bytes
        byte_string = bytes(byte_sequence, 'utf-8').decode('unicode_escape').encode('latin1')
        # Decode the bytes as a normal UTF-8 string
        username = byte_string.decode('utf-8')
    return username
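# Round-trip sketch (using the example from the comment above): a cookie
# holding "b'\\xe9\\x83\\x9c'" (the repr of "郜".encode("utf-8")) decodes
# back to the original character:
#   decode_username("b'\\xe9\\x83\\x9c'")  # -> "郜"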
# KAgent chat streaming: results are pushed in real time over an SSE event stream
#@app.get("/api/chat_kagent/{chat_id}")
def chat_event(chat_id: str, request: Request):
    userid = request.cookies.get("userid", "id-00001")
    if userid == "id-00001":
        return StreamingResponse(sse_do_response(do_chat_error(chat_id), chat_id,
                                                 "k3gpt"), media_type="text/event-stream")
    username = request.cookies.get("username", "匿名用户")
    username = decode_username(username)
    client_ip = get_client_ip(request)
    return StreamingResponse(
        sse_do_response(
            do_chat_agent(chat_id, userid, username, client_ip),
            chat_id,
            "k3gpt"),
        media_type="text/event-stream")
def do_chat_error(chat_id):
    delete_chat_by_id(chat_id)
    yield "用户认证信息丢失,请重新认证后再访问!"
# SSE wrapper that renders each chunk in the requested format
def sse_do_response(gen, chat_id, format="k3gpt"):
    if format == "k3gpt":
        for content in gen:
            if content.endswith("<new>") and len(content) > 5:
                yield gen_k3gpt_result(content[0:-5])
                yield gen_k3gpt_result("<new>")
            elif content.startswith("<tool>"):  # tool name + payload
                data_start = content.find("<data>")
                yield gen_k3gpt_result2(content[6:data_start], content[data_start+6:])
            else:
                yield gen_k3gpt_result(content)
    else:
        for content in gen:
            if content.endswith("<new>") and len(content) > 5:
                yield gen_openai_result(chat_id, content[0:-5])
            else:
                yield gen_openai_result(chat_id, content)
# Generate an OpenAI-style streaming result chunk
def gen_openai_result(chat_id, data):
    messages = []
    messages.append({"delta": {"role": "assistant", "content": data}})
    response = {"id": chat_id, "object": "chat", "choices": messages}
    json_str = "data: " + json.dumps(response) + "\n\n"
    return json_str.encode("utf-8")
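# Frame sketch (illustrative id): gen_openai_result("chat_1", "Hi") yields the
# UTF-8 bytes of
#   data: {"id": "chat_1", "object": "chat", "choices": [{"delta": {"role": "assistant", "content": "Hi"}}]}\n\n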
# Result frame kept in sync with the UI; "rsp" carries the actual content
def gen_k3gpt_result(data):
    json_data = {}
    json_data["rsp"] = data
    json_str = "data: " + json.dumps(json_data) + "\n\n"
    return json_str.encode("utf-8")
# Same as above plus "type" (the tool name), used to render chart/table JSON
def gen_k3gpt_result2(tool_name, data):
    json_data = {}
    json_data["rsp"] = data
    json_data["type"] = tool_name
    json_str = "data: " + json.dumps(json_data) + "\n\n"
    return json_str.encode("utf-8")
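# Frame sketch (illustrative payload): a "<tool>chart<data>{...}" chunk from
# the agent loop becomes gen_k3gpt_result2("chart", "{...}") and is sent as
#   data: {"rsp": "{...}", "type": "chart"}\n\n
# while plain text goes out as
#   data: {"rsp": "..."}\n\n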
# Split a model response into its <think> reasoning and the visible answer
def found_llm_think_data(data):
    think = ""
    if data.find("<think>") >= 0 and data.find("</think>") == -1:
        # Thinking has not finished yet
        think = data[8:]  # skip "<think>" plus the newline that follows it
        data = ""
    elif data.find("<think>") >= 0 and data.find("</think>") > 0:
        # Thinking has finished
        end = data.find("</think>")
        think = data[8:end]
        data = data[end+8:]  # skip the 8-character "</think>" tag
    if data.find("```json") >= 0:
        # A JSON payload was found; return only the JSON data
        begin = data.find("{")
        end = data.rfind("}")
        data = data[begin:end+1]
    return think, data
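# Behavior sketch (illustrative, assuming the tag opens the text as the
# slicing above expects):
#   found_llm_think_data("<think>\n推理过程</think>答案")  # -> ("推理过程", "答案")
#   found_llm_think_data("<think>\n还在推理")              # -> ("还在推理", "")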
# Main KAgent chat function
def do_chat_agent(chat_id, userid="10000", username="Admin", client_ip="127.0.0.1", delta_stream=False):
    chat = get_chat_context(chat_id)
    chat["chat_id"] = chat_id
    title = chat["title"]
    question = chat["req"]
    atype = chat["atype"]  # agent type
    prompt = chat["prompt"]
    history = chat["history"]
    context = chat["ctx"]
    answer = ""
    if atype == 3:
        # Fused intelligent-retrieval mode
        agent_chat = Agent_chat_build(chat, delta_stream, username, userid)
        answer = yield from agent_chat.do_chat_loop()
    else:
        if "mem" in chat:
            mem = chat["mem"]
        else:
            mem = []
        # Current date and weekday (weekday(): Monday=0, Sunday=6)
        today = datetime.datetime.now()
        weekday_number = today.weekday()
        now = f"{today.strftime('%Y-%m-%d %H:%M:%S')} 星期{weekday_number+1}"
        # If the user's prompt lacks the placeholders, append them
        if prompt.find("{context}") == -1 or prompt.find("{question}") == -1:
            prompt += """
### 当前时间
{now}
### 问题
{question}
### 上下文
{context}
"""
        upload_files = chat["upload_files"]
        file_content = ""
        if upload_files:
            print("上传文件", upload_files)
            # Only the first file is used
            f1 = [f'{upload_files[0][0]}_{upload_files[0][1]}']
            filename = upload_files[0][1]
            file_contents = full_search_by_doc_list("", f1)
            file_prompt = prompt + """
### 文件
{filename}
### 文件内容
{file_content}
"""
            if len(file_contents) > 1:
                # Answer each chunk of the file separately
                answers = []
                for i, file_content in enumerate(file_contents):
                    print(f"分片[{i}], 大小: {len(file_content)}")
                    system_prompt = file_prompt.format(**{"query": question, "question": question, "context": context, "now": now, "file_content": file_content, "filename": filename})
                    answer = yield from llm_ask(system_prompt, delta_stream)
                    answers.append(answer)
                # Summarize the per-chunk answers into a final reply
                sum_prompt = prompt + """
# 上下文是文件的多个分片分别计算得到的结果,现在合并上下文,结合用户的问题,做最后的回答输出
"""
                system_prompt = sum_prompt.format(**{"query": question, "question": question, "context": answers, "now": now, "history": history})
                sum_answers = yield from llm_ask(system_prompt, delta_stream)
                answer = answers  # the per-chunk answers are recorded; the streamed summary is sum_answers
            elif len(file_contents) == 1:
                file_content = file_contents[0]
                system_prompt = file_prompt.format(**{"query": question, "question": question, "context": context, "now": now, "history": history, "file_content": file_content, "filename": filename})
                answer = yield from llm_ask(system_prompt, delta_stream)
            else:
                yield f"没有找到文件的数据分片:{filename}"
        else:
            prompt += """
### 上下文2
{mem}
"""
            system_prompt = prompt.format(**{"query": question, "question": question, "context": context, "now": now, "history": history, "mem": mem})
            answer = yield from llm_ask(system_prompt, delta_stream)
    # Record the Q&A, then drop the staged chat context
    QA.create(userid=userid, username=username, question=question, answer=answer, ref=f"知识体:{title}", client_ip=client_ip)
    delete_chat_by_id(chat_id)
    return answer
# Ask the LLM and stream the answer
def llm_ask(system_prompt, delta_stream):
    messages = [{'role': 'user', 'content': system_prompt}]
    responses = llm.chat(
        messages=messages,
        stream=True,
        delta_stream=delta_stream
    )  # get a new response from the model where it can see the function response
    answer = ""
    try:
        for response in responses:
            #print(response[0]["content"])
            yield response[0]["content"]
            if delta_stream == True:
                answer += response[0]["content"]
            if not delta_stream:
                # Non-delta streams resend the full content each chunk;
                # keep the latest parsed answer
                think, data = found_llm_think_data(response[0]["content"])
                answer = data
    except Exception as e:
        yield f"大模型运行出现错误:{e}, 请检查配置是否正确或者网络是否通畅!\n如果是本地大模型还请检查大模型是否正常启动!"
    return answer
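# Consumption sketch (illustrative): inside another generator,
#   answer = yield from llm_ask(system_prompt, delta_stream)
# re-yields every chunk to the SSE layer and binds the generator's return
# value; when driven manually with next(), the return value arrives as
# StopIteration.value (see chat_completions below).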
# Internal KAgent chat helper, non-streaming
def do_chat_agent_nostream(chat):
    question = chat["req"]
    mode = chat.get("mode", "")          # not always present in the staged context
    prompt = chat["prompt"]
    history = chat["history"]
    context = chat["ctx"]
    doc_list = chat.get("doc_list", [])  # not always present in the staged context
    # Current date and weekday (weekday(): Monday=0, Sunday=6)
    today = datetime.datetime.now()
    weekday_number = today.weekday()
    now = f"{today.strftime('%Y-%m-%d %H:%M:%S')} 星期{weekday_number+1}"
    system_prompt = prompt.format(**{"query": question, "question": question, "context": context, "now": now, "history": history, "doc_list": doc_list})
    messages = [{'role': 'user', 'content': system_prompt}]
    responses = llm.chat(
        messages=messages,
        stream=False,
    )  # get a new response from the model where it can see the function response
    return responses[0]["content"]
# File-center upload
#@app.post("/api/upload")
async def upload_file(request: Request, curpath: str = Form(''), file: UploadFile = File(...)):
    try:
        username = request.cookies.get("username", "匿名用户")
        username = decode_username(username)
        # The user's upload space
        if not os.path.exists(f'{gcfg["fs"]["path"]}/个人文件夹/{username}'):
            os.makedirs(f'{gcfg["fs"]["path"]}/个人文件夹/{username}')
        # Normalize the filename (strip spaces)
        filename = regular_filename(file.filename)
        with open(f"{gcfg['fs']['path']}/个人文件夹/{username}/{filename}", "wb") as buf:
            shutil.copyfileobj(file.file, buf)
            fsize = buf.tell()  # must be read before the file is closed
    except Exception as e:
        return {"errno": 1, "message": f"上传出错: {e}"}
    # Create the index and the file record
    c_or_u_fs_index(filename, f"{gcfg['fs']['path']}/个人文件夹/{username}/{filename}", fsize, username)
    return {"errno": 0, "file": {"name": filename, "path": f"{gcfg['fs']['path']}/个人文件夹/{username}/{filename}", "base": "文件中心"}}
# OpenAI-compatible chat service exposed to external callers
def chat_completions(chat_req: dict, req: Request):
    # Authenticate first
    auth_header = req.headers.get("Authorization")
    if not auth_header:
        raise HTTPException(status_code=401, detail="Authorization header missing")
    # Expect a Bearer token
    if not auth_header.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Invalid authorization scheme")
    token = auth_header.split(" ")[1]  # extract the token
    try:
        #print(token)
        import base64
        sn, username_base64 = token.split(":")
        username = base64.b64decode(username_base64.encode("utf-8")).decode("utf-8")
    except Exception:
        raise HTTPException(status_code=401, detail="api_key is error")
    # Then the payload
    #chat_req = await req.json()
    print(chat_req)
    if 'messages' not in chat_req or not chat_req['messages']:
        raise HTTPException(status_code=501, detail="message format error")
    question = ""
    for msg in chat_req['messages']:
        if msg["role"] == "user":
            if isinstance(msg["content"], list):  # list-typed content
                for content in msg["content"]:
                    if content["type"] == "text":
                        question += content["text"]
            elif isinstance(msg["content"], str):
                question += msg["content"]
        elif msg["role"] == "system":
            if isinstance(msg["content"], list):  # list-typed content
                for content in msg["content"]:
                    if content["type"] == "text":
                        question = content["text"]
            elif isinstance(msg["content"], str):
                question = msg["content"]
    chat = Chat(kagent_sn=sn, question=question)
    chat_info = chat_kagent(chat)
    userid = "50000"
    client_ip = get_client_ip(req)
    chat_id = chat_info["chat_id"]
    if "stream" in chat_req and chat_req["stream"]:  # streaming output
        return StreamingResponse(sse_do_response(
            do_chat_agent(chat_id, userid, username, client_ip, True),
            chat_id,
            "openai"), media_type="text/event-stream")
    # Non-streaming: drain the generator and take its return value
    try:
        answer = ""
        gen = do_chat_agent(chat_id, userid, username, client_ip, True)
        while True:
            next(gen)
    except StopIteration as e:
        answer = e.value  # a generator's return value rides on StopIteration
    messages = []
    messages.append({"message": {"role": "assistant", "content": answer}})
    response = {"id": chat_id, "object": "chat", "choices": messages}
    print(response)
    return response
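# Token sketch (format taken from the parsing above): the Bearer token is
# "<kagent_sn>:<base64(username)>"; e.g. for sn "abc123" and user "alice":
#   import base64
#   token = "abc123:" + base64.b64encode("alice".encode()).decode()
#   headers = {"Authorization": f"Bearer {token}"}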
# Current user's name and IP
def get_user_info(req):
    # Read the cookies from the request
    userid = req.cookies.get("userid", "id-00001")
    username = req.cookies.get("username", "匿名用户")
    username = decode_username(username)
    client_ip = get_client_ip(req)
    return username, client_ip
# Saving a BaiKe (encyclopedia) entry
class New_BaiKe(BaseModel):
    id: Optional[int] = Field(None, example=0)
    title: str = Field(..., example="你好,世界!")
    catalog: str = Field(..., example="个人空间")
    html: str = Field(..., example="<p>你好,世界!</p>")
    style: Optional[Dict[str, Any]] = None  # generated style
    trans: Optional[bool] = True
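# Payload sketch (illustrative): creating a new entry (id omitted or 0)
#   {"title": "测试词条", "catalog": "个人空间", "html": "<p>...</p>", "trans": true}
# passing an existing id updates that entry instead.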
# Save a BaiKe entry
#@app.post("/api/baike")
def new_baike(baike: New_BaiKe, req: Request):
    username, client = get_user_info(req)
    # "个人空间" entries live under the user's own personal space
    if baike.catalog == "个人空间":
        baike.catalog = f"个人空间/{username}"
    if baike.trans:
        from parse_html import trans_html_to_format
        #print(baike.html)
        #print("============================================")
        baike.html = trans_html_to_format(baike.html)
    if baike.id and baike.id != 0:
        # Update
        # query = BaiKe.select(BaiKe.full_id).where(BaiKe.id == baike.id)
        # old_full_id = query.scalar()
        # baike_u = BaiKe(id=baike.id)
        # try:
        #     full_id = c_or_u_baike_index(baike.html, baike.title, baike.id, baike_u.full_id, baike.catalog)
        # except:
        #     full_id = baike_u.full_id
        # baike_u.html = baike.html
        # baike_u.title = baike.title
        # baike_u.catalog = baike.catalog
        # baike_u.m_time = datetime.datetime.now()
        # baike_u.full_id = full_id
        # baike_u.save(only=[BaiKe.html, BaiKe.title, BaiKe.catalog, BaiKe.m_time, BaiKe.full_id])
        # The two calls below have the same effect as the commented block above
        c_or_u_baike_index(baike.html, baike.title, baike.id, baike.catalog)
        BaiKe.update(title=baike.title, catalog=baike.catalog, html=baike.html, m_time=datetime.datetime.now(), modifier=username).where(BaiKe.id == baike.id).execute()
    else:
        # Create; note `baike` is rebound to the new BaiKe model instance
        baike = BaiKe.create(title=baike.title, catalog=baike.catalog, html=baike.html, creator=username, modifier=username)
        c_or_u_baike_index(baike.html, baike.title, baike.id, baike.catalog)
    #Operate_Log(req, "新建", f'[百科] id:{baike.id},标题:{baike.title}')
    # baike.full_id = full_id
    # baike.save(only=[BaiKe.full_id])
    return {"msg": "OK", "id": baike.id}