""" 使用向量库和全文检索两个来构建知识库 """ from pathlib import Path from tika import parser from langchain.text_splitter import RecursiveCharacterTextSplitter from datetime import datetime from full_index_search import create_full_index,create_full_index2,delete_fs_index_by_base_path from k_database import Doc,BaiKe,BaiKe_Catalog import json import json5 import time #from llm_embedding2 import embeddings #from milvus import default_server #from langchain_milvus import Milvus from init import gcfg from ff import smb_scan_directory,sftp_scan_directory,sftp_conn,sftp_close,ftp_scan_directory def extract_text(content): #raw = parser.from_file(file_path) try: raw = parser.from_buffer(content) except: raw ={'metadata':{"Content-Type":""},'content':''} return raw["metadata"],raw['content'] def extract_text_path(file_path): try: raw = parser.from_file(file_path) except: raw ={'metadata':{"Content-Type":""},'content':''} return raw["metadata"],raw['content'] #遍历目录下的所有文件和子目录,返回文件path和meta信息 def list_files(directory,pre="*"): for path in Path(directory).rglob(pre): if path.is_file(): yield path,path.stat() # 创建一个 RecursiveCharacterTextSplitter 实例 text_splitter = RecursiveCharacterTextSplitter( chunk_size=gcfg["full_index"]["chunk_size"], # 设置每个文本块的目标大小 chunk_overlap=gcfg["full_index"]["chunk_overlap"] # 设置相邻文本块之间的重叠字符数 ) # vector_db=Milvus( # embeddings, # connection_args={"host":"127.0.0.1","port": default_server.listen_port}, # collection_name='test', # auto_id=True, # ) #忽略的文件类型 ignore_file_type=gcfg["full_index"]["ignore_ftype"].split(",") #忽略的文件大小 max_file_size = int(gcfg["full_index"]["ignore_fsize"])*1024*1024 #文件的扩展名 def get_file_ext(fname): return fname[fname.rfind("."):] import os import pwd def build_linux_base(base,cfg): count=0 error=0 for path,stat in list_files(cfg["path"]): count +=1 size = stat.st_size print(str(path)) if get_file_ext(path.name) not in ignore_file_type and size < max_file_size and path.name[0:2] not in [".~","~$"]: try: doc = Doc.get_or_none( (Doc.abs_path==str(path)) & (Doc.base==cfg["name"]) ) stat_info = os.stat(str(path)) uid = stat_info.st_uid username = pwd.getpwuid(uid).pw_name if not doc or doc.f_up_at.timestamp() < stat.st_mtime: meta,text = extract_text_path(str(path)) if text==None: text="" build_one_doc(base,str(path),path.name,meta,text,size,stat.st_mtime,username) except Exception as e : print(f"{path} 构建出错:{e}") error +=1 if error >0: raise Exception(f"本次扫描有{error}个文档错误") return count import smbclient #构建smb的知识库 def build_smb_base(base,cfg): count=0 smbclient.register_session(cfg["address"],cfg["user"],cfg["password"]) for path,fname,mtime,size in smb_scan_directory(cfg["address"],cfg["user"],cfg["password"],cfg["path"]): count +=1 print(count,path,size) if get_file_ext(fname) not in ignore_file_type and size < max_file_size and fname[0:2] not in [".~","~$"]: doc = Doc.get_or_none( (Doc.abs_path==str(path)) & (Doc.base==cfg["name"]) ) if not doc or doc.f_up_at.timestamp() < mtime: try: with smbclient.open_file(path, mode="rb") as fd: content = fd.read() except: #出错重连,继续 smbclient.register_session(cfg["address"],cfg["user"],cfg["password"]) continue meta,text = extract_text(content) if text==None: text="" build_one_doc(base,path,fname,meta,text,size,mtime) return count #构建sftp的知识库 def build_sftp_base(base,cfg): count=0 ssh,sftp = sftp_conn(cfg["address"],cfg["user"],cfg["password"]) for path,fname,mtime,size,ownername in sftp_scan_directory(cfg["address"],cfg["user"],cfg["password"],cfg["path"]): count +=1 print(count,path,size,ownername) if get_file_ext(fname) not in 


# Build the knowledge base from an SFTP server.
def build_sftp_base(base, cfg):
    count = 0
    ssh, sftp = sftp_conn(cfg["address"], cfg["user"], cfg["password"])
    for path, fname, mtime, size, ownername in sftp_scan_directory(cfg["address"], cfg["user"], cfg["password"], cfg["path"]):
        count += 1
        print(count, path, size, ownername)
        if get_file_ext(fname) not in ignore_file_type and size < max_file_size and fname[0:2] not in [".~", "~$"]:
            doc = Doc.get_or_none((Doc.abs_path == str(path)) & (Doc.base == cfg["name"]))
            if not doc or doc.f_up_at.timestamp() < mtime:
                try:
                    with sftp.file(path, 'r') as remote_file:
                        content = remote_file.read()
                except Exception:
                    # On error, reconnect and skip this file.
                    sftp_close(ssh, sftp)
                    ssh, sftp = sftp_conn(cfg["address"], cfg["user"], cfg["password"])
                    continue
                meta, text = extract_text(content)
                if text is None:
                    text = ""
                build_one_doc(base, path, fname, meta, text, size, mtime, ownername)
    sftp_close(ssh, sftp)
    return count


from ftplib import FTP
from io import BytesIO


def get_binary_file_content(ftp, remote_path):
    """
    Fetch the content of a binary file over FTP.

    :param ftp: a connected and logged-in FTP object
    :param remote_path: path of the remote file
    :return: file content (bytes)
    """
    binary_data = BytesIO()
    ftp.retrbinary(f'RETR {remote_path}', binary_data.write)
    return binary_data.getvalue()


# Build the knowledge base from an FTP server.
def build_ftp_base(base, cfg):
    count = 0
    for entry in ftp_scan_directory(cfg["address"], cfg["user"], cfg["password"], cfg["path"]):
        path = entry["path"]
        fname = entry["name"]
        size = int(entry["size"])
        mtime = entry["mtime"]
        count += 1
        print(count, path, size, max_file_size)
        if get_file_ext(fname) not in ignore_file_type and size < max_file_size and fname[0:2] not in [".~", "~$"]:
            doc = Doc.get_or_none((Doc.abs_path == str(path)) & (Doc.base == cfg["name"]))
            if not doc or doc.f_up_at.timestamp() < mtime:
                # Connect to the FTP server and download the file.
                with FTP(cfg["address"]) as ftp:
                    ftp.login(user=cfg["user"], passwd=cfg["password"])
                    content = get_binary_file_content(ftp, path)
                meta, text = extract_text(content)
                if text is None:
                    text = ""
                build_one_doc(base, path, fname, meta, text, size, mtime)
    return count


from dingpan import ding_scan_directory, ding_get_file_content


# Build the knowledge base from a DingTalk drive (钉盘) directory.
def build_ding_pan(base, cfg):
    count = 0
    for entry in ding_scan_directory(cfg["address"], cfg["password"], cfg["path"]):
        path = entry["path"]
        fname = entry["fname"]
        file_id = entry["id"]
        mtime = entry["mtime"]
        spaceId = entry["spaceId"]
        unionid = entry["unionid"]
        access_token = entry["access_token"]
        count += 1
        print(count, path, file_id)
        if get_file_ext(fname) not in ignore_file_type and fname[0:2] not in [".~", "~$"]:
            doc = Doc.get_or_none((Doc.abs_path == str(path)) & (Doc.base == cfg["name"]))
            if not doc or doc.f_up_at < mtime:
                content = ding_get_file_content(spaceId, file_id, unionid, access_token)
                if content and len(content) < max_file_size:
                    meta, text = extract_text(content)
                    if text:
                        build_one_doc(base, path, fname, {"id": file_id, "spaceId": entry["spaceId"]}, text, len(content), mtime.timestamp())
    return count
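
# build_one_doc below chunks the extracted text with the module-level
# text_splitter before indexing. A self-contained sketch of that step, with
# chunk sizes chosen purely for illustration (the real values come from
# gcfg["full_index"]); _demo_split is illustrative and unused.
def _demo_split(sample_text):
    demo_splitter = RecursiveCharacterTextSplitter(
        chunk_size=100,    # illustrative target chunk size
        chunk_overlap=20   # illustrative overlap between adjacent chunks
    )
    # split_text returns list[str]; consecutive chunks share up to
    # chunk_overlap characters so text cut at a boundary stays searchable.
    return demo_splitter.split_text(sample_text)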


# Index a single document: upsert its Doc record and (re)build its full-text index.
def build_one_doc(base, path, file_name, meta, text, st_size, st_mtime, ownername="admin"):
    # Vector-database handling (currently disabled):
    # split the document into chunks, attaching the file name as per-chunk meta.
    # metas = [{"name": file_name}] * len(split_texts)
    # documents = text_splitter.create_documents(split_texts, metas)
    # try:
    #     vector_db.add_documents(documents)
    # except:
    #     pass

    # query = Doc.insert(base=base, abs_path=path,
    #                    f_name=file_name,
    #                    f_size=st_size,
    #                    f_up_at=datetime.fromtimestamp(st_mtime),
    #                    ctype=meta["Content-Type"],
    #                    catalog="内部",
    #                    meta=json.dumps(meta)[:1000]
    #                    ).on_conflict(
    #                        conflict_target=[Doc.abs_path],
    #                        update={Doc.f_up_at: datetime.fromtimestamp(st_mtime)}
    #                    )
    # query.execute()
    try:
        Doc.create(base=base, abs_path=path,
                   f_name=file_name,
                   f_size=st_size,
                   f_up_at=datetime.fromtimestamp(st_mtime),
                   ctype=meta["Content-Type"],
                   catalog="内部",
                   meta=json.dumps(meta)[:1000],
                   author=ownername,
                   )
    except Exception:
        # Record already exists: update it and delete its old full-text index.
        Doc.update({Doc.f_up_at: datetime.fromtimestamp(st_mtime), Doc.base: base,
                    Doc.meta: json.dumps(meta)[:1000], Doc.author: ownername,
                    }).where(Doc.abs_path == path).execute()
        delete_fs_index_by_base_path(base, path)
    # Split the content into chunks.
    split_texts = text_splitter.split_text(text)
    print(path, len(text), len(split_texts))
    split_texts = list(map(lambda x: f"文件: {path}\n" + x, split_texts))
    create_full_index(split_texts, file_name, path, st_mtime, base)
    #print(file, len(text), len(documents))


# Aggregate per-category document counts for the BaiKe (encyclopedia) table.
def baike_catalog_total():
    from peewee import fn
    query = (BaiKe
             .select(BaiKe.catalog, fn.COUNT(BaiKe.id).alias('count'))
             .group_by(BaiKe.catalog))
    for record in query:
        BaiKe_Catalog.update(doc_count=record.count).where(BaiKe_Catalog.catalog == record.catalog).execute()


# Clean up session files older than 12 hours.
def clear_session_id():
    import glob
    import os
    session_ids = glob.glob("/tmp/k3gpt/session_id_*", recursive=True)
    for file in session_ids:
        try:
            with open(file, "r") as f:
                s_timestamp = f.read()
                if int(time.time()) - int(s_timestamp) > 3600 * 12:
                    os.remove(file)
        except Exception as e:
            print(e)


if __name__ == "__main__":
    print("Starting knowledge-base build worker")
    while True:
        try:
            with open(f"{gcfg['fs']['path']}/conf/source.conf") as conf:
                source = json5.load(conf)
        except Exception:
            source = {"测试": {"name": "测试", "type": "FTP", "address": "192.168.0.1:21",
                             "user": "admin", "password": "1", "path": "home", "scan": 1, "count": 1024}}
        bases = list(source.values())
        for base in bases:
            try:
                now = int(time.time())
                # Scan interval (hours -> seconds).
                scant = int(float(source[base["name"]]["scan"]) * 3600)
                # Skip this base if its next scheduled scan time has not arrived yet.
                if "scan_date" in base and now < source[base["name"]]["scan_date"] + scant:
                    print("scan not due yet", base["name"], datetime.now())
                    continue
                print("scan started", base["name"], datetime.now())
                count = 0
                if base["type"] == "SFTP":
                    count = build_sftp_base(base["name"], base)
                elif base["type"] == "本地":
                    count = build_linux_base(base["name"], base)
                elif base["type"] == "Windows共享":
                    count = build_smb_base(base["name"], base)
                elif base["type"] == "FTP":
                    count = build_ftp_base(base["name"], base)
                elif base["type"] == "钉盘":
                    count = build_ding_pan(base["name"], base)
                source[base["name"]]["count"] = count
                source[base["name"]]["scan_date"] = int(time.time())
                print("scan finished", base["name"], datetime.now())
            except Exception as e:
                print(e)
        # end for

        # Re-read the latest config from disk, then write back the updated
        # file counts and scan timestamps.
        with open(f"{gcfg['fs']['path']}/conf/source.conf") as conf:
            source_new = json5.load(conf)
        for k, v in source.items():
            if "count" in v:
                source_new[k]["count"] = v["count"]
            if "scan_date" in v:
                source_new[k]["scan_date"] = v["scan_date"]
        with open(f"{gcfg['fs']['path']}/conf/source.conf", "w+") as conf:
            json5.dump(source_new, conf, ensure_ascii=False)

        try:
            baike_catalog_total()
        except Exception as e:
            print("BaiKe category aggregation failed:", e)
        clear_session_id()
        # Sleep 30 seconds between rounds.
        time.sleep(30)
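
# For reference, the shape of conf/source.conf (JSON5) as implied by the
# fallback default above; all values here are illustrative:
# {
#     "测试": {
#         "name": "测试",               // knowledge-base name, stored as Doc.base
#         "type": "FTP",               // one of: 本地, Windows共享, SFTP, FTP, 钉盘
#         "address": "192.168.0.1:21",
#         "user": "admin",
#         "password": "1",
#         "path": "home",              // root directory to scan
#         "scan": 1,                   // scan interval in hours
#         "count": 1024,               // written back: files seen in the last scan
#         "scan_date": 0               // written back: epoch time of the last scan
#     }
# }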