shipinhao_author.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
  1. import os
  2. import json
  3. import random
  4. import sys
  5. import time
  6. import uuid
  7. import datetime
  8. import requests
  9. import cv2
  10. sys.path.append(os.getcwd())
  11. from datetime import datetime
  12. from common.feishu import Feishu
  13. from common import PiaoQuanPipeline, AliyunLogger
  14. from common.db import MysqlHelper
  15. from common.mq import MQ
  16. from common.public import clean_title
  def find_target_user(name, user_list):
      """
      Pick the account whose nickname equals ``name`` out of a search result.

      :param name: nickname to look for (compared with ``==``).
      :param user_list: iterable of dicts; each one is read as
          ``entry['contact']['nickname']``.
      :return: the first matching entry, or ``False`` when nothing matches.
      """
      matches = (
          candidate
          for candidate in user_list
          if candidate['contact']["nickname"] == name
      )
      # The generator is lazy, so scanning stops at the first hit.
      return next(matches, False)
  class ShiPinHaoAuthor(object):
      """
      Shipinhao (视频号) account crawler.

      For one account it resolves the account id, pages through the account's
      videos, filters each video by duration / share count / share-to-like
      ratio, writes one audit row per scanned video to a Feishu sheet and sends
      the accepted videos to the ETL message queue.
      """

      # Endpoints of the scraping service used by this crawler.
      API_BASE = "http://61.48.133.26:30001"
      SEARCH_URL = API_BASE + "/Find_Video_Content"
      VIDEO_LIST_URL = API_BASE + "/FinderGetUpMasterNextPage"
      DOWNLOAD_URL = API_BASE + "/GetFinderDownloadAddress"

      # Feishu spreadsheet that receives the audit rows.
      FEISHU_CRAWLER = 'shipinhao'
      FEISHU_SHEET_ID = "Vq7NeH"

      # Upper bound on the number of list pages requested per run.
      MAX_PAGES = 10

      def __init__(self, platform, mode, rule_dict, user_dict, env):
          """
          :param platform: platform name; used for logging, SQL and Feishu calls.
          :param mode: crawl mode; used as log type and stored as ``strategy``.
          :param rule_dict: crawl rules; ``videos_cnt.min`` caps the number of
              videos sent per run (defaults to 30 when absent).
          :param user_dict: must contain ``link`` (the account nickname) and
              ``uid`` (stored as ``user_id`` on every video).
          :param env: environment name; also the suffix of the MQ topic.
          """
          self.account_name = user_dict["link"]
          self.platform = platform
          self.mode = mode
          self.rule_dict = rule_dict
          self.user_dict = user_dict
          self.env = env
          self.download_cnt = 0
          self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)

      def get_history_id(self):
          """
          Read the account id from the ``accounts`` table.

          :return: the stored ``name_id``, or ``False`` when there is no row.
          """
          # NOTE(review): this statement is assembled by string interpolation;
          # switch to bound parameters if MysqlHelper supports them.
          select_user_sql = f"""select name_id from accounts where name = "{self.account_name}" and platform = "{self.platform}" and useful = 1 limit 1"""
          rows = MysqlHelper.get_values(
              log_type=self.mode,
              crawler=self.platform,
              sql=select_user_sql,
              env=self.env,
              machine="",
          )
          return rows[0][0] if rows else False

      def get_account_id(self):
          """
          Resolve the account id, preferring the value stored in the database.

          When no id is stored, the account is searched by nickname; a hit is
          written to the ``accounts`` table before it is returned.

          :return: the account id, or ``False`` when it cannot be resolved.
          """
          history_id = self.get_history_id()
          if history_id:
              return history_id

          response = requests.request(
              "POST",
              self.SEARCH_URL,
              headers={'Content-Type': 'application/json'},
              data=json.dumps({
                  "content": self.account_name,
                  "type": "19"
              }),
          )
          # A response without 'info_list' is handled like an empty result
          # instead of raising KeyError.
          info_list = response.json().get('info_list') or []
          if not info_list:
              return False

          target_user = find_target_user(name=self.account_name, user_list=info_list)
          if not target_user:
              return False

          username = target_user['contact']['username']
          # Persist the id so the next run can skip the search.
          # NOTE(review): `username` comes from a remote response and is
          # interpolated into SQL; use bound parameters if MysqlHelper
          # supports them.
          update_sql = f"""INSERT INTO accounts (name, name_id, platform, useful) values ("{self.account_name}", "{username}", "{self.platform}", 1 )"""
          MysqlHelper.update_values(
              log_type=self.mode,
              crawler=self.platform,
              sql=update_sql,
              env=self.env,
              machine="",
          )
          return username

      def _log_no_more_videos(self):
          """Log that the video list is exhausted (or could not be read)."""
          AliyunLogger.logging(
              code="2000",
              platform=self.platform,
              mode=self.mode,
              env=self.env,
              message="没有更多视频了",
          )

      def get_account_videos(self):
          """
          Page through the account's videos and process each one.

          Stops when the per-run quota (``rule_dict['videos_cnt']['min']``,
          default 30) is reached, when a page is empty or unreadable, when
          ``process_video_obj`` returns a falsy value, or after ``MAX_PAGES``
          pages.
          """
          account_id = self.get_account_id()
          if not account_id:
              AliyunLogger.logging(
                  code="3000",
                  platform=self.platform,
                  mode=self.mode,
                  env=self.env,
                  message="{}\t获取 id 失败".format(self.account_name),
              )
              return

          headers = {'Content-Type': 'application/json'}
          quota = int(self.rule_dict.get("videos_cnt", {}).get("min", 30))
          last_buffer = ""
          for _ in range(self.MAX_PAGES):
              if self.download_cnt >= quota:
                  return
              payload = json.dumps({
                  "username": account_id,
                  "last_buffer": last_buffer
              })
              response = requests.request(
                  "POST", self.VIDEO_LIST_URL, headers=headers, data=payload
              )
              time.sleep(random.randint(1, 5))
              if "objectId" not in response.text or response.status_code != 200:
                  self._log_no_more_videos()
                  return
              res_json = response.json()
              videos = res_json.get("UpMasterHomePage")
              if not videos:
                  self._log_no_more_videos()
                  return

              last_buffer = res_json.get('last_buffer')
              # Counts the videos of this page that were processed without
              # raising; process_video_obj uses it for its stop rule.
              count = 0
              for obj in videos:
                  try:
                      AliyunLogger.logging(
                          code="1001",
                          platform=self.platform,
                          mode=self.mode,
                          message="扫描到一条视频",
                          env=self.env,
                          data=obj,
                      )
                      keep_going = self.process_video_obj(obj, count)
                      count += 1
                      if not keep_going:
                          return
                  except Exception as e:
                      # One broken video must not abort the whole account.
                      AliyunLogger.logging(
                          code="3000",
                          platform=self.platform,
                          mode=self.mode,
                          env=self.env,
                          message=f"抓取单条视频异常:{e}\n",
                      )

      @staticmethod
      def _share_like_ratio(share_cnt, like_cnt):
          """
          Return the truncated share/like ratio used by the filter rules.

          A video without shares has ratio 0. A video with shares but no
          likes is treated as having one like, so the ratio equals the share
          count instead of raising ZeroDivisionError.
          """
          if share_cnt == 0:
              return 0
          if like_cnt == 0:
              return share_cnt
          return int(share_cnt / like_cnt)

      @staticmethod
      def _title_head(video_obj):
          """Return the title up to the first newline or '#'; '' if missing."""
          title = video_obj.get('title') or ""
          return title.split("\n")[0].split("#")[0]

      @staticmethod
      def _sheet_row(obj, video_obj, publish_time_str, crawl_time_str,
                     ratio, duration, accepted, reason):
          """Build the 13-column audit row written to the Feishu sheet."""
          return [
              obj['nickname'],
              publish_time_str,
              crawl_time_str,
              int(obj['fav_count']),
              int(obj['comment_count']),
              int(obj['like_count']),
              int(obj['forward_count']),
              ratio,
              ShiPinHaoAuthor._title_head(video_obj),
              duration,
              '是' if accepted else '否',
              reason,
              video_obj.get('DownloadAddress'),
          ]

      def _report(self, row):
          """Insert a fresh row 2 into the Feishu sheet and fill it with ``row``."""
          Feishu.insert_columns(
              self.platform, self.FEISHU_CRAWLER, self.FEISHU_SHEET_ID, "ROWS", 1, 2
          )
          time.sleep(0.5)
          Feishu.update_values(
              self.platform, self.FEISHU_CRAWLER, self.FEISHU_SHEET_ID, "A2:Z2", [row]
          )

      def process_video_obj(self, obj, count):
          """
          Fetch, filter, report and (when accepted) publish a single video.

          :param obj: one entry of the account's video list.
          :param count: number of videos of the current page already processed.
          :return: ``False`` when the caller should stop crawling (a repeated
              video was found and ``count > 3``); ``True`` otherwise.
          """
          objectId = obj['objectId']
          objectNonceId = obj['objectNonceId']
          trace_id = self.platform + "new" + str(uuid.uuid1())
          response = requests.request(
              "POST",
              self.DOWNLOAD_URL,
              headers={'Content-Type': 'text/plain'},
              data=json.dumps({
                  "objectId": objectId,
                  "objectNonceId": objectNonceId
              }),
          )
          time.sleep(random.randint(0, 1))
          video_obj = response.json()

          publish_time_str = obj['createtime']
          publish_time = datetime.strptime(publish_time_str, '%Y-%m-%d %H:%M:%S')
          # Convert the datetime object to a Unix timestamp.
          publish_time_stamp = int(publish_time.timestamp())
          video_url = video_obj.get('DownloadAddress')
          duration = int(self.video_duration(video_url))
          share_cnt = int(obj['forward_count'])
          like_cnt = int(obj['like_count'])
          # Crawl time, written to the audit row.
          formatted_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
          video_dict = {
              "video_id": objectId,
              "video_title": clean_title(self._title_head(video_obj)),
              "out_video_id": trace_id,
              "publish_time_stamp": publish_time_stamp,
              "publish_time_str": publish_time_str,
              "play_cnt": 0,
              "fav_count": int(obj['fav_count']),
              "comment_cnt": int(obj['comment_count']),
              "like_cnt": like_cnt,
              "share_cnt": share_cnt,
              "user_id": self.user_dict["uid"],
              "cover_url": video_obj.get('thumb_url'),
              "video_url": video_url,
              "avatar_url": video_obj.get('thumb_url'),
              "width": video_obj.get('width'),
              "height": video_obj.get('height'),
              "duration": duration,
              "platform": self.platform,
              "strategy": self.mode,
              "crawler_rule": self.rule_dict,
              "session": f"shipinhao-author-{int(time.time())}",
          }
          divisor_cnt = self._share_like_ratio(share_cnt, like_cnt)

          def report(accepted, reason):
              self._report(self._sheet_row(
                  obj, video_obj, publish_time_str, formatted_time,
                  divisor_cnt, duration, accepted, reason,
              ))

          # Filter rules, checked in order; the first one that applies rejects
          # the video.
          if duration < 30:
              reject_reason = '时长小于30秒'
          elif share_cnt < 1000:
              reject_reason = '分享小于1000'
          elif share_cnt <= 99999 and divisor_cnt < 2:
              reject_reason = f'分享小于100000,分享/点赞:{divisor_cnt}'
          else:
              reject_reason = None
          if reject_reason is not None:
              report(False, reject_reason)
              return True

          pipeline = PiaoQuanPipeline(
              platform=self.platform,
              mode=self.mode,
              item=video_dict,
              rule_dict=self.rule_dict,
              env=self.env,
              trace_id=trace_id,
          )
          if not pipeline.repeat_video():
              report(False, '重复视频')
              # A repeated video past position 3 of the page stops the crawl.
              return count <= 3

          report(True, '')
          video_dict["publish_time"] = video_dict["publish_time_str"]
          self.mq.send_msg(video_dict)
          self.download_cnt += 1
          AliyunLogger.logging(
              code="1002",
              platform=self.platform,
              mode=self.mode,
              env=self.env,
              data=video_dict,
              trace_id=trace_id,
              message="成功发送 MQ 至 ETL",
          )
          time.sleep(5)
          return True

      def video_duration(self, filename):
          """
          Return the duration of a video in seconds (frame count / fps).

          :param filename: path or URL handed to ``cv2.VideoCapture``.
          :return: the duration, or 0 when the source cannot be opened or
              reports a non-positive frame rate.
          """
          cap = cv2.VideoCapture(filename)
          try:
              if not cap.isOpened():
                  return 0
              fps = cap.get(cv2.CAP_PROP_FPS)
              frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
              if fps <= 0:
                  # Avoid dividing by zero when no frame rate is reported.
                  return 0
              return frame_count / fps
          finally:
              # Always free the capture handle, even on early return.
              cap.release()
  if __name__ == "__main__":
      # Manual run: crawl one hard-coded demo account with an empty rule set.
      demo_user = {"uid": "123456", "link": "老碗哥说文解惑", "user_id": "1234565"}
      crawler = ShiPinHaoAuthor(
          platform="shipinhao",
          mode="author",
          rule_dict={},
          user_dict=demo_user,
          env="prod",
      )
      crawler.get_account_videos()