Files
k3GPT/main/dingpan.py

608 lines
19 KiB
Python
Raw Permalink Normal View History

2025-11-19 19:43:16 +08:00
"""
file: dingpan.py
date: 2024-1112
访问钉钉的钉盘一个接口
"""
import requests
import json
from datetime import datetime
#获取access_token
def get_dingtalk_access_token(app_key, app_secret):
# 定义请求的URL
url = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
# 定义请求体的数据
payload = {
"appKey": app_key,
"appSecret": app_secret
}
# 设置请求头指定内容类型为JSON
headers = {
'Content-Type': 'application/json'
}
# 发送POST请求
response = requests.post(url, data=json.dumps(payload), headers=headers)
# 检查响应状态码是否为200HTTP OK
if response.status_code == 200:
# 解析返回的JSON数据
response_data = response.json()
# 检查是否有access_token字段
if 'accessToken' in response_data:
return response_data['accessToken']
else:
print('Error: Access token not found in the response.')
return None
else:
print(f'Error: Received status code {response.status_code}')
return None
#获取用户信息
def get_user_details(access_token, userid, language='zh_CN'):
# 定义请求的URL
url = f'https://oapi.dingtalk.com/topapi/v2/user/get?access_token={access_token}'
# 构建请求体的数据
payload = {
"language": language,
"userid": userid
}
# 设置请求头指定内容类型为JSON
headers = {
'Content-Type': 'application/json'
}
# 发送POST请求
response = requests.post(url, data=json.dumps(payload), headers=headers)
# 检查响应状态码是否为200HTTP OK
if response.status_code == 200:
# 解析返回的JSON数据
response_data = response.json()
# 检查是否有错误码
if response_data.get('errcode') == 0:
return response_data['result']
else:
print(f'Error: {response_data.get("errmsg")}')
return None
else:
print(f'Error: Received status code {response.status_code}')
return None
#以获取指定用户在组织空间下的文件夹列表
def get_drive_spaces(access_token,union_id, space_type="org", next_token=None, max_results=50):
# 定义请求的URL
url = 'https://api.dingtalk.com/v1.0/drive/spaces'
# 构建查询参数
params = {
'unionId': union_id,
'spaceType': space_type,
'maxResults': max_results
}
if next_token:
params['nextToken'] = next_token
# 设置请求头
headers = {
'x-acs-dingtalk-access-token': access_token,
'Content-Type': 'application/json'
}
# 发送GET请求
response = requests.get(url, params=params, headers=headers)
# 检查响应状态码是否为200HTTP OK
if response.status_code == 200:
# 解析返回的JSON数据
response_data = response.json()
return response_data["spaces"]
else:
print(f'Error: Received status code {response.status_code} {response.text}')
return None
#用户的人数
def get_user_count(access_token, only_active=False):
# 定义请求的URL
url = f'https://oapi.dingtalk.com/topapi/user/count?access_token={access_token}'
# 构建请求体的数据
payload = {
"only_active": str(only_active).lower()
}
# 设置请求头指定内容类型为JSON
headers = {
'Content-Type': 'application/json'
}
# 发送POST请求
response = requests.post(url, data=json.dumps(payload), headers=headers)
# 检查响应状态码是否为200HTTP OK
if response.status_code == 200:
# 解析返回的JSON数据
response_data = response.json()
# 检查是否有错误码
if response_data.get('errcode') == 0:
return response_data['result']['count']
else:
print(f'Error: {response_data.get("errmsg")}')
return None
else:
print(f'Error: Received status code {response.status_code}')
return None
#获取用户的id列表
def get_user_list_id(access_token, dept_id):
# 定义请求的URL
url = f'https://oapi.dingtalk.com/topapi/user/listid?access_token={access_token}'
# 构建请求体的数据
payload = {
"dept_id": dept_id
}
# 设置请求头指定内容类型为JSON
headers = {
'Content-Type': 'application/json'
}
# 发送POST请求
response = requests.post(url, data=json.dumps(payload), headers=headers)
# 检查响应状态码是否为200HTTP OK
if response.status_code == 200:
# 解析返回的JSON数据
response_data = response.json()
# 检查是否有错误码
if response_data.get('errcode') == 0:
return response_data['result']["userid_list"]
else:
print(f'Error: {response_data.get("errmsg")}')
return None
else:
print(f'Error: Received status code {response.status_code}')
return None
#得到子部门的列表
def get_sub_departments(access_token, dept_id, language='zh_CN'):
# 定义请求的URL
url = f'https://oapi.dingtalk.com/topapi/v2/department/listsub?access_token={access_token}'
# 构建请求体的数据
payload = {
"language": language,
"dept_id": dept_id
}
# 设置请求头指定内容类型为JSON
headers = {
'Content-Type': 'application/json'
}
# 发送POST请求
response = requests.post(url, data=json.dumps(payload), headers=headers)
# 检查响应状态码是否为200HTTP OK
if response.status_code == 200:
# 解析返回的JSON数据
response_data = response.json()
# 检查是否有错误码
if response_data.get('errcode') == 0:
return response_data['result']
else:
print(f'Error: {response_data.get("errmsg")}')
return None
else:
print(f'Error: Received status code {response.status_code}')
return None
#得到组织内的管理员
def get_admin_users(access_token):
# 定义请求的URL
url = f'https://oapi.dingtalk.com/topapi/user/listadmin?access_token={access_token}'
# 构建请求体的数据
payload = {}
# 设置请求头指定内容类型为JSON
headers = {
'Content-Type': 'application/json'
}
# 发送POST请求
response = requests.post(url, data=json.dumps(payload), headers=headers)
# 检查响应状态码是否为200HTTP OK
if response.status_code == 200:
# 解析返回的JSON数据
response_data = response.json()
# 检查是否有错误码
if response_data.get('errcode') == 0:
return response_data['result']
else:
print(f'Error: {response_data.get("errmsg")}')
return None
else:
print(f'Error: Received status code {response.status_code}')
return None
#所有文件和文件夹
"""
返回的文件信息
[
{
"modifiedTime": "Fri Jun 21 15:43:42 CST 2024",
"creatorId": "QksETWLqFiPBiSZ8oE1IcsqwiEiE",
"modifierId": "QksETWLqFiPBiSZ8oE1IcsqwiEiE",
"type": "FOLDER",
"version": 1,
"uuid": "7QG4Yx2JpL7eXAbrCz6m3Y4xJ9dEq3XD",
"partitionType": "PUBLIC_OSS_PARTITION",
"parentId": "143956017570",
"spaceId": "24447524446",
"path": "/分类分级/05用户手册",
"createTime": "Fri Jun 21 15:43:42 CST 2024",
"storageDriver": "DINGTALK",
"name": "05用户手册",
"id": "143956920642",
"properties": {
"readOnly": false
},
"status": "NORMAL",
"appProperties": {}
}
]
"""
def list_all_dentries(space_id, union_id, access_token, next_token=None, max_results=1000, order="DESC", with_thumbnail=False):
# 定义请求的URL
url = f'https://api.dingtalk.com/v1.0/storage/spaces/{space_id}/dentries/listAll?unionId={union_id}'
# 构建请求体的数据
payload = {
"option": {
"nextToken": next_token,
"maxResults": max_results,
"order": order,
"withThumbnail": with_thumbnail
}
}
# 设置请求头指定内容类型为JSON
headers = {
'x-acs-dingtalk-access-token': access_token,
'Content-Type': 'application/json'
}
# 发送POST请求
response = requests.post(url, data=json.dumps(payload), headers=headers)
# 检查响应状态码是否为200HTTP OK
if response.status_code == 200:
# 解析返回的JSON数据
response_data = response.json()
# 检查是否有错误码
if 'error' not in response_data and "nextToken" in response_data:
return response_data["dentries"],response_data["nextToken"]
elif 'error' not in response_data:
#遍历完没有文件了
return response_data["dentries"],None
else:
print(f'Error: {response_data.get("error").get("message")}')
return [],None
else:
print(f'Error: Received status code {response.status_code}')
return [],None
#等到下载文件的信息可能是多个url,然后再调用url进行下载
"""
{
"protocol": "HEADER_SIGNATURE",
"headerSignatureInfo": {
"headers": {
"Authorization": "OSS LTAIjmWpzHta71rc:/2tIMDsXGyehOirAzCHx1cI0C8o=",
"x-oss-date": "Tue, 03 Jun 2025 06:47:27 GMT"
},
"resourceUrls": [
"https://sh-dualstack.trans.dingtalk.com/yundisk0/iAEIAqRmaWxlA6h5dW5kaXNrMATOIVA04gXNE48GzQ3bB85mqzrtCM0CmQ.file"
],
"expirationSeconds": 900,
"internalResourceUrls": [
"lippi-space-sh.oss-cn-shanghai-internal.aliyuncs.com/yundisk0/iAEIAqRmaWxlA6h5dW5kaXNrMATOIVA04gXNE48GzQ3bB85mqzrtCM0CmQ.file"
],
"region": "SHANGHAI"
}
}
"""
def get_download_info(space_id, dentry_id,union_id, access_token, version=None, prefer_intranet=False):
# 定义请求的URL
url = f'https://api.dingtalk.com/v1.0/storage/spaces/{space_id}/dentries/{dentry_id}/downloadInfos/query?unionId={union_id}'
# 构建请求体的数据
payload = {
"option": {
"version": version,
"preferIntranet": prefer_intranet
}
}
# 设置请求头指定内容类型为JSON
headers = {
'x-acs-dingtalk-access-token': access_token,
'Content-Type': 'application/json'
}
# 发送POST请求
response = requests.post(url, data=json.dumps(payload), headers=headers)
# 检查响应状态码是否为200HTTP OK
if response.status_code == 200:
# 解析返回的JSON数据
response_data = response.json()
# 检查是否有错误码
if 'error' not in response_data:
return response_data
else:
print(f'Error: {response_data.get("error").get("message")}')
return None
else:
print(f'Error: Received status code {response.status_code},{response.text}')
return None
#等到下载文件的信息可能是多个url,然后再调用url进行下载
def download_file(urls,headers,file=""):
# 设置请求头指定内容类型为JSON
# headers = {
# 'Authorization': header_auth,
# "x-oss-date": oss_date
# }
# 发送POST请求
response_data=bytearray(b'')
for url in urls:
response = requests.get(url, headers=headers)
# 检查响应状态码是否为200HTTP OK
if response.status_code == 200:
# 解析返回的JSON数据
response_data += response.content
return response_data
else:
print(f'Error: Received status code {response.status_code},{response.text}')
return None
#end download_file
#显示前50文件
def ding_list_top_50_files(app_key,app_secret,main_dir=None):
access_token = get_dingtalk_access_token(app_key, app_secret)
if access_token:
print(f'Access Token: {access_token}')
else:
print('Failed to obtain access token.')
admin_users = get_admin_users(access_token)
user_details = get_user_details(access_token, admin_users[0]["userid"])
if user_details:
print(json.dumps(user_details, indent=2, ensure_ascii=False))
else:
print('Failed to retrieve user details.')
spaces=[]
spaces_info = get_drive_spaces(access_token,user_details["unionid"])
if spaces_info:
print(json.dumps(spaces_info))
else:
print('Failed to retrieve drive spaces information.')
if main_dir:
for space in spaces_info:
if space["spaceName"] in main_dir.split(","):
print("钉盘",space["spaceName"])
spaces.append(space)
else:
spaces = spaces_info
print(spaces)
files=[]
#遍历目录
for space in spaces:
nextToken = None
dentries,nextToken = list_all_dentries(space["spaceId"], user_details["unionid"],access_token,nextToken)
if dentries:
#print(json.dumps(dentries, indent=2, ensure_ascii=False))
for dentrie in dentries:
files.append({"path":dentrie["path"],
"fname": dentrie["path"].split("/")[-1],
"mtime": datetime.strptime(dentrie["modifiedTime"], "%a %b %d %H:%M:%S %Z %Y"),
})
else:
print('Failed to retrieve dentries.')
print("总文件",len(files))
return files
#入口函数
def ding_scan_directory(app_key,app_secret,main_dir=None):
access_token = get_dingtalk_access_token(app_key, app_secret)
if access_token:
print(f'Access Token: {access_token}')
else:
print('Failed to 获得access_token.')
admin_users = get_admin_users(access_token)
user_details = get_user_details(access_token, admin_users[0]["userid"])
if user_details:
print(json.dumps(user_details, indent=2, ensure_ascii=False))
else:
print('Failed to 用户详细信息')
spaces=[]
spaces_info = get_drive_spaces(access_token,user_details["unionid"])
if spaces_info:
print(json.dumps(spaces_info))
else:
print('Failed to 获得钉盘信息')
if main_dir:
for space in spaces_info:
if space["spaceName"] in main_dir.split(","):
print("钉盘",space["spaceName"])
spaces.append(space)
else:
spaces = spaces_info
print(spaces)
files=[]
#遍历目录
for space in spaces:
nextToken = None
while 1:
dentries,nextToken = list_all_dentries(space["spaceId"], user_details["unionid"],access_token,nextToken)
if dentries:
#print(json.dumps(dentries, indent=2, ensure_ascii=False))
for dentrie in dentries:
files.append({"path":dentrie["path"],
"fname": dentrie["path"].split("/")[-1],
"id":dentrie["id"],
"mtime": datetime.strptime(dentrie["modifiedTime"], "%a %b %d %H:%M:%S %Z %Y"),
"unionid": user_details["unionid"],
"spaceId": space["spaceId"],
"access_token": access_token
})
else:
print('Failed to retrieve dentries.')
if nextToken==None:
break
print("总文件",len(files))
return files
#单次下载文件
def ding_get_file_content_init(app_key,app_secret,spaceId,file_id):
access_token = get_dingtalk_access_token(app_key, app_secret)
if access_token:
print(f'Access Token: {access_token}')
else:
print('Failed to obtain access token.')
admin_users = get_admin_users(access_token)
user_details = get_user_details(access_token, admin_users[0]["userid"])
if user_details:
print(json.dumps(user_details, indent=2, ensure_ascii=False))
else:
print('Failed to retrieve user details.')
unionid = user_details["unionid"]
return ding_get_file_content(spaceId,file_id,unionid,access_token)
def ding_get_file_content(spaceId,file_id,unionid,access_token):
download_info = get_download_info(spaceId,file_id,unionid, access_token)
if download_info:
return download_file(download_info["headerSignatureInfo"]["resourceUrls"],download_info["headerSignatureInfo"]["headers"])
else:
print('Failed to retrieve download info.')
return None
if __name__=="__main__":
# 示例调用
app_key = 'app_key'
app_secret = 'app_secret'
access_token = get_dingtalk_access_token(app_key, app_secret)
if access_token:
print(f'Access Token: {access_token}')
else:
print('Failed to obtain access token.')
user_count = get_user_count(access_token, only_active=False)
print("用户数",user_count)
admin_users = get_admin_users(access_token)
print("管理员",admin_users)
# sub_departments = get_sub_departments(access_token, 1)
# print("部门",sub_departments)
# user_list_id = get_user_list_id(access_token, 1)
# print("用户id",user_list_id)
user_details = get_user_details(access_token, admin_users[0]["userid"])
if user_details:
print(json.dumps(user_details, indent=2, ensure_ascii=False))
else:
print('Failed to retrieve user details.')
spaces_info = get_drive_spaces(access_token,user_details["unionid"])
print("钉盘")
if spaces_info:
print(json.dumps(spaces_info))
else:
print('Failed to retrieve drive spaces information.')
print("存储",spaces_info[0]["spaceName"])
#遍历目录
count=0
nextToken = None
files=[]
while 1:
dentries,nextToken = list_all_dentries(spaces_info[0]["spaceId"], user_details["unionid"],access_token,nextToken)
if dentries:
print(json.dumps(dentries, indent=2, ensure_ascii=False))
count += len(dentries)
files.extend(dentries)
print("文件数量",count)
else:
print('Failed to retrieve dentries.')
if nextToken==None:
break
for file in files:
print(file["path"])
#更具id下载某一个文件
download_info = get_download_info(spaces_info[1]["spaceId"],file["id"], user_details["unionid"], access_token)
if download_info:
if len(download_info["headerSignatureInfo"]["resourceUrls"]) >1:
print(json.dumps(download_info, indent=2, ensure_ascii=False))
#download_file(download_info["headerSignatureInfo"]["resourceUrls"],download_info["headerSignatureInfo"]["headers"])
else:
print('Failed to retrieve download info.')
print(json.dumps(user_details, indent=2, ensure_ascii=False))