Spring Batchでは、複数の処理単位(Tasklet)を並列に実行することができる。
今回は、EventGridTriggerを利用したバッチ処理内で、以下の図のような並列処理を実行してみたので、そのサンプルプログラムを共有する。
前提条件
下記記事のサンプルプログラムを作成済であること。
また、以下の記事のPostmanをインストール済であること。
作成したサンプルプログラムの修正
前提条件の記事のサンプルプログラムに、ログ出力する処理単位(Tasklet)を必要な分だけ追加し、ジョブ実行定義を修正する。なお、下記の赤枠は、前提条件のプログラムから変更・追加したプログラムである。
新しく追加したログ出力する処理単位(Tasklet)は、前提条件の記事で作成した「DemoTaskletLog.java」を流用している。以下はDemoTaskletLog2.javaの内容であるが、DemoTaskletLog3.java・DemoTaskletLog4.javaも同じような内容になっている。
package com.example.batch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.scope.context.ChunkContext; import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.stereotype.Component; import com.example.model.EventGridTriggerParam; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; @Component public class DemoTaskletLog2 implements Tasklet { /* Spring Bootでログ出力するためのLogbackのクラスを生成 */ private static final Logger LOGGER = LoggerFactory.getLogger(DemoTaskletLog2.class); /** * Spring Batchのジョブ内でのログ出力処理を定義する. */ @Override public RepeatStatus execute(StepContribution contribution , ChunkContext chunkContext) throws Exception { // Spring Batchのジョブ内での処理が呼び出されたことをログ出力する String paramStr = chunkContext.getStepContext() .getStepExecution().getJobParameters() .getString("eventGridTriggerParam"); EventGridTriggerParam param = null; if (paramStr != null) { param = new ObjectMapper().readValue(paramStr , new TypeReference<EventGridTriggerParam>() {}); LOGGER.info("DemoTaskletLog2 execute " + " triggered: " + param.getTimerInfo()); } // ジョブが終了したことを返す return RepeatStatus.FINISHED; } }
また、ジョブ定義クラスの内容は以下の通りで、この記事の冒頭に記載したフロー1・フロー2を追加し、フロー1(ログ出力処理1→ログ出力処理2)、フロー2(ログ出力処理3→CSVファイル取込処理)を実行後、ログ出力処理4を実行するようにしている。
package com.example.batch; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.job.builder.FlowBuilder; import org.springframework.batch.core.job.flow.Flow; import org.springframework.batch.core.job.flow.support.SimpleFlow; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.task.SimpleAsyncTaskExecutor; @Configuration @EnableBatchProcessing public class DemoTaskletConfig { /** Spring Batchのジョブ内で指定する実行処理を定義するクラス */ @Autowired private DemoTasklet demoTasklet; /** Spring Batchのジョブ内で指定するログ出力処理を定義するクラス */ @Autowired private DemoTaskletLog demoTaskletLog; /** Spring Batchのジョブ内で指定するログ出力処理を定義するクラス(2) */ @Autowired private DemoTaskletLog2 demoTaskletLog2; /** Spring Batchのジョブ内で指定するログ出力処理を定義するクラス(3) */ @Autowired private DemoTaskletLog3 demoTaskletLog3; /** Spring Batchのジョブ内で指定するログ出力処理を定義するクラス(4) */ @Autowired private DemoTaskletLog4 demoTaskletLog4; /** Spring Batchのジョブを生成するクラス */ @Autowired private JobBuilderFactory jobBuilderFactory; /** Spring Batchのステップを生成するクラス */ @Autowired private StepBuilderFactory stepBuilderFactory; /** * Spring Batchのジョブ内で指定するCSVファイル取込処理(ステップ)を定義する. * @return Spring Batchのジョブ内で指定するCSVファイル取込処理 */ @Bean public Step step() { // 生成するステップ内でCSVファイル取込処理(Tasklet)を指定する return stepBuilderFactory .get("step") .tasklet(demoTasklet) .build(); } /** * Spring Batchのジョブ内で指定するログ出力処理(ステップ)を定義する. * @return Spring Batchのジョブ内で指定するログ出力処理 */ @Bean public Step stepLog() { // 生成するステップ内でログ出力する処理(Tasklet)を指定する return stepBuilderFactory .get("stepLog") .tasklet(demoTaskletLog) .build(); } /** * Spring Batchのジョブ内で指定するログ出力処理(ステップ)(2)を定義する. * @return Spring Batchのジョブ内で指定するログ出力処理 */ @Bean public Step stepLog2() { // 生成するステップ内でログ出力する処理(Tasklet)を指定する return stepBuilderFactory .get("stepLog2") .tasklet(demoTaskletLog2) .build(); } /** * Spring Batchのジョブ内で指定するログ出力処理(ステップ)(3)を定義する. * @return Spring Batchのジョブ内で指定するログ出力処理 */ @Bean public Step stepLog3() { // 生成するステップ内でログ出力する処理(Tasklet)を指定する return stepBuilderFactory .get("stepLog3") .tasklet(demoTaskletLog3) .build(); } /** * Spring Batchのジョブ内で指定するログ出力処理(ステップ)(4)を定義する. * @return Spring Batchのジョブ内で指定するログ出力処理 */ @Bean public Step stepLog4() { // 生成するステップ内でログ出力する処理(Tasklet)を指定する return stepBuilderFactory .get("stepLog4") .tasklet(demoTaskletLog4) .build(); } /** * 並列にステップを実行するフロー1(ログ出力処理1, ログ出力処理2)を定義する. * @param stepLog ログ出力処理1 * @param stepLog2 ログ出力処理2 * @return 並列にステップを実行するフロー1 */ @Bean public Flow flow1(Step stepLog, Step stepLog2) { return new FlowBuilder<SimpleFlow>("flow1") .start(stepLog) .next(stepLog2) .build(); } /** * 並列にステップを実行するフロー2(ログ出力処理3, CSVファイル取込処理)を定義する. * @param stepLog3 ログ出力処理3 * @param step CSVファイル取込処理 * @return 並列にステップを実行するフロー2 */ @Bean public Flow flow2(Step stepLog3, Step step) { return new FlowBuilder<SimpleFlow>("flow2") .start(stepLog3) .next(step) .build(); } /** * Spring Batchのジョブを定義する. * @param flow1 並列にステップを実行するフロー1 * @param flow2 並列にステップを実行するフロー2 * @param stepLog4 ログ出力処理4 * @return Spring Batchのジョブ */ @Bean public Job job(Flow flow1, Flow flow2, Step stepLog4) { // 生成するジョブ内で、実行前後の処理(リスナ)と処理単位(ステップ)を指定する // flow1とflow2を並列に実行した後で、ログ出力処理4を実行する return jobBuilderFactory .get("job") .incrementer(new RunIdIncrementer()) .listener(listener()) .start(flow1) .split(new SimpleAsyncTaskExecutor()) .add(flow2) .next(stepLog4) .end() .build(); } /** * Spring Batchのジョブの実行前後の処理を定義する. * @return Spring Batchのジョブの実行前後の処理 */ @Bean public DemoJobListener listener() { return new DemoJobListener(); } }
その他のソースコード内容は、以下のサイトを参照のこと。
https://github.com/purin-it/azure/tree/master/event-grid-trigger-multi-tasklet-parallel/demoAzureFunc
サンプルプログラムの実行結果
以下の記事の「EventGridのローカル環境での実行」に記載した通りに実行する。
実行した結果のログは以下の通りで、この記事の冒頭に記載した処理フローが実行されていることが確認できる。
要点まとめ
- Spring Batchでは、複数の処理単位(Tasklet)を並列に実行することができる。その際、並列処理それぞれのフローをあらかじめ定義しておく必要がある。