Azure Functions上でSpring Batchを利用しているプログラムの処理単位(Tasklet)は、StepExecutionListenerインタフェースのafterStepメソッドやbeforeStepメソッドを利用することで、処理単位(Tasklet)間でデータの受け渡しを行うことができる。
今回は、処理単位(Tasklet)間でデータの受け渡しを行ってみたので、そのサンプルプログラムを共有する。
前提条件
下記記事のサンプルプログラムを作成済であること。
また、以下の記事のPostmanをインストール済であること。
作成したサンプルプログラムの修正
前提条件の記事のサンプルプログラムに、処理単位(Tasklet)間でデータの受け渡し処理を追加する。なお、下記の赤枠は、前提条件のプログラムから追加・変更したプログラムである。
新設したユーティリティクラスの内容は以下の通りで、ジョブパラメータからEventGridTriggerParamオブジェクトを生成している。
package com.example.batch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.StepExecution; import com.example.model.EventGridTriggerParam; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; public class DemoTaskletUtil { /* Spring Bootでログ出力するためのLogbackのクラスを生成 */ private static final Logger LOGGER = LoggerFactory.getLogger(DemoTaskletUtil.class); /** * Spring Batchのジョブで渡されたパラメータから、 * EventGridTriggerParamオブジェクトを生成する. * @param stepExecution ジョブステップ実行定義 * @return EventGridTriggerParamオブジェクト */ public static EventGridTriggerParam getEventGridTriggerParam( StepExecution stepExecution) { // Spring Batchのジョブで渡されたパラメータを取得し、 // EventGridTriggerParamオブジェクトを生成する String paramStr = stepExecution.getJobParameters() .getString("eventGridTriggerParam"); EventGridTriggerParam param = null; if (paramStr != null) { try { param = new ObjectMapper().readValue(paramStr , new TypeReference<EventGridTriggerParam>() { }); } catch (Exception ex) { LOGGER.error(ex.getMessage()); throw new RuntimeException(ex); } } return param; } }
DemoTasklet.javaの内容は以下の通りで、StepExecutionListenerインタフェースを実装し、afterStepメソッド内で次のTaskletにデータ(inputFileName)を渡すようにしている。
package com.example.batch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.StepExecutionListener; import org.springframework.batch.core.scope.context.ChunkContext; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.example.model.EventGridTriggerParam; import com.example.service.DemoBatchService; import org.springframework.batch.core.step.tasklet.Tasklet; @Component public class DemoTasklet implements Tasklet, StepExecutionListener { /* Spring Bootでログ出力するためのLogbackのクラスを生成 */ private static final Logger LOGGER = LoggerFactory.getLogger(DemoTasklet.class); /** * BlobStorageからファイル(user_data.csv)を読み込み、 * USER_DATAテーブルに書き込むサービス */ @Autowired private DemoBatchService demoBatchService; /** * Spring Batchのジョブ実行前の処理を定義する. */ @Override public void beforeStep(StepExecution stepExecution) { // Spring Batchのジョブで渡されたパラメータを取得し、 // 処理が呼び出されたことをログ出力する EventGridTriggerParam param = DemoTaskletUtil.getEventGridTriggerParam(stepExecution); LOGGER.info("DemoTasklet beforeStep " + " triggered: " + param.getTimerInfo()); } /** * Spring Batchのジョブ内での処理を定義する. */ @Override public RepeatStatus execute(StepContribution contribution , ChunkContext chunkContext) throws Exception { // Spring Batchのジョブで渡されたパラメータを取得し、 // 処理が呼び出されたことをログ出力する EventGridTriggerParam param = DemoTaskletUtil.getEventGridTriggerParam( chunkContext.getStepContext().getStepExecution()); LOGGER.info("DemoTasklet execute " + " triggered: " + param.getTimerInfo()); // BlobStorageから指定したファイルを読み込み、 // USER_DATAテーブルに書き込むサービスを呼び出す demoBatchService.readUserData(param.getFileName()); // ジョブが終了したことを返す return RepeatStatus.FINISHED; } /** * Spring Batchのジョブ実行後の処理を定義する. */ @Override public ExitStatus afterStep(StepExecution stepExecution) { // Spring Batchのジョブで渡されたパラメータを取得し、 // 処理が呼び出されたことをログ出力する EventGridTriggerParam param = DemoTaskletUtil.getEventGridTriggerParam(stepExecution); // Spring Batchのジョブ内での処理が呼び出されたことをログ出力する LOGGER.info("DemoTasklet afterStep " + " triggered: " + param.getTimerInfo()); // ファイル名を次のタスクレットに渡し、実行後ジョブが終了したことを返す stepExecution.getJobExecution().getExecutionContext() .put("inputFileName", param.getFileName()); return ExitStatus.COMPLETED; } }
DemoTaskletLog.javaの内容は以下の通りで、StepExecutionListenerインタフェースを実装し、beforeStepメソッド内で前のTaskletからのデータ(inputFileName)を受け取るようにしている。
package com.example.batch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.StepExecutionListener; import org.springframework.batch.core.scope.context.ChunkContext; import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.stereotype.Component; import com.example.model.EventGridTriggerParam; @Component public class DemoTaskletLog implements Tasklet, StepExecutionListener { /* Spring Bootでログ出力するためのLogbackのクラスを生成 */ private static final Logger LOGGER = LoggerFactory.getLogger(DemoTaskletLog.class); /** * Spring Batchのジョブ実行前の処理を定義する. */ @Override public void beforeStep(StepExecution stepExecution) { // Spring Batchのジョブで渡されたパラメータを取得し、 // 処理が呼び出されたことをログ出力する EventGridTriggerParam param = DemoTaskletUtil.getEventGridTriggerParam(stepExecution); LOGGER.info("DemoTaskletLog beforeStep " + " triggered: " + param.getTimerInfo()); // タスクレットに渡されたファイル名を取得する String inputFileName = (String) stepExecution.getJobExecution() .getExecutionContext().get("inputFileName"); LOGGER.info("inputFileName: " + inputFileName); } /** * Spring Batchのジョブ内でのログ出力処理を定義する. */ @Override public RepeatStatus execute(StepContribution contribution , ChunkContext chunkContext) throws Exception { // Spring Batchのジョブで渡されたパラメータを取得し、 // 処理が呼び出されたことをログ出力する EventGridTriggerParam param = DemoTaskletUtil .getEventGridTriggerParam(chunkContext.getStepContext().getStepExecution()); LOGGER.info("DemoTaskletLog execute " + " triggered: " + param.getTimerInfo()); // ジョブが終了したことを返す return RepeatStatus.FINISHED; } /** * Spring Batchのジョブ実行後の処理を定義する. */ @Override public ExitStatus afterStep(StepExecution stepExecution) { // Spring Batchのジョブで渡されたパラメータを取得し、 // 処理が呼び出されたことをログ出力する EventGridTriggerParam param = DemoTaskletUtil.getEventGridTriggerParam(stepExecution); LOGGER.info("DemoTaskletLog afterStep " + " triggered: " + param.getTimerInfo()); // 実行後ジョブが終了したことを返す return ExitStatus.COMPLETED; } }
その他のソースコード内容は、以下のサイトを参照のこと。
https://github.com/purin-it/azure/tree/master/event-grid-trigger-tasklet-put-data/demoAzureFunc
サンプルプログラムの実行結果
サンプルプログラムの実行結果は、以下の通り。
1) 以下の記事の「EventGridのローカル環境での実行」に記載した通りに実行する。
実行した結果のログは以下の通りで、DemoTaskletLogクラスのinputFileNameに、取り込んだCSVファイル名「user_data.csv」が設定されていることが確認できる。
2) それぞれ500件のデータを含む取込ファイル「user_data_1.csv」「user_data_2.csv」を用意し、1)と同じ方法で、2ファイルを並行して(2スレッドが起動した状態で)取り込んだ結果のログは以下の通りで、DemoTaskletLogクラスのinputFileNameに、取り込んだCSVファイル名「user_data_1.csv」「user_data_2.csv」がそれぞれ設定されていることが確認できる。
なお、500件のデータは、以下の記事のプログラムを利用して作成している。
要点まとめ
- Azure Functions上でSpring Batchを利用しているプログラムの処理単位(Tasklet)は、StepExecutionListenerインタフェースのafterStepメソッドやbeforeStepメソッドを利用することで、処理単位(Tasklet)間でデータの受け渡しを行うことができる。