TimerTrigger/SpringBatch

Azure Function上でCSVファイルのデータをDBに取り込むプログラムで楽観ロックを使ってみた

これまでこのブログで、Spring BatchのChunkモデルを用いて、Blob上のCSVファイルをDBのテーブルに書き込む処理を作成していたが、DBのテーブルに書き込む際に、楽観ロックを利用することもできる。

今回は、Blob上のCSVファイルをDBのテーブルに書き込む処理で、楽観ロックを利用してみたので、そのサンプルプログラムを共有する。

なお、楽観ロックを含む排他制御については、以下のサイトを参照のこと。
https://qiita.com/NagaokaKenichi/items/73040df85b7bd4e9ecfc

前提条件

下記記事のサンプルプログラムを作成済であること。

Azure Function上でSpring BatchのChunkモデルで動作するプログラムにListenerで処理を追加してみたSpring BatchのChunkモデルは、以下のサイトに記載の通り、ファイルの読み込み/データの加工/DBへの書き込みといった処理の...

書き込み先テーブルのカラム追加

書き込み対象となるテーブルであるUSER_DATAテーブルに、楽観ロックを行うためのカラム「VERSION」を追加する。その手順は、以下の通り。

1) VERSIONを追加する前のUSER_DATAテーブルの構成は、以下の通り。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
SELECT * FROM dbo.USER_DATA ORDER BY ID ASC
SELECT * FROM dbo.USER_DATA ORDER BY ID ASC
SELECT * FROM dbo.USER_DATA ORDER BY ID ASC
カラム追加_1

2) 以下のSQLを実行し、楽観ロックを行うためのカラム「VERSION」を追加する。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
ALTER TABLE dbo.USER_DATA ADD VERSION INT NOT NULL DEFAULT(0)
ALTER TABLE dbo.USER_DATA ADD VERSION INT NOT NULL DEFAULT(0)
ALTER TABLE dbo.USER_DATA ADD VERSION INT NOT NULL DEFAULT(0)
カラム追加_2

3) VERSIONを追加した後のUSER_DATAテーブルの構成は、以下の通り。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
SELECT * FROM dbo.USER_DATA ORDER BY ID ASC
SELECT * FROM dbo.USER_DATA ORDER BY ID ASC
SELECT * FROM dbo.USER_DATA ORDER BY ID ASC
カラム追加_3



「Envader」はLinuxコマンドやDatabase SQL等のスキルを、環境構築不要で習得できる学習サイトだった「Envader」は、ITエンジニアとしてよく使うLinuxコマンドやDatabase SQL等のスキルを、解説を読んだ上で、問題を解き...

サンプルプログラムの作成

作成したサンプルプログラムの構成は、以下の通り。
サンプルプログラムの構成
なお、上記の赤枠は、今回追加・変更したプログラムである。

データ加工前後の処理は以下の通りで、データ加工後に、USER_DATAテーブルのデータ取得/更新処理を追加している。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
package com.example.batch;
import org.springframework.batch.core.ItemProcessListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import com.example.mybatis.UserDataMapper;
import com.example.mybatis.model.UserData;
@Component
public class DemoProcessorListener implements ItemProcessListener<UserData, UserData> {
@Autowired
private UserDataMapper userDataMapper;
/**
* データ加工前の処理を定義する.
*/
@Override
public void beforeProcess(UserData item) {
// 何もしない
}
/**
* データ加工後の処理を定義する.
*/
@Override
@Transactional
public void afterProcess(UserData item, UserData result) {
// 処理結果がnullの場合は、処理を終了する
if(result == null) {
return;
}
// 既に登録済データのVERSIONを取得する
Integer id = Integer.parseInt(result.getId());
UserData ud = userDataMapper.findByIdRowLock(id);
// バージョンの値を結果に設定する
if(ud != null) {
result.setVersion(ud.getVersion());
} else {
result.setVersion(0);
}
// id=8の場合、バージョンを更新し、楽観ロックエラーにする
if(id == 8) {
userDataMapper.updateVersion(result);
}
}
/**
* データ編集エラー後の処理を定義する.
*/
@Override
public void onProcessError(UserData item, Exception e) {
// 何もしない
}
}
package com.example.batch; import org.springframework.batch.core.ItemProcessListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import com.example.mybatis.UserDataMapper; import com.example.mybatis.model.UserData; @Component public class DemoProcessorListener implements ItemProcessListener<UserData, UserData> { @Autowired private UserDataMapper userDataMapper; /** * データ加工前の処理を定義する. */ @Override public void beforeProcess(UserData item) { // 何もしない } /** * データ加工後の処理を定義する. */ @Override @Transactional public void afterProcess(UserData item, UserData result) { // 処理結果がnullの場合は、処理を終了する if(result == null) { return; } // 既に登録済データのVERSIONを取得する Integer id = Integer.parseInt(result.getId()); UserData ud = userDataMapper.findByIdRowLock(id); // バージョンの値を結果に設定する if(ud != null) { result.setVersion(ud.getVersion()); } else { result.setVersion(0); } // id=8の場合、バージョンを更新し、楽観ロックエラーにする if(id == 8) { userDataMapper.updateVersion(result); } } /** * データ編集エラー後の処理を定義する. */ @Override public void onProcessError(UserData item, Exception e) { // 何もしない } }
package com.example.batch;

import org.springframework.batch.core.ItemProcessListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import com.example.mybatis.UserDataMapper;
import com.example.mybatis.model.UserData;

@Component
public class DemoProcessorListener implements ItemProcessListener<UserData, UserData> {

  @Autowired
  private UserDataMapper userDataMapper;
  
  /**
   * データ加工前の処理を定義する.
   */
  @Override
  public void beforeProcess(UserData item) {
    // 何もしない  
  }

  /**
   * データ加工後の処理を定義する.
   */
  @Override
  @Transactional
  public void afterProcess(UserData item, UserData result) {
    // 処理結果がnullの場合は、処理を終了する
    if(result == null) {
      return;
    }
    
    // 既に登録済データのVERSIONを取得する
    Integer id = Integer.parseInt(result.getId());
    UserData ud = userDataMapper.findByIdRowLock(id);
    
    // バージョンの値を結果に設定する
    if(ud != null) {
      result.setVersion(ud.getVersion());
    } else {
      result.setVersion(0);
    }
    
    // id=8の場合、バージョンを更新し、楽観ロックエラーにする
    if(id == 8) {
      userDataMapper.updateVersion(result);
    }
  }

  /**
   * データ編集エラー後の処理を定義する.
   */
  @Override
  public void onProcessError(UserData item, Exception e) {
    // 何もしない
  }

}

また、USER_DATAテーブルのエンティティクラスの内容は以下の通りで、カラム「VERSION」を追加している。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
package com.example.mybatis.model;
import lombok.Data;
@Data
public class UserData {
/** ID */
private String id;
/** 名前 */
private String name;
/** 生年月日_年 */
private String birth_year;
/** 生年月日_月 */
private String birth_month;
/** 生年月日_日 */
private String birth_day;
/** 性別 */
private String sex;
/** メモ */
private String memo;
/** バージョン */
private Integer version;
}
package com.example.mybatis.model; import lombok.Data; @Data public class UserData { /** ID */ private String id; /** 名前 */ private String name; /** 生年月日_年 */ private String birth_year; /** 生年月日_月 */ private String birth_month; /** 生年月日_日 */ private String birth_day; /** 性別 */ private String sex; /** メモ */ private String memo; /** バージョン */ private Integer version; }
package com.example.mybatis.model;

import lombok.Data;

@Data
public class UserData {

  /** ID */
  private String id;
  
  /** 名前 */
  private String name;
  
  /** 生年月日_年 */
  private String birth_year;
  
  /** 生年月日_月 */
  private String birth_month;
  
  /** 生年月日_日 */
  private String birth_day;
  
  /** 性別 */
  private String sex;
  
  /** メモ */
  private String memo;
  
  /** バージョン */
  private Integer version;
  
}

さらに、USER_DATAテーブルのMapperインタフェース・XMLファイルの内容は以下の通りで、USER_DATAテーブルのデータ取得/更新処理を追加している。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
package com.example.mybatis;
import org.apache.ibatis.annotations.Mapper;
import com.example.mybatis.model.UserData;
@Mapper
public interface UserDataMapper {
/**
* DBにUserDataオブジェクトがあれば更新し、なければ追加する.
* @param userData UserDataオブジェクト
*/
void upsert(UserData userData);
/**
* 引数のIDをキーにもつUserDataオブジェクトを行ロックをかけ取得する.
* @param id ID
* @return UserDataオブジェクト
*/
UserData findByIdRowLock(Integer id);
/**
* 引数のUserDataオブジェクトのバージョンを更新する.
* @param userData UserDataオブジェクト
*/
void updateVersion(UserData userData);
}
package com.example.mybatis; import org.apache.ibatis.annotations.Mapper; import com.example.mybatis.model.UserData; @Mapper public interface UserDataMapper { /** * DBにUserDataオブジェクトがあれば更新し、なければ追加する. * @param userData UserDataオブジェクト */ void upsert(UserData userData); /** * 引数のIDをキーにもつUserDataオブジェクトを行ロックをかけ取得する. * @param id ID * @return UserDataオブジェクト */ UserData findByIdRowLock(Integer id); /** * 引数のUserDataオブジェクトのバージョンを更新する. * @param userData UserDataオブジェクト */ void updateVersion(UserData userData); }
package com.example.mybatis;

import org.apache.ibatis.annotations.Mapper;
import com.example.mybatis.model.UserData;

@Mapper
public interface UserDataMapper {

  /**
   * DBにUserDataオブジェクトがあれば更新し、なければ追加する.
   * @param userData UserDataオブジェクト
   */
  void upsert(UserData userData);

  /**
   * 引数のIDをキーにもつUserDataオブジェクトを行ロックをかけ取得する.
   * @param id ID
   * @return UserDataオブジェクト
   */
  UserData findByIdRowLock(Integer id);
  
  /**
   * 引数のUserDataオブジェクトのバージョンを更新する.
   * @param userData UserDataオブジェクト
   */
  void updateVersion(UserData userData);
}
Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.mybatis.UserDataMapper">
<update id="upsert" parameterType="com.example.mybatis.model.UserData">
MERGE INTO USER_DATA AS u
USING ( SELECT #{id} id, #{version} version ) s
ON ( u.id = s.id )
WHEN MATCHED AND u.version = s.version THEN
UPDATE SET name = #{name}, birth_year = #{birth_year}
, birth_month = #{birth_month}, birth_day = #{birth_day}, sex = #{sex}
, memo = #{memo}, version = #{version} + 1
WHEN NOT MATCHED THEN
INSERT ( id, name, birth_year, birth_month, birth_day, sex, memo, version )
VALUES (#{id}, #{name}, #{birth_year}, #{birth_month}, #{birth_day}
, #{sex}, #{memo}, #{version})
;
</update>
<select id="findByIdRowLock" parameterType="java.lang.Integer"
resultType="com.example.mybatis.model.UserData">
SELECT id, name, birth_year, birth_month, birth_day, sex, memo, version
FROM USER_DATA WITH(ROWLOCK, UPDLOCK, NOWAIT) <!-- SQL Databaseで行ロックをかける -->
WHERE id = #{id}
</select>
<update id="updateVersion" parameterType="com.example.mybatis.model.UserData">
UPDATE USER_DATA SET version = #{version} + 2 WHERE id = #{id}
</update>
</mapper>
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.example.mybatis.UserDataMapper"> <update id="upsert" parameterType="com.example.mybatis.model.UserData"> MERGE INTO USER_DATA AS u USING ( SELECT #{id} id, #{version} version ) s ON ( u.id = s.id ) WHEN MATCHED AND u.version = s.version THEN UPDATE SET name = #{name}, birth_year = #{birth_year} , birth_month = #{birth_month}, birth_day = #{birth_day}, sex = #{sex} , memo = #{memo}, version = #{version} + 1 WHEN NOT MATCHED THEN INSERT ( id, name, birth_year, birth_month, birth_day, sex, memo, version ) VALUES (#{id}, #{name}, #{birth_year}, #{birth_month}, #{birth_day} , #{sex}, #{memo}, #{version}) ; </update> <select id="findByIdRowLock" parameterType="java.lang.Integer" resultType="com.example.mybatis.model.UserData"> SELECT id, name, birth_year, birth_month, birth_day, sex, memo, version FROM USER_DATA WITH(ROWLOCK, UPDLOCK, NOWAIT) <!-- SQL Databaseで行ロックをかける --> WHERE id = #{id} </select> <update id="updateVersion" parameterType="com.example.mybatis.model.UserData"> UPDATE USER_DATA SET version = #{version} + 2 WHERE id = #{id} </update> </mapper>
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
    PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
    "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.mybatis.UserDataMapper">
  <update id="upsert" parameterType="com.example.mybatis.model.UserData">
    MERGE INTO USER_DATA AS u 
    USING ( SELECT #{id} id, #{version} version ) s
    ON ( u.id = s.id ) 
    WHEN MATCHED AND u.version = s.version THEN
       UPDATE SET name = #{name}, birth_year = #{birth_year}
         , birth_month = #{birth_month}, birth_day = #{birth_day}, sex = #{sex}
         , memo = #{memo}, version = #{version} + 1
    WHEN NOT MATCHED THEN
       INSERT ( id, name, birth_year, birth_month, birth_day, sex, memo, version ) 
       VALUES (#{id}, #{name}, #{birth_year}, #{birth_month}, #{birth_day}
             , #{sex}, #{memo}, #{version})
    ;
  </update>
  <select id="findByIdRowLock" parameterType="java.lang.Integer" 
      resultType="com.example.mybatis.model.UserData">
    SELECT id, name, birth_year, birth_month, birth_day, sex, memo, version 
    FROM USER_DATA WITH(ROWLOCK, UPDLOCK, NOWAIT) <!-- SQL Databaseで行ロックをかける -->
    WHERE id = #{id} 
  </select>
  <update id="updateVersion" parameterType="com.example.mybatis.model.UserData">
    UPDATE USER_DATA SET version = #{version} + 2 WHERE id = #{id}
  </update>
</mapper>

また、プロパティファイルの内容は以下の通りで、USER_DATAテーブルのデータ取得/更新処理を追加することにより発生するエラーを回避するために、バッチモードで処理できるようにしている。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
# Azure Storageの接続先
azure.storage.accountName=azureblobpurinit
azure.storage.accessKey=(Azure Blob Storageのアクセスキー)
azure.storage.blob-endpoint=https://azureblobpurinit.blob.core.windows.net
azure.storage.containerName=blobcontainer
# DB接続設定
spring.datasource.url=jdbc:sqlserver://azure-db-purinit.database.windows.net:1433;database=azureSqlDatabase
spring.datasource.username=purinit@azure-db-purinit
spring.datasource.password=(DBのパスワード)
spring.datasource.driverClassName=com.microsoft.sqlserver.jdbc.SQLServerDriver
# MyBatisでバッチモードで処理できるよう設定を変更
mybatis.executor-type=BATCH
mybatis.configuration.default-executor-type=BATCH
# Azure Storageの接続先 azure.storage.accountName=azureblobpurinit azure.storage.accessKey=(Azure Blob Storageのアクセスキー) azure.storage.blob-endpoint=https://azureblobpurinit.blob.core.windows.net azure.storage.containerName=blobcontainer # DB接続設定 spring.datasource.url=jdbc:sqlserver://azure-db-purinit.database.windows.net:1433;database=azureSqlDatabase spring.datasource.username=purinit@azure-db-purinit spring.datasource.password=(DBのパスワード) spring.datasource.driverClassName=com.microsoft.sqlserver.jdbc.SQLServerDriver # MyBatisでバッチモードで処理できるよう設定を変更 mybatis.executor-type=BATCH mybatis.configuration.default-executor-type=BATCH
# Azure Storageの接続先
azure.storage.accountName=azureblobpurinit
azure.storage.accessKey=(Azure Blob Storageのアクセスキー)
azure.storage.blob-endpoint=https://azureblobpurinit.blob.core.windows.net
azure.storage.containerName=blobcontainer

# DB接続設定
spring.datasource.url=jdbc:sqlserver://azure-db-purinit.database.windows.net:1433;database=azureSqlDatabase
spring.datasource.username=purinit@azure-db-purinit
spring.datasource.password=(DBのパスワード)
spring.datasource.driverClassName=com.microsoft.sqlserver.jdbc.SQLServerDriver

# MyBatisでバッチモードで処理できるよう設定を変更
mybatis.executor-type=BATCH
mybatis.configuration.default-executor-type=BATCH

さらに、Spring Batchの定義クラスは以下の通りで、DemoProcessorListenerの呼出処理と、writerメソッドに楽観ロックエラーを有効にする設定を追加している。

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
package com.example.batch;
import java.net.MalformedURLException;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisBatchItemWriter;
import org.mybatis.spring.batch.builder.MyBatisBatchItemWriterBuilder;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.UrlResource;
import com.example.mybatis.model.UserData;
import com.example.service.DemoBlobService;
import lombok.RequiredArgsConstructor;
@Configuration
@EnableBatchProcessing
@RequiredArgsConstructor
public class DemoChunkConfig {
/** ジョブ生成ファクトリ */
public final JobBuilderFactory jobBuilderFactory;
/** ステップ生成ファクトリ */
public final StepBuilderFactory stepBuilderFactory;
/** SQLセッションファクトリ */
public final SqlSessionFactory sqlSessionFactory;
/** データ加工処理 */
private final DemoUpdateProcessor demoUpdateProcessor;
/** データ加工前後処理 */
private final DemoProcessorListener demoProcessorListener;
/** データ書き込み前後処理 */
private final DemoUpdateListener demoUpdateListener;
/** ステップの前後処理 */
private final DemoStepListener demoStepListener;
/** BLOBへアクセスするサービス */
@Autowired
private DemoBlobService demoBlobService;
/**
* BlobStorageからファイルを読み込む.
* @return 読み込みオブジェクト
*/
@Bean
public FlatFileItemReader<UserData> reader() {
FlatFileItemReader<UserData> reader = new FlatFileItemReader<UserData>();
try {
// BlobStorageからファイル(user_data.csv)を読み込む際のURLをreaderに設定
String url = demoBlobService.getBlobUrl("user_data.csv");
reader.setResource(new UrlResource(url));
} catch (MalformedURLException ex) {
throw new RuntimeException(ex);
}
// ファイルを読み込む際の文字コード
reader.setEncoding("UTF-8");
// 1行目を読み飛ばす
reader.setLinesToSkip(1);
// ファイルから読み込んだデータをUserDataオブジェクトに格納
reader.setLineMapper(new DefaultLineMapper<UserData>() {
{
setLineTokenizer(new DelimitedLineTokenizer() {
{
setNames(new String[] { "id", "name", "birth_year"
, "birth_month", "birth_day", "sex", "memo" });
}
});
setFieldSetMapper(new BeanWrapperFieldSetMapper<UserData>() {
{
setTargetType(UserData.class);
}
});
}
});
return reader;
}
/**
* 読み込んだファイルのデータを、DBに書き込む.
* @return 書き込みオブジェクト
*/
@Bean
public MyBatisBatchItemWriter<UserData> writer() {
return new MyBatisBatchItemWriterBuilder<UserData>()
.sqlSessionFactory(sqlSessionFactory)
.statementId("com.example.mybatis.UserDataMapper.upsert")
.assertUpdates(true) // 楽観ロックエラーを有効にする
.build();
}
/**
* Spring Batchのジョブ内で指定する処理単位(ステップ)を定義する.
* @param reader 読み込みオブジェクト
* @param writer 書き込みオブジェクト
* @return Spring Batchのジョブ内で指定する処理単位
*/
@Bean
public Step step(ItemReader<UserData> reader, ItemWriter<UserData> writer) {
// 生成するステップ内で読み込み/加工/加工後/書き込み/書き込み後/ステップ実行前後の処理の
// 一連の流れを指定する
// その際のチャンクサイズ(=何件読み込む毎にコミットするか)を3に指定している
return stepBuilderFactory.get("step")
.<UserData, UserData>chunk(3)
.reader(reader)
.processor(demoUpdateProcessor)
.listener(demoProcessorListener)
.writer(writer)
.listener(demoUpdateListener)
.listener(demoStepListener)
.build();
}
/**
* Spring Batchのジョブを定義する.
* @param jobListener 実行前後の処理(リスナ)
* @param step 実行するステップ
* @return Spring Batchのジョブ
*/
@Bean
public Job updateUserData(DemoJobListener jobListener, Step step) {
// 生成するジョブ内で、実行前後の処理(リスナ)と処理単位(ステップ)を指定する
return jobBuilderFactory
.get("updateUserData")
.incrementer(new RunIdIncrementer())
.listener(jobListener)
.flow(step)
.end()
.build();
}
}
package com.example.batch; import java.net.MalformedURLException; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.batch.MyBatisBatchItemWriter; import org.mybatis.spring.batch.builder.MyBatisBatchItemWriterBuilder; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper; import org.springframework.batch.item.file.mapping.DefaultLineMapper; import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.UrlResource; import com.example.mybatis.model.UserData; import com.example.service.DemoBlobService; import lombok.RequiredArgsConstructor; @Configuration @EnableBatchProcessing @RequiredArgsConstructor public class DemoChunkConfig { /** ジョブ生成ファクトリ */ public final JobBuilderFactory jobBuilderFactory; /** ステップ生成ファクトリ */ public final StepBuilderFactory stepBuilderFactory; /** SQLセッションファクトリ */ public final SqlSessionFactory sqlSessionFactory; /** データ加工処理 */ private final DemoUpdateProcessor demoUpdateProcessor; /** データ加工前後処理 */ private final DemoProcessorListener demoProcessorListener; /** データ書き込み前後処理 */ private final DemoUpdateListener demoUpdateListener; /** ステップの前後処理 */ private final DemoStepListener demoStepListener; /** BLOBへアクセスするサービス */ @Autowired private DemoBlobService demoBlobService; /** * BlobStorageからファイルを読み込む. * @return 読み込みオブジェクト */ @Bean public FlatFileItemReader<UserData> reader() { FlatFileItemReader<UserData> reader = new FlatFileItemReader<UserData>(); try { // BlobStorageからファイル(user_data.csv)を読み込む際のURLをreaderに設定 String url = demoBlobService.getBlobUrl("user_data.csv"); reader.setResource(new UrlResource(url)); } catch (MalformedURLException ex) { throw new RuntimeException(ex); } // ファイルを読み込む際の文字コード reader.setEncoding("UTF-8"); // 1行目を読み飛ばす reader.setLinesToSkip(1); // ファイルから読み込んだデータをUserDataオブジェクトに格納 reader.setLineMapper(new DefaultLineMapper<UserData>() { { setLineTokenizer(new DelimitedLineTokenizer() { { setNames(new String[] { "id", "name", "birth_year" , "birth_month", "birth_day", "sex", "memo" }); } }); setFieldSetMapper(new BeanWrapperFieldSetMapper<UserData>() { { setTargetType(UserData.class); } }); } }); return reader; } /** * 読み込んだファイルのデータを、DBに書き込む. * @return 書き込みオブジェクト */ @Bean public MyBatisBatchItemWriter<UserData> writer() { return new MyBatisBatchItemWriterBuilder<UserData>() .sqlSessionFactory(sqlSessionFactory) .statementId("com.example.mybatis.UserDataMapper.upsert") .assertUpdates(true) // 楽観ロックエラーを有効にする .build(); } /** * Spring Batchのジョブ内で指定する処理単位(ステップ)を定義する. * @param reader 読み込みオブジェクト * @param writer 書き込みオブジェクト * @return Spring Batchのジョブ内で指定する処理単位 */ @Bean public Step step(ItemReader<UserData> reader, ItemWriter<UserData> writer) { // 生成するステップ内で読み込み/加工/加工後/書き込み/書き込み後/ステップ実行前後の処理の // 一連の流れを指定する // その際のチャンクサイズ(=何件読み込む毎にコミットするか)を3に指定している return stepBuilderFactory.get("step") .<UserData, UserData>chunk(3) .reader(reader) .processor(demoUpdateProcessor) .listener(demoProcessorListener) .writer(writer) .listener(demoUpdateListener) .listener(demoStepListener) .build(); } /** * Spring Batchのジョブを定義する. * @param jobListener 実行前後の処理(リスナ) * @param step 実行するステップ * @return Spring Batchのジョブ */ @Bean public Job updateUserData(DemoJobListener jobListener, Step step) { // 生成するジョブ内で、実行前後の処理(リスナ)と処理単位(ステップ)を指定する return jobBuilderFactory .get("updateUserData") .incrementer(new RunIdIncrementer()) .listener(jobListener) .flow(step) .end() .build(); } }
package com.example.batch;

import java.net.MalformedURLException;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisBatchItemWriter;
import org.mybatis.spring.batch.builder.MyBatisBatchItemWriterBuilder;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.UrlResource;

import com.example.mybatis.model.UserData;
import com.example.service.DemoBlobService;

import lombok.RequiredArgsConstructor;

@Configuration
@EnableBatchProcessing
@RequiredArgsConstructor
public class DemoChunkConfig {

  /** ジョブ生成ファクトリ */
  public final JobBuilderFactory jobBuilderFactory;

  /** ステップ生成ファクトリ */
  public final StepBuilderFactory stepBuilderFactory;

  /** SQLセッションファクトリ */
  public final SqlSessionFactory sqlSessionFactory;

  /** データ加工処理 */
  private final DemoUpdateProcessor demoUpdateProcessor;
  
  /** データ加工前後処理 */
  private final DemoProcessorListener demoProcessorListener;
  
  /** データ書き込み前後処理 */
  private final DemoUpdateListener demoUpdateListener;
  
  /** ステップの前後処理 */
  private final DemoStepListener demoStepListener;
  
  /** BLOBへアクセスするサービス */
  @Autowired
  private DemoBlobService demoBlobService;

  /**
   * BlobStorageからファイルを読み込む.
   * @return 読み込みオブジェクト
   */
  @Bean
  public FlatFileItemReader<UserData> reader() {
    FlatFileItemReader<UserData> reader = new FlatFileItemReader<UserData>();
    try {
      // BlobStorageからファイル(user_data.csv)を読み込む際のURLをreaderに設定
      String url = demoBlobService.getBlobUrl("user_data.csv");
      reader.setResource(new UrlResource(url));
    } catch (MalformedURLException ex) {
      throw new RuntimeException(ex);
    }
    // ファイルを読み込む際の文字コード
    reader.setEncoding("UTF-8");
    // 1行目を読み飛ばす
    reader.setLinesToSkip(1);
    // ファイルから読み込んだデータをUserDataオブジェクトに格納
    reader.setLineMapper(new DefaultLineMapper<UserData>() {
      {
        setLineTokenizer(new DelimitedLineTokenizer() {
          {
            setNames(new String[] { "id", "name", "birth_year"
                , "birth_month", "birth_day", "sex", "memo" });
          }
        });
        setFieldSetMapper(new BeanWrapperFieldSetMapper<UserData>() {
          {
            setTargetType(UserData.class);
          }
        });
      }
    });
    return reader;
  }

  /**
   * 読み込んだファイルのデータを、DBに書き込む.
   * @return 書き込みオブジェクト
   */
  @Bean
  public MyBatisBatchItemWriter<UserData> writer() {
    return new MyBatisBatchItemWriterBuilder<UserData>()
        .sqlSessionFactory(sqlSessionFactory)
        .statementId("com.example.mybatis.UserDataMapper.upsert")
        .assertUpdates(true)  // 楽観ロックエラーを有効にする
        .build();
  }

  /**
   * Spring Batchのジョブ内で指定する処理単位(ステップ)を定義する.
   * @param reader 読み込みオブジェクト
   * @param writer 書き込みオブジェクト
   * @return Spring Batchのジョブ内で指定する処理単位
   */
  @Bean
  public Step step(ItemReader<UserData> reader, ItemWriter<UserData> writer) {
    // 生成するステップ内で読み込み/加工/加工後/書き込み/書き込み後/ステップ実行前後の処理の
    // 一連の流れを指定する
    // その際のチャンクサイズ(=何件読み込む毎にコミットするか)を3に指定している
    return stepBuilderFactory.get("step")
        .<UserData, UserData>chunk(3)
        .reader(reader)
        .processor(demoUpdateProcessor)
        .listener(demoProcessorListener)
        .writer(writer)
        .listener(demoUpdateListener)
        .listener(demoStepListener)
        .build();
  }

  /**
   * Spring Batchのジョブを定義する.
   * @param jobListener 実行前後の処理(リスナ)
   * @param step        実行するステップ
   * @return Spring Batchのジョブ
   */
  @Bean
  public Job updateUserData(DemoJobListener jobListener, Step step) {
    // 生成するジョブ内で、実行前後の処理(リスナ)と処理単位(ステップ)を指定する
    return jobBuilderFactory
        .get("updateUserData")
        .incrementer(new RunIdIncrementer())
        .listener(jobListener)
        .flow(step)
        .end()
        .build();
  }
}

その他のソースコード内容は、以下のサイトを参照のこと。
https://github.com/purin-it/azure/tree/master/azure-functions-csv-to-db-opt-lock/demoAzureFunc



サラリーマン型フリーランスSEという働き方でお金の不安を解消しよう先日、「サラリーマン型フリーランスSE」という働き方を紹介するYouTube動画を視聴しましたので、その内容をご紹介します。 「サ...

サンプルプログラムの実行結果

サンプルプログラムの実行結果は、以下の通り。

1) バッチ実行前のBlob・DBの状態は、それぞれ以下の通り。
サンプルプログラムの実行結果_1_1

<配置したCSVファイル(user_data.csv)>
サンプルプログラムの実行結果_1_2

サンプルプログラムの実行結果_1_3

2) 以下のサイトの「サンプルプログラムの実行結果(ローカル)」「サンプルプログラムの実行結果(Azure上)」に記載の手順で、サンプルプログラムをAzure Functionsにデプロイする。

Azure Functions上でTimerTriggerによって動作するJavaアプリケーション(Spring Boot上)を作成してみたこれまでは、HTTPリクエストによりAzure Functionsが動作するアプリケーションのみ作成してきたが、Timer Trigge...

3) バッチ実行後のBlob・DBの状態は、それぞれ以下の通りで、楽観ロックエラーになったid=8付近の、id=7以上のデータが更新されていないことが確認できる。また、エラーになっていない、id=6以下のVERSIONが1ずつインクリメントされていることが確認できる。
サンプルプログラムの実行結果_3_1

<作成された結果ファイル(result.txt)>
サンプルプログラムの実行結果_3_2

サンプルプログラムの実行結果_3_3

4) ログの出力結果は以下の通りで、id=8でエラーが発生していることが確認できる。
サンプルプログラムの実行結果_4

要点まとめ

  • Spring BatchのChunkモデルで、DBのデータを読み込んだ後に書き込みを行えるようにするには、application.propertiesをバッチモードに設定する必要がある。
  • Spring BatchのChunkモデルで、DBのテーブルに書き込む際に楽観ロックエラーにするには、MyBatisBatchItemWriterのassertUpdatesプロパティの値を、trueに設定すればよい。