from typing import Union from fastapi import FastAPI,Query,UploadFile,File,HTTPException,Request,Path,Form from fastapi.responses import StreamingResponse,FileResponse,RedirectResponse,JSONResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel,Field from typing import Optional,List,Dict,Any from typing import Annotated import json import time import datetime import json5 from utils import * import shutil ############################ 模块代码 ######################## #初始化信息,必须放在前面 from init import * #全文检索 from full_index_search import full_search,doc_search,c_or_u_baike_index,c_or_u_fs_index,one_doc_search,delete_index_by_docid,adjust_ctx_size,delete_fs_index_by_base_path,full_search_by_doc_list #文件仓库 from ff import sftp_list_directory,smb_list_directory,linux_list_directory,ftp_list_directory,create_doc_cache,ding_list_directory # from dingtalk import * #数据库 from k_database import QA,BaiKe,BaiKe_Catalog,Doc,KAgent,User,Login,SuperManAgent # from peewee import fn #应用 app = FastAPI() from api import * # 定义不需要保护的 URL 列表 UNPROTECTED_URLS = ["/", "/ui/login.html", "/api/login_cfg", "/api/login","/api/dingtalk_auth", #"/api/baike","/api/kagent", "/api/i","/ui/im","/ui/js", "/v1/chat/completions"] # 中间件:Cookie 验证 @app.middleware("http") async def cookie_validation_middleware(request: Request, call_next): # 检查请求路径是否在不受保护的列表中 if request.url.path in UNPROTECTED_URLS or request.url.path[0:6] in UNPROTECTED_URLS: # 如果是不受保护的路径,直接跳过验证 response = await call_next(request) return response # 从请求中获取 Cookie session_id = request.cookies.get("session_id") # 检查 Cookie 是否存在 if session_id is None: return RedirectResponse(url="/ui/login.html") # 检查是否存在 if not valid_session_id(session_id): return RedirectResponse(url="/ui/login.html") #return JSONResponse(status_code=403, content={"detail": "无效的会话 Cookie"}) # 如果验证通过,继续处理请求 response = await call_next(request) return response #上传文件 app.add_api_route("/api/upload",upload_file,methods=["POST"]) #获取智能体信息 app.add_api_route("/api/kagent_cfg/{kasn}",get_kagent_by_sn,methods=["GET"]) #对话 app.add_api_route("/api/chat_kagent",chat_kagent,methods=["POST"]) #对话相应 app.add_api_route("/api/chat_kagent/{chat_id}",chat_event,methods=["GET"]) #对话服务 app.add_api_route("/v1/chat/completions",chat_completions,methods=["POST"]) #百科保存 app.add_api_route("/api/baike",new_baike,methods=["POST"]) # from contextlib import asynccontextmanager # # 定义一个异步上下文管理器 # @asynccontextmanager # async def lifespan(app: FastAPI): # # 启动事件:在这里执行启动时需要的操作 # print("Starting up the application...") # yield # 这里是应用程序运行的地方 # # 关闭事件:在这里执行关闭时需要的操作 # print("Shutting down the application...") # # await database.disconnect() # # 将上下文管理器注册为应用程序的 lifespan 事件处理器 # app.router.lifespan_context = lifespan # 将'static'目录挂载到'/static'路径 app.mount("/ui", StaticFiles(directory="ui"), name="static") @app.get("/") def read_root(): return RedirectResponse(url="/ui/login.html") @app.get("/api/login_cfg") def login_cfg(req:Request): cfg = {"logo":gcfg["fs"]["logo"],"slogan":gcfg["fs"]["slogan"]} if "dingtalk_corpid" in gcfg["fs"] and gcfg["fs"]["dingtalk_corpid"]!="": dingtalk_admin = {"client_id":gcfg["dingtalk"]["app_key"], "cropid":gcfg["fs"]["dingtalk_corpid"], } cfg["dingtalk"] = dingtalk_admin username = req.cookies.get("username", "匿名用户") cfg["username"] = decode_username(username) return cfg session_timeout=3600*12 #login注册 @app.post("/api/login") async def login_admin(req:Request): login = await req.json() if login["telphone"]=="" or login["username"]=="": response = JSONResponse(content={"code":401,"msg":"用户名或密码不能为空"}) return response if login["username"]=="Admin" and hashed_password(login["telphone"])== gcfg["fs"]["pswd"]: session_id = generate_hashed_session_id() query = User.select().where(User.userid == "Admin") user = query.first() if not user: user=User.create(userid='Admin',username="管理员",telphone=encrypt_tel("12345"), title=hashed_tel("12345"),ref="帐号") response = JSONResponse(content={"code":200,"url":"/ui/index.html"}) response.set_cookie(key="session_id", value=session_id, httponly=True,max_age=session_timeout) response.set_cookie(key="userid", value="Admin", httponly=True,max_age=session_timeout) response.set_cookie(key="username", value="管理员".encode("utf8"), httponly=True,max_age=session_timeout) response.set_cookie(key="ref", value="帐号".encode("utf8"), httponly=True,max_age=session_timeout) else: response = JSONResponse(content={"code":403,"msg":"验证不通过"}) Login.create(userid="Admin",username="管理员",action="登录", pt=f"知识平台",ref="帐号",client_ip=get_client_ip(req)) return response #钉钉扫码登录 @app.get("/api/dingtalk_auth") def dingtalk_kagent(authCode:str,state:str,req:Request): # 定义钉钉应用的 AppKey 和 AppSecret app_key = gcfg["dingtalk"]["app_key"] app_secret = gcfg["dingtalk"]["app_secret"] # 第一步:获取 Access Token access_token = get_user_access_token(app_key, app_secret, authCode) #print(access_token) # 第二步:使用 AuthCode 获取用户信息 #验证码 user_info = get_user_details(access_token) #print(f"Basic User Info: {user_info}") if "email" not in user_info: user_info["email"]="" query = User.select().where(User.title == hashed_tel(user_info["mobile"])) user = query.first() if user: userid = user.userid else: import secrets userid= secrets.token_urlsafe(8) user=User.create(userid=userid,username=user_info["nick"], telphone=encrypt_tel(user_info["mobile"]), org=user_info["email"]+" "+user_info["unionId"], email=user_info["email"], title = hashed_tel(user_info["mobile"]), ref="钉钉") #百科个人空间 BaiKe_Catalog.create(catalog=f'个人空间/{user_info["nick"]}',demo="存放个人数据") #上传文件空间 if not os.path.exists(f'{gcfg["fs"]["path"]}/个人文件夹/{user_info["nick"]}'): os.makedirs(f'{gcfg["fs"]["path"]}/个人文件夹/{user_info["nick"]}') session_id = generate_hashed_session_id() response = RedirectResponse(url="/ui/index.html") response.set_cookie(key="session_id", value=session_id, httponly=True,max_age=session_timeout) response.set_cookie(key="userid", value=userid, httponly=True,max_age=session_timeout) response.set_cookie(key="username", value=user_info["nick"].encode("utf8"), httponly=True,max_age=session_timeout) response.set_cookie(key="ref", value="钉钉".encode("utf8"), httponly=True,) Login.create(userid=userid,username=user_info["nick"],action="登录", pt="知识平台",ref="钉钉",client_ip=get_client_ip(req)) return response @app.get("/api/logout") def logout_kagent(req:Request): response = RedirectResponse(url="/ui/login.html") # 从请求中获取 Cookie session_id = req.cookies.get("session_id") userid = req.cookies.get("userid", "id-00001") username = req.cookies.get("username", "匿名用户") username = decode_username(username) ref = req.cookies.get("ref", "帐号") ref = decode_username(ref) Login.create(userid=userid,username=username,action="退出",pt="知识平台",client_ip=get_client_ip(req),ref=ref) delete_session_id(session_id) response.delete_cookie("username") response.delete_cookie("session_id") response.delete_cookie("userid") response.delete_cookie("ref") return response #重要的操作日志 def Operate_Log(req,action,info): # 从请求中获取 Cookie userid = req.cookies.get("userid", "id-00001") username = req.cookies.get("username", "匿名用户") username = decode_username(username) ref = req.cookies.get("ref", "帐号") ref = decode_username(ref) Login.create(userid=userid,username=username,action=action,pt=info,ref=ref,client_ip=get_client_ip(req)) #当前用户的名字和IP def get_user_info(req): # 从请求中获取 Cookie userid = req.cookies.get("userid", "id-00001") username = req.cookies.get("username", "匿名用户") username = decode_username(username) client_ip = get_client_ip(req) return username,client_ip @app.get("/api/is_admin") def is_admin(request:Request): # 从请求中获取 Cookie userid = request.cookies.get("userid") ret = False if userid=="Admin": ret = True return {"is_admin":ret} #文档搜索的主接口 @app.get("/api/search") def search(q: Union[str, None],req:Request): q1 = q.lower() #查询知识百科 r1=[] query= (BaiKe .select(BaiKe.id,BaiKe.title,BaiKe.catalog,BaiKe.c_time,BaiKe.m_time,BaiKe.view_count) .where((BaiKe.title.contains(q1))) .order_by(BaiKe.m_time.desc()) .limit(3) ) for r in query: content= f"知识百科: {r.title.lower()}".replace(q,f"{q}") r1.append({"id":r.id,"name":r.title,"path":str(r.id),"base":"知识百科","fdate":r.m_time,"content":content}) #查询文档列表 r0=[] query= (Doc .select(Doc.base,Doc.f_name,Doc.f_up_at,Doc.f_size,Doc.f_sec,Doc.abs_path) .where( Doc.abs_path.contains(q1) ) .order_by(Doc.f_up_at.desc()) .limit(5) ) for r in query: content= f"文件匹配: {r.abs_path.lower()}".replace(q,f"{q}") r0.append({"id":1000,"name":r.f_name,"path":r.abs_path,"base":r.base,"fdate":r.f_up_at,"content":content}) count,result= doc_search(q,50) Operate_Log(req,"搜索",f'[知识搜索] 关键词:{q},结果数:{count+len(r0)+len(r1)}') return {"key": q, "count":count+len(r0)+len(r1),"data":r1+r0+result} #知识库文档下载 @app.get("/api/fd") async def download_file(path:str,filename:str,base:str,req:Request): if base=="文件中心": file_path = path Operate_Log(req,"下载",f'[知识库文档] 知识库:文件中心 文件:{path} ') else: try: with open(f"{gcfg['fs']['path']}/conf/source.conf") as conf: source = json5.load(conf) except: source={"测试":{"name":"测试","type":"FTP","address":"192.168.0.1:21","user":"admin","password":"1","path":"home","scan":1,"count":1024}} file_path=create_doc_cache(path,source[base]) Operate_Log(req,"下载",f'[知识库文档] 知识库:{source[base]} 文件:{path} ') return FileResponse(path=file_path, media_type='application/octet-stream', filename=filename) #知识库文档点赞 @app.get("/api/fd/like") async def file_like(path:str,filename:str,base:str): q = (Doc .update({Doc.f_sec: Doc.f_sec +1 }) .where((Doc.abs_path==path) & (Doc.base==base)) ) q.execute() return {"code":200} #超级个人助理点赞 @app.put("/api/superman/like/{chat_id}") async def superman_like(chat_id:str): q = (SuperManAgent .update({SuperManAgent.human: 100 }) .where((SuperManAgent.chatid==chat_id)) ) q.execute() return {"code":200} #超级个人助理点踩 @app.put("/api/superman/dislike/{chat_id}") async def superman_like(chat_id:str): q = (SuperManAgent .update({SuperManAgent.human: -100 }) .where((SuperManAgent.chatid==chat_id)) ) q.execute() return {"code":200} #搜索对话 @app.post("/api/chat_search") def chat_search(chat:Chat,req:Request): chat_id=f"chat_{time.time()}" count=0 ctx = [] #上下文信息 #print(chat.history) if chat.ai_mode=="KG": #全文检索相关知识 context,meta = full_search(chat.question,50) context = adjust_ctx_size(context) count = len(context) files = [] for i in range(count): files.append(meta[i][0]) ctx.append({"name":meta[i][0],"path":meta[i][1],"base":meta[i][2],"ctx":context[i],"size":len(context[i])}) write_chat_context(chat_id,{"req":chat.question,"mode":chat.ai_mode,"ctx":context,"meta":files}) elif chat.ai_mode=="FA": write_chat_context(chat_id,{"req":chat.question,"mode":chat.ai_mode,"history":chat.history}) elif chat.ai_mode=="DS": #深度思考,支持多轮会话 write_chat_context(chat_id,{"req":chat.question,"mode":chat.ai_mode,"history":chat.history}) elif chat.ai_mode=="NS": from web_search import search_web rs = search_web(chat.question) for r in rs: ctx.append({"name":r["title"],"path":r["link"],"ctx":r["desc"]}) count = len(ctx) write_chat_context(chat_id,{"req":chat.question,"mode":chat.ai_mode,"history":chat.history,"ctx":rs}) else: #其它两种模式暂未实现 write_chat_context(chat_id,{"req":chat.question,"mode":chat.ai_mode,"history":chat.history}) Operate_Log(req,"对话",f'[搜索对话] 问题:{chat.question}') return {"chat_id":chat_id,"count":count,"ctx":ctx,"show_file":True} #单个文件的对话 @app.post("/api/chat_one_doc") def chat_one_search(chat:Chat,req:Request): chat_id=f"chat_{time.time()}" mode = "KG" if chat.path.endswith(".xlsx") or chat.path.endswith(".xls"): mode = "DA" context= one_doc_search(chat.question,chat.base,chat.file,chat.path) ctx=[] context = adjust_ctx_size(context) for i in range(len(context)): ctx.append({"ctx":context[i],"size":len(context[i])}) write_chat_context(chat_id,{"req":chat.question,"mode":mode,"ctx":context,"meta":[chat.path]}) Operate_Log(req,"对话",f'[文件对话] 问题:{chat.question},文件:{chat.path}') return {"chat_id":chat_id,"count":len(ctx),"ctx":ctx,"show_file":False} #知识对话的流输出,采用事件流机制,实时输出 @app.get("/api/chat/{chat_id}") def chat_event(chat_id: str,req:Request): return StreamingResponse(do_chat(chat_id,req), media_type="text/event-stream") #chat的主函数 def do_chat(chat_id,req): chat = get_chat_context(chat_id) query= chat["req"] mode = chat["mode"] if "meta" in chat: meta = chat["meta"] if "ctx" in chat: context0 = chat["ctx"] if "history" in chat: history = chat["history"] if mode=="KG":#知识库对话 answer = yield from KG_chat(query,context0,meta) ref = "知识库对话" elif mode=="FA":#大模型对话 answer = yield from FA_chat(query,history) ref = "大模型对话" elif mode=="DS":#深度思考 from ds_chat import DS_chat answer = yield from DS_chat(query,history) ref = "深度思考" elif mode=="DA": #excel文件的分析 from da_chat import DA_chat answer = yield from DA_chat(query,context0,meta[0]) ref = "数据分析" elif mode=="NS": #联网搜索 from web_search import get_detail_page context = get_detail_page(context0) #print(context) answer = yield from NS_chat(query,context,history) ref = "联网搜索" username,client_ip = get_user_info(req) QA.create(username=username,client_ip=client_ip,question=query,answer=answer,ref=ref) delete_chat_by_id(chat_id) #联网搜索 def NS_chat(query,context,history): # 获取当前日期和星期 today = datetime.datetime.now() # 获取今天是星期几,使用weekday()方法(周一=0, 周日=6) weekday_number = today.weekday() system_prompt=f""" 今天是{today.strftime('%Y-%m-%d')} 星期{weekday_number+1}, 你是一个支持多轮知识问答的智能助手。 你可以根据联网搜索到的上下文信息和历史信息以及自己的知识储备来回答用户的问题。 本次问题: {query} 联网搜索: ``` {context} ``` 历史对话: ``` {history} ``` """ messages = [{'role': 'user', 'content': system_prompt}] responses = llm.chat( messages=messages, stream=True, ) # get a new response from the model where it can see the function response json_str="" try: for response in responses: json_str = "data: " + json.dumps({'rsp':response[0]["content"]}) + "\n\n" yield json_str.encode("utf-8") return response[0]["content"] except Exception as e: json_str = "data: " + json.dumps({"rsp": f"大模型运行出现错误:{e}, 请检查配置是否正确或者网络是否通畅!\n如果是本地大模型还请检查大模型是否正常启动!"}) + "\n\n" yield json_str.encode("utf-8") return f"大模型运行出现错误:{e}" #大模型自己回答 def FA_chat(query,history): # 获取当前日期和星期 today = datetime.datetime.now() # 获取今天是星期几,使用weekday()方法(周一=0, 周日=6) weekday_number = today.weekday() system_prompt=f""" 今天是{today.strftime('%Y-%m-%d')} 星期{weekday_number+1}, 你是一个支持多轮知识问答的智能助手。 你可以参考的历史信息和本次问题来回答用户的问题,更好给用户一个满意的答案。 历史对话: ``` {history} ``` 本次问题: {query} """ messages = [{'role': 'user', 'content': system_prompt}] responses = llm.chat( messages=messages, stream=True, ) # get a new response from the model where it can see the function response json_str="" try: for response in responses: json_str = "data: " + json.dumps({'rsp':response[0]["content"]}) + "\n\n" yield json_str.encode("utf-8") #QA.create(question=query,answer=response[0]["content"],ref="大模型自答") return response[0]["content"] except Exception as e: json_str = "data: " + json.dumps({"rsp": f"大模型运行出现错误:{e}, 请检查配置是否正确或者网络是否通畅!\n如果是本地大模型还请检查大模型是否正常启动!"}) + "\n\n" yield json_str.encode("utf-8") return f"大模型运行出现错误:{e}" #内部知识库对话 def KG_chat(query,context,meta): system_prompt=f""" 你是一个知识问答的助手。 你可以根据提供的上下文信息来回答用户的问题,先判断一下文件名称和问题是否相关,再考虑要不要使用这段上下文。 同时上下文中排在前面的可信度越高,越应该优先采纳。 问题: {query} 上下文: ``` {context} ``` """ messages = [{'role': 'user', 'content': system_prompt}] responses = llm.chat( messages=messages, stream=True, ) # get a new response from the model where it can see the function response json_str="" try: for response in responses: json_str = "data: " + json.dumps({'rsp':response[0]["content"]}) + "\n\n" yield json_str.encode("utf-8") if "response" in locals(): return response[0]["content"] else: raise Exception("大模型没有正常回答,请检查上下文配置") except Exception as e: json_str = "data: " + json.dumps({"rsp": f"大模型运行出现错误:{e}, 请检查配置是否正确或者网络通信是否通畅!\n如果是本地大模型还请检查大模型是否正常启动!"}) + "\n\n" yield json_str.encode("utf-8") return f"大模型运行出现错误:{e}" #大模型对话测试,主要测试api的功能和连接等 @app.post("/api/chat_llm_test") def chat_one_search(chat:Chat,req:Request): chat_id=f"chat_{time.time()}" write_chat_context(chat_id,{"req":chat.question,"mode":chat.ai_mode}) return {"chat_id":chat_id,"count":0,"ctx":"","show_file":False} #知识对话的流输出,采用事件流机制,实时输出 @app.get("/api/chat_llm/{chat_id}") def chat_event(chat_id: str,req:Request): return StreamingResponse(do_chat_llm(chat_id,req), media_type="text/event-stream") def do_chat_llm(chat_id,req): chat = get_chat_context(chat_id) query= chat["req"] mode = chat["mode"] cfg = json.loads(mode) if cfg["id"] =="xunfei": llm_temp = openai_model({ # Use your own model service compatible with OpenAI API: 'model': cfg["name"], 'model_server': cfg["url"], 'api_key': cfg["api_key"], 'generate_cfg': generate_cfg }) else: llm_temp = get_chat_model({ # Use your own model service compatible with OpenAI API: 'model': cfg["name"], 'model_server': cfg["url"], 'api_key': cfg["api_key"], 'generate_cfg': generate_cfg }) messages = [{'role': 'user', 'content': query}] responses = llm_temp.chat( messages=messages, stream=True, ) # get a new response from the model where it can see the function response json_str="" try: for response in responses: json_str = "data: " + json.dumps({'rsp':response[0]["content"]}) + "\n\n" yield json_str.encode("utf-8") except Exception as e: json_str = "data: " + json.dumps({"rsp": f"大模型运行出现错误:{e}, 请检查配置是否正确或者网络通信是否通畅!\n如果是本地大模型还请检查大模型是否正常启动!"}) + "\n\n" yield json_str.encode("utf-8") return f"大模型运行出现错误:{e}" #AI创作 def do_ai_create(chat_id): chat = get_chat_context(chat_id) query=chat["req"] mode= chat["mode"] #end for system_prompts={ "AI续写": f""" 你是一个语言处理的助手,根据提供的上下文,续写后续的内容,尽可能接近上下文表达的主题意思,字数300到500之间。 上下文: {query} """, "AI大纲": f""" 你是一个语言处理的助手,根据提供的上下文,提出一个3到5的大纲来阐述上下的主题。 上下文: {query} """, "AI丰富": f""" 你是一个语言处理的助手,根据提供的上下文,对内容进行扩写,扩写的长度为原来内容的2倍左右。 上下文: {query} """, "AI优化": f""" 你是一个语言处理的助手,根据提供的上下文,对语句进行缩写,缩写为原来长度的1/2左右。 上下文: {query} """, "AI摘要": f""" 你是一个语言处理的助手,根据提供的上下文,生成一个摘要,字数为100字左右。 上下文: {query} """, "拼写检查": f""" 你是一个语言处理的助手,根据提供的上下文,检查其中有错别字、同音字的情况,若有就直接改写。 上下文: {query} """, "智能翻译": f""" 你是一个语言处理的助手,根据提供的上下文,如果上下文是中文的就翻译成英文,如果上下文是英文的就翻译成中文。 上下文: {query} """, } messages = [{'role': 'user', 'content': system_prompts[mode]}] responses = llm.chat( messages=messages, stream=True, ) # get a new response from the model where it can see the function response try: for response in responses: json_str = "data: " + json.dumps({'rsp':response[0]["content"]}) + "\n\n" yield json_str.encode("utf-8") except Exception as e: json_str = "data: " + json.dumps({"rsp": f"大模型运行出现错误:{e}, 请检查配置是否正确或者网络是否通畅!\n如果是本地大模型还请检查大模型是否正常启动!"}) + "\n\n" yield json_str.encode("utf-8") delete_chat_by_id(chat_id) #AI创作 @app.post("/api/ai_create") def ai_create(chat:Chat): chat_id=f"ai_create_{int(time.time())}" write_chat_context(chat_id,{"req":chat.question,"mode":chat.ai_mode}) return {"chat_id":chat_id} #AI创作的流输出 @app.get("/api/ai_create/{chat_id}") def read_item(chat_id: str): return StreamingResponse(do_ai_create(chat_id), media_type="text/event-stream") #百科的查询 @app.get("/api/baike") async def query_baike( title: str = Query(None, max_length=200), catalog: str = Query(None, max_length=50), pagesize: int = Query(10, ge=0), # 必须大于等于0 ): limit=pagesize*30 if title and catalog: query= (BaiKe .select(BaiKe.id,BaiKe.title,BaiKe.catalog,BaiKe.c_time,BaiKe.m_time, BaiKe.view_count,BaiKe.modifier,BaiKe.creator) .where((BaiKe.title.contains(title)) & (BaiKe.catalog==catalog)) .order_by(BaiKe.m_time.desc()) .limit(limit) ) count = BaiKe.select().where((BaiKe.title.contains(title)) & (BaiKe.catalog==catalog)).count() elif title and not catalog: query= (BaiKe .select(BaiKe.id,BaiKe.title,BaiKe.catalog,BaiKe.c_time,BaiKe.m_time, BaiKe.view_count,BaiKe.modifier,BaiKe.creator) .where(BaiKe.title.contains(title) ) .order_by(BaiKe.m_time.desc()) .limit(limit) ) count = BaiKe.select().where(BaiKe.title.contains(title)).count() elif not title and catalog: query= (BaiKe .select(BaiKe.id,BaiKe.title,BaiKe.catalog,BaiKe.c_time,BaiKe.m_time, BaiKe.view_count,BaiKe.modifier,BaiKe.creator) .where(BaiKe.catalog==catalog) .order_by(BaiKe.m_time.desc()) .limit(limit) ) count = BaiKe.select().where(BaiKe.catalog==catalog).count() else: query= (BaiKe .select(BaiKe.id,BaiKe.title,BaiKe.catalog,BaiKe.c_time,BaiKe.m_time, BaiKe.view_count,BaiKe.modifier,BaiKe.creator) .order_by(BaiKe.m_time.desc()) .limit(limit) ) count = BaiKe.select().count() data=[] for r in query: data.append({"id":r.id,"title":r.title,"catalog":r.catalog,"c_time":r.c_time, "m_time":r.m_time,"view_count":r.view_count, "modifier":r.modifier,"creator":r.creator}) result={"count":count,"data":data} return result #百科的导出 @app.get("/api/baike/exp") async def exp_baike( req:Request, title: str = Query(None, max_length=200), catalog: str = Query(None, max_length=50), ): if title and catalog: query= (BaiKe .select(BaiKe.title,BaiKe.catalog,BaiKe.html) .where((BaiKe.title.contains(title)) & (BaiKe.catalog==catalog)) .order_by(BaiKe.m_time.desc()) ) elif title and not catalog: query= (BaiKe .select(BaiKe.title,BaiKe.catalog,BaiKe.html) .where(BaiKe.title.contains(title) ) .order_by(BaiKe.m_time.desc()) ) elif not title and catalog: query= (BaiKe .select(BaiKe.title,BaiKe.catalog,BaiKe.html) .where(BaiKe.catalog==catalog) .order_by(BaiKe.m_time.desc()) ) else: query= (BaiKe .select(BaiKe.title,BaiKe.catalog,BaiKe.html) .order_by(BaiKe.m_time.desc()) ) #生成文件 now = time.time() with open(f'{gcfg["fs"]["path"]}/pub/baike_{now}.jsonl',"w+",encoding='utf-8') as f: for r in query: f.write(json.dumps({"prompt":r.title,"completion":r.html,"catalog":r.catalog},ensure_ascii=False)+" \n") Operate_Log(req,"导出",f'[百科] 文件:pub/baike_{now}.jsonl') return {"filename":f'pub/baike_{now}.jsonl'} #百科数据的导出 @app.post("/api/baike/exp_select") async def exp_baike_select(selected: List[int],req:Request): query = BaiKe.select(BaiKe.title,BaiKe.catalog,BaiKe.html).where(BaiKe.id.in_(selected)) #生成文件 now = time.time() with open(f'{gcfg["fs"]["path"]}/pub/agents_{now}.jsonl',"w+",encoding='utf-8') as f: for r in query: f.write(json.dumps({"prompt":r.title,"completion":r.html,"catalog":r.catalog},ensure_ascii=False)+" \n") Operate_Log(req,"导出",f'[智能体] 文件:pub/agents_{now}.jsonl') return {"filename":f'pub/agents_{now}.jsonl'} #百科的查看 @app.get("/api/baike/{baike_id}") def get_baike(baike_id: int): baike = BaiKe.get_by_id(baike_id) baike.view_count +=1 baike.save() return {"title":baike.title,"catalog":baike.catalog,"html":baike.html,'c_time':baike.c_time,"m_time":baike.m_time,"view_count":baike.view_count} #百科生成word文档 @app.post("/api/baike_gen_word") def baike_gen_word(baike:New_BaiKe): from f_build import gen_word try: filename = gen_word(baike.title,baike.html) except Exception as e : raise HTTPException(status_code=500, detail=f"运行出错: {e}") return {"filename":filename} #百科生成pdf文档 @app.post("/api/baike_gen_pdf") def baike_gen_pdf(baike:New_BaiKe): from f_build import gen_pdf try: filename = gen_pdf(baike.title,baike.html) except Exception as e : msg = get_error_info() raise HTTPException(status_code=500, detail=f"运行出错: {msg}") return {"filename":filename} #百科生成ppt文档 @app.post("/api/baike_gen_ppt") def baike_gen_pdf(baike:New_BaiKe): from f_build import gen_ppt,gen_ppt_by_ai try: if baike.style: filename = gen_ppt_by_ai(baike.title,baike.html,baike.style) else: filename = gen_ppt(baike.title,baike.html) except Exception as e : msg = get_error_info() raise HTTPException(status_code=500, detail=f"运行出错:{e} \n 错误位置如下:\n {msg}") return {"filename":filename} def get_error_info(): import traceback import inspect import sys tb = traceback.extract_tb(sys.exc_info()[2]) msg="" for tb0 in tb: filename, line_number, func, text = tb0 # 最后一个帧是异常发生处 msg += f"错误在文件: {filename}, 行号: {line_number}, 函数: {func}, 代码: {text} \n" return msg #百科生成海报 @app.post("/api/baike_gen_poster") def baike_gen_poster(baike:New_BaiKe): from f_build import gen_poster,gen_poster_style try: if baike.style: filename = gen_poster_style(baike.title,baike.html,baike.style) else: filename = gen_poster(baike.title,baike.html) except Exception as e : msg = get_error_info() msg += f"错误原因:{e}" raise HTTPException(status_code=500, detail=f"运行出错: {msg}") #return FileResponse(path=file_path, media_type='application/pdf', filename=filename) return {"filename":filename} #所有的总数 @app.get("/api/count") def count_all(): from peewee import fn c1 = BaiKe.select().count() watch_sum = BaiKe.select(fn.SUM(BaiKe.view_count)).scalar() c2 = BaiKe_Catalog.select().count() c3 = Doc.select().count() s1 = Doc.select(fn.SUM(Doc.f_size)).scalar() c4 = QA.select().count() try: with open("conf/source.conf") as conf: source = json5.load(conf) except: source={"测试":{"name":"测试","type":"FTP","address":"192.168.0.1:21","user":"admin","password":"1","path":"home","scan":1,"count":1024}} c5 = len(source) today= datetime.date.today() qa_cnt = QA.select().where( fn.DATE(QA.c_time)== today ).count() doc_cnt = Doc.select().where( fn.DATE(Doc.f_up_at)== today ).count() baike_cnt = BaiKe.select().where( fn.DATE(BaiKe.m_time)== today ).count() return {"baike_count":c1,"watch_sum":watch_sum,"catalog_count":c2,"doc_count":c3,"QA_count":c4,"base_count":c5,"data_size":s1,"qa_cnt":qa_cnt,"doc_cnt":doc_cnt,"baike_cnt":baike_cnt} #百科的删除 @app.delete("/api/baike/{baike_ids}") def del_baike(baike_ids: str,req:Request): for baike_id in baike_ids.split(","): delete_fs_index_by_base_path("知识百科",baike_id) BaiKe.delete().where(BaiKe.id==baike_id).execute() Operate_Log(req,"删除",f'[百科] id:{baike_ids}') return {"msg":"OK"} #百科分类的查询 @app.get("/api/catalog") async def query_catalog( demo: str = Query(None, max_length=200), ): if demo: query= (BaiKe_Catalog .select(BaiKe_Catalog.id,BaiKe_Catalog.catalog,BaiKe_Catalog.demo,BaiKe_Catalog.c_time,BaiKe_Catalog.doc_count) .where( BaiKe_Catalog.demo.contains(demo) | BaiKe_Catalog.catalog.contains(demo)) .order_by(BaiKe_Catalog.c_time.asc()) ) count = BaiKe_Catalog.select().where( BaiKe_Catalog.demo.contains(demo) ).count() else: query= (BaiKe_Catalog .select(BaiKe_Catalog.id,BaiKe_Catalog.catalog,BaiKe_Catalog.demo,BaiKe_Catalog.c_time,BaiKe_Catalog.doc_count) .order_by(BaiKe_Catalog.c_time.asc()) ) count = BaiKe_Catalog.select().count() data=[] for r in query: data.append({"id":r.id,"catalog":r.catalog,"demo":r.demo,"c_time":r.c_time,"doc_count":r.doc_count}) return {"count":count,"data":data} #百科分类的保存 @app.post("/api/catalog") async def new_catalog(req:Request): json_data = await req.json() count = BaiKe_Catalog.select().where( BaiKe_Catalog.catalog==json_data["catalog"] ).count() if count ==0: #没有才新建 c=BaiKe_Catalog.create(catalog=json_data["catalog"],demo=json_data["demo"]) return {'code':200} #百科分类的字典形式 @app.get("/api/catalogs") def list_catalogs(): query= (BaiKe_Catalog .select(BaiKe_Catalog.catalog) .order_by(BaiKe_Catalog.c_time.asc()) ) data=[] for r in query: data.append(r.catalog) return {"data":data} #百科分类的删除 @app.get("/api/catalog_del/{catalog_id}") def del_catalog(catalog_id: int): BaiKe_Catalog.delete().where(BaiKe_Catalog.id==catalog_id).execute() return {"msg":"OK"} #所有KAgent的搜索和查询, 分组展示 @app.get("/api/kagent") async def query_kagent( req:Request, q: str = Query(None, max_length=200), ): if q: query= (KAgent .select(KAgent.id,KAgent.title,KAgent.demo,KAgent.kasn,KAgent.m_time, KAgent.atype,KAgent.catalog,KAgent.oth, KAgent.guide,KAgent.modifier,KAgent.creator) .where( KAgent.title.contains(q) | KAgent.demo.contains(q) | KAgent.guide.contains(q) ) #.order_by(KAgent.m_time.desc() ) count = KAgent.select().where( KAgent.title.contains(q) | KAgent.demo.contains(q) | KAgent.guide.contains(q) ).count() else: #.order_by(KAgent.m_time.desc() query= (KAgent .select(KAgent.id,KAgent.title,KAgent.demo,KAgent.kasn,KAgent.m_time, KAgent.atype,KAgent.catalog,KAgent.oth, KAgent.guide,KAgent.modifier,KAgent.creator) ) count = KAgent.select().count() #atypes={0:"静态知识库",1:"动态知识库",2:"AI生成",3:"融合智能体",100:"未来"} agent_catalogs = get_gcfg_mem()["agent_catalogs"] groups={} for r in query: if r.catalog==None: r.catalog="知识库" if r.oth==None: r.oth="orange" if r.catalog not in groups: groups[r.catalog]={"title": r.catalog,"data":[],"count":0,"color": agent_catalogs[r.catalog] if r.catalog in agent_catalogs else r.oth } groups[r.catalog]["data"].append({"id":r.id,"title":r.title,"demo":r.demo,"guide":r.guide, "m_time":r.m_time,"atype":r.atype,"catalog":r.catalog,"oth":r.oth, "kasn":r.kasn,"modifier":r.modifier,"creator":r.creator}) groups[r.catalog]["count"] +=1 data = list(groups.values()) return {"count":count,"data":data} #KAgent的搜索和查询+用户身份自由的权限, 适用于管理 @app.get("/api/kagent/mgr") async def query_kagent( req:Request, q: str = Query(None, max_length=200), ): username,client = get_user_info(req) if username=="管理员": if q: query= (KAgent .select(KAgent.id,KAgent.title,KAgent.demo,KAgent.kasn,KAgent.m_time, KAgent.atype,KAgent.catalog,KAgent.oth, KAgent.guide,KAgent.modifier,KAgent.creator) .where( KAgent.title.contains(q) | KAgent.demo.contains(q) | KAgent.guide.contains(q) ) .order_by(KAgent.m_time.desc()) ).dicts() #count = KAgent.select().where( KAgent.title.contains(q) | KAgent.demo.contains(q) | KAgent.guide.contains(q) ).count() else: query= (KAgent .select(KAgent.id,KAgent.title,KAgent.demo,KAgent.kasn,KAgent.m_time, KAgent.atype,KAgent.catalog,KAgent.oth, KAgent.guide,KAgent.modifier,KAgent.creator) .order_by(KAgent.m_time.desc()) ).dicts() #count = KAgent.select().count() else: if q: query= (KAgent .select(KAgent.id,KAgent.title,KAgent.demo,KAgent.kasn,KAgent.m_time, KAgent.atype,KAgent.catalog,KAgent.oth, KAgent.guide,KAgent.modifier,KAgent.creator) .where( (KAgent.title.contains(q) | KAgent.demo.contains(q) | KAgent.guide.contains(q)) & (KAgent.creator==username | KAgent.modifier==username)) .order_by(KAgent.m_time.desc()) ).dicts() #count = KAgent.select().where( KAgent.title.contains(q) | KAgent.demo.contains(q) | KAgent.guide.contains(q) ).count() else: query= (KAgent .select(KAgent.id,KAgent.title,KAgent.demo,KAgent.kasn,KAgent.m_time, KAgent.atype,KAgent.catalog,KAgent.oth, KAgent.guide,KAgent.modifier,KAgent.creator) .where( (KAgent.creator==username) | (KAgent.modifier==username)) .order_by(KAgent.m_time.desc()) ).dicts() #count = KAgent.select().where( KAgent.creator==username | KAgent.modifier==username).count() data = list(query) count = len(data) return {"count":count,"data":data} #KAgent的搜索和查询+只看自己创建的或修改过的, 还有自己订阅的 @app.get("/api/kagent/my") async def query_kagent( req:Request, q: str = Query(None, max_length=200), ): username,client = get_user_info(req) if q: query= (KAgent .select(KAgent.id,KAgent.title,KAgent.demo,KAgent.kasn,KAgent.m_time, KAgent.atype,KAgent.catalog,KAgent.oth, KAgent.guide,KAgent.modifier,KAgent.creator) .where( (KAgent.title.contains(q) | KAgent.demo.contains(q) | KAgent.guide.contains(q)) & ( (KAgent.creator==username) | (KAgent.modifier==username))) .order_by(KAgent.m_time.desc()) ).dicts() #count = KAgent.select().where( (KAgent.title.contains(q) | KAgent.demo.contains(q) | KAgent.guide.contains(q)) & ( (KAgent.creator==username) | (KAgent.modifier==username)) ).count() else: query= (KAgent .select(KAgent.id,KAgent.title,KAgent.demo,KAgent.kasn,KAgent.m_time, KAgent.atype,KAgent.catalog,KAgent.oth, KAgent.guide,KAgent.modifier,KAgent.creator) .where( (KAgent.creator==username) | (KAgent.modifier==username)) .order_by(KAgent.m_time.desc()) ).dicts() #count = KAgent.select().where( (KAgent.creator==username) | (KAgent.modifier==username)).count() data = list(query) count = len(data) #没有数据的 if count==0: data.append({"id":-1,"title":"无发布","demo":"暂时没有发布的智能体","guide":"无发布", "m_time":datetime.datetime.now(),"atype":0,"kasn":"", "modifier":username,"creator":username }) #自己订阅的 sub_data=[] sub_data.append({"id":-1,"title":"无订阅","demo":"暂时没有订阅的智能体","guide":"无订阅", "m_time":datetime.datetime.now(),"atype":0,"kasn":"", "modifier":username,"creator":username }) sub_count=0 user_id = req.cookies.get("userid", "id-00001") query = User.select(User.id,User.city).where(User.userid == user_id) user = query.first() if user and user.city: sns = user.city.split(",") query= (KAgent .select(KAgent.id,KAgent.title,KAgent.demo,KAgent.kasn,KAgent.m_time, KAgent.atype,KAgent.catalog,KAgent.oth, KAgent.guide,KAgent.modifier,KAgent.creator) .where( (KAgent.kasn.in_(sns)) ).dicts() ) sub_data = list(query) sub_count = len(sub_data) return {"count":count,"data":data,"sub_count":sub_count,"sub_data":sub_data} #KAgent的新建和保存 @app.post("/api/kagent") async def new_kagent(req:Request): ka = await req.json() username,client = get_user_info(req) #处理分类 catalog,color = ka["catalog"].split(" ") update_gcfg_mem("agent_catalogs",catalog,color) #保存 if ka["id"]==0: import secrets kasn= secrets.token_urlsafe(8) if "files" in ka: if isinstance(ka["files"],list): files = ",".join(ka["files"]) #改成字符串存储 else: files = ka["files"] else: files = "" ka_ready = KAgent.create(title=ka["title"],demo=ka["demo"],guide=ka["guide"], atype=ka["atype"],catalog=catalog,oth=color, kasn=kasn,files=files,prompt=ka["prompt"], creator=username,modifier=username) ka_id = ka_ready.id Operate_Log(req,"新建",f'[智能体] 标题:{ka["title"]} sn:{kasn}') else: ka_u = KAgent.get_or_none(KAgent.id==ka["id"]) ka_u.title = ka["title"] ka_u.demo = ka["demo"] ka_u.guide = ka["guide"] ka_u.atype = ka["atype"] ka_u.catalog = catalog ka_u.oth = color ka_u.m_time = datetime.datetime.now() ka_u.prompt = ka["prompt"] ka_u.modifier = username kasn = ka_u.kasn if "files" in ka: if isinstance(ka["files"],list): ka_u.files = ",".join(ka["files"]) #改成字符串存储 else: ka_u.files = ka["files"] else: ka_u.files = "" ka_u.save(only=[KAgent.demo,KAgent.title,KAgent.guide,KAgent.m_time,KAgent.prompt, KAgent.atype,KAgent.catalog,KAgent.oth, KAgent.files,KAgent.modifier]) ka_id = ka["id"] return {'code':200,'id':ka_id,'kasn':kasn} #KAgent的删除 @app.delete("/api/kagent/{id}") def del_kagent(id: int,req:Request): KAgent.delete().where(KAgent.id==id).execute() Operate_Log(req,"删除",f'[智能体] id:{id}') return {"msg":"OK"} #KAgent的查询 @app.get("/api/kagent/{id}") def get_kagent(id: int): kagent = KAgent.get_by_id(id) return {"id":id,"title":kagent.title,"catalog":kagent.catalog,"oth":kagent.oth,"demo":kagent.demo,"guide":kagent.guide,'atype':kagent.atype,"files":kagent.files,"prompt":kagent.prompt,"kasn":kagent.kasn} from typing import List #KAgent的导出 @app.post("/api/kagent/export") async def export_kagent(selected: List[int],req:Request): print(selected) if len(selected)>0: query = KAgent.select().where(KAgent.id.in_(selected)) else: query = KAgent.select() data=[] for r in query: data.append({"title":r.title,"demo":r.demo,"guide":r.guide,"atype":r.atype,"kasn":r.kasn,"prompt":r.prompt, "icon":r.icon,"files":r.files,"catalog":r.catalog,"baike":r.baike,"oth":r.oth, "creator":r.creator if r.creator else r.modifier}) #print(json.dumps(data)) now = int(time.time()) with open(f'{gcfg["fs"]["path"]}/pub/kagent_{now}.json',"w") as f: json.dump(data,f,ensure_ascii=False) Operate_Log(req,"导出",f'[智能体]') return {"filename":f'pub/kagent_{now}.json'} #Kagent的导入 @app.post("/api/kagent/import") async def import_kagent(req:Request,file: UploadFile = File(...)): try: username,client = get_user_info(req) data = json.loads(file.file.read()) for ka in data: KAgent.delete().where(KAgent.kasn==ka["kasn"]).execute() KAgent.create(title=ka["title"],demo=ka["demo"],guide=ka["guide"],atype=ka["atype"],kasn=ka["kasn"], files=ka["files"],prompt=ka["prompt"],icon=ka["icon"],catalog=ka["catalog"],baike=ka["baike"], oth=ka["oth"], creator= ka["creator"] if "creator" in ka and ka["creator"] else username, modifier=username) except Exception as e: return {"errno":1,"message":f"导入出错: {e}"} Operate_Log(req,"导入",f'[智能体]') return {"errno":0} #文件中心管理支持文件夹的上传,会自动创建文件夹 @app.post("/api/fs/upload") async def fs_upload(req:Request,curpath:str=Form(''), file: UploadFile = File(...)): filename = regular_filename(file.filename) abs_path = f"{gcfg['fs']['path']}/{curpath}/{filename}" os.makedirs(os.path.dirname(abs_path), exist_ok=True) with open(abs_path, "wb") as buf: shutil.copyfileobj(file.file, buf) fsize = buf.tell() try: username = req.cookies.get("username", "匿名用户") username = decode_username(username) ret = c_or_u_fs_index(filename,abs_path,fsize,username) print("上传文件",ret) except Exception as e: return {"errno":1,"message":f"上传成功: {e}"} return {"errno":0,"file":{"name":filename,"path":abs_path,"base":"文件中心"}} #KAgent的新建和保存 @app.post("/api/fs/move_dir_or_file") async def fs_move(req:Request): move_dir = await req.json() username,client = get_user_info(req) import os,shutil src = gcfg['fs']['path']+move_dir["src"] dst = gcfg['fs']['path']+move_dir["dst"] if dst.endswith("/"): os.makedirs(os.path.dirname(dst), exist_ok=True) shutil.move(src, dst) return {"errno":0,"message":"移动成功"} ################################################################################################## import hashlib import os #上传百科图片 @app.post("/api/uploadimg") async def upload_img(req:Request,file: UploadFile = File(...)): # 创建一个 SHA-256 哈希对象 sha256_hash = hashlib.sha256() # 分块读取文件以节省内存 while chunk := await file.read(8192): sha256_hash.update(chunk) # 计算最终的哈希值并转换为十六进制字符串 file_hash = sha256_hash.hexdigest() try: #上面读过了,要重新回头,下面才有内容 file.file.seek(0) with open(f"{gcfg['fs']['path']}/img/{file_hash}", "wb") as img: shutil.copyfileobj(file.file, img) except Exception as e: return {"errno":1,"message":f"上传出错: {e}"} Operate_Log(req,"上传",f'[百科图片] 文件:{file.filename}') return {"errno":0,"data":{"url":f"/api/img/{file_hash}","alt": file.filename}} #上传图片,base64格式的 @app.post("/api/img_base64") async def upload_img_base64(img_base64_data:str=Form(''),alias:str=Form('')): # 创建一个 SHA-256 哈希对象 sha256_hash = hashlib.sha256() #print(img_base64_data) import base64 img_data = base64.b64decode(img_base64_data.split(",")[1]) sha256_hash.update(img_data) # 计算最终的哈希值并转换为十六进制字符串 file_hash = sha256_hash.hexdigest() try: with open(f"{gcfg['fs']['path']}/img/{file_hash}", "wb") as img: img.write(img_data) except Exception as e: return {"errno":1,"message":f"上传出错: {e}"} return {"errno":0,"data":{"url":f"/api/img/{file_hash}","alt": alias}} #下载图片 @app.get("/api/img/{filename}") async def download_img(filename: str): file_path = os.path.join(f"{gcfg['fs']['path']}/img/", filename) if not os.path.exists(file_path): raise HTTPException(status_code=404, detail="File not found") return FileResponse(path=file_path, media_type='application/octet-stream', filename=filename) #上传视频 @app.post("/api/uploadvideo") async def upload_video(req:Request,file: UploadFile = File(...)): # 创建一个 SHA-256 哈希对象 sha256_hash = hashlib.sha256() # 分块读取文件以节省内存 while chunk := await file.read(8192): sha256_hash.update(chunk) # 计算最终的哈希值并转换为十六进制字符串 file_hash = sha256_hash.hexdigest() try: #上面读过了,要重新回头,下面才有内容 file.file.seek(0) with open(f"{gcfg['fs']['path']}/video/{file_hash}", "wb") as img: shutil.copyfileobj(file.file, img) except Exception as e: return {"errno":1,"message":f"上传出错: {e}"} Operate_Log(req,"上传",f'[百科视频] 文件:{file.filename}') return {"errno":0,"data":{"url":f"/api/video/{file_hash}","alt": file.filename}} #下载视频 @app.get("/api/video/{filename}") async def download_video(filename: str): file_path = os.path.join(f"{gcfg['fs']['path']}/video/", filename) if not os.path.exists(file_path): raise HTTPException(status_code=404, detail="File not found") return FileResponse(path=file_path, media_type='application/octet-stream', filename=filename) #大模型 @app.get("/api/llm") async def get_llm(): with open("conf/llm.default") as conf: llm_cfg = json5.load(conf) llm_cfg_dict = {d['id']: d for d in llm_cfg} selected = "local" try: with open(f"{gcfg['fs']['path']}/conf/llm.conf") as conf: llm_conf = json5.load(conf) selected=llm_conf["selected"] llm_cfg_dict_0 = {d['id']: d for d in llm_conf["llm_cfg"]} llm_cfg_dict.update(llm_cfg_dict_0) except Exception as e: print(e) selected = "local" llm_cfg_list = list(llm_cfg_dict.values()) return {"selected":selected,"llm_cfg":llm_cfg_list} #大模型的保存 @app.post("/api/llm") async def post_llm(request:Request): # 从请求体中读取JSON数据 json_data = await request.json() with open(f"{gcfg['fs']['path']}/conf/llm.conf","w+") as conf: json5.dump(json_data,conf,ensure_ascii=False) return {"code":200} #配置参数 @app.get("/api/gcfg") async def get_gcfg(): return {"data":gcfg} #配置参数,内存参数的实时更新 @app.get("/api/gcfg_mem") async def get_gcfg(): gcfg_mem0 = get_gcfg_mem() return {"data":gcfg_mem0} #配置参数保存 @app.post("/api/gcfg") async def post_gcfg(request:Request): # 从请求体中读取JSON数据 json_data = await request.json() if json_data["fs"]["pswd"] != gcfg["fs"]["pswd"]: json_data["fs"]["pswd"] = hashed_password(json_data["fs"]["pswd"]) with open("conf/gcfg.conf","w+") as conf: json5.dump(json_data,conf,ensure_ascii=False) #外部保存一份,作为备份 with open(f'{json_data["fs"]["path"]}/conf/gcfg.conf',"w+") as conf: json5.dump(json_data,conf,ensure_ascii=False) return {"code":200} #文件仓库源 @app.get("/api/source") async def get_source(demo: str = Query(None, max_length=200)): try: with open(f"{gcfg['fs']['path']}/conf/source.conf") as conf: source = json5.load(conf) except: source={"测试":{"name":"测试","type":"FTP","address":"192.168.0.1:21","user":"admin","password":"1","path":"home","scan":1,"count":1024}} data=[] for k,v in source.items(): v["password"]="" if "scan_date" in v: v["scan_date"] = datetime.datetime.fromtimestamp(v["scan_date"]).strftime('%Y-%m-%d %H:%M:%S') if demo: for k1,v1, in v.items(): if str(v1).find(demo)>=0: data.append(v) break else: data.append(v) return {"count":len(data),"data":data} #文件仓库保存 @app.post("/api/source") async def post_source(request:Request): # 从请求体中读取JSON数据 json_data = await request.json() try: with open(f"{gcfg['fs']['path']}/conf/source.conf") as conf: source = json5.load(conf) except: source={"测试":{"name":"测试","type":"FTP","address":"192.168.0.1:21","user":"admin","password":"1","path":"home","scan":1,"count":1024}} if "scan_date" in json_data: del json_data["scan_date"] source[json_data["name"]] = json_data with open(f"{gcfg['fs']['path']}/conf/source.conf","w+") as conf: json5.dump(source,conf,ensure_ascii=False) return {"code":200} #文件仓库删除 @app.delete("/api/source/{names}") async def delete_source(names:str): try: with open(f"{gcfg['fs']['path']}/conf/source.conf") as conf: source = json5.load(conf) except: source={"测试":{"name":"测试","type":"FTP","address":"192.168.0.1:21","user":"admin","password":"1","path":"home","scan":1,"count":1024}} for name in names.split(","): del source[name] with open(f"{gcfg['fs']['path']}/conf/source.conf","w+") as conf: json5.dump(source,conf,ensure_ascii=False) return {"code":200} #文件仓库的测试 @app.get("/api/source/test/{name}") async def test_source(name:str): try: with open(f"{gcfg['fs']['path']}/conf/source.conf") as conf: source = json5.load(conf) except: source={"测试":{"name":"测试","type":"FTP","address":"192.168.0.1:21","user":"admin","password":"1","path":"home","scan":1,"count":1024}} data = [] try: if source[name]['type']=='SFTP': data = sftp_list_directory(source[name]["address"],22,source[name]["user"],source[name]["password"],source[name]["path"]) elif source[name]['type']=='Windows共享': data = smb_list_directory(source[name]["address"],source[name]["user"],source[name]["password"],source[name]["path"]) elif source[name]['type']=='FTP': data = ftp_list_directory(source[name]["address"],source[name]["user"],source[name]["password"],source[name]["path"]) elif source[name]['type']=='本地': data = linux_list_directory(source[name]["path"]) elif source[name]['type']=='钉盘': data = ding_list_directory(source[name]["address"],source[name]["password"],source[name]["path"]) else: return {"code":201,"message":f"不支持的类型{source[name]['type']}"} except Exception as e: return {"code":201,"message":f"{e}"} return {"code":200,"data":data} from pathlib import Path as PPath #文件中心 @app.get("/api/fs/{full_path:path}") async def get_fs(full_path: Annotated[str, Path(description="")],demo: str = Query(None, max_length=200)): data=[] data0=[] #目录 data1=[] #文件 if full_path.find("./")>=0: return {"curpath":full_path,"count":len(data),"data":data} # 定义目录路径 directory = PPath(gcfg["fs"]["path"]+"/"+full_path) # 遍历目录中的所有条目并区分文件和目录 for entry in directory.iterdir(): if demo is None or entry.name.find(demo)==-1: continue stat = entry.stat() if entry.is_dir(): data0.append({"name":entry.name,"type":"目录","size":-1, "c_time":datetime.datetime.fromtimestamp(stat.st_ctime), "m_time":datetime.datetime.fromtimestamp(stat.st_mtime)}) elif entry.is_file(): data1.append({"name":entry.name,"type":"文件","size":stat.st_size, "c_time":datetime.datetime.fromtimestamp(stat.st_ctime), "m_time":datetime.datetime.fromtimestamp(stat.st_mtime)}) data0 = sorted(data0, key=lambda x: x['m_time'], reverse=True) data1 = sorted(data1, key=lambda x: x['m_time'], reverse=True) data = data0 + data1 return {"curpath":full_path,"count":len(data),"data":data} #文件中心下载文件 @app.get("/api/fsd/{full_path:path}") async def download_fs_file(req:Request,full_path: Annotated[str, Path(description="")]): #不安全的文件不能下载 if str(full_path).find("../") >=0: raise HTTPException(status_code=404, detail="Not a valid Path") if not full_path.startswith(gcfg['fs']['path']): file_path = f"{gcfg['fs']['path']}/{full_path}" else: file_path = full_path #文件名 filename = os.path.basename(file_path) if not os.path.exists(file_path): raise HTTPException(status_code=404, detail="File not found") if filename.endswith(".pdf"): mime_type="application/pdf" else: mime_type="application/octet-stream" Operate_Log(req,"下载",f'[文件中心] 文件:{file_path}') return FileResponse(path=file_path, media_type=mime_type, filename=filename) #删除文件或目录 @app.delete("/api/fs/{full_path:path}") async def remove_file(req:Request,full_path: Annotated[str, Path(description="")]): if not full_path.startswith(gcfg['fs']['path']): file_path = f"{gcfg['fs']['path']}/{full_path}" else: file_path = full_path file_name = os.path.basename(file_path) if not os.path.exists(file_path): raise HTTPException(status_code=404, detail="Dir or File not found") try: os.remove(file_path) except: os.rmdir(file_path) #从全文索引中删除 delete_fs_index_by_base_path("文件中心",file_path) #从知识库文档中删除 Doc.delete().where((Doc.abs_path==file_path) & (Doc.f_name==file_name) & (Doc.base=="文件中心")).execute() Operate_Log(req,"删除",f'[文件中心] 文件:{file_path}') return {"errno":0} #创建目录 @app.post("/api/fs/{full_path:path}") async def mk_dir(full_path: Annotated[str, Path(description="")]): dir_path = f"{gcfg['fs']['path']}/{full_path}" if os.path.exists(dir_path): raise HTTPException(status_code=404, detail="Dir is exists") os.mkdir(dir_path) return {"errno":0} #知识库文档的查询 @app.get("/api/kg_doc") async def query_kg_doc( demo: str = Query(None, max_length=200), fpath: str = Query(None, max_length=200) ): limit=1000 if demo and fpath: query= (Doc .select(Doc.id,Doc.base,Doc.f_name,Doc.f_up_at,Doc.f_size,Doc.f_sec,Doc.abs_path,Doc.author) .where( (Doc.abs_path.contains(fpath)) & (Doc.f_name.contains(demo)) ) .order_by(Doc.f_up_at.desc()) .limit(limit) ) count = Doc.select().where( (Doc.abs_path.contains(fpath)) & (Doc.f_name.contains(demo)) ).count() elif fpath: query= (Doc .select(Doc.id,Doc.base,Doc.f_name,Doc.f_up_at,Doc.f_size,Doc.f_sec,Doc.abs_path,Doc.author) .where( Doc.abs_path.contains(fpath) ) .order_by(Doc.f_up_at.desc()) .limit(limit) ) count = Doc.select().where( Doc.abs_path.contains(fpath) ).count() elif demo: query= (Doc .select(Doc.id,Doc.base,Doc.f_name,Doc.f_up_at,Doc.f_size,Doc.f_sec,Doc.abs_path,Doc.author) .where( Doc.abs_path.contains(demo) ) .order_by(Doc.f_up_at.desc()) .limit(limit) ) count = Doc.select().where( Doc.abs_path.contains(demo) ).count() else: query= (Doc .select(Doc.id,Doc.base,Doc.f_name,Doc.f_up_at,Doc.f_size,Doc.f_sec,Doc.abs_path,Doc.author) .order_by(Doc.f_up_at.desc()) .limit(limit) ) count = Doc.select().count() data=[] for r in query: data.append({"id":r.id,"base":r.base,"f_name":r.f_name,"ct_time":r.f_up_at, "size":r.f_size,"like":r.f_sec,"path":r.abs_path,"author":r.author}) return {"count":count,"data":data} #历史对话记录的查询 @app.get("/api/chat_history") async def chat_history(req:Request, demo: str = Query(None, max_length=200), ): limit=10*10 username,client = get_user_info(req) if username=="管理员": if demo: query= (QA .select() .where( (QA.question.contains(demo)) | (QA.answer.contains(demo)) ) .order_by(QA.c_time.desc()) .limit(limit) ) count = QA.select().where( (QA.question.contains(demo)) | (QA.answer.contains(demo)) ).count() else: query= (QA .select() .order_by(QA.c_time.desc()) .limit(limit) ) count = QA.select().count() else: #只看自己的 if demo: query= (QA .select() .where( ((QA.question.contains(demo)) | (QA.answer.contains(demo))) & (QA.username==username) ) .order_by(QA.c_time.desc()) .limit(limit) ) count = QA.select().where( ((QA.question.contains(demo)) | (QA.answer.contains(demo))) & (QA.username==username) ).count() else: query= (QA .select() .where(QA.username==username) .order_by(QA.c_time.desc()) .limit(limit) ) count = QA.select().where(QA.username==username).count() data=[] for r in query: data.append({"id":r.id,"userid":r.userid,"username":r.username,"query":r.question,"answer":r.answer,"c_time":r.c_time,"ref":r.ref,"client_ip":r.client_ip}) return {"count":count,"data":data} #历史对话记录的查询 @app.get("/api/chat_history/my") async def chat_history(req:Request, title: str = Query(None, max_length=200), ): limit=5 username,client = get_user_info(req) query= (QA .select(QA.question) .where((QA.username==username) & (QA.ref.contains(title))) .order_by(QA.c_time.desc()) .limit(limit) ) data=[] for r in query: data.append(r.question) return {"count":limit,"data":data} #对话记录的删除 @app.delete("/api/chat_history/{chat_ids}") def del_chat_his(chat_ids: str): for chat_id in chat_ids.split(","): QA.delete().where(QA.id==chat_id).execute() return {"msg":"OK"} #历史对话记录的查询 @app.get("/api/today") async def count_today(): today= datetime.date.today() qa_cnt = QA.select().where( fn.DATE(QA.c_time)== today ).count() doc_cnt = Doc.select().where( fn.DATE(Doc.f_up_at)== today ).count() baike_cnt = BaiKe.select().where( fn.DATE(BaiKe.m_time)== today ).count() return {"qa_cnt":qa_cnt,"doc_cnt":doc_cnt,"baike_cnt":baike_cnt} #用户信息的查询 @app.get("/api/user") async def query_user( demo: str = Query(None, max_length=200), ): limit=500 if demo: query= (User .select() .where( User.username.contains(demo) ) .order_by(User.c_time.desc()) .limit(limit) ) count = User.select().where( User.username.contains(demo) ).count() else: query= (User .select() .order_by(User.c_time.desc()) .limit(limit) ) count = User.select().count() data=[] for r in query: data.append({"userid":r.userid,"username":r.username,"telphone":decrypt_tel(r.telphone),"c_time":r.c_time,"city":r.city,"org":r.org,"ref":r.ref}) return {"count":count,"data":data} #用户的删除 @app.delete("/api/user/{user_ids}") def del_user(user_ids: str): for user_id in user_ids.split(","): User.delete().where(User.userid==user_id).execute() return {"msg":"OK"} #用户的city,订阅的智能体 @app.put("/api/user_sn/{kasn}") def put_user_sn(req:Request,kasn:str): user_id = req.cookies.get("userid", "id-00001") query = User.select(User.id,User.city).where(User.userid == user_id) user = query.first() if user: if user.city: user.city += ","+kasn else: user.city = kasn user.save(only=[User.city]) return {"msg":"OK"} #用户的city,取消订阅 @app.put("/api/cancel_sn/{kasn}") def put_user_sn(req:Request,kasn:str): user_id = req.cookies.get("userid", "id-00001") query = User.select(User.id,User.city).where(User.userid == user_id) user = query.first() if user and user.city: sns = user.city.split(",") sns = [x for x in sns if x != kasn] user.city = ",".join(sns) user.save(only=[User.city]) return {"msg":"OK"} #日志查询 @app.get("/api/logs") async def query_logs( demo: str = Query(None, max_length=200), ): limit=500 if demo: query= (Login .select() .where( Login.username.contains(demo) ) .order_by(Login.c_time.desc()) .limit(limit) ) count = Login.select().where( Login.username.contains(demo) ).count() else: query= (Login .select() .order_by(Login.c_time.desc()) .limit(limit) ) count = Login.select().count() data=[] for r in query: data.append({"id":r.id,"userid":r.userid,"username":r.username,"action":r.action,"c_time":r.c_time,"pt":r.pt,"ref":r.ref,"client_ip":r.client_ip}) return {"count":count,"data":data} #日志的删除 @app.delete("/api/logs/{log_ids}") def del_user(req:Request,log_ids: str): for log_id in log_ids.split(","): Login.delete().where(Login.id==log_id).execute() Operate_Log(req,"删除",f'[日志] id:{log_ids}') return {"msg":"OK"}