# 读取飞书表格的抓取结果字段,取出body_text和title,替换识别结果中的body_text和title # # 功能说明: # 1. 读取飞书表格中的"抓取结果"字段 # 2. 从抓取结果中提取body_text和title # 3. 读取"识别结果"字段中的现有内容 # 4. 用抓取结果中的body_text和title替换识别结果中的相应字段 # 5. 保持识别结果中的images_comprehension字段不变 # 6. 更新飞书表格中的识别结果字段 # # 使用方法: # 1. 设置环境变量: # - FEISHU_APP_ID: 飞书应用ID # - FEISHU_APP_SECRET: 飞书应用密钥 # - FEISHU_FILE_TOKEN: 飞书文件Token # - FEISHU_TABLE_ID: 飞书表格ID (可选,也可在运行时传入) # - FEISHU_CRAWL_FIELD: 抓取结果字段名 (默认: '抓取结果') # - FEISHU_IDENTIFY_FIELD: 识别结果字段名 (默认: '识别结果') # # 2. 运行脚本: # python fit_content.py [table_id] [--dry-run] # # 示例: # python fit_content.py tblNdje7z6Cf3hax # 正常模式 # python fit_content.py tblNdje7z6Cf3hax --dry-run # 试运行模式 # python fit_content.py --dry-run # 使用环境变量中的表格ID,试运行模式 # # 注意事项: # - 试运行模式会显示将要处理的内容,但不会实际更新飞书表格 # - 脚本会自动处理分页,支持大量数据 # - 如果抓取结果或识别结果解析失败,会跳过该记录并继续处理其他记录 import json import os import sys from typing import Dict, Any, List, Optional from dotenv import load_dotenv # 导入自定义模块 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from utils.fei_shu import FeiShu class ContentFitter: def __init__(self, table_id: Optional[str] = None): # 加载环境变量 load_dotenv() # 初始化飞书客户端 self.feishu = FeiShu() # 获取表格ID:优先使用传入的参数,其次使用环境变量 self.table_id = table_id or os.getenv('FEISHU_TABLE_ID') if not self.table_id: raise ValueError("请设置环境变量 FEISHU_TABLE_ID 或在运行时传入 table_id 参数") # 字段名称配置 self.crawl_field = os.getenv('FEISHU_CRAWL_FIELD', '抓取结果') self.identify_field = os.getenv('FEISHU_IDENTIFY_FIELD', '识别结果') def extract_crawl_content(self, crawl_result) -> Dict[str, str]: """从抓取结果中提取body_text和title""" title = '' body_text = '' if not crawl_result: return {'title': title, 'body_text': body_text} try: # 如果是字符串格式,尝试直接解析 if isinstance(crawl_result, str): json_data = json.loads(crawl_result) elif isinstance(crawl_result, list) and len(crawl_result) > 0: # 如果是数组格式,取第一个元素 crawl_data = crawl_result[0] if isinstance(crawl_data, dict): if 'text' in crawl_data: # 如果crawl_data是包含text字段的字典 json_data = json.loads(crawl_data['text']) else: # 如果crawl_data是直接的字典数据 json_data = crawl_data else: # 如果crawl_data不是字典,尝试直接解析 json_data = crawl_data elif isinstance(crawl_result, dict): # 如果crawl_result本身就是字典 json_data = crawl_result else: # 其他情况,尝试直接使用 json_data = crawl_result # 确保json_data是字典类型 if not isinstance(json_data, dict): print(f"抓取结果格式不正确,期望字典类型,实际类型: {type(json_data)}") return {'title': title, 'body_text': body_text} # 提取标题和正文内容 title = json_data.get('title', '') body_text = json_data.get('body_text', '') except (json.JSONDecodeError, KeyError, TypeError, AttributeError) as e: print(f"解析抓取结果失败: {e}") # 如果解析失败,尝试直接使用文本内容 if isinstance(crawl_result, str): body_text = crawl_result elif isinstance(crawl_result, list) and len(crawl_result) > 0: # 如果是列表,尝试将第一个元素转为字符串 body_text = str(crawl_result[0]) return {'title': title, 'body_text': body_text} def extract_identify_content(self, identify_result) -> Dict[str, Any]: """从识别结果中提取现有内容""" images_comprehension = [] title = '' body_text = '' if not identify_result: print(f" 调试: identify_result为空") return {'images_comprehension': images_comprehension, 'title': title, 'body_text': body_text} print(f" 调试: identify_result类型: {type(identify_result)}") print(f" 调试: identify_result内容前100字符: {identify_result[:100]}...") try: # 如果是字符串格式,尝试解析JSON if isinstance(identify_result, str): print(f" 调试: 尝试解析字符串格式的identify_result") json_data = self.safe_json_loads(identify_result) if json_data is None: print(f" 调试: JSON解析失败,返回空结果") return {'images_comprehension': images_comprehension, 'title': title, 'body_text': body_text} elif isinstance(identify_result, dict): print(f" 调试: identify_result本身就是字典") json_data = identify_result elif isinstance(identify_result, list) and len(identify_result) > 0: print(f" 调试: identify_result是列表,合并所有元素的内容") # 合并列表中所有元素的text字段 combined_text = "" for i, item in enumerate(identify_result): if isinstance(item, dict) and 'text' in item: combined_text += item['text'] print(f" 调试: 合并第{i+1}个元素的text,当前长度: {len(combined_text)}") print(f" 调试: 合并后的文本长度: {len(combined_text)}") if combined_text: json_data = self.safe_json_loads(combined_text) if json_data is None: print(f" 调试: 合并后JSON解析失败") return {'images_comprehension': images_comprehension, 'title': title, 'body_text': body_text} else: print(f" 调试: 没有找到text字段") return {'images_comprehension': images_comprehension, 'title': title, 'body_text': body_text} else: print(f" 调试: identify_result是其他类型: {type(identify_result)}") json_data = identify_result # 确保json_data是字典类型 if not isinstance(json_data, dict): print(f"识别结果格式不正确,期望字典类型,实际类型: {type(json_data)}") return {'images_comprehension': images_comprehension, 'title': title, 'body_text': body_text} print(f" 调试: json_data键: {list(json_data.keys())}") # 检查是否有text字段,如果有,尝试解析其中的JSON if 'text' in json_data and isinstance(json_data['text'], str): print(f" 调试: 发现text字段,尝试解析其中的JSON") text_content = self.safe_json_loads(json_data['text']) if text_content and isinstance(text_content, dict): print(f" 调试: text字段解析成功,键: {list(text_content.keys())}") # 从text_content中提取字段 images_comprehension = text_content.get('images_comprehension', []) title = text_content.get('title', '') body_text = text_content.get('body_text', '') else: print(f" 调试: text字段解析失败或不是字典") # 如果text字段解析失败,尝试直接提取images_comprehension数组 print(f" 调试: 尝试直接从text字段中提取images_comprehension数组") images_comprehension = self.extract_images_comprehension_from_text(json_data['text']) title = json_data.get('title', '') body_text = json_data.get('body_text', '') else: # 直接从json_data中提取字段 images_comprehension = json_data.get('images_comprehension', []) title = json_data.get('title', '') body_text = json_data.get('body_text', '') print(f" 调试: 提取的images_comprehension类型: {type(images_comprehension)}, 值: {images_comprehension}") # 确保images_comprehension是列表格式 if not isinstance(images_comprehension, list): if isinstance(images_comprehension, str): # 如果是字符串,尝试解析为列表 try: print(f" 调试: images_comprehension是字符串,尝试解析为列表") images_comprehension = json.loads(images_comprehension) if not isinstance(images_comprehension, list): images_comprehension = [] except (json.JSONDecodeError, TypeError): print(f" 调试: 解析images_comprehension字符串失败") images_comprehension = [] else: print(f" 调试: images_comprehension不是列表也不是字符串,设置为空列表") images_comprehension = [] # 调试信息:打印images_comprehension的结构 if images_comprehension: print(f" 调试: images_comprehension类型: {type(images_comprehension)}, 长度: {len(images_comprehension)}") if len(images_comprehension) > 0: print(f" 调试: 第一个元素类型: {type(images_comprehension[0])}") if isinstance(images_comprehension[0], dict): print(f" 调试: 第一个元素键: {list(images_comprehension[0].keys())}") else: print(f" 调试: images_comprehension为空") except (json.JSONDecodeError, KeyError, TypeError, AttributeError) as e: print(f"解析识别结果失败: {e}") print(f" 调试: identify_result类型: {type(identify_result)}") if isinstance(identify_result, str): print(f" 调试: identify_result内容前100字符: {identify_result[:100]}...") return {'images_comprehension': images_comprehension, 'title': title, 'body_text': body_text} def merge_content(self, crawl_content: Dict[str, str], identify_content: Dict[str, Any]) -> Dict[str, Any]: """合并抓取内容和识别内容,用抓取内容替换识别内容中的title和body_text""" return { 'images_comprehension': identify_content.get('images_comprehension', []), # 保持数组格式 'title': crawl_content.get('title', ''), # 使用抓取结果的title 'body_text': crawl_content.get('body_text', '') # 使用抓取结果的body_text } def update_feishu_record(self, record_id: str, merged_content: Dict[str, Any]): """更新飞书表格中的记录""" try: import lark_oapi as lark # 创建更新记录 update_record = (lark.bitable.v1.AppTableRecord.builder() .record_id(record_id) .fields({ self.identify_field: json.dumps(merged_content, ensure_ascii=False) }) .build()) # 执行更新 self.feishu.update_record(self.table_id, update_record) print(f"已更新记录 {record_id}") except Exception as e: print(f"更新飞书记录失败: {e}") def process_single_record(self, record, dry_run: bool = False) -> bool: """处理单条记录""" try: fields = record.fields # 提取抓取结果 crawl_result = fields.get(self.crawl_field, '') if not crawl_result: print(f"记录 {record.record_id} 没有抓取结果,跳过") return True # 提取识别结果 identify_result = fields.get(self.identify_field, '') print(f" 调试: 原始identify_result类型: {type(identify_result)}") if isinstance(identify_result, str): print(f" 调试: 原始identify_result内容前200字符: {identify_result[:200]}...") # 从抓取结果中提取title和body_text crawl_content = self.extract_crawl_content(crawl_result) # 从识别结果中提取现有内容 identify_content = self.extract_identify_content(identify_result) # 合并内容,用抓取结果替换识别结果中的title和body_text merged_content = self.merge_content(crawl_content, identify_content) print(f"处理记录 {record.record_id}") print(f" 抓取结果 - 标题: {crawl_content['title'][:50] if crawl_content['title'] else '无标题'}...") print(f" 抓取结果 - 内容长度: {len(crawl_content['body_text'])} 字符") # 处理images_comprehension的打印 images_comp = identify_content['images_comprehension'] if isinstance(images_comp, list) and len(images_comp) > 0: # 显示第一个元素的内容预览 first_item = images_comp[0] if isinstance(first_item, dict): content_preview = first_item.get('content', '')[:50] if first_item.get('content') else '无内容' print(f" 识别结果 - 图片理解: [{len(images_comp)}项] 第一项内容: {content_preview}...") else: images_comp_text = str(first_item)[:50] + "..." if len(str(first_item)) > 50 else str(first_item) print(f" 识别结果 - 图片理解: [{len(images_comp)}项] {images_comp_text}") else: print(f" 识别结果 - 图片理解: 无图片理解") if not dry_run: # 更新飞书表格 self.update_feishu_record(record.record_id, merged_content) else: print(f" [试运行] 将更新识别结果字段,新内容: {json.dumps(merged_content, ensure_ascii=False)[:100]}...") return True except Exception as e: print(f"处理记录 {record.record_id} 失败: {e}") return False def process_all_records(self, dry_run: bool = False): """处理所有记录""" mode_text = "试运行模式" if dry_run else "正常模式" print(f"开始处理飞书表格 {self.table_id} 中的所有记录 ({mode_text})") page_token = None total_processed = 0 total_success = 0 while True: try: # 获取记录 result = self.feishu.get_all_records(self.table_id, page_token) if not result.items: print("没有找到记录") break print(f"获取到 {len(result.items)} 条记录") # 处理每条记录 for record in result.items: total_processed += 1 if self.process_single_record(record, dry_run): total_success += 1 # 检查是否有下一页 if not result.has_more: break page_token = result.page_token print(f"继续获取下一页,token: {page_token}") except Exception as e: print(f"获取记录失败: {e}") break print(f"处理完成!总共处理 {total_processed} 条记录,成功 {total_success} 条") def safe_json_loads(self, json_str: str) -> Any: """安全地解析JSON字符串,处理可能的语法错误""" if not isinstance(json_str, str): return json_str try: return json.loads(json_str) except json.JSONDecodeError as e: print(f" 调试: JSON解析失败: {e}") # 尝试修复常见的JSON语法错误 try: # 移除多余的逗号 fixed_json = json_str.replace(',,', ',') # 移除末尾的逗号 fixed_json = fixed_json.rstrip(',') # 移除末尾的多个逗号 while fixed_json.endswith(',}'): fixed_json = fixed_json[:-2] + '}' while fixed_json.endswith(',]'): fixed_json = fixed_json[:-2] + ']' # 尝试修复未终止的字符串 if 'Unterminated string' in str(e): print(f" 调试: 检测到未终止的字符串,尝试修复") # 查找最后一个完整的JSON对象 import re # 查找匹配的大括号 brace_count = 0 end_pos = -1 for i, char in enumerate(fixed_json): if char == '{': brace_count += 1 elif char == '}': brace_count -= 1 if brace_count == 0: end_pos = i break if end_pos > 0: fixed_json = fixed_json[:end_pos + 1] print(f" 调试: 截取到位置 {end_pos + 1}") return json.loads(fixed_json) except json.JSONDecodeError: print(f" 调试: 修复JSON后仍然解析失败") # 尝试更激进的修复 try: # 如果还是失败,尝试找到最后一个有效的JSON对象 import re # 查找最后一个完整的JSON对象 pattern = r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}' matches = re.findall(pattern, json_str) if matches: last_match = matches[-1] return json.loads(last_match) except: pass # 最后的尝试:手动构建JSON对象 try: print(f" 调试: 尝试手动提取关键字段") # 尝试提取images_comprehension字段 import re # 查找images_comprehension数组的开始 pattern = r'"images_comprehension":\s*\[(.*?)\]' match = re.search(pattern, json_str, re.DOTALL) if match: array_content = match.group(1) # 尝试解析数组内容 try: # 构建一个简单的JSON对象 simple_json = f'{{"images_comprehension": [{array_content}]}}' return json.loads(simple_json) except: pass except: pass return None def extract_images_comprehension_from_text(self, text: str) -> list: """直接从文本中提取images_comprehension数组""" try: import re # 查找images_comprehension数组的开始和结束 pattern = r'"images_comprehension":\s*\[(.*?)\]' match = re.search(pattern, text, re.DOTALL) if match: array_content = match.group(1) print(f" 调试: 找到images_comprehension数组内容,长度: {len(array_content)}") # 尝试解析数组内容 try: # 构建一个简单的JSON对象 simple_json = f'{{"images_comprehension": [{array_content}]}}' result = json.loads(simple_json) return result.get('images_comprehension', []) except json.JSONDecodeError as e: print(f" 调试: 解析数组内容失败: {e}") # 尝试手动解析数组中的对象 try: # 查找数组中的每个对象 object_pattern = r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}' objects = re.findall(object_pattern, array_content) print(f" 调试: 找到 {len(objects)} 个对象") parsed_objects = [] for obj_str in objects: try: obj = json.loads(obj_str) parsed_objects.append(obj) except: # 如果单个对象解析失败,跳过 continue return parsed_objects except Exception as e2: print(f" 调试: 手动解析对象失败: {e2}") return [] else: print(f" 调试: 未找到images_comprehension数组") return [] except Exception as e: print(f" 调试: 提取images_comprehension失败: {e}") return [] def main(): """主函数""" import argparse parser = argparse.ArgumentParser(description='读取飞书表格抓取结果,替换识别结果中的body_text和title') parser.add_argument('table_id', nargs='?', help='飞书表格ID') parser.add_argument('--dry-run', action='store_true', help='试运行模式,只显示会处理的记录,不实际更新') args = parser.parse_args() try: # 创建ContentFitter实例 fitter = ContentFitter(args.table_id) # 处理所有记录 fitter.process_all_records(dry_run=args.dry_run) except Exception as e: print(f"程序执行失败: {e}") sys.exit(1) if __name__ == '__main__': main()