GCP

GCP Cloud Pub/Subを使ってみた(4)

今回も引き続き、Cloud Pub/Subについて述べる。今回は、2台の仮想マシン上でメッセージの送受信が行えるようなプログラムの内容について述べる。

前提条件

GCP(Google Cloud Platform)のアカウントが有り、下記記事の手順に従って、GCP上でDebian GNU/Linux搭載済の2台の仮想マシンを作成済であること。

GCP(Google Cloud Platform)でLinux搭載済の仮想マシンを作成してみたGCP(Google Cloud Platform)上でLinux搭載済の仮想マシンを作成し、GCS(Google Cloud Stor...

また、以下の記事での環境構築を実施済であること。

GCSとBigQueryで連動するプログラムを作成してみた(2)今回も引き続き、GCSとBigQueryで連動するプログラムについて述べる。ここでは、Windows 10が入っているローカル環境にて、...

さらに、以下の記事の「JDK1.8のインストール」「Mavenのインストール」が実施済であること。なお、MavenによるJavaプログラムの作成・実行手順も、上記記事を参照のこと。

GCSとBigQueryで連動するプログラムを作成してみた(2)今回も引き続き、GCSとBigQueryで連動するプログラムについて述べる。ここでは、Windows 10が入っているローカル環境にて、...

その他、下記記事のトピックとサブスクリプションの作成が完了していること。

GCP Cloud Pub/Subを使ってみた(2)今回も引き続き、Cloud Pub/Subについて述べる。今回は、GCP(Google Cloud Platform)のPub/Subを...

やってみたこと

  1. 2台の仮想マシンを起動
  2. サーバー側でのJavaプログラムの作成
  3. クライアント側でのJavaプログラムの作成
  4. 実行確認

 

2台の仮想マシンを起動

GCP上でDebian GNU/Linux搭載済の2台の仮想マシンを起動する。起動後の仮想マシンは以下の通り。
起動後の仮想マシン
なお、上記仮想マシンのうち、今後は「test-linux-vm-2」をサーバー側とし、「test-linux-vm」をクライアント側とする。

サーバー側でのJavaプログラムの作成

サーバー側の仮想マシン「test-linux-vm-2」にログイン後、実施した内容は以下の通り。

1) mavenコマンドを利用して、use-pubsub-serverというMavenプロジェクトを作成
mavenプロジェクト作成_サーバー

2) pom.xmlを修正し、TestPubSubServer.javaというプログラムを追加
ソース修正後のフォルダ構成は以下の通り
サーバー側ソースコード構成

修正後のpom.xmlは以下の通り

<project xmlns="http://maven.apache.org/POM/4.0.0" 
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
                         http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>test.pubsub</groupId>
  <artifactId>use-pubsub-server</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>use-pubsub-server</name>
  <url>http://maven.apache.org</url>

  <!-- 文字コードとJavaのバージョンの設定 -->
  <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <java.version>1.8</java.version>
  </properties>

  <!-- プラグインの設定 -->
  <build>
      <plugins>
          <!-- Javaファイルのコンパイラの設定 -->
          <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-compiler-plugin</artifactId>
              <version>3.6.1</version>
              <configuration>
                  <source>${java.version}</source>
                  <target>${java.version}</target>
              </configuration>
          </plugin>
          <!-- mavenプロジェクトのテスト時のエラー解消のための設定 -->
          <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-surefire-plugin</artifactId>
              <version>3.0.0-M3</version>
              <configuration>
                  <useSystemClassLoader>false</useSystemClassLoader>
              </configuration>
          </plugin>
          <!-- プロジェクトと依存するライブラリを1つにまとめる設定 -->
          <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- メインプログラムとして実行するクラスの指定 -->
                            <mainClass>test.TestPubSubServer</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
          </plugin>
      </plugins>
  </build>

  <!-- ライブラリ依存関係の設定 -->
  <dependencies>
      <dependency>
          <!-- Junitの設定 -->
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>3.8.1</version>
          <scope>test</scope>
      </dependency>
      <dependency>
          <!-- PubSub用ライブラリを追加する設定 -->
          <groupId>com.google.cloud</groupId>
          <artifactId>google-cloud-pubsub</artifactId>
          <version>1.49.0</version>
      </dependency>
  </dependencies>

</project>

作成したTestPubSubServer.javaは以下の通り

package test;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;

public class TestPubSubServer {

        private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();
        private static final String TOPIC_ID = "my-topic";
        private static final String SUBSCRIPTION_ID = "my-sub";
        private static final int MSG_CNT = 5;
        private static final BlockingQueue<PubsubMessage> messages 
                                          = new LinkedBlockingDeque<>();

        public static void main(String[] args) {

                try {
                        // 送信されたメッセージを受信する
                        doPull();

                } catch (Exception e) {
                        System.out.println(e);
                }

        }

        private static class MessageReceiverTest implements MessageReceiver {

                @Override
                public void receiveMessage(PubsubMessage message
                                         , AckReplyConsumer consumer) {
                        // メッセージを受信しACK(肯定応答)を返す
                        messages.offer(message);
                        consumer.ack();
                }

        }

        private static void doPull() throws InterruptedException {

                System.out.println("**** started doPull. ****");

                // 送信されたメッセージを取得するSubscriberを生成
                ProjectSubscriptionName subscriptionName 
                    = ProjectSubscriptionName.of(PROJECT_ID, SUBSCRIPTION_ID);
                Subscriber subscriber = Subscriber.newBuilder(
                      subscriptionName, new MessageReceiverTest()).build();

                // メッセージの受信を開始
                subscriber.startAsync().awaitRunning();
                int cnt = 0;
                while (true) {
                        // 受信したメッセージを出力
                        PubsubMessage message = messages.take();
                        System.out.println("Received Data: " 
                            + message.getData().toStringUtf8());
                        cnt++;

                        // 全てのメッセージを受信できたら終了する
                        if (cnt == MSG_CNT) {
                                break;
                        }
                }

                // Subscriberを終了
                if (subscriber != null) {
                        subscriber.stopAsync();
                }

                System.out.println("**** ended doPull. ****");
       }

}

なお、前回の「GCP Cloud Pub/Subを使ってみた(3)」で記載したTestPubSub.javaのうち、送信したメッセージを受信する部分のみ記載する形となる。

3) 「mvn compile」によりMavenプロジェクトをコンパイルした上で、「mvn package」によりJarファイルの作成を実行。実行後のフォルダ構成は以下の通り。
jarファイル作成_サーバー側

クライアント側でのJavaプログラムの作成

サーバー側の仮想マシンにログイン後、実施した内容は以下の通り。

1) mavenコマンドを利用して、use-pubsub-clientというMavenプロジェクトを作成
mavenプロジェクト作成_クライアント

2) pom.xmlを修正し、TestPubSubClient.javaというプログラムを追加
ソース修正後のフォルダ構成は以下の通り
クライアント側ソースコード構成

修正後のpom.xmlは以下の通り

<project xmlns="http://maven.apache.org/POM/4.0.0" 
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
                          http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>test.pubsub</groupId>
  <artifactId>use-pubsub-client</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>use-pubsub-client</name>
  <url>http://maven.apache.org</url>

  <!-- 文字コードとJavaのバージョンの設定 -->
  <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <java.version>1.8</java.version>
  </properties>

  <!-- プラグインの設定 -->
  <build>
      <plugins>
          <!-- Javaファイルのコンパイラの設定 -->
          <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-compiler-plugin</artifactId>
              <version>3.6.1</version>
              <configuration>
                  <source>${java.version}</source>
                  <target>${java.version}</target>
              </configuration>
          </plugin>
          <!-- mavenプロジェクトのテスト時のエラー解消のための設定 -->
          <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-surefire-plugin</artifactId>
              <version>3.0.0-M3</version>
              <configuration>
                  <useSystemClassLoader>false</useSystemClassLoader>
              </configuration>
          </plugin>
          <!-- プロジェクトと依存するライブラリを1つにまとめる設定 -->
          <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- メインプログラムとして実行するクラスの指定 -->
                            <mainClass>test.TestPubSubClient</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
          </plugin>
      </plugins>
  </build>

  <!-- ライブラリ依存関係の設定 -->
  <dependencies>
      <dependency>
          <!-- Junitの設定 -->
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>3.8.1</version>
          <scope>test</scope>
      </dependency>
      <dependency>
          <!-- PubSub用ライブラリを追加する設定 -->
          <groupId>com.google.cloud</groupId>
          <artifactId>google-cloud-pubsub</artifactId>
          <version>1.49.0</version>
      </dependency>
  </dependencies>

</project>

作成したTestPubSubClient.javaは以下の通り

package test;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;

public class TestPubSubClient {

        private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();
        private static final String TOPIC_ID = "my-topic";
        private static final int MSG_CNT = 5;
        private static final BlockingQueue<PubsubMessage> messages 
                                           = new LinkedBlockingDeque<>();

        public static void main(String[] args) {

               try {
                        // メッセージを送信する
                        doPublish();
			
                } catch (Exception e) {
                        System.out.println(e);
                }

        }

        private static void doPublish() throws Exception {

                System.out.println("**** started doPublish. ****");

                // トピック名を指定しメッセージ送信のためのPublisherを生成
               Publisher publisher = Publisher.newBuilder(
                            ProjectTopicName.of(PROJECT_ID, TOPIC_ID)).build();

                // メッセージをPublisherに設定し送信(MSG_CNT数分繰り返す)
               for (int i = 0; i < MSG_CNT; i++) {
                        String message = "Hello PubSub " + i + " !!";
                        ByteString data = ByteString.copyFromUtf8(message);
                        PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
                                                         .setData(data).build();
                        publisher.publish(pubsubMessage);

                        // 送信したメッセージを出力
                        System.out.println("Sended Data : " + message);
                }

                // Publisherを終了
                if (publisher != null) {
                         publisher.shutdown();
                }

                System.out.println("**** ended doPublish. ****");
                System.out.println();

       }

}

なお、前回の「GCP Cloud Pub/Subを使ってみた(3)」で記載したTestPubSub.javaのうち、メッセージを送信する部分のみ記載する形となる。

3) 「mvn compile」によりMavenプロジェクトをコンパイルした上で、「mvn package」によりJarファイルの作成を実行。実行後のフォルダ構成は以下の通り。
jarファイル作成_クライアント側

実行確認

2台のマシン間での、Pub/Subによるメッセージ送受信を確認した結果は以下の通り。

1) サーバー側(test-linux-vm-2)のプログラムを起動
実行結果1

2) クライアント側(test-linux-vm)のプログラムを起動
実行結果2

3) サーバー側(test-linux-vm-2)で、クライアント側から送信されたメッセージが受信できることを確認
実行結果3