"""Offline analysis script for keyword-search filtering.

Reads sampled log records (one JSON object per line), re-runs the Douyin
keyword search under several account / sort-type configurations, evaluates
the filter rule against every returned video and appends one comma-separated
row per video to ``result_txt_file``.
"""
import json
from typing import List, Any, Optional

from simpleeval import simple_eval

from client.CrawlerClient import CrawlerClient
from model.automation_provide_job import DouYinSearchConfig, ChannelSearchAndDetailDTO, SearchFilterConfigItem
from util.automation_provide_util import AutoProvideUtil

crawler_client = CrawlerClient()

# A keyword is accepted only when MORE than this many searched videos satisfy
# the filter rule (see the ``cnt <= preFilterThreshold`` check below).
preFilterThreshold = 3
result_txt_file = '/Users/zhao/Desktop/tzld/文档/分析文档/关键词分析.txt'

# Account id used by the "logged in" search variants.
_LOGIN_ACCOUNT_ID = 771431186


def write_result_file(content, mode='a+'):
    """Write ``content`` plus a trailing newline to ``result_txt_file``.

    Args:
        content: Text of the row to write (without the newline).
        mode: File mode passed to ``open``; the default appends, ``'w'``
            truncates the file first.
    """
    # Explicit UTF-8: the rows contain Chinese text and the platform default
    # encoding is not guaranteed to be able to represent it.
    with open(result_txt_file, mode, encoding='utf-8') as f:
        f.write(content)
        f.write("\n")


def log_info_print_title():
    """Truncate the result file and write the column-header row."""
    write_result_file(
        "视频ID,品类,关键词,爬取计划,结果,原因,搜索使用的账号ID,排序方式,站外视频ID,站外账号ID,过滤结果,分享量,点赞量,分享量/点赞量,视频时长(秒),观众年龄50+占比,观众年龄50+TGI,过滤规则表达式",
        'w')


def _load_json_dict(value: Any) -> dict:
    """Return ``value`` as a dict.

    Dicts are passed through, non-empty strings are parsed as JSON, and
    missing / empty / ``null`` values become an empty dict.
    """
    if isinstance(value, dict):
        return value
    if not value:
        return {}
    return json.loads(value) or {}


def log_info_print(log_json: dict[str, Any], account_id: Optional[int] = None):
    """Append the rows describing one log record to the result file.

    When the record has no per-video ``ext`` information a single short row
    is written; otherwise one row per entry of ``ext`` is written.

    Args:
        log_json: Log record. ``videoId``, ``keywords`` and
            ``mergeSecondLevelCate`` are required; ``ext`` and
            ``modelValueConfig`` may each be a dict or a JSON string and may
            be absent. The record is not modified.
        account_id: Account id used for the search; falsy values are
            written as ``0``.

    Raises:
        KeyError: If one of the required keys is missing.
    """
    video_id = log_json["videoId"]
    keywords = log_json['keywords']
    crawler_plan_id = log_json.get("crawlerPlanId", "")
    result = log_json.get("result", False)
    reason = log_json.get("reason", "")
    merge_cate2 = log_json['mergeSecondLevelCate']
    sort_type = _load_json_dict(log_json.get("modelValueConfig")).get("sortType")
    ext_json = _load_json_dict(log_json.get("ext"))
    account_id = account_id if account_id else 0

    # Columns shared by the short row and by every per-video row.
    # NOTE(review): values are joined with plain commas; a comma inside
    # ``reason``, ``keywords`` or the rule expression will shift columns.
    row_prefix = (f"{video_id},{merge_cate2},{keywords},'{crawler_plan_id},'{result},"
                  f"{reason},{account_id},{sort_type}")

    if not ext_json:
        write_result_file(row_prefix)
        return

    for channel_content_id, channel_ext_info in ext_json.items():
        filter_result = channel_ext_info.get("result", False)
        rule_str = channel_ext_info.get("rule", "")
        # ``ruleContext`` may be stored as None/empty when no features could
        # be extracted, so fall back to an empty dict.
        rule_context = channel_ext_info.get('ruleContext') or {}
        share_cnt = rule_context.get('shareCnt', 0)
        video_duration_s = rule_context.get('videoDuration_s', 0)
        like_cnt = rule_context.get('likeCnt', 0)
        audience_age_50_rate = rule_context.get('audienceAge50Rate', 0)
        audience_age_50_tgi = rule_context.get('audienceAge50TGI', 0)
        share_div_link = rule_context.get('shareDivLink', 0)

        channel_account_id = ""
        if "contentDetail" in channel_ext_info:
            channel_account_id = channel_ext_info["contentDetail"].get("channelAccountId")
        elif "fanPortrait" in channel_ext_info:
            channel_account_id = channel_ext_info["fanPortrait"].get("channelAccountId")

        write_result_file(f"{row_prefix},'{channel_content_id},{channel_account_id},{filter_result},"
                          f"{share_cnt},{like_cnt},{share_div_link},{video_duration_s},{audience_age_50_rate},{audience_age_50_tgi},{rule_str}")


def keywords_search(keywords: str, sort_type: str, account_id=None) -> List[ChannelSearchAndDetailDTO]:
    """Run a keyword search through the crawler client.

    Args:
        keywords: Search text.
        sort_type: Sort type forwarded to the search config.
        account_id: Account id forwarded to the search config.

    Returns:
        Whatever ``crawler_client.dou_yin_keywords_search`` returns for the
        built config.
    """
    search_config = DouYinSearchConfig(
        search_content=keywords,
        sort_type=sort_type,
        account_id=account_id
    )
    return crawler_client.dou_yin_keywords_search(search_config, True, True)


def eval_expr(expr: str, context: dict) -> bool:
    """Evaluate a boolean rule expression against ``context``.

    ``&&`` / ``||`` are rewritten to Python's ``and`` / ``or`` before the
    expression is handed to ``simple_eval`` with ``context`` as its names.
    """
    expr = expr.replace("&&", " and ").replace("||", " or ")
    return bool(simple_eval(expr, names=context))


def keywords_search_and_filter(keywords: str, sort_type: str, account_id: int, log_json: dict[str, Any],
                               filters: List[SearchFilterConfigItem]) -> dict[str, Any]:
    """Search ``keywords`` and evaluate the filter rule on every result.

    Args:
        keywords: Search text.
        sort_type: Sort type used for the search; also recorded in the
            result under ``modelValueConfig.sortType``.
        account_id: Account id used for the search.
        log_json: Source log record; only a fixed set of keys is copied
            from it and it is not modified.
        filters: Filter config items that are turned into the rule string.

    Returns:
        A new record holding the copied keys, ``modelValueConfig``, the
        per-video details under ``ext`` (keyed by channel content id) and the
        overall ``result`` / ``reason``. The same record is returned on every
        path so callers can always log it.
    """
    need_copy_keys = ["videoId", "accountFilters", "contentFilters", "mergeSecondLevelCate", "keywords"]
    result_json = {key: log_json.get(key) for key in need_copy_keys}

    log_ext_info = {}
    result_json['ext'] = log_ext_info
    result_json['result'] = True
    result_json['modelValueConfig'] = {"sortType": sort_type}

    rule_str = AutoProvideUtil.parse_filter_config_to_rule_str(filters)

    channel_search_and_detail_dtos = keywords_search(keywords, sort_type, account_id)
    if not channel_search_and_detail_dtos:
        result_json["result"] = False
        result_json['reason'] = '关键词搜索结果为空'
        return result_json

    cnt = 0  # number of videos counted as satisfying the rule
    for channel_search_and_detail_dto in channel_search_and_detail_dtos:
        channel_content_id = channel_search_and_detail_dto.channel_content_id
        channel_account_id = channel_search_and_detail_dto.channel_account_id
        content_detail = channel_search_and_detail_dto.content_detail
        fans_portrait = channel_search_and_detail_dto.fans_portrait

        ext_json = {}
        log_ext_info[channel_content_id] = ext_json

        # Tag both payloads with the ids so the logging step can read them.
        if content_detail:
            content_detail['channelAccountId'] = channel_account_id
            content_detail['channelContentId'] = channel_content_id
            ext_json['contentDetail'] = content_detail
        if fans_portrait:
            fans_portrait['channelAccountId'] = channel_account_id
            fans_portrait['channelContentId'] = channel_content_id
            ext_json['fanPortrait'] = fans_portrait

        if (not content_detail) and (not fans_portrait):
            ext_json["result"] = False
            continue

        rule_context = AutoProvideUtil.extract_content_rule_feature(content_detail=content_detail,
                                                                    fans_portrait=fans_portrait)
        ext_json['ruleContext'] = rule_context
        ext_json['rule'] = rule_str

        if not rule_context:
            # NOTE(review): a video without rule features is counted as
            # passing but gets no 'result' entry, so its row is logged with
            # a False filter result — confirm this is intended.
            cnt += 1
            continue

        result = eval_expr(expr=rule_str, context=rule_context)
        ext_json['result'] = result
        if result:
            cnt += 1

    if cnt <= preFilterThreshold:
        # Record the verdict on the returned record, not on the input log.
        result_json["result"] = False
        result_json['reason'] = '该关键词首页满足条件的视频数不足'
    return result_json


def _search_filter_and_log(keywords: str, sort_type: str, account_id: int, log_json: dict[str, Any],
                           filters: List[SearchFilterConfigItem]):
    """Run one search/filter pass and append its rows to the result file."""
    result_json = keywords_search_and_filter(keywords=keywords, sort_type=sort_type, account_id=account_id,
                                             log_json=log_json, filters=filters)
    log_info_print(result_json, account_id=account_id)


def keywords_not_login_comprehensive_sort(keywords: str, log_json: dict[str, Any],
                                          filters: List[SearchFilterConfigItem]):
    """
    Not logged in (account id 0), comprehensive sort.
    """
    _search_filter_and_log(keywords, "综合排序", 0, log_json, filters)


def keywords_login_comprehensive_sort(keywords: str, log_json: dict[str, Any],
                                      filters: List[SearchFilterConfigItem]):
    """
    Logged in, comprehensive sort.
    """
    _search_filter_and_log(keywords, "综合排序", _LOGIN_ACCOUNT_ID, log_json, filters)


def keywords_login_like_sort(keywords: str, log_json: dict[str, Any],
                             filters: List[SearchFilterConfigItem]):
    """
    Logged in, sorted by most likes.
    """
    _search_filter_and_log(keywords, "最多点赞", _LOGIN_ACCOUNT_ID, log_json, filters)


def handle_log_json(log_json: dict[str, Any]):
    """Log the original record, then re-run it under each search variant.

    Args:
        log_json: One parsed log record. ``accountFilters`` and
            ``contentFilters`` are read as JSON-encoded lists of keyword
            arguments for ``SearchFilterConfigItem``; missing or null values
            are treated as empty lists.
    """
    log_info_print(log_json)

    # Original note here read "not logged in, most likes"; no such variant
    # is run below — only the three combinations that follow.
    keywords = log_json['keywords']
    account_filters = json.loads(log_json.get("accountFilters") or "[]")
    content_filters = json.loads(log_json.get("contentFilters") or '[]')
    search_filter_config_tems = [SearchFilterConfigItem(**filter_item)
                                 for filter_item in account_filters + content_filters]

    keywords_not_login_comprehensive_sort(keywords, log_json, search_filter_config_tems)
    keywords_login_comprehensive_sort(keywords, log_json, search_filter_config_tems)
    keywords_login_like_sort(keywords, log_json, search_filter_config_tems)


def main():
    """Load the sample log file and process every record in it."""
    file_path = "/Users/zhao/Downloads/keywords_filter_test_sample.json"
    with open(file_path, "r", encoding="utf-8") as f:
        # One JSON object per line; blank lines are skipped.
        log_list = [json.loads(line) for line in f if line.strip()]

    log_info_print_title()
    for log in log_list:
        handle_log_json(log)


if __name__ == '__main__':
    main()