apache kafka系列之源码分析走读kafka内部模块分析
–>
转载至:http://blog.csdn.net/lizhitao/article/details/37911993
kafka整体结构分析:
kafka源代码工程目录结构如下图:
下面只对core目录结构作说明,其他都是测试类或Java客户端代码
admin –管理员模块,操作和管理topic,paritions相关,包含create,delete topic,扩展 patitions
Api —该模块主要负责交互数据的组装,客户端与服务端交互数据编解码
client –该模块比较简单,就一个类,Producer读取kafka broker元数据信息,
topic和partitions,以及leader
cluster –该模块包含几个实体类,Broker,Cluster,Partition,Replica,解释他们之间关系: Cluster由多个broker组成,一个Broker包含多个partition,一个topic的所有
partitions分布在不同broker的中,一个Replica包含多个Partition。
common –通用模块,只包含异常类和错误验证
consumer –consumer处理模块,负责所有客户端消费者数据和逻辑处理
contoroller –负责*控制器选举,partition的leader选举,副本分配,副本重新分配,
partition和replica扩容。
javaapi –提供java的producer和consumer接口api
log –Kafka文件存储模块,负责读写所有kafka的topic消息数据。
message –封装多个消息组成一个“消息集”或压缩消息集。
metrics —内部状态的监控模块
network –网络事件处理模块,负责处理和接收客户端连接
producer –producer实现模块,包括同步和异步发送消息。
serializer –序列化或反序列化当前消息
kafka –kafka门面入口类,副本管理,topic配置管理,leader选举实现(由contoroller模块调用)。
tools –一看这就是工具模块,包含内容比较多:
a.导出对应consumer的offset值.
b.导出LogSegments信息,当前topic的log写的位置信息.
c.导出zk上所有consumer的offset值.
d.修改注册在zk的consumer的offset值.
f.producer和consumer的使用例子.
utils –Json工具类,Zkutils工具类,Utils创建线程工具类,KafkaScheduler公共调度器类,公共日志类等等。
1.kafka启动类:kafka.Scala
kafka为kafka broker的main启动类,其主要作用为加载配置,启动report服务(内部状态的监控),注册释放资源的钩子,以及门面入口类。
kafka类代码如下:
……
try {
val props = Utils.loadProps(args(0)) //加载配置文件
val serverConfig = new KafkaConfig(props)
KafkaMetricsReporter.startReporters(serverConfig.props) //启动report服务(内部状态的监控)
val kafkaServerStartble = new KafkaServerStartable(serverConfig) //kafka server核心入口类
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread() { //钩子程序,当jvm退出前,销毁所有资源
override def run() = {
kafkaServerStartble.shutdown
}
})
kafkaServerStartble.startup
kafkaServerStartble.awaitShutdown
}
……
KafkaServerStartble类包装了KafkaSever类,其实啥都没有做。只是调用包装类而已
KafkaSever类是kafka broker运行控制的核心入口类,它是采用门面模式设计的。
kafka中KafkaServer类,采用门面模式,是网络处理,io处理等得入口.
ReplicaManager
副本管理
KafkaApis 处理所有request的Proxy类,根据requestKey决定调⽤用具体的handler
KafkaRequestHandlerPool 处理request的线程池,请求处理池 <– num.io.threads io线程数量
LogManager kafka文件存储系统管理,负责处理和存储所有Kafka的topic的partiton数据
TopicConfigManager
监听此zk节点的⼦子节点/config/changes/,通过LogManager更新topic的配置信息,topic粒度配置管理,具体请查看topic级别配置
KafkaHealthcheck 监听zk session expire,在zk上创建broker信息,便于其他broker和consumer获取其信息
KafkaController
kafka集群*控制器选举,leader选举,副本分配。
KafkaScheduler
负责副本管理和日志管理调度等等
ZkClient 负责注册zk相关信息.
BrokerTopicStats
topic信息统计和监控
ControllerStats *控制器统计和监控
KafkaServer部分主要代码如下:
[java] view
plain copy
- ……
- def startup() {
- info(“starting”)
- isShuttingDown = new AtomicBoolean(false)
- shutdownLatch = new CountDownLatch(1)
- /* start scheduler */
- kafkaScheduler.startup()
- /* setup zookeeper */
- zkClient = initZk()
- /* start log manager */
- logManager = createLogManager(zkClient)
- logManager.startup()
- socketServer = new SocketServer(config.brokerId,
- config.hostName,
- config.port,
- config.numNetworkThreads,
- config.queuedMaxRequests,
- config.socketSendBufferBytes,
- config.socketReceiveBufferBytes,
- config.socketRequestMaxBytes)
- socketServer.startup()
- replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
- kafkaController = new KafkaController(config, zkClient)
- /* start processing requests */
- apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config, kafkaController)
- requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
- Mx4jLoader.maybeLoad()
- replicaManager.startup()
- kafkaController.startup()
- topicConfigManager = new TopicConfigManager(zkClient, logManager)
- topicConfigManager.startup()
- /* tell everyone we are alive */
- kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
- kafkaHealthcheck.startup()
- registerStats()
- startupComplete.set(true);
- info(“started”)
- }
- private def initZk(): ZkClient = {
- info(“Connecting to zookeeper on “ + config.zkConnect)
- val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
- ZkUtils.setupCommonPaths(zkClient)
- zkClient
- }
- /**
- * Forces some dynamic jmx beans to be registered on server startup.
- */
- private def registerStats() {
- BrokerTopicStats.getBrokerAllTopicsStats()
- ControllerStats.uncleanLeaderElectionRate
- ControllerStats.leaderElectionTimer
- }
- …….
本文来源 互联网收集,文章内容系作者个人观点,不代表 本站 对观点赞同或支持。如需转载,请注明文章来源,如您发现有涉嫌抄袭侵权的内容,请联系本站核实处理。