# shipinhao_author.py
  1. import os
  2. import json
  3. import random
  4. import sys
  5. import time
  6. import uuid
  7. import datetime
  8. import requests
  9. import cv2
  10. sys.path.append(os.getcwd())
  11. from datetime import datetime
  12. from common.feishu import Feishu
  13. from common import PiaoQuanPipeline, AliyunLogger
  14. from common.db import MysqlHelper
  15. from common.mq import MQ
  16. from common.public import clean_title
  17. from common.limit import AuthorLimit
  18. def find_target_user(name, user_list):
  19. """
  20. 在搜索到到账号列表中找目标列表
  21. """
  22. for obj in user_list:
  23. if obj['contact']["nickname"] == name:
  24. return obj
  25. else:
  26. continue
  27. return False
  28. class ShiPinHaoAuthor(object):
  29. """
  30. 视频号账号爬虫
  31. """
  32. def __init__(self, platform, mode, rule_dict, user_dict, env):
  33. self.account_name = user_dict["link"]
  34. self.platform = platform
  35. self.mode = mode
  36. self.rule_dict = rule_dict
  37. self.user_dict = user_dict
  38. self.env = env
  39. self.download_cnt = 0
  40. self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
  41. self.limiter = AuthorLimit(platform=self.platform, mode=self.mode)
  42. def get_history_id(self):
  43. """
  44. 从数据库表中读取 id
  45. """
  46. select_user_sql = f"""select name_id from accounts where name = "{self.account_name}" and platform = "{self.platform}" and useful = 1 limit 1"""
  47. name_id = MysqlHelper.get_values(
  48. log_type=self.mode,
  49. crawler=self.platform,
  50. sql=select_user_sql,
  51. env=self.env,
  52. machine="",
  53. )
  54. if name_id:
  55. return name_id[0][0]
  56. else:
  57. return False
  58. def get_account_id(self):
  59. """
  60. 读历史数据,如果存在 id,则直接返回 id
  61. """
  62. history_id = self.get_history_id()
  63. if history_id:
  64. return history_id
  65. else:
  66. url = "http://61.48.133.26:30001/Find_Video_Content"
  67. payload = json.dumps({
  68. "content": self.account_name,
  69. "type": "19"
  70. })
  71. headers = {
  72. 'Content-Type': 'application/json'
  73. }
  74. response = requests.request("POST", url, headers=headers, data=payload)
  75. info_list = response.json()['info_list']
  76. if len(info_list) == 0:
  77. return False
  78. target_user = find_target_user(name=self.account_name, user_list=info_list)
  79. # 写入 MySql 数据库
  80. if target_user:
  81. update_sql = f"""INSERT INTO accounts (name, name_id, platform, useful) values ("{self.account_name}", "{target_user['contact']['username']}", "{self.platform}", 1 )"""
  82. MysqlHelper.update_values(
  83. log_type=self.mode,
  84. crawler=self.platform,
  85. sql=update_sql,
  86. env=self.env,
  87. machine="",
  88. )
  89. return target_user['contact']["username"]
  90. else:
  91. return False
  92. def get_account_videos(self):
  93. if self.download_cnt >= int(
  94. self.rule_dict.get("videos_cnt", {}).get("min", 300)
  95. ):
  96. return
  97. account_id = self.get_account_id()
  98. if account_id:
  99. url = "http://61.48.133.26:30001/FinderGetUpMasterNextPage"
  100. last_buffer = ""
  101. for i in range(10):
  102. if self.download_cnt >= int(
  103. self.rule_dict.get("videos_cnt", {}).get("min", 30)
  104. ):
  105. return
  106. headers = {
  107. 'Content-Type': 'application/json'
  108. }
  109. payload = json.dumps({
  110. "username": account_id,
  111. "last_buffer": last_buffer
  112. })
  113. response = requests.request("POST", url, headers=headers, data=payload)
  114. time.sleep(random.randint(1, 5))
  115. if "objectId" not in response.text or response.status_code != 200:
  116. AliyunLogger.logging(
  117. code="2000",
  118. platform=self.platform,
  119. mode=self.mode,
  120. env=self.env,
  121. message="没有更多视频了",
  122. )
  123. return
  124. res_json = response.json()
  125. if len(res_json["UpMasterHomePage"]) == 0:
  126. AliyunLogger.logging(
  127. code="2000",
  128. platform=self.platform,
  129. mode=self.mode,
  130. env=self.env,
  131. message="没有更多视频了",
  132. )
  133. return
  134. if not res_json["UpMasterHomePage"]:
  135. AliyunLogger.logging(
  136. code="2000",
  137. platform=self.platform,
  138. mode=self.mode,
  139. env=self.env,
  140. message="没有更多视频了",
  141. )
  142. return
  143. else:
  144. last_buffer = res_json.get('last_buffer')
  145. count = 0
  146. for obj in res_json["UpMasterHomePage"]:
  147. try:
  148. AliyunLogger.logging(
  149. code="1001",
  150. platform=self.platform,
  151. mode=self.mode,
  152. message="扫描到一条视频",
  153. env=self.env,
  154. data=obj,
  155. )
  156. repeat_flag = self.process_video_obj(obj, count)
  157. count += 1
  158. if not repeat_flag:
  159. return
  160. except Exception as e:
  161. AliyunLogger.logging(
  162. code="3000",
  163. platform=self.platform,
  164. mode=self.mode,
  165. env=self.env,
  166. message=f"抓取单条视频异常:{e}\n",
  167. )
  168. else:
  169. AliyunLogger.logging(
  170. code="3000",
  171. platform=self.platform,
  172. mode=self.mode,
  173. env=self.env,
  174. message="{}\t获取 id 失败".format(self.account_name),
  175. )
  176. def process_video_obj(self, obj, count):
  177. objectId = obj['objectId']
  178. objectNonceId = obj['objectNonceId']
  179. trace_id = self.platform + "new" + str(uuid.uuid1())
  180. url = "http://61.48.133.26:30001/GetFinderDownloadAddress"
  181. payload = json.dumps({
  182. "objectId": objectId,
  183. "objectNonceId": objectNonceId
  184. })
  185. headers = {
  186. 'Content-Type': 'text/plain'
  187. }
  188. response = requests.request("POST", url, headers=headers, data=payload)
  189. time.sleep(random.randint(0, 1))
  190. video_obj = response.json()
  191. publish_time_str = obj['createtime']
  192. datetime_obj = datetime.strptime(publish_time_str, '%Y-%m-%d %H:%M:%S')
  193. # 将datetime对象转换为时间戳
  194. publish_time_stamp = int(datetime_obj.timestamp())
  195. video_url = video_obj.get('DownloadAddress')
  196. duration = int(self.video_duration(video_url))
  197. share_cnt = int(obj['forward_count'])
  198. like_cnt = int(obj['like_count'])
  199. # 获取当前时间
  200. current_time = datetime.now()
  201. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  202. video_dict = {
  203. "video_id": objectId,
  204. "video_title": clean_title(video_obj.get('title').split("\n")[0].split("#")[0]),
  205. "out_video_id": objectId,
  206. "publish_time_stamp": publish_time_stamp,
  207. "publish_time_str": publish_time_str,
  208. "play_cnt": 0,
  209. "fav_count": int(obj['fav_count']),
  210. "comment_cnt": int(obj['comment_count']),
  211. "like_cnt": like_cnt,
  212. "share_cnt": share_cnt,
  213. "user_id": self.user_dict["uid"],
  214. "cover_url": video_obj.get('thumb_url'),
  215. "video_url": video_url,
  216. "avatar_url": video_obj.get('thumb_url'),
  217. "width": video_obj.get('width'),
  218. "height": video_obj.get('height'),
  219. "duration": duration,
  220. "platform": self.platform,
  221. "strategy": self.mode,
  222. "crawler_rule": self.rule_dict,
  223. "session": f"shipinhao-author-{int(time.time())}",
  224. }
  225. fav_count = int(obj['fav_count'])
  226. if fav_count == 0:
  227. fav_count = 1
  228. divisor_cnt = 0
  229. else:
  230. divisor_cnt = int(share_cnt / fav_count)
  231. # 视频时长小于30秒 返回
  232. if duration < 20:
  233. values = [[
  234. obj['nickname'],
  235. publish_time_str,
  236. formatted_time,
  237. int(obj['fav_count']),
  238. int(obj['comment_count']),
  239. int(obj['like_count']),
  240. int(obj['forward_count']),
  241. divisor_cnt,
  242. video_obj.get('title').split("\n")[0].split("#")[0],
  243. duration,
  244. '否',
  245. '时长小于30秒',
  246. video_obj.get('DownloadAddress')
  247. ]]
  248. Feishu.insert_columns(self.platform, 'shipinhao', "Vq7NeH", "ROWS", 1, 2)
  249. time.sleep(0.5)
  250. Feishu.update_values(self.platform, 'shipinhao', "Vq7NeH", "A2:Z2", values)
  251. return True
  252. # 分享小于1000 返回
  253. if share_cnt < 500:
  254. values = [[
  255. obj['nickname'],
  256. publish_time_str,
  257. formatted_time,
  258. int(obj['fav_count']),
  259. int(obj['comment_count']),
  260. int(obj['like_count']),
  261. int(obj['forward_count']),
  262. divisor_cnt,
  263. video_obj.get('title').split("\n")[0].split("#")[0],
  264. duration,
  265. '否',
  266. '分享小于500',
  267. video_obj.get('DownloadAddress')
  268. ]]
  269. Feishu.insert_columns(self.platform, 'shipinhao', "Vq7NeH", "ROWS", 1, 2)
  270. time.sleep(0.5)
  271. Feishu.update_values(self.platform, 'shipinhao', "Vq7NeH", "A2:Z2", values)
  272. return True
  273. # 分享小于等于99999
  274. if share_cnt <= 99999 and divisor_cnt < 1:
  275. values = [[
  276. obj['nickname'],
  277. publish_time_str,
  278. formatted_time,
  279. int(obj['fav_count']),
  280. int(obj['comment_count']),
  281. int(obj['like_count']),
  282. int(obj['forward_count']),
  283. divisor_cnt,
  284. video_obj.get('title').split("\n")[0].split("#")[0],
  285. duration,
  286. '否',
  287. f'分享小于100000,分享/大拇指:{divisor_cnt}',
  288. video_obj.get('DownloadAddress')
  289. ]]
  290. Feishu.insert_columns(self.platform, 'shipinhao', "Vq7NeH", "ROWS", 1, 2)
  291. time.sleep(0.5)
  292. Feishu.update_values(self.platform, 'shipinhao', "Vq7NeH", "A2:Z2", values)
  293. return True
  294. pipeline = PiaoQuanPipeline(
  295. platform=self.platform,
  296. mode=self.mode,
  297. item=video_dict,
  298. rule_dict=self.rule_dict,
  299. env=self.env,
  300. trace_id=trace_id,
  301. )
  302. if not pipeline.repeat_video():
  303. values = [[
  304. obj['nickname'],
  305. publish_time_str,
  306. formatted_time,
  307. int(obj['fav_count']),
  308. int(obj['comment_count']),
  309. int(obj['like_count']),
  310. int(obj['forward_count']),
  311. divisor_cnt,
  312. video_obj.get('title').split("\n")[0].split("#")[0],
  313. duration,
  314. '否',
  315. '重复视频',
  316. video_obj.get('DownloadAddress')
  317. ]]
  318. Feishu.insert_columns(self.platform, 'shipinhao', "Vq7NeH", "ROWS", 1, 2)
  319. time.sleep(0.5)
  320. Feishu.update_values(self.platform, 'shipinhao', "Vq7NeH", "A2:Z2", values)
  321. if count > 3:
  322. return False
  323. else:
  324. return True
  325. else:
  326. values = [[
  327. obj['nickname'],
  328. publish_time_str,
  329. formatted_time,
  330. int(obj['fav_count']),
  331. int(obj['comment_count']),
  332. int(obj['like_count']),
  333. int(obj['forward_count']),
  334. divisor_cnt,
  335. video_obj.get('title').split("\n")[0].split("#")[0],
  336. duration,
  337. '是',
  338. '',
  339. video_obj.get('DownloadAddress')
  340. ]]
  341. Feishu.insert_columns(self.platform, 'shipinhao', "Vq7NeH", "ROWS", 1, 2)
  342. time.sleep(0.5)
  343. Feishu.update_values(self.platform, 'shipinhao', "Vq7NeH", "A2:Z2", values)
  344. video_dict["publish_time"] = video_dict["publish_time_str"]
  345. limit_flag = self.limiter.author_limitation(user_id=video_dict['user_id'])
  346. if limit_flag:
  347. self.mq.send_msg(video_dict)
  348. self.download_cnt += 1
  349. AliyunLogger.logging(
  350. code="1002",
  351. platform=self.platform,
  352. mode=self.mode,
  353. env=self.env,
  354. data=video_dict,
  355. trace_id=trace_id,
  356. message="成功发送 MQ 至 ETL",
  357. )
  358. time.sleep(5)
  359. return True
  360. def video_duration(self, filename):
  361. cap = cv2.VideoCapture(filename)
  362. if cap.isOpened():
  363. rate = cap.get(5)
  364. frame_num = cap.get(7)
  365. duration = frame_num / rate
  366. return duration
  367. return 0
  368. if __name__ == "__main__":
  369. SP = ShiPinHaoAuthor(
  370. platform="shipinhao",
  371. mode="author",
  372. user_dict={"uid": "123456", "link": "老赵讲育儿", "user_id": "1234565"},
  373. rule_dict={},
  374. env="prod",
  375. )
  376. SP.get_account_videos()