Add File

404 main/web_search.py Normal file

@@ -0,0 +1,404 @@
import requests
from lxml import html, etree
import re


# Baidu web search
def baidu_web(keywords):
    # Baidu search URL
    url = "https://www.baidu.com/s"

    # Search keywords
    params = {
        "wd": keywords
    }

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0 Safari/537.36"
    }

    response = requests.get(url, params=params, headers=headers)

    # Set the encoding (auto-detected)
    response.encoding = response.apparent_encoding

    pages = []
    # Make sure the response is OK
    if response.status_code == 200:
        # Parse the HTML with lxml
        tree = html.fromstring(response.content)

        # Extract every result's title and link via XPath
        results = tree.xpath('//div[@id="content_left"]/div[contains(@class, "result")]')

        for result in results:
            title = result.xpath('.//h3/a//text()')
            link = result.xpath('.//h3/a/@href')

            title = ''.join(title).strip() if title else ''
            link = link[0] if link else ''

            # Find the abstract via its data-module attribute
            desc = result.xpath('.//div[@data-module="abstract"]//text()')
            desc = ''.join(desc).strip() if desc else ''
            if title and link:
                pages.append({"title": title, "link": link, "desc": desc})
    else:
        print(f"Request failed, status code: {response.status_code}")

    return pages
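
# Usage sketch for baidu_web (hedged: it needs live network access, and Baidu's
# SERP markup changes often and may serve a verification page instead, in which
# case the list is simply empty). Baidu links are its own redirect URLs:
#   hits = baidu_web("lxml xpath tutorial")
#   hits -> [{"title": "...", "link": "https://www.baidu.com/link?url=...", "desc": "..."}, ...]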


"""
Fetch a single page
"""
def get_single_page(url, size=2048):

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0 Safari/537.36"
    }

    try:
        response = requests.get(url, headers=headers)
        # Set the encoding (auto-detected)
        response.encoding = response.apparent_encoding
        # Make sure the response is OK
        if response.status_code != 200:
            return {"code": response.status_code, "url": url, "content": ""}, None

        tree = html.fromstring(response.text)
        all_text = ""
        # Collect all text under the <body> tag
        body = tree.find('.//body')
        if body is not None:
            for tag in body.xpath('.//script | .//style | .//noscript'):
                parent = tag.getparent()
                if parent is not None:
                    parent.remove(tag)

            # Optional: strip inline event handlers (e.g. onclick)
            for elem in body.xpath('.//*[@onclick or @onload or @onerror]'):
                # Remove the specific attributes
                if 'onclick' in elem.attrib:
                    del elem.attrib['onclick']
                if 'onload' in elem.attrib:
                    del elem.attrib['onload']
            all_text = body.text_content().strip()

        # Extract the title
        title = tree.xpath('//title/text()')
        title = title[0] if title else ""

        # Extract the keywords meta tag
        keywords = tree.xpath('//meta[@name="keywords"]/@content')
        keywords = keywords[0] if keywords else ""

        result = {"code": 200, "url": url, "title": title, "keywords": keywords, "content": all_text[0:size]}
        return result, response.text
    except Exception as e:
        # On any failure, report the error and return no raw HTML
        return {"code": 500, "url": url, "content": f'{e}'}, None
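
# Usage sketch: get_single_page returns a (summary, raw_html) tuple so callers
# can keep the trimmed text and still post-process the original page. On any
# failure the second element is None and "code" carries the error:
#   info, raw = get_single_page("https://example.com", size=1024)
#   if info["code"] == 200:
#       print(info["title"], len(info["content"]))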


# Bing (cn.bing.com) web search
def cn_bing_web(keywords):
    # Bing search URL
    url = "https://cn.bing.com/search"

    # Search keywords
    params = {
        "q": keywords
    }

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0 Safari/537.36"
    }

    response = requests.get(url, params=params, headers=headers)

    # Set the encoding (auto-detected)
    response.encoding = response.apparent_encoding

    pages = []
    # Make sure the response is OK
    if response.status_code == 200:
        # Parse the HTML with lxml
        tree = html.fromstring(response.text)

        search_items = tree.xpath('//li[@class="b_algo"]')

        for item in search_items:
            title = item.xpath('.//h2/a/text()')
            link = item.xpath('.//h2/a/@href')

            title = ''.join(title).strip() if title else ''
            link = link[0] if link else ''

            # No abstract is extracted here, so the title doubles as the description
            if title and link:
                pages.append({"title": title, "link": link, "desc": title})
    else:
        print(f"Request failed, status code: {response.status_code}")

    return pages
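
# The XPath above assumes Bing's desktop SERP markup, roughly:
#   <li class="b_algo"><h2><a href="...">title</a></h2>...</li>
# If Bing serves a consent or anti-bot page instead, search_items is empty and
# the function quietly returns [].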


# Main entry point for web search
def search_web(keywords):
    r0 = sogou_search(keywords)
    r1 = baidu_web(keywords)
    r2 = cn_bing_web(keywords)
    print(f"Sogou[{len(r0)}], Baidu[{len(r1)}], Bing[{len(r2)}] ==== {keywords}")
    return r0 + r1 + r2
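
# Usage sketch: the three engines' results are concatenated, not de-duplicated,
# so the same URL can appear once per engine. A caller could dedupe by link:
#   pages = search_web("open source RAG")
#   seen, unique = set(), []
#   for p in pages:
#       if p["link"] not in seen:
#           seen.add(p["link"])
#           unique.append(p)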


"""
Full content of each result page
"""
def get_detail_page(pages, size=2048):
    result = []
    for page in pages:
        url = page["link"]

        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0 Safari/537.36"
        }

        try:
            response = requests.get(url, headers=headers)
            # Set the encoding (auto-detected)
            response.encoding = response.apparent_encoding
            # Make sure the response is OK
            if response.status_code == 200:
                tree = html.fromstring(response.text)
                # Collect all text under the <body> tag
                body_element = tree.find('.//body')
                if body_element is not None:
                    all_text = body_element.text_content()
                    result.append({"url": page["link"], "title": page["title"], "desc": page["desc"], "content": all_text[0:size]})
        except Exception:
            # Skip pages that fail to download or parse
            pass
    # end for
    return result
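
# Usage sketch: pages that fail to download or parse are silently skipped, so
# the output can be shorter than the input:
#   docs = get_detail_page(search_web("aiohttp tutorial"), size=1024)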


import asyncio
import aiohttp


# Fetch a single page
async def fetch(session, url):
    try:
        async with session.get(url) as resp:
            if resp.status == 200:
                text = await resp.text()
                body = html.fromstring(text)

                # Handle Sogou's meta-refresh redirect interstitial
                noscript_meta = body.xpath('//noscript/meta[@http-equiv="refresh"]/@content')
                if noscript_meta:
                    content = noscript_meta[0]
                    # Match URL='xxx' or URL=xxx
                    match = re.search(r'URL\s*=\s*["\']?([^"\'>\s]+)', content, re.IGNORECASE)
                    if match:
                        url = match.group(1)
                        return await fetch(session, url)

                # Collect all text under the <body> tag
                for tag in body.xpath('.//script | .//style | .//noscript'):
                    parent = tag.getparent()
                    if parent is not None:
                        parent.remove(tag)

                # Optional: strip inline event handlers (e.g. onclick)
                for elem in body.xpath('.//*[@onclick or @onload or @onerror]'):
                    # Remove the specific attributes
                    if 'onclick' in elem.attrib:
                        del elem.attrib['onclick']
                    if 'onload' in elem.attrib:
                        del elem.attrib['onload']
                all_text = body.text_content().strip()
                return {"url": url, "content": all_text[0:6000]}
    except Exception:
        # Failed fetches fall through and return None
        pass
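
# Example of the meta-refresh case handled above (hedged: Sogou's interstitial
# is undocumented and its exact format may change). Given markup such as
#   <noscript><meta http-equiv="refresh" content="0;URL='https://example.com/real'"></noscript>
# the regex captures https://example.com/real and fetch() recurses once per hop.
# There is no hop limit, so a redirect loop would recurse until Python's
# recursion limit trips the except handler and the page is dropped.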


# Task pool
async def task_pool(urls):
    # Shared headers for every request
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0 Safari/537.36"
    }

    async with aiohttp.ClientSession(headers=headers) as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)  # run concurrently and collect the results

    return results  # return all results


# Asynchronous detail fetch
def get_detail_page2(pages):
    result = asyncio.run(task_pool([page["link"] for page in pages]))
    return [r for r in result if r]
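
# Usage sketch: this is the concurrent counterpart of get_detail_page(); all
# links are fetched in one event loop and failed fetches (None) are dropped:
#   docs = get_detail_page2(search_web("python aiohttp"))
# Note asyncio.run() starts a fresh event loop, so get_detail_page2 must not be
# called from code that is already running inside an event loop.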


# Quick manual test against Sogou
def test_web(keywords):
    # Sogou search URL
    url = "https://www.sogou.com/web"

    # Search keywords
    params = {
        "query": keywords
    }

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0 Safari/537.36"
    }

    response = requests.get(url, params=params, headers=headers)

    print(response.text)


def sogou_search(keyword, timeout=10):
    """
    Scrape a Sogou web search and return the parsed results.
    :param keyword: search keywords
    :param timeout: request timeout in seconds
    :return: list of results [{title, link, desc}, ...]
    """
    # Sogou search URL
    url = "https://www.sogou.com/web"

    # Search keywords
    params = {
        "query": keyword
    }

    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
        'Referer': 'https://www.sogou.com/',
    }

    try:
        response = requests.get(url, params=params, headers=headers, timeout=timeout)
        response.raise_for_status()
        response.encoding = 'utf-8'

        # Named `doc` rather than `html` to avoid shadowing the lxml.html import
        doc = etree.HTML(response.text)

        results = []
        # Sogou result containers (may change whenever the frontend is redesigned)
        result_nodes = doc.xpath('//div[@class="vrwrap"] | //div[@class="rb"]')

        for node in result_nodes:
            try:
                # Title
                title_node = node.xpath('.//h3/a')
                if not title_node:
                    continue
                title = ''.join(title_node[0].xpath('.//text()')).strip()

                # Link
                href = title_node[0].get('href', '')
                # Sogou links are redirect links and need resolving
                if href.startswith('/link?url='):
                    full_url = 'https://www.sogou.com' + href
                    # Optional: one extra request to resolve the real URL (adds latency and traffic)
                    real_url = get_real_url(full_url, headers, timeout)
                else:
                    real_url = href

                # Abstract / description
                abstract_parts = node.xpath('.//div[contains(@class, "fz-mid")]//text() | .//p[contains(@class, "str_info")]//text()')
                abstract = ''.join(abstract_parts).strip().replace('\n', '').replace('\r', '').replace(' ', '')

                if title and real_url:
                    results.append({
                        'title': title,
                        'link': real_url,
                        'desc': abstract
                    })
            except Exception as e:
                print(f"[Failed to parse one result]: {e}")
                continue

        return results

    except Exception as e:
        print(f"[Request failed]: {e}")
        return []
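
# Usage sketch (hedged: Sogou's result containers and anti-bot behaviour change
# over time, so an empty list does not necessarily mean there were no results):
#   for hit in sogou_search("lxml", timeout=5):
#       print(hit["title"], hit["link"])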


def get_real_url(sogou_redirect_url, headers, timeout=10):
    """
    (Optional) follow a Sogou redirect link to get the real target URL.
    Note: doing this for every result may trigger anti-scraping measures.
    """
    try:
        r = requests.get(sogou_redirect_url, headers=headers, timeout=timeout, allow_redirects=True)
        return r.url
    except Exception:
        return sogou_redirect_url  # on failure, fall back to the redirect link itself
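
# A lighter alternative, sketched (untested assumption: it avoids downloading
# the body, but some sites reject HEAD and would need the GET fallback above):
#   r = requests.head(sogou_redirect_url, headers=headers, allow_redirects=True, timeout=timeout)
#   real = r.url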


if __name__ == "__main__":
    # keyword = "4b model"
    # print(f"Searching for: {keyword}")

    # results = sogou_search(keyword)

    # for i, res in enumerate(results, 1):
    #     print(f"\n{i}. {res['title']}")
    #     print(f"   URL: {res['link']}")
    #     print(f"   Abstract: {res['desc'][:100]}...")

    # print(f"\nFound {len(results)} results in total")
    import sys

    # Fall back to a default query when no command-line argument is given
    question = sys.argv[1] if len(sys.argv) > 1 else "云智信安"
    print("=========================Search=============================")
    # r = cn_bing_web(question)
    r = search_web(question)

    print(len(r), r)
    print("=========================Detail=============================")
    pages = get_detail_page2(r)

    for page in pages:
        print(page)

    print(len(pages))

    """
    # r2 = get_detail_page(r)
    # print(r2)
    print("===========================BAIDU===========================")
    r1 = baidu_web(question)
    print(len(r1), r1)
    # r2 = get_detail_page(r)
    # print(r2)
    print("===========================R2===========================")
    r4 = search_web(question)
    print(len(r4), r4)
    print("===========================get_detail_page2===========================")
    r5 = get_detail_page2(r4)
    print(len(r5), r5)
    """