hadoop数据库模型(大数据Hadoop之Kafka)
hadoop数据库模型(大数据Hadoop之Kafka)ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) kafka.common.InconsistentClusterIdException: The Cluster ID ELIH-KKbRP-NnPnHt4z-lA doesn't match stored clusterId Some(2HC_x7bTR_u2bCxrqw0Otw) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong. at kafka.server.KafkaServer.startup(KafkaServer.s
一、概述EFAK(Eagle For Apache KAFKA,以前称为 Kafka Eagle)是一款由国内公司开源的Kafka集群监控系统,可以用来监视kafka集群的broker状态、Topic信息、IO、内存、consumer线程、偏移量等信息,并进行可视化图表展示。独特的KQL还可以通过SQL在线查询kafka中的数据。
源码: htTPS://github.com/smartloli/kafka-eagle/
下载: http://download.kafka-eagle.org/
官方文档:https://www.kafka-eagle.org/articles/docs/documentation.html
EFAK分布式模式部署,这里以5个节点为例子(1个Master和4个Slave),各个节点的角色如下如所示:
三、EFAK数据采集原理对于 Kafka,我们可以收集以下数据
- Kafka broker常用机器加载信息:内存、cpu、IP、版本等。
- 服务监控数据:TPS、QPS、RT等
- 应用程序监控:组、消费者、生产者、主题等。
因为EFAK是kafka的监控系统,所以前提是需先安装Kafka和zookeeper。
四、安装Kafkakafka官网文档:https://kafka.apache.org/documentation/
1)Kafka下载$ cd /opt/bigdata/hadoop/software
$ wget https://dlcdn.apache.org/kafka/3.1.1/kafka_2.13-3.1.1.tgz
$ tar -xf kafka_2.13-3.1.1.tgz -C /opt/bigdata/hadoop/server/
2)配置环境变量
$ vi /etc/profile
export KAFKA_HOME=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1
export PATH=$PATH:$KAFKA_HOME/bin
$ source /etc/profile
3)创建logs目录
$ mkdir $KAFKA_HOME/logs
4)修改kafka配置
$ cd $KAFKA_HOME
# 查看现有配置,去掉空行和注释
$ cat config/server.properties |grep -v '^$\|^#'
$ cat > $KAFKA_HOME/config/server.properties <<EOF
#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka数据的存储位置
log.dirs=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop-node1:12181 hadoop-node2:12181 hadoop-node3:12181
#zookeeper连接超时时间
zookeeper.connection.timeout.ms=60000
EOF
5)修改zookeeper配置
# 创建zookeeper data和logs目录
$ mkdir $KAFKA_HOME/zookeeper_data $KAFKA_HOME/zookeeper_logs
$ vi $KAFKA_HOME/config/zookeeper.properties
# 配置主要修改如下:
#数据目录
dataDir=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/zookeeper_data
#日志目录
dataLogDir=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/zookeeper_logs
#心跳间隔时间,zookeeper中使用的基本时间单位,毫秒值。每隔2秒发送一个心跳,session时间tickTime*2
tickTime=2000
#leader与客户端连接超时时间。表示5个心跳间隔
initLimit=5
#Leader与Follower之间的超时时间,表示2个心跳间隔
syncLimit=2
#客户端连接端口,默认端口2181
clientPort=12181
# zookeeper集群配置项,server.1,server.2,server.3是zk集群节点;hadoop-node1 hadoop-node2 hadoop-node3是主机名称;2888是主从通信端口;3888用来选举leader
server.1=hadoop-node1:2888:3888
server.2=hadoop-node2:2888:3888
server.3=hadoop-node3:2888:3888
6)配置Zookeeper myid
# 在hadoop-node1配置如下:
$ echo 1 > $KAFKA_HOME/zookeeper_data/myid
# 在hadoop-node2配置如下:
$ echo 2 > $KAFKA_HOME/zookeeper_data/myid
# 在hadoop-node3配置如下:
$ echo 3 > $KAFKA_HOME/zookeeper_data/myid
7)开启Kafka JMX监控
# 在kafka-server-start.sh文件中添加export JMX_PORT="9988",端口自定义就行
$ vi $KAFKA_HOME/bin/kafka-server-start.sh
重启kafka
$ $KAFKA_HOME/bin/kafka-server-stop.sh ; $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
【问题】如遇以下报错:
ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) kafka.common.InconsistentClusterIdException: The Cluster ID ELIH-KKbRP-NnPnHt4z-lA doesn't match stored clusterId Some(2HC_x7bTR_u2bCxrqw0Otw) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong. at kafka.server.KafkaServer.startup(KafkaServer.scala:228) at kafka.Kafka$.main(Kafka.scala:109) at kafka.Kafka.main(Kafka.scala)
【解决】在server.properties找到log.dirs配置的路径。将该路径下的meta.properties文件删除,或者编辑meta.properties文件修改里面的cluster.id即可。
8)将kafka目录推送到其它节点$ scp -r $KAFKA_HOME hadoop-node2:/opt/bigdata/hadoop/server/
$ scp -r $KAFKA_HOME hadoop-node3:/opt/bigdata/hadoop/server/
# 在hadoop-node2和hadoop-node3节点上设置环境变量
$ vi /etc/profile
export KAFKA_HOME=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1
export PATH=$PATH:$KAFKA_HOME/bin
$ source /etc/profile
# 修改advertised.listeners,值改成对应的hostname或者ip
【温馨提示】修改hadoop-node2上server.properties文件的broker.id,设置为1和2,只要不重复就行,advertised.listeners地址改成对应机器IP。
9)启动服务启动zookeeper集群之后再启动kafka集群
$ cd $KAFKA_HOME
# -daemon后台启动
$ ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
# 默认端口2181,可以在配置自定义,这里修改为12181端口
$ netstat -tnlp|grep 12181
启动Kafka
$ cd $KAFKA_HOME
# 默认端口9092,这里修改成了19092,可以修改listeners和advertised.listeners
$ ./bin/kafka-server-start.sh -daemon ./config/server.properties
$ netstat -tnlp|grep 9092
$ jps
五、安装EFAK1)下载EFAK
$ cd /opt/bigdata/hadoop/software
$ wget https://github.com/smartloli/kafka-eagle-bin/archive/v2.1.0.tar.gz
$ tar -xf kafka-eagle-bin-2.1.0.tar.gz -C /opt/bigdata/hadoop/server/
$ cd /opt/bigdata/hadoop/server/
2)创建数据库
$ mysql -uroot -p
123456
create database ke;
3)设置环境变量
$ vi /etc/profile
export KE_HOME=/opt/bigdata/hadoop/server/kafka-eagle-bin-2.1.0/efak-web-2.1.0
export PATH=$PATH:$KE_HOME/bin
$ source /etc/profile
4)配置
这里设置hadoop-node1为master节点,其它两个节点为slave节点,修改参数
$ vi $KE_HOME/conf/system-config.properties
# Multi zookeeper&kafka cluster list -- The client connection address of the Zookeeper cluster is set here
efak.zk.cluster.alias=cluster1
cluster1.zk.list=hadoop-node1:12181 hadoop-node2:12181 hadoop-node3:12181
######################################
# kafka jmx 地址,默认Apache发布的Kafka基本是这个默认值,
# 对于一些公有云Kafka厂商,它们会修改这个值,
# 比如会将jmxrmi修改为kafka或者是其它的值,
# 若是选择的公有云厂商的Kafka,可以根据实际的值来设置该属性
######################################
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi
# Zkcli limit -- Zookeeper cluster allows the number of clients to connect to
# If you enable distributed mode you can set value to 4 or 8
kafka.zk.limit.size=16
# EFAK webui port -- WebConsole port access address
efak.webui.port=8048
######################################
# EFAK enable distributed,启用分布式部署
######################################
efak.distributed.enable=true
# 设置节点类型slave or master
# master worknode set status to master other node set status to slave
efak.cluster.mode.status=master
# deploy efak server address
efak.worknode.master.host=hadoop-node1
efak.worknode.port=8085
# Kafka offset storage -- Offset stored in a Kafka cluster if stored in the zookeeper you can not use this option
cluster1.efak.offset.storage=kafka
# Whether the Kafka performance monitoring diagram is enabled
efak.metrics.charts=true
# EFAK keeps data for 30 days by default
efak.metrics.retain=15
# If offset is out of range occurs enable this property -- Only suitable for kafka sql
efak.sql.fix.error=false
efak.sql.topic.records.max=5000
# Delete kafka topic token -- Set to delete the topic token so that administrators can have the right to delete
efak.topic.token=keadmin
# 关闭自带的sqlite数据库,使用外部的mysql数据库
# Default use sqlite to store data
# efak.driver=org.sqlite.jdbc
# It is important to note that the '/hadoop/kafka-eagle/db' path must be exist.
# efak.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
# efak.username=root
# efak.password=smartloli
# 配置外部数据库
# (Optional) set mysql address
efak.driver=com.mysql.jdbc.Driver
efak.url=jdbc:mysql://hadoop-node1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=123456
5)调整启动参数
EFAK默认启动内存大小为2G,考虑到服务器情况可以将其调小
## 在 efak 安装目录执行
$ vi $KE_HOME/bin/ke.sh
## 将 KE_JAVA_OPTS 最大最小容量调小,例如:
export KE_JAVA_OPTS="-server -Xmx512m -Xms512m -XX:MaxGCPauseMillis=20 -XX: UseG1GC -XX:MetaspaceSize=128m -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"
6)修改 works配置
默认是localhost
$ cat >$KE_HOME/conf/works<<EOF
hadoop-node2
hadoop-node3
EOF
7)将EFAK推送其它节点
# hadoop-node1推送
$ scp -r /opt/bigdata/hadoop/server/kafka-eagle-bin-2.1.0 hadoop-node2:/opt/bigdata/hadoop/server/
$ scp -r /opt/bigdata/hadoop/server/kafka-eagle-bin-2.1.0 hadoop-node3:/opt/bigdata/hadoop/server/
# 在hadoop-node2和hadoop-node3配置环境变量修改节点类型为slave
8)启动
上面配置文件配置的是分布式部署,当然也可以单机跑,但是不建议单机跑
# 在master节点上执行
$ cd $KE_HOME/bin
$ chmod x ke.sh
# 单机版启动
$ ke.sh start
# 集群方式启动
$ ke.sh cluster start
$ ke.sh cluster restart
web UI:http://192.168.0.113:8048/
账号密码:admin/123456
【增】添加topic
# 创建topic,1副本,1分区,设置数据过期时间72小时(-1表示不过期),单位ms,72*3600*1000=259200000
$ kafka-topics.sh --create --topic test002 --bootstrap-server hadoop-node1:9092 hadoop-node2:9092 hadoop-node3:9092 --partitions 1 --replication-factor 1 --config retention.ms=259200000
【查】
# 查看topic列表
$ kafka-topics.sh --bootstrap-server hadoop-node1:9092 hadoop-node2:9092 hadoop-node3:9092 --list
# 查看topic列表详情
$ kafka-topics.sh --bootstrap-server hadoop-node1:9092 hadoop-node2:9092 hadoop-node3:9092 --describe
# 指定topic
$ kafka-topics.sh --bootstrap-server hadoop-node1:9092 hadoop-node2:9092 hadoop-node3:9092 --describe --topic test002
# 查看消费者组
$ kafka-consumer-groups.sh --bootstrap-server hadoop-node1:9092 --list
$ kafka-consumer-groups.sh --bootstrap-server hadoop-node1:9092 --describe --group test002
【改】这里主要是修改最常用的三个参数:分区、副本,过期时间
# 修改分区,扩分区,不能减少分区
$ kafka-topics.sh --alter --bootstrap-server hadoop-node1:9092 --topic test002 --partitions 2
# 修改过期时间,下面两行都可以
$ kafka-configs.sh --bootstrap-server hadoop-node1:9092 --alter --topic test002 --add-config retention.ms=86400000
$ kafka-configs.sh --bootstrap-server hadoop-node1:9092 --alter --entity-name test002 --entity-type topics --add-config retention.ms=86400000
# 修改副本数,将副本数修改成3
$ cat >1.json<<EOF
{"version":1
"partitions":[
{"topic":"test002" "partition":0 "replicas":[0 1 2]}
{"topic":"test002" "partition":1 "replicas":[1 2 0]}
{"topic":"test002" "partition":2 "replicas":[2 0 1]}
]}
EOF
$ kafka-topics.sh --bootstrap-server hadoop-node1:9092 hadoop-node2:9092 hadoop-node3:9092 --describe --topic test002
【删】
$ kafka-topics.sh --delete --topic test002 --bootstrap-server hadoop-node1:9092 hadoop-node2:9092 hadoop-node3:9092
【生成者】
$ kafka-console-producer.sh --broker-list hadoop-node1:9092 --topic test002
{"id":"1" "name":"n1" "age":"20"}
{"id":"2" "name":"n2" "age":"21"}
{"id":"3" "name":"n3" "age":"22"}
【消费者】
# 从头开始消费
$ kafka-console-consumer.sh --bootstrap-server hadoop-node1:9092 --topic test002 --from-beginning
# 指定从分区的某个位置开始消费,这里只指定了一个分区,可以多写几行或者遍历对应的所有分区
$ kafka-console-consumer.sh --bootstrap-server hadoop-node1:9092 --topic test002 --partition 0 --offset 100
【消费组】
$ kafka-console-consumer.sh --bootstrap-server hadoop-node1:9092 --topic test002 --group test002
【查看数据积压】
$ kafka-consumer-groups.sh --bootstrap-server hadoop-node1:9092 --describe --group test002
$KE_HOME/bin/ke.sh启动脚本中包含以下命令:
命令 |
描述 |
ke.sh start |
启动 EFAK 服务器。 |
ke.sh status |
查看 EFAK 运行状态。 |
ke.sh stop |
停止 EFAK 服务器。 |
ke.sh restart |
重新启动 EFAK 服务器。 |
ke.sh stats |
查看 linux 操作系统中的 EFAK 句柄数。 |
ke.sh find [ClassName] |
在 jar 中找到类名的位置。 |
ke.sh gc |
查看 EFAK 进程 gc。 |
ke.sh version |
查看 EFAK 版本。 |
ke.sh jdk |
查看 EFAK 安装的 jdk 详细信息。 |
ke.sh sdate |
查看 EFAK 启动日期。 |
ke.sh cluster start |
查看 EFAK 集群分布式启动。 |
ke.sh cluster status |
查看 EFAK 集群分布式状态。 |
ke.sh cluster stop |
查看 EFAK 集群分布式停止。 |
ke.sh cluster restart |
查看 EFAK 集群分布式重启。 |
EFAK环境部署,和kafka的一些简单操作就到这里了,后续会分享KSQL和其它更详细的页面化操作,请小伙伴耐心等待~