Hadoop生态圈-Kafka的完全分布式部署
作者:尹正杰
版权声明:原创作品,谢绝转载!否则将追究法律责任。
本篇博客主要内容就是搭建Kafka完全分布式,它是在kafka本地模式(https://www.cnblogs.com/yinzhengjie/p/9209058.html)的基础之上进一步实现完全分布式搭建过程。
一.试验环境
试验环境共计4台服务器
1>.管理服务器(s101)
2>.Kafka节点二(s102,已经部署好了zookeeper服务)
3>.Kafka节点三(s103,已经部署好了zookeeper服务)
4>.Kafka节点四(s104,已经部署好了zookeeper服务)
二.kafka完全分布式部署
1>.将kafka加压后的安装包发送到其他节点(s102,s103,s104)
[yinzhengjie@s101 data]$ more `which xrsync.sh`#!/bin/bash#@author :yinzhengjie#blog:http://www.cnblogs.com/yinzhengjie#EMAIL:y1053419035@qq.com#判断用户是否传参if [ $# -lt 1 ];then echo "请输入参数"; exitfi#获取文件路径file=$@#获取子路径filename=`basename $file`#获取父路径dirpath=`dirname $file`#获取完整路径cd $dirpathfullpath=`pwd -P`#同步文件到DataNodefor (( i=102;i<=105;i++ ))do #使终端变绿色 tput setaf 2 echo =========== s$i %file =========== #使终端变回原来的颜色,即白灰色 tput setaf 7 #远程执行命令 rsync -lr $filename `whoami`@s$i:$fullpath #判断命令是否执行成功 if [ $? == 0 ];then echo "命令执行成功" fidone[yinzhengjie@s101 data]$
[yinzhengjie@s101 data]$ xrsync.sh /soft/kafka=========== s102 %file ===========命令执行成功=========== s103 %file ===========命令执行成功=========== s104 %file ===========命令执行成功=========== s105 %file ===========命令执行成功[yinzhengjie@s101 data]$
[yinzhengjie@s101 data]$ xrsync.sh /soft/kafka_2.11-1.1.0/=========== s102 %file ===========命令执行成功=========== s103 %file ===========命令执行成功=========== s104 %file ===========命令执行成功=========== s105 %file ===========命令执行成功[yinzhengjie@s101 data]$
2>.分发环境变量
[yinzhengjie@s101 data]$ su Password: [root@s101 data]# xrsync.sh /etc/profile=========== s102 %file ===========命令执行成功=========== s103 %file ===========命令执行成功=========== s104 %file ===========命令执行成功=========== s105 %file ===========命令执行成功[root@s101 data]# exit exit[yinzhengjie@s101 data]$
3>.修zk节点的改配置文件
[yinzhengjie@s102 ~]$ grep broker.id /soft/kafka/config/server.propertiesbroker.id=102[yinzhengjie@s102 ~]$ grep listeners /soft/kafka/config/server.properties | grep -v ^#listeners=PLAINTEXT://s102:9092[yinzhengjie@s102 ~]$
[yinzhengjie@s103 ~]$ grep broker.id /soft/kafka/config/server.propertiesbroker.id=103[yinzhengjie@s103 ~]$ grep listeners /soft/kafka/config/server.properties | grep -v ^#listeners=PLAINTEXT://s103:9092[yinzhengjie@s103 ~]$ [yinzhengjie@s103 ~]$
[yinzhengjie@s104 ~]$ grep broker.id /soft/kafka/config/server.propertiesbroker.id=104[yinzhengjie@s104 ~]$ [yinzhengjie@s104 ~]$ grep listeners /soft/kafka/config/server.properties | grep -v ^#listeners=PLAINTEXT://s104:9092[yinzhengjie@s104 ~]$
4>.进入zookeeper客户端并删除zk的kafka节点数据
[yinzhengjie@s104 ~]$ zkCli.sh Connecting to localhost:21812018-06-21 01:23:54,935 [myid:] - INFO [main:Environment@100] - Client environment:zookeeper.version=3.4.12-e5259e437540f349646870ea94dc2658c4e44b3b, built on 03/27/2018 03:55 GMT2018-06-21 01:23:54,938 [myid:] - INFO [main:Environment@100] - Client environment:host.name=s1042018-06-21 01:23:54,938 [myid:] - INFO [main:Environment@100] - Client environment:java.version=1.8.0_1312018-06-21 01:23:54,940 [myid:] - INFO [main:Environment@100] - Client environment:java.vendor=Oracle Corporation2018-06-21 01:23:54,940 [myid:] - INFO [main:Environment@100] - Client environment:java.home=/soft/jdk1.8.0_131/jre2018-06-21 01:23:54,940 [myid:] - INFO [main:Environment@100] - Client environment:java.class.path=/soft/zk/bin/../build/classes:/soft/zk/bin/../build/lib/*.jar:/soft/zk/bin/../lib/slf4j-log4j12-1.7.25.jar:/soft/zk/bin/../lib/slf4j-api-1.7.25.jar:/soft/zk/bin/../lib/netty-3.10.6.Final.jar:/soft/zk/bin/../lib/log4j-1.2.17.jar:/soft/zk/bin/../lib/jline-0.9.94.jar:/soft/zk/bin/../lib/audience-annotations-0.5.0.jar:/soft/zk/bin/../zookeeper-3.4.12.jar:/soft/zk/bin/../src/java/lib/*.jar:/soft/zk/bin/../conf:2018-06-21 01:23:54,940 [myid:] - INFO [main:Environment@100] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib2018-06-21 01:23:54,940 [myid:] - INFO [main:Environment@100] - Client environment:java.io.tmpdir=/tmp2018-06-21 01:23:54,940 [myid:] - INFO [main:Environment@100] - Client environment:java.compiler=2018-06-21 01:23:54,940 [myid:] - INFO [main:Environment@100] - Client environment:os.name=Linux2018-06-21 01:23:54,941 [myid:] - INFO [main:Environment@100] - Client environment:os.arch=amd642018-06-21 01:23:54,941 [myid:] - INFO [main:Environment@100] - Client environment:os.version=3.10.0-327.el7.x86_642018-06-21 01:23:54,941 [myid:] - INFO [main:Environment@100] - Client environment:user.name=yinzhengjie2018-06-21 01:23:54,941 [myid:] - INFO [main:Environment@100] - Client environment:user.home=/home/yinzhengjie2018-06-21 01:23:54,941 [myid:] - INFO [main:Environment@100] - Client environment:user.dir=/home/yinzhengjie2018-06-21 01:23:54,942 [myid:] - INFO [main:ZooKeeper@441] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@277050dcWelcome to ZooKeeper!JLine support is enabled2018-06-21 01:23:54,973 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1028] - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)2018-06-21 01:23:55,031 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@878] - Socket connection established to localhost/127.0.0.1:2181, initiating session2018-06-21 01:23:55,049 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1302] - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x6800003ae7350004, negotiated timeout = 30000WATCHER::WatchedEvent state:SyncConnected type:None path:null[zk: localhost:2181(CONNECTED) 0] ls / [a, cluster, controller, brokers, zookeeper, yarn-leader-election, hadoop-ha, admin, isr_change_notification, log_dir_event_notification, controller_epoch, consumers, latest_producer_id_block, config, hbase][zk: localhost:2181(CONNECTED) 1] rmr /controller /brokers /admin /controller_epoch /consumers /latest_producer_id_block /config /isr_change_notification /cluster /log_dir_event_notification [zk: localhost:2181(CONNECTED) 2]
5>.分别启动s102-s104的kafka
[yinzhengjie@s102 ~]$ kafka-server-start.sh -daemon /soft/kafka/config/server.properties[yinzhengjie@s102 ~]$
[yinzhengjie@s103 ~]$ kafka-server-start.sh -daemon /soft/kafka/config/server.properties[yinzhengjie@s103 ~]$
[yinzhengjie@s104 ~]$ kafka-server-start.sh -daemon /soft/kafka/config/server.properties[yinzhengjie@s104 ~]$
6>.创建主题
[yinzhengjie@s104 ~]$ kafka-topics.sh --zookeeper s102:2181 --listyinzhengjie[yinzhengjie@s104 ~]$
[yinzhengjie@s104 ~]$ kafka-topics.sh --zookeeper s104:2181 --create --partitions 2 --replication-factor 1 --topic yzjCreated topic "yzj".[yinzhengjie@s104 ~]$
7>.在任意zk节点开启控制台生产者(例如:在s102上)
[yinzhengjie@s102 ~]$ kafka-server-start.sh -daemon /soft/kafka/config/server.properties[yinzhengjie@s102 ~]$ kafka-console-producer.sh --broker-list s104:9092 --topic yzj>尹正杰到此一游!>
8>.在任意zk节点开启控制台消费者(例如:在s103上)
[yinzhengjie@s103 ~]$ kafka-console-consumer.sh --zookeeper s102:2181 --topic yzj --from-beginningUsing the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].尹正杰到此一游!
三.编写kafka启动脚本(“/usr/local/bin/xkafka.sh”,别忘记添加执行权限,而且需要你提前配置好秘钥对哟!)
[yinzhengjie@s101 ~]$ more /usr/local/bin/xkafka.sh #!/bin/bash#@author :yinzhengjie#blog:http://www.cnblogs.com/yinzhengjie#EMAIL:y1053419035@qq.com#判断用户是否传参if [ $# -ne 1 ];then echo "无效参数,用法为: $0 {start|stop}" exitfi#获取用户输入的命令cmd=$1for (( i=102 ; i<=104 ; i++ )) ; do tput setaf 2 echo ========== s$i $cmd ================ tput setaf 9 case $cmd in start) ssh s$i "source /etc/profile ; kafka-server-start.sh -daemon /soft/kafka/config/server.properties" echo s$i "服务已启动" ;; stop) ssh s$i "source /etc/profile ; kafka-server-stop.sh" echo s$i "服务已停止" ;; *) echo "无效参数,用法为: $0 {start|stop}" exit ;; esacdone[yinzhengjie@s101 ~]$ sudo chmod a+x /usr/local/bin/xkafka.sh [yinzhengjie@s101 ~]$ [yinzhengjie@s101 ~]$ ll /usr/local/bin/xkafka.sh -rwxr-xr-x 1 root root 742 Jun 21 02:02 /usr/local/bin/xkafka.sh[yinzhengjie@s101 ~]$