public.py

# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/3/27
import os
import sys
import time
import random
import difflib
import ast
sys.path.append(os.getcwd())
from common.common import Common
from common.scheduling_db import MysqlHelper
# from common import Common
# from scheduling_db import MysqlHelper
# Assumed import paths for the Aliyun RocketMQ HTTP SDK (mq_http_sdk), which
# get_consumer() and ack_message() below depend on:
from mq_http_sdk.mq_client import MQClient
from mq_http_sdk.mq_exception import MQExceptionBase


def get_user_from_mysql(log_type, crawler, source, env, action=''):
    sql = f"select * from crawler_user_v3 where source='{source}' and mode='{log_type}'"
    results = MysqlHelper.get_values(log_type, crawler, sql, env, action=action)
    if results:
        return results
    else:
        Common.logger(log_type, crawler).warning(f"crawler: {crawler}, no crawl user list found")
        return []


def title_like(log_type, crawler, platform, title, env):
    """
    Title similarity check.
    :param log_type: log type
    :param crawler: which crawler
    :param platform: crawler channel, e.g. 公众号 / 小年糕
    :param title: video title
    :param env: environment
    :return: True if similarity >= 80%, otherwise False
    """
    select_sql = f""" select video_title from crawler_video where platform="{platform}" """
    video_list = MysqlHelper.get_values(log_type, crawler, select_sql, env, action="")
    if len(video_list) == 0:
        return False
    for video_dict in video_list:
        video_title = video_dict["video_title"]
        if difflib.SequenceMatcher(None, title, video_title).quick_ratio() >= 0.8:
            return True
    return False
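
# Illustrative note on the similarity metric above: quick_ratio() is an
# upper-bound estimate based on character multisets, so it can over-report
# similarity relative to ratio(). For example,
#   difflib.SequenceMatcher(None, "abcd", "abce").quick_ratio() == 0.75
# (3 shared characters out of 4 + 4 total, 2*3/8), while two titles containing
# the same characters in a different order would score 1.0.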


def get_config_from_mysql(log_type, source, env, text, action=''):
    select_sql = f"""select * from crawler_config where source="{source}" """
    contents = MysqlHelper.get_values(log_type, source, select_sql, env, action=action)
    title_list = []
    filter_list = []
    emoji_list = []
    search_word_list = []
    for content in contents:
        config = content['config']
        # ast.literal_eval instead of eval: the stored config is a dict literal,
        # and literal_eval parses it without executing arbitrary code.
        config_dict = ast.literal_eval(config)
        for k, v in config_dict.items():
            if k == "title":
                title_list.extend(v.split(","))
            if k == "filter":
                filter_list.extend(v.split(","))
            if k == "emoji":
                emoji_list.extend(v.split(","))
            if k == "search_word":
                search_word_list.extend(v.split(","))
    if text == "title":
        return title_list
    elif text == "filter":
        return filter_list
    elif text == "emoji":
        return emoji_list
    elif text == "search_word":
        return search_word_list
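
# A minimal sketch of the expected crawler_config row format (inferred from the
# parsing above, not confirmed against the real table): the `config` column
# holds a dict literal whose values are comma-separated strings, e.g.
#   config = "{'title': 'word1,word2', 'filter': 'ad,test'}"
#   get_config_from_mysql('test', 'gongzhonghao', 'prod', 'filter')  # -> ['ad', 'test']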


def random_title(log_type, crawler, env, text):
    random_title_list = get_config_from_mysql(log_type, crawler, env, text)
    return random.choice(random_title_list)


def task_fun(task_str):
    # Strip the quotes wrapping the serialized rule list so the whole string
    # becomes one parseable literal.
    task_str = task_str.replace("'[", '[').replace("]'", ']')
    # ast.literal_eval instead of eval: task_str is a list of (key, value)
    # tuples serialized as a string, so literal_eval parses it safely.
    task_dict = dict(ast.literal_eval(task_str))
    # Flatten the rule list of single-key dicts into one dict.
    rule = task_dict['rule']
    task_dict['rule'] = dict()
    for item in rule:
        for k, val in item.items():
            task_dict['rule'][k] = val
    rule_dict = task_dict['rule']
    task_dict = {
        "task_dict": task_dict,
        "rule_dict": rule_dict
    }
    return task_dict
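
# Worked example (based on the commented-out sample in __main__ below): for a
# task_str like "[('task_id','11'),('rule','[{'duration':{'min':40,'max':0}}]')]"
# the quote stripping turns the rule value into a real list literal, and the
# result is {"task_dict": {'task_id': '11', 'rule': {'duration': {'min': 40, 'max': 0}}},
#            "rule_dict": {'duration': {'min': 40, 'max': 0}}}.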


def get_consumer(topic_name, group_id):
    # Initialize the client.
    mq_client = MQClient(
        # HTTP endpoint, shown on the instance details page of the
        # ApsaraMQ for RocketMQ console.
        "${HTTP_ENDPOINT}",
        # AccessKey ID, the Alibaba Cloud credential. See "Create an AccessKey pair".
        "${ACCESS_KEY}",
        # AccessKey Secret, the Alibaba Cloud credential secret.
        "${SECRET_KEY}"
    )
    # topic_name: the topic the messages belong to, created in the
    # ApsaraMQ for RocketMQ console.
    # group_id: the group ID created in the ApsaraMQ for RocketMQ console.
    # Instance ID of the topic, created in the ApsaraMQ for RocketMQ console.
    # If the instance has a namespace, the instance ID is required; if it has
    # none, pass an empty string. The namespace is shown on the instance
    # details page of the console.
    instance_id = "${INSTANCE_ID}"
    consumer = mq_client.get_consumer(instance_id, topic_name, group_id)
    return consumer


def ack_message(log_type, crawler, recv_msgs, consumer):
    # If a message is not acknowledged before msg.next_consume_time, it will be
    # consumed again. The receipt handle carries a timestamp, so the same
    # message gets a different handle on every delivery.
    try:
        receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
        consumer.ack_message(receipt_handle_list)
        Common.logger(log_type, crawler).info(f"Ack {len(receipt_handle_list)} Message Succeed.\n")
    except MQExceptionBase as err:
        Common.logger(log_type, crawler).info(f"Ack Message Fail! Exception:{err}\n")
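
# A minimal consume/ack sketch (hypothetical topic/group values; the ${...}
# placeholders in get_consumer() must be filled in before this can connect).
# In the Aliyun mq_http_sdk, consume_message(3, 3) long-polls for up to 3
# messages for up to 3 seconds:
#   consumer = get_consumer("my_topic", "GID_my_group")
#   recv_msgs = consumer.consume_message(3, 3)
#   ack_message("recommend", "my_crawler", recv_msgs, consumer)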


def download_rule(log_type, crawler, video_dict, rule_dict):
    """
    Basic rules for deciding whether to download a video.
    :param log_type: log type
    :param crawler: which crawler
    :param video_dict: video info, dict
    :param rule_dict: rule info, dict
    :return: True if the rules are satisfied, otherwise False
    """
    # Normalize video_dict: publish_time_stamp (seconds) -> publish_time (milliseconds)
    if "publish_time_stamp" in video_dict.keys():
        video_dict["publish_time"] = video_dict["publish_time_stamp"] * 1000
    # Normalize video_dict: derive period (days since publish) from publish_time
    if "period" not in video_dict.keys() and "publish_time" in video_dict.keys():
        video_dict["period"] = int((int(time.time() * 1000) - video_dict["publish_time"]) / (3600 * 24 * 1000))
    # Normalize rule_dict: a max of 0 means "no upper bound"
    for rule_value in rule_dict.values():
        if rule_value["max"] == 0:
            rule_value["max"] = 999999999999999
    # Normalize keys present in rule_dict but missing from video_dict
    for rule_key in rule_dict.keys():
        if rule_key not in video_dict.keys():
            video_dict[rule_key] = int(rule_dict[rule_key]["max"] / 2)
    # Compare and return True / False
    for video_key, video_value in video_dict.items():
        for rule_key, rule_value in rule_dict.items():
            if video_key == rule_key == "period":
                result = 0 <= int(video_value) <= int(rule_value["max"])
                Common.logger(log_type, crawler).info(f'{video_key}: 0 <= {video_value} <= {rule_value["max"]}, {result}')
            elif video_key == rule_key:
                result = int(rule_value["min"]) <= int(video_value) <= int(rule_value["max"])
                Common.logger(log_type, crawler).info(f'{video_key}: {rule_value["min"]} <= {video_value} <= {rule_value["max"]}, {result}')
            else:
                result = True
            if result is False:
                return False
    return True
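
# A worked example (hypothetical values): with
#   rule_dict = {"duration": {"min": 40, "max": 0}, "play_cnt": {"min": 4000, "max": 0}}
#   video_dict = {"duration": 60, "play_cnt": 5000, "publish_time_stamp": int(time.time()) - 86400}
# each max of 0 is rewritten to the large sentinel, period computes to 1 day
# (unchecked here, since "period" is not in rule_dict), 40 <= 60 and
# 4000 <= 5000 both hold, and download_rule(...) returns True.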


if __name__ == "__main__":
    # print(filter_word('public', 'xiaoniangao', '小年糕', 'prod'))
    # print(get_config_from_mysql('test', 'gongzhonghao', 'prod', 'filter'))
    # print(filter_word('test', 'gongzhonghao', '公众号', 'prod'))
    # task_str = "[('task_id','11')," \
    #            "('task_name','小年糕小时榜')," \
    #            "('source','xiaoniangao')," \
    #            "('start_time','1681834560000')," \
    #            "('interval','1'),('mode','hour')," \
    #            "('rule','[{'duration':{'min':40,'max':0}},{'playCnt':{'min':4000,'max':0}},{'period':{'min':10,'max':0}},{'fans':{'min':0,'max':0}},{'videos':{'min':0,'max':0}},{'like':{'min':0,'max':0}},{'videoWidth':{'min':0,'max':0}},{'videoHeight':{'min':0,'max':0}}]')," \
    #            "('spider_name','')," \
    #            "('machine','')," \
    #            "('status','0')," \
    #            "('create_time','1681889875288')," \
    #            "('update_time','1681889904908')," \
    #            "('operator','王坤')]"
    # print(task_fun(task_str))
    pass