Kafka 介绍和单机开发测试环境配置部署

原创 kafka

Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,由 Scala 和 Java 编写。Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像 Hadoop 的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka 的目的是通过 Hadoop 的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

流媒体平台的三个关键功能

  1. 发布和订阅记录流,类似于消息队列或企业消息传递系统。
  2. 以容错持久的方式存储记录流。
  3. 处理记录发生的流。

kafka 通常用于两大类应用

  1. 构建可在系统或应用程序之间可靠获取数据的实时流数据管道
  2. 构建实时流应用程序,用于转换或响应数据流

几个概念

  • Kafka 作为一个集群运行在一台或多台可以跨越多个数据中心的服务器上。
  • kafka 集群以 topics 的形式存储记录流。
  • 每个记录由一个键,一个值和一个时间戳组成。

Kafka 的四个核心 API

在 Kafka 中,客户端和服务器之间的通信是通过一种简单的,高性能的,语言不可知的 TCP 协议完成的。 该协议是版本控制的,并保持与旧版本的向后兼容性。 我们为 Kafka 提供 Java 客户端,但客户端可以使用多种语言。

  1. Producer API: 允许应用程序将记录流发布到一个或多个 Kafka topics。
  2. Consumer API: 允许应用程序订阅一个或多个 topics 并处理生成给他们的记录流。
  3. Streams API: 允许应用程序充当流处理器,从一个或多个 topics 中消耗输入流,并将输出流生成为一个或多个输出 topics,从而将输入流有效地转换为输出流。
  4. Connector API: 用于构建和运行将 Kafka topics 和现有应用或数据系统连接的可重用的 produers 和 consumers。 例如,连接到关系数据库的连接器可能会捕获对表的每个更改。

Topics and Logs

Topic 是发布记录的类别。Kafka 中的 Topics 一般是多订阅者的,也就是一个 Topic 可以有 0 个或多个 Consumer 订阅它的数据。

对于每个主题,Kafka 会维护一个如下所示的分区日志:

kafka-anatomy-of-a-topic.png

每个分区是一个有序的,以不可变的记录顺序追加的 Commit Log。分区中的每个记录都有一个连续的 ID,称为 Offset,唯一标识分区内的记录。

Kafka 集群使用记录保存时间的配置来保存所有已发布的记录(无论他们是否被消费)。例如,配置策略为两天,那么在一条记录发布两天内,这条记录是可以被消费的,之后将被丢弃以腾出空间。Kafka 的性能和数据量无关,所以存储长时间的数据并不会成为问题。

kafka-producers-consumers.png

实际上唯一需要保存的元数据是消费者的消费进度,即消费日志的偏移量(Offset)。这个 Offset 是由 Consumer 控制的:通常消费者会在读取记录时以线性方式提升 Offset,但是事实上,由于 Offset 由 Consumer 控制,因此它可以以任何顺序消费记录。例如一个 Consumer 可以通过重置 Offset 来处理过去的数据或者跳过部分数据。

这个特征意味着 Kafka 的 Consumer 可以消费 “过去” 和 “将来” 的数据而不对集群和其他 Consumer 造成太大的影响。例如,可以使用命令行工具 tail 来获取 Topic 尾部的内容而不对已经在消费 Consumer 造成影响。

分区日志有几个目的:
第一,使服务器能承载日志的大小,每个分区的日志必须可以被保存在单个服务器上,但是一个 Topic 可以拥有多个分区,那么它可以处理任意大小的数据量。
第二,它们作为并行度的单位(更多的是这点的考虑)。

Distribution

分区日志分布在集群中服务器中,每个服务器处理一部分分区的数据和请求。每个分区可以配置分布的服务器,以实现容错。

每个分区拥有一个 Leader 节点,和零或多个 Follower。Leader 处理该分区所有的读写请求,Follower 复制 Leader 数据。如果 Leader 节点宕机,将会有一个 Follower 节点自动的转化为 Leader。每个节点成为其部分分区的 Leader,并成为剩余分区的 Follower,这样整个集群的负载将比较均衡。

Producers

Producer 发送数据到它选择的 Topic。Producer 负责决定将数据发送到 Topic 的那个分区上。这可以通过简单的循环方式来平衡负载,或则可以根据某些语义来决定分区(例如基于数据中一些关键字)。

Consumers

Consumer 使用一个 group name 来标识自己的身份,每条被发送到一个 Topic 的消息都将被分发到属于同一个 group 的 Consumer 的一个实例中(group name 相同的 Consumer 属于一个组,一个 Topic 的一条消息会被这个组中的一个 Consumer 实例消费)。Consumer 实例可以在单独的进程中或者单独的机器上。

如果所有的 Consumer 实例都是属于一个 group 的,那么所有的消息将被均衡的分发给每个实例。

如果所有的 Consumer 都属于不同的 group,那么每条消息将被广播给所有的 Consumer。

kafka-cluster.png

(上图)一个包含两个 Server 的 Kafka 集群,拥有四个分区(P0-P3),有两个 Consumer group:Group A 和 Group B。Group 有C1、C2 两个 Consumer,GroupB 有 C3、C4、C5、C6 四个 Consumer。

更常见的是,Topic 有少量的 Consumer group,每一个都是 “一个逻辑上的订阅者”。每个 group 包含多个 Consumer 实例,为了可伸缩性和容错性。这就是一个发布-订阅模式,只是订阅方是一个集群。

Kafka 中消费的实现方式是“公平”的将分区分配给 Consumer,每一个时刻分区都拥有它唯一的消费者。Consumer 成员关系有 Kafka 程序动态维护。如果新的 Consumer 加入了分区,那么它会从这个分区其他的 Consumer 中分配走一部分分区;如果部分 Consumer 实例宕机,它的分区会被其他 Consumer 实例接管。

Kafka 只保证同一个分区内记录的顺序,而不是同一个 Topic 的不同分区间数据的顺序。每个分区顺序结合按 Key 分配分区的能力,能满足大多数程序的需求。如果需要全局的顺序,可以使用只有一个分区的 Topic,这意味着每个 group 只能有一个 Consumer 实例(因为一个分区同一时刻只能被一份 Consumer 消费——多加的 Consumer 只能用于容错)。

Guarantees

Kafka 高级 API 中提供一些能力:

被一个 Producer 发送到特定 Topic 分区的消息将按照他们的发送顺序被添加到日志中。这意味着,如果 M1、M2 是被同一个 Producer 发送出来的,且 M1 先发送,那么 M1 拥有更小的 Offset,在日志中的位置更靠前。

Consumer 按照消息的存储顺序在日志文件中查找消息。

对于复制配置参数为 N 的 Topic,我们能容忍 N-1 的服务器故障,而不会丢失已经 Commit 的数据。有关这些保证更详细的信息,参见文档的设计部分。

Kafka as a Messaging System

Kafka 的流模式和传统的消息系统有什么区别?

消息传统上有两种模式:队列和发布-订阅。在队列中,一群 Consumer 从一个 Server 读取数据,每条消息被其中一个 Consumer 读取。在发布-订阅中,消息被广播给所有的 Consumer。这两种模式有各自的优缺点。队列模式的优点是你可以在多个消费者实例上分配数据处理,从而允许你对程序进行“伸缩”。确定是队列不是多用户的,一旦消息被一个 Consumer 读取就不会再给其他 Consumer。发布订阅模式允许广播数据到多个 Consumer,那么就没办法对单个 Consumer 进行伸缩。

Kafka 的 Consumer group 包含两个概念。与队列一样,消费组允许通过一些进程来划分处理(每个进程处理一部分)。与发布订阅一样,Kafka 允许广播消息到不同的 Consumer group。

Kafka 模式的优势是每个 Topic 都拥有队列和发布-订阅两种模式。

Kafka 比传统的消息系统有更强的顺序保证。

传统的消息系统在服务器上按顺序保存消息,如果多个 Consumer 从队列中消费消息,服务器按照存储的顺序输出消息。然后服务器虽然按照顺序输出消息,但是消息将被异步的传递给 Consumer,所以他们将以不确定的顺序到达 Consumer。这意味着在并行消费中将丢失消息顺序。传统消息系统通常采用“唯一消费者”的概念只让一个 Consumer 进行消费,但这就丢失了并行处理的能力。

Kafka 做的更好一些。通过提供分区的概念,Kafka 能提供消费集群顺序和负载的平衡。这是通过将分区分配个一个 Consumer group 中唯一的一个 Consumer 而实现的,一个分区只会被一个分组中的一个 Consumer 进行消费。通过这么实现,能让一个 Consumer 消费一个分区并按照顺序处理消息。因为存在多个分区,所有可以在多个 Consumer 实例上实现负载均衡。注意,一个分组内的 Consumer 实例数不能超过分区数。

Kafka as a Storage System

任何将发送消息和消费结构的消息队列都有效的用作一个消息的存储系统。不同的是 Kafka 是一个更好的存储系统。

被写入到 Kafka 的数据将被写入磁盘并复制以保证容错。Kafka 允许 Producer 等待确定,以保证 Producer 可以确认消息被成功持久化并复制完成。

Kafka 使用的存储结构,使其提供相同的能力,无论是存储 50KB 或者 50TB 持久化数据。

因为允许客户端控制读取的位置,可以将 Kafka 视为高性能,低延迟的日志存储、复制、传播的分布式系统。

Kafka for Stream Processing

仅仅是读写和存储流数据是不够的,Kafka 的目标是对流式数据的实时处理。

在 Kafka 中,Stream Producer 从输入的 Topic 中读取数据,执行一些操作,生成输出流到输出的 Topic 中。

例如,零售的应用程序将收到销售和出货的输入流,并输出根据该数据计算的重排序和价格调整后的数据流。

可以使用 Producer 和 Consumer 实现简单的处理。对于更复杂的转换,Kafka 提供了完整的 Stream API,允许构建将流中数据聚合或将流连接到一起的应用。

这用于解决以下的一些困难:处理无需的数据,执行有状态的计算等。

Stream API 基于 Kafka 的核心函数构建:使用 Producer 和 Consumer API 用于输入,使用 Kafka 作为有状态的存储,使用 group 机制来实现 Stream 处理器的容错。

Putting the Pieces Together

消息、存储和流处理这种组合看是不寻常,但是 Kafka 作为流式平台这是必须的。

类似 HDFS 的分布式文件系统存储静态的文件用于批处理。这种的系统允许存储和处理历史数据。

传统的企业消息系统允许处理在你订阅之后的未来的数据。以这种方式构建的应用程序在未来数据到达时进行处理。

Kafka 组合这些能力,并且组合这些对 Kafka 作为流应用平台和流数据通道至关重要。

通过组合存储和低延迟的订阅,流应用程序能以相同的方式处理过去和未来的数据。一个单一的程序可以处理过去的历史数据,并且不会在达到一个位置时停止,而是能继续处理将来到达的数据。这是一个广泛的流处理的概念,其中包含批处理和消息驱动的应用程序。

同样,对于数据流通道,组合订阅机制和实时事件使 Kafka 成为非常低延迟的管道;数据的存储能力使其能和可能会进行停机维护的周期性处理数据的离线系统集成,或用于必须保证数据被确认交付的场景。流处理程序可以在数据到达后进行处理。

其他有关 Kafka 提供的 guarantees、API、功能,参阅其他文档。

Kafka 单机配置部署

首先安装 jdk 和 zookeeper,jdk 安装就不讲了,简单的说下 zookeeper 单机的部署。

单机部署 zookeeper

下载 zookeeper 的 tar.gz 包,解压。

tar -zxf zookeeper-3.4.10.tar.gz -C /opt/

在主目录下创建 data 目录用于存储数据:

cd /opt/zookeeper-3.4.10/
mkdir data

/opt/zookeeper-3.4.10/conf 目录下新建 zoo.cfg 文件,写入以下内容保存:

tickTime=2000
dataDir=/opt/zookeeper-3.4.10/data
dataLogDir=/opt/zookeeper-3.4.10/logs
clientPort=2181

启动和停止 zookeeper。进入 /opt/zookeeper-3.4.10/bin 目录,启动、停止、重启和查看当前节点状态(包括集群中是何角色)分别执行:

./zkServer.sh start
./zkServer.sh stop
./zkServer.sh restart
./zkServer.sh status

单机部署 kafka

下载 kafka 的 tgz 包,解压至 /opt 目录。

tar -zxvf kafka_2.11-2.1.0.tgz -C /opt

现在 kafka 安装目录创建一个 logs 目录用于存储 kafka 日志数据。

mkdir -p /opt/kafka_2.11-2.1.0/logs

配置 kafka

进入 /opt/kafka_2.11-2.1.0/config 配置 server.properties

# 唯一 ID 同一集群下 broker.id 不能重复
broker.id=1

# 监听地址
listeners=PLAINTEXT://localhost:9092

# 日志数据目录
log.dirs=/opt/kafka_2.11-2.1.0/logs

# kafka 数据保留时间单位为 hour 默认 168小时即 7天
log.retention.hours=168

# (kafka 数据量最大值,超出范围自动清理,和 log.retention.hours 配合使用,注意其最大值设定不可超磁盘大小)
log.retention.bytes=1073741824

# (zookeeper 连接 ip 及 port,多个以逗号分隔)
zookeeper.connect:192.190.10.170:2181

启动和停止

./bin/kafka-server-start.sh config/server.propertiess
./bin/kafka-server-stop.sh

创建 topic

创建topic:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

展示topic:

./bin/kafka-topics.sh --list --zookeeper localhost:2181

生产者:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

消费者:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

kafka 远程访问

如果 kafka 需要被远程访问,那么 listeners 配置项需要指定远程访问的 ip,或者设置为 0.0.0.0 允许所有远程客户端访问。这里需要注意一个问题,kafka 的 advertised.listeners 配置项默认使用 listeners 的值,listeners 允许设置为 0.0.0.0 以监听所有远程请求,而 advertised.listeners 不允许设置为 0.0.0.0,所以这种情况要配置 advertised.listeners 项,否则会报 requirement failed: advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address. 错误:

root@Master:/opt/kafka_2.11-2.1.0# ./bin/kafka-server-start.sh ./config/server.properties 
[2018-12-20 18:23:42,861] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2018-12-20 18:23:43,808] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
java.lang.IllegalArgumentException: requirement failed: advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address.
	at scala.Predef$.require(Predef.scala:224)
	at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1406)
	at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1374)
	at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1063)
	at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:1043)
	at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
	at kafka.Kafka$.main(Kafka.scala:59)
	at kafka.Kafka.main(Kafka.scala)
root@Master:/opt/kafka_2.11-2.1.0#

配置 kafka 允许远程访问:

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://Master:9092

kafka 多网卡支持

前天有个朋友找我,他们的 kafka 服务设备上有多块网卡,需要配置 kafka 多网卡地址服务,即客户端请求两个网卡地址都可以获得 kafka 服务响应。这种情况,kafka 是支持的,listenersadvertised.listeners 的配置值是允许有多个的,使用逗号 , 分隔。

我这里是没有多网卡设备的,但是使用 127.0.0.1 模拟了另外一张网卡,最后在朋友的环境上测试是通过的。参考了论坛地址:KIP-103: Separation of Internal and External traffic#Public Interfaces

这种情况的配置需要注意两个点。第一需要给协议起别名,因为不允许有两个名称一摸一样的实例存在,需要配置 listener.security.protocol.map 项给协议分配别名。第二个端口不能一样,这个很好理解,一台主机上的一个端口不能被两个实例同时占用。否则会报端口占用问题:

[2018-12-20 18:52:36,667] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
java.lang.IllegalArgumentException: requirement failed: Each listener must have a different port, listeners: PLAINTEXT://Master:9092,CLIENT2://localhost:9092
	at scala.Predef$.require(Predef.scala:224)
	at kafka.utils.CoreUtils$.validate$1(CoreUtils.scala:303)
	at kafka.utils.CoreUtils$.listenerListToEndPoints(CoreUtils.scala:314)
	at kafka.server.KafkaConfig.advertisedListeners(KafkaConfig.scala:1334)
	at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1396)
	at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1374)
	at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1063)
	at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:1043)
	at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
	at kafka.Kafka$.main(Kafka.scala:59)
	at kafka.Kafka.main(Kafka.scala)
root@Master:/opt/kafka_2.11-2.1.0#

具体配置示例:

listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CLIENT2:PLAINTEXT
listeners=PLAINTEXT://0.0.0.0:9092,CLIENT2://0.0.0.0:9093
advertised.listeners=PLAINTEXT://Master:9092,CLIENT2://localhost:9093

kafka 生产者和消费者 java 代码示例

生产者测试代码 ProducerTest.java:

/**
 * @author daimafans
 * @since 2018/12/19 19:09
 */
public class ProducerTest
{
    @Test
    public void testProducer()
    {
        Map<String, String> user = new HashMap<>();
        user.put("username", "liuqianfei");
        user.put("age", String.valueOf(24));

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.190.10.170:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("spinfo", String.valueOf(1), JSON.toJSONString(user)));
        producer.close();
    }
}

运行完成后我们上服务器查看 topic 和刚刚生成的消息。

kafka-list-topic-data.jpg

消费者测试代码 ConsumerTest.java:

/**
 * @author liuqianfei
 * @since 2018/12/19 19:09
 */
public class ConsumerTest
{
    @Test
    public void testProducer()
    {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.190.10.170:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("spinfo"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
                System.out.println();
            }
        }
    }
}

启动监听轮询后,再次使用生产者发布一条消息,消费者就可以消费到消息了。

[2018-12-20 19:12:27.595 DEBUG] [main] {org.apache.kafka.clients.consumer.internals.Fetcher:202} - [Consumer clientId=consumer-1, groupId=group-1] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(spinfo-0), toForget=(), implied=()) to broker Master:9092 (id: 0 rack: null)
offset = 1, key = 2, value = {"age":"25","username":"liuqianfei2"}
如果觉得这对你有用,请随意赞赏,给与作者支持
评论 0
最新评论