import os import json import time import sys import argparse from typing import Dict, Any, List, Optional from dotenv import load_dotenv # 导入自定义模块 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from utils.fei_shu import FeiShu from coze.coze_hook import CozeHook class ContentIdentifier: def __init__(self, table_id: Optional[str] = None): # 加载环境变量 load_dotenv() # 初始化飞书客户端 self.feishu = FeiShu() # 初始化Coze客户端 self.coze = CozeHook() # 获取表格ID:优先使用传入的参数,其次使用环境变量 self.table_id = table_id or os.getenv('FEISHU_TABLE_ID') if not self.table_id: raise ValueError("请设置环境变量 FEISHU_TABLE_ID 或在运行时传入 table_id 参数") # 字段名称配置 self.input_field = os.getenv('FEISHU_INPUT_FIELD', '抓取结果') self.output_field = os.getenv('FEISHU_OUTPUT_FIELD', '识别结果') def extract_content_from_record(self, record) -> Dict[str, Any]: """从飞书记录中提取内容""" fields = record.fields # 提取抓取结果 crawl_result = fields.get(self.input_field, '') title = '' body_text = '' image_url_list = [] # 解析抓取结果 if crawl_result: if isinstance(crawl_result, list) and len(crawl_result) > 0: # 如果是数组格式,取第一个元素 crawl_data = crawl_result[0] if isinstance(crawl_data, dict) and 'text' in crawl_data: try: # 解析JSON字符串 json_data = json.loads(crawl_data['text']) # 提取标题 title = json_data.get('title', '') # 提取正文内容 body_text = json_data.get('body_text', '') # 提取图片链接 image_data_list = json_data.get('image_url_list', []) for img_data in image_data_list: if isinstance(img_data, dict) and 'image_url' in img_data: image_url_list.append(img_data['image_url']) except json.JSONDecodeError as e: print(f"解析抓取结果JSON失败: {e}") # 如果解析失败,尝试直接使用文本内容 if isinstance(crawl_data, dict) and 'text' in crawl_data: body_text = crawl_data['text'] elif isinstance(crawl_result, str): # 如果是字符串格式,尝试直接解析 try: json_data = json.loads(crawl_result) title = json_data.get('title', '') body_text = json_data.get('body_text', '') image_data_list = json_data.get('image_url_list', []) for img_data in image_data_list: if isinstance(img_data, dict) and 'image_url' in img_data: image_url_list.append(img_data['image_url']) except json.JSONDecodeError: body_text = crawl_result return { 'title': title, 'body_text': body_text, 'image_url_list': image_url_list, 'record_id': record.record_id } def call_coze_workflow(self, title: str, body_text: str, image_url_list: List[str]) -> Dict[str, Any]: """调用Coze工作流""" try: print(f"正在调用Coze工作流,标题: {title[:50]}...") response = self.coze.run(title, body_text, image_url_list) print("Coze工作流调用成功") return response except Exception as e: print(f"调用Coze工作流失败: {e}") return {"data": "{}"} def extract_coze_result(self, coze_response: Dict[str, Any]) -> Dict[str, str]: """ 从API响应中提取images_comprehension、title、body_text字段 """ try: # 获取data字段 data = coze_response.get("data") if not data: print("响应中没有data字段") return {"images_comprehension": "", "title": "", "body_text": ""} # 解析data字段(它是JSON字符串) if isinstance(data, str): try: data = json.loads(data) except json.JSONDecodeError as e: print(f"data字段JSON解析失败: {e}") return {"images_comprehension": "", "title": "", "body_text": ""} # 从解析后的data中提取字段 extracted_fields = { "images_comprehension": data.get("images_comprehension", ""), "title": data.get("title", ""), "body_text": data.get("body_text", "") } return extracted_fields except Exception as e: print(f"提取Coze结果失败: {e}") return {"images_comprehension": "", "title": "", "body_text": ""} def update_feishu_record(self, record_id: str, result_dict: Dict[str, Any]): """更新飞书表格中的记录""" try: import lark_oapi as lark # 创建更新记录 update_record = (lark.bitable.v1.AppTableRecord.builder() .record_id(record_id) .fields({ self.output_field: json.dumps({ 'images_comprehension': result_dict.get('images_comprehension', ''), 'title': result_dict.get('title', ''), 'body_text': result_dict.get('body_text', '') }, ensure_ascii=False) }) .build()) # 执行更新 self.feishu.update_record(self.table_id, update_record) print(f"已更新记录 {record_id}") except Exception as e: print(f"更新飞书记录失败: {e}") def process_single_record(self, record) -> bool: """处理单条记录""" try: # 提取内容 content = self.extract_content_from_record(record) # 检查是否已经有识别结果 fields = record.fields existing_result = fields.get(self.output_field, '') # 如果已有识别结果,则跳过 if existing_result and existing_result.strip(): try: # 尝试解析JSON,如果成功说明已有有效结果 json.loads(existing_result) print(f"记录 {record.record_id} 已有识别结果,跳过") return True except json.JSONDecodeError: # 如果JSON解析失败,说明可能是旧格式,继续处理 pass # 检查是否有输入内容 if not content['body_text'] or not content['body_text'].strip(): print(f"记录 {record.record_id} 没有输入内容,跳过") return True print(f"处理记录 {record.record_id}") print(f"标题: {content['title'][:50]}...") print(f"内容长度: {len(content['body_text'])} 字符") print(f"图片数量: {len(content['image_url_list'])}") # 调用Coze工作流 coze_response = self.call_coze_workflow( content['title'], content['body_text'], content['image_url_list'] ) # 提取结果 result_dict = self.extract_coze_result(coze_response) # 更新飞书表格 self.update_feishu_record(record.record_id, result_dict) # 添加延迟避免API限制 time.sleep(1) return True except Exception as e: print(f"处理记录 {record.record_id} 失败: {e}") return False def process_all_records(self): """处理所有记录""" print(f"开始处理飞书表格 {self.table_id} 中的所有记录") page_token = None total_processed = 0 total_success = 0 while True: try: # 获取记录 result = self.feishu.get_all_records(self.table_id, page_token) if not result.items: print("没有找到记录") break print(f"获取到 {len(result.items)} 条记录") # 处理每条记录 for record in result.items: total_processed += 1 if self.process_single_record(record): total_success += 1 # 检查是否有下一页 if not result.has_more: break page_token = result.page_token print(f"继续获取下一页,token: {page_token}") except Exception as e: print(f"获取记录失败: {e}") break print(f"处理完成!总共处理 {total_processed} 条记录,成功 {total_success} 条") def main(): """主函数""" # 创建命令行参数解析器 parser = argparse.ArgumentParser(description='内容识别脚本 - 处理飞书表格数据') parser.add_argument('table_id', nargs='?', help='飞书表格ID (可选,也可通过环境变量 FEISHU_TABLE_ID 设置)') parser.add_argument('--page-token', help='分页token,用于从指定位置开始处理') parser.add_argument('--dry-run', action='store_true', help='试运行模式,只显示会处理哪些记录,不实际调用API') args = parser.parse_args() try: # 创建内容识别器实例 identifier = ContentIdentifier(table_id=args.table_id) print(f"使用表格ID: {identifier.table_id}") if args.dry_run: print("试运行模式:只显示会处理的记录,不实际调用API") # TODO: 实现试运行模式 identifier.process_all_records() else: # 正常处理模式 if args.page_token: print(f"从分页token开始处理: {args.page_token}") # TODO: 支持从指定分页token开始处理 identifier.process_all_records() else: identifier.process_all_records() except Exception as e: print(f"程序执行失败: {e}") sys.exit(1) if __name__ == "__main__": main()