大数据-Kafka-Kafka相关参数配置

1.broker相关配置

  • broker.id
    broker在kafka集群中的唯一标识,必须是一个大于等于0的整数,如果不写的话默认从1001开始。
    建议:把它设置成与机器名具有相关性的整数。

  • port
    设置kafka的端口号,默认情况下是9092,不建议修改成1024以下的端口,因为需要使用root权限启动。

  • zookeeper.connect
    设置zookeeper集群,该配置参数是一组用逗号隔开的host:port/path列表

  • host是zookeeper服务器的机器名或ip地址

  • port是zookeeper服务器的端口

  • /path是可选的zookeeper路径,作为kafka集群的chroot环境,默认是根路径,如果指定的chroot路径不存在,kafka会在启动的时候创建它。使用chroot使得zookeeper集群可以共享给其他应用程序时而不会产生冲突。

  • log.dirs
    配置kafka日志片段的目录位置,多个路径以逗号隔开。如果配置了多个路径,kafka会根据最少使用原则,把统一分区的日志保存到同一路径下,注意的是,kafka会往拥有最少数量分区的路径新增分区,而不是往拥有最小磁盘空间的路径新增分区。

  • auto.create.topics.enable
    配置是否开启自动创建topic,如果设置为true, kafka会在以下几种场景自动创建topic:

  • 当一个producer开始往topic写入消息时。

  • 当一个consumer开始从topic消费消息时。

  • 当一个client向topic发送元数据请求时。

  • num.partitions
    配置创建主题时包含多少个分区,默认值为1,因为我们能增加主题的分区数,但是不能减少分区的个数,所以,如果要让一个主题的分区个数少于num.partitions需要手动创建该主题而不是通过自动创建主题。

  • log.retention.hours
    配置kafka保留数据的时间,默认为168小时也就是7天,效果等同log.retention.minutes和log.retention.ms,只是单位不一样,分别是小时,分钟,和毫秒,推荐使用log.retention.ms,粒度更加细,如果三个参数都配置了则去数值最小的配置。

  • log.retention.bytes
    配置一个分区能保存最大的字节数,如果超出的部分就会被删除,同时配置了log.retention.hours/log.retention.minutes/log.retention.ms的话,任一个满足条件都会触发数据删除。

  • message.max.bytes
    配置消息的大小限制,默认为100000,也就是1M,这里的大小是指在kafka压缩后的大小,也就是说实际消息可以大于1M,如果消息超过这个限制,则会被kafka拒收。


2.producer相关配置

  • bootstrap.servers
    配置broker的地址,格式为host:port,如果多个则以逗号隔开,不需要配置所有的broker,producer会从给定的broker查询其他broker的信息,不过建议至少填写两个,以防在一个宕机的情况还能从另外一个去获取broker的信息。

  • acks
    acks指定了必须要有多少个分区副本收到消息,producer才会认为消息写入是成功的。

  • acks=0 producer发送消息不等待任何来自服务器的响应,所以会出现消息丢失而producer不感知的情况,该模式下可获取最大的吞吐量。

  • acks=1 只要集群的leader节点收到消息,producer就会收到一个服务器的成功响应,如果消息无法达到leader节点,那么producer就会获取到一个错误响应,这时候为了避免消息的丢失,producer可以选择重发,不过如果一个没有收到消息的节点成为新的leader,那么消息还是会丢失。

  • acks=all 只有当leader节点和follower节点都收到消息时,producer才会收到成功的响应,这是一个避免消息丢失最安全的做法,不过这种模式吞吐量最低

  • client.id
    可以是任意字符串,标识消息的来源

  • max.in.flight.requests.per.connection
    配置producer在收到服务器响应前可以发送的消息个数,值越高,吞吐量就会越高,不过相应的占用的内存也会越多,设置为1可以保证消息可以按照发送的顺序写入服务,即便发生了重试。

  • max.request.size
    配置producer单次发送的所有消息的总的大小限制,例如设置为1M,单个消息大小为1K,那么单次可以发的上限是1000个,最好跟message.max.bytes配置匹配,避免发送到broker的消息被拒绝。

  • retries
    该参数决定了producer可以重发消息的次数,producer从broker收到的错误可能是临时性的,例如分区找不到首领,这种情况下,producer在进行retries次重试后就会放弃重试并且返回错误,默认情况下,重试的时间间隔为100ms,可以通过retry.backoff.ms参数配置,建议在设置重试间隔之前最好测试一下恢复一个崩溃的节点要多长时间,重试的间隔最好比恢复时间要长。

  • batch.size
    当多个消息往同一个分区发送时,producer会把这些消息放到同一个分区,该参数指定了一个批次可以使用的内存大小,按字节数计算,当批次填满时消息就会被发送出去,不过producer不一定等批次被填满才会发送,甚至只有一个消息也会被发送,所以就算把该值设置得很大也不会造成延迟,只不过会占用内存,但是如果设置太小的话,producer会很频繁的发送,增加一些额外的开销。

  • linger.ms
    指定producer发送批次之前要等待更多消息加入批次的时间,producer会在批次填满或者longer.ms到达上限时把批次发送出去,默认情况下,只要有可用的线程,就算批次只有一个消息,producer也会把消息发送出去。把linger.ms设置成比0大的数,让producer在发送批次之前多等待一会,可以使得更多的消息可以加入到批次,虽然增加了延迟,但是同时也增加了吞吐量。


3.Consumer相关配置

  • fetch.min.bytes
    配置Consumer从broker获取记录的最小字节数,broker在收到Consumer的数据请求时,如果可用的数据量小于该配置,那么broker会等到有足够的可用数据时才把它返回给Consumer。

  • fetch.max.wait.ms
    配置broker的等待时间,默认为500ms,如果没有足够的数据流入,导致不满足fetch.mis.bytes,最终会导致500ms的延迟。如果fetch.mis.bytes配置为1M,fetch.max.wait.ms配置为500ms,那么最终broker要么返回1M的数据,要么等待500ms后返回所有可用的数据,取决于哪个条件先得到满足。

  • max.partition.fetch.bytes
    配置broker从每个分区返回给Consumer的最大字节数,默认为1MB,也就是说,KafkaConsumer.poll()方法从每个分区里面返回的记录最多不超过该值,加入有10个分区5个消费者,则每个消费者需要2MB的内存来接收消息,需要注意的是,如果该值设置过大,导致消费者处理的业务的时间过长,会有回话超时的风险。

  • session.timeout.ms
    配置了Consumer被认定为死亡前可以与服务器断开连接的时间,默认3秒,如果服务器在超过该值时间没有收到Consumer的心跳,就会认定Consumer死亡,会触发再均衡,把死亡Consumer的分区分配给其他Consumer,这个配置要跟heartbeat.interval.ms配合使用,heartbeat.interval.ms设置了poll()方法向协调器发送心跳的频率。建议heartbeat.interval.ms的值为session.timeout.ms的三分之一。

  • enable.auto.commit
    指定了Consumer是否开启自动提交偏移量,默认为true。可以把它设置为false,由程序自己控制何时提交偏移量来避免出现重复数据和数据丢失的情况。

  • auto.offset.reset
    指定在Consumer读取一个没有偏移量的分区或者偏移量无效的情况下的策略(因为消费者长时间失效,包含偏移量的记录已经过时并删除),默认为latest,表示会从最新的记录开始读取(在消费者启动之后生成的记录,会出现漏消费历史记录的情况),另外一个配置是earliest,表示从最早的消息开始消费(会出现重复消费的情况)

  • partition.assignment.strategy
    分区分配给Consumer的策略,有两种:

  • Range
    把topic的若干个连续的分区分配给Consumer,假设有Consumer1和Consumer2,分别订阅了topic1和topic2,每个topic都有3个分区,那么Consumer1可能分配到topic1和topic2的分区0和分区1,Consumer2分配到topic1和topic2的分区2,这种策略会导致当分区数量(针对单个topic,上面例子是3个分区)无法被消费者数量(上面例子是2个消费者)整除时,就会出现分区分布不均匀的情况。
    img

  • RoundRobin
    该策略会把所有的分区逐个分配给Consumer,还是上面的例子,如果按这种策略分配那么Consumer1最终分到的是topic1的分区0,分区2和topic2的分区1,Consumer2最终分到的是topic1的分区1和topic2的分区0和分区2。
    img

默认使用org.apache.kafka.clients.consumer.RangeAssignor,这个类实现了Range策略,RoundRabin的实现类为org.apache.kafka.clients.consumer.RoundRobinAssignor,我们还可以自定义策略。

  • max.poll.records
    指定单次调用poll()方法返回的记录数量。
THE END