Files
k3GPT/main/ff.py
2025-11-19 19:42:53 +08:00

409 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import datetime
import paramiko
import time
import stat
import json
from dingpan import ding_list_top_50_files,ding_get_file_content_init
from k_database import Doc
#SFTP
def sftp_list_directory(hostname, port, username, password, remote_path):
try:
# 创建SSH客户端对象
ssh = paramiko.SSHClient()
# 自动添加主机密钥,避免手动确认
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接到SFTP服务器
ssh.connect(hostname=hostname, port=port, username=username, password=password)
# 打开SFTP会话
sftp = ssh.open_sftp()
# 列出远程目录的内容
#print(f"Listing files in directory: {remote_path}")
if remote_path[0]!="/":
remote_path = f"/{remote_path}"
data=[]
for fileattr in sftp.listdir_attr(remote_path):
#print(f"- {fileattr.filename} (Size: {fileattr.st_size} bytes, Modified: {fileattr.st_mtime})")
data.append({"name":fileattr.filename,"m_time":datetime.datetime.fromtimestamp(fileattr.st_mtime),"size":fileattr.st_size})
# 关闭SFTP会话和SSH连接
sftp.close()
ssh.close()
except Exception as e:
raise Exception(f"访问时发生一个错误: {e}")
return data
#钉盘测试
def ding_list_directory(app_key,app_secret,main_dir):
try:
data=[]
for file in ding_list_top_50_files(app_key,app_secret,main_dir):
#print(f"- {fileattr.filename} (Size: {fileattr.st_size} bytes, Modified: {fileattr.st_mtime})")
data.append({"name":file["fname"],"m_time":file["mtime"],"size":file["path"]})
except Exception as e:
raise Exception(f"访问时发生一个错误: {e}")
return data
def sftp_conn(hostname, username, password):
# 创建SSH客户端对象
ssh = paramiko.SSHClient()
# 自动添加主机密钥,避免手动确认
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接到SFTP服务器
ssh.connect(hostname=hostname, port=22, username=username, password=password)
# 打开SFTP会话
sftp = ssh.open_sftp()
return ssh,sftp
def sftp_close(ssh,sftp):
# 关闭SFTP会话和SSH连接
try:
sftp.close()
ssh.close()
except:
pass
#SFTP,扫描所有子目录
def sftp_scan_directory(hostname, username, password, remote_path):
try:
# 创建SSH客户端对象
ssh = paramiko.SSHClient()
# 自动添加主机密钥,避免手动确认
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接到SFTP服务器
ssh.connect(hostname=hostname, port=22, username=username, password=password)
# 打开SFTP会话
sftp = ssh.open_sftp()
# 列出远程目录的内容
#print(f"Listing files in directory: {remote_path}")
if remote_path[0]!="/":
remote_path = f"/{remote_path}"
directorys={} #总目录,用来做比较
def scan_next(remote_path):
#print("scan_next",remote_path)
if remote_path not in directorys:
directorys[remote_path]=1
try:
dirs=[]
for fileattr in sftp.listdir_attr(remote_path):
if stat.S_ISDIR(fileattr.st_mode):
#print(f"- {fileattr.filename} (Size: {fileattr.st_size} bytes, Modified: {fileattr.st_mtime})")
dirs.append(f"{remote_path}/{fileattr.filename}")
if stat.S_ISREG(fileattr.st_mode): #文件
uid = fileattr.st_uid
# 使用 SSH 命令查询 UID 对应的用户名
stdin, stdout, stderr = ssh.exec_command(f"getent passwd {uid} | cut -d: -f1")
owner_name = stdout.read().decode().strip() or str(uid)
yield f"{remote_path}/{fileattr.filename}", fileattr.filename,fileattr.st_mtime,fileattr.st_size,owner_name
for dir in dirs:
yield from scan_next(dir)
except:
pass
yield from scan_next(remote_path)
# 关闭SFTP会话和SSH连接
sftp.close()
ssh.close()
except Exception as e:
raise Exception(f"访问时发生一个错误: {e}")
#windows共享
def smb_list_directory(hostname, username, password, remote_path):
try:
import smbclient
smbclient.register_session(hostname, username, password)
data=[]
for entry in smbclient.scandir(f"\\{hostname}\{remote_path}"):
# entry.is_dir()
# entry.is_file()
# entry.path
stats = entry.stat()
#print(entry.name)
data.append({"name":entry.name,"m_time":datetime.datetime.fromtimestamp(stats.st_mtime),"size":stats.st_size})
except Exception as e:
raise Exception(f"访问时发生一个错误: {e}")
return data
def myscan(smbclient,directory):
print("scan2",directory)
# smbclient.register_session(hostname, username, password)
for entry in smbclient.scandir(f"\{directory}"):
stats = entry.stat()
if entry.is_file():
content=""
# with smbclient.open_file(f"\\{hostname}\{directory}\{entry.name}", mode="rb") as fd:
# content = fd.read()
print(entry.path)
yield entry.path,entry.name,content,stats.st_mtime,stats.st_size
if entry.is_dir():
myscan(smbclient,entry.path)
#windows共享,遍历文件
def smb_scan_directory(hostname, username, password, remote_path):
try:
import smbclient
smbclient.register_session(hostname, username, password)
if remote_path[0]=="\\":
remote_path = remote_path[1:]
directory = f"\\{hostname}\{remote_path}"
ready_dirs={}
def scan_next(directory):
if directory not in ready_dirs:
ready_dirs[directory] = 1
print("dir",directory)
dirs=[]
for entry in smbclient.scandir(directory):
# entry.is_dir()
# entry.is_file()
# entry.path
stats = entry.stat()
if entry.is_file():
yield entry.path,entry.name,stats.st_mtime,stats.st_size
if entry.is_dir():
dirs.append(entry.path)
for dir in dirs:
yield from scan_next(dir)
yield from scan_next(directory)
except Exception as e:
raise Exception(f"访问时发生一个错误: {e}")
#linux本地
def linux_list_directory(directory):
from pathlib import Path
try:
data=[]
for entry in Path(directory).iterdir():
stats = entry.stat()
data.append({"name":entry.name,"m_time":datetime.datetime.fromtimestamp(stats.st_mtime),"size":stats.st_size})
except Exception as e:
print(e)
raise Exception(f"访问时发生一个错误: {e}")
return data
#建立一个文件的cache,供下载使用
def create_doc_cache(path,cfg):
if cfg["type"]=="SFTP":
ssh,sftp = sftp_conn(cfg["address"],cfg["user"],cfg["password"])
with sftp.file(path, 'rb') as remote_file:
content = remote_file.read()
cache_path=f"/dev/shm/{time.time()}"
with open(cache_path,"wb+") as file:
file.write(content)
sftp_close(ssh,sftp)
elif cfg["type"]=="Windows共享":
import smbclient
smbclient.register_session(cfg["address"],cfg["user"],cfg["password"])
with smbclient.open_file(path, mode="rb") as fd:
content = fd.read()
cache_path=f"/dev/shm/{time.time()}"
with open(cache_path,"wb+") as file:
file.write(content)
elif cfg["type"]=="钉盘":
doc = Doc.select(Doc.meta).where( (Doc.abs_path==path) & (Doc.base==cfg["name"])).first()
meta = json.loads(doc.meta)
content = ding_get_file_content_init(cfg["address"],cfg["password"],meta["spaceId"],meta["id"])
cache_path=f"/dev/shm/{time.time()}"
with open(cache_path,"wb+") as file:
file.write(content)
elif cfg["type"]=="本地":
cache_path=path
return cache_path
from ftplib import FTP, error_perm
import time
def parse_ftp_list_line(line):
parts = line.split(maxsplit=8)
if len(parts) == 9:
permissions, links, owner, group, size, month, day, time_or_year, filename = parts
try:
# 尝试解析时间或年份部分
timestamp = time.strptime(f"{month} {day} {time_or_year}", "%b %d %H:%M")
current_year = time.localtime().tm_year
mtime = time.mktime((current_year, timestamp.tm_mon, timestamp.tm_mday,
timestamp.tm_hour, timestamp.tm_min, 0, 0, 0, -1))
# 如果是超过6个月前的文件则调整年份
six_months_ago = time.time() - 6 * 30 * 24 * 60 * 60
if mtime < six_months_ago:
mtime = time.mktime((current_year - 1, timestamp.tm_mon, timestamp.tm_mday,
timestamp.tm_hour, timestamp.tm_min, 0, 0, 0, -1))
return {
'permissions': permissions,
'links': int(links),
'owner': owner,
'group': group,
'size': int(size),
'm_time': mtime,
'name': filename
}
except ValueError:
return None
elif len(parts) == 4: #windows格式
fdate,ftime,dir_or_size,filename = parts
parsed_date = datetime.datetime.strptime(f"{fdate} {ftime}", "%m-%d-%y %I:%M%p")
# 明确指定世纪
if parsed_date.year < 100:
parsed_date = parsed_date.replace(year=parsed_date.year + 2000)
str_date = parsed_date.strftime("%Y-%m-%d %H:%M:%S")
try:
item= {
'size': int(dir_or_size),
'type': 'f',
'm_time': str_date,#转化后的字符串
"mtime": parsed_date.timestamp() ,#时间戳
'name': filename
}
except:
item= {
'size': 0,
'type': 'd',
'm_time': str_date,
"mtime": parsed_date.timestamp() ,#时间戳
'name': filename
}
return item
#一次list目录
def ftp_list_directory(ftp_host, ftp_user, ftp_password, directory):
# 连接到FTP服务器
with FTP(ftp_host) as ftp:
ftp.login(user=ftp_user, passwd=ftp_password)
print(f"Current working directory: {ftp.pwd()}")
# 改变到目标目录
try:
ftp.cwd(directory)
print(f"Changed to directory: {directory}")
except error_perm as e:
print(f"Failed to change directory: {e}")
return
# 列出当前目录下的文件和子目录
#print("Directory contents:")
#ftp.dir()
# 或者获取一个列表形式的结果
# 修改之前的代码,在遍历文件时使用此函数解析每一行
files = []
ftp.retrlines('LIST', lambda line: files.append(parse_ftp_list_line(line)))
data=[]
for entry in files:
if entry is not None:
data.append(entry)
return data
#扫描整个目录
def ftp_scan_directory(ftp_host, ftp_user, ftp_password, directory):
# 连接到FTP服务器
with FTP(ftp_host) as ftp:
ftp.login(user=ftp_user, passwd=ftp_password)
print(f"Current working directory: {ftp.pwd()}")
def scan_next(directory):
# 改变到目标目录
try:
ftp.cwd(directory)
print(f"Changed to directory: {directory}")
except error_perm as e:
print(f"Failed to change directory: {e}")
return
# 或者获取一个列表形式的结果
# 修改之前的代码,在遍历文件时使用此函数解析每一行
files = []
ftp.retrlines('LIST', lambda line: files.append(parse_ftp_list_line(line)))
dirs=[]
for entry in files:
if entry is not None:
if entry["type"]=="f":
entry["path"] = directory+"/"+entry["name"]
yield entry
elif entry["type"]=="d":
dirs.append(entry)
for dir in dirs:
yield from scan_next(directory+"/"+dir["name"])
yield from scan_next(directory)
if __name__ == "__main__":
ftp_host = "host" # 替换为你的FTP主机地址
ftp_user = "Administrator" # 替换为你的FTP用户名
ftp_password = "12" # 替换为你的FTP密码
directory = "/" # 替换为你想要列出的目录路径
for f in ftp_scan_directory(ftp_host, ftp_user, ftp_password, directory):
print(f)