EventGridTrigger/SpringBatch

Azure Functions上でSpring Batchを利用しているプログラムで処理単位(Tasklet)を並列に実行してみた

Spring Batchでは、複数の処理単位(Tasklet)を並列に実行することができる。

今回は、EventGridTriggerを利用したバッチ処理内で、以下の図のような並列処理を実行してみたので、そのサンプルプログラムを共有する。
バッチフロー図

前提条件

下記記事のサンプルプログラムを作成済であること。

Azure Functions上でSpring Batchを利用しているプログラムで処理単位(Tasklet)を複数つないでみたこれまでこのブログでは、1つの処理単位(Tasklet)のみを扱ってきたが、複数の処理単位を順次実行することもできる。 今回は、E...

また、以下の記事のPostmanをインストール済であること。

POSTメソッドでリクエストされるAzure FunctionsのAPIをPostmanによって呼び出してみたAzure App Serviceを利用せずに、HTTPトリガーによって呼び出されるAzure Functionsの動作確認を行うには、...

作成したサンプルプログラムの修正

前提条件の記事のサンプルプログラムに、ログ出力する処理単位(Tasklet)を必要な分だけ追加し、ジョブ実行定義を修正する。なお、下記の赤枠は、前提条件のプログラムから変更・追加したプログラムである。
サンプルプログラムの構成

新しく追加したログ出力する処理単位(Tasklet)は、前提条件の記事で作成した「DemoTaskletLog.java」を流用している。以下はDemoTaskletLog2.javaの内容であるが、DemoTaskletLog3.java・DemoTaskletLog4.javaも同じような内容になっている。

package com.example.batch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.stereotype.Component;

import com.example.model.EventGridTriggerParam;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

@Component
public class DemoTaskletLog2 implements Tasklet {

    /* Spring Bootでログ出力するためのLogbackのクラスを生成 */
    private static final Logger LOGGER 
        = LoggerFactory.getLogger(DemoTaskletLog2.class);

    /**
     * Spring Batchのジョブ内でのログ出力処理を定義する.
     */
    @Override
    public RepeatStatus execute(StepContribution contribution
            , ChunkContext chunkContext) throws Exception {
        // Spring Batchのジョブ内での処理が呼び出されたことをログ出力する
        String paramStr = chunkContext.getStepContext()
                .getStepExecution().getJobParameters()
                .getString("eventGridTriggerParam");

        EventGridTriggerParam param = null;
        if (paramStr != null) {
            param = new ObjectMapper().readValue(paramStr
                , new TypeReference<EventGridTriggerParam>() {});
            LOGGER.info("DemoTaskletLog2 execute " 
                + " triggered: " + param.getTimerInfo());
        }

        // ジョブが終了したことを返す
        return RepeatStatus.FINISHED;
    }

}



また、ジョブ定義クラスの内容は以下の通りで、この記事の冒頭に記載したフロー1・フロー2を追加し、フロー1(ログ出力処理1→ログ出力処理2)、フロー2(ログ出力処理3→CSVファイル取込処理)を実行後、ログ出力処理4を実行するようにしている。

package com.example.batch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration
@EnableBatchProcessing
public class DemoTaskletConfig {

    /** Spring Batchのジョブ内で指定する実行処理を定義するクラス */
    @Autowired
    private DemoTasklet demoTasklet;
    
    /** Spring Batchのジョブ内で指定するログ出力処理を定義するクラス */
    @Autowired
    private DemoTaskletLog demoTaskletLog;
    
    /** Spring Batchのジョブ内で指定するログ出力処理を定義するクラス(2) */
    @Autowired
    private DemoTaskletLog2 demoTaskletLog2;
    
    /** Spring Batchのジョブ内で指定するログ出力処理を定義するクラス(3) */
    @Autowired
    private DemoTaskletLog3 demoTaskletLog3;
    
    /** Spring Batchのジョブ内で指定するログ出力処理を定義するクラス(4) */
    @Autowired
    private DemoTaskletLog4 demoTaskletLog4;

    /** Spring Batchのジョブを生成するクラス */
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    /** Spring Batchのステップを生成するクラス */
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Spring Batchのジョブ内で指定するCSVファイル取込処理(ステップ)を定義する.
     * @return Spring Batchのジョブ内で指定するCSVファイル取込処理
     */
    @Bean
    public Step step() {
        // 生成するステップ内でCSVファイル取込処理(Tasklet)を指定する
        return stepBuilderFactory
                .get("step")
                .tasklet(demoTasklet)
                .build();
    }
    
    /**
     * Spring Batchのジョブ内で指定するログ出力処理(ステップ)を定義する.
     * @return Spring Batchのジョブ内で指定するログ出力処理
     */
    @Bean
    public Step stepLog() {
        // 生成するステップ内でログ出力する処理(Tasklet)を指定する
        return stepBuilderFactory
                .get("stepLog")
                .tasklet(demoTaskletLog)
                .build();
    }
    
    /**
     * Spring Batchのジョブ内で指定するログ出力処理(ステップ)(2)を定義する.
     * @return Spring Batchのジョブ内で指定するログ出力処理
     */
    @Bean
    public Step stepLog2() {
        // 生成するステップ内でログ出力する処理(Tasklet)を指定する
        return stepBuilderFactory
                .get("stepLog2")
                .tasklet(demoTaskletLog2)
                .build();
    }
    
    /**
     * Spring Batchのジョブ内で指定するログ出力処理(ステップ)(3)を定義する.
     * @return Spring Batchのジョブ内で指定するログ出力処理
     */
    @Bean
    public Step stepLog3() {
        // 生成するステップ内でログ出力する処理(Tasklet)を指定する
        return stepBuilderFactory
                .get("stepLog3")
                .tasklet(demoTaskletLog3)
                .build();
    }
    
    /**
     * Spring Batchのジョブ内で指定するログ出力処理(ステップ)(4)を定義する.
     * @return Spring Batchのジョブ内で指定するログ出力処理
     */
    @Bean
    public Step stepLog4() {
        // 生成するステップ内でログ出力する処理(Tasklet)を指定する
        return stepBuilderFactory
                .get("stepLog4")
                .tasklet(demoTaskletLog4)
                .build();
    }
    
    /**
     * 並列にステップを実行するフロー1(ログ出力処理1, ログ出力処理2)を定義する.
     * @param stepLog ログ出力処理1
     * @param stepLog2 ログ出力処理2
     * @return 並列にステップを実行するフロー1
     */
    @Bean
    public Flow flow1(Step stepLog, Step stepLog2) {
        return new FlowBuilder<SimpleFlow>("flow1")
            .start(stepLog)
            .next(stepLog2)
            .build();
    }

    /**
     * 並列にステップを実行するフロー2(ログ出力処理3, CSVファイル取込処理)を定義する.
     * @param stepLog3 ログ出力処理3
     * @param step CSVファイル取込処理
     * @return 並列にステップを実行するフロー2
     */
    @Bean
    public Flow flow2(Step stepLog3, Step step) {
        return new FlowBuilder<SimpleFlow>("flow2")
            .start(stepLog3)
            .next(step)
            .build();
    }
    
    /**
     * Spring Batchのジョブを定義する.
     * @param flow1 並列にステップを実行するフロー1
     * @param flow2 並列にステップを実行するフロー2
     * @param stepLog4 ログ出力処理4
     * @return Spring Batchのジョブ
     */
    @Bean
    public Job job(Flow flow1, Flow flow2, Step stepLog4) {
        // 生成するジョブ内で、実行前後の処理(リスナ)と処理単位(ステップ)を指定する
        // flow1とflow2を並列に実行した後で、ログ出力処理4を実行する
        return jobBuilderFactory
                .get("job")
                .incrementer(new RunIdIncrementer())
                .listener(listener())
                .start(flow1)
                .split(new SimpleAsyncTaskExecutor())
                .add(flow2)
                .next(stepLog4)
                .end()
                .build();
    }

    /**
     * Spring Batchのジョブの実行前後の処理を定義する.
     * @return Spring Batchのジョブの実行前後の処理
     */
    @Bean
    public DemoJobListener listener() {
        return new DemoJobListener();
    }
}

その他のソースコード内容は、以下のサイトを参照のこと。
https://github.com/purin-it/azure/tree/master/event-grid-trigger-multi-tasklet-parallel/demoAzureFunc



サンプルプログラムの実行結果

以下の記事の「EventGridのローカル環境での実行」に記載した通りに実行する。

EventGridTriggerを利用したプログラムをローカル環境で動かしてみた前回は、EventGridTriggerを利用して、BlobにCSVファイルが作成されたタイミングで、そのCSVファイルをDBに書き込む...

実行した結果のログは以下の通りで、この記事の冒頭に記載した処理フローが実行されていることが確認できる。
サンプルプログラムの実行結果

要点まとめ

  • Spring Batchでは、複数の処理単位(Tasklet)を並列に実行することができる。その際、並列処理それぞれのフローをあらかじめ定義しておく必要がある。