kafka是什么
一言以蔽之:分布式、高吞吐量 的 订阅、发布 消息系统
kafka的四方:producer broker consumer 由 zookeeper协调转发
安装搭建:
本机环境描述
本机ip:10.10.113.120
操作系统:Linux bogon 2.6.32-358.23.2.el6.i686 #1 SMP Wed Oct 16 17:21:31 UTC 2013 i686 i686 i386 GNU/Linux,CentOS release 6.4 (Final)
节点情况:zookeeper-1个 ,broker-3个
测试点:
- 添加 topic
- producer 产生消息, 消费者 消费的
- 当非leader broker挂掉时 生产消费情况
- 一个leader broker 挂掉时 生产消费情况
关键步骤
1、下载编译好的 我试过该包提供的 window下bat处理文件,总是提示 classnotfound异常。于是转到centos下。
2、jdk安装什么的自行解决
3、解压后的目录结构
drwxr-xr-x. 3 xxx xxx 4096 Jun 13 10:01 bindrwxr-xr-x. 2 xxx xxx 4096 Jun 12 21:30 configdrwxr-xr-x. 2 xxx xxx 4096 Apr 23 03:26 libs-rw-rw-r--. 1 xxx xxx 11358 Apr 23 02:37 LICENSE -rw-rw-r--. 1 xxx xxx 162 Apr 23 02:37 NOTICE4、kafka会用到hostname,所以 需要修改操作系统的hostname,否者后面执行kafka的shell命令时会报 unknownhostname的异常 centos的hostname修改方式:
#1 root权限hostname bogon
#2 修改/etc/hosts127.0.0.1 localhost.localdomain localhost::1 localhost6.localdomain6 localhost610.10.113.120 bogon bogon
#3 修改/etc/sysconfig/networkNETWORKING=yesNETWORKING_IPV6=noHOSTNAME=bogon5、 jvm参数中-XX标识的是实验性参数,kafka用了很多用来优化运行的jvm参数,而你安装的 jdk所带的jvm不一定支持这些参数,比如: -XX:+UseCompressedOops 如果你遇到
Unrecognized VM option '+UseCompressedOops'的错误,请 在bin/ kafka-run-class.sh中移除相关参数
上述两个步骤比较重要。
6、配置broker server的参数 config/server1.properties
# 关键的四个 配置broker.id=1port=9092host.name=10.10.113.120zookeeper.connect=10.10.113.120:2181server2.properties
# 关键的四个 配置broker.id=2port=9093host.name=10.10.113.120zookeeper.connect=10.10.113.120:2181server3.properties
# 关键的四个 配置broker.id=3port=9094host.name=10.10.113.120zookeeper.connect=10.10.113.120:2181
测试实验
在解压包的bin同级目录,编写启动zookeeper 和 broker server 的startall.sh,赋予执行权限
#!/bin/bash#start zookeeper servernohup bin/zookeeper-server-start.sh config/zookeeper.properties &#start 3 broker servers JMX_PORT=9997 bin/kafka-server-start.sh config/server1.properties &JMX_PORT=9998 bin/kafka-server-start.sh config/server2.properties &JMX_PORT=9999 bin/kafka-server-start.sh config/server3.properties &运行 startall.sh,进行测试实验,执行脚本的命令行的使用方法和用 --help显示,每次更新命令行参数都有些改变
1、添加3个topic
bin/kafka-topics.sh --create --topic funckXXX1 --partitions 1 --replication-factor 3 --zookeeper 10.10.113.120:2181bin/kafka-topics.sh --create --topic funckXXX2 --partitions 1 --replication-factor 3 --zookeeper 10.10.113.120:2181bin/kafka-topics.sh --create --topic funckXXX3 --partitions 1 --replication-factor 3 --zookeeper 10.10.113.120:2181查看已添加的topic
bin/kafka-topics.sh --create --describe --zookeeper 10.10.113.120:2181如,主broker 为 Leader 2
Topic:funckXXX1 PartitionCount:1 ReplicationFactor:3 Configs:Topic: funckXXX1 Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1Topic: funckXXX2 PartitionCount:1 ReplicationFactor:3 Configs:Topic: funckXXX2 Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1Topic: funckXXX3 PartitionCount:1 ReplicationFactor:3 Configs:Topic: funckXXX3 Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
说明: partiton: partion id,由于此处只有一个partition,因此partition id 为0 leader: 当前负责读写的lead broker id relicas: 当前partition的所有replication broker list isr: relicas的子集,只包含出于活动状态的broker
2、启动消费者控制台
看看某个topic的消费情况
bin/kafka-console-consumer.sh --topic fuckXXX1 --zookeeper 10.10.113.120:2181 --from-beginning
3、添加消息到 top观察 消费者的消费情况
启动生产者控制台
bin/kafka-console-producer.sh --topic fuckXXX1 --broker-list 10.10.113.120:9092,10.10.113.120:9093,10.10.113.120:9094
在其中输入一些消息 回车,如:
[bogon@bogon kafaka0811]$ bin/kafka-console-producer.sh --topic fuckXXX1 --broker-list 10.10.113.120:9092,110.113.120:9093,10.10.113.120:9094SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".SLF4J: Defaulting to no-operation (NOP) logger implementationSLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.message1message2message3
那么在消费者端应该有动作:
[bogon@bogon kafaka0811]$ bin/kafka-console-consumer.sh --zookeeper 10.10.113.120:2181 --from-beginning --topic fuckXXX1SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".SLF4J: Defaulting to no-operation (NOP) logger implementationSLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.[2014-06-13 15:55:16,203] INFO Closing socket connection to /10.10.113.120. (kafka.network.Processor)[2014-06-13 15:56:12,871] INFO Topic creation {"version":1,"partitions":{"1":[3],"0":[2]}} (kafka.admin.AdminUtils$)[2014-06-13 15:56:12,879] INFO [KafkaApi-3] Auto creation of topic fuckXXX1 with 2 partitions and replication factor 1 is successful! (kafka.server.KafkaApis)[2014-06-13 15:56:12,954] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [fuckXXX1,0] (kafka.server.ReplicaFetcherManager)[2014-06-13 15:56:12,958] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [fuckXXX1,1] (kafka.server.ReplicaFetcherManager)[2014-06-13 15:56:12,973] INFO Completed load of log fuckXXX1-1 with log end offset 0 (kafka.log.Log)[2014-06-13 15:56:12,985] INFO Created log for partition [fuckXXX1,1] in /tmp/kafka-broke3-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)[2014-06-13 15:56:12,986] WARN Partition [fuckXXX1,1] on broker 3: No checkpointed highwatermark is found for partition [fuckXXX1,1] (kafka.cluster.Partition)[2014-06-13 15:56:12,997] INFO Completed load of log fuckXXX1-0 with log end offset 0 (kafka.log.Log)[2014-06-13 15:56:13,017] INFO Created log for partition [fuckXXX1,0] in /tmp/kafka-broke2-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)[2014-06-13 15:56:13,021] WARN Partition [fuckXXX1,0] on broker 2: No checkpointed highwatermark is found for partition [fuckXXX1,0] (kafka.cluster.Partition)[2014-06-13 15:56:13,059] INFO Closing socket connection to /10.10.113.120. (kafka.network.Processor)[2014-06-13 15:56:13,288] INFO Closing socket connection to /10.10.113.120. (kafka.network.Processor)[2014-06-13 15:56:13,914] INFO Closing socket connection to /10.10.113.120. (kafka.network.Processor)message1message2message3
4、杀掉非leaderbroker 观察
查看所有java进程 : ps axf | grep java
然后kill掉除Leader 标识之外的任一 broker的进程
pkill -9 -f server1.properties
此时topic信息为:
[bogon@bogon kafaka0811]$ bin/kafka-topics.sh --describe --zookeeper 10.10.113.120:2181Topic:fuckXXX1 PartitionCount:2 ReplicationFactor:1 Configs:Topic: fuckXXX1 Partition: 0 Leader: 2 Replicas: 2 Isr: 2Topic: fuckXXX1 Partition: 1 Leader: 3 Replicas: 3 Isr: 3Topic:funckXXX1 PartitionCount:1 ReplicationFactor:3 Configs:Topic: funckXXX1 Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3Topic:funckXXX2 PartitionCount:1 ReplicationFactor:3 Configs:Topic: funckXXX2 Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3Topic:funckXXX3 PartitionCount:1 ReplicationFactor:3 Configs:Topic: funckXXX3 Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3
此时生产和消费消息依旧正常,请自己发布消息
5、杀掉leader broker 观察
杀掉主broker的进程,然后连续观察 topic详情
pkill -9 -f server2.propertiestopic为,可看出 Leader 已经改变为3了
[bogon@bogon kafaka0811]$ bin/kafka-topics.sh --describe --zookeeper 10.10.113.120:2181Topic:fuckXXX1 PartitionCount:2 ReplicationFactor:1 Configs:Topic: fuckXXX1 Partition: 0 Leader: -1 Replicas: 2 Isr: Topic: fuckXXX1 Partition: 1 Leader: 3 Replicas: 3 Isr: 3Topic:funckXXX1 PartitionCount:1 ReplicationFactor:3 Configs:Topic: funckXXX1 Partition: 0 Leader: 3 Replicas: 2,3,1 Isr: 3Topic:funckXXX2 PartitionCount:1 ReplicationFactor:3 Configs:Topic: funckXXX2 Partition: 0 Leader: 3 Replicas: 2,3,1 Isr: 3Topic:funckXXX3 PartitionCount:1 ReplicationFactor:3 Configs:Topic: funckXXX3 Partition: 0 Leader: 3 Replicas: 2,3,1 Isr: 3
此时生产和消费消息依旧正常,请自己发布消息
TODO:生产者消费者客户端功能编写