GCP

GCP Cloud Pub/Subを使ってみた(3)

今回も引き続き、Cloud Pub/Subについて述べる。今回は、1台の仮想マシン上でメッセージの送受信が行えるようなプログラムの内容について述べる。

前提条件

GCP(Google Cloud Platform)のアカウントが有り、下記記事の手順に従って、GCP上でDebian GNU/Linux搭載済の仮想マシンを作成済であること。

GCP(Google Cloud Platform)でLinux搭載済の仮想マシンを作成してみたGCP(Google Cloud Platform)上でLinux搭載済の仮想マシンを作成し、GCS(Google Cloud Stor...

また、以下の記事での環境構築を実施済であること。

GCSとBigQueryで連動するプログラムを作成してみた(2)今回も引き続き、GCSとBigQueryで連動するプログラムについて述べる。ここでは、Windows 10が入っているローカル環境にて、...

さらに、以下の記事の「JDK1.8のインストール」「Mavenのインストール」が実施済であること。なお、MavenによるJavaプログラムの作成・実行手順も、上記記事を参照のこと。

GCSとBigQueryで連動するプログラムを作成してみた(2)今回も引き続き、GCSとBigQueryで連動するプログラムについて述べる。ここでは、Windows 10が入っているローカル環境にて、...

その他、下記記事のトピックとサブスクリプションの作成が完了していること。

GCP Cloud Pub/Subを使ってみた(2)今回も引き続き、Cloud Pub/Subについて述べる。今回は、GCP(Google Cloud Platform)のPub/Subを...

やってみたこと

  1. Mavenプロジェクトの作成
  2. ソースコードの作成
  3. MavenプロジェクトのコンパイルとJarファイルの作成
  4. javaコマンドによるJarファイルの実行

 

Mavenプロジェクトの作成

GCP(Google Cloud Platform)上でDebian GNU/Linux搭載済の仮想マシンを起動し、ログインした状態で、以下のように、mavenコマンドを利用して、use-pubsubというMavenプロジェクトを作成する。
mavenプロジェクトの作成

ソースコードの作成

pom.xmlを修正し、TestPubSub.javaというプログラムを追加する。ソース修正後のフォルダ構成は以下の通り。
ソースコードの作成

修正後のpom.xmlは以下の通り。

<project xmlns="http://maven.apache.org/POM/4.0.0" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
        http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>test.pubsub</groupId>
  <artifactId>use-pubsub</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>use-pubsub</name>
  <url>http://maven.apache.org</url>

  <!-- 文字コードとJavaのバージョンの設定 -->
  <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <java.version>1.8</java.version>
  </properties>

  <!-- プラグインの設定 -->
  <build>
      <plugins>
          <!-- Javaファイルのコンパイラの設定 -->
          <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-compiler-plugin</artifactId>
              <version>3.6.1</version>
              <configuration>
                  <source>${java.version}</source>
                  <target>${java.version}</target>
              </configuration>
          </plugin>
          <!-- mavenプロジェクトのテスト時のエラー解消のための設定 -->
          <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-surefire-plugin</artifactId>
              <version>3.0.0-M3</version>
              <configuration>
                  <useSystemClassLoader>false</useSystemClassLoader>
              </configuration>
          </plugin>
          <!-- プロジェクトと依存するライブラリを1つにまとめる設定 -->
          <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- メインプログラムとして実行するクラスの指定 -->
                            <mainClass>test.TestPubSub</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
          </plugin>
      </plugins>
  </build>

  <!-- ライブラリ依存関係の設定 -->
  <dependencies>
      <dependency>
          <!-- Junitの設定 -->
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>3.8.1</version>
          <scope>test</scope>
      </dependency>
      <dependency>
          <!-- PubSub用ライブラリを追加する設定 -->
          <groupId>com.google.cloud</groupId>
          <artifactId>google-cloud-pubsub</artifactId>
          <version>1.49.0</version>
      </dependency>
  </dependencies>

</project>

また、作成したTestPubSub.javaは以下の通り。

package test;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;

public class TestPubSub {

        private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();
        private static final String TOPIC_ID = "my-topic";
        private static final String SUBSCRIPTION_ID = "my-sub";
        private static final int MSG_CNT = 5;
        private static final BlockingQueue<PubsubMessage> messages 
                                          = new LinkedBlockingDeque<>();

        public static void main(String[] args) {

               try {
                        // メッセージを送信する
                        doPublish();

                        // 送信されたメッセージを受信する
                        doPull();

                } catch (Exception e) {
                        System.out.println(e);
                }

        }

        private static void doPublish() throws Exception {

                System.out.println("**** started doPublish. ****");

                // トピック名を指定しメッセージ送信のためのPublisherを生成
                Publisher publisher = Publisher.newBuilder(
                       ProjectTopicName.of(PROJECT_ID, TOPIC_ID)).build();

                // メッセージをPublisherに設定し送信(MSG_CNT数分繰り返す)
                for (int i = 0; i < MSG_CNT; i++) {
                        String message = "Hello PubSub " + i + " !!";
                        ByteString data = ByteString.copyFromUtf8(message);
                        PubsubMessage pubsubMessage 
                            = PubsubMessage.newBuilder().setData(data).build();
                        publisher.publish(pubsubMessage);

                        // 送信したメッセージを出力
                        System.out.println("Sended Data : " + message);
                }

                // Publisherを終了
                if (publisher != null) {
                        publisher.shutdown();
                }

                System.out.println("**** ended doPublish. ****");
                System.out.println();

        }

       private static class MessageReceiverTest implements MessageReceiver {

                @Override
                public void receiveMessage(PubsubMessage message
                                         , AckReplyConsumer consumer) {
                         // メッセージを受信しACK(肯定応答)を返す
                         // ACKを返さないとメッセージが再送され続けてしまう
                        messages.offer(message);
                        consumer.ack();
                }

        }

        private static void doPull() throws InterruptedException {

                System.out.println("**** started doPull. ****");

                // 送信されたメッセージを取得するSubscriberを生成
                ProjectSubscriptionName subscriptionName 
                    = ProjectSubscriptionName.of(PROJECT_ID, SUBSCRIPTION_ID);
                Subscriber subscriber = Subscriber.newBuilder(subscriptionName
                                            , new MessageReceiverTest()).build();

                // メッセージの受信を開始
                subscriber.startAsync().awaitRunning();
                int cnt = 0;
                while (true) {
                        // 受信したメッセージを出力
                        PubsubMessage message = messages.take();
                        System.out.println("Received Data: " 
                            + message.getData().toStringUtf8());
                        cnt++;

                        // 全てのメッセージを受信できたら終了する
                        if (cnt == MSG_CNT) {
                               break;
                        }
                 }

                // Subscriberを終了
                if (subscriber != null) {
                        subscriber.stopAsync();
                }

               System.out.println("**** ended doPull. ****");
     }

}

MavenプロジェクトのコンパイルとJarファイルの作成

「mvn compile」によりMavenプロジェクトをコンパイルした上で、「mvn package」によりJarファイルの作成を行う。実行後のフォルダ構成は以下の通り。
jarファイルの作成

javaコマンドによるJarファイルの実行

作成したJarファイルを実行した結果は以下の通りで、メッセージを送信後、送信したメッセージが受信できることが確認できる。
実行結果