# search_toolkit.py

from typing import Any, Dict, List

import requests

from pqai_agent.toolkit.base import BaseToolkit
from pqai_agent.toolkit.function_tool import FunctionTool


class SearchToolkit(BaseToolkit):
    r"""A toolkit for performing web searches."""

    def search_baidu(
        self, query: str, max_results: int = 5
    ) -> Dict[str, Any]:
        r"""Search Baidu using web scraping to retrieve relevant search
        results. This method queries Baidu's search engine and extracts
        search results including titles, descriptions, and URLs.

        Args:
            query (str): Search query string to submit to Baidu.
            max_results (int): Maximum number of results to return.
                (default: :obj:`5`)

        Returns:
            Dict[str, Any]: A dictionary containing search results or an
                error message.
        """
        from bs4 import BeautifulSoup

        try:
            url = "https://www.baidu.com/s"
            headers = {
                "User-Agent": (
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/120.0.0.0 Safari/537.36"
                ),
                "Referer": "https://www.baidu.com",
            }
            params = {"wd": query, "rn": str(max_results)}
            # Add a timeout so a slow or unreachable host cannot hang the call.
            response = requests.get(
                url, headers=headers, params=params, timeout=10
            )
            response.encoding = "utf-8"

            soup = BeautifulSoup(response.text, "html.parser")

            results = []
            for idx, item in enumerate(soup.select(".result"), 1):
                title_element = item.select_one("h3 > a")
                title = (
                    title_element.get_text(strip=True) if title_element else ""
                )
                link = title_element["href"] if title_element else ""
                desc_element = item.select_one(".c-abstract, .c-span-last")
                desc = (
                    desc_element.get_text(strip=True) if desc_element else ""
                )
                results.append(
                    {
                        "result_id": idx,
                        "title": title,
                        "description": desc,
                        "url": link,
                    }
                )
                if len(results) >= max_results:
                    break

            if not results:
                print(
                    "Warning: No results found. Check "
                    "if Baidu HTML structure has changed."
                )

            return {"results": results}
        except Exception as e:
            return {"error": f"Baidu scraping error: {e!s}"}
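
    # NOTE: Baidu result links are usually redirect URLs of the form
    # "https://www.baidu.com/link?url=...", not the final target pages;
    # recovering the real destination requires following the redirect.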

    def search_bing(
        self, query: str, max_results: int = 5
    ) -> Dict[str, Any]:
        r"""Use the Bing search engine to search for the given query. This
        function queries the Chinese version of Bing (cn.bing.com) via web
        scraping and extracts search results including titles, snippets,
        and URLs. It is particularly useful when the query is in Chinese or
        when Chinese search results are desired.

        Args:
            query (str): The search query string to submit to Bing. Works
                best with Chinese queries or when Chinese results are
                preferred.
            max_results (int): Maximum number of results to return.
                (default: :obj:`5`)

        Returns:
            Dict[str, Any]: A dictionary containing either:
                - 'results': A list of dictionaries, each with:
                    - 'result_id': The index of the result.
                    - 'snippet': A brief description of the search result.
                    - 'title': The title of the search result.
                    - 'link': The URL of the search result.
                - or 'error': An error message if something went wrong.
        """
        from typing import cast
        from urllib.parse import urlencode

        from bs4 import BeautifulSoup, Tag

        try:
            # Encode the query instead of reassigning it, to avoid shadowing.
            encoded_query = urlencode({"q": query})
            url = f"https://cn.bing.com/search?{encoded_query}"
            headers = {
                "User-Agent": (
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/120.0.0.0 Safari/537.36"
                ),
            }
            # Add a timeout to prevent hanging.
            response = requests.get(url, headers=headers, timeout=10)

            # Check whether the request was successful.
            if response.status_code != 200:
                return {
                    "error": (
                        f"Bing returned status code: {response.status_code}"
                    )
                }

            response.encoding = "utf-8"
            soup = BeautifulSoup(response.text, "html.parser")

            b_results_element = soup.find("ol", id="b_results")
            if b_results_element is None:
                return {"results": []}

            # Ensure b_results is a Tag and find all li elements.
            b_results_tag = cast(Tag, b_results_element)
            result_items = b_results_tag.find_all("li")

            results: List[Dict[str, Any]] = []
            for i in range(min(len(result_items), max_results)):
                row = result_items[i]
                if not isinstance(row, Tag):
                    continue

                h2_element = row.find("h2")
                if h2_element is None:
                    continue
                h2_tag = cast(Tag, h2_element)
                title = h2_tag.get_text().strip()

                link_tag_element = h2_tag.find("a")
                if link_tag_element is None:
                    continue
                link_tag = cast(Tag, link_tag_element)
                link = link_tag.get("href")
                if link is None:
                    continue

                content_element = row.find("p", class_="b_algoSlug")
                content_text = ""
                if content_element is not None and isinstance(
                    content_element, Tag
                ):
                    content_text = content_element.get_text()

                results.append(
                    {
                        "result_id": i + 1,
                        "snippet": content_text,
                        "title": title,
                        "link": link,
                    }
                )

            if not results:
                return {
                    "warning": "No results found. Check if "
                    "Bing HTML structure has changed."
                }

            return {"results": results}
        except Exception as e:
            return {"error": f"Bing scraping error: {e!s}"}

    def aiddit_search(self, keyword: str) -> Dict[str, Any]:
        r"""Search using the Aiddit API.

        Args:
            keyword (str): The search keyword.

        Returns:
            Dict[str, Any]: A dictionary containing up to five search
                results, or an error message.
        """
        url = "http://smcp-api.aiddit.com/mcp/custom/search"
        headers = {
            "Content-Type": "application/json",
        }
        data = {"keyword": keyword}
        try:
            response = requests.post(
                url, headers=headers, json=data, timeout=10
            )
            response.raise_for_status()
            resp_json = response.json()
            if resp_json.get("code") != 0:
                return {
                    "error": (
                        f"Aiddit search error: "
                        f"{resp_json.get('message', 'Unknown error')}"
                    )
                }
            resp_data = resp_json["data"]
            # Limit the output to the first five results.
            results = resp_data.get("results", [])[:5]
            return {"results": results}
        except requests.RequestException as e:
            return {"error": f"Aiddit search error: {e!s}"}
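
    # NOTE: smcp-api.aiddit.com appears to be an internal API. Judging only
    # from the handling above, a successful response is assumed to look like
    # {"code": 0, "message": "...", "data": {"results": [...]}}, where a
    # non-zero "code" signals an error described by "message".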

    def get_tools(self) -> List[FunctionTool]:
        r"""Returns a list of FunctionTool objects representing the
        functions in the toolkit.

        Returns:
            List[FunctionTool]: A list of FunctionTool objects representing
                the functions in the toolkit.
        """
        return [
            FunctionTool(self.search_baidu),
            FunctionTool(self.search_bing),
            FunctionTool(self.aiddit_search),
        ]
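

# A minimal usage sketch, not part of the original module. It assumes that
# SearchToolkit can be instantiated with no arguments (i.e. that
# BaseToolkit.__init__ needs none); adapt the construction to your
# BaseToolkit if it differs.
if __name__ == "__main__":
    toolkit = SearchToolkit()

    # Call a search method directly and inspect the structured results.
    response = toolkit.search_baidu("python web scraping", max_results=3)
    for item in response.get("results", []):
        print(item["result_id"], item["title"], item["url"])

    # Or expose the methods as FunctionTool objects, e.g. for an agent.
    for tool in toolkit.get_tools():
        print(tool)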