luojunhui 5 дней назад
Родитель
Commit
03a850644b

+ 2 - 0
app/infra/external/__init__.py

@@ -4,6 +4,7 @@ from .apollo import AsyncApolloApi
 from .feishu import FeishuBotApi
 from .feishu import FeishuBotApi
 from .feishu import FeishuSheetApi
 from .feishu import FeishuSheetApi
 from .elastic_search import AsyncElasticSearchClient
 from .elastic_search import AsyncElasticSearchClient
+from .odps_service import OdpsService
 
 
 feishu_robot = FeishuBotApi()
 feishu_robot = FeishuBotApi()
 feishu_sheet = FeishuSheetApi()
 feishu_sheet = FeishuSheetApi()
@@ -15,4 +16,5 @@ __all__ = [
     "fetch_deepseek_completion",
     "fetch_deepseek_completion",
     "log",
     "log",
     "AsyncElasticSearchClient",
     "AsyncElasticSearchClient",
+    "OdpsService",
 ]
 ]

+ 39 - 0
app/infra/external/odps_service.py

@@ -0,0 +1,39 @@
+import asyncio
+from odps import ODPS
+
+
class OdpsService:
    """Async-friendly wrapper around the blocking MaxCompute (ODPS) SDK.

    Every SQL call in pyodps blocks, so both public methods hand the work to
    the default thread-pool executor via ``run_in_executor`` to keep the
    asyncio event loop responsive.
    """

    def __init__(self, access_id, secret_access_key, project, endpoint):
        # NOTE(review): positional order must match the pyodps ODPS
        # constructor (access_id, secret_access_key, project, endpoint) —
        # confirm against the installed pyodps version.
        self.odps_client = ODPS(access_id, secret_access_key, project, endpoint)

    async def execute_odps_query(self, query: str) -> bool:
        """Execute *query* (possibly a multi-statement script) on ODPS.

        Returns:
            True when the instance finishes successfully, False on any
            failure. Errors are logged, not raised, preserving the
            best-effort contract callers rely on.
        """
        import logging

        loop = asyncio.get_running_loop()

        def _execute() -> None:
            # "script" submit mode allows several ';'-separated statements
            # (CREATE TABLE ...; INSERT OVERWRITE ...) in a single call.
            instance = self.odps_client.execute_sql(
                query,
                hints={"odps.sql.submit.mode": "script"},
            )
            instance.wait_for_success()

        try:
            await loop.run_in_executor(None, _execute)
            return True
        except Exception:
            # Log with traceback instead of print() so failures reach the
            # configured logging pipeline.
            logging.getLogger(__name__).exception("ODPS query failed")
            return False

    async def read_from_odps(self, query: str) -> list:
        """Run a SELECT on ODPS and return all result records as a list.

        Returns an empty list when the query fails or yields no reader;
        errors are logged rather than raised.
        """
        import logging

        loop = asyncio.get_running_loop()

        def _read() -> list:
            with self.odps_client.execute_sql(query).open_reader() as reader:
                return list(reader) if reader else []

        try:
            return await loop.run_in_executor(None, _read)
        except Exception:
            logging.getLogger(__name__).exception("ODPS read failed")
            return []

+ 44 - 15
app/recommend/offline_recommend/core.py

@@ -1,4 +1,4 @@
-from typing import Dict, Set, List
+from typing import Dict, Set, List, Tuple
 
 
 from app.core.config import GlobalConfigSettings
 from app.core.config import GlobalConfigSettings
 from app.core.database import DatabaseManager
 from app.core.database import DatabaseManager
@@ -6,11 +6,12 @@ from app.core.observability import LogService
 
 
 from app.infra.mapper import LongArticleDatabaseMapper
 from app.infra.mapper import LongArticleDatabaseMapper
 from app.infra.mapper import PiaoquanCrawlerDatabaseMapper
 from app.infra.mapper import PiaoquanCrawlerDatabaseMapper
-from app.infra.shared.tools import init_odps_client
+from app.infra.external import OdpsService
 
 
 from app.recommend.offline_recommend.strategy import I2I
 from app.recommend.offline_recommend.strategy import I2I
 from app.recommend.offline_recommend.strategy import GetTopArticleStrategy
 from app.recommend.offline_recommend.strategy import GetTopArticleStrategy
 from app.recommend.offline_recommend.utils import RecommendApolloClient
 from app.recommend.offline_recommend.utils import RecommendApolloClient
+from app.recommend.offline_recommend.utils import ProduceBaseData
 
 
 
 
 class BaseOffRecommendUtils:
 class BaseOffRecommendUtils:
@@ -21,15 +22,12 @@ class BaseOffRecommendUtils:
     ):
     ):
         self.pool = pool
         self.pool = pool
         self.recommend_apollo_client = RecommendApolloClient(config=config)
         self.recommend_apollo_client = RecommendApolloClient(config=config)
-        self.odps_client = init_odps_client()
-
-    # read from odps
-    async def read_from_odps(self, query: str) -> List:
-        with self.odps_client.execute_sql(query).open_reader() as reader:
-            if reader:
-                return [item for item in reader]
-            else:
-                return []
+        self.odps_client = OdpsService(
+            access_id="LTAIWYUujJAm7CbH",
+            secret_access_key="RfSjdiWwED1sGFlsjXv0DlfTnZTG1P",
+            endpoint="http://service.cn.maxcompute.aliyun.com/api",
+            project="loghubods",
+        )
 
 
     # 获取全局过滤标题
     # 获取全局过滤标题
     async def get_global_filter_title(self) -> Set[str]:
     async def get_global_filter_title(self) -> Set[str]:
@@ -57,10 +55,39 @@ class BaseOffRecommendUtils:
             case _:
             case _:
                 query = I2I.batch_base(title_list)
                 query = I2I.batch_base(title_list)
 
 
-        recommend_articles = await self.read_from_odps(query)
+        recommend_articles = await self.odps_client.read_from_odps(query)
         return recommend_articles
         return recommend_articles
 
 
 
 
class BaseOfflineDataProduce(BaseOffRecommendUtils):
    """Builds the offline experiment base data (article/title/i2i tables).

    Each ``produce_*`` method renders a SQL script via :class:`ProduceBaseData`
    and submits it to ODPS through the shared ``odps_client``.
    """

    def __init__(
        self,
        pool: DatabaseManager,
        config: GlobalConfigSettings,
        log_service: LogService,
    ):
        # Base class already stores `pool` and constructs the ODPS client;
        # re-assigning self.pool here would be redundant.
        super().__init__(pool, config)
        self.log_service = log_service

    async def produce_article_data(self, account_tuple: Tuple[str, ...]):
        """Rebuild the wx_sn-level article / union_id mapping table for the
        given account names."""
        query = ProduceBaseData.article_unionid_mapper(account_tuple)
        await self.odps_client.execute_odps_query(query)

    async def produce_title_data(self):
        """Rebuild the title-level union_id mapping table."""
        query = ProduceBaseData.title_unionid_mapper()
        await self.odps_client.execute_odps_query(query)

    async def produce_i2i_table(self, dt: str):
        """(Re)build the i2i co-occurrence table partition for *dt*."""
        query = ProduceBaseData.i2i_mapper(dt=dt)
        await self.odps_client.execute_odps_query(query)
+
+
 class BaseOfflineRecommend(BaseOffRecommendUtils):
 class BaseOfflineRecommend(BaseOffRecommendUtils):
     def __init__(
     def __init__(
         self,
         self,
@@ -134,13 +161,15 @@ class BaseOfflineRecommend(BaseOffRecommendUtils):
         match strategy:
         match strategy:
             case "v1":
             case "v1":
                 odps_query = GetTopArticleStrategy.strategy_v1(account_name)
                 odps_query = GetTopArticleStrategy.strategy_v1(account_name)
-                top_articles = await self.read_from_odps(odps_query)
+                top_articles = await self.odps_client.read_from_odps(odps_query)
                 top_titles = [i.title for i in top_articles]
                 top_titles = [i.title for i in top_articles]
 
 
             case "base":
             case "base":
                 mysql_query = GetTopArticleStrategy.base()
                 mysql_query = GetTopArticleStrategy.base()
-                top_articles = await LongArticleDatabaseMapper.Recommend.get_top_articles(
-                    pool=self.pool, query=mysql_query, gh_id=gh_id
+                top_articles = (
+                    await LongArticleDatabaseMapper.Recommend.get_top_articles(
+                        pool=self.pool, query=mysql_query, gh_id=gh_id
+                    )
                 )
                 )
                 top_titles = [i["title"] for i in top_articles]
                 top_titles = [i["title"] for i in top_articles]
 
 

+ 2 - 1
app/recommend/offline_recommend/utils/__init__.py

@@ -1,4 +1,5 @@
+from .produce_data import ProduceBaseData
 from .recommend_apollo import RecommendApolloClient
 from .recommend_apollo import RecommendApolloClient
 
 
 
 
-__all__ = ["RecommendApolloClient"]
+__all__ = ["RecommendApolloClient", "ProduceBaseData"]

+ 162 - 0
app/recommend/offline_recommend/utils/produce_data.py

@@ -0,0 +1,162 @@
+from typing import Tuple
+
+
class ProduceBaseData:
    """Renders the ODPS SQL scripts that build the offline base tables.

    All methods are pure string builders — no I/O happens here. The scripts
    contain multiple ';'-separated statements, so they must be submitted in
    "script" mode (see OdpsService.execute_odps_query).
    """

    @staticmethod
    def _sql_in_list(values: Tuple[str, ...]) -> str:
        """Render *values* as a SQL IN-list, e.g. ``('a', 'b')``.

        Fixes two problems with interpolating a Python tuple repr directly:
        a 1-element tuple would render ``('a',)`` (invalid SQL trailing
        comma), and embedded single quotes would break the literal.
        """
        quoted = ", ".join("'{}'".format(v.replace("'", "''")) for v in values)
        return f"({quoted})"

    @staticmethod
    def article_unionid_mapper(account_name_tuple: Tuple[str, ...]) -> str:
        """SQL script: create (if absent) and rebuild article_union_id_mapper
        for the given account names; also creates title_union_id_mapper."""
        account_filter = ProduceBaseData._sql_in_list(account_name_tuple)
        odps_query = f"""
        -- wx_sn granularity table
        CREATE TABLE IF NOT EXISTS article_union_id_mapper
        (
            publish_date     STRING COMMENT '发布日期 yyyymmdd'
            ,accountname     STRING COMMENT '账号名'
            ,show_view_count BIGINT COMMENT '展示/阅读量'
            ,title           STRING COMMENT '文章标题'
            ,wx_sn           STRING COMMENT '微信序列号'
            ,root_source_id  STRING COMMENT 'root_source_id'
            ,union_id        STRING COMMENT '微信 union_id'
        )
        COMMENT '文章与 union_id 映射表'
        ;

        -- title granularity table
        CREATE TABLE IF NOT EXISTS title_union_id_mapper
        (
            title     STRING COMMENT '文章标题'
            ,union_id STRING COMMENT '微信 union_id'
        )
        COMMENT '标题与 union_id 映射表'
        ;

        -- base article info
        WITH article_table AS 
        (
            SELECT  TO_CHAR(FROM_UNIXTIME(publish_timestamp),'yyyymmdd') AS publish_date
                    ,accountname
                    ,show_view_count
                    ,title
                    ,wx_sn
                    ,root_source_id
            FROM    loghubods.official_articles_v2_hour
            LATERAL VIEW EXPLODE(SPLIT(REGEXP_REPLACE(root_source_id_list,'\\[|\\]|"',''),',')) tmp AS root_source_id
            WHERE   accountname IN {account_filter}
            AND     itemindex = 1
            AND     dt = MAX_PT('loghubods.official_articles_v2_hour')
        ) -- root_source_id + publish_date per article: used to JOIN instead of IN, and for partition pruning
        ,article_root_sources AS 
        (
            SELECT  root_source_id
                    ,publish_date
            FROM    article_table
            GROUP BY root_source_id
                     ,publish_date
        ) -- openid <-> root_source_id mapping (JOIN instead of IN, scans only article-related partitions)
        ,first_level_openid_table AS 
        (
            SELECT  SUBSTRING_INDEX(t.machinecode,'weixin_openid_',-1) AS openid
                    ,t.rootsourceid
                    ,t.dt AS publish_date
            FROM    loghubods.ods_user_active_log_info_day t
            INNER JOIN article_root_sources ars
            ON      t.rootsourceid = ars.root_source_id
            AND     t.dt = ars.publish_date
            WHERE   t.user_share_depth = 0
        ) -- openid -> unionid mapping
        ,open_union_id_mapper AS 
        (
            SELECT  open_id
                    ,union_id
            FROM    loghubods.user_wechat_identity_info_ha
            WHERE   dt = MAX_PT('loghubods.user_wechat_identity_info_ha')
        )
        ,final_table AS 
        (
            SELECT  t1.publish_date
                    ,t1.accountname
                    ,t1.show_view_count
                    ,t1.title
                    ,t1.wx_sn
                    ,t1.root_source_id
                    ,t2.openid
                    ,t3.union_id
            FROM    article_table t1
            INNER JOIN first_level_openid_table t2
            ON      t1.root_source_id = t2.rootsourceid
            AND     t1.publish_date = t2.publish_date
            INNER JOIN open_union_id_mapper t3
            ON      t2.openid = t3.open_id
        )
        INSERT OVERWRITE TABLE article_union_id_mapper
        SELECT  DISTINCT publish_date
                ,accountname
                ,show_view_count
                ,title
                ,CAST(wx_sn AS STRING)
                ,root_source_id
                ,union_id
        FROM    final_table
        ;
        """
        return odps_query

    @staticmethod
    def title_unionid_mapper() -> str:
        """SQL: collapse article_union_id_mapper to (title, union_id) pairs."""
        odps_query = """
            INSERT OVERWRITE TABLE title_union_id_mapper
            SELECT  DISTINCT title
                    ,union_id
            FROM    article_union_id_mapper
        ;
        """
        return odps_query

    @staticmethod
    def i2i_mapper(dt: str) -> str:
        """SQL script: build the i2i co-occurrence partition for *dt*.

        uid_coverage_rate uses an additive smoothing constant (+10000) in the
        denominator to damp scores for low-coverage titles.
        """
        odps_query = f"""
        CREATE TABLE IF NOT EXISTS i2i_table
        (
            source_title                STRING
            ,associated_title           STRING
            ,association_count          BIGINT
            ,associated_title_uid_count BIGINT
            ,uid_coverage_rate          DOUBLE
        )
        PARTITIONED BY (dt STRING)
        ;
        INSERT OVERWRITE TABLE i2i_table PARTITION (dt = '{dt}')
        WITH src_title_user AS
        (
            SELECT  DISTINCT title AS 源标题
                    ,union_id
            FROM    loghubods.title_union_id_mapper
        )
        ,co_occur AS
        (
            SELECT  s.源标题
                    ,t.title AS 联想标题
                    ,COUNT(*) AS 联想次数
            FROM    src_title_user s
            JOIN    loghubods.title_union_id_mapper t
            ON      s.union_id = t.union_id
            WHERE   s.源标题 != t.title
            GROUP BY s.源标题
                     ,t.title
        )
        ,title_uid_cnt AS
        (
            SELECT  title
                    ,COUNT(DISTINCT union_id) AS 联想标题_uid数量
            FROM    loghubods.title_union_id_mapper
            GROUP BY title
        )
        SELECT  c.源标题 AS source_title
                ,c.联想标题 AS associated_title
                ,c.联想次数 AS association_count
                ,COALESCE(u.联想标题_uid数量, 0) AS associated_title_uid_count
                ,CAST(c.联想次数 AS DOUBLE) / (COALESCE(u.联想标题_uid数量, 0) + 10000) AS uid_coverage_rate
        FROM    co_occur c
        LEFT JOIN title_uid_cnt u
        ON      c.联想标题 = u.title
        ;
        """
        return odps_query