76 lines
1.7 KiB
Go
76 lines
1.7 KiB
Go
package rocketmqx
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/apache/rocketmq-client-go/v2"
|
|
"github.com/apache/rocketmq-client-go/v2/consumer"
|
|
"github.com/apache/rocketmq-client-go/v2/primitive"
|
|
)
|
|
|
|
type mqCustomer struct {
|
|
rocketmq.PushConsumer
|
|
}
|
|
|
|
// TODO:需要封装成单例模式
|
|
|
|
// 普通消费者
|
|
func NewDefaultCustomer(brokerAddr []string, group string) (*mqCustomer, error) {
|
|
c, err := rocketmq.NewPushConsumer(
|
|
consumer.WithNsResolver(primitive.NewPassthroughResolver(brokerAddr)),
|
|
consumer.WithGroupName(group),
|
|
//consumer.WithConsumerModel(consumer.BroadCasting),
|
|
consumer.WithInstance(fmt.Sprintf("%v", time.Now().UnixNano())),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
r := &mqCustomer{
|
|
c,
|
|
}
|
|
|
|
return r, nil
|
|
}
|
|
|
|
// 广播消费者
|
|
func NewBroadcastCustomer(brokerAddr []string, group string) (*mqCustomer, error) {
|
|
c, err := rocketmq.NewPushConsumer(
|
|
consumer.WithNsResolver(primitive.NewPassthroughResolver(brokerAddr)),
|
|
consumer.WithGroupName(group),
|
|
consumer.WithConsumerModel(consumer.BroadCasting),
|
|
consumer.WithInstance(fmt.Sprintf("%v", time.Now().UnixNano())),
|
|
consumer.WithConsumeFromWhere(consumer.ConsumeFromLastOffset),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
r := &mqCustomer{
|
|
c,
|
|
}
|
|
|
|
return r, nil
|
|
}
|
|
|
|
// 订阅消息
|
|
func (r *mqCustomer) SubscribeMsg(topic string, tag string, f func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error)) error {
|
|
selector := consumer.MessageSelector{
|
|
Type: consumer.TAG,
|
|
Expression: tag,
|
|
}
|
|
err := r.Subscribe(topic, selector, f)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// 订阅后再启动
|
|
err = r.Start()
|
|
if err != nil {
|
|
r.Unsubscribe(topic)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|