diff --git a/main/dingtalk.py b/main/dingtalk.py new file mode 100644 index 0000000..0a73f29 --- /dev/null +++ b/main/dingtalk.py @@ -0,0 +1,179 @@ +import requests +""" +钉钉扫码登录的用户认证过程 +1. 扫码认证 +2. 在钉钉客户端,免登录认证 + +""" + +################ 1.扫码认证 ################################## + +def get_user_access_token(app_key, app_secret, auth_code): + """ + 使用 authCode 调用 /v1.0/oauth2/userAccessToken 接口获取用户访问令牌 + """ + url = "https://api.dingtalk.com/v1.0/oauth2/userAccessToken" + headers = { + "Content-Type": "application/json" + } + payload = { + "clientId": app_key, + "clientSecret": app_secret, + "code": auth_code, + "grantType": "authorization_code" + } + response = requests.post(url, json=payload, headers=headers) + if response.status_code == 200: + result = response.json() + if "accessToken" in result: + return result["accessToken"] + else: + raise Exception(f"Error: {result.get('message', 'Unknown error')}") + else: + raise Exception(f"HTTP Error: {response.status_code}, Response: {response.text}") + + +def get_user_details(access_token,unionId="me"): + """ + 使用 userId 和 access_token 调用 /v1.0/contact/users/{userId} 接口获取用户详细信息 + """ + url = f"https://api.dingtalk.com/v1.0/contact/users/{unionId}" + headers = { + "x-acs-dingtalk-access-token": access_token, + "Content-Type": "application/json" + } + response = requests.get(url, headers=headers) + if response.status_code == 200: + result = response.json() + return result + else: + raise Exception(f"HTTP Error: {response.status_code}, Response: {response.text}") + +""" +返回详情: +{ + HTTP/1.1 200 OK +Content-Type:application/json + +{ + "nick" : "zhangsan", + "avatarUrl" : "https://xxx", + "mobile" : "150xxxx9144", + "openId" : "123", + "unionId" : "z21HjQliSzpw0Yxxxx", + "email" : "zhangsan@alibaba-inc.com", + "stateCode" : "86" +} +""" + +#用户详情 +def get_detailed_user_info(access_token, user_id): + url = f"https://oapi.dingtalk.com/user/get" + params = { + "access_token": access_token, + "userid": user_id + } + response = requests.get(url, params=params) + if response.status_code == 200: + result = response.json() + if result.get("errcode") == 0: + return result # 返回详细用户信息 + else: + raise Exception(f"Error: {result.get('errmsg')}") + else: + raise Exception(f"HTTP Error: {response.status_code}") + +""" +用户详情数据 +{ + "errcode": 0, + "errmsg": "ok", + "userid": "zhangsan", // 员工在企业内的UserID + "name": "张三", // 员工姓名 + "tel": "13800000000", // 分机号(仅部分企业有) + "workPlace": "", // 办公地点 + "remark": "", // 备注 + "mobile": "13800000000", // 手机号码 + "email": "zhangsan@example.com", // 员工邮箱 + "orgEmail": "zhangsan@company.com", // 企业邮箱 + "active": true, // 是否激活 + "isAdmin": false, // 是否为管理员 + "isBoss": false, // 是否为企业老板 + "isLeader": true, // 是否为部门主管 + "isHide": false, // 是否隐藏手机号 + "department": [1, 2], // 所属部门ID列表 + "position": "工程师", // 职位 + "hiredDate": 1589760000000, // 入职时间(时间戳,单位:毫秒) + "jobnumber": "00123456", // 工号 + "stateCode": "86", // 国家地区码 + "unionid": "Pii7xxxxxxxxxxxxxxxxxx" // 用户在钉钉平台的唯一标识 +} +""" + + +###################2. 在钉钉客户端,免登录认证 + +#获得access_token +def get_access_token(corpId,client_id,client_secret): + import requests + + url = f"https://api.dingtalk.com/v1.0/oauth2/{corpId}/token" + + headers = { + "Content-Type": "application/json" + } + + data = { + "client_id": client_id, + "client_secret": client_secret, + "grant_type": "client_credentials" + } + + response = requests.post(url, headers=headers, json=data) + + # 输出响应结果 + if response.status_code == 200: + result = response.json() + return result["access_token"] + else: + raise Exception(f"请求失败,状态码:{response.status_code}") + + +#根据token和code得到用户信息 +def get_userinfo_by_token_and_code(access_token,code): + """ + 返回值: + { + "errcode": 0, + "result": { + "associated_unionid": "N2o5U3axxxx", + "unionid": "gliiW0piiii02zBUjUxxxx", + "device_id": "12drtfxxxxx", + "sys_level": 1, + "name": "张xx", + "sys": true, + "userid": "userid123" + }, + "errmsg": "ok" + } + """ + + data = { + "code":code + } + + url = f"https://oapi.dingtalk.com/topapi/v2/user/getuserinfo?access_token={access_token}" + + # 发送 POST 请求(不需要 body) + response = requests.post(url,json=data) + + # 处理响应 + if response.status_code == 200: + result = response.json() + if result.get("errcode") == 0: + return result["result"] + else: + raise Exception(f"接口调用失败:{result.get('errmsg')}") + else: + raise Exception(f"请求失败,状态码:{response.status_code}") +