消费 Demo-多语言 SDK

最近更新时间:2025-10-24 11:39:02

我的收藏
本文介绍使用 Python、Java 消费 CLS 日志。

Python SDK

说明:
推荐您使用 Python 版本:3.9及以上。
Python kafka 客户端:kafka-python、kafka-python-ng、confluent-kafka-python。
如果您配置了数据压缩格式:格式 SNAPPY,请确认安装了 python-snappy 包;格式 LZ4,确认安装了 lz4 包。

单个消费者

import uuid
from kafka import KafkaConsumer,TopicPartition,OffsetAndMetadata
consumer = KafkaConsumer(
# cls kafka 协议消费控制台给出的的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制
'您的消费主题',
group_id = '您的消费组名称',
auto_offset_reset='earliest',
# 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写
bootstrap_servers = ['kafkaconsumer-${region}.cls.tencentyun.com:9095'],
security_protocol = "SASL_PLAINTEXT",
sasl_mechanism = 'PLAIN',
# 用户名是日志集合ID,例如ca5cXXXXdd2e-4ac0af12-92d4b677d2c6
sasl_plain_username = "${logsetID}",
# 密码是用户的 SecretId#SecretKey 组合的字符串,例如 AKID********************************#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的 action、resource 都配置为最小范围,可以满足操作即可。
sasl_plain_password = "${SecretId}#${SecretKey}",
api_version = (0,10,1)
)
print('begin')
for message in consumer:
print('begins')
print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" % (message.topic, message.partition, message.offset, message.value))
print('end')

多个消费者

from kafka import KafkaConsumer
import threading

TOPIC_NAME = '您的消费主题'
GROUP_ID = '您的消费组名称'
# 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写
BOOTSTRAP_SERVERS = ''kafkaconsumer-${region}.cls.tencentyun.com:9095''

def consume_messages(thread_id):
# 创建 Kafka 消费者实例
consumer = KafkaConsumer(
TOPIC_NAME,
group_id=GROUP_ID,
bootstrap_servers=BOOTSTRAP_SERVERS,
value_deserializer=lambda m: m.decode('utf-8'),
auto_offset_reset='earliest',
security_protocol = "SASL_PLAINTEXT",
sasl_mechanism = 'PLAIN',
sasl_plain_username = "${logsetID}",
sasl_plain_password = "${SecretId}#${SecretKey}",
api_version = (2, 5, 1)
)

try:
for message in consumer:
print(f"Thread {thread_id}: partition = {message.partition}, offset = {message.offset}, value = {message.value}")
except KeyboardInterrupt:
pass
finally:
# 关闭消费者
consumer.close()

if __name__ == "__main__":
# 启动3个消费者线程,这是个例子,请您根据实际情况配置
num_consumers = 3
threads = []
for i in range(num_consumers):
thread = threading.Thread(target=consume_messages, args=(i,))
threads.append(thread)
thread.start()

# 等待所有线程结束
for thread in threads:
thread.join()

Java SDK

注意:
下面的例子中的 Java 代码,在 jaas.config 的配置中,${SecretId}#${SecretKey} 后有(; 分号),不要漏填,否则会报错。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerGroupTest {

public static void consume() {
Properties props = new Properties();
String logset_id = "${logsetID}";
// CLS 控制台 kafka 协议消费页面展示的主题名称
String topic_id = "您的消费主题";

String accessKeyID = System.getenv("${SecretId}");
String accessKeySecret = System.getenv("${SecretKey}");

String groupId = "您的消费组名称";

// 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写
String hosts = "kafkaconsumer-${region}.cls.tencentyun.com:9095";
props.put("bootstrap.servers", hosts);
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\\"" +
logset_id + "\\" password=\\"" + accessKeyID + "#" + accessKeySecret + "\\";");

// Kafka 消费者配置
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");
props.put("session.timeout.ms", "10000");
props.put("auto.offset.reset", "earliest");
props.put("max.poll.interval.ms", "120000");
props.put("heartbeat.interval.ms", "3000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// 创建 Kafka 消费者实例
KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(Collections.singletonList(topic_id));
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
}
}
}
public static void main(String[] args){
consume();
}
}

Go SDK

注意:
下面的例子中的 Java 代码,在 jaas.config 的配置中,${SecretId}#${SecretKey} 后有(; 分号),不要漏填,否则会报错。
package main

import (
"context"
"fmt"
"github.com/Shopify/sarama"
"log"
"os"
"os/signal"
"syscall"
)

func main() {
// 创建Sarama消费者配置
//TOPIC_NAME是您的消费主题,在控制台查看.
topicName := "${TOPIC_NAME}"
//GROUP_ID 是 您的消费组名称
groupID := "${GROUP_ID}"
//BOOTSTRAP_SERVERS 是消费服务地址+端口,外网端口9096,内网端口9095,例如 kafkaconsumer-${region}.cls.tencentyun.com:9095
endpoint := "${BOOTSTRAP_SERVERS"
config := sarama.NewConfig()
config.Net.SASL.Enable = true
config.Net.SASL.User = "${logsetID}"
config.Net.SASL.Password = "${SecretId}#${SecretKey}"
config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Version = sarama.V1_1_1_0
config.Consumer.Offsets.Initial = sarama.OffsetNewest

// 创建Sarama消费者
sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
consumer, err := sarama.NewConsumerGroup([]string{endpoint}, groupID, config)
if err != nil {
log.Fatal(err)
}
defer consumer.Close()

// 处理接收到的消息
handler := &ConsumerGroupHandler{}
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

go func() {
for {
err := consumer.Consume(context.Background(), []string{topicName}, handler)
if err != nil {
log.Fatal(err)
}
if handler.ready {
break
}
}
}()

<-signals
fmt.Println("Exiting...")
}

// ConsumerGroupHandler 实现了sarama.ConsumerGroupHandler接口
type ConsumerGroupHandler struct {
ready bool
}

// Setup 在消费者组启动之前调用
func (h *ConsumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
h.ready = true
return nil
}

// Cleanup 在消费者组停止之后调用
func (h *ConsumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
h.ready = false
return nil
}

// ConsumeClaim 消费Claim中的消息
func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
fmt.Printf("Received message: %s\\n", string(message.Value))
session.MarkMessage(message, "")
}
return nil
}