From 918315ab7a7f15a34febf50ba09962e0fa6aff7b Mon Sep 17 00:00:00 2001 From: 13315423919 <13315423919@qq.com> Date: Wed, 19 Nov 2025 19:42:53 +0800 Subject: [PATCH] Add File --- main/ff.py | 409 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 409 insertions(+) create mode 100644 main/ff.py diff --git a/main/ff.py b/main/ff.py new file mode 100644 index 0000000..ba9c025 --- /dev/null +++ b/main/ff.py @@ -0,0 +1,409 @@ + +import datetime +import paramiko +import time +import stat +import json + +from dingpan import ding_list_top_50_files,ding_get_file_content_init + +from k_database import Doc + + +#SFTP +def sftp_list_directory(hostname, port, username, password, remote_path): + try: + # 创建SSH客户端对象 + ssh = paramiko.SSHClient() + # 自动添加主机密钥,避免手动确认 + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + + # 连接到SFTP服务器 + ssh.connect(hostname=hostname, port=port, username=username, password=password) + + # 打开SFTP会话 + sftp = ssh.open_sftp() + + # 列出远程目录的内容 + #print(f"Listing files in directory: {remote_path}") + if remote_path[0]!="/": + remote_path = f"/{remote_path}" + + data=[] + for fileattr in sftp.listdir_attr(remote_path): + #print(f"- {fileattr.filename} (Size: {fileattr.st_size} bytes, Modified: {fileattr.st_mtime})") + data.append({"name":fileattr.filename,"m_time":datetime.datetime.fromtimestamp(fileattr.st_mtime),"size":fileattr.st_size}) + + # 关闭SFTP会话和SSH连接 + sftp.close() + ssh.close() + + except Exception as e: + raise Exception(f"访问时发生一个错误: {e}") + + return data + + + + +#钉盘测试 +def ding_list_directory(app_key,app_secret,main_dir): + try: + data=[] + for file in ding_list_top_50_files(app_key,app_secret,main_dir): + #print(f"- {fileattr.filename} (Size: {fileattr.st_size} bytes, Modified: {fileattr.st_mtime})") + data.append({"name":file["fname"],"m_time":file["mtime"],"size":file["path"]}) + except Exception as e: + raise Exception(f"访问时发生一个错误: {e}") + + return data + + + +def sftp_conn(hostname, username, password): + # 创建SSH客户端对象 + ssh = paramiko.SSHClient() + # 自动添加主机密钥,避免手动确认 + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + + # 连接到SFTP服务器 + ssh.connect(hostname=hostname, port=22, username=username, password=password) + + # 打开SFTP会话 + sftp = ssh.open_sftp() + + return ssh,sftp + +def sftp_close(ssh,sftp): + # 关闭SFTP会话和SSH连接 + try: + sftp.close() + ssh.close() + except: + pass + + +#SFTP,扫描所有子目录 +def sftp_scan_directory(hostname, username, password, remote_path): + try: + # 创建SSH客户端对象 + ssh = paramiko.SSHClient() + # 自动添加主机密钥,避免手动确认 + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + + # 连接到SFTP服务器 + ssh.connect(hostname=hostname, port=22, username=username, password=password) + + # 打开SFTP会话 + sftp = ssh.open_sftp() + + # 列出远程目录的内容 + #print(f"Listing files in directory: {remote_path}") + if remote_path[0]!="/": + remote_path = f"/{remote_path}" + + directorys={} #总目录,用来做比较 + def scan_next(remote_path): + #print("scan_next",remote_path) + if remote_path not in directorys: + directorys[remote_path]=1 + try: + dirs=[] + for fileattr in sftp.listdir_attr(remote_path): + if stat.S_ISDIR(fileattr.st_mode): + #print(f"- {fileattr.filename} (Size: {fileattr.st_size} bytes, Modified: {fileattr.st_mtime})") + dirs.append(f"{remote_path}/{fileattr.filename}") + if stat.S_ISREG(fileattr.st_mode): #文件 + uid = fileattr.st_uid + # 使用 SSH 命令查询 UID 对应的用户名 + stdin, stdout, stderr = ssh.exec_command(f"getent passwd {uid} | cut -d: -f1") + owner_name = stdout.read().decode().strip() or str(uid) + + yield f"{remote_path}/{fileattr.filename}", fileattr.filename,fileattr.st_mtime,fileattr.st_size,owner_name + for dir in dirs: + yield from scan_next(dir) + except: + pass + + yield from scan_next(remote_path) + + # 关闭SFTP会话和SSH连接 + sftp.close() + ssh.close() + + except Exception as e: + raise Exception(f"访问时发生一个错误: {e}") + + +#windows共享 +def smb_list_directory(hostname, username, password, remote_path): + try: + import smbclient + smbclient.register_session(hostname, username, password) + + data=[] + for entry in smbclient.scandir(f"\\{hostname}\{remote_path}"): + # entry.is_dir() + # entry.is_file() + # entry.path + stats = entry.stat() + #print(entry.name) + data.append({"name":entry.name,"m_time":datetime.datetime.fromtimestamp(stats.st_mtime),"size":stats.st_size}) + + except Exception as e: + raise Exception(f"访问时发生一个错误: {e}") + + return data + + +def myscan(smbclient,directory): + print("scan2",directory) + # smbclient.register_session(hostname, username, password) + for entry in smbclient.scandir(f"\{directory}"): + stats = entry.stat() + if entry.is_file(): + content="" + # with smbclient.open_file(f"\\{hostname}\{directory}\{entry.name}", mode="rb") as fd: + # content = fd.read() + print(entry.path) + yield entry.path,entry.name,content,stats.st_mtime,stats.st_size + if entry.is_dir(): + myscan(smbclient,entry.path) + + +#windows共享,遍历文件 +def smb_scan_directory(hostname, username, password, remote_path): + try: + import smbclient + smbclient.register_session(hostname, username, password) + if remote_path[0]=="\\": + remote_path = remote_path[1:] + + directory = f"\\{hostname}\{remote_path}" + + ready_dirs={} + def scan_next(directory): + if directory not in ready_dirs: + ready_dirs[directory] = 1 + print("dir",directory) + dirs=[] + for entry in smbclient.scandir(directory): + # entry.is_dir() + # entry.is_file() + # entry.path + stats = entry.stat() + if entry.is_file(): + yield entry.path,entry.name,stats.st_mtime,stats.st_size + if entry.is_dir(): + dirs.append(entry.path) + + for dir in dirs: + yield from scan_next(dir) + + yield from scan_next(directory) + + except Exception as e: + raise Exception(f"访问时发生一个错误: {e}") + + + +#linux本地 +def linux_list_directory(directory): + from pathlib import Path + try: + data=[] + for entry in Path(directory).iterdir(): + stats = entry.stat() + data.append({"name":entry.name,"m_time":datetime.datetime.fromtimestamp(stats.st_mtime),"size":stats.st_size}) + + except Exception as e: + print(e) + raise Exception(f"访问时发生一个错误: {e}") + + return data + +#建立一个文件的cache,供下载使用 +def create_doc_cache(path,cfg): + if cfg["type"]=="SFTP": + ssh,sftp = sftp_conn(cfg["address"],cfg["user"],cfg["password"]) + + with sftp.file(path, 'rb') as remote_file: + content = remote_file.read() + cache_path=f"/dev/shm/{time.time()}" + with open(cache_path,"wb+") as file: + file.write(content) + + sftp_close(ssh,sftp) + + elif cfg["type"]=="Windows共享": + import smbclient + smbclient.register_session(cfg["address"],cfg["user"],cfg["password"]) + with smbclient.open_file(path, mode="rb") as fd: + content = fd.read() + cache_path=f"/dev/shm/{time.time()}" + with open(cache_path,"wb+") as file: + file.write(content) + + elif cfg["type"]=="钉盘": + doc = Doc.select(Doc.meta).where( (Doc.abs_path==path) & (Doc.base==cfg["name"])).first() + + meta = json.loads(doc.meta) + + content = ding_get_file_content_init(cfg["address"],cfg["password"],meta["spaceId"],meta["id"]) + cache_path=f"/dev/shm/{time.time()}" + with open(cache_path,"wb+") as file: + file.write(content) + + elif cfg["type"]=="本地": + cache_path=path + + return cache_path + + + +from ftplib import FTP, error_perm +import time +def parse_ftp_list_line(line): + parts = line.split(maxsplit=8) + if len(parts) == 9: + + permissions, links, owner, group, size, month, day, time_or_year, filename = parts + + try: + # 尝试解析时间或年份部分 + timestamp = time.strptime(f"{month} {day} {time_or_year}", "%b %d %H:%M") + current_year = time.localtime().tm_year + mtime = time.mktime((current_year, timestamp.tm_mon, timestamp.tm_mday, + timestamp.tm_hour, timestamp.tm_min, 0, 0, 0, -1)) + + # 如果是超过6个月前的文件,则调整年份 + six_months_ago = time.time() - 6 * 30 * 24 * 60 * 60 + if mtime < six_months_ago: + mtime = time.mktime((current_year - 1, timestamp.tm_mon, timestamp.tm_mday, + timestamp.tm_hour, timestamp.tm_min, 0, 0, 0, -1)) + + return { + 'permissions': permissions, + 'links': int(links), + 'owner': owner, + 'group': group, + 'size': int(size), + 'm_time': mtime, + 'name': filename + } + except ValueError: + return None + elif len(parts) == 4: #windows格式 + fdate,ftime,dir_or_size,filename = parts + + parsed_date = datetime.datetime.strptime(f"{fdate} {ftime}", "%m-%d-%y %I:%M%p") + + # 明确指定世纪 + if parsed_date.year < 100: + parsed_date = parsed_date.replace(year=parsed_date.year + 2000) + + str_date = parsed_date.strftime("%Y-%m-%d %H:%M:%S") + try: + item= { + 'size': int(dir_or_size), + 'type': 'f', + 'm_time': str_date,#转化后的字符串 + "mtime": parsed_date.timestamp() ,#时间戳 + 'name': filename + } + except: + item= { + 'size': 0, + 'type': 'd', + 'm_time': str_date, + "mtime": parsed_date.timestamp() ,#时间戳 + 'name': filename + } + return item + + + +#一次list目录 +def ftp_list_directory(ftp_host, ftp_user, ftp_password, directory): + + # 连接到FTP服务器 + with FTP(ftp_host) as ftp: + ftp.login(user=ftp_user, passwd=ftp_password) + + print(f"Current working directory: {ftp.pwd()}") + + # 改变到目标目录 + try: + ftp.cwd(directory) + print(f"Changed to directory: {directory}") + except error_perm as e: + print(f"Failed to change directory: {e}") + return + + # 列出当前目录下的文件和子目录 + #print("Directory contents:") + #ftp.dir() + + # 或者获取一个列表形式的结果 + # 修改之前的代码,在遍历文件时使用此函数解析每一行 + files = [] + ftp.retrlines('LIST', lambda line: files.append(parse_ftp_list_line(line))) + + data=[] + for entry in files: + if entry is not None: + data.append(entry) + + return data + +#扫描整个目录 +def ftp_scan_directory(ftp_host, ftp_user, ftp_password, directory): + + # 连接到FTP服务器 + with FTP(ftp_host) as ftp: + ftp.login(user=ftp_user, passwd=ftp_password) + + print(f"Current working directory: {ftp.pwd()}") + + + def scan_next(directory): + + # 改变到目标目录 + try: + ftp.cwd(directory) + print(f"Changed to directory: {directory}") + except error_perm as e: + print(f"Failed to change directory: {e}") + return + + # 或者获取一个列表形式的结果 + # 修改之前的代码,在遍历文件时使用此函数解析每一行 + files = [] + ftp.retrlines('LIST', lambda line: files.append(parse_ftp_list_line(line))) + + dirs=[] + for entry in files: + if entry is not None: + if entry["type"]=="f": + entry["path"] = directory+"/"+entry["name"] + yield entry + elif entry["type"]=="d": + dirs.append(entry) + for dir in dirs: + yield from scan_next(directory+"/"+dir["name"]) + + yield from scan_next(directory) + + + + + + +if __name__ == "__main__": + ftp_host = "host" # 替换为你的FTP主机地址 + ftp_user = "Administrator" # 替换为你的FTP用户名 + ftp_password = "12" # 替换为你的FTP密码 + directory = "/" # 替换为你想要列出的目录路径 + + for f in ftp_scan_directory(ftp_host, ftp_user, ftp_password, directory): + print(f) \ No newline at end of file