今回も引き続き、GCSとBigQueryで連動するプログラムについて述べる。ここでは、GCS(Google Cloud Storage)とBigQueryで連動する、作成したJava 1.8のプログラムの紹介を行う。
前提条件
以下の記事での環境構築が完了していること
GCSとBigQueryで連動するプログラムを作成してみた(2)今回も引き続き、GCSとBigQueryで連動するプログラムについて述べる。ここでは、Windows 10が入っているローカル環境にて、...
また、以下の記事での「Eclipseのダウンロードと解凍」が完了していること
JavaでSQL Serverにデータロードするプログラムを作成した(環境構築編)今回も引き続き、Javaで作成したSQL Serverにデータロードするプログラムについて記載する。ここでは、JavaでSQL Serv...
作成したJava 1.8のプログラム
ここでは、作成したJava 1.8のプログラムを紹介する。
1) 以下の構成でのMavenプロジェクト「select_from_sales」を作成する
2) pom.xmlの内容は以下の通り
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>test.bigquery.select</groupId> <artifactId>select-from-sales</artifactId> <version>0.0.1-SNAPSHOT</version> <!-- 文字コードとJavaのバージョンの設定 --> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <java.version>1.8</java.version> </properties> <!-- プラグインの設定 --> <build> <plugins> <!-- Javaファイルのコンパイラの設定 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.6.1</version> <configuration> <source>${java.version}</source> <target>${java.version}</target> </configuration> </plugin> <!-- プロジェクトと依存するライブラリを1つにまとめる設定 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>2.6</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <!-- メインプログラムとして実行するクラスの指定 --> <mainClass>test.TestBigQuery</mainClass> </manifest> </archive> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> <!-- ライブラリ依存関係の設定 --> <dependencies> <dependency> <!-- BigQuery用ライブラリを追加する設定 --> <groupId>com.google.cloud</groupId> <artifactId>google-cloud-bigquery</artifactId> <version>1.49.0</version> </dependency> <dependency> <!-- JSON用ライブラリを追加する設定 --> <groupId>org.json</groupId> <artifactId>json</artifactId> <version>20180813</version> </dependency> </dependencies> </project>
3) メインとなるTestBigQuery.javaは以下の通り。ここで、次項で述べる4)5)を順に呼び出している
package test; public class TestBigQuery { public static void main(String[] args) { InsertIntoSales.main(args); SelectFromSales.main(args); } }
4) GCS上のCSVファイル(insert_bigquery_sales.csv)のデータを、BigQueryのsalesテーブルに追加する、InsertIntoSales.javaは以下の通り
package test; import java.util.UUID; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.CsvOptions; import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.JobId; import com.google.cloud.bigquery.JobInfo; import com.google.cloud.bigquery.LoadJobConfiguration; import com.google.cloud.bigquery.LoadJobConfiguration.Builder; import com.google.cloud.bigquery.Table; public class InsertIntoSales { public static void main(String[] args) { // ロードするGCS上のファイルとBigQuery上のデータセットとテーブルの設定 String sourceUri = "gs://test_purin_bucket/insert_bigquery_sales.csv"; String dataSet = "bigquery_purin_it"; String tableName = "sales"; BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); Table table = bigquery.getTable(dataSet, tableName); // BigQuery上のテーブルにロードするジョブの設定 // ここでは、テーブルデータを全て削除してからロードしている Builder loadConfig = LoadJobConfiguration.newBuilder( table.getTableId(), sourceUri); loadConfig.setWriteDisposition(JobInfo.WriteDisposition.WRITE_TRUNCATE); com.google.cloud.bigquery.CsvOptions.Builder csvOptions = CsvOptions.newBuilder(); csvOptions.setSkipLeadingRows(1); loadConfig.setFormatOptions(csvOptions.build()); JobId jobId = JobId.of(UUID.randomUUID().toString()); Job loadJob = bigquery.create(JobInfo.newBuilder(loadConfig.build()) .setJobId(jobId).build()); try { // ジョブを実行し、終了を待つ System.out.printf("Starting job %s\n", jobId.getJob()); loadJob = loadJob.waitFor(); // BigQuery salesテーブルからデータを抽出するジョブの // エラーチェック if (loadJob == null) { throw new RuntimeException("Job no longer exists"); } else if (loadJob.getStatus().getError() != null) { throw new RuntimeException(loadJob.getStatus() .getError().toString()); } // ジョブの完了メッセージを出力 System.out.println("Job Finished."); } catch (Exception e) { System.out.println(e); } } }
5) BigQueryのsalesテーブルのデータを、JSONファイル(sales.json)に出力する、SelectFromSales.javaは以下の通り
package test; import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintWriter; import java.util.UUID; import org.json.JSONException; import org.json.JSONObject; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.FieldValueList; import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.JobId; import com.google.cloud.bigquery.JobInfo; import com.google.cloud.bigquery.QueryJobConfiguration; import com.google.cloud.bigquery.TableResult; public class SelectFromSales { public static void main(String[] args) { // BigQuery salesテーブルからデータを抽出 BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService(); QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder( "SELECT " + " sale_date " + ", product_name " + ", place_name " + ", sales_amount " + "FROM `(プロジェクトID).bigquery_purin_it.sales` ") .setUseLegacySql(false) .build(); // BigQuery salesテーブルからデータを抽出するジョブを生成 JobId jobId = JobId.of(UUID.randomUUID().toString()); Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig) .setJobId(jobId).build()); try { // BigQuery salesテーブルからデータを抽出するジョブの終了を待機 queryJob = queryJob.waitFor(); // BigQuery salesテーブルからデータを抽出するジョブの // エラーチェック if (queryJob == null) { throw new RuntimeException("Job no longer exists"); } else if (queryJob.getStatus().getError() != null) { throw new RuntimeException(queryJob.getStatus() .getError().toString()); } // 抽出した結果をJSONファイルに出力 TableResult result = queryJob.getQueryResults(); new SelectFromSales().putJsonFile(result); System.out.println("JSONファイル出力完了"); } catch (Exception e) { System.out.println(e); } } private void putJsonFile(TableResult result) throws JSONException, FileNotFoundException, IOException { // 抽出した結果をJSON形式に変換 JSONObject jsonObj = new JSONObject(); int idx = 0; for (FieldValueList row : result.iterateAll()) { JSONObject jsonObjTmp = new JSONObject(); jsonObjTmp.put("sale_date" , row.get("sale_date").getStringValue()); jsonObjTmp.put("product_name" , row.get("product_name").getStringValue()); jsonObjTmp.put("place_name" , row.get("place_name").getStringValue()); jsonObjTmp.put("sales_amount" , row.get("sales_amount").getLongValue()); idx += 1; jsonObj.put(String.valueOf(idx), jsonObjTmp); } // JSONファイルを出力 String jsonFilePath = "c:\\work\\gcp\\"; String jsonFileName = "sales.json"; PrintWriter pw = new PrintWriter(jsonFilePath + jsonFileName, "utf-8"); pw.write(jsonObj.toString(4)); pw.close(); } }
なお、mavenプロジェクトの作成・実行等については、以下の記事を参照のこと。
JavaでSQL Serverにデータロードするプログラムを作成した(ソースコード編)今回も引き続き、Javaで作成したSQL Serverにデータロードするプログラムについて記載する。ここでは、JavaでSQL Serv...
作成したJava1.8の実行結果
作成したJava1.8について、maven installにより作成された「select-from-sales-0.0.1-SNAPSHOT-jar-with-dependencies.jar」をコマンドプロンプト上で実行すると、以下のようになる。
なお、GCS上に配置したファイル、BigQuery上のテーブル、出力されたJSONファイルについては、以下の記事の通り。
GCSとBigQueryで連動するプログラムを作成してみた(1)GCS(Google Cloud Storage)とBigQueryで連動するプログラムを作成したので共有する。 GCSは、ファイ...