自定义消费

最近更新时间:2025-09-29 16:46:02

我的收藏

前提条件

1. 开通日志服务,创建 日志集日志主题,并成功采集到日志数据。
2. 子账号/协作者需要主账号授权,授权步骤参见 基于 CAM 管理权限,复制授权策略参见 CLS 访问策略模板

消费组消费流程

使用消费组消费数据时,服务端会管理消费组里的所有消费者的消费任务,根据主题分区和消费者的数量关系自动调整消费任务的均衡性,同时会记录每个主题分区的消费进度,保证不同消费者可以无重复消费数据。消费组消费的具体流程如下:
1. 创建消费组。
2. 每个消费者定期向服务端发送心跳。
3. 消费组根据主题分区负载情况自动分配主题分区给消费者。
4. 消费者根据所分配的分区列表,获取分区 offset 并消费数据。
5. 消费者周期性地更新分区的消费进度到消费组,便于下次消费组分配任务。
6. 重复步骤2 - 步骤6,直至消费结束。

负载均衡消费原理

消费组会根据活跃消费者和主题分区的数量动态调整每个消费者的消费任务,保证消费的均衡性。同时,消费者会保存每个主题分区的消费进度,保证故障恢复后可继续消费数据,避免重复消费。

例一:主题分区发生变化

例如,某个日志主题有两个消费者,消费者 A 负责消费1、2号分区,消费者 B 负责消费3、4号分区,通过分裂操作新增主题分区5后,消费组会自动将5号分区分配给消费者 B 进行消费,如下图所示:

1561034489523



例二:消费者发生变化

例如,某个日志主题有两个消费者,消费者 A 负责消费1、2、3号分区,消费者 B 负责消费4、5、6号分区,为保证消费速度持平生成速度,新增一个消费者 C,消费组会重新进行均衡分配,将3、6号分区分配给新消费者 C 进行消费,如下图所示:

1561035193214



消费 Demo(Python)

Python Demo 完整示例请参见 continous_sample_consumer,推荐使用3.5及以上 Python 版本进行数据消费。Demo 的使用和说明如下。
1. 安装 SDK,具体可参见 tencentcloud-cls-sdk-python
pip install git+https://github.com/TencentCloud/tencentcloud-cls-sdk-python.git
2. 消费者处理消费的数据,用户的日志数据被保存到 log_group 结构体中。log_group 结构体如下:
log_group {
source //日志来源,一般为机器的 IP
filename //日志文件名
logs {
time //日志时间,unix 时间戳,微秒级别
user_defined_log_kvs //用户日志字段
}
}
下面是 SampleConsumer 方法的实现:
class SampleConsumer(ConsumerProcessorBase):
last_check_time = 0

def initialize(self, topic_id):
self.topic_id = topic_id

def process(self, log_groups, offset_tracker):
for log_group in log_groups:
for log in log_group.logs:
# 处理单行数据
item = dict()
item['filename'] = log_group.filename
item['source'] = log_group.source
item['time'] = log.time
for content in log.contents:
item[content.key] = content.value

# Subsequent data processing
# put your business logic here
print(json.dumps(item))

# offset commit
current_time = time.time()
if current_time - self.last_check_time > 3:
try:
self.last_check_time = current_time
offset_tracker.save_offset(True)
except Exception:
import traceback
traceback.print_exc()
else:
try:
offset_tracker.save_offset(False)
except Exception:
import traceback
traceback.print_exc()

return None
3. 创建消费者并启动消费者线程,该消费者会从指定的主题中消费数据。
配置参数
说明
默认值
取值范围
endpoint
请求域名API 上传日志标签页面的域名。
-
支持地域:ALL
access_key_id
用户的 Secret_id,请前往 CAM 查看。
-
-
access_key
用户的 Secret_key,请前往 CAM 查看。
-
-
region
主题所在地域,例如 ap-beijing、ap-guangzhou、ap-shanghai,详情请参见 地域和访问域名
-
支持地域:ALL
logset_id
日志集 ID,仅支持一个日志集。
-
-
topic_ids
日志主题 ID,多个主题请使用','隔开。
-
-
consumer_group_name
消费者组名称。
-
-
internal
内网:TRUE
公网:FALSE
说明:
内网/外网读流量费用请参见 产品定价
FALSE
TRUE/FALSE
consumer_name
消费者名称。同一个消费者组内,消费者名称不可重复。
-
0-9、aA-zZ、 '-'、'_'、'.'组成的字符串
heartbeat_interval
消费者心跳上报间隔,2个间隔没有上报心跳,会被认为是消费者下线。
20
0-30分钟
data_fetch_interval
消费者拉取数据间隔,不小于1秒。
2
-
offset_start_time
拉取数据的开始时间,字符串类型的 UNIX 时间戳,精度为秒,例如 "1711607794",也可以直接可配置为"begin"、"end"。
begin:日志主题生命周期内的最早数据
end:日志主题生命周期内的最新数据
"end"
"begin"/"end"/UNIX 时间戳
max_fetch_log_group_size
消费者单次拉取数据大小,默认2M,最大10M。
2097152
2M - 10M
offset_end_time
拉取数据的结束时间,支持字符串类型的 UNIX 时间戳,精度为秒,例如"1711607794"。不填写代表持续拉取
-
-
注意:
消费开始时间:如您从指定时间点(如 offset_start_time=1711607794)消费了一次,如需再次从该时间点开始消费,请修改消费组的名称。
如下 Demo 中只有一个消费者,建议您的消费者个数与日志主题分区数保持一致。
class App:
def __init__(self):
self.shutdown_flag = False
# access endpoint
self.endpoint = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_ENDPOINT', '')
# region
self.region = os.environ.get('TENCENTCLOUD_LOG_SAMPLE_REGION', '')
# secret id
self.access_key_id = os.environ.get(
'TENCENTCLOUD_LOG_SAMPLE_ACCESSID', '')
# secret key
self.access_key = os.environ.get(
'TENCENTCLOUD_LOG_SAMPLE_ACCESSKEY', '')
# logset id
self.logset_id = os.environ.get(
'TENCENTCLOUD_LOG_SAMPLE_LOGSET_ID', '')
# topic ids
self.topic_ids = os.environ.get(
'TENCENTCLOUD_LOG_SAMPLE_TOPICS', '').split(',')
# consumer group name,
self.consumer_group = 'consumer-group-1'
# consumer id, 建议您的消费者个数=日志主题分区数.
self.consumer_name1 = "consumer-group-1-A"
assert self.endpoint and self.access_key_id and self.access_key and self.logset_id, ValueError("endpoint/access_id/access_key and "
"logset_id cannot be empty")
signal.signal(signal.SIGTERM, self.signal_handler)
signal.signal(signal.SIGINT, self.signal_handler)

def signal_handler(self, signum, frame):
print(f"catch signal {signum},cleanup...")
self.shutdown_flag = True

def run(self):
print("*** start to run consumer...")
self.consume()
# waiting for exit signal
while not self.shutdown_flag:
time.sleep(1)
# shutdown consumer
print("*** stopping workers")
self.consumer.shutdown()
sys.exit(0)

def consume(self):
try:
# consumer config
option1 = LogHubConfig(self.endpoint, self.access_key_id, self.access_key, self.region, self.logset_id, self.topic_ids, self.consumer_group,
self.consumer_name1, heartbeat_interval=3, data_fetch_interval=1,
offset_start_time='begin', max_fetch_log_group_size=1048576)
# init consumer
self.consumer = ConsumerWorker(
SampleConsumer, consumer_option=option1)

# start consumer
print("*** start to consume data...")
self.consumer.start()
except Exception as e:
import traceback
traceback.print_exc()
raise e