kafka

Kafka 是一种高吞吐的分布式消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用。

简介

特点

  1. 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能。
  2. 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。【据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)】
  3. 支持Kafka Server间的消息分区,同时保证每个Partition内的消息顺序传输。
  4. 分布式系统,易于向外扩展。所有的producer、broker和consumer都会有多个,均为分布式的。无需停机即可扩展机器。
  5. 消息被处理的状态是在consumer端维护,而不是由server端维护。当失败时能自动平衡。
  6. 同时支持离线数据处理和实时数据处理。

架构

Kafka的整体架构非常简单,producer、broker(kafka)和consumer都可以有多个。Producer,consumer实现Kafka注册的接口,数据从producer发送到broker,broker承担一个中间缓存和分发的作用。broker分发注册到系统中的consumer。broker的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。客户端和服务器端的通信,是基于简单,高性能,且与编程语言无关的TCP协议。

zookeeper 负责维持连接,保存broker信息,topic和 groupid,offset信息

broker 负责存储数据

基本概念

流程图

Message

消息,是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。

Topic

特指Kafka处理的消息的不同分类, topic可以有多个分区,但是每个分区最多只能被一个相同分组的消费者进行消费,也就是消费者与分区的关系是一对多的关系。

Partition

Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。partition的数量是可以动态增加的(只能加不能减)。只能被一个consumer进行消费。

Producers

消息和数据生产者,向Kafka的一个topic发布消息的过程叫做producers。

Consumers

消息和数据消费者,订阅topics并处理其发布的消息的过程叫做consumers。可以同时消费多个partition。

Broker

缓存代理,Kafa集群中的一台或多台服务器统称为broker。

Group

consumer group, 定义同一组消费者,共享同一个偏移 ,一个主题下的一个分区只能被同一个消费组的一个消费者消费。

Offset

offset, 偏移,指针,可调,只要数据不过期,可以反复消费

安装

首先要确保已经安装好 JDK

从官网下载Kafka 安装包(https://kafka.apache.org/downloads , 选择合适的版本和镜像 ),并解压:

wget 'http://mirror.bit.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz'
tar zxvf kafka_2.11-1.0.0.tgz
mv kafka_2.11-1.0.0 /usr/local
cd /usr/local
ln -s kafka_2.11-1.0.0 kafka
cd  /usr/local/kafka

详见 kafka/install

常用命令

启动Zookeeper

使用安装包中的脚本启动单节点Zookeeper 实例:(正式环境中zookeeper是多节点的)

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

zookeeper 会监听 2181 端口

启动Kafka 服务

使用kafka-server-start.sh 启动kafka 服务:

bin/kafka-server-start.sh  -daemon config/server.properties

注意:

kafka 默认会监听 9092 端口,自行检测是否与其他程序冲突

查看所有topic列表

bin/kafka-topics.sh --list --zookeeper localhost:2181
test

创建topic

使用kafka-topics.sh 创建单分区单副本的topic test:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  • replication-factor 复制因子,小于等于brokers数量,同样一份数据复制到其他brokers用来做备份,避免完全宕机,实际上只有leader partition 会被客户端读写
  • partitions 分区

查看单个topic状态

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test	PartitionCount:10	ReplicationFactor:1	Configs:
	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
	Topic: test	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
	Topic: test	Partition: 2	Leader: 0	Replicas: 0	Isr: 0
	Topic: test	Partition: 3	Leader: 0	Replicas: 0	Isr: 0

删除topic

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

产生消息

使用kafka-console-producer.sh 发送消息:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
Hello world!
Hello Kafka!

使用 ctrl+D 结束输入

消费消息

使用kafka-console-consumer.sh 接收消息并在终端打印:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Hello world!
Hello Kafka!

PHP操作Kafka

详见 php-kafka

参见

参考链接