| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256 |
- import json
- import os.path
- import time
- from typing import List, Any, Optional
- from simpleeval import simple_eval
- from client.CrawlerClient import CrawlerClient
- from model.automation_provide_job import DouYinSearchConfig, ChannelSearchAndDetailDTO, SearchFilterConfigItem
- from util.automation_provide_util import AutoProvideUtil
# Shared crawler client used by every search helper in this script.
crawler_client = CrawlerClient()
# Minimum number of first-page videos that must pass the filter rule
# for a keyword to be considered usable (see keywords_search_and_filter).
preFilterThreshold = 2
# CSV-style report file written by write_result_file; the header row is
# produced by log_info_print_title.
result_txt_file = '/Users/zhao/Desktop/tzld/文档/分析文档/关键词供给任务分析/关键词分析_20260226.txt'
def write_result_file(content: str, mode: str = 'a+') -> None:
    """Write *content* plus a trailing newline to the report file.

    Args:
        content: One CSV-style line (or arbitrary text) to append.
        mode: File-open mode; pass 'w' to truncate the report first.
    """
    # Explicit UTF-8: the report contains Chinese text, and without an
    # explicit encoding open() falls back to the platform locale, which
    # can corrupt or fail on non-ASCII output.
    with open(result_txt_file, mode, encoding='utf-8') as f:
        f.write(content)
        f.write("\n")
def log_info_print_title():
    """Truncate the report file and emit the CSV header row."""
    header = (
        "品类,标题,videoid,解析的关键词,抖音搜索首页结果初判是否通过,"
        "搜索视频链接,热点宝账号链接,点赞量,分享量,分享比点赞,时长,"
        "账号50以上占比,账号50以上TGI"
    )
    write_result_file(header, 'w')
def log_info_print(log_json: dict[str, Any], account_id: Optional[str] = None):
    """Flatten one keyword-job log record into CSV rows in the report file.

    Writes one row per searched video found under ``log_json['ext']``, or a
    single "False" row when the record carries no per-video details.

    Args:
        log_json: Log record. ``ext`` and ``modelValueConfig`` may arrive
            either as dicts or as JSON strings; both forms are accepted.
        account_id: Douyin account used for the search. Currently not part
            of the output row; kept for interface compatibility.
    """
    # Normalize dict-valued fields to JSON strings so the json.loads calls
    # below work regardless of how the record was produced.
    if 'ext' in log_json and isinstance(log_json['ext'], dict):
        log_json['ext'] = json.dumps(log_json['ext'], ensure_ascii=False)
    if 'modelValueConfig' in log_json and isinstance(log_json['modelValueConfig'], dict):
        log_json['modelValueConfig'] = json.dumps(log_json['modelValueConfig'], ensure_ascii=False)
    video_id = log_json.get("videoId", "-1")
    keywords = log_json.get('keywords', "")
    pq_title = log_json.get("pqTitle", "")
    merge_cate2 = log_json.get('mergeSecondLevelCate')
    ext_json = json.loads(log_json.get("ext", "{}"))
    if not ext_json:
        # No per-video details: emit a single not-passing row.
        # Fixed: the separator between video_id and keywords was '.'
        # instead of ',' (broke the CSV column alignment).
        write_result_file(f"{merge_cate2},{pq_title},{video_id},{keywords},False")
        return
    for channel_content_id in ext_json:
        # 'mergeCate2Map' is aggregate metadata, not a video entry.
        if channel_content_id in ['mergeCate2Map']:
            continue
        channel_ext_info = ext_json[channel_content_id]
        filter_result = channel_ext_info.get("result", False)
        rule_context = channel_ext_info.get('ruleContext', {})
        share_cnt = rule_context.get('shareCnt', 0)
        video_duration_s = rule_context.get('videoDuration_s', 0)
        like_cnt = rule_context.get('likeCnt', 0)
        audience_age_50_rate = rule_context.get('audienceAge50Rate', 0)
        audience_age_50_tgi = rule_context.get('audienceAge50TGI', 0)
        share_div_link = rule_context.get('shareDivLink', 0)
        # The account id may live under either detail payload.
        channel_account_id = ""
        if "contentDetail" in channel_ext_info:
            channel_account_id = channel_ext_info["contentDetail"].get("channelAccountId")
        elif "fanPortrait" in channel_ext_info:
            channel_account_id = channel_ext_info["fanPortrait"].get("channelAccountId")
        write_result_file(
            f"{merge_cate2},{pq_title},{video_id},{keywords},{filter_result},{channel_content_id},{channel_account_id},"
            f"{like_cnt},{share_cnt},{share_div_link},{video_duration_s},{audience_age_50_rate},{audience_age_50_tgi}"
        )
def keywords_search(keywords: str, sort_type: str, account: Optional[str] = None) -> List[ChannelSearchAndDetailDTO]:
    """Run a Douyin keyword search and return per-video detail DTOs.

    Args:
        keywords: Search term.
        sort_type: Douyin sort mode label, e.g. "综合排序".
        account: Douyin account id to search with; None for anonymous.
    """
    config = DouYinSearchConfig(
        search_content=keywords,
        sort_type=sort_type,
        account_id=account,
    )
    # The two True flags are passed positionally, matching the client API.
    return crawler_client.dou_yin_keywords_search(config, True, True)
def eval_expr(expr: str, context: dict) -> bool:
    """Evaluate a Java-style boolean rule string against *context*.

    '&&' and '||' are rewritten to Python's 'and' / 'or' before the
    expression is evaluated in a sandbox via simpleeval.
    """
    normalized = expr.replace("&&", " and ")
    normalized = normalized.replace("||", " or ")
    return bool(simple_eval(normalized, names=context))
def keywords_search_and_filter(keywords: str, sort_type: str, account_id: str, log_json: dict[str, Any], filters: List[SearchFilterConfigItem]) -> dict[str, Any]:
    """Search a keyword on Douyin and pre-filter the first-page videos.

    Runs the search (retrying up to 5 times), evaluates the rule built from
    *filters* against each returned video, and marks the keyword as failed
    when fewer than ``preFilterThreshold`` videos pass.

    Args:
        keywords: Search term.
        sort_type: Douyin sort mode, e.g. "综合排序" / "最多点赞".
        account_id: Douyin account id to search with ("0" = not logged in).
        log_json: Source log record; selected keys are copied to the result.
        filters: Combined account + content filter items.

    Returns:
        A log-style dict with 'result', optional 'reason', and per-video
        evidence under 'ext' (consumable by log_info_print).
    """
    need_copy_keys = ["keywords"]
    result_json = {key: log_json.get(key) for key in need_copy_keys}
    log_ext_info = {}
    result_json['ext'] = log_ext_info
    result_json['result'] = True
    result_json['modelValueConfig'] = {"sortType": sort_type}
    rule_str = AutoProvideUtil.parse_filter_config_to_rule_str(filters)
    # Retry the search a few times; only sleep BETWEEN attempts — the
    # original slept 5s even after the final failure.
    channel_search_and_detail_dtos = []
    for attempt in range(5):
        channel_search_and_detail_dtos = keywords_search(keywords, sort_type, account_id)
        if channel_search_and_detail_dtos:
            break
        if attempt < 4:
            time.sleep(5)
    if not channel_search_and_detail_dtos:
        result_json["result"] = False
        result_json['reason'] = '关键词搜索结果为空'
        return result_json
    cnt = 0  # number of first-page videos that pass the rule
    for dto in channel_search_and_detail_dtos:
        channel_content_id = dto.channel_content_id
        channel_account_id = dto.channel_account_id
        content_detail = dto.content_detail
        fans_portrait = dto.fans_portrait
        ext_json = {}
        log_ext_info[channel_content_id] = ext_json
        if content_detail:
            content_detail['channelAccountId'] = channel_account_id
            content_detail['channelContentId'] = channel_content_id
            ext_json['contentDetail'] = content_detail
        if fans_portrait:
            fans_portrait['channelAccountId'] = channel_account_id
            fans_portrait['channelContentId'] = channel_content_id
            ext_json['fanPortrait'] = fans_portrait
        rule_context = AutoProvideUtil.extract_content_rule_feature(content_detail=content_detail, fans_portrait=fans_portrait)
        ext_json['ruleContext'] = rule_context
        ext_json['rule'] = rule_str
        # A video missing either payload cannot be judged: mark failed.
        if (not content_detail) or (not fans_portrait):
            ext_json["result"] = False
            continue
        # No extractable features: give the video the benefit of the doubt
        # (counts toward the threshold, matching existing behavior).
        if not rule_context:
            cnt += 1
            continue
        result = False
        try:
            result = eval_expr(expr=rule_str, context=rule_context)
        except Exception as e:
            # Keep best-effort semantics, but surface WHY evaluation
            # failed — the original printed the inputs and dropped `e`.
            print(rule_str, rule_context, e)
        ext_json['result'] = result
        if result:
            cnt += 1
    if cnt < preFilterThreshold:
        result_json["result"] = False
        result_json['reason'] = '该关键词首页满足条件的视频数不足'
    return result_json
def keywords_not_login_comprehensive_sort(keywords: str, log_json: dict[str, Any], filters: List[SearchFilterConfigItem]):
    """Search with comprehensive sort while NOT logged in, then log the result."""
    account_id = "0"  # "0" denotes an anonymous (not-logged-in) session
    filtered = keywords_search_and_filter(
        keywords=keywords,
        sort_type="综合排序",
        account_id=account_id,
        log_json=log_json,
        filters=filters,
    )
    log_info_print(filtered, account_id=account_id)
def keywords_login_comprehensive_sort(keywords: str, log_json: dict[str, Any], filters: List[SearchFilterConfigItem]):
    """Search with comprehensive sort while logged in, then log the result."""
    account_id = "771431186"  # fixed logged-in test account
    filtered = keywords_search_and_filter(
        keywords=keywords,
        sort_type="综合排序",
        account_id=account_id,
        log_json=log_json,
        filters=filters,
    )
    log_info_print(filtered, account_id=account_id)
def keywords_login_like_sort(keywords: str, log_json: dict[str, Any], filters: List[SearchFilterConfigItem]):
    """Search with the most-liked sort while logged in, then log the result."""
    account_id = "771431186"  # fixed logged-in test account
    filtered = keywords_search_and_filter(
        keywords=keywords,
        sort_type="最多点赞",
        account_id=account_id,
        log_json=log_json,
        filters=filters,
    )
    log_info_print(filtered, account_id=account_id)
def handle_log_json(log_json: dict[str, Any]):
    """Log one record and build its filter config.

    The actual search variants are currently disabled (commented out);
    only the raw record is flushed to the report.
    """
    log_info_print(log_json)
    keywords = log_json['keywords']
    account_filters = json.loads(log_json.get("accountFilters", "[]"))
    content_filters = json.loads(log_json.get("contentFilters", '[]'))
    search_filter_config_tems = [
        SearchFilterConfigItem(**item)
        for item in account_filters + content_filters
    ]
    # keywords_not_login_comprehensive_sort(keywords, log_json, search_filter_config_tems)
    # keywords_login_comprehensive_sort(keywords, log_json, search_filter_config_tems)
    # keywords_login_like_sort(keywords, log_json, search_filter_config_tems)
def main():
    """Replay a dumped keyword-job log file and run a manual keyword batch."""
    # Start from a clean report file.
    if os.path.exists(result_txt_file):
        os.remove(result_txt_file)
    file_path = '/Users/zhao/Desktop/20260226_keywords_job.json'
    # One JSON record per line (JSONL). Iterate the file directly instead
    # of the manual readline loop, and skip blank lines — a blank line
    # previously crashed json.loads.
    log_list = []
    with open(file_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                log_list.append(json.loads(line))
    log_info_print_title()
    for log in log_list:
        if "历史名人" == log.get("mergeSecondLevelCate", ""):
            handle_log_json(log)
    if not log_list:
        # Without at least one record there is no filter config to borrow
        # for the manual batch below (previously an IndexError).
        return
    # Borrow the filter configuration from the first record for the
    # hand-picked keyword batch.
    account_filters = json.loads(log_list[0].get("accountFilters", "[]"))
    content_filters = json.loads(log_list[0].get("contentFilters", '[]'))
    search_filter_config_tems = [
        SearchFilterConfigItem(**item)
        for item in account_filters + content_filters
    ]
    keywords_list = [
        "大字报式",
        "情感化",
        "事迹",
        "伟人",
        "毛主席",
        "历史人物",
        "民族自豪感",
        "轶事",
        "爱国主义",
        "800万",
        "物证",
        "刘永坦院士",
        "生活细节",
        "历史人物评价",
        "先驱先烈",
        "科研国士",
        "开国将帅"
    ]
    account_id = '771431206'
    for keyword in keywords_list:
        log_json = keywords_search_and_filter(keywords=keyword, sort_type="综合排序", account_id=account_id, log_json={"keywords": keyword}, filters=search_filter_config_tems)
        log_info_print(log_json, account_id=account_id)


if __name__ == '__main__':
    main()
|