Files
k3GPT/main/ff.py

409 lines
13 KiB
Python
Raw Permalink Normal View History

2025-11-19 19:42:53 +08:00
import datetime
import paramiko
import time
import stat
import json
from dingpan import ding_list_top_50_files,ding_get_file_content_init
from k_database import Doc
#SFTP
def sftp_list_directory(hostname, port, username, password, remote_path):
    """List the entries of a single directory on an SFTP server.

    Args:
        hostname: Host name or address of the SSH/SFTP server.
        port: TCP port of the SSH service.
        username: Login user name.
        password: Login password.
        remote_path: Directory to list. A leading "/" is added when it is
            missing, so an empty string lists the server root.

    Returns:
        A list with one dict per directory entry, holding ``name`` (the file
        name), ``m_time`` (a ``datetime.datetime`` built from ``st_mtime``)
        and ``size`` (``st_size``).

    Raises:
        Exception: Any failure while connecting or listing; the original
            error is attached as the cause.
    """
    try:
        ssh = paramiko.SSHClient()
        try:
            # NOTE(review): AutoAddPolicy accepts unknown host keys without
            # verification; kept because the original relies on it.
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            ssh.connect(hostname=hostname, port=port, username=username, password=password)
            sftp = ssh.open_sftp()
            try:
                if not remote_path.startswith("/"):
                    remote_path = f"/{remote_path}"
                data = [
                    {
                        "name": fileattr.filename,
                        "m_time": datetime.datetime.fromtimestamp(fileattr.st_mtime),
                        "size": fileattr.st_size,
                    }
                    for fileattr in sftp.listdir_attr(remote_path)
                ]
            finally:
                sftp.close()
        finally:
            # Always release the SSH connection, including on errors.
            ssh.close()
    except Exception as e:
        raise Exception(f"访问时发生一个错误: {e}") from e
    return data
#钉盘测试
def ding_list_directory(app_key, app_secret, main_dir):
    """List files returned by ``ding_list_top_50_files`` as plain dicts.

    Args:
        app_key: Passed through to ``ding_list_top_50_files``.
        app_secret: Passed through to ``ding_list_top_50_files``.
        main_dir: Passed through to ``ding_list_top_50_files``.

    Returns:
        A list of dicts with the keys ``name``, ``m_time`` and ``size``.

    Raises:
        Exception: Any failure while listing, with the original error text
            embedded in the message.
    """
    try:
        # NOTE(review): "size" is filled from the item's "path" field, exactly
        # as the original does -- confirm whether that is intentional.
        data = [
            {"name": item["fname"], "m_time": item["mtime"], "size": item["path"]}
            for item in ding_list_top_50_files(app_key, app_secret, main_dir)
        ]
    except Exception as err:
        raise Exception(f"访问时发生一个错误: {err}")
    return data
def sftp_conn(hostname, username, password, port=22):
    """Open an SSH connection and an SFTP session on top of it.

    Args:
        hostname: Host name or address of the SSH/SFTP server.
        username: Login user name.
        password: Login password.
        port: TCP port of the SSH service (defaults to 22).

    Returns:
        A ``(ssh, sftp)`` tuple; the caller must close both when done.

    Raises:
        Whatever paramiko raises while connecting or opening the session.
        The SSH client is closed before the error propagates.
    """
    ssh = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy accepts unknown host keys without
    # verification; kept because the original relies on it.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(hostname=hostname, port=port, username=username, password=password)
        sftp = ssh.open_sftp()
    except Exception:
        # Do not leak a half-open connection when setup fails.
        ssh.close()
        raise
    return ssh, sftp
def sftp_close(ssh, sftp):
    """Close an SFTP session and its SSH connection, ignoring close errors.

    Each object is closed independently, so a failure while closing the SFTP
    session no longer prevents the SSH connection from being closed.

    Args:
        ssh: Object with a ``close()`` method (the SSH client), or ``None``.
        sftp: Object with a ``close()`` method (the SFTP session), or ``None``.
    """
    for conn in (sftp, ssh):
        if conn is None:
            continue
        try:
            conn.close()
        except Exception:
            # Best-effort cleanup: a failed close must not break the caller.
            pass
#SFTP,扫描所有子目录
def sftp_scan_directory(hostname, username, password, remote_path, port=22):
    """Recursively yield every regular file below a directory on an SFTP server.

    This is a generator: the connection is opened when iteration starts and
    is closed when the generator finishes, fails, or is closed early.

    Args:
        hostname: Host name or address of the SSH/SFTP server.
        username: Login user name.
        password: Login password.
        remote_path: Directory to start from. A leading "/" is added when it
            is missing.
        port: TCP port of the SSH service (defaults to 22).

    Yields:
        Tuples ``(path, filename, st_mtime, st_size, owner_name)`` where
        ``owner_name`` is resolved with ``getent passwd`` on the server and
        falls back to the numeric uid as a string.

    Raises:
        Exception: Any failure while connecting or scanning the start
            directory; the original error is attached as the cause.
            Sub-directories that cannot be listed are skipped silently.
    """
    try:
        ssh = paramiko.SSHClient()
        try:
            # NOTE(review): AutoAddPolicy accepts unknown host keys without
            # verification; kept because the original relies on it.
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            ssh.connect(hostname=hostname, port=port, username=username, password=password)
            sftp = ssh.open_sftp()
            try:
                if not remote_path.startswith("/"):
                    remote_path = f"/{remote_path}"
                visited = set()  # directories already scanned
                owner_names = {}  # uid -> user name, one remote lookup per uid

                def lookup_owner(uid):
                    if uid in owner_names:
                        return owner_names[uid]
                    try:
                        _stdin, stdout, _stderr = ssh.exec_command(
                            f"getent passwd {uid} | cut -d: -f1"
                        )
                        name = stdout.read().decode().strip() or str(uid)
                    except Exception:
                        # Lookup is a nicety; fall back to the numeric uid.
                        return str(uid)
                    owner_names[uid] = name
                    return name

                def scan_next(current):
                    if current in visited:
                        return
                    visited.add(current)
                    try:
                        entries = sftp.listdir_attr(current)
                    except Exception:
                        # Best effort: skip directories that cannot be listed.
                        return
                    subdirs = []
                    for fileattr in entries:
                        mode = fileattr.st_mode
                        if mode is None:
                            continue
                        if stat.S_ISDIR(mode):
                            subdirs.append(f"{current}/{fileattr.filename}")
                        elif stat.S_ISREG(mode):
                            yield (
                                f"{current}/{fileattr.filename}",
                                fileattr.filename,
                                fileattr.st_mtime,
                                fileattr.st_size,
                                lookup_owner(fileattr.st_uid),
                            )
                    for subdir in subdirs:
                        yield from scan_next(subdir)

                yield from scan_next(remote_path)
            finally:
                sftp.close()
        finally:
            # Runs on normal exhaustion, on errors and on generator close.
            ssh.close()
    except Exception as e:
        raise Exception(f"访问时发生一个错误: {e}") from e
#windows共享
def smb_list_directory(hostname, username, password, remote_path):
    """List the entries of a single directory on a Windows (SMB) share.

    Args:
        hostname: SMB server name or address.
        username: Login user name.
        password: Login password.
        remote_path: Share and directory below the server, joined to the
            host name with a backslash.

    Returns:
        A list with one dict per directory entry, holding ``name``,
        ``m_time`` (a ``datetime.datetime`` built from ``st_mtime``) and
        ``size`` (``st_size``).

    Raises:
        Exception: Any failure while connecting or listing; the original
            error is attached as the cause.
    """
    try:
        import smbclient
        smbclient.register_session(hostname, username, password)
        # Same runtime string as before ("\" + host + "\" + path), but written
        # with valid escapes instead of the invalid "\{" sequence.
        share_path = f"\\{hostname}\\{remote_path}"
        data = []
        for entry in smbclient.scandir(share_path):
            stats = entry.stat()
            data.append({
                "name": entry.name,
                "m_time": datetime.datetime.fromtimestamp(stats.st_mtime),
                "size": stats.st_size,
            })
    except Exception as e:
        raise Exception(f"访问时发生一个错误: {e}") from e
    return data
def myscan(smbclient, directory):
    """Recursively yield the files below *directory* using *smbclient*.

    Args:
        smbclient: Object providing ``scandir(path)``; its entries must offer
            ``stat()``, ``is_file()``, ``is_dir()``, ``path`` and ``name``.
        directory: Directory to scan; a backslash is prepended before it is
            handed to ``scandir``.

    Yields:
        Tuples ``(path, name, content, st_mtime, st_size)``. ``content`` is
        always the empty string because file contents are not read here.
    """
    print("scan2", directory)
    # "\\" is one literal backslash; the original "\{" was an invalid escape
    # that produced the same character.
    for entry in smbclient.scandir(f"\\{directory}"):
        stats = entry.stat()
        if entry.is_file():
            content = ""
            print(entry.path)
            yield entry.path, entry.name, content, stats.st_mtime, stats.st_size
        if entry.is_dir():
            # The recursive call returns a generator; without "yield from" it
            # was never iterated and sub-directories were silently skipped.
            yield from myscan(smbclient, entry.path)
#windows共享,遍历文件
def smb_scan_directory(hostname, username, password, remote_path):
    """Recursively yield every file below a directory on a Windows (SMB) share.

    This is a generator; the SMB session is registered when iteration starts.

    Args:
        hostname: SMB server name or address.
        username: Login user name.
        password: Login password.
        remote_path: Share and directory below the server; one leading
            backslash is removed if present.

    Yields:
        Tuples ``(path, name, st_mtime, st_size)`` for each file.

    Raises:
        Exception: Any failure while connecting or scanning; the original
            error is attached as the cause.
    """
    try:
        import smbclient
        smbclient.register_session(hostname, username, password)
        # startswith() also copes with an empty remote_path.
        if remote_path.startswith("\\"):
            remote_path = remote_path[1:]
        # Same runtime string as before, written with valid escapes instead
        # of the invalid "\{" sequence.
        directory = f"\\{hostname}\\{remote_path}"
        visited = set()  # directories already scanned

        def scan_next(current):
            if current in visited:
                return
            visited.add(current)
            print("dir", current)
            subdirs = []
            for entry in smbclient.scandir(current):
                stats = entry.stat()
                if entry.is_file():
                    yield entry.path, entry.name, stats.st_mtime, stats.st_size
                if entry.is_dir():
                    subdirs.append(entry.path)
            for subdir in subdirs:
                yield from scan_next(subdir)

        yield from scan_next(directory)
    except Exception as e:
        raise Exception(f"访问时发生一个错误: {e}") from e
#linux本地
def linux_list_directory(directory):
    """List the entries of a local directory.

    Args:
        directory: Path of the directory to list.

    Returns:
        A list with one dict per entry, holding ``name``, ``m_time`` (a
        ``datetime.datetime`` built from ``st_mtime``) and ``size``
        (``st_size``).

    Raises:
        Exception: Any failure while listing; the error is printed first and
            its text is embedded in the message.
    """
    from pathlib import Path

    def describe(item):
        info = item.stat()
        return {
            "name": item.name,
            "m_time": datetime.datetime.fromtimestamp(info.st_mtime),
            "size": info.st_size,
        }

    try:
        data = [describe(item) for item in Path(directory).iterdir()]
    except Exception as err:
        print(err)
        raise Exception(f"访问时发生一个错误: {err}")
    return data
#建立一个文件的cache,供下载使用
def _write_doc_cache(content):
    """Write *content* to a new file under /dev/shm and return its path."""
    # NOTE(review): the name is only the current timestamp, so it is
    # predictable and could collide under concurrent calls; kept unchanged in
    # case other code depends on this naming.
    cache_path = f"/dev/shm/{time.time()}"
    with open(cache_path, "wb+") as file:
        file.write(content)
    return cache_path


def create_doc_cache(path, cfg):
    """Fetch a document from its source and return a local path to it.

    Args:
        path: Location of the document inside the source described by *cfg*.
        cfg: Source description. ``cfg["type"]`` selects the backend
            ("SFTP", "Windows共享", "钉盘" or "本地"); ``address``, ``user``,
            ``password`` and ``name`` are read depending on the backend.

    Returns:
        For "本地" the given *path* itself; otherwise the path of a freshly
        written cache file under ``/dev/shm``.

    Raises:
        ValueError: If ``cfg["type"]`` is not supported, or if no ``Doc``
            record matches a "钉盘" document.
    """
    source_type = cfg["type"]
    if source_type == "本地":
        # Local files need no cache copy.
        return path

    if source_type == "SFTP":
        ssh, sftp = sftp_conn(cfg["address"], cfg["user"], cfg["password"])
        try:
            with sftp.file(path, 'rb') as remote_file:
                content = remote_file.read()
        finally:
            # Release the connection even when the read fails.
            sftp_close(ssh, sftp)
    elif source_type == "Windows共享":
        import smbclient
        smbclient.register_session(cfg["address"], cfg["user"], cfg["password"])
        with smbclient.open_file(path, mode="rb") as fd:
            content = fd.read()
    elif source_type == "钉盘":
        doc = Doc.select(Doc.meta).where(
            (Doc.abs_path == path) & (Doc.base == cfg["name"])
        ).first()
        if doc is None:
            raise ValueError(f"no Doc record found for {path!r} in {cfg['name']!r}")
        meta = json.loads(doc.meta)
        content = ding_get_file_content_init(
            cfg["address"], cfg["password"], meta["spaceId"], meta["id"]
        )
    else:
        raise ValueError(f"unsupported source type: {source_type!r}")

    return _write_doc_cache(content)
from ftplib import FTP, error_perm
import time
def parse_ftp_list_line(line):
    """Parse one line of an FTP ``LIST`` reply into a dict.

    Two layouts are recognised:

    * Windows/IIS: ``MM-DD-YY HH:MMAM <DIR>|size name`` (a four-digit year is
      accepted as well). The name may contain spaces.
    * Unix ``ls -l``: ``perms links owner group size Mon DD HH:MM|YYYY name``.

    Args:
        line: A single text line from the ``LIST`` reply.

    Returns:
        ``None`` when the line matches neither layout (for example a
        ``total 12`` header). Otherwise a dict with:

        * Windows: ``size``, ``type`` ("f" or "d"), ``m_time`` (string
          ``YYYY-mm-dd HH:MM:SS``), ``mtime`` (timestamp) and ``name``.
        * Unix: ``permissions``, ``links``, ``owner``, ``group``, ``size``,
          ``m_time`` (timestamp), ``mtime`` (same timestamp), ``type``
          ("d" directory, "l" symlink, otherwise "f") and ``name``.
    """
    # --- Windows/IIS layout -------------------------------------------------
    # Split into at most four fields so file names with spaces stay intact.
    win_parts = line.split(maxsplit=3)
    if len(win_parts) == 4:
        fdate, ftime, dir_or_size, filename = win_parts
        parsed_date = None
        for fmt in ("%m-%d-%y %I:%M%p", "%m-%d-%Y %I:%M%p"):
            try:
                parsed_date = datetime.datetime.strptime(f"{fdate} {ftime}", fmt)
            except ValueError:
                continue
            break
        if parsed_date is not None:
            try:
                size, entry_type = int(dir_or_size), 'f'
            except ValueError:
                # A non-numeric size column (e.g. "<DIR>") marks a directory.
                size, entry_type = 0, 'd'
            return {
                'size': size,
                'type': entry_type,
                'm_time': parsed_date.strftime("%Y-%m-%d %H:%M:%S"),
                "mtime": parsed_date.timestamp(),
                'name': filename,
            }

    # --- Unix "ls -l" layout ------------------------------------------------
    parts = line.split(maxsplit=8)
    if len(parts) != 9:
        return None
    permissions, links, owner, group, size, month, day, time_or_year, filename = parts
    months = ("jan", "feb", "mar", "apr", "may", "jun",
              "jul", "aug", "sep", "oct", "nov", "dec")
    try:
        mon = months.index(month.lower()) + 1
        mday = int(day)
        if not 1 <= mday <= 31:
            raise ValueError(f"day out of range: {day}")
        if ":" in time_or_year:
            # Recent entry: the year is omitted and must be inferred.
            hour_text, minute_text = time_or_year.split(":", 1)
            hour, minute = int(hour_text), int(minute_text)
            if not (0 <= hour <= 23 and 0 <= minute <= 59):
                raise ValueError(f"time out of range: {time_or_year}")
            now = time.time()
            year = time.localtime(now).tm_year
            mtime = time.mktime((year, mon, mday, hour, minute, 0, 0, 0, -1))
            # A date that would lie in the future belongs to the previous
            # year; one day of slack tolerates clock/time-zone skew.
            if mtime > now + 24 * 60 * 60:
                mtime = time.mktime((year - 1, mon, mday, hour, minute, 0, 0, 0, -1))
        else:
            # Older entry: the column carries the year and no time of day.
            mtime = time.mktime((int(time_or_year), mon, mday, 0, 0, 0, 0, 0, -1))
        if permissions.startswith("d"):
            entry_type = 'd'
        elif permissions.startswith("l"):
            entry_type = 'l'
        else:
            entry_type = 'f'
        return {
            'permissions': permissions,
            'links': int(links),
            'owner': owner,
            'group': group,
            'size': int(size),
            'm_time': mtime,
            'mtime': mtime,
            'type': entry_type,
            'name': filename,
        }
    except (ValueError, OverflowError):
        return None
#一次list目录
def ftp_list_directory(ftp_host, ftp_user, ftp_password, directory):
    """List one directory on an FTP server.

    Args:
        ftp_host: FTP server host.
        ftp_user: Login user name.
        ftp_password: Login password.
        directory: Directory to change into before listing.

    Returns:
        A list of the dicts produced by ``parse_ftp_list_line`` for every
        ``LIST`` line it could parse, or ``None`` when changing into
        *directory* is refused by the server.
    """
    with FTP(ftp_host) as ftp:
        ftp.login(user=ftp_user, passwd=ftp_password)
        print(f"Current working directory: {ftp.pwd()}")

        try:
            ftp.cwd(directory)
            print(f"Changed to directory: {directory}")
        except error_perm as err:
            print(f"Failed to change directory: {err}")
            return

        data = []

        def collect(raw_line):
            # Parse while the listing streams in; unparseable lines are dropped.
            parsed = parse_ftp_list_line(raw_line)
            if parsed is not None:
                data.append(parsed)

        ftp.retrlines('LIST', collect)
        return data
#扫描整个目录
def ftp_scan_directory(ftp_host, ftp_user, ftp_password, directory):
    """Recursively yield every file below a directory on an FTP server.

    This is a generator; the FTP connection stays open while it is iterated.

    Args:
        ftp_host: FTP server host.
        ftp_user: Login user name.
        ftp_password: Login password.
        directory: Directory to start from.

    Yields:
        The dicts produced by ``parse_ftp_list_line`` for each file, extended
        with ``path`` (``directory + "/" + name``) and ``type``. Directories
        the server refuses to enter are reported with ``print`` and skipped.
    """
    with FTP(ftp_host) as ftp:
        ftp.login(user=ftp_user, passwd=ftp_password)
        print(f"Current working directory: {ftp.pwd()}")

        def entry_type_of(entry):
            # Entries parsed from Unix-style listings may lack "type"; derive
            # it from the permission string instead of raising KeyError.
            kind = entry.get("type")
            if kind is None:
                permissions = str(entry.get("permissions", ""))
                if permissions.startswith("d"):
                    kind = "d"
                elif permissions.startswith("l"):
                    kind = "l"
                else:
                    kind = "f"
            return kind

        def scan_next(directory):
            try:
                ftp.cwd(directory)
                print(f"Changed to directory: {directory}")
            except error_perm as e:
                print(f"Failed to change directory: {e}")
                return
            files = []
            ftp.retrlines('LIST', lambda line: files.append(parse_ftp_list_line(line)))
            subdirs = []
            for entry in files:
                if entry is None:
                    continue
                kind = entry_type_of(entry)
                entry["type"] = kind
                if kind == "f":
                    entry["path"] = directory + "/" + entry["name"]
                    yield entry
                elif kind == "d" and entry["name"] not in (".", ".."):
                    # "." and ".." would make the recursion loop forever.
                    subdirs.append(entry)
            for subdir in subdirs:
                yield from scan_next(directory + "/" + subdir["name"])

        yield from scan_next(directory)
if __name__ == "__main__":
    # Manual smoke test: replace the placeholders with a real FTP host,
    # user name and password, and the directory to scan.
    ftp_host, ftp_user, ftp_password = "host", "Administrator", "12"
    directory = "/"
    scanned = ftp_scan_directory(ftp_host, ftp_user, ftp_password, directory)
    for f in scanned:
        print(f)