"""
Build the knowledge base using both a vector store and full-text search.
"""

from pathlib import Path
from datetime import datetime
from ftplib import FTP
from io import BytesIO
import json
import json5
import os
import pwd
import time

import smbclient
from tika import parser
from langchain.text_splitter import RecursiveCharacterTextSplitter

from full_index_search import create_full_index, create_full_index2, delete_fs_index_by_base_path
from k_database import Doc, BaiKe, BaiKe_Catalog
from init import gcfg
from ff import smb_scan_directory, sftp_scan_directory, sftp_conn, sftp_close, ftp_scan_directory
from dingpan import ding_scan_directory, ding_get_file_content

# from llm_embedding2 import embeddings
# from milvus import default_server
# from langchain_milvus import Milvus


def extract_text(content):
    """Extract (metadata, plain text) from an in-memory document via Apache Tika."""
    try:
        raw = parser.from_buffer(content)
    except Exception:
        raw = {'metadata': {"Content-Type": ""}, 'content': ''}
    return raw["metadata"], raw['content']


def extract_text_path(file_path):
    """Extract (metadata, plain text) from a file on disk via Apache Tika."""
    try:
        raw = parser.from_file(file_path)
    except Exception:
        raw = {'metadata': {"Content-Type": ""}, 'content': ''}
    return raw["metadata"], raw['content']
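

# A minimal usage sketch (assumption: the tika package can reach or spawn a
# Tika server, which it attempts on the first parse call; the path below is
# hypothetical):
#
#     meta, text = extract_text_path("/data/docs/report.pdf")
#     print(meta.get("Content-Type"), len(text or ""))
#
# Both helpers swallow parser failures and return an empty document, so the
# only failure mode callers need to handle is empty text.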


# Walk all files under a directory tree recursively, yielding each file's
# path together with its stat metadata.
def list_files(directory, pre="*"):
    for path in Path(directory).rglob(pre):
        if path.is_file():
            yield path, path.stat()
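

# Usage sketch (the directory is hypothetical). list_files is a generator, so
# arbitrarily large trees are streamed rather than held in memory:
#
#     for path, st in list_files("/srv/share", "*.pdf"):
#         print(path, st.st_size, st.st_mtime)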


# Create a RecursiveCharacterTextSplitter instance
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=gcfg["full_index"]["chunk_size"],        # target size of each text chunk
    chunk_overlap=gcfg["full_index"]["chunk_overlap"],  # characters shared between adjacent chunks
)

# vector_db = Milvus(
#     embeddings,
#     connection_args={"host": "127.0.0.1", "port": default_server.listen_port},
#     collection_name='test',
#     auto_id=True,
# )
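
# Rough sketch of the splitter's behaviour, with illustrative numbers rather
# than the configured ones: chunk_size=100 and chunk_overlap=20 give a stride
# of 80, so a 250-character text yields about ceil((250 - 20) / 80) = 3
# chunks, each at most 100 characters, sharing ~20 characters with its
# neighbour (the recursive splitter prefers separator boundaries, so actual
# sizes vary):
#
#     demo = RecursiveCharacterTextSplitter(chunk_size=100, chunk_overlap=20)
#     chunks = demo.split_text(long_text)
#     assert all(len(c) <= 100 for c in chunks)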

# File extensions to skip
ignore_file_type = gcfg["full_index"]["ignore_ftype"].split(",")

# Files larger than this are skipped (configured value is in MB)
max_file_size = int(gcfg["full_index"]["ignore_fsize"]) * 1024 * 1024


# Return the file's extension, including the dot ("" if the name has no dot)
def get_file_ext(fname):
    idx = fname.rfind(".")
    return fname[idx:] if idx != -1 else ""
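
# For example: get_file_ext("report.pdf") -> ".pdf",
# get_file_ext("archive.tar.gz") -> ".gz", get_file_ext("README") -> "".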


# Build the knowledge base from a local (Linux) directory tree.
def build_linux_base(base, cfg):
    count = 0
    error = 0
    for path, stat in list_files(cfg["path"]):
        count += 1
        size = stat.st_size
        print(str(path))
        if (get_file_ext(path.name) not in ignore_file_type
                and size < max_file_size
                and path.name[0:2] not in [".~", "~$"]):

            try:
                doc = Doc.get_or_none((Doc.abs_path == str(path)) & (Doc.base == cfg["name"]))
                stat_info = os.stat(str(path))
                uid = stat_info.st_uid
                username = pwd.getpwuid(uid).pw_name
                # (Re)index only new files and files modified since the last scan
                if not doc or doc.f_up_at.timestamp() < stat.st_mtime:
                    meta, text = extract_text_path(str(path))
                    if text is None:
                        text = ""
                    build_one_doc(base, str(path), path.name, meta, text, size, stat.st_mtime, username)
            except Exception as e:
                print(f"failed to index {path}: {e}")
                error += 1
    if error > 0:
        raise Exception(f"this scan hit {error} document errors")
    return count


# Build the knowledge base from an SMB (Windows) share.
def build_smb_base(base, cfg):
    count = 0
    smbclient.register_session(cfg["address"], cfg["user"], cfg["password"])
    for path, fname, mtime, size in smb_scan_directory(cfg["address"], cfg["user"], cfg["password"], cfg["path"]):
        count += 1
        print(count, path, size)
        if (get_file_ext(fname) not in ignore_file_type
                and size < max_file_size
                and fname[0:2] not in [".~", "~$"]):

            doc = Doc.get_or_none((Doc.abs_path == str(path)) & (Doc.base == cfg["name"]))

            if not doc or doc.f_up_at.timestamp() < mtime:
                try:
                    with smbclient.open_file(path, mode="rb") as fd:
                        content = fd.read()
                except Exception:
                    # On error, re-register the session and skip this file
                    smbclient.register_session(cfg["address"], cfg["user"], cfg["password"])
                    continue
                meta, text = extract_text(content)
                if text is None:
                    text = ""
                build_one_doc(base, path, fname, meta, text, size, mtime)
    return count
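
# Path note (an assumption based on how the yielded paths are passed straight
# to smbclient.open_file): smb_scan_directory presumably yields full UNC paths
# such as r"\\server\share\docs\a.docx", which smbclient resolves against the
# session registered above.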


# Build the knowledge base from an SFTP server.
def build_sftp_base(base, cfg):
    count = 0

    ssh, sftp = sftp_conn(cfg["address"], cfg["user"], cfg["password"])

    for path, fname, mtime, size, ownername in sftp_scan_directory(cfg["address"], cfg["user"], cfg["password"], cfg["path"]):
        count += 1
        print(count, path, size, ownername)
        if (get_file_ext(fname) not in ignore_file_type
                and size < max_file_size
                and fname[0:2] not in [".~", "~$"]):

            doc = Doc.get_or_none((Doc.abs_path == str(path)) & (Doc.base == cfg["name"]))

            if not doc or doc.f_up_at.timestamp() < mtime:
                try:
                    with sftp.file(path, 'r') as remote_file:
                        content = remote_file.read()
                except Exception:
                    # On error, reconnect and skip this file
                    sftp_close(ssh, sftp)
                    ssh, sftp = sftp_conn(cfg["address"], cfg["user"], cfg["password"])
                    continue
                meta, text = extract_text(content)
                if text is None:
                    text = ""
                build_one_doc(base, path, fname, meta, text, size, mtime, ownername)

    sftp_close(ssh, sftp)
    return count


def get_binary_file_content(ftp, remote_path):
    """
    Fetch the content of a binary file over FTP.

    :param ftp: a connected and logged-in FTP object
    :param remote_path: path of the remote file
    :return: file content (bytes)
    """
    binary_data = BytesIO()
    ftp.retrbinary(f'RETR {remote_path}', binary_data.write)
    return binary_data.getvalue()
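
# Usage sketch (host and credentials are placeholders):
#
#     with FTP("192.168.0.1") as ftp:
#         ftp.login(user="admin", passwd="1")
#         data = get_binary_file_content(ftp, "home/report.pdf")
#
# retrbinary issues RETR in binary mode and streams the payload into the
# BytesIO buffer block by block, so no temporary file is needed.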


# Build the knowledge base from an FTP server.
def build_ftp_base(base, cfg):
    count = 0
    for entry in ftp_scan_directory(cfg["address"], cfg["user"], cfg["password"], cfg["path"]):
        path = entry["path"]
        fname = entry["name"]
        size = int(entry["size"])
        mtime = entry["mtime"]
        count += 1
        print(count, path, size, max_file_size)
        if (get_file_ext(fname) not in ignore_file_type
                and size < max_file_size
                and fname[0:2] not in [".~", "~$"]):

            doc = Doc.get_or_none((Doc.abs_path == str(path)) & (Doc.base == cfg["name"]))

            if not doc or doc.f_up_at.timestamp() < mtime:
                # Connect to the FTP server
                with FTP(cfg["address"]) as ftp:
                    ftp.login(user=cfg["user"], passwd=cfg["password"])
                    content = get_binary_file_content(ftp, path)
                meta, text = extract_text(content)
                if text is None:
                    text = ""
                build_one_doc(base, path, fname, meta, text, size, mtime)

    return count


# Build the knowledge base from a DingTalk Drive (钉盘) directory.
def build_ding_pan(base, cfg):
    count = 0
    for entry in ding_scan_directory(cfg["address"], cfg["password"], cfg["path"]):
        path = entry["path"]
        fname = entry["fname"]
        file_id = entry["id"]
        mtime = entry["mtime"]
        spaceId = entry["spaceId"]
        unionid = entry["unionid"]
        access_token = entry["access_token"]
        count += 1
        print(count, path, file_id)
        if get_file_ext(fname) not in ignore_file_type and fname[0:2] not in [".~", "~$"]:

            doc = Doc.get_or_none((Doc.abs_path == str(path)) & (Doc.base == cfg["name"]))

            # mtime is a datetime here (note the .timestamp() call below)
            if not doc or doc.f_up_at < mtime:
                content = ding_get_file_content(spaceId, file_id, unionid, access_token)
                # The size is only known after download, so the limit is checked here
                if content and len(content) < max_file_size:
                    meta, text = extract_text(content)
                    if text:
                        build_one_doc(base, path, fname, {"id": file_id, "spaceId": spaceId},
                                      text, len(content), mtime.timestamp())

    return count


# Index a single document: upsert its DB record and (re)build its full-text index.
def build_one_doc(base, path, file_name, meta, text, st_size, st_mtime, ownername="admin"):

    # Vector-database handling (currently disabled):
    # split the document and attach the file name as chunk metadata.
    # metas = [{"name": file_name}] * len(split_texts)
    # documents = text_splitter.create_documents(split_texts, metas)
    # try:
    #     vector_db.add_documents(documents)
    # except:
    #     pass

    # Atomic upsert variant, kept for reference:
    # query = Doc.insert(base=base, abs_path=path,
    #                    f_name=file_name,
    #                    f_size=st_size,
    #                    f_up_at=datetime.fromtimestamp(st_mtime),
    #                    ctype=meta["Content-Type"],
    #                    catalog="内部",
    #                    meta=json.dumps(meta)[:1000]
    #                    ).on_conflict(
    #                        conflict_target=[Doc.abs_path],
    #                        update={Doc.f_up_at: datetime.fromtimestamp(st_mtime)}
    #                    )
    # query.execute()

    try:
        Doc.create(base=base, abs_path=path,
                   f_name=file_name,
                   f_size=st_size,
                   f_up_at=datetime.fromtimestamp(st_mtime),
                   ctype=meta["Content-Type"],
                   catalog="内部",
                   meta=json.dumps(meta)[:1000],
                   author=ownername,
                   )
    except Exception:
        # Already exists: update the record and drop its old full-text index entries
        Doc.update({Doc.f_up_at: datetime.fromtimestamp(st_mtime), Doc.base: base,
                    Doc.meta: json.dumps(meta)[:1000], Doc.author: ownername,
                    }).where(Doc.abs_path == path).execute()
        delete_fs_index_by_base_path(base, path)

    # Split the content into chunks, each prefixed with its source path
    split_texts = text_splitter.split_text(text)
    print(path, len(text), len(split_texts))
    split_texts = list(map(lambda x: f"文件: {path}\n" + x, split_texts))

    create_full_index(split_texts, file_name, path, st_mtime, base)
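
# After the prefixing step, every stored chunk begins with a provenance line,
# e.g. (path hypothetical):
#
#     文件: /srv/share/handbook.docx
#     <first ~chunk_size characters of the document text...>
#
# so a full-text hit can always be traced back to its source file.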


# Refresh per-catalog document counts for the wiki (BaiKe) tables.
def baike_catalog_total():
    from peewee import fn
    query = (BaiKe
             .select(BaiKe.catalog, fn.COUNT(BaiKe.id).alias('count'))
             .group_by(BaiKe.catalog))

    for record in query:
        BaiKe_Catalog.update(doc_count=record.count).where(BaiKe_Catalog.catalog == record.catalog).execute()
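
# The query above compiles to roughly the following SQL (actual table and
# column names depend on the model definitions in k_database):
#
#     SELECT catalog, COUNT(id) AS count FROM baike GROUP BY catalog;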


# Remove session files that have not been refreshed within 12 hours.
def clear_session_id():
    import glob
    session_ids = glob.glob("/tmp/k3gpt/session_id_*")
    for file in session_ids:
        try:
            with open(file, "r") as f:
                s_timestamp = f.read()
            if int(time.time()) - int(s_timestamp) > 3600 * 12:
                os.remove(file)
        except Exception as e:
            print(e)


if __name__ == "__main__":

    print("Starting knowledge-base build loop")
    while True:
        try:
            with open(f"{gcfg['fs']['path']}/conf/source.conf") as conf:
                source = json5.load(conf)
        except Exception:
            source = {"测试": {"name": "测试", "type": "FTP", "address": "192.168.0.1:21",
                             "user": "admin", "password": "1", "path": "home",
                             "scan": 1, "count": 1024}}

        bases = list(source.values())
        for base in bases:

            try:
                now = int(time.time())
                # Scan interval, configured in hours
                scant = int(float(source[base["name"]]["scan"]) * 3600)

                # Skip this source if its scan interval has not elapsed yet
                if "scan_date" in base and now < source[base["name"]]["scan_date"] + scant:
                    print("scan not due yet", base["name"], datetime.now())
                    continue

                print("scan started", base["name"], datetime.now())

                count = 0

                if base["type"] == "SFTP":
                    count = build_sftp_base(base["name"], base)
                elif base["type"] == "本地":          # local directory
                    count = build_linux_base(base["name"], base)
                elif base["type"] == "Windows共享":   # SMB share
                    count = build_smb_base(base["name"], base)
                elif base["type"] == "FTP":
                    count = build_ftp_base(base["name"], base)
                elif base["type"] == "钉盘":          # DingTalk Drive
                    count = build_ding_pan(base["name"], base)

                source[base["name"]]["count"] = count
                source[base["name"]]["scan_date"] = int(time.time())
                print("scan finished", base["name"], datetime.now())
            except Exception as e:
                print(e)
        # end for

        # Write the results back into the latest on-disk config
        with open(f"{gcfg['fs']['path']}/conf/source.conf") as conf:
            source_new = json5.load(conf)

        # Update file counts and scan timestamps
        for k, v in source.items():
            if "count" in v: source_new[k]["count"] = v["count"]
            if "scan_date" in v: source_new[k]["scan_date"] = v["scan_date"]

        with open(f"{gcfg['fs']['path']}/conf/source.conf", "w+") as conf:
            json5.dump(source_new, conf, ensure_ascii=False)

        try:
            baike_catalog_total()
        except Exception as e:
            print("baike catalog totals failed:", e)

        clear_session_id()
        # Sleep 30 seconds before the next pass
        time.sleep(30)