public.py

# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/3/27
from mq_http_sdk.mq_client import *
from mq_http_sdk.mq_exception import MQExceptionBase
import os, sys, jieba
import time
import random
import difflib
sys.path.append(os.getcwd())
from common.common import Common
from common.feishu import Feishu
from common.scheduling_db import MysqlHelper
# from common import Common
# from feishu import Feishu
# from scheduling_db import MysqlHelper


def get_user_from_mysql(log_type, crawler, source, env, action=''):
    sql = f"select * from crawler_user_v3 where source='{source}' and mode='{log_type}'"
    results = MysqlHelper.get_values(log_type, crawler, sql, env, action=action)
    if results:
        return results
    else:
        Common.logger(log_type, crawler).warning(f"crawler: {crawler}, no user list to crawl was found")
        return []


def title_like(log_type, crawler, platform, title, env):
    """
    Title similarity check
    :param log_type: log type
    :param crawler: which crawler
    :param platform: crawler channel, e.g. 公众号 / 小年糕
    :param title: video title
    :param env: environment
    :return: True if similarity >= 80%, otherwise False
    """
    select_sql = f""" select video_title from crawler_video where platform="{platform}" """
    video_list = MysqlHelper.get_values(log_type, crawler, select_sql, env, action="")
    # print(video_list)
    if len(video_list) == 0:
        return False
    for video_dict in video_list:
        video_title = video_dict["video_title"]
        # print(video_title)
        if difflib.SequenceMatcher(None, title, video_title).quick_ratio() >= 0.8:
            return True
        else:
            continue
    return False
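
# A minimal usage sketch for title_like(); the platform and title values below are
# hypothetical examples, not values taken from this repo:
#
#   if title_like("recommend", "xiaoniangao", "小年糕", "some video title", "dev"):
#       pass  # skip the video: a title with >= 80% similarity already exists in crawler_video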


def get_config_from_mysql(log_type, source, env, text, action=''):
    select_sql = f"""select * from crawler_config where source="{source}" """
    contents = MysqlHelper.get_values(log_type, source, select_sql, env, action=action)
    title_list = []
    filter_list = []
    emoji_list = []
    search_word_list = []
    for content in contents:
        config = content['config']
        config_dict = eval(config)
        for k, v in config_dict.items():
            if k == "title":
                title_list_config = v.split(",")
                for title in title_list_config:
                    title_list.append(title)
            if k == "filter":
                filter_list_config = v.split(",")
                for filter_word in filter_list_config:
                    filter_list.append(filter_word)
            if k == "emoji":
                emoji_list_config = v.split(",")
                for emoji in emoji_list_config:
                    emoji_list.append(emoji)
            if k == "search_word":
                search_word_list_config = v.split(",")
                for search_word in search_word_list_config:
                    search_word_list.append(search_word)
    if text == "title":
        return title_list
    elif text == "filter":
        return filter_list
    elif text == "emoji":
        return emoji_list
    elif text == "search_word":
        return search_word_list
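
# get_config_from_mysql() eval()s the crawler_config.config column into a dict of
# comma-separated strings. A hypothetical stored value (an assumption based on how
# the keys are split above, not an actual record) and the corresponding call:
#
#   config column: '{"title": "title one,title two", "filter": "ad,repost", "emoji": "🔥,👍"}'
#   get_config_from_mysql("recommend", "xiaoniangao", "dev", "filter")  # -> ["ad", "repost"]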


def random_title(log_type, crawler, env, text):
    random_title_list = get_config_from_mysql(log_type, crawler, env, text)
    return random.choice(random_title_list)


def task_fun(task_str):
    task_str = task_str.replace("'[", '[').replace("]'", ']')
    task_dict = dict(eval(task_str))
    rule = task_dict['rule']
    task_dict['rule'] = dict()
    for item in rule:
        for k, val in item.items():
            task_dict['rule'][k] = val
    rule_dict = task_dict['rule']
    task_dict = {
        "task_dict": task_dict,
        "rule_dict": rule_dict
    }
    return task_dict


def task_fun_mq(task_str):
    task_str = task_str.replace('"[', '[').replace(']"', ']').replace('\\', '')
    task_dict = dict(eval(task_str))
    rule = task_dict['rule']
    task_dict['rule'] = dict()
    for item in rule:
        for k, val in item.items():
            task_dict['rule'][k] = val
    rule_dict = task_dict['rule']
    task_dict = {
        "task_dict": task_dict,
        "rule_dict": rule_dict
    }
    return task_dict
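
# task_fun_mq() expects an MQ message body that is a dict literal whose "rule"
# field, after the quote/backslash cleanup above, is a list of single-key dicts.
# A hypothetical payload and its result:
#
#   task_str = '{"id": 1, "taskName": "demo", "rule": [{"duration": {"min": 40, "max": 0}}]}'
#   task_fun_mq(task_str)
#   # -> {"task_dict": {"id": 1, "taskName": "demo", "rule": {"duration": {"min": 40, "max": 0}}},
#   #     "rule_dict": {"duration": {"min": 40, "max": 0}}}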


def get_consumer(topic_name, group_id):
    # Initialize the client.
    mq_client = MQClient(
        # HTTP endpoint; see the instance detail page in the ApsaraMQ for RocketMQ console.
        "http://1894469520484605.mqrest.cn-qingdao-public.aliyuncs.com",
        # AccessKey ID, the Alibaba Cloud credential ID. See "Create an AccessKey pair".
        "LTAI4G7puhXtLyHzHQpD6H7A",
        # AccessKey Secret, the Alibaba Cloud credential secret. See "Create an AccessKey pair".
        "nEbq3xWNQd1qLpdy2u71qFweHkZjSG"
    )
    # Topic the messages belong to, created in the ApsaraMQ for RocketMQ console.
    # topic_name = "${TOPIC}"
    topic_name = str(topic_name)
    # Group ID created in the ApsaraMQ for RocketMQ console.
    # group_id = "${GROUP_ID}"
    group_id = str(group_id)
    # Instance ID the Topic belongs to, created in the ApsaraMQ for RocketMQ console.
    # If the instance has a namespace, the instance ID is required; otherwise pass an empty string.
    # The namespace can be found on the instance detail page of the console.
    instance_id = "MQ_INST_1894469520484605_BXhXuzkZ"
    consumer = mq_client.get_consumer(instance_id, topic_name, group_id)
    return consumer


def ack_message(log_type, crawler, recv_msgs, consumer):
    # If consumption is not acknowledged before msg.next_consume_time, the message will be consumed again.
    # A message handle carries a timestamp, so the same message gets a different handle on every consumption.
    try:
        receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
        consumer.ack_message(receipt_handle_list)
        Common.logger(log_type, crawler).info(f"Ack {len(receipt_handle_list)} Message Succeed.\n")
    except MQExceptionBase as err:
        Common.logger(log_type, crawler).info(f"Ack Message Fail! Exception:{err}\n")
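
# A consume-and-ack sketch following the Alibaba Cloud mq_http_sdk sample code;
# the topic, group id, batch size and wait time below are illustrative only:
#
#   consumer = get_consumer("some_topic", "GID_some_group")
#   try:
#       recv_msgs = consumer.consume_message(1, 3)  # long-poll up to 3s for 1 message
#       for msg in recv_msgs:
#           task = task_fun_mq(msg.message_body)
#       ack_message("recommend", "xiaoniangao", recv_msgs, consumer)
#   except MQExceptionBase as err:
#       if err.type == "MessageNotExist":
#           pass  # nothing to consume within the polling window; poll again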


def download_rule(log_type, crawler, video_dict, rule_dict):
    """
    Basic rules for deciding whether a video should be downloaded
    :param log_type: log type
    :param crawler: which crawler
    :param video_dict: video info, dict
    :param rule_dict: rule info, dict
    :return: True if the rules are satisfied, otherwise False
    """
    # Normalize video_dict: publish_time_stamp -> publish_time (ms)
    if "publish_time_stamp" in video_dict.keys():
        video_dict["publish_time"] = video_dict["publish_time_stamp"] * 1000
    # Normalize video_dict: period (days since publication)
    if "period" not in video_dict.keys() and "publish_time" in video_dict.keys():
        video_dict["period"] = int((int(time.time() * 1000) - video_dict["publish_time"]) / (3600 * 24 * 1000))
    # Normalize rule_dict: a max of 0 means "no upper bound"
    for rule_value in rule_dict.values():
        if rule_value["max"] == 0:
            rule_value["max"] = 999999999999999
    # Fill in keys that exist in rule_dict but are missing from video_dict
    for rule_key in rule_dict.keys():
        if rule_key not in video_dict.keys():
            video_dict[rule_key] = int(rule_dict[rule_key]["max"] / 2)
    # Compare and return True / False
    for video_key, video_value in video_dict.items():
        for rule_key, rule_value in rule_dict.items():
            if video_key == rule_key == "period":
                result = 0 <= int(video_value) <= int(rule_value["max"])
                Common.logger(log_type, crawler).info(f'{video_key}: 0 <= {video_value} <= {rule_value["max"]}, {result}')
            elif video_key == rule_key:
                result = int(rule_value["min"]) <= int(video_value) <= int(rule_value["max"])
                Common.logger(log_type, crawler).info(f'{video_key}: {rule_value["min"]} <= {video_value} <= {rule_value["max"]}, {result}')
            else:
                result = True
            if result is False:
                return False
            else:
                continue
    return True
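
# A minimal sketch of how download_rule() evaluates a video; the field names and
# thresholds below are hypothetical:
#
#   rule_dict = {"play_cnt": {"min": 5000, "max": 0}, "duration": {"min": 40, "max": 600}}
#   video_dict = {"play_cnt": 12000, "duration": 120, "publish_time_stamp": 1680000000}
#   download_rule("recommend", "xiaoniangao", video_dict, rule_dict)  # -> True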


def get_word_score(log_type, crawler, score_sheet, word):
    while True:
        # Do not overwrite the sheet id passed in, otherwise a failed fetch would retry with None.
        score_sheet_values = Feishu.get_values_batch(log_type, crawler, score_sheet)
        if score_sheet_values is None:
            time.sleep(1)
            continue
        # The word does not appear anywhere in the sheet
        if word not in [y for x in score_sheet_values for y in x]:
            return 0
        for i in range(1, len(score_sheet_values)):
            if word == score_sheet_values[i][0]:
                word_score = score_sheet_values[i][8]
                return word_score
        # The word appears in the sheet but not in the first column
        return 0
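
# get_word_score() assumes the Feishu score sheet stores the word in column 0 and
# its weight in column index 8; the sheet id and word below are illustrative:
#
#   get_word_score("recommend", "kuaishou", "some_score_sheet_id", "健康")  # -> weight from the sheet, or 0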


def get_title_score(log_type, crawler, stop_sheet, score_sheet, title):
    # Get the stop-word list
    # while True:
    stop_word_list = []
    stop_word_sheet = Feishu.get_values_batch(log_type, crawler, stop_sheet)
    if stop_word_sheet is None:
        return 0
        # time.sleep(1)
        # continue
    for x in stop_word_sheet:
        for y in x:
            if y is None:
                pass
            else:
                stop_word_list.append(y)
    # break
    # Tokenize the title
    cut_word_list = []
    cut_list = jieba.lcut(title)
    for cut_item in cut_list:
        if cut_item == " ":
            continue
        if cut_item in stop_word_list:
            continue
        cut_word_list.append(cut_item)
    # Build the per-word score list
    score_list = []
    for word in cut_word_list:
        word_score = get_word_score(log_type, crawler, score_sheet, word)
        score_list.append(word_score)
    # Total weighted score of the title
    title_score = sum(score_list)
    return title_score


if __name__ == "__main__":
    print(get_title_score("recommend", "kuaishou", "16QspO", "0usaDk", '像梦一场'))
    pass