package rocketmqx import ( "sync" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" ) type MqProducer struct { brokerAddr []string retries int timeout time.Duration groupMap sync.Map lock sync.Mutex } var pro *MqProducer var once sync.Once func InitProducer(brokerAddr []string, retries int, timeout time.Duration) *MqProducer { // 只能初始化一次 once.Do(func() { pro = &MqProducer{ brokerAddr: brokerAddr, retries: retries, timeout: timeout, groupMap: sync.Map{}, lock: sync.Mutex{}, } }) return pro } // 使用注意事项:一个服务同一个group只能有一个producer,否则会出现消息重复消费的问题 func (m *MqProducer) GetClient(group string) (rocketmq.Producer, error) { val, ok := m.groupMap.Load(group) if ok { return val.(rocketmq.Producer), nil } m.lock.Lock() defer m.lock.Unlock() p, err := rocketmq.NewProducer( producer.WithNsResolver(primitive.NewPassthroughResolver(m.brokerAddr)), producer.WithRetry(m.retries), producer.WithGroupName(group), producer.WithSendMsgTimeout(m.timeout), ) if err != nil { return nil, err } err = p.Start() if err != nil { return nil, err } m.groupMap.Store(group, p) return p, nil } // @param level 消息延迟时间等级 [0]0s [1]1s [2]5s [3]10s [4]30s [5]1m [6]2m [7]3m [8]4m [9]5m [10]6m [11]7m [12]8m [13]9m [14]10m [15]20m [16]30m [17]1h [18]2h func BuildMessage(msg, topic, tags string, keys []string, level int) *primitive.Message { message := &primitive.Message{ Topic: topic, Body: []byte(msg), } if len(tags) > 0 { //指定标签 message.WithTag(tags) } if len(keys) > 0 { // 业务逻辑key 保证消费幂等 message.WithKeys(keys) } if level > 0 { // 设置要消耗的消息延迟时间等级 message.WithDelayTimeLevel(level) } return message }