これまでこのブログでは、TimerTriggerを利用してBlob上のCSVファイルをDBに書き込む処理を作成していたが、EventGridTriggerを利用して、BlobにCSVファイルが作成されたタイミングで、そのCSVファイルをDBに書き込むこともできる。
今回は、EventGridTriggerを利用して、BlobにCSVファイルが作成されたタイミングで、そのCSVファイルをDBに書き込む処理を作成してみたので、そのサンプルプログラムを共有する。
なお、今回の記事は長くなるため、サンプルプログラムの内容とAzure上へのデプロイまでのみ記載し、Azure Portal上でのAzure Eventの設定とサンプルプログラムの実行結果の内容は、次回の記事で記載する。
前提条件
下記記事のサンプルプログラムを作成済であること。なお、読み込むCSVファイルの文字コードはUTF-8とする。
また、以下のように、サブスクリプションのEvent Grid リソース プロバイダー(Microsoft.EventGrid)が、有効になっていること。

サンプルプログラムの作成
前提条件の記事のサンプルプログラムを、EventGridTriggerによって起動するように修正する。なお、下記の赤枠は、前提条件のプログラムから大きく変更したり、追加したりしたプログラムである。

EventGridのハンドラークラスの内容は以下の通りで、EventGridTriggerアノテーションを付与したクラスで、EventGridTriggerのイベント情報を取得している。
package com.example;
import java.time.LocalDateTime;
import org.springframework.cloud.function.adapter.azure.FunctionInvoker;
import com.example.model.EventGridTriggerParam;
import com.example.model.EventGridTriggerResult;
import com.example.model.EventSchema;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.annotation.EventGridTrigger;
import com.microsoft.azure.functions.annotation.FunctionName;
public class EventGridTriggerTestHandler
extends FunctionInvoker<EventGridTriggerParam, EventGridTriggerResult> {
/**
* EventGridTriggerによって、DemoAzureFunctionクラスの
* eventGridTriggerTestメソッドを呼び出す.
* @param event EventGridTriggerイベント情報
* @param context コンテキストオブジェクト
*/
@FunctionName("eventGridTriggerTest")
public void eventGridTriggerTest(
@EventGridTrigger(name = "event") EventSchema event,
final ExecutionContext context) {
context.getLogger().info(
"EventGridTriggerTestHandler eventGridTriggerTest triggered: "
+ event.eventTime);
// EventGridTriggerイベント情報から、Blob Storageの読み込むファイル名を取得し、
// EventGridTriggerParamに設定する
EventGridTriggerParam param = new EventGridTriggerParam();
String subject = event.subject;
param.setFileName(subject.substring(subject.lastIndexOf("/") + 1));
param.setTimerInfo(LocalDateTime.now().toString());
handleRequest(param, context);
}
}また、EventGridTriggerのイベント情報は、以下のクラスで定義している。
package com.example.model;
import java.util.Date;
import java.util.Map;
public class EventSchema {
/** トピック */
public String topic;
/** サブジェクト */
public String subject;
/** イベントタイプ */
public String eventType;
/** イベント発生日時 */
public Date eventTime;
/** ID */
public String id;
/** データのバージョン */
public String dataVersion;
/** メタデータのバージョン */
public String metadataVersion;
/** データ */
public Map<String, Object> data;
}なお、ここまでの処理は、以下のサイトを参考にして実装している。
https://docs.microsoft.com/ja-jp/azure/azure-functions/functions-bindings-event-grid-trigger?tabs=java%2Cbash
さらに、EventGridTriggerのサービスクラス、Paramクラス、Resultクラスの内容は以下の通り。
package com.example.service;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.example.model.EventGridTriggerParam;
import com.example.model.EventGridTriggerResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@Service
public class EventGridTriggerService {
/* Spring Bootでログ出力するためのLogbackのクラスを生成 */
private static final Logger LOGGER
= LoggerFactory.getLogger(EventGridTriggerService.class);
/** Spring Batchのジョブを起動するクラス */
@Autowired
JobLauncher jobLauncher;
/** Spring Batchのジョブを定義するクラス */
@Autowired
Job job;
/**
* イベントグリッドトリガーのテストを行うサービス.
* @param param EventGridTrigger呼出用Param
* @return イベントグリッドトリガーのテストを行うサービスクラスの呼出結果
*/
public EventGridTriggerResult eventGridTriggerTest(EventGridTriggerParam param) {
// イベントグリッドトリガーのテストを行うサービスが呼び出されたことをログ出力する
LOGGER.info("EventGridTriggerService eventGridTriggerTest triggered: "
+ param.getTimerInfo());
// Spring Batchのジョブを起動する
// ジョブ起動時に生成したパラメータを指定する
try {
jobLauncher.run(job, createInitialJobParameterMap(param));
} catch (Exception e) {
throw new RuntimeException(e);
}
// イベントグリッドトリガーのテストを行うサービスクラスの呼出結果を返却する
EventGridTriggerResult result = new EventGridTriggerResult();
result.setResult("success");
return result;
}
/**
* ジョブを起動するためのパラメータを生成する.
* @param EventGridTriggerParamオブジェクト
* @return ジョブを起動するためのパラメータ
* @throws JsonProcessingException
*/
private JobParameters createInitialJobParameterMap(EventGridTriggerParam param)
throws JsonProcessingException {
Map<String, JobParameter> m = new HashMap<>();
m.put("time", new JobParameter(System.currentTimeMillis()));
m.put("eventGridTriggerParam"
, new JobParameter(new ObjectMapper().writeValueAsString(param)));
JobParameters p = new JobParameters(m);
return p;
}
}package com.example.model;
import lombok.Data;
@Data
public class EventGridTriggerParam {
/** 読み込むBlobストレージのファイル名 */
private String fileName;
/** Timer情報 */
private String timerInfo;
}package com.example.model;
import lombok.Data;
@Data
public class EventGridTriggerResult {
/** 結果情報 */
private String result;
}また、Azure Functionsのメインクラスの内容は以下の通りで、EventGridTriggerのサービスクラスのファンクション定義を追加している。
package com.example;
import java.util.function.Function;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import com.example.model.EventGridTriggerParam;
import com.example.model.EventGridTriggerResult;
import com.example.service.EventGridTriggerService;
@SpringBootApplication
public class DemoAzureFunction {
/** イベントグリッドトリガーのテストを行うサービスクラスのオブジェクト */
@Autowired
private EventGridTriggerService eventGridTriggerService;
public static void main(String[] args) throws Exception {
SpringApplication.run(DemoAzureFunction.class, args);
}
/**
* イベントグリッドトリガーのテストを行い結果を返却する関数
* @return イベントグリッドトリガーのテストを行うサービスクラスの呼出結果
*/
@Bean
public Function<EventGridTriggerParam, EventGridTriggerResult>
eventGridTriggerTest(){
return eventGridTriggerParam
-> eventGridTriggerService.eventGridTriggerTest(eventGridTriggerParam);
}
}
さらに、EventGridTriggerのサービスクラスから呼び出されるバッチ処理の内容は以下の通りで、CSVファイルをDBに書き込む処理を実施した後は、読み込んだCSVファイル名の末尾に「_yyyymmddhhmmss」を追加する処理を追加している。
package com.example.service;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.example.mybatis.UserDataMapper;
import com.example.mybatis.model.UserData;
import com.example.util.DemoStringUtil;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.CopyStatus;
@Service
public class DemoBatchService {
/* Spring Bootでログ出力するためのLogbackのクラスを生成 */
private static final Logger LOGGER
= LoggerFactory.getLogger(DemoBatchService.class);
/** Azure Storageのアカウント名 */
@Value("${azure.storage.accountName}")
private String storageAccountName;
/** Azure Storageへのアクセスキー */
@Value("${azure.storage.accessKey}")
private String storageAccessKey;
/** Azure StorageのBlobコンテナー名 */
@Value("${azure.storage.containerName}")
private String storageContainerName;
/** USER_DATAテーブルにアクセスするマッパー */
@Autowired
private UserDataMapper userDataMapper;
/**
* BlobStorageから引数で指定したファイルを読み込み、USER_DATAテーブルに書き込む
* @param fileName ファイル名
*/
@Transactional
public void readUserData(String fileName) {
// ファイルがCSVファイル以外か、ファイル名の末尾が「_yyyymmddhhmmss.csv」
// の場合は、何もせず処理を終了する
if (!fileName.matches(".*\\.csv")
|| fileName.matches(".*_[0-9]{14}\\.csv")) {
return;
}
// BlobStorageから引数で指定したファイルを読み込む
try (BufferedReader br = new BufferedReader(
new InputStreamReader(getBlobCsvData(fileName), "UTF-8"))) {
String lineStr = null;
int lineCnt = 0;
// 1行目(タイトル行)は読み飛ばし、2行目以降はチェックの上、
// USER_DATAテーブルに書き込む
// チェックエラー時はエラーログを出力の上、DB更新は行わず先へ進む
while ((lineStr = br.readLine()) != null) {
// 1行目(タイトル行)は読み飛ばす
lineCnt++;
if (lineCnt == 1) {
continue;
}
// 引数のCSVファイル1行分の文字列を受け取り、エラーがあればNULLを、
// エラーがなければUserDataオブジェクトに変換し返す
UserData userData = checkData(lineStr, lineCnt);
// 読み込んだファイルをUSER_DATAテーブルに書き込む
if (userData != null) {
userDataMapper.upsert(userData);
}
}
} catch (Exception ex) {
LOGGER.error(ex.getMessage());
throw new RuntimeException(ex);
} finally {
try {
// 読み込んだファイル名の末尾に「_yyyymmddhhmmss」を追加する
renameBlobCsvData(fileName);
} catch (Exception ex) {
LOGGER.error(ex.getMessage());
throw new RuntimeException(ex);
}
}
}
/**
* Blobストレージから指定したファイルデータを取得する.
* @param fileName ファイル名
* @return 指定したファイルデータの入力ストリーム
* @throws URISyntaxException
* @throws InvalidKeyException
* @throws StorageException
*/
private InputStream getBlobCsvData(String fileName)
throws URISyntaxException, InvalidKeyException, StorageException {
// Blob内のコンテナーを取得
CloudBlobContainer container = getBlobContainer();
// BlobStorageから指定したファイルを読み込む
CloudBlockBlob blob = container.getBlockBlobReference(fileName);
return blob.openInputStream();
}
/**
* Blobストレージの指定したファイルをリネームする.
* @param fileName ファイル名
* @throws URISyntaxException
* @throws InvalidKeyException
* @throws StorageException
* @throws InterruptedException
*/
private void renameBlobCsvData(String fileName)
throws URISyntaxException, InvalidKeyException
, StorageException, InterruptedException {
// Blob内のコンテナーを取得
CloudBlobContainer container = getBlobContainer();
// 指定したファイルを、末尾に「_yyyymmddhhmmss」を追加したファイル名のBlobにコピー
CloudBlockBlob blob = container.getBlockBlobReference(fileName);
CloudBlockBlob blobCopy
= container.getBlockBlobReference(getNewFileName(fileName));
blobCopy.startCopy(blob.getUri());
// コピーが終わるまで待機
while (true) {
if (blobCopy.getCopyState().getStatus() != CopyStatus.PENDING) {
break;
}
Thread.sleep(1000);
}
// 指定したファイルを削除
blob.delete();
}
/**
* Blob内のコンテナーを取得し返す.
* @return Blob内のコンテナー
* @throws URISyntaxException
* @throws InvalidKeyException
* @throws StorageException
*/
private CloudBlobContainer getBlobContainer()
throws URISyntaxException, InvalidKeyException, StorageException {
// Blobストレージへの接続文字列
String storageConnectionString = "DefaultEndpointsProtocol=https;"
+ "AccountName=" + storageAccountName + ";"
+ "AccountKey=" + storageAccessKey + ";";
// ストレージアカウントオブジェクトを取得
CloudStorageAccount storageAccount
= CloudStorageAccount.parse(storageConnectionString);
// Blobクライアントオブジェクトを取得
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
// Blob内のコンテナーを取得し返却
return blobClient.getContainerReference(storageContainerName);
}
/**
* 指定したファイルの末尾に「_yyyymmddhhmmss」を追加したファイル名を返す.
* @param fileName ファイル名
* @return リネーム後のファイル名
*/
private String getNewFileName(String fileName) {
StringBuilder sbNewFile = new StringBuilder();
sbNewFile.append(fileName.replace(".csv", ""));
sbNewFile.append("_");
sbNewFile.append(DemoStringUtil.getNowDateTime());
sbNewFile.append(".csv");
return sbNewFile.toString();
}
/**
* 引数のCSVファイル1行分の文字列を受け取り、エラーがあればNULLを、
* エラーがなければUserDataオブジェクトに変換し返す.
* @param lineStr CSVファイル1行分の文字列
* @param lineCnt 行数
* @return 変換後のUserData
*/
private UserData checkData(String lineStr, int lineCnt) {
// 引数のCSVファイル1行分の文字列をカンマで分割
String[] strArray = lineStr.split(",");
// 桁数不正の場合はエラー
if (strArray == null || strArray.length != 7) {
LOGGER.info(lineCnt + "行目: 桁数が不正です。");
return null;
}
// 文字列前後のダブルクォーテーションを削除する
for (int i = 0; i < strArray.length; i++) {
strArray[i] = DemoStringUtil.trimDoubleQuot(strArray[i]);
}
// 1列目が空またはNULLの場合はエラー
if (StringUtils.isEmpty(strArray[0])) {
LOGGER.info(lineCnt + "行目: 1列目が空またはNULLです。");
return null;
}
// 1列目が数値以外の場合はエラー
if (!StringUtils.isNumeric(strArray[0])) {
LOGGER.info(lineCnt + "行目: 1列目が数値以外です。");
return null;
}
// 1列目の桁数が不正な場合はエラー
if (strArray[0].length() > 6) {
LOGGER.info(lineCnt + "行目: 1列目の桁数が不正です。");
return null;
}
// 2列目が空またはNULLの場合はエラー
if (StringUtils.isEmpty(strArray[1])) {
LOGGER.info(lineCnt + "行目: 2列目が空またはNULLです。");
return null;
}
// 2列目の桁数が不正な場合はエラー
if (strArray[1].length() > 40) {
LOGGER.info(lineCnt + "行目: 2列目の桁数が不正です。");
return null;
}
// 3列目が空またはNULLの場合はエラー
if (StringUtils.isEmpty(strArray[2])) {
LOGGER.info(lineCnt + "行目: 3列目が空またはNULLです。");
return null;
}
// 3列目が数値以外の場合はエラー
if (!StringUtils.isNumeric(strArray[2])) {
LOGGER.info(lineCnt + "行目: 3列目が数値以外です。");
return null;
}
// 3列目の桁数が不正な場合はエラー
if (strArray[2].length() > 4) {
LOGGER.info(lineCnt + "行目: 3列目の桁数が不正です。");
return null;
}
// 4列目が空またはNULLの場合はエラー
if (StringUtils.isEmpty(strArray[3])) {
LOGGER.info(lineCnt + "行目: 4列目が空またはNULLです。");
return null;
}
// 4列目が数値以外の場合はエラー
if (!StringUtils.isNumeric(strArray[3])) {
LOGGER.info(lineCnt + "行目: 4列目が数値以外です。");
return null;
}
// 4列目の桁数が不正な場合はエラー
if (strArray[3].length() > 2) {
LOGGER.info(lineCnt + "行目: 4列目の桁数が不正です。");
return null;
}
// 5列目が空またはNULLの場合はエラー
if (StringUtils.isEmpty(strArray[4])) {
LOGGER.info(lineCnt + "行目: 5列目が空またはNULLです。");
return null;
}
// 5列目が数値以外の場合はエラー
if (!StringUtils.isNumeric(strArray[4])) {
LOGGER.info(lineCnt + "行目: 5列目が数値以外です。");
return null;
}
// 5列目の桁数が不正な場合はエラー
if (strArray[4].length() > 2) {
LOGGER.info(lineCnt + "行目: 5列目の桁数が不正です。");
return null;
}
// 3列目・4列目・5列目から生成される日付が不正であればエラー
String birthDay = strArray[2] + DemoStringUtil.addZero(strArray[3])
+ DemoStringUtil.addZero(strArray[4]);
if (!DemoStringUtil.isCorrectDate(birthDay, "uuuuMMdd")) {
LOGGER.info(lineCnt + "行目: 3~5列目の日付が不正です。");
return null;
}
// 6列目が1,2以外の場合はエラー
if (!("1".equals(strArray[5])) && !("2".equals(strArray[5]))) {
LOGGER.info(lineCnt + "行目: 6列目の性別が不正です。");
return null;
}
// 7列目の桁数が不正な場合はエラー
if (!StringUtils.isEmpty(strArray[6]) && strArray[6].length() > 1024) {
LOGGER.info(lineCnt + "行目: 7列目の桁数が不正です。");
return null;
}
// エラーがなければUserDataオブジェクトに変換し返す
UserData userData = new UserData();
userData.setId(Integer.parseInt(strArray[0]));
userData.setName(strArray[1]);
userData.setBirth_year(Integer.parseInt(strArray[2]));
userData.setBirth_month(Integer.parseInt(strArray[3]));
userData.setBirth_day(Integer.parseInt(strArray[4]));
userData.setSex(strArray[5]);
userData.setMemo(strArray[6]);
return userData;
}
}その他、ユーティリティクラスの内容は以下の通りで、getNowDateTimeメソッドを追加している。
package com.example.util;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.ResolverStyle;
import org.apache.commons.lang3.StringUtils;
public class DemoStringUtil {
/**
* 文字列前後のダブルクォーテーションを削除する.
* @param str 変換前文字列
* @return 変換後文字列
*/
public static String trimDoubleQuot(String regStr) {
if (StringUtils.isEmpty(regStr)) {
return regStr;
}
char c = '"';
if (regStr.charAt(0) == c && regStr.charAt(regStr.length() - 1) == c) {
return regStr.substring(1, regStr.length() - 1);
} else {
return regStr;
}
}
/**
* DateTimeFormatterを利用して日付チェックを行う.
* @param dateStr チェック対象文字列
* @param dateFormat 日付フォーマット
* @return 日付チェック結果
*/
public static boolean isCorrectDate(String dateStr, String dateFormat) {
if (StringUtils.isEmpty(dateStr) || StringUtils.isEmpty(dateFormat)) {
return false;
}
// 日付と時刻を厳密に解決するスタイルで、DateTimeFormatterオブジェクトを作成
DateTimeFormatter df = DateTimeFormatter.ofPattern(dateFormat)
.withResolverStyle(ResolverStyle.STRICT);
try {
// チェック対象文字列をLocalDate型の日付に変換できれば、チェックOKとする
LocalDate.parse(dateStr, df);
return true;
} catch (Exception e) {
return false;
}
}
/**
* 数値文字列が1桁の場合、頭に0を付けて返す.
* @param intNum 数値文字列
* @return 変換後数値文字列
*/
public static String addZero(String intNum) {
if (StringUtils.isEmpty(intNum)) {
return intNum;
}
if (intNum.length() == 1) {
return "0" + intNum;
}
return intNum;
}
/**
* 現在日時をyyyyMMddHHmmss形式で返す.
* @return 現在日時
*/
public static String getNowDateTime() {
LocalDateTime localDateTime = LocalDateTime.now();
DateTimeFormatter datetimeformatter
= DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
return datetimeformatter.format(localDateTime);
}
}その他のソースコード内容は、以下のサイトを参照のこと。また、「extensions.csproj」は、「mvn package」コマンドを実行後に作成されるファイルである。
https://github.com/purin-it/azure/tree/master/event-grid-trigger-batch-csv-to-db/demoAzureFunc
Azure環境へのデプロイ手順は、以下のサイトの「サンプルプログラムの実行結果(ローカル)」「サンプルプログラムの実行結果(Azure上)」に記載されている通りである。
ただし、mvn packageコマンドの実行結果(途中省略)は、以下のようになる。


また、Azure環境にデプロイ後、Azure Portal上で関数を確認すると、以下のように、トリガーがEventGridトリガーになっていることが確認できる。






