一、前言
我们都知道Kafka是基于磁盘进行存储的,但Kafka官方又称其具有高性能、高吞吐、低延时的特点,其吞吐量动辄几十上百万。小伙伴们是不是有点困惑了,一般认为在磁盘上读写数据是会降低性能的,因为寻址会比较消耗时间。那Kafka又是怎么做到其吞吐量动辄几十上百万的呢?
Kafka高性能,是多方面协同的结果,包括宏观架构、分布式partition存储、ISR数据同步、以及“无所不用其极”的高效利用磁盘、操作系统特性。
别急,下面老周从数据的写入与读取两个维度来带大家一探究竟。
二、顺序写入
磁盘读写有两种方式:顺序读写或者随机读写。在顺序读写的情况下,磁盘的顺序读写速度和内存持平。
因为磁盘是机械结构,每次读写都会寻址-写入,其中寻址是一个“机械动作”。为了提高读写磁盘的速度,Kafka就是使用顺序I/O。
Kafka利用了一种分段式的、只追加(Append-Only)的日志,基本上把自身的读写操作限制为顺序I/O,也就使得它在各种存储介质上能有很快的速度。一直以来,有一种广泛的误解认为磁盘很慢。实际上,存储介质(特别是旋转式的机械硬盘)的性能很大程度依赖于访问模式。在一个转/分钟的SATA机械硬盘上,随机I/O的性能比顺序I/O低了大概3到4个数量级。此外,一般来说现代的操作系统都会提供预读和延迟写技术:以大数据块的倍数预先载入数据,以及合并多个小的逻辑写操作成一个大的物理写操作。正因为如此,顺序I/O和随机I/O之间的性能差距在flash和其他固态非易失性存储介质中仍然很明显,尽管它远没有旋转式的存储介质那么明显。
这里给出著名学术期刊ACMQueue上的性能对比图:
下图就展示了Kafka是如何写入数据的,每一个Partition其实都是一个文件,收到消息后Kafka会把数据插入到文件末尾(虚框部分):
这种方法采用了只读设计,所以Kafka是不会修改、删除数据的,它会把所有的数据都保留下来,每个消费者(Consumer)对每个Topic都有一个offset用来表示读取到了第几条数据。
磁盘的顺序读写是磁盘使用模式中最有规律的,并且操作系统也对这种模式做了大量优化,Kafka就是使用了磁盘顺序读写来提升的性能。Kafka的message是不断追加到本地磁盘文件末尾的,而不是随机的写入,这使得Kafka写入吞吐量得到了显著提升。
三、页缓存
即便是顺序写入硬盘,硬盘的访问速度还是不可能追上内存。所以Kafka的数据并不是实时的写入硬盘,它充分利用了现代操作系统分页存储来利用内存提高I/O效率。具体来说,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。
Kafka接收来自socketbuffer的网络数据,应用进程不需要中间处理、直接进行持久化时。可以使用mmap内存文件映射。
3.1MemoryMappedFiles
简称mmap,简单描述其作用就是:将磁盘文件映射到内存,用户通过修改内存就能修改磁盘文件。
它的工作原理是直接利用操作系统的Page来实现磁盘文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)。
通过mmap,进程像读写硬盘一样读写内存(当然是虚拟机内存)。使用这种方式可以获取很大的I/O提升,省去了用户空间到内核空间复制的开销。
mmap也有一个很明显的缺陷:不可靠,写到mmap中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用flush的时候才把数据真正的写到硬盘。
Kafka提供了一个参数producer.type来控制是不是主动flush:
如果Kafka写入到mmap之后就立即flush,然后再返回Producer叫同步(sync);
写入mmap之后立即返回Producer不调用flush叫异步(async)。
3.2JavaNIO对文件映射的支持
JavaNIO,提供了一个MappedByteBuffer类可以用来实现内存映射。
MappedByteBuffer只能通过调用FileChannel的map()取得,再没有其他方式。
FileChannel.map()是抽象方法,具体实现是在FileChannelImpl.map()可自行查看JDK源码,其map0()方法就是调用了Linux内核的mmap的API。
3.3使用MappedByteBuffer类注意事项
mmap的文件映射,在fullgc时才会进行释放。当close时,需要手动清除内存映射文件,可以反射调用sun.misc.Cleaner方法。
当一个进程准备读取磁盘上的文件内容时:
操作系统会先查看待读取的数据所在的页(page)是否在页缓存(pagecache)中,如果存在(命中)则直接返回数据,从而避免了对物理磁盘的I/O操作;
如果没有命中,则操作系统会向磁盘发起读取请求并将读取的数据页存入页缓存,之后再将数据返回给进程。
如果一个进程需要将数据写入磁盘:
操作系统也会检测数据对应的页是否在页缓存中,如果不存在,则会先在页缓存中添加相应的页,最后将数据写入对应的页。
被修改过后的页也就变成了脏页,操作系统会在合适的时间把脏页中的数据写入磁盘,以保持数据的一致性。
对一个进程而言,它会在进程内部缓存处理所需的数据,然而这些数据有可能还缓存在操作系统的页缓存中,因此同一份数据有可能被缓存了两次。并且,除非使用DirectI/O的方式,否则页缓存很难被禁止。
当使用页缓存的时候,即使Kafka服务重启,页缓存还是会保持有效,然而进程内的缓存却需要重建。这样也极大地简化了代码逻辑,因为维护页缓存和文件之间的一致性交由操作系统来负责,这样会比进程内维护更加安全有效。
Kafka中大量使用了页缓存,这是Kafka实现高吞吐的重要因素之一。
消息先被写入页缓存,由操作系统负责刷盘任务。
四、零拷贝
导致应用程序效率低下的一个典型根源是缓冲区之间的字节数据拷贝。Kafka使用由Producer、Broker和Consumer多方共享的二进制消息格式,因此数据块即便是处于压缩状态也可以在不被修改的情况下在端到端之间流动。虽然消除通信各方之间的结构化差异是非常重要的一步,但它本身并不能避免数据的拷贝。
Kafka通过利用Java的NIO框架,尤其是java.nio.channels.FileChannel里的transferTo这个方法,解决了前面提到的在Linux等类UNIX系统上的数据拷贝问题。此方法能够在不借助作为传输中介的应用程序的情况下,将字节数据从源通道直接传输到接收通道。要了解NIO的带来的改进,请考虑传统方式下作为两个单独的操作:源通道中的数据被读入字节缓冲区,接着写入接收通道:
File.read(fileDesc,buf,len);Socket.send(socket,buf,len);
通过图表来说明,这个过程可以被描述如下:
尽管上面的过程看起来已经足够简单,但是在内部仍需要4次用户态和内核态的上下文切换来完成拷贝操作,而且需要拷贝4次数据才能完成这个操作。下面的示意图概述了每一个步骤中的上下文切换。
让我们来更详细地看一下细节:
初始的read()调用导致了一次用户态到内核态的上下文切换。DMA(DirectMemoryAccess直接内存访问)引擎读取文件,并将其内容复制到内核地址空间中的缓冲区中。这个缓冲区和上面的代码片段里使用的并非同一个。
在从read()返回之前,内核缓冲区的数据会被拷贝到用户态的缓冲区。此时,我们的程序可以读取文件的内容。
接下来的send()方法会切换回内核态,拷贝用户态的缓冲区数据到内核地址空间——这一次是拷贝到一个关联着目标套接字的不同缓冲区。在后台,DMA引擎会接手这一操作,异步地把数据从内核缓冲区拷贝到协议堆栈,由网卡进行网络传输。send()方法在返回之前不等待此操作。
send()调用返回,切换回用户态。
尽管模式切换的效率很低,而且需要进行额外的拷贝,但在许多情况下,中间内核缓冲区的性能实际上可以进一步提高。比如它可以作为一个预读缓存,异步预载入数据块,从而可以在应用程序前端运行请求。但是,当请求的数据量极大地超过内核缓冲区大小时,内核缓冲区就会成为性能瓶颈。它不会直接拷贝数据,而是迫使系统在用户态和内核态之间摇摆,直到所有数据都被传输完成。
相比之下,零拷贝方式能在单个操作中处理完成。前面示例中的代码片段现在能重写为一行程序:
fileDesc.transferTo(offset,len,socket);
零拷贝方式可以用下图来说明:
在这种模式下,上下文的切换次数被缩减至一次。具体来说,transferTo()方法指示数据块设备通过DMA引擎将数据读入读缓冲区,然后这个缓冲区的数据拷贝到另一个内核缓冲区中,分阶段写入套接字。最后,DMA将套接字缓冲区的数据拷贝到NIC缓冲区中。
最终结果,我们已经把拷贝的次数从4降到了3,而且其中只有一次拷贝占用了CPU资源。我们也已经把上下文切换的次数从4降到了2。
把磁盘文件读取OS内核缓冲区后的fileChannel,直接转给socketChannel发送;底层就是sendfile。消费者从broker读取数据,就是由此实现。
具体来看,Kafka的数据传输通过TransportLayer来完成,其子类PlaintextTransportLayer通过JavaNIO的FileChannel的transferTo和transferFrom方法实现零拷贝。
注:transferTo和transferFrom并不保证一定能使用零拷贝,需要操作系统支持。
这是一个巨大的提升,不过还没有实现完全"零拷贝"。我们可以通过利用Linux内核2.4或更高版本以及支持gather操作的网卡来做进一步的优化从而实现真正的"零拷贝"。下面的示意图可以说明:
调用transferTo()方法会致使设备通过DMA引擎将数据读入内核读缓冲区,就像前面的例子那样。然而,通过gather操作,读缓冲区和套接字缓冲区之间的数据拷贝将不复存在。相反地,NIC被赋予一个指向读缓冲区的指针,连同偏移量和长度,所有数据都将通过DMA抽取干净并拷贝到NIC缓冲区。在这个过程中,在缓冲区间拷贝数据将无需占用任何CPU资源。
传统的方式和零拷贝方式在MB字节到GB字节的文件大小范围内的性能对比显示,零拷贝方式相较于传统方式的性能提升幅度在2到3倍。但更令人惊叹的是,Kafka仅仅是在一个纯JVM虚拟机下、没有使用本地库或JNI代码,就实现了这一点。
五、Broker性能
5.1日志记录批处理
顺序I/O在大多数的存储介质上都非常快,几乎可以和网络I/O的峰值性能相媲美。在实践中,这意味着一个设计良好的日志结构的持久层将可以紧随网络流量的速度。事实上,Kafka的瓶颈通常是网络而非磁盘。因此,除了由操作系统提供的底层批处理能力之外,Kafka的Clients和Brokers会把多条读写的日志记录合并成一个批次,然后才通过网络发送出去。日志记录的批处理通过使用更大的包以及提高带宽效率来摊薄网络往返的开销。
5.2批量压缩
当启用压缩功能时,批处理的影响尤为明显,因为压缩效率通常会随着数据量大小的增加而变得更高。特别是当使用JSON等基于文本的数据格式时,压缩效果会非常显著,压缩比通常能达到5到7倍。此外,日志记录批处理在很大程度上是作为Client侧的操作完成的,此举把负载转移到Client上,不仅对网络带宽效率、而且对Brokers的磁盘I/O利用率也有很大的提升。
5.3非强制刷新缓冲写操作
另一个助力Kafka高性能、同时也是一个值得更进一步去探究的底层原因:Kafka在确认写成功ACK之前的磁盘写操作不会真正调用fsync命令;通常只需要确保日志记录被写入到I/OBuffer里就可以给Client回复ACK信号。这是一个鲜为人知却至关重要的事实:事实上,这正是让Kafka能表现得如同一个内存型消息队列的原因——因为Kafka是一个基于磁盘的内存型消息队列(受缓冲区/页面缓存大小的限制)。
另一方面,这种形式的写入是不安全的,因为副本的写失败可能会导致数据丢失,即使日志记录似乎已经被确认成功。换句话说,与关系型数据库不同,确认一个写操作成功并不等同于持久化成功。真正使得Kafka具备持久化能力的是运行多个同步的副本的设计;即便有一个副本写失败了,其他的副本(假设有多个)仍然可以保持可用状态,前提是写失败是不相关的(例如,多个副本由于一个共同的上游故障而同时写失败)。因此,不使用fsync的I/O非阻塞方法和冗余同步副本的结合,使得Kafka同时具备了高吞吐量、持久性和可用性。
六、流数据并行
日志结构I/O的效率是影响性能的一个关键因素,主要影响写操作;Kafka在对Topic结构和Consumer群组的并行处理是其读性能的基础。这种组合产生了非常高的端到端消息传递总体吞吐量。并发性根深蒂固地存在于Kafka的分区方案和ConsumerGroups的操作中,这是Kafka中一种有效的负载均衡机制——把数据分区(Partition)近似均匀地分配给组内的各个Consumer实例。将此与更传统的MQ进行比较:在RabbitMQ的等效设置中,多个并发的Consumers可能以轮询的方式从队列读取数据,然而这样做,就会失去消息消费的顺序性。
分区机制也使得KafkaBrokers可以水平扩展。每个分区都有一个专门的Leader;因此,任何重要的主题Topic(具有多个分区)都可以利用整个Broker集群进行写操作,这是Kafka和消息队列之间的另一个区别;后者利用集群来获得可用性,而Kafka将真正地在Brokers之间负载均衡,以获得可用性、持久性和吞吐量。
生产者在发布日志记录之时指定分区,假设你正在发布消息到一个有多个分区的Topic上。(也可能有单一分区的Topic,这种情况下将不成问题。)这可以通过直接指定分区索引来完成,或者间接通过日志记录的键值来完成,该键值能被确定性地哈希到一个一致的(即每次都相同)分区索引。拥有相同哈希值的日志记录将会被存储到同一个分区中。假设一个Topic有多个分区,那些不同哈希值的日志记录将很可能最后被存储到不同的分区里。但是,由于哈希碰撞的缘故,不同哈希值的日志记录也可能最后被存储到相同的分区里。这是哈希的本质,如果你理解哈希表的原理,那应该是显而易见的。
日志记录的实际处理是由一个在(可选的)ConsumerGroup中的Consumer操作完成。Kafka确保一个分区最多只能分配给它的ConsumerGroup中的一个Consumer。(我们说"最多"是因为考虑到一种全部Consumer都离线的情况。)当第一个ConsumerGroup里的Consumer订阅了Topic,它将消费这个Topic下的所有分区的数据。当第二个Consumer紧随其后加入订阅时,它将大致获得这个Topic的一半分区,减轻第一个Consumer先前负荷的一半。这使得你能够并行处理事件流,并根据需要增加Consumer(理想情况下,使用自动伸缩机制),前提是你已经对事件流进行了合理的分区。
日志记录吞吐量的控制一般通过以下两种方式来达成:
Topic的分区方案。应该对Topics进行分区,以最大限度地增加独立子事件流的数量。换句话说,日志记录的顺序应该只保留在绝对必要的地方。如果任意两个日志记录在某种意义上没有合理的关联,那它们就不应该被绑定到同一个分区。这暗示你要使用不同的键值,因为Kafka将使用日志记录的键值作为一个散列源来派生其一致的分区映射。
一个组里的Consumers数量。你可以增加ConsumerGroup里的Consumer数量来均衡入站的日志记录的负载,这个数量的上限是Topic的分区数量。(如果你愿意的话,你当然可以增加更多的Consumers,不过分区计数将会设置一个上限来确保每一个活跃的Consumer至少被指派到一个分区,多出来的Consumers将会一直保持在一个空闲的状态。)请注意,Consumer可以是进程或线程。依据Consumer执行的工作负载类型,你可以在线程池中使用多个独立的Consumer线程或进程记录。
如果你之前一直想知道Kafka是否很快、它是如何拥有其现如今公认的高性能标签,或者它是否可以满足你的使用场景,那么相信你现在应该有了所需的答案。
为了让事情足够清楚,必须说明Kafka并不是最快的(也就是说,具有最大吞吐量能力的)消息传递中间件,还有其他具有更大吞吐量的平台——有些是基于软件的——有些是在硬件中实现的。ApachePulsar是一项极具前景的技术,它具备可扩展性,在提供相同的消息顺序性和持久性保证的同时,还能实现更好的吞吐量-延迟效果。使用Kafka的根本原因是,它作为一个完整的生态系统仍然是无与伦比的。它展示了卓越的性能,同时提供了一个丰富和成熟而且还在不断进化的环境,尽管Kafka的规模已经相当庞大了,但仍以一种令人羡慕的速度在成长。
Kafka的设计者和维护者们在创造一个以性能导向为核心的解决方案这方面做得非常出色。它的大多数设计/理念元素都是早期就构思完成、几乎没有什么是事后才想到的,也没有什么是附加的。从把工作负载分摊到Client到Broker上的日志结构持久性,批处理、压缩、零拷贝I/O和流数据级并行——Kafka向几乎所有其他面向消息的中间件(商业的或开源的)发起了挑战。而且最令人叹为观止的是,它做到这些事情的同时竟然没有牺牲掉持久性、日志记录顺序性和至少交付一次的语义等特性。
七、总结
7.1mmap和sendfile
Linux内核提供、实现零拷贝的API。
mmap将磁盘文件映射到内存,支持读和写,对内存的操作会反映在磁盘文件上。
sendfile是将读到内核空间的数据,转到socketbuffer,进行网络发送。
RocketMQ在消费消息时,使用了mmap;Kafka使用了sendfile。
7.2Kafka为啥这么快?
Partition顺序读写,充分利用磁盘特性,这是基础。
Producer生产的数据持久化到Broker,采用mmap文件映射,实现顺序的快速写入。
Customer从Broker读取数据,采用sendfile,将磁盘文件读到OS内核缓冲区后,直接转到socketbuffer进行网络发送。
Broker性能优化:日志记录批处理、批量压缩、非强制刷新缓冲写操作等。
流数据并行