Timer Triggerによって、一定時間が来たタイミングでAzure Functionsが動作するアプリケーションを生成し、そのバッチ処理内部で、Spring Batch(Taskletモデル)を利用することができる。
今回は、Spring Batch(Taskletモデル)内に、Azure Storageに配置したCSVファイルを読み取って、そのファイルの内容をSQL Database上のテーブルに書き込む処理を追加してみたので、そのサンプルプログラムを共有する。
前提条件
下記記事のサンプルプログラムを作成済であること。
作成したサンプルプログラムの修正
前提条件の記事のサンプルプログラムに、CSVファイルの内容をDBに書き込む処理を追加する。なお、下記の赤枠は、前提条件のプログラムから変更したり、追加したりしたプログラムである。
pom.xmlの追加内容は以下の通りで、Azure StorageやSQL Database(SQL Server)、MyBatisを利用するための設定を追加している。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | <!-- Azure Storageの設定 --> <dependency> <groupId>com.microsoft.azure</groupId> <artifactId>azure-storage</artifactId> <version>8.3.0</version> </dependency> <!-- SQL Serverを利用するための設定 --> <dependency> <groupId>com.microsoft.sqlserver</groupId> <artifactId>mssql-jdbc</artifactId> </dependency> <!-- MyBatisを利用するための設定 --> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>2.1.1</version> </dependency> |
Azure Functionsのメインクラスの内容は以下の通りで、Spring Batchで実施されるデータソースの自動設定を除外していたのを削除している。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | package com.example; import java.util.function.Function; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import com.example.model.TimerTriggerParam; import com.example.model.TimerTriggerResult; import com.example.service.TimerTriggerService; @SpringBootApplication public class DemoAzureFunction { /** タイマートリガーのテストを行うサービスクラスのオブジェクト */ @Autowired private TimerTriggerService timerTriggerService; public static void main(String[] args) throws Exception { SpringApplication.run(DemoAzureFunction.class, args); } /** * タイマートリガーのテストを行い結果を返却する関数 * @return タイマートリガーのテストを行うサービスクラスの呼出結果 */ @Bean public Function<TimerTriggerParam, TimerTriggerResult> timerTriggerTest() { return timerTriggerParam -> timerTriggerService.timerTriggerTest(timerTriggerParam); } } |
Taskletクラスの内容は以下の通りで、CSVファイルの内容をDBに書き込むサービス(DemoBatchService)の呼び出しを追加している。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 | package com.example.batch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.scope.context.ChunkContext; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.example.model.TimerTriggerParam; import com.example.service.DemoBatchService; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.batch.core.step.tasklet.Tasklet; @Component public class DemoTasklet implements Tasklet { /* Spring Bootでログ出力するためのLogbackのクラスを生成 */ private static final Logger LOGGER = LoggerFactory.getLogger(DemoTasklet.class); /** * BlobStorageからファイル(user_data.csv)を読み込み、USER_DATAテーブルに書き込むサービス */ @Autowired private DemoBatchService demoBatchService; /** * Spring Batchのジョブ内での処理を定義する. */ @Override public RepeatStatus execute(StepContribution contribution , ChunkContext chunkContext) throws Exception { // Spring Batchのジョブ内での処理が呼び出されたことをログ出力する String paramStr = chunkContext.getStepContext().getStepExecution() .getJobParameters().getString("timerTriggerParam"); if (paramStr != null) { TimerTriggerParam param = new ObjectMapper().readValue( paramStr, new TypeReference<TimerTriggerParam>() { }); LOGGER.info("DemoTasklet execute " + " triggered: " + param.getTimerInfo()); } // BlobStorageからファイル(user_data.csv)を読み込み、 // USER_DATAテーブルに書き込むサービスを呼び出す demoBatchService.readUserData(); // ジョブが終了したことを返す return RepeatStatus.FINISHED; } } |
application.propertiesの内容は以下の通りで、Azure StorageとSQL Databaseへの接続設定を追加している。
1 2 3 4 5 6 7 8 9 10 | # Azure Storageの接続先 azure.storage.accountName=azureblobpurinit azure.storage.accessKey=(Azure Blob Storageのアクセスキー) azure.storage.containerName=blobcontainer # DB接続設定 spring.datasource.url=jdbc:sqlserver://azure-db-purinit.database.windows.net:1433;database=azureSqlDatabase spring.datasource.username=purinit@azure-db-purinit spring.datasource.password=(DBのパスワード) spring.datasource.driverClassName=com.microsoft.sqlserver.jdbc.SQLServerDriver |
CSVファイルの内容をDBに書き込むサービスの内容は以下の通り。CSVファイルのチェック処理もここに追加している。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 | package com.example.service; import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; import java.net.URISyntaxException; import java.security.InvalidKeyException; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import com.example.mybatis.UserDataMapper; import com.example.mybatis.model.UserData; import com.example.util.DemoStringUtil; import com.microsoft.azure.storage.CloudStorageAccount; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.CloudBlobClient; import com.microsoft.azure.storage.blob.CloudBlobContainer; import com.microsoft.azure.storage.blob.CloudBlockBlob; @Service public class DemoBatchService { /* Spring Bootでログ出力するためのLogbackのクラスを生成 */ private static final Logger LOGGER = LoggerFactory.getLogger(DemoBatchService.class); /** Azure Storageのアカウント名 */ @Value("${azure.storage.accountName}") private String storageAccountName; /** Azure Storageへのアクセスキー */ @Value("${azure.storage.accessKey}") private String storageAccessKey; /** Azure StorageのBlobコンテナー名 */ @Value("${azure.storage.containerName}") private String storageContainerName; /** USER_DATAテーブルにアクセスするマッパー */ @Autowired private UserDataMapper userDataMapper; /** * BlobStorageからファイル(user_data.csv)を読み込み、USER_DATAテーブルに書き込む */ @Transactional public void readUserData() { // BlobStorageからファイル(user_data.csv)を読み込む try (BufferedReader br = new BufferedReader(new InputStreamReader(getBlobCsvData(), "UTF-8"))) { String lineStr = null; int lineCnt = 0; // 1行目(タイトル行)は読み飛ばし、2行目以降はチェックの上、 // USER_DATAテーブルに書き込む // チェックエラー時はエラーログを出力の上、DB更新は行わず先へ進む while ((lineStr = br.readLine()) != null) { // 1行目(タイトル行)は読み飛ばす lineCnt++; if (lineCnt == 1) { continue; } // 引数のCSVファイル1行分の文字列を受け取り、エラーがあればNULLを、 // エラーがなければUserDataオブジェクトに変換し返す UserData userData = checkData(lineStr, lineCnt); // 読み込んだファイルをUSER_DATAテーブルに書き込む if (userData != null) { userDataMapper.upsert(userData); } } } catch (Exception ex) { LOGGER.error(ex.getMessage()); throw new RuntimeException(ex); } } /** * Blobストレージからファイルデータ(user_data.csv)を取得する. * @return ファイルデータ(user_data.csv)の入力ストリーム * @throws URISyntaxException * @throws InvalidKeyException * @throws StorageException */ private InputStream getBlobCsvData() throws URISyntaxException, InvalidKeyException, StorageException { // Blobストレージへの接続文字列 String storageConnectionString = "DefaultEndpointsProtocol=https;" + "AccountName=" + storageAccountName + ";" + "AccountKey=" + storageAccessKey + ";"; // ストレージアカウントオブジェクトを取得 CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString); // Blobクライアントオブジェクトを取得 CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); // Blob内のコンテナーを取得 CloudBlobContainer container = blobClient.getContainerReference(storageContainerName); // BlobStorageからファイル(user_data.csv)を読み込む CloudBlockBlob blob = container.getBlockBlobReference("user_data.csv"); return blob.openInputStream(); } /** * 引数のCSVファイル1行分の文字列を受け取り、エラーがあればNULLを、 * エラーがなければUserDataオブジェクトに変換し返す. * @param lineStr CSVファイル1行分の文字列 * @param lineCnt 行数 * @return 変換後のUserData */ private UserData checkData(String lineStr, int lineCnt) { // 引数のCSVファイル1行分の文字列をカンマで分割 String[] strArray = lineStr.split(","); // 桁数不正の場合はエラー if (strArray == null || strArray.length != 7) { LOGGER.info(lineCnt + "行目: 桁数が不正です。"); return null; } // 文字列前後のダブルクォーテーションを削除する for (int i = 0; i < strArray.length; i++) { strArray[i] = DemoStringUtil.trimDoubleQuot(strArray[i]); } // 1列目が空またはNULLの場合はエラー if (StringUtils.isEmpty(strArray[0])) { LOGGER.info(lineCnt + "行目: 1列目が空またはNULLです。"); return null; } // 1列目が数値以外の場合はエラー if (!StringUtils.isNumeric(strArray[0])) { LOGGER.info(lineCnt + "行目: 1列目が数値以外です。"); return null; } // 1列目の桁数が不正な場合はエラー if (strArray[0].length() > 6) { LOGGER.info(lineCnt + "行目: 1列目の桁数が不正です。"); return null; } // 2列目が空またはNULLの場合はエラー if (StringUtils.isEmpty(strArray[1])) { LOGGER.info(lineCnt + "行目: 2列目が空またはNULLです。"); return null; } // 2列目の桁数が不正な場合はエラー if (strArray[1].length() > 40) { LOGGER.info(lineCnt + "行目: 2列目の桁数が不正です。"); return null; } // 3列目が空またはNULLの場合はエラー if (StringUtils.isEmpty(strArray[2])) { LOGGER.info(lineCnt + "行目: 3列目が空またはNULLです。"); return null; } // 3列目が数値以外の場合はエラー if (!StringUtils.isNumeric(strArray[2])) { LOGGER.info(lineCnt + "行目: 3列目が数値以外です。"); return null; } // 3列目の桁数が不正な場合はエラー if (strArray[2].length() > 4) { LOGGER.info(lineCnt + "行目: 3列目の桁数が不正です。"); return null; } // 4列目が空またはNULLの場合はエラー if (StringUtils.isEmpty(strArray[3])) { LOGGER.info(lineCnt + "行目: 4列目が空またはNULLです。"); return null; } // 4列目が数値以外の場合はエラー if (!StringUtils.isNumeric(strArray[3])) { LOGGER.info(lineCnt + "行目: 4列目が数値以外です。"); return null; } // 4列目の桁数が不正な場合はエラー if (strArray[3].length() > 2) { LOGGER.info(lineCnt + "行目: 4列目の桁数が不正です。"); return null; } // 5列目が空またはNULLの場合はエラー if (StringUtils.isEmpty(strArray[4])) { LOGGER.info(lineCnt + "行目: 5列目が空またはNULLです。"); return null; } // 5列目が数値以外の場合はエラー if (!StringUtils.isNumeric(strArray[4])) { LOGGER.info(lineCnt + "行目: 5列目が数値以外です。"); return null; } // 5列目の桁数が不正な場合はエラー if (strArray[4].length() > 2) { LOGGER.info(lineCnt + "行目: 5列目の桁数が不正です。"); return null; } // 3列目・4列目・5列目から生成される日付が不正であればエラー String birthDay = strArray[2] + DemoStringUtil.addZero(strArray[3]) + DemoStringUtil.addZero(strArray[4]); if (!DemoStringUtil.isCorrectDate(birthDay, "uuuuMMdd")) { LOGGER.info(lineCnt + "行目: 3~5列目の日付が不正です。"); return null; } // 6列目が1,2以外の場合はエラー if (!("1".equals(strArray[5])) && !("2".equals(strArray[5]))) { LOGGER.info(lineCnt + "行目: 6列目の性別が不正です。"); return null; } // 7列目の桁数が不正な場合はエラー if (!StringUtils.isEmpty(strArray[6]) && strArray[6].length() > 1024) { LOGGER.info(lineCnt + "行目: 7列目の桁数が不正です。"); return null; } // エラーがなければUserDataオブジェクトに変換し返す UserData userData = new UserData(); userData.setId(Integer.parseInt(strArray[0])); userData.setName(strArray[1]); userData.setBirth_year(Integer.parseInt(strArray[2])); userData.setBirth_month(Integer.parseInt(strArray[3])); userData.setBirth_day(Integer.parseInt(strArray[4])); userData.setSex(strArray[5]); userData.setMemo(strArray[6]); return userData; } } |
上記サービスクラスから呼ばれる、文字列のチェックや加工を行うユーティリティクラスの内容は、以下の通り。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 | package com.example.util; import java.time.LocalDate; import java.time.format.DateTimeFormatter; import java.time.format.ResolverStyle; import org.apache.commons.lang3.StringUtils; public class DemoStringUtil { /** * 文字列前後のダブルクォーテーションを削除する. * @param str 変換前文字列 * @return 変換後文字列 */ public static String trimDoubleQuot(String regStr) { if (StringUtils.isEmpty(regStr)) { return regStr; } char c = '"'; if (regStr.charAt(0) == c && regStr.charAt(regStr.length() - 1) == c) { return regStr.substring(1, regStr.length() - 1); } else { return regStr; } } /** * DateTimeFormatterを利用して日付チェックを行う. * @param dateStr チェック対象文字列 * @param dateFormat 日付フォーマット * @return 日付チェック結果 */ public static boolean isCorrectDate(String dateStr, String dateFormat) { if (StringUtils.isEmpty(dateStr) || StringUtils.isEmpty(dateFormat)) { return false; } // 日付と時刻を厳密に解決するスタイルで、DateTimeFormatterオブジェクトを作成 DateTimeFormatter df = DateTimeFormatter.ofPattern(dateFormat) .withResolverStyle(ResolverStyle.STRICT); try { // チェック対象文字列をLocalDate型の日付に変換できれば、チェックOKとする LocalDate.parse(dateStr, df); return true; } catch (Exception e) { return false; } } /** * 数値文字列が1桁の場合、頭に0を付けて返す. * @param intNum 数値文字列 * @return 変換後数値文字列 */ public static String addZero(String intNum) { if (StringUtils.isEmpty(intNum)) { return intNum; } if (intNum.length() == 1) { return "0" + intNum; } return intNum; } } |
SQL DatabaseのUSER_DATAテーブルにアクセスするためのEntityやMapperの内容は、以下の通り。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | package com.example.mybatis.model; import lombok.Data; @Data public class UserData { /** ID */ private int id; /** 名前 */ private String name; /** 生年月日_年 */ private int birth_year; /** 生年月日_月 */ private int birth_month; /** 生年月日_日 */ private int birth_day; /** 性別 */ private String sex; /** メモ */ private String memo; } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | package com.example.mybatis; import org.apache.ibatis.annotations.Mapper; import com.example.mybatis.model.UserData; @Mapper public interface UserDataMapper { /** * DBにUserDataオブジェクトがあれば更新し、なければ追加する. * @param userData UserDataオブジェクト */ void upsert(UserData userData); } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | <?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.example.mybatis.UserDataMapper"> <update id="upsert" parameterType="com.example.mybatis.model.UserData"> MERGE INTO USER_DATA AS u USING ( SELECT #{id} id ) s ON ( u.id = s.id ) WHEN MATCHED THEN UPDATE SET name = #{name}, birth_year = #{birth_year} , birth_month = #{birth_month}, birth_day = #{birth_day} , sex = #{sex}, memo = #{memo} WHEN NOT MATCHED THEN INSERT ( id, name, birth_year, birth_month , birth_day, sex, memo ) VALUES (#{id}, #{name}, #{birth_year}, #{birth_month} , #{birth_day}, #{sex}, #{memo}) ; </update> </mapper> |
なお、上記Mapperのupsertに定義したSQLは、USER_DATAテーブルの主キー(id)がある場合は更新、無い場合は追加を行う処理になっている。
その他のソースコード内容は、以下のサイトを参照のこと。
https://github.com/purin-it/azure/tree/master/timer-trigger-batch-csv-to-db/demoAzureFunc
また、ビルドとデプロイ手順については、以下の記事を参照のこと。
サンプルプログラムの実行結果(ローカル)
サンプルプログラムをローカルで実行した結果は、以下の通り。なお、CSVファイルの文字コードは、UTF-8とする。
1) 以下の、(エラーが発生しない)CSVファイルを、Azure Storage上の取り込み元になる場所に配置する。
2) SQLデータベース上のUSER_DATAテーブルを、以下のように、1件も存在しない状態に変更する。
3) ローカル環境で、Azure Functionsを「mvn azure-functions:run」コマンドで実行すると、1分毎に、CSVファイルをDBに書き込むバッチ処理が実行される。
4) バッチ実行後は、SQLデータベース上のUSER_DATAテーブルに、取り込んだCSVファイルが書き込まれることが確認できる。
5) SQLデータベースのNAMEの設定値を、以下のように「てすと」と変更する。
6) ローカル環境で、Azure Functionsを「mvn azure-functions:run」コマンドで再度実行する。
7) バッチ実行後は、SQLデータベース上のUSER_DATAテーブルに、取り込んだCSVファイルによって、NAMEの設定値が元に戻ることが確認できる。
8) 以下の、赤枠でエラーが発生するCSVファイルを、Azure Storage上の取り込み元になる場所に配置する。
9) SQLデータベース上のUSER_DATAテーブルを、以下のように、1件も存在しない状態に変更する。
10) ローカル環境で、Azure Functionsを「mvn azure-functions:run」コマンドで再度実行すると、以下の赤枠のように、エラーメッセージがログに出力されることが確認できる。
11) バッチ実行後は、SQLデータベース上のUSER_DATAテーブルに、エラーでないid=3のデータのみ取り込まれたことが確認できる。
Azure Functions 環境変数の設定
今回は、エラーログで日本語を利用しているが、このままでは日本語が文字化けするため、アプリケーション設定から環境変数を追加する。
1) Azure PortalでAzure Functionsを開き、「構成」メニューを選択する。
2) 名前に「Java_OPTS」、値に「Dfile.encoding=UTF-8」をもつ環境変数を追加する。
なお、上記内容は、以下のサイトを参考に設定している。
https://kisk0419.hatenablog.com/entry/2019/11/08/104651
また、Application Insightsでのログ確認手順は、以下の記事の「サンプルプログラムの実行結果(Azure上)」の項番6以降を参照のこと。
サンプルプログラムの実行結果(Azure上)
サンプルプログラムをAzure上で実行した結果は、以下の通り。なお、CSVファイルの文字コードは、UTF-8とする。
1) 以下の、(エラーが発生しない)CSVファイルを、Azure Storage上の取り込み元になる場所に配置する。
2) SQLデータベース上のUSER_DATAテーブルを、以下のように、1件も存在しない状態に変更する。
3) Azure上に、Azure Functionsをデプロイし、CSVファイルをDBに書き込むバッチ処理が実行されたログを確認した結果は、以下の通り。
4) ApplicationInsightsでログを確認した結果は、以下の通り。
5) バッチ実行後は、SQLデータベース上のUSER_DATAテーブルに、取り込んだCSVファイルが書き込まれることが確認できる。なお、下記はデータ追加の例であるが、データ更新についても同じように実行できる。
6) 以下の、赤枠でエラーが発生するCSVファイルを、Azure Storage上の取り込み元になる場所に配置する。
7) SQLデータベース上のUSER_DATAテーブルを、以下のように、1件も存在しない状態に変更する。
8) Azureで、CSVファイルをDBに書き込むバッチ処理が実行されたログを確認した結果は、以下の通り。
9) ApplicationInsightsでログを確認した結果は、以下の通りで、赤枠のように、エラーメッセージがログに出力されることが確認できる。
10) バッチ実行後は、SQLデータベース上のUSER_DATAテーブルに、エラーでないid=3のデータのみ取り込まれたことが確認できる。
要点まとめ
- Timer Triggerを利用したAzure Functions内でSpring Batch(Taskletモデル)を利用でき、その中でAzure Storageに配置したCSVファイルを読み取って、そのファイルの内容をSQL Database上のテーブルに書き込む処理を実装できる。