content_identify.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. import os
  2. import json
  3. import time
  4. import sys
  5. import argparse
  6. from typing import Dict, Any, List, Optional
  7. from dotenv import load_dotenv
  8. # 导入自定义模块
  9. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  10. from utils.fei_shu import FeiShu
  11. from coze.coze_hook import CozeHook
  12. class ContentIdentifier:
  13. def __init__(self, table_id: Optional[str] = None):
  14. # 加载环境变量
  15. load_dotenv()
  16. # 初始化飞书客户端
  17. self.feishu = FeiShu()
  18. # 初始化Coze客户端
  19. self.coze = CozeHook()
  20. # 获取表格ID:优先使用传入的参数,其次使用环境变量
  21. self.table_id = table_id or os.getenv('FEISHU_TABLE_ID')
  22. if not self.table_id:
  23. raise ValueError("请设置环境变量 FEISHU_TABLE_ID 或在运行时传入 table_id 参数")
  24. # 字段名称配置
  25. self.input_field = os.getenv('FEISHU_INPUT_FIELD', '抓取结果')
  26. self.output_field = os.getenv('FEISHU_OUTPUT_FIELD', '识别结果')
  27. def extract_content_from_record(self, record) -> Dict[str, Any]:
  28. """从飞书记录中提取内容"""
  29. fields = record.fields
  30. # 提取抓取结果
  31. crawl_result = fields.get(self.input_field, '')
  32. title = ''
  33. body_text = ''
  34. image_url_list = []
  35. # 解析抓取结果
  36. if crawl_result:
  37. if isinstance(crawl_result, list) and len(crawl_result) > 0:
  38. # 如果是数组格式,取第一个元素
  39. crawl_data = crawl_result[0]
  40. if isinstance(crawl_data, dict) and 'text' in crawl_data:
  41. try:
  42. # 解析JSON字符串
  43. json_data = json.loads(crawl_data['text'])
  44. # 提取标题
  45. title = json_data.get('title', '')
  46. # 提取正文内容
  47. body_text = json_data.get('body_text', '')
  48. # 提取图片链接
  49. image_data_list = json_data.get('image_url_list', [])
  50. for img_data in image_data_list:
  51. if isinstance(img_data, dict) and 'image_url' in img_data:
  52. image_url_list.append(img_data['image_url'])
  53. except json.JSONDecodeError as e:
  54. print(f"解析抓取结果JSON失败: {e}")
  55. # 如果解析失败,尝试直接使用文本内容
  56. if isinstance(crawl_data, dict) and 'text' in crawl_data:
  57. body_text = crawl_data['text']
  58. elif isinstance(crawl_result, str):
  59. # 如果是字符串格式,尝试直接解析
  60. try:
  61. json_data = json.loads(crawl_result)
  62. title = json_data.get('title', '')
  63. body_text = json_data.get('body_text', '')
  64. image_data_list = json_data.get('image_url_list', [])
  65. for img_data in image_data_list:
  66. if isinstance(img_data, dict) and 'image_url' in img_data:
  67. image_url_list.append(img_data['image_url'])
  68. except json.JSONDecodeError:
  69. body_text = crawl_result
  70. return {
  71. 'title': title,
  72. 'body_text': body_text,
  73. 'image_url_list': image_url_list,
  74. 'record_id': record.record_id
  75. }
  76. def call_coze_workflow(self, title: str, body_text: str, image_url_list: List[str]) -> Dict[str, Any]:
  77. """调用Coze工作流"""
  78. try:
  79. print(f"正在调用Coze工作流,标题: {title[:50]}...")
  80. response = self.coze.run(title, body_text, image_url_list)
  81. print("Coze工作流调用成功")
  82. return response
  83. except Exception as e:
  84. print(f"调用Coze工作流失败: {e}")
  85. return {"data": "{}"}
  86. def extract_coze_result(self, coze_response: Dict[str, Any]) -> Dict[str, str]:
  87. """
  88. 从API响应中提取images_comprehension、title、body_text字段
  89. """
  90. try:
  91. # 获取data字段
  92. data = coze_response.get("data")
  93. if not data:
  94. print("响应中没有data字段")
  95. return {"images_comprehension": "", "title": "", "body_text": ""}
  96. # 解析data字段(它是JSON字符串)
  97. if isinstance(data, str):
  98. try:
  99. data = json.loads(data)
  100. except json.JSONDecodeError as e:
  101. print(f"data字段JSON解析失败: {e}")
  102. return {"images_comprehension": "", "title": "", "body_text": ""}
  103. # 从解析后的data中提取字段
  104. extracted_fields = {
  105. "images_comprehension": data.get("images_comprehension", ""),
  106. "title": data.get("title", ""),
  107. "body_text": data.get("body_text", "")
  108. }
  109. return extracted_fields
  110. except Exception as e:
  111. print(f"提取Coze结果失败: {e}")
  112. return {"images_comprehension": "", "title": "", "body_text": ""}
  113. def update_feishu_record(self, record_id: str, result_dict: Dict[str, Any]):
  114. """更新飞书表格中的记录"""
  115. try:
  116. import lark_oapi as lark
  117. # 创建更新记录
  118. update_record = (lark.bitable.v1.AppTableRecord.builder()
  119. .record_id(record_id)
  120. .fields({
  121. self.output_field: json.dumps({
  122. 'images_comprehension': result_dict.get('images_comprehension', ''),
  123. 'title': result_dict.get('title', ''),
  124. 'body_text': result_dict.get('body_text', '')
  125. }, ensure_ascii=False)
  126. })
  127. .build())
  128. # 执行更新
  129. self.feishu.update_record(self.table_id, update_record)
  130. print(f"已更新记录 {record_id}")
  131. except Exception as e:
  132. print(f"更新飞书记录失败: {e}")
  133. def process_single_record(self, record) -> bool:
  134. """处理单条记录"""
  135. try:
  136. # 提取内容
  137. content = self.extract_content_from_record(record)
  138. # 检查是否已经有识别结果
  139. fields = record.fields
  140. existing_result = fields.get(self.output_field, '')
  141. # 如果已有识别结果,则跳过
  142. if existing_result and existing_result.strip():
  143. try:
  144. # 尝试解析JSON,如果成功说明已有有效结果
  145. json.loads(existing_result)
  146. print(f"记录 {record.record_id} 已有识别结果,跳过")
  147. return True
  148. except json.JSONDecodeError:
  149. # 如果JSON解析失败,说明可能是旧格式,继续处理
  150. pass
  151. # 检查是否有输入内容
  152. if not content['body_text'] or not content['body_text'].strip():
  153. print(f"记录 {record.record_id} 没有输入内容,跳过")
  154. return True
  155. print(f"处理记录 {record.record_id}")
  156. print(f"标题: {content['title'][:50]}...")
  157. print(f"内容长度: {len(content['body_text'])} 字符")
  158. print(f"图片数量: {len(content['image_url_list'])}")
  159. # 调用Coze工作流
  160. coze_response = self.call_coze_workflow(
  161. content['title'],
  162. content['body_text'],
  163. content['image_url_list']
  164. )
  165. # 提取结果
  166. result_dict = self.extract_coze_result(coze_response)
  167. # 更新飞书表格
  168. self.update_feishu_record(record.record_id, result_dict)
  169. # 添加延迟避免API限制
  170. time.sleep(1)
  171. return True
  172. except Exception as e:
  173. print(f"处理记录 {record.record_id} 失败: {e}")
  174. return False
  175. def process_all_records(self):
  176. """处理所有记录"""
  177. print(f"开始处理飞书表格 {self.table_id} 中的所有记录")
  178. page_token = None
  179. total_processed = 0
  180. total_success = 0
  181. while True:
  182. try:
  183. # 获取记录
  184. result = self.feishu.get_all_records(self.table_id, page_token)
  185. if not result.items:
  186. print("没有找到记录")
  187. break
  188. print(f"获取到 {len(result.items)} 条记录")
  189. # 处理每条记录
  190. for record in result.items:
  191. total_processed += 1
  192. if self.process_single_record(record):
  193. total_success += 1
  194. # 检查是否有下一页
  195. if not result.has_more:
  196. break
  197. page_token = result.page_token
  198. print(f"继续获取下一页,token: {page_token}")
  199. except Exception as e:
  200. print(f"获取记录失败: {e}")
  201. break
  202. print(f"处理完成!总共处理 {total_processed} 条记录,成功 {total_success} 条")
  203. def main():
  204. """主函数"""
  205. # 创建命令行参数解析器
  206. parser = argparse.ArgumentParser(description='内容识别脚本 - 处理飞书表格数据')
  207. parser.add_argument('table_id', nargs='?', help='飞书表格ID (可选,也可通过环境变量 FEISHU_TABLE_ID 设置)')
  208. parser.add_argument('--page-token', help='分页token,用于从指定位置开始处理')
  209. parser.add_argument('--dry-run', action='store_true', help='试运行模式,只显示会处理哪些记录,不实际调用API')
  210. args = parser.parse_args()
  211. try:
  212. # 创建内容识别器实例
  213. identifier = ContentIdentifier(table_id=args.table_id)
  214. print(f"使用表格ID: {identifier.table_id}")
  215. if args.dry_run:
  216. print("试运行模式:只显示会处理的记录,不实际调用API")
  217. # TODO: 实现试运行模式
  218. identifier.process_all_records()
  219. else:
  220. # 正常处理模式
  221. if args.page_token:
  222. print(f"从分页token开始处理: {args.page_token}")
  223. # TODO: 支持从指定分页token开始处理
  224. identifier.process_all_records()
  225. else:
  226. identifier.process_all_records()
  227. except Exception as e:
  228. print(f"程序执行失败: {e}")
  229. sys.exit(1)
  230. if __name__ == "__main__":
  231. main()