# k3GPT/main/second_search.py
###
#
# Second-pass retrieval: deduplicate, filter, and relevance-check the results of the
# first-pass search, then precisely locate the relevant content for the second pass.
#
###
from operator import itemgetter
import re
#================================== Helper functions ======================================
# Count how many of the keywords appear in a string
def cnt_keywords(name, kws):
    cnt = 0
    for k in kws:
        if name.find(k) >= 0:
            cnt += 1
    return cnt
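# A hedged usage sketch (the file path below is made up for illustration):
#   cnt_keywords("docs/db/数据库连接池.md", ["数据库", "连接池", "超时"])
#   -> 2   # only "数据库" and "连接池" occur in the path-plus-name string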
# Prefer contiguous runs of segments, i.e. "whole words": segments that sit next to each
# other in the original keyword (by position after jieba segmentation) are merged back together.
def jieba_zhengci(keyword, seg_list):
    if len(seg_list) == 1: return seg_list
    #
    zhengci = {0: ""}
    seg_infos = []
    for seg in seg_list:
        seg_infos.append({"seg": seg, "begin": keyword.find(seg), "len": len(seg)})
    j = 0
    for i in range(len(seg_infos)):
        if i+1 >= len(seg_infos):  # isolated last segment
            zhengci[j] = zhengci[j] + seg_infos[i]["seg"]
            break
        if seg_infos[i+1]["begin"] == seg_infos[i]["begin"] + seg_infos[i]["len"]:
            if i+1 >= len(seg_infos):  # last segment
                zhengci[j] = zhengci[j] + seg_infos[i]["seg"] + seg_infos[i+1]["seg"]
                break
            else:
                zhengci[j] = zhengci[j] + seg_infos[i]["seg"]
        else:
            zhengci[j] = zhengci[j] + seg_infos[i]["seg"]
            # start a new whole word
            j += 1
            zhengci[j] = ""
    return list(zhengci.values())
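# A hedged sketch of the behaviour (example keyword and segmentation made up for illustration):
#   jieba_zhengci("数据库连接超时", ["数据库", "连接", "超时"])
#   -> ["数据库连接超时"]   # all three segments are adjacent, so they merge back into one whole word
# Note: this variant merges runs of any length; retrieve_result below uses jieba_zhengci2 instead.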
# Merge at most 2 adjacent segments into a "whole word", again decided by the segments'
# positions in the original keyword after jieba segmentation.
def jieba_zhengci2(keyword, seg_list):
    if len(seg_list) == 1: return seg_list
    #
    zhengci = []
    seg_infos = []
    k_copy = keyword  # used to handle repeated segments
    start = 0
    for seg in seg_list:
        seg_infos.append({"seg": seg, "begin": k_copy.find(seg) + start, "len": len(seg)})
        start += len(seg)
        k_copy = keyword[start:]
    i = 0
    count = len(seg_infos)
    while i < count:
        if i+1 >= count:  # isolated last segment
            zhengci.append(seg_infos[i]["seg"])
            break
        if seg_infos[i+1]["begin"] == seg_infos[i]["begin"] + seg_infos[i]["len"]:
            if not is_number(seg_infos[i]["seg"]) and not is_number(seg_infos[i+1]["seg"]):  # neither segment is a number
                zhengci.append(seg_infos[i]["seg"] + seg_infos[i+1]["seg"])
                if i+1 >= len(seg_infos):  # last segment
                    break
                i += 2  # skip the segment we just merged
            else:
                zhengci.append(seg_infos[i]["seg"])
                i += 1
        else:
            zhengci.append(seg_infos[i]["seg"])
            i += 1
    #print(zhengci, i)
    return zhengci
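# A hedged sketch of the behaviour (example keyword and segmentation made up for illustration):
#   jieba_zhengci2("数据库连接池配置", ["数据库", "连接池", "配置"])
#   -> ["数据库连接池", "配置"]   # only pairs are merged, so the trailing "配置" stays on its own
# Numeric segments are never merged, which keeps figures such as "2024" as standalone terms.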
def is_number(str_val):
    try:
        float(str_val)  # try converting the string to a float
        return True
    except ValueError:  # conversion failed, so it is not a number
        return False
def clean_string(s):
    # Collapse two or more consecutive newlines into one
    s = re.sub(r'\n{2,}', '\n', s)
    # Collapse two or more consecutive spaces into one
    s = re.sub(r' {2,}', ' ', s)
    return s
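# A hedged sketch of the behaviour:
#   clean_string("a  b\n\n\nc")
#   -> "a b\nc"   # runs of spaces and blank lines collapse before the slicing logic below runs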
################################# Recall #######################
"""
Second-pass retrieval.
Parameters:
    keyword   the search keyword
    seg_list  the keyword segments produced by jieba
    matches   the match list returned by the full-text search
    percent   the minimum relevance score required
"""
def retrieve_result(keyword, seg_list, matches, percent):
    slice_sum = len(matches)
    print("first-pass search", keyword, seg_list, slice_sum)
    if slice_sum < 20:
        percent = 10  # with fewer than 20 slices, drop the confidence threshold to 10 to keep more usable results
    r_cnt = {}
    r2 = {}
    # whole-word list
    zhengci = jieba_zhengci2(keyword, seg_list)
    all_words = zhengci + seg_list
    v_hash = []  # hashes of content already seen, for deduplication
    for i in range(3):
        for m in matches:
            if m.percent > percent:
                doc_id = m.docid
                file_name = m.document.get_value(0).decode("utf-8")
                file_path = m.document.get_value(1).decode("utf-8")
                file_base = m.document.get_value(3).decode("utf-8")
                file_ctx = m.document.get_data().decode("utf-8")
                if hash(file_ctx) in v_hash:  # skip duplicate content
                    continue
                else:
                    v_hash.append(hash(file_ctx))
                # count how many keywords (plus whole words) appear in the file path and file name,
                # i.e. measure the relevance of the file name itself
                cnt = cnt_keywords(f"{file_path}.{file_name}", all_words)
                r_cnt[doc_id] = cnt
                r2[doc_id] = [file_name, file_ctx, file_path, file_base]
        # sort by count, descending
        sorted_cnt = sorted(r_cnt.items(), key=itemgetter(1), reverse=True)
        sorted_r_cnt = dict(sorted_cnt)
        # second-pass retrieval: rank the content by whole-word matches
        print("credible relevance", len(sorted_r_cnt), zhengci, percent)
        if len(sorted_r_cnt) > 0:
            break
        else:
            percent -= 20  # lower the threshold by 20 and retry
    #end for
    r2_cnt = {}  # whole-word counts per slice
    for k1, v1 in sorted_r_cnt.items():
        k, v, path, base = r2[k1]
        r2_cnt[k1] = cnt_keywords(v, zhengci)
        #print(k, v1, r2_cnt[k1])
    # sort by count descending and keep the top n data slices
    n = len(sorted_r_cnt)
    if n > 10:  # the top 10 plus 80% of the remainder
        n = 10 + int((n-10) * 0.8)
    sorted_cnt2 = sorted(r2_cnt.items(), key=itemgetter(1), reverse=True)
    sorted_r2_cnt = dict(sorted_cnt2[0:n])
    # post-process the slice contents to give the LLM more precise information
    result = []
    meta = []
    # slice size
    size = 512
    if len(sorted_r2_cnt) <= 5:
        size = size * 8
    elif len(sorted_r2_cnt) <= 10:
        size = size * 4
    elif len(sorted_r2_cnt) <= 20:
        size = size * 2
    #
    # The second-pass processing extracts the key information from each slice
    for k1, v1 in sorted_r2_cnt.items():
        k, v, path, base = r2[k1]
        v = clean_string(v)
        # final content for this slice
        content = ""
        # first occurrence position of each whole word
        pos = {}
        for word in zhengci:
            if v.find(word) != -1:
                pos[word] = v.find(word)
        # sort by first-occurrence position, ascending, as a list of (word, position) pairs
        word_pos = sorted(pos.items(), key=itemgetter(1), reverse=False)
        print("whole-word count", len(word_pos), k)
        if len(word_pos) == 0:
            content = v[0:int(size/2)]
        elif len(word_pos) == 1:  # one keyword, one window of size
            # check whether the keyword occurs more than once
            count = v.count(word_pos[0][0])
            if count == 1:
                start = 0 if word_pos[0][1]-100 < 0 else word_pos[0][1]-100
                content = v[start:start+int(size)]
            else:
                pos = []
                psize = int(size/count)  # split the window evenly across the occurrences
                start = 0
                for i in range(count):
                    start = v.find(word_pos[0][0], start)
                    if start == -1:
                        break
                    w_start = 0 if start-50 < 0 else start-50  # clamp so the slice never wraps around the string
                    content += v[w_start:w_start+psize]
                    start += len(word_pos[0][0])
        elif len(word_pos) == 2:  # two keywords, at most two windows of size
            if v.count(word_pos[0][0]) >= 2 and v.count(word_pos[1][0]) >= 2:  # both keywords repeat, hard to pick a window
                # keep the whole content
                content = v
            else:  # each whole-word keyword occurs only once
                start = 0 if word_pos[0][1]-100 < 0 else word_pos[0][1]-100
                if word_pos[1][1] - word_pos[0][1] < size:
                    content = v[start:start+size]
                else:
                    content = v[start:start+size] + "\n" + v[word_pos[1][1]-50:word_pos[1][1]-50+size]
        else:  # three or more keywords, 3*size to 4*size of content
            start = 0 if word_pos[0][1]-100 < 0 else word_pos[0][1]-100
            if word_pos[-1][1] - word_pos[0][1] < size:
                content = v[start:start+size]
                # slide backwards to find the last occurrence of the last keyword
                v2 = v
                start2 = word_pos[-1][1]
                while v2.rfind(word_pos[-1][0]) > word_pos[-1][1]:  # the last keyword also occurs later in the text than its first position
                    start2 = v2.rfind(word_pos[-1][0])
                    #print(len(v2), start2)
                    if len(v)-start2 > size:
                        break
                    else:
                        v2 = v[0:start2]
                #end while
                last_start = 0 if start2-size < 0 else start2-size
                #print("start2", word_pos[-1][0], start2, last_start)
                content += "\n" + v[last_start:last_start+size]
                content += "\n" + v[start2:start2+size]
            else:
                content = v[start:start+size] + "\n" + v[word_pos[-1][1]-50:word_pos[-1][1]-50+size]
                if v.rfind(word_pos[-1][0]) > word_pos[-1][1]+size:  # the last occurrence of that keyword is far away
                    start2 = v.rfind(word_pos[-1][0])
                    last_start = 0 if start2-size < 0 else start2-size
                    content += "\n" + v[last_start:last_start+size]
                    content += "\n" + v[start2:start2+size]
            #end if
        result.append(content)
        meta.append([k, path, base])
    #end for
    print("slice count after second-pass retrieval", len(result))
    return result, meta
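# A hedged usage sketch (not part of the original module). It assumes a Xapian index whose
# value slots match the layout read above (0 = file name, 1 = file path, 3 = file base) and
# whose documents store the slice text as document data. The index path "k3.db", the OR query
# over raw segments, and the threshold of 60 are illustrative assumptions only.
if __name__ == "__main__":
    import jieba
    import xapian

    keyword = "数据库连接池配置"
    seg_list = jieba.lcut(keyword)        # segment the query with jieba
    db = xapian.Database("k3.db")         # assumed index location
    enquire = xapian.Enquire(db)
    enquire.set_query(xapian.Query(xapian.Query.OP_OR, seg_list))
    matches = enquire.get_mset(0, 100)    # first-pass full-text matches
    result, meta = retrieve_result(keyword, seg_list, matches, 60)
    for content, (name, path, base) in zip(result, meta):
        print(path, name, len(content))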