###
#
# Second-pass retrieval: deduplicate and filter the first-pass results, judge
# their relevance, and pinpoint the relevant content for the second pass.
#
###
from operator import itemgetter
import re

# ================================ Helper functions ================================

# Count how many of the keywords appear in a string
def cnt_keywords(name, kws):
    cnt = 0
    for k in kws:
        if name.find(k) >= 0:
            cnt += 1
    return cnt
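# e.g. cnt_keywords("docs/db/连接池配置.md", ["连接", "配置", "缓存"]) -> 2
# (made-up example values; the real arguments come from the retrieval flow below)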


# Prefer whole words ("zhengci"), i.e. segments joined back together; adjacency
# of the segments in the original query decides what gets joined.
def jieba_zhengci(keyword, seg_list):
    if len(seg_list) == 1: return seg_list

    zhengci = {0: ""}
    seg_infos = []
    for seg in seg_list:
        seg_infos.append({"seg": seg, "begin": keyword.find(seg), "len": len(seg)})

    j = 0
    for i in range(len(seg_infos)):
        if i + 1 >= len(seg_infos):  # isolated last segment
            zhengci[j] = zhengci[j] + seg_infos[i]["seg"]
            break
        if seg_infos[i + 1]["begin"] == seg_infos[i]["begin"] + seg_infos[i]["len"]:
            if i + 1 >= len(seg_infos):  # last segment
                zhengci[j] = zhengci[j] + seg_infos[i]["seg"] + seg_infos[i + 1]["seg"]
                break
            else:
                zhengci[j] = zhengci[j] + seg_infos[i]["seg"]
        else:
            zhengci[j] = zhengci[j] + seg_infos[i]["seg"]
            # start a new whole word
            j += 1
            zhengci[j] = ""
    return list(zhengci.values())
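# A quick illustration (made-up jieba-style segmentation, for demonstration only):
# with keyword "数据库 连接池" and seg_list ["数据库", "连接池"], the space breaks
# adjacency and the result is ["数据库", "连接池"]; with keyword "数据库连接池配置"
# and ["数据库", "连接", "池", "配置"], all segments are adjacent and merge into
# ["数据库连接池配置"].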


# Like jieba_zhengci, but joins at most two adjacent segments into one whole
# word; adjacency of the segments in the original query decides what gets joined.
def jieba_zhengci2(keyword, seg_list):
    if len(seg_list) == 1: return seg_list

    zhengci = []
    seg_infos = []
    k_copy = keyword  # working copy, used to handle repeated segments
    start = 0
    for seg in seg_list:
        seg_infos.append({"seg": seg, "begin": k_copy.find(seg) + start, "len": len(seg)})
        start += len(seg)
        k_copy = keyword[start:]

    i = 0
    count = len(seg_infos)
    while i < count:
        if i + 1 >= count:  # isolated last segment
            zhengci.append(seg_infos[i]["seg"])
            break
        if seg_infos[i + 1]["begin"] == seg_infos[i]["begin"] + seg_infos[i]["len"]:
            if not is_number(seg_infos[i]["seg"]) and not is_number(seg_infos[i + 1]["seg"]):  # neither segment is a number
                zhengci.append(seg_infos[i]["seg"] + seg_infos[i + 1]["seg"])

                if i + 1 >= len(seg_infos):  # last segment
                    break
                i += 2  # skip the segment that was just merged
            else:
                zhengci.append(seg_infos[i]["seg"])
                i += 1
        else:
            zhengci.append(seg_infos[i]["seg"])
            i += 1
    #print(zhengci,i)
    return zhengci
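# Illustration (made-up jieba-style segmentation, for demonstration only):
# jieba_zhengci2("数据库连接池配置", ["数据库", "连接", "池", "配置"]) merges
# adjacent pairs and returns ["数据库连接", "池配置"]; numeric segments are
# never merged, so ["版本", "2", "升级"] stays unmerged.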


def is_number(str_val):
    try:
        float(str_val)  # try to convert the string to a float
        return True
    except ValueError:  # conversion failed, so it is not a number
        return False


def clean_string(s):
    # collapse two or more consecutive newlines into one
    s = re.sub(r'\n{2,}', '\n', s)
    # collapse two or more consecutive spaces into one
    s = re.sub(r' {2,}', ' ', s)
    return s
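# e.g. clean_string("a  b\n\n\nc") -> "a b\nc"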


################################# Recall #################################


def retrieve_result(keyword, seg_list, matches, percent):
    """
    Second-pass retrieval.

    Parameters:
        keyword   the search keyword
        seg_list  the keyword segmented by jieba
        matches   the match list returned by the full-text search
        percent   the minimum relevance score to accept
    """
    slice_sum = len(matches)
    print("first-pass retrieval", keyword, seg_list, slice_sum)
    if slice_sum < 20:
        percent = 10  # with fewer than 20 slices, drop the threshold to 10 to keep more usable results
    r_cnt = {}
    r2 = {}

    # whole-word list
    zhengci = jieba_zhengci2(keyword, seg_list)
    all_words = zhengci + seg_list

    v_hash = []  # content hashes used for deduplication
    for i in range(3):
        for m in matches:
            if m.percent > percent:
                doc_id = m.docid
                file_name = m.document.get_value(0).decode("utf-8")
                file_path = m.document.get_value(1).decode("utf-8")
                file_base = m.document.get_value(3).decode("utf-8")
                file_ctx = m.document.get_data().decode("utf-8")

                if hash(file_ctx) in v_hash:  # skip duplicate content
                    continue
                else:
                    v_hash.append(hash(file_ctx))

                # count how many keywords (+ whole words) the file path and file
                # name contain, i.e. the relevance of the file name itself
                cnt = cnt_keywords(f"{file_path}.{file_name}", all_words)
                r_cnt[doc_id] = cnt
                r2[doc_id] = [file_name, file_ctx, file_path, file_base]

        # sort by value, descending
        sorted_cnt = sorted(r_cnt.items(), key=itemgetter(1), reverse=True)
        sorted_r_cnt = dict(sorted_cnt)

        # second pass: search the content with whole words, then sort
        print("trusted relevance", len(sorted_r_cnt), zhengci, percent)

        if len(sorted_r_cnt) > 0:
            break
        else:
            percent -= 20  # lower the threshold by 20 and retry
    # end for

    r2_cnt = {}  # whole-word counts per slice
    for k1, v1 in sorted_r_cnt.items():
        k, v, path, base = r2[k1]
        r2_cnt[k1] = cnt_keywords(v, zhengci)
        #print(k,v1,r2_cnt[k1])

    # sort by value descending and keep the top n slices
    n = len(sorted_r_cnt)
    if n > 10:  # top 10 plus 80% of the remainder
        n = 10 + int((n - 10) * 0.8)
    sorted_cnt2 = sorted(r2_cnt.items(), key=itemgetter(1), reverse=True)
    sorted_r2_cnt = dict(sorted_cnt2[0:n])

    # post-process the slice content to give the LLM more precise information
    result = []
    meta = []

    # slice size
    size = 512

    if len(sorted_r2_cnt) <= 5:
        size = size * 8
    elif len(sorted_r2_cnt) <= 10:
        size = size * 4
    elif len(sorted_r2_cnt) <= 20:
        size = size * 2

    # the second-pass processing extracts the key information from each slice
    for k1, v1 in sorted_r2_cnt.items():
        k, v, path, base = r2[k1]
        v = clean_string(v)

        # final content
        content = ""

        # first occurrence position of each whole word
        pos = {}
        for word in zhengci:
            if v.find(word) != -1:
                pos[word] = v.find(word)

        # sort by position, ascending; result is an array [[word, pos], [word, pos]]
        word_pos = sorted(pos.items(), key=itemgetter(1), reverse=False)
        print("whole-word count", len(word_pos), k)
        if len(word_pos) == 0:
            content = v[0:int(size / 2)]
        elif len(word_pos) == 1:  # one keyword, one size
            # check whether the keyword occurs more than once
            count = v.count(word_pos[0][0])
            if count == 1:
                start = 0 if word_pos[0][1] - 100 < 0 else word_pos[0][1] - 100
                content = v[start:start + int(size)]
            else:
                psize = int(size / count)
                start = 0
                for i in range(count):
                    start = v.find(word_pos[0][0], start)
                    if start == -1:
                        break
                    begin = max(0, start - 50)  # guard against a negative slice start
                    content += v[begin:begin + psize]
                    start += len(word_pos[0][0])

        elif len(word_pos) == 2:  # two keywords, at most two sizes
            if v.count(word_pos[0][0]) >= 2 and v.count(word_pos[1][0]) >= 2:  # both occur several times, hard to judge
                # take the whole content
                content = v
            else:  # each whole word occurs only once
                start = 0 if word_pos[0][1] - 100 < 0 else word_pos[0][1] - 100
                if word_pos[1][1] - word_pos[0][1] < size:
                    content = v[start:start + size]
                else:
                    start2 = max(0, word_pos[1][1] - 50)  # guard against a negative slice start
                    content = v[start:start + size] + "\n" + v[start2:start2 + size]
        else:  # several keywords, 3*size -- 4*size
            start = 0 if word_pos[0][1] - 100 < 0 else word_pos[0][1] - 100

            if word_pos[-1][1] - word_pos[0][1] < size:
                content = v[start:start + size]
                # slide backwards to find the last occurrence of the last keyword
                v2 = v
                start2 = word_pos[-1][1]
                while v2.rfind(word_pos[-1][0]) > word_pos[-1][1]:  # the last keyword also occurs later in the text
                    start2 = v2.rfind(word_pos[-1][0])
                    #print(len(v2),start2)
                    if len(v) - start2 > size:
                        break
                    else:
                        v2 = v[0:start2]
                # end while
                last_start = 0 if start2 - size < 0 else start2 - size
                #print("start2",word_pos[-1][0],start2,last_start)
                content += "\n" + v[last_start:last_start + size]
                content += "\n" + v[start2:start2 + size]

            else:
                start2 = max(0, word_pos[-1][1] - 50)  # guard against a negative slice start
                content = v[start:start + size] + "\n" + v[start2:start2 + size]
                if v.rfind(word_pos[-1][0]) > word_pos[-1][1] + size:  # the last occurrence is much further on
                    start2 = v.rfind(word_pos[-1][0])
                    last_start = 0 if start2 - size < 0 else start2 - size
                    content += "\n" + v[last_start:last_start + size]
                    content += "\n" + v[start2:start2 + size]
                # end if
        result.append(content)
        meta.append([k, path, base])
    # end for

    print("slice count after second-pass retrieval", len(result))
    return result, meta
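

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original retrieval pipeline).
# `retrieve_result` expects `matches` shaped like a Xapian-style match set:
# each item exposes .docid, .percent and .document, whose get_value(slot) /
# get_data() return bytes. The fake classes and values below are stand-ins
# added only to make the call runnable for a quick local test; a real caller
# is assumed to pass jieba-segmented keywords and a real full-text result.
# ---------------------------------------------------------------------------
if __name__ == "__main__":

    class _FakeDocument:
        """Stand-in for a search-engine document with value slots and data."""
        def __init__(self, name, path, base, ctx):
            self._values = {0: name, 1: path, 3: base}
            self._data = ctx

        def get_value(self, slot):
            return self._values[slot].encode("utf-8")

        def get_data(self):
            return self._data.encode("utf-8")

    class _FakeMatch:
        """Stand-in for a single full-text match (docid / percent / document)."""
        def __init__(self, docid, percent, document):
            self.docid = docid
            self.percent = percent
            self.document = document

    demo_matches = [
        _FakeMatch(1, 80, _FakeDocument(
            "连接池配置.md", "docs/db", "kb",
            "数据库连接池配置说明:最大连接数、超时时间等参数的推荐取值。")),
        _FakeMatch(2, 45, _FakeDocument(
            "其他.md", "docs/misc", "kb",
            "与查询无关的内容。")),
    ]

    # The segmentation would normally come from jieba; hard-coded for the demo.
    demo_keyword = "数据库连接池配置"
    demo_segs = ["数据库", "连接", "池", "配置"]

    chunks, metas = retrieve_result(demo_keyword, demo_segs, demo_matches, 50)
    for chunk, m in zip(chunks, metas):
        print(m, chunk[:80])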