From 0fa984201fd4168ce4136d22808bbf8e2c4591b0 Mon Sep 17 00:00:00 2001
From: 13315423919 <13315423919@qq.com>
Date: Wed, 19 Nov 2025 19:42:45 +0800
Subject: [PATCH] Add File

---
 main/build_full_index.py | 402 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 402 insertions(+)
 create mode 100644 main/build_full_index.py

diff --git a/main/build_full_index.py b/main/build_full_index.py
new file mode 100644
index 0000000..529750a
--- /dev/null
+++ b/main/build_full_index.py
@@ -0,0 +1,402 @@
+"""
+Build the knowledge base using both a vector store and full-text search
+(the vector-store part is currently commented out).
+"""
+
+from pathlib import Path
+from tika import parser
+
+from langchain.text_splitter import RecursiveCharacterTextSplitter
+
+from datetime import datetime
+
+from full_index_search import create_full_index, create_full_index2, delete_fs_index_by_base_path
+from k_database import Doc, BaiKe, BaiKe_Catalog
+import json
+import json5
+import time
+
+#from llm_embedding2 import embeddings
+#from milvus import default_server
+#from langchain_milvus import Milvus
+
+from init import gcfg
+
+from ff import smb_scan_directory, sftp_scan_directory, sftp_conn, sftp_close, ftp_scan_directory
+
+
+# Extract metadata and plain text from an in-memory file using Tika.
+def extract_text(content):
+    #raw = parser.from_file(file_path)
+    try:
+        raw = parser.from_buffer(content)
+    except Exception:
+        raw = {'metadata': {"Content-Type": ""}, 'content': ''}
+    return raw["metadata"], raw['content']
+
+# Extract metadata and plain text from a file on disk using Tika.
+def extract_text_path(file_path):
+    try:
+        raw = parser.from_file(file_path)
+    except Exception:
+        raw = {'metadata': {"Content-Type": ""}, 'content': ''}
+    return raw["metadata"], raw['content']
+
+
+# Walk all files under the directory (recursively) and yield (path, stat) for each file.
+def list_files(directory, pre="*"):
+    for path in Path(directory).rglob(pre):
+        if path.is_file():
+            yield path, path.stat()
+
+
+# Create a RecursiveCharacterTextSplitter instance
+text_splitter = RecursiveCharacterTextSplitter(
+    chunk_size=gcfg["full_index"]["chunk_size"],        # target size of each text chunk
+    chunk_overlap=gcfg["full_index"]["chunk_overlap"]   # characters of overlap between adjacent chunks
+)
+
+
+# vector_db = Milvus(
+#     embeddings,
+#     connection_args={"host": "127.0.0.1", "port": default_server.listen_port},
+#     collection_name='test',
+#     auto_id=True,
+# )
+
+# File extensions to ignore
+ignore_file_type = gcfg["full_index"]["ignore_ftype"].split(",")
+
+# Maximum file size (configured in MB, converted to bytes); larger files are skipped
+max_file_size = int(gcfg["full_index"]["ignore_fsize"]) * 1024 * 1024
+
+# File extension (including the dot)
+def get_file_ext(fname):
+    return fname[fname.rfind("."):]
+
+
+import os
+import pwd
+
+# Build the knowledge base from a local (Linux) directory
+def build_linux_base(base, cfg):
+    count = 0
+    error = 0
+    for path, stat in list_files(cfg["path"]):
+        count += 1
+        size = stat.st_size
+        print(str(path))
+        if get_file_ext(path.name) not in ignore_file_type and size < max_file_size and path.name[0:2] not in [".~", "~$"]:
+
+            try:
+                doc = Doc.get_or_none((Doc.abs_path == str(path)) & (Doc.base == cfg["name"]))
+                stat_info = os.stat(str(path))
+                uid = stat_info.st_uid
+                username = pwd.getpwuid(uid).pw_name
+                if not doc or doc.f_up_at.timestamp() < stat.st_mtime:
+                    meta, text = extract_text_path(str(path))
+                    if text is None:
+                        text = ""
+                    build_one_doc(base, str(path), path.name, meta, text, size, stat.st_mtime, username)
+            except Exception as e:
+                print(f"{path} failed to index: {e}")
+                error += 1
+    if error > 0:
+        raise Exception(f"{error} document(s) failed during this scan")
+    return count
+
+import smbclient
+# Build the knowledge base from an SMB (Windows) share
+def build_smb_base(base, cfg):
+
+    count = 0
+    smbclient.register_session(cfg["address"], cfg["user"], cfg["password"])
+    for path, fname, mtime, size in smb_scan_directory(cfg["address"], cfg["user"], cfg["password"], cfg["path"]):
+        count += 1
+        print(count, path, size)
+        if get_file_ext(fname) not in ignore_file_type and size < max_file_size and fname[0:2] not in [".~", "~$"]:
+
+            doc = Doc.get_or_none((Doc.abs_path == str(path)) & (Doc.base == cfg["name"]))
+
+            if not doc or doc.f_up_at.timestamp() < mtime:
+
+                try:
+                    with smbclient.open_file(path, mode="rb") as fd:
+                        content = fd.read()
+                except Exception:
+                    # On error, re-register the SMB session and skip to the next file
+                    smbclient.register_session(cfg["address"], cfg["user"], cfg["password"])
+                    continue
+                meta, text = extract_text(content)
+                if text is None:
+                    text = ""
+                build_one_doc(base, path, fname, meta, text, size, mtime)
+    return count
+
+
+# Build the knowledge base from an SFTP server
+def build_sftp_base(base, cfg):
+
+    count = 0
+
+    ssh, sftp = sftp_conn(cfg["address"], cfg["user"], cfg["password"])
+
+    for path, fname, mtime, size, ownername in sftp_scan_directory(cfg["address"], cfg["user"], cfg["password"], cfg["path"]):
+        count += 1
+        print(count, path, size, ownername)
+        if get_file_ext(fname) not in ignore_file_type and size < max_file_size and fname[0:2] not in [".~", "~$"]:
+
+            doc = Doc.get_or_none((Doc.abs_path == str(path)) & (Doc.base == cfg["name"]))
+
+            if not doc or doc.f_up_at.timestamp() < mtime:
+
+                try:
+                    with sftp.file(path, 'r') as remote_file:
+                        content = remote_file.read()
+                except Exception:
+                    # On error, reconnect and skip to the next file
+                    sftp_close(ssh, sftp)
+                    ssh, sftp = sftp_conn(cfg["address"], cfg["user"], cfg["password"])
+                    continue
+                meta, text = extract_text(content)
+                if text is None:
+                    text = ""
+                build_one_doc(base, path, fname, meta, text, size, mtime, ownername)
+
+    sftp_close(ssh, sftp)
+    return count
+
+
+from ftplib import FTP
+
+from io import BytesIO
+
+def get_binary_file_content(ftp, remote_path):
+    """
+    Fetch the contents of a binary file over FTP.
+
+    :param ftp: a connected and logged-in FTP object
+    :param remote_path: remote file path
+    :return: file contents (bytes)
+    """
+    binary_data = BytesIO()
+    ftp.retrbinary(f'RETR {remote_path}', binary_data.write)
+    return binary_data.getvalue()
+
+# Build the knowledge base from an FTP server
+def build_ftp_base(base, cfg):
+
+    count = 0
+    for entry in ftp_scan_directory(cfg["address"], cfg["user"], cfg["password"], cfg["path"]):
+        path = entry["path"]
+        fname = entry["name"]
+        size = int(entry["size"])
+        mtime = entry["mtime"]
+        count += 1
+        print(count, path, size, max_file_size)
+        if get_file_ext(fname) not in ignore_file_type and size < max_file_size and fname[0:2] not in [".~", "~$"]:
+
+            doc = Doc.get_or_none((Doc.abs_path == str(path)) & (Doc.base == cfg["name"]))
+
+            if not doc or doc.f_up_at.timestamp() < mtime:
+
+                # Connect to the FTP server
+                with FTP(cfg["address"]) as ftp:
+                    ftp.login(user=cfg["user"], passwd=cfg["password"])
+                    content = get_binary_file_content(ftp, path)
+                meta, text = extract_text(content)
+                if text is None:
+                    text = ""
+                build_one_doc(base, path, fname, meta, text, size, mtime)
+
+    return count
+
+
+from dingpan import ding_scan_directory, ding_get_file_content
+
+# Build the knowledge base from a directory on DingTalk Drive (钉盘)
+def build_ding_pan(base, cfg):
+
+    count = 0
+    for entry in ding_scan_directory(cfg["address"], cfg["password"], cfg["path"]):
+        path = entry["path"]
+        fname = entry["fname"]
+        file_id = entry["id"]
+        mtime = entry["mtime"]
+        spaceId = entry["spaceId"]
+        unionid = entry["unionid"]
+        access_token = entry["access_token"]
+        count += 1
+        print(count, path, file_id)
+        if get_file_ext(fname) not in ignore_file_type and fname[0:2] not in [".~", "~$"]:
+
+            doc = Doc.get_or_none((Doc.abs_path == str(path)) & (Doc.base == cfg["name"]))
+
+            if not doc or doc.f_up_at < mtime:
+
+                content = ding_get_file_content(spaceId, file_id, unionid, access_token)
+                if content and len(content) < max_file_size:
+                    meta, text = extract_text(content)
+                    if text:
+                        build_one_doc(base, path, fname, {"id": file_id, "spaceId": entry["spaceId"]}, text, len(content), mtime.timestamp())
+
+    return count
+
+# Index a single document: upsert its Doc record, then (re)build its full-text index
+def build_one_doc(base, path, file_name, meta, text, st_size, st_mtime, ownername="admin"):
+
+    # Vector-store handling (the block below is currently disabled)
+    #
+    # Split the document, attaching the file name as metadata
+    #metas = [{"name": file_name}] * len(split_texts)
+
+    #documents = text_splitter.create_documents(split_texts, metas)
+
+    # try:
+    #     vector_db.add_documents(documents)
+    # except:
+    #     pass
+
+    # query = Doc.insert(base=base, abs_path=path,
+    #                    f_name=file_name,
+    #                    f_size=st_size,
+    #                    f_up_at=datetime.fromtimestamp(st_mtime),
+    #                    ctype=meta["Content-Type"],
+    #                    catalog="内部",
+    #                    meta=json.dumps(meta)[:1000]
+    #                    ).on_conflict(
+    #                        conflict_target=[Doc.abs_path],
+    #                        update={Doc.f_up_at: datetime.fromtimestamp(st_mtime)}
+    #                    )
+    #query.execute()
+
+    try:
+        Doc.create(base=base, abs_path=path,
+                   f_name=file_name,
+                   f_size=st_size,
+                   f_up_at=datetime.fromtimestamp(st_mtime),
+                   ctype=meta["Content-Type"],
+                   catalog="内部",
+                   meta=json.dumps(meta)[:1000],
+                   author=ownername,
+                   )
+    except Exception:
+        # The document already exists: update it and drop its old full-text index entries
+        Doc.update({Doc.f_up_at: datetime.fromtimestamp(st_mtime), Doc.base: base,
+                    Doc.meta: json.dumps(meta)[:1000], Doc.author: ownername,
+                    }).where(Doc.abs_path == path).execute()
+        delete_fs_index_by_base_path(base, path)
+
+    # Split the content into chunks
+    split_texts = text_splitter.split_text(text)
+    print(path, len(text), len(split_texts))
+    split_texts = list(map(lambda x: f"文件: {path}\n" + x, split_texts))
+
+    create_full_index(split_texts, file_name, path, st_mtime, base)
+
+    #print(file, len(text), len(documents))
+
+# Per-category document counts for BaiKe (encyclopedia) entries
+def baike_catalog_total():
+    from peewee import fn
+    query = (BaiKe
+             .select(BaiKe.catalog, fn.COUNT(BaiKe.id).alias('count'))
+             .group_by(BaiKe.catalog)
+             )
+
+    for record in query:
+        BaiKe_Catalog.update(doc_count=record.count).where(BaiKe_Catalog.catalog == record.catalog).execute()
+
+# Clean up expired session files
+def clear_session_id():
+    import glob
+    import os
+    session_ids = glob.glob("/tmp/k3gpt/session_id_*", recursive=True)
+    for file in session_ids:
+        try:
+            with open(file, "r") as f:
+                s_timestamp = f.read()
+                if int(time.time()) - int(s_timestamp) > 3600 * 12:
+                    os.remove(file)
+        except Exception as e:
+            print(e)
+
+
+if __name__ == "__main__":
+
+    print("Starting knowledge base build loop")
+    while True:
+        try:
+            with open(f"{gcfg['fs']['path']}/conf/source.conf") as conf:
+                source = json5.load(conf)
+        except Exception:
+            source = {"测试": {"name": "测试", "type": "FTP", "address": "192.168.0.1:21", "user": "admin", "password": "1", "path": "home", "scan": 1, "count": 1024}}
+
+        bases = list(source.values())
+        for base in bases:
+
+            try:
+                now = int(time.time())
+                # Scan interval (configured in hours, converted to seconds)
+                scant = int(float(source[base["name"]]["scan"]) * 3600)
+
+                # Skip this source if its next scan time has not been reached yet
+                if "scan_date" in base and now < source[base["name"]]["scan_date"] + scant:
+                    print("Scan not due yet", base["name"], datetime.now())
+                    continue
+
+                print("Scan started", base["name"], datetime.now())
+
+                count = 0
+
+                if base["type"] == "SFTP":
+                    count = build_sftp_base(base["name"], base)
+                elif base["type"] == "本地":
+                    count = build_linux_base(base["name"], base)
+                elif base["type"] == "Windows共享":
+                    count = build_smb_base(base["name"], base)
+                elif base["type"] == "FTP":
+                    count = build_ftp_base(base["name"], base)
+                elif base["type"] == "钉盘":
+                    count = build_ding_pan(base["name"], base)
+
+                source[base["name"]]["count"] = count
+                source[base["name"]]["scan_date"] = int(time.time())
+                print("Scan finished", base["name"], datetime.now())
+            except Exception as e:
+                print(e)
+        # end for
+
+        # Write back, starting from the latest on-disk configuration
+        # so edits made while the scan was running are not lost
+        with open(f"{gcfg['fs']['path']}/conf/source.conf") as conf:
+            source_new = json5.load(conf)
+
+        # Update the file counts and scan times
+        for k, v in source.items():
+            if "count" in v: source_new[k]["count"] = v["count"]
+            if "scan_date" in v: source_new[k]["scan_date"] = v["scan_date"]
+
+        with open(f"{gcfg['fs']['path']}/conf/source.conf", "w+") as conf:
+            json5.dump(source_new, conf, ensure_ascii=False)
+
+        try:
+            baike_catalog_total()
+        except Exception as e:
+            print("BaiKe category totals failed:", e)
+
+        clear_session_id()
+        # Sleep for 30 seconds before the next pass
+        time.sleep(30)
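
Note on configuration: a minimal sketch of the conf/source.conf file that the __main__
loop reads, inferred from the in-code fallback default and the keys the loop accesses.
The file is json5, keyed by source name; "count" and "scan_date" are written back by
the scanner after each pass, and "type" selects the builder (本地, Windows共享, FTP,
SFTP or 钉盘). The address, user, password and path values below are the fallback
placeholders from the code, not real credentials.

    {
        "测试": {
            "name": "测试",              // must match the outer key
            "type": "FTP",               // 本地 | Windows共享 | FTP | SFTP | 钉盘
            "address": "192.168.0.1:21",
            "user": "admin",
            "password": "1",
            "path": "home",
            "scan": 1                    // scan interval in hours
        }
    }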