Prechádzať zdrojové kódy

Merge branch 'main' of https://git.yishihui.com/ai/knowledge

jihuaqiang 1 týždeň pred
rodič
commit
967dcec826
5 zmenil súbory, kde vykonal 395 pridanie a 297 odobranie
  1. 5 0
      .gitignore
  2. 270 156
      2_identify.py
  3. 75 116
      3_handle.py
  4. 3 1
      README.md
  5. 42 24
      utils/mysql_db.py

+ 5 - 0
.gitignore

@@ -286,4 +286,9 @@ poetry.toml
 # LSP config files
 pyrightconfig.json
 
+# custom
+myenv/
+
+
+
 # End of https://www.toptal.com/developers/gitignore/api/python,pycharm

+ 270 - 156
2_identify.py

@@ -1,172 +1,286 @@
-import config
-import logging
 import os
-from feishu_client import FeishuClient
-from coze_client import CozeClient
+import json
+import time
+import sys
+import argparse
+from typing import Dict, Any, List, Optional
+from dotenv import load_dotenv
 
-# 配置日志
-logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+# 导入自定义模块
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 
-def process_feishu_data_with_coze_flow(
-    feishu_app_id: str,
-    feishu_app_secret: str,
-    feishu_base_id: str,
-    feishu_table_id: str,
-    feishu_input_field_name: str,
-    feishu_output_field_name: str,
-    coze_api_key: str,
-    coze_bot_id: str,
-    coze_prompt_template: str,
-    max_records_to_process: int = 50, # 每次处理的记录数
-    overwrite_existing_output: bool = True # 是否覆盖已有的输出字段内容
-) -> None:
-    """
-    从飞书多维表格读取数据,调用Coze API进行处理,并将结果写入多维表格。
-    这是整个业务流程的协调函数。
+from utils.fei_shu import FeiShu
+from coze.coze_hook import CozeHook
 
-    Args:
-        feishu_app_id (str): 飞书应用的 App ID。
-        feishu_app_secret (str): 飞书应用的 App Secret。
-        feishu_base_id (str): 飞书多维表格的 Base ID (应用 Token)。
-        feishu_table_id (str): 多维表格中要操作的表的 Table ID。
-        feishu_input_field_name (str): 多维表格中用于输入给Coze的字段名称。
-        feishu_output_field_name (str): 多维表格中用于存储Coze返回结果的字段名称。
-        coze_api_key (str): Coze API 密钥。
-        coze_bot_id (str): Coze 机器人的 Bot ID。
-        coze_prompt_template (str): Coze API的提示模板字符串,需要包含 '{input_data}' 占位符。
-                                   例如: "请总结以下文本的关键信息: {input_data}"
-        max_records_to_process (int): 每次函数调用最多从飞书读取并处理的记录数。
-        overwrite_existing_output (bool): 如果输出字段已有内容,是否覆盖。True为覆盖,False为跳过。
-    """
-    logging.info("--- 🚀 开始执行飞书数据与Coze交互流程 🚀 ---")
 
-    try:
-        # 初始化客户端
-        feishu_client = FeishuClient(feishu_app_id, feishu_app_secret)
-        coze_client = CozeClient(coze_api_key)
-
-        # 1. 从飞书多维表格读取数据
-        logging.info("阶段 1/3: 从飞书多维表格读取数据...")
-        read_field_names = [feishu_input_field_name, feishu_output_field_name]
-        raw_records = feishu_client.read_records(
-            feishu_base_id,
-            feishu_table_id,
-            read_field_names,
-            page_size=max_records_to_process # 限制读取数量,防止一次性处理过多
-        )
-
-        records_to_process = []
-        for record in raw_records:
-            record_id = record.get("record_id")
-            fields = record.get("fields", {})
-            input_value = fields.get(feishu_input_field_name)
-            output_value_existing = fields.get(feishu_output_field_name)
-
-            # 筛选需要处理的记录
-            if input_value is None or str(input_value).strip() == "":
-                logging.debug(f"记录 {record_id} 的输入字段 '{feishu_input_field_name}' 为空,跳过。")
-                continue
-            if not overwrite_existing_output and output_value_existing is not None and str(output_value_existing).strip() != "":
-                logging.info(f"记录 {record_id} 的输出字段 '{feishu_output_field_name}' 已有内容且设置为不覆盖,跳过。")
-                continue
-            
-            records_to_process.append({
-                "record_id": record_id,
-                "input_data": input_value
-            })
+class ContentIdentifier:
+    def __init__(self, table_id: Optional[str] = None):
+        # 加载环境变量
+        load_dotenv()
         
-        if not records_to_process:
-            logging.info("没有符合条件的记录需要处理,流程结束。")
-            return
-
-        logging.info(f"共筛选出 {len(records_to_process)} 条记录待Coze处理。")
-
-        # 2. 调用 Coze API 处理数据
-        logging.info("阶段 2/3: 调用Coze API处理数据...")
-        updated_feishu_records = []
-        for i, record_info in enumerate(records_to_process):
-            record_id = record_info["record_id"]
-            input_data = record_info["input_data"]
-            
-            logging.info(f"正在处理第 {i+1}/{len(records_to_process)} 条记录 (ID: {record_id})...")
+        # 初始化飞书客户端
+        self.feishu = FeiShu()
+        
+        # 初始化Coze客户端
+        self.coze = CozeHook()
+        
+        # 获取表格ID:优先使用传入的参数,其次使用环境变量
+        self.table_id = table_id or os.getenv('FEISHU_TABLE_ID')
+        if not self.table_id:
+            raise ValueError("请设置环境变量 FEISHU_TABLE_ID 或在运行时传入 table_id 参数")
+        
+        # 字段名称配置
+        self.input_field = os.getenv('FEISHU_INPUT_FIELD', '抓取结果')
+        self.output_field = os.getenv('FEISHU_OUTPUT_FIELD', '识别结果')
+        
+    def extract_content_from_record(self, record) -> Dict[str, Any]:
+        """从飞书记录中提取内容"""
+        fields = record.fields
+        
+        # 提取抓取结果
+        crawl_result = fields.get(self.input_field, '')
+        title = ''
+        body_text = ''
+        image_url_list = []
+        
+        # 解析抓取结果
+        if crawl_result:
+            if isinstance(crawl_result, list) and len(crawl_result) > 0:
+                # 如果是数组格式,取第一个元素
+                crawl_data = crawl_result[0]
+                if isinstance(crawl_data, dict) and 'text' in crawl_data:
+                    try:
+                        # 解析JSON字符串
+                        json_data = json.loads(crawl_data['text'])
+                        
+                        # 提取标题
+                        title = json_data.get('title', '')
+                        
+                        # 提取正文内容
+                        body_text = json_data.get('body_text', '')
+                        
+                        # 提取图片链接
+                        image_data_list = json_data.get('image_url_list', [])
+                        for img_data in image_data_list:
+                            if isinstance(img_data, dict) and 'image_url' in img_data:
+                                image_url_list.append(img_data['image_url'])
+                        
+                    except json.JSONDecodeError as e:
+                        print(f"解析抓取结果JSON失败: {e}")
+                        # 如果解析失败,尝试直接使用文本内容
+                        if isinstance(crawl_data, dict) and 'text' in crawl_data:
+                            body_text = crawl_data['text']
+            elif isinstance(crawl_result, str):
+                # 如果是字符串格式,尝试直接解析
+                try:
+                    json_data = json.loads(crawl_result)
+                    title = json_data.get('title', '')
+                    body_text = json_data.get('body_text', '')
+                    image_data_list = json_data.get('image_url_list', [])
+                    for img_data in image_data_list:
+                        if isinstance(img_data, dict) and 'image_url' in img_data:
+                            image_url_list.append(img_data['image_url'])
+                except json.JSONDecodeError:
+                    body_text = crawl_result
+        
+        return {
+            'title': title,
+            'body_text': body_text,
+            'image_url_list': image_url_list,
+            'record_id': record.record_id
+        }
+    
+    def call_coze_workflow(self, title: str, body_text: str, image_url_list: List[str]) -> Dict[str, Any]:
+        """调用Coze工作流"""
+        try:
+            print(f"正在调用Coze工作流,标题: {title[:50]}...")
+            response = self.coze.run(title, body_text, image_url_list)
+            print("Coze工作流调用成功")
+            return response
+        except Exception as e:
+            print(f"调用Coze工作流失败: {e}")
+            return {"data": "{}"}
+    
+    def extract_coze_result(self, coze_response: Dict[str, Any]) -> Dict[str, str]:
+        """
+        从API响应中提取images_comprehension、title、body_text字段
+        """
+        try:
+            # 获取data字段
+            data = coze_response.get("data")
+            if not data:
+                print("响应中没有data字段")
+                return {"images_comprehension": "", "title": "", "body_text": ""}
+            
+            # 解析data字段(它是JSON字符串)
+            if isinstance(data, str):
+                try:
+                    data = json.loads(data)
+                except json.JSONDecodeError as e:
+                    print(f"data字段JSON解析失败: {e}")
+                    return {"images_comprehension": "", "title": "", "body_text": ""}
+            
+            # 从解析后的data中提取字段
+            extracted_fields = {
+                "images_comprehension": data.get("images_comprehension", ""),
+                "title": data.get("title", ""),
+                "body_text": data.get("body_text", "")
+            }
+            
+            return extracted_fields
+            
+        except Exception as e:
+            print(f"提取Coze结果失败: {e}")
+            return {"images_comprehension": "", "title": "", "body_text": ""}
+    
+    def update_feishu_record(self, record_id: str, result_dict: Dict[str, Any]):
+        """更新飞书表格中的记录"""
+        try:
+            import lark_oapi as lark
+                        # 创建更新记录
+            update_record = (lark.bitable.v1.AppTableRecord.builder()
+                           .record_id(record_id)
+                           .fields({
+                               self.output_field: json.dumps({
+                                    'images_comprehension': result_dict.get('images_comprehension', ''),
+                                    'title': result_dict.get('title', ''),
+                                    'body_text': result_dict.get('body_text', '')
+                                }, ensure_ascii=False)
+                           })
+                           .build())
+            
+            # 执行更新
+            self.feishu.update_record(self.table_id, update_record)
+            print(f"已更新记录 {record_id}")
+            
+        except Exception as e:
+            print(f"更新飞书记录失败: {e}")
+    
+    def process_single_record(self, record) -> bool:
+        """处理单条记录"""
+        try:
+            # 提取内容
+            content = self.extract_content_from_record(record)
+            
+            # 检查是否已经有识别结果
+            fields = record.fields
+            existing_result = fields.get(self.output_field, '')
+            
+            # 如果已有识别结果,则跳过
+            if existing_result and existing_result.strip():
+                try:
+                    # 尝试解析JSON,如果成功说明已有有效结果
+                    json.loads(existing_result)
+                    print(f"记录 {record.record_id} 已有识别结果,跳过")
+                    return True
+                except json.JSONDecodeError:
+                    # 如果JSON解析失败,说明可能是旧格式,继续处理
+                    pass
+            
+            # 检查是否有输入内容
+            if not content['body_text'] or not content['body_text'].strip():
+                print(f"记录 {record.record_id} 没有输入内容,跳过")
+                return True
+            
+            print(f"处理记录 {record.record_id}")
+            print(f"标题: {content['title'][:50]}...")
+            print(f"内容长度: {len(content['body_text'])} 字符")
+            print(f"图片数量: {len(content['image_url_list'])}")
+            
+            # 调用Coze工作流
+            coze_response = self.call_coze_workflow(
+                content['title'],
+                content['body_text'],
+                content['image_url_list']
+            )
+            
+            # 提取结果
+            result_dict = self.extract_coze_result(coze_response)
+            
+            # 更新飞书表格
+            self.update_feishu_record(record.record_id, result_dict)
+            
+            # 添加延迟避免API限制
+            time.sleep(1)
+            
+            return True
+            
+        except Exception as e:
+            print(f"处理记录 {record.record_id} 失败: {e}")
+            return False
+    
+    def process_all_records(self):
+        """处理所有记录"""
+        print(f"开始处理飞书表格 {self.table_id} 中的所有记录")
+        
+        page_token = None
+        total_processed = 0
+        total_success = 0
+        
+        while True:
             try:
-                coze_output = coze_client.send_message(
-                    coze_bot_id,
-                    coze_prompt_template,
-                    str(input_data) # 确保输入是字符串
-                )
+                # 获取记录
+                result = self.feishu.get_all_records(self.table_id, page_token)
+                
+                if not result.items:
+                    print("没有找到记录")
+                    break
+                
+                print(f"获取到 {len(result.items)} 条记录")
+                
+                # 处理每条记录
+                for record in result.items:
+                    total_processed += 1
+                    if self.process_single_record(record):
+                        total_success += 1
+                
+                # 检查是否有下一页
+                if not result.has_more:
+                    break
+                
+                page_token = result.page_token
+                print(f"继续获取下一页,token: {page_token}")
                 
-                if coze_output:
-                    updated_feishu_records.append({
-                        "record_id": record_id,
-                        "fields": {
-                            feishu_output_field_name: coze_output
-                        }
-                    })
-                else:
-                    logging.warning(f"Coze API 返回空结果给记录 {record_id},不更新此记录。")
-
             except Exception as e:
-                logging.error(f"处理记录 {record_id} 时调用Coze API失败: {e}。该记录将被跳过。")
-                # 可以在这里记录到单独的错误日志或错误字段
+                print(f"获取记录失败: {e}")
+                break
+        
+        print(f"处理完成!总共处理 {total_processed} 条记录,成功 {total_success} 条")
 
-        if not updated_feishu_records:
-            logging.info("没有记录成功通过 Coze API 处理并准备更新,无需写入飞书。流程结束。")
-            return
 
-        # 3. 将结果写入飞书多维表格
-        logging.info("阶段 3/3: 将处理结果写回飞书多维表格...")
-        feishu_client.update_records(
-            feishu_base_id,
-            feishu_table_id,
-            updated_feishu_records
-        )
+def main():
+    """主函数"""
+    # 创建命令行参数解析器
+    parser = argparse.ArgumentParser(description='内容识别脚本 - 处理飞书表格数据')
+    parser.add_argument('table_id', nargs='?', help='飞书表格ID (可选,也可通过环境变量 FEISHU_TABLE_ID 设置)')
+    parser.add_argument('--page-token', help='分页token,用于从指定位置开始处理')
+    parser.add_argument('--dry-run', action='store_true', help='试运行模式,只显示会处理哪些记录,不实际调用API')
+    
+    args = parser.parse_args()
+    
+    try:
+        # 创建内容识别器实例
+        identifier = ContentIdentifier(table_id=args.table_id)
         
-        logging.info("--- ✅ 流程执行完毕 ✅ ---")
-
+        print(f"使用表格ID: {identifier.table_id}")
+        
+        if args.dry_run:
+            print("试运行模式:只显示会处理的记录,不实际调用API")
+            # TODO: 实现试运行模式
+            identifier.process_all_records()
+        else:
+            # 正常处理模式
+            if args.page_token:
+                print(f"从分页token开始处理: {args.page_token}")
+                # TODO: 支持从指定分页token开始处理
+                identifier.process_all_records()
+            else:
+                identifier.process_all_records()
+                
     except Exception as e:
-        logging.critical(f"主流程执行过程中发生致命错误: {e}")
-        logging.critical("请检查配置信息、网络连接、API权限以及日志中的详细错误信息。")
+        print(f"程序执行失败: {e}")
+        sys.exit(1)
 
 
 if __name__ == "__main__":
-    # --- 环境变量/配置信息加载 ---
-    # 推荐使用环境变量加载敏感信息,而不是硬编码。
-    # 例如:export FEISHU_APP_ID="your_id"
-    # 或者从配置文件 (如 config.ini, .env 文件) 中加载
-
-    # 飞书配置
-    FEISHU_APP_ID = config.FEISHU_APP_ID
-    FEISHU_APP_SECRET = config.FEISHU_APP_SECRET
-    FEISHU_BASE_ID = config.FEISHU_BASE_ID
-    FEISHU_TABLE_ID = config.FEISHU_TABLE_ID
-
-    FEISHU_INPUT_FIELD = config.FEISHU_INPUT_FIELD # 你的飞书表格中用于输入的列名
-    FEISHU_OUTPUT_FIELD = config.FEISHU_OUTPUT_FIELD # 你的飞书表格中用于输出的列名
-
-    # Coze 配置
-    COZE_API_KEY = config.COZE_API_KEY
-    COZE_BOT_ID = config.COZE_BOT_ID # 例如: "7343685511394590740"
-
-    # Coze 提示模板,请确保包含 {input_data} 占位符
-    # 这是一个示例,你可以根据你的机器人功能设计更复杂的提示
-    COZE_PROMPT_TEMPLATE = os.getenv("COZE_PROMPT_TEMPLATE", "请作为一位专业的编辑,总结以下文章的核心内容,要求言简意赅,200字以内: {input_data}")
-
-    # --- 执行流程 ---
-    if "YOUR_" in FEISHU_APP_ID or "YOUR_" in COZE_API_KEY:
-        logging.error("⛔️ 请检查 main.py 或环境变量,确保所有 'YOUR_' 占位符都已替换为您的实际配置信息!⛔️")
-        logging.error("流程未执行。")
-    else:
-        process_feishu_data_with_coze_flow(
-            feishu_app_id=FEISHU_APP_ID,
-            feishu_app_secret=FEISHU_APP_SECRET,
-            feishu_base_id=FEISHU_BASE_ID,
-            feishu_table_id=FEISHU_TABLE_ID,
-            feishu_input_field_name=FEISHU_INPUT_FIELD,
-            feishu_output_field_name=FEISHU_OUTPUT_FIELD,
-            coze_api_key=COZE_API_KEY,
-            coze_bot_id=COZE_BOT_ID,
-            coze_prompt_template=COZE_PROMPT_TEMPLATE,
-            max_records_to_process=10, # 每次运行最多处理10条记录
-            overwrite_existing_output=True # 总是覆盖输出字段
-        )
+    main()

+ 75 - 116
3_handle.py

@@ -3,149 +3,108 @@ import json
 import time
 import sys
 import argparse
-from typing import Dict, Any, List, Optional
+from typing import Dict, Any, List, Optional, Tuple
 
 # 导入自定义模块
 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 
-from utils.fei_shu import FeiShu
+from utils.mysql_db import MysqlHelper
 from gemini import GeminiProcessor
 from utils.file import File
 
 
 class Handler:
-    def __init__(self, table_id: Optional[str] = None):
-        
-        # 初始化飞书客户端
-        self.feishu = FeiShu(file_token='VEBsbCfaWa3gF3slQILc6Rybnde')
+    def __init__(self):
+        # 初始化处理器
         self.processor = GeminiProcessor()
-        
-        
-        # 获取表格ID:优先使用传入的参数,其次使用环境变量
-        self.table_id = table_id
-        # 字段名称配置
-        self.input_field = '识别结果'
-        self.output_field = '初步理解'
-
         self.system_prompt = File.read_file('prompt/handle.md')
-
-        print(self.system_prompt)
-
-    def extract_content_from_record(self, record) -> Dict[str, Any]:
-        """从飞书记录中提取内容"""
-        fields = record.fields
-        
-        # 提取识别结果
-        result = fields.get(self.input_field, [])
-
-        return ''.join([item['text'] for item in result])
-        
-    
-    def update_feishu_record(self, record_id: str, content: str):
-        """更新飞书表格中的记录"""
-        try:
-            import lark_oapi as lark
-                        # 创建更新记录
-            update_record = (lark.bitable.v1.AppTableRecord.builder()
-                           .record_id(record_id)
-                           .fields({
-                               self.output_field: content
-                           })
-                           .build())
-            
-            # 执行更新
-            self.feishu.update_record(self.table_id, update_record)
-            print(f"已更新记录 {record_id}")
-            
-        except Exception as e:
-            print(f"更新飞书记录失败: {e}")
     
-    def process_single_record(self, record) -> bool:
-        """处理单条记录"""
-        try:
-            # 提取内容
-            content = self.extract_content_from_record(record)
-
-            # 检查是否有输入内容
-            if not content.strip() :
-                print(f"记录 {record.record_id} 没有输入内容,跳过")
-                return True
-
-            result = self.processor.process(content, self.system_prompt)
-
-            
-            # 更新飞书表格
-            self.update_feishu_record(record.record_id, result)
-
-            
-            # 添加延迟避免API限制
-            time.sleep(1)
-            
-            return True
+    def build_query_conditions(self, query_word: Optional[str], 
+                             source_type: Optional[str], 
+                             source_channel: Optional[str]) -> Tuple[str, Tuple]:
+        """构建查询条件和参数"""
+        conditions = ["formatted_content is not null", "multimodal_recognition is null"]
+        params = []
+        
+        if query_word is not None:
+            conditions.append("query_word = %s")
+            params.append(query_word)
+        if source_type is not None:
+            conditions.append("source_type = %s")
+            params.append(source_type)
+        if source_channel is not None:
+            conditions.append("source_channel = %s")
+            params.append(source_channel)
             
-        except Exception as e:
-            print(f"处理记录 {record.record_id} 失败: {e}")
-            return False
+        where_clause = " AND ".join(conditions)
+        return where_clause, tuple(params)
     
-    def process_all_records(self):
+    def process_all_records(self, query_word: Optional[str], 
+                           source_type: Optional[str], 
+                           source_channel: Optional[str]):
         """处理所有记录"""
-        print(f"开始处理飞书表格 {self.table_id} 中的所有记录")
-        
-        page_token = None
         total_processed = 0
         total_success = 0
         
-        while True:
-            try:
-                # 获取记录
-                result = self.feishu.get_all_records(self.table_id, page_token)
-                
-                if not result.items:
-                    print("没有找到记录")
-                    break
-                
-                print(f"获取到 {len(result.items)} 条记录")
-                
-                # 处理每条记录
-                for record in result.items:
-                    total_processed += 1
-                    if self.process_single_record(record):
-                        total_success += 1
-                
-                # 检查是否有下一页
-                if not result.has_more:
-                    break
-                
-                page_token = result.page_token
-                print(f"继续获取下一页,token: {page_token}")
-                
-            except Exception as e:
-                print(f"获取记录失败: {e}")
-                break
-        
-        print(f"处理完成!总共处理 {total_processed} 条记录,成功 {total_success} 条")
+        try:
+            # 构建查询条件和参数
+            where_clause, params = self.build_query_conditions(query_word, source_type, source_channel)
+            sql = f"""
+                SELECT id, formatted_content 
+                FROM knowledge_search_content 
+                WHERE {where_clause}
+            """
+            
+            # 查询记录
+            records = MysqlHelper.get_values(sql, params)
+            print(f"获取到 {len(records)} 条记录")
+            
+            # 处理每条记录
+            for row in records:
+                total_processed += 1
+                try:
+                    # 处理内容
+                    result = self.processor.process(row[1], self.system_prompt)
+                    
+                    # 更新数据库
+                    update_sql = """
+                        UPDATE knowledge_search_content 
+                        SET multimodal_recognition = %s 
+                        WHERE id = %s
+                    """
+                    MysqlHelper.update_values(update_sql, (result, row[0]))
+                    
+                    # 添加延迟避免API限制
+                    time.sleep(1)
+                    total_success += 1
+                    
+                except Exception as e:
+                    print(f"处理记录 {row[0]} 失败: {str(e)}")
+                    
+        except Exception as e:
+            print(f"处理过程中发生错误: {str(e)}")
+        finally:
+            print(f"处理完成!总共处理 {total_processed} 条记录,成功 {total_success} 条")
 
 
 def main():
     """主函数"""
-    # 创建命令行参数解析器
-    parser = argparse.ArgumentParser(description='内容识别脚本 - 处理飞书表格数据')
-    parser.add_argument('--table_id', nargs='?', help='飞书表格ID (可选,也可通过环境变量 FEISHU_TABLE_ID 设置)')
-    parser.add_argument('--page-token', help='分页token,用于从指定位置开始处理')
-    parser.add_argument('--dry-run', action='store_true', help='试运行模式,只显示会处理哪些记录,不实际调用API')
+    parser = argparse.ArgumentParser(description='内容识别脚本')
+    parser.add_argument('--query_word', default=None, help='query词')
+    parser.add_argument('--source_type', default=None, help='数据源类型')
+    parser.add_argument('--source_channel', default=None, help='数据源渠道')
     
     args = parser.parse_args()
     
     try:
-        # 创建内容识别器实例
-        hadnler = Handler(table_id=args.table_id)
-        
-        print(f"使用表格ID: {hadnler.table_id}")
-        
-        hadnler.process_all_records()
-                
+        handler = Handler()
+        handler.process_all_records(
+            query_word=args.query_word, 
+            source_type=args.source_type, 
+            source_channel=args.source_channel
+        )
     except Exception as e:
-        print(f"程序执行失败: {e}")
+        print(f"程序执行失败: {str(e)}")
         sys.exit(1)
 
 

+ 3 - 1
README.md

@@ -2,4 +2,6 @@ python3 -m venv myenv
 
 source myenv/bin/activate
 
-pip install -r requirements.txt
+pip install -r requirements.txt
+
+

+ 42 - 24
utils/mysql_db.py

@@ -49,32 +49,50 @@ class MysqlHelper:
         except Exception as e:
             print(f"get_values异常:{e}\n")
 
-    @classmethod
-    def update_values(cls, sql, params=None):
-        # 连接数据库
+@classmethod
+def update_values(cls, sql, params=None):
+    """
+    执行更新操作(INSERT/UPDATE/DELETE)
+    
+    参数:
+        sql: 要执行的SQL语句
+        params: SQL参数(可选,元组或字典)
+    
+    返回:
+        成功时返回影响的行数,失败返回None
+    """
+    connect = None
+    cursor = None
+    
+    try:
         connect = cls.connect_mysql()
-        # 返回一个 Cursor对象
-        mysql = connect.cursor()
-        try:
-            # 执行 sql 语句
-            if params:
-                res = mysql.execute(sql, params)
-            else:
-                res = mysql.execute(sql)
-            # 注意 一定要commit,否则添加数据不生效
-            connect.commit()
-            return res
-        except Exception as e:
-            # 发生错误时回滚
-            logger.error(f"Error executing SQL: {e}")
+        cursor = connect.cursor()
+        
+        # 执行SQL语句
+        if params:
+            affected_rows = cursor.execute(sql, params)
+        else:
+            affected_rows = cursor.execute(sql)
+        
+        connect.commit()
+        return affected_rows
+        
+    except Exception as e:
+        logger.error(f"SQL执行失败: {e}")
+        logger.error(f"SQL语句: {sql}")
+        if params:
+            logger.error(f"参数: {params}")
+        
+        if connect:
             connect.rollback()
-            return None  # 返回 None 表示失败
-        finally:
-            # 确保资源关闭
-            if mysql:
-                mysql.close()
-            if connect:
-                connect.close()
+        return None
+        
+    finally:
+        # 确保资源关闭
+        if cursor:
+            cursor.close()
+        if connect:
+            connect.close()