以下の記事で、Spring BatchのChunkモデルを利用したバッチで、DBのデータをAzure Blob StorageにCSVファイルを出力する処理を実装していたが、このときは、Azure Functions(Linux)上の、「/home/tmp_user_data.csv」に中間ファイルを出力していた。
今回は、同様の処理を、中間ファイルを利用せずに、Spring BatchのChunkモデルを利用して、DBのデータをAzure Blob StorageにCSVファイルを出力してみたので、そのサンプルプログラムを共有する。
前提条件
下記記事のサンプルプログラムを作成済であること。
サンプルプログラムの作成
作成したサンプルプログラムの構成は、以下の通り。
なお、上記の赤枠は、今回追加・変更したプログラムである。
pom.xml、local.settings.jsonは、以下の記事の「作成したサンプルプログラムの内容(Azure Functions)」と同様の修正を行っている。
また、CSVファイルをAzure Blob Storageに出力する処理は、ItemWriterインタフェースを継承した以下のクラスで実装している。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 | package com.example.batch; import java.util.List; import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import com.example.mybatis.model.UserData; import com.example.service.DemoBlobService; import com.example.util.DemoStringUtil; @Component public class DemoItemWriter implements ItemWriter<UserData> { /** BLOBへアクセスするサービス */ @Autowired private DemoBlobService demoBlobService; /** 取得したUSER_DATAテーブルのデータを出力するBLOB名 */ @Value("${blob.name}") private String blobName; /** 改行コード名 */ @Value("${line.sep.name}") private String lineSepName; /** CSV書き込み方式 */ @Value("${csv.write.method}") private String csvWriteMethod; /** * 読み込んだDBのデータを、BLOB(CSVファイル)に書き込む処理を定義する. */ @Override public void write(List<? extends UserData> items) { // 書き込みデータを格納する文字列 StringBuilder sb = new StringBuilder(); // 改行コードを変換 String lineSeparator = DemoStringUtil.getLineSepCode(lineSepName); // CSV書き込み方式が一括(ALL)の場合 if ("ALL".equals(csvWriteMethod)) { // ヘッダー行を、書き込みデータに追加 String header = "id,name,birth_year,birth_month,birth_day,sex,memo"; sb.append(header + lineSeparator); } // 読み込んだDBのデータ各行を、書き込みデータに追加 for(UserData item : items) { // ID sb.append(item.getId()); sb.append(","); // 名前 sb.append(DemoStringUtil.addDoubleQuote(item.getName())); sb.append(","); // 生年月日_年 sb.append(item.getBirth_year()); sb.append(","); // 生年月日_月 sb.append(item.getBirth_month()); sb.append(","); // 生年月日_日 sb.append(item.getBirth_day()); sb.append(","); // 性別 sb.append(DemoStringUtil.addDoubleQuote(item.getSex())); sb.append(","); // メモ sb.append(DemoStringUtil.addDoubleQuote(item.getMemo())); // 改行コード sb.append(lineSeparator); } // CSV書き込み方式に応じて、書き込みデータを、BLOB(CSVファイル)に書き込む if ("ALL".equals(csvWriteMethod)) { demoBlobService.writeBlockBlob(blobName, sb.toString()); } else { demoBlobService.writeAppendBlob(blobName, sb.toString()); } } } |
さらに、ItemWriterインタフェースを継承したクラスは、以下のSpring Batch定義クラスで呼び出している。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 | package com.example.batch; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.batch.MyBatisPagingItemReader; import org.mybatis.spring.batch.builder.MyBatisPagingItemReaderBuilder; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import com.example.mybatis.model.UserData; import lombok.RequiredArgsConstructor; @Configuration @EnableBatchProcessing @RequiredArgsConstructor public class DemoChunkConfig { /** ジョブ生成ファクトリ */ public final JobBuilderFactory jobBuilderFactory; /** ステップ生成ファクトリ */ public final StepBuilderFactory stepBuilderFactory; /** SQLセッションファクトリ */ public final SqlSessionFactory sqlSessionFactory; /** ステップの前後処理 */ private final DemoStepListener demoStepListener; /** データ書き込み処理 */ private final DemoItemWriter demoItemWriter; /** チャンクサイズ */ @Value("${chunk.size}") private String chunkSize; /** * MyBatisPagingItemReaderを使ってUSER_DATAデータをページを分けて取得する. * @return 読み込みオブジェクト */ @Bean public MyBatisPagingItemReader<UserData> reader(){ return new MyBatisPagingItemReaderBuilder<UserData>() .sqlSessionFactory(sqlSessionFactory) .queryId("com.example.mybatis.UserDataMapper.findAll") .pageSize(Integer.parseInt(chunkSize)) .build(); } /** * Spring Batchのジョブ内で指定する処理単位(ステップ)を定義する. * @param reader 読み込みオブジェクト * @param writer 書き込みオブジェクト * @return Spring Batchのジョブ内で指定する処理単位 */ @Bean public Step step(MyBatisPagingItemReader<UserData> reader , ItemWriter<UserData> writer) { // 生成するステップ内で読み込み/書き込みを一連の流れを指定する // その際、読み込みデータの加工は行わず、 // チャンクサイズ(=何件読み込む毎にコミットするか)を指定している return stepBuilderFactory.get("step") .<UserData, UserData>chunk(Integer.parseInt(chunkSize)) .reader(reader) .writer(demoItemWriter) .listener(demoStepListener) .build(); } /** * Spring Batchのジョブを定義する. * @param jobListener 実行前後の処理(リスナ) * @param step 実行するステップ * @return Spring Batchのジョブ */ @Bean public Job updateUserData(DemoJobListener jobListener, Step step) { // 生成するジョブ内で、実行前後の処理(リスナ)と処理単位(ステップ)を指定する return jobBuilderFactory .get("selectUserData") .incrementer(new RunIdIncrementer()) .listener(jobListener) .flow(step) .end() .build(); } } |
また、Spring Batchのジョブ内で指定する処理単位(ステップ)実行前後の処理を記載したクラスの内容は以下の通りで、Blob Storageのファイル削除やCSVファイルヘッダーの書き込みを行っている。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 | package com.example.batch; import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.StepExecutionListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import com.example.service.DemoBlobService; import com.example.util.DemoStringUtil; @Component public class DemoStepListener implements StepExecutionListener { /** BLOBへアクセスするサービス */ @Autowired private DemoBlobService demoBlobService; /** 取得したUSER_DATAテーブルのデータを出力するBLOB名 */ @Value("${blob.name}") private String blobName; /** 改行コード名 */ @Value("${line.sep.name}") private String lineSepName; /** CSV書き込み方式 */ @Value("${csv.write.method}") private String csvWriteMethod; /** * Spring Batchのジョブ内で指定する処理単位(ステップ)実行前の処理を定義する. */ @Override public void beforeStep(StepExecution stepExecution) { // Blobファイル(BlockBlob,追加Blob)があれば削除 demoBlobService.deleteBlockBlob(blobName); demoBlobService.deleteAppendBlob(blobName); // 改行コードを変換 String lineSeparator = DemoStringUtil.getLineSepCode(lineSepName); // CSV書き込み方式が分割(DIV)の場合 if ("DIV".equals(csvWriteMethod)) { // Blobファイルを作成後、ヘッダー行を出力 String header = "id,name,birth_year,birth_month,birth_day,sex,memo"; demoBlobService.writeAppendBlob(blobName, header + lineSeparator); } } /** * Spring Batchのジョブ内で指定する処理単位(ステップ)実行後の処理を定義する. */ @Override public ExitStatus afterStep(StepExecution stepExecution) { return ExitStatus.COMPLETED; } } |
さらに、Blob Storageにアクセスするクラスの内容は以下の通りで、Blob Storageへのファイルを書き込みや削除を行っている。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 | package com.example.service; import javax.annotation.PostConstruct; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import com.microsoft.azure.storage.CloudStorageAccount; import com.microsoft.azure.storage.blob.BlobRequestOptions; import com.microsoft.azure.storage.blob.CloudAppendBlob; import com.microsoft.azure.storage.blob.CloudBlobClient; import com.microsoft.azure.storage.blob.CloudBlobContainer; import com.microsoft.azure.storage.blob.CloudBlockBlob; import io.netty.util.internal.StringUtil; @Service public class DemoBlobService { /** Azure Storageのアカウント名 */ @Value("${azure.storage.accountName}") private String storageAccountName; /** Azure Storageへのアクセスキー */ @Value("${azure.storage.accessKey}") private String storageAccessKey; /** Azure StorageのBlobコンテナー名 */ @Value("${azure.storage.containerName}") private String storageContainerName; /** 文字コード */ @Value("${character.code}") private String characterCode; // Blobストレージへの接続文字列 private String storageConnectionString = null; /** * Blobストレージへの接続文字列の初期化処理 */ @PostConstruct public void init() { // Blobストレージへの接続文字列を設定 storageConnectionString = "DefaultEndpointsProtocol=https;" + "AccountName=" + storageAccountName + ";" + "AccountKey=" + storageAccessKey + ";"; } /** * 引数で指定されたファイル(BlockBlob)に、引数で指定されたメッセージを書き込む. * @param fileName ファイル名(BlockBlob) * @param message メッセージ */ public void writeBlockBlob(String fileName, String message) { // 未指定の項目がある場合、何もしない if(StringUtil.isNullOrEmpty(storageAccountName) || StringUtil.isNullOrEmpty(storageAccessKey) || StringUtil.isNullOrEmpty(storageContainerName) || StringUtil.isNullOrEmpty(fileName) || StringUtil.isNullOrEmpty(message)) { return; } // ファイルアップロード処理 // Blob内のコンテナー内の指定したファイル(BlockBlob)に、指定したメッセージを書き込む try { CloudBlockBlob cbb = getContainer().getBlockBlobReference(fileName); cbb.deleteIfExists(); BlobRequestOptions options = new BlobRequestOptions(); options.setAbsorbConditionalErrorsOnRetry(true); cbb.uploadText(message, characterCode, null, options, null); } catch (Exception ex) { throw new RuntimeException(ex); } } /** * 引数で指定されたBlob(BlockBlob)が存在する場合、削除する. * @param blobName Blob名(BlockBlob) */ public void deleteBlockBlob(String fileName) { // 未指定の項目がある場合、何もしない if(StringUtil.isNullOrEmpty(storageAccountName) || StringUtil.isNullOrEmpty(storageAccessKey) || StringUtil.isNullOrEmpty(storageContainerName) || StringUtil.isNullOrEmpty(fileName)) { return; } // Blob内のコンテナー内の指定したファイル(BlockBlob)が存在する場合、削除する try { CloudBlockBlob cbb = getContainer().getBlockBlobReference(fileName); cbb.deleteIfExists(); } catch (Exception ex) { throw new RuntimeException(ex); } } /** * 引数で指定されたファイル(追加Blob)に、引数で指定されたメッセージを追加する. * @param fileName ファイル名(追加Blob) * @param message メッセージ */ public void writeAppendBlob(String fileName, String message) { // 未指定の項目がある場合、何もしない if(StringUtil.isNullOrEmpty(storageAccountName) || StringUtil.isNullOrEmpty(storageAccessKey) || StringUtil.isNullOrEmpty(storageContainerName) || StringUtil.isNullOrEmpty(fileName) || StringUtil.isNullOrEmpty(message)) { return; } // ファイルアップロード処理 // Blob内のコンテナー内の指定したファイル(追加Blob)に、指定したメッセージを追記モードで書き込む try { CloudAppendBlob cab = getContainer().getAppendBlobReference(fileName); if (!cab.exists()) { cab.createOrReplace(); } BlobRequestOptions options = new BlobRequestOptions(); options.setAbsorbConditionalErrorsOnRetry(true); cab.appendText(message, characterCode, null, options, null); } catch (Exception ex) { throw new RuntimeException(ex); } } /** * 引数で指定されたファイル(追加Blob)が存在する場合、削除する. * @param fileName ファイル名(追加Blob) */ public void deleteAppendBlob(String fileName) { // 未指定の項目がある場合、何もしない if(StringUtil.isNullOrEmpty(storageAccountName) || StringUtil.isNullOrEmpty(storageAccessKey) || StringUtil.isNullOrEmpty(storageContainerName) || StringUtil.isNullOrEmpty(fileName)) { return; } // Blob内のコンテナー内の指定したファイル(追加Blob)が存在する場合、削除する try { CloudAppendBlob cab = getContainer().getAppendBlobReference(fileName); cab.deleteIfExists(); } catch (Exception ex) { throw new RuntimeException(ex); } } /** * Blobストレージのコンテナを取得する. * @return Blobストレージのコンテナ * @throws Exception */ private CloudBlobContainer getContainer() throws Exception { // ストレージアカウントオブジェクトを取得 CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString); // Blobクライアントオブジェクトを取得 CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); // Blob内のコンテナーを取得 CloudBlobContainer container = blobClient.getContainerReference(storageContainerName); return container; } } |
また、application.propertiesに、CSVファイル出力設定を追加している。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | # Azure Storageの接続先 azure.storage.accountName=azureblobpurinit azure.storage.accessKey=(Azure Blob Storageのアクセスキー) azure.storage.blob-endpoint=https://azureblobpurinit.blob.core.windows.net azure.storage.containerName=blobcontainer # DB接続設定 spring.datasource.url=jdbc:sqlserver://azure-db-purinit.database.windows.net:1433;database=azureSqlDatabase spring.datasource.username=purinit@azure-db-purinit spring.datasource.password=(DBのパスワード) spring.datasource.driverClassName=com.microsoft.sqlserver.jdbc.SQLServerDriver ## ファイル出力設定 # チャンクサイズ chunk.size=300000 # BLOB名 blob.name=user_data.csv # 文字コード character.code=UTF-8 # 改行コード名(CR/LF/CRLF) line.sep.name=LF # CSV書き込み方式(ALL(一括)/DIV(分割)) csv.write.method=ALL |
その他のソースコード内容は、以下のサイトを参照のこと。
https://github.com/purin-it/azure/tree/master/azure-functions-db-to-csv-direct/demoAzureFunc
サンプルプログラムの実行結果
サンプルプログラムの実行結果は、下記記事の「サンプルプログラムの実行結果」と同じ結果となる。
ただし、CSV書き込み方式をDIV(分割)に設定した場合、以下のように、CSVファイルが、追加BLOBとして出力されることが確認できる。
また、CSV書き込み方式をALL(一括)に設定した場合、以下のように、CSVファイルが、ブロックBLOBとして出力されることが確認できる。
さらに、取得元のDB(Azure SQL Database上のUSER_DATAテーブル)に30万件のデータを入れた場合の性能は、以下の通り。
1) 下記記事の「テストデータ作成」に従って、30万件のデータを作成する。
2) application.propertiesを、以下のように、チャンクサイズ=10000、CSV書き込み方式=DIV(分割)に変更する。
1 2 3 4 5 6 7 8 9 10 11 | ## ファイル出力設定 # チャンクサイズ chunk.size=10000 # BLOB名 blob.name=user_data.csv # 文字コード character.code=UTF-8 # 改行コード名(CR/LF/CRLF) line.sep.name=LF # CSV書き込み方式(ALL(一括)/DIV(分割)) csv.write.method=DIV |
3) 以下のサイトの「サンプルプログラムの実行結果(ローカル)」「サンプルプログラムの実行結果(Azure上)」に記載の手順で、サンプルプログラムをAzure Functionsにデプロイする。
4) バッチ実行時のログ出力内容は以下の通りで、83902ms(約1分24秒)で処理が完了していることが確認できる。
なお、上記ログの確認手順は、以下のサイトを参照のこと。
なお、上記CPU・メモリ使用率の確認手順は、以下のサイトを参照のこと。
6) application.propertiesを、以下のように、チャンクサイズ=2300に変更し、サンプルプログラムをAzure Functionsにデプロイする。
1 2 3 4 5 6 7 8 9 10 11 | ## ファイル出力設定 # チャンクサイズ chunk.size=2300 # BLOB名 blob.name=user_data.csv # 文字コード character.code=UTF-8 # 改行コード名(CR/LF/CRLF) line.sep.name=LF # CSV書き込み方式(ALL(一括)/DIV(分割)) csv.write.method=DIV |
7) バッチ実行時のログ出力内容は以下の通りで、76713ms(約1分17秒)で処理が完了していることが確認できる。
9) application.propertiesを、以下のように、チャンクサイズ=300000、CSV書き込み方式=ALL(一括)に変更し、サンプルプログラムをAzure Functionsにデプロイする。
1 2 3 4 5 6 7 8 9 10 11 | ## ファイル出力設定 # チャンクサイズ chunk.size=300000 # BLOB名 blob.name=user_data.csv # 文字コード character.code=UTF-8 # 改行コード名(CR/LF/CRLF) line.sep.name=LF # CSV書き込み方式(ALL(一括)/DIV(分割)) csv.write.method=ALL |
10) バッチ実行時のログ出力内容は以下の通りで、68092ms(約1分8秒)で処理が完了していることが確認できる。
11) バッチ実行時のCPU・メモリ使用率は以下の通りで、メモリ使用率がかなり上昇していることが確認できる。
要点まとめ
- Spring BatchのChunkモデルを利用したバッチ処理でも、ItemWriterインタフェースを継承したクラスを利用することで、中間ファイルを利用せずに、DBのデータをAzure Blob StorageにCSVファイルを出力できる。