Add File
This commit is contained in:
608
main/dingpan.py
Normal file
608
main/dingpan.py
Normal file
@@ -0,0 +1,608 @@
|
||||
"""
|
||||
file: dingpan.py
|
||||
date: 2024-1112
|
||||
|
||||
访问钉钉的钉盘一个接口
|
||||
|
||||
"""
|
||||
|
||||
import requests
|
||||
import json
|
||||
from datetime import datetime
|
||||
|
||||
#获取access_token
|
||||
def get_dingtalk_access_token(app_key, app_secret):
|
||||
# 定义请求的URL
|
||||
url = 'https://api.dingtalk.com/v1.0/oauth2/accessToken'
|
||||
|
||||
# 定义请求体的数据
|
||||
payload = {
|
||||
"appKey": app_key,
|
||||
"appSecret": app_secret
|
||||
}
|
||||
|
||||
# 设置请求头,指定内容类型为JSON
|
||||
headers = {
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
# 发送POST请求
|
||||
response = requests.post(url, data=json.dumps(payload), headers=headers)
|
||||
|
||||
# 检查响应状态码是否为200(HTTP OK)
|
||||
if response.status_code == 200:
|
||||
# 解析返回的JSON数据
|
||||
response_data = response.json()
|
||||
# 检查是否有access_token字段
|
||||
if 'accessToken' in response_data:
|
||||
return response_data['accessToken']
|
||||
else:
|
||||
print('Error: Access token not found in the response.')
|
||||
return None
|
||||
else:
|
||||
print(f'Error: Received status code {response.status_code}')
|
||||
return None
|
||||
|
||||
|
||||
#获取用户信息
|
||||
def get_user_details(access_token, userid, language='zh_CN'):
|
||||
# 定义请求的URL
|
||||
url = f'https://oapi.dingtalk.com/topapi/v2/user/get?access_token={access_token}'
|
||||
|
||||
# 构建请求体的数据
|
||||
payload = {
|
||||
"language": language,
|
||||
"userid": userid
|
||||
}
|
||||
|
||||
# 设置请求头,指定内容类型为JSON
|
||||
headers = {
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
# 发送POST请求
|
||||
response = requests.post(url, data=json.dumps(payload), headers=headers)
|
||||
|
||||
# 检查响应状态码是否为200(HTTP OK)
|
||||
if response.status_code == 200:
|
||||
# 解析返回的JSON数据
|
||||
response_data = response.json()
|
||||
# 检查是否有错误码
|
||||
if response_data.get('errcode') == 0:
|
||||
return response_data['result']
|
||||
else:
|
||||
print(f'Error: {response_data.get("errmsg")}')
|
||||
return None
|
||||
else:
|
||||
print(f'Error: Received status code {response.status_code}')
|
||||
return None
|
||||
|
||||
#以获取指定用户在组织空间下的文件夹列表
|
||||
def get_drive_spaces(access_token,union_id, space_type="org", next_token=None, max_results=50):
|
||||
# 定义请求的URL
|
||||
url = 'https://api.dingtalk.com/v1.0/drive/spaces'
|
||||
|
||||
# 构建查询参数
|
||||
params = {
|
||||
'unionId': union_id,
|
||||
'spaceType': space_type,
|
||||
'maxResults': max_results
|
||||
}
|
||||
if next_token:
|
||||
params['nextToken'] = next_token
|
||||
|
||||
# 设置请求头
|
||||
headers = {
|
||||
'x-acs-dingtalk-access-token': access_token,
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
# 发送GET请求
|
||||
response = requests.get(url, params=params, headers=headers)
|
||||
|
||||
# 检查响应状态码是否为200(HTTP OK)
|
||||
if response.status_code == 200:
|
||||
# 解析返回的JSON数据
|
||||
response_data = response.json()
|
||||
return response_data["spaces"]
|
||||
else:
|
||||
print(f'Error: Received status code {response.status_code} {response.text}')
|
||||
return None
|
||||
|
||||
#用户的人数
|
||||
def get_user_count(access_token, only_active=False):
|
||||
# 定义请求的URL
|
||||
url = f'https://oapi.dingtalk.com/topapi/user/count?access_token={access_token}'
|
||||
|
||||
# 构建请求体的数据
|
||||
payload = {
|
||||
"only_active": str(only_active).lower()
|
||||
}
|
||||
|
||||
# 设置请求头,指定内容类型为JSON
|
||||
headers = {
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
# 发送POST请求
|
||||
response = requests.post(url, data=json.dumps(payload), headers=headers)
|
||||
|
||||
# 检查响应状态码是否为200(HTTP OK)
|
||||
if response.status_code == 200:
|
||||
# 解析返回的JSON数据
|
||||
response_data = response.json()
|
||||
# 检查是否有错误码
|
||||
if response_data.get('errcode') == 0:
|
||||
return response_data['result']['count']
|
||||
else:
|
||||
print(f'Error: {response_data.get("errmsg")}')
|
||||
return None
|
||||
else:
|
||||
print(f'Error: Received status code {response.status_code}')
|
||||
return None
|
||||
|
||||
#获取用户的id列表,
|
||||
def get_user_list_id(access_token, dept_id):
|
||||
# 定义请求的URL
|
||||
url = f'https://oapi.dingtalk.com/topapi/user/listid?access_token={access_token}'
|
||||
|
||||
# 构建请求体的数据
|
||||
payload = {
|
||||
"dept_id": dept_id
|
||||
}
|
||||
|
||||
# 设置请求头,指定内容类型为JSON
|
||||
headers = {
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
# 发送POST请求
|
||||
response = requests.post(url, data=json.dumps(payload), headers=headers)
|
||||
|
||||
# 检查响应状态码是否为200(HTTP OK)
|
||||
if response.status_code == 200:
|
||||
# 解析返回的JSON数据
|
||||
response_data = response.json()
|
||||
# 检查是否有错误码
|
||||
if response_data.get('errcode') == 0:
|
||||
return response_data['result']["userid_list"]
|
||||
else:
|
||||
print(f'Error: {response_data.get("errmsg")}')
|
||||
return None
|
||||
else:
|
||||
print(f'Error: Received status code {response.status_code}')
|
||||
return None
|
||||
|
||||
#得到子部门的列表
|
||||
def get_sub_departments(access_token, dept_id, language='zh_CN'):
|
||||
# 定义请求的URL
|
||||
url = f'https://oapi.dingtalk.com/topapi/v2/department/listsub?access_token={access_token}'
|
||||
|
||||
# 构建请求体的数据
|
||||
payload = {
|
||||
"language": language,
|
||||
"dept_id": dept_id
|
||||
}
|
||||
|
||||
# 设置请求头,指定内容类型为JSON
|
||||
headers = {
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
# 发送POST请求
|
||||
response = requests.post(url, data=json.dumps(payload), headers=headers)
|
||||
|
||||
# 检查响应状态码是否为200(HTTP OK)
|
||||
if response.status_code == 200:
|
||||
# 解析返回的JSON数据
|
||||
response_data = response.json()
|
||||
# 检查是否有错误码
|
||||
if response_data.get('errcode') == 0:
|
||||
return response_data['result']
|
||||
else:
|
||||
print(f'Error: {response_data.get("errmsg")}')
|
||||
return None
|
||||
else:
|
||||
print(f'Error: Received status code {response.status_code}')
|
||||
return None
|
||||
|
||||
|
||||
#得到组织内的管理员
|
||||
def get_admin_users(access_token):
|
||||
# 定义请求的URL
|
||||
url = f'https://oapi.dingtalk.com/topapi/user/listadmin?access_token={access_token}'
|
||||
|
||||
# 构建请求体的数据
|
||||
payload = {}
|
||||
|
||||
# 设置请求头,指定内容类型为JSON
|
||||
headers = {
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
# 发送POST请求
|
||||
response = requests.post(url, data=json.dumps(payload), headers=headers)
|
||||
|
||||
# 检查响应状态码是否为200(HTTP OK)
|
||||
if response.status_code == 200:
|
||||
# 解析返回的JSON数据
|
||||
response_data = response.json()
|
||||
# 检查是否有错误码
|
||||
if response_data.get('errcode') == 0:
|
||||
return response_data['result']
|
||||
else:
|
||||
print(f'Error: {response_data.get("errmsg")}')
|
||||
return None
|
||||
else:
|
||||
print(f'Error: Received status code {response.status_code}')
|
||||
return None
|
||||
|
||||
|
||||
#所有文件和文件夹
|
||||
"""
|
||||
返回的文件信息
|
||||
[
|
||||
{
|
||||
"modifiedTime": "Fri Jun 21 15:43:42 CST 2024",
|
||||
"creatorId": "QksETWLqFiPBiSZ8oE1IcsqwiEiE",
|
||||
"modifierId": "QksETWLqFiPBiSZ8oE1IcsqwiEiE",
|
||||
"type": "FOLDER",
|
||||
"version": 1,
|
||||
"uuid": "7QG4Yx2JpL7eXAbrCz6m3Y4xJ9dEq3XD",
|
||||
"partitionType": "PUBLIC_OSS_PARTITION",
|
||||
"parentId": "143956017570",
|
||||
"spaceId": "24447524446",
|
||||
"path": "/分类分级/05用户手册",
|
||||
"createTime": "Fri Jun 21 15:43:42 CST 2024",
|
||||
"storageDriver": "DINGTALK",
|
||||
"name": "05用户手册",
|
||||
"id": "143956920642",
|
||||
"properties": {
|
||||
"readOnly": false
|
||||
},
|
||||
"status": "NORMAL",
|
||||
"appProperties": {}
|
||||
}
|
||||
]
|
||||
"""
|
||||
def list_all_dentries(space_id, union_id, access_token, next_token=None, max_results=1000, order="DESC", with_thumbnail=False):
|
||||
# 定义请求的URL
|
||||
url = f'https://api.dingtalk.com/v1.0/storage/spaces/{space_id}/dentries/listAll?unionId={union_id}'
|
||||
|
||||
# 构建请求体的数据
|
||||
payload = {
|
||||
"option": {
|
||||
"nextToken": next_token,
|
||||
"maxResults": max_results,
|
||||
"order": order,
|
||||
"withThumbnail": with_thumbnail
|
||||
}
|
||||
}
|
||||
|
||||
# 设置请求头,指定内容类型为JSON
|
||||
headers = {
|
||||
'x-acs-dingtalk-access-token': access_token,
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
# 发送POST请求
|
||||
response = requests.post(url, data=json.dumps(payload), headers=headers)
|
||||
|
||||
# 检查响应状态码是否为200(HTTP OK)
|
||||
if response.status_code == 200:
|
||||
# 解析返回的JSON数据
|
||||
response_data = response.json()
|
||||
# 检查是否有错误码
|
||||
if 'error' not in response_data and "nextToken" in response_data:
|
||||
return response_data["dentries"],response_data["nextToken"]
|
||||
elif 'error' not in response_data:
|
||||
#遍历完没有文件了
|
||||
return response_data["dentries"],None
|
||||
else:
|
||||
print(f'Error: {response_data.get("error").get("message")}')
|
||||
return [],None
|
||||
else:
|
||||
print(f'Error: Received status code {response.status_code}')
|
||||
return [],None
|
||||
|
||||
|
||||
#等到下载文件的信息,可能是多个url,然后再调用url进行下载
|
||||
"""
|
||||
{
|
||||
"protocol": "HEADER_SIGNATURE",
|
||||
"headerSignatureInfo": {
|
||||
"headers": {
|
||||
"Authorization": "OSS LTAIjmWpzHta71rc:/2tIMDsXGyehOirAzCHx1cI0C8o=",
|
||||
"x-oss-date": "Tue, 03 Jun 2025 06:47:27 GMT"
|
||||
},
|
||||
"resourceUrls": [
|
||||
"https://sh-dualstack.trans.dingtalk.com/yundisk0/iAEIAqRmaWxlA6h5dW5kaXNrMATOIVA04gXNE48GzQ3bB85mqzrtCM0CmQ.file"
|
||||
],
|
||||
"expirationSeconds": 900,
|
||||
"internalResourceUrls": [
|
||||
"lippi-space-sh.oss-cn-shanghai-internal.aliyuncs.com/yundisk0/iAEIAqRmaWxlA6h5dW5kaXNrMATOIVA04gXNE48GzQ3bB85mqzrtCM0CmQ.file"
|
||||
],
|
||||
"region": "SHANGHAI"
|
||||
}
|
||||
}
|
||||
"""
|
||||
def get_download_info(space_id, dentry_id,union_id, access_token, version=None, prefer_intranet=False):
|
||||
# 定义请求的URL
|
||||
url = f'https://api.dingtalk.com/v1.0/storage/spaces/{space_id}/dentries/{dentry_id}/downloadInfos/query?unionId={union_id}'
|
||||
|
||||
# 构建请求体的数据
|
||||
payload = {
|
||||
"option": {
|
||||
"version": version,
|
||||
"preferIntranet": prefer_intranet
|
||||
}
|
||||
}
|
||||
|
||||
# 设置请求头,指定内容类型为JSON
|
||||
headers = {
|
||||
'x-acs-dingtalk-access-token': access_token,
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
# 发送POST请求
|
||||
response = requests.post(url, data=json.dumps(payload), headers=headers)
|
||||
|
||||
# 检查响应状态码是否为200(HTTP OK)
|
||||
if response.status_code == 200:
|
||||
# 解析返回的JSON数据
|
||||
response_data = response.json()
|
||||
# 检查是否有错误码
|
||||
if 'error' not in response_data:
|
||||
return response_data
|
||||
else:
|
||||
print(f'Error: {response_data.get("error").get("message")}')
|
||||
return None
|
||||
else:
|
||||
print(f'Error: Received status code {response.status_code},{response.text}')
|
||||
return None
|
||||
|
||||
|
||||
#等到下载文件的信息,可能是多个url,然后再调用url进行下载
|
||||
def download_file(urls,headers,file=""):
|
||||
|
||||
# 设置请求头,指定内容类型为JSON
|
||||
# headers = {
|
||||
# 'Authorization': header_auth,
|
||||
# "x-oss-date": oss_date
|
||||
# }
|
||||
|
||||
# 发送POST请求
|
||||
response_data=bytearray(b'')
|
||||
for url in urls:
|
||||
response = requests.get(url, headers=headers)
|
||||
|
||||
# 检查响应状态码是否为200(HTTP OK)
|
||||
if response.status_code == 200:
|
||||
# 解析返回的JSON数据
|
||||
response_data += response.content
|
||||
return response_data
|
||||
else:
|
||||
print(f'Error: Received status code {response.status_code},{response.text}')
|
||||
return None
|
||||
#end download_file
|
||||
|
||||
|
||||
#显示前50文件
|
||||
def ding_list_top_50_files(app_key,app_secret,main_dir=None):
|
||||
access_token = get_dingtalk_access_token(app_key, app_secret)
|
||||
|
||||
if access_token:
|
||||
print(f'Access Token: {access_token}')
|
||||
else:
|
||||
print('Failed to obtain access token.')
|
||||
admin_users = get_admin_users(access_token)
|
||||
|
||||
|
||||
user_details = get_user_details(access_token, admin_users[0]["userid"])
|
||||
|
||||
if user_details:
|
||||
print(json.dumps(user_details, indent=2, ensure_ascii=False))
|
||||
else:
|
||||
print('Failed to retrieve user details.')
|
||||
|
||||
|
||||
spaces=[]
|
||||
spaces_info = get_drive_spaces(access_token,user_details["unionid"])
|
||||
|
||||
if spaces_info:
|
||||
print(json.dumps(spaces_info))
|
||||
else:
|
||||
print('Failed to retrieve drive spaces information.')
|
||||
|
||||
if main_dir:
|
||||
for space in spaces_info:
|
||||
if space["spaceName"] in main_dir.split(","):
|
||||
print("钉盘",space["spaceName"])
|
||||
spaces.append(space)
|
||||
else:
|
||||
spaces = spaces_info
|
||||
|
||||
print(spaces)
|
||||
|
||||
files=[]
|
||||
#遍历目录
|
||||
for space in spaces:
|
||||
nextToken = None
|
||||
|
||||
dentries,nextToken = list_all_dentries(space["spaceId"], user_details["unionid"],access_token,nextToken)
|
||||
|
||||
if dentries:
|
||||
#print(json.dumps(dentries, indent=2, ensure_ascii=False))
|
||||
for dentrie in dentries:
|
||||
files.append({"path":dentrie["path"],
|
||||
"fname": dentrie["path"].split("/")[-1],
|
||||
"mtime": datetime.strptime(dentrie["modifiedTime"], "%a %b %d %H:%M:%S %Z %Y"),
|
||||
})
|
||||
else:
|
||||
print('Failed to retrieve dentries.')
|
||||
print("总文件",len(files))
|
||||
return files
|
||||
|
||||
#入口函数
|
||||
def ding_scan_directory(app_key,app_secret,main_dir=None):
|
||||
access_token = get_dingtalk_access_token(app_key, app_secret)
|
||||
|
||||
if access_token:
|
||||
print(f'Access Token: {access_token}')
|
||||
else:
|
||||
print('Failed to 获得access_token.')
|
||||
admin_users = get_admin_users(access_token)
|
||||
user_details = get_user_details(access_token, admin_users[0]["userid"])
|
||||
|
||||
if user_details:
|
||||
print(json.dumps(user_details, indent=2, ensure_ascii=False))
|
||||
else:
|
||||
print('Failed to 用户详细信息')
|
||||
|
||||
spaces=[]
|
||||
spaces_info = get_drive_spaces(access_token,user_details["unionid"])
|
||||
|
||||
if spaces_info:
|
||||
print(json.dumps(spaces_info))
|
||||
else:
|
||||
print('Failed to 获得钉盘信息')
|
||||
|
||||
if main_dir:
|
||||
for space in spaces_info:
|
||||
if space["spaceName"] in main_dir.split(","):
|
||||
print("钉盘",space["spaceName"])
|
||||
spaces.append(space)
|
||||
else:
|
||||
spaces = spaces_info
|
||||
|
||||
print(spaces)
|
||||
|
||||
files=[]
|
||||
#遍历目录
|
||||
for space in spaces:
|
||||
nextToken = None
|
||||
while 1:
|
||||
dentries,nextToken = list_all_dentries(space["spaceId"], user_details["unionid"],access_token,nextToken)
|
||||
|
||||
if dentries:
|
||||
#print(json.dumps(dentries, indent=2, ensure_ascii=False))
|
||||
for dentrie in dentries:
|
||||
files.append({"path":dentrie["path"],
|
||||
"fname": dentrie["path"].split("/")[-1],
|
||||
"id":dentrie["id"],
|
||||
"mtime": datetime.strptime(dentrie["modifiedTime"], "%a %b %d %H:%M:%S %Z %Y"),
|
||||
"unionid": user_details["unionid"],
|
||||
"spaceId": space["spaceId"],
|
||||
"access_token": access_token
|
||||
})
|
||||
else:
|
||||
print('Failed to retrieve dentries.')
|
||||
if nextToken==None:
|
||||
break
|
||||
print("总文件",len(files))
|
||||
return files
|
||||
|
||||
#单次下载文件
|
||||
def ding_get_file_content_init(app_key,app_secret,spaceId,file_id):
|
||||
access_token = get_dingtalk_access_token(app_key, app_secret)
|
||||
|
||||
if access_token:
|
||||
print(f'Access Token: {access_token}')
|
||||
else:
|
||||
print('Failed to obtain access token.')
|
||||
admin_users = get_admin_users(access_token)
|
||||
|
||||
|
||||
user_details = get_user_details(access_token, admin_users[0]["userid"])
|
||||
|
||||
if user_details:
|
||||
print(json.dumps(user_details, indent=2, ensure_ascii=False))
|
||||
else:
|
||||
print('Failed to retrieve user details.')
|
||||
unionid = user_details["unionid"]
|
||||
|
||||
return ding_get_file_content(spaceId,file_id,unionid,access_token)
|
||||
|
||||
|
||||
def ding_get_file_content(spaceId,file_id,unionid,access_token):
|
||||
download_info = get_download_info(spaceId,file_id,unionid, access_token)
|
||||
if download_info:
|
||||
|
||||
return download_file(download_info["headerSignatureInfo"]["resourceUrls"],download_info["headerSignatureInfo"]["headers"])
|
||||
else:
|
||||
print('Failed to retrieve download info.')
|
||||
return None
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__=="__main__":
|
||||
# 示例调用
|
||||
app_key = 'app_key'
|
||||
app_secret = 'app_secret'
|
||||
access_token = get_dingtalk_access_token(app_key, app_secret)
|
||||
|
||||
if access_token:
|
||||
print(f'Access Token: {access_token}')
|
||||
else:
|
||||
print('Failed to obtain access token.')
|
||||
|
||||
user_count = get_user_count(access_token, only_active=False)
|
||||
print("用户数",user_count)
|
||||
|
||||
admin_users = get_admin_users(access_token)
|
||||
print("管理员",admin_users)
|
||||
|
||||
# sub_departments = get_sub_departments(access_token, 1)
|
||||
# print("部门",sub_departments)
|
||||
|
||||
# user_list_id = get_user_list_id(access_token, 1)
|
||||
# print("用户id",user_list_id)
|
||||
|
||||
|
||||
user_details = get_user_details(access_token, admin_users[0]["userid"])
|
||||
|
||||
if user_details:
|
||||
print(json.dumps(user_details, indent=2, ensure_ascii=False))
|
||||
else:
|
||||
print('Failed to retrieve user details.')
|
||||
|
||||
spaces_info = get_drive_spaces(access_token,user_details["unionid"])
|
||||
print("钉盘")
|
||||
if spaces_info:
|
||||
print(json.dumps(spaces_info))
|
||||
else:
|
||||
print('Failed to retrieve drive spaces information.')
|
||||
|
||||
print("存储",spaces_info[0]["spaceName"])
|
||||
|
||||
|
||||
#遍历目录
|
||||
count=0
|
||||
nextToken = None
|
||||
files=[]
|
||||
while 1:
|
||||
dentries,nextToken = list_all_dentries(spaces_info[0]["spaceId"], user_details["unionid"],access_token,nextToken)
|
||||
|
||||
if dentries:
|
||||
print(json.dumps(dentries, indent=2, ensure_ascii=False))
|
||||
count += len(dentries)
|
||||
files.extend(dentries)
|
||||
print("文件数量",count)
|
||||
else:
|
||||
print('Failed to retrieve dentries.')
|
||||
if nextToken==None:
|
||||
break
|
||||
|
||||
for file in files:
|
||||
print(file["path"])
|
||||
#更具id下载某一个文件
|
||||
download_info = get_download_info(spaces_info[1]["spaceId"],file["id"], user_details["unionid"], access_token)
|
||||
if download_info:
|
||||
if len(download_info["headerSignatureInfo"]["resourceUrls"]) >1:
|
||||
print(json.dumps(download_info, indent=2, ensure_ascii=False))
|
||||
#download_file(download_info["headerSignatureInfo"]["resourceUrls"],download_info["headerSignatureInfo"]["headers"])
|
||||
else:
|
||||
print('Failed to retrieve download info.')
|
||||
|
||||
print(json.dumps(user_details, indent=2, ensure_ascii=False))
|
||||
Reference in New Issue
Block a user