主题 Topic
topic 是 Kafka 最基础的组织单位,类似于关系数据库中的数据表。做为使用 kafka 的开发者,你最应该考虑的是和 topoc 相关的抽象。创建不同的 topic 保存不同种类的 events,或者通过不同的 topic 保存各种版本经过过滤、转换后的同类 events。
topic 就是一连串 event 日志(相对 queue 来说,没有索引,不允许从中间插入,只能通过追加的方式写入)。对于日志来说,语义简单,数据结构也简单。第一,日志只支持追加写入:当在日志中写入消息时,总是写在末端。第二,读取日志在日志中搜索偏移量(offset),然后扫描有序的日志条目。第三,日志中一系列事件具有不可变性——覆水难收。日志简单的语义,使其在 Kafka 中,进出 topic 时能保持较高的吞吐量,并且意味着复制 topic 将更加容易。
日志也是基本的持久化手段。传统企业的消息发送系统有 topic 和 queue,该系统在发送者和接受者之间暂时缓存消息。
因为 Kafka 的 topic 是日志,所以在 topic 中的数据不存在暂存的问题。每个 topic 都可以配置数据的有效期(或者固定大小),从几秒钟到几年,甚至无限期。构成 Kafka topic 的日志做为文件存储在磁盘上。当你向 topic 中写入事件时,该事件的持久性就像你将它写入你信任的数据库一样。
简单的日志、内容的不可变性是 Kafka 在现代数据基础架构中,成功的关键——但这些只是开始。
Kafka 分区
如果一个 topic 只能在一台机器上存活,将极大的限制 Kafka 的可扩展性。Kafka 可以跨多台机器管理多个 topic——毕竟,Kafka 是分布式系统——但没有哪个主题能够无限制的变大或者容纳太多的读写。幸运的是,Kafka 赋予了我们将 topic 分区的能力。
分区将 topic 的单个日志文件拆分为多个日志文件,每一个日志文件在 Kafka 集群中,隔离的节点上都能存活。这样一来,存储消息、写入消息、处理已有的消息都能分离到集群的节点中。
Kafka 如何分区
如果将 topic 拆分到各个分区,我们需要将消息分配至分区的方法。通常而言,如果消息没有 key,消息将通过轮询的方式写入 topic 的各个分区中。在这种情况下,所有分区平均分配数据,但是我们不保留写入的消息顺序。如果消息有 key,将会通过 key 的哈希值计算出目标分区。这使得 Kafka 能保证相同的 key 写入同一个分区,并且有序。
例如,如果你生成一些和同一个客户相关的事件,使用客户的 ID 做为 key 将保证来自客户的所有事件是有序的。但是这可能会导致,一个非常活跃的 key 创建一个又大、又活跃的分区,但是在实践中,这种风险很小,并且出现这种情况时,是可控的。大多数情况下,保持 key 的有序性是值得的。
Kafka Brokers
目前为止,我们讨论了事件、主题、分区,但是还没有提到架构中的计算机。从物理基础设施来说,Kafka 是由一个机器(称为 broker)网络组成的。就目前的部署方式来说,这些 broker 可能不会部署在物理隔离的服务器上,而是基于 pod 部署在容器中,而这些 pod 又是基于虚拟服务器运行,虚拟服务器基于某个物理数据中的核心运行。无论如何,这些 broker 部署后,它们之间互相独立运行 Kafka broker 进程。每个 broker 持有一些分区,并且处理请求以写入新的事件到那些分区中或者从那些分区中读取事件。broker 同时处理分区之间的 replication。
Replication
如果我们只有一个 broker 保存 partition,那么 replication 将不工作。无论 broker 是部署在裸机上还是托管在容器中,它们以及它们潜在的存储都可能受到宕机的影响,所以我们需要将分区的数据拷贝至几个 broker 中,从而保证数据的安全性。那些拷贝叫做从副本,反之,主分区叫做主副本。当你生产数据到主分区中时——一般而言,读和写都是通过主分区完成的——主分区和从分区同时工作,复制那些新的写入到从分区中。
这个过程是自动的,而你可以在生产者中通过设置生成各种层级的持久化担保,这个不是开发者基于 Kafka 构建系统时都需要考虑的工作。做为开发者,你只需要知道你的数据是安全的,如果集群中的某个节点挂了,另一个节点将完全接手它的工作。
客户端应用程序
现在,我们脱离 Kafka 集群本身不谈,看看使用 Kafka 的应用程序:生产者和消费者。这些客户端应用程序,包含将消息放入 topic 中,从 topic 中读取消息的代码。Kafka 平台中的所有组件,除 broker 外,要么是生产者,要么是消费者,或者两者兼备。生产和消费是你与集群的交互方式。
Kafka 生产者
生产者库的暴露的 API 相当轻量化:在 Java 中,有一个叫做 KafkaProducer 的类,你可以用其连接集群。使用 Map 来配置参数,包括集群中的几个 broker 的地址,可用的安全配置,以及其他决定生产者网络行为的配置。还有一个类叫做 ProduceRecord,用于持有你要发送到集群中的键值对。
大致来说,这就是生产消息所需的所有 API。从底层来说,该库管理了链接池、网络缓冲、等待 broker 确认消息、在必要的时候转发消息、以及一些开发者不需要关心的主机相关的 host 细节。
Kafka 消费者
消费者 API 原则上和生产者差不多。使用一个叫做 KafkaConsumer 的类连接集群(使用 Map 配置集群地址、安全性、以及其他参数)。然后使用该链接订阅多个 topic。当 topic 中有可用的消息时,客户端以 ConsumerRecord 的格式接收消息,并保存在 ConsumerRecords 集合中。一个 ConsumerRecod 对象代表一条 Kafka 消息的键值对。
KafkaConsumer 管理链接池以及网络协议,就和 KafkaProducer 一样,但是在读取方面,除了网络管道之外,还有更多的内容。首当其冲的是,Kafka 与其他消息队列不同,只是读取消息,不销毁消息;其他消费者仍然可以消费那些消息。事实上,在 Kafka 中,多个消费者从一个 topic 中消费信息非常正常。这个小事实对结合 Kafka 的软件架构来说,产生了极大的影响。
同时,消费者需要处理以下这种情况,某个主题的消费速率,结合处理单条消息的计算成本后,单个实例无法承受。也就是说,需要扩展消费者。在 Kafka 中,扩展消费者组或多或少是自动的。
Kafka 组件、生态系统
如果你的系统只是随着生产者(写事件)和消费者(读事件)的不断增长 ,通过 broker 管理分区、复制 topic,那么你的系统将是一个非常有用的系统。相比之下,Kafka 社区的经验表明,某些模式显现出来,并促使你和你的组员围绕核心 Kafka 重复的构建相同的功能。
最终,你将构建一些公共的应用程序层,从而避免重复的无差别任务。这些代码的能力非常重要,但是和你的业务完全无关。并且对你的用户不产生直接价值。这些是基础设施,并且应该由社区或者基础设施供应商提供。
写这些代码可能很有诱惑力,但是你不应该这么做。这类基础设施代码有:Kafka Connect、Confulent Schema Registry、Kafka Streams、ksqlDB。接下来,我们将逐一进行讲解。
Kafk Connect
在存储和检索的世界里,不都是使用 Kafka。有时候,你可能想把存储在那些系统里的数据存入 Kafka topic 中,有时候,你想把 Kafka topic 里的数据存入那些系统中。做为 Apache Kafka 的集成 API,Kafka Connect 负责这些能力。
Kafka Connect 做了哪些事情?
一方面,Kafka Connect 是可插拔连接器中的一个生态系统,另一方面,它也是一个客户端应用程序。做为客户端应用程序,Connect 是一个直接在硬件上运行的服务端进程,独立于 Kafka broker。Connect 可扩展并且容错,这表明你不仅可以运行单一的 Connect worker,还可以运行 Connect worker 集群,这些 worker 共享从外部系统进出 Kafka 的负载。 Kafka Connect 同时抽象了编码,只需要使用 JSON 配置即可运行。以下从 Kafka 到 Elasticsearch 的流数据配置实例:
{
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics" : "my_topic",
"connection.url" : "http://elasticsearch:9200",
"type.name" : "_doc",
"key.ignore" : "true",
"schema.ignore" : "true"
}
Kafka Connect 的优势
Kafka Connect 最主要的优势是连接器的强大生态。将数据写入云存储、ES、或者关系数据库中的代码,在不同的业务之间,不会产生很大的变化。同样,从关系数据库、Salesforce、或者遗留 HDFS 文件系统读取数据也是相似的。你可以自己编写这部分代码,但是花费的时间并不会为你的客户带来独特的价值,也不会让你的业务更有竞争力。
Kafka Streams
在一个持续更新、集成了 Kafka 的应用程序中,消费者会变得越来越复杂。刚开始的时候,可能只是一个简单的无状态转换器(例如,个人信息脱敏、转换消息格式),不久后,集成进复杂的聚合、填充数据等操作。如果你回顾我们上面提到的消费者代码,就会发现 API 不支持那些操作:你将要构建很多框架代码来处理时间窗口、延迟消息、查找数据表、通过 Key 做集成,等等。等你实现那些代码后,你会发现像聚合、填充数据这类操作通常是有状态的。
“状态”需要保存在程序的堆中,这就意味着它的容错债务。如果你的流处理应用程序下线了,应用程序中的状态也就丢失了,除非你设计了一张表来保存状态。在可扩展的情况下,编写、调试这类事情并不简单,同时也不会让你的用户受益。这就是为什么 Apache Kafka 提供流处理 API 的原因。也是为什么需要 Kafka Streams 的原因。
Kafka Streams 是什么
Kafka Streams 是一个 Java API,在访问流处理计算原语时,为你提供便利性:过滤、分组、聚合、连接,等等,也不需要你基于消费者 API 编写框架代码。同时支持流处理计算产生的大量状态。如果你对一个吞吐量较高的 topic ,通过某个字段做事件分组,然后每个小时对分组数据做一次汇总,那么你可能需要很大的内存。
确实,对于数据量较大的 topic,复杂的流处理拓扑图来说,不难想象,你必须要部署一个集群来共享流处理负载,就和消费组一样。Steams API 为你解决了分布式状态的问题:它将状态持久化至本地磁盘和 Kafka 集群的 topic 中,并且在流处理集群中添加或移除流处理节点时,在多节点间自动分配状态。
在一个典型的微服务中,流处理是其他功能的附加产物。例如,发货通知服务可能会将产品信息变更日志(包括客户记录)结合起来,生成发货通知对象,其他服务将其转换为电子邮件或者短信。但是,当为移动 App 或者 Web 前端展示发货状态时,发货通知服务也有义务通过 REST API 封装同步关键字检索接口。
该服务是基于事件响应的服务——在这种情况下,连接(join) 3 个流,可能还要基于连接(join)结果做窗口计算——但是它也提供了基于 REST 接口提供的 HTTP 服务,可能基于 Spring 框架或者 Micronaut, 或者其他常用的 Java API。鉴于 Kafka Streams 是一个只做流处理的 Java 库,而不是一组专门做流处理的基础组件,所以,使用其他框架实现 REST 接口,以及复杂的、可扩展的、容错的流处理非常容易。