EventGridTrigger/SpringBatch

Spring Batchを利用しているプログラムの処理単位(Tasklet)を@StepScopeアノテーションで並行に実行してみた

Azure Functions上でSpring Batchを利用しているプログラムの処理単位(Tasklet)内のインスタンス変数は、そのままだと、並列に実行された場合に上書きされてしまうが、プログラムの処理単位(Tasklet)に@StepScopeアノテーションを付与することで、並列に実行され複数スレッドが動いた場合に、インスタンス変数の値をスレッド毎にもたせることができる。

@StepScopeアノテーションについては、以下のサイトを参照のこと。
https://qiita.com/niwasawa/items/d809e3ce3daf30666017

今回は、処理単位(Tasklet)間を並列に実行した場合のインスタンス変数の値を確認してみたので、そのサンプルプログラムを共有する。

前提条件

下記記事のサンプルプログラムを作成済であること。

Azure Functions上でSpring Batchを利用しているプログラムの処理単位(Tasklet)間でデータ受け渡しをしてみたAzure Functions上でSpring Batchを利用しているプログラムの処理単位(Tasklet)は、StepExecuti...

また、以下の記事のPostmanをインストール済であること。

POSTメソッドでリクエストされるAzure FunctionsのAPIをPostmanによって呼び出してみたAzure App Serviceを利用せずに、HTTPトリガーによって呼び出されるAzure Functionsの動作確認を行うには、...

作成したサンプルプログラムの修正

前提条件の記事のサンプルプログラムに、処理単位(Tasklet)間を並列に実行した場合のインスタンス変数の値を確認する処理を追加する。
サンプルプログラムの構成
なお、上記の赤枠は、前提条件のプログラムから追加・変更したプログラムである。

DemoTasklet.javaの内容は以下の通りで、@StepScopeアノテーションをクラスに付与し、次のTaskletに渡すためのファイル名をインスタンス変数の値に設定するように修正している。

package com.example.batch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.example.model.EventGridTriggerParam;
import com.example.service.DemoBatchService;

import org.springframework.batch.core.step.tasklet.Tasklet;

@Component
@StepScope
public class DemoTasklet implements Tasklet, StepExecutionListener {

    /* Spring Bootでログ出力するためのLogbackのクラスを生成 */
    private static final Logger LOGGER 
        = LoggerFactory.getLogger(DemoTasklet.class);

    /**
     * BlobStorageからファイル(user_data.csv)を読み込み、
     * USER_DATAテーブルに書き込むサービス
     */
    @Autowired
    private DemoBatchService demoBatchService;
    
    // 次のタスクレットに渡すためのファイル名(インスタンス変数)
    private String fileNameLocal = null;

    /**
     * Spring Batchのジョブ実行前の処理を定義する.
     */
    @Override
    public void beforeStep(StepExecution stepExecution) {
        // Spring Batchのジョブで渡されたパラメータを取得し、
        // 処理が呼び出されたことをログ出力する
        EventGridTriggerParam param 
            = DemoTaskletUtil.getEventGridTriggerParam(stepExecution);
        LOGGER.info("DemoTasklet beforeStep " + " triggered: " 
            + param.getTimerInfo());
        
        // 次のタスクレットに渡すためのファイル名(インスタンス変数)を設定
        fileNameLocal = param.getFileName();
        LOGGER.info("DemoTasklet beforeStep " + "fileNameLocal: " + fileNameLocal);
    }

    /**
     * Spring Batchのジョブ内での処理を定義する.
     */
    @Override
    public RepeatStatus execute(StepContribution contribution
             , ChunkContext chunkContext) throws Exception {
        // Spring Batchのジョブで渡されたパラメータを取得し、
        // 処理が呼び出されたことをログ出力する
        EventGridTriggerParam param = DemoTaskletUtil.getEventGridTriggerParam(
            chunkContext.getStepContext().getStepExecution());
        LOGGER.info("DemoTasklet execute " + " triggered: " 
            + param.getTimerInfo());

        // BlobStorageから指定したファイルを読み込み、
        // USER_DATAテーブルに書き込むサービスを呼び出す
        demoBatchService.readUserData(param.getFileName());

        // ジョブが終了したことを返す
        return RepeatStatus.FINISHED;
    }

    /**
     * Spring Batchのジョブ実行後の処理を定義する.
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        // Spring Batchのジョブで渡されたパラメータを取得し、
        // 処理が呼び出されたことをログ出力する
        EventGridTriggerParam param 
            = DemoTaskletUtil.getEventGridTriggerParam(stepExecution);

        // Spring Batchのジョブ内での処理が呼び出されたことをログ出力する
        LOGGER.info("DemoTasklet afterStep " + " triggered: " 
            + param.getTimerInfo());

        // ファイル名(インスタンス変数)を次のタスクレットに渡し、
        // 実行後ジョブが終了したことを返す
        LOGGER.info("DemoTasklet afterStep " + "fileNameLocal: " + fileNameLocal);
        stepExecution.getJobExecution().getExecutionContext()
            .put("inputFileName", fileNameLocal);
        return ExitStatus.COMPLETED;
    }

}

DemoTaskletLog.javaの内容は以下の通りで、DemoTaskletクラスと同様に、@StepScopeアノテーションをクラスに付与している。

package com.example.batch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.stereotype.Component;

import com.example.model.EventGridTriggerParam;

@Component
@StepScope
public class DemoTaskletLog implements Tasklet, StepExecutionListener {

    /* Spring Bootでログ出力するためのLogbackのクラスを生成 */
    private static final Logger LOGGER 
        = LoggerFactory.getLogger(DemoTaskletLog.class);

    /**
     * Spring Batchのジョブ実行前の処理を定義する.
     */
    @Override
    public void beforeStep(StepExecution stepExecution) {
        // Spring Batchのジョブで渡されたパラメータを取得し、
        // 処理が呼び出されたことをログ出力する
        EventGridTriggerParam param 
            = DemoTaskletUtil.getEventGridTriggerParam(stepExecution);
        LOGGER.info("DemoTaskletLog beforeStep " + " triggered: " 
            + param.getTimerInfo());

        // タスクレットに渡されたファイル名を取得する
        String inputFileName = (String) stepExecution.getJobExecution()
            .getExecutionContext().get("inputFileName");
        LOGGER.info("inputFileName: " + inputFileName);
    }

    /**
     * Spring Batchのジョブ内でのログ出力処理を定義する.
     */
    @Override
    public RepeatStatus execute(StepContribution contribution
            , ChunkContext chunkContext) throws Exception {
        // Spring Batchのジョブで渡されたパラメータを取得し、
        // 処理が呼び出されたことをログ出力する
        EventGridTriggerParam param = DemoTaskletUtil.getEventGridTriggerParam(
            chunkContext.getStepContext().getStepExecution());
        LOGGER.info("DemoTaskletLog execute " + " triggered: " 
            + param.getTimerInfo());

        // ジョブが終了したことを返す
        return RepeatStatus.FINISHED;
    }

    /**
     * Spring Batchのジョブ実行後の処理を定義する.
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        // Spring Batchのジョブで渡されたパラメータを取得し、
        // 処理が呼び出されたことをログ出力する
        EventGridTriggerParam param 
            = DemoTaskletUtil.getEventGridTriggerParam(stepExecution);
        LOGGER.info("DemoTaskletLog afterStep " + " triggered: " 
            + param.getTimerInfo());

        // 実行後ジョブが終了したことを返す
        return ExitStatus.COMPLETED;
    }

}

その他のソースコード内容は、以下のサイトを参照のこと。
https://github.com/purin-it/azure/tree/master/event-grid-trigger-tasklet-parallel-stepscope/demoAzureFunc



「MiniTool Partition Wizard」はパーティション分割・統合・バックアップ・チェックを直感的に行える便利ツールだったハードディスクの記憶領域を論理的に分割し、分割された個々の領域のことを、パーティションといいます。 例えば、以下の図の場合、C/D...

サンプルプログラムの実行結果

サンプルプログラムの実行結果は、以下の通り。

1) サンプルプログラムは、以下の記事の「EventGridのローカル環境での実行」に記載した方法で実行する。

EventGridTriggerを利用したプログラムをローカル環境で動かしてみた前回は、EventGridTriggerを利用して、BlobにCSVファイルが作成されたタイミングで、そのCSVファイルをDBに書き込む...

2) それぞれ500件のデータを含む取込ファイル「user_data_1.csv」「user_data_2.csv」を用意し、1)に記載した方法で、2ファイルを並行して(2スレッドが起動した状態で)取り込んだ結果のログは以下の通りで、スレッド毎にインスタンス変数の値が設定されていることが確認できる。
サンプルプログラムの実行結果_2

なお、500件のデータは、以下の記事のプログラムを利用して作成している。

Javaで大量のデータをCSVファイルに出力できるようにしてみた大量のデータをCSVファイル等に出力する際は、出力データ作成用のプログラムを作成しておくと便利である。今回は、以下のUSER_DATAテ...

3) 以下のように、2つのTaskletクラスで、@StepScopeアノテーションをコメントアウトする。
サンプルプログラムの実行結果_3_1

サンプルプログラムの実行結果_3_2

4) 3)の状態で、それぞれ500件のデータを含む取込ファイル「user_data_1.csv」「user_data_2.csv」を用意し、1)に記載した方法で、2ファイルを並行して(2スレッドが起動した状態で)取り込んだ結果のログは以下の通りで、後から実行したスレッド(pool-2-thread-2)によってインスタンス変数の値が上書きされてしまうことが確認できる。
サンプルプログラムの実行結果_4

要点まとめ

  • Azure Functions上でSpring Batchを利用しているプログラムの処理単位(Tasklet)内のインスタンス変数は、そのままだと、並列に実行された場合に上書きされてしまうが、Taskletに@StepScopeアノテーションを付与することで、並列に実行され複数スレッドが動いた場合に、インスタンス変数の値をスレッド毎にもたせることができる。