通博游戏

首页 > 正文

图解kafka - 设计原理解析

www.coscoskop.com2019-07-16
通博官方网注册

db31a904-cb4b-48cc-b522-48e0608f6097

什么是消息队列?

简单来说,消息队列是存放消息的容器。客户端可以将消息发送到消息服务器,也可以从消息服务器获取消息。

问题导读:

生产者如何发送消息?消费者如何消费消息?如何保证消息不被重复消费?如何保证消息的可靠性传输?如何保证消息的顺序性?

为什么需要消息系统?

削峰

数据库的处理能力是有限的,在峰值期,过多的请求落到后台,一旦超过系统的处理能力,可能会使系统挂掉。

b8fc525431a946378f682f6e4964b1cb

如上图所是,系统的处理能力是2K/s时,MQ处理能力是8K /秒,峰值请求5K/s时,MQ的处理能力远远大于数据库,在高峰期,请求可以先积压在MQ中,系统可以根据自身的处理能力以2K/s的的速度消费这些请求。这样等高峰期一过,请求可能只有100/s时,系统可以很快的消费掉积压在MQ中的请求。

注意,上面的请求指的是写请求,查询请求一般通过缓存解决。

解耦

如下场景,S系统与A,B,C系统紧密耦合由于需求变动,A系统修改了相关代码,S系统也需要调整一个相关的代码;过几天,C系统需要删除,S紧跟着删除相关代码;又过了几天,需要新增d系统,S系统又要添加与d相关的代码;再过几天,程序猿疯了.

e97c1866c5464595bd5794c245570ef3

XX这样,各种系统紧密耦合,不利于维护,不利于扩展。现在介绍MQ,A系统的变化,A可以修改自己的代码; C系统删除,直接取消订阅;添加D系统,订阅相关新闻。

5e798f9f700b4927807752f0a205dbce

通过这种方式,通过引入消息中间件,每个系统与MQ交互,从而避免它们之间错综复杂的呼叫关系。

卡夫卡建筑

19426a5e040e4c66a2b58caea69a2097

相关概念

经纪人

服务器包含在kafka集群中。

制片人

消息制作人。

3.消费者

消息使用者

4.消费者群体

每个消费者属于一个消费者组,每个消息只能由消费者组中的一个消费者使用,但可以由多个消费者组使用。

5.主题

消息的类别。每条消息都属于一个主题,不同的主题彼此独立,即kafka是面向主题的。

6.分区

每个主题分为多个分区,分区是由kafka分配的单元。 kafka的物理概念等同于目录,目录中的日志文件构成此分区。

7.复制品

partition的副本,保障 partition 的高可用。

8. leader

replica 中的一个角色, producer 和 consumer 只跟 leader 交互。

9. follower

replica 中的一个角色,从 leader 中复制数据。

10. controller

kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover。

12. zookeeper

kafka 通过 zookeeper 来存储集群的 meta 信息。

Topic and Logs

Message是按照topic来组织的,每个topic可以分成多个partition(对应server.properties/num.partitions)。partition是一个顺序的追加日志,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。其结构如下

7575cccb-6b39-4fa4-8b06-8eb4a4dafd67

server.properties/num.partitions 表示文件 server.properties 中的 num.partitions 配置项,下同

67073e389c1f45bd9c1a3f0faaa813ee

xxxx分区中的每条记录都包含三个属性:offset,messageSize和data。 offset表示消息偏移量; messageSize表示消息的大小; data表示消息的特定内容。

分区作为文件存储在文件系统中,该位置由server.properties/log.dirs指定,其命名约定为 -

例如,主题为“page_visits”的消息被分为五个分区,其目录结构为:

a0aa1ad70b424c0d9018fb08e7e1a7bf

分区可能在不同的经纪人身上

分区是分段的,每个段都是一个段文件。段的常见配置是:

#server.properties

#segmentFile size,默认为1G

Log.segment.bytes=1024 * 1024 * 1024

#滚动以生成最长持续时间的新段文件

Log.roll.hours=24 * 7

#segment文件保留的最长持续时间,超时将被删除

Log.retention.hours=24 * 7

分区目录包含数据文件和索引文件。下图显示了分区的目录结构:

80ea61baa2264cf297ea24ae284a596a

Index uses sparse storage. It does not index every message, but creates an index every certain number of bytes to avoid excessive space occupied by index files. The disadvantage is that an offset that is not indexed cannot be located at the same time as the message. A sequential scan is required, but the scan range is small.

The index consists of two parts (both 4 byte numbers), which are relative offset and position. The relative offset indicates the offset in the segment file, and the position indicates the position of the message in the data file.

Summary: Kafka's Message storage uses partitions, disk sequential reads and writes, segmentation (LogSegment) and sparse indexing to achieve high efficiency.

Partition and Replica

A topic is physically divided into multiple partitions, located on different brokers. If there is no replica, once the broker is down, all the patitions on it will not be available.

Each partition can have multiple replicas (corresponding to server.properties/default.replication.factor), assigned to different brokers, one of which is responsible for reading and writing, processing requests from producers and consumers; others as follower from leader pull Message, keep in sync with the leader.

How to allocate partition and replica to broker

xx将所有Broker(假设共n个Broker)和待分配的Partition排序将第i个Partition分配到第(i mod n)个Broker上将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上

根据上面的分配规则,若replica的数量大于broker的数量,必定会有两个相同的replica分配到同一个broker上,产生冗余。因此replica的数量应该小于或等于broker的数量。

leader选举

kafka 在 zookeeper 中(/brokers/topics/[topic]/partitions/[partition]/state)动态维护了一个 ISR(in-sync replicas),ISR 里面的所有 replica 都'跟上'了 leader,controller将会从ISR里选一个做leader。具体流程如下:

1. controller 在 zookeeper 的 /brokers/ids/[brokerId]节点注册 Watcher,当 broker 宕机时 zookeeper 会 fire watch

2. controller 从 /brokers/ids 节点读取可用broker

3. controller决定set_p,该集合包含宕机 broker 上的所有 partition

4. 对 set_p 中的每一个 partition

xxxx4.1从/brokers/topics/[topic]/partitions/[partition]/state node

中读取ISR

4.2决定新的领导者

4.3将新的leader,ISR,controller_epoch和leader_epoch等信息写入状态节点

5.通过RPC

将leaderAndISRRequest命令发送到相关代理

当ISR为空时,选择副本(不一定是ISR成员)作为领导者;当所有副本都关闭时,任何副本都会被复活并用作领导者。

ISR中的跟随者(同步列表)都“跟随”领导者,而“跟随”并不意味着完全相同。它由server.properties/replica.lag.time.max.ms配置,指示领导者正在等待跟随者同步消息。最长时间,如果超时,领导者将从ISR中删除关注者。

配置项replica.lag.max.messages已被删除

副本同步

Kafka通过“拉模式”同步消息,也就是说,跟随者从领导者中提取数据以进行同步。具体可靠性由生产者确定(根据配置项producer.properties/acks)。

在Kafka 0.9中,request.required.acks=-1,生产者的配置由acks=all替换,但是这个旧的配置已经在docs中了。 (在0.9版本中,生产者配置项request.required.acks=-1是Acks=all被替换,但旧配置项仍在文档中.ps:最新文档2.2.x request.required.acks不再存在)

在acksdescription0producer发送消息后,它将直接返回。它不会等待服务器确认。 1服务器在将记录写入本地日志后将返回记录,并且不会等待跟随者同步消息。领导者关闭后,一些不同步的消息可能会丢失。 -1/all服务器将记录写入本地日志,并等待所有ISR中的消息进行同步和返回。除非领导者和所有ISR被绞死,否则信息不会丢失

在acks=-1时,如果ISR小于min.insync.replicas指定的数量,则将抛出NotEnoughReplicas或NotEnoughReplicasAfterAppend异常。

Prodecer如何发送消息

Producer首先将消息封装到ProducerRecord实例中。

acf59bb8b5f04027a093132c018cd258

消息路由

If a partition is specified when sending a message, it is used directly; if a key is specified, the key is hashed and a partition is selected. This hash (the partitioning mechanism) is implemented by the class specified by producer.properties/partitioner.class. This routing class needs to implement the Partitioner interface; if not specified, the partition is selected by round-robin.

The message is not sent immediately, but is serialized and sent to the Partitioner, which is the hash function mentioned above. After the Partitioner determines the target partition, it is sent to a memory buffer (send queue). Producer's other worker thread (that is, the Sender thread) is responsible for extracting the prepared message from the buffer in real time and packaging it into a batch and sending it to the corresponding broker. The process is roughly like this:

0c811df9b8f8412ab87dd55a8cd42975

Image courtesy of 123archu

Consumer

Each Consumer is assigned to a logical Consumer Group. A partition can only be consumed by a Consumer in the same Consumer Group, but can be consumed by different Consumer Groups.

If the number of partitions for a topic is p, the number of consumers in the Consumer Group that subscribe to this topic is c; then:

p < c: will have c - p consumers idle, resulting in wastage

xxp> c:消费者对应多个分区

p=c:消费者对应于分区

应合理分配消费者和分区的数量,以避免造成资源偏差。最佳分区数是消费者数量的整数倍。

如何将分区分配给消费者

在生产过程中,代理必须分配分区,此处的使用过程也会将分区分配给使用者。类似于代理选择了一个控制器,消费者也从代理中选择一个协调器,用于分配分区。

当分区或消费者的数量发生变化时,例如添加消费者,减少消费者(主动或被动),添加分区,它将重新平衡。

过程如下:

使用者向协调器发送JoinGroupRequest请求。此时,当其他消费者发送心跳请求时,协调员会告诉他们重新平衡。其他消费者也发送JoinGroupRequest请求。协调员选择消费者中的领导者,另一个作为跟随者,通知每个消费者,并且对于领导者,它还将跟随者的元数据带给它。消费者领导者根据消费者元数据重新分配partitionconsumer以向协调器发送SyncGroupRequest,其中领导者的SyncGroupRequest将包含分配。协调器返回包并告诉消费者有关分配的信息,包括领导者。

消费者获取消息

消费者使用“拉模式”消费者消息,以便消费者可以确定消费者的行为。

消费者调用poll(duration)来从服务器提取消息。拉取消息的具体行为由以下配置项确定:

#consumer.properties

#消费者来量多多多的记录

Max.poll.records=500

#消费者poll分区返回的最大数据量

Max.partition.fetch.bytes=1048576

#Consumer最大轮询间隔

#超过此值,服务器将认为此消费者失败

#并将此消费者踢出相应的消费者群体

Max.poll.interval.ms=300000

在分区中,每条消息都有一个偏移量。新消息被写入分区末尾(最新段文件的末尾),每个分区上的消息按顺序消耗,并且未定义消息在不同分区之间消耗的顺序。

cb93571a-1de3-4df4-98ef-0ed0ad41ff70

如果使用者消耗多个分区,则每个分区之前的消耗顺序是未定义的,但它是每个分区上的顺序消耗。

如果来自不同消费者群体的多个消费者使用相同的分区,则消费者之间的消费不会相互影响,并且每个消费者都有自己的偏移量。

afa8b56f2e2a47f1ab83427e915c3369

消费者A和消费者B属于不同的消费者群体。 Cosumer A读取offset=9,Consumer B读取offset=11,该值表示下一次读取的位置。也就是说,消费者A已经读取了偏移0~8的消息,而消费者B已经读取了偏移0~10的消息。

消费者下次从offset=9读取时不一定是消费者A,因为可能会发生重新平衡

保存偏移量

当消费者使用分区时,您需要保存偏移记录以记录当前消耗位置。

可以通过提交或调用Consumer的commitSync()或commitAsync()来手动提交偏移量。相关配置是:

#你自动提交偏移量

Enable.auto.commit=真

#自动提交间隔。在enable.auto.commit=true

时有效

Auto.commit.interval.ms=5000

偏移量保存在名为__consumeroffsets的主题中。编写消息的关键包括groupid,topic和partition,value是offset。

一般情况下,每个密钥的偏移量都是缓存在内存中,查询的时候不用遍历分区,如果没有缓存,第一次就会遍历分区建立缓存,然后查询返回。

__consumeroffsets的分区数量由下面的服务器配置决定:

offsets.topic.num.partitions=50

offset保存在哪个分区上,即__consumeroffsets的分区机制,可以表示为:

groupId.hashCode()模式groupMetadataTopicPartitionCount

groupMetadataTopicPartitionCount是上面配置的分区数。

因为一个分区只能被同一个消费者群体的一个消费者消费,因此可以用groupId表示此消费者消费所有分区

消息系统可能遇到那些问题

卡夫卡支持3种消息投递语义

XX最多一次:最多一次,消息可能丢失,但数据不会被重复检索 - >提交偏移 - >业务处理至少一次:至少一次,消息不会丢失,数据可能被检索反复 - >业务处理 - &gt;提交偏移量。恰好一次:只有一次,消息不丢失,不重复,只消耗一次(实施中0.11,只在下游也是kafka)

如何确保不重复消费消息? (信息的幂等性)

自然是幂等的更新操作。

对于新操作,您可以为每条消息指定唯一ID,并确定在处理之前是否已对其进行处理。此id可以存储在Redis中,如果是写入数据库,则可以使用主键绑定。

如何确保消息的可靠传输? (缺少消息的问题)

根据kafka架构,有三个地方可能会丢失消息:Consumer,Producer和Server

消费者丢失了数据

当server.properties/enable.auto.commit设置为true时,kafka将提交offset,然后处理该消息。如果此时发生异常,则消息将丢失。

因此,您可以关闭自动提交偏移并在处理完成后手动提交偏移。这可确保消息不会丢失。但是,如果提交了偏移量,则可能出现重复消费的问题,并且保证了幂等性。

卡夫卡失去了信息

如果代理意外挂起,如果只有一个副本,则代理上的消息将丢失;如果复制品> 1,重新选择一个追随者作为新的领导者,如果追随者仍然有一些消息未同步,这部分消息就丢失了。

可以进行以下配置以避免上述问题:

将replication.factor参数设置为topic:此值必须大于1,每个分区至少需要2个副本。在Kafka服务器上设置min.insync.replicas参数:此值必须大于1,这是要求领导者至少感知至少一个跟随者与自己保持联系,而不是留下,以确保领导者挂了,有一个追随者。在生产者方面设置acks=all:这要求每个数据都被写入所有副本,然后才能被认为是成功的。在生产者端设置retries=MAX(一个非常大且非常大的值,意味着无限次重试):这是一次写入失败后无限失败的要求,并且它被困在这里。

制片人失去了信息

在生产者端设置acks=all以确保所有ISR同步并且消息被认为是成功的。

如何确保邮件的顺序?

kafka中分区上的消息是顺序的,需要按顺序消耗的消息可以发送到同一个分区并由单个消费者使用。

上面是学习kafka时总结的,如有错误或不合理的地方,欢迎指正!

需要的Java架构师方面的资料可以关注之后私信哈,回复“资料”领取免费架构视频资料,记得要点赞转发噢!

xxxx
热门浏览
热门排行榜
热门标签
日期归档