import json
import os.path
import time
from typing import List, Any, Optional

from simpleeval import simple_eval

from client.CrawlerClient import CrawlerClient
from model.automation_provide_job import DouYinSearchConfig, ChannelSearchAndDetailDTO, SearchFilterConfigItem
from util.automation_provide_util import AutoProvideUtil

crawler_client = CrawlerClient()

# Minimum number of first-page videos that must pass the filter rule for a
# keyword to be considered usable.
preFilterThreshold = 2

# How many times a keyword search is attempted, and the pause between attempts.
SEARCH_MAX_ATTEMPTS = 5
SEARCH_RETRY_INTERVAL_S = 5

result_txt_file = '/Users/zhao/Desktop/tzld/文档/分析文档/关键词供给任务分析/关键词分析_20260226.txt'


def write_result_file(content, mode='a+'):
    """Write ``content`` followed by a newline to ``result_txt_file``.

    The file is always opened as UTF-8 so the non-ASCII text written by this
    script does not depend on the platform's default encoding.

    Args:
        content: Text of the line to write (without the trailing newline).
        mode: ``open`` mode; ``'a+'`` appends, ``'w'`` truncates first.
    """
    with open(result_txt_file, mode, encoding="utf-8") as f:
        f.write(content)
        f.write("\n")


def log_info_print_title():
    """Truncate the result file and write the comma-separated header row."""
    write_result_file(
        "品类,标题,videoid,解析的关键词,抖音搜索首页结果初判是否通过,搜索视频链接,热点宝账号链接,点赞量,分享量,分享比点赞,时长,账号50以上占比,账号50以上TGI",
        'w',
    )


def _load_json_object(value: Any) -> dict:
    """Return ``value`` as a dict, decoding it first when it is a JSON string.

    Missing/empty values and JSON documents that are not objects yield ``{}``.
    """
    if isinstance(value, dict):
        return value
    if not value:
        return {}
    parsed = json.loads(value)
    return parsed if isinstance(parsed, dict) else {}


def log_info_print(log_json: dict[str, Any], account_id: Optional[str] = None):
    """Append the per-video filter results of one log entry to the result file.

    One row is written per entry of ``log_json['ext']`` (which may be a dict or
    a JSON-encoded string). When ``ext`` is missing or empty, a single row with
    ``False`` in the result column is written instead. ``log_json`` is only
    read, never modified.

    Args:
        log_json: Log entry; the keys read are ``mergeSecondLevelCate``,
            ``pqTitle``, ``videoId``, ``keywords`` and ``ext``.
        account_id: Accepted for call-site compatibility; not used in the
            output.
    """
    video_id = log_json.get("videoId", "-1")
    keywords = log_json.get('keywords', "")
    pq_title = log_json.get("pqTitle", "")
    merge_cate2 = log_json.get('mergeSecondLevelCate')
    ext_json = _load_json_object(log_json.get("ext"))

    if not ext_json:
        # Columns must line up with the header row: category, title, video id,
        # keywords, result.
        write_result_file(f"{merge_cate2},{pq_title},{video_id},{keywords},False")
        return

    for channel_content_id, channel_ext_info in ext_json.items():
        # 'mergeCate2Map' is a bookkeeping key, not a video entry.
        if channel_content_id == 'mergeCate2Map':
            continue
        if not isinstance(channel_ext_info, dict):
            continue

        filter_result = channel_ext_info.get("result", False)
        # 'ruleContext' can be stored as None when no features were extracted.
        rule_context = channel_ext_info.get('ruleContext') or {}
        share_cnt = rule_context.get('shareCnt', 0)
        video_duration_s = rule_context.get('videoDuration_s', 0)
        like_cnt = rule_context.get('likeCnt', 0)
        audience_age_50_rate = rule_context.get('audienceAge50Rate', 0)
        audience_age_50_tgi = rule_context.get('audienceAge50TGI', 0)
        share_div_link = rule_context.get('shareDivLink', 0)

        channel_account_id = ""
        if "contentDetail" in channel_ext_info:
            channel_account_id = (channel_ext_info["contentDetail"] or {}).get("channelAccountId")
        elif "fanPortrait" in channel_ext_info:
            channel_account_id = (channel_ext_info["fanPortrait"] or {}).get("channelAccountId")

        write_result_file(
            f"{merge_cate2},{pq_title},{video_id},{keywords},{filter_result},{channel_content_id},{channel_account_id},"
            f"{like_cnt},{share_cnt},{share_div_link},{video_duration_s},{audience_age_50_rate},{audience_age_50_tgi}"
        )


def keywords_search(keywords: str, sort_type: str, account: Optional[str] = None) -> List[ChannelSearchAndDetailDTO]:
    """Run one keyword search through the crawler client and return its results.

    Args:
        keywords: Search text.
        sort_type: Sort option passed through to the search config.
        account: Account id passed through to the search config.
    """
    search_config = DouYinSearchConfig(
        search_content=keywords,
        sort_type=sort_type,
        account_id=account,
    )
    return crawler_client.dou_yin_keywords_search(search_config, True, True)


def eval_expr(expr: str, context: dict) -> bool:
    """Evaluate a rule expression against ``context`` and return its truthiness.

    ``&&`` and ``||`` are rewritten to Python's ``and`` / ``or`` before the
    expression is handed to ``simple_eval``.
    """
    expr = expr.replace("&&", " and ").replace("||", " or ")
    return bool(simple_eval(expr, names=context))


def _search_with_retry(keywords: str, sort_type: str, account_id: str) -> List[ChannelSearchAndDetailDTO]:
    """Search up to ``SEARCH_MAX_ATTEMPTS`` times until a non-empty result is returned."""
    for attempt in range(SEARCH_MAX_ATTEMPTS):
        dtos = keywords_search(keywords, sort_type, account_id)
        if dtos:
            return dtos
        # No point in waiting after the last attempt.
        if attempt < SEARCH_MAX_ATTEMPTS - 1:
            time.sleep(SEARCH_RETRY_INTERVAL_S)
    return []


def keywords_search_and_filter(keywords: str, sort_type: str, account_id: str, log_json: dict[str, Any],
                               filters: List[SearchFilterConfigItem]) -> dict[str, Any]:
    """Search a keyword and evaluate the filter rule on every returned video.

    Args:
        keywords: Search text.
        sort_type: Sort option for the search.
        account_id: Account id used for the search.
        log_json: Source log entry; only its ``keywords`` value is copied into
            the result.
        filters: Filter items that are converted into the rule expression.

    Returns:
        A dict with ``keywords``, ``ext`` (per-video details keyed by channel
        content id), ``result``, ``modelValueConfig`` and, when ``result`` is
        False, a ``reason``. ``result`` is False when the search returned
        nothing or fewer than ``preFilterThreshold`` videos were counted as
        passing.
    """
    log_ext_info: dict[str, Any] = {}
    result_json: dict[str, Any] = {key: log_json.get(key) for key in ("keywords",)}
    result_json['ext'] = log_ext_info
    result_json['result'] = True
    result_json['modelValueConfig'] = {"sortType": sort_type}

    rule_str = AutoProvideUtil.parse_filter_config_to_rule_str(filters)

    dtos = _search_with_retry(keywords, sort_type, account_id)
    if not dtos:
        result_json["result"] = False
        result_json['reason'] = '关键词搜索结果为空'
        return result_json

    passed_cnt = 0
    for dto in dtos:
        channel_content_id = dto.channel_content_id
        channel_account_id = dto.channel_account_id
        content_detail = dto.content_detail
        fans_portrait = dto.fans_portrait

        ext_json: dict[str, Any] = {}
        log_ext_info[channel_content_id] = ext_json

        # Tag both payloads with their ids so the log row can report them.
        if content_detail:
            content_detail['channelAccountId'] = channel_account_id
            content_detail['channelContentId'] = channel_content_id
            ext_json['contentDetail'] = content_detail
        if fans_portrait:
            fans_portrait['channelAccountId'] = channel_account_id
            fans_portrait['channelContentId'] = channel_content_id
            ext_json['fanPortrait'] = fans_portrait

        rule_context = AutoProvideUtil.extract_content_rule_feature(content_detail=content_detail,
                                                                    fans_portrait=fans_portrait)
        ext_json['ruleContext'] = rule_context
        ext_json['rule'] = rule_str

        if (not content_detail) or (not fans_portrait):
            ext_json["result"] = False
            continue

        if not rule_context:
            # NOTE(review): a video without extracted features is counted as
            # passing, yet no 'result' is recorded for it (so the log row shows
            # False). Kept as-is; confirm whether this is intended.
            passed_cnt += 1
            continue

        passed = False
        try:
            passed = eval_expr(expr=rule_str, context=rule_context)
        except Exception as e:
            # Best effort: a rule that cannot be evaluated counts as not passed,
            # but the cause is reported instead of being dropped.
            print(f"rule evaluation failed: {e!r}", rule_str, rule_context)
        ext_json['result'] = passed
        if passed:
            passed_cnt += 1

    if passed_cnt < preFilterThreshold:
        result_json["result"] = False
        result_json['reason'] = '该关键词首页满足条件的视频数不足'

    return result_json


def _search_filter_and_log(keywords: str, sort_type: str, account_id: str, log_json: dict[str, Any],
                           filters: List[SearchFilterConfigItem]) -> None:
    """Run ``keywords_search_and_filter`` and append its outcome to the result file."""
    result_json = keywords_search_and_filter(keywords=keywords, sort_type=sort_type, account_id=account_id,
                                             log_json=log_json, filters=filters)
    log_info_print(result_json, account_id=account_id)


def keywords_not_login_comprehensive_sort(keywords: str, log_json: dict[str, Any],
                                          filters: List[SearchFilterConfigItem]):
    """
    Not logged in, comprehensive sort.
    """
    _search_filter_and_log(keywords, "综合排序", "0", log_json, filters)


def keywords_login_comprehensive_sort(keywords: str, log_json: dict[str, Any],
                                      filters: List[SearchFilterConfigItem]):
    """
    Logged in, comprehensive sort.
    """
    _search_filter_and_log(keywords, "综合排序", "771431186", log_json, filters)


def keywords_login_like_sort(keywords: str, log_json: dict[str, Any], filters: List[SearchFilterConfigItem]):
    """
    Logged in, sorted by most likes.
    """
    _search_filter_and_log(keywords, "最多点赞", "771431186", log_json, filters)


def _parse_search_filters(log_json: dict[str, Any]) -> List[SearchFilterConfigItem]:
    """Build filter items from the ``accountFilters`` and ``contentFilters`` JSON strings.

    Missing or empty values are treated as an empty list. Each decoded element
    is passed as keyword arguments to ``SearchFilterConfigItem``.
    """
    account_filters = json.loads(log_json.get("accountFilters") or "[]")
    content_filters = json.loads(log_json.get("contentFilters") or "[]")
    return [SearchFilterConfigItem(**filter_item) for filter_item in account_filters + content_filters]


def handle_log_json(log_json: dict[str, Any]):
    """Write one job log entry to the result file.

    The filter items are parsed as well so that the optional re-search
    experiments below can be switched on by uncommenting them.
    """
    log_info_print(log_json)

    keywords = log_json['keywords']
    search_filter_config_tems = _parse_search_filters(log_json)
    # keywords_not_login_comprehensive_sort(keywords, log_json, search_filter_config_tems)
    # keywords_login_comprehensive_sort(keywords, log_json, search_filter_config_tems)
    # keywords_login_like_sort(keywords, log_json, search_filter_config_tems)


def main():
    """Rebuild the result file from the job log, then probe a fixed keyword list.

    Reads the JSON-lines job log, writes the header plus every entry whose
    ``mergeSecondLevelCate`` is "历史名人", and finally searches each hard-coded
    keyword using the filters taken from the first log entry.
    """
    if os.path.exists(result_txt_file):
        os.remove(result_txt_file)

    file_path = '/Users/zhao/Desktop/20260226_keywords_job.json'
    with open(file_path, "r", encoding="utf-8") as f:
        # One JSON document per line; blank lines are skipped.
        log_list = [json.loads(line) for line in f if line.strip()]

    log_info_print_title()

    for log in log_list:
        if "历史名人" == log.get("mergeSecondLevelCate", ""):
            handle_log_json(log)

    if not log_list:
        # The keyword probes below take their filters from the first entry.
        print(f"no log entries found in {file_path}, skipping keyword search")
        return

    search_filter_config_tems = _parse_search_filters(log_list[0])

    keywords_list = [
        "大字报式",
        "情感化",
        "事迹",
        "伟人",
        "毛主席",
        "历史人物",
        "民族自豪感",
        "轶事",
        "爱国主义",
        "800万",
        "物证",
        "刘永坦院士",
        "生活细节",
        "历史人物评价",
        "先驱先烈",
        "科研国士",
        "开国将帅",
    ]
    account_id = '771431206'
    for keyword in keywords_list:
        _search_filter_and_log(keyword, "综合排序", account_id, {"keywords": keyword}, search_filter_config_tems)


if __name__ == '__main__':
    main()