2024年9月kafka原理分析?大数据Kafka是什么呢

 更新时间:2024-09-21 09:43:15

  ⑴kafka原理分析?大数据Kafka是什么呢

  ⑵kafka会先分配所有topic的partitionleader副本,consumer不能消费kafka消息,这样用户可以根据集群实际的状态和各partition的流量情况分配副本kafka集群controller的一项功能是在partition的副本中选择一个副本作为leader副本,kafka集群会自动调整和重新分配consumer消费的partition,会使用请求中topicpartition的offset查一个skiplist表(该表的节点key值是该partition每个LogSegment中第一条消息的offset值确定消息所属的LogSegment,会增加consumer和kafkabroker的消息处理负载,kafka提供了多种offset提交方式partitionoffset提交和管理对kafka消息系统效率来说非常关键,controller需要为在该broker上的有leader副本的所有partition重新选择一个leader。

  ⑶作为一款典型的消息中间件产品,kafka系统仍然由producer、broker、consumer三部分组成。kafka涉及的几个常用概念和组件简单介绍如下:当consumergroup的状态发生变化(如有consumer故障、增减consumer成员等或consumergroup消费的topic状态发生变化(如增加了partition,消费的topic发生变化,kafka集群会自动调整和重新分配consumer消费的partition,这个过程就叫做rebalance(再平衡。__consumer_offsets是kafka集群自己维护的一个特殊的topic,它里面存储的是每个consumergroup已经消费了每个topicpartition的offset。__consumer_offsets中offset消息的key由groupid,topiame,partitionid组成,格式为{topiame}-${partitionid},value值就是consumer提交的已消费的topicpartitionoffset值。__consumer_offsets的分区数和副本数分别由offsets.topic.num.partitions(默认值为和offsets.topic.replication.factor(默认值为参数配置。我们通过公式hash(groupid)%offsets.topic.num.partitions就可以计算出指定consumergroup的已提交offset存储的partition。由于consumergroup提交的offset消息只有最后一条消息有意义,所以__consumer_offsets是一个pacttopic,kafka集群会周期性的对__consumer_offsets执行pact操作,只保留最新的一次提交offset。groupcoordinator运行在kafka某个broker上,负责consumergroup内所有的consumer成员管理、所有的消费的topic的partition的消费关系分配、offset管理、触发rebalance等功能。groupcoordinator管理partition分配时,会指定consumergroup内某个consumer作为groupleader执行具体的partition分配任务。存储某个consumergroup已提交offset的__consumer_offsetspartitionleader副本所在的broker就是该consumergroup的协调器运行的broker。跟大多数分布式系统一样,集群有一个master角色管理整个集群,协调集群中各个成员的行为。kafka集群中的controller就相当于其它分布式系统的master,用来负责集群topic的分区分配,分区leader选举以及维护集群的所有partition的ISR等集群协调功能。集群中哪个borker是controller也是通过一致性协议选举产生的,.版本之前通过zookeeper进行选主,.版本后通过kafkaraft协议进行选举。如果controller崩溃,集群会重新选举一个broker作为新的controller,并增加controllerepoch值(相当于zookeeperZAB协议的epoch,raft协议的term值当kafka集群新建了topic或为一个topic新增了partition,controller需要为这些新增加的partition分配到具体的broker上,并把分配结果记录下来,供producer和consumer查询获取。因为只有partition的leader副本才会处理producer和consumer的读写请求,而partition的其他follower副本需要从相应的leader副本同步消息,为了尽量保证集群中所有broker的负载是均衡的,controller在进行集群全局partition副本分配时需要使partition的分布情况是如下这样的:在默认情况下,kafka采用轮询(round-robin的方式分配partition副本。由于partitionleader副本承担的流量比follower副本大,kafka会先分配所有topic的partitionleader副本,使所有partitionleader副本全局尽量平衡,然后再分配各个partition的follower副本。partition第一个follower副本的位置是相应leader副本的下一个可用broker,后面的副本位置依此类推。举例来说,假设我们有两个topic,每个topic有两个partition,每个partition有两个副本,这些副本分别标记为--,--,--,--,--,--,--,--(编码格式为topic-partition-replia,编号均从开始,第一个replica是leaderreplica,其他的是followerreplica。共有四个broker,编号是-。我们先对broker按brokerid进行排序,然后分配leader副本,最后分配foller副本。没有配置broker.rack的情况现将副本--分配到broker,然后--分配到broker,依此类推,--会分配到broker。partition-的leader副本分配在broker上,那么下一个可用节点是broker,所以将副本--分配到broker上。同理,partition-的leader副本分配在broker上,那么下一个可用节点是broker,所以将副本--分配到broker上。依此类推分配其他的副本分片。最后分配的结果如下图所示:配置了broker.rack的情况假设配置了两个rack,broker和broker属于Rack,broker和broker属于Rack。我们对rack和rack内的broker分别排序。然后先将副本--分配到Rack的broker,然后将副本--分配到下一个Rack的第一个broker,即Rack的broker。其他的parttitionleader副本依此类推。然后分配follower副本,partition-的leader副本--分配在Rack的broker上,下一个可用的broker是Rack的broker,所以分配到broker上,其他依此类推。最后分配的结果如下图所示:kafka除了按照集群情况自动分配副本,也提供了reassign工具人工分配和迁移副本到指定broker,这样用户可以根据集群实际的状态和各partition的流量情况分配副本kafka集群controller的一项功能是在partition的副本中选择一个副本作为leader副本。在topic的partition创建时,controller首先分配的副本就是leader副本,这个副本又叫做preferenceleader副本。当leader副本所在broker失效时(宕机或网络分区等,controller需要为在该broker上的有leader副本的所有partition重新选择一个leader,选择方法就是在该partition的ISR中选择第一个副本作为新的leader副本。但是,如果ISR成员只有一个,就是失效的leader自身,其余的副本都落后于leader怎么办?kafka提供了一个unclean.leader.election配置参数,它的默认值为true。当unclean.leader.election值为true时,controller还是会在非ISR副本中选择一个作为leader,但是这时候使用者需要承担数据丢失和数据不一致的风险。当unclean.leader.election值为false时,则不会选择新的leader,该partition处于不可用状态,只能恢复失效的leader使partition重新变为可用。当preferenceleader失效后,controller重新选择一个新的leader,但是preferenceleader又恢复了,而且同步上了新的leader,是ISR的成员,这时候preferenceleader仍然会成为实际的leader,原先的新leader变为follower。因为在partitionleader初始分配时,使按照集群副本均衡规则进行分配的,这样做可以让集群尽量保持平衡。为了保证topic的高可用,topic的partition往往有多个副本,所有的follower副本像普通的consumer一样不断地从相应的leader副本pull消息。每个partition的leader副本会维护一个ISR列表存储到集群信息库里,follower副本成为ISR成员或者说与leader是同步的,需要满足以下条件:follower副本处于活跃状态,与zookeeper(.之前版本或kafkaraftmaster之间的心跳正常follower副本最近replica.lag.time.max.ms(默认是秒时间内从leader同步过最新消息。需要注意的是,一定要拉取到最新消息,如果最近replica.lag.time.max.ms时间内拉取过消息,但不是最新的,比如落后follower在追赶leader过程中,也不会成为ISR。follower在同步leader过程中,follower和leader都会维护几个参数,来表示他们之间的同步情况。leader和follower都会为自己的消息队列维护LEO(LastEndOffset和HW(HighWatermark。leader还会为每一个follower维护一个LEO。LEO表示leader或follower队列写入的最后一条消息的offset。HW表示的offset对应的消息写入了所有的ISR。当leader发现所有follower的LEO的最小值大于HW时,则会增加HW值到这个最小值LEO。follower拉取leader的消息时,同时能获取到leader维护的HW值,如果follower发现自己维护的HW值小于leader发送过来的HW值,也会增加本地的HW值到leader的HW值。这样我们可以得到一个不等式:followerHW《=leaderHW《=followerLEO《=leaderLEO。HW对应的log又叫做mittedlog,consumer消费partititon的消息时,只能消费到offset值小于或等于HW值的消息的,由于这个原因,kafka系统又称为分布式mittedlog消息系统。kafka的消息内容存储在log.dirs参数配置的目录下。kafka每个partition的数据存放在本地磁盘log.dirs目录下的一个单独的目录下,目录命名规范为${topiame}-${partitionId},每个partition由多个LogSegment组成,每个LogSegment由一个数据文件(命名规范为:{baseOffset}.index和一个时间戳索引文件(命名规范为:${baseOffset}.timeindex组成,文件名的baseOffset就是相应LogSegment中第一条消息的offset。.index文件存储的是消息的offset到该消息在相应.log文件中的偏移,便于快速在.log文件中快速找到指定offset的消息。.index是一个稀疏索引,每隔一定间隔大小的offset才会建立相应的索引(比如每间隔条消息建立一个索引)。.timeindex也是一个稀疏索引文件,这样可以根据消息的时间找到对应的消息。可以考虑将消息日志存放到多个磁盘中,这样多个磁盘可以并发访问,增加消息读写的吞吐量。这种情况下,log.dirs配置的是一个目录列表,kafka会根据每个目录下partition的数量,将新分配的partition放到partition数最少的目录下。如果我们新增了一个磁盘,你会发现新分配的partition都出现在新增的磁盘上。kafka提供了两个参数log.segment.bytes和log.segment.ms来控制LogSegment文件的大小。log.segment.bytes默认值是GB,当LogSegment大小达到log.segment.bytes规定的阈值时,kafka会关闭当前LogSegment,生成一个新的LogSegment供消息写入,当前供消息写入的LogSegment称为活跃(ActiveLogSegment。log.segment.ms表示最大多长时间会生成一个新的LogSegment,log.segment.ms没有默认值。当这两个参数都配置了值,kafka看哪个阈值先达到,触发生成新的LogSegment。kafka还提供了log.retention.ms和log.retention.bytes两个参数来控制消息的保留时间。当消息的时间超过了log.retention.ms配置的阈值(默认是小时,也就是一周,则会被认为是过期的,会被kafka自动删除。或者是partition的总的消息大小超过了log.retention.bytes配置的阈值时,最老的消息也会被kafka自动删除,使相应partition保留的总消息大小维持在log.retention.bytes阈值以下。这个地方需要注意的是,kafka并不是以消息为粒度进行删除的,而是以LogSegment为粒度删除的。也就是说,只有当一个LogSegment的最后一条消息的时间超过log.retention.ms阈值时,该LogSegment才会被删除。这两个参数都配置了值时,也是只要有一个先达到阈值,就会执行相应的删除策略当我们使用KafkaProducer向kafka发送消息时非常简单,只要构造一个包含消息key、value、接收topic信息的ProducerRecord对象就可以通过KafkaProducer的send()向kafka发送消息了,而且是线程安全的。KafkaProducer支持通过三种消息发送方式KafkaProducer客户端虽然使用简单,但是一条消息从客户端到topicpartition的日志文件,中间需要经历许多的处理过程。KafkaProducer的内部结构如下所示:从图中可以看出,消息的发送涉及两类线程,一类是调用KafkaProducer.send()方法的应用程序线程,因为KafkaProducer.send()是多线程安全的,所以这样的线程可以有多个;另一类是与kafka集群通信,实际将消息发送给kafka集群的Sender线程,当我们创建一个KafkaProducer实例时,会创建一个Sender线程,通过该KafkaProducer实例发送的所有消息最终通过该Sender线程发送出去。RecordAumulator则是一个消息队列,是应用程序线程与Sender线程之间消息传递的桥梁。当我们调用KafkaProducer.send()方法时,消息并没有直接发送出去,只是写入了RecordAumulator中相应的队列中,最终需要Sender线程在适当的时机将消息从RecordAumulator队列取出来发送给kafka集群。消息的发送过程如下:在使用KafkaConsumer实例消费kafka消息时,有一个特性我们要特别注意,就是KafkaConsumer不是多线程安全的,KafkaConsumer方法都在调用KafkaConsumer的应用程序线程中运行(除了consumer向kafka集群发送的心跳,心跳在一个专门的单独线程中发送,所以我们调用KafkaConsumer的所有方法均需要保证在同一个线程中调用,除了KafkaConsumer.wakeup()方法,它设计用来通过其它线程向consumer线程发送信号,从而终止consumer执行。跟producer一样,consumer要与kafka集群通信,消费kafka消息,首先需要获取消费的topicpartitionleaderreplica所在的broker地址等信息,这些信息可以通过向kafka集群任意broker发送Metadata请求消息获取。我们知道,一个consumergroup有多个consumer,一个topic有多个partition,而且topic的partition在同一时刻只能被consumergroup内的一个consumer消费,那么consumer在消费partition消息前需要先确定消费topic的哪个partition。partition的分配通过groupcoordinator来实现。基本过程如下:我们可以通过实现接口.apache.kafka.clients.consumer.internals.PartitionAssignor自定义partition分配策略,但是kafka已经提供了三种分配策略可以直接使用。partition分配完后,每个consumer知道了自己消费的topicpartition,通过metadata请求可以获取相应partition的leader副本所在的broker信息,然后就可以向brokerpoll消息了。但是consumer从哪个offset开始poll消息?所以consumer在第一次向broker发送FetchRequestpoll消息之前需要向GroupCoordinator发送OffsetFetchRequest获取消费消息的起始位置。GroupCoordinator会通过key{topic}-${partition}查询__consumer_offsetstopic中是否有offset的有效记录,如果存在,则将consumer所属consumergroup最近已提交的offset返回给consumer。如果没有(可能是该partition是第一次分配给该consumergroup消费,也可能是该partition长时间没有被该consumergroup消费,则根据consumer配置参数auto.offset.reset值确定consumer消费的其实offset。如果auto.offset.reset值为latest,表示从partition的末尾开始消费,如果值为earliest,则从partition的起始位置开始消费。当然,consumer也可以随时通过KafkaConsumer.seek()方法人工设置消费的起始offset。kafkabroker在收到FetchRequest请求后,会使用请求中topicpartition的offset查一个skiplist表(该表的节点key值是该partition每个LogSegment中第一条消息的offset值确定消息所属的LogSegment,然后继续查LogSegment的稀疏索引表(存储在.index文件中,确定offset对应的消息在LogSegment文件中的位置。为了提升消息消费的效率,consumer通过参数fetch.min.bytes和max.partition.fetch.bytes告诉broker每次拉取的消息总的最小值和每个partition的最大值(consumer一次会拉取多个partition的消息。当kafka中消息较少时,为了让broker及时将消息返回给consumer,consumer通过参数fetch.max.wait.ms告诉broker即使消息大小没有达到fetch.min.bytes值,在收到请求后最多等待fetch.max.wait.ms时间后,也将当前消息返回给consumer。fetch.min.bytes默认值为MB,待fetch.max.wait.ms默认值为ms。为了提升消息的传输效率,kafka采用零拷贝技术让内核通过DMA把磁盘中的消息读出来直接发送到网络上。因为kafka写入消息时将消息写入内存中就返回了,如果consumer跟上了producer的写入速度,拉取消息时不需要读磁盘,直接从内存获取消息发送出去就可以了。为了避免发生再平衡后,consumer重复拉取消息,consumer需要将已经消费完的消息的offset提交给groupcoordinator。这样发生再平衡后,consumer可以从上次已提交offset出继续拉取消息。kafka提供了多种offset提交方式partitionoffset提交和管理对kafka消息系统效率来说非常关键,它直接影响了再平衡后consumer是否会重复拉取消息以及重复拉取消息的数量。如果offset提交的比较频繁,会增加consumer和kafkabroker的消息处理负载,降低消息处理效率;如果offset提交的间隔比较大,再平衡后重复拉取的消息就会比较多。还有比较重要的一点是,kafka只是简单的记录每次提交的offset值,把最后一次提交的offset值作为最新的已提交offset值,作为再平衡后消息的起始offset,而什么时候提交offset,每次提交的offset值具体是多少,kafka几乎不关心(这个offset对应的消息应该存储在kafka中,否则是无效的offset,所以应用程序可以先提交,然后提交,再平衡后从处开始消费,决定权完全在consumer这边。kafka中的topicpartition与consumergroup中的consumer的消费关系其实是一种配对关系,当配对双方发生了变化时,kafka会进行再平衡,也就是重新确定这种配对关系,以提升系统效率、高可用性和伸缩性。当然,再平衡也会带来一些负面效果,比如在再平衡期间,consumer不能消费kafka消息,相当于这段时间内系统是不可用的。再平衡后,往往会出现消息的重复拉取和消费的现象。触发再平衡的条件包括:需要注意的是,kafka集群broker的增减或者topicpartitionleader重新选主这类集群状态的变化并不会触发在平衡有两种情况与日常应用开发比较关系比较密切:consumer在调用subscribe()方法时,支持传入一个ConsumerRebalanceListener监听器,ConsumerRebalanceListener提供了两个方法,onPartitionRevoked()方法在consumer停止消费之后,再平衡开始之前被执行。可以发现,这个地方是提交offset的好时机。onPartitonAssigned()方法则会在重新进行partition分配好了之后,但是新的consumer还未消费之前被执行。我们在提到kafka时,首先想到的是它的吞吐量非常大,这也是很多人选择kafka作为消息传输组件的重要原因。以下是保证kafka吞吐量大的一些设计考虑:但是kafka是不是总是这么快?我们同时需要看到kafka为了追求快舍弃了一些特性:所以,kafka在消息独立、允许少量消息丢失或重复、不关心消息顺序的场景下可以保证非常高的吞吐量,但是在需要考虑消息事务、严格保证消息顺序等场景下producer和consumer端需要进行复杂的考虑和处理,可能会比较大的降低kafka的吞吐量,例如对可靠性和保序要求比较高的控制类消息需要非常谨慎的权衡是否适合使用kafka。我们通过producer向kafka集群发送消息,总是期望消息能被consumer成功消费到。最不能忍的是producer收到了kafka集群消息写入的正常响应,但是consumer仍然没有消费到消息。kafka提供了一些机制来保证消息的可靠传递,但是有一些因素需要仔细权衡考虑,这些因素往往会影响kafka的吞吐量,需要在可靠性与吞吐量之间求得平衡:kafka只保证partition消息顺序,不保证topic级别的顺序,而且保证的是partition写入顺序与读取顺序一致,不是业务端到端的保序。如果对保序要求比较高,topic需要只设置一个partition。这时可以把参数max.in.flight.requests.per.connection设置为,而retries设置为大于的数。这样即使发生了可恢复型错误,仍然能保证消息顺序,但是如果发生不可恢复错误,应用层进行重试的话,就无法保序了。也可以采用同步发送的方式,但是这样也极大的降低了吞吐量。如果消息携带了表示顺序的字段,可以在接收端对消息进行重新排序以保证最终的有序。

  ⑷大数据Kafka是什么呢

  ⑸Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统,常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于年贡献给了Apache基金会并成为顶级开源项目。

  ⑹主要应用场景是:日志收集系统和消息系统。

  ⑺Kafka主要设计目标如下:

  ⑻kafka定义二、kafka的优势三、kafka的原理四、kafka起源一、Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition、多副本的(replica,基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于年贡献给了Apache基金会并成为顶级开源项目。二、kafka的优势高吞吐量、低延迟:kafka美妙之处是可以处理几十万条信息,它的延迟最低只有几毫秒,每个topic可以分多个partition,consumergroup对partition进行consume操作。可扩展性:kafka集群支持热扩展持久化、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失容错性:允许集群中节点失败(若副本数量为n,则允许n-个节点失败高并发:支持数千个客户端同时读写三、kafka的原理kafka是如何实现以上所述这几点,我们逐一说明:.高吞吐量、低延迟kafka在设计之初就是为了针对大数据量的传输处理,高吞吐量、低延迟最主要看的就是单位时间内所能读写的数据总量,我们先来看生产端。kafka采取了一定量的批处理机制,即当生产数据达到一定数量或者达到时间窗口后,将所收集到的数据一批次的提交到服务器,我们假设处理一次数据的时间为ms,那每秒钟能处理条,延时为ms,如果此时将处理间隔变成ms,即每ms处理一批数据,假设这段时间接收到条处理,那每秒则能处理条,但是延时变成了ms。为了获得最大的吞吐量,需要牺牲一定的延迟,但是这样的牺牲是值得的。当确定了这种小批量方式之后,高速的写则取决于kafka自身写磁盘的速度了。而由于kafka本身对数据不做任何的处理,只管写入数据,保管数据,分发数据,因此会是一种批量顺序写入数据的情况,而磁盘的读写速度大量消耗在寻址上,也就是随机读写,但是对于顺序写入的速度是非常快的,甚至能媲美内存的随机写入速度。有人做过一个对比,普通磁盘顺序写入每秒能达到.M/s,SSD的顺序写入速度为.M/s,内存的顺序写入速度为.M/s。kafka正是利用了这个特性,顺序写入,速度相对较快。而kafka本身虽然也是写入磁盘持久化数据,但实际上kafka是将数据顺序写入页缓存中(pagecache,然后由操作系统自行决定何时写到磁盘上,因此kafka的写操作能在每秒轻轻松松达到写入数十万条记录。并且基于kafka的动态扩展,这个数字还能不断增大。kafka在消费端也有着高吞吐量,由于kafka是将数据写入到页缓存中,同时由于读写相间的间隔并不大,很大可能性会在缓存中命中,从而保证高吞吐量。另外kafka由于本身不对数据做任何的修改,完全使用零拷贝技术,大大提升数据的读取能力。.kafka每个节点叫做broker,而每一个broker都是独立运行的,可以随时加入kafka集群,集群的心跳管理是由zookeeper负责,新加入的broker只要brokerid不与原有的冲突就能顺利的加入集群中,实现动态扩展。.kafka的持久化在上面已经提到,kafka绕过了java的堆处理数据,直接将数据写入页缓存,然后由操作系统来管理页缓存写入磁盘,实现持久化。kafka每一个主题topic是一个业务数据,他可由多个partition组成,而每个partition可以有多个replica副本,用于保证数据的可靠性。replica分为两个角色,一个是leader,一个是追随者,同一时间,每一个partition只能有一个leader,其他都是追问随者,laeder负责接收数据并写入log,而追随者不能被用户写入数据,只是从leader角色的replica副本中同步log写入自己的log,保持数据同步。kafka中有一个概念,ISR,全称是in-syncreplica,即所有可用的replica副本,这里的ISR数量只要大于,这个partition就能正常运作,因此容错性非常好,假设n个replica,那最多可以坏n-个replica的情况下,还能保持系统正常运行。当replica迟滞到一定时间后,会被kafka从ISR中剔除,当再次同步后,可以再次加入ISR,如果这时候leader出现问题,会从ISR中重新选举一个leader,原先的leader再次同步成功后会重新加入ISR,成为一个flower。.上面提到了kafka的ISR机制,kafka的容错性就是由ISR的机制来保证的。.kafka集群可以动态扩展broker,多个partition同时写入消费数据,实现真正的高并发。四、kafka的起源kafka起源于LinkedIn公司,当时领英公司需要收集两大类数据,一是业务系统和应用程序的性能监控指标数据,而是用户的操作行为数据。当时为了收集这两类数据,领英自研了两套相应的数据收集系统,但是这两套系统都存在一些弊端,无法实现实时交互、实时性差、维护成本高。因此领英的工程师希望找到一个统一的组件来收集分发消费这些大批量的数据,ActiveMQ由于扩展性不足,不能支撑大数据量而被抛弃,从而决定自研一套满足需求的系统组件,也就是kafka。kafka的设计之初主要有三个目标:.为生产者和消费者提供一套简单的API.降低网络传输和磁盘存储开销.具有高伸缩性架构目前kafka可以算是超额完成了目标。kafka的名称由来也很有意思,因为kafka系统的写操作性能特别强,因此想使用一个作家的名字来命名kafka,而JayKreps,kafka的三位作者之一,在上大学的时候很喜欢FranzKafka,因此起来这样一个名字。kafka在年开源,年月正式进入Apache进行孵化,年月顺利毕业,后成为Apache的顶级项目。

您可能感兴趣的文章:

相关文章