diff --git a/main/web_search.py b/main/web_search.py
new file mode 100644
index 0000000..92c86d7
--- /dev/null
+++ b/main/web_search.py
@@ -0,0 +1,404 @@
+import asyncio
+import re
+
+import aiohttp
+import requests
+from lxml import html, etree
+
+
+# Baidu web search
+def baidu_web(keywords):
+    # Baidu search endpoint
+    url = "https://www.baidu.com/s"
+
+    # Search keywords
+    params = {
+        "wd": keywords
+    }
+
+    headers = {
+        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0 Safari/537.36"
+    }
+
+    response = requests.get(url, params=params, headers=headers, timeout=10)
+
+    # Let requests auto-detect the response encoding
+    response.encoding = response.apparent_encoding
+
+    pages = []
+    # Make sure the request succeeded
+    if response.status_code == 200:
+        # Parse the HTML with lxml
+        tree = html.fromstring(response.content)
+
+        # Extract every search-result container via XPath
+        results = tree.xpath('//div[@id="content_left"]/div[contains(@class, "result")]')
+
+        for result in results:
+            title = result.xpath('.//h3/a//text()')
+            link = result.xpath('.//h3/a/@href')
+
+            title = ''.join(title).strip() if title else ''
+            link = link[0] if link else ''
+
+            # The abstract lives in a div with data-module="abstract"
+            desc = result.xpath('.//div[@data-module="abstract"]//text()')
+            desc = ''.join(desc).strip() if desc else ''
+            if title and link:
+                pages.append({"title": title, "link": link, "desc": desc})
+    else:
+        print(f"Request failed, status code: {response.status_code}")
+
+    return pages
+
+
+def get_single_page(url, size=2048):
+    """Fetch one page and return (metadata dict, raw HTML text or None)."""
+    headers = {
+        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0 Safari/537.36"
+    }
+
+    result = {"code": 500, "url": url, "content": ""}
+    raw_text = None
+    try:
+        response = requests.get(url, headers=headers, timeout=10)
+        # Let requests auto-detect the response encoding
+        response.encoding = response.apparent_encoding
+        raw_text = response.text
+        # Make sure the request succeeded
+        if response.status_code == 200:
+            tree = html.fromstring(response.text)
+            all_text = ""
+            # Collect the text under <body>, minus scripts and styles
+            body = tree.find('.//body')
+            if body is not None:
+                for tag in body.xpath('.//script | .//style | .//noscript'):
+                    parent = tag.getparent()
+                    if parent is not None:
+                        parent.remove(tag)
+
+                # Optional: strip inline event handlers (e.g. onclick)
+                for elem in body.xpath('.//*[@onclick or @onload or @onerror]'):
+                    if 'onclick' in elem.attrib:
+                        del elem.attrib['onclick']
+                    if 'onload' in elem.attrib:
+                        del elem.attrib['onload']
+                all_text = body.text_content().strip()
+
+            # Extract the <title>
+            title = tree.xpath('//title/text()')
+            title = title[0] if title else ""
+
+            # Extract the keywords meta tag
+            keywords = tree.xpath('//meta[@name="keywords"]/@content')
+            keywords = keywords[0] if keywords else ""
+
+            result = {"code": 200, "url": url, "title": title, "keywords": keywords, "content": all_text[0:size]}
+        else:
+            result = {"code": response.status_code, "url": url, "content": ""}
+    except Exception as e:
+        result = {"code": 500, "url": url, "content": f'{e}'}
+    return result, raw_text
+
+
+# Bing (cn.bing.com) web search
+def cn_bing_web(keywords):
+    # Bing search endpoint
+    url = "https://cn.bing.com/search"
+
+    # Search keywords
+    params = {
+        "q": keywords
+    }
+
+    headers = {
+        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0 Safari/537.36"
+    }
+
+    response = requests.get(url, params=params, headers=headers, timeout=10)
+
+    # Let requests auto-detect the response encoding
+    response.encoding = response.apparent_encoding
+
+    pages = []
+    # Make sure the request succeeded
+    if response.status_code == 200:
+        # Parse the HTML with lxml
+        tree = html.fromstring(response.text)
+
+        search_items = tree.xpath('//li[@class="b_algo"]')
+
+        for item in search_items:
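+            # Each organic result is an <li class="b_algo"> whose title link
+            # sits under <h2><a href=...>. This is scraped markup rather than a
+            # stable API, so these XPaths may break when Bing's frontend changes.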
+            title = item.xpath('.//h2/a/text()')
+            link = item.xpath('.//h2/a/@href')
+
+            title = ''.join(title).strip() if title else ''
+            link = link[0] if link else ''
+
+            # Prefer the snippet under div.b_caption (an assumption about the
+            # current markup); fall back to the title when it is absent
+            desc = item.xpath('.//div[contains(@class, "b_caption")]//p//text()')
+            desc = ''.join(desc).strip() if desc else title
+
+            if title and link:
+                pages.append({"title": title, "link": link, "desc": desc})
+    else:
+        print(f"Request failed, status code: {response.status_code}")
+
+    return pages
+
+
+# Main entry point for web search: query every engine and merge the results
+def search_web(keywords):
+    r0 = sogou_search(keywords)
+    r1 = baidu_web(keywords)
+    r2 = cn_bing_web(keywords)
+    print(f"Sogou[{len(r0)}], Baidu[{len(r1)}], Bing[{len(r2)}] ==== {keywords}")
+    return r0 + r1 + r2
+
+
+def get_detail_page(pages, size=2048):
+    """Fetch each result page synchronously and return its visible text."""
+    result = []
+    for page in pages:
+        url = page["link"]
+
+        headers = {
+            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0 Safari/537.36"
+        }
+
+        try:
+            response = requests.get(url, headers=headers, timeout=10)
+            # Let requests auto-detect the response encoding
+            response.encoding = response.apparent_encoding
+            # Make sure the request succeeded
+            if response.status_code == 200:
+                tree = html.fromstring(response.text)
+                # Collect all text under <body>
+                body_element = tree.find('.//body')
+                if body_element is not None:
+                    all_text = body_element.text_content()
+                    result.append({"url": page["link"], "title": page["title"], "desc": page["desc"], "content": all_text[0:size]})
+        except Exception:
+            # Skip pages that fail to download or parse
+            pass
+    return result
+
+
+# Fetch a single page asynchronously
+async def fetch(session, url, depth=0):
+    try:
+        async with session.get(url) as resp:
+            if resp.status == 200:
+                text = await resp.text()
+                body = html.fromstring(text)
+
+                # Handle Sogou's <noscript> meta-refresh redirect pages,
+                # bounded by depth to avoid redirect loops
+                noscript_meta = body.xpath('//noscript/meta[@http-equiv="refresh"]/@content')
+                if noscript_meta and depth < 3:
+                    content = noscript_meta[0]
+                    # Match URL='xxx' or URL=xxx
+                    match = re.search(r'URL\s*=\s*["\']?([^"\'>\s]+)', content, re.IGNORECASE)
+                    if match:
+                        return await fetch(session, match.group(1), depth + 1)
+
+                # Drop scripts and styles before extracting text
+                for tag in body.xpath('.//script | .//style | .//noscript'):
+                    parent = tag.getparent()
+                    if parent is not None:
+                        parent.remove(tag)
+
+                # Optional: strip inline event handlers (e.g. onclick)
+                for elem in body.xpath('.//*[@onclick or @onload or @onerror]'):
+                    if 'onclick' in elem.attrib:
+                        del elem.attrib['onclick']
+                    if 'onload' in elem.attrib:
+                        del elem.attrib['onload']
+                all_text = body.text_content().strip()
+                return {"url": url, "content": all_text[0:6000]}
+    except Exception:
+        # Failed fetches yield None and are filtered out by the caller
+        return None
+
+
+# Task pool: fetch all URLs concurrently
+async def task_pool(urls):
+    # Shared headers for the whole session
+    headers = {
+        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0 Safari/537.36"
+    }
+
+    async with aiohttp.ClientSession(headers=headers) as session:
+        tasks = [fetch(session, url) for url in urls]
+        results = await asyncio.gather(*tasks)  # run concurrently, collect results
+
+    return results
+
+
+# Async variant: fetch every result page and drop the failures
+def get_detail_page2(pages):
+    result = asyncio.run(task_pool([page["link"] for page in pages]))
+    return [r for r in result if r]
+
+
+# Quick manual check of the raw Sogou response
+def test_web(keywords):
+    # Sogou search endpoint
+    url = "https://www.sogou.com/web"
+
+    # Search keywords
+    params = {
+        "query": keywords
+    }
+
+    headers = {
+        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0 Safari/537.36"
+    }
+
+    response = requests.get(url, params=params, headers=headers, timeout=10)
+
+    print(response.text)
+
+
+def sogou_search(keyword, timeout=10):
+    """
+    Query Sogou web search and return the parsed result list.
+    :param keyword: search keywords
+    :param timeout: request timeout in seconds
+    :return: list of results [{title, link, desc}, ...]
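+
+    Illustrative example (network call; shown only for the result shape,
+    hence skipped as a doctest):
+        >>> results = sogou_search("lxml xpath")  # doctest: +SKIP
+        >>> sorted(results[0].keys())  # doctest: +SKIP
+        ['desc', 'link', 'title']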
+ """ + # 搜狗搜索URL构造 + url = f"https://www.sogou.com/web" + + # 搜索关键词 + params = { + "query": keyword + } + + + headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0 Safari/537.36', + 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', + 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8', + 'Accept-Encoding': 'gzip, deflate, br', + 'Connection': 'keep-alive', + 'Referer': 'https://www.sogou.com/', + } + + try: + response = requests.get(url, params=params,headers=headers, timeout=timeout) + response.raise_for_status() + response.encoding = 'utf-8' + + html = etree.HTML(response.text) + + results = [] + # 搜狗搜索结果容器(可能随前端改版而变化) + result_nodes = html.xpath('//div[@class="vrwrap"] | //div[@class="rb"]') + + for node in result_nodes: + try: + # 标题 + title_node = node.xpath('.//h3/a') + if not title_node: + continue + title = ''.join(title_node[0].xpath('.//text()')).strip() + + # 链接 + href = title_node[0].get('href', '') + # 搜狗的链接是跳转链接,需要处理 + if href.startswith('/link?url='): + full_url = 'https://www.sogou.com' + href + # 可选:请求一次获取真实URL(会增加延迟和请求量) + real_url = get_real_url(full_url, headers, timeout) + else: + real_url = href + + # 摘要/描述 + abstract_parts = node.xpath('.//div[contains(@class, "fz-mid")]//text() | .//p[contains(@class, "str_info")]//text()') + abstract = ''.join(abstract_parts).strip().replace('\n', '').replace('\r', '').replace(' ', '') + + if title and real_url: + results.append({ + 'title': title, + 'link': real_url, + 'desc': abstract + }) + except Exception as e: + print(f"[解析单条结果出错]: {e}") + continue + + return results + + except Exception as e: + print(f"[请求失败]: {e}") + return [] + +def get_real_url(sogou_redirect_url, headers, timeout=10): + """ + (可选)访问搜狗跳转链接,获取真实目标URL + 注意:频繁请求可能触发反爬 + """ + try: + r = requests.get(sogou_redirect_url, headers=headers, timeout=timeout, allow_redirects=True) + return r.url + except: + return sogou_redirect_url # 失败则返回原跳转链接 + + +if __name__ == "__main__": + # keyword = "4b模型" + # print(f"正在搜索: {keyword}") + + # results = sogou_search(keyword) + + # for i, res in enumerate(results, 1): + # print(f"\n{i}. {res['title']}") + # print(f" URL: {res['url']}") + # print(f" 摘要: {res['abstract'][:100]}...") + + # print(f"\n共找到 {len(results)} 条结果") + import sys + + + question = "云智信安" + print("=========================Search=============================") + #r = cn_bing_web(question) + r = search_web(sys.argv[1]) + + + + print(len(r),r) + print("=========================Detail=============================") + pages = get_detail_page2(r) + + + for page in pages: + print(page) + + print(len(pages)) + + """ + # r2 = get_detail_page(r) + # print(r2) + print("===========================BAIDU===========================") + r1 = baidu_web(question) + print(len(r1),r1) + # r2 = get_detail_page(r) + # print(r2) + print("===========================R2===========================") + r4 = search_web(question) + print(len(r4),r4) + print("===========================get_detail_page2===========================") + r5 = get_detail_page2(r4) + print(len(r5),r5) + """ \ No newline at end of file