これまでこのブログでは、TimerTriggerを利用してBlob上のCSVファイルをDBに書き込む処理を作成していたが、EventGridTriggerを利用して、BlobにCSVファイルが作成されたタイミングで、そのCSVファイルをDBに書き込むこともできる。
今回は、EventGridTriggerを利用して、BlobにCSVファイルが作成されたタイミングで、そのCSVファイルをDBに書き込む処理を作成してみたので、そのサンプルプログラムを共有する。
なお、今回の記事は長くなるため、サンプルプログラムの内容とAzure上へのデプロイまでのみ記載し、Azure Portal上でのAzure Eventの設定とサンプルプログラムの実行結果の内容は、次回の記事で記載する。
前提条件
下記記事のサンプルプログラムを作成済であること。なお、読み込むCSVファイルの文字コードはUTF-8とする。
また、以下のように、サブスクリプションのEvent Grid リソース プロバイダー(Microsoft.EventGrid)が、有効になっていること。
サンプルプログラムの作成
前提条件の記事のサンプルプログラムを、EventGridTriggerによって起動するように修正する。なお、下記の赤枠は、前提条件のプログラムから大きく変更したり、追加したりしたプログラムである。
EventGridのハンドラークラスの内容は以下の通りで、EventGridTriggerアノテーションを付与したクラスで、EventGridTriggerのイベント情報を取得している。
package com.example; import java.time.LocalDateTime; import org.springframework.cloud.function.adapter.azure.FunctionInvoker; import com.example.model.EventGridTriggerParam; import com.example.model.EventGridTriggerResult; import com.example.model.EventSchema; import com.microsoft.azure.functions.ExecutionContext; import com.microsoft.azure.functions.annotation.EventGridTrigger; import com.microsoft.azure.functions.annotation.FunctionName; public class EventGridTriggerTestHandler extends FunctionInvoker<EventGridTriggerParam, EventGridTriggerResult> { /** * EventGridTriggerによって、DemoAzureFunctionクラスの * eventGridTriggerTestメソッドを呼び出す. * @param event EventGridTriggerイベント情報 * @param context コンテキストオブジェクト */ @FunctionName("eventGridTriggerTest") public void eventGridTriggerTest( @EventGridTrigger(name = "event") EventSchema event, final ExecutionContext context) { context.getLogger().info( "EventGridTriggerTestHandler eventGridTriggerTest triggered: " + event.eventTime); // EventGridTriggerイベント情報から、Blob Storageの読み込むファイル名を取得し、 // EventGridTriggerParamに設定する EventGridTriggerParam param = new EventGridTriggerParam(); String subject = event.subject; param.setFileName(subject.substring(subject.lastIndexOf("/") + 1)); param.setTimerInfo(LocalDateTime.now().toString()); handleRequest(param, context); } }
また、EventGridTriggerのイベント情報は、以下のクラスで定義している。
package com.example.model; import java.util.Date; import java.util.Map; public class EventSchema { /** トピック */ public String topic; /** サブジェクト */ public String subject; /** イベントタイプ */ public String eventType; /** イベント発生日時 */ public Date eventTime; /** ID */ public String id; /** データのバージョン */ public String dataVersion; /** メタデータのバージョン */ public String metadataVersion; /** データ */ public Map<String, Object> data; }
なお、ここまでの処理は、以下のサイトを参考にして実装している。
https://docs.microsoft.com/ja-jp/azure/azure-functions/functions-bindings-event-grid-trigger?tabs=java%2Cbash
さらに、EventGridTriggerのサービスクラス、Paramクラス、Resultクラスの内容は以下の通り。
package com.example.service; import java.util.HashMap; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobParameter; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.example.model.EventGridTriggerParam; import com.example.model.EventGridTriggerResult; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @Service public class EventGridTriggerService { /* Spring Bootでログ出力するためのLogbackのクラスを生成 */ private static final Logger LOGGER = LoggerFactory.getLogger(EventGridTriggerService.class); /** Spring Batchのジョブを起動するクラス */ @Autowired JobLauncher jobLauncher; /** Spring Batchのジョブを定義するクラス */ @Autowired Job job; /** * イベントグリッドトリガーのテストを行うサービス. * @param param EventGridTrigger呼出用Param * @return イベントグリッドトリガーのテストを行うサービスクラスの呼出結果 */ public EventGridTriggerResult eventGridTriggerTest(EventGridTriggerParam param) { // イベントグリッドトリガーのテストを行うサービスが呼び出されたことをログ出力する LOGGER.info("EventGridTriggerService eventGridTriggerTest triggered: " + param.getTimerInfo()); // Spring Batchのジョブを起動する // ジョブ起動時に生成したパラメータを指定する try { jobLauncher.run(job, createInitialJobParameterMap(param)); } catch (Exception e) { throw new RuntimeException(e); } // イベントグリッドトリガーのテストを行うサービスクラスの呼出結果を返却する EventGridTriggerResult result = new EventGridTriggerResult(); result.setResult("success"); return result; } /** * ジョブを起動するためのパラメータを生成する. * @param EventGridTriggerParamオブジェクト * @return ジョブを起動するためのパラメータ * @throws JsonProcessingException */ private JobParameters createInitialJobParameterMap(EventGridTriggerParam param) throws JsonProcessingException { Map<String, JobParameter> m = new HashMap<>(); m.put("time", new JobParameter(System.currentTimeMillis())); m.put("eventGridTriggerParam" , new JobParameter(new ObjectMapper().writeValueAsString(param))); JobParameters p = new JobParameters(m); return p; } }
package com.example.model; import lombok.Data; @Data public class EventGridTriggerParam { /** 読み込むBlobストレージのファイル名 */ private String fileName; /** Timer情報 */ private String timerInfo; }
package com.example.model; import lombok.Data; @Data public class EventGridTriggerResult { /** 結果情報 */ private String result; }
また、Azure Functionsのメインクラスの内容は以下の通りで、EventGridTriggerのサービスクラスのファンクション定義を追加している。
package com.example; import java.util.function.Function; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import com.example.model.EventGridTriggerParam; import com.example.model.EventGridTriggerResult; import com.example.service.EventGridTriggerService; @SpringBootApplication public class DemoAzureFunction { /** イベントグリッドトリガーのテストを行うサービスクラスのオブジェクト */ @Autowired private EventGridTriggerService eventGridTriggerService; public static void main(String[] args) throws Exception { SpringApplication.run(DemoAzureFunction.class, args); } /** * イベントグリッドトリガーのテストを行い結果を返却する関数 * @return イベントグリッドトリガーのテストを行うサービスクラスの呼出結果 */ @Bean public Function<EventGridTriggerParam, EventGridTriggerResult> eventGridTriggerTest(){ return eventGridTriggerParam -> eventGridTriggerService.eventGridTriggerTest(eventGridTriggerParam); } }
さらに、EventGridTriggerのサービスクラスから呼び出されるバッチ処理の内容は以下の通りで、CSVファイルをDBに書き込む処理を実施した後は、読み込んだCSVファイル名の末尾に「_yyyymmddhhmmss」を追加する処理を追加している。
package com.example.service; import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; import java.net.URISyntaxException; import java.security.InvalidKeyException; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import com.example.mybatis.UserDataMapper; import com.example.mybatis.model.UserData; import com.example.util.DemoStringUtil; import com.microsoft.azure.storage.CloudStorageAccount; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.CloudBlobClient; import com.microsoft.azure.storage.blob.CloudBlobContainer; import com.microsoft.azure.storage.blob.CloudBlockBlob; import com.microsoft.azure.storage.blob.CopyStatus; @Service public class DemoBatchService { /* Spring Bootでログ出力するためのLogbackのクラスを生成 */ private static final Logger LOGGER = LoggerFactory.getLogger(DemoBatchService.class); /** Azure Storageのアカウント名 */ @Value("${azure.storage.accountName}") private String storageAccountName; /** Azure Storageへのアクセスキー */ @Value("${azure.storage.accessKey}") private String storageAccessKey; /** Azure StorageのBlobコンテナー名 */ @Value("${azure.storage.containerName}") private String storageContainerName; /** USER_DATAテーブルにアクセスするマッパー */ @Autowired private UserDataMapper userDataMapper; /** * BlobStorageから引数で指定したファイルを読み込み、USER_DATAテーブルに書き込む * @param fileName ファイル名 */ @Transactional public void readUserData(String fileName) { // ファイルがCSVファイル以外か、ファイル名の末尾が「_yyyymmddhhmmss.csv」 // の場合は、何もせず処理を終了する if (!fileName.matches(".*\\.csv") || fileName.matches(".*_[0-9]{14}\\.csv")) { return; } // BlobStorageから引数で指定したファイルを読み込む try (BufferedReader br = new BufferedReader( new InputStreamReader(getBlobCsvData(fileName), "UTF-8"))) { String lineStr = null; int lineCnt = 0; // 1行目(タイトル行)は読み飛ばし、2行目以降はチェックの上、 // USER_DATAテーブルに書き込む // チェックエラー時はエラーログを出力の上、DB更新は行わず先へ進む while ((lineStr = br.readLine()) != null) { // 1行目(タイトル行)は読み飛ばす lineCnt++; if (lineCnt == 1) { continue; } // 引数のCSVファイル1行分の文字列を受け取り、エラーがあればNULLを、 // エラーがなければUserDataオブジェクトに変換し返す UserData userData = checkData(lineStr, lineCnt); // 読み込んだファイルをUSER_DATAテーブルに書き込む if (userData != null) { userDataMapper.upsert(userData); } } } catch (Exception ex) { LOGGER.error(ex.getMessage()); throw new RuntimeException(ex); } finally { try { // 読み込んだファイル名の末尾に「_yyyymmddhhmmss」を追加する renameBlobCsvData(fileName); } catch (Exception ex) { LOGGER.error(ex.getMessage()); throw new RuntimeException(ex); } } } /** * Blobストレージから指定したファイルデータを取得する. * @param fileName ファイル名 * @return 指定したファイルデータの入力ストリーム * @throws URISyntaxException * @throws InvalidKeyException * @throws StorageException */ private InputStream getBlobCsvData(String fileName) throws URISyntaxException, InvalidKeyException, StorageException { // Blob内のコンテナーを取得 CloudBlobContainer container = getBlobContainer(); // BlobStorageから指定したファイルを読み込む CloudBlockBlob blob = container.getBlockBlobReference(fileName); return blob.openInputStream(); } /** * Blobストレージの指定したファイルをリネームする. * @param fileName ファイル名 * @throws URISyntaxException * @throws InvalidKeyException * @throws StorageException * @throws InterruptedException */ private void renameBlobCsvData(String fileName) throws URISyntaxException, InvalidKeyException , StorageException, InterruptedException { // Blob内のコンテナーを取得 CloudBlobContainer container = getBlobContainer(); // 指定したファイルを、末尾に「_yyyymmddhhmmss」を追加したファイル名のBlobにコピー CloudBlockBlob blob = container.getBlockBlobReference(fileName); CloudBlockBlob blobCopy = container.getBlockBlobReference(getNewFileName(fileName)); blobCopy.startCopy(blob.getUri()); // コピーが終わるまで待機 while (true) { if (blobCopy.getCopyState().getStatus() != CopyStatus.PENDING) { break; } Thread.sleep(1000); } // 指定したファイルを削除 blob.delete(); } /** * Blob内のコンテナーを取得し返す. * @return Blob内のコンテナー * @throws URISyntaxException * @throws InvalidKeyException * @throws StorageException */ private CloudBlobContainer getBlobContainer() throws URISyntaxException, InvalidKeyException, StorageException { // Blobストレージへの接続文字列 String storageConnectionString = "DefaultEndpointsProtocol=https;" + "AccountName=" + storageAccountName + ";" + "AccountKey=" + storageAccessKey + ";"; // ストレージアカウントオブジェクトを取得 CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString); // Blobクライアントオブジェクトを取得 CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); // Blob内のコンテナーを取得し返却 return blobClient.getContainerReference(storageContainerName); } /** * 指定したファイルの末尾に「_yyyymmddhhmmss」を追加したファイル名を返す. * @param fileName ファイル名 * @return リネーム後のファイル名 */ private String getNewFileName(String fileName) { StringBuilder sbNewFile = new StringBuilder(); sbNewFile.append(fileName.replace(".csv", "")); sbNewFile.append("_"); sbNewFile.append(DemoStringUtil.getNowDateTime()); sbNewFile.append(".csv"); return sbNewFile.toString(); } /** * 引数のCSVファイル1行分の文字列を受け取り、エラーがあればNULLを、 * エラーがなければUserDataオブジェクトに変換し返す. * @param lineStr CSVファイル1行分の文字列 * @param lineCnt 行数 * @return 変換後のUserData */ private UserData checkData(String lineStr, int lineCnt) { // 引数のCSVファイル1行分の文字列をカンマで分割 String[] strArray = lineStr.split(","); // 桁数不正の場合はエラー if (strArray == null || strArray.length != 7) { LOGGER.info(lineCnt + "行目: 桁数が不正です。"); return null; } // 文字列前後のダブルクォーテーションを削除する for (int i = 0; i < strArray.length; i++) { strArray[i] = DemoStringUtil.trimDoubleQuot(strArray[i]); } // 1列目が空またはNULLの場合はエラー if (StringUtils.isEmpty(strArray[0])) { LOGGER.info(lineCnt + "行目: 1列目が空またはNULLです。"); return null; } // 1列目が数値以外の場合はエラー if (!StringUtils.isNumeric(strArray[0])) { LOGGER.info(lineCnt + "行目: 1列目が数値以外です。"); return null; } // 1列目の桁数が不正な場合はエラー if (strArray[0].length() > 6) { LOGGER.info(lineCnt + "行目: 1列目の桁数が不正です。"); return null; } // 2列目が空またはNULLの場合はエラー if (StringUtils.isEmpty(strArray[1])) { LOGGER.info(lineCnt + "行目: 2列目が空またはNULLです。"); return null; } // 2列目の桁数が不正な場合はエラー if (strArray[1].length() > 40) { LOGGER.info(lineCnt + "行目: 2列目の桁数が不正です。"); return null; } // 3列目が空またはNULLの場合はエラー if (StringUtils.isEmpty(strArray[2])) { LOGGER.info(lineCnt + "行目: 3列目が空またはNULLです。"); return null; } // 3列目が数値以外の場合はエラー if (!StringUtils.isNumeric(strArray[2])) { LOGGER.info(lineCnt + "行目: 3列目が数値以外です。"); return null; } // 3列目の桁数が不正な場合はエラー if (strArray[2].length() > 4) { LOGGER.info(lineCnt + "行目: 3列目の桁数が不正です。"); return null; } // 4列目が空またはNULLの場合はエラー if (StringUtils.isEmpty(strArray[3])) { LOGGER.info(lineCnt + "行目: 4列目が空またはNULLです。"); return null; } // 4列目が数値以外の場合はエラー if (!StringUtils.isNumeric(strArray[3])) { LOGGER.info(lineCnt + "行目: 4列目が数値以外です。"); return null; } // 4列目の桁数が不正な場合はエラー if (strArray[3].length() > 2) { LOGGER.info(lineCnt + "行目: 4列目の桁数が不正です。"); return null; } // 5列目が空またはNULLの場合はエラー if (StringUtils.isEmpty(strArray[4])) { LOGGER.info(lineCnt + "行目: 5列目が空またはNULLです。"); return null; } // 5列目が数値以外の場合はエラー if (!StringUtils.isNumeric(strArray[4])) { LOGGER.info(lineCnt + "行目: 5列目が数値以外です。"); return null; } // 5列目の桁数が不正な場合はエラー if (strArray[4].length() > 2) { LOGGER.info(lineCnt + "行目: 5列目の桁数が不正です。"); return null; } // 3列目・4列目・5列目から生成される日付が不正であればエラー String birthDay = strArray[2] + DemoStringUtil.addZero(strArray[3]) + DemoStringUtil.addZero(strArray[4]); if (!DemoStringUtil.isCorrectDate(birthDay, "uuuuMMdd")) { LOGGER.info(lineCnt + "行目: 3~5列目の日付が不正です。"); return null; } // 6列目が1,2以外の場合はエラー if (!("1".equals(strArray[5])) && !("2".equals(strArray[5]))) { LOGGER.info(lineCnt + "行目: 6列目の性別が不正です。"); return null; } // 7列目の桁数が不正な場合はエラー if (!StringUtils.isEmpty(strArray[6]) && strArray[6].length() > 1024) { LOGGER.info(lineCnt + "行目: 7列目の桁数が不正です。"); return null; } // エラーがなければUserDataオブジェクトに変換し返す UserData userData = new UserData(); userData.setId(Integer.parseInt(strArray[0])); userData.setName(strArray[1]); userData.setBirth_year(Integer.parseInt(strArray[2])); userData.setBirth_month(Integer.parseInt(strArray[3])); userData.setBirth_day(Integer.parseInt(strArray[4])); userData.setSex(strArray[5]); userData.setMemo(strArray[6]); return userData; } }
その他、ユーティリティクラスの内容は以下の通りで、getNowDateTimeメソッドを追加している。
package com.example.util; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.time.format.ResolverStyle; import org.apache.commons.lang3.StringUtils; public class DemoStringUtil { /** * 文字列前後のダブルクォーテーションを削除する. * @param str 変換前文字列 * @return 変換後文字列 */ public static String trimDoubleQuot(String regStr) { if (StringUtils.isEmpty(regStr)) { return regStr; } char c = '"'; if (regStr.charAt(0) == c && regStr.charAt(regStr.length() - 1) == c) { return regStr.substring(1, regStr.length() - 1); } else { return regStr; } } /** * DateTimeFormatterを利用して日付チェックを行う. * @param dateStr チェック対象文字列 * @param dateFormat 日付フォーマット * @return 日付チェック結果 */ public static boolean isCorrectDate(String dateStr, String dateFormat) { if (StringUtils.isEmpty(dateStr) || StringUtils.isEmpty(dateFormat)) { return false; } // 日付と時刻を厳密に解決するスタイルで、DateTimeFormatterオブジェクトを作成 DateTimeFormatter df = DateTimeFormatter.ofPattern(dateFormat) .withResolverStyle(ResolverStyle.STRICT); try { // チェック対象文字列をLocalDate型の日付に変換できれば、チェックOKとする LocalDate.parse(dateStr, df); return true; } catch (Exception e) { return false; } } /** * 数値文字列が1桁の場合、頭に0を付けて返す. * @param intNum 数値文字列 * @return 変換後数値文字列 */ public static String addZero(String intNum) { if (StringUtils.isEmpty(intNum)) { return intNum; } if (intNum.length() == 1) { return "0" + intNum; } return intNum; } /** * 現在日時をyyyyMMddHHmmss形式で返す. * @return 現在日時 */ public static String getNowDateTime() { LocalDateTime localDateTime = LocalDateTime.now(); DateTimeFormatter datetimeformatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss"); return datetimeformatter.format(localDateTime); } }
その他のソースコード内容は、以下のサイトを参照のこと。また、「extensions.csproj」は、「mvn package」コマンドを実行後に作成されるファイルである。
https://github.com/purin-it/azure/tree/master/event-grid-trigger-batch-csv-to-db/demoAzureFunc
Azure環境へのデプロイ手順は、以下のサイトの「サンプルプログラムの実行結果(ローカル)」「サンプルプログラムの実行結果(Azure上)」に記載されている通りである。
ただし、mvn packageコマンドの実行結果(途中省略)は、以下のようになる。
また、Azure環境にデプロイ後、Azure Portal上で関数を確認すると、以下のように、トリガーがEventGridトリガーになっていることが確認できる。