Apache Kafka: セットアップと実行のためのステップバイステップ ガイド

Apache Kafka: セットアップと実行のためのステップバイステップ ガイド

今日のコンピューティング システムでは、毎日何百万ものデータ レコードが生成されています。これらには、金融取引、注文、または車のセンサーからのデータが含まれます。これらのデータ ストリーミング イベントをリアルタイムで処理し、異なるエンタープライズ システム間でイベント レコードを確実に移動するには、 Apache Kafkaが必要です。

Apache Kafka は、1 秒あたり 100 万件を超えるレコードを処理するオープンソースのデータ ストリーミング ソリューションです。この高スループットに加えて、Apache Kafka は、高いスケーラビリティと可用性、低遅延、永続的なストレージを提供します。

LinkedIn、Uber、Netflix などの企業は、リアルタイム処理とデータ ストリーミングに Apache Kafka を利用しています。 Apache Kafka を使い始める最も簡単な方法は、ローカル マシン上で Apache Kafka を起動して実行することです。これにより、Apache Kafka サーバーの動作を確認できるだけでなく、メッセージを生成および消費することもできます。

ローカルマシンで作業している人
ローカルマシンで作業している人

Kafka クライアントを使用したサーバーの起動、トピックの作成、Java コードの作成に関する実践的な経験があれば、Apache Kafka を使用してデータ パイプラインのすべてのニーズを満たす準備が整います。

Apache Kafka をローカル マシンにダウンロードする方法

Apache Kafka の最新バージョンは、 公式リンクからダウンロードできます。ダウンロードされたコンテンツは.tgz形式で圧縮されます。ダウンロードしたら、同じものを解凍する必要があります。

Linux を使用している場合は、ターミナルを開きます。次に、Apache Kafka 圧縮バージョンをダウンロードした場所に移動します。次のコマンドを実行します。

 tar -xzvf kafka_2.13-3.5.0.tgz

コマンドが完了すると、 kafka_2.13-3.5.0という新しいディレクトリが作成されることがわかります。以下を使用してフォルダー内に移動します。

 cd kafka_2.13-3.5.0

lsコマンドを使用して、このディレクトリの内容を一覧表示できるようになりました。

Windows ユーザーの場合も、同じ手順に従うことができます。 tarコマンドが見つからない場合は、WinZip などのサードパーティ ツールを使用してアーカイブを開くことができます。

ローカルマシンでApache Kafkaを起動する方法

Apache Kafka をダウンロードして解凍したら、実行を開始します。インストーラーはありません。コマンドラインまたはターミナルウィンドウから直接使用を開始できます。

Apache Kafka を始める前に、システムに Java 8 以降がインストールされていることを確認してください。 Apache Kafka には Java インストールを実行する必要があります。

#1. Apache Zookeeper サーバーを実行する

最初のステップは、Apache Zookeeper を実行することです。アーカイブの一部として事前にダウンロードできます。これは、構成を維持し、他のサービスとの同期を提供するサービスです。

アーカイブの内容を抽出したディレクトリに入ったら、次のコマンドを実行します。

Linux ユーザーの場合:

 bin/zookeeper-server-start.sh config/zookeeper.properties

Windows ユーザーの場合:

 bin/windows/zookeeper-server-start.bat config/zookeeper.properties 
Zookeeper の起動を示すターミナル ウィンドウ
Zookeeper の起動を示すターミナル ウィンドウ

zookeeper.propertiesファイルは、Apache Zookeeper サーバーを実行するための構成を提供します。データが保存されるローカル ディレクトリやサーバーが実行されるポートなどのプロパティを構成できます。

#2. Apache Kafkaサーバーを起動します

Apache Zookeeper サーバーが起動されたので、次は Apache Kafka サーバーを起動します。

新しいターミナルまたはコマンド プロンプト ウィンドウを開き、抽出されたファイルが存在するディレクトリに移動します。次に、以下のコマンドを使用して Apache Kafka サーバーを起動できます。

Linux ユーザーの場合:

 bin/kafka-server-start.sh config/server.properties

Windows ユーザーの場合:

 bin/windows/kafka-server-start.bat config/server.properties

Apache Kafka サーバーが実行されています。デフォルトの構成を変更したい場合は、 server.propertiesファイルを変更することで変更できます。さまざまな値が公式ドキュメントに記載されています。

ローカルマシンでApache Kafkaを使用する方法

これで、ローカル マシンで Apache Kafka を使用してメッセージを生成および消費する準備が整いました。 Apache Zookeeper サーバーと Apache Kafka サーバーが起動して実行されているので、最初のトピックを作成し、最初のメッセージを生成し、それを使用する方法を見てみましょう。

Apache Kafka でトピックを作成する手順は何ですか?

最初のトピックを作成する前に、トピックが実際に何であるかを理解しましょう。 Apache Kafka では、トピックはデータ ストリーミングに役立つ論理データ ストアです。これは、データが 1 つのコンポーネントから別のコンポーネントに転送されるチャネルと考えてください。

トピックはマルチプロデューサーとマルチコンシューマーをサポートしており、複数のシステムがトピックの書き込みと読み取りを行うことができます。他のメッセージング システムとは異なり、トピックからのメッセージは複数回利用できます。さらに、メッセージの保存期間についても言及できます。

銀行取引用のデータを生成するシステム (プロデューサー) の例を考えてみましょう。そして、別のシステム (消費者) がこのデータを消費し、アプリ通知をユーザーに送信します。これを容易にするために、トピックが必要です。

新しいターミナルまたはコマンド プロンプト ウィンドウを開き、アーカイブを抽出したディレクトリに移動します。次のコマンドは、 transactionsというトピックを作成します。

Linux ユーザーの場合:

 bin/kafka-topics.sh --create --topic transactions --bootstrap-server localhost:9092

Windows ユーザーの場合:

 bin/windows/kafka-topics.bat --create --topic transactions --bootstrap-server localhost:9092 
Kafka トピックを作成するための成功したコマンドを示すターミナル ウィンドウ
Kafka トピックを作成するための成功したコマンドを示すターミナル ウィンドウ

これで最初のトピックが作成され、メッセージの生成と使用を開始する準備が整いました。

Apache Kafka へのメッセージを生成するにはどうすればよいですか?

Apache Kafka トピックの準備ができたら、最初のメッセージを作成できます。新しいターミナルまたはコマンド プロンプト ウィンドウを開くか、トピックの作成に使用したものと同じウィンドウを使用します。次に、アーカイブの内容を抽出した適切なディレクトリにいることを確認します。コマンド ラインを使用して、次のコマンドを使用してトピックに関するメッセージを作成できます。

Linux ユーザーの場合:

 bin/kafka-console-producer.sh --topic transactions --bootstrap-server localhost:9092

Windows ユーザーの場合:

 bin/windows/kafka-console-producer.bat --topic transactions --bootstrap-server localhost:9092

コマンドを実行すると、ターミナルまたはコマンド プロンプト ウィンドウが入力を待機していることがわかります。最初のメッセージを書いて Enter キーを押します。

 > This is a transactional record for $100 
Kafka メッセージを生成するコマンドを表示するターミナル ウィンドウ
Kafka メッセージを生成するコマンドを表示するターミナル ウィンドウ

ローカル マシン上で Apache Kafka への最初のメッセージが作成されました。これで、このメッセージを使用する準備が整いました。

Apache Kafka からのメッセージを消費するにはどうすればよいですか?

トピックが作成され、Kafka トピックへのメッセージが生成されていれば、そのメッセージを使用できるようになります。

Apache Kafka を使用すると、複数のコンシューマを同じトピックにアタッチできます。各コンシューマは、論理識別子であるコンシューマ グループの一部になることができます。たとえば、同じデータを使用する必要がある 2 つのサービスがある場合、それらのサービスは異なるコンシューマ グループを持つことができます。

ただし、同じサービスのインスタンスが 2 つある場合は、同じメッセージを 2 回消費して処理することは避けたいでしょう。その場合、両方とも同じ消費者グループを持つことになります。

ターミナルまたはコマンド プロンプト ウィンドウで、適切なディレクトリにいることを確認します。次のコマンドを使用してコンシューマを起動します。

Linux ユーザーの場合:

 bin/kafka-console-consumer.sh --topic transactions --from-beginning --bootstrap-server localhost:9092 --group notif-consumer

Windows ユーザーの場合:

 bin/windows/kafka-console-consumer.bat --topic transactions --from-beginning --bootstrap-server localhost:9092 --group notif-consumer 
Kafka コンシューマーが実行中であることを示すターミナル ウィンドウ
Kafka コンシューマーが実行中であることを示すターミナル ウィンドウ

以前に作成したメッセージが端末に表示されるのがわかります。これで、Apache Kafka を使用して最初のメッセージが使用されました。

kafka-console-consumerコマンドには多くの引数が渡されます。それぞれの意味を見てみましょう。

  • --topicは、使用するトピックについて言及します。
  • --from-beginning存在する最初のメッセージからすぐにメッセージの読み取りを開始するようにコンソール コンシューマに指示します。
  • Apache Kafka サーバーは--bootstrap-serverオプションを介して言及されます
  • さらに、 --groupパラメータを渡すことでコンシューマ グループを指定できます。
  • コンシューマ グループ パラメータがない場合は自動生成されます。

コンソール コンシューマを実行している状態で、新しいメッセージの生成を試すことができます。それらがすべて消費され、ターミナルに表示されることがわかります。

トピックを作成し、メッセージの生成と消費が正常に完了したので、これを Java アプリケーションと統合しましょう。

Java を使用して Apache Kafka プロデューサーとコンシューマーを作成する方法

始める前に、ローカル マシンに Java 8 以降がインストールされていることを確認してください。 Apache Kafka は、シームレスな接続を可能にする独自のクライアント ライブラリを提供します。 Maven を使用して依存関係を管理している場合は、次の依存関係をpom.xmlに追加します。

 <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.0</version>
</dependency>

Maven リポジトリからライブラリをダウンロードして、Java クラスパスに追加することもできます。

ライブラリを配置したら、任意のコード エディターを開きます。 Java を使用してプロデューサーとコンシューマーを起動する方法を見てみましょう。

Apache Kafka Java プロデューサーを作成する

kafka-clientsライブラリを配置したら、Kafka プロデューサーの作成を開始する準備が整いました。

SimpleProducer.javaというクラスを作成しましょう。これは、以前に作成したトピックに関するメッセージを生成する役割を果たします。このクラス内で、 org.apache.kafka.clients.producer.KafkaProducerのインスタンスを作成します。その後、このプロデューサーを使用してメッセージを送信します。

Kafka プロデューサを作成するには、Apache Kafka サーバーのホストとポートが必要です。ローカルマシン上で実行しているため、ホストはlocalhostになります。サーバーの起動時にデフォルトのプロパティを変更していない場合、ポートは9092になります。プロデューサの作成に役立つ以下のコードを検討してください。

 package org.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SimpleProducer {

    private final KafkaProducer<String, String> producer;

    public SimpleProducer(String host, String port) {
        String server = host + ":" + port;
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        this.producer = new KafkaProducer<>(properties);
    }
}

3 つのプロパティが設定されていることがわかります。それぞれについて簡単に説明しましょう。

  • BOOTSTRAP_SERVERS_CONFIG を使用すると、Apache Kafka サーバーが実行されている場所を定義できます
  • KEY_SERIALIZER_CLASS_CONFIG は、メッセージ キーの送信に使用する形式をプロデューサーに指示します。
  • 実際のメッセージを送信する形式は、VALUE_SERIALIZER_CLASS_CONFIG プロパティを使用して定義されます。

テキスト メッセージを送信するため、両方のプロパティはStringSerializer.classを使用するように設定されています。

実際にトピックにメッセージを送信するには、 ProducerRecordを受け取るproducer.send()メソッドを使用する必要があります。次のコードは、トピックにメッセージを送信し、メッセージ オフセットとともに応答を出力するメソッドを提供します。

 public void produce(String topic, String message) throws ExecutionException, InterruptedException {
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
    final Future<RecordMetadata> send = this.producer.send(record);
    final RecordMetadata recordMetadata = send.get();
    System.out.println(recordMetadata);
}

コード全体を配置したら、トピックにメッセージを送信できるようになります。以下のコードに示すように、 mainメソッドを使用してこれをテストできます。

 package org.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SimpleProducer {

    private final KafkaProducer<String, String> producer;

    public SimpleProducer(String host, String port) {
        String server = host + ":" + port;
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        this.producer = new KafkaProducer<>(properties);
    }

    public void produce(String topic, String message) throws ExecutionException, InterruptedException {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
        final Future<RecordMetadata> send = this.producer.send(record);
        final RecordMetadata recordMetadata = send.get();
        System.out.println(recordMetadata);
    }

    public static void main(String[] args) throws Exception{
       SimpleProducer producer = new SimpleProducer("localhost", "9092");
       producer.produce("transactions", "This is a transactional record of $200");
    }
}

このコードでは、ローカル マシン上の Apache Kafka サーバーに接続するSimpleProducerを作成しています。内部でKafkaProducerを使用して、トピックに関するテキスト メッセージを生成します。

Kafka プロデューサーのコードと出力を示す Java IDE のスクリーン キャプチャ
Kafka プロデューサーのコードと出力を示す Java IDE のスクリーン キャプチャ

Apache Kafka Java コンシューマを作成する

Java クライアントを使用して Apache Kafka コンシューマーを作成します。 SimpleConsumer.javaというクラスを作成します。次に、このクラスのコンストラクターを作成し、 org.apache.kafka.clients.consumer.KafkaConsumerを初期化します。コンシューマーを作成するには、Apache Kafka サーバーが実行されるホストとポートが必要です。さらに、コンシューマ グループとコンシューム元のトピックも必要です。以下に示すコード スニペットを使用します。

 package org.example.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

public class SimpleConsumer {

    private static final String OFFSET_RESET = "earliest";

    private final KafkaConsumer<String, String> consumer;
    private boolean keepConsuming = true;

    public SimpleConsumer(String host, String port, String consumerGroupId, String topic) {
        String server = host + ":" + port;
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(properties);
        this.consumer.subscribe(List.of(topic));
    }
}

Kafka プロデューサーと同様に、Kafka コンシューマーも Properties オブジェクトを受け取ります。さまざまなプロパティ セットをすべて見てみましょう。

  • BOOTSTRAP_SERVERS_CONFIG は、Apache Kafka サーバーがどこで実行されているかをコンシューマに通知します
  • コンシューマ グループは、GROUP_ID_CONFIG を使用して指定されます。
  • コンシューマーが消費を開始するとき、AUTO_OFFSET_RESET_CONFIG を使用すると、どのくらい前からメッセージの消費を開始するかを指定できます。
  • KEY_DESERIALIZER_CLASS_CONFIG は、コンシューマにメッセージ キーのタイプを伝えます。
  • VALUE_DESERIALIZER_CLASS_CONFIG は、実際のメッセージのコンシューマ タイプを示します。

あなたの場合、テキストメッセージを使用することになるため、デシリアライザーのプロパティはStringDeserializer.classに設定されます。

これで、トピックからのメッセージを使用できるようになります。物事を単純にするために、メッセージが消費されたら、メッセージをコンソールに出力します。以下のコードを使用してこれを実現する方法を見てみましょう。

 private boolean keepConsuming = true;

public void consume() {
    while (keepConsuming) {
        final ConsumerRecords<String, String> consumerRecords = this.consumer.poll(Duration.ofMillis(100L));
        if (consumerRecords != null && !consumerRecords.isEmpty()) {
            consumerRecords.iterator().forEachRemaining(consumerRecord -> {
                System.out.println(consumerRecord.value());
            });
        }
    }
}

このコードはトピックのポーリングを継続します。 Consumer Record を受信すると、メッセージが出力されます。 main メソッドを使用して、コンシューマの動作をテストします。トピックを消費し、メッセージを出力し続ける Java アプリケーションを起動します。 Java アプリケーションを停止してコンシューマを終了します。

 package org.example.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

public class SimpleConsumer {

    private static final String OFFSET_RESET = "earliest";

    private final KafkaConsumer<String, String> consumer;
    private boolean keepConsuming = true;

    public SimpleConsumer(String host, String port, String consumerGroupId, String topic) {
        String server = host + ":" + port;
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(properties);
        this.consumer.subscribe(List.of(topic));
    }

    public void consume() {
        while (keepConsuming) {
            final ConsumerRecords<String, String> consumerRecords = this.consumer.poll(Duration.ofMillis(100L));
            if (consumerRecords != null && !consumerRecords.isEmpty()) {
                consumerRecords.iterator().forEachRemaining(consumerRecord -> {
                    System.out.println(consumerRecord.value());
                });
            }
        }
    }

    public static void main(String[] args) {
        SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", "9092", "transactions-consumer", "transactions");
        simpleConsumer.consume();
    }
}

コードを実行すると、Java プロデューサによって生成されたメッセージだけでなく、コンソール プロデューサ経由で生成したメッセージも消費されることがわかります。これは、 AUTO_OFFSET_RESET_CONFIGプロパティがearliestに設定されているためです。

Kafka Consumer のコードと出力を示す Java IDE のスクリーン キャプチャ
Kafka Consumer のコードと出力を示す Java IDE のスクリーン キャプチャ

SimpleConsumer を実行すると、コンソール プロデューサまたは SimpleProducer Java アプリケーションを使用して、トピックへのさらなるメッセージを生成できます。それらが消費され、コンソールに表示されることがわかります。

Apache Kafka でデータ パイプラインのすべてのニーズを満たします

Apache Kafka を使用すると、すべてのデータ パイプライン要件を簡単に処理できます。ローカル マシンに Apache Kafka をセットアップすると、Kafka が提供するさまざまな機能をすべて探索できます。さらに、公式 Java クライアントを使用すると、Apache Kafka サーバーの書き込み、接続、通信を効率的に行うことができます。

Apache Kafka は、多用途でスケーラブルで高性能なデータ ストリーミング システムであるため、まさにゲームチェンジャーとなります。これをローカル開発に使用したり、運用システムに統合したりすることもできます。ローカルでのセットアップが簡単であるのと同じように、大規模なアプリケーション向けに Apache Kafka を設定することも大きな作業ではありません。

データ ストリーミング プラットフォームをお探しの場合は、リアルタイム分析と処理に最適なストリーミング データ プラットフォームをご覧ください。

「 Apache Kafka: セットアップと実行のためのステップバイステップ ガイド」についてわかりやすく解説!絶対に観るべきベスト2動画

Complete Guide to Apache Kafka Cluster Setup & Administration for Beginners | Kafka Tutorial
Step by step installing Apache Kafka