zhangyong 1 year ago
parent
commit
06ce726e95

+ 0 - 0
gongzhongxinhao/__init__.py


+ 0 - 0
gongzhongxinhao/gongzhongxinhao/__init__.py


+ 305 - 0
gongzhongxinhao/gongzhongxinhao/gongzhongxinhao_author.py

@@ -0,0 +1,305 @@
+import json
+import os
+import random
+import sys
+import time
+import uuid
+from datetime import datetime
+
+import requests
+
+sys.path.append(os.getcwd())
+from selenium import webdriver
+from selenium.webdriver import DesiredCapabilities
+from selenium.webdriver.chrome.service import Service
+from selenium.webdriver.common.by import By
+
+from common import AliyunLogger, PiaoQuanPipeline
+from common.feishu import Feishu
+from common.mq import MQ
+
+class GZXHAuthor:
+    """
+    公众新号账号爬虫
+    """
+    def __init__(self, platform, mode, rule_dict, user_dict, env, url_id):
+        self.platform = platform
+        self.mode = mode
+        self.rule_dict = rule_dict
+        self.user_dict = user_dict
+        self.env = env
+        self.url_id = url_id
+        self.download_cnt = 0
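+        # Producer for the crawler-to-ETL message queue topic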
+        self.mq = MQ(topic_name="topic_crawler_etl_" + self.env)
+
+    def get_account_videos(self):
+        AliyunLogger.logging(
+            code="1003",
+            platform=self.platform,
+            mode=self.mode,
+            env=self.env,
+            message="开始抓取公众新号: {}".format(self.url_id["name"]),
+        )
+        try:
+            self.get_videoList()
+        except Exception as e:
+            AliyunLogger.logging(
+                code="3000",
+                platform=self.platform,
+                mode=self.mode,
+                env=self.env,
+                message=f"抓取公众新号: {self.url_id['name']} 时异常,异常信息: {e}",
+            )
+        AliyunLogger.logging(
+            code="1004",
+            platform=self.platform,
+            mode=self.mode,
+            env=self.env,
+            message="完成抓取公众新号: {}".format(self.url_id["name"]),
+        )
+
+    # Resolve the real downloadable URL for a Tencent Video vid
+    def get_tencent_video_url(self, video_id):
+        url = 'https://vv.video.qq.com/getinfo?vids=' + str(video_id) + '&platform=101001&charge=0&otype=json'
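+        # The response is JSONP wrapped as "QZOutputJson={...};"; strip the wrapper before json.loads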
+        response = requests.get(url=url).text.replace('QZOutputJson=', '').replace('"};', '"}')
+        response = json.loads(response)
+        url = response['vl']['vi'][0]['ul']['ui'][0]['url']
+        fvkey = response['vl']['vi'][0]['fvkey']
+        video_url = url + str(video_id) + '.mp4?vkey=' + fvkey
+        return video_url
+
+    def get_video_url(self, article_url):
+        # Enable Chrome performance logging
+        ca = DesiredCapabilities.CHROME
+        ca["goog:loggingPrefs"] = {"performance": "ALL"}
+
+        # Run headless (no visible browser window)
+        chrome_options = webdriver.ChromeOptions()
+        chrome_options.add_argument("--headless")
+        chrome_options.add_argument(
+            "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.79 Safari/537.36"
+        )
+        chrome_options.add_argument("--no-sandbox")
+
+        # Initialize the Chrome driver (dev env points at a local chromedriver binary)
+        if self.env == "prod":
+            driver = webdriver.Chrome(desired_capabilities=ca, options=chrome_options)
+        else:
+            driver = webdriver.Chrome(
+                desired_capabilities=ca,
+                options=chrome_options,
+                service=Service(
+                    "/Users/tzld/Downloads/chromedriver_mac64/chromedriver"
+                ),
+            )
+
+        driver.implicitly_wait(10)
+        driver.get(article_url)
+        time.sleep(1)
+
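+        # Case 1: the video is embedded directly; read the src from the poster element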
+        if (
+                len(
+                    driver.find_elements(
+                        By.XPATH, '//div[@class="js_video_poster video_poster"]/*[2]'
+                    )
+                )
+                != 0
+        ):
+            video_url = driver.find_element(
+                By.XPATH, '//div[@class="js_video_poster video_poster"]/*[2]'
+            ).get_attribute("src")
+        elif (
+                len(
+                    driver.find_elements(
+                        By.XPATH, '//span[@class="js_tx_video_container"]/*[1]'
+                    )
+                )
+                != 0
+        ):
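+            # Case 2: Tencent Video iframe; extract the vid and resolve the real URL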
+            iframe = driver.find_element(
+                By.XPATH, '//span[@class="js_tx_video_container"]/*[1]'
+            ).get_attribute("src")
+            video_id = iframe.split("vid=")[-1].split("&")[0]
+            video_url = self.get_tencent_video_url(video_id)
+        else:
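+            # No recognizable video player found in the article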
+            video_url = 0
+        driver.quit()
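+        # Pause briefly after resolving WeChat-hosted (mpvideo.qpic.cn) video URLs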
+        if "mpvideo.qpic.cn" in str(video_url):
+            time.sleep(random.randint(1, 3))
+        return video_url
+
+    def get_wechat_gh(self, link: str):
+        # Resolve an article link to the account's gh_id via the internal account-info service
+        url = "http://8.217.190.241:8888/crawler/wei_xin/account_info"
+        payload = json.dumps({"content_link": link})
+        headers = {'Content-Type': 'application/json'}
+        response = requests.request("POST", url, headers=headers, data=payload).json()
+        if response['code'] == 0:
+            return response['data']['data']['wx_gh']
+        return None
+
+    # Fetch the account's article list and process each entry
+    def get_videoList(self):
+        time.sleep(1)
+        wechat_gh = self.get_wechat_gh(self.url_id["url"])
+        if wechat_gh is None:
+            AliyunLogger.logging(
+                code="2004",
+                platform=self.platform,
+                mode=self.mode,
+                env=self.env,
+                message=f"获取用户主页为空: {self.url_id['name']}",
+            )
+            return
+        time.sleep(1)
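+        # Fetch the account's recent articles from the internal GetGh_Doc service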
+        url = "http://61.48.133.26:30001/GetGh_Doc"
+        payload = json.dumps({
+            "appid": wechat_gh,
+            "decode": "1"
+        })
+        headers = {
+            'Content-Type': 'application/json'
+        }
+        r = requests.request("POST", url, headers=headers, data=payload)
+        if "list" not in r.json():
+            AliyunLogger.logging(
+                code="2000",
+                platform=self.platform,
+                mode=self.mode,
+                env=self.env,
+                message=f"status_code:{r.status_code}, get_videoList:{r.text}\n",
+            )
+            time.sleep(60 * 15)
+            return
+        if len(resp_json["list"]) == 0:
+            AliyunLogger.logging(
+                code="2000",
+                platform=self.platform,
+                mode=self.mode,
+                env=self.env,
+                message="没有更多视频了\n",
+            )
+            return
+        else:
+            user_name = resp_json.get("gh_name")
+            app_msg_list = resp_json["list"]
+            for article in app_msg_list:
+                try:
+                    AliyunLogger.logging(
+                        code="1001",
+                        platform=self.platform,
+                        mode=self.mode,
+                        message="扫描到一条视频",
+                        env=self.env,
+                        data=article,
+                    )
+                    repeat_flag = self.process_video_obj(article, user_name)
+                    if not repeat_flag:
+                        return
+                except Exception as e:
+                    AliyunLogger.logging(
+                        code="3000",
+                        platform=self.platform,
+                        mode=self.mode,
+                        env=self.env,
+                        message=f"抓取单条视频异常:{e}\n",
+                    )
+                    return
+
+    def process_video_obj(self, article, user_name):
+        trace_id = self.platform + str(uuid.uuid1())
+        publish_time_str = article.get("published_time", "")
+        date_format = "%Y-%m-%d %H:%M:%S"
+        date_time_obj = datetime.strptime(publish_time_str, date_format)
+        publish_time_stamp = int(date_time_obj.timestamp())
+        article_url = article.get("url", "")
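+        # Randomly pick one of the task's users to own this video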
+        our_user = random.choice(self.user_dict)
+        video_url = self.get_video_url(article_url)
+        video_dict = {
+            "user_name": user_name,
+            "video_id": article.get("aid", ""),
+            "video_title": article.get("title", "")
+                .replace(" ", "")
+                .replace('"', "")
+                .replace("'", ""),
+            "out_video_id": article.get("aid", ""),
+            "publish_time_stamp": publish_time_stamp,
+            "publish_time_str": publish_time_str,
+            "play_cnt": 0,
+            "comment_cnt": 0,
+            "like_cnt": 0,
+            "share_cnt": 0,
+            "user_id": our_user["uid"],
+            "cover_url": article.get("head_pic", ""),
+            "video_url": video_url,
+            "width": 0,
+            "height": 0,
+            "duration": 0,
+            "platform": self.platform,
+            "strategy": self.mode,
+            "crawler_rule": self.rule_dict,
+            "session": f"gongzhongxinhao-author-{int(time.time())}",
+        }
+
+        AliyunLogger.logging(
+            code="1001",
+            trace_id=trace_id,
+            platform=self.platform,
+            mode=self.mode,
+            env=self.env,
+            message="扫描到一条视频",
+            data=video_dict,
+        )
+        pipeline = PiaoQuanPipeline(
+            platform=self.platform,
+            mode=self.mode,
+            item=video_dict,
+            rule_dict=self.rule_dict,
+            env=self.env,
+            trace_id=trace_id,
+        )
+        if not pipeline.repeat_video():
+            # Duplicate (or rule-rejected) video: tell the caller to stop crawling this account
+            return False
+        else:
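+            # New video: record it in the Feishu sheet, then send it to the ETL queue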
+            values = [[
+                user_name,
+                article.get("aid", ""),
+                article.get("title", "")
+                    .replace(" ", "")
+                    .replace('"', "")
+                    .replace("'", ""),
+                publish_time_str,
+                video_url,
+                article.get("head_pic", ""),
+                self.url_id["name"],
+                self.url_id["url"]
+
+            ]]
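+            # Prepend a blank row to the sheet, then write this video's fields into A2:Z2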
+            Feishu.insert_columns('gongzhonghao', 'gongzhonghao', "9QU7wE", "ROWS", 1, 2)
+            time.sleep(0.5)
+            Feishu.update_values('gongzhonghao', 'gongzhonghao', "9QU7wE", "A2:Z2", values)
+            video_dict["publish_time"] = video_dict["publish_time_str"]
+            self.mq.send_msg(video_dict)
+            self.download_cnt += 1
+            AliyunLogger.logging(
+                code="1002",
+                platform=self.platform,
+                mode=self.mode,
+                env=self.env,
+                data=video_dict,
+                trace_id=trace_id,
+                message="成功发送 MQ 至 ETL",
+            )
+            time.sleep(5)
+        return True

+ 0 - 0
gongzhongxinhao/gongzhongxinhao_main/__init__.py


+ 185 - 0
gongzhongxinhao/gongzhongxinhao_main/run_gzxh_author.py

@@ -0,0 +1,185 @@
+import argparse
+import os
+import random
+import sys
+import time
+
+from mq_http_sdk.mq_client import *
+from mq_http_sdk.mq_consumer import *
+from mq_http_sdk.mq_exception import MQExceptionBase
+
+sys.path.append(os.getcwd())
+from common.public import task_fun_mq, get_consumer, ack_message
+from common.scheduling_db import MysqlHelper
+from common.feishu import Feishu
+from common import AliyunLogger
+from gongzhongxinhao.gongzhongxinhao.gongzhongxinhao_author import GZXHAuthor
+
+
+def main(log_type, crawler, topic_name, group_id, env):
+    consumer = get_consumer(topic_name, group_id)
+    # Long polling: if the topic has no message, the request is held server-side for up
+    # to wait_seconds before returning (the SDK allows at most 30 seconds).
+    wait_seconds = 30
+    # Consume at most `batch` messages per request (the SDK allows up to 16).
+    batch = 1
+    AliyunLogger.logging(
+        code="1000",
+        platform=crawler,
+        mode=log_type,
+        env=env,
+        message=f'{10 * "="}Consume And Ack Message From Topic{10 * "="}\n'
+        f"WaitSeconds:{wait_seconds}\n"
+        f"TopicName:{topic_name}\n"
+        f"MQConsumer:{group_id}",
+    )
+    while True:
+        try:
+            # Long-poll for messages.
+            recv_msgs = consumer.consume_message(batch, wait_seconds)
+            for msg in recv_msgs:
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"Receive\n"
+                    f"MessageId:{msg.message_id}\n"
+                    f"MessageBodyMD5:{msg.message_body_md5}\n"
+                    f"MessageTag:{msg.message_tag}\n"
+                    f"ConsumedTimes:{msg.consumed_times}\n"
+                    f"PublishTime:{msg.publish_time}\n"
+                    f"Body:{msg.message_body}\n"
+                    f"NextConsumeTime:{msg.next_consume_time}\n"
+                    f"ReceiptHandle:{msg.receipt_handle}\n"
+                    f"Properties:{msg.properties}",
+                )
+                # Acknowledge the message immediately so it is not redelivered
+                ack_message(
+                    log_type=log_type,
+                    crawler=crawler,
+                    recv_msgs=recv_msgs,
+                    consumer=consumer,
+                )
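+                # Note: the message is acked before processing, so a failure below will not trigger redelivery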
+
+                # Parse task_dict from the message body
+                task_dict = task_fun_mq(msg.message_body)["task_dict"]
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message="f调度任务:{task_dict}",
+                )
+                # Parse rule_dict from the message body
+                rule_dict = task_fun_mq(msg.message_body)["rule_dict"]
+                AliyunLogger.logging(
+                    code="1000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"抓取规则:{rule_dict}\n",
+                )
+                # Read the account list (name + link) from the Feishu config sheet
+                account_rows = Feishu.get_values_batch("prod", "gongzhonghao", "QsTym9")
+                url_list = []
+                for row in account_rows[1:]:  # skip the header row
+                    name = row[1]
+                    url = row[4]
+                    if url:
+                        url_list.append({"name": name, "url": url})
+
+                # Look up the task's user list from MySQL
+                task_id = task_dict["id"]
+                select_user_sql = (
+                    f"""select * from crawler_user_v3 where task_id={task_id}"""
+                )
+                user_list = MysqlHelper.get_values(
+                    log_type, crawler, select_user_sql, env, action=""
+                )
+                AliyunLogger.logging(
+                    code="1003",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message="开始抓取"
+                )
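+                # Crawl each configured account in turn, pausing briefly between accounts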
+                for url_id in url_list:
+                    time.sleep(random.randint(1, 5))
+                    try:
+                        AliyunLogger.logging(
+                            code="1000",
+                            platform=crawler,
+                            mode=log_type,
+                            env=env,
+                            message="开始抓取公众新号{}".format(url_id["name"]),
+                        )
+                        # Initialize the per-account crawler
+                        gzxh_author = GZXHAuthor(
+                            platform=crawler,
+                            mode=log_type,
+                            rule_dict=rule_dict,
+                            user_dict=user_list,
+                            env=env,
+                            url_id=url_id,
+                        )
+                        gzxh_author.get_account_videos()
+                        AliyunLogger.logging(
+                            code="1000",
+                            platform=crawler,
+                            mode=log_type,
+                            env=env,
+                            message="完成抓取公众新号{}".format(url_id["name"]),
+                        )
+                    except Exception as e:
+                        AliyunLogger.logging(
+                            code="3000",
+                            platform=crawler,
+                            mode=log_type,
+                            env=env,
+                            message="抓取公众新号{}出现问题, 报错为{}".format(url_id["name"], e),
+                        )
+
+                AliyunLogger.logging(
+                    code="1004",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message="结束一轮抓取",
+                )
+
+        except MQExceptionBase as err:
+            # The topic has no message to consume.
+            if err.type == "MessageNotExist":
+                AliyunLogger.logging(
+                    code="2000",
+                    platform=crawler,
+                    mode=log_type,
+                    env=env,
+                    message=f"No new message! RequestId:{err.req_id}\n",
+                )
+                continue
+            AliyunLogger.logging(
+                code="2000",
+                platform=crawler,
+                mode=log_type,
+                env=env,
+                message=f"Consume Message Fail! Exception:{err}\n",
+            )
+
+            time.sleep(2)
+            continue
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()  # build the CLI argument parser
+    parser.add_argument("--log_type", type=str)
+    parser.add_argument("--crawler")
+    parser.add_argument("--topic_name")
+    parser.add_argument("--group_id")
+    parser.add_argument("--env")
+    args = parser.parse_args()
+
+    main(
+        log_type=args.log_type,
+        crawler=args.crawler,
+        topic_name=args.topic_name,
+        group_id=args.group_id,
+        env=args.env,
+    )

+ 0 - 0
gongzhongxinhao/logs/__init__.py