diff --git a/main/dingpan.py b/main/dingpan.py new file mode 100644 index 0000000..c89fb75 --- /dev/null +++ b/main/dingpan.py @@ -0,0 +1,608 @@ +""" +file: dingpan.py +date: 2024-1112 + +访问钉钉的钉盘一个接口 + +""" + +import requests +import json +from datetime import datetime + +#获取access_token +def get_dingtalk_access_token(app_key, app_secret): + # 定义请求的URL + url = 'https://api.dingtalk.com/v1.0/oauth2/accessToken' + + # 定义请求体的数据 + payload = { + "appKey": app_key, + "appSecret": app_secret + } + + # 设置请求头,指定内容类型为JSON + headers = { + 'Content-Type': 'application/json' + } + + # 发送POST请求 + response = requests.post(url, data=json.dumps(payload), headers=headers) + + # 检查响应状态码是否为200(HTTP OK) + if response.status_code == 200: + # 解析返回的JSON数据 + response_data = response.json() + # 检查是否有access_token字段 + if 'accessToken' in response_data: + return response_data['accessToken'] + else: + print('Error: Access token not found in the response.') + return None + else: + print(f'Error: Received status code {response.status_code}') + return None + + +#获取用户信息 +def get_user_details(access_token, userid, language='zh_CN'): + # 定义请求的URL + url = f'https://oapi.dingtalk.com/topapi/v2/user/get?access_token={access_token}' + + # 构建请求体的数据 + payload = { + "language": language, + "userid": userid + } + + # 设置请求头,指定内容类型为JSON + headers = { + 'Content-Type': 'application/json' + } + + # 发送POST请求 + response = requests.post(url, data=json.dumps(payload), headers=headers) + + # 检查响应状态码是否为200(HTTP OK) + if response.status_code == 200: + # 解析返回的JSON数据 + response_data = response.json() + # 检查是否有错误码 + if response_data.get('errcode') == 0: + return response_data['result'] + else: + print(f'Error: {response_data.get("errmsg")}') + return None + else: + print(f'Error: Received status code {response.status_code}') + return None + +#以获取指定用户在组织空间下的文件夹列表 +def get_drive_spaces(access_token,union_id, space_type="org", next_token=None, max_results=50): + # 定义请求的URL + url = 'https://api.dingtalk.com/v1.0/drive/spaces' + + # 构建查询参数 + params = { + 'unionId': union_id, + 'spaceType': space_type, + 'maxResults': max_results + } + if next_token: + params['nextToken'] = next_token + + # 设置请求头 + headers = { + 'x-acs-dingtalk-access-token': access_token, + 'Content-Type': 'application/json' + } + + # 发送GET请求 + response = requests.get(url, params=params, headers=headers) + + # 检查响应状态码是否为200(HTTP OK) + if response.status_code == 200: + # 解析返回的JSON数据 + response_data = response.json() + return response_data["spaces"] + else: + print(f'Error: Received status code {response.status_code} {response.text}') + return None + +#用户的人数 +def get_user_count(access_token, only_active=False): + # 定义请求的URL + url = f'https://oapi.dingtalk.com/topapi/user/count?access_token={access_token}' + + # 构建请求体的数据 + payload = { + "only_active": str(only_active).lower() + } + + # 设置请求头,指定内容类型为JSON + headers = { + 'Content-Type': 'application/json' + } + + # 发送POST请求 + response = requests.post(url, data=json.dumps(payload), headers=headers) + + # 检查响应状态码是否为200(HTTP OK) + if response.status_code == 200: + # 解析返回的JSON数据 + response_data = response.json() + # 检查是否有错误码 + if response_data.get('errcode') == 0: + return response_data['result']['count'] + else: + print(f'Error: {response_data.get("errmsg")}') + return None + else: + print(f'Error: Received status code {response.status_code}') + return None + +#获取用户的id列表, +def get_user_list_id(access_token, dept_id): + # 定义请求的URL + url = f'https://oapi.dingtalk.com/topapi/user/listid?access_token={access_token}' + + # 构建请求体的数据 + payload = { + "dept_id": dept_id + } + + # 设置请求头,指定内容类型为JSON + headers = { + 'Content-Type': 'application/json' + } + + # 发送POST请求 + response = requests.post(url, data=json.dumps(payload), headers=headers) + + # 检查响应状态码是否为200(HTTP OK) + if response.status_code == 200: + # 解析返回的JSON数据 + response_data = response.json() + # 检查是否有错误码 + if response_data.get('errcode') == 0: + return response_data['result']["userid_list"] + else: + print(f'Error: {response_data.get("errmsg")}') + return None + else: + print(f'Error: Received status code {response.status_code}') + return None + +#得到子部门的列表 +def get_sub_departments(access_token, dept_id, language='zh_CN'): + # 定义请求的URL + url = f'https://oapi.dingtalk.com/topapi/v2/department/listsub?access_token={access_token}' + + # 构建请求体的数据 + payload = { + "language": language, + "dept_id": dept_id + } + + # 设置请求头,指定内容类型为JSON + headers = { + 'Content-Type': 'application/json' + } + + # 发送POST请求 + response = requests.post(url, data=json.dumps(payload), headers=headers) + + # 检查响应状态码是否为200(HTTP OK) + if response.status_code == 200: + # 解析返回的JSON数据 + response_data = response.json() + # 检查是否有错误码 + if response_data.get('errcode') == 0: + return response_data['result'] + else: + print(f'Error: {response_data.get("errmsg")}') + return None + else: + print(f'Error: Received status code {response.status_code}') + return None + + +#得到组织内的管理员 +def get_admin_users(access_token): + # 定义请求的URL + url = f'https://oapi.dingtalk.com/topapi/user/listadmin?access_token={access_token}' + + # 构建请求体的数据 + payload = {} + + # 设置请求头,指定内容类型为JSON + headers = { + 'Content-Type': 'application/json' + } + + # 发送POST请求 + response = requests.post(url, data=json.dumps(payload), headers=headers) + + # 检查响应状态码是否为200(HTTP OK) + if response.status_code == 200: + # 解析返回的JSON数据 + response_data = response.json() + # 检查是否有错误码 + if response_data.get('errcode') == 0: + return response_data['result'] + else: + print(f'Error: {response_data.get("errmsg")}') + return None + else: + print(f'Error: Received status code {response.status_code}') + return None + + +#所有文件和文件夹 +""" +返回的文件信息 +[ +{ + "modifiedTime": "Fri Jun 21 15:43:42 CST 2024", + "creatorId": "QksETWLqFiPBiSZ8oE1IcsqwiEiE", + "modifierId": "QksETWLqFiPBiSZ8oE1IcsqwiEiE", + "type": "FOLDER", + "version": 1, + "uuid": "7QG4Yx2JpL7eXAbrCz6m3Y4xJ9dEq3XD", + "partitionType": "PUBLIC_OSS_PARTITION", + "parentId": "143956017570", + "spaceId": "24447524446", + "path": "/分类分级/05用户手册", + "createTime": "Fri Jun 21 15:43:42 CST 2024", + "storageDriver": "DINGTALK", + "name": "05用户手册", + "id": "143956920642", + "properties": { + "readOnly": false + }, + "status": "NORMAL", + "appProperties": {} + } +] +""" +def list_all_dentries(space_id, union_id, access_token, next_token=None, max_results=1000, order="DESC", with_thumbnail=False): + # 定义请求的URL + url = f'https://api.dingtalk.com/v1.0/storage/spaces/{space_id}/dentries/listAll?unionId={union_id}' + + # 构建请求体的数据 + payload = { + "option": { + "nextToken": next_token, + "maxResults": max_results, + "order": order, + "withThumbnail": with_thumbnail + } + } + + # 设置请求头,指定内容类型为JSON + headers = { + 'x-acs-dingtalk-access-token': access_token, + 'Content-Type': 'application/json' + } + + # 发送POST请求 + response = requests.post(url, data=json.dumps(payload), headers=headers) + + # 检查响应状态码是否为200(HTTP OK) + if response.status_code == 200: + # 解析返回的JSON数据 + response_data = response.json() + # 检查是否有错误码 + if 'error' not in response_data and "nextToken" in response_data: + return response_data["dentries"],response_data["nextToken"] + elif 'error' not in response_data: + #遍历完没有文件了 + return response_data["dentries"],None + else: + print(f'Error: {response_data.get("error").get("message")}') + return [],None + else: + print(f'Error: Received status code {response.status_code}') + return [],None + + +#等到下载文件的信息,可能是多个url,然后再调用url进行下载 +""" +{ + "protocol": "HEADER_SIGNATURE", + "headerSignatureInfo": { + "headers": { + "Authorization": "OSS LTAIjmWpzHta71rc:/2tIMDsXGyehOirAzCHx1cI0C8o=", + "x-oss-date": "Tue, 03 Jun 2025 06:47:27 GMT" + }, + "resourceUrls": [ + "https://sh-dualstack.trans.dingtalk.com/yundisk0/iAEIAqRmaWxlA6h5dW5kaXNrMATOIVA04gXNE48GzQ3bB85mqzrtCM0CmQ.file" + ], + "expirationSeconds": 900, + "internalResourceUrls": [ + "lippi-space-sh.oss-cn-shanghai-internal.aliyuncs.com/yundisk0/iAEIAqRmaWxlA6h5dW5kaXNrMATOIVA04gXNE48GzQ3bB85mqzrtCM0CmQ.file" + ], + "region": "SHANGHAI" + } +} +""" +def get_download_info(space_id, dentry_id,union_id, access_token, version=None, prefer_intranet=False): + # 定义请求的URL + url = f'https://api.dingtalk.com/v1.0/storage/spaces/{space_id}/dentries/{dentry_id}/downloadInfos/query?unionId={union_id}' + + # 构建请求体的数据 + payload = { + "option": { + "version": version, + "preferIntranet": prefer_intranet + } + } + + # 设置请求头,指定内容类型为JSON + headers = { + 'x-acs-dingtalk-access-token': access_token, + 'Content-Type': 'application/json' + } + + # 发送POST请求 + response = requests.post(url, data=json.dumps(payload), headers=headers) + + # 检查响应状态码是否为200(HTTP OK) + if response.status_code == 200: + # 解析返回的JSON数据 + response_data = response.json() + # 检查是否有错误码 + if 'error' not in response_data: + return response_data + else: + print(f'Error: {response_data.get("error").get("message")}') + return None + else: + print(f'Error: Received status code {response.status_code},{response.text}') + return None + + +#等到下载文件的信息,可能是多个url,然后再调用url进行下载 +def download_file(urls,headers,file=""): + + # 设置请求头,指定内容类型为JSON + # headers = { + # 'Authorization': header_auth, + # "x-oss-date": oss_date + # } + + # 发送POST请求 + response_data=bytearray(b'') + for url in urls: + response = requests.get(url, headers=headers) + + # 检查响应状态码是否为200(HTTP OK) + if response.status_code == 200: + # 解析返回的JSON数据 + response_data += response.content + return response_data + else: + print(f'Error: Received status code {response.status_code},{response.text}') + return None +#end download_file + + +#显示前50文件 +def ding_list_top_50_files(app_key,app_secret,main_dir=None): + access_token = get_dingtalk_access_token(app_key, app_secret) + + if access_token: + print(f'Access Token: {access_token}') + else: + print('Failed to obtain access token.') + admin_users = get_admin_users(access_token) + + + user_details = get_user_details(access_token, admin_users[0]["userid"]) + + if user_details: + print(json.dumps(user_details, indent=2, ensure_ascii=False)) + else: + print('Failed to retrieve user details.') + + + spaces=[] + spaces_info = get_drive_spaces(access_token,user_details["unionid"]) + + if spaces_info: + print(json.dumps(spaces_info)) + else: + print('Failed to retrieve drive spaces information.') + + if main_dir: + for space in spaces_info: + if space["spaceName"] in main_dir.split(","): + print("钉盘",space["spaceName"]) + spaces.append(space) + else: + spaces = spaces_info + + print(spaces) + + files=[] + #遍历目录 + for space in spaces: + nextToken = None + + dentries,nextToken = list_all_dentries(space["spaceId"], user_details["unionid"],access_token,nextToken) + + if dentries: + #print(json.dumps(dentries, indent=2, ensure_ascii=False)) + for dentrie in dentries: + files.append({"path":dentrie["path"], + "fname": dentrie["path"].split("/")[-1], + "mtime": datetime.strptime(dentrie["modifiedTime"], "%a %b %d %H:%M:%S %Z %Y"), + }) + else: + print('Failed to retrieve dentries.') + print("总文件",len(files)) + return files + +#入口函数 +def ding_scan_directory(app_key,app_secret,main_dir=None): + access_token = get_dingtalk_access_token(app_key, app_secret) + + if access_token: + print(f'Access Token: {access_token}') + else: + print('Failed to 获得access_token.') + admin_users = get_admin_users(access_token) + user_details = get_user_details(access_token, admin_users[0]["userid"]) + + if user_details: + print(json.dumps(user_details, indent=2, ensure_ascii=False)) + else: + print('Failed to 用户详细信息') + + spaces=[] + spaces_info = get_drive_spaces(access_token,user_details["unionid"]) + + if spaces_info: + print(json.dumps(spaces_info)) + else: + print('Failed to 获得钉盘信息') + + if main_dir: + for space in spaces_info: + if space["spaceName"] in main_dir.split(","): + print("钉盘",space["spaceName"]) + spaces.append(space) + else: + spaces = spaces_info + + print(spaces) + + files=[] + #遍历目录 + for space in spaces: + nextToken = None + while 1: + dentries,nextToken = list_all_dentries(space["spaceId"], user_details["unionid"],access_token,nextToken) + + if dentries: + #print(json.dumps(dentries, indent=2, ensure_ascii=False)) + for dentrie in dentries: + files.append({"path":dentrie["path"], + "fname": dentrie["path"].split("/")[-1], + "id":dentrie["id"], + "mtime": datetime.strptime(dentrie["modifiedTime"], "%a %b %d %H:%M:%S %Z %Y"), + "unionid": user_details["unionid"], + "spaceId": space["spaceId"], + "access_token": access_token + }) + else: + print('Failed to retrieve dentries.') + if nextToken==None: + break + print("总文件",len(files)) + return files + +#单次下载文件 +def ding_get_file_content_init(app_key,app_secret,spaceId,file_id): + access_token = get_dingtalk_access_token(app_key, app_secret) + + if access_token: + print(f'Access Token: {access_token}') + else: + print('Failed to obtain access token.') + admin_users = get_admin_users(access_token) + + + user_details = get_user_details(access_token, admin_users[0]["userid"]) + + if user_details: + print(json.dumps(user_details, indent=2, ensure_ascii=False)) + else: + print('Failed to retrieve user details.') + unionid = user_details["unionid"] + + return ding_get_file_content(spaceId,file_id,unionid,access_token) + + +def ding_get_file_content(spaceId,file_id,unionid,access_token): + download_info = get_download_info(spaceId,file_id,unionid, access_token) + if download_info: + + return download_file(download_info["headerSignatureInfo"]["resourceUrls"],download_info["headerSignatureInfo"]["headers"]) + else: + print('Failed to retrieve download info.') + return None + + + + +if __name__=="__main__": + # 示例调用 + app_key = 'app_key' + app_secret = 'app_secret' + access_token = get_dingtalk_access_token(app_key, app_secret) + + if access_token: + print(f'Access Token: {access_token}') + else: + print('Failed to obtain access token.') + + user_count = get_user_count(access_token, only_active=False) + print("用户数",user_count) + + admin_users = get_admin_users(access_token) + print("管理员",admin_users) + + # sub_departments = get_sub_departments(access_token, 1) + # print("部门",sub_departments) + + # user_list_id = get_user_list_id(access_token, 1) + # print("用户id",user_list_id) + + + user_details = get_user_details(access_token, admin_users[0]["userid"]) + + if user_details: + print(json.dumps(user_details, indent=2, ensure_ascii=False)) + else: + print('Failed to retrieve user details.') + + spaces_info = get_drive_spaces(access_token,user_details["unionid"]) + print("钉盘") + if spaces_info: + print(json.dumps(spaces_info)) + else: + print('Failed to retrieve drive spaces information.') + + print("存储",spaces_info[0]["spaceName"]) + + + #遍历目录 + count=0 + nextToken = None + files=[] + while 1: + dentries,nextToken = list_all_dentries(spaces_info[0]["spaceId"], user_details["unionid"],access_token,nextToken) + + if dentries: + print(json.dumps(dentries, indent=2, ensure_ascii=False)) + count += len(dentries) + files.extend(dentries) + print("文件数量",count) + else: + print('Failed to retrieve dentries.') + if nextToken==None: + break + + for file in files: + print(file["path"]) + #更具id下载某一个文件 + download_info = get_download_info(spaces_info[1]["spaceId"],file["id"], user_details["unionid"], access_token) + if download_info: + if len(download_info["headerSignatureInfo"]["resourceUrls"]) >1: + print(json.dumps(download_info, indent=2, ensure_ascii=False)) + #download_file(download_info["headerSignatureInfo"]["resourceUrls"],download_info["headerSignatureInfo"]["headers"]) + else: + print('Failed to retrieve download info.') + + print(json.dumps(user_details, indent=2, ensure_ascii=False)) \ No newline at end of file