Niedawno uczyłem się Kafki (zobacz też tu). W tym celu między innymi chciałem uruchomić sobie na komputerze brokera Kafki słuchającego po SSL, wysłać jakiś komunikat i odebrać go. Oto moje notatki, jak to robiłem.
Plan jest taki: Będzie jeden kontener dockerowy z zookeeperem i drugi z brokerem. Stworzę keystore, w którym będzie klucz publiczny i prywatny brokera - tego keystore'a będzie używał broker. Klucz publiczny z tego keystore'a wyeksportuję do certyfikatu serwera - będzie go używał klient, żeby móc rozmawiać z brokerem (no bo wszystko, co klient wysyła do brokera, musi przecież szyfrować kluczem publicznym brokera). Klient będzie też używał tego certyfikatu do uwierzytelnienia brokera (czyli do sprawdzenia, czy ten broker, z którym się połaczył, jest prawdziwy). Ale ponieważ ten certyfikat brokera jest samopodpisany, to żeby klient zaakceptował ten certyfikat, muszę stworzyć truststore dla klienta i do tego truststore'a zaimportować certyfikat brokera. No ale w drugą stronę z kolei podobnie: klient musi się uwierzytelnić przed brokerem, więc klient musi mieć parę kluczy (prywatny i publiczny), musi mieć certyfikat z tym kluczem publicznym, a broker musi zaakceptować ten certyfikat. Więc stworzę też drugi keystore, w którym będzie klucz prywatny i publiczny klienta kafkowego - tego keystore'a będzie używał klient. Klucz publiczny z tego keystore'a wyeksportuję do certyfikatu klienta. Ponieważ ten certyfikat klienta jest samopodpisany, to tak normalnie broker by mu nie zaufał - ale ja stworzę drugi truststore, którego będzie używał broker, i do tego truststore'a brokerowego zaimportuję certyfikat klienta. A potem jest jeszcze jedna sprawa, która była dla mnie nieoczywista: klient ładując swój certyfikat musi mu zaufać, a serwer ładując swój certyfikat też musi mu zaufać. Więc do truststore'a brokerowego zaimportuję również certyfikat brokera, a do truststore'a klienckiego zaimportuję również certfikat klienta. No więc w sumie okazuje się, że oba truststore'y zawierają te same certyfikaty. Więc pewnie w tym przypadku w praktyce wystarczyłoby mieć jeden truststore'a i żeby używał go i broker, i klient - ale ja tak nie robię, bo byłoby to mniej jasne dydaktycznie.
0. Instaluję kafkacat; jeśli trzeba, oram dockera, zwłaszcza jeśli te eksperymenty robię nie pierwszy raz:
$ sudo apt-get install kafkacat
$ docker rm -f $(docker ps -aq)
$ docker network prune
1. Uruchamiam zookeepera:
$ docker network create kafka
$ docker run -d --network=kafka --name=zookeeper -e ZOOKEEPER_CLIENT_PORT=2181 confluentinc/cp-zookeeper:latest
2. Tworzę keystore dla kafki (serwera):
$ mkdir -p kafka/secrets
$ cd kafka/secrets
$ keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA -storepass your_server_secret_password -keypass your_server_secret_password -dname "CN=localhost, OU=Your_Organization_Unit, O=Your_Organization, L=Your_City, S=Your_State, C=Your_Country" -storetype pkcs12
3. Tworzę keystore dla klienta:
$ keytool -keystore kafka.client.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA -storepass your_client_secret_password -keypass your_client_secret_password -dname "CN=localhost, OU=Your_Organization_Unit, O=Your_Organization, L=Your_City, S=Your_State, C=Your_Country" -storetype pkcs12
4. Eksportuję certyfikaty serwera i klienta:
$ keytool -export -alias localhost -file server-certificate.crt -keystore kafka.server.keystore.jks -storepass your_server_secret_password
$ keytool -export -alias localhost -file client-certificate.crt -keystore kafka.client.keystore.jks -storepass your_client_secret_password
5. Tworzę truststore dla serwera i importuję doń certyfikat klienta (aby serwer mógł zaufać klientowi) oraz swój certyfikat serwera (aby serwer ufał swojemu własnemu certyfikatowi):
$ keytool -import -noprompt -file client-certificate.crt -alias client -keystore kafka.server.truststore.jks -storepass your_server_truststore_password
$ keytool -import -noprompt -file server-certificate.crt -alias server -keystore kafka.server.truststore.jks -storepass your_server_truststore_password
6. Tworzę truststore dla klienta i importuję doń certyfikat serwera (aby klient mógł zaufać serwerowi) oraz swój certyfikat klienta (aby klient ufał swojemu własnemu certyfikatowi):
$ keytool -import -noprompt -file server-certificate.crt -alias server -keystore kafka.client.truststore.jks -storepass your_client_truststore_password
$ keytool -import -noprompt -file client-certificate.crt -alias client -keystore kafka.client.truststore.jks -storepass your_client_truststore_password
7. Uruchamiam brokera kafki:
$ echo "your_server_secret_password" > moje_haslo_serwera
$ cd ../..
$ docker run -d --network=kafka --name=broker1 -p 9093:9093 -v $(pwd)/kafka/secrets:/etc/kafka/secrets -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_BROKER_ID=1 -e KAFKA_LISTENERS=SSL://:9093 -e KAFKA_ADVERTISED_LISTENERS=SSL://localhost:9093 -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=SSL:SSL -e KAFKA_INTER_BROKER_LISTENER_NAME=SSL -e KAFKA_SSL_KEYSTORE_PASSWORD=your_server_secret_password -e KAFKA_SSL_KEYSTORE_LOCATION=/etc/kafka/ -e KAFKA_SSL_KEYSTORE_FILENAME=kafka.server.keystore.jks -e KAFKA_SSL_KEY_PASSWORD=your_server_secret_password -e KAFKA_SSL_KEY_CREDENTIALS=moje_haslo_serwera -e KAFKA_SSL_KEYSTORE_CREDENTIALS=moje_haslo_serwera -e KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM= -e KAFKA_SSL_TRUSTSTORE_LOCATION=/etc/kafka/secrets/kafka.server.truststore.jks -e KAFKA_SSL_TRUSTSTORE_PASSWORD=your_server_truststore_password confluentinc/cp-kafka:latest
$ docker logs broker1 | less
8. Wbijam się na maszynę broker1:
$ docker exec -it broker1 /bin/bash
9. Tam, na maszynie broker1, tworzę plik konfiguracyjny, którego zaraz użyję jako klient kafki:
$ cat > konfiguracja_mojego_klienta.properties
security.protocol=SSL
ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks
ssl.truststore.password=your_client_truststore_password
ssl.keystore.location=/etc/kafka/secrets/kafka.client.keystore.jks
ssl.keystore.password=your_client_secret_password
ssl.key.password=your_client_secret_password
10. Nadal będąc na maszynie broker1 tworzę topik:
$ kafka-topics --create --topic obejrzane_filmy --bootstrap-server localhost:9093 --command-config konfiguracja_mojego_klienta.properties --replication-factor 1 --partitions 1
11. Nadal będąc na maszynie broker1 oglądam informacje o topiku:
$ kafka-topics --describe --topic obejrzane_filmy --command-config konfiguracja_mojego_klienta.properties --bootstrap-server localhost:9093
12. Wychodzę z maszyny broker1, wracając na gospodarza.
$ exit
13. Na gospodarzu nadaję komunikat do kafki:
$ openssl x509 -inform der -in kafka/secrets/server-certificate.crt -out kafka/secrets/server-certificate.pem
$ echo '12345:{"userId": 12345, "film": "Nad Niemnem", "data": "2023-07-06"}' | kafkacat -P -b localhost:9093 -t obejrzane_filmy -K: -X security.protocol=SSL -X ssl.ca.location=./kafka/secrets/server-certificate.pem -X ssl.keystore.location=./kafka/secrets/kafka.client.keystore.jks -X ssl.keystore.password=your_client_secret_password -X ssl.key.password=your_client_secret_password
14. Odbieram komunikaty z kafki:
$ kafkacat -C -b localhost:9093 -t obejrzane_filmy -X security.protocol=SSL -X ssl.ca.location=./kafka/secrets/server-certificate.pem -X ssl.keystore.location=./kafka/secrets/kafka.client.keystore.jks -X ssl.keystore.password=your_client_secret_password -X ssl.key.password=your_client_secret_password
15. A teraz nadam komunikat programistycznie. Tworzę strukturę katalogów dla projektu:
$ mkdir -p nadawacz/src/main/java/com/example/kafka
16. Tworzę plik javowy:
$ cat > nadawacz/src/main/java/com/example/kafka/ProducerExample.java
package com.example.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class ProducerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9093");
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "kafka/secrets/kafka.client.truststore.jks");
props.put("ssl.truststore.password", "your_client_truststore_password");
props.put("ssl.keystore.location", "kafka/secrets/kafka.client.keystore.jks");
props.put("ssl.keystore.password", "your_client_secret_password");
props.put("ssl.key.password", "your_client_secret_password");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer producer = new KafkaProducer<>(props);
try {
String key = "12346";
String value = "{\"userId\": 12346, \"film\": \"Latarnik\", \"data\": \"2023-07-26\"}";
producer.send(new ProducerRecord("obejrzane_filmy", key, value));
} finally {
producer.close();
}
}
}
17. Tworzę plik pom.xml:
$ cat > nadawacz/pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>kafka-java-producer</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.1.2</version>
<configuration>
<archive>
<manifest>
<mainClass>com.example.kafka.ProducerExample</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<archive>
<manifest>
<mainClass>com.example.kafka.ProducerExample</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
18. Buduję:
$ cd nadawacz
$ mvn clean package
19. Uruchamiam program nadający komunikat do brokera:
$ cd ..
$ java -jar nadawacz/target/kafka-java-producer-1.0-SNAPSHOT-jar-with-dependencies.jar
20. Odbieram komunikaty z kafki:
$ kafkacat -C -b localhost:9093 -t obejrzane_filmy -X security.protocol=SSL -X ssl.ca.location=./kafka/secrets/server-certificate.pem -X ssl.keystore.location=./kafka/secrets/kafka.client.keystore.jks -X ssl.keystore.password=your_client_secret_password -X ssl.key.password=your_client_secret_password