• 文档
  • 控制台
  • 登录
  • 立即注册
    目前不支持用户自主注册,如需注册账号,请联系400-080-1100
消息队列Kafka
下载PDF

4 最佳实践

4.1.1 SDK接入版本

建议客户端与Kafka服务版本号一致。目前kafka 服务版本为2.8.2。

其他客户端版本或可导致使用故障。


4.1.2 Go SDK

本文介绍如何在VPC环境下通过SASL接入点接入CMQ-Kafka版消息队列收发消息。

前提条件

• 安装Go1.16或以上版本的SDK。

• 配置合适的IDE环境和编码环境。

• 在CMQ-Kafka版消息队列控制台新建实例。

开始开发

下面以GoLand作为IDE环境来开发。

(1)新建go modules工程。

(2)新建配置config.yaml。

brokers: XXX:XX,XXX:XX,XXX:XX

saslenable: true

user: XXX

pwd: "XXX"

topic: topic1

comsumergrp: g1

WX20241019-175415@2x.png

(3)生产消息。

producerDemo.go示例代码如下:

package main

import (

"encoding/json"

"fmt"

"github.com/Shopify/sarama"

"github.com/rs/zerolog/log"

"gopkg.in/yaml.v3"

"io/ioutil"

"strings"

"time"

)

type Config struct {

Brokers string `yaml:"brokers"`

SASLEnable bool `json:"saslenable"`

User string `yaml:"user"`

Pwd string `yaml:"pwd"`

Topic string `yaml:"topic"`

ComsumerGrp string `yaml:"comsumergrp"`

}

type producerClient struct {

syncProducer sarama.SyncProducer

client sarama.Client

createTime int64

}

func newProducerClient(brokers string,conf Config) (*producerClient, error) {

brokerList := strings.Split(brokers, ",")

config := sarama.NewConfig()

config.Version = sarama.V2_4_0_0

config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认

config.Producer.Partitioner = sarama.NewManualPartitioner // 指定一个partition

config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回

config.Metadata.Retry.Max = 1

config.Metadata.Timeout = time.Second * 5

// sasl配置信息

//开启SASL认证

config.Net.SASL.Enable = conf.SASLEnable

//SASL认证用户名

config.Net.SASL.User = conf.User

//SASL认证密码

config.Net.SASL.Password = conf.Pwd

fmt.Println("newProducerClient",config)

//新建一个Client

client, err := sarama.NewClient(brokerList, config)

if err != nil {

return nil, err

}

//新建一个producer

syncProducer, err := sarama.NewSyncProducerFromClient(client)

if err != nil {

_ = client.Close()

return nil, err

}

producer := &producerClient{

syncProducer: syncProducer,

client: client,

createTime: time.Now().Unix(),

}

return producer, nil

}

func main(){

log.Info().Interface("produce messsage", "begin").Send()

//读取broker配置文件

var conf Config

yamlfile,err := ioutil.ReadFile("config.yaml")

if err!= nil{

fmt.Println("read config err",err)

}

err2 := yaml.Unmarshal(yamlfile,&conf)

if err2!=nil{

fmt.Println("unmarsharl config err",err2)

return

}

fmt.Println("config:",conf)

brokers := conf.Brokers

topicName := conf.Topic

fmt.Println("brokers => ",brokers)

log.Info().Interface("run", "生产消息").Send()

//新建一个生产者

producerClient, err := newProducerClient(brokers,conf)

if err != nil {

fmt.Println("newProducerClient error => ", err)

panic(err)

}

defer producerClient.client.Close()

//构造一个生产者消息

msg := &sarama.ProducerMessage{

Topic: topicName,

Value: sarama.StringEncoder(time.Now().String()),

Partition: 1,

}

//发送消息

_, _, err = producerClient.syncProducer.SendMessage(msg)

if err != nil {

fmt.Println("SendMessage error => ", err)

}

jmsg, _ := json.Marshal(msg)

fmt.Println("SendMessage => ", string(jmsg))

}

(4)消费消息。

consumerDemo.go示例代码如下:

package main

import (

"context"

"encoding/json"

"fmt"

"github.com/Shopify/sarama"

"github.com/rs/zerolog/log"

"gopkg.in/yaml.v3"

"io/ioutil"

"strings"

"sync"

"time"

)

type consumerGroupClient struct {

consumer sarama.ConsumerGroup

client sarama.Client

createTime int64

ready chan bool

}

type groupConsumer struct {

ready chan bool

msgCh chan *sarama.ConsumerMessage

limit int

received int

}

// Setup is run at the beginning of a new session, before ConsumeClaim

func (c *groupConsumer) Setup(sarama.ConsumerGroupSession) error {

// Mark the consumer as ready

close(c.ready)

return nil

}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (c *groupConsumer) Cleanup(sarama.ConsumerGroupSession) error {

return nil

}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (c *groupConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

// NOTE:

// Do not move the code below to a goroutine.

// The `ConsumeClaim` itself is called within a goroutine, see:

// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29

for message := range claim.Messages() {

fmt.Println(fmt.Sprintf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic))

session.MarkMessage(message, "")

c.msgCh <- message

}

return nil

}

func newConsumerGroupClient(brokers string, groupID string, conf Config) (*consumerGroupClient, error) {

brokerList := strings.Split(brokers, ",")

config := sarama.NewConfig()

config.Version = sarama.V2_4_0_0

config.Metadata.Retry.Max = 1

config.Metadata.Timeout = time.Second * 5

config.Consumer.Offsets.Initial = sarama.OffsetOldest

config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin

// sasl 配置信息

//开启SASL验证

config.Net.SASL.Enable = conf.SASLEnable

//SASL认证用户名

config.Net.SASL.User = conf.User

//SASL认证密码

config.Net.SASL.Password = conf.Pwd

//新建一个Client

baseClient, err := sarama.NewClient(brokerList, config)

if err != nil {

return nil, err

}

//新建consumer

client, err := sarama.NewConsumerGroupFromClient(groupID, baseClient)

if err != nil {

_ = baseClient.Close()

return nil, err

}

consumer := &consumerGroupClient{

consumer: client,

client: baseClient,

createTime: time.Now().Unix(),

}

return consumer, nil

}

func (c *consumerGroupClient) ConsumeMessage(topics string, timeoutSec int, limit int) ([]*sarama.ConsumerMessage, error) {

/**

 * Setup a new Sarama consumer group

 */

if limit == -1 {

limit = 999999999

}

consumer := groupConsumer{

ready: make(chan bool),

msgCh: make(chan *sarama.ConsumerMessage),

limit: limit,

}

defer close(consumer.msgCh)

var messages []*sarama.ConsumerMessage

ctx, ctxCancel := context.WithCancel(context.Background())

wg := &sync.WaitGroup{}

wg.Add(1)

go func() {

defer wg.Done()

for {

// `Consume` should be called inside an infinite loop, when a

// server-side rebalance happens, the consumer session will need to be

// recreated to get the new claims

pConsumer := &consumer

if err := c.consumer.Consume(ctx, strings.Split(topics, ","), pConsumer); err != nil {

fmt.Println("Error from consumer:", err)

}

// check if context was cancelled, signaling that the consumer should stop

if ctx.Err() != nil {

fmt.Println("Consume Error from consumer:", ctx.Err())

return

}

consumer.ready = make(chan bool) // exited by rebalance, so not ready, call Consume again

}

}()

<-consumer.ready // Await till the consumer has been set up the first time(channel closed in SetUp())

fmt.Println("Sarama consumer up and running!...")

// read until timeout or limit

reading := true

for reading {

select {

case <-ctx.Done():

fmt.Println("consumer group terminating: context cancelled")

reading = false

case <-time.After(time.Second * time.Duration(timeoutSec)):

fmt.Println("consumer group terminating: by timeout")

reading = false

case msg := <-consumer.msgCh:

if msg == nil {

reading = false

} else {

messages = append(messages, msg)

consumer.received++

if consumer.received == consumer.limit {

fmt.Println("consumer group terminating: by count limit")

reading = false

}

}

}

}

ctxCancel()

wg.Wait()

return messages, nil

}

func main() {

log.Info().Interface("consumer messsage", "begin").Send()

//读取配置文件

var conf Config

yamlfile,err := ioutil.ReadFile("config.yaml")

if err!= nil{

fmt.Println("read config err",err)

}

err2 := yaml.Unmarshal(yamlfile,&conf)

if err2!=nil{

fmt.Println("unmarsharl config err",err2)

return

}

fmt.Println("config:",conf)

brokers := conf.Brokers

groupID := conf.ComsumerGrp

topicName := conf.Topic

consumerGroupClient, err := newConsumerGroupClient(brokers, groupID,conf)

if err != nil {

fmt.Println("newConsumerGroupClient error => ", err)

panic(err)

}

//消费消息

msg, err := consumerGroupClient.ConsumeMessage(topicName, 10, -1)

if err != nil {

fmt.Println("ConsumeMessage error => ", err)

}

jmsg, _ := json.Marshal(msg)

fmt.Println("ConsumeMessage success => ", string((jmsg)))

}





4.1.3 Java SDK

4.1.3.1 环境准备

• 安装1.8或以上版本JDK。具体操作请参见安装JDK。

• 安装2.5或以上版本Maven。具体操作请参见安装Maven。

• 安装编译工具。

4.1.3.2 安装Java依赖库

<dependency>

 <groupId>org.apache.kafka</groupId>

 <artifactId>kafka-clients</artifactId>

 <version>2.8.2</version>

</dependency>

4.1.3.3 生产消息Demo

private void produce(String bootstrapServer, String topic, String username, String passwd) {

 Properties props = new Properties();

 props.put("bootstrap.servers", bootstrapServer); // 实例接入点信息

 props.put("key.serializer", StringSerializer.class.getName());

 props.put("value.serializer", StringSerializer.class.getName());

 // 如kafka实例开通sasl 需要打开如下配置,并正确配置用名和密码

 // ops.setProperty("security.protocol", "SASL_PLAINTEXT");

 // props.setProperty("sasl.mechanism", "SCRAM-SHA-256");

 // String jassc = "org.apache.kafka.common.security.scram.ScramLoginModule required\n"

 // + "username=\"" + username + "\"\n"

 // + "password=\"" + passwd + "\";";

 // System.out.println(jassc);

 // props.setProperty("sasl.jaas.config", jassc);

 

 KafkaProducer<String, String> producer = new KafkaProducer<>(props);

 Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, "key", "hello kafka!"));

 RecordMetadata metadata = future.get();

 System.out.println("produce ok. partition:"+metadata.partition() + ",offset:"+metadata.offset());

 producer.flush();

 producer.close();

}

4.1.3.4 消费消息Demo

private void consume(String bootstrapServer, String topic, String groupId, Integer pollRecords,

 Integer msgCount, String offset, String username, String passwd){

 Properties props = new Properties();

 props.put("bootstrap.servers", bootstrapServer);

 props.put("group.id", groupId);

 props.put("max.poll.records", pollRecords);

 props.put("auto.offset.reset", offset);

 props.put("key.deserializer", StringDeserializer.class.getName());

 props.put("value.deserializer", StringDeserializer.class.getName());

 // props.setProperty("security.protocol", "SASL_PLAINTEXT");

 // props.setProperty("sasl.mechanism", "SCRAM-SHA-256");

 // String jassc = "org.apache.kafka.common.security.scram.ScramLoginModule required\n"

 // + "username=\"" + username + "\"\n"

 // + "password=\"" + passwd + "\";";

 // props.setProperty("sasl.jaas.config", jassc);

 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

 consumer.subscribe(singleton(topic));

 Integer pollMsgs = 0;

 Integer noMsgTryCount = 0;

 ConsumerRecords<String, String> msgList = consumer.poll(Duration.ofSeconds(1));

 if (null == msgList || 0 == msgList.count()) {

 System.out.println("no more msg");

 continue;

 }else {

System.out.println("consume msg OK, msg num:" + msgList.count());

} consumer.close();


意见反馈

文档内容是否对您有帮助?

如您有其他疑问,您也可以通过在线客服来与我们联系探讨 在线客服

联系我们
回到顶部