TimerTrigger/SpringBatch

Spring BatchのChunkモデルを利用したバッチで中間ファイルを作らずCSVファイルを出力してみた

以下の記事で、Spring BatchのChunkモデルを利用したバッチで、DBのデータをAzure Blob StorageにCSVファイルを出力する処理を実装していたが、このときは、Azure Functions(Linux)上の、「/home/tmp_user_data.csv」に中間ファイルを出力していた。

Azure Function上でSpring BatchのChunkモデルを利用してDBデータをCSVに出力してみたこれまでこのブログでは、Blob上のCSVファイルをDBのテーブルに書き込む処理を作成していたが、逆に、DBのテーブルデータをBlob上...

今回は、同様の処理を、中間ファイルを利用せずに、Spring BatchのChunkモデルを利用して、DBのデータをAzure Blob StorageにCSVファイルを出力してみたので、そのサンプルプログラムを共有する。

前提条件

下記記事のサンプルプログラムを作成済であること。

Azure Function上でSpring BatchのChunkモデルを利用してDBデータをCSVに出力してみたこれまでこのブログでは、Blob上のCSVファイルをDBのテーブルに書き込む処理を作成していたが、逆に、DBのテーブルデータをBlob上...

サンプルプログラムの作成

作成したサンプルプログラムの構成は、以下の通り。
サンプルプログラムの構成
なお、上記の赤枠は、今回追加・変更したプログラムである。

pom.xml、local.settings.jsonは、以下の記事の「作成したサンプルプログラムの内容(Azure Functions)」と同様の修正を行っている。

Azure App ServiceやAzure Functionsで使うSpring Bootのバージョンを2.7.xに変更してみたSpring Bootの各バージョン毎のサポート期間は、以下のサイトで確認できる。 https://spring.pleiades.i...

また、CSVファイルをAzure Blob Storageに出力する処理は、ItemWriterインタフェースを継承した以下のクラスで実装している。

package com.example.batch;

import java.util.List;

import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.example.mybatis.model.UserData;
import com.example.service.DemoBlobService;
import com.example.util.DemoStringUtil;

@Component
public class DemoItemWriter implements ItemWriter<UserData> {
  
  /** BLOBへアクセスするサービス */
  @Autowired
  private DemoBlobService demoBlobService;
  
  /** 取得したUSER_DATAテーブルのデータを出力するBLOB名 */
  @Value("${blob.name}")
  private String blobName;
  
  /** 改行コード名 */
  @Value("${line.sep.name}")
  private String lineSepName;
  
  /** CSV書き込み方式 */
  @Value("${csv.write.method}")
  private String csvWriteMethod;
  
  /**
   * 読み込んだDBのデータを、BLOB(CSVファイル)に書き込む処理を定義する.
   */
  @Override
  public void write(List<? extends UserData> items) {
    // 書き込みデータを格納する文字列
    StringBuilder sb = new StringBuilder();

    // 改行コードを変換
    String lineSeparator = DemoStringUtil.getLineSepCode(lineSepName);
    
    // CSV書き込み方式が一括(ALL)の場合
    if ("ALL".equals(csvWriteMethod)) {
      // ヘッダー行を、書き込みデータに追加
      String header = "id,name,birth_year,birth_month,birth_day,sex,memo";
      sb.append(header + lineSeparator);
    }
    
    // 読み込んだDBのデータ各行を、書き込みデータに追加
    for(UserData item : items) {
      // ID
      sb.append(item.getId());
      sb.append(",");
      // 名前
      sb.append(DemoStringUtil.addDoubleQuote(item.getName()));
      sb.append(",");
      // 生年月日_年
      sb.append(item.getBirth_year());
      sb.append(",");
      // 生年月日_月
      sb.append(item.getBirth_month());
      sb.append(",");
      // 生年月日_日
      sb.append(item.getBirth_day());
      sb.append(",");
      // 性別
      sb.append(DemoStringUtil.addDoubleQuote(item.getSex()));
      sb.append(",");
      // メモ
      sb.append(DemoStringUtil.addDoubleQuote(item.getMemo()));
      // 改行コード
      sb.append(lineSeparator);
    }
    
    // CSV書き込み方式に応じて、書き込みデータを、BLOB(CSVファイル)に書き込む
    if ("ALL".equals(csvWriteMethod)) {
      demoBlobService.writeBlockBlob(blobName, sb.toString());
    } else {
      demoBlobService.writeAppendBlob(blobName, sb.toString());
    }
  }

}

さらに、ItemWriterインタフェースを継承したクラスは、以下のSpring Batch定義クラスで呼び出している。

package com.example.batch;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisPagingItemReader;
import org.mybatis.spring.batch.builder.MyBatisPagingItemReaderBuilder;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.example.mybatis.model.UserData;

import lombok.RequiredArgsConstructor;

@Configuration
@EnableBatchProcessing
@RequiredArgsConstructor
public class DemoChunkConfig {

  /** ジョブ生成ファクトリ */
  public final JobBuilderFactory jobBuilderFactory;

  /** ステップ生成ファクトリ */
  public final StepBuilderFactory stepBuilderFactory;

  /** SQLセッションファクトリ */
  public final SqlSessionFactory sqlSessionFactory;
  
  /** ステップの前後処理 */
  private final DemoStepListener demoStepListener;
  
  /** データ書き込み処理 */
  private final DemoItemWriter demoItemWriter;
  
  /** チャンクサイズ */
  @Value("${chunk.size}")
  private String chunkSize;
  
  /**
   * MyBatisPagingItemReaderを使ってUSER_DATAデータをページを分けて取得する.
   * @return 読み込みオブジェクト
   */
  @Bean
  public MyBatisPagingItemReader<UserData> reader(){
    return new MyBatisPagingItemReaderBuilder<UserData>()
        .sqlSessionFactory(sqlSessionFactory)
        .queryId("com.example.mybatis.UserDataMapper.findAll")
        .pageSize(Integer.parseInt(chunkSize))
        .build();
  }

  /**
   * Spring Batchのジョブ内で指定する処理単位(ステップ)を定義する.
   * @param reader 読み込みオブジェクト
   * @param writer 書き込みオブジェクト
   * @return Spring Batchのジョブ内で指定する処理単位
   */
  @Bean
  public Step step(MyBatisPagingItemReader<UserData> reader
        , ItemWriter<UserData> writer) {
    // 生成するステップ内で読み込み/書き込みを一連の流れを指定する
    // その際、読み込みデータの加工は行わず、
    // チャンクサイズ(=何件読み込む毎にコミットするか)を指定している
    return stepBuilderFactory.get("step")
        .<UserData, UserData>chunk(Integer.parseInt(chunkSize))
        .reader(reader)
        .writer(demoItemWriter)
        .listener(demoStepListener)
        .build();
  }

  /**
   * Spring Batchのジョブを定義する.
   * @param jobListener 実行前後の処理(リスナ)
   * @param step        実行するステップ
   * @return Spring Batchのジョブ
   */
  @Bean
  public Job updateUserData(DemoJobListener jobListener, Step step) {
    // 生成するジョブ内で、実行前後の処理(リスナ)と処理単位(ステップ)を指定する
    return jobBuilderFactory
        .get("selectUserData")
        .incrementer(new RunIdIncrementer())
        .listener(jobListener)
        .flow(step)
        .end()
        .build();
  }
}

また、Spring Batchのジョブ内で指定する処理単位(ステップ)実行前後の処理を記載したクラスの内容は以下の通りで、Blob Storageのファイル削除やCSVファイルヘッダーの書き込みを行っている。

package com.example.batch;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.example.service.DemoBlobService;
import com.example.util.DemoStringUtil;

@Component
public class DemoStepListener implements StepExecutionListener {
  
  /** BLOBへアクセスするサービス */
  @Autowired
  private DemoBlobService demoBlobService;

  /** 取得したUSER_DATAテーブルのデータを出力するBLOB名 */
  @Value("${blob.name}")
  private String blobName;
  
  /** 改行コード名 */
  @Value("${line.sep.name}")
  private String lineSepName;
  
  /** CSV書き込み方式 */
  @Value("${csv.write.method}")
  private String csvWriteMethod;
  
  /**
   * Spring Batchのジョブ内で指定する処理単位(ステップ)実行前の処理を定義する.
   */
  @Override
  public void beforeStep(StepExecution stepExecution) {
    // Blobファイル(BlockBlob,追加Blob)があれば削除
    demoBlobService.deleteBlockBlob(blobName);
    demoBlobService.deleteAppendBlob(blobName);
    
    // 改行コードを変換
    String lineSeparator = DemoStringUtil.getLineSepCode(lineSepName);
    
    // CSV書き込み方式が分割(DIV)の場合
    if ("DIV".equals(csvWriteMethod)) {
      // Blobファイルを作成後、ヘッダー行を出力
      String header = "id,name,birth_year,birth_month,birth_day,sex,memo";
      demoBlobService.writeAppendBlob(blobName, header + lineSeparator);
    }

  }

  /**
   * Spring Batchのジョブ内で指定する処理単位(ステップ)実行後の処理を定義する.
   */
  @Override
  public ExitStatus afterStep(StepExecution stepExecution) {
    return ExitStatus.COMPLETED;
  }

}

さらに、Blob Storageにアクセスするクラスの内容は以下の通りで、Blob Storageへのファイルを書き込みや削除を行っている。

package com.example.service;

import javax.annotation.PostConstruct;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.CloudAppendBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;

import io.netty.util.internal.StringUtil;

@Service
public class DemoBlobService {

  /** Azure Storageのアカウント名 */
  @Value("${azure.storage.accountName}")
  private String storageAccountName;

  /** Azure Storageへのアクセスキー */
  @Value("${azure.storage.accessKey}")
  private String storageAccessKey;

  /** Azure StorageのBlobコンテナー名 */
  @Value("${azure.storage.containerName}")
  private String storageContainerName;
  
  /** 文字コード */
  @Value("${character.code}")
  private String characterCode;
  
  // Blobストレージへの接続文字列
  private String storageConnectionString = null;
  
  /**
   * Blobストレージへの接続文字列の初期化処理
   */
  @PostConstruct
  public void init() {
    // Blobストレージへの接続文字列を設定
    storageConnectionString = "DefaultEndpointsProtocol=https;" 
        + "AccountName=" + storageAccountName + ";"
        + "AccountKey=" + storageAccessKey + ";";
  }
  
  /**
   * 引数で指定されたファイル(BlockBlob)に、引数で指定されたメッセージを書き込む.
   * @param fileName ファイル名(BlockBlob)
   * @param message メッセージ
   */
  public void writeBlockBlob(String fileName, String message) {
    // 未指定の項目がある場合、何もしない
    if(StringUtil.isNullOrEmpty(storageAccountName) 
        || StringUtil.isNullOrEmpty(storageAccessKey) 
        || StringUtil.isNullOrEmpty(storageContainerName)
        || StringUtil.isNullOrEmpty(fileName)
        || StringUtil.isNullOrEmpty(message)) {
      return;
    }
    
    // ファイルアップロード処理
    // Blob内のコンテナー内の指定したファイル(BlockBlob)に、指定したメッセージを書き込む
    try {
      CloudBlockBlob cbb = getContainer().getBlockBlobReference(fileName);
      cbb.deleteIfExists();
      BlobRequestOptions options = new BlobRequestOptions();
      options.setAbsorbConditionalErrorsOnRetry(true);
      cbb.uploadText(message, characterCode, null, options, null);
    } catch (Exception ex) {
      throw new RuntimeException(ex);
    }
  }
  
  /**
   * 引数で指定されたBlob(BlockBlob)が存在する場合、削除する.
   * @param blobName Blob名(BlockBlob)
   */
  public void deleteBlockBlob(String fileName) {
    // 未指定の項目がある場合、何もしない
    if(StringUtil.isNullOrEmpty(storageAccountName) 
        || StringUtil.isNullOrEmpty(storageAccessKey) 
        || StringUtil.isNullOrEmpty(storageContainerName)
        || StringUtil.isNullOrEmpty(fileName)) {
      return;
    }
    
    // Blob内のコンテナー内の指定したファイル(BlockBlob)が存在する場合、削除する
    try {
      CloudBlockBlob cbb = getContainer().getBlockBlobReference(fileName);
      cbb.deleteIfExists();
    } catch (Exception ex) {
      throw new RuntimeException(ex);
    }
  }
  
  /**
   * 引数で指定されたファイル(追加Blob)に、引数で指定されたメッセージを追加する.
   * @param fileName ファイル名(追加Blob)
   * @param message メッセージ
   */
  public void writeAppendBlob(String fileName, String message) {
    // 未指定の項目がある場合、何もしない
    if(StringUtil.isNullOrEmpty(storageAccountName) 
        || StringUtil.isNullOrEmpty(storageAccessKey) 
        || StringUtil.isNullOrEmpty(storageContainerName)
        || StringUtil.isNullOrEmpty(fileName)
        || StringUtil.isNullOrEmpty(message)) {
      return;
    }
    
    // ファイルアップロード処理
    // Blob内のコンテナー内の指定したファイル(追加Blob)に、指定したメッセージを追記モードで書き込む
    try {
      CloudAppendBlob cab = getContainer().getAppendBlobReference(fileName);
      if (!cab.exists()) {
        cab.createOrReplace();
      }
      BlobRequestOptions options = new BlobRequestOptions();
      options.setAbsorbConditionalErrorsOnRetry(true);
      cab.appendText(message, characterCode, null, options, null);    
    } catch (Exception ex) {
      throw new RuntimeException(ex);
    }
  }
  
  /**
   * 引数で指定されたファイル(追加Blob)が存在する場合、削除する.
   * @param fileName ファイル名(追加Blob)
   */
  public void deleteAppendBlob(String fileName) {
    // 未指定の項目がある場合、何もしない
    if(StringUtil.isNullOrEmpty(storageAccountName) 
        || StringUtil.isNullOrEmpty(storageAccessKey) 
        || StringUtil.isNullOrEmpty(storageContainerName)
        || StringUtil.isNullOrEmpty(fileName)) {
      return;
    }
    
    // Blob内のコンテナー内の指定したファイル(追加Blob)が存在する場合、削除する
    try {
      CloudAppendBlob cab = getContainer().getAppendBlobReference(fileName);
      cab.deleteIfExists();    
    } catch (Exception ex) {
      throw new RuntimeException(ex);
    }
  }
  
  /**
   * Blobストレージのコンテナを取得する.
   * @return Blobストレージのコンテナ
   * @throws Exception
   */
  private CloudBlobContainer getContainer() throws Exception {
    // ストレージアカウントオブジェクトを取得
    CloudStorageAccount storageAccount 
        = CloudStorageAccount.parse(storageConnectionString);

    // Blobクライアントオブジェクトを取得
    CloudBlobClient blobClient 
        = storageAccount.createCloudBlobClient();

    // Blob内のコンテナーを取得
    CloudBlobContainer container 
        = blobClient.getContainerReference(storageContainerName);
    return container;
  }

}

また、application.propertiesに、CSVファイル出力設定を追加している。

# Azure Storageの接続先
azure.storage.accountName=azureblobpurinit
azure.storage.accessKey=(Azure Blob Storageのアクセスキー)
azure.storage.blob-endpoint=https://azureblobpurinit.blob.core.windows.net
azure.storage.containerName=blobcontainer

# DB接続設定
spring.datasource.url=jdbc:sqlserver://azure-db-purinit.database.windows.net:1433;database=azureSqlDatabase
spring.datasource.username=purinit@azure-db-purinit
spring.datasource.password=(DBのパスワード)
spring.datasource.driverClassName=com.microsoft.sqlserver.jdbc.SQLServerDriver

## ファイル出力設定
# チャンクサイズ
chunk.size=300000
# BLOB名
blob.name=user_data.csv
# 文字コード
character.code=UTF-8
# 改行コード名(CR/LF/CRLF)
line.sep.name=LF
# CSV書き込み方式(ALL(一括)/DIV(分割))
csv.write.method=ALL

その他のソースコード内容は、以下のサイトを参照のこと。
https://github.com/purin-it/azure/tree/master/azure-functions-db-to-csv-direct/demoAzureFunc



「DesignEvo」は多くのテンプレートからロゴを簡単に作成できるツールだった多くのテンプレートが用意されていてロゴを簡単に作成できるツールの一つに、「DesignEvo」があります。今回は、「DesignEvo」...

サンプルプログラムの実行結果

サンプルプログラムの実行結果は、下記記事の「サンプルプログラムの実行結果」と同じ結果となる。

Azure Function上でSpring BatchのChunkモデルを利用してDBデータをCSVに出力してみたこれまでこのブログでは、Blob上のCSVファイルをDBのテーブルに書き込む処理を作成していたが、逆に、DBのテーブルデータをBlob上...

ただし、CSV書き込み方式をDIV(分割)に設定した場合、以下のように、CSVファイルが、追加BLOBとして出力されることが確認できる。
追加BLOBの出力

また、CSV書き込み方式をALL(一括)に設定した場合、以下のように、CSVファイルが、ブロックBLOBとして出力されることが確認できる。
ブロックBLOBの出力

さらに、取得元のDB(Azure SQL Database上のUSER_DATAテーブル)に30万件のデータを入れた場合の性能は、以下の通り。

1) 下記記事の「テストデータ作成」に従って、30万件のデータを作成する。

Spring BatchのChunkモデルを利用したバッチで30万件のDBデータをCSVに出力してみたSpring Batchには、ChunkモデルとTaskletモデルがあり、以下のサイトに記載の通り、大量のデータを処理するにはChun...

2) application.propertiesを、以下のように、チャンクサイズ=10000、CSV書き込み方式=DIV(分割)に変更する。

## ファイル出力設定
# チャンクサイズ
chunk.size=10000
# BLOB名
blob.name=user_data.csv
# 文字コード
character.code=UTF-8
# 改行コード名(CR/LF/CRLF)
line.sep.name=LF
# CSV書き込み方式(ALL(一括)/DIV(分割))
csv.write.method=DIV

3) 以下のサイトの「サンプルプログラムの実行結果(ローカル)」「サンプルプログラムの実行結果(Azure上)」に記載の手順で、サンプルプログラムをAzure Functionsにデプロイする。

Azure Functions上でTimerTriggerによって動作するJavaアプリケーション(Spring Boot上)を作成してみたこれまでは、HTTPリクエストによりAzure Functionsが動作するアプリケーションのみ作成してきたが、Timer Trigge...

4) バッチ実行時のログ出力内容は以下の通りで、83902ms(約1分24秒)で処理が完了していることが確認できる。
サンプルプログラムの実行結果_4

なお、上記ログの確認手順は、以下のサイトを参照のこと。

Azure FunctionsでJavaアプリケーション(Spring Boot上)のLogbackでのログを確認してみたこれまで、Azure Functionsの関数の「モニター」で何度かログを確認したことがあったが、この方法だと、Spring Bootプ...

5) バッチ実行時のCPU・メモリ使用率は以下の通り。
サンプルプログラムの実行結果_5

なお、上記CPU・メモリ使用率の確認手順は、以下のサイトを参照のこと。

Azure FunctionsやAzure App ServiceのCPUやメモリ使用率を確認してみたAzure FunctionsやAzure App Serviceの、CPUやメモリ等のリソース使用量を確認したい場合、Azure Po...

6) application.propertiesを、以下のように、チャンクサイズ=2300に変更し、サンプルプログラムをAzure Functionsにデプロイする。

## ファイル出力設定
# チャンクサイズ
chunk.size=2300
# BLOB名
blob.name=user_data.csv
# 文字コード
character.code=UTF-8
# 改行コード名(CR/LF/CRLF)
line.sep.name=LF
# CSV書き込み方式(ALL(一括)/DIV(分割))
csv.write.method=DIV

7) バッチ実行時のログ出力内容は以下の通りで、76713ms(約1分17秒)で処理が完了していることが確認できる。
サンプルプログラムの実行結果_7

8) バッチ実行時のCPU・メモリ使用率は以下の通り。
サンプルプログラムの実行結果_8

9) application.propertiesを、以下のように、チャンクサイズ=300000、CSV書き込み方式=ALL(一括)に変更し、サンプルプログラムをAzure Functionsにデプロイする。

## ファイル出力設定
# チャンクサイズ
chunk.size=300000
# BLOB名
blob.name=user_data.csv
# 文字コード
character.code=UTF-8
# 改行コード名(CR/LF/CRLF)
line.sep.name=LF
# CSV書き込み方式(ALL(一括)/DIV(分割))
csv.write.method=ALL

10) バッチ実行時のログ出力内容は以下の通りで、68092ms(約1分8秒)で処理が完了していることが確認できる。
サンプルプログラムの実行結果_10

11) バッチ実行時のCPU・メモリ使用率は以下の通りで、メモリ使用率がかなり上昇していることが確認できる。
サンプルプログラムの実行結果_11

要点まとめ

  • Spring BatchのChunkモデルを利用したバッチ処理でも、ItemWriterインタフェースを継承したクラスを利用することで、中間ファイルを利用せずに、DBのデータをAzure Blob StorageにCSVファイルを出力できる。