public.py

# -*- coding: utf-8 -*-
# @Author: wangkun
# @Time: 2023/3/27
import ast
import difflib
import os
import random
import sys
import time

import jieba
import requests
from mq_http_sdk.mq_client import *
from mq_http_sdk.mq_exception import MQExceptionBase
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

sys.path.append(os.getcwd())
from common.common import Common
from common.feishu import Feishu
from common.scheduling_db import MysqlHelper
# from common import Common
# from feishu import Feishu
# from scheduling_db import MysqlHelper


def get_user_from_mysql(log_type, crawler, source, env, action=''):
    sql = f"select * from crawler_user_v3 where source='{source}' and mode='{log_type}'"
    results = MysqlHelper.get_values(log_type, crawler, sql, env, action=action)
    if results:
        return results
    else:
        Common.logger(log_type, crawler).warning(f"crawler: {crawler}, no user list found to crawl")
        return []


def similarity(title1, title2):
    # Tokenize both titles
    seg1 = jieba.lcut(title1)
    seg2 = jieba.lcut(title2)
    # Build TF-IDF vectors; space-join the jieba tokens so that
    # TfidfVectorizer treats each segmented word as a separate term
    tfidf_vectorizer = TfidfVectorizer()
    tfidf_matrix = tfidf_vectorizer.fit_transform([" ".join(seg1), " ".join(seg2)])
    # Compute the cosine similarity between the two vectors
    similar = cosine_similarity(tfidf_matrix[0], tfidf_matrix[1])[0][0]
    return similar
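

# Hedged illustration of similarity() (exact scores depend on jieba's
# dictionary and on TfidfVectorizer's default tokenizer, which drops
# single-character tokens):
# >>> similarity("今天天气真好", "今天天气真好")   # identical token sets
# 1.0
# >>> similarity("今天天气真好", "股市行情分析")   # no shared terms
# 0.0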


def title_like(log_type, crawler, platform, title, env):
    """
    Title similarity check
    :param log_type: log type
    :param crawler: which crawler
    :param platform: crawl channel, e.g. 公众号 / 小年糕
    :param title: video title
    :param env: environment
    :return: True if similarity >= 80%, otherwise False
    """
    select_sql = f""" select video_title from crawler_video where platform="{platform}" """
    video_list = MysqlHelper.get_values(log_type, crawler, select_sql, env, action="")
    if len(video_list) == 0:
        return False
    for video_dict in video_list:
        video_title = video_dict["video_title"]
        if difflib.SequenceMatcher(None, title, video_title).quick_ratio() >= 0.8:
            return True
    return False
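

# Illustration (titles are made up): quick_ratio() is a cheap upper bound on
# SequenceMatcher.ratio(), so this works as a fast first-pass duplicate filter.
# >>> difflib.SequenceMatcher(None, "今天天气真好", "今天天气很好").quick_ratio()
# 0.8333333333333334   # >= 0.8, so title_like() would flag a near-duplicate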


def get_config_from_mysql(log_type, source, env, text, action=''):
    select_sql = f"""select * from crawler_config where source="{source}" """
    contents = MysqlHelper.get_values(log_type, source, select_sql, env, action=action)
    title_list = []
    filter_list = []
    emoji_list = []
    search_word_list = []
    for content in contents:
        # The config column stores a dict literal; parse it safely
        # instead of eval()-ing arbitrary database content
        config_dict = ast.literal_eval(content['config'])
        for k, v in config_dict.items():
            if k == "title":
                title_list.extend(v.split(","))
            if k == "filter":
                filter_list.extend(v.split(","))
            if k == "emoji":
                emoji_list.extend(v.split(","))
            if k == "search_word":
                search_word_list.extend(v.split(","))
    if text == "title":
        return title_list
    elif text == "filter":
        return filter_list
    elif text == "emoji":
        return emoji_list
    elif text == "search_word":
        return search_word_list
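

# Hedged sketch of the assumed crawler_config row shape (inferred from the
# parsing above, not documented in this repo): the `config` column holds a
# dict literal whose values are comma-separated strings, e.g.
# {"filter": "广告,测试", "emoji": "🌹,😄"} -- with such a row,
# get_config_from_mysql(log_type, source, env, "filter") returns ["广告", "测试"].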


def random_title(log_type, crawler, env, text):
    random_title_list = get_config_from_mysql(log_type, crawler, env, text)
    return random.choice(random_title_list)


def task_fun(task_str):
    # Unquote the rule list ('[...]' -> [...]) so the whole string parses as one literal
    task_str = task_str.replace("'[", '[').replace("]'", ']')
    task_dict = ast.literal_eval(task_str)
    # Flatten the rule list of single-key dicts into one rule_dict
    rule = task_dict['rule']
    task_dict['rule'] = dict()
    for item in rule:
        for k, val in item.items():
            task_dict['rule'][k] = val
    rule_dict = task_dict['rule']
    task_dict = {
        "task_dict": task_dict,
        "rule_dict": rule_dict
    }
    return task_dict


def task_fun_mq(task_str):
    # MQ message bodies arrive JSON-escaped; strip the escapes and unquote the rule list
    task_str = task_str.replace('"[', '[').replace(']"', ']').replace('\\', '')
    task_dict = ast.literal_eval(task_str)
    # Flatten the rule list of single-key dicts into one rule_dict
    rule = task_dict['rule']
    task_dict['rule'] = dict()
    for item in rule:
        for k, val in item.items():
            task_dict['rule'][k] = val
    rule_dict = task_dict['rule']
    task_dict = {
        "task_dict": task_dict,
        "rule_dict": rule_dict
    }
    return task_dict
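

# Hedged example of the task string both parsers expect (the field names are
# assumptions based on the flattening logic above):
# >>> task_fun_mq('{"id": 1, "rule": "[{\\"duration\\": {\\"min\\": 30, \\"max\\": 0}}]"}')
# {'task_dict': {'id': 1, 'rule': {'duration': {'min': 30, 'max': 0}}},
#  'rule_dict': {'duration': {'min': 30, 'max': 0}}}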


def get_consumer(topic_name, group_id):
    # Initialize the client.
    mq_client = MQClient(
        # HTTP endpoint; shown on the instance detail page of the ApsaraMQ for RocketMQ console.
        "http://1894469520484605.mqrest.cn-qingdao-public.aliyuncs.com",
        # AccessKey ID, the Alibaba Cloud identity credential. See "Create an AccessKey pair".
        "LTAI4G7puhXtLyHzHQpD6H7A",
        # AccessKey Secret, the Alibaba Cloud identity key. See "Create an AccessKey pair".
        "nEbq3xWNQd1qLpdy2u71qFweHkZjSG"
    )
    # Topic the messages belong to, created in the ApsaraMQ for RocketMQ console.
    topic_name = str(topic_name)
    # Group ID created in the ApsaraMQ for RocketMQ console.
    group_id = str(group_id)
    # Instance ID the topic belongs to. Required if the instance has a namespace;
    # pass an empty string if it does not. The namespace is shown on the instance detail page.
    instance_id = "MQ_INST_1894469520484605_BXhXuzkZ"
    consumer = mq_client.get_consumer(instance_id, topic_name, group_id)
    return consumer


def ack_message(log_type, crawler, recv_msgs, consumer):
    # If a message is not acknowledged before msg.next_consume_time, it will be consumed again.
    # A receipt handle carries a timestamp and differs on every consumption of the same message.
    try:
        receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
        consumer.ack_message(receipt_handle_list)
        Common.logger(log_type, crawler).info(f"Ack {len(receipt_handle_list)} Message Succeed.\n")
    except MQExceptionBase as err:
        Common.logger(log_type, crawler).info(f"Ack Message Fail! Exception:{err}\n")
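

def consume_loop_sketch(log_type, crawler, topic_name, group_id):
    """A hedged sketch (not used elsewhere in this file) of how get_consumer()
    and ack_message() pair up. The batch size, wait seconds, and the
    "MessageNotExist" check follow the aliyun mq_http_sdk sample code, not
    anything defined in this repo."""
    consumer = get_consumer(topic_name, group_id)
    while True:
        try:
            # Long-poll for up to 1 message, waiting at most 3 seconds.
            recv_msgs = consumer.consume_message(1, 3)
            for msg in recv_msgs:
                Common.logger(log_type, crawler).info(f"Receive, MessageBody:{msg.message_body}")
            ack_message(log_type, crawler, recv_msgs, consumer)
        except MQExceptionBase as err:
            if err.type == "MessageNotExist":
                # No message in the queue within the polling window; keep waiting.
                continue
            Common.logger(log_type, crawler).info(f"Consume Message Fail! Exception:{err}\n")
            time.sleep(2)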


def download_rule(log_type, crawler, video_dict, rule_dict):
    """
    Basic rules for downloading a video
    :param log_type: log type
    :param crawler: which crawler
    :param video_dict: video info, dict
    :param rule_dict: rule info, dict
    :return: True if the rules are satisfied, otherwise False
    """
    # Normalize video_dict: derive publish_time (ms) from publish_time_stamp
    if "publish_time_stamp" in video_dict.keys():
        video_dict["publish_time"] = video_dict["publish_time_stamp"] * 1000
    # Normalize video_dict: derive period (days since publish) from publish_time
    if "period" not in video_dict.keys() and "publish_time" in video_dict.keys():
        video_dict["period"] = int((int(time.time() * 1000) - video_dict["publish_time"]) / (3600 * 24 * 1000))
    # Normalize rule_dict: a max of 0 means "no upper bound"
    for rule_value in rule_dict.values():
        if rule_value["max"] == 0:
            rule_value["max"] = 999999999999999
    # Normalize: fill keys that exist in rule_dict but are missing from video_dict
    for rule_key in rule_dict.keys():
        if rule_key not in video_dict.keys():
            video_dict[rule_key] = int(rule_dict[rule_key]["max"] / 2)
    # Compare and output: True / False
    for video_key, video_value in video_dict.items():
        for rule_key, rule_value in rule_dict.items():
            if video_key == rule_key == "period":
                result = 0 <= int(video_value) <= int(rule_value["max"])
                Common.logger(log_type, crawler).info(f'{video_key}: 0 <= {video_value} <= {rule_value["max"]}, {result}')
            elif video_key == rule_key:
                result = int(rule_value["min"]) <= int(video_value) <= int(rule_value["max"])
                Common.logger(log_type, crawler).info(f'{video_key}: {rule_value["min"]} <= {video_value} <= {rule_value["max"]}, {result}')
            else:
                result = True
            if result is False:
                return False
    return True
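

# Hedged illustration of download_rule() (the field names mirror the checks
# above; the concrete values are made up):
# >>> rule_dict = {"play_cnt": {"min": 500, "max": 0}, "duration": {"min": 30, "max": 600}}
# >>> video_dict = {"play_cnt": 1200, "duration": 90,
# ...               "publish_time_stamp": int(time.time()) - 86400}
# >>> download_rule("recommend", "demo", video_dict, rule_dict)
# True   # play_cnt 1200 >= 500, duration 90 within [30, 600]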


def get_word_score(log_type, crawler, score_sheet, word):
    while True:
        sheet = Feishu.get_values_batch(log_type, crawler, score_sheet)
        if sheet is None:
            time.sleep(1)
            continue
        # Flatten once; a word that appears nowhere in the sheet scores 0
        if word not in [y for x in sheet for y in x]:
            return 0
        for i in range(1, len(sheet)):
            if word == sheet[i][0]:
                # The score sits in the 9th column of the row whose first cell is the word
                return sheet[i][8]
        return 0


def get_title_score(log_type, crawler, stop_sheet, score_sheet, title):
    # Fetch the stop-word list
    stop_word_list = []
    stop_word_sheet = Feishu.get_values_batch(log_type, crawler, stop_sheet)
    if stop_word_sheet is None:
        return 0
    for x in stop_word_sheet:
        for y in x:
            if y is not None:
                stop_word_list.append(y)
    # Tokenize the title, dropping spaces and stop words
    cut_word_list = []
    cut_list = jieba.lcut(title)
    for cut_item in cut_list:
        if cut_item == " ":
            continue
        if cut_item in stop_word_list:
            continue
        cut_word_list.append(cut_item)
    # Look up the weight score of each remaining word
    score_list = []
    for word in cut_word_list:
        word_score = get_word_score(log_type, crawler, score_sheet, word)
        score_list.append(word_score)
    # The title score is the sum of its word scores
    title_score = sum(score_list)
    return title_score
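

# Hedged walkthrough (the sheet contents are assumptions; only the column
# positions follow the lookups above): with stop words ["的", "了"] and a score
# sheet containing a row ["梦", ..., 0.35] (score in the 9th column),
# get_title_score(..., "像梦一场") tokenizes the title with jieba, drops stop
# words, looks up each remaining word, and returns the summed score.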


def task_unbind(log_type, crawler, taskid, uids, env):
    if env == "dev":
        url = "https://testadmin.piaoquantv.com/manager/crawler/v3/task/unbind"
    else:
        url = "https://admin.piaoquantv.com/manager/crawler/v3/task/unbind"
    params = {
        "taskId": taskid,  # task ID
        "uids": uids,  # uids to unbind, comma-separated, e.g. "3222121,213231"
        "operator": ""  # defaults to "system"
    }
    Common.logger(log_type, crawler).info(f"url:{url}")
    Common.logging(log_type, crawler, env, f"url:{url}")
    Common.logger(log_type, crawler).info(f"params:{params}")
    Common.logging(log_type, crawler, env, f"params:{params}")
    response = requests.post(url=url, json=params)
    Common.logger(log_type, crawler).info(f"task_unbind_response:{response.text}")
    Common.logging(log_type, crawler, env, f"task_unbind_response:{response.text}")
    if response.status_code == 200 and response.json()["code"] == 0:
        return "success"
    else:
        return response.text


if __name__ == "__main__":
    print(get_title_score("recommend", "kuaishou", "16QspO", "0usaDk", '像梦一场'))