EventGridTrigger/SpringBatch

Azure Functions上でSpring Batchを利用しているプログラムをEventGridTriggerによって動かしてみた(ソースコード編)

これまでこのブログでは、TimerTriggerを利用してBlob上のCSVファイルをDBに書き込む処理を作成していたが、EventGridTriggerを利用して、BlobにCSVファイルが作成されたタイミングで、そのCSVファイルをDBに書き込むこともできる。

今回は、EventGridTriggerを利用して、BlobにCSVファイルが作成されたタイミングで、そのCSVファイルをDBに書き込む処理を作成してみたので、そのサンプルプログラムを共有する。

なお、今回の記事は長くなるため、サンプルプログラムの内容とAzure上へのデプロイまでのみ記載し、Azure Portal上でのAzure Eventの設定とサンプルプログラムの実行結果の内容は、次回の記事で記載する。

前提条件

下記記事のサンプルプログラムを作成済であること。なお、読み込むCSVファイルの文字コードはUTF-8とする。

TimerTriggerによって動作するAzure Function上でCSVファイルの内容をDBに書き込んでみたTimer Triggerによって、一定時間が来たタイミングでAzure Functionsが動作するアプリケーションを生成し、そのバッ...

また、以下のように、サブスクリプションのEvent Grid リソース プロバイダー(Microsoft.EventGrid)が、有効になっていること。
前提条件



サンプルプログラムの作成

前提条件の記事のサンプルプログラムを、EventGridTriggerによって起動するように修正する。なお、下記の赤枠は、前提条件のプログラムから大きく変更したり、追加したりしたプログラムである。
サンプルプログラムの構成

EventGridのハンドラークラスの内容は以下の通りで、EventGridTriggerアノテーションを付与したクラスで、EventGridTriggerのイベント情報を取得している。

package com.example;

import java.time.LocalDateTime;

import org.springframework.cloud.function.adapter.azure.FunctionInvoker;

import com.example.model.EventGridTriggerParam;
import com.example.model.EventGridTriggerResult;
import com.example.model.EventSchema;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.annotation.EventGridTrigger;
import com.microsoft.azure.functions.annotation.FunctionName;

public class EventGridTriggerTestHandler 
    extends FunctionInvoker<EventGridTriggerParam, EventGridTriggerResult> {

    /**
     * EventGridTriggerによって、DemoAzureFunctionクラスの
     * eventGridTriggerTestメソッドを呼び出す.
     * @param event   EventGridTriggerイベント情報
     * @param context コンテキストオブジェクト
     */
    @FunctionName("eventGridTriggerTest")
    public void eventGridTriggerTest(
            @EventGridTrigger(name = "event") EventSchema event,
            final ExecutionContext context) {
        context.getLogger().info(
            "EventGridTriggerTestHandler eventGridTriggerTest triggered: " 
            + event.eventTime);

        // EventGridTriggerイベント情報から、Blob Storageの読み込むファイル名を取得し、
        // EventGridTriggerParamに設定する
        EventGridTriggerParam param = new EventGridTriggerParam();
        String subject = event.subject;
        param.setFileName(subject.substring(subject.lastIndexOf("/") + 1));
        param.setTimerInfo(LocalDateTime.now().toString());

        handleRequest(param, context);
    }
}

また、EventGridTriggerのイベント情報は、以下のクラスで定義している。

package com.example.model;

import java.util.Date;
import java.util.Map;

public class EventSchema {

    /** トピック */
    public String topic;

    /** サブジェクト */
    public String subject;

    /** イベントタイプ */
    public String eventType;

    /** イベント発生日時 */
    public Date eventTime;

    /** ID */
    public String id;

    /** データのバージョン */
    public String dataVersion;

    /** メタデータのバージョン */
    public String metadataVersion;

    /** データ */
    public Map<String, Object> data;

}

なお、ここまでの処理は、以下のサイトを参考にして実装している。
https://docs.microsoft.com/ja-jp/azure/azure-functions/functions-bindings-event-grid-trigger?tabs=java%2Cbash



さらに、EventGridTriggerのサービスクラス、Paramクラス、Resultクラスの内容は以下の通り。

package com.example.service;

import java.util.HashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.example.model.EventGridTriggerParam;
import com.example.model.EventGridTriggerResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

@Service
public class EventGridTriggerService {

    /* Spring Bootでログ出力するためのLogbackのクラスを生成 */
    private static final Logger LOGGER 
        = LoggerFactory.getLogger(EventGridTriggerService.class);

    /** Spring Batchのジョブを起動するクラス */
    @Autowired
    JobLauncher jobLauncher;

    /** Spring Batchのジョブを定義するクラス */
    @Autowired
    Job job;

    /**
     * イベントグリッドトリガーのテストを行うサービス.
     * @param param EventGridTrigger呼出用Param
     * @return イベントグリッドトリガーのテストを行うサービスクラスの呼出結果
     */
    public EventGridTriggerResult eventGridTriggerTest(EventGridTriggerParam param) {
        // イベントグリッドトリガーのテストを行うサービスが呼び出されたことをログ出力する
        LOGGER.info("EventGridTriggerService eventGridTriggerTest triggered: " 
            + param.getTimerInfo());

        // Spring Batchのジョブを起動する
        // ジョブ起動時に生成したパラメータを指定する
        try {
            jobLauncher.run(job, createInitialJobParameterMap(param));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        // イベントグリッドトリガーのテストを行うサービスクラスの呼出結果を返却する
        EventGridTriggerResult result = new EventGridTriggerResult();
        result.setResult("success");
        return result;
    }

    /**
     * ジョブを起動するためのパラメータを生成する.
     * @param EventGridTriggerParamオブジェクト
     * @return ジョブを起動するためのパラメータ
     * @throws JsonProcessingException
     */
    private JobParameters createInitialJobParameterMap(EventGridTriggerParam param) 
            throws JsonProcessingException {
        Map<String, JobParameter> m = new HashMap<>();
        m.put("time", new JobParameter(System.currentTimeMillis()));
        m.put("eventGridTriggerParam"
            , new JobParameter(new ObjectMapper().writeValueAsString(param)));

        JobParameters p = new JobParameters(m);
        return p;
    }
}
package com.example.model;

import lombok.Data;

@Data
public class EventGridTriggerParam {

    /** 読み込むBlobストレージのファイル名 */
    private String fileName;
    
    /** Timer情報 */
    private String timerInfo;
    
}
package com.example.model;

import lombok.Data;

@Data
public class EventGridTriggerResult {

    /** 結果情報 */
    private String result;
    
}
サラリーマン型フリーランスSEという働き方でお金の不安を解消しよう先日、「サラリーマン型フリーランスSE」という働き方を紹介するYouTube動画を視聴しましたので、その内容をご紹介します。 「サ...

また、Azure Functionsのメインクラスの内容は以下の通りで、EventGridTriggerのサービスクラスのファンクション定義を追加している。

package com.example;

import java.util.function.Function;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import com.example.model.EventGridTriggerParam;
import com.example.model.EventGridTriggerResult;
import com.example.service.EventGridTriggerService;

@SpringBootApplication
public class DemoAzureFunction {
    
    /** イベントグリッドトリガーのテストを行うサービスクラスのオブジェクト */
    @Autowired
    private EventGridTriggerService eventGridTriggerService;

    public static void main(String[] args) throws Exception {
        SpringApplication.run(DemoAzureFunction.class, args);
    }
    
    /**
     * イベントグリッドトリガーのテストを行い結果を返却する関数
     * @return イベントグリッドトリガーのテストを行うサービスクラスの呼出結果
     */
    @Bean
    public Function<EventGridTriggerParam, EventGridTriggerResult> 
            eventGridTriggerTest(){
        return eventGridTriggerParam 
            -> eventGridTriggerService.eventGridTriggerTest(eventGridTriggerParam);
    }

}



さらに、EventGridTriggerのサービスクラスから呼び出されるバッチ処理の内容は以下の通りで、CSVファイルをDBに書き込む処理を実施した後は、読み込んだCSVファイル名の末尾に「_yyyymmddhhmmss」を追加する処理を追加している。

package com.example.service;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import com.example.mybatis.UserDataMapper;
import com.example.mybatis.model.UserData;
import com.example.util.DemoStringUtil;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.CopyStatus;

@Service
public class DemoBatchService {

    /* Spring Bootでログ出力するためのLogbackのクラスを生成 */
    private static final Logger LOGGER 
        = LoggerFactory.getLogger(DemoBatchService.class);

    /** Azure Storageのアカウント名 */
    @Value("${azure.storage.accountName}")
    private String storageAccountName;

    /** Azure Storageへのアクセスキー */
    @Value("${azure.storage.accessKey}")
    private String storageAccessKey;

    /** Azure StorageのBlobコンテナー名 */
    @Value("${azure.storage.containerName}")
    private String storageContainerName;

    /** USER_DATAテーブルにアクセスするマッパー */
    @Autowired
    private UserDataMapper userDataMapper;

    /**
     * BlobStorageから引数で指定したファイルを読み込み、USER_DATAテーブルに書き込む
     * @param fileName ファイル名
     */
    @Transactional
    public void readUserData(String fileName) {
        // ファイルがCSVファイル以外か、ファイル名の末尾が「_yyyymmddhhmmss.csv」
        // の場合は、何もせず処理を終了する
        if (!fileName.matches(".*\\.csv") 
               || fileName.matches(".*_[0-9]{14}\\.csv")) {
            return;
        }

        // BlobStorageから引数で指定したファイルを読み込む
        try (BufferedReader br = new BufferedReader(
             new InputStreamReader(getBlobCsvData(fileName), "UTF-8"))) {
            String lineStr = null;
            int lineCnt = 0;

            // 1行目(タイトル行)は読み飛ばし、2行目以降はチェックの上、
            // USER_DATAテーブルに書き込む
            // チェックエラー時はエラーログを出力の上、DB更新は行わず先へ進む
            while ((lineStr = br.readLine()) != null) {
                // 1行目(タイトル行)は読み飛ばす
                lineCnt++;
                if (lineCnt == 1) {
                    continue;
                }

                // 引数のCSVファイル1行分の文字列を受け取り、エラーがあればNULLを、
                // エラーがなければUserDataオブジェクトに変換し返す
                UserData userData = checkData(lineStr, lineCnt);

                // 読み込んだファイルをUSER_DATAテーブルに書き込む
                if (userData != null) {
                    userDataMapper.upsert(userData);
                }
            }

        } catch (Exception ex) {
            LOGGER.error(ex.getMessage());
            throw new RuntimeException(ex);

        } finally {
            try {
                // 読み込んだファイル名の末尾に「_yyyymmddhhmmss」を追加する
                renameBlobCsvData(fileName);
            } catch (Exception ex) {
                LOGGER.error(ex.getMessage());
                throw new RuntimeException(ex);
            }
        }
    }

    /**
     * Blobストレージから指定したファイルデータを取得する.
     * @param fileName ファイル名
     * @return 指定したファイルデータの入力ストリーム
     * @throws URISyntaxException
     * @throws InvalidKeyException
     * @throws StorageException
     */
    private InputStream getBlobCsvData(String fileName)
            throws URISyntaxException, InvalidKeyException, StorageException {
        // Blob内のコンテナーを取得
        CloudBlobContainer container = getBlobContainer();

        // BlobStorageから指定したファイルを読み込む
        CloudBlockBlob blob = container.getBlockBlobReference(fileName);
        return blob.openInputStream();
    }

    /**
     * Blobストレージの指定したファイルをリネームする.
     * @param fileName ファイル名
     * @throws URISyntaxException
     * @throws InvalidKeyException
     * @throws StorageException
     * @throws InterruptedException
     */
    private void renameBlobCsvData(String fileName)
            throws URISyntaxException, InvalidKeyException
                 , StorageException, InterruptedException {
        // Blob内のコンテナーを取得
        CloudBlobContainer container = getBlobContainer();

        // 指定したファイルを、末尾に「_yyyymmddhhmmss」を追加したファイル名のBlobにコピー
        CloudBlockBlob blob = container.getBlockBlobReference(fileName);
        CloudBlockBlob blobCopy 
            = container.getBlockBlobReference(getNewFileName(fileName));
        blobCopy.startCopy(blob.getUri());

        // コピーが終わるまで待機
        while (true) {
            if (blobCopy.getCopyState().getStatus() != CopyStatus.PENDING) {
                break;
            }
            Thread.sleep(1000);
        }

        // 指定したファイルを削除
        blob.delete();
    }

    /**
     * Blob内のコンテナーを取得し返す.
     * @return Blob内のコンテナー
     * @throws URISyntaxException
     * @throws InvalidKeyException
     * @throws StorageException
     */
    private CloudBlobContainer getBlobContainer() 
           throws URISyntaxException, InvalidKeyException, StorageException {
        // Blobストレージへの接続文字列
        String storageConnectionString = "DefaultEndpointsProtocol=https;" 
                + "AccountName=" + storageAccountName + ";"
                + "AccountKey=" + storageAccessKey + ";";

        // ストレージアカウントオブジェクトを取得
        CloudStorageAccount storageAccount 
            = CloudStorageAccount.parse(storageConnectionString);

        // Blobクライアントオブジェクトを取得
        CloudBlobClient blobClient = storageAccount.createCloudBlobClient();

        // Blob内のコンテナーを取得し返却
        return blobClient.getContainerReference(storageContainerName);
    }

    /**
     * 指定したファイルの末尾に「_yyyymmddhhmmss」を追加したファイル名を返す.
     * @param fileName ファイル名
     * @return リネーム後のファイル名
     */
    private String getNewFileName(String fileName) {
        StringBuilder sbNewFile = new StringBuilder();
        sbNewFile.append(fileName.replace(".csv", ""));
        sbNewFile.append("_");
        sbNewFile.append(DemoStringUtil.getNowDateTime());
        sbNewFile.append(".csv");
        return sbNewFile.toString();
    }

    /**
     * 引数のCSVファイル1行分の文字列を受け取り、エラーがあればNULLを、
     * エラーがなければUserDataオブジェクトに変換し返す.
     * @param lineStr CSVファイル1行分の文字列
     * @param lineCnt 行数
     * @return 変換後のUserData
     */
    private UserData checkData(String lineStr, int lineCnt) {
        // 引数のCSVファイル1行分の文字列をカンマで分割
        String[] strArray = lineStr.split(",");

        // 桁数不正の場合はエラー
        if (strArray == null || strArray.length != 7) {
            LOGGER.info(lineCnt + "行目: 桁数が不正です。");
            return null;
        }

        // 文字列前後のダブルクォーテーションを削除する
        for (int i = 0; i < strArray.length; i++) {
            strArray[i] = DemoStringUtil.trimDoubleQuot(strArray[i]);
        }

        // 1列目が空またはNULLの場合はエラー
        if (StringUtils.isEmpty(strArray[0])) {
            LOGGER.info(lineCnt + "行目: 1列目が空またはNULLです。");
            return null;
        }
        // 1列目が数値以外の場合はエラー
        if (!StringUtils.isNumeric(strArray[0])) {
            LOGGER.info(lineCnt + "行目: 1列目が数値以外です。");
            return null;
        }
        // 1列目の桁数が不正な場合はエラー
        if (strArray[0].length() > 6) {
            LOGGER.info(lineCnt + "行目: 1列目の桁数が不正です。");
            return null;
        }

        // 2列目が空またはNULLの場合はエラー
        if (StringUtils.isEmpty(strArray[1])) {
            LOGGER.info(lineCnt + "行目: 2列目が空またはNULLです。");
            return null;
        }
        // 2列目の桁数が不正な場合はエラー
        if (strArray[1].length() > 40) {
            LOGGER.info(lineCnt + "行目: 2列目の桁数が不正です。");
            return null;
        }

        // 3列目が空またはNULLの場合はエラー
        if (StringUtils.isEmpty(strArray[2])) {
            LOGGER.info(lineCnt + "行目: 3列目が空またはNULLです。");
            return null;
        }
        // 3列目が数値以外の場合はエラー
        if (!StringUtils.isNumeric(strArray[2])) {
            LOGGER.info(lineCnt + "行目: 3列目が数値以外です。");
            return null;
        }
        // 3列目の桁数が不正な場合はエラー
        if (strArray[2].length() > 4) {
            LOGGER.info(lineCnt + "行目: 3列目の桁数が不正です。");
            return null;
        }

        // 4列目が空またはNULLの場合はエラー
        if (StringUtils.isEmpty(strArray[3])) {
            LOGGER.info(lineCnt + "行目: 4列目が空またはNULLです。");
            return null;
        }
        // 4列目が数値以外の場合はエラー
        if (!StringUtils.isNumeric(strArray[3])) {
            LOGGER.info(lineCnt + "行目: 4列目が数値以外です。");
            return null;
        }
        // 4列目の桁数が不正な場合はエラー
        if (strArray[3].length() > 2) {
            LOGGER.info(lineCnt + "行目: 4列目の桁数が不正です。");
            return null;
        }

        // 5列目が空またはNULLの場合はエラー
        if (StringUtils.isEmpty(strArray[4])) {
            LOGGER.info(lineCnt + "行目: 5列目が空またはNULLです。");
            return null;
        }
        // 5列目が数値以外の場合はエラー
        if (!StringUtils.isNumeric(strArray[4])) {
            LOGGER.info(lineCnt + "行目: 5列目が数値以外です。");
            return null;
        }
        // 5列目の桁数が不正な場合はエラー
        if (strArray[4].length() > 2) {
            LOGGER.info(lineCnt + "行目: 5列目の桁数が不正です。");
            return null;
        }

        // 3列目・4列目・5列目から生成される日付が不正であればエラー
        String birthDay = strArray[2] + DemoStringUtil.addZero(strArray[3]) 
            + DemoStringUtil.addZero(strArray[4]);
        if (!DemoStringUtil.isCorrectDate(birthDay, "uuuuMMdd")) {
            LOGGER.info(lineCnt + "行目: 3~5列目の日付が不正です。");
            return null;
        }

        // 6列目が1,2以外の場合はエラー
        if (!("1".equals(strArray[5])) && !("2".equals(strArray[5]))) {
            LOGGER.info(lineCnt + "行目: 6列目の性別が不正です。");
            return null;
        }

        // 7列目の桁数が不正な場合はエラー
        if (!StringUtils.isEmpty(strArray[6]) && strArray[6].length() > 1024) {
            LOGGER.info(lineCnt + "行目: 7列目の桁数が不正です。");
            return null;
        }

        // エラーがなければUserDataオブジェクトに変換し返す
        UserData userData = new UserData();
        userData.setId(Integer.parseInt(strArray[0]));
        userData.setName(strArray[1]);
        userData.setBirth_year(Integer.parseInt(strArray[2]));
        userData.setBirth_month(Integer.parseInt(strArray[3]));
        userData.setBirth_day(Integer.parseInt(strArray[4]));
        userData.setSex(strArray[5]);
        userData.setMemo(strArray[6]);
        return userData;
    }
}
「EaseUS Todo Backup」は様々な形でバックアップ取得が行える便利ツールだったパソコン内のデータを、ファイル/パーティション/ディスク等の様々な単位でバックアップしたり、バックアップのスケジュール設定や暗号化設定も...

その他、ユーティリティクラスの内容は以下の通りで、getNowDateTimeメソッドを追加している。

package com.example.util;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.ResolverStyle;

import org.apache.commons.lang3.StringUtils;

public class DemoStringUtil {

    /**
     * 文字列前後のダブルクォーテーションを削除する.
     * @param str 変換前文字列
     * @return 変換後文字列
     */
    public static String trimDoubleQuot(String regStr) {
        if (StringUtils.isEmpty(regStr)) {
            return regStr;
        }
        char c = '"';
        if (regStr.charAt(0) == c && regStr.charAt(regStr.length() - 1) == c) {
            return regStr.substring(1, regStr.length() - 1);
        } else {
            return regStr;
        }
    }

    /**
     * DateTimeFormatterを利用して日付チェックを行う.
     * @param dateStr    チェック対象文字列
     * @param dateFormat 日付フォーマット
     * @return 日付チェック結果
     */
    public static boolean isCorrectDate(String dateStr, String dateFormat) {
        if (StringUtils.isEmpty(dateStr) || StringUtils.isEmpty(dateFormat)) {
            return false;
        }
        // 日付と時刻を厳密に解決するスタイルで、DateTimeFormatterオブジェクトを作成
        DateTimeFormatter df = DateTimeFormatter.ofPattern(dateFormat)
                               .withResolverStyle(ResolverStyle.STRICT);
        try {
            // チェック対象文字列をLocalDate型の日付に変換できれば、チェックOKとする
            LocalDate.parse(dateStr, df);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    /**
     * 数値文字列が1桁の場合、頭に0を付けて返す.
     * @param intNum 数値文字列
     * @return 変換後数値文字列
     */
    public static String addZero(String intNum) {
        if (StringUtils.isEmpty(intNum)) {
            return intNum;
        }
        if (intNum.length() == 1) {
            return "0" + intNum;
        }
        return intNum;
    }
    
    /**
     * 現在日時をyyyyMMddHHmmss形式で返す.
     * @return 現在日時
     */
    public static String getNowDateTime() {
        LocalDateTime localDateTime = LocalDateTime.now();
        DateTimeFormatter datetimeformatter 
            = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
        return datetimeformatter.format(localDateTime);
    }
}

その他のソースコード内容は、以下のサイトを参照のこと。また、「extensions.csproj」は、「mvn package」コマンドを実行後に作成されるファイルである。
https://github.com/purin-it/azure/tree/master/event-grid-trigger-batch-csv-to-db/demoAzureFunc



Azure環境へのデプロイ手順は、以下のサイトの「サンプルプログラムの実行結果(ローカル)」「サンプルプログラムの実行結果(Azure上)」に記載されている通りである。

Azure Functions上でTimerTriggerによって動作するJavaアプリケーション(Spring Boot上)を作成してみたこれまでは、HTTPリクエストによりAzure Functionsが動作するアプリケーションのみ作成してきたが、Timer Trigge...

ただし、mvn packageコマンドの実行結果(途中省略)は、以下のようになる。
mvn_package_実行結果1

mvn_package_実行結果2

また、Azure環境にデプロイ後、Azure Portal上で関数を確認すると、以下のように、トリガーがEventGridトリガーになっていることが確認できる。
Azure_Functionsデプロイ後