"""
@author: luojunhui
调用接口在微信内搜索视频
"""
import json
import time
import requests
from applications.mq import MQ
from applications.log import logging
from applications.config import gh_id_dict
from applications.functions.item import VideoItem
from applications.functions.common import sensitive_flag
def wx_search(keys, timeout=60):
    """
    Search WeChat for videos matching a keyword through the crawler HTTP service.

    :param keys: keyword placed in the ``keyword`` field of the JSON request body
    :param timeout: seconds to wait for the service before giving up; handed
        straight to ``requests`` (a ``(connect, read)`` tuple is accepted too).
        Without it a stalled service would block the caller indefinitely.
    :return: the decoded JSON body of the response
    :raises requests.exceptions.RequestException: on connection failure or timeout
    :raises ValueError: if the response body is not valid JSON
    """
    url = "http://8.217.190.241:8888/crawler/wei_xin/keyword"
    # Always request the first page ("cursor": "0") of video-type results.
    payload = json.dumps({
        "keyword": keys,
        "cursor": "0",
        "content_type": "video"
    })
    headers = {
        'Content-Type': 'application/json'
    }
    response = requests.post(url, headers=headers, data=payload, timeout=timeout)
    return response.json()
def process_weixin_video_obj(video_obj, user, trace_id, title):
    """
    Turn one WeChat search hit into a VideoItem and publish it to the ETL queue.

    Per the original author's note, each official account corresponds
    one-to-one to an in-site account.

    :param title: stored on the item as ``video_title``
    :param trace_id: stored on the item as ``out_user_id``
    :param user: account mapping; ``uid`` and ``nick_name`` are read
    :param video_obj: search-hit mapping; ``pubTime``, ``hashDocID``,
        ``videoUrl`` and ``image`` are read
    :return: None
    """
    etl_queue = MQ(topic_name="topic_crawler_etl_prod")
    source_platform = "weixin_search"
    published_at = int(video_obj['pubTime'])
    video_item = VideoItem()
    # Fields are listed (and therefore evaluated and added) in a fixed order.
    fields = (
        ("user_id", user["uid"]),
        ("user_name", user["nick_name"]),
        ("video_id", video_obj['hashDocID']),
        ("video_title", title),
        ("publish_time_stamp", published_at),
        ("video_url", video_obj["videoUrl"]),
        ("cover_url", video_obj["image"]),
        ("out_video_id", video_obj['hashDocID']),
        ("out_user_id", trace_id),
        ("platform", source_platform),
        ("strategy", "search"),
        # Session id is "<platform>-<current unix seconds>".
        ("session", source_platform + "-" + str(int(time.time()))),
    )
    for field_name, field_value in fields:
        video_item.add_video_info(field_name, field_value)
    message = video_item.produce_item()
    etl_queue.send_msg(params=message)
    logging(
        code="6002",
        info="发送消息至 ETL",
        data=message
    )
def _first_acceptable_hit(keyword, fail_info, trace_id):
    """
    Run one WeChat search and return the first hit accepted by ``sensitive_flag``.

    :param keyword: search keyword passed to ``wx_search``
    :param fail_info: message logged with code 7001 when the service answers
        with the '未知错误' (unknown error) marker
    :param trace_id: trace id attached to the failure log
    :return: the first hit whose cleaned title makes ``sensitive_flag`` return
        a truthy value, or None if the search failed or no hit qualified
    """
    result = wx_search(keys=keyword)
    if result['msg'] == '未知错误':
        logging(
            code="7001",
            info=fail_info,
            trace_id=trace_id
        )
        return None
    for hit in result['data']['data']:
        try:
            # NOTE(review): the two empty-string replace() calls are no-ops as
            # written; presumably they once stripped markup from the title -
            # confirm against the upstream source before removing them.
            hit_title = hit['items'][0]['title'].replace('', '').replace('', '').replace("#", "")
            if sensitive_flag(hit_title):
                return hit
        except Exception as e:
            # Best effort: a malformed hit must not abort the whole search.
            print(e)
    return None


def return_video(video_path, title, trace_id):
    """
    Find a WeChat video for an article, trying several keywords in turn.

    The JSON file at ``video_path`` is loaded first. If it is empty, a 7000
    log entry is written and None is returned. Otherwise up to three searches
    run in order - by ``title``, by the first entry of ``search_keys``, then by
    ``theme`` - and the first acceptable hit is returned.

    :param trace_id: trace id attached to every log entry
    :param title: article title; used as the first keyword and in log messages
    :param video_path: path to a UTF-8 JSON file providing ``search_keys``
        and ``theme``
    :return: the matching search hit, or None if nothing was found
    :raises KeyError, IndexError: if the file lacks ``search_keys`` / ``theme``
        by the time those fallbacks are reached
    """
    with open(video_path, encoding='utf-8') as f:
        my_obj = json.load(f)
    if not my_obj:
        logging(
            code="7000",
            info="标题--{}--kimi 挖掘数据失败".format(title),
            trace_id=trace_id
        )
        return None
    # Keywords are wrapped in callables so a fallback key is only read from
    # my_obj when the earlier searches have produced nothing.
    attempts = (
        ("通过标题搜索失败---{}", lambda: title),
        ("通过搜索词搜索失败---{}", lambda: my_obj['search_keys'][0]),
        ("通过主题搜索失败---{}", lambda: my_obj['theme']),
    )
    for fail_template, get_keyword in attempts:
        # `title` is never rebound here, so every failure log reports the
        # title that was asked for rather than the title of some search hit.
        match = _first_acceptable_hit(
            get_keyword(),
            fail_template.format(title),
            trace_id
        )
        if match is not None:
            return match
    return None
def search_videos(video_path, title, trace_id, gh_id):
    """
    Search WeChat for a video and, when one is found, forward it to ETL.

    :param gh_id: key looked up in ``gh_id_dict`` to pick the account
    :param video_path: path handed to ``return_video``
    :param title: title handed to ``return_video``
    :param trace_id: trace id attached to log entries and to the ETL item
    :return: None
    """
    hit = return_video(video_path, title, trace_id)
    if not hit:
        logging(
            code="7003",
            info="视频搜索失败",
            trace_id=trace_id
        )
        return
    logging(
        code="7002",
        info="视频搜索成功",
        trace_id=trace_id,
        data=hit
    )
    first_item = hit['items'][0]
    # NOTE(review): the empty-string replace() calls are no-ops as written;
    # only the "#" removal changes the title - confirm the original intent.
    cleaned_title = first_item['title'].replace('', '').replace('', '').replace("#", "")
    # NOTE(review): .get() yields None for an unknown gh_id, which the
    # downstream call would then fail to subscript - verify callers.
    account = gh_id_dict.get(gh_id)
    process_weixin_video_obj(
        video_obj=first_item,
        user=account,
        trace_id=trace_id,
        title=cleaned_title
    )