Files
k3GPT/main/dingpan.py
2025-11-19 19:43:16 +08:00

608 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
file: dingpan.py
date: 2024-1112
访问钉钉的钉盘一个接口
"""
import requests
import json
from datetime import datetime
#获取access_token
def get_dingtalk_access_token(app_key, app_secret):
# 定义请求的URL
url = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
# 定义请求体的数据
payload = {
"appKey": app_key,
"appSecret": app_secret
}
# 设置请求头指定内容类型为JSON
headers = {
'Content-Type': 'application/json'
}
# 发送POST请求
response = requests.post(url, data=json.dumps(payload), headers=headers)
# 检查响应状态码是否为200HTTP OK
if response.status_code == 200:
# 解析返回的JSON数据
response_data = response.json()
# 检查是否有access_token字段
if 'accessToken' in response_data:
return response_data['accessToken']
else:
print('Error: Access token not found in the response.')
return None
else:
print(f'Error: Received status code {response.status_code}')
return None
#获取用户信息
def get_user_details(access_token, userid, language='zh_CN'):
# 定义请求的URL
url = f'https://oapi.dingtalk.com/topapi/v2/user/get?access_token={access_token}'
# 构建请求体的数据
payload = {
"language": language,
"userid": userid
}
# 设置请求头指定内容类型为JSON
headers = {
'Content-Type': 'application/json'
}
# 发送POST请求
response = requests.post(url, data=json.dumps(payload), headers=headers)
# 检查响应状态码是否为200HTTP OK
if response.status_code == 200:
# 解析返回的JSON数据
response_data = response.json()
# 检查是否有错误码
if response_data.get('errcode') == 0:
return response_data['result']
else:
print(f'Error: {response_data.get("errmsg")}')
return None
else:
print(f'Error: Received status code {response.status_code}')
return None
#以获取指定用户在组织空间下的文件夹列表
def get_drive_spaces(access_token,union_id, space_type="org", next_token=None, max_results=50):
# 定义请求的URL
url = 'https://api.dingtalk.com/v1.0/drive/spaces'
# 构建查询参数
params = {
'unionId': union_id,
'spaceType': space_type,
'maxResults': max_results
}
if next_token:
params['nextToken'] = next_token
# 设置请求头
headers = {
'x-acs-dingtalk-access-token': access_token,
'Content-Type': 'application/json'
}
# 发送GET请求
response = requests.get(url, params=params, headers=headers)
# 检查响应状态码是否为200HTTP OK
if response.status_code == 200:
# 解析返回的JSON数据
response_data = response.json()
return response_data["spaces"]
else:
print(f'Error: Received status code {response.status_code} {response.text}')
return None
#用户的人数
def get_user_count(access_token, only_active=False):
# 定义请求的URL
url = f'https://oapi.dingtalk.com/topapi/user/count?access_token={access_token}'
# 构建请求体的数据
payload = {
"only_active": str(only_active).lower()
}
# 设置请求头指定内容类型为JSON
headers = {
'Content-Type': 'application/json'
}
# 发送POST请求
response = requests.post(url, data=json.dumps(payload), headers=headers)
# 检查响应状态码是否为200HTTP OK
if response.status_code == 200:
# 解析返回的JSON数据
response_data = response.json()
# 检查是否有错误码
if response_data.get('errcode') == 0:
return response_data['result']['count']
else:
print(f'Error: {response_data.get("errmsg")}')
return None
else:
print(f'Error: Received status code {response.status_code}')
return None
#获取用户的id列表
def get_user_list_id(access_token, dept_id):
# 定义请求的URL
url = f'https://oapi.dingtalk.com/topapi/user/listid?access_token={access_token}'
# 构建请求体的数据
payload = {
"dept_id": dept_id
}
# 设置请求头指定内容类型为JSON
headers = {
'Content-Type': 'application/json'
}
# 发送POST请求
response = requests.post(url, data=json.dumps(payload), headers=headers)
# 检查响应状态码是否为200HTTP OK
if response.status_code == 200:
# 解析返回的JSON数据
response_data = response.json()
# 检查是否有错误码
if response_data.get('errcode') == 0:
return response_data['result']["userid_list"]
else:
print(f'Error: {response_data.get("errmsg")}')
return None
else:
print(f'Error: Received status code {response.status_code}')
return None
#得到子部门的列表
def get_sub_departments(access_token, dept_id, language='zh_CN'):
# 定义请求的URL
url = f'https://oapi.dingtalk.com/topapi/v2/department/listsub?access_token={access_token}'
# 构建请求体的数据
payload = {
"language": language,
"dept_id": dept_id
}
# 设置请求头指定内容类型为JSON
headers = {
'Content-Type': 'application/json'
}
# 发送POST请求
response = requests.post(url, data=json.dumps(payload), headers=headers)
# 检查响应状态码是否为200HTTP OK
if response.status_code == 200:
# 解析返回的JSON数据
response_data = response.json()
# 检查是否有错误码
if response_data.get('errcode') == 0:
return response_data['result']
else:
print(f'Error: {response_data.get("errmsg")}')
return None
else:
print(f'Error: Received status code {response.status_code}')
return None
#得到组织内的管理员
def get_admin_users(access_token):
# 定义请求的URL
url = f'https://oapi.dingtalk.com/topapi/user/listadmin?access_token={access_token}'
# 构建请求体的数据
payload = {}
# 设置请求头指定内容类型为JSON
headers = {
'Content-Type': 'application/json'
}
# 发送POST请求
response = requests.post(url, data=json.dumps(payload), headers=headers)
# 检查响应状态码是否为200HTTP OK
if response.status_code == 200:
# 解析返回的JSON数据
response_data = response.json()
# 检查是否有错误码
if response_data.get('errcode') == 0:
return response_data['result']
else:
print(f'Error: {response_data.get("errmsg")}')
return None
else:
print(f'Error: Received status code {response.status_code}')
return None
#所有文件和文件夹
"""
返回的文件信息
[
{
"modifiedTime": "Fri Jun 21 15:43:42 CST 2024",
"creatorId": "QksETWLqFiPBiSZ8oE1IcsqwiEiE",
"modifierId": "QksETWLqFiPBiSZ8oE1IcsqwiEiE",
"type": "FOLDER",
"version": 1,
"uuid": "7QG4Yx2JpL7eXAbrCz6m3Y4xJ9dEq3XD",
"partitionType": "PUBLIC_OSS_PARTITION",
"parentId": "143956017570",
"spaceId": "24447524446",
"path": "/分类分级/05用户手册",
"createTime": "Fri Jun 21 15:43:42 CST 2024",
"storageDriver": "DINGTALK",
"name": "05用户手册",
"id": "143956920642",
"properties": {
"readOnly": false
},
"status": "NORMAL",
"appProperties": {}
}
]
"""
def list_all_dentries(space_id, union_id, access_token, next_token=None, max_results=1000, order="DESC", with_thumbnail=False):
# 定义请求的URL
url = f'https://api.dingtalk.com/v1.0/storage/spaces/{space_id}/dentries/listAll?unionId={union_id}'
# 构建请求体的数据
payload = {
"option": {
"nextToken": next_token,
"maxResults": max_results,
"order": order,
"withThumbnail": with_thumbnail
}
}
# 设置请求头指定内容类型为JSON
headers = {
'x-acs-dingtalk-access-token': access_token,
'Content-Type': 'application/json'
}
# 发送POST请求
response = requests.post(url, data=json.dumps(payload), headers=headers)
# 检查响应状态码是否为200HTTP OK
if response.status_code == 200:
# 解析返回的JSON数据
response_data = response.json()
# 检查是否有错误码
if 'error' not in response_data and "nextToken" in response_data:
return response_data["dentries"],response_data["nextToken"]
elif 'error' not in response_data:
#遍历完没有文件了
return response_data["dentries"],None
else:
print(f'Error: {response_data.get("error").get("message")}')
return [],None
else:
print(f'Error: Received status code {response.status_code}')
return [],None
#等到下载文件的信息可能是多个url,然后再调用url进行下载
"""
{
"protocol": "HEADER_SIGNATURE",
"headerSignatureInfo": {
"headers": {
"Authorization": "OSS LTAIjmWpzHta71rc:/2tIMDsXGyehOirAzCHx1cI0C8o=",
"x-oss-date": "Tue, 03 Jun 2025 06:47:27 GMT"
},
"resourceUrls": [
"https://sh-dualstack.trans.dingtalk.com/yundisk0/iAEIAqRmaWxlA6h5dW5kaXNrMATOIVA04gXNE48GzQ3bB85mqzrtCM0CmQ.file"
],
"expirationSeconds": 900,
"internalResourceUrls": [
"lippi-space-sh.oss-cn-shanghai-internal.aliyuncs.com/yundisk0/iAEIAqRmaWxlA6h5dW5kaXNrMATOIVA04gXNE48GzQ3bB85mqzrtCM0CmQ.file"
],
"region": "SHANGHAI"
}
}
"""
def get_download_info(space_id, dentry_id,union_id, access_token, version=None, prefer_intranet=False):
# 定义请求的URL
url = f'https://api.dingtalk.com/v1.0/storage/spaces/{space_id}/dentries/{dentry_id}/downloadInfos/query?unionId={union_id}'
# 构建请求体的数据
payload = {
"option": {
"version": version,
"preferIntranet": prefer_intranet
}
}
# 设置请求头指定内容类型为JSON
headers = {
'x-acs-dingtalk-access-token': access_token,
'Content-Type': 'application/json'
}
# 发送POST请求
response = requests.post(url, data=json.dumps(payload), headers=headers)
# 检查响应状态码是否为200HTTP OK
if response.status_code == 200:
# 解析返回的JSON数据
response_data = response.json()
# 检查是否有错误码
if 'error' not in response_data:
return response_data
else:
print(f'Error: {response_data.get("error").get("message")}')
return None
else:
print(f'Error: Received status code {response.status_code},{response.text}')
return None
#等到下载文件的信息可能是多个url,然后再调用url进行下载
def download_file(urls,headers,file=""):
# 设置请求头指定内容类型为JSON
# headers = {
# 'Authorization': header_auth,
# "x-oss-date": oss_date
# }
# 发送POST请求
response_data=bytearray(b'')
for url in urls:
response = requests.get(url, headers=headers)
# 检查响应状态码是否为200HTTP OK
if response.status_code == 200:
# 解析返回的JSON数据
response_data += response.content
return response_data
else:
print(f'Error: Received status code {response.status_code},{response.text}')
return None
#end download_file
#显示前50文件
def ding_list_top_50_files(app_key,app_secret,main_dir=None):
access_token = get_dingtalk_access_token(app_key, app_secret)
if access_token:
print(f'Access Token: {access_token}')
else:
print('Failed to obtain access token.')
admin_users = get_admin_users(access_token)
user_details = get_user_details(access_token, admin_users[0]["userid"])
if user_details:
print(json.dumps(user_details, indent=2, ensure_ascii=False))
else:
print('Failed to retrieve user details.')
spaces=[]
spaces_info = get_drive_spaces(access_token,user_details["unionid"])
if spaces_info:
print(json.dumps(spaces_info))
else:
print('Failed to retrieve drive spaces information.')
if main_dir:
for space in spaces_info:
if space["spaceName"] in main_dir.split(","):
print("钉盘",space["spaceName"])
spaces.append(space)
else:
spaces = spaces_info
print(spaces)
files=[]
#遍历目录
for space in spaces:
nextToken = None
dentries,nextToken = list_all_dentries(space["spaceId"], user_details["unionid"],access_token,nextToken)
if dentries:
#print(json.dumps(dentries, indent=2, ensure_ascii=False))
for dentrie in dentries:
files.append({"path":dentrie["path"],
"fname": dentrie["path"].split("/")[-1],
"mtime": datetime.strptime(dentrie["modifiedTime"], "%a %b %d %H:%M:%S %Z %Y"),
})
else:
print('Failed to retrieve dentries.')
print("总文件",len(files))
return files
#入口函数
def ding_scan_directory(app_key,app_secret,main_dir=None):
access_token = get_dingtalk_access_token(app_key, app_secret)
if access_token:
print(f'Access Token: {access_token}')
else:
print('Failed to 获得access_token.')
admin_users = get_admin_users(access_token)
user_details = get_user_details(access_token, admin_users[0]["userid"])
if user_details:
print(json.dumps(user_details, indent=2, ensure_ascii=False))
else:
print('Failed to 用户详细信息')
spaces=[]
spaces_info = get_drive_spaces(access_token,user_details["unionid"])
if spaces_info:
print(json.dumps(spaces_info))
else:
print('Failed to 获得钉盘信息')
if main_dir:
for space in spaces_info:
if space["spaceName"] in main_dir.split(","):
print("钉盘",space["spaceName"])
spaces.append(space)
else:
spaces = spaces_info
print(spaces)
files=[]
#遍历目录
for space in spaces:
nextToken = None
while 1:
dentries,nextToken = list_all_dentries(space["spaceId"], user_details["unionid"],access_token,nextToken)
if dentries:
#print(json.dumps(dentries, indent=2, ensure_ascii=False))
for dentrie in dentries:
files.append({"path":dentrie["path"],
"fname": dentrie["path"].split("/")[-1],
"id":dentrie["id"],
"mtime": datetime.strptime(dentrie["modifiedTime"], "%a %b %d %H:%M:%S %Z %Y"),
"unionid": user_details["unionid"],
"spaceId": space["spaceId"],
"access_token": access_token
})
else:
print('Failed to retrieve dentries.')
if nextToken==None:
break
print("总文件",len(files))
return files
#单次下载文件
def ding_get_file_content_init(app_key,app_secret,spaceId,file_id):
access_token = get_dingtalk_access_token(app_key, app_secret)
if access_token:
print(f'Access Token: {access_token}')
else:
print('Failed to obtain access token.')
admin_users = get_admin_users(access_token)
user_details = get_user_details(access_token, admin_users[0]["userid"])
if user_details:
print(json.dumps(user_details, indent=2, ensure_ascii=False))
else:
print('Failed to retrieve user details.')
unionid = user_details["unionid"]
return ding_get_file_content(spaceId,file_id,unionid,access_token)
def ding_get_file_content(spaceId,file_id,unionid,access_token):
download_info = get_download_info(spaceId,file_id,unionid, access_token)
if download_info:
return download_file(download_info["headerSignatureInfo"]["resourceUrls"],download_info["headerSignatureInfo"]["headers"])
else:
print('Failed to retrieve download info.')
return None
if __name__=="__main__":
# 示例调用
app_key = 'app_key'
app_secret = 'app_secret'
access_token = get_dingtalk_access_token(app_key, app_secret)
if access_token:
print(f'Access Token: {access_token}')
else:
print('Failed to obtain access token.')
user_count = get_user_count(access_token, only_active=False)
print("用户数",user_count)
admin_users = get_admin_users(access_token)
print("管理员",admin_users)
# sub_departments = get_sub_departments(access_token, 1)
# print("部门",sub_departments)
# user_list_id = get_user_list_id(access_token, 1)
# print("用户id",user_list_id)
user_details = get_user_details(access_token, admin_users[0]["userid"])
if user_details:
print(json.dumps(user_details, indent=2, ensure_ascii=False))
else:
print('Failed to retrieve user details.')
spaces_info = get_drive_spaces(access_token,user_details["unionid"])
print("钉盘")
if spaces_info:
print(json.dumps(spaces_info))
else:
print('Failed to retrieve drive spaces information.')
print("存储",spaces_info[0]["spaceName"])
#遍历目录
count=0
nextToken = None
files=[]
while 1:
dentries,nextToken = list_all_dentries(spaces_info[0]["spaceId"], user_details["unionid"],access_token,nextToken)
if dentries:
print(json.dumps(dentries, indent=2, ensure_ascii=False))
count += len(dentries)
files.extend(dentries)
print("文件数量",count)
else:
print('Failed to retrieve dentries.')
if nextToken==None:
break
for file in files:
print(file["path"])
#更具id下载某一个文件
download_info = get_download_info(spaces_info[1]["spaceId"],file["id"], user_details["unionid"], access_token)
if download_info:
if len(download_info["headerSignatureInfo"]["resourceUrls"]) >1:
print(json.dumps(download_info, indent=2, ensure_ascii=False))
#download_file(download_info["headerSignatureInfo"]["resourceUrls"],download_info["headerSignatureInfo"]["headers"])
else:
print('Failed to retrieve download info.')
print(json.dumps(user_details, indent=2, ensure_ascii=False))