# search_toolkit.py

from typing import Any, Dict, List, cast

import requests

from pqai_agent.toolkit.base import BaseToolkit
from pqai_agent.toolkit.function_tool import FunctionTool


class SearchToolkit(BaseToolkit):
    r"""A toolkit for web search."""

    def search_baidu(self, query: str, max_results: int = 5) -> Dict[str, Any]:
        r"""Search Baidu using web scraping to retrieve relevant search
        results. This method queries Baidu's search engine and extracts
        search results including titles, descriptions, and URLs.

        Args:
            query (str): Search query string to submit to Baidu.
            max_results (int): Maximum number of results to return.
                (default: :obj:`5`)

        Returns:
            Dict[str, Any]: A dictionary containing search results or an
                error message.
        """
        from bs4 import BeautifulSoup

        try:
            url = "https://www.baidu.com/s"
            headers = {
                "User-Agent": (
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/120.0.0.0 Safari/537.36"
                ),
                "Referer": "https://www.baidu.com",
            }
            params = {"wd": query, "rn": str(max_results)}

            # Add a timeout to prevent hanging
            response = requests.get(
                url, headers=headers, params=params, timeout=10
            )
            response.encoding = "utf-8"

            soup = BeautifulSoup(response.text, "html.parser")

            results = []
            for idx, item in enumerate(soup.select(".result"), 1):
                title_element = item.select_one("h3 > a")
                title = (
                    title_element.get_text(strip=True) if title_element else ""
                )
                link = title_element["href"] if title_element else ""
                desc_element = item.select_one(".c-abstract, .c-span-last")
                desc = (
                    desc_element.get_text(strip=True) if desc_element else ""
                )
                results.append(
                    {
                        "result_id": idx,
                        "title": title,
                        "description": desc,
                        "url": link,
                    }
                )
                if len(results) >= max_results:
                    break

            if not results:
                print(
                    "Warning: No results found. Check "
                    "if Baidu HTML structure has changed."
                )

            return {"results": results}
        except Exception as e:
            return {"error": f"Baidu scraping error: {e!s}"}

    def search_bing(self, query: str, max_results: int = 5) -> Dict[str, Any]:
        r"""Use the Bing search engine to search information for the given
        query.

        This function queries the Chinese version of Bing (cn.bing.com)
        using web scraping to retrieve relevant search results. It extracts
        titles, snippets, and URLs, and is particularly useful when the
        query is in Chinese or when Chinese search results are desired.

        Args:
            query (str): The search query string to submit to Bing. Works
                best with Chinese queries or when Chinese results are
                preferred.
            max_results (int): Maximum number of results to return.
                (default: :obj:`5`)

        Returns:
            Dict[str, Any]: A dictionary containing either:
                - 'results': A list of dictionaries, each with:
                    - 'result_id': The index of the result.
                    - 'snippet': A brief description of the search result.
                    - 'title': The title of the search result.
                    - 'link': The URL of the search result.
                - or 'error': An error message if something went wrong.
        """
        from urllib.parse import urlencode

        from bs4 import BeautifulSoup, Tag

        try:
            query_string = urlencode({"q": query})
            url = f"https://cn.bing.com/search?{query_string}"
            headers = {
                "User-Agent": (
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/120.0.0.0 Safari/537.36"
                ),
            }

            # Add a timeout to prevent hanging
            response = requests.get(url, headers=headers, timeout=10)

            # Check that the request was successful
            if response.status_code != 200:
                return {
                    "error": (
                        f"Bing returned status code: {response.status_code}"
                    )
                }

            response.encoding = "utf-8"
            soup = BeautifulSoup(response.text, "html.parser")

            b_results_element = soup.find("ol", id="b_results")
            if b_results_element is None:
                return {"results": []}

            # Ensure b_results is a Tag, then collect its li children
            b_results_tag = cast(Tag, b_results_element)
            result_items = b_results_tag.find_all("li")

            results: List[Dict[str, Any]] = []
            for i in range(min(len(result_items), max_results)):
                row = result_items[i]
                if not isinstance(row, Tag):
                    continue

                h2_element = row.find("h2")
                if h2_element is None:
                    continue
                h2_tag = cast(Tag, h2_element)
                title = h2_tag.get_text().strip()

                link_tag_element = h2_tag.find("a")
                if link_tag_element is None:
                    continue
                link_tag = cast(Tag, link_tag_element)
                link = link_tag.get("href")
                if link is None:
                    continue

                content_element = row.find("p", class_="b_algoSlug")
                content_text = ""
                if content_element is not None and isinstance(
                    content_element, Tag
                ):
                    content_text = content_element.get_text()

                row_data = {
                    "result_id": i + 1,
                    "snippet": content_text,
                    "title": title,
                    "link": link,
                }
                results.append(row_data)

            if not results:
                return {
                    "warning": "No results found. Check if "
                    "Bing HTML structure has changed."
                }

            return {"results": results}
        except Exception as e:
            return {"error": f"Bing scraping error: {e!s}"}

    def get_tools(self) -> List[FunctionTool]:
        r"""Returns a list of FunctionTool objects representing the
        functions in the toolkit.

        Returns:
            List[FunctionTool]: A list of FunctionTool objects
                representing the functions in the toolkit.
        """
        return [
            FunctionTool(self.search_baidu),
            FunctionTool(self.search_bing),
        ]
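

# A minimal usage sketch, not part of the original module: it assumes network
# access and that Baidu's and Bing's current HTML matches the selectors
# above, so live results may vary. The sample queries are placeholders.
if __name__ == "__main__":
    toolkit = SearchToolkit()

    # Baidu results use the keys 'result_id', 'title', 'description', 'url'.
    baidu_response = toolkit.search_baidu("Python web scraping", max_results=3)
    for item in baidu_response.get("results", []):
        print(item["result_id"], item["title"], item["url"])

    # Bing results use different keys: 'snippet' and 'link' instead of
    # 'description' and 'url'.
    bing_response = toolkit.search_bing("机器学习", max_results=3)
    for item in bing_response.get("results", []):
        print(item["result_id"], item["title"], item["link"])

    # get_tools() wraps both methods as FunctionTool objects for agent use.
    print(f"{len(toolkit.get_tools())} tools exposed")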