Kafka, the open-source distributed message queue, is known for high throughput and high concurrency. How does its network communication layer transfer messages so efficiently? To answer that question for myself I read through the source of Kafka's network module, and this article is the result.
This article walks through Kafka's source code to describe its multi-threaded Reactor network model and overall structure, and briefly introduces the design and implementation of Kafka's network communication layer.

Overview of Kafka's Network Communication Model

Kafka's network communication model is a multi-threaded Reactor design built on Java NIO. The comment at the top of the SocketServer source describes it well:

An NIO socket server. The threading model is
1 Acceptor thread that handles new connections.
Acceptor has N Processor threads that each have their own selector and read requests from sockets.
M Handler threads that handle requests and produce responses back to the processor threads for writing.

From this comment you can already see the shape of the model: 1 Acceptor thread + N Processor threads + M business-logic handler threads. The table below summarizes the three roles (a quick look for now; each is described in detail later):

Threads  Thread name                Description
1        kafka-socket-acceptor_%x   Acceptor thread; listens for new connection requests from clients
N        kafka-network-thread_%d    Processor threads; perform the actual socket reads and writes
M        kafka-request-handler-%d   Worker threads; run the business logic and produce the Response to return

The complete framework of Kafka's network communication layer is shown below:
[Figure: Kafka Server network communication framework]

If the diagram is not entirely clear at first glance, don't worry; for now it is enough to get a rough picture of the structure. The rest of this article walks through the important source code behind it. The key concepts in the model are:

  1. Acceptor: a single accept thread that registers the OP_ACCEPT event, listens for new socket connections, and hands each new connection to a Processor thread in round-robin order;
  2. Processor: N processor threads, each with its own selector; a Processor registers OP_READ on every SocketChannel the Acceptor assigns to it. N is set by num.network.threads;
  3. KafkaRequestHandler: M request-handler threads, managed by the thread pool KafkaRequestHandlerPool; they take requests from RequestChannel's global request queue (requestQueue) and hand them to KafkaApis. M is set by num.io.threads;
  4. RequestChannel: the server-side request channel; it holds one global request queue (requestQueue) and one response queue (responseQueue) per Processor, and is where the Processors exchange data with the KafkaRequestHandler threads and KafkaApis;
  5. NetworkClient: a wrapper around Java NIO that sits in Kafka's network interface layer; the send method of KafkaProducer, the message producer object, ultimately relies on NetworkClient to transmit messages;
  6. SocketServer: the NIO server; it starts one Acceptor thread and multiple Processor threads, a textbook multi-threaded Reactor that separates accepting client connections from processing their requests;
  7. KafkaServer: represents one Kafka broker instance; its startup method is the entry point for starting the instance;
  8. KafkaApis: Kafka's business-logic API layer, which handles the different request types, e.g. producing messages, fetching message offsets, and handling heartbeat requests.

The end-to-end flow shown in the diagram is:

  1. The Acceptor detects a new connection from a requester (which can be a client or another server) and hands it to a Processor in round-robin order;
  2. The Processor registers the OP_READ event for that SocketChannel, so requests sent over it can be selected by the Processor's Selector;
  3. The Processor puts each request it reads into the Request Queue, a single queue shared by all Processors;
  4. A KafkaRequestHandler takes the request from the Request Queue;
  5. It calls KafkaApis to process the request;
  6. The result is placed in the Response Queue belonging to the originating Processor (each request records which Processor it came from); the number of Response Queues equals the number of Processors;
  7. The Processor takes the response from its own Response Queue;
  8. The Processor sends the result back to the corresponding requester (a miniature, runnable version of this whole flow is sketched below).
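
To make the 1 + N + M division concrete, here is a minimal, self-contained sketch of the same threading pattern in Scala. Everything in it (MiniReactor, MiniAcceptor, MiniProcessor, Req, the port, N = 2, M = 3) is hypothetical and greatly simplified: it merely echoes requests back and omits the security, quotas, metrics, mute/unmute, partial-write handling, and shutdown logic of the real SocketServer.

import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue}

object MiniReactor {
  // one request queue shared by all processors; one response queue per processor
  case class Req(processorId: Int, channel: SocketChannel, payload: String)
  val requestQueue = new ArrayBlockingQueue[Req](500)
  val responseQueues: Array[ConcurrentLinkedQueue[(SocketChannel, String)]] =
    Array.fill(2)(new ConcurrentLinkedQueue[(SocketChannel, String)]())

  class MiniProcessor(val id: Int) extends Runnable {
    private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
    val selector: Selector = Selector.open()

    def accept(ch: SocketChannel): Unit = { newConnections.add(ch); selector.wakeup() }

    override def run(): Unit = while (true) {
      // like configureNewConnections(): register queued sockets for OP_READ
      var ch = newConnections.poll()
      while (ch != null) {
        ch.configureBlocking(false)
        ch.register(selector, SelectionKey.OP_READ)
        ch = newConnections.poll()
      }
      // like processNewResponses(): write back finished responses (ignores partial writes)
      var resp = responseQueues(id).poll()
      while (resp != null) {
        val (socket, text) = resp
        socket.write(ByteBuffer.wrap(text.getBytes("UTF-8")))
        resp = responseQueues(id).poll()
      }
      // like poll() + processCompletedReceives(): read requests off ready sockets
      if (selector.select(300) > 0) {
        val it = selector.selectedKeys().iterator()
        while (it.hasNext) {
          val key = it.next(); it.remove()
          val socket = key.channel().asInstanceOf[SocketChannel]
          val buf = ByteBuffer.allocate(1024)
          val n = socket.read(buf)
          if (n < 0) { key.cancel(); socket.close() }
          else if (n > 0) // blocks when the shared queue is full, like requestChannel.sendRequest
            requestQueue.put(Req(id, socket, new String(buf.array(), 0, n, "UTF-8")))
        }
      }
    }
  }

  class MiniAcceptor(port: Int, processors: Array[MiniProcessor]) extends Runnable {
    override def run(): Unit = {
      val serverChannel = ServerSocketChannel.open()
      serverChannel.configureBlocking(false)
      serverChannel.socket().bind(new InetSocketAddress(port))
      val selector = Selector.open()
      serverChannel.register(selector, SelectionKey.OP_ACCEPT)
      var current = 0
      while (true) {
        if (selector.select(500) > 0) {
          val it = selector.selectedKeys().iterator()
          while (it.hasNext) {
            val key = it.next(); it.remove()
            if (key.isAcceptable) {
              val ch = key.channel().asInstanceOf[ServerSocketChannel].accept()
              processors(current).accept(ch) // round-robin hand-off, like Kafka's Acceptor
              current = (current + 1) % processors.length
            }
          }
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val processors = Array.tabulate(2)(i => new MiniProcessor(i)) // N = 2
    processors.foreach(p => new Thread(p, s"processor-${p.id}").start())
    new Thread(new MiniAcceptor(9092, processors), "acceptor").start()
    for (i <- 0 until 3) // M = 3 handler threads, like KafkaRequestHandler
      new Thread(new Runnable {
        override def run(): Unit = while (true) {
          val req = requestQueue.take()
          responseQueues(req.processorId).add((req.channel, "echo: " + req.payload))
          processors(req.processorId).selector.wakeup() // like the responseListener callback
        }
      }, s"handler-$i").start()
  }
}

Run it and connect a few times with nc localhost 9092: connections spread across the processors round-robin, while every request funnels through the single shared queue.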

Design and Implementation of Kafka's Network Communication Layer

This section analyzes the design and implementation against the network layer's source code, focusing on its key elements: SocketServer, Acceptor, Processor, RequestChannel, and KafkaRequestHandler. All source code discussed in this article is from Kafka 0.10.0.1.

Overall Flow of the Server Network Model

When a Kafka server starts, KafkaServer's startup() method initializes the objects that make up the network model, as shown below:

def startup() {
  try {
    info("starting")

    if (isShuttingDown.get)
      throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")

    if (startupComplete.get)
      return

    val canStartup = isStartingUp.compareAndSet(false, true)
    if (canStartup) {
      metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)

      brokerState.newState(Starting)

      /* start scheduler */
      kafkaScheduler.startup()

      /* setup zookeeper */
      zkUtils = initZk()

      /* start log manager */
      logManager = createLogManager(zkUtils.zkClient, brokerState)
      logManager.startup()

      /* generate brokerId */
      config.brokerId = getBrokerId
      this.logIdent = "[Kafka Server " + config.brokerId + "], "

      // note: create and start the SocketServer
      socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
      socketServer.startup()

      /* start replica manager */
      replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkUtils, kafkaScheduler, logManager,
        isShuttingDown)
      replicaManager.startup()

      /* start kafka controller */
      kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
      kafkaController.startup()

      /* start group coordinator */
      groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, kafkaMetricsTime)
      groupCoordinator.startup()

      /* Get the authorizer and initialize it if one is specified.*/
      authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
        val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
        authZ.configure(config.originals())
        authZ
      }

      /* start processing requests */
      // note: initialize the KafkaApis instance (only one per server)
      apis = new KafkaApis(socketServer.requestChannel, replicaManager, groupCoordinator,
        kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
      requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
      brokerState.newState(RunningAsBroker)

      Mx4jLoader.maybeLoad()

      /* start dynamic config manager */
      dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config),
        ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers))

      // Apply all existing client configs to the ClientIdConfigHandler to bootstrap the overrides
      // TODO: Move this logic to DynamicConfigManager
      AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach {
        case (clientId, properties) => dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientId, properties)
      }

      // Create the config manager. start listening to notifications
      dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
      dynamicConfigManager.startup()

      /* tell everyone we are alive */
      val listeners = config.advertisedListeners.map { case (protocol, endpoint) =>
        if (endpoint.port == 0)
          (protocol, EndPoint(endpoint.host, socketServer.boundPort(protocol), endpoint.protocolType))
        else
          (protocol, endpoint)
      }
      kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
        config.interBrokerProtocolVersion)
      kafkaHealthcheck.startup()

      // Now that the broker id is successfully registered via KafkaHealthcheck, checkpoint it
      checkpointBrokerId(config.brokerId)

      /* register broker metrics */
      registerStats()

      shutdownLatch = new CountDownLatch(1)
      startupComplete.set(true)
      isStartingUp.set(false)
      AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
      info("started")
    }
  } catch {
    ...
  }
}

At startup the Kafka server initializes the SocketServer, KafkaApis, and KafkaRequestHandlerPool objects, which are the main components of the server's network processing model. That model is built on Java NIO and follows a pattern very similar to the Reactor pattern.

That is the overall flow of server-side network processing; the rest of this article examines how each piece is implemented in Kafka.

SocketServer

SocketServer is where socket connections are accepted, requests are processed, and results are returned; the initialization and processing logic of both the Acceptor and the Processors lives here. A few important fields of SocketServer are worth looking at first:

class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup {
  private val endpoints = config.listeners.map(l => l.listenerName -> l).toMap // note: the endpoints (ports) the broker exposes
  private val numProcessorThreads = config.numNetworkThreads // note: num.network.threads, default 3, i.e. Processors per endpoint
  private val maxQueuedRequests = config.queuedMaxRequests // note: queued.max.requests, max requests allowed in the request queue, default 500
  private val totalProcessorThreads = numProcessorThreads * endpoints.size // note: each endpoint gets its own N Processors
  private val maxConnectionsPerIp = config.maxConnectionsPerIp // note: default 2147483647
  private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides

  this.logIdent = "[Socket Server on Broker " + config.brokerId + "], "

  // note: the request/response channel
  val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
  private val processors = new Array[Processor](totalProcessorThreads)
  private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
}

class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
  private var responseListeners: List[(Int) => Unit] = Nil
  // note: one requestQueue and N responseQueues
  private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
  private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
}

Where:

  1. numProcessorThreads determines the number of Processors per endpoint; the default is 3, which is the N in 1+N+M;
  2. maxQueuedRequests caps how many requests (awaiting processing) the request queue may hold; the default is 500. Because the queue is a bounded ArrayBlockingQueue, a full queue blocks the Processors, as the sketch below demonstrates;
  3. RequestChannel is initialized with one requestQueue and N responseQueues.
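
The following stand-alone snippet demonstrates the blocking behavior that makes queued.max.requests act as back-pressure (the queue size of 2 and the strings are arbitrary): once the queue is full, the producing thread's put() parks until a consumer frees a slot, exactly the way a Processor parks in requestChannel.sendRequest when the handler threads fall behind.

import java.util.concurrent.ArrayBlockingQueue

object BackPressureDemo extends App {
  val queue = new ArrayBlockingQueue[String](2) // stand-in for queued.max.requests = 2
  queue.put("req-1")
  queue.put("req-2") // the queue is now full

  val processor = new Thread(new Runnable {
    override def run(): Unit = {
      println("processor: trying to enqueue req-3 ...")
      queue.put("req-3") // blocks here until a handler makes room
      println("processor: req-3 enqueued")
    }
  })
  processor.start()

  Thread.sleep(1000)
  println("handler: dequeued " + queue.take()) // frees a slot, unblocking the processor
  processor.join()
}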

SocketServer initialization

When the broker starts, it calls SocketServer's startup() method, which creates and starts 1 Acceptor and N Processor threads per endpoint:
def startup() {
  this.synchronized {
    var processorBeginIndex = 0
    // note: a broker usually configures a single endpoint, but several are possible
    config.listeners.foreach { endpoint =>
      val listenerName = endpoint.listenerName
      val securityProtocol = endpoint.securityProtocol
      val processorEndIndex = processorBeginIndex + numProcessorThreads

      // note: N Processors per endpoint
      for (i <- processorBeginIndex until processorEndIndex)
        processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol)

      // note: 1 Acceptor per endpoint
      val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
        processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
      acceptors.put(endpoint, acceptor)
      Utils.newThread(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor, false).start()
      acceptor.awaitStartup()
      processorBeginIndex = processorEndIndex
    }
  }
}

Acceptor

Acceptor is a thread class extending the abstract class AbstractServerThread. Its job is to listen for and accept client requests, establish the transport channel (a SocketChannel), and hand it to a backend Processor thread in round-robin fashion; concretely, it adds the SocketChannel to that Processor's concurrent queue and wakes the Processor up to handle it.

Two fields of this thread class deserve attention:

  1. nioSelector: created via NSelector.open(); wraps the Java NIO Selector operations;
  2. serverChannel: the server-side socket object (ServerSocketChannel) listening on the port.

The run() method is implemented as follows:

def run() {
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) // note: register the OP_ACCEPT event
  startupComplete()
  try {
    var currentProcessor = 0
    while (isRunning) {
      try {
        val ready = nioSelector.select(500)
        if (ready > 0) {
          val keys = nioSelector.selectedKeys()
          val iter = keys.iterator()
          while (iter.hasNext && isRunning) {
            try {
              val key = iter.next
              iter.remove()
              if (key.isAcceptable)
                accept(key, processors(currentProcessor)) // note: got a socket connection; pick a processor round-robin
              else
                throw new IllegalStateException("Unrecognized key state for acceptor thread.")

              // round robin to the next processor thread
              currentProcessor = (currentProcessor + 1) % processors.length
            } catch {
              case e: Throwable => error("Error while accepting connection", e)
            }
          }
        }
      }
      catch {
        // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
        // to a select operation on a specific channel or a bad request. We don't want
        // the broker to stop responding to requests from other clients in these scenarios.
        case e: ControlThrowable => throw e
        case e: Throwable => error("Error occurred", e)
      }
    }
  } finally {
    debug("Closing server socket and selector.")
    swallowError(serverChannel.close())
    swallowError(nioSelector.close())
    shutdownComplete()
  }
}

The Acceptor hands each new connection to the chosen Processor via accept(), implemented as follows:

// note: handle a new connection
def accept(key: SelectionKey, processor: Processor) {
  // note: on an OP_ACCEPT event, fetch the ServerSocketChannel registered on the selector
  val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
  val socketChannel = serverSocketChannel.accept()
  try {
    connectionQuotas.inc(socketChannel.socket().getInetAddress)
    socketChannel.configureBlocking(false)
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setKeepAlive(true)
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
      socketChannel.socket().setSendBufferSize(sendBufferSize)

    debug("Accepted connection from %s on %s and assigned it to processor %d, sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
      .format(socketChannel.socket.getRemoteSocketAddress, socketChannel.socket.getLocalSocketAddress, processor.id,
        socketChannel.socket.getSendBufferSize, sendBufferSize,
        socketChannel.socket.getReceiveBufferSize, recvBufferSize))

    // note: hand the connection to the processor chosen by the round-robin above
    processor.accept(socketChannel)
  } catch {
    case e: TooManyConnectionsException =>
      info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
      close(socketChannel)
  }
}

As the code shows, after the Acceptor thread starts it first registers OP_ACCEPT on the listening ServerSocketChannel, then loops waiting for that event. When it fires, accept() handles it; the target Processor is chosen round-robin, which keeps the load across the Processor threads roughly even.
The Acceptor's accept() method does the following:

  1. Obtain the corresponding ServerSocketChannel from the SelectionKey and call its accept() method to establish the connection with the client;
  2. Call connectionQuotas.inc() to bump the connection count, and configure the SocketChannel returned in step (1) (sendBufferSize, KeepAlive, TcpNoDelay, configureBlocking, etc.);
  3. Hand the SocketChannel to processor.accept(), which adds it to that Processor's newConnections queue and wakes the Processor thread to pick it up. Because newConnections is accessed concurrently by the Acceptor thread and the Processor thread, it is a ConcurrentLinkedQueue (an unbounded, thread-safe queue based on linked nodes).

Processor

Processor, like Acceptor, is a thread class extending the abstract class AbstractServerThread. It reads request data from clients and returns the responses produced by the KafkaRequestHandler threads. Its important fields are:

  1. newConnections: mentioned in the Acceptor section above; a ConcurrentLinkedQueue[SocketChannel] holding the new connections handed to this Processor;
  2. inflightResponses: a Map[String, RequestChannel.Response] recording responses that have not yet been fully sent;
  3. selector: a KSelector used to manage this Processor's network connections.

[Figure: processing flow of the Processor thread's run() method]

As shown earlier, the Acceptor hands a new connection to a Processor through accept(); the Processor simply appends the SocketChannel to its newConnections queue:

def accept(socketChannel: SocketChannel) {
  newConnections.add(socketChannel) // note: enqueue the new connection
  wakeup() // note: wake up this Processor's selector if it is blocked in select()
}

Now let's look in detail at what the Processor thread does; its run() method is implemented as follows:

override def run() {
  startupComplete()
  while (isRunning) {
    try {
      // setup any new connections that have been queued up
      configureNewConnections() // note: register the READ event for newly assigned socket connections
      // register any new responses for writing
      processNewResponses() // note: process the responses in this processor's response queue
      poll() // note: poll all socket channels for newly arrived requests
      processCompletedReceives() // note: put fully received requests on the request queue
      processCompletedSends() // note: handle sends that have completed
      processDisconnected() // note: handle disconnected connections
    } catch {
      // We catch all the throwables here to prevent the processor thread from exiting. We do this because
      // letting a processor exit might cause a bigger impact on the broker. Usually the exceptions thrown would
      // be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel
      // or request. This behavior might need to be reviewed if we see an exception that need the entire broker to stop.
      case e: ControlThrowable => throw e
      case e: Throwable =>
        error("Processor got uncaught exception.", e)
    }
  }
  debug("Closing selector - processor " + id)
  swallowError(closeAll())
  shutdownComplete()
}

In each iteration of the loop, a Processor does the following:

  1. configureNewConnections(): processes the SocketChannels newly added to the newConnections queue, taking each one out of the queue and registering its OP_READ event on the selector;
  2. processNewResponses(): takes responses from this Processor's response queue and acts on each according to its responseAction type (NoOpAction/SendAction/CloseConnectionAction): NoOpAction means the request needs no response; SendAction means the response must be sent to the client, so selector.send registers the OP_WRITE event and the response moves from responseQueue into the inflightResponses collection; CloseConnectionAction means the connection is to be closed;
  3. poll(): calls the selector's poll(), which iterates over the registered SocketChannels and checks whether any events are ready;
  4. processCompletedReceives(): for each fully received request, calls requestChannel.sendRequest to append it to the requestChannel's global request queue, requestQueue, where it waits for a KafkaRequestHandler; at the same time, calls selector.mute to deregister the OP_READ event on that connection's channel;
  5. processCompletedSends(): for each response whose send to the client has completed, removes it from inflightResponses and calls selector.unmute to re-register OP_READ on the corresponding channel;
  6. processDisconnected(): for each disconnected SocketChannel, removes its response from inflightResponses and decrements the connectionQuotas count by 1.

So the Processor's main loop registers events on newly assigned SocketChannels to watch for incoming requests, sends finished responses from its response queue back to the corresponding requesters, and calls the selector's poll() to iterate over all registered SocketChannels and handle whichever events are ready. Note the asymmetry: the request queue is shared by all Processors, while each Processor has its own response queue. That is because each Processor watches a different set of SocketChannels; with a single shared response queue, each of the N Processors' selectors would have to watch every SocketChannel, whereas with per-Processor queues each selector only deals with the SocketChannels assigned to it.

The following subsections look at each of these methods in turn.

configureNewConnections

configureNewConnections() processes the SocketChannels newly added to the newConnections queue, registering the OP_READ event for each on the selector:

// note: for each queued new connection, register its channel's OP_READ event on the selector
private def configureNewConnections() {
  while (!newConnections.isEmpty) {
    val channel = newConnections.poll()
    try {
      debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
      val localHost = channel.socket().getLocalAddress.getHostAddress
      val localPort = channel.socket().getLocalPort
      val remoteHost = channel.socket().getInetAddress.getHostAddress
      val remotePort = channel.socket().getPort
      val connectionId = ConnectionId(localHost, localPort, remoteHost, remotePort).toString
      selector.register(connectionId, channel)
    } catch {
      // We explicitly catch all non fatal exceptions and close the socket to avoid a socket leak. The other
      // throwables will be caught in processor and logged as uncaught exceptions.
      case NonFatal(e) =>
        val remoteAddress = channel.getRemoteAddress
        // need to close the channel here to avoid a socket leak.
        close(channel)
        error(s"Processor $id closed connection from $remoteAddress", e)
    }
  }
}

processNewResponses

processNewResponses() takes responses from this Processor's response queue; it fetches each one through RequestChannel's receiveResponse(), shown below:

// note: fetch a response for the given processor
def receiveResponse(processor: Int): RequestChannel.Response = {
  val response = responseQueues(processor).poll()
  if (response != null)
    response.request.responseDequeueTimeMs = Time.SYSTEM.milliseconds
  response
}

Having obtained a response, it switches on the response's type and acts accordingly; if the response must be returned to the client, sendResponse() is called to send it:

// note: process newly queued responses
private def processNewResponses() {
  var curr = requestChannel.receiveResponse(id)
  while (curr != null) {
    try {
      curr.responseAction match {
        case RequestChannel.NoOpAction => // note: this request needs no response; re-register the read event
          // There is no response to send to the client, we need to read more pipelined requests
          // that are sitting in the server's socket buffer
          curr.request.updateRequestMetrics
          trace("Socket server received empty response to send, registering for read: " + curr)
          val channelId = curr.request.connectionId
          if (selector.channel(channelId) != null || selector.closingChannel(channelId) != null)
            selector.unmute(channelId)
        case RequestChannel.SendAction => // note: this response must be sent to the client
          sendResponse(curr)
        case RequestChannel.CloseConnectionAction => // note: this connection is to be closed
          curr.request.updateRequestMetrics
          trace("Closing socket connection actively according to the response code.")
          close(selector, curr.request.connectionId)
      }
    } finally {
      curr = requestChannel.receiveResponse(id)
    }
  }
}

/* `protected` for test usage */
// note: send the given response
protected[network] def sendResponse(response: RequestChannel.Response) {
  trace(s"Socket server received response to send, registering for write and sending data: $response")
  val channel = selector.channel(response.responseSend.destination)
  // `channel` can be null if the selector closed the connection because it was idle for too long
  if (channel == null) {
    warn(s"Attempting to send response via channel for which there is no open connection, connection id $id")
    response.request.updateRequestMetrics()
  }
  else {
    selector.send(response.responseSend) // note: send the response
    inflightResponses += (response.request.connectionId -> response) // note: record it in inflightResponses
  }
}

processCompletedReceives

processCompletedReceives() takes each fully received request and puts it on the request queue:

// note: process all fully received requests
private def processCompletedReceives() {
  selector.completedReceives.asScala.foreach { receive =>
    try {
      val openChannel = selector.channel(receive.source)
      val session = {
        // Only methods that are safe to call on a disconnected channel should be invoked on 'channel'.
        val channel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
        RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress)
      }
      val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
        buffer = receive.payload, startTimeMs = time.milliseconds, listenerName = listenerName,
        securityProtocol = securityProtocol)
      requestChannel.sendRequest(req) // note: append to the request queue; blocks if the queue is full
      selector.mute(receive.source) // note: deregister OP_READ on this connection
    } catch {
      case e @ (_: InvalidRequestException | _: SchemaException) =>
        // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
        error(s"Closing socket for ${receive.source} because of error", e)
        close(selector, receive.source)
    }
  }
}

processCompletedSends

processCompletedSends() handles sends that have completed:

private def processCompletedSends() {
  selector.completedSends.asScala.foreach { send =>
    // note: the response has been sent; remove it from the in-flight set
    val resp = inflightResponses.remove(send.destination).getOrElse {
      throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
    }
    resp.request.updateRequestMetrics()
    selector.unmute(send.destination) // note: listen for OP_READ again now that this request is finished
  }
}

KafkaRequestHandlerPool

So far we have covered the Acceptor and Processor parts of SocketServer, i.e. the 1+N of the 1+N+M model. Now for the M: the KafkaRequestHandler threads. Their pool is initialized as follows:

class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              time: Time,
                              numThreads: Int) extends Logging with KafkaMetricsGroup {

  /* a meter to track the average free capacity of the request handlers */
  private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)

  this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
  val threads = new Array[Thread](numThreads)
  val runnables = new Array[KafkaRequestHandler](numThreads)
  // note: create M (numThreads) KafkaRequestHandler threads
  for (i <- 0 until numThreads) {
    // note: requestChannel is where the Processors deposit requests and where the handlers deposit the responses
    runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time)
    threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
    threads(i).start()
  }

  def shutdown() {
    info("shutting down")
    for (handler <- runnables)
      handler.shutdown
    for (thread <- threads)
      thread.join
    info("shut down completely")
  }
}

As the code shows:

  1. KafkaRequestHandlerPool creates and starts M KafkaRequestHandler threads;
  2. each KafkaRequestHandler is given the requestChannel, the place where the Processors deposit requests; when handling requests, the handler takes them from that queue.

KafkaRequestHandler

The KafkaRequestHandler thread's processing loop is implemented as follows:
def run() {
  while (true) {
    try {
      var req: RequestChannel.Request = null
      while (req == null) {
        // We use a single meter for aggregate idle percentage for the thread pool.
        // Since meter is calculated as total_recorded_value / time_window and
        // time_window is independent of the number of threads, each recorded idle
        // time should be discounted by # threads.
        val startSelectTime = time.nanoseconds
        req = requestChannel.receiveRequest(300) // note: fetch a request from the request queue
        val idleTime = time.nanoseconds - startSelectTime
        aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
      }

      if (req eq RequestChannel.AllDone) {
        debug("Kafka request handler %d on broker %d received shut down command".format(
          id, brokerId))
        return
      }
      req.requestDequeueTimeMs = time.milliseconds
      trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
      apis.handle(req) // note: handle the request; the result goes onto the response queue via sendResponse
    } catch {
      case e: Throwable => error("Exception when handling request", e)
    }
  }
}

The logic of this method:

  1. the KafkaRequestHandler keeps taking requests from the requestChannel queue and handing them to apis;
  2. KafkaApis processes each request and puts the result on the requestChannel's response queue via requestChannel.sendResponse(), shown below:
// note: append the response to the owning processor's queue
def sendResponse(response: RequestChannel.Response) {
  responseQueues(response.processor).put(response)
  for (onResponse <- responseListeners)
    onResponse(response.processor) // note: invokes the corresponding processor's wakeup()
}

KafkaRequestHandler is itself a thread class. When a KafkaServer instance starts, it creates the thread pool KafkaRequestHandlerPool (containing the KafkaRequestHandler threads), and these run in the background as daemon threads. In its run method, each KafkaRequestHandler loops, blocking on reads from the RequestChannel, and hands each request to KafkaApis for the actual processing.

KafkaApis

KafkaApis is the central dispatch component that handles the business requests arriving over the network; it reflects which services a Kafka broker can provide. It dispatches each request to a different handler method based on the request type:

/**
 * Top-level method that handles all requests and multiplexes to the right api
 */
def handle(request: RequestChannel.Request) {
  try {
    ApiKeys.forId(request.requestId) match {
      case ApiKeys.PRODUCE => handleProducerRequest(request)
      case ApiKeys.FETCH => handleFetchRequest(request)
      case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)
      case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
      case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
      case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
      case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
      case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
      case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
      case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
      case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
      case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
      case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
      case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
      case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
      case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
      case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
      case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
      case requestId => throw new KafkaException("Unknown api code " + requestId)
    }
  } catch {
    case e: Throwable => // error handling elided in this excerpt
  } finally {
    request.apiLocalCompleteTimeMs = SystemTime.milliseconds
  }
}

Clearly, the speed of this dispatching affects Kafka's overall message-processing speed.
Let's analyze one of these handler methods, handleProducerRequest:

/**
 * Handle a produce request
 */
def handleProducerRequest(request: RequestChannel.Request) {
  val produceRequest = request.body.asInstanceOf[ProduceRequest]
  // (authorizedRequestInfo, mergedResponseStatus and numBytesAppended are computed in parts of the method elided here)

  // the callback for sending a produce response
  def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {

    var errorInResponse = false

    mergedResponseStatus.foreach { case (topicPartition, status) =>
      if (status.errorCode != Errors.NONE.code) {
        errorInResponse = true
      }
    }

    def produceResponseCallback(delayTimeMs: Int) {
      if (produceRequest.acks == 0) {
        // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
        // the request, since no response is expected by the producer, the server will close socket server so that
        // the producer client will know that some error has happened and will refresh its metadata
        if (errorInResponse) {
          val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
            topicPartition -> Errors.forCode(status.errorCode).exceptionName
          }.mkString(", ")
          requestChannel.closeConnection(request.processor, request)
        } else {
          requestChannel.noOperation(request.processor, request)
        }
      } else {
        val respHeader = new ResponseHeader(request.header.correlationId)
        val respBody = request.header.apiVersion match {
          case 0 => new ProduceResponse(mergedResponseStatus.asJava)
          case version@(1 | 2) => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, version)
          // This case shouldn't happen unless a new version of ProducerRequest is added without
          // updating this part of the code to handle it properly.
          case version => throw new IllegalArgumentException(s"Version `$version` of ProduceRequest is not handled. Code must be updated.")
        }

        requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, respBody)))
      }
    }

    // When this callback is triggered, the remote API call has completed
    request.apiRemoteCompleteTimeMs = SystemTime.milliseconds

    quotaManagers(ApiKeys.PRODUCE.id).recordAndMaybeThrottle(
      request.header.clientId,
      numBytesAppended,
      produceResponseCallback)
  }

  if (authorizedRequestInfo.isEmpty)
    sendResponseCallback(Map.empty)
  else {
    val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId

    // Convert ByteBuffer to ByteBufferMessageSet
    val authorizedMessagesPerPartition = authorizedRequestInfo.map {
      case (topicPartition, buffer) => (topicPartition, new ByteBufferMessageSet(buffer))
    }

    // call the replica manager to append messages to the replicas
    replicaManager.appendMessages(
      produceRequest.timeout.toLong,
      produceRequest.acks,
      internalTopicsAllowed,
      authorizedMessagesPerPartition,
      sendResponseCallback)

    // if the request is put into the purgatory, it will have a held reference
    // and hence cannot be garbage collected; hence we clear its data here in
    // order to let GC re-claim its memory since it is already appended to log
    produceRequest.clearPartitionRecords()
  }
}

Here replicaManager.appendMessages takes over, persisting the Kafka message on the leader and replicating it to the backup nodes.

Replication Subsystem

Following the thread, we step into replicaManager.appendMessages.
This method appends the messages to the leader partitions and replicates them to the backup partitions, returning the response either on timeout or as soon as the required acks are satisfied.

/**
 * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
 * the callback function will be triggered either when timeout or the required acks are satisfied
 */
def appendMessages(timeout: Long,
                   requiredAcks: Short,
                   internalTopicsAllowed: Boolean,
                   messagesPerPartition: Map[TopicPartition, MessageSet],
                   responseCallback: Map[TopicPartition, PartitionResponse] => Unit) {

  if (isValidRequiredAcks(requiredAcks)) {
    val sTime = SystemTime.milliseconds
    val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks)
    debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))

    val produceStatus = localProduceResults.map { case (topicPartition, result) =>
      topicPartition ->
        ProducePartitionStatus(
          result.info.lastOffset + 1, // required offset
          new PartitionResponse(result.errorCode, result.info.firstOffset, result.info.timestamp)) // response status
    }

    if (delayedRequestRequired(requiredAcks, messagesPerPartition, localProduceResults)) {
      // create delayed produce operation
      val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
      val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)

      // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
      val producerRequestKeys = messagesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq

      // try to complete the request immediately, otherwise put it into the purgatory
      // this is because while the delayed produce operation is being created, new
      // requests may arrive and hence make this operation completable.
      delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)

    } else {
      // we can respond immediately
      val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
      responseCallback(produceResponseStatus)
    }
  } else {
    // If required.acks is outside accepted range, something is wrong with the client
    // Just return an error and don't handle the request at all
    val responseStatus = messagesPerPartition.map {
      case (topicAndPartition, messageSet) =>
        (topicAndPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS.code,
          LogAppendInfo.UnknownLogAppendInfo.firstOffset,
          Message.NoTimestamp))
    }
    responseCallback(responseStatus)
  }
}

Log Subsystem

LogManager is responsible for managing Kafka's logs (the Kafka messages), including creating, retrieving, and cleaning the log directories and Log files. It also runs a timer-driven check of whether in-memory log data needs to be flushed to disk. The important classes are LogManager and Log.
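
That periodic check is essentially a scheduled background task. Below is a minimal sketch of the pattern using a plain ScheduledExecutorService; the name flushDirtyLogs and the 30-second interval are illustrative, not the real LogManager API.

import java.util.concurrent.{Executors, TimeUnit}

object LogFlushSchedulerSketch extends App {
  // hypothetical stand-in for LogManager's dirty-log bookkeeping
  def flushDirtyLogs(): Unit =
    println("checking for logs whose unflushed data exceeds the flush policy ...")

  // LogManager registers a comparable recurring check on its scheduler at startup
  val scheduler = Executors.newSingleThreadScheduledExecutor()
  scheduler.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = flushDirtyLogs()
  }, 0L, 30L, TimeUnit.SECONDS)
}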

OffsetManager

OffsetManager manages offsets, providing offset reads and writes.

TopicConfigManager

TopicConfigManager handles dynamic changes to topic configuration.
When a topic's configuration changes, Kafka creates a node in ZooKeeper such as /brokers/config_changes/config_change_13321. TopicConfigManager watches these nodes, determines which topics' properties changed, and applies the change, in effect replacing the old LogConfig with a new one.
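
The notification mechanism described above is a classic ZooKeeper child-watch pattern. A simplified sketch using the ZkClient library (org.I0Itec.zkclient) that Kafka 0.10 depends on; the ZooKeeper address, the watched path, and the handler body are illustrative, not the real TopicConfigManager code.

import java.util.{List => JList}
import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
import scala.collection.JavaConverters._

object ConfigChangeWatcherSketch extends App {
  // session/connection timeouts are arbitrary for this sketch
  val zkClient = new ZkClient("localhost:2181", 30000, 30000)

  // watch the notification path described above; each child node (e.g.
  // config_change_13321) announces that some topic's config has changed
  zkClient.subscribeChildChanges("/brokers/config_changes", new IZkChildListener {
    override def handleChildChange(parentPath: String, currentChilds: JList[String]): Unit = {
      // the real manager reads each notification, loads the topic's new
      // properties and swaps a new LogConfig in for the old one
      currentChilds.asScala.foreach(change => println(s"config change notification: $change"))
    }
  })
  Thread.sleep(Long.MaxValue) // keep the watcher alive
}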

RequestChannel

In Kafka's network communication layer, RequestChannel provides the data buffer between the Processor threads and the KafkaRequestHandler threads: it is where Requests and Responses are staged during communication, so its role is that of a buffering queue between the two sides. A Processor appends each request it reads to RequestChannel's global request queue, requestQueue; a KafkaRequestHandler takes the request from that queue, processes it, appends the Response to RequestChannel's response queue, responseQueue, and wakes the owning Processor via responseListeners; finally, that Processor takes the response from its queue and sends it to the client.
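
Stripped of Kafka's concrete types, RequestChannel reduces to one bounded request queue, N response queues, and a wake-up callback per Processor. A structural sketch under that reading (all the Mini* names are illustrative, not Kafka's API):

import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, LinkedBlockingQueue, TimeUnit}

// illustrative stand-ins for Kafka's Request/Response types
case class MiniRequest(processor: Int, payload: String)
case class MiniResponse(processor: Int, payload: String)

class MiniRequestChannel(numProcessors: Int, queueSize: Int) {
  private val requestQueue = new ArrayBlockingQueue[MiniRequest](queueSize)
  private val responseQueues: Array[BlockingQueue[MiniResponse]] =
    Array.fill[BlockingQueue[MiniResponse]](numProcessors)(new LinkedBlockingQueue[MiniResponse]())
  private var responseListeners: List[Int => Unit] = Nil

  // a Processor registers a listener that calls its selector's wakeup()
  def addResponseListener(onResponse: Int => Unit): Unit =
    responseListeners ::= onResponse

  // Processor side: deposit a parsed request (blocks when full); collect own responses
  def sendRequest(request: MiniRequest): Unit = requestQueue.put(request)
  def receiveResponse(processor: Int): MiniResponse = responseQueues(processor).poll()

  // Handler side: take a request; deposit the response and wake the owning Processor
  def receiveRequest(timeoutMs: Long): MiniRequest =
    requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
  def sendResponse(response: MiniResponse): Unit = {
    responseQueues(response.processor).put(response)
    responseListeners.foreach(_(response.processor))
  }
}

The listener indirection is what lets a handler thread, which knows nothing about selectors, nudge exactly the Processor that owns the connection.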

At this point we have traced a request all the way from being received by a Processor, through being handled by a KafkaRequestHandler via KafkaApis, to being placed back on that Processor's response queue (it helps to follow the flow diagram at the top of the article while reading).

References

消息中间件—简谈Kafka中的NIO网络通信模型

kafka源码解析之八:Broker分析