kafka.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package datasource
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. kafka "github.com/segmentio/kafka-go"
  7. "github.com/segmentio/kafka-go/compress"
  8. "gitlab.alibaba-inc.com/pai_biz_arch/pairec/log"
  9. "gitlab.alibaba-inc.com/pai_biz_arch/pairec/recconf"
  10. )
  11. type KafkaLog struct{}
  12. func (l *KafkaLog) Errorf(msg string, args ...interface{}) {
  13. log.Error(fmt.Sprintf("msg=%s, args=%v", msg, args))
  14. }
  15. type KafkaProducer struct {
  16. BootstrapServers string
  17. Topic string
  18. Producer *kafka.Writer
  19. }
  20. var kafkaProducerInstances = make(map[string]*KafkaProducer)
  21. func GetKafkaProducer(name string) (*KafkaProducer, error) {
  22. if _, ok := kafkaProducerInstances[name]; !ok {
  23. return nil, fmt.Errorf("KafkaProducer not found, name:%s", name)
  24. }
  25. return kafkaProducerInstances[name], nil
  26. }
  27. func NewKafkaProducer(bootstrapServers, topic string) *KafkaProducer {
  28. p := &KafkaProducer{
  29. BootstrapServers: bootstrapServers,
  30. Topic: topic,
  31. }
  32. return p
  33. }
  34. func (k *KafkaProducer) Init() error {
  35. l := &KafkaLog{}
  36. w := kafka.NewWriter(kafka.WriterConfig{
  37. Brokers: strings.Split(k.BootstrapServers, ","),
  38. Topic: k.Topic,
  39. Balancer: kafka.CRC32Balancer{},
  40. MaxAttempts: 3,
  41. Async: true,
  42. BatchBytes: 1048576 * 4,
  43. ErrorLogger: kafka.LoggerFunc(l.Errorf),
  44. CompressionCodec: compress.Snappy.Codec(),
  45. })
  46. k.Producer = w
  47. return nil
  48. }
  49. func (k *KafkaProducer) SendMessage(message string) {
  50. err := k.Producer.WriteMessages(context.Background(),
  51. kafka.Message{
  52. Value: []byte(message),
  53. })
  54. if err != nil {
  55. log.Error(fmt.Sprintf("error=kafka write message(%v)", err))
  56. }
  57. }
  58. func (k *KafkaProducer) Close() {
  59. if k.Producer != nil {
  60. k.Producer.Close()
  61. }
  62. }
  63. func Load(config *recconf.RecommendConfig) {
  64. for name, conf := range config.KafkaConfs {
  65. m := &KafkaProducer{
  66. BootstrapServers: conf.BootstrapServers,
  67. Topic: conf.Topic,
  68. }
  69. err := m.Init()
  70. if err != nil {
  71. panic(err)
  72. }
  73. kafkaProducerInstances[name] = m
  74. }
  75. }