背景
通过Spark SQL从Hive读取大约1M行的数据,一次性写入Kafka时,大概会丢失20%的数据。
producer
的设置只设置了acks=all
,使用的是异步的send()
。
Kafka Cluster有4个brokers。
排查
首先,注意到producer
在程序结束时没有调用flush()
,加上flush()
之后,问题依旧。
从Kafka The Definitive Guide中了解到,设置retries
参数可以令producer自动重试retriable errors,设置retries=100
,问题依旧。
重试也不能解决这个问题,那么将异常打印出来试试,在send()
中传入一个失败时打印异常的Callback
,顺便记录下失败次数:
1 | producer.send(record, new Callback { |
再次运行,在Spark Executors的stderr中可以看见:
1 | 20/03/30 17:33:43 ERROR root: failed to send message: ProducerRecord(......) |
嗯?TimeoutException
?
这里说问题是在于send()
太快,KafkaProducer
有一个buffer(参数buffer.memory
)来存放即将发送的ProducerRecord
,当这个buffer满了的时候,后续的send()
就会阻塞(或抛出异常,根据配置而定),这个阻塞的上限时间由参数request.timeout.ms
确定,超出就会抛出TimeoutException
。
由于我们是一次性写入大量数据,可能producer处理不来,就会造成这种情况。
request.timeout.ms
默认是30000,也就是30s,尝试增大为300s后,丢失数据量降低至8%左右。
至此,问题的原因就找到了。
继续优化
难道要继续增大超时时间吗?这里的本质原因是producer处理吞吐量不够。所以增加producer的数量也是一种办法。
这是一个Spark SQL程序,每个parition分配了一个Kafka producer。
通过Spark Web UI监控到,当执行这部分代码的时候,只有两个task,推测是由于DataFrame的partition数量过少。
根据这里的说法,HDFS blockSize决定了Spark SQL读取Hive的DataFrame的partition数量,Hadoop 2.x的默认blockSize是大小是128MB。
假设我们一条消息占用20bytes,一百万条也才20M,生成这部分Hive数据是顺序写入的,所以默认的parition数量只有一个。
那么,尝试一下手动增大partition数量:
1 | // before |
问题解决了,数据一条都没有丢。
总结
KafkaProducer
的异步send()
,在buffer满了的时候,也会block;- 吞吐量是一个需要考虑的指标,即使是一个简单的任务,也可能因为吞吐量不匹配而出问题;
- 多读书,Kafka The Definitive Guide书里都有;
- 或者多读文档,官方文档里也都有。
致谢
感谢@liuyuan大佬为我解惑,问啥啥都懂,太强了!