これまでこのブログでは、TimerTriggerを利用してBlob上のCSVファイルをDBに書き込む処理を作成していたが、EventGridTriggerを利用して、BlobにCSVファイルが作成されたタイミングで、そのCSVファイルをDBに書き込むこともできる。
今回は、EventGridTriggerを利用して、BlobにCSVファイルが作成されたタイミングで、そのCSVファイルをDBに書き込む処理を作成してみたので、そのサンプルプログラムを共有する。
なお、今回の記事は長くなるため、サンプルプログラムの内容とAzure上へのデプロイまでのみ記載し、Azure Portal上でのAzure Eventの設定とサンプルプログラムの実行結果の内容は、次回の記事で記載する。
前提条件
下記記事のサンプルプログラムを作成済であること。なお、読み込むCSVファイルの文字コードはUTF-8とする。
また、以下のように、サブスクリプションのEvent Grid リソース プロバイダー(Microsoft.EventGrid)が、有効になっていること。
サンプルプログラムの作成
前提条件の記事のサンプルプログラムを、EventGridTriggerによって起動するように修正する。なお、下記の赤枠は、前提条件のプログラムから大きく変更したり、追加したりしたプログラムである。
EventGridのハンドラークラスの内容は以下の通りで、EventGridTriggerアノテーションを付与したクラスで、EventGridTriggerのイベント情報を取得している。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | package com.example; import java.time.LocalDateTime; import org.springframework.cloud.function.adapter.azure.FunctionInvoker; import com.example.model.EventGridTriggerParam; import com.example.model.EventGridTriggerResult; import com.example.model.EventSchema; import com.microsoft.azure.functions.ExecutionContext; import com.microsoft.azure.functions.annotation.EventGridTrigger; import com.microsoft.azure.functions.annotation.FunctionName; public class EventGridTriggerTestHandler extends FunctionInvoker<EventGridTriggerParam, EventGridTriggerResult> { /** * EventGridTriggerによって、DemoAzureFunctionクラスの * eventGridTriggerTestメソッドを呼び出す. * @param event EventGridTriggerイベント情報 * @param context コンテキストオブジェクト */ @FunctionName("eventGridTriggerTest") public void eventGridTriggerTest( @EventGridTrigger(name = "event") EventSchema event, final ExecutionContext context) { context.getLogger().info( "EventGridTriggerTestHandler eventGridTriggerTest triggered: " + event.eventTime); // EventGridTriggerイベント情報から、Blob Storageの読み込むファイル名を取得し、 // EventGridTriggerParamに設定する EventGridTriggerParam param = new EventGridTriggerParam(); String subject = event.subject; param.setFileName(subject.substring(subject.lastIndexOf("/") + 1)); param.setTimerInfo(LocalDateTime.now().toString()); handleRequest(param, context); } } |
また、EventGridTriggerのイベント情報は、以下のクラスで定義している。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | package com.example.model; import java.util.Date; import java.util.Map; public class EventSchema { /** トピック */ public String topic; /** サブジェクト */ public String subject; /** イベントタイプ */ public String eventType; /** イベント発生日時 */ public Date eventTime; /** ID */ public String id; /** データのバージョン */ public String dataVersion; /** メタデータのバージョン */ public String metadataVersion; /** データ */ public Map<String, Object> data; } |
なお、ここまでの処理は、以下のサイトを参考にして実装している。
https://docs.microsoft.com/ja-jp/azure/azure-functions/functions-bindings-event-grid-trigger?tabs=java%2Cbash
さらに、EventGridTriggerのサービスクラス、Paramクラス、Resultクラスの内容は以下の通り。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 | package com.example.service; import java.util.HashMap; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobParameter; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.example.model.EventGridTriggerParam; import com.example.model.EventGridTriggerResult; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @Service public class EventGridTriggerService { /* Spring Bootでログ出力するためのLogbackのクラスを生成 */ private static final Logger LOGGER = LoggerFactory.getLogger(EventGridTriggerService.class); /** Spring Batchのジョブを起動するクラス */ @Autowired JobLauncher jobLauncher; /** Spring Batchのジョブを定義するクラス */ @Autowired Job job; /** * イベントグリッドトリガーのテストを行うサービス. * @param param EventGridTrigger呼出用Param * @return イベントグリッドトリガーのテストを行うサービスクラスの呼出結果 */ public EventGridTriggerResult eventGridTriggerTest(EventGridTriggerParam param) { // イベントグリッドトリガーのテストを行うサービスが呼び出されたことをログ出力する LOGGER.info("EventGridTriggerService eventGridTriggerTest triggered: " + param.getTimerInfo()); // Spring Batchのジョブを起動する // ジョブ起動時に生成したパラメータを指定する try { jobLauncher.run(job, createInitialJobParameterMap(param)); } catch (Exception e) { throw new RuntimeException(e); } // イベントグリッドトリガーのテストを行うサービスクラスの呼出結果を返却する EventGridTriggerResult result = new EventGridTriggerResult(); result.setResult("success"); return result; } /** * ジョブを起動するためのパラメータを生成する. * @param EventGridTriggerParamオブジェクト * @return ジョブを起動するためのパラメータ * @throws JsonProcessingException */ private JobParameters createInitialJobParameterMap(EventGridTriggerParam param) throws JsonProcessingException { Map<String, JobParameter> m = new HashMap<>(); m.put("time", new JobParameter(System.currentTimeMillis())); m.put("eventGridTriggerParam" , new JobParameter(new ObjectMapper().writeValueAsString(param))); JobParameters p = new JobParameters(m); return p; } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | package com.example.model; import lombok.Data; @Data public class EventGridTriggerParam { /** 読み込むBlobストレージのファイル名 */ private String fileName; /** Timer情報 */ private String timerInfo; } |
1 2 3 4 5 6 7 8 9 10 11 | package com.example.model; import lombok.Data; @Data public class EventGridTriggerResult { /** 結果情報 */ private String result; } |
また、Azure Functionsのメインクラスの内容は以下の通りで、EventGridTriggerのサービスクラスのファンクション定義を追加している。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | package com.example; import java.util.function.Function; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import com.example.model.EventGridTriggerParam; import com.example.model.EventGridTriggerResult; import com.example.service.EventGridTriggerService; @SpringBootApplication public class DemoAzureFunction { /** イベントグリッドトリガーのテストを行うサービスクラスのオブジェクト */ @Autowired private EventGridTriggerService eventGridTriggerService; public static void main(String[] args) throws Exception { SpringApplication.run(DemoAzureFunction.class, args); } /** * イベントグリッドトリガーのテストを行い結果を返却する関数 * @return イベントグリッドトリガーのテストを行うサービスクラスの呼出結果 */ @Bean public Function<EventGridTriggerParam, EventGridTriggerResult> eventGridTriggerTest(){ return eventGridTriggerParam -> eventGridTriggerService.eventGridTriggerTest(eventGridTriggerParam); } } |
さらに、EventGridTriggerのサービスクラスから呼び出されるバッチ処理の内容は以下の通りで、CSVファイルをDBに書き込む処理を実施した後は、読み込んだCSVファイル名の末尾に「_yyyymmddhhmmss」を追加する処理を追加している。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 | package com.example.service; import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; import java.net.URISyntaxException; import java.security.InvalidKeyException; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import com.example.mybatis.UserDataMapper; import com.example.mybatis.model.UserData; import com.example.util.DemoStringUtil; import com.microsoft.azure.storage.CloudStorageAccount; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.CloudBlobClient; import com.microsoft.azure.storage.blob.CloudBlobContainer; import com.microsoft.azure.storage.blob.CloudBlockBlob; import com.microsoft.azure.storage.blob.CopyStatus; @Service public class DemoBatchService { /* Spring Bootでログ出力するためのLogbackのクラスを生成 */ private static final Logger LOGGER = LoggerFactory.getLogger(DemoBatchService.class); /** Azure Storageのアカウント名 */ @Value("${azure.storage.accountName}") private String storageAccountName; /** Azure Storageへのアクセスキー */ @Value("${azure.storage.accessKey}") private String storageAccessKey; /** Azure StorageのBlobコンテナー名 */ @Value("${azure.storage.containerName}") private String storageContainerName; /** USER_DATAテーブルにアクセスするマッパー */ @Autowired private UserDataMapper userDataMapper; /** * BlobStorageから引数で指定したファイルを読み込み、USER_DATAテーブルに書き込む * @param fileName ファイル名 */ @Transactional public void readUserData(String fileName) { // ファイルがCSVファイル以外か、ファイル名の末尾が「_yyyymmddhhmmss.csv」 // の場合は、何もせず処理を終了する if (!fileName.matches(".*\\.csv") || fileName.matches(".*_[0-9]{14}\\.csv")) { return; } // BlobStorageから引数で指定したファイルを読み込む try (BufferedReader br = new BufferedReader( new InputStreamReader(getBlobCsvData(fileName), "UTF-8"))) { String lineStr = null; int lineCnt = 0; // 1行目(タイトル行)は読み飛ばし、2行目以降はチェックの上、 // USER_DATAテーブルに書き込む // チェックエラー時はエラーログを出力の上、DB更新は行わず先へ進む while ((lineStr = br.readLine()) != null) { // 1行目(タイトル行)は読み飛ばす lineCnt++; if (lineCnt == 1) { continue; } // 引数のCSVファイル1行分の文字列を受け取り、エラーがあればNULLを、 // エラーがなければUserDataオブジェクトに変換し返す UserData userData = checkData(lineStr, lineCnt); // 読み込んだファイルをUSER_DATAテーブルに書き込む if (userData != null) { userDataMapper.upsert(userData); } } } catch (Exception ex) { LOGGER.error(ex.getMessage()); throw new RuntimeException(ex); } finally { try { // 読み込んだファイル名の末尾に「_yyyymmddhhmmss」を追加する renameBlobCsvData(fileName); } catch (Exception ex) { LOGGER.error(ex.getMessage()); throw new RuntimeException(ex); } } } /** * Blobストレージから指定したファイルデータを取得する. * @param fileName ファイル名 * @return 指定したファイルデータの入力ストリーム * @throws URISyntaxException * @throws InvalidKeyException * @throws StorageException */ private InputStream getBlobCsvData(String fileName) throws URISyntaxException, InvalidKeyException, StorageException { // Blob内のコンテナーを取得 CloudBlobContainer container = getBlobContainer(); // BlobStorageから指定したファイルを読み込む CloudBlockBlob blob = container.getBlockBlobReference(fileName); return blob.openInputStream(); } /** * Blobストレージの指定したファイルをリネームする. * @param fileName ファイル名 * @throws URISyntaxException * @throws InvalidKeyException * @throws StorageException * @throws InterruptedException */ private void renameBlobCsvData(String fileName) throws URISyntaxException, InvalidKeyException , StorageException, InterruptedException { // Blob内のコンテナーを取得 CloudBlobContainer container = getBlobContainer(); // 指定したファイルを、末尾に「_yyyymmddhhmmss」を追加したファイル名のBlobにコピー CloudBlockBlob blob = container.getBlockBlobReference(fileName); CloudBlockBlob blobCopy = container.getBlockBlobReference(getNewFileName(fileName)); blobCopy.startCopy(blob.getUri()); // コピーが終わるまで待機 while (true) { if (blobCopy.getCopyState().getStatus() != CopyStatus.PENDING) { break; } Thread.sleep(1000); } // 指定したファイルを削除 blob.delete(); } /** * Blob内のコンテナーを取得し返す. * @return Blob内のコンテナー * @throws URISyntaxException * @throws InvalidKeyException * @throws StorageException */ private CloudBlobContainer getBlobContainer() throws URISyntaxException, InvalidKeyException, StorageException { // Blobストレージへの接続文字列 String storageConnectionString = "DefaultEndpointsProtocol=https;" + "AccountName=" + storageAccountName + ";" + "AccountKey=" + storageAccessKey + ";"; // ストレージアカウントオブジェクトを取得 CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString); // Blobクライアントオブジェクトを取得 CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); // Blob内のコンテナーを取得し返却 return blobClient.getContainerReference(storageContainerName); } /** * 指定したファイルの末尾に「_yyyymmddhhmmss」を追加したファイル名を返す. * @param fileName ファイル名 * @return リネーム後のファイル名 */ private String getNewFileName(String fileName) { StringBuilder sbNewFile = new StringBuilder(); sbNewFile.append(fileName.replace(".csv", "")); sbNewFile.append("_"); sbNewFile.append(DemoStringUtil.getNowDateTime()); sbNewFile.append(".csv"); return sbNewFile.toString(); } /** * 引数のCSVファイル1行分の文字列を受け取り、エラーがあればNULLを、 * エラーがなければUserDataオブジェクトに変換し返す. * @param lineStr CSVファイル1行分の文字列 * @param lineCnt 行数 * @return 変換後のUserData */ private UserData checkData(String lineStr, int lineCnt) { // 引数のCSVファイル1行分の文字列をカンマで分割 String[] strArray = lineStr.split(","); // 桁数不正の場合はエラー if (strArray == null || strArray.length != 7) { LOGGER.info(lineCnt + "行目: 桁数が不正です。"); return null; } // 文字列前後のダブルクォーテーションを削除する for (int i = 0; i < strArray.length; i++) { strArray[i] = DemoStringUtil.trimDoubleQuot(strArray[i]); } // 1列目が空またはNULLの場合はエラー if (StringUtils.isEmpty(strArray[0])) { LOGGER.info(lineCnt + "行目: 1列目が空またはNULLです。"); return null; } // 1列目が数値以外の場合はエラー if (!StringUtils.isNumeric(strArray[0])) { LOGGER.info(lineCnt + "行目: 1列目が数値以外です。"); return null; } // 1列目の桁数が不正な場合はエラー if (strArray[0].length() > 6) { LOGGER.info(lineCnt + "行目: 1列目の桁数が不正です。"); return null; } // 2列目が空またはNULLの場合はエラー if (StringUtils.isEmpty(strArray[1])) { LOGGER.info(lineCnt + "行目: 2列目が空またはNULLです。"); return null; } // 2列目の桁数が不正な場合はエラー if (strArray[1].length() > 40) { LOGGER.info(lineCnt + "行目: 2列目の桁数が不正です。"); return null; } // 3列目が空またはNULLの場合はエラー if (StringUtils.isEmpty(strArray[2])) { LOGGER.info(lineCnt + "行目: 3列目が空またはNULLです。"); return null; } // 3列目が数値以外の場合はエラー if (!StringUtils.isNumeric(strArray[2])) { LOGGER.info(lineCnt + "行目: 3列目が数値以外です。"); return null; } // 3列目の桁数が不正な場合はエラー if (strArray[2].length() > 4) { LOGGER.info(lineCnt + "行目: 3列目の桁数が不正です。"); return null; } // 4列目が空またはNULLの場合はエラー if (StringUtils.isEmpty(strArray[3])) { LOGGER.info(lineCnt + "行目: 4列目が空またはNULLです。"); return null; } // 4列目が数値以外の場合はエラー if (!StringUtils.isNumeric(strArray[3])) { LOGGER.info(lineCnt + "行目: 4列目が数値以外です。"); return null; } // 4列目の桁数が不正な場合はエラー if (strArray[3].length() > 2) { LOGGER.info(lineCnt + "行目: 4列目の桁数が不正です。"); return null; } // 5列目が空またはNULLの場合はエラー if (StringUtils.isEmpty(strArray[4])) { LOGGER.info(lineCnt + "行目: 5列目が空またはNULLです。"); return null; } // 5列目が数値以外の場合はエラー if (!StringUtils.isNumeric(strArray[4])) { LOGGER.info(lineCnt + "行目: 5列目が数値以外です。"); return null; } // 5列目の桁数が不正な場合はエラー if (strArray[4].length() > 2) { LOGGER.info(lineCnt + "行目: 5列目の桁数が不正です。"); return null; } // 3列目・4列目・5列目から生成される日付が不正であればエラー String birthDay = strArray[2] + DemoStringUtil.addZero(strArray[3]) + DemoStringUtil.addZero(strArray[4]); if (!DemoStringUtil.isCorrectDate(birthDay, "uuuuMMdd")) { LOGGER.info(lineCnt + "行目: 3~5列目の日付が不正です。"); return null; } // 6列目が1,2以外の場合はエラー if (!("1".equals(strArray[5])) && !("2".equals(strArray[5]))) { LOGGER.info(lineCnt + "行目: 6列目の性別が不正です。"); return null; } // 7列目の桁数が不正な場合はエラー if (!StringUtils.isEmpty(strArray[6]) && strArray[6].length() > 1024) { LOGGER.info(lineCnt + "行目: 7列目の桁数が不正です。"); return null; } // エラーがなければUserDataオブジェクトに変換し返す UserData userData = new UserData(); userData.setId(Integer.parseInt(strArray[0])); userData.setName(strArray[1]); userData.setBirth_year(Integer.parseInt(strArray[2])); userData.setBirth_month(Integer.parseInt(strArray[3])); userData.setBirth_day(Integer.parseInt(strArray[4])); userData.setSex(strArray[5]); userData.setMemo(strArray[6]); return userData; } } |
その他、ユーティリティクラスの内容は以下の通りで、getNowDateTimeメソッドを追加している。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 | package com.example.util; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.time.format.ResolverStyle; import org.apache.commons.lang3.StringUtils; public class DemoStringUtil { /** * 文字列前後のダブルクォーテーションを削除する. * @param str 変換前文字列 * @return 変換後文字列 */ public static String trimDoubleQuot(String regStr) { if (StringUtils.isEmpty(regStr)) { return regStr; } char c = '"'; if (regStr.charAt(0) == c && regStr.charAt(regStr.length() - 1) == c) { return regStr.substring(1, regStr.length() - 1); } else { return regStr; } } /** * DateTimeFormatterを利用して日付チェックを行う. * @param dateStr チェック対象文字列 * @param dateFormat 日付フォーマット * @return 日付チェック結果 */ public static boolean isCorrectDate(String dateStr, String dateFormat) { if (StringUtils.isEmpty(dateStr) || StringUtils.isEmpty(dateFormat)) { return false; } // 日付と時刻を厳密に解決するスタイルで、DateTimeFormatterオブジェクトを作成 DateTimeFormatter df = DateTimeFormatter.ofPattern(dateFormat) .withResolverStyle(ResolverStyle.STRICT); try { // チェック対象文字列をLocalDate型の日付に変換できれば、チェックOKとする LocalDate.parse(dateStr, df); return true; } catch (Exception e) { return false; } } /** * 数値文字列が1桁の場合、頭に0を付けて返す. * @param intNum 数値文字列 * @return 変換後数値文字列 */ public static String addZero(String intNum) { if (StringUtils.isEmpty(intNum)) { return intNum; } if (intNum.length() == 1) { return "0" + intNum; } return intNum; } /** * 現在日時をyyyyMMddHHmmss形式で返す. * @return 現在日時 */ public static String getNowDateTime() { LocalDateTime localDateTime = LocalDateTime.now(); DateTimeFormatter datetimeformatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss"); return datetimeformatter.format(localDateTime); } } |
その他のソースコード内容は、以下のサイトを参照のこと。また、「extensions.csproj」は、「mvn package」コマンドを実行後に作成されるファイルである。
https://github.com/purin-it/azure/tree/master/event-grid-trigger-batch-csv-to-db/demoAzureFunc
Azure環境へのデプロイ手順は、以下のサイトの「サンプルプログラムの実行結果(ローカル)」「サンプルプログラムの実行結果(Azure上)」に記載されている通りである。
ただし、mvn packageコマンドの実行結果(途中省略)は、以下のようになる。
また、Azure環境にデプロイ後、Azure Portal上で関数を確認すると、以下のように、トリガーがEventGridトリガーになっていることが確認できる。