Kafka安装

root
233
文章
0
评论
2021年2月21日18:38:31 评论 6240字阅读20分48秒

Kafka安装

kafka定义:

  • kafka是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域

官网地址:http://kafka.apache.org/

下载地址:http://archive.apache.org/dist/kafka/

 

[root@kk data]# tar -xf kafka_2.12-2.5.0.tgz 

[root@kafka01 data]# mv kafka_2.12-2.5.0 kafka

 

使用消息队列的好处

(1)解耦

允许你独立的扩展或修改 两边的处理过程,只要确保它们遵守同样的接口约束。就是可以把消息放到队列里,不用保证两边消息互传的时候同时在线

(2)可恢复性

系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理

(3)缓冲

有助于控制和优化数据流经过系统的速度,解决产生消息和消费消息的处理速度不一致【解决生产消息大于消费消息的问题】

(4)灵活性&峰值处理能力

在访问量剧增的情况下,应用仍然需要继续 发挥作用,但是这样的突发流量并不常见。动态增减服务器

消息队列的两种模式

(1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)

        消息生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。消息被消费以后,queue中不再有存储,所以消费者不可能消费到已经被消费的消息。

Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

(2)发布/订阅模式(一对多,消费者消费数据之后不会消除消息)

两种模式:一种订阅号主动推送,一种消费者主动拉去【kafka就是这种模式】

消息生产者(发布)将消息发布到topic中,同时多个消息消费者(订阅)消费该消息。和 点对点方式不同,发布到topic的消息会被所有订阅者消费

Kafka基础架构

1)Producer :消息生产者,就是向kafka broker发送消息的客户端

2)Consumer:消息消费者,向kafka broker取消息的客户端

1、主题(Topic):一个主题类似新闻中的体育、娱乐、教育等分类概念,在实际工程中通常一个业务一个主题。
2、分区(Partition):一个Topic中的消息数据按照多个分区组织,分区是kafka消息队列组织的最小单位,一个分区可以看作是一个FIFO( First Input First Output的缩写,先入先出队列)的队列。
kafka分区是提高kafka性能的关键所在,当你发现你的集群性能不高时,常用手段就是增加Topic的分区,分区里面的消息是按照从新到老的顺序进行组织,消费者从队列头订阅消息,生产者从队列尾添加消息。

Kafka默认存储消息在磁盘,默认保存7天,存在主题里

Kafka的安装

首先安好zookeep集群:linux环境下安装zookeeper

[root@kafka01 data]# tar -xf kafka_2.12-2.5.0.tgz

[root@kafka01 data]# mv kafka_2.12-2.5.0 kafka


修改配置文件

主要关注:server.properties 这个文件即可,我们可以发现在目录下:

有很多文件,这里可以发现有Zookeeper文件,我们可以根据Kafka内带的zk集群来启动,但是建议使用独立的zk集群

[root@kafka01 kafka]# ll config/
total 72
-rw-r--r-- 1 root root  906 Apr  8  2020 connect-console-sink.properties
-rw-r--r-- 1 root root  909 Apr  8  2020 connect-console-source.properties
-rw-r--r-- 1 root root 5321 Apr  8  2020 connect-distributed.properties
-rw-r--r-- 1 root root  883 Apr  8  2020 connect-file-sink.properties
-rw-r--r-- 1 root root  881 Apr  8  2020 connect-file-source.properties
-rw-r--r-- 1 root root 2247 Apr  8  2020 connect-log4j.properties
-rw-r--r-- 1 root root 2540 Apr  8  2020 connect-mirror-maker.properties
-rw-r--r-- 1 root root 2262 Apr  8  2020 connect-standalone.properties
-rw-r--r-- 1 root root 1221 Apr  8  2020 consumer.properties
-rw-r--r-- 1 root root 4675 Apr  8  2020 log4j.properties
-rw-r--r-- 1 root root 1925 Apr  8  2020 producer.properties
-rw-r--r-- 1 root root 6849 Apr  8  2020 server.properties
-rw-r--r-- 1 root root 1032 Apr  8  2020 tools-log4j.properties
-rw-r--r-- 1 root root 1169 Apr  8  2020 trogdor.conf
-rw-r--r-- 1 root root 1205 Apr  8  2020 zookeeper.properties

Server.properties配置文件说明

broker.id不得重复

[root@kafka01 config]# egrep -vn "^$|#" server.properties
broker.id=0                    #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
delete.topic.enable=true   #删除topic功能使用
port=19092                    #当前kafka对外提供服务的端口默认是9092
host.name=192.168.1.8         #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
num.network.threads=3             #这个是borker进行网络处理的线程数
num.io.threads=8                      #这个是borker进行I/O处理的线程数

log.dirs=/data/kafka/kafkadata  #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个

socket.send.buffer.bytes=102400  #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能

socket.receive.buffer.bytes=102400      #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600  #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1                                  #默认的分区数,一个topic默认1个分区数
log.retention.hours=168                       #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880                #消息保存的最大值5M
default.replication.factor=2                  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880         #取消息的最大直接数
log.segment.bytes=1073741824          #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000                              #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181  #设置zookeeper的连接端口

配置环境变量

[root@kafka01 config]# vim /etc/profile
export KAFKA_HOME=/data/kafka/
export PATH=$PATH:$KAFKA_HOME/bin


[root@kafka01 config]# source /etc/profile

发送配置文件,主要把kafka配置文件里的broker号改了

[root@kafka01 kafka]# scp -rp /data/kafka root@kafka02:/data
[root@kafka01 kafka]# scp -rp /data/kafka root@kafka03:/data
[root@kafka01 kafka]# scp -rp /etc/profile root@kafka03:/etc/
root@kafka03's password: 
profile                                                                               100% 2159   557.8KB/s   00:00    
[root@kafka01 kafka]# scp -rp /etc/profile root@kafka02:/etc/
root@kafka02's password: 
profile

启动kafka集群

[root@kafka01 bin]# cd /data/kafka/bin 
[root@kafka01 bin]# ./kafka-server-start.sh -daemon ../config/server.properties

[root@kafka02 bin]# cd /data/kafka/bin
[root@kafka02 bin]# ./kafka-server-start.sh -daemon ../config/server.properties

[root@kafka03 bin]# cd /data/kafka/bin 
[root@kafka03 bin]# ./kafka-server-start.sh -daemon ../config/server.properties

检测是否启动

[root@kafka01 bin]# jps
10033 QuorumPeerMain
16167 Jps
16015 Kafka

 

kafka常用的命令

kafka-producer-perf-test.sh 命令是生产者做集群压力测试的命令

kafka-consumer-perf-test.sh 命令是消费者做集群压力测试的命令

启动/关闭脚本(没秘钥容易出问题)

#!/bin/bash
kafka01=192.168.1.8
kafka02=192.168.1.9
kafka03=192.168.1.10
case $1 in
        "start"){
                for i in kafka01 kafka02 kafka03
                do
                        echo "*********$i*********"
                        ssh $i "/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties"
                done
        };;
        "stop"){
                for i in kafka01 kafka02 kafka03
                do
                        echo "*********$i*********"
                        ssh $i "/data/kafka/bin/kafka-server-stop.sh"
                done
        };;
esac

 

Kafka命令行操作

1.查看当前服务器中的所有topic

[root@kafka01 bin]# kafka-topics.sh --zookeeper kafka01:2181 --list

 

2.创建topic

[root@kafka01 bin]# kafka-topics.sh --zookeeper kafka01:2181 --create --replication-factor 3 --partitions 1 --topic first

 

报错了

[root@kafka02 bin]# sh kafka-topics.sh --zookeeper kafka02:2181 --create --replication-factor 2 --partitions 2 --topic first
Error while executing topic command : Replication factor: 2 larger than available brokers: 1.

原因:是因为replication-factor(topic副本)个数不能超过broker(服务器)的个数。这里broker只有1个,而replication-factor 设置为2,所以会报错。

--topic   主题的名称

--replication 副本数量

--partitions 分区数量

发现是有些broker没有启kafka,再次创建成功了

[root@kafka01 bin]# sh kafka-topics.sh --zookeeper kafka01:2181 --create --replication-factor 2 --partitions 2 --topic first
Created topic first.

 

可以看到在一个kafka上创建了主题以后,每台都可以看到主题-分区号

 

3.删除主题命令

[root@kafka03 kafka]# sh bin/kafka-topics.sh --delete --zookeeper kafka01:2181 --topic first
Topic first is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

查看是否删除了主题

[root@kafka03 kafka]# sh bin/kafka-topics.sh --zookeeper kafka01:2181 --list
[root@kafka03 kafka]# 

 

4.生产者连接主题

[root@kafka01 kafka]# sh bin/kafka-console-producer.sh --topic first --broker-list kafka01:19092

 

5.消费者订阅主题

[root@kafka02 kafka]# sh bin/kafka-console-consumer.sh --bootstrap-server kafka01:19092 --topic first --from-beginning

 

//--zookeeper空格  :2181
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --zookeeper :2181 --topic test --from-beginning

 

 

 

 

继续阅读
weinxin
我的微信
这是我的微信扫一扫
  • 文本由 发表于 2021年2月21日18:38:31
  • 除非特殊声明,本站文章均为原创,转载请务必保留本文链接
VSFTPd 软件管理

VSFTPd

VSFTPd vsFTPd的软件信息 服务端软件名:vsftpd 客户端软件名:ftp 服务名:vsftpd 端口号:20,21,地址范围内随机端口     vsFTP是linux...
匿名

发表评论

匿名网友 填写信息

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: