- import json
- from typing import List, Any, Optional
- from simpleeval import simple_eval
- from client.CrawlerClient import CrawlerClient
- from model.automation_provide_job import DouYinSearchConfig, ChannelSearchAndDetailDTO, SearchFilterConfigItem
- from util.automation_provide_util import AutoProvideUtil
# Shared crawler client used by every keyword search below.
crawler_client = CrawlerClient()
# A keyword passes pre-filtering only when strictly more than this many
# first-page videos satisfy the filter rule (see keywords_search_and_filter).
preFilterThreshold = 3
# CSV-style analysis report written by write_result_file.
# NOTE(review): machine-local path — parameterize before sharing this script.
result_txt_file = '/Users/zhao/Desktop/tzld/文档/分析文档/关键词分析.txt'
def write_result_file(content, mode='a+'):
    """Write one line of *content* to the report file.

    Args:
        content: Text of the line; a trailing newline is always appended.
        mode: File-open mode — defaults to append; pass 'w' to truncate
            the report first (used by log_info_print_title).
    """
    # Pin the encoding: the report contains Chinese text, so relying on the
    # platform default encoding could corrupt it on non-UTF-8 locales.
    with open(result_txt_file, mode, encoding='utf-8') as f:
        f.write(content)
        f.write("\n")
def log_info_print_title():
    """Truncate the report file and emit the CSV header row."""
    header = (
        "视频ID,品类,关键词,爬取计划,结果,原因,搜索使用的账号ID,排序方式,"
        "站外视频ID,站外账号ID,过滤结果,分享量,点赞量,分享量/点赞量,"
        "视频时长(秒),观众年龄50+占比,观众年龄50+TGI,过滤规则表达式"
    )
    write_result_file(header, 'w')
def log_info_print(log_json: dict[str, Any], account_id: Optional[int] = None):
    """Flatten one keyword-filter record into CSV rows in the report file.

    Emits one row per off-site video recorded under the record's 'ext'
    entry; when there is no ext info, a single summary row is written.

    Args:
        log_json: Record produced by keywords_search_and_filter (or a raw
            input record). Mutated in place: dict-valued 'ext' and
            'modelValueConfig' entries are serialized to JSON strings.
        account_id: Account used for the search; 0 is written when omitted.
    """
    # Guard: the upstream filter step may hand back an empty record when
    # the search failed early — nothing useful to log in that case.
    if not log_json:
        return
    # Normalize nested dicts to JSON strings so downstream .get()/json.loads
    # always sees strings regardless of which producer built the record.
    if isinstance(log_json.get('ext'), dict):
        log_json['ext'] = json.dumps(log_json['ext'], ensure_ascii=False)
    # BUG FIX: the original tested the always-truthy literal
    # `'modelValueConfig' and isinstance(log_json['modelValueConfig'], ...)`,
    # which raised KeyError whenever the key was absent (every raw record).
    if isinstance(log_json.get('modelValueConfig'), dict):
        log_json['modelValueConfig'] = json.dumps(log_json['modelValueConfig'], ensure_ascii=False)
    video_id = log_json["videoId"]
    keywords = log_json['keywords']
    crawler_plan_id = log_json.get("crawlerPlanId", "")
    result = log_json.get("result", False)
    reason = log_json.get("reason", "")
    merge_cate2 = log_json['mergeSecondLevelCate']
    sort_type = json.loads(log_json.get("modelValueConfig", "{}")).get("sortType")
    ext_json = json.loads(log_json.get("ext", "{}"))
    account_id = account_id if account_id else 0
    if not ext_json:
        # Summary row only. NOTE(review): the stray leading single quotes in
        # some fields look like Excel text-coercion markers — kept verbatim.
        write_result_file(f"{video_id},{merge_cate2},{keywords},'{crawler_plan_id},'{result},{reason},{account_id},{sort_type}")
        return
    for channel_content_id in ext_json:
        channel_ext_info = ext_json[channel_content_id]
        filter_result = channel_ext_info.get("result", False)
        rule_str = channel_ext_info.get("rule", "")
        rule_context = channel_ext_info.get('ruleContext', {})
        share_cnt = rule_context.get('shareCnt', 0)
        video_duration_s = rule_context.get('videoDuration_s', 0)
        like_cnt = rule_context.get('likeCnt', 0)
        audience_age_50_rate = rule_context.get('audienceAge50Rate', 0)
        audience_age_50_tgi = rule_context.get('audienceAge50TGI', 0)
        share_div_link = rule_context.get('shareDivLink', 0)
        # The off-site account id may live under either detail payload.
        channel_account_id = ""
        if "contentDetail" in channel_ext_info:
            channel_account_id = channel_ext_info["contentDetail"].get("channelAccountId")
        elif "fanPortrait" in channel_ext_info:
            channel_account_id = channel_ext_info["fanPortrait"].get("channelAccountId")
        write_result_file(f"{video_id},{merge_cate2},{keywords},'{crawler_plan_id},'{result},{reason},{account_id},{sort_type},'{channel_content_id},{channel_account_id},{filter_result},"
                          f"{share_cnt},{like_cnt},{share_div_link},{video_duration_s},{audience_age_50_rate},{audience_age_50_tgi},{rule_str}")
def keywords_search(keywords: str, sort_type: str, account_id=None) -> List[ChannelSearchAndDetailDTO]:
    """Run a Douyin keyword search and return per-video search/detail DTOs.

    Args:
        keywords: Search phrase.
        sort_type: Douyin sort mode label (e.g. "综合排序", "最多点赞").
        account_id: Optional account to search with; None means anonymous.
    """
    config = DouYinSearchConfig(
        search_content=keywords,
        sort_type=sort_type,
        account_id=account_id,
    )
    return crawler_client.dou_yin_keywords_search(config, True, True)
def eval_expr(expr: str, context: dict) -> bool:
    """Evaluate a boolean rule expression against *context*.

    C-style '&&' / '||' operators are rewritten to Python's 'and' / 'or'
    before handing the expression to simpleeval's restricted evaluator.
    """
    normalized = expr.replace("&&", " and ")
    normalized = normalized.replace("||", " or ")
    return bool(simple_eval(normalized, names=context))
def keywords_search_and_filter(keywords: str, sort_type: str, account_id: int, log_json: dict[str, Any], filters: List[SearchFilterConfigItem]) -> dict[str, Any]:
    """Search a keyword, apply the filter rule to every result video, and
    build a result record suitable for log_info_print.

    Args:
        keywords: Search phrase.
        sort_type: Douyin sort mode (e.g. "综合排序" / "最多点赞").
        account_id: Account used for the search (0 = not logged in).
        log_json: Source record; selected fields are copied into the result.
        filters: Filter config items compiled into a single rule expression.

    Returns:
        A new record with per-video details under 'ext', an overall
        'result' flag, and a 'reason' when the keyword fails.
    """
    need_copy_keys = ["videoId", "accountFilters", "contentFilters", "mergeSecondLevelCate", "keywords"]
    result_json = {key: log_json.get(key) for key in need_copy_keys}
    log_ext_info = {}
    result_json['ext'] = log_ext_info
    result_json['result'] = True
    result_json['modelValueConfig'] = {"sortType": sort_type}
    rule_str = AutoProvideUtil.parse_filter_config_to_rule_str(filters)
    channel_search_and_detail_dtos = keywords_search(keywords, sort_type, account_id)
    if not channel_search_and_detail_dtos:
        result_json["result"] = False
        result_json['reason'] = '关键词搜索结果为空'
        return result_json
    cnt = 0
    for dto in channel_search_and_detail_dtos:
        channel_content_id = dto.channel_content_id
        channel_account_id = dto.channel_account_id
        content_detail = dto.content_detail
        fans_portrait = dto.fans_portrait
        ext_json = {}
        log_ext_info[channel_content_id] = ext_json
        if content_detail:
            content_detail['channelAccountId'] = channel_account_id
            content_detail['channelContentId'] = channel_content_id
            ext_json['contentDetail'] = content_detail
        if fans_portrait:
            fans_portrait['channelAccountId'] = channel_account_id
            fans_portrait['channelContentId'] = channel_content_id
            ext_json['fanPortrait'] = fans_portrait
        if (not content_detail) and (not fans_portrait):
            # Neither payload available — cannot evaluate the rule.
            ext_json["result"] = False
            continue
        rule_context = AutoProvideUtil.extract_content_rule_feature(content_detail=content_detail, fans_portrait=fans_portrait)
        ext_json['ruleContext'] = rule_context
        ext_json['rule'] = rule_str
        if not rule_context:
            # No features extractable — counted toward the pass threshold
            # (original behavior; NOTE(review): confirm this is intended).
            cnt += 1
            continue
        passed = eval_expr(expr=rule_str, context=rule_context)
        ext_json['result'] = passed
        if passed:
            cnt += 1
    if cnt <= preFilterThreshold:
        # BUG FIX: the original wrote the failure onto the *input* record and
        # returned {} here — and implicitly returned None on success — so
        # callers never received the collected ext details and crashed in
        # log_info_print. Record the outcome on result_json instead.
        result_json["result"] = False
        result_json['reason'] = '该关键词首页满足条件的视频数不足'
    return result_json
def keywords_not_login_comprehensive_sort(keywords: str, log_json: dict[str, Any], filters: List[SearchFilterConfigItem]):
    """Scenario: not logged in (account 0), comprehensive sort ("综合排序")."""
    account_id = 0
    outcome = keywords_search_and_filter(keywords=keywords, sort_type="综合排序", account_id=account_id, log_json=log_json, filters=filters)
    log_info_print(outcome, account_id=account_id)
def keywords_login_comprehensive_sort(keywords: str, log_json: dict[str, Any], filters: List[SearchFilterConfigItem]):
    """Scenario: logged-in account, comprehensive sort ("综合排序")."""
    account_id = 771431186  # fixed test account used for logged-in scenarios
    outcome = keywords_search_and_filter(keywords=keywords, sort_type="综合排序", account_id=account_id, log_json=log_json, filters=filters)
    log_info_print(outcome, account_id=account_id)
def keywords_login_like_sort(keywords: str, log_json: dict[str, Any], filters: List[SearchFilterConfigItem]):
    """Scenario: logged-in account, most-liked sort ("最多点赞")."""
    account_id = 771431186  # fixed test account used for logged-in scenarios
    outcome = keywords_search_and_filter(keywords=keywords, sort_type="最多点赞", account_id=account_id, log_json=log_json, filters=filters)
    log_info_print(outcome, account_id=account_id)
def handle_log_json(log_json: dict[str, Any]):
    """Run every search scenario for one input record and log each outcome."""
    # Echo the raw input record first so the report contains a baseline row.
    log_info_print(log_json)
    keywords = log_json['keywords']
    # accountFilters / contentFilters arrive as JSON-encoded lists of
    # filter-config dicts; merge them into one list of config items.
    account_filters = json.loads(log_json.get("accountFilters", "[]"))
    content_filters = json.loads(log_json.get("contentFilters", '[]'))
    filter_items = [SearchFilterConfigItem(**item) for item in account_filters + content_filters]
    keywords_not_login_comprehensive_sort(keywords, log_json, filter_items)
    keywords_login_comprehensive_sort(keywords, log_json, filter_items)
    keywords_login_like_sort(keywords, log_json, filter_items)
def main():
    """Load the JSON-lines sample file and process every record.

    Writes the report header first, then appends rows for each record.
    """
    # NOTE(review): machine-local path — parameterize before sharing.
    file_path = "/Users/zhao/Downloads/keywords_filter_test_sample.json"
    log_list = []
    with open(file_path, "r", encoding="utf-8") as f:
        # Idiomatic line iteration replaces the manual readline() loop.
        for line in f:
            # Skip blank lines so a trailing newline can't crash json.loads.
            if line.strip():
                log_list.append(json.loads(line))
    log_info_print_title()
    for log in log_list:
        handle_log_json(log)


if __name__ == '__main__':
    main()