public.py

# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/3/27
import requests
import json  # used by get_rule_from_mysql
from mq_http_sdk.mq_client import *
from mq_http_sdk.mq_exception import MQExceptionBase
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import os, sys, jieba
import time
import random
import difflib
sys.path.append(os.getcwd())
from common.common import Common
from common.feishu import Feishu
from common.scheduling_db import MysqlHelper
# from common import Common
# from feishu import Feishu
# from scheduling_db import MysqlHelper


def get_user_from_mysql(log_type, crawler, source, env, action=''):
    sql = f"select * from crawler_user_v3 where source='{source}' and mode='{log_type}'"
    results = MysqlHelper.get_values(log_type, crawler, sql, env, action=action)
    if results:
        return results
    else:
        Common.logger(log_type, crawler).warning(f"crawler: {crawler}, no crawl user list found")
        return []


def similarity(title1, title2):
    # Tokenize both titles with jieba
    seg1 = jieba.lcut(title1)
    seg2 = jieba.lcut(title2)
    # Build TF-IDF vectors; tokens are space-joined so TfidfVectorizer sees each word as a separate term
    tfidf_vectorizer = TfidfVectorizer()
    tfidf_matrix = tfidf_vectorizer.fit_transform([" ".join(seg1), " ".join(seg2)])
    # Compute the cosine similarity between the two title vectors
    similar = cosine_similarity(tfidf_matrix[0], tfidf_matrix[1])[0][0]
    return similar
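
# Illustrative usage sketch (the titles below are made up, not from the original file):
#   similarity("宝宝辅食制作教程", "宝宝辅食怎么做")  # shared terms -> score well above 0
#   similarity("宝宝辅食制作教程", "高速公路实拍")    # no shared terms -> score near 0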


def title_like(log_type, crawler, platform, title, env):
    """
    Title similarity check
    :param log_type: log type
    :param crawler: which crawler
    :param platform: crawl channel, e.g. 公众号 / 小年糕
    :param title: video title
    :param env: environment
    :return: True if similarity >= 80%, otherwise False
    """
    select_sql = f""" select video_title from crawler_video where platform="{platform}" """
    video_list = MysqlHelper.get_values(log_type, crawler, select_sql, env, action="")
    # print(video_list)
    if len(video_list) == 0:
        return False
    for video_dict in video_list:
        video_title = video_dict["video_title"]
        # print(video_title)
        if difflib.SequenceMatcher(None, title, video_title).quick_ratio() >= 0.8:
            return True
        else:
            continue
    return False
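
# Hedged sketch of the comparison used above (example strings are hypothetical):
#   difflib.SequenceMatcher(None, "今天真开心", "今天真开心呀").quick_ratio()  # ~0.91 -> treated as a duplicate title
#   difflib.SequenceMatcher(None, "今天真开心", "风景如画").quick_ratio()      # 0.0  -> allowed through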


def get_config_from_mysql(log_type, source, env, text, action=''):
    select_sql = f"""select * from crawler_config where source="{source}" """
    contents = MysqlHelper.get_values(log_type, source, select_sql, env, action=action)
    title_list = []
    filter_list = []
    emoji_list = []
    search_word_list = []
    for content in contents:
        config = content['config']
        config_dict = eval(config)
        for k, v in config_dict.items():
            if k == "title":
                title_list_config = v.split(",")
                for title in title_list_config:
                    title_list.append(title)
            if k == "filter":
                filter_list_config = v.split(",")
                for filter_word in filter_list_config:
                    filter_list.append(filter_word)
            if k == "emoji":
                emoji_list_config = v.split(",")
                for emoji in emoji_list_config:
                    emoji_list.append(emoji)
            if k == "search_word":
                search_word_list_config = v.split(",")
                for search_word in search_word_list_config:
                    search_word_list.append(search_word)
    if text == "title":
        return title_list
    elif text == "filter":
        return filter_list
    elif text == "emoji":
        return emoji_list
    elif text == "search_word":
        return search_word_list
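
# Sketch of the config shape this parser assumes (hypothetical row, example values only):
# crawler_config.config evaluates to a dict of comma-separated strings, e.g.
#   {"title": "祝福,生日快乐", "filter": "广告,抽奖", "emoji": "🎉,🌹", "search_word": "养生,戏曲"}
# so get_config_from_mysql(log_type, source, env, text="filter") would return ["广告", "抽奖"].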


def get_rule_from_mysql(task_id, log_type, crawler, env):
    select_rule_sql = f"""select rule from crawler_task_v3 where id={task_id}"""
    rule_list = MysqlHelper.get_values(log_type, crawler, select_rule_sql, env, action="")
    return json.loads(rule_list[0]['rule'])
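
# Hedged sketch (made-up row): if crawler_task_v3.rule stores the JSON string
#   '[{"play_cnt": {"min": 100, "max": 0}}, {"duration": {"min": 30, "max": 600}}]'
# then get_rule_from_mysql(...) returns that list of single-key rule dicts.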


def random_title(log_type, crawler, env, text):
    random_title_list = get_config_from_mysql(log_type, crawler, env, text)
    return random.choice(random_title_list)


def task_fun(task_str):
    task_str = task_str.replace("'[", '[').replace("]'", ']')
    task_dict = dict(eval(task_str))
    rule = task_dict['rule']
    task_dict['rule'] = dict()
    for item in rule:
        for k, val in item.items():
            task_dict['rule'][k] = val
    rule_dict = task_dict['rule']
    task_dict = {
        "task_dict": task_dict,
        "rule_dict": rule_dict
    }
    return task_dict


def task_fun_mq(task_str):
    task_str = task_str.replace('"[', '[').replace(']"', ']').replace('\\', '')
    task_dict = dict(eval(task_str))
    rule = task_dict['rule']
    task_dict['rule'] = dict()
    for item in rule:
        for k, val in item.items():
            task_dict['rule'][k] = val
    rule_dict = task_dict['rule']
    task_dict = {
        "task_dict": task_dict,
        "rule_dict": rule_dict
    }
    return task_dict
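
# Minimal sketch (made-up task string) of what task_fun() / task_fun_mq() produce:
#   task_fun("{'id': 1, 'rule': [{'play_cnt': {'min': 100, 'max': 0}}, {'duration': {'min': 30, 'max': 600}}]}")
# returns
#   {"task_dict": {"id": 1, "rule": {"play_cnt": {...}, "duration": {...}}},
#    "rule_dict": {"play_cnt": {"min": 100, "max": 0}, "duration": {"min": 30, "max": 600}}}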


def get_consumer(topic_name, group_id):
    # Initialize the client.
    mq_client = MQClient(
        # HTTP endpoint; see the instance detail page in the ApsaraMQ for RocketMQ console.
        "http://1894469520484605.mqrest.cn-qingdao-public.aliyuncs.com",
        # AccessKey ID, the Alibaba Cloud authentication identifier.
        "LTAI4G7puhXtLyHzHQpD6H7A",
        # AccessKey Secret, the Alibaba Cloud authentication secret.
        "nEbq3xWNQd1qLpdy2u71qFweHkZjSG"
    )
    # Topic that the messages belong to, created in the RocketMQ console.
    # topic_name = "${TOPIC}"
    topic_name = str(topic_name)
    # Group ID created in the RocketMQ console.
    # group_id = "${GROUP_ID}"
    group_id = str(group_id)
    # Instance ID that the topic belongs to. If the instance has a namespace the instance ID is required;
    # if it has no namespace, pass an empty string. The namespace is shown on the instance detail page of the console.
    instance_id = "MQ_INST_1894469520484605_BXhXuzkZ"
    consumer = mq_client.get_consumer(instance_id, topic_name, group_id)
    return consumer
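
# Hedged usage sketch (topic, group ID, batch size and wait time are arbitrary examples):
# a typical consume/ack sequence with this consumer would look roughly like
#   consumer = get_consumer("topic_xxx", "GID_xxx")
#   recv_msgs = consumer.consume_message(3, 3)  # long-poll up to 3 messages for 3 seconds
#   ack_message(log_type, crawler, recv_msgs, consumer)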


def ack_message(log_type, crawler, recv_msgs, consumer):
    # If consumption is not acknowledged before msg.next_consume_time, the message will be delivered again.
    # The receipt handle carries a timestamp, so the same message gets a different handle on every consumption.
    try:
        receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
        consumer.ack_message(receipt_handle_list)
        Common.logger(log_type, crawler).info(f"Ack {len(receipt_handle_list)} Message Succeed.\n")
    except MQExceptionBase as err:
        Common.logger(log_type, crawler).info(f"Ack Message Fail! Exception:{err}\n")


def download_rule(log_type, crawler, video_dict, rule_dict):
    """
    Basic rules that decide whether a video should be downloaded
    :param log_type: log type
    :param crawler: which crawler
    :param video_dict: video info, dict
    :param rule_dict: rule info, dict
    :return: True if the rules are satisfied, otherwise False
    """
    # Normalize video_dict: derive publish_time (ms) from publish_time_stamp (s)
    if "publish_time_stamp" in video_dict.keys():
        video_dict["publish_time"] = video_dict["publish_time_stamp"] * 1000
    # Normalize video_dict: derive period (days since publish) from publish_time
    if "period" not in video_dict.keys() and "publish_time" in video_dict.keys():
        video_dict["period"] = int((int(time.time() * 1000) - video_dict["publish_time"]) / (3600 * 24 * 1000))
    # Normalize rule_dict: a max of 0 means "no upper limit"
    for rule_value in rule_dict.values():
        if rule_value["max"] == 0:
            rule_value["max"] = 999999999999999
    # Normalize keys that exist in rule_dict but are missing from video_dict
    for rule_key in rule_dict.keys():
        if rule_key not in video_dict.keys():
            video_dict[rule_key] = int(rule_dict[rule_key]["max"] / 2)
    # Compare and return True / False
    for video_key, video_value in video_dict.items():
        for rule_key, rule_value in rule_dict.items():
            if video_key == rule_key == "period":
                result = 0 <= int(video_value) <= int(rule_value["max"])
                Common.logger(log_type, crawler).info(
                    f'{video_key}: 0 <= {video_value} <= {rule_value["max"]}, {result}')
            elif video_key == rule_key:
                result = int(rule_value["min"]) <= int(video_value) <= int(rule_value["max"])
                Common.logger(log_type, crawler).info(
                    f'{video_key}: {rule_value["min"]} <= {video_value} <= {rule_value["max"]}, {result}')
            else:
                result = True
            if result is False:
                return False
            else:
                continue
    return True
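
# Worked sketch (hypothetical values): with
#   rule_dict = {"play_cnt": {"min": 500, "max": 0}, "duration": {"min": 30, "max": 600}}
#   video_dict = {"play_cnt": 800, "duration": 45, "publish_time_stamp": 1679900000}
# the max of 0 is rewritten to 999999999999999, "period" is derived from publish_time,
# and download_rule(...) returns True because every key shared with rule_dict lies inside [min, max].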


def get_word_score(log_type, crawler, score_sheet, word):
    while True:
        # Keep the sheet id in score_sheet and put the fetched rows in a separate variable,
        # so a retry after a failed fetch still passes the sheet id to Feishu.
        score_sheet_values = Feishu.get_values_batch(log_type, crawler, score_sheet)
        if score_sheet_values is None:
            time.sleep(1)
            continue
        for i in range(1, len(score_sheet_values)):
            if word not in [y for x in score_sheet_values for y in x]:
                return 0
            if word == score_sheet_values[i][0]:
                word_score = score_sheet_values[i][8]
                return word_score


def get_title_score(log_type, crawler, stop_sheet, score_sheet, title):
    # Fetch the stop-word list
    # while True:
    stop_word_list = []
    stop_word_sheet = Feishu.get_values_batch(log_type, crawler, stop_sheet)
    if stop_word_sheet is None:
        return 0
        # time.sleep(1)
        # continue
    for x in stop_word_sheet:
        for y in x:
            if y is None:
                pass
            else:
                stop_word_list.append(y)
    # break
    # Tokenize the title
    cut_word_list = []
    cut_list = jieba.lcut(title)
    for cut_item in cut_list:
        if cut_item == " ":
            continue
        if cut_item in stop_word_list:
            continue
        cut_word_list.append(cut_item)
    # Collect the weight score of each remaining word
    score_list = []
    for word in cut_word_list:
        word_score = get_word_score(log_type, crawler, score_sheet, word)
        score_list.append(word_score)
    # Total weight score of the title
    title_score = sum(score_list)
    return title_score
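
# Flow sketch (the token split and scores are hypothetical): for a title such as "像梦一场",
# jieba might cut it into ["像", "梦", "一场"]; stop words are dropped, each remaining word is
# looked up in column index 8 of the score sheet via get_word_score(), and the title score
# returned above is simply the sum of those per-word scores.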


def task_unbind(log_type, crawler, taskid, uids, env):
    if env == "dev":
        url = "https://testadmin.piaoquantv.com/manager/crawler/v3/task/unbind"
    else:
        url = "https://admin.piaoquantv.com/manager/crawler/v3/task/unbind"
    params = {
        "taskId": taskid,  # task ID
        "uids": uids,  # uids to unbind, comma-separated, e.g. "3222121,213231"
        "operator": ""  # defaults to system
    }
    Common.logger(log_type, crawler).info(f"url:{url}")
    Common.logging(log_type, crawler, env, f"url:{url}")
    Common.logger(log_type, crawler).info(f"params:{params}")
    Common.logging(log_type, crawler, env, f"params:{params}")
    response = requests.post(url=url, json=params)
    Common.logger(log_type, crawler).info(f"task_unbind_response:{response.text}")
    Common.logging(log_type, crawler, env, f"task_unbind_response:{response.text}")
    if response.status_code == 200 and response.json()["code"] == 0:
        return "success"
    else:
        return response.text


if __name__ == "__main__":
    print(get_title_score("recommend", "kuaishou", "16QspO", "0usaDk", '像梦一场'))
    pass