TimerTrigger/SpringBatch

TimerTriggerによって動作するAzure Function上でSpring Batch(Chunkモデル)を利用してみた

これまでは、Spring BatchのTaskletモデルを利用してきたが、ファイルの読み込み/データの加工/DBへの書き込みといった処理の流れを定型化したChunkモデルという方式もある。

Chunkモデルの処理方式については、以下のサイトを参照のこと。
https://spring.pleiades.io/spring-batch/docs/current/reference/html/step.html#chunkOrientedProcessing

今回は、以前作成したプログラムをChunkモデルを利用する方式に変えてみたので、そのサンプルプログラムを共有する。

なお、今回はAzure Blob Storageのファイル読み込み時にSASトークンを発行しないと、ファイルの読み込みを行えないため、以下のサイトに記載されている、SASトークンの生成処理を利用している。
https://logico-jp.io/2021/01/21/create-a-service-sas-for-a-container-or-blob-storage-in-java/



前提条件

下記記事のサンプルプログラムを作成済であること。

TimerTriggerによって動作するAzure Function上でCSVファイルの内容をDBに書き込んでみたTimer Triggerによって、一定時間が来たタイミングでAzure Functionsが動作するアプリケーションを生成し、そのバッ...

また、読み込むCSVファイルの文字コードはUTF-8であるものとする。

作成したサンプルプログラムの修正

前提条件の記事のサンプルプログラムを、Chunkモデルを利用するように修正する。なお、下記の赤枠は、前提条件のプログラムから変更したり、追加したりしたプログラムである。
サンプルプログラムの構成

Chunkモデルの設定を行うクラスの内容は以下の通りで、ファイルの読み込み・DBへの書き込みや、バッチ処理単位(ステップ)やジョブの定義を行っている。

package com.example.batch;

import java.net.MalformedURLException;

import javax.sql.DataSource;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisBatchItemWriter;
import org.mybatis.spring.batch.builder.MyBatisBatchItemWriterBuilder;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.UrlResource;

import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.example.mybatis.model.UserData;
import com.example.util.DemoStringUtil;

import lombok.RequiredArgsConstructor;

@Configuration
@EnableBatchProcessing
@RequiredArgsConstructor
public class DemoChunkConfig {

    public final JobBuilderFactory jobBuilderFactory;

    public final StepBuilderFactory stepBuilderFactory;

    public final SqlSessionFactory sqlSessionFactory;

    private final DemoUpdateProcessor demoUpdateProcessor;

    /** Azure Storageのアカウント名 */
    @Value("${azure.storage.accountName}")
    private String storageAccountName;

    /** Azure Storageへのアクセスキー */
    @Value("${azure.storage.accessKey}")
    private String storageAccessKey;

    /** Azure StorageのBlobコンテナー名 */
    @Value("${azure.storage.containerName}")
    private String storageContainerName;

    /**
     * BlobStorageからファイルを読み込む.
     * @return 読み込みオブジェクト
     */
    @Bean
    public FlatFileItemReader<UserData> reader() {
        FlatFileItemReader<UserData> reader = new FlatFileItemReader<UserData>();
        try {
            // Blobストレージへの接続文字列
            String storageConnectionString = "DefaultEndpointsProtocol=https;" 
                    + "AccountName=" + storageAccountName
                    + ";" + "AccountKey=" + storageAccessKey + ";";
            // Blobサービスクライアントの生成
            BlobServiceClient blobServiceClient = new BlobServiceClientBuilder()
                    .connectionString(storageConnectionString).buildClient();
            // BlobサービスクライアントからSASを生成
            String sas = DemoStringUtil.getServiceSasUriForBlob(blobServiceClient);
            // BlobStorageからファイル(user_data.csv)を読み込む際のURLをreaderに設定
            String url = blobServiceClient.getAccountUrl() + "/" 
                    + storageContainerName + "/user_data.csv" 
                    + "?" + sas;
            reader.setResource(new UrlResource(url));
        } catch (MalformedURLException ex) {
            throw new RuntimeException(ex);
        }

        // ファイルを読み込む際の文字コード
        reader.setEncoding("UTF-8");
        // 1行目を読み飛ばす
        reader.setLinesToSkip(1);
        // ファイルから読み込んだデータをUserDataオブジェクトに格納
        reader.setLineMapper(new DefaultLineMapper<UserData>() {
            {
                setLineTokenizer(new DelimitedLineTokenizer() {
                    {
                        setNames(new String[] { "id", "name", "birth_year"
                                , "birth_month", "birth_day", "sex", "memo" });
                    }
                });
                setFieldSetMapper(new BeanWrapperFieldSetMapper<UserData>() {
                    {
                        setTargetType(UserData.class);
                    }
                });
            }
        });
        return reader;
    }

    /**
     * 読み込んだファイルのデータを、DBに書き込む.
     * @param dataSource データソース
     * @return 書き込みオブジェクト
     */
    @Bean
    public MyBatisBatchItemWriter<UserData> writer(DataSource dataSource) {
        return new MyBatisBatchItemWriterBuilder<UserData>()
                .sqlSessionFactory(sqlSessionFactory)
                .statementId("com.example.mybatis.UserDataMapper.upsert")
                .build();
    }

    /**
     * Spring Batchのジョブ内で指定する処理単位(ステップ)を定義する.
     * @param reader 読み込みオブジェクト
     * @param writer 書き込みオブジェクト
     * @return Spring Batchのジョブ内で指定する処理単位
     */
    @Bean
    public Step step(ItemReader<UserData> reader, ItemWriter<UserData> writer) {
        // 生成するステップ内で読み込み/加工/書き込みを一連の流れを指定する
        // その際のチャンクサイズ(=何件読み込む毎にコミットするか)を3に指定している
        return stepBuilderFactory.get("step")
                .<UserData, UserData>chunk(3)
                .reader(reader)
                .processor(demoUpdateProcessor)
                .writer(writer)
                .build();
    }

    /**
     * Spring Batchのジョブを定義する.
     * @param jobListener 実行前後の処理(リスナ)
     * @param step        実行するステップ
     * @return Spring Batchのジョブ
     */
    @Bean
    public Job updateUserData(DemoJobListener jobListener, Step step) {
        // 生成するジョブ内で、実行前後の処理(リスナ)と処理単位(ステップ)を指定する
        return jobBuilderFactory
                .get("updateUserData")
                .incrementer(new RunIdIncrementer())
                .listener(jobListener)
                .flow(step)
                .end()
                .build();
    }
}
「EaseUS Todo Backup」は様々な形でバックアップ取得が行える便利ツールだったパソコン内のデータを、ファイル/パーティション/ディスク等の様々な単位でバックアップしたり、バックアップのスケジュール設定や暗号化設定も...

また、読み込んだデータのチェックを行うクラスの内容は以下の通りで、エラー時はログ出力し、(DBへの書き込みをしないよう)NULLを返却するようにしている。

package com.example.batch;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

import com.example.mybatis.model.UserData;
import com.example.util.DemoStringUtil;

@Component
public class DemoUpdateProcessor implements ItemProcessor<UserData, UserData> {

    /* Spring Bootでログ出力するためのLogbackのクラスを生成 */
    private static final Logger LOGGER 
        = LoggerFactory.getLogger(DemoUpdateProcessor.class);

    /**
     * 読み込んだデータの加工を行う.
     * ここでは、読み込んだデータのチェックを行い、エラーがあればNULLを、
     * エラーがなければ引数の値をそのまま返す.
     */
    @Override
    public UserData process(UserData item) throws Exception {
        String itemId = item.getId();
        
        // 1列目が空またはNULLの場合はエラー
        if (StringUtils.isEmpty(itemId)) {
            LOGGER.info("1列目が空またはNULLです。");
            return null;
        }
        // 1列目が数値以外の場合はエラー
        if (!StringUtils.isNumeric(itemId)) {
            LOGGER.info("1列目が数値以外です。" + " 該当のid=" + itemId);
            return null;
        }
        // 1列目の桁数が不正な場合はエラー
        if (itemId.length() > 6) {
            LOGGER.info("1列目の桁数が不正です。" + " 該当のid=" + itemId);
            return null;
        }

        // 2列目が空またはNULLの場合はエラー
        if (StringUtils.isEmpty(item.getName())) {
            LOGGER.info("2列目が空またはNULLです。" + " 該当のid=" + itemId);
            return null;
        }
        // 2列目の桁数が不正な場合はエラー
        if (item.getName().length() > 40) {
            LOGGER.info("2列目の桁数が不正です。" + " 該当のid=" + itemId);
            return null;
        }

        // 3列目が空またはNULLの場合はエラー
        if (StringUtils.isEmpty(item.getBirth_year())) {
            LOGGER.info("3列目が空またはNULLです。" + " 該当のid=" + itemId);
            return null;
        }
        // 3列目が数値以外の場合はエラー
        if (!StringUtils.isNumeric(item.getBirth_year())) {
            LOGGER.info("3列目が数値以外です。" + " 該当のid=" + itemId);
            return null;
        }
        // 3列目の桁数が不正な場合はエラー
        if (item.getBirth_year().length() > 4) {
            LOGGER.info("3列目の桁数が不正です。" + " 該当のid=" + itemId);
            return null;
        }

        // 4列目が空またはNULLの場合はエラー
        if (StringUtils.isEmpty(item.getBirth_month())) {
            LOGGER.info("4列目が空またはNULLです。" + " 該当のid=" + itemId);
            return null;
        }
        // 4列目が数値以外の場合はエラー
        if (!StringUtils.isNumeric(item.getBirth_month())) {
            LOGGER.info("4列目が数値以外です。" + " 該当のid=" + itemId);
            return null;
        }
        // 4列目の桁数が不正な場合はエラー
        if (item.getBirth_month().length() > 4) {
            LOGGER.info("4列目の桁数が不正です。" + " 該当のid=" + itemId);
            return null;
        }

        // 5列目が空またはNULLの場合はエラー
        if (StringUtils.isEmpty(item.getBirth_day())) {
            LOGGER.info("5列目が空またはNULLです。" + " 該当のid=" + itemId);
            return null;
        }
        // 5列目が数値以外の場合はエラー
        if (!StringUtils.isNumeric(item.getBirth_day())) {
            LOGGER.info("5列目が数値以外です。"+ " 該当のid=" + itemId);
            return null;
        }
        // 5列目の桁数が不正な場合はエラー
        if (item.getBirth_day().length() > 2) {
            LOGGER.info(" 5列目の桁数が不正です。"+ " 該当のid=" + itemId);
            return null;
        }

        // 3列目・4列目・5列目から生成される日付が不正であればエラー
        String birthDay = item.getBirth_year() 
                + DemoStringUtil.addZero(item.getBirth_month())
                + DemoStringUtil.addZero(item.getBirth_day());
        if (!DemoStringUtil.isCorrectDate(birthDay, "uuuuMMdd")) {
            LOGGER.info("3~5列目の日付が不正です。"+ " 該当のid=" + itemId);
            return null;
        }

        // 6列目が1,2以外の場合はエラー
        if (!("1".equals(item.getSex())) 
                 && !("2".equals(item.getSex()))) {
            LOGGER.info("6列目の性別が不正です。"+ " 該当のid=" + itemId);
            return null;
        }

        // 7列目の桁数が不正な場合はエラー
        if (!StringUtils.isEmpty(item.getMemo()) 
                 && item.getMemo().length() > 1024) {
            LOGGER.info("7列目の桁数が不正です。"+ " 該当のid=" + itemId);
            return null;
        }

        return item;
    }

}

さらに、ジョブの実行前後に処理を追加するクラスの内容は以下の通りで、@Componentアノテーションを付与してDemoChunkConfigクラスから参照できるようにしている。

package com.example.batch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.stereotype.Component;

import com.example.model.TimerTriggerParam;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

@Component
public class DemoJobListener extends JobExecutionListenerSupport {
    
    /* Spring Bootでログ出力するためのLogbackのクラスを生成 */
    private static final Logger LOGGER 
         = LoggerFactory.getLogger(DemoJobListener.class);

    /**
     * Spring Batchのジョブ実行前の処理を定義する.
     */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        super.beforeJob(jobExecution);
        
        // Spring Batchのジョブ実行前の処理が呼び出されたことをログ出力する
        printLog(jobExecution, "beforeJob");
    }
    
    /**
     * Spring Batchのジョブ実行後の処理を定義する.
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        super.afterJob(jobExecution);
        
        // Spring Batchのジョブ実行後の処理が呼び出されたことをログ出力する
        printLog(jobExecution, "afterJob");
    }
    
    /**
     * ログ出力を行う.
     * @param jobExecution ジョブ実行時の定義オブジェクト
     * @param methodName メソッド名
     */
    private void printLog(JobExecution jobExecution, String methodName) {
        try {
            String paramStr 
                = jobExecution.getJobParameters().getString("timerTriggerParam");
            if(paramStr != null) {
                TimerTriggerParam param = new ObjectMapper().readValue(
                     paramStr, new TypeReference<TimerTriggerParam>() {});
                LOGGER.info("DemoJobListener " + methodName 
                     + " triggered: " + param.getTimerInfo());
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    
}
サラリーマン型フリーランスSEという働き方でお金の不安を解消しよう先日、「サラリーマン型フリーランスSE」という働き方を紹介するYouTube動画を視聴しましたので、その内容をご紹介します。 「サ...

また、UserDataクラスの内容は以下の通りで、全項目を文字列型に変更している。

package com.example.mybatis.model;

import lombok.Data;

@Data
public class UserData {

    /** ID */
    private String id;
    
    /** 名前 */
    private String name;
    
    /** 生年月日_年 */
    private String birth_year;
    
    /** 生年月日_月 */
    private String birth_month;
    
    /** 生年月日_日 */
    private String birth_day;
    
    /** 性別 */
    private String sex;
    
    /** メモ */
    private String memo;
    
}

さらに、pom.xmlの追加内容は以下の通りで、SASトークンを生成するための設定を追加している。

<!-- Azure StorageでSASトークンを利用するための設定 -->
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-storage-blob</artifactId>
    <version>12.10.0</version>
</dependency>

また、文字列を編集するユーティリティクラスの内容は以下の通りで、SASトークンを生成するgetServiceSasUriForBlobメソッドを追加している。

package com.example.util;

import java.time.Duration;
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.ResolverStyle;

import org.apache.commons.lang3.StringUtils;

import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.common.sas.AccountSasPermission;
import com.azure.storage.common.sas.AccountSasResourceType;
import com.azure.storage.common.sas.AccountSasService;
import com.azure.storage.common.sas.AccountSasSignatureValues;

public class DemoStringUtil {

    /**
     * DateTimeFormatterを利用して日付チェックを行う.
     * @param dateStr    チェック対象文字列
     * @param dateFormat 日付フォーマット
     * @return 日付チェック結果
     */
    public static boolean isCorrectDate(String dateStr, String dateFormat) {
        if (StringUtils.isEmpty(dateStr) || StringUtils.isEmpty(dateFormat)) {
            return false;
        }
        // 日付と時刻を厳密に解決するスタイルで、DateTimeFormatterオブジェクトを作成
        DateTimeFormatter df = DateTimeFormatter.ofPattern(dateFormat)
                                  .withResolverStyle(ResolverStyle.STRICT);
        try {
            // チェック対象文字列をLocalDate型の日付に変換できれば、チェックOKとする
            LocalDate.parse(dateStr, df);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    /**
     * 数値文字列が1桁の場合、頭に0を付けて返す.
     * @param intNum 数値文字列
     * @return 変換後数値文字列
     */
    public static String addZero(String intNum) {
        if (StringUtils.isEmpty(intNum)) {
            return intNum;
        }
        if (intNum.length() == 1) {
            return "0" + intNum;
        }
        return intNum;
    }

    /**
     * 引数で指定したBlobのSASトークンを生成し返す.
     * @param client Blobサービスクライアントオブジェクト
     * @return SASトークン
     */
    public static String getServiceSasUriForBlob(BlobServiceClient client) {
        // SASトークンのアクセス権を読み取り可能に設定
        AccountSasPermission permissions 
              = new AccountSasPermission().setReadPermission(true);

        // SASトークンがBlobコンテナやオブジェクトにアクセスできるように設定
        AccountSasResourceType resourceTypes 
              = new AccountSasResourceType().setContainer(true).setObject(true);

        // SASトークンがBlobにアクセスできるように設定
        AccountSasService services = new AccountSasService().setBlobAccess(true);

        // SASトークンの有効期限を5分に設定
        OffsetDateTime expiryTime = OffsetDateTime.now().plus(Duration.ofMinutes(5));

        // SASトークンを作成
        AccountSasSignatureValues sasValues = new AccountSasSignatureValues(
           expiryTime, permissions, services, resourceTypes);
        return client.generateAccountSas(sasValues);
    }

}

その他のソースコード内容は、以下のサイトを参照のこと。
https://github.com/purin-it/azure/tree/master/azure-functions-spring-batch-chunk/demoAzureFunc



サンプルプログラムの実行結果

サンプルプログラムの実行結果は、以下の通り。

1) エラーが発生しない場合のCSVファイル・ログ・取り込み後のDBの内容は以下の通りで、CSVファイルの内容が、SQLデータベース上のUSER_DATAテーブルに全て取り込まれたことが確認できる。
サンプルプログラムの実行結果_CSV_1

サンプルプログラムの実行結果_ログ_1_1 サンプルプログラムの実行結果_ログ_1_2 サンプルプログラムの実行結果_DB_1



2) 赤枠でエラーが発生する場合のCSVファイル・ログ・取り込み後のDBの内容は以下の通りで、ログにエラー内容が出力され、SQLデータベース上のUSER_DATAテーブルに、エラーでないid=3のデータのみ取り込まれたことが確認できる。
サンプルプログラムの実行結果_CSV_2

サンプルプログラムの実行結果_ログ_2_1 サンプルプログラムの実行結果_ログ_2_2 サンプルプログラムの実行結果_DB_2_2

要点まとめ

  • Azure Functions上でSpring Batchを利用する際、TaskletモデルだけでなくChunkモデルも利用できる。
  • Chunkモデルでは、ファイルの読み込み/データの加工/DBへの書き込みといった処理の流れを定型化している。
  • ChunkモデルでAzure Blob Storageのファイル読み込む際は、SASトークンを発行する必要がある。