Files
2024-03-16 15:18:11 +08:00

76 lines
1.7 KiB
Go

package rocketmqx
import (
"context"
"fmt"
"time"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
)
// mqCustomer is a thin wrapper around a RocketMQ push consumer.
// rocketmq.PushConsumer is embedded, so its methods (this file uses Subscribe,
// Start and Unsubscribe) are callable directly on *mqCustomer.
type mqCustomer struct {
rocketmq.PushConsumer
}
// TODO: wrap this in a singleton.

// NewDefaultCustomer creates an ordinary (non-broadcast) push consumer for the
// given consumer group.
//
// brokerAddr is handed to a passthrough name-server resolver.
// NOTE(review): despite the parameter name these look like name-server
// addresses rather than broker addresses — confirm with callers.
//
// The instance name is derived from the current time in nanoseconds, so each
// call gets its own instance name. The consumer model is not set here, so
// whatever default the client library uses applies.
//
// The returned consumer is not started; on construction failure the error from
// rocketmq.NewPushConsumer is returned unchanged.
func NewDefaultCustomer(brokerAddr []string, group string) (*mqCustomer, error) {
	resolver := primitive.NewPassthroughResolver(brokerAddr)
	instanceName := fmt.Sprintf("%v", time.Now().UnixNano())

	pushConsumer, err := rocketmq.NewPushConsumer(
		consumer.WithNsResolver(resolver),
		consumer.WithGroupName(group),
		consumer.WithInstance(instanceName),
	)
	if err != nil {
		return nil, err
	}
	return &mqCustomer{PushConsumer: pushConsumer}, nil
}
// NewBroadcastCustomer creates a push consumer for the given consumer group
// that uses the broadcasting consumer model and is configured with
// consumer.ConsumeFromLastOffset.
//
// brokerAddr is handed to a passthrough name-server resolver.
// NOTE(review): despite the parameter name these look like name-server
// addresses rather than broker addresses — confirm with callers.
//
// The instance name is derived from the current time in nanoseconds, so each
// call gets its own instance name. The returned consumer is not started; on
// construction failure the error from rocketmq.NewPushConsumer is returned
// unchanged.
func NewBroadcastCustomer(brokerAddr []string, group string) (*mqCustomer, error) {
	opts := []consumer.Option{
		consumer.WithNsResolver(primitive.NewPassthroughResolver(brokerAddr)),
		consumer.WithGroupName(group),
		consumer.WithConsumerModel(consumer.BroadCasting),
		consumer.WithInstance(fmt.Sprintf("%v", time.Now().UnixNano())),
		consumer.WithConsumeFromWhere(consumer.ConsumeFromLastOffset),
	}

	pushConsumer, err := rocketmq.NewPushConsumer(opts...)
	if err != nil {
		return nil, err
	}

	wrapped := &mqCustomer{PushConsumer: pushConsumer}
	return wrapped, nil
}
// SubscribeMsg subscribes the consumer to topic, filtered by a TAG selector
// with the expression tag, and then starts the consumer.
//
// f is the callback registered with Subscribe for delivered messages.
//
// Errors:
//   - If Subscribe fails, its error is returned and the consumer is not started.
//   - If Start fails, the subscription is rolled back with Unsubscribe and the
//     Start error is returned. Should the rollback fail as well, the returned
//     error still wraps the Start error (so errors.Is / errors.As keep working)
//     and additionally describes the Unsubscribe failure instead of silently
//     discarding it.
func (r *mqCustomer) SubscribeMsg(topic string, tag string, f func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error)) error {
	selector := consumer.MessageSelector{
		Type:       consumer.TAG,
		Expression: tag,
	}
	if err := r.Subscribe(topic, selector, f); err != nil {
		return err
	}

	// Start only after the subscription has been registered.
	if err := r.Start(); err != nil {
		// Undo the subscription so a failed start does not leave it registered.
		if unsubErr := r.Unsubscribe(topic); unsubErr != nil {
			return fmt.Errorf("%w (rollback unsubscribe of topic %q also failed: %v)", err, topic, unsubErr)
		}
		return err
	}
	return nil
}