今回も引き続き、Cloud Pub/Subについて述べる。今回は、2台の仮想マシン上でメッセージの送受信が行えるようなプログラムの内容について述べる。
前提条件
GCP(Google Cloud Platform)のアカウントが有り、下記記事の手順に従って、GCP上でDebian GNU/Linux搭載済の2台の仮想マシンを作成済であること。
また、以下の記事での環境構築を実施済であること。
さらに、以下の記事の「JDK1.8のインストール」「Mavenのインストール」が実施済であること。なお、MavenによるJavaプログラムの作成・実行手順も、上記記事を参照のこと。
その他、下記記事のトピックとサブスクリプションの作成が完了していること。
やってみたこと
2台の仮想マシンを起動
GCP上でDebian GNU/Linux搭載済の2台の仮想マシンを起動する。起動後の仮想マシンは以下の通り。
なお、上記仮想マシンのうち、今後は「test-linux-vm-2」をサーバー側とし、「test-linux-vm」をクライアント側とする。
サーバー側でのJavaプログラムの作成
サーバー側の仮想マシン「test-linux-vm-2」にログイン後、実施した内容は以下の通り。
1) mavenコマンドを利用して、use-pubsub-serverというMavenプロジェクトを作成
2) pom.xmlを修正し、TestPubSubServer.javaというプログラムを追加
ソース修正後のフォルダ構成は以下の通り
修正後のpom.xmlは以下の通り
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 | <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>test.pubsub</groupId> <artifactId>use-pubsub-server</artifactId> <version>0.0.1-SNAPSHOT</version> <name>use-pubsub-server</name> <url>http://maven.apache.org</url> <!-- 文字コードとJavaのバージョンの設定 --> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <java.version>1.8</java.version> </properties> <!-- プラグインの設定 --> <build> <plugins> <!-- Javaファイルのコンパイラの設定 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.6.1</version> <configuration> <source>${java.version}</source> <target>${java.version}</target> </configuration> </plugin> <!-- mavenプロジェクトのテスト時のエラー解消のための設定 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>3.0.0-M3</version> <configuration> <useSystemClassLoader>false</useSystemClassLoader> </configuration> </plugin> <!-- プロジェクトと依存するライブラリを1つにまとめる設定 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>2.6</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <!-- メインプログラムとして実行するクラスの指定 --> <mainClass>test.TestPubSubServer</mainClass> </manifest> </archive> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> <!-- ライブラリ依存関係の設定 --> <dependencies> <dependency> <!-- Junitの設定 --> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <!-- PubSub用ライブラリを追加する設定 --> <groupId>com.google.cloud</groupId> <artifactId>google-cloud-pubsub</artifactId> <version>1.49.0</version> </dependency> </dependencies> </project> |
作成したTestPubSubServer.javaは以下の通り
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 | package test; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import com.google.cloud.ServiceOptions; import com.google.cloud.pubsub.v1.AckReplyConsumer; import com.google.cloud.pubsub.v1.MessageReceiver; import com.google.cloud.pubsub.v1.Subscriber; import com.google.pubsub.v1.ProjectSubscriptionName; import com.google.pubsub.v1.PubsubMessage; public class TestPubSubServer { private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId(); private static final String TOPIC_ID = "my-topic"; private static final String SUBSCRIPTION_ID = "my-sub"; private static final int MSG_CNT = 5; private static final BlockingQueue<PubsubMessage> messages = new LinkedBlockingDeque<>(); public static void main(String[] args) { try { // 送信されたメッセージを受信する doPull(); } catch (Exception e) { System.out.println(e); } } private static class MessageReceiverTest implements MessageReceiver { @Override public void receiveMessage(PubsubMessage message , AckReplyConsumer consumer) { // メッセージを受信しACK(肯定応答)を返す messages.offer(message); consumer.ack(); } } private static void doPull() throws InterruptedException { System.out.println("**** started doPull. ****"); // 送信されたメッセージを取得するSubscriberを生成 ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(PROJECT_ID, SUBSCRIPTION_ID); Subscriber subscriber = Subscriber.newBuilder( subscriptionName, new MessageReceiverTest()).build(); // メッセージの受信を開始 subscriber.startAsync().awaitRunning(); int cnt = 0; while (true) { // 受信したメッセージを出力 PubsubMessage message = messages.take(); System.out.println("Received Data: " + message.getData().toStringUtf8()); cnt++; // 全てのメッセージを受信できたら終了する if (cnt == MSG_CNT) { break; } } // Subscriberを終了 if (subscriber != null) { subscriber.stopAsync(); } System.out.println("**** ended doPull. ****"); } } |
なお、前回の「GCP Cloud Pub/Subを使ってみた(3)」で記載したTestPubSub.javaのうち、送信したメッセージを受信する部分のみ記載する形となる。
3) 「mvn compile」によりMavenプロジェクトをコンパイルした上で、「mvn package」によりJarファイルの作成を実行。実行後のフォルダ構成は以下の通り。
クライアント側でのJavaプログラムの作成
サーバー側の仮想マシンにログイン後、実施した内容は以下の通り。
1) mavenコマンドを利用して、use-pubsub-clientというMavenプロジェクトを作成
2) pom.xmlを修正し、TestPubSubClient.javaというプログラムを追加
ソース修正後のフォルダ構成は以下の通り
修正後のpom.xmlは以下の通り
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 | <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>test.pubsub</groupId> <artifactId>use-pubsub-client</artifactId> <version>0.0.1-SNAPSHOT</version> <name>use-pubsub-client</name> <url>http://maven.apache.org</url> <!-- 文字コードとJavaのバージョンの設定 --> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <java.version>1.8</java.version> </properties> <!-- プラグインの設定 --> <build> <plugins> <!-- Javaファイルのコンパイラの設定 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.6.1</version> <configuration> <source>${java.version}</source> <target>${java.version}</target> </configuration> </plugin> <!-- mavenプロジェクトのテスト時のエラー解消のための設定 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>3.0.0-M3</version> <configuration> <useSystemClassLoader>false</useSystemClassLoader> </configuration> </plugin> <!-- プロジェクトと依存するライブラリを1つにまとめる設定 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>2.6</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <!-- メインプログラムとして実行するクラスの指定 --> <mainClass>test.TestPubSubClient</mainClass> </manifest> </archive> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> <!-- ライブラリ依存関係の設定 --> <dependencies> <dependency> <!-- Junitの設定 --> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <!-- PubSub用ライブラリを追加する設定 --> <groupId>com.google.cloud</groupId> <artifactId>google-cloud-pubsub</artifactId> <version>1.49.0</version> </dependency> </dependencies> </project> |
作成したTestPubSubClient.javaは以下の通り
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 | package test; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import com.google.cloud.ServiceOptions; import com.google.cloud.pubsub.v1.Publisher; import com.google.protobuf.ByteString; import com.google.pubsub.v1.ProjectTopicName; import com.google.pubsub.v1.PubsubMessage; public class TestPubSubClient { private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId(); private static final String TOPIC_ID = "my-topic"; private static final int MSG_CNT = 5; private static final BlockingQueue<PubsubMessage> messages = new LinkedBlockingDeque<>(); public static void main(String[] args) { try { // メッセージを送信する doPublish(); } catch (Exception e) { System.out.println(e); } } private static void doPublish() throws Exception { System.out.println("**** started doPublish. ****"); // トピック名を指定しメッセージ送信のためのPublisherを生成 Publisher publisher = Publisher.newBuilder( ProjectTopicName.of(PROJECT_ID, TOPIC_ID)).build(); // メッセージをPublisherに設定し送信(MSG_CNT数分繰り返す) for (int i = 0; i < MSG_CNT; i++) { String message = "Hello PubSub " + i + " !!"; ByteString data = ByteString.copyFromUtf8(message); PubsubMessage pubsubMessage = PubsubMessage.newBuilder() .setData(data).build(); publisher.publish(pubsubMessage); // 送信したメッセージを出力 System.out.println("Sended Data : " + message); } // Publisherを終了 if (publisher != null) { publisher.shutdown(); } System.out.println("**** ended doPublish. ****"); System.out.println(); } } |
なお、前回の「GCP Cloud Pub/Subを使ってみた(3)」で記載したTestPubSub.javaのうち、メッセージを送信する部分のみ記載する形となる。
3) 「mvn compile」によりMavenプロジェクトをコンパイルした上で、「mvn package」によりJarファイルの作成を実行。実行後のフォルダ構成は以下の通り。
実行確認
2台のマシン間での、Pub/Subによるメッセージ送受信を確認した結果は以下の通り。
1) サーバー側(test-linux-vm-2)のプログラムを起動