package rocketmqx import ( "context" "fmt" "time" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" ) type mqCustomer struct { rocketmq.PushConsumer } // TODO:需要封装成单例模式 // 普通消费者 func NewDefaultCustomer(brokerAddr []string, group string) (*mqCustomer, error) { c, err := rocketmq.NewPushConsumer( consumer.WithNsResolver(primitive.NewPassthroughResolver(brokerAddr)), consumer.WithGroupName(group), //consumer.WithConsumerModel(consumer.BroadCasting), consumer.WithInstance(fmt.Sprintf("%v", time.Now().UnixNano())), ) if err != nil { return nil, err } r := &mqCustomer{ c, } return r, nil } // 广播消费者 func NewBroadcastCustomer(brokerAddr []string, group string) (*mqCustomer, error) { c, err := rocketmq.NewPushConsumer( consumer.WithNsResolver(primitive.NewPassthroughResolver(brokerAddr)), consumer.WithGroupName(group), consumer.WithConsumerModel(consumer.BroadCasting), consumer.WithInstance(fmt.Sprintf("%v", time.Now().UnixNano())), consumer.WithConsumeFromWhere(consumer.ConsumeFromLastOffset), ) if err != nil { return nil, err } r := &mqCustomer{ c, } return r, nil } // 订阅消息 func (r *mqCustomer) SubscribeMsg(topic string, tag string, f func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error)) error { selector := consumer.MessageSelector{ Type: consumer.TAG, Expression: tag, } err := r.Subscribe(topic, selector, f) if err != nil { return err } // 订阅后再启动 err = r.Start() if err != nil { r.Unsubscribe(topic) return err } return nil }