在k8s中部署Kafka高可用集群超详细讲解

明明跟你说过 2024-07-01 09:37:01 阅读 73

🐇明明跟你说过:个人主页

🏅个人专栏:《数据流专家:Kafka探索》🏅

🔖行路有良友,便是天堂🔖

目录

一、引言

1、Kafka简介

2、为什么在Kubernetes中部署Kafka

二、Kubernetes基础 

1、Kubernetes概述

2、Pods、Services、Deployments等基本概念

三、Kafka集群架构

1、Kafka集群的组成与工作原理

2、Kafka集群的高可用性设计

四、准备部署环境 

1、准备k8s集群

2、准备StorageClass 

3、准备部署Kafka集群所需的镜像 

五、部署Kafka集群

1、部署Zookeeper集群

2、部署Kafka集群


一、引言

1、Kafka简介

阿帕奇Kafka(Apache Kafka)是一个开源的分布式流处理平台,由LinkedIn开发并于2011年开源,现在是Apache软件基金会的一部分。Kafka的设计初衷是为了处理实时数据流,具备高吞吐量、低延迟、高容错性和可扩展性的特点。

Kafka的核心概念

1. Producer(生产者):

生产者是向Kafka主题(topic)发送消息的应用程序。它们负责将数据推送到Kafka集群。

2. Consumer(消费者):

消费者是从Kafka主题中读取消息的应用程序。它们可以是单个消费者或消费者组(consumer group),每个消费者组中的消费者可以并行地读取数据。

3. Topic(主题):

主题是Kafka中的一个逻辑分类,用于将消息进行分组。每个主题可以细分为多个分区(partition),每个分区内的消息是有序的,但不同分区之间没有顺序保证。

4. Partition(分区):

分区是主题的一个子单元,每个分区可以存储大量消息,并且每个分区可以被多个消费者读取。分区使得Kafka能够水平扩展。

5. Broker(代理):

Broker是Kafka集群中的一个服务器节点,负责接收、存储和发送消息。一个Kafka集群可以包含多个broker,以提供高可用性和容错能力。

6. Zookeeper:

Kafka使用Zookeeper来进行分布式协调和管理集群状态。Zookeeper负责维护配置信息、分区的leader选举以及消费组的offset管理等。

Kafka的主要功能

1. 消息持久化:

Kafka将消息持久化到磁盘,并且允许设置保留策略,确保数据的可靠性。

2. 高吞吐量:

Kafka能够处理高吞吐量的数据流,适合处理大规模的数据流应用。

3. 低延迟:

Kafka的设计优化了数据传输路径,使得消息传输具有低延迟。

4. 扩展性:

Kafka的分区机制允许在集群中增加更多的broker和分区,从而实现水平扩展。

5. 容错性:

通过复制机制,Kafka确保了数据的高可用性,即使个别节点故障,数据仍然能够恢复。

Kafka的应用场景

1. 日志收集和聚合:

作为一种高效的日志收集系统,Kafka可以收集和聚合分布式系统的日志数据。

2. 实时数据流处理:

Kafka与流处理框架(如Apache Flink、Apache Storm等)结合,可以进行实时数据分析和处理。

3. 消息队列:

Kafka可以充当高吞吐量的消息队列系统,用于不同应用程序之间的解耦和异步通信。

4. 事件源:

Kafka可以用作事件源系统,捕获和存储所有变化事件,以便后续处理和回放。

2、为什么在Kubernetes中部署Kafka

弹性和可伸缩性:Kubernetes提供了强大的自动伸缩和调度功能,可以根据负载情况自动扩展Kafka集群的节点数量,以满足不同工作负载的需求。这种弹性能力使得Kafka能够更好地应对数据量的变化和突发性负载。易于管理:Kubernetes提供了统一的管理接口和工具,简化了Kafka集群的部署、扩展、更新和监控。通过Kubernetes的控制面板,管理员可以轻松地管理Kafka集群的生命周期,而无需深入了解底层的部署细节。高可用性:Kubernetes具有自动故障检测和恢复机制,可以在节点故障时自动重新调度Kafka的副本,确保数据的高可用性和持久性。资源隔离:通过Kubernetes的命名空间和资源限制功能,可以实现Kafka集群与其他应用程序之间的资源隔离,避免因资源竞争导致的性能问题。

 

wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw==

​编辑

二、Kubernetes基础 

1、Kubernetes概述

Kubernetes(K8s)是一个开源的容器编排平台,最初由Google开发并于2014年发布,现已成为Cloud Native Computing Foundation(CNCF)的一部分。Kubernetes旨在简化容器化应用程序的部署、管理和扩展,提供了丰富的功能和工具,使得用户可以更轻松地构建和运行分布式系统。

Kubernetes的主要功能

1. 自动化部署和扩展:

Kubernetes提供了丰富的调度和自动伸缩功能,可以根据负载情况自动调度和扩展应用程序的副本数量,以适应不同的工作负载需求。

2. 服务发现和负载均衡:

Kubernetes通过Service对象提供了统一的服务发现和负载均衡功能,使得应用程序可以通过简单的域名访问其他服务,并自动实现负载均衡。

3. 存储管理:

Kubernetes支持多种存储后端和存储卷类型,可以为应用程序提供持久化存储和共享存储功能,如PersistentVolume、PersistentVolumeClaim等。

4. 自我修复:

Kubernetes具有自我修复和健康检查机制,可以监控容器和节点的健康状态,并在出现故障时自动进行修复和恢复。

   

2、Pods、Services、Deployments等基本概念

1. Pods(容器组)

Pod是Kubernetes中最小的可部署单元,它可以包含一个或多个紧密相关的容器。这些容器共享相同的网络命名空间和存储卷,并且通常在同一节点上运行。Pod提供了一种逻辑主机的概念,使得容器之间可以共享资源和通信。Pod通常用于部署一个应用程序的一组相关容器。

2. Services(服务)

服务是Kubernetes中一种抽象,用于定义一组Pod的逻辑集合,并为它们提供统一的访问入口。服务通过标签选择器(Selector)将请求路由到对应的Pod,并实现负载均衡和服务发现功能。Kubernetes支持多种类型的服务,如ClusterIP、NodePort、LoadBalancer和ExternalName。

3. Deployments(部署)

部署是Kubernetes中用于管理Pod副本数量和应用程序版本更新的对象。部署定义了应用程序的期望状态,包括所需的副本数量、容器镜像和其他配置信息。部署控制器负责根据部署的定义创建、更新和删除Pod实例,同时提供滚动更新和回滚的功能。

三、Kafka集群架构

1、Kafka集群的组成与工作原理

Kafka集群由多个服务器节点(Broker)组成,每个Broker负责存储和处理一部分数据。Kafka的工作原理基于发布/订阅模式,其中生产者(Producer)将消息发布到一个或多个主题(Topic),而消费者(Consumer)订阅这些主题并从Broker拉取消息进行处理。

工作原理

1. 消息存储:

生产者将消息发布到主题,并根据主题的分区策略选择将消息存储到对应的分区中。每个分区的消息按顺序存储,并根据消息的偏移量(Offset)进行标识。

2. 消息复制:

Kafka通过副本集机制实现数据的高可用性和持久性。每个分区都有一个主副本(Leader Replica)和多个副本(Follower Replica),主副本负责处理读写请求,而副本用于备份数据。当主副本故障时,Kafka会自动选举一个新的主副本来接管工作。

3. 消息传输:

生产者将消息发送到Broker的主副本,Broker负责将消息复制到副本并进行持久化存储。消费者从Broker的主副本拉取消息进行处理,可以选择从不同的副本读取消息以实现负载均衡和故障恢复。

4. 消息保留策略:

Kafka支持设置消息的保留策略,包括基于时间、基于大小或基于日志段的策略。一旦消息达到保留期限或超过指定的大小限制,Kafka将自动删除过期消息,以释放存储空间。

   

2、Kafka集群的高可用性设计

1. 多副本复制

Kafka通过在多个Broker之间复制分区的副本来实现数据的冗余和高可用性。每个分区通常有多个副本,其中一个是主副本(Leader Replica),负责处理读写请求;其他副本是从副本(Follower Replica),用于备份数据。当主副本发生故障时,Kafka会自动选举一个新的主副本来接管工作,确保数据的可用性和一致性。

2. 自动故障检测与恢复

Kafka集群内置了自动故障检测和恢复机制,可以监控Broker和分区的健康状态,并在发现故障时自动进行故障转移和数据恢复。当Broker或分区发生故障时,Kafka会触发重新选举过程,选举出新的主副本并将数据复制到新的副本中,以实现快速的故障恢复。

3. 副本分布和均衡

Kafka会将分区的副本分布在不同的Broker上,以确保数据的冗余和均衡。通过将副本分布在不同的机架、数据中心或云区域中,可以提高系统的容错能力,即使整个机架或数据中心发生故障,系统仍然能够继续运行。

4. 水平扩展和动态伸缩

Kafka支持水平扩展和动态伸缩,可以根据负载情况和性能需求来增加或减少Broker的数量和容量。通过添加更多的Broker和分区,可以提高系统的吞吐量和容量,同时降低单点故障的风险,实现更高的可用性和可伸缩性。

   

四、准备部署环境 

1、准备k8s集群

这里我们使用的k8s集群版本为 1.23,也可以使用其他的版本,只是镜像导入命令不通,如果还未搭建k8s集群,请参考《在Centos中搭建 K8s 1.23 集群超详细讲解》这篇文章

2、准备StorageClass 

因为我们要对Kafka中的数据进行持久化,避免Pod漂移后数据丢失,保证数据的完整性与可用性

如果还未创建存储类,请参考《k8s 存储类(StorageClass)创建与动态生成PV解析,(附带镜像)》这篇文件。

3、准备部署Kafka集群所需的镜像 

 在k8s Node节点上执行以下命令

[root@node1 ~]# docker pull zookeeper:3.8

[root@node1 ~]# docker pull kafka:3.1.0

[root@node2 ~]# docker pull zookeeper:3.8

[root@node2 ~]# docker pull kafka:3.1.0

[root@node3 ~]# docker pull zookeeper:3.8

[root@node3 ~]# docker pull kafka:3.1.0

五、部署Kafka集群

1、部署Zookeeper集群

为什么部署kafka前要部署zookeeper?

Kafka依赖Zookeeper来实现分布式协调和配置管理。在Kafka集群中,Zookeeper扮演着多种角色,包括:

配置管理:Kafka集群的配置信息和元数据存储在Zookeeper中,包括主题(topics)、分区(partitions)、副本(replicas)等配置信息。Leader选举:Kafka的分区(partitions)被分布式存储在集群中的多个Broker上,每个分区都有一个Leader和多个Follower。Zookeeper负责Leader选举,确保每个分区都有一个活跃的Leader。Broker注册:Kafka Broker在启动时会向Zookeeper注册自己的信息,包括地址、ID等,以便其他Broker和客户端发现和连接。健康监测:Zookeeper监控Kafka集群中各个节点的健康状态,并在节点出现故障或宕机时触发相应的处理操作。

因此,在部署Kafka之前,需要先部署Zookeeper,确保Kafka集群正常运行所需的分布式协调和配置管理功能可用。没有Zookeeper,Kafka无法正常运行,并且无法实现高可用性、数据一致性和故障容错等特性。

编写部署Zookeeper集群的YAML文件

[root@master ~]# vim zook.yaml

# 添加如下内容

apiVersion: v1

kind: Namespace

metadata:

name: kafka

---

apiVersion: v1

kind: Service

metadata:

name: zookeeper-cluster #无头服务的名称,需要通过这个获取ip,与主机的对应关系

namespace: kafka

labels:

app: zookeeper

spec:

ports:

- port: 2181

name: zookeeper

- port: 2188

name: cluster1

- port: 3888

name: cluster2

clusterIP: None

selector:

app: zookeeper

---

apiVersion: v1

kind: Service

metadata:

name: zookeeper-nodeport-service-0

namespace: kafka

spec:

type: NodePort

selector:

statefulset.kubernetes.io/pod-name: zookeeper-0

ports:

- protocol: TCP

port: 80 # Service 暴露的端口

targetPort: 2181 # Pod 中容器的端口

nodePort: 32181 # NodePort 类型的端口范围为 30000-32767,可以根据需要调整

---

apiVersion: v1

kind: Service

metadata:

name: zookeeper-nodeport-service-1

namespace: kafka

spec:

type: NodePort

selector:

statefulset.kubernetes.io/pod-name: zookeeper-1

ports:

- protocol: TCP

port: 80 # Service 暴露的端口

targetPort: 2181 # Pod 中容器的端口

nodePort: 32182 # NodePort 类型的端口范围为 30000-32767,可以根据需要调整

---

apiVersion: v1

kind: Service

metadata:

name: zookeeper-nodeport-service-2

namespace: kafka

spec:

type: NodePort

selector:

statefulset.kubernetes.io/pod-name: zookeeper-2

ports:

- protocol: TCP

port: 80 # Service 暴露的端口

targetPort: 2181 # Pod 中容器的端口

nodePort: 32183 # NodePort 类型的端口范围为 30000-32767,可以根据需要调整

---

apiVersion: v1

kind: ConfigMap

metadata:

name: zookeeper-config

namespace: kafka

labels:

app: zookeeper

data: #具体挂载的配置文件

zoo.cfg: |+

tickTime=2000

initLimit=10

syncLimit=5

dataDir=/data

dataLogDir=/datalog

clientPort=2181

server.1=zookeeper-0.zookeeper-cluster.kafka:2188:3888

server.2=zookeeper-1.zookeeper-cluster.kafka:2188:3888

server.3=zookeeper-2.zookeeper-cluster.kafka:2188:3888

4lw.commands.whitelist=*

---

apiVersion: apps/v1

kind: StatefulSet

metadata:

name: zookeeper

namespace: kafka

spec:

serviceName: "zookeeper-cluster" #填写无头服务的名称

replicas: 3

selector:

matchLabels:

app: zookeeper

template:

metadata:

labels:

app: zookeeper

spec:

initContainers:

- name: set-zk-id

image: busybox:latest

command: ['sh', '-c', "hostname | cut -d '-' -f 2 | awk '{print $0 + 1}' > /data/myid"]

volumeMounts:

- name: data

mountPath: /data

containers:

- name: zookeeper

image: zookeeper:3.8

imagePullPolicy: Never

resources:

requests:

memory: "500Mi"

cpu: "500m"

limits:

memory: "1000Mi"

cpu: "1000m"

ports:

- containerPort: 2181

name: zookeeper

- containerPort: 2188

name: cluster1

- containerPort: 3888

name: cluster2

volumeMounts:

- name: zook-config #挂载配置

mountPath: /conf/zoo.cfg

subPath: zoo.cfg

- name: data

mountPath: /data

env:

- name: MY_POD_NAME

valueFrom:

fieldRef:

fieldPath: metadata.name #metadata.name获取自己pod名称添加到变量MY_POD_NAME

volumes:

- name: zook-config

configMap: #configMap挂载

name: zookeeper-config

volumeClaimTemplates: #这步自动创建pvc,并挂载动态pv

- metadata:

name: data

spec:

accessModes: ["ReadWriteMany"]

storageClassName: nfs

resources:

requests:

storage: 10Gi

创建Zookeeper集群

[root@master ~]# kubectl apply -f  zook.yaml

查看Pod状态  

2、部署Kafka集群

编写部署Kafka集群的YAML文件

[root@master ~]# vim kafka.yaml

# 输入如下内容

apiVersion: v1

kind: Service

metadata:

name: kafka-cluster #无头服务的名称,需要通过这个获取ip,与主机的对应关系

namespace: kafka

labels:

app: kafka

spec:

ports:

- port: 9092

name: kafka

clusterIP: None

selector:

app: kafka

---

apiVersion: v1

kind: Service

metadata:

name: kafka-nodeport-service-0

namespace: kafka

spec:

type: NodePort

selector:

statefulset.kubernetes.io/pod-name: kafka0-0

ports:

- protocol: TCP

port: 9092 # Service 暴露的端口

targetPort: 9092 # Pod 中容器的端口

nodePort: 30092 # NodePort 类型的端口范围为 30000-32767,可以根据需要调整

name: kafka

---

apiVersion: v1

kind: Service

metadata:

name: kafka-nodeport-service-1

namespace: kafka

spec:

type: NodePort

selector:

statefulset.kubernetes.io/pod-name: kafka1-0

ports:

- protocol: TCP

port: 9092 # Service 暴露的端口

targetPort: 9092 # Pod 中容器的端口

nodePort: 30093 # NodePort 类型的端口范围为 30000-32767,可以根据需要调整

name: kafka

---

apiVersion: v1

kind: Service

metadata:

name: kafka-nodeport-service-2

namespace: kafka

spec:

type: NodePort

selector:

statefulset.kubernetes.io/pod-name: kafka2-0

ports:

- protocol: TCP

port: 9092 # Service 暴露的端口

targetPort: 9092 # Pod 中容器的端口

nodePort: 30094 # NodePort 类型的端口范围为 30000-32767,可以根据需要调整

name: kafka

---

apiVersion: apps/v1

kind: StatefulSet

metadata:

name: kafka0

namespace: kafka

spec:

serviceName: "kafka-cluster" #填写无头服务的名称

replicas: 1

selector:

matchLabels:

app: kafka0

template:

metadata:

labels:

app: kafka0

spec:

containers:

- name: kafka

image: kafka:3.1.0

imagePullPolicy: Never

resources:

requests:

memory: "500Mi"

cpu: "500m"

limits:

memory: "1000Mi"

cpu: "2000m"

ports:

- containerPort: 9092

name: kafka

command:

- sh

- -c

- "exec /app/kafka/bin/kafka-server-start.sh /app/kafka/config/server.properties --override broker.id=0 \

--override listeners=PLAINTEXT://:9092 \

--override advertised.listeners=PLAINTEXT://192.168.40.181:30092 \

--override zookeeper.connect=192.168.40.181:32181,192.168.40.181:32182,192.168.40.181:32183/kafka \

--override log.dirs=/var/lib/kafka/data \

--override auto.create.topics.enable=true \

--override auto.leader.rebalance.enable=true \

--override background.threads=10 \

--override compression.type=producer \

--override delete.topic.enable=true \

--override leader.imbalance.check.interval.seconds=300 \

--override leader.imbalance.per.broker.percentage=10 \

--override log.flush.interval.messages=9223372036854775807 \

--override log.flush.offset.checkpoint.interval.ms=60000 \

--override log.flush.scheduler.interval.ms=9223372036854775807 \

--override log.retention.bytes=-1 \

--override log.retention.hours=168 \

--override log.roll.hours=168 \

--override log.roll.jitter.hours=0 \

--override log.segment.bytes=1073741824 \

--override log.segment.delete.delay.ms=60000 \

--override message.max.bytes=1000012 \

--override min.insync.replicas=1 \

--override num.io.threads=8 \

--override num.network.threads=3 \

--override num.recovery.threads.per.data.dir=1 \

--override num.replica.fetchers=1 \

--override offset.metadata.max.bytes=4096 \

--override offsets.commit.required.acks=-1 \

--override offsets.commit.timeout.ms=5000 \

--override offsets.load.buffer.size=5242880 \

--override offsets.retention.check.interval.ms=600000 \

--override offsets.retention.minutes=1440 \

--override offsets.topic.compression.codec=0 \

--override offsets.topic.num.partitions=50 \

--override offsets.topic.replication.factor=3 \

--override offsets.topic.segment.bytes=104857600 \

--override queued.max.requests=500 \

--override quota.consumer.default=9223372036854775807 \

--override quota.producer.default=9223372036854775807 \

--override replica.fetch.min.bytes=1 \

--override replica.fetch.wait.max.ms=500 \

--override replica.high.watermark.checkpoint.interval.ms=5000 \

--override replica.lag.time.max.ms=10000 \

--override replica.socket.receive.buffer.bytes=65536 \

--override replica.socket.timeout.ms=30000 \

--override request.timeout.ms=30000 \

--override socket.receive.buffer.bytes=102400 \

--override socket.request.max.bytes=104857600 \

--override socket.send.buffer.bytes=102400 \

--override unclean.leader.election.enable=true \

--override zookeeper.session.timeout.ms=6000 \

--override zookeeper.set.acl=false \

--override broker.id.generation.enable=true \

--override connections.max.idle.ms=600000 \

--override controlled.shutdown.enable=true \

--override controlled.shutdown.max.retries=3 \

--override controlled.shutdown.retry.backoff.ms=5000 \

--override controller.socket.timeout.ms=30000 \

--override default.replication.factor=1 \

--override fetch.purgatory.purge.interval.requests=1000 \

--override group.max.session.timeout.ms=300000 \

--override group.min.session.timeout.ms=6000 \

--override log.cleaner.backoff.ms=15000 \

--override log.cleaner.dedupe.buffer.size=134217728 \

--override log.cleaner.delete.retention.ms=86400000 \

--override log.cleaner.enable=true \

--override log.cleaner.io.buffer.load.factor=0.9 \

--override log.cleaner.io.buffer.size=524288 \

--override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \

--override log.cleaner.min.cleanable.ratio=0.5 \

--override log.cleaner.min.compaction.lag.ms=0 \

--override log.cleaner.threads=1 \

--override log.cleanup.policy=delete \

--override log.index.interval.bytes=4096 \

--override log.index.size.max.bytes=10485760 \

--override log.message.timestamp.difference.max.ms=9223372036854775807 \

--override log.message.timestamp.type=CreateTime \

--override log.preallocate=false \

--override log.retention.check.interval.ms=300000 \

--override max.connections.per.ip=2147483647 \

--override num.partitions=1 \

--override producer.purgatory.purge.interval.requests=1000 \

--override replica.fetch.backoff.ms=1000 \

--override replica.fetch.max.bytes=1048576 \

--override replica.fetch.response.max.bytes=10485760 \

--override reserved.broker.max.id=1000"

volumeMounts:

- name: data0

mountPath: /var/lib/kafka/data

env:

- name: MY_POD_NAME

valueFrom:

fieldRef:

fieldPath: metadata.name #metadata.name获取自己pod名称添加到变量MY_POD_NAME

- name: ALLOW_PLAINTEXT_LISTENER

value: "yes"

- name: KAFKA_HEAP_OPTS

value : "-Xms1g -Xmx1g"

- name: JMX_PORT

value: "5555"

volumeClaimTemplates: #这步自动创建pvc,并挂载动态pv

- metadata:

name: data0

spec:

accessModes: ["ReadWriteMany"]

storageClassName: nfs

resources:

requests:

storage: 10Gi

---

apiVersion: apps/v1

kind: StatefulSet

metadata:

name: kafka1

namespace: kafka

spec:

serviceName: "kafka-cluster" #填写无头服务的名称

replicas: 1

selector:

matchLabels:

app: kafka1

template:

metadata:

labels:

app: kafka1

spec:

containers:

- name: kafka

image: kafka:3.1.0

imagePullPolicy: Never

resources:

requests:

memory: "500Mi"

cpu: "500m"

limits:

memory: "1000Mi"

cpu: "2000m"

ports:

- containerPort: 9092

name: kafka

command:

- sh

- -c

- "exec /app/kafka/bin/kafka-server-start.sh /app/kafka/config/server.properties --override broker.id=1 \

--override listeners=PLAINTEXT://:9092 \

--override advertised.listeners=PLAINTEXT://192.168.40.181:30093 \

--override zookeeper.connect=192.168.40.181:32181,192.168.40.181:32182,192.168.40.181:32183/kafka \

--override log.dirs=/var/lib/kafka/data \

--override auto.create.topics.enable=true \

--override auto.leader.rebalance.enable=true \

--override background.threads=10 \

--override compression.type=producer \

--override delete.topic.enable=true \

--override leader.imbalance.check.interval.seconds=300 \

--override leader.imbalance.per.broker.percentage=10 \

--override log.flush.interval.messages=9223372036854775807 \

--override log.flush.offset.checkpoint.interval.ms=60000 \

--override log.flush.scheduler.interval.ms=9223372036854775807 \

--override log.retention.bytes=-1 \

--override log.retention.hours=168 \

--override log.roll.hours=168 \

--override log.roll.jitter.hours=0 \

--override log.segment.bytes=1073741824 \

--override log.segment.delete.delay.ms=60000 \

--override message.max.bytes=1000012 \

--override min.insync.replicas=1 \

--override num.io.threads=8 \

--override num.network.threads=3 \

--override num.recovery.threads.per.data.dir=1 \

--override num.replica.fetchers=1 \

--override offset.metadata.max.bytes=4096 \

--override offsets.commit.required.acks=-1 \

--override offsets.commit.timeout.ms=5000 \

--override offsets.load.buffer.size=5242880 \

--override offsets.retention.check.interval.ms=600000 \

--override offsets.retention.minutes=1440 \

--override offsets.topic.compression.codec=0 \

--override offsets.topic.num.partitions=50 \

--override offsets.topic.replication.factor=3 \

--override offsets.topic.segment.bytes=104857600 \

--override queued.max.requests=500 \

--override quota.consumer.default=9223372036854775807 \

--override quota.producer.default=9223372036854775807 \

--override replica.fetch.min.bytes=1 \

--override replica.fetch.wait.max.ms=500 \

--override replica.high.watermark.checkpoint.interval.ms=5000 \

--override replica.lag.time.max.ms=10000 \

--override replica.socket.receive.buffer.bytes=65536 \

--override replica.socket.timeout.ms=30000 \

--override request.timeout.ms=30000 \

--override socket.receive.buffer.bytes=102400 \

--override socket.request.max.bytes=104857600 \

--override socket.send.buffer.bytes=102400 \

--override unclean.leader.election.enable=true \

--override zookeeper.session.timeout.ms=6000 \

--override zookeeper.set.acl=false \

--override broker.id.generation.enable=true \

--override connections.max.idle.ms=600000 \

--override controlled.shutdown.enable=true \

--override controlled.shutdown.max.retries=3 \

--override controlled.shutdown.retry.backoff.ms=5000 \

--override controller.socket.timeout.ms=30000 \

--override default.replication.factor=1 \

--override fetch.purgatory.purge.interval.requests=1000 \

--override group.max.session.timeout.ms=300000 \

--override group.min.session.timeout.ms=6000 \

--override log.cleaner.backoff.ms=15000 \

--override log.cleaner.dedupe.buffer.size=134217728 \

--override log.cleaner.delete.retention.ms=86400000 \

--override log.cleaner.enable=true \

--override log.cleaner.io.buffer.load.factor=0.9 \

--override log.cleaner.io.buffer.size=524288 \

--override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \

--override log.cleaner.min.cleanable.ratio=0.5 \

--override log.cleaner.min.compaction.lag.ms=0 \

--override log.cleaner.threads=1 \

--override log.cleanup.policy=delete \

--override log.index.interval.bytes=4096 \

--override log.index.size.max.bytes=10485760 \

--override log.message.timestamp.difference.max.ms=9223372036854775807 \

--override log.message.timestamp.type=CreateTime \

--override log.preallocate=false \

--override log.retention.check.interval.ms=300000 \

--override max.connections.per.ip=2147483647 \

--override num.partitions=1 \

--override producer.purgatory.purge.interval.requests=1000 \

--override replica.fetch.backoff.ms=1000 \

--override replica.fetch.max.bytes=1048576 \

--override replica.fetch.response.max.bytes=10485760 \

--override reserved.broker.max.id=1000"

volumeMounts:

- name: data1

mountPath: /var/lib/kafka/data

env:

- name: MY_POD_NAME

valueFrom:

fieldRef:

fieldPath: metadata.name #metadata.name获取自己pod名称添加到变量MY_POD_NAME

- name: ALLOW_PLAINTEXT_LISTENER

value: "yes"

- name: KAFKA_HEAP_OPTS

value : "-Xms1g -Xmx1g"

- name: JMX_PORT

value: "5555"

volumeClaimTemplates: #这步自动创建pvc,并挂载动态pv

- metadata:

name: data1

spec:

accessModes: ["ReadWriteMany"]

storageClassName: nfs

resources:

requests:

storage: 10Gi

---

apiVersion: apps/v1

kind: StatefulSet

metadata:

name: kafka2

namespace: kafka

spec:

serviceName: "kafka-cluster" #填写无头服务的名称

replicas: 1

selector:

matchLabels:

app: kafka2

template:

metadata:

labels:

app: kafka2

spec:

containers:

- name: kafka

image: kafka:3.1.0

imagePullPolicy: Never

resources:

requests:

memory: "500Mi"

cpu: "500m"

limits:

memory: "1000Mi"

cpu: "2000m"

ports:

- containerPort: 9092

name: kafka

command:

- sh

- -c

- "exec /app/kafka/bin/kafka-server-start.sh /app/kafka/config/server.properties --override broker.id=2 \

--override listeners=PLAINTEXT://:9092 \

--override advertised.listeners=PLAINTEXT://192.168.40.181:30094 \

--override zookeeper.connect=192.168.40.181:32181,192.168.40.181:32182,192.168.40.181:32183/kafka \

--override log.dirs=/var/lib/kafka/data \

--override auto.create.topics.enable=true \

--override auto.leader.rebalance.enable=true \

--override background.threads=10 \

--override compression.type=producer \

--override delete.topic.enable=true \

--override leader.imbalance.check.interval.seconds=300 \

--override leader.imbalance.per.broker.percentage=10 \

--override log.flush.interval.messages=9223372036854775807 \

--override log.flush.offset.checkpoint.interval.ms=60000 \

--override log.flush.scheduler.interval.ms=9223372036854775807 \

--override log.retention.bytes=-1 \

--override log.retention.hours=168 \

--override log.roll.hours=168 \

--override log.roll.jitter.hours=0 \

--override log.segment.bytes=1073741824 \

--override log.segment.delete.delay.ms=60000 \

--override message.max.bytes=1000012 \

--override min.insync.replicas=1 \

--override num.io.threads=8 \

--override num.network.threads=3 \

--override num.recovery.threads.per.data.dir=1 \

--override num.replica.fetchers=1 \

--override offset.metadata.max.bytes=4096 \

--override offsets.commit.required.acks=-1 \

--override offsets.commit.timeout.ms=5000 \

--override offsets.load.buffer.size=5242880 \

--override offsets.retention.check.interval.ms=600000 \

--override offsets.retention.minutes=1440 \

--override offsets.topic.compression.codec=0 \

--override offsets.topic.num.partitions=50 \

--override offsets.topic.replication.factor=3 \

--override offsets.topic.segment.bytes=104857600 \

--override queued.max.requests=500 \

--override quota.consumer.default=9223372036854775807 \

--override quota.producer.default=9223372036854775807 \

--override replica.fetch.min.bytes=1 \

--override replica.fetch.wait.max.ms=500 \

--override replica.high.watermark.checkpoint.interval.ms=5000 \

--override replica.lag.time.max.ms=10000 \

--override replica.socket.receive.buffer.bytes=65536 \

--override replica.socket.timeout.ms=30000 \

--override request.timeout.ms=30000 \

--override socket.receive.buffer.bytes=102400 \

--override socket.request.max.bytes=104857600 \

--override socket.send.buffer.bytes=102400 \

--override unclean.leader.election.enable=true \

--override zookeeper.session.timeout.ms=6000 \

--override zookeeper.set.acl=false \

--override broker.id.generation.enable=true \

--override connections.max.idle.ms=600000 \

--override controlled.shutdown.enable=true \

--override controlled.shutdown.max.retries=3 \

--override controlled.shutdown.retry.backoff.ms=5000 \

--override controller.socket.timeout.ms=30000 \

--override default.replication.factor=1 \

--override fetch.purgatory.purge.interval.requests=1000 \

--override group.max.session.timeout.ms=300000 \

--override group.min.session.timeout.ms=6000 \

--override log.cleaner.backoff.ms=15000 \

--override log.cleaner.dedupe.buffer.size=134217728 \

--override log.cleaner.delete.retention.ms=86400000 \

--override log.cleaner.enable=true \

--override log.cleaner.io.buffer.load.factor=0.9 \

--override log.cleaner.io.buffer.size=524288 \

--override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \

--override log.cleaner.min.cleanable.ratio=0.5 \

--override log.cleaner.min.compaction.lag.ms=0 \

--override log.cleaner.threads=1 \

--override log.cleanup.policy=delete \

--override log.index.interval.bytes=4096 \

--override log.index.size.max.bytes=10485760 \

--override log.message.timestamp.difference.max.ms=9223372036854775807 \

--override log.message.timestamp.type=CreateTime \

--override log.preallocate=false \

--override log.retention.check.interval.ms=300000 \

--override max.connections.per.ip=2147483647 \

--override num.partitions=1 \

--override producer.purgatory.purge.interval.requests=1000 \

--override replica.fetch.backoff.ms=1000 \

--override replica.fetch.max.bytes=1048576 \

--override replica.fetch.response.max.bytes=10485760 \

--override reserved.broker.max.id=1000"

volumeMounts:

- name: data2

mountPath: /var/lib/kafka/data

env:

- name: MY_POD_NAME

valueFrom:

fieldRef:

fieldPath: metadata.name #metadata.name获取自己pod名称添加到变量MY_POD_NAME

- name: ALLOW_PLAINTEXT_LISTENER

value: "yes"

- name: KAFKA_HEAP_OPTS

value : "-Xms1g -Xmx1g"

- name: JMX_PORT

value: "5555"

volumeClaimTemplates: #这步自动创建pvc,并挂载动态pv

- metadata:

name: data2

spec:

accessModes: ["ReadWriteMany"]

storageClassName: nfs

resources:

requests:

storage: 10Gi

创建Kafka集群

[root@master ~]# kubectl apply -f  kafka.yaml 

 查看Pod状态

 

 

至此,Kafka集群部署完成

💕💕💕每一次的分享都是一次成长的旅程,感谢您的陪伴和关注。希望这些关于Kafka的文章能陪伴您走过技术的一段旅程,共同见证成长和进步!😺😺😺

🧨🧨🧨让我们一起在技术的海洋中探索前行,共同书写美好的未来!!! 



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。