
Write hourly data into MySQL

罗俊辉 1 year ago
commit 964eda1c2c
3 files changed, 117 insertions and 49 deletions
  1. functions/mysql.py (+17, -10)
  2. read_data_from_odps.py (+100, -30)
  3. test.py (+0, -9)

+ 17 - 10
functions/mysql.py

@@ -94,17 +94,24 @@ class MySQLClientSpider(object):
         data = cursor.fetchall()
         return data
 
+    def update(self, sql):
+        """
+        Execute a write statement (INSERT/UPDATE) and commit.
+        :param sql: SQL statement to execute
+        :return: number of affected rows, or None on failure
+        """
+        cursor = self.connection.cursor()
+        try:
+            res = cursor.execute(sql)
+            self.connection.commit()
+            return res
+        except Exception as e:
+            print("error", len(sql))
+            print(e)
+            self.connection.rollback()
+
     def close(self):
         """
        Close the connection.
         """
-        self.connection.close()
-
-
-M = MySQLClientSpider()
-video_id = "14126697"
-sql = f"""SELECT like_cnt, play_cnt, duration from crawler_video where video_id = '{video_id}';"""
-w = M.select(sql)
-print(w[0])
-a, b, c = w[0]
-print(a, b, c)
+        self.connection.close()
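A side note on the new update() helper in MySQLClientSpider: it executes whatever SQL string it receives and commits, so callers are left to build statements with f-strings. Below is a minimal sketch of a parameterized variant, assuming self.connection is a DB-API 2.0 client such as pymysql that binds %s placeholders; the name execute_write is hypothetical and not part of this commit.

# Sketch only: a parameterized write helper. `connection` is assumed to be a
# DB-API 2.0 connection (e.g. pymysql) whose cursor.execute() binds %s placeholders.
def execute_write(connection, sql, args=None):
    """Run an INSERT/UPDATE with bound parameters; commit on success, roll back on error."""
    cursor = connection.cursor()
    try:
        affected = cursor.execute(sql, args)  # the driver escapes every value in args
        connection.commit()
        return affected
    except Exception as err:
        print("write failed:", err)
        connection.rollback()
        return None

A caller would then pass the values as a tuple, for example execute_write(self.spider_mysql.connection, "INSERT INTO lightgbm_data (video_id, user_id) VALUES (%s, %s)", (obj["video_id"], obj["uid"])), instead of formatting them into the statement text.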

+ 100 - 30
read_data_from_odps.py

@@ -13,6 +13,7 @@ from concurrent.futures import ThreadPoolExecutor
 sys.path.append(os.getcwd())
 
 from functions import PyODPS, generate_hourly_strings, generate_daily_strings
+from functions import MysqlClient, MySQLClientSpider
 
 
 class VideoDataGenerator(object):
@@ -22,6 +23,23 @@ class VideoDataGenerator(object):
 
     def __init__(self):
         self.oo = PyODPS()
+        # self.v_mysql = MysqlClient()
+        self.spider_mysql = MySQLClientSpider()
+
+    def insert_into_database(self, dt):
+        """
+        Insert one hour of data into the MySQL database.
+        :param dt: hour string, format yyyymmddhh
+        :return:
+        """
+        data_list = self.get_hour_data(dt)
+        for obj in tqdm(data_list):
+            insert_sql = f"""
+                INSERT INTO lightgbm_data
+                (video_id, user_id,  type, channel, user_fans, user_view_30, user_share_30, user_return_30, user_rov, user_str, out_user_id, spider_mode, out_play_cnt, out_like_cnt, out_share_cnt, hour_dt_str)
+                    values 
+                ({obj['video_id']}, '{obj['uid']}', '{obj['type']}', '{obj['channel']}', {obj['fans']}, {obj['view_count_user_30days']}, {obj['share_count_user_30days']}, {obj['return_count_user_30days']}, {obj['rov_user']}, {obj['str_user']}, '{obj['out_user_id']}', '{obj['mode']}', {obj['out_play_cnt']}, {obj['out_like_cnt']}, {obj['out_share_cnt']}, '{obj['dt']}');
+            """
+            self.spider_mysql.update(insert_sql)
 
     def get_hour_data(self, dt):
         """
@@ -34,10 +52,10 @@ class VideoDataGenerator(object):
         result = []
         for line in hour_data:
             obj = {
-                "uid": line["uid"],
-                "video_id": line["videoid"],
-                "type": line["type"],
-                "channel": line["channel"],
+                "uid": line["uid"] if line['uid'] else "",
+                "video_id": line["videoid"] if line['videoid'] else "",
+                "type": line["type"] if line['type'] else "",
+                "channel": line["channel"] if line['channel'] else "",
                 "fst": line["flowpool_start_type"],
                 "fsl": line["flowpool_start_level"],
                 "fet": line["flowpool_end_type"],
@@ -52,17 +70,17 @@ class VideoDataGenerator(object):
                 "rov_dms": line["rov_dms"],
                 "ros_sls": line["ros_sls"],
                 "rov_sls": line["rov_sls"],
-                "fans": line["fans"],
-                "view_count_user_30days": line["view_cnt_user_30days"],
-                "share_count_user_30days": line["share_cnt_user_30days"],
-                "return_count_user_30days": line["return_cnt_user_30days"],
-                "rov_user": line["rov_user"],
-                "str_user": line["str_user"],  # share / view
-                "out_user_id": line["out_user_id"],
-                "mode": line["strategy"],
-                "out_play_cnt": line["out_play_cnt"],
-                "out_like_cnt": line["out_like_cnt"],
-                "out_share_cnt": line["out_share_cnt"],
+                "fans": line["fans"] if line["fans"] else 0,
+                "view_count_user_30days": line["view_cnt_user_30days"] if line["view_cnt_user_30days"] else 0,
+                "share_count_user_30days": line["share_cnt_user_30days"] if line["share_cnt_user_30days"] else 0,
+                "return_count_user_30days": line["return_cnt_user_30days"] if line["return_cnt_user_30days"] else 0,
+                "rov_user": line["rov_user"] if line["rov_user"] else 0,
+                "str_user": line["str_user"] if line["str_user"] else 0, 
+                "out_user_id": line["out_user_id"] if line["out_user_id"] else "",
+                "mode": line["strategy"] if line["strategy"] else "",
+                "out_play_cnt": line["out_play_cnt"] if line["out_play_cnt"] else 0,
+                "out_like_cnt": line["out_like_cnt"] if line["out_like_cnt"] else 0,
+                "out_share_cnt": line["out_share_cnt"] if line["out_share_cnt"] else 0,
                 "out_collection_cnt": line["out_collection_cnt"],
                 "up_level_time_hour": line["up_level_time_hour"],
                 "dt": line["dt"],
@@ -94,6 +112,29 @@ class VideoDataGenerator(object):
         ]
         return result
 
+    def get_daily_user_info(self, dt):
+        """
+        Fetch user info and features for a given day.
+        :param dt: day string, format yyyymmdd
+        """
+        sql = f"""select * from loghubods.conten_quality_base_day_new where dt = '{dt}';"""
+        data = self.oo.select(sql)
+        result = [
+            {
+                "uid": item["uid"],
+                "video_id": item["videoid"],
+                "address": item["city"],
+                "return_3days": item["return_count_user_3days"],
+                "view_3days": item["view_count_user_3days"],
+                "share_3days": item["share_count_3days"],
+                "3day_return_500_videos": item["return_count_user_3days_videocnt"],
+                "30day_return_2000_videos": item["return_count_user_30days_videocnt"],
+                "dt": item["dt"],
+            }
+            for item in data
+        ]
+        return result
+
 
 def save_daily_data(start_date, end_date, save_path):
     """
@@ -125,14 +166,14 @@ def download_hour_video_data(date_str):
     V = VideoDataGenerator()
     data_list = V.get_hour_data(date_str)
     L = []
-    for obj in data_list:
+    for obj in tqdm(data_list):
         L.append(obj)
     temp_path = "data/temp_data/hour_{}.json".format(date_str)
     with open(temp_path, "w") as f:
         f.write(json.dumps(L, ensure_ascii=False))
 
 
-def save_hourly_data(start_date, end_date, save_path):
+def save_hourly_data(start_date, end_date):
     """
     save hourly data
     :param start_date:
@@ -140,26 +181,47 @@ def save_hourly_data(start_date, end_date, save_path):
     :param save_path:
     :return:
     """
-    print(save_path)
+    # print(save_path)
+    V = VideoDataGenerator()
     date_list = generate_hourly_strings(start_date, end_date)
-    with ThreadPoolExecutor(max_workers=10) as Pool:
-        Pool.map(download_hour_video_data, date_list)
-    # for date_str in tqdm(date_list):
-    #     data_list = V.get_hour_data(date_str)
-    #     for obj in tqdm(data_list):
-    #         L.append(obj)
-    # with open(save_path, "w") as f:
-    #     f.write(json.dumps(L, ensure_ascii=False))
+    for obj in tqdm(date_list):
+        V.insert_into_database(obj)
+    # with ThreadPoolExecutor(max_workers=5) as Pool:
+    #     Pool.map(V.insert_into_database, date_list)
+
+
+def save_daily_user_info(start_date, end_date, save_path):
+    """
+    save daily user_info
+    :param start_date:
+    :param end_date:
+    :param save_path:
+    :return:
+    """
+    date_list = generate_daily_strings(start_date, end_date)
+    V = VideoDataGenerator()
+    L = {}
+    for date_str in tqdm(date_list):
+        L[date_str] = {}
+        data_list = V.get_daily_user_info(date_str)
+        for obj in tqdm(data_list):
+            video_id = obj["video_id"]
+            L[date_str][video_id] = obj
+    with open(save_path, "w") as f:
+        f.write(json.dumps(L, ensure_ascii=False))
 
 
 if __name__ == "__main__":
-    flag = int(input("Enter a flag: 1 for hourly data, 2 for daily data: \n"))
+    flag = int(
+        input(
+            "Enter a flag: 1 for hourly data\n2 for daily data\n3 for user-info data: \n"
+        )
+    )
     if flag == 1:
        start = str(input("Enter the start string, format yyyymmddhh:\n"))
        end = str(input("Enter the end string, format yyyymmddhh: \n"))
-        save_p = "data/hourly-train-{}-{}.json".format(start, end)
         if len(start) == 10 and len(end) == 10:
-            save_hourly_data(start, end, save_p)
+            save_hourly_data(start, end)
         else:
             print("Time format is not ok")
     elif flag == 2:
@@ -170,5 +232,13 @@ if __name__ == "__main__":
             save_daily_data(start, end, save_p)
         else:
             print("Time format is not ok")
+    elif flag == 3:
+        start = str(input("Enter the start string, format yyyymmdd:\n"))
+        end = str(input("Enter the end string, format yyyymmdd: \n"))
+        save_p = "data/train_data/daily-user-info-{}-{}.json".format(start, end)
+        if len(start) == 8 and len(end) == 8:
+            save_daily_user_info(start, end, save_p)
+        else:
+            print("Time format is not ok")
     else:
-        print("Input Error ! Make sure your input is 1 or 2!!")
+        print("Input Error ! Make sure your input is 1 or 2 or 3!!")

+ 0 - 9
test.py

@@ -1,9 +0,0 @@
-import tkinter.filedialog
-
-path = tkinter.filedialog.askopenfilename()
-
-if path:
-    with open(path, 'r') as f:
-        print(f.read())
-else:
-    print("No good")