k3GPT/main/web_search.py

import requests
from lxml import html, etree
import re

# Baidu web search
def baidu_web(keywords):
    # Baidu search URL
    url = "https://www.baidu.com/s"
    # Search keywords
    params = {
        "wd": keywords
    }
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0 Safari/537.36"
    }
    response = requests.get(url, params=params, headers=headers)
    # Set the encoding (auto-detected)
    response.encoding = response.apparent_encoding
    pages = []
    # Make sure the response is OK
    if response.status_code == 200:
        # print(response.content.decode("utf-8"))
        # Parse the HTML with lxml
        tree = html.fromstring(response.content)
        # Extract every result title and link via XPath
        results = tree.xpath('//div[@id="content_left"]/div[contains(@class, "result")]')
        for result in results:
            title = result.xpath('.//h3/a//text()')
            link = result.xpath('.//h3/a/@href')
            title = ''.join(title).strip() if title else ''
            link = link[0] if link else ''
            # The abstract lives in a div with data-module="abstract"
            desc = result.xpath('.//div[@data-module="abstract"]//text()')
            desc = ''.join(desc).strip() if desc else ''
            # print("Title:", title)
            # print("Link:", link)
            # print("Abstract:", desc)
            # print("-" * 80)
            if title and link:
                pages.append({"title": title, "link": link, "desc": desc})
    else:
        print(f"Request failed, status code: {response.status_code}")
    return pages
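
# Usage sketch (illustrative, not in the original file; assumes network access
# and that Baidu's markup still matches the XPath above):
#   pages = baidu_web("lxml xpath")
#   # -> [{"title": "...", "link": "https://...", "desc": "..."}, ...]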
"""
单独的网页
"""
def get_single_page(url,size=2048):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0 Safari/537.36"
}
try:
response = requests.get(url, headers=headers)
# 设置编码格式(自动检测)
response.encoding = response.apparent_encoding
# 确保响应正常
if response.status_code == 200:
#print(response.text)
tree = html.fromstring(response.text)
all_text=""
# 获取<body>标签下的所有文本
body = tree.find('.//body')
if body is not None:
for tag in body.xpath('.//script | .//style | .//noscript'):
parent = tag.getparent()
if parent is not None:
parent.remove(tag)
# 可选:移除内联事件(如 onclick
for elem in body.xpath('.//*[@onclick or @onload or @onerror]'):
# 移除特定属性
if 'onclick' in elem.attrib:
del elem.attrib['onclick']
if 'onload' in elem.attrib:
del elem.attrib['onload']
all_text = body.text_content()
all_text = all_text.strip()
#print(all_text)
# 提取 title
title = tree.xpath('//title/text()')
title = title[0] if title else ""
# 提取 keywords
keywords = tree.xpath('//meta[@name="keywords"]/@content')
keywords = keywords[0] if keywords else ""
result={"code":200,"url":url,"title":title,"keywords":keywords,"content":all_text[0:size]}
except Exception as e :
result={"code":500,"url":url,"content":f'{e}'},None
return result,response.text
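
# Usage sketch (illustrative): get_single_page returns a (summary, raw_html)
# pair, so callers that only need the cleaned text can ignore the second value:
#   info, raw = get_single_page("https://example.com", size=1024)
#   if info["code"] == 200:
#       print(info["title"], info["content"][:80])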

# Bing (cn.bing.com) web search
def cn_bing_web(keywords):
    # Bing search URL
    url = "https://cn.bing.com/search"
    # Search keywords
    params = {
        "q": keywords
    }
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0 Safari/537.36"
    }
    response = requests.get(url, params=params, headers=headers)
    # Set the encoding (auto-detected)
    response.encoding = response.apparent_encoding
    pages = []
    # Make sure the response is OK
    if response.status_code == 200:
        # Parse the HTML with lxml
        tree = html.fromstring(response.text)
        search_items = tree.xpath('//li[@class="b_algo"]')
        for item in search_items:
            title = item.xpath('.//h2/a/text()')
            link = item.xpath('.//h2/a/@href')
            title = ''.join(title).strip() if title else ''
            link = link[0] if link else ''
            if title and link:
                # No separate abstract is extracted here, so reuse the title as desc
                pages.append({"title": title, "link": link, "desc": title})
    else:
        print(f"Request failed, status code: {response.status_code}")
    return pages

# Main entry point for web search
def search_web(keywords):
    r0 = sogou_search(keywords)
    r1 = baidu_web(keywords)
    r2 = cn_bing_web(keywords)
    print(f"Sogou[{len(r0)}], Baidu[{len(r1)}], Bing[{len(r2)}]===={keywords}")
    return r0 + r1 + r2
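
# Usage sketch (illustrative): the three engines are queried sequentially and
# their results simply concatenated; duplicates across engines are not removed:
#   results = search_web("python aiohttp")
#   # -> combined list of {"title", "link", "desc"} dicts from all three engines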
"""
页面的详细内容
"""
def get_detail_page(pages,size=2048):
result=[]
for page in pages:
url = page["link"]
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0 Safari/537.36"
}
try:
response = requests.get(url, headers=headers)
# 设置编码格式(自动检测)
response.encoding = response.apparent_encoding
# 确保响应正常
if response.status_code == 200:
tree = html.fromstring(response.text)
# 获取<body>标签下的所有文本
body_element = tree.find('.//body')
if body_element is not None:
all_text = body_element.text_content()
result.append({"url":page["link"],"title":page["title"],"desc":page["desc"],"content":all_text[0:size]})
except:
pass
#end for
return result

import asyncio
import aiohttp

# Fetch a single page asynchronously
async def fetch(session, url):
    try:
        async with session.get(url) as resp:
            if resp.status == 200:
                text = await resp.text()
                body = html.fromstring(text)
                # Handle Sogou's <noscript> meta-refresh redirect pages
                noscript_meta = body.xpath('//noscript/meta[@http-equiv="refresh"]/@content')
                if noscript_meta:
                    content = noscript_meta[0]
                    # Match URL='xxx' or URL=xxx
                    match = re.search(r'URL\s*=\s*["\']?([^"\'>\s]+)', content, re.IGNORECASE)
                    if match:
                        url = match.group(1)
                        return await fetch(session, url)
                # Collect all text under <body>
                for tag in body.xpath('.//script | .//style | .//noscript'):
                    parent = tag.getparent()
                    if parent is not None:
                        parent.remove(tag)
                # Optional: strip inline event handlers (e.g. onclick)
                for elem in body.xpath('.//*[@onclick or @onload or @onerror]'):
                    for attr in ('onclick', 'onload', 'onerror'):
                        if attr in elem.attrib:
                            del elem.attrib[attr]
                all_text = body.text_content().strip()
                return {"url": url, "content": all_text[0:6000]}
    except Exception:
        pass

# Task pool
async def task_pool(urls):
    # Shared headers for every request in the session
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0 Safari/537.36"
    }
    async with aiohttp.ClientSession(headers=headers) as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)  # Run concurrently and collect results
        return results  # Return all results

# Asynchronous detail fetch
def get_detail_page2(pages):
    result = asyncio.run(task_pool([page["link"] for page in pages]))
    # Drop entries for pages that failed or returned no content
    return [r for r in result if r]
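
# Usage sketch (illustrative): get_detail_page2 fetches all result pages
# concurrently, so it is the faster path compared to the sequential
# get_detail_page above:
#   details = get_detail_page2(search_web("example query"))
#   # -> [{"url": "...", "content": "first 6000 chars of page text"}, ...]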

# Quick manual test against Sogou
def test_web(keywords):
    # Sogou search URL
    url = "https://www.sogou.com/web"
    # Search keywords
    params = {
        "query": keywords
    }
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0 Safari/537.36"
    }
    response = requests.get(url, params=params, headers=headers)
    print(response.text)

def sogou_search(keyword, timeout=10):
    """
    Run a Sogou web search and return the parsed results
    :param keyword: search keywords
    :param timeout: request timeout in seconds
    :return: list of results [{title, link, desc}, ...]
    """
    # Sogou search URL
    url = "https://www.sogou.com/web"
    # Search keywords
    params = {
        "query": keyword
    }
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
        'Referer': 'https://www.sogou.com/',
    }
    try:
        response = requests.get(url, params=params, headers=headers, timeout=timeout)
        response.raise_for_status()
        response.encoding = 'utf-8'
        # Use a local name that does not shadow the imported lxml `html` module
        tree = etree.HTML(response.text)
        results = []
        # Sogou result containers (may change when the frontend is redesigned)
        result_nodes = tree.xpath('//div[@class="vrwrap"] | //div[@class="rb"]')
        for node in result_nodes:
            try:
                # Title
                title_node = node.xpath('.//h3/a')
                if not title_node:
                    continue
                title = ''.join(title_node[0].xpath('.//text()')).strip()
                # Link
                href = title_node[0].get('href', '')
                # Sogou links are redirects and need to be resolved
                if href.startswith('/link?url='):
                    full_url = 'https://www.sogou.com' + href
                    # Optional: resolving the real URL costs one extra request, adding latency and traffic
                    real_url = get_real_url(full_url, headers, timeout)
                else:
                    real_url = href
                # Abstract / description
                abstract_parts = node.xpath('.//div[contains(@class, "fz-mid")]//text() | .//p[contains(@class, "str_info")]//text()')
                abstract = ''.join(abstract_parts).strip().replace('\n', '').replace('\r', '').replace(' ', '')
                if title and real_url:
                    results.append({
                        'title': title,
                        'link': real_url,
                        'desc': abstract
                    })
            except Exception as e:
                print(f"[Failed to parse one result]: {e}")
                continue
        return results
    except Exception as e:
        print(f"[Request failed]: {e}")
        return []

def get_real_url(sogou_redirect_url, headers, timeout=10):
    """
    Optional: follow a Sogou redirect link to get the real target URL
    Note: frequent requests may trigger anti-crawling measures
    """
    try:
        r = requests.get(sogou_redirect_url, headers=headers, timeout=timeout, allow_redirects=True)
        return r.url
    except Exception:
        return sogou_redirect_url  # On failure, return the original redirect link
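
# Usage sketch (illustrative): resolving redirect links costs one extra request
# per result, so callers that only need titles/abstracts could skip it:
#   hits = sogou_search("site reliability", timeout=5)
#   for hit in hits:
#       print(hit["title"], hit["link"])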
if __name__ == "__main__":
# keyword = "4b模型"
# print(f"正在搜索: {keyword}")
# results = sogou_search(keyword)
# for i, res in enumerate(results, 1):
# print(f"\n{i}. {res['title']}")
# print(f" URL: {res['url']}")
# print(f" 摘要: {res['abstract'][:100]}...")
# print(f"\n共找到 {len(results)} 条结果")
import sys
question = "云智信安"
print("=========================Search=============================")
#r = cn_bing_web(question)
r = search_web(sys.argv[1])
print(len(r),r)
print("=========================Detail=============================")
pages = get_detail_page2(r)
for page in pages:
print(page)
print(len(pages))
"""
# r2 = get_detail_page(r)
# print(r2)
print("===========================BAIDU===========================")
r1 = baidu_web(question)
print(len(r1),r1)
# r2 = get_detail_page(r)
# print(r2)
print("===========================R2===========================")
r4 = search_web(question)
print(len(r4),r4)
print("===========================get_detail_page2===========================")
r5 = get_detail_page2(r4)
print(len(r5),r5)
"""