GCP

GCSとBigQueryで連動するプログラムを作成してみた(4)

今回も引き続き、GCSとBigQueryで連動するプログラムについて述べる。ここでは、GCS(Google Cloud Storage)とBigQueryで連動する、作成したJava 1.8のプログラムの紹介を行う。

前提条件

以下の記事での環境構築が完了していること

GCSとBigQueryで連動するプログラムを作成してみた(2)今回も引き続き、GCSとBigQueryで連動するプログラムについて述べる。ここでは、Windows 10が入っているローカル環境にて、...

また、以下の記事での「Eclipseのダウンロードと解凍」が完了していること

JavaでSQL Serverにデータロードするプログラムを作成した(環境構築編)今回も引き続き、Javaで作成したSQL Serverにデータロードするプログラムについて記載する。ここでは、JavaでSQL Serv...

作成したJava 1.8のプログラム

ここでは、作成したJava 1.8のプログラムを紹介する。

1) 以下の構成でのMavenプロジェクト「select_from_sales」を作成する
javaプログラム構成

2) pom.xmlの内容は以下の通り

<project xmlns="http://maven.apache.org/POM/4.0.0" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>test.bigquery.select</groupId>
  <artifactId>select-from-sales</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <!-- 文字コードとJavaのバージョンの設定 -->
  <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <java.version>1.8</java.version>
  </properties>

  <!-- プラグインの設定 -->
  <build>
      <plugins>
          <!-- Javaファイルのコンパイラの設定 -->
          <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-compiler-plugin</artifactId>
              <version>3.6.1</version>
              <configuration>
                  <source>${java.version}</source>
                  <target>${java.version}</target>
              </configuration>
          </plugin>
          <!-- プロジェクトと依存するライブラリを1つにまとめる設定 -->
          <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- メインプログラムとして実行するクラスの指定 -->
                            <mainClass>test.TestBigQuery</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
          </plugin>
      </plugins>
  </build>

  <!-- ライブラリ依存関係の設定 -->
  <dependencies>
      <dependency>
          <!-- BigQuery用ライブラリを追加する設定 -->
          <groupId>com.google.cloud</groupId>
          <artifactId>google-cloud-bigquery</artifactId>
          <version>1.49.0</version>
      </dependency>
      <dependency>
          <!-- JSON用ライブラリを追加する設定 -->
          <groupId>org.json</groupId>
          <artifactId>json</artifactId>
          <version>20180813</version>
      </dependency>
  </dependencies>

</project>

3) メインとなるTestBigQuery.javaは以下の通り。ここで、次項で述べる4)5)を順に呼び出している

package test;

public class TestBigQuery {

        public static void main(String[] args) {
                InsertIntoSales.main(args);
                SelectFromSales.main(args);
        }

}

4) GCS上のCSVファイル(insert_bigquery_sales.csv)のデータを、BigQueryのsalesテーブルに追加する、InsertIntoSales.javaは以下の通り

package test;

import java.util.UUID;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.CsvOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.LoadJobConfiguration.Builder;
import com.google.cloud.bigquery.Table;

public class InsertIntoSales {

        public static void main(String[] args) {

             // ロードするGCS上のファイルとBigQuery上のデータセットとテーブルの設定
             String sourceUri = "gs://test_purin_bucket/insert_bigquery_sales.csv";
             String dataSet = "bigquery_purin_it";
             String tableName = "sales";

             BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
             Table table = bigquery.getTable(dataSet, tableName);

             // BigQuery上のテーブルにロードするジョブの設定
             // ここでは、テーブルデータを全て削除してからロードしている
             Builder loadConfig = LoadJobConfiguration.newBuilder(
                                      table.getTableId(), sourceUri);
             loadConfig.setWriteDisposition(JobInfo.WriteDisposition.WRITE_TRUNCATE);
             com.google.cloud.bigquery.CsvOptions.Builder csvOptions 
                      = CsvOptions.newBuilder();
             csvOptions.setSkipLeadingRows(1);
             loadConfig.setFormatOptions(csvOptions.build());

             JobId jobId = JobId.of(UUID.randomUUID().toString());
             Job loadJob = bigquery.create(JobInfo.newBuilder(loadConfig.build())
                                      .setJobId(jobId).build());

             try {
                  // ジョブを実行し、終了を待つ
                  System.out.printf("Starting job %s\n", jobId.getJob());
                  loadJob = loadJob.waitFor();

                  // BigQuery salesテーブルからデータを抽出するジョブの
                  // エラーチェック
                  if (loadJob == null) {
                       throw new RuntimeException("Job no longer exists");
                  } else if (loadJob.getStatus().getError() != null) {
                       throw new RuntimeException(loadJob.getStatus()
                                                       .getError().toString());
                  }

                  // ジョブの完了メッセージを出力
                  System.out.println("Job Finished.");

             } catch (Exception e) {
                  System.out.println(e);
             }
       }

}

5) BigQueryのsalesテーブルのデータを、JSONファイル(sales.json)に出力する、SelectFromSales.javaは以下の通り

package test;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.UUID;

import org.json.JSONException;
import org.json.JSONObject;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;

public class SelectFromSales {

     public static void main(String[] args) {

            // BigQuery salesテーブルからデータを抽出
            BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
            QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(
                                "SELECT "
                                    + " sale_date "
                                    + ", product_name "
                                    + ", place_name "
                                    + ", sales_amount "
                                    + "FROM `(プロジェクトID).bigquery_purin_it.sales` ")
                                .setUseLegacySql(false)
                                .build();

            // BigQuery salesテーブルからデータを抽出するジョブを生成
            JobId jobId = JobId.of(UUID.randomUUID().toString());
            Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig)
                                   .setJobId(jobId).build());

            try {
                 // BigQuery salesテーブルからデータを抽出するジョブの終了を待機
                 queryJob = queryJob.waitFor();

                 // BigQuery salesテーブルからデータを抽出するジョブの
                 // エラーチェック
                 if (queryJob == null) {
                       throw new RuntimeException("Job no longer exists");
                 } else if (queryJob.getStatus().getError() != null) {
                       throw new RuntimeException(queryJob.getStatus()
                                                 .getError().toString());
                 }

                 // 抽出した結果をJSONファイルに出力
                 TableResult result = queryJob.getQueryResults();
                 new SelectFromSales().putJsonFile(result);
                 System.out.println("JSONファイル出力完了");

             } catch (Exception e) {
                  System.out.println(e);
             }
	}

        private void putJsonFile(TableResult result)
                        throws JSONException, FileNotFoundException, IOException {

              // 抽出した結果をJSON形式に変換
              JSONObject jsonObj = new JSONObject();
              int idx = 0;
              for (FieldValueList row : result.iterateAll()) {
                   JSONObject jsonObjTmp = new JSONObject();
                   jsonObjTmp.put("sale_date"
                                   , row.get("sale_date").getStringValue());
                   jsonObjTmp.put("product_name"
                                   , row.get("product_name").getStringValue());
                   jsonObjTmp.put("place_name"
                                   , row.get("place_name").getStringValue());
                   jsonObjTmp.put("sales_amount"
                                  , row.get("sales_amount").getLongValue());

                   idx += 1;
                   jsonObj.put(String.valueOf(idx), jsonObjTmp);
              }

              // JSONファイルを出力
              String jsonFilePath = "c:\\work\\gcp\\";
              String jsonFileName = "sales.json";
              PrintWriter pw = new PrintWriter(jsonFilePath + jsonFileName, "utf-8");
              pw.write(jsonObj.toString(4));
              pw.close();
       }

}

なお、mavenプロジェクトの作成・実行等については、以下の記事を参照のこと。

JavaでSQL Serverにデータロードするプログラムを作成した(ソースコード編)今回も引き続き、Javaで作成したSQL Serverにデータロードするプログラムについて記載する。ここでは、JavaでSQL Serv...

作成したJava1.8の実行結果

作成したJava1.8について、maven installにより作成された「select-from-sales-0.0.1-SNAPSHOT-jar-with-dependencies.jar」をコマンドプロンプト上で実行すると、以下のようになる。
Java_実行結果

なお、GCS上に配置したファイル、BigQuery上のテーブル、出力されたJSONファイルについては、以下の記事の通り。

GCSとBigQueryで連動するプログラムを作成してみた(1)GCS(Google Cloud Storage)とBigQueryで連動するプログラムを作成したので共有する。 GCSは、ファイ...