0%

记一个Kafka Producer丢消息问题

背景

通过Spark SQL从Hive读取大约1M行的数据,一次性写入Kafka时,大概会丢失20%的数据。

producer的设置只设置了acks=all,使用的是异步的send()

Kafka Cluster有4个brokers。

排查

首先,注意到producer在程序结束时没有调用flush(),加上flush()之后,问题依旧。

Kafka The Definitive Guide中了解到,设置retries参数可以令producer自动重试retriable errors,设置retries=100,问题依旧。

重试也不能解决这个问题,那么将异常打印出来试试,在send()中传入一个失败时打印异常的Callback,顺便记录下失败次数:

1
2
3
4
5
6
7
8
9
10
11
producer.send(record, new Callback {
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
if (exception != null) {
failCount.incrementAndGet()
logger.error("failed to send message: {}", record)
logger.error("exception: ", exception)
} else {
successCount.incrementAndGet()
}
}
})

再次运行,在Spark Executors的stderr中可以看见:

1
2
3
20/03/30 17:33:43 ERROR root: failed to send message: ProducerRecord(......)
20/03/30 17:33:43 ERROR root: exception:
org.apache.kafka.common.errors.TimeoutException: Expiring 288 record(s) for TOPICNAME-18: 30001 ms has passed since batch creation plus linger time

嗯?TimeoutException

这里说问题是在于send()太快,KafkaProducer有一个buffer(参数buffer.memory)来存放即将发送的ProducerRecord,当这个buffer满了的时候,后续的send()就会阻塞(或抛出异常,根据配置而定),这个阻塞的上限时间由参数request.timeout.ms确定,超出就会抛出TimeoutException

由于我们是一次性写入大量数据,可能producer处理不来,就会造成这种情况。

request.timeout.ms默认是30000,也就是30s,尝试增大为300s后,丢失数据量降低至8%左右。

至此,问题的原因就找到了。

继续优化

难道要继续增大超时时间吗?这里的本质原因是producer处理吞吐量不够。所以增加producer的数量也是一种办法。

这是一个Spark SQL程序,每个parition分配了一个Kafka producer。

通过Spark Web UI监控到,当执行这部分代码的时候,只有两个task,推测是由于DataFrame的partition数量过少。

根据这里的说法,HDFS blockSize决定了Spark SQL读取Hive的DataFrame的partition数量,Hadoop 2.x的默认blockSize是大小是128MB。

假设我们一条消息占用20bytes,一百万条也才20M,生成这部分Hive数据是顺序写入的,所以默认的parition数量只有一个。

那么,尝试一下手动增大partition数量:

1
2
3
4
5
// before
df.mapParitions(...)
// after
df.repartition(50) // number of paritions
.mapParitions(...)

问题解决了,数据一条都没有丢。

总结

  1. KafkaProducer的异步send(),在buffer满了的时候,也会block;
  2. 吞吐量是一个需要考虑的指标,即使是一个简单的任务,也可能因为吞吐量不匹配而出问题;
  3. 多读书,Kafka The Definitive Guide书里都有;
  4. 或者多读文档,官方文档里也都有。

致谢

感谢@liuyuan大佬为我解惑,问啥啥都懂,太强了!

References

  1. Kafka The Definitive Guide
  2. Kafka Producer error Expiring 10 record(s) for TOPIC:XXXXXX: 6686 ms has passed since batch creation plus linger time - Stack Overflow
  3. How does Spark SQL decide the number of partitions it will use when loading data from a Hive table? - Stack Overflow