"""
使用向量库和全文检索两个来构建知识库
"""
from pathlib import Path
from tika import parser
from langchain.text_splitter import RecursiveCharacterTextSplitter
from datetime import datetime
from full_index_search import create_full_index,create_full_index2,delete_fs_index_by_base_path
from k_database import Doc,BaiKe,BaiKe_Catalog
import json
import json5
import time
#from llm_embedding2 import embeddings
#from milvus import default_server
#from langchain_milvus import Milvus
from init import gcfg
from ff import smb_scan_directory,sftp_scan_directory,sftp_conn,sftp_close,ftp_scan_directory

def extract_text(content):
    #raw = parser.from_file(file_path)
    try:
        raw = parser.from_buffer(content)
    except Exception:
        raw = {'metadata':{"Content-Type":""},'content':''}
    return raw["metadata"],raw['content']

def extract_text_path(file_path):
    try:
        raw = parser.from_file(file_path)
    except Exception:
        raw = {'metadata':{"Content-Type":""},'content':''}
    return raw["metadata"],raw['content']
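
# Usage sketch (the sample path is an assumption, not part of this repo):
# tika-python returns {"metadata": {...}, "content": ...} where "content"
# may be None, hence the `text is None` guards further down.
#   meta, text = extract_text_path("/tmp/sample.pdf")
#   print(meta.get("Content-Type"), len(text or ""))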

# Walk all files under a directory recursively, yielding (path, stat) pairs
def list_files(directory,pre="*"):
    for path in Path(directory).rglob(pre):
        if path.is_file():
            yield path,path.stat()
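
# Usage sketch (directory and pattern are illustrative):
#   for path, stat in list_files("/data/docs", pre="*.pdf"):
#       print(path, stat.st_size, stat.st_mtime)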

# Create a RecursiveCharacterTextSplitter instance
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=gcfg["full_index"]["chunk_size"],       # target size of each chunk
    chunk_overlap=gcfg["full_index"]["chunk_overlap"]  # characters shared by adjacent chunks
)
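
# Chunking sketch with hypothetical sizes (the real values come from gcfg):
# adjacent chunks share roughly chunk_overlap characters, so a 250-character
# string splits into a few overlapping pieces of at most 100 characters:
#   demo = RecursiveCharacterTextSplitter(chunk_size=100, chunk_overlap=20)
#   print([len(c) for c in demo.split_text("x" * 250)])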
# vector_db=Milvus(
#     embeddings,
#     connection_args={"host":"127.0.0.1","port": default_server.listen_port},
#     collection_name='test',
#     auto_id=True,
# )
# File extensions to skip
ignore_file_type=gcfg["full_index"]["ignore_ftype"].split(",")
# Size limit: skip files larger than this many bytes (config value is in MB)
max_file_size = int(gcfg["full_index"]["ignore_fsize"])*1024*1024

# Extension of a file name, including the dot
def get_file_ext(fname):
    idx = fname.rfind(".")
    # No dot: return "" instead of the last character that fname[-1:] would give
    return fname[idx:] if idx != -1 else ""
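
# Examples:
#   get_file_ext("report.final.pdf") -> ".pdf"
#   get_file_ext("README")           -> ""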

import os
import pwd

# Build the knowledge base from a local (Linux) directory
def build_linux_base(base,cfg):
    count=0
    error=0
    for path,stat in list_files(cfg["path"]):
        count +=1
        size = stat.st_size
        print(str(path))
        if get_file_ext(path.name) not in ignore_file_type and size < max_file_size and path.name[0:2] not in [".~","~$"]:
            try:
                doc = Doc.get_or_none( (Doc.abs_path==str(path)) & (Doc.base==cfg["name"]) )
                stat_info = os.stat(str(path))
                uid = stat_info.st_uid
                username = pwd.getpwuid(uid).pw_name
                # (Re)build when the file is new or modified since the last scan
                if not doc or doc.f_up_at.timestamp() < stat.st_mtime:
                    meta,text = extract_text_path(str(path))
                    if text is None:
                        text=""
                    build_one_doc(base,str(path),path.name,meta,text,size,stat.st_mtime,username)
            except Exception as e :
                print(f"{path} build failed: {e}")
                error +=1
    if error >0:
        raise Exception(f"{error} documents failed during this scan")
    return count
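
# cfg shape sketch, mirroring the fallback config in __main__ (values illustrative):
#   build_linux_base("docs", {"name":"docs","type":"本地","path":"/data/docs","scan":1})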

import smbclient

# Build the knowledge base from an SMB (Windows share) source
def build_smb_base(base,cfg):
    count=0
    smbclient.register_session(cfg["address"],cfg["user"],cfg["password"])
    for path,fname,mtime,size in smb_scan_directory(cfg["address"],cfg["user"],cfg["password"],cfg["path"]):
        count +=1
        print(count,path,size)
        if get_file_ext(fname) not in ignore_file_type and size < max_file_size and fname[0:2] not in [".~","~$"]:
            doc = Doc.get_or_none( (Doc.abs_path==str(path)) & (Doc.base==cfg["name"]) )
            if not doc or doc.f_up_at.timestamp() < mtime:
                try:
                    with smbclient.open_file(path, mode="rb") as fd:
                        content = fd.read()
                except Exception:
                    # On error, re-register the session and skip this file
                    smbclient.register_session(cfg["address"],cfg["user"],cfg["password"])
                    continue
                meta,text = extract_text(content)
                if text is None:
                    text=""
                build_one_doc(base,path,fname,meta,text,size,mtime)
    return count

# Build the knowledge base from an SFTP source
def build_sftp_base(base,cfg):
    count=0
    ssh,sftp = sftp_conn(cfg["address"],cfg["user"],cfg["password"])
    for path,fname,mtime,size,ownername in sftp_scan_directory(cfg["address"],cfg["user"],cfg["password"],cfg["path"]):
        count +=1
        print(count,path,size,ownername)
        if get_file_ext(fname) not in ignore_file_type and size < max_file_size and fname[0:2] not in [".~","~$"]:
            doc = Doc.get_or_none( (Doc.abs_path==str(path)) & (Doc.base==cfg["name"]) )
            if not doc or doc.f_up_at.timestamp() < mtime:
                try:
                    with sftp.file(path, 'r') as remote_file:
                        content = remote_file.read()
                except Exception:
                    # On error, reconnect and skip this file
                    sftp_close(ssh,sftp)
                    ssh,sftp = sftp_conn(cfg["address"],cfg["user"],cfg["password"])
                    continue
                meta,text = extract_text(content)
                if text is None:
                    text=""
                build_one_doc(base,path,fname,meta,text,size,mtime,ownername)
    sftp_close(ssh,sftp)
    return count

from ftplib import FTP
from io import BytesIO

def get_binary_file_content(ftp, remote_path):
    """
    Fetch the content of a binary file over FTP.
    :param ftp: a connected, logged-in FTP object
    :param remote_path: path of the remote file
    :return: file content (bytes)
    """
    binary_data = BytesIO()
    ftp.retrbinary(f'RETR {remote_path}', binary_data.write)
    return binary_data.getvalue()
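
# Usage sketch (host, credentials, and path echo the fallback config in
# __main__ and are purely illustrative):
#   with FTP("192.168.0.1") as ftp:
#       ftp.login(user="admin", passwd="1")
#       data = get_binary_file_content(ftp, "home/readme.txt")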

# Build the knowledge base from an FTP source
def build_ftp_base(base,cfg):
    count=0
    for entry in ftp_scan_directory(cfg["address"],cfg["user"],cfg["password"],cfg["path"]):
        path = entry["path"]
        fname = entry["name"]
        size = int(entry["size"])
        mtime = entry["mtime"]
        count +=1
        print(count,path,size,max_file_size)
        if get_file_ext(fname) not in ignore_file_type and size < max_file_size and fname[0:2] not in [".~","~$"]:
            doc = Doc.get_or_none( (Doc.abs_path==str(path)) & (Doc.base==cfg["name"]) )
            if not doc or doc.f_up_at.timestamp() < mtime:
                # Connect to the FTP server
                with FTP(cfg["address"]) as ftp:
                    ftp.login(user=cfg["user"], passwd=cfg["password"])
                    content = get_binary_file_content(ftp,path)
                meta,text = extract_text(content)
                if text is None:
                    text=""
                build_one_doc(base,path,fname,meta,text,size,mtime)
    return count

from dingpan import ding_scan_directory,ding_get_file_content

# Build the knowledge base from a DingTalk drive (钉盘) directory
def build_ding_pan(base,cfg):
    count=0
    for entry in ding_scan_directory(cfg["address"],cfg["password"],cfg["path"]):
        path = entry["path"]
        fname = entry["fname"]
        file_id = entry["id"]
        mtime = entry["mtime"]
        spaceId = entry["spaceId"]
        unionid = entry["unionid"]
        access_token = entry["access_token"]
        count +=1
        print(count,path,file_id)
        if get_file_ext(fname) not in ignore_file_type and fname[0:2] not in [".~","~$"]:
            doc = Doc.get_or_none( (Doc.abs_path==str(path)) & (Doc.base==cfg["name"]) )
            # mtime is a datetime here, unlike the Unix timestamps above
            if not doc or doc.f_up_at < mtime:
                content = ding_get_file_content(spaceId,file_id,unionid,access_token)
                if content and len(content) < max_file_size:
                    meta,text = extract_text(content)
                    if text:
                        build_one_doc(base,path,fname,{"id":file_id,"spaceId":entry["spaceId"]},text,len(content),mtime.timestamp())
    return count

# Build the indexes for a single document
def build_one_doc(base,path,file_name,meta,text,st_size,st_mtime,ownername="admin"):
    # Vector-database handling (currently disabled)
    #
    # Attach the file name as meta info to every split chunk
    #metas=[{"name":file_name}]*len(split_texts)
    #documents = text_splitter.create_documents(split_texts,metas)
    # try:
    #     vector_db.add_documents(documents)
    # except:
    #     pass
    # query = Doc.insert(base=base,abs_path=path,
    #     f_name=file_name,
    #     f_size=st_size,
    #     f_up_at = datetime.fromtimestamp(st_mtime),
    #     ctype= meta["Content-Type"],
    #     catalog="内部",
    #     meta=json.dumps(meta)[:1000]
    # ).on_conflict(
    #     conflict_target=[Doc.abs_path],
    #     update={Doc.f_up_at:datetime.fromtimestamp(st_mtime)}
    # )
    #query.execute()
    try:
        Doc.create(base=base,abs_path=path,
            f_name=file_name,
            f_size=st_size,
            f_up_at = datetime.fromtimestamp(st_mtime),
            # DingTalk drive entries carry no Content-Type, so default to ""
            ctype= meta.get("Content-Type",""),
            catalog="内部",
            meta=json.dumps(meta)[:1000],
            author = ownername,
        )
    except Exception:
        # Record already exists: update it and drop its old full-text index entries
        Doc.update({Doc.f_up_at:datetime.fromtimestamp(st_mtime),Doc.base:base,
            Doc.meta:json.dumps(meta)[:1000],Doc.author:ownername,
            }).where(Doc.abs_path==path).execute()
        delete_fs_index_by_base_path(base,path)
    # Split the content into chunks
    split_texts = text_splitter.split_text(text)
    print(path,len(text),len(split_texts))
    split_texts = list(map(lambda x: f"文件: {path}\n"+x,split_texts))
    create_full_index(split_texts,file_name,path,st_mtime,base)
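
# Each indexed chunk is prefixed with its source path so full-text hits can be
# traced back to the originating file, e.g. (illustrative path):
#   "文件: /data/docs/manual.pdf\n<first ~chunk_size characters of the text>"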

# Per-catalog document counts for the BaiKe (encyclopedia) entries
def baike_catalog_total():
    from peewee import fn
    query = (BaiKe
             .select( BaiKe.catalog,fn.COUNT(BaiKe.id).alias('count') )
             .group_by(BaiKe.catalog)
             )
    for record in query:
        BaiKe_Catalog.update(doc_count= record.count).where(BaiKe_Catalog.catalog == record.catalog).execute()
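
# Rough SQL equivalent of the peewee query above (actual table/column names
# depend on the model definitions in k_database):
#   SELECT catalog, COUNT(id) AS count FROM baike GROUP BY catalog;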

# Remove expired session files
def clear_session_id():
    import glob
    import os
    session_ids = glob.glob("/tmp/k3gpt/session_id_*", recursive=True)
    for file in session_ids:
        try:
            with open(file,"r") as f:
                s_timestamp = f.read()
            # Delete sessions older than 12 hours
            if int(time.time())-int(s_timestamp) > 3600*12:
                os.remove(file)
        except Exception as e:
            print(e)
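
# Each session file is assumed to hold a single Unix timestamp as plain text,
# e.g. (the file-name suffix is illustrative):
#   /tmp/k3gpt/session_id_abc123  contains  "1732000000"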

if __name__=="__main__":
    print("Starting knowledge-base build worker")
    while True:
        try:
            with open(f"{gcfg['fs']['path']}/conf/source.conf") as conf:
                source = json5.load(conf)
        except Exception:
            source={"测试":{"name":"测试","type":"FTP","address":"192.168.0.1:21","user":"admin","password":"1","path":"home","scan":1,"count":1024}}
        bases=list(source.values())
        for base in bases:
            try:
                now = int(time.time())
                # Scan interval: hours in the config, seconds here
                scant = int(float(source[base["name"]]["scan"])*3600)
                # Skip this source if its next scan is not due yet
                if "scan_date" in base and now < source[base["name"]]["scan_date"] + scant:
                    print("scan not due yet",base["name"],datetime.now())
                    continue
                print("scan started",base["name"],datetime.now())
                count=0
                # The type strings below match the values used in source.conf
                if base["type"]=="SFTP":
                    count = build_sftp_base(base["name"],base)
                elif base["type"]=="本地":
                    count = build_linux_base(base["name"],base)
                elif base["type"]=="Windows共享":
                    count= build_smb_base(base["name"],base)
                elif base["type"]=="FTP":
                    count= build_ftp_base(base["name"],base)
                elif base["type"]=="钉盘":
                    count= build_ding_pan(base["name"],base)
                source[base["name"]]["count"] = count
                source[base["name"]]["scan_date"] = int(time.time())
                print("scan finished",base["name"],datetime.now())
            except Exception as e :
                print(e)
        #end for
        # Write back: re-read the latest config, then merge in the new
        # file counts and scan times before saving
        with open(f"{gcfg['fs']['path']}/conf/source.conf") as conf:
            source_new = json5.load(conf)
        for k,v in source.items():
            if "count" in v : source_new[k]["count"] = v["count"]
            if "scan_date" in v : source_new[k]["scan_date"] = v["scan_date"]
        with open(f"{gcfg['fs']['path']}/conf/source.conf","w+") as conf:
            json5.dump(source_new,conf,ensure_ascii=False)
        try:
            baike_catalog_total()
        except Exception as e:
            print("BaiKe catalog totals:",e)
        clear_session_id()
        # Sleep 30 seconds before the next pass
        time.sleep(30)