Timer Triggerによって、一定時間が来たタイミングでAzure Functionsが動作するアプリケーションを生成し、そのバッチ処理内部で、Spring Batch(Taskletモデル)を利用することができる。
今回は、Spring Batch(Taskletモデル)内に、Azure Storageに配置したCSVファイルを読み取って、そのファイルの内容をSQL Database上のテーブルに書き込む処理を追加してみたので、そのサンプルプログラムを共有する。
前提条件
下記記事のサンプルプログラムを作成済であること。
作成したサンプルプログラムの修正
前提条件の記事のサンプルプログラムに、CSVファイルの内容をDBに書き込む処理を追加する。なお、下記の赤枠は、前提条件のプログラムから変更したり、追加したりしたプログラムである。
pom.xmlの追加内容は以下の通りで、Azure StorageやSQL Database(SQL Server)、MyBatisを利用するための設定を追加している。
<!-- Azure Storageの設定 --> <dependency> <groupId>com.microsoft.azure</groupId> <artifactId>azure-storage</artifactId> <version>8.3.0</version> </dependency> <!-- SQL Serverを利用するための設定 --> <dependency> <groupId>com.microsoft.sqlserver</groupId> <artifactId>mssql-jdbc</artifactId> </dependency> <!-- MyBatisを利用するための設定 --> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>2.1.1</version> </dependency>
Azure Functionsのメインクラスの内容は以下の通りで、Spring Batchで実施されるデータソースの自動設定を除外していたのを削除している。
package com.example; import java.util.function.Function; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import com.example.model.TimerTriggerParam; import com.example.model.TimerTriggerResult; import com.example.service.TimerTriggerService; @SpringBootApplication public class DemoAzureFunction { /** タイマートリガーのテストを行うサービスクラスのオブジェクト */ @Autowired private TimerTriggerService timerTriggerService; public static void main(String[] args) throws Exception { SpringApplication.run(DemoAzureFunction.class, args); } /** * タイマートリガーのテストを行い結果を返却する関数 * @return タイマートリガーのテストを行うサービスクラスの呼出結果 */ @Bean public Function<TimerTriggerParam, TimerTriggerResult> timerTriggerTest() { return timerTriggerParam -> timerTriggerService.timerTriggerTest(timerTriggerParam); } }
Taskletクラスの内容は以下の通りで、CSVファイルの内容をDBに書き込むサービス(DemoBatchService)の呼び出しを追加している。
package com.example.batch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.scope.context.ChunkContext; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.example.model.TimerTriggerParam; import com.example.service.DemoBatchService; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.batch.core.step.tasklet.Tasklet; @Component public class DemoTasklet implements Tasklet { /* Spring Bootでログ出力するためのLogbackのクラスを生成 */ private static final Logger LOGGER = LoggerFactory.getLogger(DemoTasklet.class); /** * BlobStorageからファイル(user_data.csv)を読み込み、USER_DATAテーブルに書き込むサービス */ @Autowired private DemoBatchService demoBatchService; /** * Spring Batchのジョブ内での処理を定義する. */ @Override public RepeatStatus execute(StepContribution contribution , ChunkContext chunkContext) throws Exception { // Spring Batchのジョブ内での処理が呼び出されたことをログ出力する String paramStr = chunkContext.getStepContext().getStepExecution() .getJobParameters().getString("timerTriggerParam"); if (paramStr != null) { TimerTriggerParam param = new ObjectMapper().readValue( paramStr, new TypeReference<TimerTriggerParam>() { }); LOGGER.info("DemoTasklet execute " + " triggered: " + param.getTimerInfo()); } // BlobStorageからファイル(user_data.csv)を読み込み、 // USER_DATAテーブルに書き込むサービスを呼び出す demoBatchService.readUserData(); // ジョブが終了したことを返す return RepeatStatus.FINISHED; } }
application.propertiesの内容は以下の通りで、Azure StorageとSQL Databaseへの接続設定を追加している。
# Azure Storageの接続先 azure.storage.accountName=azureblobpurinit azure.storage.accessKey=(Azure Blob Storageのアクセスキー) azure.storage.containerName=blobcontainer # DB接続設定 spring.datasource.url=jdbc:sqlserver://azure-db-purinit.database.windows.net:1433;database=azureSqlDatabase spring.datasource.username=purinit@azure-db-purinit spring.datasource.password=(DBのパスワード) spring.datasource.driverClassName=com.microsoft.sqlserver.jdbc.SQLServerDriver
CSVファイルの内容をDBに書き込むサービスの内容は以下の通り。CSVファイルのチェック処理もここに追加している。
package com.example.service; import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; import java.net.URISyntaxException; import java.security.InvalidKeyException; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import com.example.mybatis.UserDataMapper; import com.example.mybatis.model.UserData; import com.example.util.DemoStringUtil; import com.microsoft.azure.storage.CloudStorageAccount; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.CloudBlobClient; import com.microsoft.azure.storage.blob.CloudBlobContainer; import com.microsoft.azure.storage.blob.CloudBlockBlob; @Service public class DemoBatchService { /* Spring Bootでログ出力するためのLogbackのクラスを生成 */ private static final Logger LOGGER = LoggerFactory.getLogger(DemoBatchService.class); /** Azure Storageのアカウント名 */ @Value("${azure.storage.accountName}") private String storageAccountName; /** Azure Storageへのアクセスキー */ @Value("${azure.storage.accessKey}") private String storageAccessKey; /** Azure StorageのBlobコンテナー名 */ @Value("${azure.storage.containerName}") private String storageContainerName; /** USER_DATAテーブルにアクセスするマッパー */ @Autowired private UserDataMapper userDataMapper; /** * BlobStorageからファイル(user_data.csv)を読み込み、USER_DATAテーブルに書き込む */ @Transactional public void readUserData() { // BlobStorageからファイル(user_data.csv)を読み込む try (BufferedReader br = new BufferedReader(new InputStreamReader(getBlobCsvData(), "UTF-8"))) { String lineStr = null; int lineCnt = 0; // 1行目(タイトル行)は読み飛ばし、2行目以降はチェックの上、 // USER_DATAテーブルに書き込む // チェックエラー時はエラーログを出力の上、DB更新は行わず先へ進む while ((lineStr = br.readLine()) != null) { // 1行目(タイトル行)は読み飛ばす lineCnt++; if (lineCnt == 1) { continue; } // 引数のCSVファイル1行分の文字列を受け取り、エラーがあればNULLを、 // エラーがなければUserDataオブジェクトに変換し返す UserData userData = checkData(lineStr, lineCnt); // 読み込んだファイルをUSER_DATAテーブルに書き込む if (userData != null) { userDataMapper.upsert(userData); } } } catch (Exception ex) { LOGGER.error(ex.getMessage()); throw new RuntimeException(ex); } } /** * Blobストレージからファイルデータ(user_data.csv)を取得する. * @return ファイルデータ(user_data.csv)の入力ストリーム * @throws URISyntaxException * @throws InvalidKeyException * @throws StorageException */ private InputStream getBlobCsvData() throws URISyntaxException, InvalidKeyException, StorageException { // Blobストレージへの接続文字列 String storageConnectionString = "DefaultEndpointsProtocol=https;" + "AccountName=" + storageAccountName + ";" + "AccountKey=" + storageAccessKey + ";"; // ストレージアカウントオブジェクトを取得 CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString); // Blobクライアントオブジェクトを取得 CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); // Blob内のコンテナーを取得 CloudBlobContainer container = blobClient.getContainerReference(storageContainerName); // BlobStorageからファイル(user_data.csv)を読み込む CloudBlockBlob blob = container.getBlockBlobReference("user_data.csv"); return blob.openInputStream(); } /** * 引数のCSVファイル1行分の文字列を受け取り、エラーがあればNULLを、 * エラーがなければUserDataオブジェクトに変換し返す. * @param lineStr CSVファイル1行分の文字列 * @param lineCnt 行数 * @return 変換後のUserData */ private UserData checkData(String lineStr, int lineCnt) { // 引数のCSVファイル1行分の文字列をカンマで分割 String[] strArray = lineStr.split(","); // 桁数不正の場合はエラー if (strArray == null || strArray.length != 7) { LOGGER.info(lineCnt + "行目: 桁数が不正です。"); return null; } // 文字列前後のダブルクォーテーションを削除する for (int i = 0; i < strArray.length; i++) { strArray[i] = DemoStringUtil.trimDoubleQuot(strArray[i]); } // 1列目が空またはNULLの場合はエラー if (StringUtils.isEmpty(strArray[0])) { LOGGER.info(lineCnt + "行目: 1列目が空またはNULLです。"); return null; } // 1列目が数値以外の場合はエラー if (!StringUtils.isNumeric(strArray[0])) { LOGGER.info(lineCnt + "行目: 1列目が数値以外です。"); return null; } // 1列目の桁数が不正な場合はエラー if (strArray[0].length() > 6) { LOGGER.info(lineCnt + "行目: 1列目の桁数が不正です。"); return null; } // 2列目が空またはNULLの場合はエラー if (StringUtils.isEmpty(strArray[1])) { LOGGER.info(lineCnt + "行目: 2列目が空またはNULLです。"); return null; } // 2列目の桁数が不正な場合はエラー if (strArray[1].length() > 40) { LOGGER.info(lineCnt + "行目: 2列目の桁数が不正です。"); return null; } // 3列目が空またはNULLの場合はエラー if (StringUtils.isEmpty(strArray[2])) { LOGGER.info(lineCnt + "行目: 3列目が空またはNULLです。"); return null; } // 3列目が数値以外の場合はエラー if (!StringUtils.isNumeric(strArray[2])) { LOGGER.info(lineCnt + "行目: 3列目が数値以外です。"); return null; } // 3列目の桁数が不正な場合はエラー if (strArray[2].length() > 4) { LOGGER.info(lineCnt + "行目: 3列目の桁数が不正です。"); return null; } // 4列目が空またはNULLの場合はエラー if (StringUtils.isEmpty(strArray[3])) { LOGGER.info(lineCnt + "行目: 4列目が空またはNULLです。"); return null; } // 4列目が数値以外の場合はエラー if (!StringUtils.isNumeric(strArray[3])) { LOGGER.info(lineCnt + "行目: 4列目が数値以外です。"); return null; } // 4列目の桁数が不正な場合はエラー if (strArray[3].length() > 2) { LOGGER.info(lineCnt + "行目: 4列目の桁数が不正です。"); return null; } // 5列目が空またはNULLの場合はエラー if (StringUtils.isEmpty(strArray[4])) { LOGGER.info(lineCnt + "行目: 5列目が空またはNULLです。"); return null; } // 5列目が数値以外の場合はエラー if (!StringUtils.isNumeric(strArray[4])) { LOGGER.info(lineCnt + "行目: 5列目が数値以外です。"); return null; } // 5列目の桁数が不正な場合はエラー if (strArray[4].length() > 2) { LOGGER.info(lineCnt + "行目: 5列目の桁数が不正です。"); return null; } // 3列目・4列目・5列目から生成される日付が不正であればエラー String birthDay = strArray[2] + DemoStringUtil.addZero(strArray[3]) + DemoStringUtil.addZero(strArray[4]); if (!DemoStringUtil.isCorrectDate(birthDay, "uuuuMMdd")) { LOGGER.info(lineCnt + "行目: 3~5列目の日付が不正です。"); return null; } // 6列目が1,2以外の場合はエラー if (!("1".equals(strArray[5])) && !("2".equals(strArray[5]))) { LOGGER.info(lineCnt + "行目: 6列目の性別が不正です。"); return null; } // 7列目の桁数が不正な場合はエラー if (!StringUtils.isEmpty(strArray[6]) && strArray[6].length() > 1024) { LOGGER.info(lineCnt + "行目: 7列目の桁数が不正です。"); return null; } // エラーがなければUserDataオブジェクトに変換し返す UserData userData = new UserData(); userData.setId(Integer.parseInt(strArray[0])); userData.setName(strArray[1]); userData.setBirth_year(Integer.parseInt(strArray[2])); userData.setBirth_month(Integer.parseInt(strArray[3])); userData.setBirth_day(Integer.parseInt(strArray[4])); userData.setSex(strArray[5]); userData.setMemo(strArray[6]); return userData; } }
上記サービスクラスから呼ばれる、文字列のチェックや加工を行うユーティリティクラスの内容は、以下の通り。
package com.example.util; import java.time.LocalDate; import java.time.format.DateTimeFormatter; import java.time.format.ResolverStyle; import org.apache.commons.lang3.StringUtils; public class DemoStringUtil { /** * 文字列前後のダブルクォーテーションを削除する. * @param str 変換前文字列 * @return 変換後文字列 */ public static String trimDoubleQuot(String regStr) { if (StringUtils.isEmpty(regStr)) { return regStr; } char c = '"'; if (regStr.charAt(0) == c && regStr.charAt(regStr.length() - 1) == c) { return regStr.substring(1, regStr.length() - 1); } else { return regStr; } } /** * DateTimeFormatterを利用して日付チェックを行う. * @param dateStr チェック対象文字列 * @param dateFormat 日付フォーマット * @return 日付チェック結果 */ public static boolean isCorrectDate(String dateStr, String dateFormat) { if (StringUtils.isEmpty(dateStr) || StringUtils.isEmpty(dateFormat)) { return false; } // 日付と時刻を厳密に解決するスタイルで、DateTimeFormatterオブジェクトを作成 DateTimeFormatter df = DateTimeFormatter.ofPattern(dateFormat) .withResolverStyle(ResolverStyle.STRICT); try { // チェック対象文字列をLocalDate型の日付に変換できれば、チェックOKとする LocalDate.parse(dateStr, df); return true; } catch (Exception e) { return false; } } /** * 数値文字列が1桁の場合、頭に0を付けて返す. * @param intNum 数値文字列 * @return 変換後数値文字列 */ public static String addZero(String intNum) { if (StringUtils.isEmpty(intNum)) { return intNum; } if (intNum.length() == 1) { return "0" + intNum; } return intNum; } }
SQL DatabaseのUSER_DATAテーブルにアクセスするためのEntityやMapperの内容は、以下の通り。
package com.example.mybatis.model; import lombok.Data; @Data public class UserData { /** ID */ private int id; /** 名前 */ private String name; /** 生年月日_年 */ private int birth_year; /** 生年月日_月 */ private int birth_month; /** 生年月日_日 */ private int birth_day; /** 性別 */ private String sex; /** メモ */ private String memo; }
package com.example.mybatis; import org.apache.ibatis.annotations.Mapper; import com.example.mybatis.model.UserData; @Mapper public interface UserDataMapper { /** * DBにUserDataオブジェクトがあれば更新し、なければ追加する. * @param userData UserDataオブジェクト */ void upsert(UserData userData); }
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.example.mybatis.UserDataMapper"> <update id="upsert" parameterType="com.example.mybatis.model.UserData"> MERGE INTO USER_DATA AS u USING ( SELECT #{id} id ) s ON ( u.id = s.id ) WHEN MATCHED THEN UPDATE SET name = #{name}, birth_year = #{birth_year} , birth_month = #{birth_month}, birth_day = #{birth_day} , sex = #{sex}, memo = #{memo} WHEN NOT MATCHED THEN INSERT ( id, name, birth_year, birth_month , birth_day, sex, memo ) VALUES (#{id}, #{name}, #{birth_year}, #{birth_month} , #{birth_day}, #{sex}, #{memo}) ; </update> </mapper>
なお、上記Mapperのupsertに定義したSQLは、USER_DATAテーブルの主キー(id)がある場合は更新、無い場合は追加を行う処理になっている。
その他のソースコード内容は、以下のサイトを参照のこと。
https://github.com/purin-it/azure/tree/master/timer-trigger-batch-csv-to-db/demoAzureFunc
また、ビルドとデプロイ手順については、以下の記事を参照のこと。
サンプルプログラムの実行結果(ローカル)
サンプルプログラムをローカルで実行した結果は、以下の通り。なお、CSVファイルの文字コードは、UTF-8とする。
1) 以下の、(エラーが発生しない)CSVファイルを、Azure Storage上の取り込み元になる場所に配置する。
2) SQLデータベース上のUSER_DATAテーブルを、以下のように、1件も存在しない状態に変更する。
3) ローカル環境で、Azure Functionsを「mvn azure-functions:run」コマンドで実行すると、1分毎に、CSVファイルをDBに書き込むバッチ処理が実行される。
4) バッチ実行後は、SQLデータベース上のUSER_DATAテーブルに、取り込んだCSVファイルが書き込まれることが確認できる。
5) SQLデータベースのNAMEの設定値を、以下のように「てすと」と変更する。
6) ローカル環境で、Azure Functionsを「mvn azure-functions:run」コマンドで再度実行する。
7) バッチ実行後は、SQLデータベース上のUSER_DATAテーブルに、取り込んだCSVファイルによって、NAMEの設定値が元に戻ることが確認できる。
8) 以下の、赤枠でエラーが発生するCSVファイルを、Azure Storage上の取り込み元になる場所に配置する。
9) SQLデータベース上のUSER_DATAテーブルを、以下のように、1件も存在しない状態に変更する。
10) ローカル環境で、Azure Functionsを「mvn azure-functions:run」コマンドで再度実行すると、以下の赤枠のように、エラーメッセージがログに出力されることが確認できる。
11) バッチ実行後は、SQLデータベース上のUSER_DATAテーブルに、エラーでないid=3のデータのみ取り込まれたことが確認できる。
Azure Functions 環境変数の設定
今回は、エラーログで日本語を利用しているが、このままでは日本語が文字化けするため、アプリケーション設定から環境変数を追加する。
1) Azure PortalでAzure Functionsを開き、「構成」メニューを選択する。
2) 名前に「Java_OPTS」、値に「Dfile.encoding=UTF-8」をもつ環境変数を追加する。
なお、上記内容は、以下のサイトを参考に設定している。
https://kisk0419.hatenablog.com/entry/2019/11/08/104651
また、Application Insightsでのログ確認手順は、以下の記事の「サンプルプログラムの実行結果(Azure上)」の項番6以降を参照のこと。
サンプルプログラムの実行結果(Azure上)
サンプルプログラムをAzure上で実行した結果は、以下の通り。なお、CSVファイルの文字コードは、UTF-8とする。
1) 以下の、(エラーが発生しない)CSVファイルを、Azure Storage上の取り込み元になる場所に配置する。
2) SQLデータベース上のUSER_DATAテーブルを、以下のように、1件も存在しない状態に変更する。
3) Azure上に、Azure Functionsをデプロイし、CSVファイルをDBに書き込むバッチ処理が実行されたログを確認した結果は、以下の通り。
4) ApplicationInsightsでログを確認した結果は、以下の通り。
5) バッチ実行後は、SQLデータベース上のUSER_DATAテーブルに、取り込んだCSVファイルが書き込まれることが確認できる。なお、下記はデータ追加の例であるが、データ更新についても同じように実行できる。
6) 以下の、赤枠でエラーが発生するCSVファイルを、Azure Storage上の取り込み元になる場所に配置する。
7) SQLデータベース上のUSER_DATAテーブルを、以下のように、1件も存在しない状態に変更する。
8) Azureで、CSVファイルをDBに書き込むバッチ処理が実行されたログを確認した結果は、以下の通り。
9) ApplicationInsightsでログを確認した結果は、以下の通りで、赤枠のように、エラーメッセージがログに出力されることが確認できる。
10) バッチ実行後は、SQLデータベース上のUSER_DATAテーブルに、エラーでないid=3のデータのみ取り込まれたことが確認できる。
要点まとめ
- Timer Triggerを利用したAzure Functions内でSpring Batch(Taskletモデル)を利用でき、その中でAzure Storageに配置したCSVファイルを読み取って、そのファイルの内容をSQL Database上のテーブルに書き込む処理を実装できる。