import dashscope class QwenClient: def __init__(self): self.api_key = "sk-1022fe8e15ff4e0e9abc20541b281165" def chat(self, model="qwen3-max", system_prompt="You are a helpful assistant.", user_prompt=""): """ 普通聊天,不使用搜索功能 Args: model: 模型名称,默认为qwen3-max system_prompt: 系统提示词 user_prompt: 用户提示词 Returns: str: AI回复内容 """ try: messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, ] response = dashscope.Generation.call( api_key=self.api_key, model=model, messages=messages, result_format="message" ) if response.status_code != 200: raise Exception(f"API调用失败: {response.message}") return response["output"]["choices"][0]["message"]["content"] except Exception as e: raise Exception(f"QwenClient chat失败: {str(e)}") def search_and_chat(self, model="qwen3-max", system_prompt="You are a helpful assistant.", user_prompt="", search_strategy="max"): """ 搜索并聊天 Args: model: 模型名称,默认为qwen3-max system_prompt: 系统提示词 user_prompt: 用户提示词 search_strategy: 搜索策略,可选值: turbo, max, agent Returns: dict: 包含回复内容和搜索结果 """ try: messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, ] response = dashscope.Generation.call( api_key=self.api_key, model=model, messages=messages, enable_search=True, search_options={ "forced_search": True, "enable_source": True, "search_strategy": search_strategy }, result_format="message" ) if response.status_code != 200: raise Exception(f"API调用失败: {response.message}") content = response["output"]["choices"][0]["message"]["content"] search_results = [] if hasattr(response.output, 'search_info') and response.output.search_info: search_results = response.output.search_info.get("search_results", []) return { "content": content, "search_results": search_results } except Exception as e: raise Exception(f"QwenClient search_and_chat失败: {str(e)}") if __name__ == "__main__": client = QwenClient() # 测试 try: # result = client.chat(user_prompt="hello") # print(result) user_prompt = """你是一个专业的信息搜索专家,负责从网络中搜索某个工具的页面操作路径 **名词解释** 页面操作路径:表示在网页上使用某个工具完成某个功能需要进行的必要操作。通常是分步骤操作,最后形成一个完整的页面操作路径。 比如:1. 打开xxx网站, 2. 输入xxx提示词, 3. 点击确认按钮, 4. 等待图片生成完成, 5. 返回图片的url **任务目标** 搜索并整理 新红热搜词榜单功能 的页面操作路径 **数据要求** - 操作页面必须是官方网站,排除任何第三方网站、移动端APP、PC软件 - 页面操作路径数据最好有详细的操作步骤,如果没有,也可以是简单的操作步骤描述。步骤中如果包含账号注册/登录,需要去除,我们假设网站已经成功登录了 - 页面操作路径数据要排除关于API的调用数据 - 保留原始链接用于追溯,输出在 content_link 字段 - 整理工具的核心功能名称和功能描述 - 如果有多份数据,保留和任务目标最相关的一份数据即可 **输出要求:** 严格按照以下JSON格式输出,不添加任何其他文字说明: { "content_link": "原始链接地址", "功能名称": "具体功能名称", "功能描述": "功能用途和作用描述", "页面操作路径": "完整的页面操作路径" }""" # user_prompt = "请搜索 白瓜AI 官网" result = client.search_and_chat(user_prompt=user_prompt, search_strategy="agent") print("="*20 + "搜索结果" + "="*20) for web in result["search_results"]: print(f"[{web['index']}]: [{web['title']}]({web['url']})") print("="*20 + "回复内容" + "="*20) print(result["content"]) except Exception as e: print(f"错误: {e}")