Kafka安装
kafka定义:
- kafka是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域
下载地址:http://archive.apache.org/dist/kafka/
[root@kk data]# tar -xf kafka_2.12-2.5.0.tgz [root@kafka01 data]# mv kafka_2.12-2.5.0 kafka
使用消息队列的好处
(1)解耦
允许你独立的扩展或修改 两边的处理过程,只要确保它们遵守同样的接口约束。就是可以把消息放到队列里,不用保证两边消息互传的时候同时在线
(2)可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理
(3)缓冲
有助于控制和优化数据流经过系统的速度,解决产生消息和消费消息的处理速度不一致【解决生产消息大于消费消息的问题】
(4)灵活性&峰值处理能力
在访问量剧增的情况下,应用仍然需要继续 发挥作用,但是这样的突发流量并不常见。动态增减服务器
消息队列的两种模式
(1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)
消息生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。消息被消费以后,queue中不再有存储,所以消费者不可能消费到已经被消费的消息。
Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
(2)发布/订阅模式(一对多,消费者消费数据之后不会消除消息)
两种模式:一种订阅号主动推送,一种消费者主动拉去【kafka就是这种模式】
消息生产者(发布)将消息发布到topic中,同时多个消息消费者(订阅)消费该消息。和 点对点方式不同,发布到topic的消息会被所有订阅者消费
Kafka基础架构
1)Producer :消息生产者,就是向kafka broker发送消息的客户端
2)Consumer:消息消费者,向kafka broker取消息的客户端
Kafka默认存储消息在磁盘,默认保存7天,存在主题里
Kafka的安装
首先安好zookeep集群:linux环境下安装zookeeper
[root@kafka01 data]# tar -xf kafka_2.12-2.5.0.tgz [root@kafka01 data]# mv kafka_2.12-2.5.0 kafka
修改配置文件
主要关注:server.properties 这个文件即可,我们可以发现在目录下:
有很多文件,这里可以发现有Zookeeper文件,我们可以根据Kafka内带的zk集群来启动,但是建议使用独立的zk集群
[root@kafka01 kafka]# ll config/
total 72
-rw-r--r-- 1 root root 906 Apr 8 2020 connect-console-sink.properties
-rw-r--r-- 1 root root 909 Apr 8 2020 connect-console-source.properties
-rw-r--r-- 1 root root 5321 Apr 8 2020 connect-distributed.properties
-rw-r--r-- 1 root root 883 Apr 8 2020 connect-file-sink.properties
-rw-r--r-- 1 root root 881 Apr 8 2020 connect-file-source.properties
-rw-r--r-- 1 root root 2247 Apr 8 2020 connect-log4j.properties
-rw-r--r-- 1 root root 2540 Apr 8 2020 connect-mirror-maker.properties
-rw-r--r-- 1 root root 2262 Apr 8 2020 connect-standalone.properties
-rw-r--r-- 1 root root 1221 Apr 8 2020 consumer.properties
-rw-r--r-- 1 root root 4675 Apr 8 2020 log4j.properties
-rw-r--r-- 1 root root 1925 Apr 8 2020 producer.properties
-rw-r--r-- 1 root root 6849 Apr 8 2020 server.properties
-rw-r--r-- 1 root root 1032 Apr 8 2020 tools-log4j.properties
-rw-r--r-- 1 root root 1169 Apr 8 2020 trogdor.conf
-rw-r--r-- 1 root root 1205 Apr 8 2020 zookeeper.properties
Server.properties配置文件说明
broker.id不得重复
[root@kafka01 config]# egrep -vn "^$|#" server.properties
broker.id=0 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
delete.topic.enable=true #删除topic功能使用
port=19092 #当前kafka对外提供服务的端口默认是9092
host.name=192.168.1.8 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/data/kafka/kafkadata #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880 #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181 #设置zookeeper的连接端口
配置环境变量
[root@kafka01 config]# vim /etc/profile export KAFKA_HOME=/data/kafka/ export PATH=$PATH:$KAFKA_HOME/bin [root@kafka01 config]# source /etc/profile
发送配置文件,主要把kafka配置文件里的broker号改了
[root@kafka01 kafka]# scp -rp /data/kafka root@kafka02:/data [root@kafka01 kafka]# scp -rp /data/kafka root@kafka03:/data
[root@kafka01 kafka]# scp -rp /etc/profile root@kafka03:/etc/ root@kafka03's password: profile 100% 2159 557.8KB/s 00:00 [root@kafka01 kafka]# scp -rp /etc/profile root@kafka02:/etc/ root@kafka02's password: profile
启动kafka集群
[root@kafka01 bin]# cd /data/kafka/bin [root@kafka01 bin]# ./kafka-server-start.sh -daemon ../config/server.properties [root@kafka02 bin]# cd /data/kafka/bin [root@kafka02 bin]# ./kafka-server-start.sh -daemon ../config/server.properties [root@kafka03 bin]# cd /data/kafka/bin [root@kafka03 bin]# ./kafka-server-start.sh -daemon ../config/server.properties
检测是否启动
[root@kafka01 bin]# jps
10033 QuorumPeerMain
16167 Jps
16015 Kafka
kafka常用的命令
kafka-producer-perf-test.sh 命令是生产者做集群压力测试的命令
kafka-consumer-perf-test.sh 命令是消费者做集群压力测试的命令
启动/关闭脚本(没秘钥容易出问题)
#!/bin/bash kafka01=192.168.1.8 kafka02=192.168.1.9 kafka03=192.168.1.10 case $1 in "start"){ for i in kafka01 kafka02 kafka03 do echo "*********$i*********" ssh $i "/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties" done };; "stop"){ for i in kafka01 kafka02 kafka03 do echo "*********$i*********" ssh $i "/data/kafka/bin/kafka-server-stop.sh" done };; esac
Kafka命令行操作
1.查看当前服务器中的所有topic
[root@kafka01 bin]# kafka-topics.sh --zookeeper kafka01:2181 --list
2.创建topic
[root@kafka01 bin]# kafka-topics.sh --zookeeper kafka01:2181 --create --replication-factor 3 --partitions 1 --topic first
报错了
[root@kafka02 bin]# sh kafka-topics.sh --zookeeper kafka02:2181 --create --replication-factor 2 --partitions 2 --topic first Error while executing topic command : Replication factor: 2 larger than available brokers: 1.
原因:是因为replication-factor(topic副本)个数不能超过broker(服务器)的个数。这里broker只有1个,而replication-factor 设置为2,所以会报错。
--topic 主题的名称
--replication 副本数量
--partitions 分区数量
发现是有些broker没有启kafka,再次创建成功了
[root@kafka01 bin]# sh kafka-topics.sh --zookeeper kafka01:2181 --create --replication-factor 2 --partitions 2 --topic first Created topic first.
可以看到在一个kafka上创建了主题以后,每台都可以看到主题-分区号
3.删除主题命令
[root@kafka03 kafka]# sh bin/kafka-topics.sh --delete --zookeeper kafka01:2181 --topic first
Topic first is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
查看是否删除了主题
[root@kafka03 kafka]# sh bin/kafka-topics.sh --zookeeper kafka01:2181 --list
[root@kafka03 kafka]#
4.生产者连接主题
[root@kafka01 kafka]# sh bin/kafka-console-producer.sh --topic first --broker-list kafka01:19092
5.消费者订阅主题
[root@kafka02 kafka]# sh bin/kafka-console-consumer.sh --bootstrap-server kafka01:19092 --topic first --from-beginning
//--zookeeper空格 :2181 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --zookeeper :2181 --topic test --from-beginning

评论