Uchechukwu Onyekwuluje's Knowledge & Brain Dumps

Kafka Multinode Cluster Setup

Kafka is a stream-processing platform written in Scala and Java. It aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds, and it can connect to external systems. Kafka can be set up on a single node or in cluster mode. This post focuses on a multinode cluster setup.

Assumptions

This post assumes the following

Terminology

To understand Kafka better, it is important to understand its components and terminology and how they fit together.

Cluster Layout

We will be building a 3 node cluster with the following specs:

Specification Defaults
CPU 2 GHz
Memory 8 GB
Disk 50 GB

Cluster Servers

Our cluster will be made up of the following:

Hostname IP Address
kafkanode1 10.0.2.4
kafkanode2 10.0.2.5
kafkanode3 10.0.2.6
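
The hostnames above must resolve on every node, since the ZooKeeper config later in this post refers to the nodes by name. A minimal sketch, assuming there is no DNS for these names and /etc/hosts is used instead (the post does not say how the names are resolved):

```shell
#!/bin/sh
# Print /etc/hosts entries for the three cluster nodes.
# The IPs and hostnames come from the table above; using /etc/hosts
# (rather than DNS) is an assumption of this sketch.
cluster_hosts() {
    cat <<'EOF'
10.0.2.4 kafkanode1
10.0.2.5 kafkanode2
10.0.2.6 kafkanode3
EOF
}

# Review the output, then append it on each node, for example:
#   cluster_hosts | sudo tee -a /etc/hosts
cluster_hosts
```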

Operating System

This post focuses on CentOS/RHEL. At the time of writing, I am using CentOS 7/RHEL 7.

Firewall Configurations

This update needs to be done on all 3 nodes.
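
As a rough sketch of the firewall update, assuming firewalld (the CentOS 7 default) and the default ports: 2181 for ZooKeeper clients, 2888 and 3888 for ZooKeeper peer traffic (all three appear in the zookeeper.properties shown later in this post), and 9092 for the Kafka broker. Port 9092 is Kafka's default listener port, not something this post states, so adjust it if your listeners differ. The script only prints the commands by default; set DRY_RUN=0 to run them.

```shell
#!/bin/sh
# Open the ZooKeeper and Kafka ports with firewalld.
# DRY_RUN=1 (default) prints each command instead of running it.
DRY_RUN="${DRY_RUN:-1}"

run() {
    if [ "$DRY_RUN" = "1" ]; then
        echo "$*"
    else
        "$@"
    fi
}

open_cluster_ports() {
    # 2181: ZooKeeper clients; 2888/3888: ZooKeeper quorum and leader election;
    # 9092: Kafka's default listener port (assumed, adjust as needed).
    for port in 2181 2888 3888 9092; do
        run sudo firewall-cmd --permanent --add-port="${port}/tcp"
    done
    # Apply the permanent rules to the running firewall.
    run sudo firewall-cmd --reload
}

open_cluster_ports
```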

Systems User

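Create the kafka user on all nodes and set ownership of the data folder:

# Create the kafka user, set its password, and allow it to use sudo
sudo adduser kafka
sudo passwd kafka
sudo usermod -aG wheel kafka
# Create the ZooKeeper data directory (-p also creates /data if it is missing)
sudo mkdir -p /data/zookeeper
sudo chown -R kafka:kafka /data/zookeeper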

Package Requirements

Run these on all 3 nodes:

sudo yum update -y
sudo yum install -y libxml2 libxml2-devel libxslt libxslt-devel wget vim make gcc telnet \
libffi-devel openssl-devel bzip2-devel java-11-openjdk-devel java-11-openjdk

Create Broker IDs

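Create the ID file on each node. The myid file holds the ZooKeeper server ID, which must match that node's server.N entry in zookeeper.properties. The redirect in "sudo echo ... > file" runs as the calling user rather than root, so the commands below pipe through sudo tee instead:

Hostname Command
kafkanode1 echo 1 | sudo tee /data/zookeeper/myid
kafkanode2 echo 2 | sudo tee /data/zookeeper/myid
kafkanode3 echo 3 | sudo tee /data/zookeeper/myid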
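
Since the IDs follow the hostnames, the same command could be used on every node by deriving the ID from the trailing digits of the hostname. This is a sketch that assumes the kafkanodeN naming used in this post; node_id_from_hostname is a hypothetical helper, not part of Kafka or ZooKeeper.

```shell
#!/bin/sh
# Derive the numeric ID from a hostname such as kafkanode2 (prints 2).
# Prints nothing and returns non-zero if the name has no trailing digits.
node_id_from_hostname() {
    id="$(printf '%s' "$1" | sed -n 's/^.*[^0-9]\([0-9][0-9]*\)$/\1/p')"
    [ -n "$id" ] && printf '%s\n' "$id"
}

# On each node, the ID file could then be written with:
#   node_id_from_hostname "$(hostname -s)" | sudo tee /data/zookeeper/myid
node_id_from_hostname kafkanode2
```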

Install Kafka

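Download and install Kafka on all 3 nodes:

cd /tmp
# Download the Kafka 2.6.0 release built for Scala 2.13
# (older releases are moved to https://archive.apache.org/dist/kafka/)
wget https://downloads.apache.org/kafka/2.6.0/kafka_2.13-2.6.0.tgz
tar -zxvf kafka_2.13-2.6.0.tgz
# Install under /opt/kafka and create the logs directory
sudo mv kafka_2.13-2.6.0 /opt/kafka
sudo mkdir /opt/kafka/logs
sudo chown -R kafka:kafka /opt/kafka
# Remove the downloaded archive
rm -f kafka_2.13-2.6.0.tgz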

Service Updates

Create the kafka and zookeeper services on all 3 nodes:
/lib/systemd/system/zookeeper.service

[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

/etc/systemd/system/kafka.service

[Unit]
Requires=network.target remote-fs.target zookeeper.service
After=network.target remote-fs.target zookeeper.service

[Service]
Type=simple
User=kafka
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

Update Zookeeper

Update zookeeper config on all 3 nodes: /opt/kafka/config/zookeeper.properties

# The directory where the snapshot is stored.
dataDir=/data/zookeeper
# the port at which the clients will connect
clientPort=2181
clientPortAddress=kafkanode1
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
# The number of milliseconds of each tick
tickTime=2000

# The number of ticks that the initial synchronization phase can take
initLimit=10

# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5

# zoo servers
server.1=kafkanode1:2888:3888
server.2=kafkanode2:2888:3888
server.3=kafkanode3:2888:3888

Note: Change clientPortAddress on each node to that node's own hostname (kafkanode1, kafkanode2, or kafkanode3).
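
One way to handle the per-node change is to rewrite the clientPortAddress line with sed. A minimal sketch, assuming the property already exists in the file as shown above; set_client_port_address is a hypothetical helper name.

```shell
#!/bin/sh
# Set clientPortAddress in a zookeeper.properties file to the given host.
# Usage: set_client_port_address <properties-file> <hostname>
set_client_port_address() {
    file="$1"
    host="$2"
    tmp="$(mktemp)" || return 1
    # Replace only the clientPortAddress line; all other lines pass through.
    # Writing back with cat keeps the original file's owner and permissions.
    sed "s/^clientPortAddress=.*/clientPortAddress=${host}/" "$file" > "$tmp" &&
        cat "$tmp" > "$file"
    status=$?
    rm -f "$tmp"
    return "$status"
}

# On each node (as a user that can write the file), for example:
#   set_client_port_address /opt/kafka/config/zookeeper.properties "$(hostname -s)"
```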

Update Server Properties

Update the server properties config on all 3 nodes: /opt/kafka/config/server.properties. The following should be updated as needed:
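
The post does not list the properties here, so the following is only a guess at the settings a 3-node setup like this usually needs, not the author's list. It prints candidate overrides for one node. broker.id, listeners, advertised.listeners, log.dirs and zookeeper.connect are standard Kafka broker properties; port 9092 is Kafka's default, and the log.dirs path /data/kafka is a hypothetical choice.

```shell
#!/bin/sh
# Print per-node server.properties overrides.
# Usage: server_overrides <broker-id> <hostname>
server_overrides() {
    broker_id="$1"
    host="$2"
    cat <<EOF
# Unique integer per broker
broker.id=${broker_id}
# Address the broker binds to and the one it advertises to clients
listeners=PLAINTEXT://${host}:9092
advertised.listeners=PLAINTEXT://${host}:9092
# Where partition data is stored (hypothetical path; the stock config uses /tmp/kafka-logs)
log.dirs=/data/kafka
# All three ZooKeeper nodes from this post
zookeeper.connect=kafkanode1:2181,kafkanode2:2181,kafkanode3:2181
EOF
}

server_overrides 1 kafkanode1
```

Compare the output with the existing entries in /opt/kafka/config/server.properties and edit those in place rather than appending duplicates.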

Enable Services

Enable and start the Kafka and ZooKeeper services on all nodes:

# Reload systemd so it picks up the new unit files
sudo systemctl daemon-reload

sudo systemctl enable zookeeper.service
sudo systemctl enable kafka.service

sudo systemctl start zookeeper.service
sudo systemctl start kafka.service

After starting the services, verify they are working correctly.

Verify Zookeeper

sudo systemctl status zookeeper.service

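If this works, you should see output like the following. The stack traces in the log lines come from failed connection attempts to the other ZooKeeper nodes, most likely because they had not started yet when this output was captured.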

● zookeeper.service
   Loaded: loaded (/usr/lib/systemd/system/zookeeper.service; enabled; vendor preset: disabled)
   Active: active (running) since Thu 2020-10-22 08:03:55 UTC; 12s ago
 Main PID: 1859 (java)
   CGroup: /system.slice/zookeeper.service
           └─1859 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInl...

Oct 22 08:04:03 kafkanode1 zookeeper-server-start.sh[1859]: at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)
Oct 22 08:04:03 kafkanode1 zookeeper-server-start.sh[1859]: at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)
Oct 22 08:04:03 kafkanode1 zookeeper-server-start.sh[1859]: at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:403)
Oct 22 08:04:03 kafkanode1 zookeeper-server-start.sh[1859]: at java.base/java.net.Socket.connect(Socket.java:609)
Oct 22 08:04:03 kafkanode1 zookeeper-server-start.sh[1859]: at org.apache.zookeeper.server.quorum.QuorumCnxManager.initiateConnection(QuorumCnxManager.java:373)
Oct 22 08:04:03 kafkanode1 zookeeper-server-start.sh[1859]: at org.apache.zookeeper.server.quorum.QuorumCnxManager$QuorumConnectionReqThread.run(QuorumCnxManag...ava:436)
Oct 22 08:04:03 kafkanode1 zookeeper-server-start.sh[1859]: at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
Oct 22 08:04:03 kafkanode1 zookeeper-server-start.sh[1859]: at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
Oct 22 08:04:03 kafkanode1 zookeeper-server-start.sh[1859]: at java.base/java.lang.Thread.run(Thread.java:834)
Oct 22 08:04:06 kafkanode1 zookeeper-server-start.sh[1859]: [2020-10-22 08:04:06,382] WARN Exception causing close of session 0x0: ZooKeeperServer not running ...verCnxn)
Hint: Some lines were ellipsized, use -l to show in full.

Verify Kafka

sudo systemctl status kafka.service

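If all works well, you should see output like the following. The "Connection refused" messages for kafkanode2 and kafkanode3 indicate that ZooKeeper was not yet reachable on those nodes when this output was captured.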

● kafka.service
   Loaded: loaded (/etc/systemd/system/kafka.service; enabled; vendor preset: disabled)
   Active: active (running) since Thu 2020-10-22 08:04:01 UTC; 13s ago
 Main PID: 2229 (java)
   CGroup: /system.slice/kafka.service
           └─2229 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineL...

Oct 22 08:04:10 kafkanode1 kafka-server-start.sh[2229]: [2020-10-22 08:04:10,524] INFO Socket error occurred: kafkanode3/10.0.2.6:2181: Connection refused (or...ientCnxn)
Oct 22 08:04:12 kafkanode1 kafka-server-start.sh[2229]: [2020-10-22 08:04:12,005] INFO Opening socket connection to server kafkanode2/10.0.2.5:2181. Will not ...ientCnxn)
Oct 22 08:04:12 kafkanode1 kafka-server-start.sh[2229]: [2020-10-22 08:04:12,009] INFO Socket error occurred: kafkanode2/10.0.2.5:2181: Connection refused (or...ientCnxn)
Oct 22 08:04:12 kafkanode1 kafka-server-start.sh[2229]: [2020-10-22 08:04:12,670] INFO Opening socket connection to server kafkanode1/10.0.2.4:2181. Will not ...ientCnxn)
Oct 22 08:04:12 kafkanode1 kafka-server-start.sh[2229]: [2020-10-22 08:04:12,670] INFO Socket connection established, initiating session, client: /10.0.2.4:47...ientCnxn)
Oct 22 08:04:12 kafkanode1 kafka-server-start.sh[2229]: [2020-10-22 08:04:12,675] INFO Unable to read additional data from server sessionid 0x0, likely server...ientCnxn)
Oct 22 08:04:12 kafkanode1 kafka-server-start.sh[2229]: [2020-10-22 08:04:12,935] INFO Opening socket connection to server kafkanode3/10.0.2.6:2181. Will not ...ientCnxn)
Oct 22 08:04:12 kafkanode1 kafka-server-start.sh[2229]: [2020-10-22 08:04:12,937] INFO Socket error occurred: kafkanode3/10.0.2.6:2181: Connection refused (or...ientCnxn)
Oct 22 08:04:14 kafkanode1 kafka-server-start.sh[2229]: [2020-10-22 08:04:14,621] INFO Opening socket connection to server kafkanode2/10.0.2.5:2181. Will not ...ientCnxn)
Oct 22 08:04:14 kafkanode1 kafka-server-start.sh[2229]: [2020-10-22 08:04:14,622] INFO Socket error occurred: kafkanode2/10.0.2.5:2181: Connection refused (or...ientCnxn)
Hint: Some lines were ellipsized, use -l to show in full.

Tests and Verification

Switch to the kafka user with sudo and run the following:
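
The commands for this step are missing from the post. As a stand-in, here is a sketch of a common smoke test using the kafka-topics.sh tool that ships in /opt/kafka/bin: create a topic replicated across all three brokers, then describe it. The topic name smoke-test, the partition count, and the 9092 broker port are assumptions. The script prints the commands by default; set DRY_RUN=0 to run them against the cluster.

```shell
#!/bin/sh
KAFKA_BIN=/opt/kafka/bin
BOOTSTRAP=kafkanode1:9092   # assumes Kafka's default listener port
TOPIC=smoke-test            # hypothetical topic name
DRY_RUN="${DRY_RUN:-1}"

run() {
    if [ "$DRY_RUN" = "1" ]; then echo "$*"; else "$@"; fi
}

smoke_test() {
    # Create a topic with one replica on each of the 3 brokers.
    run "$KAFKA_BIN/kafka-topics.sh" --create --bootstrap-server "$BOOTSTRAP" \
        --replication-factor 3 --partitions 3 --topic "$TOPIC"
    # Show partition leaders, replicas, and in-sync replicas.
    run "$KAFKA_BIN/kafka-topics.sh" --describe --bootstrap-server "$BOOTSTRAP" \
        --topic "$TOPIC"
}

smoke_test
```

If all three brokers joined the cluster, the describe output should list three replicas for each partition.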

That is it. Your use case will differ, so make changes as needed.
