Niedawno uczyłem się Kafki (zobacz też tu). W tym celu między innymi chciałem uruchomić sobie na komputerze brokera Kafki słuchającego po SSL, wysłać jakiś komunikat i odebrać go. Oto moje notatki, jak to robiłem.
Plan jest taki: Będzie jeden kontener dockerowy z zookeeperem i drugi z brokerem. Stworzę keystore, w którym będzie klucz publiczny i prywatny brokera - tego keystore'a będzie używał broker. Klucz publiczny z tego keystore'a wyeksportuję do certyfikatu serwera - będzie go używał klient, żeby móc rozmawiać z brokerem (no bo wszystko, co klient wysyła do brokera, musi przecież szyfrować kluczem publicznym brokera). Klient będzie też używał tego certyfikatu do uwierzytelnienia brokera (czyli do sprawdzenia, czy ten broker, z którym się połaczył, jest prawdziwy). Ale ponieważ ten certyfikat brokera jest samopodpisany, to żeby klient zaakceptował ten certyfikat, muszę stworzyć truststore dla klienta i do tego truststore'a zaimportować certyfikat brokera. No ale w drugą stronę z kolei podobnie: klient musi się uwierzytelnić przed brokerem, więc klient musi mieć parę kluczy (prywatny i publiczny), musi mieć certyfikat z tym kluczem publicznym, a broker musi zaakceptować ten certyfikat. Więc stworzę też drugi keystore, w którym będzie klucz prywatny i publiczny klienta kafkowego - tego keystore'a będzie używał klient. Klucz publiczny z tego keystore'a wyeksportuję do certyfikatu klienta. Ponieważ ten certyfikat klienta jest samopodpisany, to tak normalnie broker by mu nie zaufał - ale ja stworzę drugi truststore, którego będzie używał broker, i do tego truststore'a brokerowego zaimportuję certyfikat klienta. A potem jest jeszcze jedna sprawa, która była dla mnie nieoczywista: klient ładując swój certyfikat musi mu zaufać, a serwer ładując swój certyfikat też musi mu zaufać. Więc do truststore'a brokerowego zaimportuję również certyfikat brokera, a do truststore'a klienckiego zaimportuję również certfikat klienta. No więc w sumie okazuje się, że oba truststore'y zawierają te same certyfikaty. Więc pewnie w tym przypadku w praktyce wystarczyłoby mieć jeden truststore'a i żeby używał go i broker, i klient - ale ja tak nie robię, bo byłoby to mniej jasne dydaktycznie.
0. Instaluję kafkacat; jeśli trzeba, oram dockera, zwłaszcza jeśli te eksperymenty robię nie pierwszy raz:
$ sudo apt-get install kafkacat
$ docker rm -f $(docker ps -aq)
$ docker network prune
1. Uruchamiam zookeepera:
$ docker network create kafka
$ docker run -d --network=kafka --name=zookeeper -e ZOOKEEPER_CLIENT_PORT=2181 confluentinc/cp-zookeeper:latest
2. Tworzę keystore dla kafki (serwera):
$ mkdir -p kafka/secrets
$ cd kafka/secrets
$ keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA -storepass your_server_secret_password -keypass your_server_secret_password -dname "CN=localhost, OU=Your_Organization_Unit, O=Your_Organization, L=Your_City, S=Your_State, C=Your_Country" -storetype pkcs12
3. Tworzę keystore dla klienta:
$ keytool -keystore kafka.client.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA -storepass your_client_secret_password -keypass your_client_secret_password -dname "CN=localhost, OU=Your_Organization_Unit, O=Your_Organization, L=Your_City, S=Your_State, C=Your_Country" -storetype pkcs12
4. Eksportuję certyfikaty serwera i klienta:
$ keytool -export -alias localhost -file server-certificate.crt -keystore kafka.server.keystore.jks -storepass your_server_secret_password
$ keytool -export -alias localhost -file client-certificate.crt -keystore kafka.client.keystore.jks -storepass your_client_secret_password
5. Tworzę truststore dla serwera i importuję doń certyfikat klienta (aby serwer mógł zaufać klientowi) oraz swój certyfikat serwera (aby serwer ufał swojemu własnemu certyfikatowi):
$ keytool -import -noprompt -file client-certificate.crt -alias client -keystore kafka.server.truststore.jks -storepass your_server_truststore_password
$ keytool -import -noprompt -file server-certificate.crt -alias server -keystore kafka.server.truststore.jks -storepass your_server_truststore_password
6. Tworzę truststore dla klienta i importuję doń certyfikat serwera (aby klient mógł zaufać serwerowi) oraz swój certyfikat klienta (aby klient ufał swojemu własnemu certyfikatowi):
$ keytool -import -noprompt -file server-certificate.crt -alias server -keystore kafka.client.truststore.jks -storepass your_client_truststore_password
$ keytool -import -noprompt -file client-certificate.crt -alias client -keystore kafka.client.truststore.jks -storepass your_client_truststore_password
7. Uruchamiam brokera kafki:
$ echo "your_server_secret_password" > moje_haslo_serwera
$ cd ../..
$ docker run -d --network=kafka --name=broker1 -p 9093:9093 -v $(pwd)/kafka/secrets:/etc/kafka/secrets -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_BROKER_ID=1 -e KAFKA_LISTENERS=SSL://:9093 -e KAFKA_ADVERTISED_LISTENERS=SSL://localhost:9093 -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=SSL:SSL -e KAFKA_INTER_BROKER_LISTENER_NAME=SSL -e KAFKA_SSL_KEYSTORE_PASSWORD=your_server_secret_password -e KAFKA_SSL_KEYSTORE_LOCATION=/etc/kafka/ -e KAFKA_SSL_KEYSTORE_FILENAME=kafka.server.keystore.jks -e KAFKA_SSL_KEY_PASSWORD=your_server_secret_password -e KAFKA_SSL_KEY_CREDENTIALS=moje_haslo_serwera -e KAFKA_SSL_KEYSTORE_CREDENTIALS=moje_haslo_serwera -e KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM= -e KAFKA_SSL_TRUSTSTORE_LOCATION=/etc/kafka/secrets/kafka.server.truststore.jks -e KAFKA_SSL_TRUSTSTORE_PASSWORD=your_server_truststore_password confluentinc/cp-kafka:latest
$ docker logs broker1 | less
8. Wbijam się na maszynę broker1:
$ docker exec -it broker1 /bin/bash
9. Tam, na maszynie broker1, tworzę plik konfiguracyjny, którego zaraz użyję jako klient kafki:
$ cat > konfiguracja_mojego_klienta.properties
security.protocol=SSL
ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks
ssl.truststore.password=your_client_truststore_password
ssl.keystore.location=/etc/kafka/secrets/kafka.client.keystore.jks
ssl.keystore.password=your_client_secret_password
ssl.key.password=your_client_secret_password
10. Nadal będąc na maszynie broker1 tworzę topik:
$ kafka-topics --create --topic obejrzane_filmy --bootstrap-server localhost:9093 --command-config konfiguracja_mojego_klienta.properties --replication-factor 1 --partitions 1
11. Nadal będąc na maszynie broker1 oglądam informacje o topiku:
$ kafka-topics --describe --topic obejrzane_filmy --command-config konfiguracja_mojego_klienta.properties --bootstrap-server localhost:9093
12. Wychodzę z maszyny broker1, wracając na gospodarza.
$ exit
13. Na gospodarzu nadaję komunikat do kafki:
$ openssl x509 -inform der -in kafka/secrets/server-certificate.crt -out kafka/secrets/server-certificate.pem
$ echo '12345:{"userId": 12345, "film": "Nad Niemnem", "data": "2023-07-06"}' | kafkacat -P -b localhost:9093 -t obejrzane_filmy -K: -X security.protocol=SSL -X ssl.ca.location=./kafka/secrets/server-certificate.pem -X ssl.keystore.location=./kafka/secrets/kafka.client.keystore.jks -X ssl.keystore.password=your_client_secret_password -X ssl.key.password=your_client_secret_password
14. Odbieram komunikaty z kafki:
$ kafkacat -C -b localhost:9093 -t obejrzane_filmy -X security.protocol=SSL -X ssl.ca.location=./kafka/secrets/server-certificate.pem -X ssl.keystore.location=./kafka/secrets/kafka.client.keystore.jks -X ssl.keystore.password=your_client_secret_password -X ssl.key.password=your_client_secret_password
15. A teraz nadam komunikat programistycznie. Tworzę strukturę katalogów dla projektu:
$ mkdir -p nadawacz/src/main/java/com/example/kafka
16. Tworzę plik javowy:
$ cat > nadawacz/src/main/java/com/example/kafka/ProducerExample.java package com.example.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class ProducerExample { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9093"); props.put("security.protocol", "SSL"); props.put("ssl.truststore.location", "kafka/secrets/kafka.client.truststore.jks"); props.put("ssl.truststore.password", "your_client_truststore_password"); props.put("ssl.keystore.location", "kafka/secrets/kafka.client.keystore.jks"); props.put("ssl.keystore.password", "your_client_secret_password"); props.put("ssl.key.password", "your_client_secret_password"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producerproducer = new KafkaProducer<>(props); try { String key = "12346"; String value = "{\"userId\": 12346, \"film\": \"Latarnik\", \"data\": \"2023-07-26\"}"; producer.send(new ProducerRecord ("obejrzane_filmy", key, value)); } finally { producer.close(); } } }
17. Tworzę plik pom.xml:
$ cat > nadawacz/pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>kafka-java-producer</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.30</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <version>3.1.2</version> <configuration> <archive> <manifest> <mainClass>com.example.kafka.ProducerExample</mainClass> </manifest> </archive> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>3.3.0</version> <configuration> <archive> <manifest> <mainClass>com.example.kafka.ProducerExample</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
18. Buduję:
$ cd nadawacz
$ mvn clean package
19. Uruchamiam program nadający komunikat do brokera:
$ cd ..
$ java -jar nadawacz/target/kafka-java-producer-1.0-SNAPSHOT-jar-with-dependencies.jar
20. Odbieram komunikaty z kafki:
$ kafkacat -C -b localhost:9093 -t obejrzane_filmy -X security.protocol=SSL -X ssl.ca.location=./kafka/secrets/server-certificate.pem -X ssl.keystore.location=./kafka/secrets/kafka.client.keystore.jks -X ssl.keystore.password=your_client_secret_password -X ssl.key.password=your_client_secret_password