"""HTTP entry points for the kagent web application.

Defines the FastAPI ``app``, the landing redirect, the login/logout
endpoints (form registration, DingTalk, WeChat) and registers the API routes
implemented in the project's ``api`` module.
"""
import os
import secrets

from fastapi import FastAPI, Query, UploadFile, File, HTTPException, Request, Path, Form
from fastapi.responses import StreamingResponse, FileResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.responses import JSONResponse

from api import *
from dingtalk import *
from wechat import *
from init import gcfg
from utils import *
from k_database import User, Login

# Application instance.
app = FastAPI()
# Serve the local "x" directory under the "/x" URL prefix.
app.mount("/x", StaticFiles(directory="x"), name="static")

# Value of the "userid" cookie that is assumed when the cookie is absent.
_ANONYMOUS_USERID = "id-00001"


def _new_userid() -> str:
    """Return a fresh random, URL-safe user id."""
    return secrets.token_urlsafe(8)


def _provision_user_space(username: str) -> None:
    """Create the personal encyclopedia catalog and upload folder for *username*."""
    # Personal encyclopedia space.
    BaiKe_Catalog.create(catalog=f'个人空间/{username}', demo="存放个人数据")
    # Personal upload folder. exist_ok avoids the check-then-create race.
    # NOTE(review): username is externally supplied and is used unsanitised as
    # a path component; a value containing "../" would escape the base folder.
    os.makedirs(f'{gcfg["fs"]["path"]}/个人文件夹/{username}', exist_ok=True)


def _set_login_cookies(response, userid: str, username: str, ref: str) -> None:
    """Attach the ``userid``, ``username`` and ``ref`` cookies to *response*."""
    response.set_cookie(key="userid", value=userid, httponly=True)
    # username/ref are passed as UTF-8 bytes, exactly as before; logout reads
    # them back through decode_username().
    # NOTE(review): presumably decode_username reverses this encoding -- confirm.
    response.set_cookie(key="username", value=username.encode("utf8"), httponly=True)
    response.set_cookie(key="ref", value=ref.encode("utf8"), httponly=True)


@app.get("/")
def read_root(req: Request, sn: str = Query(None, max_length=200)):
    """Redirect to the login page or the agent page depending on the cookie.

    Args:
        req: Incoming request; its ``userid`` cookie decides the target.
        sn: Optional agent serial number forwarded as the ``sn`` query value.

    Returns:
        A redirect to ``/x/login.html`` when no ``userid`` cookie is present,
        otherwise to ``/x/kagent.html``.
    """
    # sn is optional; concatenating None used to raise TypeError (HTTP 500).
    sn = sn or ""
    userid = req.cookies.get("userid", _ANONYMOUS_USERID)
    if userid == _ANONYMOUS_USERID:
        return RedirectResponse(url="/x/login.html?sn=" + sn)
    return RedirectResponse(url="/x/kagent.html?sn=" + sn)


# Login / registration with a phone number and user name.
@app.post("/api/login")
async def login_kagent(req: Request):
    """Register or log in a user from a JSON body.

    The body is read for ``telphone``, ``username``, ``org`` and
    ``kagent_sn``. ``telphone`` and ``username`` are required; the other two
    default to an empty string when absent.

    Returns:
        ``{"code": 401, "msg": ...}`` when a required field is empty or
        missing, otherwise ``{"code": 200}`` with the login cookies set.
    """
    login = await req.json()
    telphone = login.get("telphone") or ""
    username = login.get("username") or ""
    org = login.get("org") or ""
    kagent_sn = login.get("kagent_sn", "")
    if not telphone or not username:
        return JSONResponse(content={"code": 401, "msg": "注册数据不能为空"})

    user = User.select().where(User.title == hashed_tel(telphone)).first()
    if user:
        userid = user.userid
        if user.username != username or (org != "" and org != user.org):
            # Details changed: update the stored registration data.
            user.username = username
            # An empty org means "not supplied" (see the condition above), so
            # it must not overwrite the organisation already on record.
            if org != "":
                user.org = org
            user.save(only=[User.username, User.org])
            # Personal encyclopedia space.
            BaiKe_Catalog.create(catalog=f'个人空间/{username}', demo="存放个人数据")
    else:
        userid = _new_userid()
        user = User.create(userid=userid, username=username,
                           telphone=encrypt_tel(telphone),
                           org=org, title=hashed_tel(telphone), ref="注册")
        _provision_user_space(username)

    Login.create(userid=userid, username=username, action="登录",
                 pt=f"智能体:{kagent_sn}", ref="注册", client_ip=get_client_ip(req))
    response = JSONResponse(content={"code": 200})
    _set_login_cookies(response, userid, username, "注册")
    return response


@app.get("/api/dingtalk_auth")
def dingtalk_kagent(authCode: str, state: str, req: Request):
    """Handle the DingTalk auth callback and redirect to the agent page.

    Args:
        authCode: Authorisation code passed to ``get_user_access_token``.
        state: Agent serial number; recorded in the login log and forwarded
            as the ``sn`` query value of the redirect.
        req: Incoming request, used for the client IP.

    Returns:
        A redirect to ``/x/kagent.html`` with the login cookies set.
    """
    # DingTalk application credentials.
    app_key = gcfg["dingtalk"]["app_key"]
    app_secret = gcfg["dingtalk"]["app_secret"]
    # Step 1: obtain the access token.
    access_token = get_user_access_token(app_key, app_secret, authCode)
    # Step 2: fetch the user's details with that token.
    user_info = get_user_details(access_token)
    if "email" not in user_info:
        user_info["email"] = ""

    user = User.select().where(User.title == hashed_tel(user_info["mobile"])).first()
    if user:
        userid = user.userid
    else:
        userid = _new_userid()
        user = User.create(userid=userid, username=user_info["nick"],
                           telphone=encrypt_tel(user_info["mobile"]),
                           org=user_info["email"] + " " + user_info["unionId"],
                           email=user_info["email"],
                           title=hashed_tel(user_info["mobile"]),
                           ref="钉钉")
        _provision_user_space(user_info["nick"])

    Login.create(userid=userid, username=user_info["nick"], action="登录",
                 pt=f"智能体:{state}", ref="钉钉", client_ip=get_client_ip(req))
    response = RedirectResponse(url="/x/kagent.html?sn=" + state)
    _set_login_cookies(response, userid, user_info["nick"], "钉钉")
    return response


# Login from inside the DingTalk application.
@app.get("/api/dingtalk_auth_code")
def dingtalk_kagent_has_code(code: str, state: str, req: Request):
    """Log in a user from a DingTalk in-app ``code``.

    Args:
        code: Code passed to ``get_userinfo_by_token_and_code``.
        state: Agent serial number; recorded in the login log and embedded in
            the returned URL.
        req: Incoming request, used for the client IP.

    Returns:
        ``{"url": "/x/wap.html?sn=<state>"}`` with the login cookies set.
    """
    # DingTalk application credentials.
    app_key = gcfg["dingtalk"]["app_key"]
    app_secret = gcfg["dingtalk"]["app_secret"]
    corpid = gcfg["dingtalk"]["exclusiveCorpId"]
    # Step 1: obtain the access token.
    access_token = get_access_token(corpid, app_key, app_secret)
    # Step 2: fetch the user's details with the token and the code.
    user_info = get_userinfo_by_token_and_code(access_token, code)

    # This flow keys the account on the DingTalk userid rather than a phone number.
    user = User.select().where(User.title == hashed_tel(user_info["userid"])).first()
    if user:
        userid = user.userid
    else:
        userid = _new_userid()
        user = User.create(userid=userid, username=user_info["name"],
                           telphone=encrypt_tel(user_info["userid"]),
                           org=user_info["unionid"],
                           email=user_info["device_id"],
                           title=hashed_tel(user_info["userid"]),
                           ref="钉钉")
        _provision_user_space(user_info["name"])

    Login.create(userid=userid, username=user_info["name"], action="登录",
                 pt=f"智能体:{state}", ref="钉钉", client_ip=get_client_ip(req))
    response = JSONResponse(content={"url": "/x/wap.html?sn=" + state})
    _set_login_cookies(response, userid, user_info["name"], "钉钉")
    return response


@app.get("/api/wechat_auth")
def wechat_kagent(code: str, state: str, req: Request):
    """Handle the WeChat auth callback and redirect to the agent page.

    Args:
        code: Code passed to ``get_wechat_access_token``.
        state: Agent serial number; recorded in the login log and forwarded
            as the ``sn`` query value of the redirect.
        req: Incoming request, used for the client IP.

    Returns:
        A redirect to ``/x/kagent.html`` with the login cookies and a
        ``headimgurl`` cookie set.
    """
    # WeChat application credentials.
    app_key = gcfg["wechat"]["app_key"]
    app_secret = gcfg["wechat"]["app_secret"]
    # Step 1: obtain the access token.
    access_token = get_wechat_access_token(app_key, app_secret, code)
    # Step 2: fetch the user's details with the token and openid.
    user_info = get_wechat_userinfo(access_token["access_token"], access_token["openid"])
    # NOTE(review): this prints the full user profile to stdout; the other
    # handlers have the equivalent line commented out.
    print(f"Basic User Info: {user_info}")

    # This flow keys the account on the WeChat unionid rather than a phone number.
    user = User.select().where(User.title == hashed_tel(user_info["unionid"])).first()
    if user:
        userid = user.userid
    else:
        userid = _new_userid()
        user = User.create(userid=userid, username=user_info["nickname"],
                           telphone=encrypt_tel(user_info["unionid"]),
                           org=user_info["headimgurl"],
                           email=user_info["openid"],
                           title=hashed_tel(user_info["unionid"]),
                           ref="微信")
        _provision_user_space(user_info["nickname"])

    Login.create(userid=userid, username=user_info["nickname"], action="登录",
                 pt=f"智能体:{state}", ref="微信", client_ip=get_client_ip(req))
    response = RedirectResponse(url="/x/kagent.html?sn=" + state)
    _set_login_cookies(response, userid, user_info["nickname"], "微信")
    # The avatar URL cookie is set without httponly, as in the original code.
    response.set_cookie(key="headimgurl", value=user_info["headimgurl"])
    return response


@app.get("/api/logout")
def logout_kagent(sn: str, req: Request):
    """Record a logout and clear the ``headimgurl`` and ``ref`` cookies.

    Args:
        sn: Agent serial number recorded in the login log.
        req: Incoming request; supplies the cookies and the client IP.

    Returns:
        ``{"code": 200}``.
    """
    userid = req.cookies.get("userid", _ANONYMOUS_USERID)
    username = decode_username(req.cookies.get("username", "匿名用户"))
    ref = decode_username(req.cookies.get("ref", "注册"))
    Login.create(userid=userid, username=username, action="退出",
                 pt=f"智能体:{sn}", client_ip=get_client_ip(req), ref=ref)
    response = JSONResponse(content={"code": 200})
    # The "username" and "userid" cookies are deliberately kept for now.
    # NOTE(review): because "userid" survives, "/" still redirects to the
    # agent page after logout -- confirm this is intended.
    response.delete_cookie("headimgurl")
    response.delete_cookie("ref")
    return response


# File upload.
app.add_api_route("/api/upload", upload_file, methods=["POST"])
# Fetch an agent's configuration.
app.add_api_route("/api/kagent_cfg/{kasn}", get_kagent_by_sn, methods=["GET"])
# Start a chat.
app.add_api_route("/api/chat_kagent", chat_kagent, methods=["POST"])
# Chat response.
app.add_api_route("/api/chat_kagent/{chat_id}", chat_event, methods=["GET"])
# Chat completions service.
app.add_api_route("/v1/chat/completions", chat_completions, methods=["POST"])
# Save an encyclopedia entry.
app.add_api_route("/api/baike", new_baike, methods=["POST"])