Kafka
初识Kafka
apache kafka是由Apache开发的一种发布订阅消息系统,它是一个分布式、分区和重复的日志服务。Kafka之所以越来越受到青睐,得幸于它所扮演的三大角色:
- 消息系统:Kafka不仅提供和传统消息系统(消息中间件)同样的系统解耦、存储冗余、流量削峰、缓冲、异步、可扩展、可恢复性等功能,而且提供消息顺序性保障和回溯消费的功能。
- 存储系统:Kafka将消息持久化到磁盘,相比内存,降低了消息丢失的风险。Kafka的消息持久化和多副本机制,让它可以作为长久的存储系统,只需设置消息保留策略即可。
- 流式处理平台:Kafka可以为其他流式处理框架提供数据来源,并且提供完整的流式处理类库。
基本概念
一个典型的Kafka集群包括若干生产者(Producer)、若干节点(Broker)和若干消费者(Consumer),以及一个Zookeeper集群。如下图所示:

其中Zookeeper是Kafka负责集群元数据管理,控制器的选举等操作。Producer是将消息发送到Borker,Borker负责将消息存储到磁盘,而Consumer负责从Broker订阅消息数据。
整个kafka体系结构引入是哪个术语:
- Producer:发送消息的一方,负责创建消息,发送到Kafka中。
- Consumer:接收消息的一方,负责连接Kafka并接收消息,进而进行相应的业务逻辑。
- Broker:服务代理节点。对于 Kafka 而言, Broker 可以简单地看作一个独立的 Kafka 服务节点或 Kafka服务实例。大多数情况下也可以将Broker看作一台Kafka服务器,前提是这台服务器上只部署了一个Kafka实例。一个或多个Broker组成了 一个Kafka集群。
- Topic和Partition:Kafka中的消息以主题为单位进行归类,生产者将消息发送给特定的主题(发送到Kafka集群中的每条消息都需指定一个主题),而消费者负责订阅主题进行消费消息。
主题和分区
topic是一个逻辑概念,它可以细分成很多partition,一个partition只能属于一个topic,有时候称分区为主题分区(Topic-Partition)。分区在存储层面上可以看成一个可追加得日志,消息被追加到某个分区时会分配一个特定的偏移量(offset)。offset是消息在分区上的唯一标识,Kafka用offset保障消息在分区上的顺序性,但是offset不是跨分区的,在分区间消息是无序的。
如下图所示,主题中有4个分区,消息被顺序追加到每个分区日志文件的尾部。

注意
- 同一topic下的不同partition上的数据是不同的。
- 一条消息只存储到某个topic中的一个partition上。
- 主题中的消息是分区内有序,分区间无序。
- 一个主题中的分区可以分布在不同的broker上,可以提供比单个broker更强大的性能。
- 不同的consumer会记录自己的消费offset。
如下图所示,生产者和消费者对分区内偏移量的维护。

每一条消息被发送到broker之前,会根据分区规则选择存储到哪个具体的分区。如果分区规则设定得合理,所有的消息都可以均匀地分配到不同的分区中。
如果一个主题只对应一个文件,那么这个文件所在的机器 I/O 将会成为这个主题的性能瓶颈,而分区解决了这个问题。
在创建主题的时候可以通过指定的参数来设置分区的个数,当然也可以在主题创建完成之后去修 改分区的数量,通过增加分区的数量可以实现水平扩展。
多副本(Replica)机制
Kafka 为分区引入了多副本机制, 通过增加副本数量可以提升容灾能力。同一分区的不同副本中保存的是相同的消息(在同一时刻,副本之间并非完全一样)。
- 副本之间是 “一主多从”的关系,其中 leader副本负责处理读写请求, follower副本只负责与 leader副本的 消息同步。
- 副本处于不同的 broker 中 ,当 leader 副本出现故障时,从 follower 副本中重新选举新的 leader副本对外提供服务。
Kafka通过多副本机制实现了故障的自动转移,当 Kafka集群中某个 broker 失效时仍然能保证服务可用。
多副本架构
如下图所示,Kafka集群中有4个broker,某个主题中有3个分区,且副本因子(即副本个数〉也为 3,如此每个分区便有1个 leader副本和 2个 follower副本 。生产者和消费者只与leader副本进行交互,而follow副本只负责消息的同步,很多时候follower副本中的消息相对leader副本而言会有一定的滞后。

分区中的所有副本统称为 AR ( Assigned Replicas)。所有与leader 副本保持一定程度同步的副本(包括leader副本在内)组成ISR (On-Sync Replicas),ISR 集合是 AR 集合中的一个子集。
- 消息会先发送leader副本,然后 follower副本才能从leader副本中拉取消息进行同步,同步期间内 follower 副本相对于 leader 副本而言会有一定程度的滞后。
- 前面所说的“一定程度 的同步”是指可忍受的滞后范围,这个范围可以通过参数进行配置。
- 与 leader 副本同步滞后过多的副本(不包括leader副本)组成OSR(Out-of-Sync Replicas),由此可见,AR=ISR+OSR。
- 在正常情况下,所有的follower副本都应该与leader副本保持一定程度的同步,即 AR=ISR, OSR 集合为空。
- leader 副本负责维护和跟踪ISR 集合中所有follower副 本的滞后状态,当follower 副本落后 太多或失效时,leader副本会把它从ISR集合中剔除。
- 如果OSR集合中有follower副本“追上’了leader副本,那么 leader副本会把它从OSR集合转移至ISR集合。
- 默认情况下,当leader副本发生故障时,只有在ISR集合中的副本才有资格被选举为新的leader,而在OSR集合中的副本则没有任何机会(不过这个原则也可以通过修改相应的参数配置来改变)。
Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。
生产和消费初体验
终端初体验
创建一个名为topic_name的主题,并且该主题有4个分区,每个分区副本因子为3。
# kafka-topic.sh --zookeeper node1:2181 --create --topic toptic_name --replication-factor 3 --partition 4
展示主题topic_name的信息。
# kafka-topic.sh --zookeeper node1:2181 --describe --topic topic_name
创建消费者,并且消费主题topic_name
# kafka-console-consumer.sh --bootstrap-server node1:9092 --topic topic_name
创建生产者,并且生产消息到主题topic_name
# kafka-console-producer.sh --broker-list node1:9092 --topic topic_name
>hello world! (输入消息,在消费者终端可以看到此输出)
>
IDE初体验
1.添加依赖
Kafka的Java相关maven依赖:
<dependency>
<groupid>org.apache.kafka</groupid>
<artifactid>kafka_clients</artifactid>
<version>2.0.0</version>
</dependency>
2.生产者
生产者要往Kafka中写入消息。
- 首先要创建一个生产者客户端实例并设置一些配置参数。
- 然后构建消息的ProducerRecord对象,其中必须包含所要发往的主题及消息的消息体,进而再通过生产者客户端实例将消息发出。
- 最后可以通过close()方法来关闭生产者客户端实例并回收相应的资源。
public class Producer{
public static void main(String[] args){
//生产者客户端配置
Properties prop = new Properties();
prop.put("bootstrap-server","node1:9092");
....
//创建生产者实例
KafkaProducer<String,String> producer = new new KafkaProducer<>(prop);
//构建消息实体
ProducerRecord<String,String> record = new ProducerRecord<>(topic_name,"message");
//发送消息
try{
producer.send(record);
}catch(Exception e){
e.printStackTrace();
}
//关闭资源
producer.close();
}
}
3.消费者
消费者订阅主题消费消息:
- 首先创建一个消费者客户端实例并配置相应的参数。
- 然后订阅主题并消费即可。
public class Consumer{
public static void main(){
//消费者客户端配置
Properties prop = new Properties();
prop.put("bootstrap-server","node1:9092");
prop.put("group.id","group_name");
....
//创建消费者实例
KafkaConsumer<String,String> consumer = new new KafkaConsumer<>(prop);
//订阅主题
consumer.subscribe(Collection.singletoList(topic_name));
//循环消费消息
while(true){
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
for(ConsumerRecord<String,String> record : records){
System.out.println(record.value());
}
}
}
}
生产者
待续