Setting Up and Testing Apache Kafka, a Distributed Publish-Subscribe Messaging System, on a Local Machine

What is Kafka

In a nutshell: a distributed, high-throughput publish-subscribe messaging system.

The four parties in Kafka: producer, broker, and consumer, coordinated by ZooKeeper.

Installation and Setup

Local Environment

Local IP: 10.10.113.120

Operating system: Linux bogon 2.6.32-358.23.2.el6.i686 #1 SMP Wed Oct 16 17:21:31 UTC 2013 i686 i686 i386 GNU/Linux; CentOS release 6.4 (Final)

Nodes: 1 ZooKeeper, 3 brokers

Test items:

  • Create topics
  • Produce messages with a producer and consume them with a consumer
  • Production and consumption when a non-leader broker goes down
  • Production and consumption when the leader broker goes down

Key Steps

1. Download a prebuilt binary package. I tried the Windows .bat scripts included in that package, but they always threw ClassNotFound exceptions, so I switched to CentOS instead.

2. Install the JDK and other prerequisites yourself.

3. Directory structure after extraction:

drwxr-xr-x. 3 xxx xxx  4096 Jun 13 10:01 bin
drwxr-xr-x. 2 xxx xxx  4096 Jun 12 21:30 config
drwxr-xr-x. 2 xxx xxx  4096 Apr 23 03:26 libs
-rw-rw-r--. 1 xxx xxx 11358 Apr 23 02:37 LICENSE
-rw-rw-r--. 1 xxx xxx   162 Apr 23 02:37 NOTICE
4. Kafka uses the hostname, so you need to change the operating system's hostname; otherwise the Kafka shell commands later will fail with an unknown-host exception.
How to change the hostname on CentOS:
#1 As root: hostname bogon
#2 Edit /etc/hosts:
127.0.0.1       localhost.localdomain localhost
::1             localhost6.localdomain6 localhost6
10.10.113.120   bogon bogon
#3 Edit /etc/sysconfig/network:
NETWORKING=yes
NETWORKING_IPV6=no
HOSTNAME=bogon
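
A quick sanity check after the change (standard commands; the expected results assume the /etc/hosts entries above):

hostname          # should print bogon
ping -c 1 bogon   # should resolve to 10.10.113.120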
5. JVM flags prefixed with -XX are non-standard options. Kafka's start scripts use quite a few of them to tune the JVM, and the JVM bundled with your JDK may not support all of them, for example:
-XX:+UseCompressedOops
If you hit the error
Unrecognized VM option '+UseCompressedOops'
remove the offending flag from bin/kafka-run-class.sh.
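
One quick way to strip the flag in place is shown below (a convenience sketch; it keeps a backup of the script, and the exact option list may differ in your Kafka version):

sed -i.bak 's/-XX:+UseCompressedOops//g' bin/kafka-run-class.sh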

The two steps above are particularly important.

6. Configure the broker server parameters. config/server1.properties:

# the four key settings
broker.id=1
port=9092
host.name=10.10.113.120
zookeeper.connect=10.10.113.120:2181
config/server2.properties:

# the four key settings
broker.id=2
port=9093
host.name=10.10.113.120
zookeeper.connect=10.10.113.120:2181
config/server3.properties:

# the four key settings
broker.id=3
port=9094
host.name=10.10.113.120
zookeeper.connect=10.10.113.120:2181
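
ZooKeeper itself is started from config/zookeeper.properties; the file shipped with the distribution normally works as-is. For reference, a minimal sketch of its contents (values as in the stock file; adjust dataDir if /tmp is not suitable):

# directory where ZooKeeper stores its snapshot data
dataDir=/tmp/zookeeper
# port that clients (the brokers) connect to
clientPort=2181
# no per-client connection limit, since this is a test setup
maxClientCnxns=0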

Testing

In the directory that contains bin/ (the root of the extracted package), write a startall.sh that starts ZooKeeper and the three broker servers, and make it executable:

#!/bin/bash
# start zookeeper server
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
# start 3 broker servers
JMX_PORT=9997 bin/kafka-server-start.sh config/server1.properties &
JMX_PORT=9998 bin/kafka-server-start.sh config/server2.properties &
JMX_PORT=9999 bin/kafka-server-start.sh config/server3.properties &
Run startall.sh, then go through the test steps below. The usage of each command-line tool can be displayed with --help; the arguments change a little between releases.
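
When you are done testing, a matching stopall.sh can shut everything down with the stop scripts shipped in bin/ (a sketch; note that kafka-server-stop.sh stops every broker process on the machine, not one at a time):

#!/bin/bash
# stop all broker processes first, then zookeeper
bin/kafka-server-stop.sh
bin/zookeeper-server-stop.sh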

1. Create three topics

bin/kafka-topics.sh --create --topic funckXXX1 --partitions 1 --replication-factor 3 --zookeeper 10.10.113.120:2181
bin/kafka-topics.sh --create --topic funckXXX2 --partitions 1 --replication-factor 3 --zookeeper 10.10.113.120:2181
bin/kafka-topics.sh --create --topic funckXXX3 --partitions 1 --replication-factor 3 --zookeeper 10.10.113.120:2181
View the topics that have been created:
bin/kafka-topics.sh --describe --zookeeper 10.10.113.120:2181
For example, here the lead broker is broker 2 (Leader: 2):
Topic:funckXXX1 PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: funckXXX1        Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1
Topic:funckXXX2 PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: funckXXX2        Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1
Topic:funckXXX3 PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: funckXXX3        Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1
Notes:
  • partition: the partition id; each of these topics has only one partition, so the id is 0
  • leader: the broker id currently responsible for reads and writes of this partition
  • replicas: the list of all brokers holding a replica of this partition
  • isr: the subset of replicas that is currently alive and in sync
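
If you only need the topic names without the per-partition details, the same tool also supports a --list action:

bin/kafka-topics.sh --list --zookeeper 10.10.113.120:2181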

2. Start a console consumer

Watch what is being consumed on a topic:

bin/kafka-console-consumer.sh --topic fuckXXX1 --zookeeper 10.10.113.120:2181 --from-beginning

3. Produce messages to the topic and watch the consumer

Start a console producer:

bin/kafka-console-producer.sh --topic fuckXXX1 --broker-list 10.10.113.120:9092,10.10.113.120:9093,10.10.113.120:9094

Type a few messages and press Enter, for example:

[bogon@bogon kafaka0811]$ bin/kafka-console-producer.sh --topic fuckXXX1 --broker-list 10.10.113.120:9092,10.10.113.120:9093,10.10.113.120:9094
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
message1
message2
message3

The consumer side should then show the messages. Note that the topic name used here, fuckXXX1, is not one of the three topics created earlier (funckXXX1/2/3), so Kafka auto-creates it, as the "Auto creation of topic fuckXXX1" line in the log below shows:

[bogon@bogon kafaka0811]$ bin/kafka-console-consumer.sh --zookeeper 10.10.113.120:2181 --from-beginning --topic fuckXXX1
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[2014-06-13 15:55:16,203] INFO Closing socket connection to /10.10.113.120. (kafka.network.Processor)
[2014-06-13 15:56:12,871] INFO Topic creation {"version":1,"partitions":{"1":[3],"0":[2]}} (kafka.admin.AdminUtils$)
[2014-06-13 15:56:12,879] INFO [KafkaApi-3] Auto creation of topic fuckXXX1 with 2 partitions and replication factor 1 is successful! (kafka.server.KafkaApis)
[2014-06-13 15:56:12,954] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [fuckXXX1,0] (kafka.server.ReplicaFetcherManager)
[2014-06-13 15:56:12,958] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [fuckXXX1,1] (kafka.server.ReplicaFetcherManager)
[2014-06-13 15:56:12,973] INFO Completed load of log fuckXXX1-1 with log end offset 0 (kafka.log.Log)
[2014-06-13 15:56:12,985] INFO Created log for partition [fuckXXX1,1] in /tmp/kafka-broke3-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
[2014-06-13 15:56:12,986] WARN Partition [fuckXXX1,1] on broker 3: No checkpointed highwatermark is found for partition [fuckXXX1,1] (kafka.cluster.Partition)
[2014-06-13 15:56:12,997] INFO Completed load of log fuckXXX1-0 with log end offset 0 (kafka.log.Log)
[2014-06-13 15:56:13,017] INFO Created log for partition [fuckXXX1,0] in /tmp/kafka-broke2-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
[2014-06-13 15:56:13,021] WARN Partition [fuckXXX1,0] on broker 2: No checkpointed highwatermark is found for partition [fuckXXX1,0] (kafka.cluster.Partition)
[2014-06-13 15:56:13,059] INFO Closing socket connection to /10.10.113.120. (kafka.network.Processor)
[2014-06-13 15:56:13,288] INFO Closing socket connection to /10.10.113.120. (kafka.network.Processor)
[2014-06-13 15:56:13,914] INFO Closing socket connection to /10.10.113.120. (kafka.network.Processor)
message1
message2
message3

4. Kill a non-leader broker and observe

List all Java processes: ps axf | grep java
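
Because each broker was started with its own properties file, the config file name in the ps output identifies the broker; for example, a convenience filter (assumes GNU grep):

ps axf | grep java | grep -o 'server[0-9]\.properties'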

Then kill the process of any broker other than the one shown as Leader:

pkill -9 -f server1.properties

The topic info now looks like this:

[bogon@bogon kafaka0811]$ bin/kafka-topics.sh --describe --zookeeper 10.10.113.120:2181
Topic:fuckXXX1  PartitionCount:2        ReplicationFactor:1     Configs:
        Topic: fuckXXX1 Partition: 0    Leader: 2       Replicas: 2     Isr: 2
        Topic: fuckXXX1 Partition: 1    Leader: 3       Replicas: 3     Isr: 3
Topic:funckXXX1 PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: funckXXX1        Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3
Topic:funckXXX2 PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: funckXXX2        Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3
Topic:funckXXX3 PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: funckXXX3        Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3

Producing and consuming messages still work normally at this point; publish a few messages yourself to confirm.

5. Kill the leader broker and observe

Kill the lead broker's process, then keep checking the topic details:

pkill -9 -f server2.properties
The topic details show that the Leader has changed to broker 3:

[bogon@bogon kafaka0811]$ bin/kafka-topics.sh --describe --zookeeper 10.10.113.120:2181
Topic:fuckXXX1  PartitionCount:2        ReplicationFactor:1     Configs:
        Topic: fuckXXX1 Partition: 0    Leader: -1      Replicas: 2     Isr:
        Topic: fuckXXX1 Partition: 1    Leader: 3       Replicas: 3     Isr: 3
Topic:funckXXX1 PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: funckXXX1        Partition: 0    Leader: 3       Replicas: 2,3,1 Isr: 3
Topic:funckXXX2 PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: funckXXX2        Partition: 0    Leader: 3       Replicas: 2,3,1 Isr: 3
Topic:funckXXX3 PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: funckXXX3        Partition: 0    Leader: 3       Replicas: 2,3,1 Isr: 3

Producing and consuming messages still work normally; publish a few messages yourself to confirm.
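
To bring the killed brokers back, start them again the same way startall.sh does, for example:

JMX_PORT=9997 bin/kafka-server-start.sh config/server1.properties &
JMX_PORT=9998 bin/kafka-server-start.sh config/server2.properties &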

TODO: write producer and consumer client code.

Reposted from: https://my.oschina.net/jingxing05/blog/279052
