本文介绍如何在VPC环境下通过SASL接入点接入CMQ-Kafka版消息队列收发消息。
前提条件
• 安装Go1.16或以上版本的SDK。
• 配置合适的IDE环境和编码环境。
• 在CMQ-Kafka版消息队列控制台新建实例。
开始开发
下面以GoLand作为IDE环境来开发。
(1)新建go modules工程。
(2)新建配置config.yaml。
brokers: XXX:XX,XXX:XX,XXX:XX
saslenable: true
user: XXX
pwd: "XXX"
topic: topic1
comsumergrp: g1

(3)生产消息。
producerDemo.go示例代码如下:
package main
import (
"encoding/json"
"fmt"
"github.com/Shopify/sarama"
"github.com/rs/zerolog/log"
"gopkg.in/yaml.v3"
"io/ioutil"
"strings"
"time"
)
type Config struct {
Brokers string `yaml:"brokers"`
SASLEnable bool `json:"saslenable"`
User string `yaml:"user"`
Pwd string `yaml:"pwd"`
Topic string `yaml:"topic"`
ComsumerGrp string `yaml:"comsumergrp"`
}
type producerClient struct {
syncProducer sarama.SyncProducer
client sarama.Client
createTime int64
}
func newProducerClient(brokers string,conf Config) (*producerClient, error) {
brokerList := strings.Split(brokers, ",")
config := sarama.NewConfig()
config.Version = sarama.V2_4_0_0
config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认
config.Producer.Partitioner = sarama.NewManualPartitioner // 指定一个partition
config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回
config.Metadata.Retry.Max = 1
config.Metadata.Timeout = time.Second * 5
// sasl配置信息
//开启SASL认证
config.Net.SASL.Enable = conf.SASLEnable
//SASL认证用户名
config.Net.SASL.User = conf.User
//SASL认证密码
config.Net.SASL.Password = conf.Pwd
fmt.Println("newProducerClient",config)
//新建一个Client
client, err := sarama.NewClient(brokerList, config)
if err != nil {
return nil, err
}
//新建一个producer
syncProducer, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
_ = client.Close()
return nil, err
}
producer := &producerClient{
syncProducer: syncProducer,
client: client,
createTime: time.Now().Unix(),
}
return producer, nil
}
func main(){
log.Info().Interface("produce messsage", "begin").Send()
//读取broker配置文件
var conf Config
yamlfile,err := ioutil.ReadFile("config.yaml")
if err!= nil{
fmt.Println("read config err",err)
}
err2 := yaml.Unmarshal(yamlfile,&conf)
if err2!=nil{
fmt.Println("unmarsharl config err",err2)
return
}
fmt.Println("config:",conf)
brokers := conf.Brokers
topicName := conf.Topic
fmt.Println("brokers => ",brokers)
log.Info().Interface("run", "生产消息").Send()
//新建一个生产者
producerClient, err := newProducerClient(brokers,conf)
if err != nil {
fmt.Println("newProducerClient error => ", err)
panic(err)
}
defer producerClient.client.Close()
//构造一个生产者消息
msg := &sarama.ProducerMessage{
Topic: topicName,
Value: sarama.StringEncoder(time.Now().String()),
Partition: 1,
}
//发送消息
_, _, err = producerClient.syncProducer.SendMessage(msg)
if err != nil {
fmt.Println("SendMessage error => ", err)
}
jmsg, _ := json.Marshal(msg)
fmt.Println("SendMessage => ", string(jmsg))
}
(4)消费消息。
consumerDemo.go示例代码如下:
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/Shopify/sarama"
"github.com/rs/zerolog/log"
"gopkg.in/yaml.v3"
"io/ioutil"
"strings"
"sync"
"time"
)
type consumerGroupClient struct {
consumer sarama.ConsumerGroup
client sarama.Client
createTime int64
ready chan bool
}
type groupConsumer struct {
ready chan bool
msgCh chan *sarama.ConsumerMessage
limit int
received int
}
// Setup is run at the beginning of a new session, before ConsumeClaim
func (c *groupConsumer) Setup(sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
close(c.ready)
return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (c *groupConsumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (c *groupConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// NOTE:
// Do not move the code below to a goroutine.
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
for message := range claim.Messages() {
fmt.Println(fmt.Sprintf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic))
session.MarkMessage(message, "")
c.msgCh <- message
}
return nil
}
func newConsumerGroupClient(brokers string, groupID string, conf Config) (*consumerGroupClient, error) {
brokerList := strings.Split(brokers, ",")
config := sarama.NewConfig()
config.Version = sarama.V2_4_0_0
config.Metadata.Retry.Max = 1
config.Metadata.Timeout = time.Second * 5
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
// sasl 配置信息
//开启SASL验证
config.Net.SASL.Enable = conf.SASLEnable
//SASL认证用户名
config.Net.SASL.User = conf.User
//SASL认证密码
config.Net.SASL.Password = conf.Pwd
//新建一个Client
baseClient, err := sarama.NewClient(brokerList, config)
if err != nil {
return nil, err
}
//新建consumer
client, err := sarama.NewConsumerGroupFromClient(groupID, baseClient)
if err != nil {
_ = baseClient.Close()
return nil, err
}
consumer := &consumerGroupClient{
consumer: client,
client: baseClient,
createTime: time.Now().Unix(),
}
return consumer, nil
}
func (c *consumerGroupClient) ConsumeMessage(topics string, timeoutSec int, limit int) ([]*sarama.ConsumerMessage, error) {
/**
* Setup a new Sarama consumer group
*/
if limit == -1 {
limit = 999999999
}
consumer := groupConsumer{
ready: make(chan bool),
msgCh: make(chan *sarama.ConsumerMessage),
limit: limit,
}
defer close(consumer.msgCh)
var messages []*sarama.ConsumerMessage
ctx, ctxCancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims
pConsumer := &consumer
if err := c.consumer.Consume(ctx, strings.Split(topics, ","), pConsumer); err != nil {
fmt.Println("Error from consumer:", err)
}
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
fmt.Println("Consume Error from consumer:", ctx.Err())
return
}
consumer.ready = make(chan bool) // exited by rebalance, so not ready, call Consume again
}
}()
<-consumer.ready // Await till the consumer has been set up the first time(channel closed in SetUp())
fmt.Println("Sarama consumer up and running!...")
// read until timeout or limit
reading := true
for reading {
select {
case <-ctx.Done():
fmt.Println("consumer group terminating: context cancelled")
reading = false
case <-time.After(time.Second * time.Duration(timeoutSec)):
fmt.Println("consumer group terminating: by timeout")
reading = false
case msg := <-consumer.msgCh:
if msg == nil {
reading = false
} else {
messages = append(messages, msg)
consumer.received++
if consumer.received == consumer.limit {
fmt.Println("consumer group terminating: by count limit")
reading = false
}
}
}
}
ctxCancel()
wg.Wait()
return messages, nil
}
func main() {
log.Info().Interface("consumer messsage", "begin").Send()
//读取配置文件
var conf Config
yamlfile,err := ioutil.ReadFile("config.yaml")
if err!= nil{
fmt.Println("read config err",err)
}
err2 := yaml.Unmarshal(yamlfile,&conf)
if err2!=nil{
fmt.Println("unmarsharl config err",err2)
return
}
fmt.Println("config:",conf)
brokers := conf.Brokers
groupID := conf.ComsumerGrp
topicName := conf.Topic
consumerGroupClient, err := newConsumerGroupClient(brokers, groupID,conf)
if err != nil {
fmt.Println("newConsumerGroupClient error => ", err)
panic(err)
}
//消费消息
msg, err := consumerGroupClient.ConsumeMessage(topicName, 10, -1)
if err != nil {
fmt.Println("ConsumeMessage error => ", err)
}
jmsg, _ := json.Marshal(msg)
fmt.Println("ConsumeMessage success => ", string((jmsg)))
}