409 lines
13 KiB
Python
409 lines
13 KiB
Python
|
||
import datetime
|
||
import paramiko
|
||
import time
|
||
import stat
|
||
import json
|
||
|
||
from dingpan import ding_list_top_50_files,ding_get_file_content_init
|
||
|
||
from k_database import Doc
|
||
|
||
|
||
#SFTP
|
||
def sftp_list_directory(hostname, port, username, password, remote_path):
|
||
try:
|
||
# 创建SSH客户端对象
|
||
ssh = paramiko.SSHClient()
|
||
# 自动添加主机密钥,避免手动确认
|
||
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||
|
||
# 连接到SFTP服务器
|
||
ssh.connect(hostname=hostname, port=port, username=username, password=password)
|
||
|
||
# 打开SFTP会话
|
||
sftp = ssh.open_sftp()
|
||
|
||
# 列出远程目录的内容
|
||
#print(f"Listing files in directory: {remote_path}")
|
||
if remote_path[0]!="/":
|
||
remote_path = f"/{remote_path}"
|
||
|
||
data=[]
|
||
for fileattr in sftp.listdir_attr(remote_path):
|
||
#print(f"- {fileattr.filename} (Size: {fileattr.st_size} bytes, Modified: {fileattr.st_mtime})")
|
||
data.append({"name":fileattr.filename,"m_time":datetime.datetime.fromtimestamp(fileattr.st_mtime),"size":fileattr.st_size})
|
||
|
||
# 关闭SFTP会话和SSH连接
|
||
sftp.close()
|
||
ssh.close()
|
||
|
||
except Exception as e:
|
||
raise Exception(f"访问时发生一个错误: {e}")
|
||
|
||
return data
|
||
|
||
|
||
|
||
|
||
#钉盘测试
|
||
def ding_list_directory(app_key,app_secret,main_dir):
|
||
try:
|
||
data=[]
|
||
for file in ding_list_top_50_files(app_key,app_secret,main_dir):
|
||
#print(f"- {fileattr.filename} (Size: {fileattr.st_size} bytes, Modified: {fileattr.st_mtime})")
|
||
data.append({"name":file["fname"],"m_time":file["mtime"],"size":file["path"]})
|
||
except Exception as e:
|
||
raise Exception(f"访问时发生一个错误: {e}")
|
||
|
||
return data
|
||
|
||
|
||
|
||
def sftp_conn(hostname, username, password):
|
||
# 创建SSH客户端对象
|
||
ssh = paramiko.SSHClient()
|
||
# 自动添加主机密钥,避免手动确认
|
||
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||
|
||
# 连接到SFTP服务器
|
||
ssh.connect(hostname=hostname, port=22, username=username, password=password)
|
||
|
||
# 打开SFTP会话
|
||
sftp = ssh.open_sftp()
|
||
|
||
return ssh,sftp
|
||
|
||
def sftp_close(ssh,sftp):
|
||
# 关闭SFTP会话和SSH连接
|
||
try:
|
||
sftp.close()
|
||
ssh.close()
|
||
except:
|
||
pass
|
||
|
||
|
||
#SFTP,扫描所有子目录
|
||
def sftp_scan_directory(hostname, username, password, remote_path):
|
||
try:
|
||
# 创建SSH客户端对象
|
||
ssh = paramiko.SSHClient()
|
||
# 自动添加主机密钥,避免手动确认
|
||
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||
|
||
# 连接到SFTP服务器
|
||
ssh.connect(hostname=hostname, port=22, username=username, password=password)
|
||
|
||
# 打开SFTP会话
|
||
sftp = ssh.open_sftp()
|
||
|
||
# 列出远程目录的内容
|
||
#print(f"Listing files in directory: {remote_path}")
|
||
if remote_path[0]!="/":
|
||
remote_path = f"/{remote_path}"
|
||
|
||
directorys={} #总目录,用来做比较
|
||
def scan_next(remote_path):
|
||
#print("scan_next",remote_path)
|
||
if remote_path not in directorys:
|
||
directorys[remote_path]=1
|
||
try:
|
||
dirs=[]
|
||
for fileattr in sftp.listdir_attr(remote_path):
|
||
if stat.S_ISDIR(fileattr.st_mode):
|
||
#print(f"- {fileattr.filename} (Size: {fileattr.st_size} bytes, Modified: {fileattr.st_mtime})")
|
||
dirs.append(f"{remote_path}/{fileattr.filename}")
|
||
if stat.S_ISREG(fileattr.st_mode): #文件
|
||
uid = fileattr.st_uid
|
||
# 使用 SSH 命令查询 UID 对应的用户名
|
||
stdin, stdout, stderr = ssh.exec_command(f"getent passwd {uid} | cut -d: -f1")
|
||
owner_name = stdout.read().decode().strip() or str(uid)
|
||
|
||
yield f"{remote_path}/{fileattr.filename}", fileattr.filename,fileattr.st_mtime,fileattr.st_size,owner_name
|
||
for dir in dirs:
|
||
yield from scan_next(dir)
|
||
except:
|
||
pass
|
||
|
||
yield from scan_next(remote_path)
|
||
|
||
# 关闭SFTP会话和SSH连接
|
||
sftp.close()
|
||
ssh.close()
|
||
|
||
except Exception as e:
|
||
raise Exception(f"访问时发生一个错误: {e}")
|
||
|
||
|
||
#windows共享
|
||
def smb_list_directory(hostname, username, password, remote_path):
|
||
try:
|
||
import smbclient
|
||
smbclient.register_session(hostname, username, password)
|
||
|
||
data=[]
|
||
for entry in smbclient.scandir(f"\\{hostname}\{remote_path}"):
|
||
# entry.is_dir()
|
||
# entry.is_file()
|
||
# entry.path
|
||
stats = entry.stat()
|
||
#print(entry.name)
|
||
data.append({"name":entry.name,"m_time":datetime.datetime.fromtimestamp(stats.st_mtime),"size":stats.st_size})
|
||
|
||
except Exception as e:
|
||
raise Exception(f"访问时发生一个错误: {e}")
|
||
|
||
return data
|
||
|
||
|
||
def myscan(smbclient,directory):
|
||
print("scan2",directory)
|
||
# smbclient.register_session(hostname, username, password)
|
||
for entry in smbclient.scandir(f"\{directory}"):
|
||
stats = entry.stat()
|
||
if entry.is_file():
|
||
content=""
|
||
# with smbclient.open_file(f"\\{hostname}\{directory}\{entry.name}", mode="rb") as fd:
|
||
# content = fd.read()
|
||
print(entry.path)
|
||
yield entry.path,entry.name,content,stats.st_mtime,stats.st_size
|
||
if entry.is_dir():
|
||
myscan(smbclient,entry.path)
|
||
|
||
|
||
#windows共享,遍历文件
|
||
def smb_scan_directory(hostname, username, password, remote_path):
|
||
try:
|
||
import smbclient
|
||
smbclient.register_session(hostname, username, password)
|
||
if remote_path[0]=="\\":
|
||
remote_path = remote_path[1:]
|
||
|
||
directory = f"\\{hostname}\{remote_path}"
|
||
|
||
ready_dirs={}
|
||
def scan_next(directory):
|
||
if directory not in ready_dirs:
|
||
ready_dirs[directory] = 1
|
||
print("dir",directory)
|
||
dirs=[]
|
||
for entry in smbclient.scandir(directory):
|
||
# entry.is_dir()
|
||
# entry.is_file()
|
||
# entry.path
|
||
stats = entry.stat()
|
||
if entry.is_file():
|
||
yield entry.path,entry.name,stats.st_mtime,stats.st_size
|
||
if entry.is_dir():
|
||
dirs.append(entry.path)
|
||
|
||
for dir in dirs:
|
||
yield from scan_next(dir)
|
||
|
||
yield from scan_next(directory)
|
||
|
||
except Exception as e:
|
||
raise Exception(f"访问时发生一个错误: {e}")
|
||
|
||
|
||
|
||
#linux本地
|
||
def linux_list_directory(directory):
|
||
from pathlib import Path
|
||
try:
|
||
data=[]
|
||
for entry in Path(directory).iterdir():
|
||
stats = entry.stat()
|
||
data.append({"name":entry.name,"m_time":datetime.datetime.fromtimestamp(stats.st_mtime),"size":stats.st_size})
|
||
|
||
except Exception as e:
|
||
print(e)
|
||
raise Exception(f"访问时发生一个错误: {e}")
|
||
|
||
return data
|
||
|
||
#建立一个文件的cache,供下载使用
|
||
def create_doc_cache(path,cfg):
|
||
if cfg["type"]=="SFTP":
|
||
ssh,sftp = sftp_conn(cfg["address"],cfg["user"],cfg["password"])
|
||
|
||
with sftp.file(path, 'rb') as remote_file:
|
||
content = remote_file.read()
|
||
cache_path=f"/dev/shm/{time.time()}"
|
||
with open(cache_path,"wb+") as file:
|
||
file.write(content)
|
||
|
||
sftp_close(ssh,sftp)
|
||
|
||
elif cfg["type"]=="Windows共享":
|
||
import smbclient
|
||
smbclient.register_session(cfg["address"],cfg["user"],cfg["password"])
|
||
with smbclient.open_file(path, mode="rb") as fd:
|
||
content = fd.read()
|
||
cache_path=f"/dev/shm/{time.time()}"
|
||
with open(cache_path,"wb+") as file:
|
||
file.write(content)
|
||
|
||
elif cfg["type"]=="钉盘":
|
||
doc = Doc.select(Doc.meta).where( (Doc.abs_path==path) & (Doc.base==cfg["name"])).first()
|
||
|
||
meta = json.loads(doc.meta)
|
||
|
||
content = ding_get_file_content_init(cfg["address"],cfg["password"],meta["spaceId"],meta["id"])
|
||
cache_path=f"/dev/shm/{time.time()}"
|
||
with open(cache_path,"wb+") as file:
|
||
file.write(content)
|
||
|
||
elif cfg["type"]=="本地":
|
||
cache_path=path
|
||
|
||
return cache_path
|
||
|
||
|
||
|
||
from ftplib import FTP, error_perm
|
||
import time
|
||
def parse_ftp_list_line(line):
|
||
parts = line.split(maxsplit=8)
|
||
if len(parts) == 9:
|
||
|
||
permissions, links, owner, group, size, month, day, time_or_year, filename = parts
|
||
|
||
try:
|
||
# 尝试解析时间或年份部分
|
||
timestamp = time.strptime(f"{month} {day} {time_or_year}", "%b %d %H:%M")
|
||
current_year = time.localtime().tm_year
|
||
mtime = time.mktime((current_year, timestamp.tm_mon, timestamp.tm_mday,
|
||
timestamp.tm_hour, timestamp.tm_min, 0, 0, 0, -1))
|
||
|
||
# 如果是超过6个月前的文件,则调整年份
|
||
six_months_ago = time.time() - 6 * 30 * 24 * 60 * 60
|
||
if mtime < six_months_ago:
|
||
mtime = time.mktime((current_year - 1, timestamp.tm_mon, timestamp.tm_mday,
|
||
timestamp.tm_hour, timestamp.tm_min, 0, 0, 0, -1))
|
||
|
||
return {
|
||
'permissions': permissions,
|
||
'links': int(links),
|
||
'owner': owner,
|
||
'group': group,
|
||
'size': int(size),
|
||
'm_time': mtime,
|
||
'name': filename
|
||
}
|
||
except ValueError:
|
||
return None
|
||
elif len(parts) == 4: #windows格式
|
||
fdate,ftime,dir_or_size,filename = parts
|
||
|
||
parsed_date = datetime.datetime.strptime(f"{fdate} {ftime}", "%m-%d-%y %I:%M%p")
|
||
|
||
# 明确指定世纪
|
||
if parsed_date.year < 100:
|
||
parsed_date = parsed_date.replace(year=parsed_date.year + 2000)
|
||
|
||
str_date = parsed_date.strftime("%Y-%m-%d %H:%M:%S")
|
||
try:
|
||
item= {
|
||
'size': int(dir_or_size),
|
||
'type': 'f',
|
||
'm_time': str_date,#转化后的字符串
|
||
"mtime": parsed_date.timestamp() ,#时间戳
|
||
'name': filename
|
||
}
|
||
except:
|
||
item= {
|
||
'size': 0,
|
||
'type': 'd',
|
||
'm_time': str_date,
|
||
"mtime": parsed_date.timestamp() ,#时间戳
|
||
'name': filename
|
||
}
|
||
return item
|
||
|
||
|
||
|
||
#一次list目录
|
||
def ftp_list_directory(ftp_host, ftp_user, ftp_password, directory):
|
||
|
||
# 连接到FTP服务器
|
||
with FTP(ftp_host) as ftp:
|
||
ftp.login(user=ftp_user, passwd=ftp_password)
|
||
|
||
print(f"Current working directory: {ftp.pwd()}")
|
||
|
||
# 改变到目标目录
|
||
try:
|
||
ftp.cwd(directory)
|
||
print(f"Changed to directory: {directory}")
|
||
except error_perm as e:
|
||
print(f"Failed to change directory: {e}")
|
||
return
|
||
|
||
# 列出当前目录下的文件和子目录
|
||
#print("Directory contents:")
|
||
#ftp.dir()
|
||
|
||
# 或者获取一个列表形式的结果
|
||
# 修改之前的代码,在遍历文件时使用此函数解析每一行
|
||
files = []
|
||
ftp.retrlines('LIST', lambda line: files.append(parse_ftp_list_line(line)))
|
||
|
||
data=[]
|
||
for entry in files:
|
||
if entry is not None:
|
||
data.append(entry)
|
||
|
||
return data
|
||
|
||
#扫描整个目录
|
||
def ftp_scan_directory(ftp_host, ftp_user, ftp_password, directory):
|
||
|
||
# 连接到FTP服务器
|
||
with FTP(ftp_host) as ftp:
|
||
ftp.login(user=ftp_user, passwd=ftp_password)
|
||
|
||
print(f"Current working directory: {ftp.pwd()}")
|
||
|
||
|
||
def scan_next(directory):
|
||
|
||
# 改变到目标目录
|
||
try:
|
||
ftp.cwd(directory)
|
||
print(f"Changed to directory: {directory}")
|
||
except error_perm as e:
|
||
print(f"Failed to change directory: {e}")
|
||
return
|
||
|
||
# 或者获取一个列表形式的结果
|
||
# 修改之前的代码,在遍历文件时使用此函数解析每一行
|
||
files = []
|
||
ftp.retrlines('LIST', lambda line: files.append(parse_ftp_list_line(line)))
|
||
|
||
dirs=[]
|
||
for entry in files:
|
||
if entry is not None:
|
||
if entry["type"]=="f":
|
||
entry["path"] = directory+"/"+entry["name"]
|
||
yield entry
|
||
elif entry["type"]=="d":
|
||
dirs.append(entry)
|
||
for dir in dirs:
|
||
yield from scan_next(directory+"/"+dir["name"])
|
||
|
||
yield from scan_next(directory)
|
||
|
||
|
||
|
||
|
||
|
||
|
||
if __name__ == "__main__":
|
||
ftp_host = "host" # 替换为你的FTP主机地址
|
||
ftp_user = "Administrator" # 替换为你的FTP用户名
|
||
ftp_password = "12" # 替换为你的FTP密码
|
||
directory = "/" # 替换为你想要列出的目录路径
|
||
|
||
for f in ftp_scan_directory(ftp_host, ftp_user, ftp_password, directory):
|
||
print(f) |