### # # 二次检索,对一次检索的内容进行去重,过滤,关联性判断,和二次的内容精准定位 # ### from operator import itemgetter import re #==================================辅助函数====================================== #关键字个数 def cnt_keywords(name,kws): cnt = 0 for k in kws: if name.find(k) >=0: cnt +=1 return cnt #尽可能使用连贯起来的词,即整词,通过分词后在原句里是距离来决定 def jieba_zhengci(keyword,seg_list): if len(seg_list)==1: return seg_list # zhengci={0:""} seg_infos=[] for seg in seg_list: seg_infos.append({"seg":seg,"begin":keyword.find(seg),"len":len(seg)}) j=0 for i in range(len(seg_infos)): if i+1>=len(seg_infos): #孤立的最后一个 zhengci[j] = zhengci[j]+seg_infos[i]["seg"] break if seg_infos[i+1]["begin"]==seg_infos[i]["begin"]+seg_infos[i]["len"]: if i+1>=len(seg_infos): #最后一个 zhengci[j] = zhengci[j]+seg_infos[i]["seg"]+seg_infos[i+1]["seg"] break else: zhengci[j] = zhengci[j]+seg_infos[i]["seg"] else: zhengci[j] = zhengci[j]+seg_infos[i]["seg"] #开创一个新的 j +=1 zhengci[j]="" return list(zhengci.values()) #最多2个词连贯起来的词,即整词,通过分词后在原句里是距离来决定 def jieba_zhengci2(keyword,seg_list): if len(seg_list)==1: return seg_list # zhengci=[] seg_infos=[] k_copy= keyword #用来处理重复的词 start = 0 for seg in seg_list: seg_infos.append({"seg":seg,"begin":k_copy.find(seg)+start,"len":len(seg)}) start += len(seg) k_copy = keyword[start:] i=0 count = len(seg_infos) while i < count: if i+1>=count: #孤立的最后一个 zhengci.append(seg_infos[i]["seg"]) break if seg_infos[i+1]["begin"]==seg_infos[i]["begin"]+seg_infos[i]["len"]: if not is_number(seg_infos[i]["seg"]) and not is_number(seg_infos[i+1]["seg"]): #两者都不是数字 zhengci.append(seg_infos[i]["seg"]+seg_infos[i+1]["seg"]) if i+1>=len(seg_infos): #最后一个 break i +=2 #跳过一个 else: zhengci.append(seg_infos[i]["seg"]) i +=1 else: zhengci.append(seg_infos[i]["seg"]) i +=1 #print(zhengci,i) return zhengci def is_number(str_val): try: float(str_val) # 尝试将字符串转换为float return True except ValueError: # 捕获转换失败时的异常 return False def clean_string(s): # 替换两个或更多连续的换行符为一个换行符 s = re.sub(r'\n{2,}', '\n', s) # 替换两个或更多连续的空格为一个空格 s = re.sub(r' {2,}', ' ', s) return s #################################召回####################### """ 二次检索 参数: keyword 检索的关键字 seg_list 结巴分词后的关键字列表 matches 全文检索后的匹配列表 percent 计划要的分数 """ def retrieve_result(keyword,seg_list,matches,percent): slice_sum = len(matches) print("一次检索",keyword,seg_list,slice_sum) if slice_sum <20: percent = 10 #分片数量少于20的情况下,可信度降到10,增加有效数量 r_cnt={} r2={} #整词理表 zhengci = jieba_zhengci2(keyword,seg_list) all_words = zhengci + seg_list v_hash=[] #内容重复去重 for i in range(3): for m in matches: if m.percent >percent: doc_id = m.docid file_name = m.document.get_value(0).decode("utf-8") file_path = m.document.get_value(1).decode("utf-8") file_base = m.document.get_value(3).decode("utf-8") file_ctx = m.document.get_data().decode("utf-8") if hash(file_ctx) in v_hash: #去除重复内容 continue else: v_hash.append(hash(file_ctx)) #文件路径和文件中包含关键词(+整词)的数量,即计算文件名的相关性 cnt = cnt_keywords(f"{file_path}.{file_name}",all_words) r_cnt[doc_id] = cnt r2[doc_id] = [file_name,file_ctx,file_path,file_base] # 按值降序排序 sorted_cnt = sorted(r_cnt.items(), key=itemgetter(1), reverse=True) sorted_r_cnt = dict(sorted_cnt) # 二次检索,内容使用整词检索,排序 print("可信关联度",len(sorted_r_cnt),zhengci,percent) if len(sorted_r_cnt) >0: break else: percent -=20 #降低20个 #end for r2_cnt = {} #整词数量 for k1,v1 in sorted_r_cnt.items(): k,v,path,base = r2[k1] r2_cnt[k1] = cnt_keywords(v,zhengci) #print(k,v1,r2_cnt[k1]) # 按值降序排序,取前n个数据分片 n = len(sorted_r_cnt) if n >10: #前10+后续的一半数量 n = 10 + int((n-10)* 0.8) sorted_cnt2 = sorted(r2_cnt.items(), key=itemgetter(1), reverse=True) sorted_r2_cnt = dict(sorted_cnt2[0:n]) #开始进行分片内容的处理加工,提供更转确的信息给大模型 result=[] meta=[] #分片大小 size=512 if len(sorted_r2_cnt)<=5: size=size*8 elif len(sorted_r2_cnt)<=10: size=size*4 elif len(sorted_r2_cnt)<=20: size = size*2 # #信息的二次处理逻辑是要提取关键信息 for k1,v1 in sorted_r2_cnt.items(): k,v,path,base = r2[k1] v = clean_string(v) #最终内容 content="" #关键字首次出现的位置 pos={} for word in zhengci: if v.find(word)!=-1: pos[word]= v.find(word) # 按出现位置值升序排序,转换为数组[[k,count],[k,count]] word_pos = sorted(pos.items(), key=itemgetter(1), reverse=False) print("整词个数",len(word_pos),k) if len(word_pos)==0: content = v[0:int(size/2)] elif len(word_pos)==1: #一个关键字,一个size #判断关键字是否出现多次 count = v.count(word_pos[0][0]) if count==1: start = 0 if word_pos[0][1]-100 <0 else word_pos[0][1]-100 content = v[start:start+int(size)] else: pos=[] psize = int(size/count) # start = 0 for i in range(count): start = v.find(word_pos[0][0],start) if start ==-1: break content += v[start-50:start+psize-50] start += len(word_pos[0][0]) elif len(word_pos)==2: #两个关键字,最多两个size if v.count(word_pos[0][0]) >=2 and v.count(word_pos[1][0]) >=2: #关键词多次出现不好判断的 #全部内容 content = v else: #整词关键词只出现过一次的 start = 0 if word_pos[0][1]-100 <0 else word_pos[0][1]-100 if word_pos[1][1]- word_pos[0][1] < size: content = v[start:start+size] else: content = v[start:start+size] +"\n"+ v[word_pos[1][1]-50:word_pos[1][1]-50+size] else:#多关键字,3*size--4*size start = 0 if word_pos[0][1]-100 <0 else word_pos[0][1]-100 if word_pos[-1][1] - word_pos[0][1] < size: content = v[start:start+size] #滑动去找最后一个最关键的字 v2 = v start2 = word_pos[-1][1] while v2.rfind(word_pos[-1][0]) > word_pos[-1][1]: #最后一个关键字在文章中倒数的位置大于顺数的位置 start2 = v2.rfind(word_pos[-1][0]) #print(len(v2),start2) if len(v)-start2 >size: break else: v2 = v[0:start2] #end while last_start = 0 if start2-size <0 else start2-size #print("start2",word_pos[-1][0],start2,last_start) content +="\n"+v[last_start:last_start+size] content +="\n"+v[start2:start2+size] else: content = v[start:start+size] +"\n"+ v[word_pos[-1][1]-50:word_pos[-1][1]-50+size] if v.rfind(word_pos[-1][0]) > word_pos[-1][1]+size:#倒数那个词距离很远 start2 = v.rfind(word_pos[-1][0]) last_start = 0 if start2-size <0 else start2-size content +="\n"+v[last_start:last_start+size] content +="\n"+v[start2:start2+size] #end if result.append(content) meta.append([k,path,base]) #end for print("二次检索后分片数量",len(result)) return result,meta