Spring BatchのChunkモデルは、以下のサイトに記載の通り、ファイルの読み込み/データの加工/DBへの書き込みといった処理の流れを定型化している。
https://spring.pleiades.io/spring-batch/docs/current/reference/html/step.html#chunkOrientedProcessing
また、Spring BatchのChunkモデルには、ファイルの読み込み/データの加工/DBへの書き込みの各処理の前後に、別の処理を追加するListenerクラスも用意されている。
今回は、DBへの書き込み前後・Step実行前後のそれぞれに、処理結果を出力する処理を追加してみたので、そのサンプルプログラムを共有する。
前提条件
下記記事のサンプルプログラムを作成済であること。
サンプルプログラムの作成
作成したサンプルプログラムの構成は以下の通り。

なお、上記の赤枠は、今回追加・変更したプログラムである。
チェック処理を行うクラスの内容は以下の通りで、ログ出力に加え、BlobStorage「result.txt」に、チェックエラーが発生した旨を記載している。
package com.example.batch;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.example.mybatis.model.UserData;
import com.example.service.DemoBlobService;
import com.example.util.DemoStringUtil;
@Component
public class DemoUpdateProcessor implements ItemProcessor<UserData, UserData> {
/* Spring Bootでログ出力するためのLogbackのクラスを生成 */
private static final Logger LOGGER
= LoggerFactory.getLogger(DemoUpdateProcessor.class);
/** BLOBへアクセスするサービス */
@Autowired
private DemoBlobService demoBlobService;
/** 結果を出力するBLOB名 */
private static final String BLOB_NAME = "result.txt";
/**
* 読み込んだデータの加工を行う.
* ここでは、読み込んだデータのチェックを行い、エラーがあればNULLを、
* エラーがなければ引数の値をそのまま返す.
*/
@Override
public UserData process(UserData item) throws Exception {
String itemId = StringUtils.isEmpty(item.getId()) ? "(未設定)" : item.getId();
LOGGER.info("読み込んだデータの加工を行います。ID=" + itemId);
// 1列目が空またはNULLの場合はエラー
if (StringUtils.isEmpty(itemId)) {
printLog(itemId, "1列目が空またはNULLです。");
return null;
}
// 1列目が数値以外の場合はエラー
if (!StringUtils.isNumeric(itemId)) {
printLog(itemId, "1列目が数値以外です。");
return null;
}
// 1列目の桁数が不正な場合はエラー
if (itemId.length() > 6) {
printLog(itemId, "1列目の桁数が不正です。");
return null;
}
// 2列目が空またはNULLの場合はエラー
if (StringUtils.isEmpty(item.getName())) {
printLog(itemId, "2列目が空またはNULLです。");
return null;
}
// 2列目の桁数が不正な場合はエラー
if (item.getName().length() > 40) {
printLog(itemId, "2列目の桁数が不正です。");
return null;
}
// 3列目が空またはNULLの場合はエラー
if (StringUtils.isEmpty(item.getBirth_year())) {
printLog(itemId, "3列目が空またはNULLです。");
return null;
}
// 3列目が数値以外の場合はエラー
if (!StringUtils.isNumeric(item.getBirth_year())) {
printLog(itemId, "3列目が数値以外です。");
return null;
}
// 3列目の桁数が不正な場合はエラー
if (item.getBirth_year().length() > 4) {
printLog(itemId, "3列目の桁数が不正です。");
return null;
}
// 4列目が空またはNULLの場合はエラー
if (StringUtils.isEmpty(item.getBirth_month())) {
printLog(itemId, "4列目が空またはNULLです。");
return null;
}
// 4列目が数値以外の場合はエラー
if (!StringUtils.isNumeric(item.getBirth_month())) {
printLog(itemId, "4列目が数値以外です。");
return null;
}
// 4列目の桁数が不正な場合はエラー
if (item.getBirth_month().length() > 4) {
printLog(itemId, "4列目の桁数が不正です。");
return null;
}
// 5列目が空またはNULLの場合はエラー
if (StringUtils.isEmpty(item.getBirth_day())) {
printLog(itemId, "5列目が空またはNULLです。");
return null;
}
// 5列目が数値以外の場合はエラー
if (!StringUtils.isNumeric(item.getBirth_day())) {
printLog(itemId, "5列目が数値以外です。");
return null;
}
// 5列目の桁数が不正な場合はエラー
if (item.getBirth_day().length() > 2) {
printLog(itemId, "5列目の桁数が不正です。");
return null;
}
// 3列目・4列目・5列目から生成される日付が不正であればエラー
String birthDay = item.getBirth_year()
+ DemoStringUtil.addZero(item.getBirth_month())
+ DemoStringUtil.addZero(item.getBirth_day());
if (!DemoStringUtil.isCorrectDate(birthDay, "uuuuMMdd")) {
printLog(itemId, "3~5列目の日付が不正です。");
return null;
}
// 6列目が1,2以外の場合はエラー
if (!("1".equals(item.getSex())) && !("2".equals(item.getSex()))) {
printLog(itemId, "6列目の性別が不正です。");
return null;
}
// 7列目の桁数が不正な場合はエラー
if (!StringUtils.isEmpty(item.getMemo()) && item.getMemo().length() > 1024) {
printLog(itemId, "7列目の桁数が不正です。");
return null;
}
return item;
}
/**
* エラーメッセージを出力する
* @param itemId ID
* @param message メッセージ
*/
private void printLog(String itemId, String message) {
// ログにエラーメッセージを書き込む
LOGGER.info(message + " 該当のID=" + itemId);
// BlobStorageにチェックエラーだった旨のメッセージを書き込む
demoBlobService.writeAppendBlob(BLOB_NAME, "ID: " + itemId + ": チェックエラー");
}
}また、BlobStorageにアクセスするクラスの内容は以下の通りで、Blobへの追記・削除・URL取得を行っている。
package com.example.service;
import java.time.Duration;
import java.time.OffsetDateTime;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.common.sas.AccountSasPermission;
import com.azure.storage.common.sas.AccountSasResourceType;
import com.azure.storage.common.sas.AccountSasService;
import com.azure.storage.common.sas.AccountSasSignatureValues;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.CloudAppendBlob;
import io.netty.util.internal.StringUtil;
@Service
public class DemoBlobService {
/** Azure Storageのアカウント名 */
@Value("${azure.storage.accountName}")
private String storageAccountName;
/** Azure Storageへのアクセスキー */
@Value("${azure.storage.accessKey}")
private String storageAccessKey;
/** Azure StorageのBlobコンテナー名 */
@Value("${azure.storage.containerName}")
private String storageContainerName;
/** 改行コード */
private static final String LINE_SEPARATOR = "\r\n";
/** 文字コード */
private static final String CHARCTER_CODE = "UTF-8";
// Blobストレージへの接続文字列
private String storageConnectionString = null;
/**
* Blobストレージへの接続文字列の初期化処理
*/
@PostConstruct
public void init() {
// Blobストレージへの接続文字列を設定
storageConnectionString = "DefaultEndpointsProtocol=https;"
+ "AccountName=" + storageAccountName + ";"
+ "AccountKey=" + storageAccessKey + ";";
}
/**
* 引数で指定されたファイルに、引数で指定されたメッセージを追加する.
* @param fileName ファイル名
* @param message メッセージ
*/
public void writeAppendBlob(String fileName, String message) {
// 未指定の項目がある場合、何もしない
if(StringUtil.isNullOrEmpty(storageAccountName)
|| StringUtil.isNullOrEmpty(storageAccessKey)
|| StringUtil.isNullOrEmpty(storageContainerName)
|| StringUtil.isNullOrEmpty(fileName)
|| StringUtil.isNullOrEmpty(message)) {
return;
}
// ファイルアップロード処理
// Blob内のコンテナー内の指定したファイルに、指定したメッセージを追記モードで書き込む
try {
CloudAppendBlob cab = getContainer().getAppendBlobReference(fileName);
if (!cab.exists()) {
cab.createOrReplace();
}
BlobRequestOptions options = new BlobRequestOptions();
options.setAbsorbConditionalErrorsOnRetry(true);
cab.appendText(message + LINE_SEPARATOR, CHARCTER_CODE, null, options, null);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
/**
* 引数で指定されたファイルが存在する場合、削除する.
* @param fileName ファイル名
*/
public void deleteAppendBlob(String fileName) {
// 未指定の項目がある場合、何もしない
if(StringUtil.isNullOrEmpty(storageAccountName)
|| StringUtil.isNullOrEmpty(storageAccessKey)
|| StringUtil.isNullOrEmpty(storageContainerName)
|| StringUtil.isNullOrEmpty(fileName)) {
return;
}
// Blob内のコンテナー内の指定したファイルが存在する場合、削除する
try {
CloudAppendBlob cab = getContainer().getAppendBlobReference(fileName);
cab.deleteIfExists();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
/**
* 引数で指定されたファイルのURLを返却する.
* @param fileName
* @return
*/
public String getBlobUrl(String fileName) {
// 未指定の項目がある場合、nullを返す
if(StringUtil.isNullOrEmpty(storageAccountName)
|| StringUtil.isNullOrEmpty(storageAccessKey)
|| StringUtil.isNullOrEmpty(storageContainerName)
|| StringUtil.isNullOrEmpty(fileName)) {
return null;
}
// Blob内のコンテナー内の指定したファイルのURLを返却する
try {
// Blobサービスクライアントの生成
BlobServiceClient blobServiceClient = new BlobServiceClientBuilder()
.connectionString(storageConnectionString).buildClient();
// BlobサービスクライアントからSASを生成
String sas = this.getServiceSasUriForBlob(blobServiceClient);
// Blob内のコンテナー内の指定したファイルのURLを返却
String url = blobServiceClient.getAccountUrl() + "/"
+ storageContainerName + "/" + fileName
+ "?" + sas;
return url;
} catch(Exception ex) {
throw new RuntimeException(ex);
}
}
/**
* Blobストレージのコンテナを取得する.
* @return Blobストレージのコンテナ
* @throws Exception
*/
private CloudBlobContainer getContainer() throws Exception {
// ストレージアカウントオブジェクトを取得
CloudStorageAccount storageAccount
= CloudStorageAccount.parse(storageConnectionString);
// Blobクライアントオブジェクトを取得
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
// Blob内のコンテナーを取得
CloudBlobContainer container
= blobClient.getContainerReference(storageContainerName);
return container;
}
/**
* 引数で指定したBlobのSASトークンを生成し返す.
* @param client Blobサービスクライアントオブジェクト
* @return SASトークン
*/
private String getServiceSasUriForBlob(BlobServiceClient client) {
// SASトークンのアクセス権を読み取り可能に設定
AccountSasPermission permissions
= new AccountSasPermission().setReadPermission(true);
// SASトークンがBlobコンテナやオブジェクトにアクセスできるように設定
AccountSasResourceType resourceTypes
= new AccountSasResourceType().setContainer(true).setObject(true);
// SASトークンがBlobにアクセスできるように設定
AccountSasService services = new AccountSasService().setBlobAccess(true);
// SASトークンの有効期限を5分に設定
OffsetDateTime expiryTime = OffsetDateTime.now().plus(Duration.ofMinutes(5));
// SASトークンを作成
AccountSasSignatureValues sasValues
= new AccountSasSignatureValues(expiryTime, permissions
, services, resourceTypes);
return client.generateAccountSas(sasValues);
}
}
さらに、Chunkモデルの出力(DB追加/更新処理)前後の処理を記載したクラスの内容は以下の通りで、BlobStorage「result.txt」に、書き込み結果を記載している。
package com.example.batch;
import java.util.List;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.example.mybatis.model.UserData;
import com.example.service.DemoBlobService;
@Component
public class DemoUpdateListener implements ItemWriteListener<UserData>{
/** BLOBへアクセスするサービス */
@Autowired
private DemoBlobService demoBlobService;
/** 結果を出力するBLOB名 */
private static final String BLOB_NAME = "result.txt";
/**
* データ書き込み前の処理を定義する.
*/
@Override
public void beforeWrite(List<? extends UserData> items) {
// 何もしない
}
/**
* データ書き込み後の処理を定義する.
*/
@Override
public void afterWrite(List<? extends UserData> items) {
// BlobStorageに正常終了だった旨のメッセージを書き込む
for(UserData u : items) {
demoBlobService.writeAppendBlob(BLOB_NAME, "ID: " + u.getId() + ": 書き込み正常");
}
}
/**
* データ書き込みエラー後の処理を定義する.
*/
@Override
public void onWriteError(Exception exception, List<? extends UserData> items) {
// BlobStorageに異常終了だった旨のメッセージを書き込む
for(UserData u : items) {
demoBlobService.writeAppendBlob(BLOB_NAME, "ID: " + u.getId() + ": 書き込み異常");
}
}
}また、Spring Batchのジョブ内で指定する処理単位(ステップ)実行前後の処理を記載したクラスの内容は以下の通りで、開始前にBlobStorage「result.txt」を削除し、終了後に読み込み件数・書き込み件数をログ出力している。
package com.example.batch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.example.service.DemoBlobService;
@Component
public class DemoStepListener implements StepExecutionListener {
/* Spring Bootでログ出力するためのLogbackのクラスを生成 */
private static final Logger LOGGER
= LoggerFactory.getLogger(DemoStepListener.class);
/** BLOBへアクセスするサービス */
@Autowired
private DemoBlobService demoBlobService;
/** 結果を出力するBLOB名 */
private static final String BLOB_NAME = "result.txt";
/**
* Spring Batchのジョブ内で指定する処理単位(ステップ)実行前の処理を定義する.
*/
@Override
public void beforeStep(StepExecution stepExecution) {
// 結果を出力するBLOBが残っていれば削除
demoBlobService.deleteAppendBlob(BLOB_NAME);
}
/**
* Spring Batchのジョブ内で指定する処理単位(ステップ)実行後の処理を定義する.
*/
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
// 読み込み件数・書き込み件数を取得しログ出力
int readCnt = stepExecution.getReadCount();
int writeCnt = stepExecution.getWriteCount();
LOGGER.info("読み込み件数:" + readCnt + ", 書き込み件数:" + writeCnt);
return ExitStatus.COMPLETED;
}
}さらに、Spring Batchの定義クラスは以下の通りで、DemoUpdateListener・DemoStepListenerの呼出処理を追加している。
package com.example.batch;
import java.net.MalformedURLException;
import javax.sql.DataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisBatchItemWriter;
import org.mybatis.spring.batch.builder.MyBatisBatchItemWriterBuilder;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.UrlResource;
import com.example.mybatis.model.UserData;
import com.example.service.DemoBlobService;
import lombok.RequiredArgsConstructor;
@Configuration
@EnableBatchProcessing
@RequiredArgsConstructor
public class DemoChunkConfig {
/** ジョブ生成ファクトリ */
public final JobBuilderFactory jobBuilderFactory;
/** ステップ生成ファクトリ */
public final StepBuilderFactory stepBuilderFactory;
/** SQLセッションファクトリ */
public final SqlSessionFactory sqlSessionFactory;
/** データ加工処理 */
private final DemoUpdateProcessor demoUpdateProcessor;
/** データ書き込み前後処理 */
private final DemoUpdateListener demoUpdateListener;
/** ステップの前後処理 */
private final DemoStepListener demoStepListener;
/** BLOBへアクセスするサービス */
@Autowired
private DemoBlobService demoBlobService;
/**
* BlobStorageからファイルを読み込む.
* @return 読み込みオブジェクト
*/
@Bean
public FlatFileItemReader<UserData> reader() {
FlatFileItemReader<UserData> reader = new FlatFileItemReader<UserData>();
try {
// BlobStorageからファイル(user_data.csv)を読み込む際のURLをreaderに設定
String url = demoBlobService.getBlobUrl("user_data.csv");
reader.setResource(new UrlResource(url));
} catch (MalformedURLException ex) {
throw new RuntimeException(ex);
}
// ファイルを読み込む際の文字コード
reader.setEncoding("UTF-8");
// 1行目を読み飛ばす
reader.setLinesToSkip(1);
// ファイルから読み込んだデータをUserDataオブジェクトに格納
reader.setLineMapper(new DefaultLineMapper<UserData>() {
{
setLineTokenizer(new DelimitedLineTokenizer() {
{
setNames(new String[] { "id", "name", "birth_year"
, "birth_month", "birth_day", "sex", "memo" });
}
});
setFieldSetMapper(new BeanWrapperFieldSetMapper<UserData>() {
{
setTargetType(UserData.class);
}
});
}
});
return reader;
}
/**
* 読み込んだファイルのデータを、DBに書き込む.
* @param dataSource データソース
* @return 書き込みオブジェクト
*/
@Bean
public MyBatisBatchItemWriter<UserData> writer(DataSource dataSource) {
return new MyBatisBatchItemWriterBuilder<UserData>()
.sqlSessionFactory(sqlSessionFactory)
.statementId("com.example.mybatis.UserDataMapper.upsert")
.build();
}
/**
* Spring Batchのジョブ内で指定する処理単位(ステップ)を定義する.
* @param reader 読み込みオブジェクト
* @param writer 書き込みオブジェクト
* @return Spring Batchのジョブ内で指定する処理単位
*/
@Bean
public Step step(ItemReader<UserData> reader, ItemWriter<UserData> writer) {
// 生成するステップ内で読み込み/加工/書き込み/書き込み後/ステップ実行前後の処理の一連の流れを指定する
// その際のチャンクサイズ(=何件読み込む毎にコミットするか)を3に指定している
return stepBuilderFactory.get("step")
.<UserData, UserData>chunk(3)
.reader(reader)
.processor(demoUpdateProcessor)
.writer(writer)
.listener(demoUpdateListener)
.listener(demoStepListener)
.build();
}
/**
* Spring Batchのジョブを定義する.
* @param jobListener 実行前後の処理(リスナ)
* @param step 実行するステップ
* @return Spring Batchのジョブ
*/
@Bean
public Job updateUserData(DemoJobListener jobListener, Step step) {
// 生成するジョブ内で、実行前後の処理(リスナ)と処理単位(ステップ)を指定する
return jobBuilderFactory
.get("updateUserData")
.incrementer(new RunIdIncrementer())
.listener(jobListener)
.flow(step)
.end()
.build();
}
}その他のソースコード内容は、以下のサイトを参照のこと。
https://github.com/purin-it/azure/tree/master/azure-functions-spring-batch-chunk-listener/demoAzureFunc
サンプルプログラムの実行結果
サンプルプログラムの実行結果は、以下の通り。
1) 以下のサイトの「サンプルプログラムの実行結果(ローカル)」「サンプルプログラムの実行結果(Azure上)」に記載の手順で、サンプルプログラムをAzure Functionsにデプロイする。
2) 赤枠でエラーが発生する場合のCSVファイルをBlob Storageに配置し、作成したバッチ処理を実行する。


3) バッチ実行後のログ出力内容は以下の通りで、エラー内容や処理件数が出力されることが確認できる。


4) バッチ実行後にBlob Storageを確認すると、結果ファイル「result.txt」が作成され、各データの処理結果が出力されていることが確認できる。


5) バッチ実行後にUSER_DATAテーブルを確認すると、正常に書き込みできたデータが全件登録されていることが確認できる。

6) 以下のように、性別のチェック処理をコメントアウトし、バッチを再実行する。なお、読み込むCSVファイルの内容は、1)と同様とする。

7) バッチ実行後のログ出力内容を確認すると、以下のようになっていることが確認できる。


8) バッチ実行後の結果ファイル「result.txt」を確認すると、以下のようになっていることが確認できる。


9) バッチ実行後のUSER_DATAテーブルの内容を確認すると、以下のようになっていることが確認できる。

要点まとめ
- Spring BatchのChunkモデルで、ファイルの読み込み/データの加工/DBへの書き込みの各処理の前後に、別の処理を追加するには、そのためのListenerクラスも利用すればよい。





