From 1adff3dce96f2bfa5120bd6e0bd05699ef7afa7e Mon Sep 17 00:00:00 2001 From: inter Date: Mon, 8 Sep 2025 16:36:16 +0800 Subject: [PATCH] Add File --- backend/apps/system/crud/user.py | 88 ++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100644 backend/apps/system/crud/user.py diff --git a/backend/apps/system/crud/user.py b/backend/apps/system/crud/user.py new file mode 100644 index 0000000..69255a5 --- /dev/null +++ b/backend/apps/system/crud/user.py @@ -0,0 +1,88 @@ + +from typing import Optional +from sqlmodel import Session, func, select, delete as sqlmodel_delete +from apps.system.models.system_model import UserWsModel, WorkspaceModel +from apps.system.schemas.auth import CacheName, CacheNamespace +from apps.system.schemas.system_schema import EMAIL_REGEX, PWD_REGEX, BaseUserDTO, UserInfoDTO, UserWs +from common.core.deps import SessionDep +from common.core.sqlbot_cache import cache, clear_cache +from common.utils.locale import I18n +from common.utils.utils import SQLBotLogUtil +from ..models.user import UserModel +from common.core.security import verify_md5pwd +import re + +def get_db_user(*, session: Session, user_id: int) -> UserModel: + db_user = session.get(UserModel, user_id) + return db_user + +def get_user_by_account(*, session: Session, account: str) -> BaseUserDTO | None: + statement = select(UserModel).where(UserModel.account == account) + db_user = session.exec(statement).first() + if not db_user: + return None + return BaseUserDTO.model_validate(db_user.model_dump()) + +@cache(namespace=CacheNamespace.AUTH_INFO, cacheName=CacheName.USER_INFO, keyExpression="user_id") +async def get_user_info(*, session: Session, user_id: int) -> UserInfoDTO | None: + db_user: UserModel = get_db_user(session = session, user_id = user_id) + if not db_user: + return None + userInfo = UserInfoDTO.model_validate(db_user.model_dump()) + userInfo.isAdmin = userInfo.id == 1 and userInfo.account == 'admin' + if userInfo.isAdmin: + return userInfo + ws_model: UserWsModel = session.exec(select(UserWsModel).where(UserWsModel.uid == userInfo.id, UserWsModel.oid == userInfo.oid)).first() + userInfo.weight = ws_model.weight if ws_model else -1 + return userInfo + +def authenticate(*, session: Session, account: str, password: str) -> BaseUserDTO | None: + db_user = get_user_by_account(session=session, account=account) + if not db_user: + return None + if not verify_md5pwd(password, db_user.password): + return None + return db_user + +async def user_ws_options(session: Session, uid: int, trans: Optional[I18n] = None) -> list[UserWs]: + if uid == 1: + stmt = select(WorkspaceModel.id, WorkspaceModel.name).order_by(WorkspaceModel.name, WorkspaceModel.create_time) + else: + stmt = select(WorkspaceModel.id, WorkspaceModel.name).join( + UserWsModel, UserWsModel.oid == WorkspaceModel.id + ).where( + UserWsModel.uid == uid, + ).order_by(WorkspaceModel.name, WorkspaceModel.create_time) + result = session.exec(stmt) + if not trans: + return result.all() + return [ + UserWs(id = id, name = trans(name) if name.startswith('i18n') else name) + for id, name in result.all() + ] + +@clear_cache(namespace=CacheNamespace.AUTH_INFO, cacheName=CacheName.USER_INFO, keyExpression="id") +async def single_delete(session: SessionDep, id: int): + user_model: UserModel = get_db_user(session = session, user_id = id) + del_stmt = sqlmodel_delete(UserWsModel).where(UserWsModel.uid == id) + session.exec(del_stmt) + session.delete(user_model) + session.commit() + +@clear_cache(namespace=CacheNamespace.AUTH_INFO, cacheName=CacheName.USER_INFO, keyExpression="id") +async def clean_user_cache(id: int): + SQLBotLogUtil.info(f"User cache for [{id}] has been cleaned") + + +def check_account_exists(*, session: Session, account: str) -> bool: + return session.exec(select(func.count()).select_from(UserModel).where(UserModel.account == account)).one() > 0 +def check_email_exists(*, session: Session, email: str) -> bool: + return session.exec(select(func.count()).select_from(UserModel).where(UserModel.email == email)).one() > 0 + + + +def check_email_format(email: str) -> bool: + return bool(EMAIL_REGEX.fullmatch(email)) + +def check_pwd_format(pwd: str) -> bool: + return bool(PWD_REGEX.fullmatch(pwd))