From 6acb68459373f37897f90753f3e701113b8a4147 Mon Sep 17 00:00:00 2001 From: 13315423919 <13315423919@qq.com> Date: Wed, 19 Nov 2025 19:43:17 +0800 Subject: [PATCH] Add File --- main/webx.py | 250 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 250 insertions(+) create mode 100644 main/webx.py diff --git a/main/webx.py b/main/webx.py new file mode 100644 index 0000000..9b0bf03 --- /dev/null +++ b/main/webx.py @@ -0,0 +1,250 @@ +from fastapi import FastAPI,Query,UploadFile,File,HTTPException,Request,Path,Form +from fastapi.responses import StreamingResponse,FileResponse,RedirectResponse +from fastapi.staticfiles import StaticFiles +from fastapi.responses import JSONResponse + +from api import * +from dingtalk import * +from wechat import * +from init import gcfg +from utils import * + +from k_database import User,Login + + +#应用 +app = FastAPI() + + +# 将'static'目录挂载到'/static'路径 +app.mount("/x", StaticFiles(directory="x"), name="static") + + +@app.get("/") +def read_root(req:Request,sn: str = Query(None, max_length=200)): + userid = req.cookies.get("userid", "id-00001") + if userid=="id-00001": + return RedirectResponse(url="/x/login.html?sn="+sn) + return RedirectResponse(url="/x/kagent.html?sn="+sn) + + +#login注册 +@app.post("/api/login") +async def login_kagent(req:Request): + login = await req.json() + + if login["telphone"]=="" or login["username"]=="": + response = JSONResponse(content={"code":401,"msg":"注册数据不能为空"}) + return response + + query = User.select().where(User.title == hashed_tel(login["telphone"])) + user = query.first() + if user: + userid = user.userid + + if (user.username != login["username"] or + (login["org"]!="" and login["org"]!=user.org)): + #信息变动,更新注册信息 + user.username = login["username"] + user.org = login["org"] + user.save(only=[User.username,User.org]) + #百科个人空间 + BaiKe_Catalog.create(catalog=f'个人空间/{login["username"]}',demo="存放个人数据") + else: + import secrets + userid= secrets.token_urlsafe(8) + user=User.create(userid=userid,username=login["username"],telphone=encrypt_tel(login["telphone"]), + org=login["org"],title=hashed_tel(login["telphone"]),ref="注册") + + #百科个人空间 + BaiKe_Catalog.create(catalog=f'个人空间/{login["username"]}',demo="存放个人数据") + #上传文件空间 + if not os.path.exists(f'{gcfg["fs"]["path"]}/个人文件夹/{login["username"]}'): + os.makedirs(f'{gcfg["fs"]["path"]}/个人文件夹/{login["username"]}') + + Login.create(userid=userid,username=login["username"],action="登录", pt=f"智能体:{login['kagent_sn']}",ref="注册",client_ip=get_client_ip(req)) + response = JSONResponse(content={"code":200}) + response.set_cookie(key="userid", value=userid, httponly=True) + response.set_cookie(key="username", value=login["username"].encode("utf8"), httponly=True) + response.set_cookie(key="ref", value="注册".encode("utf8"), httponly=True) + return response + + + +@app.get("/api/dingtalk_auth") +def dingtalk_kagent(authCode:str,state:str,req:Request): + + # 定义钉钉应用的 AppKey 和 AppSecret + app_key = gcfg["dingtalk"]["app_key"] + app_secret = gcfg["dingtalk"]["app_secret"] + + # 第一步:获取 Access Token + access_token = get_user_access_token(app_key, app_secret, authCode) + #print(access_token) + + # 第二步:使用 AuthCode 获取用户信息 + #验证码 + user_info = get_user_details(access_token) + #print(f"Basic User Info: {user_info}") + if "email" not in user_info: + user_info["email"]="" + + query = User.select().where(User.title == hashed_tel(user_info["mobile"])) + user = query.first() + if user: + userid = user.userid + else: + import secrets + userid= secrets.token_urlsafe(8) + user=User.create(userid=userid,username=user_info["nick"], + telphone=encrypt_tel(user_info["mobile"]), + org=user_info["email"]+" "+user_info["unionId"], + email=user_info["email"], + title= hashed_tel(user_info["mobile"]), + ref="钉钉") + #百科个人空间 + username=user_info["nick"] + BaiKe_Catalog.create(catalog=f'个人空间/{username}',demo="存放个人数据") + #上传文件空间 + if not os.path.exists(f'{gcfg["fs"]["path"]}/个人文件夹/{username}'): + os.makedirs(f'{gcfg["fs"]["path"]}/个人文件夹/{username}') + + Login.create(userid=userid,username=user_info["nick"],action="登录", pt=f"智能体:{state}",ref="钉钉",client_ip=get_client_ip(req)) + + response = RedirectResponse(url="/x/kagent.html?sn="+state) + response.set_cookie(key="userid", value=userid, httponly=True) + response.set_cookie(key="username", value=user_info["nick"].encode("utf8"), httponly=True) + response.set_cookie(key="ref", value="钉钉".encode("utf8"), httponly=True) + return response + + +#钉钉应用内登录 +@app.get("/api/dingtalk_auth_code") +def dingtalk_kagent_has_code(code:str,state:str,req:Request): + + # 定义钉钉应用的 AppKey 和 AppSecret + app_key = gcfg["dingtalk"]["app_key"] + app_secret = gcfg["dingtalk"]["app_secret"] + corpid = gcfg["dingtalk"]["exclusiveCorpId"] + + # 第一步:获取 Access Token + access_token = get_access_token(corpid,app_key, app_secret) + #print(access_token) + + # 第二步:使用 AuthToken 和code 获取用户信息 + #验证码 + user_info = get_userinfo_by_token_and_code(access_token,code) + #print(f"Basic User Info: {user_info}") + + query = User.select().where(User.title == hashed_tel(user_info["userid"])) + user = query.first() + if user: + userid = user.userid + else: + import secrets + userid= secrets.token_urlsafe(8) + user=User.create(userid=userid,username=user_info["name"], + telphone=encrypt_tel(user_info["userid"]), + org= user_info["unionid"], + email=user_info["device_id"], + title = hashed_tel(user_info["userid"]), + ref="钉钉") + #百科个人空间 + username=user_info["name"] + BaiKe_Catalog.create(catalog=f'个人空间/{username}',demo="存放个人数据") + #上传文件空间 + if not os.path.exists(f'{gcfg["fs"]["path"]}/个人文件夹/{username}'): + os.makedirs(f'{gcfg["fs"]["path"]}/个人文件夹/{username}') + + Login.create(userid=userid,username=user_info["name"],action="登录", pt=f"智能体:{state}",ref="钉钉",client_ip=get_client_ip(req)) + + response = JSONResponse(content={"url":"/x/wap.html?sn="+state}) + response.set_cookie(key="userid", value=userid, httponly=True) + response.set_cookie(key="username", value=user_info["name"].encode("utf8"), httponly=True) + response.set_cookie(key="ref", value="钉钉".encode("utf8"), httponly=True) + return response + + + +@app.get("/api/wechat_auth") +def wechat_kagent(code:str,state:str,req:Request): + + # 定义钉钉应用的 AppKey 和 AppSecret + app_key = gcfg["wechat"]["app_key"] + app_secret = gcfg["wechat"]["app_secret"] + + + # 第一步:获取 Access Token + access_token = get_wechat_access_token(app_key,app_secret,code) + #print(access_token) + + # 第二步:使用 AuthCode 获取用户信息 + #验证码 + user_info = get_wechat_userinfo(access_token["access_token"],access_token["openid"]) + print(f"Basic User Info: {user_info}") + #print(user_info["nickname"]) + + query = User.select().where(User.title == hashed_tel(user_info["unionid"])) + user = query.first() + if user: + userid = user.userid + else: + import secrets + userid= secrets.token_urlsafe(8) + user=User.create(userid=userid,username=user_info["nickname"], + telphone=encrypt_tel(user_info["unionid"]), + org=user_info["headimgurl"], + email=user_info["openid"], + title = hashed_tel(user_info["unionid"]), + ref="微信") + #百科个人空间 + username=user_info["nickname"] + BaiKe_Catalog.create(catalog=f'个人空间/{username}',demo="存放个人数据") + #上传文件空间 + if not os.path.exists(f'{gcfg["fs"]["path"]}/个人文件夹/{username}'): + os.makedirs(f'{gcfg["fs"]["path"]}/个人文件夹/{username}') + + Login.create(userid=userid,username=user_info["nickname"],action="登录", pt=f"智能体:{state}",ref="微信",client_ip=get_client_ip(req)) + + response = RedirectResponse(url="/x/kagent.html?sn="+state) + response.set_cookie(key="userid", value=userid, httponly=True) + response.set_cookie(key="username", value=user_info["nickname"].encode("utf8"), httponly=True) + response.set_cookie(key="headimgurl", value=user_info["headimgurl"]) + response.set_cookie(key="ref", value="微信".encode("utf8"), httponly=True) + + return response + +@app.get("/api/logout") +def logout_kagent(sn:str,req:Request): + userid = req.cookies.get("userid", "id-00001") + username = req.cookies.get("username", "匿名用户") + username = decode_username(username) + ref = req.cookies.get("ref", "注册") + ref = decode_username(ref) + Login.create(userid=userid,username=username,action="退出",pt=f"智能体:{sn}",client_ip=get_client_ip(req),ref=ref) + response = JSONResponse(content={"code":200}) + #response.delete_cookie("username"),用户名先给保留着 + #response.delete_cookie("userid") + response.delete_cookie("headimgurl") + response.delete_cookie("ref") + return response + + +#上传文件 +app.add_api_route("/api/upload",upload_file,methods=["POST"]) + +#获取智能体信息 +app.add_api_route("/api/kagent_cfg/{kasn}",get_kagent_by_sn,methods=["GET"]) + +#对话 +app.add_api_route("/api/chat_kagent",chat_kagent,methods=["POST"]) + +#对话响应 +app.add_api_route("/api/chat_kagent/{chat_id}",chat_event,methods=["GET"]) + +#对话服务 +app.add_api_route("/v1/chat/completions",chat_completions,methods=["POST"]) + +#百科保存 +app.add_api_route("/api/baike",new_baike,methods=["POST"]) +