123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263 |
- #! /usr/bin/env python
- # -*- coding: utf-8 -*-
- # vim:fenc=utf-8
- #
- # Copyright © 2025 StrayWarrior <i@straywarrior.com>
- """
- 1.删除容易导致偏差的viewall特征
- 2.删除分桶不均匀的cpa特征
- 3.减少dense特征
- 4.增加U-I交叉统计
- 5.增加线性部分dense
- 6.减少wide部分embedding
- 7.减少部分bucket size
- 8.使用protobuf
- 9.调整embedding variable (PAI平台目前还不支持)
- """
- import re
- from easy_rec.python.protos.pipeline_pb2 import EasyRecConfig
- from easy_rec.python.protos.train_pb2 import TrainConfig
- from easy_rec.python.protos.eval_pb2 import EvalConfig, AUC, EvalMetrics
- from easy_rec.python.protos.dataset_pb2 import DatasetConfig
- from easy_rec.python.protos.feature_config_pb2 import FeatureConfig, FeatureGroupConfig, WideOrDeep, EVParams
- from easy_rec.python.protos.easy_rec_model_pb2 import EasyRecModel
- from easy_rec.python.protos.deepfm_pb2 import DeepFM
- from easy_rec.python.protos.dnn_pb2 import DNN
- from easy_rec.python.protos.export_pb2 import ExportConfig
- from easy_rec.python.protos.optimizer_pb2 import Optimizer, AdamOptimizer, LearningRate, ConstantLearningRate
- from google.protobuf import text_format
# Load the input schema file. Each non-blank line is expected to look like
# "<field_name> <TYPE>", separated by single spaces.
# The `with` block guarantees the file handle is closed after reading.
with open("data_fields_v3.config") as _fields_file:
    raw_input = _fields_file.readlines()

# Map field name -> declared type string, in file order. The type strings are
# later looked up in input_type_map. Blank lines are skipped so that a stray
# empty line does not raise IndexError.
input_fields = {
    parts[0]: parts[1]
    for parts in (line.strip().split(' ') for line in raw_input if line.strip())
}
def read_features(filename, excludes=None):
    """Read feature names from a text file that lists one name per line.

    Every line is stripped of surrounding whitespace and lower-cased. Blank
    lines are ignored so they never turn into empty feature names.

    Args:
        filename: Path of the feature list file.
        excludes: Optional iterable of feature names to drop. Names are
            compared against the normalized (stripped, lower-cased) names,
            and every occurrence of an excluded name is removed.

    Returns:
        A list of normalized feature names in file order.
    """
    # The context manager closes the file even if reading fails.
    with open(filename) as feature_file:
        normalized = (line.strip().lower() for line in feature_file)
        features = [name for name in normalized if name]
    if excludes:
        # A set gives O(1) membership tests and, unlike list.remove, the
        # filter drops duplicates of an excluded name as well.
        excluded = set(excludes)
        features = [name for name in features if name not in excluded]
    return features
# Feature names removed from every feature list loaded from disk.
exclude_features = ["viewall", "cpa"]

# Dense feature lists read from config files, with the excluded names dropped.
# create_config puts dense_features in the wide group and top_dense_features
# in the deep group.
dense_features = read_features("features_top300.config", exclude_features)
top_dense_features = read_features("features_top50.config", exclude_features)

# Features configured as IdFeature in create_config.
sparse_features = [
    "cid",
    "adid",
    "adverid",
    "region",
    "city",
    "brand",
    "vid",
    "cate1",
    "cate2",
    "apptype",
    "hour",
    "hour_quarter",
    "root_source_scene",
    "root_source_channel",
    "is_first_layer",
    "title_split",
    "user_has_conver_1y",
    # user_adverid_* features over 3d / 7d / 30d windows
    "user_adverid_view_3d",
    "user_adverid_view_7d",
    "user_adverid_view_30d",
    "user_adverid_click_3d",
    "user_adverid_click_7d",
    "user_adverid_click_30d",
    "user_adverid_conver_3d",
    "user_adverid_conver_7d",
    "user_adverid_conver_30d",
    # user_skuid_* features over 3d / 7d / 30d windows
    "user_skuid_view_3d",
    "user_skuid_view_7d",
    "user_skuid_view_30d",
    "user_skuid_click_3d",
    "user_skuid_click_7d",
    "user_skuid_click_30d",
    "user_skuid_conver_3d",
    "user_skuid_conver_7d",
    "user_skuid_conver_30d",
]

# Features configured as TagFeature (comma separated) in create_config.
tag_features = [
    "user_vid_return_tags_2h",
    "user_vid_return_tags_1d",
    "user_vid_return_tags_3d",
    "user_vid_return_tags_7d",
    "user_vid_return_tags_14d",
]

# Sequence-style features; create_config configures them like tag features.
seq_features = [
    "user_cid_click_list",
    "user_cid_conver_list",
]

# Maps the type names stored in input_fields to DatasetConfig field types.
input_type_map = {
    "BIGINT": DatasetConfig.FieldType.INT64,
    "DOUBLE": DatasetConfig.FieldType.DOUBLE,
    "STRING": DatasetConfig.FieldType.STRING,
}

# Sparse features that get ev_params instead of a hash bucket in create_config.
use_ev_features = [
    "cid",
    "adid",
    "adverid",
    "vid",
]

# Per-feature hash_bucket_size. Features missing from this map fall back to
# the default passed to bucket_size_map.get() in create_config.
bucket_size_map = {
    "adverid": 100000,
    "region": 1000,
    "city": 10000,
    "brand": 10000,
    "cate1": 10000,
    "cate2": 10000,
    "apptype": 1000,
    "hour": 1000,  # a fixed vocabulary could be specified directly instead
    "hour_quarter": 4000,
    "root_source_scene": 100,
    "root_source_channel": 1000,
    "is_first_layer": 100,
    "user_has_conver_1y": 100,
}
def _build_train_config():
    """Return the TrainConfig: three constant-rate Adam optimizers plus step settings."""
    train_config = TrainConfig()
    # (learning_rate, use_moving_average) for each optimizer, appended to
    # optimizer_config in exactly this order.
    optimizers = [
        (0.0010, False),  # wide parameters
        (0.0006, False),  # dense parameters
        (0.002, False),  # deep embedding parameters
    ]
    for lr, use_moving_avg in optimizers:
        constant_lr = ConstantLearningRate()
        constant_lr.learning_rate = lr
        learning_rate = LearningRate()
        learning_rate.constant_learning_rate.CopyFrom(constant_lr)
        adam_optimizer = AdamOptimizer()
        adam_optimizer.learning_rate.CopyFrom(learning_rate)
        optimizer = Optimizer()
        optimizer.adam_optimizer.CopyFrom(adam_optimizer)
        optimizer.use_moving_average = use_moving_avg
        train_config.optimizer_config.append(optimizer)
    train_config.num_steps = 200000
    train_config.sync_replicas = True
    train_config.save_checkpoints_steps = 1100
    train_config.log_step_count_steps = 100
    train_config.save_summary_steps = 100
    return train_config


def _build_eval_config():
    """Return the EvalConfig: a single AUC metric, evaluated online every 120 seconds."""
    eval_config = EvalConfig()
    metrics_set = EvalMetrics()
    # AUC has no fields set here; SetInParent marks the submessage as present.
    metrics_set.auc.SetInParent()
    eval_config.metrics_set.append(metrics_set)
    eval_config.eval_online = True
    eval_config.eval_interval_secs = 120
    return eval_config


def _build_data_config():
    """Return the DatasetConfig: one input field per input_fields entry plus the label."""
    data_config = DatasetConfig()
    data_config.batch_size = 512
    data_config.num_epochs = 1
    data_config.prefetch_size = 32
    data_config.input_type = DatasetConfig.InputType.OdpsInputV2
    # Build the set once so the per-field membership test is O(1) instead of
    # scanning the dense feature list for every input field.
    dense_names = set(dense_features)
    for name in input_fields:
        input_field = DatasetConfig.Field()
        input_field.input_name = name
        input_field.input_type = input_type_map[input_fields[name]]
        # Only dense features receive an explicit default value.
        if name in dense_names:
            input_field.default_val = "0"
        data_config.input_fields.append(input_field)
    data_config.label_fields.append("has_conversion")
    return data_config


def _build_feature_configs():
    """Return the FeatureConfig list: dense features, then sparse, then tag/sequence."""
    feature_configs = []

    # Dense features: raw values with 101 boundaries 0.00, 0.01, ..., 1.00.
    boundaries = [x / 100 for x in range(0, 101)]
    for name in dense_features:
        feature_config = FeatureConfig()
        feature_config.input_names.append(name)
        feature_config.feature_type = FeatureConfig.RawFeature
        feature_config.boundaries.extend(boundaries)
        feature_config.embedding_dim = 6
        feature_configs.append(feature_config)

    # Sparse features: id features backed by either an embedding variable or
    # a hash bucket.
    for name in sparse_features:
        feature_config = FeatureConfig()
        feature_config.input_names.append(name)
        feature_config.feature_type = FeatureConfig.IdFeature
        # Only INT64 features can use the embedding variable feature.
        if name in use_ev_features:
            if input_type_map[input_fields[name]] != DatasetConfig.FieldType.INT64:
                raise ValueError(f"Feature {name} must be of type INT64 to use embedding variable.")
            feature_config.ev_params.filter_freq = 2
        else:
            feature_config.hash_bucket_size = bucket_size_map.get(name, 1000000)
        feature_config.embedding_dim = 6
        feature_configs.append(feature_config)

    # Tag and sequence features: both are configured as comma-separated tag
    # features.
    for name in tag_features + seq_features:
        feature_config = FeatureConfig()
        feature_config.input_names.append(name)
        feature_config.feature_type = FeatureConfig.TagFeature
        feature_config.hash_bucket_size = bucket_size_map.get(name, 1000000)
        feature_config.embedding_dim = 6
        feature_config.separator = ','
        feature_configs.append(feature_config)

    return feature_configs


def _build_model_config():
    """Return the EasyRecModel: DeepFM with a wide and a deep feature group."""
    model_config = EasyRecModel()
    model_config.model_class = "DeepFM"

    # Wide group: all dense features plus the sparse features.
    wide_group = FeatureGroupConfig()
    wide_group.group_name = 'wide'
    wide_group.feature_names.extend(dense_features + sparse_features)
    wide_group.wide_deep = WideOrDeep.WIDE
    model_config.feature_groups.append(wide_group)

    # Deep group: only the top dense features, plus sparse, tag and sequence
    # features.
    deep_group = FeatureGroupConfig()
    deep_group.group_name = 'deep'
    deep_group.feature_names.extend(top_dense_features + sparse_features + tag_features + seq_features)
    deep_group.wide_deep = WideOrDeep.DEEP
    model_config.feature_groups.append(deep_group)

    deepfm = DeepFM()
    deepfm.wide_output_dim = 2
    dnn = DNN()
    dnn.hidden_units.extend([256, 128, 64])
    deepfm.dnn.CopyFrom(dnn)
    final_dnn = DNN()
    final_dnn.hidden_units.extend([64, 32])
    deepfm.final_dnn.CopyFrom(final_dnn)
    deepfm.l2_regularization = 1e-5
    model_config.deepfm.CopyFrom(deepfm)
    model_config.embedding_regularization = 1e-6
    return model_config


def _build_export_config():
    """Return the ExportConfig with exporter_type set to "final"."""
    export_config = ExportConfig()
    export_config.exporter_type = "final"
    return export_config


def create_config():
    """Assemble the complete EasyRecConfig for the DeepFM model.

    The sections are built in this order: train, eval, data, features, model,
    export. The module-level feature lists and maps (input_fields,
    dense_features, sparse_features, ...) drive the data and feature sections.

    Returns:
        A populated EasyRecConfig message.

    Raises:
        ValueError: If a feature listed in use_ev_features is not declared as
            INT64 in input_fields.
        KeyError: If an input field has a type missing from input_type_map, or
            a use_ev_features entry is missing from input_fields.
    """
    config = EasyRecConfig()
    config.train_config.CopyFrom(_build_train_config())
    config.eval_config.CopyFrom(_build_eval_config())
    config.data_config.CopyFrom(_build_data_config())
    config.feature_configs.extend(_build_feature_configs())
    config.model_config.CopyFrom(_build_model_config())
    config.export_config.CopyFrom(_build_export_config())
    return config
def merge_repeated_elements(msg_str, field_name):
    """Collapse runs of a repeated scalar field into the short list form.

    In protobuf text format a repeated field is printed one value per line::

          hidden_units: 256
          hidden_units: 128

    This rewrites each run of consecutive lines for ``field_name`` as a single
    line ``hidden_units: [256, 128]``, keeping the indentation of the run's
    first line. A run of one line is still rewritten to ``name: [value]``.

    Args:
        msg_str: Text-format message, with each field line ending in a newline.
        field_name: Exact name of the repeated field to merge. It is matched
            literally, not as a regular expression.

    Returns:
        The message text with every run of ``field_name`` lines merged.
    """
    # Escape the name so regex metacharacters in it cannot change the pattern.
    name = re.escape(field_name)
    # Anchor to the start of a line so the field name is only matched where a
    # field actually begins, never inside a string value or a longer line.
    # Zero indentation is allowed so top-level repeated fields are merged too.
    run_pattern = re.compile(rf'(?:^ *{name}: [^\n]+\n)+', re.MULTILINE)
    value_pattern = re.compile(rf'^ *{name}: ([^\n]+)$', re.MULTILINE)

    def _collapse(match):
        run = match.group(0)
        # Leading spaces of the first line in the run.
        indent = run[:len(run) - len(run.lstrip(' '))]
        values = value_pattern.findall(run)
        return f"{indent}{field_name}: [{', '.join(values)}]\n"

    return run_pattern.sub(_collapse, msg_str)
def main():
    """Build the config, render it as protobuf text and print it to stdout.

    The repeated 'boundaries' and 'hidden_units' fields are merged into the
    bracketed list form before printing.
    """
    rendered = text_format.MessageToString(create_config())
    for repeated_field in ('boundaries', 'hidden_units'):
        rendered = merge_repeated_elements(rendered, repeated_field)
    print(rendered)


if __name__ == '__main__':
    main()
|