""" file: dingpan.py date: 2024-1112 访问钉钉的钉盘一个接口 """ import requests import json from datetime import datetime #获取access_token def get_dingtalk_access_token(app_key, app_secret): # 定义请求的URL url = 'https://api.dingtalk.com/v1.0/oauth2/accessToken' # 定义请求体的数据 payload = { "appKey": app_key, "appSecret": app_secret } # 设置请求头,指定内容类型为JSON headers = { 'Content-Type': 'application/json' } # 发送POST请求 response = requests.post(url, data=json.dumps(payload), headers=headers) # 检查响应状态码是否为200(HTTP OK) if response.status_code == 200: # 解析返回的JSON数据 response_data = response.json() # 检查是否有access_token字段 if 'accessToken' in response_data: return response_data['accessToken'] else: print('Error: Access token not found in the response.') return None else: print(f'Error: Received status code {response.status_code}') return None #获取用户信息 def get_user_details(access_token, userid, language='zh_CN'): # 定义请求的URL url = f'https://oapi.dingtalk.com/topapi/v2/user/get?access_token={access_token}' # 构建请求体的数据 payload = { "language": language, "userid": userid } # 设置请求头,指定内容类型为JSON headers = { 'Content-Type': 'application/json' } # 发送POST请求 response = requests.post(url, data=json.dumps(payload), headers=headers) # 检查响应状态码是否为200(HTTP OK) if response.status_code == 200: # 解析返回的JSON数据 response_data = response.json() # 检查是否有错误码 if response_data.get('errcode') == 0: return response_data['result'] else: print(f'Error: {response_data.get("errmsg")}') return None else: print(f'Error: Received status code {response.status_code}') return None #以获取指定用户在组织空间下的文件夹列表 def get_drive_spaces(access_token,union_id, space_type="org", next_token=None, max_results=50): # 定义请求的URL url = 'https://api.dingtalk.com/v1.0/drive/spaces' # 构建查询参数 params = { 'unionId': union_id, 'spaceType': space_type, 'maxResults': max_results } if next_token: params['nextToken'] = next_token # 设置请求头 headers = { 'x-acs-dingtalk-access-token': access_token, 'Content-Type': 'application/json' } # 发送GET请求 response = requests.get(url, params=params, headers=headers) # 检查响应状态码是否为200(HTTP OK) if response.status_code == 200: # 解析返回的JSON数据 response_data = response.json() return response_data["spaces"] else: print(f'Error: Received status code {response.status_code} {response.text}') return None #用户的人数 def get_user_count(access_token, only_active=False): # 定义请求的URL url = f'https://oapi.dingtalk.com/topapi/user/count?access_token={access_token}' # 构建请求体的数据 payload = { "only_active": str(only_active).lower() } # 设置请求头,指定内容类型为JSON headers = { 'Content-Type': 'application/json' } # 发送POST请求 response = requests.post(url, data=json.dumps(payload), headers=headers) # 检查响应状态码是否为200(HTTP OK) if response.status_code == 200: # 解析返回的JSON数据 response_data = response.json() # 检查是否有错误码 if response_data.get('errcode') == 0: return response_data['result']['count'] else: print(f'Error: {response_data.get("errmsg")}') return None else: print(f'Error: Received status code {response.status_code}') return None #获取用户的id列表, def get_user_list_id(access_token, dept_id): # 定义请求的URL url = f'https://oapi.dingtalk.com/topapi/user/listid?access_token={access_token}' # 构建请求体的数据 payload = { "dept_id": dept_id } # 设置请求头,指定内容类型为JSON headers = { 'Content-Type': 'application/json' } # 发送POST请求 response = requests.post(url, data=json.dumps(payload), headers=headers) # 检查响应状态码是否为200(HTTP OK) if response.status_code == 200: # 解析返回的JSON数据 response_data = response.json() # 检查是否有错误码 if response_data.get('errcode') == 0: return response_data['result']["userid_list"] else: print(f'Error: {response_data.get("errmsg")}') return None else: print(f'Error: Received status code {response.status_code}') return None #得到子部门的列表 def get_sub_departments(access_token, dept_id, language='zh_CN'): # 定义请求的URL url = f'https://oapi.dingtalk.com/topapi/v2/department/listsub?access_token={access_token}' # 构建请求体的数据 payload = { "language": language, "dept_id": dept_id } # 设置请求头,指定内容类型为JSON headers = { 'Content-Type': 'application/json' } # 发送POST请求 response = requests.post(url, data=json.dumps(payload), headers=headers) # 检查响应状态码是否为200(HTTP OK) if response.status_code == 200: # 解析返回的JSON数据 response_data = response.json() # 检查是否有错误码 if response_data.get('errcode') == 0: return response_data['result'] else: print(f'Error: {response_data.get("errmsg")}') return None else: print(f'Error: Received status code {response.status_code}') return None #得到组织内的管理员 def get_admin_users(access_token): # 定义请求的URL url = f'https://oapi.dingtalk.com/topapi/user/listadmin?access_token={access_token}' # 构建请求体的数据 payload = {} # 设置请求头,指定内容类型为JSON headers = { 'Content-Type': 'application/json' } # 发送POST请求 response = requests.post(url, data=json.dumps(payload), headers=headers) # 检查响应状态码是否为200(HTTP OK) if response.status_code == 200: # 解析返回的JSON数据 response_data = response.json() # 检查是否有错误码 if response_data.get('errcode') == 0: return response_data['result'] else: print(f'Error: {response_data.get("errmsg")}') return None else: print(f'Error: Received status code {response.status_code}') return None #所有文件和文件夹 """ 返回的文件信息 [ { "modifiedTime": "Fri Jun 21 15:43:42 CST 2024", "creatorId": "QksETWLqFiPBiSZ8oE1IcsqwiEiE", "modifierId": "QksETWLqFiPBiSZ8oE1IcsqwiEiE", "type": "FOLDER", "version": 1, "uuid": "7QG4Yx2JpL7eXAbrCz6m3Y4xJ9dEq3XD", "partitionType": "PUBLIC_OSS_PARTITION", "parentId": "143956017570", "spaceId": "24447524446", "path": "/分类分级/05用户手册", "createTime": "Fri Jun 21 15:43:42 CST 2024", "storageDriver": "DINGTALK", "name": "05用户手册", "id": "143956920642", "properties": { "readOnly": false }, "status": "NORMAL", "appProperties": {} } ] """ def list_all_dentries(space_id, union_id, access_token, next_token=None, max_results=1000, order="DESC", with_thumbnail=False): # 定义请求的URL url = f'https://api.dingtalk.com/v1.0/storage/spaces/{space_id}/dentries/listAll?unionId={union_id}' # 构建请求体的数据 payload = { "option": { "nextToken": next_token, "maxResults": max_results, "order": order, "withThumbnail": with_thumbnail } } # 设置请求头,指定内容类型为JSON headers = { 'x-acs-dingtalk-access-token': access_token, 'Content-Type': 'application/json' } # 发送POST请求 response = requests.post(url, data=json.dumps(payload), headers=headers) # 检查响应状态码是否为200(HTTP OK) if response.status_code == 200: # 解析返回的JSON数据 response_data = response.json() # 检查是否有错误码 if 'error' not in response_data and "nextToken" in response_data: return response_data["dentries"],response_data["nextToken"] elif 'error' not in response_data: #遍历完没有文件了 return response_data["dentries"],None else: print(f'Error: {response_data.get("error").get("message")}') return [],None else: print(f'Error: Received status code {response.status_code}') return [],None #等到下载文件的信息,可能是多个url,然后再调用url进行下载 """ { "protocol": "HEADER_SIGNATURE", "headerSignatureInfo": { "headers": { "Authorization": "OSS LTAIjmWpzHta71rc:/2tIMDsXGyehOirAzCHx1cI0C8o=", "x-oss-date": "Tue, 03 Jun 2025 06:47:27 GMT" }, "resourceUrls": [ "https://sh-dualstack.trans.dingtalk.com/yundisk0/iAEIAqRmaWxlA6h5dW5kaXNrMATOIVA04gXNE48GzQ3bB85mqzrtCM0CmQ.file" ], "expirationSeconds": 900, "internalResourceUrls": [ "lippi-space-sh.oss-cn-shanghai-internal.aliyuncs.com/yundisk0/iAEIAqRmaWxlA6h5dW5kaXNrMATOIVA04gXNE48GzQ3bB85mqzrtCM0CmQ.file" ], "region": "SHANGHAI" } } """ def get_download_info(space_id, dentry_id,union_id, access_token, version=None, prefer_intranet=False): # 定义请求的URL url = f'https://api.dingtalk.com/v1.0/storage/spaces/{space_id}/dentries/{dentry_id}/downloadInfos/query?unionId={union_id}' # 构建请求体的数据 payload = { "option": { "version": version, "preferIntranet": prefer_intranet } } # 设置请求头,指定内容类型为JSON headers = { 'x-acs-dingtalk-access-token': access_token, 'Content-Type': 'application/json' } # 发送POST请求 response = requests.post(url, data=json.dumps(payload), headers=headers) # 检查响应状态码是否为200(HTTP OK) if response.status_code == 200: # 解析返回的JSON数据 response_data = response.json() # 检查是否有错误码 if 'error' not in response_data: return response_data else: print(f'Error: {response_data.get("error").get("message")}') return None else: print(f'Error: Received status code {response.status_code},{response.text}') return None #等到下载文件的信息,可能是多个url,然后再调用url进行下载 def download_file(urls,headers,file=""): # 设置请求头,指定内容类型为JSON # headers = { # 'Authorization': header_auth, # "x-oss-date": oss_date # } # 发送POST请求 response_data=bytearray(b'') for url in urls: response = requests.get(url, headers=headers) # 检查响应状态码是否为200(HTTP OK) if response.status_code == 200: # 解析返回的JSON数据 response_data += response.content return response_data else: print(f'Error: Received status code {response.status_code},{response.text}') return None #end download_file #显示前50文件 def ding_list_top_50_files(app_key,app_secret,main_dir=None): access_token = get_dingtalk_access_token(app_key, app_secret) if access_token: print(f'Access Token: {access_token}') else: print('Failed to obtain access token.') admin_users = get_admin_users(access_token) user_details = get_user_details(access_token, admin_users[0]["userid"]) if user_details: print(json.dumps(user_details, indent=2, ensure_ascii=False)) else: print('Failed to retrieve user details.') spaces=[] spaces_info = get_drive_spaces(access_token,user_details["unionid"]) if spaces_info: print(json.dumps(spaces_info)) else: print('Failed to retrieve drive spaces information.') if main_dir: for space in spaces_info: if space["spaceName"] in main_dir.split(","): print("钉盘",space["spaceName"]) spaces.append(space) else: spaces = spaces_info print(spaces) files=[] #遍历目录 for space in spaces: nextToken = None dentries,nextToken = list_all_dentries(space["spaceId"], user_details["unionid"],access_token,nextToken) if dentries: #print(json.dumps(dentries, indent=2, ensure_ascii=False)) for dentrie in dentries: files.append({"path":dentrie["path"], "fname": dentrie["path"].split("/")[-1], "mtime": datetime.strptime(dentrie["modifiedTime"], "%a %b %d %H:%M:%S %Z %Y"), }) else: print('Failed to retrieve dentries.') print("总文件",len(files)) return files #入口函数 def ding_scan_directory(app_key,app_secret,main_dir=None): access_token = get_dingtalk_access_token(app_key, app_secret) if access_token: print(f'Access Token: {access_token}') else: print('Failed to 获得access_token.') admin_users = get_admin_users(access_token) user_details = get_user_details(access_token, admin_users[0]["userid"]) if user_details: print(json.dumps(user_details, indent=2, ensure_ascii=False)) else: print('Failed to 用户详细信息') spaces=[] spaces_info = get_drive_spaces(access_token,user_details["unionid"]) if spaces_info: print(json.dumps(spaces_info)) else: print('Failed to 获得钉盘信息') if main_dir: for space in spaces_info: if space["spaceName"] in main_dir.split(","): print("钉盘",space["spaceName"]) spaces.append(space) else: spaces = spaces_info print(spaces) files=[] #遍历目录 for space in spaces: nextToken = None while 1: dentries,nextToken = list_all_dentries(space["spaceId"], user_details["unionid"],access_token,nextToken) if dentries: #print(json.dumps(dentries, indent=2, ensure_ascii=False)) for dentrie in dentries: files.append({"path":dentrie["path"], "fname": dentrie["path"].split("/")[-1], "id":dentrie["id"], "mtime": datetime.strptime(dentrie["modifiedTime"], "%a %b %d %H:%M:%S %Z %Y"), "unionid": user_details["unionid"], "spaceId": space["spaceId"], "access_token": access_token }) else: print('Failed to retrieve dentries.') if nextToken==None: break print("总文件",len(files)) return files #单次下载文件 def ding_get_file_content_init(app_key,app_secret,spaceId,file_id): access_token = get_dingtalk_access_token(app_key, app_secret) if access_token: print(f'Access Token: {access_token}') else: print('Failed to obtain access token.') admin_users = get_admin_users(access_token) user_details = get_user_details(access_token, admin_users[0]["userid"]) if user_details: print(json.dumps(user_details, indent=2, ensure_ascii=False)) else: print('Failed to retrieve user details.') unionid = user_details["unionid"] return ding_get_file_content(spaceId,file_id,unionid,access_token) def ding_get_file_content(spaceId,file_id,unionid,access_token): download_info = get_download_info(spaceId,file_id,unionid, access_token) if download_info: return download_file(download_info["headerSignatureInfo"]["resourceUrls"],download_info["headerSignatureInfo"]["headers"]) else: print('Failed to retrieve download info.') return None if __name__=="__main__": # 示例调用 app_key = 'app_key' app_secret = 'app_secret' access_token = get_dingtalk_access_token(app_key, app_secret) if access_token: print(f'Access Token: {access_token}') else: print('Failed to obtain access token.') user_count = get_user_count(access_token, only_active=False) print("用户数",user_count) admin_users = get_admin_users(access_token) print("管理员",admin_users) # sub_departments = get_sub_departments(access_token, 1) # print("部门",sub_departments) # user_list_id = get_user_list_id(access_token, 1) # print("用户id",user_list_id) user_details = get_user_details(access_token, admin_users[0]["userid"]) if user_details: print(json.dumps(user_details, indent=2, ensure_ascii=False)) else: print('Failed to retrieve user details.') spaces_info = get_drive_spaces(access_token,user_details["unionid"]) print("钉盘") if spaces_info: print(json.dumps(spaces_info)) else: print('Failed to retrieve drive spaces information.') print("存储",spaces_info[0]["spaceName"]) #遍历目录 count=0 nextToken = None files=[] while 1: dentries,nextToken = list_all_dentries(spaces_info[0]["spaceId"], user_details["unionid"],access_token,nextToken) if dentries: print(json.dumps(dentries, indent=2, ensure_ascii=False)) count += len(dentries) files.extend(dentries) print("文件数量",count) else: print('Failed to retrieve dentries.') if nextToken==None: break for file in files: print(file["path"]) #更具id下载某一个文件 download_info = get_download_info(spaces_info[1]["spaceId"],file["id"], user_details["unionid"], access_token) if download_info: if len(download_info["headerSignatureInfo"]["resourceUrls"]) >1: print(json.dumps(download_info, indent=2, ensure_ascii=False)) #download_file(download_info["headerSignatureInfo"]["resourceUrls"],download_info["headerSignatureInfo"]["headers"]) else: print('Failed to retrieve download info.') print(json.dumps(user_details, indent=2, ensure_ascii=False))