go to the english version

jak uruchomić i użyć Kafkę z SSL na bardzo prostym scenariuszu

Niedawno uczyłem się Kafki (zobacz też tu). W tym celu między innymi chciałem uruchomić sobie na komputerze brokera Kafki słuchającego po SSL, wysłać jakiś komunikat i odebrać go. Oto moje notatki, jak to robiłem.

Plan jest taki: Będzie jeden kontener dockerowy z zookeeperem i drugi z brokerem. Stworzę keystore, w którym będzie klucz publiczny i prywatny brokera - tego keystore'a będzie używał broker. Klucz publiczny z tego keystore'a wyeksportuję do certyfikatu serwera - będzie go używał klient, żeby móc rozmawiać z brokerem (no bo wszystko, co klient wysyła do brokera, musi przecież szyfrować kluczem publicznym brokera). Klient będzie też używał tego certyfikatu do uwierzytelnienia brokera (czyli do sprawdzenia, czy ten broker, z którym się połaczył, jest prawdziwy). Ale ponieważ ten certyfikat brokera jest samopodpisany, to żeby klient zaakceptował ten certyfikat, muszę stworzyć truststore dla klienta i do tego truststore'a zaimportować certyfikat brokera. No ale w drugą stronę z kolei podobnie: klient musi się uwierzytelnić przed brokerem, więc klient musi mieć parę kluczy (prywatny i publiczny), musi mieć certyfikat z tym kluczem publicznym, a broker musi zaakceptować ten certyfikat. Więc stworzę też drugi keystore, w którym będzie klucz prywatny i publiczny klienta kafkowego - tego keystore'a będzie używał klient. Klucz publiczny z tego keystore'a wyeksportuję do certyfikatu klienta. Ponieważ ten certyfikat klienta jest samopodpisany, to tak normalnie broker by mu nie zaufał - ale ja stworzę drugi truststore, którego będzie używał broker, i do tego truststore'a brokerowego zaimportuję certyfikat klienta. A potem jest jeszcze jedna sprawa, która była dla mnie nieoczywista: klient ładując swój certyfikat musi mu zaufać, a serwer ładując swój certyfikat też musi mu zaufać. Więc do truststore'a brokerowego zaimportuję również certyfikat brokera, a do truststore'a klienckiego zaimportuję również certfikat klienta. No więc w sumie okazuje się, że oba truststore'y zawierają te same certyfikaty. Więc pewnie w tym przypadku w praktyce wystarczyłoby mieć jeden truststore'a i żeby używał go i broker, i klient - ale ja tak nie robię, bo byłoby to mniej jasne dydaktycznie.

0. Instaluję kafkacat; jeśli trzeba, oram dockera, zwłaszcza jeśli te eksperymenty robię nie pierwszy raz:
$ sudo apt-get install kafkacat
$ docker rm -f $(docker ps -aq)
$ docker network prune

1. Uruchamiam zookeepera:
$ docker network create kafka
$ docker run -d --network=kafka --name=zookeeper -e ZOOKEEPER_CLIENT_PORT=2181 confluentinc/cp-zookeeper:latest

2. Tworzę keystore dla kafki (serwera):
$ mkdir -p kafka/secrets
$ cd kafka/secrets
$ keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA -storepass your_server_secret_password -keypass your_server_secret_password -dname "CN=localhost, OU=Your_Organization_Unit, O=Your_Organization, L=Your_City, S=Your_State, C=Your_Country" -storetype pkcs12

3. Tworzę keystore dla klienta:
$ keytool -keystore kafka.client.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA -storepass your_client_secret_password -keypass your_client_secret_password -dname "CN=localhost, OU=Your_Organization_Unit, O=Your_Organization, L=Your_City, S=Your_State, C=Your_Country" -storetype pkcs12

4. Eksportuję certyfikaty serwera i klienta:
$ keytool -export -alias localhost -file server-certificate.crt -keystore kafka.server.keystore.jks -storepass your_server_secret_password
$ keytool -export -alias localhost -file client-certificate.crt -keystore kafka.client.keystore.jks -storepass your_client_secret_password

5. Tworzę truststore dla serwera i importuję doń certyfikat klienta (aby serwer mógł zaufać klientowi) oraz swój certyfikat serwera (aby serwer ufał swojemu własnemu certyfikatowi):
$ keytool -import -noprompt -file client-certificate.crt -alias client -keystore kafka.server.truststore.jks -storepass your_server_truststore_password
$ keytool -import -noprompt -file server-certificate.crt -alias server -keystore kafka.server.truststore.jks -storepass your_server_truststore_password

6. Tworzę truststore dla klienta i importuję doń certyfikat serwera (aby klient mógł zaufać serwerowi) oraz swój certyfikat klienta (aby klient ufał swojemu własnemu certyfikatowi):
$ keytool -import -noprompt -file server-certificate.crt -alias server -keystore kafka.client.truststore.jks -storepass your_client_truststore_password
$ keytool -import -noprompt -file client-certificate.crt -alias client -keystore kafka.client.truststore.jks -storepass your_client_truststore_password

7. Uruchamiam brokera kafki:
$ echo "your_server_secret_password" > moje_haslo_serwera
$ cd ../..
$ docker run -d --network=kafka --name=broker1 -p 9093:9093 -v $(pwd)/kafka/secrets:/etc/kafka/secrets -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_BROKER_ID=1 -e KAFKA_LISTENERS=SSL://:9093 -e KAFKA_ADVERTISED_LISTENERS=SSL://localhost:9093 -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=SSL:SSL -e KAFKA_INTER_BROKER_LISTENER_NAME=SSL -e KAFKA_SSL_KEYSTORE_PASSWORD=your_server_secret_password -e KAFKA_SSL_KEYSTORE_LOCATION=/etc/kafka/ -e KAFKA_SSL_KEYSTORE_FILENAME=kafka.server.keystore.jks -e KAFKA_SSL_KEY_PASSWORD=your_server_secret_password -e KAFKA_SSL_KEY_CREDENTIALS=moje_haslo_serwera -e KAFKA_SSL_KEYSTORE_CREDENTIALS=moje_haslo_serwera -e KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM= -e KAFKA_SSL_TRUSTSTORE_LOCATION=/etc/kafka/secrets/kafka.server.truststore.jks -e KAFKA_SSL_TRUSTSTORE_PASSWORD=your_server_truststore_password confluentinc/cp-kafka:latest
$ docker logs broker1 | less

8. Wbijam się na maszynę broker1:
$ docker exec -it broker1 /bin/bash

9. Tam, na maszynie broker1, tworzę plik konfiguracyjny, którego zaraz użyję jako klient kafki:
$ cat > konfiguracja_mojego_klienta.properties
security.protocol=SSL
ssl.truststore.location=/etc/kafka/secrets/kafka.client.truststore.jks
ssl.truststore.password=your_client_truststore_password
ssl.keystore.location=/etc/kafka/secrets/kafka.client.keystore.jks
ssl.keystore.password=your_client_secret_password
ssl.key.password=your_client_secret_password

10. Nadal będąc na maszynie broker1 tworzę topik:
$ kafka-topics --create --topic obejrzane_filmy --bootstrap-server localhost:9093 --command-config konfiguracja_mojego_klienta.properties --replication-factor 1 --partitions 1

11. Nadal będąc na maszynie broker1 oglądam informacje o topiku:
$ kafka-topics --describe --topic obejrzane_filmy --command-config konfiguracja_mojego_klienta.properties --bootstrap-server localhost:9093

12. Wychodzę z maszyny broker1, wracając na gospodarza.
$ exit

13. Na gospodarzu nadaję komunikat do kafki:
$ openssl x509 -inform der -in kafka/secrets/server-certificate.crt -out kafka/secrets/server-certificate.pem
$ echo '12345:{"userId": 12345, "film": "Nad Niemnem", "data": "2023-07-06"}' | kafkacat -P -b localhost:9093 -t obejrzane_filmy -K: -X security.protocol=SSL -X ssl.ca.location=./kafka/secrets/server-certificate.pem -X ssl.keystore.location=./kafka/secrets/kafka.client.keystore.jks -X ssl.keystore.password=your_client_secret_password -X ssl.key.password=your_client_secret_password

14. Odbieram komunikaty z kafki:
$ kafkacat -C -b localhost:9093 -t obejrzane_filmy -X security.protocol=SSL -X ssl.ca.location=./kafka/secrets/server-certificate.pem -X ssl.keystore.location=./kafka/secrets/kafka.client.keystore.jks -X ssl.keystore.password=your_client_secret_password -X ssl.key.password=your_client_secret_password

15. A teraz nadam komunikat programistycznie. Tworzę strukturę katalogów dla projektu:
$ mkdir -p nadawacz/src/main/java/com/example/kafka

16. Tworzę plik javowy:

$ cat > nadawacz/src/main/java/com/example/kafka/ProducerExample.java
package com.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9093");
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "kafka/secrets/kafka.client.truststore.jks");
        props.put("ssl.truststore.password", "your_client_truststore_password");
        props.put("ssl.keystore.location", "kafka/secrets/kafka.client.keystore.jks");
        props.put("ssl.keystore.password", "your_client_secret_password");
        props.put("ssl.key.password", "your_client_secret_password");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer producer = new KafkaProducer<>(props);
        try {
            String key = "12346";
            String value = "{\"userId\": 12346, \"film\": \"Latarnik\", \"data\": \"2023-07-26\"}";
            producer.send(new ProducerRecord("obejrzane_filmy", key, value));
        } finally {
            producer.close();
        }
    }
}

17. Tworzę plik pom.xml:

$ cat > nadawacz/pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>kafka-java-producer</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.30</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.1.2</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.example.kafka.ProducerExample</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.example.kafka.ProducerExample</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

18. Buduję:
$ cd nadawacz
$ mvn clean package

19. Uruchamiam program nadający komunikat do brokera:
$ cd ..
$ java -jar nadawacz/target/kafka-java-producer-1.0-SNAPSHOT-jar-with-dependencies.jar

20. Odbieram komunikaty z kafki:
$ kafkacat -C -b localhost:9093 -t obejrzane_filmy -X security.protocol=SSL -X ssl.ca.location=./kafka/secrets/server-certificate.pem -X ssl.keystore.location=./kafka/secrets/kafka.client.keystore.jks -X ssl.keystore.password=your_client_secret_password -X ssl.key.password=your_client_secret_password