It's our wits that make us men.

Kafka难学吗

Posted on By huism

Kafka

初识Kafka

apache kafka是由Apache开发的一种发布订阅消息系统,它是一个分布式、分区和重复的日志服务。Kafka之所以越来越受到青睐,得幸于它所扮演的三大角色:

  • 消息系统:Kafka不仅提供和传统消息系统(消息中间件)同样的系统解耦、存储冗余、流量削峰、缓冲、异步、可扩展、可恢复性等功能,而且提供消息顺序性保障和回溯消费的功能。
  • 存储系统:Kafka将消息持久化到磁盘,相比内存,降低了消息丢失的风险。Kafka的消息持久化和多副本机制,让它可以作为长久的存储系统,只需设置消息保留策略即可。
  • 流式处理平台:Kafka可以为其他流式处理框架提供数据来源,并且提供完整的流式处理类库。

基本概念

一个典型的Kafka集群包括若干生产者(Producer)、若干节点(Broker)和若干消费者(Consumer),以及一个Zookeeper集群。如下图所示:

Kafka集群组成

其中Zookeeper是Kafka负责集群元数据管理,控制器的选举等操作。Producer是将消息发送到Borker,Borker负责将消息存储到磁盘,而Consumer负责从Broker订阅消息数据。

整个kafka体系结构引入是哪个术语:

  • Producer:发送消息的一方,负责创建消息,发送到Kafka中。
  • Consumer:接收消息的一方,负责连接Kafka并接收消息,进而进行相应的业务逻辑。
  • Broker:服务代理节点。对于 Kafka 而言, Broker 可以简单地看作一个独立的 Kafka 服务节点或 Kafka服务实例。大多数情况下也可以将Broker看作一台Kafka服务器,前提是这台服务器上只部署了一个Kafka实例。一个或多个Broker组成了 一个Kafka集群。
  • Topic和Partition:Kafka中的消息以主题为单位进行归类,生产者将消息发送给特定的主题(发送到Kafka集群中的每条消息都需指定一个主题),而消费者负责订阅主题进行消费消息。

主题和分区

topic是一个逻辑概念,它可以细分成很多partition,一个partition只能属于一个topic,有时候称分区为主题分区(Topic-Partition)。分区在存储层面上可以看成一个可追加得日志,消息被追加到某个分区时会分配一个特定的偏移量(offset)。offset是消息在分区上的唯一标识,Kafka用offset保障消息在分区上的顺序性,但是offset不是跨分区的,在分区间消息是无序的。

如下图所示,主题中有4个分区,消息被顺序追加到每个分区日志文件的尾部。 kafka解析主题-分区

注意

  • 同一topic下的不同partition上的数据是不同的。
  • 一条消息只存储到某个topic中的一个partition上。
  • 主题中的消息是分区内有序,分区间无序。
  • 一个主题中的分区可以分布在不同的broker上,可以提供比单个broker更强大的性能。
  • 不同的consumer会记录自己的消费offset。

如下图所示,生产者和消费者对分区内偏移量的维护。 分区内偏移量

每一条消息被发送到broker之前,会根据分区规则选择存储到哪个具体的分区。如果分区规则设定得合理,所有的消息都可以均匀地分配到不同的分区中。

如果一个主题只对应一个文件,那么这个文件所在的机器 I/O 将会成为这个主题的性能瓶颈,而分区解决了这个问题。

在创建主题的时候可以通过指定的参数来设置分区的个数,当然也可以在主题创建完成之后去修 改分区的数量,通过增加分区的数量可以实现水平扩展。

多副本(Replica)机制

Kafka 为分区引入了多副本机制, 通过增加副本数量可以提升容灾能力。同一分区的不同副本中保存的是相同的消息(在同一时刻,副本之间并非完全一样)。

  • 副本之间是 “一主多从”的关系,其中 leader副本负责处理读写请求, follower副本只负责与 leader副本的 消息同步。
  • 副本处于不同的 broker 中 ,当 leader 副本出现故障时,从 follower 副本中重新选举新的 leader副本对外提供服务。

Kafka通过多副本机制实现了故障的自动转移,当 Kafka集群中某个 broker 失效时仍然能保证服务可用。

多副本架构

如下图所示,Kafka集群中有4个broker,某个主题中有3个分区,且副本因子(即副本个数〉也为 3,如此每个分区便有1个 leader副本和 2个 follower副本 。生产者和消费者只与leader副本进行交互,而follow副本只负责消息的同步,很多时候follower副本中的消息相对leader副本而言会有一定的滞后。

多副本架构

分区中的所有副本统称为 AR ( Assigned Replicas)。所有与leader 副本保持一定程度同步的副本(包括leader副本在内)组成ISR (On-Sync Replicas),ISR 集合是 AR 集合中的一个子集。

  • 消息会先发送leader副本,然后 follower副本才能从leader副本中拉取消息进行同步,同步期间内 follower 副本相对于 leader 副本而言会有一定程度的滞后。
  • 前面所说的“一定程度 的同步”是指可忍受的滞后范围,这个范围可以通过参数进行配置。
  • 与 leader 副本同步滞后过多的副本(不包括leader副本)组成OSR(Out-of-Sync Replicas),由此可见,AR=ISR+OSR。
  • 在正常情况下,所有的follower副本都应该与leader副本保持一定程度的同步,即 AR=ISR, OSR 集合为空。
  • leader 副本负责维护和跟踪ISR 集合中所有follower副 本的滞后状态,当follower 副本落后 太多或失效时,leader副本会把它从ISR集合中剔除。
  • 如果OSR集合中有follower副本“追上’了leader副本,那么 leader副本会把它从OSR集合转移至ISR集合。
  • 默认情况下,当leader副本发生故障时,只有在ISR集合中的副本才有资格被选举为新的leader,而在OSR集合中的副本则没有任何机会(不过这个原则也可以通过修改相应的参数配置来改变)。

Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。

生产和消费初体验

终端初体验

创建一个名为topic_name的主题,并且该主题有4个分区,每个分区副本因子为3。

# kafka-topic.sh --zookeeper node1:2181 --create --topic toptic_name --replication-factor 3 --partition 4

展示主题topic_name的信息。

# kafka-topic.sh --zookeeper node1:2181 --describe --topic topic_name

创建消费者,并且消费主题topic_name

# kafka-console-consumer.sh --bootstrap-server node1:9092 --topic topic_name

创建生产者,并且生产消息到主题topic_name

# kafka-console-producer.sh --broker-list node1:9092 --topic topic_name

>hello world!    (输入消息,在消费者终端可以看到此输出)
>

IDE初体验

1.添加依赖

Kafka的Java相关maven依赖:

<dependency>
    <groupid>org.apache.kafka</groupid>
    <artifactid>kafka_clients</artifactid>    
    <version>2.0.0</version>
</dependency>

2.生产者

生产者要往Kafka中写入消息。

  • 首先要创建一个生产者客户端实例并设置一些配置参数。
  • 然后构建消息的ProducerRecord对象,其中必须包含所要发往的主题及消息的消息体,进而再通过生产者客户端实例将消息发出。
  • 最后可以通过close()方法来关闭生产者客户端实例并回收相应的资源。
public class Producer{
    public static void main(String[] args){
        //生产者客户端配置
        Properties prop = new Properties();
        prop.put("bootstrap-server","node1:9092");
        ....
        
        //创建生产者实例
        KafkaProducer<String,String> producer = new new KafkaProducer<>(prop);
        
        //构建消息实体
        ProducerRecord<String,String> record = new ProducerRecord<>(topic_name,"message");
        
        //发送消息
        try{
            producer.send(record);
        }catch(Exception e){
            e.printStackTrace();
        }
        
        //关闭资源
        producer.close();
    }
}

3.消费者

消费者订阅主题消费消息:

  • 首先创建一个消费者客户端实例并配置相应的参数。
  • 然后订阅主题并消费即可。
public class Consumer{
    public static void main(){
        //消费者客户端配置
        Properties prop = new Properties();
        prop.put("bootstrap-server","node1:9092");
        prop.put("group.id","group_name");
        ....
        
        //创建消费者实例
        KafkaConsumer<String,String> consumer = new new KafkaConsumer<>(prop);
        
        //订阅主题
        consumer.subscribe(Collection.singletoList(topic_name));
        
        //循环消费消息
        while(true){
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
            for(ConsumerRecord<String,String> record : records){
                System.out.println(record.value());
            }
        }
    }
}

生产者

待续