Azure Functions上でSpring Batchを利用しているプログラムの処理単位(Tasklet)内のインスタンス変数は、そのままだと、並列に実行された場合に上書きされてしまうが、プログラムの処理単位(Tasklet)に@StepScopeアノテーションを付与することで、並列に実行され複数スレッドが動いた場合に、インスタンス変数の値をスレッド毎にもたせることができる。
@StepScopeアノテーションについては、以下のサイトを参照のこと。
https://qiita.com/niwasawa/items/d809e3ce3daf30666017
今回は、処理単位(Tasklet)間を並列に実行した場合のインスタンス変数の値を確認してみたので、そのサンプルプログラムを共有する。
前提条件
下記記事のサンプルプログラムを作成済であること。
また、以下の記事のPostmanをインストール済であること。
作成したサンプルプログラムの修正
前提条件の記事のサンプルプログラムに、処理単位(Tasklet)間を並列に実行した場合のインスタンス変数の値を確認する処理を追加する。
なお、上記の赤枠は、前提条件のプログラムから追加・変更したプログラムである。
DemoTasklet.javaの内容は以下の通りで、@StepScopeアノテーションをクラスに付与し、次のTaskletに渡すためのファイル名をインスタンス変数の値に設定するように修正している。
package com.example.batch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.StepExecutionListener; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.scope.context.ChunkContext; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.example.model.EventGridTriggerParam; import com.example.service.DemoBatchService; import org.springframework.batch.core.step.tasklet.Tasklet; @Component @StepScope public class DemoTasklet implements Tasklet, StepExecutionListener { /* Spring Bootでログ出力するためのLogbackのクラスを生成 */ private static final Logger LOGGER = LoggerFactory.getLogger(DemoTasklet.class); /** * BlobStorageからファイル(user_data.csv)を読み込み、 * USER_DATAテーブルに書き込むサービス */ @Autowired private DemoBatchService demoBatchService; // 次のタスクレットに渡すためのファイル名(インスタンス変数) private String fileNameLocal = null; /** * Spring Batchのジョブ実行前の処理を定義する. */ @Override public void beforeStep(StepExecution stepExecution) { // Spring Batchのジョブで渡されたパラメータを取得し、 // 処理が呼び出されたことをログ出力する EventGridTriggerParam param = DemoTaskletUtil.getEventGridTriggerParam(stepExecution); LOGGER.info("DemoTasklet beforeStep " + " triggered: " + param.getTimerInfo()); // 次のタスクレットに渡すためのファイル名(インスタンス変数)を設定 fileNameLocal = param.getFileName(); LOGGER.info("DemoTasklet beforeStep " + "fileNameLocal: " + fileNameLocal); } /** * Spring Batchのジョブ内での処理を定義する. */ @Override public RepeatStatus execute(StepContribution contribution , ChunkContext chunkContext) throws Exception { // Spring Batchのジョブで渡されたパラメータを取得し、 // 処理が呼び出されたことをログ出力する EventGridTriggerParam param = DemoTaskletUtil.getEventGridTriggerParam( chunkContext.getStepContext().getStepExecution()); LOGGER.info("DemoTasklet execute " + " triggered: " + param.getTimerInfo()); // BlobStorageから指定したファイルを読み込み、 // USER_DATAテーブルに書き込むサービスを呼び出す demoBatchService.readUserData(param.getFileName()); // ジョブが終了したことを返す return RepeatStatus.FINISHED; } /** * Spring Batchのジョブ実行後の処理を定義する. */ @Override public ExitStatus afterStep(StepExecution stepExecution) { // Spring Batchのジョブで渡されたパラメータを取得し、 // 処理が呼び出されたことをログ出力する EventGridTriggerParam param = DemoTaskletUtil.getEventGridTriggerParam(stepExecution); // Spring Batchのジョブ内での処理が呼び出されたことをログ出力する LOGGER.info("DemoTasklet afterStep " + " triggered: " + param.getTimerInfo()); // ファイル名(インスタンス変数)を次のタスクレットに渡し、 // 実行後ジョブが終了したことを返す LOGGER.info("DemoTasklet afterStep " + "fileNameLocal: " + fileNameLocal); stepExecution.getJobExecution().getExecutionContext() .put("inputFileName", fileNameLocal); return ExitStatus.COMPLETED; } }
DemoTaskletLog.javaの内容は以下の通りで、DemoTaskletクラスと同様に、@StepScopeアノテーションをクラスに付与している。
package com.example.batch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.StepExecutionListener; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.core.scope.context.ChunkContext; import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.stereotype.Component; import com.example.model.EventGridTriggerParam; @Component @StepScope public class DemoTaskletLog implements Tasklet, StepExecutionListener { /* Spring Bootでログ出力するためのLogbackのクラスを生成 */ private static final Logger LOGGER = LoggerFactory.getLogger(DemoTaskletLog.class); /** * Spring Batchのジョブ実行前の処理を定義する. */ @Override public void beforeStep(StepExecution stepExecution) { // Spring Batchのジョブで渡されたパラメータを取得し、 // 処理が呼び出されたことをログ出力する EventGridTriggerParam param = DemoTaskletUtil.getEventGridTriggerParam(stepExecution); LOGGER.info("DemoTaskletLog beforeStep " + " triggered: " + param.getTimerInfo()); // タスクレットに渡されたファイル名を取得する String inputFileName = (String) stepExecution.getJobExecution() .getExecutionContext().get("inputFileName"); LOGGER.info("inputFileName: " + inputFileName); } /** * Spring Batchのジョブ内でのログ出力処理を定義する. */ @Override public RepeatStatus execute(StepContribution contribution , ChunkContext chunkContext) throws Exception { // Spring Batchのジョブで渡されたパラメータを取得し、 // 処理が呼び出されたことをログ出力する EventGridTriggerParam param = DemoTaskletUtil.getEventGridTriggerParam( chunkContext.getStepContext().getStepExecution()); LOGGER.info("DemoTaskletLog execute " + " triggered: " + param.getTimerInfo()); // ジョブが終了したことを返す return RepeatStatus.FINISHED; } /** * Spring Batchのジョブ実行後の処理を定義する. */ @Override public ExitStatus afterStep(StepExecution stepExecution) { // Spring Batchのジョブで渡されたパラメータを取得し、 // 処理が呼び出されたことをログ出力する EventGridTriggerParam param = DemoTaskletUtil.getEventGridTriggerParam(stepExecution); LOGGER.info("DemoTaskletLog afterStep " + " triggered: " + param.getTimerInfo()); // 実行後ジョブが終了したことを返す return ExitStatus.COMPLETED; } }
その他のソースコード内容は、以下のサイトを参照のこと。
https://github.com/purin-it/azure/tree/master/event-grid-trigger-tasklet-parallel-stepscope/demoAzureFunc
サンプルプログラムの実行結果
サンプルプログラムの実行結果は、以下の通り。
1) サンプルプログラムは、以下の記事の「EventGridのローカル環境での実行」に記載した方法で実行する。
2) それぞれ500件のデータを含む取込ファイル「user_data_1.csv」「user_data_2.csv」を用意し、1)に記載した方法で、2ファイルを並行して(2スレッドが起動した状態で)取り込んだ結果のログは以下の通りで、スレッド毎にインスタンス変数の値が設定されていることが確認できる。
なお、500件のデータは、以下の記事のプログラムを利用して作成している。
3) 以下のように、2つのTaskletクラスで、@StepScopeアノテーションをコメントアウトする。
4) 3)の状態で、それぞれ500件のデータを含む取込ファイル「user_data_1.csv」「user_data_2.csv」を用意し、1)に記載した方法で、2ファイルを並行して(2スレッドが起動した状態で)取り込んだ結果のログは以下の通りで、後から実行したスレッド(pool-2-thread-2)によってインスタンス変数の値が上書きされてしまうことが確認できる。
要点まとめ
- Azure Functions上でSpring Batchを利用しているプログラムの処理単位(Tasklet)内のインスタンス変数は、そのままだと、並列に実行された場合に上書きされてしまうが、Taskletに@StepScopeアノテーションを付与することで、並列に実行され複数スレッドが動いた場合に、インスタンス変数の値をスレッド毎にもたせることができる。