public.py

# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/3/27
import requests
from mq_http_sdk.mq_client import *
from mq_http_sdk.mq_exception import MQExceptionBase
import os, sys, jieba
import time
import random
import difflib
sys.path.append(os.getcwd())
from common.common import Common
from common.feishu import Feishu
from common.scheduling_db import MysqlHelper
# from common import Common
# from feishu import Feishu
# from scheduling_db import MysqlHelper

def get_user_from_mysql(log_type, crawler, source, env, action=''):
    sql = f"select * from crawler_user_v3 where source='{source}' and mode='{log_type}'"
    results = MysqlHelper.get_values(log_type, crawler, sql, env, action=action)
    if results:
        return results
    else:
        Common.logger(log_type, crawler).warning(f"Crawler: {crawler}, no user list found for crawling")
        return []

def title_like(log_type, crawler, platform, title, env):
    """
    Title similarity check.
    :param log_type: log type
    :param crawler: which crawler
    :param platform: crawler channel, e.g. 公众号 / 小年糕
    :param title: video title
    :param env: environment
    :return: True if similarity >= 80%, otherwise False
    """
    select_sql = f""" select video_title from crawler_video where platform="{platform}" """
    video_list = MysqlHelper.get_values(log_type, crawler, select_sql, env, action="")
    # print(video_list)
    if len(video_list) == 0:
        return False
    for video_dict in video_list:
        video_title = video_dict["video_title"]
        # print(video_title)
        if difflib.SequenceMatcher(None, title, video_title).quick_ratio() >= 0.8:
            return True
        else:
            continue
    return False
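
# A minimal usage sketch (hypothetical platform and titles; the stored titles come
# from the crawler_video table):
#   title_like("recommend", "xiaoniangao", "小年糕", "夕阳下的美景视频", "dev")
# difflib.SequenceMatcher(None, a, b).quick_ratio() returns an upper bound on the
# similarity ratio in [0, 1]; any stored title scoring >= 0.8 makes this return True.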

def get_config_from_mysql(log_type, source, env, text, action=''):
    select_sql = f"""select * from crawler_config where source="{source}" """
    contents = MysqlHelper.get_values(log_type, source, select_sql, env, action=action)
    title_list = []
    filter_list = []
    emoji_list = []
    search_word_list = []
    for content in contents:
        config = content['config']
        config_dict = eval(config)
        for k, v in config_dict.items():
            if k == "title":
                title_list_config = v.split(",")
                for title in title_list_config:
                    title_list.append(title)
            if k == "filter":
                filter_list_config = v.split(",")
                for filter_word in filter_list_config:
                    filter_list.append(filter_word)
            if k == "emoji":
                emoji_list_config = v.split(",")
                for emoji in emoji_list_config:
                    emoji_list.append(emoji)
            if k == "search_word":
                search_word_list_config = v.split(",")
                for search_word in search_word_list_config:
                    search_word_list.append(search_word)
    if text == "title":
        return title_list
    elif text == "filter":
        return filter_list
    elif text == "emoji":
        return emoji_list
    elif text == "search_word":
        return search_word_list
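
# A minimal sketch of the expected row shape (hypothetical config string; the real
# values live in the crawler_config table). Each row's `config` column is evaluated
# into a dict whose values are comma-separated strings, e.g.:
#   config = "{'title': '晚霞,风景', 'filter': '广告,测试', 'emoji': '😄,🌸'}"
#   get_config_from_mysql("recommend", "xiaoniangao", "dev", "filter")  # -> ['广告', '测试']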

def random_title(log_type, crawler, env, text):
    random_title_list = get_config_from_mysql(log_type, crawler, env, text)
    return random.choice(random_title_list)

def task_fun(task_str):
    task_str = task_str.replace("'[", '[').replace("]'", ']')
    task_dict = dict(eval(task_str))
    rule = task_dict['rule']
    task_dict['rule'] = dict()
    for item in rule:
        for k, val in item.items():
            task_dict['rule'][k] = val
    rule_dict = task_dict['rule']
    task_dict = {
        "task_dict": task_dict,
        "rule_dict": rule_dict
    }
    return task_dict

def task_fun_mq(task_str):
    task_str = task_str.replace('"[', '[').replace(']"', ']').replace('\\', '')
    task_dict = dict(eval(task_str))
    rule = task_dict['rule']
    task_dict['rule'] = dict()
    for item in rule:
        for k, val in item.items():
            task_dict['rule'][k] = val
    rule_dict = task_dict['rule']
    task_dict = {
        "task_dict": task_dict,
        "rule_dict": rule_dict
    }
    return task_dict
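
# A minimal sketch of the transformation task_fun and task_fun_mq perform
# (hypothetical task string; the real one comes from the scheduler / MQ message body).
# The 'rule' field arrives as a stringified list of single-key dicts and is flattened:
#   task_fun("{'id': 8, 'rule': '[{\"duration\": {\"min\": 40, \"max\": 0}}]'}")
#   # -> {'task_dict': {'id': 8, 'rule': {'duration': {'min': 40, 'max': 0}}},
#   #     'rule_dict': {'duration': {'min': 40, 'max': 0}}}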

def get_consumer(topic_name, group_id):
    # Initialize the client.
    mq_client = MQClient(
        # HTTP endpoint; see the instance detail page in the ApsaraMQ for RocketMQ console.
        "http://1894469520484605.mqrest.cn-qingdao-public.aliyuncs.com",
        # AccessKey ID, the Alibaba Cloud credential identifier. See "Create an AccessKey" for how to obtain one.
        "LTAI4G7puhXtLyHzHQpD6H7A",
        # AccessKey Secret, the Alibaba Cloud credential secret. See "Create an AccessKey" for how to obtain one.
        "nEbq3xWNQd1qLpdy2u71qFweHkZjSG"
    )
    # Topic the messages belong to, created in the ApsaraMQ for RocketMQ console.
    # topic_name = "${TOPIC}"
    topic_name = str(topic_name)
    # Group ID created in the ApsaraMQ for RocketMQ console.
    # group_id = "${GROUP_ID}"
    group_id = str(group_id)
    # ID of the instance that owns the topic, created in the ApsaraMQ for RocketMQ console.
    # If the instance has a namespace, the instance ID is required; if it has none, pass an empty string.
    # The namespace can be found on the instance detail page in the console.
    instance_id = "MQ_INST_1894469520484605_BXhXuzkZ"
    consumer = mq_client.get_consumer(instance_id, topic_name, group_id)
    return consumer

def ack_message(log_type, crawler, recv_msgs, consumer):
    # If a message is not acknowledged before msg.next_consume_time, it will be consumed again.
    # Receipt handles carry a timestamp, so the handle for the same message differs on every consumption.
    try:
        receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
        consumer.ack_message(receipt_handle_list)
        Common.logger(log_type, crawler).info(f"Ack {len(receipt_handle_list)} Message Succeed.\n")
    except MQExceptionBase as err:
        Common.logger(log_type, crawler).info(f"Ack Message Fail! Exception:{err}\n")
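
# A minimal consume/ack sketch (hypothetical topic and group id; assumes the
# mq_http_sdk consumer exposes consume_message(batch_size, wait_seconds) as in the
# Aliyun RocketMQ HTTP SDK and raises MQExceptionBase when nothing is available):
#   consumer = get_consumer("topic_crawler_etl", "GID_CRAWLER")
#   try:
#       recv_msgs = consumer.consume_message(1, 3)
#       ack_message("recommend", "xiaoniangao", recv_msgs, consumer)
#   except MQExceptionBase:
#       pass  # no message available within the wait window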

def download_rule(log_type, crawler, video_dict, rule_dict):
    """
    Basic rules for deciding whether to download a video.
    :param log_type: log type
    :param crawler: which crawler
    :param video_dict: video info, dict
    :param rule_dict: rule info, dict
    :return: True if the rules are satisfied, otherwise False
    """
    # Normalize video_dict: publish_time_stamp
    if "publish_time_stamp" in video_dict.keys():
        video_dict["publish_time"] = video_dict["publish_time_stamp"] * 1000
    # Normalize video_dict: period
    if "period" not in video_dict.keys() and "publish_time" in video_dict.keys():
        video_dict["period"] = int((int(time.time() * 1000) - video_dict["publish_time"]) / (3600 * 24 * 1000))
    # Normalize rule_dict: a max of 0 means "no upper limit"
    for rule_value in rule_dict.values():
        if rule_value["max"] == 0:
            rule_value["max"] = 999999999999999
    # Normalize keys that exist in rule_dict but are missing from video_dict
    for rule_key in rule_dict.keys():
        if rule_key not in video_dict.keys():
            video_dict[rule_key] = int(rule_dict[rule_key]["max"] / 2)
    # Compare and return True / False
    for video_key, video_value in video_dict.items():
        for rule_key, rule_value in rule_dict.items():
            if video_key == rule_key == "period":
                result = 0 <= int(video_value) <= int(rule_value["max"])
                Common.logger(log_type, crawler).info(f'{video_key}: 0 <= {video_value} <= {rule_value["max"]}, {result}')
            elif video_key == rule_key:
                result = int(rule_value["min"]) <= int(video_value) <= int(rule_value["max"])
                Common.logger(log_type, crawler).info(f'{video_key}: {rule_value["min"]} <= {video_value} <= {rule_value["max"]}, {result}')
            else:
                result = True
            if result is False:
                return False
            else:
                continue
    return True
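
# A minimal sketch with hypothetical values: duration must be at least 40 with no
# upper limit (max=0), play_cnt at least 1000, and the video is 50s long with 5000
# plays, so this returns True:
#   download_rule("recommend", "xiaoniangao",
#                 video_dict={"duration": 50, "play_cnt": 5000},
#                 rule_dict={"duration": {"min": 40, "max": 0}, "play_cnt": {"min": 1000, "max": 0}})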

def get_word_score(log_type, crawler, score_sheet, word):
    while True:
        score_values = Feishu.get_values_batch(log_type, crawler, score_sheet)
        if score_values is None:
            time.sleep(1)
            continue
        # If the word appears nowhere in the sheet, its score is 0.
        if word not in [y for x in score_values for y in x]:
            return 0
        for i in range(1, len(score_values)):
            if word == score_values[i][0]:
                word_score = score_values[i][8]
                return word_score
        # Word exists in the sheet but not in the first column of any data row.
        return 0

def get_title_score(log_type, crawler, stop_sheet, score_sheet, title):
    # Fetch the stop-word list
    # while True:
    stop_word_list = []
    stop_word_sheet = Feishu.get_values_batch(log_type, crawler, stop_sheet)
    if stop_word_sheet is None:
        return 0
        # time.sleep(1)
        # continue
    for x in stop_word_sheet:
        for y in x:
            if y is None:
                pass
            else:
                stop_word_list.append(y)
    # break
    # Segment the title
    cut_word_list = []
    cut_list = jieba.lcut(title)
    for cut_item in cut_list:
        if cut_item == " ":
            continue
        if cut_item in stop_word_list:
            continue
        cut_word_list.append(cut_item)
    # Build the list of per-word weights
    score_list = []
    for word in cut_word_list:
        word_score = get_word_score(log_type, crawler, score_sheet, word)
        score_list.append(word_score)
    # Total weight score for the title
    title_score = sum(score_list)
    return title_score
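
# A minimal sketch of the scoring flow (hypothetical Feishu sheet ids and scores):
#   jieba.lcut("像梦一场") might yield ["像", "梦", "一场"]; stop words are dropped,
#   each remaining word is looked up in the score sheet via get_word_score, and the
#   title score is the sum of those per-word weights.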

def task_unbind(log_type, crawler, taskid, uids, env):
    if env == "dev":
        url = "https://testadmin.piaoquantv.com/manager/crawler/v3/task/unbind"
    else:
        url = "https://admin.piaoquantv.com/manager/crawler/v3/task/unbind"
    params = {
        "taskId": taskid,  # task ID
        "uids": uids,  # user uids to unbind (comma-separated), e.g. "3222121,213231"
        "operator": ""  # defaults to system
    }
    Common.logger(log_type, crawler).info(f"url:{url}")
    Common.logging(log_type, crawler, env, f"url:{url}")
    Common.logger(log_type, crawler).info(f"params:{params}")
    Common.logging(log_type, crawler, env, f"params:{params}")
    response = requests.post(url=url, json=params)
    Common.logger(log_type, crawler).info(f"task_unbind_response:{response.text}")
    Common.logging(log_type, crawler, env, f"task_unbind_response:{response.text}")
    if response.status_code == 200 and response.json()["code"] == 0:
        return "success"
    else:
        return response.text

if __name__ == "__main__":
    print(get_title_score("recommend", "kuaishou", "16QspO", "0usaDk", '像梦一场'))
    pass