import datetime import paramiko import time import stat import json from dingpan import ding_list_top_50_files,ding_get_file_content_init from k_database import Doc #SFTP def sftp_list_directory(hostname, port, username, password, remote_path): try: # 创建SSH客户端对象 ssh = paramiko.SSHClient() # 自动添加主机密钥,避免手动确认 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 连接到SFTP服务器 ssh.connect(hostname=hostname, port=port, username=username, password=password) # 打开SFTP会话 sftp = ssh.open_sftp() # 列出远程目录的内容 #print(f"Listing files in directory: {remote_path}") if remote_path[0]!="/": remote_path = f"/{remote_path}" data=[] for fileattr in sftp.listdir_attr(remote_path): #print(f"- {fileattr.filename} (Size: {fileattr.st_size} bytes, Modified: {fileattr.st_mtime})") data.append({"name":fileattr.filename,"m_time":datetime.datetime.fromtimestamp(fileattr.st_mtime),"size":fileattr.st_size}) # 关闭SFTP会话和SSH连接 sftp.close() ssh.close() except Exception as e: raise Exception(f"访问时发生一个错误: {e}") return data #钉盘测试 def ding_list_directory(app_key,app_secret,main_dir): try: data=[] for file in ding_list_top_50_files(app_key,app_secret,main_dir): #print(f"- {fileattr.filename} (Size: {fileattr.st_size} bytes, Modified: {fileattr.st_mtime})") data.append({"name":file["fname"],"m_time":file["mtime"],"size":file["path"]}) except Exception as e: raise Exception(f"访问时发生一个错误: {e}") return data def sftp_conn(hostname, username, password): # 创建SSH客户端对象 ssh = paramiko.SSHClient() # 自动添加主机密钥,避免手动确认 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 连接到SFTP服务器 ssh.connect(hostname=hostname, port=22, username=username, password=password) # 打开SFTP会话 sftp = ssh.open_sftp() return ssh,sftp def sftp_close(ssh,sftp): # 关闭SFTP会话和SSH连接 try: sftp.close() ssh.close() except: pass #SFTP,扫描所有子目录 def sftp_scan_directory(hostname, username, password, remote_path): try: # 创建SSH客户端对象 ssh = paramiko.SSHClient() # 自动添加主机密钥,避免手动确认 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 连接到SFTP服务器 ssh.connect(hostname=hostname, port=22, username=username, password=password) # 打开SFTP会话 sftp = ssh.open_sftp() # 列出远程目录的内容 #print(f"Listing files in directory: {remote_path}") if remote_path[0]!="/": remote_path = f"/{remote_path}" directorys={} #总目录,用来做比较 def scan_next(remote_path): #print("scan_next",remote_path) if remote_path not in directorys: directorys[remote_path]=1 try: dirs=[] for fileattr in sftp.listdir_attr(remote_path): if stat.S_ISDIR(fileattr.st_mode): #print(f"- {fileattr.filename} (Size: {fileattr.st_size} bytes, Modified: {fileattr.st_mtime})") dirs.append(f"{remote_path}/{fileattr.filename}") if stat.S_ISREG(fileattr.st_mode): #文件 uid = fileattr.st_uid # 使用 SSH 命令查询 UID 对应的用户名 stdin, stdout, stderr = ssh.exec_command(f"getent passwd {uid} | cut -d: -f1") owner_name = stdout.read().decode().strip() or str(uid) yield f"{remote_path}/{fileattr.filename}", fileattr.filename,fileattr.st_mtime,fileattr.st_size,owner_name for dir in dirs: yield from scan_next(dir) except: pass yield from scan_next(remote_path) # 关闭SFTP会话和SSH连接 sftp.close() ssh.close() except Exception as e: raise Exception(f"访问时发生一个错误: {e}") #windows共享 def smb_list_directory(hostname, username, password, remote_path): try: import smbclient smbclient.register_session(hostname, username, password) data=[] for entry in smbclient.scandir(f"\\{hostname}\{remote_path}"): # entry.is_dir() # entry.is_file() # entry.path stats = entry.stat() #print(entry.name) data.append({"name":entry.name,"m_time":datetime.datetime.fromtimestamp(stats.st_mtime),"size":stats.st_size}) except Exception as e: raise Exception(f"访问时发生一个错误: {e}") return data def myscan(smbclient,directory): print("scan2",directory) # smbclient.register_session(hostname, username, password) for entry in smbclient.scandir(f"\{directory}"): stats = entry.stat() if entry.is_file(): content="" # with smbclient.open_file(f"\\{hostname}\{directory}\{entry.name}", mode="rb") as fd: # content = fd.read() print(entry.path) yield entry.path,entry.name,content,stats.st_mtime,stats.st_size if entry.is_dir(): myscan(smbclient,entry.path) #windows共享,遍历文件 def smb_scan_directory(hostname, username, password, remote_path): try: import smbclient smbclient.register_session(hostname, username, password) if remote_path[0]=="\\": remote_path = remote_path[1:] directory = f"\\{hostname}\{remote_path}" ready_dirs={} def scan_next(directory): if directory not in ready_dirs: ready_dirs[directory] = 1 print("dir",directory) dirs=[] for entry in smbclient.scandir(directory): # entry.is_dir() # entry.is_file() # entry.path stats = entry.stat() if entry.is_file(): yield entry.path,entry.name,stats.st_mtime,stats.st_size if entry.is_dir(): dirs.append(entry.path) for dir in dirs: yield from scan_next(dir) yield from scan_next(directory) except Exception as e: raise Exception(f"访问时发生一个错误: {e}") #linux本地 def linux_list_directory(directory): from pathlib import Path try: data=[] for entry in Path(directory).iterdir(): stats = entry.stat() data.append({"name":entry.name,"m_time":datetime.datetime.fromtimestamp(stats.st_mtime),"size":stats.st_size}) except Exception as e: print(e) raise Exception(f"访问时发生一个错误: {e}") return data #建立一个文件的cache,供下载使用 def create_doc_cache(path,cfg): if cfg["type"]=="SFTP": ssh,sftp = sftp_conn(cfg["address"],cfg["user"],cfg["password"]) with sftp.file(path, 'rb') as remote_file: content = remote_file.read() cache_path=f"/dev/shm/{time.time()}" with open(cache_path,"wb+") as file: file.write(content) sftp_close(ssh,sftp) elif cfg["type"]=="Windows共享": import smbclient smbclient.register_session(cfg["address"],cfg["user"],cfg["password"]) with smbclient.open_file(path, mode="rb") as fd: content = fd.read() cache_path=f"/dev/shm/{time.time()}" with open(cache_path,"wb+") as file: file.write(content) elif cfg["type"]=="钉盘": doc = Doc.select(Doc.meta).where( (Doc.abs_path==path) & (Doc.base==cfg["name"])).first() meta = json.loads(doc.meta) content = ding_get_file_content_init(cfg["address"],cfg["password"],meta["spaceId"],meta["id"]) cache_path=f"/dev/shm/{time.time()}" with open(cache_path,"wb+") as file: file.write(content) elif cfg["type"]=="本地": cache_path=path return cache_path from ftplib import FTP, error_perm import time def parse_ftp_list_line(line): parts = line.split(maxsplit=8) if len(parts) == 9: permissions, links, owner, group, size, month, day, time_or_year, filename = parts try: # 尝试解析时间或年份部分 timestamp = time.strptime(f"{month} {day} {time_or_year}", "%b %d %H:%M") current_year = time.localtime().tm_year mtime = time.mktime((current_year, timestamp.tm_mon, timestamp.tm_mday, timestamp.tm_hour, timestamp.tm_min, 0, 0, 0, -1)) # 如果是超过6个月前的文件,则调整年份 six_months_ago = time.time() - 6 * 30 * 24 * 60 * 60 if mtime < six_months_ago: mtime = time.mktime((current_year - 1, timestamp.tm_mon, timestamp.tm_mday, timestamp.tm_hour, timestamp.tm_min, 0, 0, 0, -1)) return { 'permissions': permissions, 'links': int(links), 'owner': owner, 'group': group, 'size': int(size), 'm_time': mtime, 'name': filename } except ValueError: return None elif len(parts) == 4: #windows格式 fdate,ftime,dir_or_size,filename = parts parsed_date = datetime.datetime.strptime(f"{fdate} {ftime}", "%m-%d-%y %I:%M%p") # 明确指定世纪 if parsed_date.year < 100: parsed_date = parsed_date.replace(year=parsed_date.year + 2000) str_date = parsed_date.strftime("%Y-%m-%d %H:%M:%S") try: item= { 'size': int(dir_or_size), 'type': 'f', 'm_time': str_date,#转化后的字符串 "mtime": parsed_date.timestamp() ,#时间戳 'name': filename } except: item= { 'size': 0, 'type': 'd', 'm_time': str_date, "mtime": parsed_date.timestamp() ,#时间戳 'name': filename } return item #一次list目录 def ftp_list_directory(ftp_host, ftp_user, ftp_password, directory): # 连接到FTP服务器 with FTP(ftp_host) as ftp: ftp.login(user=ftp_user, passwd=ftp_password) print(f"Current working directory: {ftp.pwd()}") # 改变到目标目录 try: ftp.cwd(directory) print(f"Changed to directory: {directory}") except error_perm as e: print(f"Failed to change directory: {e}") return # 列出当前目录下的文件和子目录 #print("Directory contents:") #ftp.dir() # 或者获取一个列表形式的结果 # 修改之前的代码,在遍历文件时使用此函数解析每一行 files = [] ftp.retrlines('LIST', lambda line: files.append(parse_ftp_list_line(line))) data=[] for entry in files: if entry is not None: data.append(entry) return data #扫描整个目录 def ftp_scan_directory(ftp_host, ftp_user, ftp_password, directory): # 连接到FTP服务器 with FTP(ftp_host) as ftp: ftp.login(user=ftp_user, passwd=ftp_password) print(f"Current working directory: {ftp.pwd()}") def scan_next(directory): # 改变到目标目录 try: ftp.cwd(directory) print(f"Changed to directory: {directory}") except error_perm as e: print(f"Failed to change directory: {e}") return # 或者获取一个列表形式的结果 # 修改之前的代码,在遍历文件时使用此函数解析每一行 files = [] ftp.retrlines('LIST', lambda line: files.append(parse_ftp_list_line(line))) dirs=[] for entry in files: if entry is not None: if entry["type"]=="f": entry["path"] = directory+"/"+entry["name"] yield entry elif entry["type"]=="d": dirs.append(entry) for dir in dirs: yield from scan_next(directory+"/"+dir["name"]) yield from scan_next(directory) if __name__ == "__main__": ftp_host = "host" # 替换为你的FTP主机地址 ftp_user = "Administrator" # 替换为你的FTP用户名 ftp_password = "12" # 替换为你的FTP密码 directory = "/" # 替换为你想要列出的目录路径 for f in ftp_scan_directory(ftp_host, ftp_user, ftp_password, directory): print(f)