123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 |
- """
- @author: luojunhui
- 调用接口在微信内搜索视频
- """
- import json
- import time
- import requests
- from applications.mq import MQ
- from applications.log import logging
- from applications.config import gh_id_dict
- from applications.functions.item import VideoItem
- from applications.functions.common import sensitive_flag
def wx_search(keys, *, timeout=60):
    """
    Search WeChat for videos matching a keyword via the crawler service.

    :param keys: search keyword sent as "keyword" in the request body
    :param timeout: seconds to wait for the crawler service (passed to requests);
        without a timeout a stalled connection would block the caller forever
    :return: the decoded JSON response body
    :raises requests.RequestException: on connection errors or timeout
    :raises ValueError: if the response body is not valid JSON
        (requests' JSONDecodeError subclasses ValueError)
    """
    url = "http://8.217.190.241:8888/crawler/wei_xin/keyword"
    payload = json.dumps({
        "keyword": keys,
        "cursor": "0",
        "content_type": "video"
    })
    headers = {
        'Content-Type': 'application/json'
    }
    response = requests.post(url, headers=headers, data=payload, timeout=timeout)
    return response.json()
def process_weixin_video_obj(video_obj, user, trace_id, title):
    """
    Turn one WeChat search result into a VideoItem and publish it to the ETL topic.

    Official accounts map one-to-one to in-app accounts; ``user`` is the in-app
    account record for the official account being served.

    :param video_obj: one search-result item; reads 'pubTime', 'hashDocID',
        'videoUrl' and 'image'
    :param user: account record; reads "uid" and "nick_name"
    :param trace_id: trace id, stored as out_user_id on the item
    :param title: already-cleaned video title
    :return: None
    """
    etl_queue = MQ(topic_name="topic_crawler_etl_prod")
    source_platform = "weixin_search"
    published_at = int(video_obj['pubTime'])
    video_item = VideoItem()
    # Field order mirrors the order in which the item has always been populated.
    for field_name, field_value in (
        ("user_id", user["uid"]),
        ("user_name", user["nick_name"]),
        ("video_id", video_obj['hashDocID']),
        ("video_title", title),
        ("publish_time_stamp", published_at),
        ("video_url", video_obj["videoUrl"]),
        ("cover_url", video_obj["image"]),
        ("out_video_id", video_obj['hashDocID']),
        ("out_user_id", trace_id),
        ("platform", source_platform),
        ("strategy", "search"),
        ("session", "{}-{}".format(source_platform, int(time.time()))),
    ):
        video_item.add_video_info(field_name, field_value)
    message = video_item.produce_item()
    etl_queue.send_msg(params=message)
    logging(
        code="6002",
        info="发送消息至 ETL",
        data=message
    )
def _clean_title(raw_title):
    """
    Strip WeChat search highlight markup and '#' characters from a result title.

    :param raw_title: title string taken from a search result item
    :return: the title without '<em class="highlight">', '</em>' and '#'
    """
    return raw_title.replace('<em class="highlight">', '').replace('</em>', '').replace("#", "")


def _first_accepted_result(keyword, fail_info, trace_id):
    """
    Run one WeChat search and return the first result whose cleaned title passes sensitive_flag.

    :param keyword: keyword passed to wx_search
    :param fail_info: message logged with code 7001 when the search fails
    :param trace_id: trace id attached to the failure log
    :return: the raw result object, or None when the search failed or no result was accepted
    """
    try:
        response = wx_search(keys=keyword)
        if response['msg'] == '未知错误':
            candidates = None
        else:
            candidates = response['data']['data']
    except (requests.RequestException, ValueError, KeyError, TypeError) as e:
        # Network error, non-JSON body or unexpected response shape: treat it as a
        # failed search so the caller can fall back to the next keyword.
        print(e)
        candidates = None
    if candidates is None:
        logging(
            code="7001",
            info=fail_info,
            trace_id=trace_id
        )
        return None
    for obj in candidates:
        try:
            # Distinct local name: never overwrite the caller's article title.
            candidate_title = _clean_title(obj['items'][0]['title'])
            # NOTE(review): a truthy sensitive_flag is treated as "acceptable";
            # presumably it means "no sensitive content" — confirm.
            if sensitive_flag(candidate_title):
                return obj
        except Exception as e:
            # Best-effort: skip malformed items (or a sensitive_flag error) and keep looking.
            print(e)
    return None


def return_video(video_path, title, trace_id):
    """
    Search WeChat for a video matching the article and return the first acceptable result.

    The JSON file at ``video_path`` provides the fallback keywords. Searches run in this
    order, and the first accepted result wins:

    1. the article title;
    2. the first entry of 'search_keys';
    3. 'theme'.

    A missing or empty fallback keyword is skipped.

    :param video_path: path to a UTF-8 JSON file; presumably produced by an upstream
        "kimi" step (see the 7000 log message) — TODO confirm
    :param title: article title, used as the first keyword and in log messages
    :param trace_id: trace id attached to logs
    :return: the raw search-result object, or None when the file is empty/falsy
        or every search fails
    """
    with open(video_path, encoding='utf-8') as f:
        my_obj = json.load(f)
    if not my_obj:
        logging(
            code="7000",
            info="标题--{}--kimi 挖掘数据失败".format(title),
            trace_id=trace_id
        )
        return None
    # Title first, then the first search keyword, then the theme.
    search_plan = [(title, "通过标题搜索失败---{}".format(title))]
    search_keys = my_obj.get('search_keys') or []
    if search_keys:
        search_plan.append((search_keys[0], "通过搜索词搜索失败---{}".format(title)))
    theme = my_obj.get('theme')
    if theme:
        search_plan.append((theme, "通过主题搜索失败---{}".format(title)))
    for keyword, fail_info in search_plan:
        video_obj = _first_accepted_result(keyword, fail_info, trace_id)
        if video_obj is not None:
            return video_obj
    return None
def search_videos(video_path, title, trace_id, gh_id):
    """
    Find a WeChat video for an article and hand it to the ETL pipeline.

    :param video_path: path to the JSON file read by return_video
    :param title: article title, used as the first search keyword
    :param trace_id: trace id attached to logs and to the ETL message
    :param gh_id: official-account id; looked up in gh_id_dict to find the in-app user
    :return: None
    """
    found = return_video(video_path, title, trace_id)
    if not found:
        logging(
            code="7003",
            info="视频搜索失败",
            trace_id=trace_id
        )
        return
    logging(
        code="7002",
        info="视频搜索成功",
        trace_id=trace_id,
        data=found
    )
    first_item = found['items'][0]
    clean_title = first_item['title']
    # Drop search highlight markup and hashtags, in the same order as elsewhere.
    for marker in ('<em class="highlight">', '</em>', "#"):
        clean_title = clean_title.replace(marker, '')
    # NOTE(review): gh_id_dict.get returns None for an unknown gh_id, which would make
    # process_weixin_video_obj fail on user["uid"] — confirm every gh_id is configured.
    process_weixin_video_obj(
        video_obj=first_item,
        user=gh_id_dict.get(gh_id),
        trace_id=trace_id,
        title=clean_title
    )
|