これまでこのブログでは、Blob上のCSVファイルをDBのテーブルに書き込む処理を作成していたが、逆に、DBのテーブルデータをBlob上のCSVファイルに出力することもできる。
今回は、Azure Function上でSpring BatchのChunkモデルを利用して、DBのテーブルデータをBlob上のCSVファイルに出力してみたので、そのサンプルプログラムを共有する。
前提条件
下記記事のサンプルプログラムを作成済であること。
また、取得元のDB(Azure SQL Database上のUSER_DATAテーブル)に、以下のデータが設定済であること。
さらに、Azure Functionsのオペレーティングシステムが、以下の赤枠のように、Linuxであること。
サンプルプログラムの作成
作成したサンプルプログラムの構成は、以下の通り。
なお、上記の赤枠は、今回追加・変更したプログラムである。
Spring Batchの定義クラスは以下の通りで、DBのデータをCSVファイルに出力する処理と、DemoStepListenerの呼出処理を追加している。なお、CSVファイルは、Azure Functions(Linux)上の、「/home/tmp_user_data.csv」に出力するようにしている。
package com.example.batch; import java.io.IOException; import java.io.Writer; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.batch.MyBatisPagingItemReader; import org.mybatis.spring.batch.builder.MyBatisPagingItemReaderBuilder; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.file.FlatFileHeaderCallback; import org.springframework.batch.item.file.FlatFileItemWriter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.FileSystemResource; import com.example.mybatis.model.UserData; import lombok.RequiredArgsConstructor; @Configuration @EnableBatchProcessing @RequiredArgsConstructor public class DemoChunkConfig { /** ジョブ生成ファクトリ */ public final JobBuilderFactory jobBuilderFactory; /** ステップ生成ファクトリ */ public final StepBuilderFactory stepBuilderFactory; /** SQLセッションファクトリ */ public final SqlSessionFactory sqlSessionFactory; /** ステップの前後処理 */ private final DemoStepListener demoStepListener; /** 取得したUSER_DATAテーブルのデータを出力するファイルパス */ private static final String TMP_FILE_PATH = "/home/"; /** 取得したUSER_DATAテーブルのデータを出力するファイル名 */ private static final String TMP_FILE_NAME = "tmp_user_data.csv"; /** 改行コード */ private static final String LINE_SEPARATOR = "\r\n"; /** 文字コード */ private static final String CHARCTER_CODE = "UTF-8"; /** チャンクサイズ */ private final int CHUNK_SIZE = 3; /** * MyBatisPagingItemReaderを使ってUSER_DATAデータをページを分けて取得する. * @return 読み込みオブジェクト */ @Bean public MyBatisPagingItemReader<UserData> reader(){ return new MyBatisPagingItemReaderBuilder<UserData>() .sqlSessionFactory(sqlSessionFactory) .queryId("com.example.mybatis.UserDataMapper.findAll") .pageSize(CHUNK_SIZE) .build(); } /** * 読み込んだDBのデータを、ローカルのファイルに書き込む. * @return 書き込みオブジェクト */ @Bean public FlatFileItemWriter<UserData> writer(){ FlatFileItemWriter<UserData> writer = new FlatFileItemWriter<UserData>(); // ファイルの出力先を指定 writer.setResource(new FileSystemResource(TMP_FILE_PATH + TMP_FILE_NAME)); // ファイルを書き込む際の文字コード・改行コードを指定 writer.setEncoding(CHARCTER_CODE); writer.setLineSeparator(LINE_SEPARATOR); // ファイルを追記書き込みNGとする writer.setAppendAllowed(false); // ファイルに書き込む // ヘッダー行を定義する writer.setHeaderCallback(new FlatFileHeaderCallback() { public void writeHeader(Writer ioWriter) throws IOException { ioWriter.append("id,name,birth_year,birth_month,birth_day,sex,memo"); } }); // 書き込む文字列は、DemoLineAggregatorクラスで設定 writer.setLineAggregator(new DemoLineAggregator()); return writer; } /** * Spring Batchのジョブ内で指定する処理単位(ステップ)を定義する. * @param reader 読み込みオブジェクト * @param writer 書き込みオブジェクト * @return Spring Batchのジョブ内で指定する処理単位 */ @Bean public Step step(MyBatisPagingItemReader<UserData> reader , ItemWriter<UserData> writer) { // 生成するステップ内で読み込み/書き込みを一連の流れを指定する // その際、読み込みデータの加工は行わず、 // チャンクサイズ(=何件読み込む毎にコミットするか)を指定している return stepBuilderFactory.get("step") .<UserData, UserData>chunk(CHUNK_SIZE) .reader(reader) .writer(writer) .listener(demoStepListener) .build(); } /** * Spring Batchのジョブを定義する. * @param jobListener 実行前後の処理(リスナ) * @param step 実行するステップ * @return Spring Batchのジョブ */ @Bean public Job updateUserData(DemoJobListener jobListener, Step step) { // 生成するジョブ内で、実行前後の処理(リスナ)と処理単位(ステップ)を指定する return jobBuilderFactory .get("selectUserData") .incrementer(new RunIdIncrementer()) .listener(jobListener) .flow(step) .end() .build(); } }
また、CSVファイルに書き込むデータ内容を定義したクラスの内容は、以下の通り。
package com.example.batch; import org.springframework.batch.item.file.transform.LineAggregator; import com.example.mybatis.model.UserData; import com.example.util.DemoStringUtil; public class DemoLineAggregator implements LineAggregator<UserData>{ /** * 書き込みデータ(1行分)の設定内容を指定する. */ @Override public String aggregate(UserData userData) { StringBuilder sb = new StringBuilder(); // ID sb.append(userData.getId()); sb.append(","); // 名前 sb.append(DemoStringUtil.addDoubleQuote(userData.getName())); sb.append(","); // 生年月日_年 sb.append(userData.getBirth_year()); sb.append(","); // 生年月日_月 sb.append(userData.getBirth_month()); sb.append(","); // 生年月日_日 sb.append(userData.getBirth_day()); sb.append(","); // 性別 sb.append(DemoStringUtil.addDoubleQuote(userData.getSex())); sb.append(","); // メモ sb.append(DemoStringUtil.addDoubleQuote(userData.getMemo())); return sb.toString(); } }
さらに、Spring Batchのジョブ内で指定する処理単位(ステップ)実行前後の処理を記載したクラスの内容は以下の通りで、ファイルを削除したり、Azure Functions(Linux)に配置したCSVファイルをBlob Storageにアップロードする処理を追加している。
package com.example.batch; import java.io.File; import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.StepExecution; import org.springframework.batch.core.StepExecutionListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.example.service.DemoBlobService; @Component public class DemoStepListener implements StepExecutionListener { /** BLOBへアクセスするサービス */ @Autowired private DemoBlobService demoBlobService; /** 取得したUSER_DATAテーブルのデータを出力するファイルパス */ private static final String TMP_FILE_PATH = "/home/"; /** 取得したUSER_DATAテーブルのデータを出力するファイル名 */ private static final String TMP_FILE_NAME = "tmp_user_data.csv"; /** 取得したUSER_DATAテーブルのデータを出力するBLOB名 */ private static final String BLOB_NAME = "user_data.csv"; /** * Spring Batchのジョブ内で指定する処理単位(ステップ)実行前の処理を定義する. */ @Override public void beforeStep(StepExecution stepExecution) { // Blobファイルを削除 demoBlobService.deleteBlockBlob(BLOB_NAME); // ローカルファイルがあれば削除 // (後続のafterStepメソッドでの削除ができないため、ここで削除) File tmpFile = new File(TMP_FILE_PATH + TMP_FILE_NAME); if(tmpFile.exists()) { tmpFile.delete(); } } /** * Spring Batchのジョブ内で指定する処理単位(ステップ)実行後の処理を定義する. */ @Override public ExitStatus afterStep(StepExecution stepExecution) { // ローカルファイルをBlobにアップロード demoBlobService.uploadBlobFromFile( TMP_FILE_PATH + TMP_FILE_NAME, BLOB_NAME); return ExitStatus.COMPLETED; } }
また、BlobStorageにアクセスするクラスの内容は以下の通りで、Blob Storageへのファイルをアップロードや削除処理を追加している。
package com.example.service; import javax.annotation.PostConstruct; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import com.microsoft.azure.storage.CloudStorageAccount; import com.microsoft.azure.storage.blob.CloudBlobClient; import com.microsoft.azure.storage.blob.CloudBlobContainer; import com.microsoft.azure.storage.blob.CloudBlockBlob; import io.netty.util.internal.StringUtil; @Service public class DemoBlobService { /** Azure Storageのアカウント名 */ @Value("${azure.storage.accountName}") private String storageAccountName; /** Azure Storageへのアクセスキー */ @Value("${azure.storage.accessKey}") private String storageAccessKey; /** Azure StorageのBlobコンテナー名 */ @Value("${azure.storage.containerName}") private String storageContainerName; // Blobストレージへの接続文字列 private String storageConnectionString = null; /** * Blobストレージへの接続文字列の初期化処理 */ @PostConstruct public void init() { // Blobストレージへの接続文字列を設定 storageConnectionString = "DefaultEndpointsProtocol=https;" + "AccountName=" + storageAccountName + ";" + "AccountKey=" + storageAccessKey + ";"; } /** * 引数で指定されたBlobが存在する場合、削除する. * @param blobName Blob名 */ public void deleteBlockBlob(String blobName) { // 未指定の項目がある場合、何もしない if(StringUtil.isNullOrEmpty(storageAccountName) || StringUtil.isNullOrEmpty(storageAccessKey) || StringUtil.isNullOrEmpty(storageContainerName) || StringUtil.isNullOrEmpty(blobName)) { return; } // Blob内のコンテナー内の指定したファイルが存在する場合、削除する try { CloudBlockBlob cbb = getContainer().getBlockBlobReference(blobName); cbb.deleteIfExists(); } catch (Exception ex) { throw new RuntimeException(ex); } } /** * 引数で指定したローカルファイルを、引数で指定されたBlobにアップロードする. * @param localFile ローカルファイル * @param blobName Blob名 */ public void uploadBlobFromFile(String localFile, String blobName) { // 未指定の項目がある場合、何もしない if(StringUtil.isNullOrEmpty(storageAccountName) || StringUtil.isNullOrEmpty(storageAccessKey) || StringUtil.isNullOrEmpty(storageContainerName) || StringUtil.isNullOrEmpty(localFile) || StringUtil.isNullOrEmpty(blobName)) { return; } try { CloudBlockBlob cbb = getContainer().getBlockBlobReference(blobName); cbb.uploadFromFile(localFile); } catch (Exception ex) { throw new RuntimeException(ex); } } /** * Blobストレージのコンテナを取得する. * @return Blobストレージのコンテナ * @throws Exception */ private CloudBlobContainer getContainer() throws Exception { // ストレージアカウントオブジェクトを取得 CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString); // Blobクライアントオブジェクトを取得 CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); // Blob内のコンテナーを取得 CloudBlobContainer container = blobClient.getContainerReference(storageContainerName); return container; } }
さらに、DBのデータを取得する処理は、以下のMapperで実行している。
package com.example.mybatis; import java.util.List; import org.apache.ibatis.annotations.Mapper; import com.example.mybatis.model.UserData; @Mapper public interface UserDataMapper { /** * USER_DATAテーブルのデータを全件取得する. * @return USER_DATAテーブルの全データ */ List<UserData> findAll(); }
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.example.mybatis.UserDataMapper"> <select id="findAll" resultType="com.example.mybatis.model.UserData"> SELECT id, name, birth_year, birth_month, birth_day, sex, memo FROM USER_DATA <!-- 何件取得するか指定するために、ソート順を指定後に、OFFSET, FETCH NEXTを指定 --> ORDER BY id ASC OFFSET #{_skiprows} ROWS FETCH NEXT #{_pagesize} ROWS ONLY </select> </mapper>
なお、#{_skiprows}・#{_pagesize}は、MyBatis を利用してデータベースからページ単位でレコードを読み出す、MyBatisPagingItemReaderで利用する変数となる。詳細は以下のサイトを参照のこと。
http://mybatis.org/spring/ja/batch.html
また、文字列の編集処理を定義したクラスの内容は、以下の通り。
package com.example.util; import java.time.LocalDate; import java.time.format.DateTimeFormatter; import java.time.format.ResolverStyle; import org.apache.commons.lang3.StringUtils; public class DemoStringUtil { /** * DateTimeFormatterを利用して日付チェックを行う. * @param dateStr チェック対象文字列 * @param dateFormat 日付フォーマット * @return 日付チェック結果 */ public static boolean isCorrectDate(String dateStr, String dateFormat) { if (StringUtils.isEmpty(dateStr) || StringUtils.isEmpty(dateFormat)) { return false; } // 日付と時刻を厳密に解決するスタイルで、DateTimeFormatterオブジェクトを作成 DateTimeFormatter df = DateTimeFormatter.ofPattern(dateFormat) .withResolverStyle(ResolverStyle.STRICT); try { // チェック対象文字列をLocalDate型の日付に変換できれば、チェックOKとする LocalDate.parse(dateStr, df); return true; } catch (Exception e) { return false; } } /** * 数値文字列が1桁の場合、頭に0を付けて返す. * @param intNum 数値文字列 * @return 変換後数値文字列 */ public static String addZero(String intNum) { if (StringUtils.isEmpty(intNum)) { return intNum; } if (intNum.length() == 1) { return "0" + intNum; } return intNum; } /** * 引数の文字列の前後にダブルクォートを追加する. * ただし、引数の文字列がnullの場合は空文字を返却する. * @param str 任意の文字列 * @return 変換後文字列 */ public static String addDoubleQuote(String str) { String retStr = ""; if(str != null) { retStr = "\"" + str + "\""; } return retStr; } }
その他のソースコード内容は、以下のサイトを参照のこと。
https://github.com/purin-it/azure/tree/master/azure-functions-db-to-csv/demoAzureFunc
サンプルプログラムの実行結果
サンプルプログラムの実行結果は、以下の通り。
1) 以下のサイトの「サンプルプログラムの実行結果(ローカル)」「サンプルプログラムの実行結果(Azure上)」に記載の手順で、サンプルプログラムをAzure Functionsにデプロイする。
2) バッチ実行後に、Blob Storageに、以下のCSVファイルが出力されていることが確認できる。Blob Storageからファイルをダウンロードするには、一覧のファイルを選択する。
3) ファイルをダウンロードするには、「ダウンロード」ボタンを押下する。
4) 実際にダウンロードされたファイルは、以下の通りで、USER_DATAテーブルのデータ全件が取得できていることが確認できる。
なお、上記ログの確認手順は、以下のサイトを参照のこと。
要点まとめ
- Azure Function上でSpring BatchのChunkモデルを利用すると、Blob上のCSVファイルをDBのテーブルに書き込むだけでなく、DBのテーブルデータをBlob上のCSVファイルに出力することもできる。