
Flume keeps throwing the same error in a loop while consuming from Kafka?

Asked on 2018-11-23 18:25:57

Error message:

Source.java:120)] Event #: 0
2018-11-23 17:59:18,995 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 965
2018-11-23 17:59:18,995 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0
2018-11-23 17:59:19,005 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 975
2018-11-23 17:59:19,005 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0
2018-11-23 17:59:19,015 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 985
2018-11-23 17:59:19,015 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0
2018-11-23 17:59:19,025 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 995
2018-11-23 17:59:19,025 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0
2018-11-23 17:59:19,036 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:119)] Waited: 1006
2018-11-23 17:59:19,036 (PollableSourceRunner-KafkaSource-kaSource) [DEBUG - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:120)] Event #: 0
2018-11-23 17:59:19,036 (PollableSourceRunner-KafkaSource-kaSource) [ERROR - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:153)] KafkaSource EXCEPTION, {}
java.lang.NullPointerException
    at org.apache.flume.instrumentation.MonitoredCounterGroup.increment(MonitoredCounterGroup.java:261)
    at org.apache.flume.instrumentation.kafka.KafkaSourceCounter.incrementKafkaEmptyCount(KafkaSourceCounter.java:49)
    at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:146)
    at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
    at java.lang.Thread.run(Thread.java:748)
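From the stack trace, the NullPointerException is raised inside MonitoredCounterGroup.increment while KafkaSourceCounter.incrementKafkaEmptyCount tries to bump the "empty batch" counter, which points to a counter attribute that was never registered in the counter group's internal map, rather than a problem with the Kafka data itself. Below is a minimal sketch of that failure shape, assuming a simplified counter group with made-up attribute names; it is not the actual Flume source code:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Simplified model of a monitored counter group: increment() looks the
// attribute up in a map and calls incrementAndGet() on the result. If the
// attribute was never registered, get() returns null and the call throws
// a NullPointerException.
public class CounterGroupSketch {
    private final Map<String, AtomicLong> counterMap = new HashMap<>();

    // Attributes must be registered up front, otherwise increment() fails.
    public void register(String attribute) {
        counterMap.put(attribute, new AtomicLong(0L));
    }

    public long increment(String attribute) {
        // NPE here if 'attribute' is missing from counterMap -- the same
        // shape of failure the stack trace above shows.
        return counterMap.get(attribute).incrementAndGet();
    }

    public static void main(String[] args) {
        CounterGroupSketch counters = new CounterGroupSketch();
        // Hypothetical attribute names, for illustration only.
        counters.register("source.kafka.event.get.count");
        counters.increment("source.kafka.event.get.count"); // fine
        counters.increment("source.kafka.empty.count");     // throws NullPointerException
    }
}

If the real KafkaSourceCounter behaves like this sketch, the attribute behind incrementKafkaEmptyCount was not part of the registered counter set in this Flume build, which would explain why every empty poll (Event #: 0) ends in the same exception and the source keeps looping.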

--------------------------------------------

Configuration file:

kafkaLogger.sources = kaSource
kafkaLogger.channels = memoryChannel
kafkaLogger.sinks = kaSink
# The channel can be defined as follows.
kafkaLogger.sources.kaSource.channels = memoryChannel
kafkaLogger.sources.kaSource.type = org.apache.flume.source.kafka.KafkaSource
kafkaLogger.sources.kaSource.zookeeperConnect = 192.168.130.4:2181,192.168.130.5:2181,192.168.130.6:2181
kafkaLogger.sources.kaSource.topic = dwd-topic
kafkaLogger.sources.kaSource.groupId = 0
kafkaLogger.channels.memoryChannel.type = memory
kafkaLogger.channels.memoryChannel.capacity = 1000
kafkaLogger.channels.memoryChannel.keep-alive = 60
kafkaLogger.sinks.kaSink.type = elasticsearch
kafkaLogger.sinks.kaSink.hostNames = 192.168.130.6:9300
kafkaLogger.sinks.kaSink.indexName = flume_mq_es_d
kafkaLogger.sinks.kaSink.indexType = flume_mq_es
kafkaLogger.sinks.kaSink.clusterName = zyuc-elasticsearch
kafkaLogger.sinks.kaSink.batchSize = 100
kafkaLogger.sinks.kaSink.client = transport
kafkaLogger.sinks.kaSink.serializer = com.commons.flume.sink.elasticsearch.CommonElasticSearchIndexRequestBuilderFactory
kafkaLogger.sinks.kaSink.serializer.parse = com.commons.log.parser.LogTextParse
kafkaLogger.sinks.kaSink.serializer.formatPattern = yyyyMMdd
kafkaLogger.sinks.kaSink.serializer.dateFieldName = time
kafkaLogger.sinks.kaSink.channel = memoryChannel

Flume version: 1.6.0.1; Kafka version: kafka_2.11-2.0.1
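Note the version gap: the source is configured with the ZooKeeper-style properties (zookeeperConnect, topic, groupId) that Flume 1.6's KafkaSource expects, while the cluster runs Kafka 2.0.x. From Flume 1.7 onward the KafkaSource switched to the new consumer API and takes kafka.*-prefixed properties instead. For reference, a hedged sketch of how the same source is typically declared on a newer Flume; the broker port 9092 and the consumer group name below are assumptions for illustration, not values taken from the question:

# Sketch only: KafkaSource configuration style used by Flume 1.7+ (new consumer API).
kafkaLogger.sources.kaSource.type = org.apache.flume.source.kafka.KafkaSource
kafkaLogger.sources.kaSource.channels = memoryChannel
# Assumed broker addresses: same hosts as above, default Kafka port 9092.
kafkaLogger.sources.kaSource.kafka.bootstrap.servers = 192.168.130.4:9092,192.168.130.5:9092,192.168.130.6:9092
kafkaLogger.sources.kaSource.kafka.topics = dwd-topic
# Illustrative consumer group id; any non-conflicting name works.
kafkaLogger.sources.kaSource.kafka.consumer.group.id = flume-dwd-consumer
kafkaLogger.sources.kaSource.batchSize = 1000
kafkaLogger.sources.kaSource.batchDurationMillis = 2000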
