"""
@author: luojunhui
Search for videos inside WeChat through the crawler keyword-search API and
forward the chosen result to the ETL message queue.
"""
import json
import re
import time

import requests

from applications.mq import MQ
from applications.log import logging
from applications.config import gh_id_dict
from applications.functions.item import VideoItem
from applications.functions.common import sensitive_flag

WX_SEARCH_URL = "http://8.217.190.241:8888/crawler/wei_xin/keyword"

# Matches any markup tag. Search-result titles presumably wrap the matched
# keyword in highlight markup such as <em class="highlight">...</em>
# (NOTE(review): confirm against an actual API response).
_HTML_TAG_PATTERN = re.compile(r"<[^<>]+>")


def wx_search(keys, timeout=60):
    """
    Search WeChat for videos matching a keyword via the crawler service.

    :param keys: keyword to search for.
    :param timeout: seconds to wait for the crawler service; once exceeded
        ``requests`` raises ``requests.Timeout``. Without a timeout a stalled
        service would block the caller indefinitely.
    :return: the decoded JSON response body.
    """
    payload = json.dumps({
        "keyword": keys,
        "cursor": "0",
        "content_type": "video"
    })
    headers = {
        'Content-Type': 'application/json'
    }
    response = requests.request(
        "POST", WX_SEARCH_URL, headers=headers, data=payload, timeout=timeout
    )
    return response.json()


def _clean_title(raw_title):
    """Return ``raw_title`` with all markup tags and '#' characters removed."""
    return _HTML_TAG_PATTERN.sub("", raw_title).replace("#", "")


def process_weixin_video_obj(video_obj, user, trace_id, title):
    """
    Build a VideoItem from one WeChat search hit and send it to the ETL queue.

    Each official account (gh_id) corresponds to exactly one in-site account.

    :param video_obj: one element of a search result's ``items`` list; the
        ``pubTime``, ``hashDocID``, ``videoUrl`` and ``image`` keys are read.
    :param user: in-site account dict; the ``uid`` and ``nick_name`` keys are read.
    :param trace_id: request trace id, forwarded as ``out_user_id``.
    :param title: cleaned video title.
    :return: None
    """
    etl_mq = MQ(topic_name="topic_crawler_etl_prod")
    platform = "weixin_search"
    publish_time_stamp = int(video_obj['pubTime'])
    item = VideoItem()
    item.add_video_info("user_id", user["uid"])
    item.add_video_info("user_name", user["nick_name"])
    item.add_video_info("video_id", video_obj['hashDocID'])
    item.add_video_info("video_title", title)
    item.add_video_info("publish_time_stamp", publish_time_stamp)
    item.add_video_info("video_url", video_obj["videoUrl"])
    item.add_video_info("cover_url", video_obj["image"])
    item.add_video_info("out_video_id", video_obj['hashDocID'])
    item.add_video_info("out_user_id", trace_id)
    item.add_video_info("platform", platform)
    item.add_video_info("strategy", "search")
    item.add_video_info("session", "{}-{}".format(platform, int(time.time())))
    mq_obj = item.produce_item()
    etl_mq.send_msg(params=mq_obj)
    logging(
        code="6002",
        info="发送消息至 ETL",
        data=mq_obj
    )


def _search_and_pick(keyword, fail_info, trace_id):
    """
    Run one WeChat search and return the first acceptable candidate.

    A candidate is accepted when ``sensitive_flag`` returns a truthy value for
    its cleaned title. A failed search (service-reported '未知错误', a network
    or timeout error, or an undecodable/unexpected response) is logged with
    code 7001 and ``fail_info`` and yields None, so the caller can fall back
    to its next keyword.

    :param keyword: search keyword.
    :param fail_info: log message used when the search itself fails.
    :param trace_id: request trace id for logging.
    :return: the accepted search-result object, or None.
    """
    try:
        result = wx_search(keys=keyword)
    except (requests.RequestException, ValueError) as e:
        logging(
            code="7001",
            info=fail_info,
            trace_id=trace_id,
            data=str(e)
        )
        return None
    if not isinstance(result, dict) or result.get('msg') == '未知错误':
        logging(
            code="7001",
            info=fail_info,
            trace_id=trace_id
        )
        return None
    candidates = (result.get('data') or {}).get('data') or []
    for candidate in candidates:
        # Best effort: a malformed candidate (or a sensitive_flag error)
        # must not abort the search; skip it and try the next one.
        try:
            candidate_title = _clean_title(candidate['items'][0]['title'])
            if sensitive_flag(candidate_title):
                return candidate
        except Exception as e:
            print(e)
    return None


def return_video(video_path, title, trace_id):
    """
    Search WeChat for a video matching an article, trying keywords in order.

    The JSON object stored at ``video_path`` (kimi mining output) supplies the
    fallback keywords. Order: the article ``title``, then ``search_keys[0]``,
    then ``theme``. Missing or empty keywords are skipped. The first accepted
    candidate wins.

    :param video_path: path to the JSON file with the mined keywords.
    :param title: article title; also used in every failure log message.
    :param trace_id: request trace id for logging.
    :return: the matching search-result object, or None.
    """
    with open(video_path, encoding='utf-8') as f:
        my_obj = json.load(f)
    if not my_obj:
        logging(
            code="7000",
            info="标题--{}--kimi 挖掘数据失败".format(title),
            trace_id=trace_id
        )
        return None

    search_keys = my_obj.get('search_keys') or []
    strategies = (
        (title, "通过标题搜索失败---{}".format(title)),
        (search_keys[0] if search_keys else None,
         "通过搜索词搜索失败---{}".format(title)),
        (my_obj.get('theme'), "通过主题搜索失败---{}".format(title)),
    )
    for keyword, fail_info in strategies:
        if not keyword:
            continue
        video_obj = _search_and_pick(keyword, fail_info, trace_id)
        if video_obj is not None:
            return video_obj
    return None


def search_videos(video_path, title, trace_id, gh_id):
    """
    Search for a video for an article and, on success, send it to the ETL queue.

    :param video_path: path to the JSON file with the mined keywords.
    :param title: article title.
    :param trace_id: request trace id for logging.
    :param gh_id: official-account id, looked up in ``gh_id_dict`` to find the
        in-site account the video is attributed to.
    :return: None
    """
    video_obj = return_video(video_path, title, trace_id)
    if not video_obj:
        logging(
            code="7003",
            info="视频搜索失败",
            trace_id=trace_id
        )
        return
    logging(
        code="7002",
        info="视频搜索成功",
        trace_id=trace_id,
        data=video_obj
    )
    video_item = video_obj['items'][0]
    # NOTE(review): gh_id_dict.get returns None for an unknown gh_id, which
    # makes process_weixin_video_obj fail on user["uid"]; confirm every
    # gh_id passed here is configured.
    process_weixin_video_obj(
        video_obj=video_item,
        user=gh_id_dict.get(gh_id),
        trace_id=trace_id,
        title=_clean_title(video_item['title'])
    )