Files
rocketmqx/producer.go
T
2024-03-16 15:18:11 +08:00

84 lines
1.9 KiB
Go

package rocketmqx
import (
"sync"
"time"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
type MqProducer struct {
brokerAddr []string
retries int
timeout time.Duration
groupMap sync.Map
lock sync.Mutex
}
var pro *MqProducer
var once sync.Once
func InitProducer(brokerAddr []string, retries int, timeout time.Duration) *MqProducer {
// 只能初始化一次
once.Do(func() {
pro = &MqProducer{
brokerAddr: brokerAddr,
retries: retries,
timeout: timeout,
groupMap: sync.Map{},
lock: sync.Mutex{},
}
})
return pro
}
// 使用注意事项:一个服务同一个group只能有一个producer,否则会出现消息重复消费的问题
func (m *MqProducer) GetClient(group string) (rocketmq.Producer, error) {
val, ok := m.groupMap.Load(group)
if ok {
return val.(rocketmq.Producer), nil
}
m.lock.Lock()
defer m.lock.Unlock()
p, err := rocketmq.NewProducer(
producer.WithNsResolver(primitive.NewPassthroughResolver(m.brokerAddr)),
producer.WithRetry(m.retries),
producer.WithGroupName(group),
producer.WithSendMsgTimeout(m.timeout),
)
if err != nil {
return nil, err
}
err = p.Start()
if err != nil {
return nil, err
}
m.groupMap.Store(group, p)
return p, nil
}
// @param level 消息延迟时间等级 [0]0s [1]1s [2]5s [3]10s [4]30s [5]1m [6]2m [7]3m [8]4m [9]5m [10]6m [11]7m [12]8m [13]9m [14]10m [15]20m [16]30m [17]1h [18]2h
func BuildMessage(msg, topic, tags string, keys []string, level int) *primitive.Message {
message := &primitive.Message{
Topic: topic,
Body: []byte(msg),
}
if len(tags) > 0 {
//指定标签
message.WithTag(tags)
}
if len(keys) > 0 {
// 业务逻辑key 保证消费幂等
message.WithKeys(keys)
}
if level > 0 {
// 设置要消耗的消息延迟时间等级
message.WithDelayTimeLevel(level)
}
return message
}