This document was translated from the Polish original with Google Translate and lightly corrected by hand. I did not verify after translation that everything works correctly; if any command fails, take a look at the Polish original.

How to run and use Kafka with SSL in a very simple scenario

I recently learned Kafka (see also here). For this purpose, among other things, I wanted to run a Kafka broker listening over SSL on my computer, send a message and receive it. Here are my notes on how I did it.

The plan is this. There will be one docker container with zookeeper and one with the broker. I will create a keystore holding the broker's private and public keys; the broker will use this keystore. From it I will export the broker's certificate, which contains the public key. The client needs this certificate for two things: to set up the encrypted connection with the broker during the SSL handshake, and to authenticate the broker (i.e. to verify that the broker it connected to is the real one). Because the broker's certificate is self-signed, the client will not accept it by default, so I need to create a truststore for the client and import the broker's certificate into it.

The other direction is similar: the client must authenticate to the broker, so the client needs its own key pair and a certificate with its public key, and the broker must accept that certificate. So I will create a second keystore with the private and public keys of the kafka client; the client will use this keystore. From it I will export the client certificate. Since this certificate is also self-signed, the broker would not normally trust it, so I will create a second truststore, used by the broker, and import the client certificate into it.

And then there is one more thing that was not obvious to me: each side must also trust its own certificate when it presents it. So I will also import the broker's certificate into the broker's truststore and the client's certificate into the client's truststore. As a result, both truststores contain the same two certificates. In this case it would probably be enough in practice to have a single truststore used by both the broker and the client, but I don't do that, because it would be less didactic.
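
The resulting layout can be spelled out with a short shell loop. This is only a sketch: it prints which certificate ends up in which truststore and creates nothing. The function name plan_truststores is made up for this illustration; the file names and aliases are the ones used in the steps of this document.

```shell
# Print the planned truststore contents; no keytool command is run here.
plan_truststores() {
  for store in server client; do
    for cert in server client; do
      echo "kafka.${store}.truststore.jks <- ${cert}-certificate.crt (alias ${cert})"
    done
  done
}

plan_truststores
```

The four printed lines show that each truststore receives both certificates, which is why a single shared truststore would also work.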

0. I install kafkacat; if necessary, I also clean up docker (this removes all containers and all unused networks on the machine), especially if this is not my first run of these experiments:
$ sudo apt-get install kafkacat
$ docker rm -f $(docker ps -aq)
$ docker network prune

1. I run zookeeper:
$ docker network create kafka
$ docker run -d --network=kafka --name=zookeeper -e ZOOKEEPER_CLIENT_PORT=2181 confluentinc/cp-zookeeper:latest

2. I create a keystore for kafka (server):
$ mkdir -p kafka/secrets
$ cd kafka/secrets
$ keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA -storepass your_server_secret_password -keypass your_server_secret_password -dname "CN=localhost, OU=Your_Organization_Unit, O=Your_Organization, L=Your_City, S=Your_State, C=Your_Country" -storetype pkcs12

3. I create a keystore for a client:
$ keytool -keystore kafka.client.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA -storepass your_client_secret_password -keypass your_client_secret_password -dname "CN=localhost, OU=Your_Organization_Unit, O=Your_Organization, L=Your_City, S=Your_State, C=Your_Country" -storetype pkcs12

4. I export server and client certificates:
$ keytool -export -alias localhost -file server-certificate.crt -keystore kafka.server.keystore.jks -storepass your_server_secret_password
$ keytool -export -alias localhost -file client-certificate.crt -keystore kafka.client.keystore.jks -storepass your_client_secret_password

5. I create a truststore for the server and import the client certificate (so the server can trust the client) and my server certificate (so the server can trust its own certificate):
$ keytool -import -noprompt -file client-certificate.crt -alias client -keystore kafka.server.truststore.jks -storepass your_server_truststore_password
$ keytool -import -noprompt -file server-certificate.crt -alias server -keystore kafka.server.truststore.jks -storepass your_server_truststore_password

6. I create a truststore for the client and import the server certificate (so the client can trust the server) and my client certificate (so the client can trust its own certificate):
$ keytool -import -noprompt -file server-certificate.crt -alias server -keystore kafka.client.truststore.jks -storepass your_client_truststore_password
$ keytool -import -noprompt -file client-certificate.crt -alias client -keystore kafka.client.truststore.jks -storepass your_client_truststore_password
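
Before starting the broker it may be worth checking that steps 2 to 6 produced all the files. A small sketch of such a check follows; check_secrets is a hypothetical helper name, and the file names are the ones used above.

```shell
# check_secrets DIR: report which of the expected files are missing in DIR.
# Returns 0 when all six files are present, 1 otherwise.
check_secrets() {
  dir=$1
  missing=0
  for f in \
    kafka.server.keystore.jks kafka.client.keystore.jks \
    server-certificate.crt client-certificate.crt \
    kafka.server.truststore.jks kafka.client.truststore.jks
  do
    if [ ! -f "$dir/$f" ]; then
      echo "missing: $dir/$f"
      missing=1
    fi
  done
  return "$missing"
}

# usage, from inside kafka/secrets:
#   check_secrets . && echo "all files present"
```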

7. I run the kafka broker and look at its log:
$ echo "your_server_secret_password" > my_server_password
$ cd ../..
$ docker run -d --network=kafka --name=broker1 -p 9093:9093 -v $(pwd)/kafka/secrets:/etc/kafka/secrets -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_BROKER_ID=1 -e KAFKA_LISTENERS=SSL://:9093 -e KAFKA_ADVERTISED_LISTENERS=SSL://localhost:9093 -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=SSL:SSL -e KAFKA_INTER_BROKER_LISTENER_NAME=SSL -e KAFKA_SSL_KEYSTORE_PASSWORD=your_server_secret_password -e KAFKA_SSL_KEYSTORE_LOCATION=/etc/kafka/ -e KAFKA_SSL_KEYSTORE_FILENAME=kafka.server.keystore.jks -e KAFKA_SSL_KEY_PASSWORD=your_server_secret_password -e KAFKA_SSL_KEY_CREDENTIALS=my_server_password -e KAFKA_SSL_KEYSTORE_CREDENTIALS=my_server_password -e KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM= -e KAFKA_SSL_TRUSTSTORE_LOCATION=/etc/kafka/secrets/kafka.server.truststore.jks -e KAFKA_SSL_TRUSTSTORE_PASSWORD=your_server_truststore_password confluentinc/cp-kafka:latest
$ docker logs broker1 | less
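
Paging through the log by hand works, but the wait can also be scripted. Below is a minimal sketch, assuming the broker writes a line containing "started (kafka.server.KafkaServer)" once it is ready; that text is my assumption about the Kafka version in the image, so adjust it to what the log actually shows. wait_for_log is a hypothetical helper, not part of docker or kafka.

```shell
# wait_for_log PATTERN TRIES CMD...
# Re-runs CMD once per second until its output contains PATTERN,
# giving up after TRIES attempts. Returns 0 on a match, 1 otherwise.
wait_for_log() {
  pattern=$1
  tries=$2
  shift 2
  i=0
  while [ "$i" -lt "$tries" ]; do
    if "$@" 2>&1 | grep -q -- "$pattern"; then
      return 0
    fi
    i=$((i + 1))
    sleep 1
  done
  return 1
}

# usage (assumed log text, see above):
#   wait_for_log "started (kafka.server.KafkaServer)" 30 docker logs broker1
```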

8. I open a shell on the broker1 machine:
$ docker exec -it broker1 /bin/bash

9. There, on the broker1 machine, I create a configuration file that I will use as a kafka client (after pasting the lines I end the input with Ctrl+D):
$ cat > my_client_config.properties
security.protocol=SSL
ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks
ssl.truststore.password=your_client_truststore_password
ssl.keystore.location=/etc/kafka/secrets/kafka.client.keystore.jks
ssl.keystore.password=your_client_secret_password
ssl.key.password=your_client_secret_password
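
The same file can be written in one go with a here-document, which avoids typing into cat interactively. A sketch using the same file name and the same values as above:

```shell
# Write the client configuration non-interactively.
# The quoted 'EOF' keeps the shell from expanding anything inside the block.
cat > my_client_config.properties <<'EOF'
security.protocol=SSL
ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks
ssl.truststore.password=your_client_truststore_password
ssl.keystore.location=/etc/kafka/secrets/kafka.client.keystore.jks
ssl.keystore.password=your_client_secret_password
ssl.key.password=your_client_secret_password
EOF
```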

10. Still on the broker1 machine, I create a topic:
$ kafka-topics --create --topic watched_movies --bootstrap-server localhost:9093 --command-config my_client_config.properties --replication-factor 1 --partitions 1

11. Still on the broker1 machine, I look at the topic information:
$ kafka-topics --describe --topic watched_movies --command-config my_client_config.properties --bootstrap-server localhost:9093

12. I exit the broker1 machine, returning to the host.
$ exit

13. On the host, I send a message to kafka:
$ openssl x509 -inform der -in kafka/secrets/server-certificate.crt -out kafka/secrets/server-certificate.pem
$ echo '12345:{"userId": 12345, "movie": "By the Nemunas", "date": "2023-07-06"}' | kafkacat -P -b localhost:9093 -t watched_movies -K: -X security.protocol=SSL -X ssl.ca.location=./kafka/secrets/server-certificate.pem -X ssl.keystore.location=./kafka/secrets/kafka.client.keystore.jks -X ssl.keystore.password=your_client_secret_password -X ssl.key.password=your_client_secret_password
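
The -K: option tells kafkacat that the key and the value are separated by a colon. The JSON value contains colons too, so it matters that the split happens at the first one. The sketch below imitates that split with plain shell parameter expansion; it only illustrates how I understand the delimiter to work and does not call kafkacat.

```shell
# Everything before the first ':' is treated as the key, the rest as the value.
line='12345:{"userId": 12345, "movie": "By the Nemunas", "date": "2023-07-06"}'
key=${line%%:*}    # strip the longest suffix that starts with ':'
value=${line#*:}   # strip the shortest prefix that ends with ':'
echo "key=$key"
echo "value=$value"
```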

14. I receive messages from kafka:
$ kafkacat -C -b localhost:9093 -t watched_movies -X security.protocol=SSL -X ssl.ca.location=./kafka/secrets/server-certificate.pem -X ssl.keystore.location=./kafka/secrets/kafka.client.keystore.jks -X ssl.keystore.password=your_client_secret_password -X ssl.key.password=your_client_secret_password

15. And now I will send a message programmatically. I create a directory structure for the project:
$ mkdir -p sender/src/main/java/com/example/kafka

16. I create a java file:

$ cat > sender/src/main/java/com/example/kafka/ProducerExample.java
package com.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerExample {
     public static void main(String[] args) {
         Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9093");
         props.put("security.protocol", "SSL");
         props.put("ssl.truststore.location", "kafka/secrets/kafka.client.truststore.jks");
         props.put("ssl.truststore.password", "your_client_truststore_password");
         props.put("ssl.keystore.location", "kafka/secrets/kafka.client.keystore.jks");
         props.put("ssl.keystore.password", "your_client_secret_password");
         props.put("ssl.key.password", "your_client_secret_password");
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

         // String keys and values, matching the StringSerializer settings above
         Producer<String, String> producer = new KafkaProducer<>(props);
         try {
             String key = "12346";
             String value = "{\"userId\": 12346, \"movie\": \"Lighthouse Keeper\", \"date\": \"2023-07-26\"}";
             producer.send(new ProducerRecord<>("watched_movies", key, value));
         } finally {
             producer.close();
         }
     }
}

17. I create a pom.xml file:

$ cat > sender/pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                              http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>

     <groupId>com.example</groupId>
     <artifactId>kafka-java-producer</artifactId>
     <version>1.0-SNAPSHOT</version>
     <packaging>jar</packaging>

     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
     </properties>

     <dependencies>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
             <version>2.8.0</version>
         </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-simple</artifactId>
             <version>1.7.30</version>
         </dependency>
     </dependencies>

     <build>
         <plugins>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-compiler-plugin</artifactId>
                 <version>3.8.0</version>
             </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-jar-plugin</artifactId>
                 <version>3.1.2</version>
                 <configuration>
                     <archive>
                         <manifest>
                             <mainClass>com.example.kafka.ProducerExample</mainClass>
                         </manifest>
                     </archive>
                 </configuration>
             </plugin>
             <plugin>
                 <artifactId>maven-assembly-plugin</artifactId>
                 <version>3.3.0</version>
                 <configuration>
                     <archive>
                         <manifest>
                             <mainClass>com.example.kafka.ProducerExample</mainClass>
                         </manifest>
                     </archive>
                     <descriptorRefs>
                         <descriptorRef>jar-with-dependencies</descriptorRef>
                     </descriptorRefs>
                 </configuration>
                 <executions>
                     <execution>
                         <id>make-assembly</id>
                         <phase>package</phase>
                         <goals>
                             <goal>single</goal>
                         </goals>
                     </execution>
                 </executions>
             </plugin>
         </plugins>
     </build>
</project>

18. I build:
$ cd sender
$ mvn clean package

19. I run a program that sends a message to the broker:
$ cd ..
$ java -jar sender/target/kafka-java-producer-1.0-SNAPSHOT-jar-with-dependencies.jar

20. I receive messages from kafka:
$ kafkacat -C -b localhost:9093 -t watched_movies -X security.protocol=SSL -X ssl.ca.location=./kafka/secrets/server-certificate.pem -X ssl.keystore.location=./kafka/secrets/kafka.client.keystore.jks -X ssl.keystore.password=your_client_secret_password -X ssl.key.password=your_client_secret_password