From ea87d266784762ac23b04792865b68332825a24c Mon Sep 17 00:00:00 2001 From: inter Date: Mon, 8 Sep 2025 16:36:21 +0800 Subject: [PATCH] Add File --- backend/apps/datasource/crud/permission.py | 82 ++++++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 backend/apps/datasource/crud/permission.py diff --git a/backend/apps/datasource/crud/permission.py b/backend/apps/datasource/crud/permission.py new file mode 100644 index 0000000..80c32ef --- /dev/null +++ b/backend/apps/datasource/crud/permission.py @@ -0,0 +1,82 @@ +import json +from typing import List, Optional + +from sqlalchemy import and_ +from apps.datasource.crud.row_permission import transFilterTree +from apps.datasource.models.datasource import CoreDatasource, CoreField, CoreTable +from common.core.deps import CurrentUser, SessionDep +from sqlbot_xpack.permissions.api.permission import transRecord2DTO +from sqlbot_xpack.permissions.models.ds_permission import DsPermission, PermissionDTO +from sqlbot_xpack.permissions.models.ds_rules import DsRules + +def get_row_permission_filters(session: SessionDep, current_user: CurrentUser, ds: CoreDatasource, + tables: Optional[list] = None, single_table: Optional[CoreTable] = None): + if single_table: + table_list = [session.get(CoreTable, single_table.id)] + else: + table_list = session.query(CoreTable).filter( + and_(CoreTable.ds_id == ds.id, CoreTable.table_name.in_(tables)) + ).all() + + filters = [] + if is_normal_user(current_user): + for table in table_list: + row_permissions = session.query(DsPermission).filter( + and_(DsPermission.table_id == table.id, DsPermission.type == 'row')).all() + contain_rules = session.query(DsRules).all() + res: List[PermissionDTO] = [] + if row_permissions is not None: + for permission in row_permissions: + # check permission and user in same rules + flag = False + for r in contain_rules: + p_list = json.loads(r.permission_list) + u_list = json.loads(r.user_list) + if p_list is not None and u_list is not None and permission.id in p_list and ( + current_user.id in u_list or f'{current_user.id}' in u_list): + flag = True + if flag: + res.append(transRecord2DTO(session, permission)) + where_str = transFilterTree(session, res, ds) + filters.append({"table": table.table_name, "filter": where_str}) + return filters + + +def get_column_permission_fields(session: SessionDep, current_user: CurrentUser, table: CoreTable, + fields: list[CoreField]): + if is_normal_user(current_user): + column_permissions = session.query(DsPermission).filter( + and_(DsPermission.table_id == table.id, DsPermission.type == 'column')).all() + contain_rules = session.query(DsRules).all() + if column_permissions is not None: + for permission in column_permissions: + # check permission and user in same rules + # obj = session.query(DsRules).filter( + # and_(DsRules.permission_list.op('@>')(cast([permission.id], JSONB)), + # or_(DsRules.user_list.op('@>')(cast([f'{current_user.id}'], JSONB)), + # DsRules.user_list.op('@>')(cast([current_user.id], JSONB)))) + # ).first() + flag = False + for r in contain_rules: + p_list = json.loads(r.permission_list) + u_list = json.loads(r.user_list) + if p_list is not None and u_list is not None and permission.id in p_list and ( + current_user.id in u_list or f'{current_user.id}' in u_list): + flag = True + if flag: + permission_list = json.loads(permission.permissions) + fields = filter_list(fields, permission_list) + return fields + + +def is_normal_user(current_user: CurrentUser): + return current_user.id != 1 + + +def filter_list(list_a, list_b): + id_to_invalid = {} + for b in list_b: + if not b['enable']: + id_to_invalid[b['field_id']] = True + + return [a for a in list_a if not id_to_invalid.get(a.id, False)]