Files
k3GPT/main/second_search.py

264 lines
9.0 KiB
Python
Raw Normal View History

2025-11-19 19:42:46 +08:00
###
#
# 二次检索,对一次检索的内容进行去重,过滤,关联性判断,和二次的内容精准定位
#
###
from operator import itemgetter
import re
#==================================辅助函数======================================
#关键字个数
def cnt_keywords(name,kws):
cnt = 0
for k in kws:
if name.find(k) >=0:
cnt +=1
return cnt
#尽可能使用连贯起来的词,即整词,通过分词后在原句里是距离来决定
def jieba_zhengci(keyword,seg_list):
if len(seg_list)==1: return seg_list
#
zhengci={0:""}
seg_infos=[]
for seg in seg_list:
seg_infos.append({"seg":seg,"begin":keyword.find(seg),"len":len(seg)})
j=0
for i in range(len(seg_infos)):
if i+1>=len(seg_infos): #孤立的最后一个
zhengci[j] = zhengci[j]+seg_infos[i]["seg"]
break
if seg_infos[i+1]["begin"]==seg_infos[i]["begin"]+seg_infos[i]["len"]:
if i+1>=len(seg_infos): #最后一个
zhengci[j] = zhengci[j]+seg_infos[i]["seg"]+seg_infos[i+1]["seg"]
break
else:
zhengci[j] = zhengci[j]+seg_infos[i]["seg"]
else:
zhengci[j] = zhengci[j]+seg_infos[i]["seg"]
#开创一个新的
j +=1
zhengci[j]=""
return list(zhengci.values())
#最多2个词连贯起来的词即整词通过分词后在原句里是距离来决定
def jieba_zhengci2(keyword,seg_list):
if len(seg_list)==1: return seg_list
#
zhengci=[]
seg_infos=[]
k_copy= keyword #用来处理重复的词
start = 0
for seg in seg_list:
seg_infos.append({"seg":seg,"begin":k_copy.find(seg)+start,"len":len(seg)})
start += len(seg)
k_copy = keyword[start:]
i=0
count = len(seg_infos)
while i < count:
if i+1>=count: #孤立的最后一个
zhengci.append(seg_infos[i]["seg"])
break
if seg_infos[i+1]["begin"]==seg_infos[i]["begin"]+seg_infos[i]["len"]:
if not is_number(seg_infos[i]["seg"]) and not is_number(seg_infos[i+1]["seg"]): #两者都不是数字
zhengci.append(seg_infos[i]["seg"]+seg_infos[i+1]["seg"])
if i+1>=len(seg_infos): #最后一个
break
i +=2 #跳过一个
else:
zhengci.append(seg_infos[i]["seg"])
i +=1
else:
zhengci.append(seg_infos[i]["seg"])
i +=1
#print(zhengci,i)
return zhengci
def is_number(str_val):
try:
float(str_val) # 尝试将字符串转换为float
return True
except ValueError: # 捕获转换失败时的异常
return False
def clean_string(s):
# 替换两个或更多连续的换行符为一个换行符
s = re.sub(r'\n{2,}', '\n', s)
# 替换两个或更多连续的空格为一个空格
s = re.sub(r' {2,}', ' ', s)
return s
#################################召回#######################
"""
二次检索
参数
keyword 检索的关键字
seg_list 结巴分词后的关键字列表
matches 全文检索后的匹配列表
percent 计划要的分数
"""
def retrieve_result(keyword,seg_list,matches,percent):
slice_sum = len(matches)
print("一次检索",keyword,seg_list,slice_sum)
if slice_sum <20:
percent = 10 #分片数量少于20的情况下可信度降到10增加有效数量
r_cnt={}
r2={}
#整词理表
zhengci = jieba_zhengci2(keyword,seg_list)
all_words = zhengci + seg_list
v_hash=[] #内容重复去重
for i in range(3):
for m in matches:
if m.percent >percent:
doc_id = m.docid
file_name = m.document.get_value(0).decode("utf-8")
file_path = m.document.get_value(1).decode("utf-8")
file_base = m.document.get_value(3).decode("utf-8")
file_ctx = m.document.get_data().decode("utf-8")
if hash(file_ctx) in v_hash: #去除重复内容
continue
else:
v_hash.append(hash(file_ctx))
#文件路径和文件中包含关键词(+整词)的数量,即计算文件名的相关性
cnt = cnt_keywords(f"{file_path}.{file_name}",all_words)
r_cnt[doc_id] = cnt
r2[doc_id] = [file_name,file_ctx,file_path,file_base]
# 按值降序排序
sorted_cnt = sorted(r_cnt.items(), key=itemgetter(1), reverse=True)
sorted_r_cnt = dict(sorted_cnt)
# 二次检索,内容使用整词检索,排序
print("可信关联度",len(sorted_r_cnt),zhengci,percent)
if len(sorted_r_cnt) >0:
break
else:
percent -=20 #降低20个
#end for
r2_cnt = {} #整词数量
for k1,v1 in sorted_r_cnt.items():
k,v,path,base = r2[k1]
r2_cnt[k1] = cnt_keywords(v,zhengci)
#print(k,v1,r2_cnt[k1])
# 按值降序排序,取前n个数据分片
n = len(sorted_r_cnt)
if n >10: #前10+后续的一半数量
n = 10 + int((n-10)* 0.8)
sorted_cnt2 = sorted(r2_cnt.items(), key=itemgetter(1), reverse=True)
sorted_r2_cnt = dict(sorted_cnt2[0:n])
#开始进行分片内容的处理加工,提供更转确的信息给大模型
result=[]
meta=[]
#分片大小
size=512
if len(sorted_r2_cnt)<=5:
size=size*8
elif len(sorted_r2_cnt)<=10:
size=size*4
elif len(sorted_r2_cnt)<=20:
size = size*2
#
#信息的二次处理逻辑是要提取关键信息
for k1,v1 in sorted_r2_cnt.items():
k,v,path,base = r2[k1]
v = clean_string(v)
#最终内容
content=""
#关键字首次出现的位置
pos={}
for word in zhengci:
if v.find(word)!=-1:
pos[word]= v.find(word)
# 按出现位置值升序排序,转换为数组[[k,count],[k,count]]
word_pos = sorted(pos.items(), key=itemgetter(1), reverse=False)
print("整词个数",len(word_pos),k)
if len(word_pos)==0:
content = v[0:int(size/2)]
elif len(word_pos)==1: #一个关键字一个size
#判断关键字是否出现多次
count = v.count(word_pos[0][0])
if count==1:
start = 0 if word_pos[0][1]-100 <0 else word_pos[0][1]-100
content = v[start:start+int(size)]
else:
pos=[]
psize = int(size/count) #
start = 0
for i in range(count):
start = v.find(word_pos[0][0],start)
if start ==-1:
break
content += v[start-50:start+psize-50]
start += len(word_pos[0][0])
elif len(word_pos)==2: #两个关键字,最多两个size
if v.count(word_pos[0][0]) >=2 and v.count(word_pos[1][0]) >=2: #关键词多次出现不好判断的
#全部内容
content = v
else: #整词关键词只出现过一次的
start = 0 if word_pos[0][1]-100 <0 else word_pos[0][1]-100
if word_pos[1][1]- word_pos[0][1] < size:
content = v[start:start+size]
else:
content = v[start:start+size] +"\n"+ v[word_pos[1][1]-50:word_pos[1][1]-50+size]
else:#多关键字3*size--4*size
start = 0 if word_pos[0][1]-100 <0 else word_pos[0][1]-100
if word_pos[-1][1] - word_pos[0][1] < size:
content = v[start:start+size]
#滑动去找最后一个最关键的字
v2 = v
start2 = word_pos[-1][1]
while v2.rfind(word_pos[-1][0]) > word_pos[-1][1]: #最后一个关键字在文章中倒数的位置大于顺数的位置
start2 = v2.rfind(word_pos[-1][0])
#print(len(v2),start2)
if len(v)-start2 >size:
break
else:
v2 = v[0:start2]
#end while
last_start = 0 if start2-size <0 else start2-size
#print("start2",word_pos[-1][0],start2,last_start)
content +="\n"+v[last_start:last_start+size]
content +="\n"+v[start2:start2+size]
else:
content = v[start:start+size] +"\n"+ v[word_pos[-1][1]-50:word_pos[-1][1]-50+size]
if v.rfind(word_pos[-1][0]) > word_pos[-1][1]+size:#倒数那个词距离很远
start2 = v.rfind(word_pos[-1][0])
last_start = 0 if start2-size <0 else start2-size
content +="\n"+v[last_start:last_start+size]
content +="\n"+v[start2:start2+size]
#end if
result.append(content)
meta.append([k,path,base])
#end for
print("二次检索后分片数量",len(result))
return result,meta