84 lines
1.9 KiB
Go
84 lines
1.9 KiB
Go
|
|
package rocketmqx
|
||
|
|
|
||
|
|
import (
|
||
|
|
"sync"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
"github.com/apache/rocketmq-client-go/v2"
|
||
|
|
"github.com/apache/rocketmq-client-go/v2/primitive"
|
||
|
|
"github.com/apache/rocketmq-client-go/v2/producer"
|
||
|
|
)
|
||
|
|
|
||
|
|
// MqProducer holds the shared connection settings used to build RocketMQ
// producers and caches one producer per group name.
type MqProducer struct {
	// brokerAddr is the address list handed to the passthrough name-server
	// resolver when a producer is created.
	brokerAddr []string
	// retries is the retry count configured on every created producer.
	retries int
	// timeout is the send-message timeout configured on every created producer.
	timeout time.Duration
	// groupMap caches created producers, keyed by group name.
	groupMap sync.Map
	// lock serializes producer creation in GetClient.
	lock sync.Mutex
}
|
||
|
|
|
||
|
|
var pro *MqProducer
|
||
|
|
var once sync.Once
|
||
|
|
|
||
|
|
func InitProducer(brokerAddr []string, retries int, timeout time.Duration) *MqProducer {
|
||
|
|
// 只能初始化一次
|
||
|
|
once.Do(func() {
|
||
|
|
pro = &MqProducer{
|
||
|
|
brokerAddr: brokerAddr,
|
||
|
|
retries: retries,
|
||
|
|
timeout: timeout,
|
||
|
|
groupMap: sync.Map{},
|
||
|
|
lock: sync.Mutex{},
|
||
|
|
}
|
||
|
|
})
|
||
|
|
return pro
|
||
|
|
}
|
||
|
|
|
||
|
|
// 使用注意事项:一个服务同一个group只能有一个producer,否则会出现消息重复消费的问题
|
||
|
|
func (m *MqProducer) GetClient(group string) (rocketmq.Producer, error) {
|
||
|
|
val, ok := m.groupMap.Load(group)
|
||
|
|
if ok {
|
||
|
|
return val.(rocketmq.Producer), nil
|
||
|
|
}
|
||
|
|
|
||
|
|
m.lock.Lock()
|
||
|
|
defer m.lock.Unlock()
|
||
|
|
|
||
|
|
p, err := rocketmq.NewProducer(
|
||
|
|
producer.WithNsResolver(primitive.NewPassthroughResolver(m.brokerAddr)),
|
||
|
|
producer.WithRetry(m.retries),
|
||
|
|
producer.WithGroupName(group),
|
||
|
|
producer.WithSendMsgTimeout(m.timeout),
|
||
|
|
)
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
err = p.Start()
|
||
|
|
if err != nil {
|
||
|
|
return nil, err
|
||
|
|
}
|
||
|
|
m.groupMap.Store(group, p)
|
||
|
|
return p, nil
|
||
|
|
}
|
||
|
|
|
||
|
|
// @param level 消息延迟时间等级 [0]0s [1]1s [2]5s [3]10s [4]30s [5]1m [6]2m [7]3m [8]4m [9]5m [10]6m [11]7m [12]8m [13]9m [14]10m [15]20m [16]30m [17]1h [18]2h
|
||
|
|
func BuildMessage(msg, topic, tags string, keys []string, level int) *primitive.Message {
|
||
|
|
message := &primitive.Message{
|
||
|
|
Topic: topic,
|
||
|
|
Body: []byte(msg),
|
||
|
|
}
|
||
|
|
if len(tags) > 0 {
|
||
|
|
//指定标签
|
||
|
|
message.WithTag(tags)
|
||
|
|
}
|
||
|
|
if len(keys) > 0 {
|
||
|
|
// 业务逻辑key 保证消费幂等
|
||
|
|
message.WithKeys(keys)
|
||
|
|
}
|
||
|
|
if level > 0 {
|
||
|
|
// 设置要消耗的消息延迟时间等级
|
||
|
|
message.WithDelayTimeLevel(level)
|
||
|
|
}
|
||
|
|
return message
|
||
|
|
}
|