Spring Batchでは、複数の処理単位(Tasklet)を並列に実行することができる。
今回は、EventGridTriggerを利用したバッチ処理内で、以下の図のような並列処理を実行してみたので、そのサンプルプログラムを共有する。
前提条件
下記記事のサンプルプログラムを作成済であること。
また、以下の記事のPostmanをインストール済であること。
作成したサンプルプログラムの修正
前提条件の記事のサンプルプログラムに、ログ出力する処理単位(Tasklet)を必要な分だけ追加し、ジョブ実行定義を修正する。なお、下記の赤枠は、前提条件のプログラムから変更・追加したプログラムである。
新しく追加したログ出力する処理単位(Tasklet)は、前提条件の記事で作成した「DemoTaskletLog.java」を流用している。以下はDemoTaskletLog2.javaの内容であるが、DemoTaskletLog3.java・DemoTaskletLog4.javaも同じような内容になっている。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 | package com.example.batch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.StepContribution; import org.springframework.batch.core.scope.context.ChunkContext; import org.springframework.batch.core.step.tasklet.Tasklet; import org.springframework.batch.repeat.RepeatStatus; import org.springframework.stereotype.Component; import com.example.model.EventGridTriggerParam; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; @Component public class DemoTaskletLog2 implements Tasklet { /* Spring Bootでログ出力するためのLogbackのクラスを生成 */ private static final Logger LOGGER = LoggerFactory.getLogger(DemoTaskletLog2.class); /** * Spring Batchのジョブ内でのログ出力処理を定義する. */ @Override public RepeatStatus execute(StepContribution contribution , ChunkContext chunkContext) throws Exception { // Spring Batchのジョブ内での処理が呼び出されたことをログ出力する String paramStr = chunkContext.getStepContext() .getStepExecution().getJobParameters() .getString("eventGridTriggerParam"); EventGridTriggerParam param = null; if (paramStr != null) { param = new ObjectMapper().readValue(paramStr , new TypeReference<EventGridTriggerParam>() {}); LOGGER.info("DemoTaskletLog2 execute " + " triggered: " + param.getTimerInfo()); } // ジョブが終了したことを返す return RepeatStatus.FINISHED; } } |
また、ジョブ定義クラスの内容は以下の通りで、この記事の冒頭に記載したフロー1・フロー2を追加し、フロー1(ログ出力処理1→ログ出力処理2)、フロー2(ログ出力処理3→CSVファイル取込処理)を実行後、ログ出力処理4を実行するようにしている。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 | package com.example.batch; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.job.builder.FlowBuilder; import org.springframework.batch.core.job.flow.Flow; import org.springframework.batch.core.job.flow.support.SimpleFlow; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.task.SimpleAsyncTaskExecutor; @Configuration @EnableBatchProcessing public class DemoTaskletConfig { /** Spring Batchのジョブ内で指定する実行処理を定義するクラス */ @Autowired private DemoTasklet demoTasklet; /** Spring Batchのジョブ内で指定するログ出力処理を定義するクラス */ @Autowired private DemoTaskletLog demoTaskletLog; /** Spring Batchのジョブ内で指定するログ出力処理を定義するクラス(2) */ @Autowired private DemoTaskletLog2 demoTaskletLog2; /** Spring Batchのジョブ内で指定するログ出力処理を定義するクラス(3) */ @Autowired private DemoTaskletLog3 demoTaskletLog3; /** Spring Batchのジョブ内で指定するログ出力処理を定義するクラス(4) */ @Autowired private DemoTaskletLog4 demoTaskletLog4; /** Spring Batchのジョブを生成するクラス */ @Autowired private JobBuilderFactory jobBuilderFactory; /** Spring Batchのステップを生成するクラス */ @Autowired private StepBuilderFactory stepBuilderFactory; /** * Spring Batchのジョブ内で指定するCSVファイル取込処理(ステップ)を定義する. * @return Spring Batchのジョブ内で指定するCSVファイル取込処理 */ @Bean public Step step() { // 生成するステップ内でCSVファイル取込処理(Tasklet)を指定する return stepBuilderFactory .get("step") .tasklet(demoTasklet) .build(); } /** * Spring Batchのジョブ内で指定するログ出力処理(ステップ)を定義する. * @return Spring Batchのジョブ内で指定するログ出力処理 */ @Bean public Step stepLog() { // 生成するステップ内でログ出力する処理(Tasklet)を指定する return stepBuilderFactory .get("stepLog") .tasklet(demoTaskletLog) .build(); } /** * Spring Batchのジョブ内で指定するログ出力処理(ステップ)(2)を定義する. * @return Spring Batchのジョブ内で指定するログ出力処理 */ @Bean public Step stepLog2() { // 生成するステップ内でログ出力する処理(Tasklet)を指定する return stepBuilderFactory .get("stepLog2") .tasklet(demoTaskletLog2) .build(); } /** * Spring Batchのジョブ内で指定するログ出力処理(ステップ)(3)を定義する. * @return Spring Batchのジョブ内で指定するログ出力処理 */ @Bean public Step stepLog3() { // 生成するステップ内でログ出力する処理(Tasklet)を指定する return stepBuilderFactory .get("stepLog3") .tasklet(demoTaskletLog3) .build(); } /** * Spring Batchのジョブ内で指定するログ出力処理(ステップ)(4)を定義する. * @return Spring Batchのジョブ内で指定するログ出力処理 */ @Bean public Step stepLog4() { // 生成するステップ内でログ出力する処理(Tasklet)を指定する return stepBuilderFactory .get("stepLog4") .tasklet(demoTaskletLog4) .build(); } /** * 並列にステップを実行するフロー1(ログ出力処理1, ログ出力処理2)を定義する. * @param stepLog ログ出力処理1 * @param stepLog2 ログ出力処理2 * @return 並列にステップを実行するフロー1 */ @Bean public Flow flow1(Step stepLog, Step stepLog2) { return new FlowBuilder<SimpleFlow>("flow1") .start(stepLog) .next(stepLog2) .build(); } /** * 並列にステップを実行するフロー2(ログ出力処理3, CSVファイル取込処理)を定義する. * @param stepLog3 ログ出力処理3 * @param step CSVファイル取込処理 * @return 並列にステップを実行するフロー2 */ @Bean public Flow flow2(Step stepLog3, Step step) { return new FlowBuilder<SimpleFlow>("flow2") .start(stepLog3) .next(step) .build(); } /** * Spring Batchのジョブを定義する. * @param flow1 並列にステップを実行するフロー1 * @param flow2 並列にステップを実行するフロー2 * @param stepLog4 ログ出力処理4 * @return Spring Batchのジョブ */ @Bean public Job job(Flow flow1, Flow flow2, Step stepLog4) { // 生成するジョブ内で、実行前後の処理(リスナ)と処理単位(ステップ)を指定する // flow1とflow2を並列に実行した後で、ログ出力処理4を実行する return jobBuilderFactory .get("job") .incrementer(new RunIdIncrementer()) .listener(listener()) .start(flow1) .split(new SimpleAsyncTaskExecutor()) .add(flow2) .next(stepLog4) .end() .build(); } /** * Spring Batchのジョブの実行前後の処理を定義する. * @return Spring Batchのジョブの実行前後の処理 */ @Bean public DemoJobListener listener() { return new DemoJobListener(); } } |
その他のソースコード内容は、以下のサイトを参照のこと。
https://github.com/purin-it/azure/tree/master/event-grid-trigger-multi-tasklet-parallel/demoAzureFunc
サンプルプログラムの実行結果
以下の記事の「EventGridのローカル環境での実行」に記載した通りに実行する。
実行した結果のログは以下の通りで、この記事の冒頭に記載した処理フローが実行されていることが確認できる。
要点まとめ
- Spring Batchでは、複数の処理単位(Tasklet)を並列に実行することができる。その際、並列処理それぞれのフローをあらかじめ定義しておく必要がある。