EventGridTrigger/SpringBatch

Azure Functions上でSpring Batchを利用しているプログラムの処理単位(Tasklet)間でデータ受け渡しをしてみた

Azure Functions上でSpring Batchを利用しているプログラムの処理単位(Tasklet)は、StepExecutionListenerインタフェースのafterStepメソッドやbeforeStepメソッドを利用することで、処理単位(Tasklet)間でデータの受け渡しを行うことができる。

今回は、処理単位(Tasklet)間でデータの受け渡しを行ってみたので、そのサンプルプログラムを共有する。

前提条件

下記記事のサンプルプログラムを作成済であること。

Azure Functions上でSpring Batchを利用しているプログラムで処理単位(Tasklet)を複数つないでみたこれまでこのブログでは、1つの処理単位(Tasklet)のみを扱ってきたが、複数の処理単位を順次実行することもできる。 今回は、E...

また、以下の記事のPostmanをインストール済であること。

POSTメソッドでリクエストされるAzure FunctionsのAPIをPostmanによって呼び出してみたAzure App Serviceを利用せずに、HTTPトリガーによって呼び出されるAzure Functionsの動作確認を行うには、...



作成したサンプルプログラムの修正

前提条件の記事のサンプルプログラムに、処理単位(Tasklet)間でデータの受け渡し処理を追加する。なお、下記の赤枠は、前提条件のプログラムから追加・変更したプログラムである。
サンプルプログラムの構成

新設したユーティリティクラスの内容は以下の通りで、ジョブパラメータからEventGridTriggerParamオブジェクトを生成している。

package com.example.batch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepExecution;

import com.example.model.EventGridTriggerParam;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class DemoTaskletUtil {

    /* Spring Bootでログ出力するためのLogbackのクラスを生成 */
    private static final Logger LOGGER 
        = LoggerFactory.getLogger(DemoTaskletUtil.class);

    /**
     * Spring Batchのジョブで渡されたパラメータから、
     * EventGridTriggerParamオブジェクトを生成する.
     * @param stepExecution ジョブステップ実行定義
     * @return EventGridTriggerParamオブジェクト
     */
    public static EventGridTriggerParam getEventGridTriggerParam(
       StepExecution stepExecution) {
       // Spring Batchのジョブで渡されたパラメータを取得し、
       // EventGridTriggerParamオブジェクトを生成する
       String paramStr = stepExecution.getJobParameters()
                             .getString("eventGridTriggerParam");
       EventGridTriggerParam param = null;
       if (paramStr != null) {
           try {
               param = new ObjectMapper().readValue(paramStr
                       , new TypeReference<EventGridTriggerParam>() { });
           } catch (Exception ex) {
               LOGGER.error(ex.getMessage());
               throw new RuntimeException(ex);
           }
       }
       return param;
    }
}
「HD Video Converter Factory Pro」は動画の形式変換や編集・録画等を行える便利ツールだった動画の形式変換や編集・録画等を行える便利ツールの一つに、「HD Video Converter Factory Pro」があります。ここ...

DemoTasklet.javaの内容は以下の通りで、StepExecutionListenerインタフェースを実装し、afterStepメソッド内で次のTaskletにデータ(inputFileName)を渡すようにしている。

package com.example.batch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.example.model.EventGridTriggerParam;
import com.example.service.DemoBatchService;

import org.springframework.batch.core.step.tasklet.Tasklet;

@Component
public class DemoTasklet implements Tasklet, StepExecutionListener {

    /* Spring Bootでログ出力するためのLogbackのクラスを生成 */
    private static final Logger LOGGER 
        = LoggerFactory.getLogger(DemoTasklet.class);

    /**
     * BlobStorageからファイル(user_data.csv)を読み込み、
     * USER_DATAテーブルに書き込むサービス
     */
    @Autowired
    private DemoBatchService demoBatchService;

    /**
     * Spring Batchのジョブ実行前の処理を定義する.
     */
    @Override
    public void beforeStep(StepExecution stepExecution) {
       // Spring Batchのジョブで渡されたパラメータを取得し、
       // 処理が呼び出されたことをログ出力する
       EventGridTriggerParam param 
           = DemoTaskletUtil.getEventGridTriggerParam(stepExecution);
       LOGGER.info("DemoTasklet beforeStep " + " triggered: " + param.getTimerInfo());
    }

    /**
     * Spring Batchのジョブ内での処理を定義する.
     */
    @Override
    public RepeatStatus execute(StepContribution contribution
            , ChunkContext chunkContext) throws Exception {
       // Spring Batchのジョブで渡されたパラメータを取得し、
       // 処理が呼び出されたことをログ出力する
       EventGridTriggerParam param = DemoTaskletUtil.getEventGridTriggerParam(
            chunkContext.getStepContext().getStepExecution());
       LOGGER.info("DemoTasklet execute " + " triggered: " + param.getTimerInfo());

       // BlobStorageから指定したファイルを読み込み、
       // USER_DATAテーブルに書き込むサービスを呼び出す
       demoBatchService.readUserData(param.getFileName());

       // ジョブが終了したことを返す
       return RepeatStatus.FINISHED;
    }

    /**
     * Spring Batchのジョブ実行後の処理を定義する.
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
       // Spring Batchのジョブで渡されたパラメータを取得し、
       // 処理が呼び出されたことをログ出力する
       EventGridTriggerParam param 
            = DemoTaskletUtil.getEventGridTriggerParam(stepExecution);

       // Spring Batchのジョブ内での処理が呼び出されたことをログ出力する
       LOGGER.info("DemoTasklet afterStep " + " triggered: " + param.getTimerInfo());

       // ファイル名を次のタスクレットに渡し、実行後ジョブが終了したことを返す
       stepExecution.getJobExecution().getExecutionContext()
            .put("inputFileName", param.getFileName());
       return ExitStatus.COMPLETED;
    }

}
「AOMEI Partition Assistant Standard(無料)版」は便利なパーティション管理ツールだったハードディスクの記憶領域を論理的に分割し、分割された個々の領域のことを、パーティションといいます。 例えば、以下の図の場合、C/D...

DemoTaskletLog.javaの内容は以下の通りで、StepExecutionListenerインタフェースを実装し、beforeStepメソッド内で前のTaskletからのデータ(inputFileName)を受け取るようにしている。

package com.example.batch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.stereotype.Component;

import com.example.model.EventGridTriggerParam;

@Component
public class DemoTaskletLog implements Tasklet, StepExecutionListener {

    /* Spring Bootでログ出力するためのLogbackのクラスを生成 */
    private static final Logger LOGGER = LoggerFactory.getLogger(DemoTaskletLog.class);

    /**
     * Spring Batchのジョブ実行前の処理を定義する.
     */
    @Override
    public void beforeStep(StepExecution stepExecution) {
       // Spring Batchのジョブで渡されたパラメータを取得し、
       // 処理が呼び出されたことをログ出力する
       EventGridTriggerParam param 
            = DemoTaskletUtil.getEventGridTriggerParam(stepExecution);
       LOGGER.info("DemoTaskletLog beforeStep " + " triggered: " + param.getTimerInfo());

       // タスクレットに渡されたファイル名を取得する
       String inputFileName = (String) stepExecution.getJobExecution()
            .getExecutionContext().get("inputFileName");
       LOGGER.info("inputFileName: " + inputFileName);
    }

    /**
     * Spring Batchのジョブ内でのログ出力処理を定義する.
     */
    @Override
    public RepeatStatus execute(StepContribution contribution
            , ChunkContext chunkContext) throws Exception {
       // Spring Batchのジョブで渡されたパラメータを取得し、
       // 処理が呼び出されたことをログ出力する
       EventGridTriggerParam param = DemoTaskletUtil
          .getEventGridTriggerParam(chunkContext.getStepContext().getStepExecution());
       LOGGER.info("DemoTaskletLog execute " + " triggered: " + param.getTimerInfo());

       // ジョブが終了したことを返す
       return RepeatStatus.FINISHED;
    }

    /**
     * Spring Batchのジョブ実行後の処理を定義する.
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
       // Spring Batchのジョブで渡されたパラメータを取得し、
       // 処理が呼び出されたことをログ出力する
       EventGridTriggerParam param 
            = DemoTaskletUtil.getEventGridTriggerParam(stepExecution);
       LOGGER.info("DemoTaskletLog afterStep " + " triggered: " + param.getTimerInfo());

       // 実行後ジョブが終了したことを返す
       return ExitStatus.COMPLETED;
    }

}

その他のソースコード内容は、以下のサイトを参照のこと。
https://github.com/purin-it/azure/tree/master/event-grid-trigger-tasklet-put-data/demoAzureFunc

サンプルプログラムの実行結果

サンプルプログラムの実行結果は、以下の通り。

1) 以下の記事の「EventGridのローカル環境での実行」に記載した通りに実行する。

EventGridTriggerを利用したプログラムをローカル環境で動かしてみた前回は、EventGridTriggerを利用して、BlobにCSVファイルが作成されたタイミングで、そのCSVファイルをDBに書き込む...

実行した結果のログは以下の通りで、DemoTaskletLogクラスのinputFileNameに、取り込んだCSVファイル名「user_data.csv」が設定されていることが確認できる。
サンプルプログラムの実行結果_1

2) それぞれ500件のデータを含む取込ファイル「user_data_1.csv」「user_data_2.csv」を用意し、1)と同じ方法で、2ファイルを並行して(2スレッドが起動した状態で)取り込んだ結果のログは以下の通りで、DemoTaskletLogクラスのinputFileNameに、取り込んだCSVファイル名「user_data_1.csv」「user_data_2.csv」がそれぞれ設定されていることが確認できる。
サンプルプログラムの実行結果_2

なお、500件のデータは、以下の記事のプログラムを利用して作成している。

Javaで大量のデータをCSVファイルに出力できるようにしてみた大量のデータをCSVファイル等に出力する際は、出力データ作成用のプログラムを作成しておくと便利である。今回は、以下のUSER_DATAテ...

要点まとめ

  • Azure Functions上でSpring Batchを利用しているプログラムの処理単位(Tasklet)は、StepExecutionListenerインタフェースのafterStepメソッドやbeforeStepメソッドを利用することで、処理単位(Tasklet)間でデータの受け渡しを行うことができる。