TimerTrigger/SpringBatch

Azure Function上でCSVファイルのデータをDBに書き込む処理をカスタマイズしてみた

これまでこのブログで、Spring BatchのChunkモデルを用いて、Blob上のCSVファイルをDBのテーブルに書き込む処理を作成していたが、DBのテーブルに書き込む処理で、MyBatisBatchItemWriterクラスを使わず、カスタマイズすることもできる。

今回は、Blob上のCSVファイルをDBのテーブルに書き込む処理をカスタマイズしてみたので、そのサンプルプログラムを共有する。

前提条件

下記記事のサンプルプログラムを作成済であること。

Azure Function上でバッチモードとそれ以外のデータソースを混在させてみたAzure Function上で楽観ロックを実装する場合、DB接続する際のSqlSessionを生成する際に、Spring Batchの...

サンプルプログラムの作成

作成したサンプルプログラムの構成は以下の通り。

なお、上記の赤枠は、今回追加・変更したプログラムである。

カスタマイズしたDB書き込み処理は以下の通りで、writeメソッド内で各SQLを実行したり、更新件数を確認する処理を追加している。

package com.example.batch;

import java.util.List;

import javax.annotation.PostConstruct;

import org.apache.ibatis.executor.BatchResult;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.dao.InvalidDataAccessResourceUsageException;
import org.springframework.stereotype.Component;

import com.example.mybatis.batch.UserDataMapperBatch;
import com.example.mybatis.model.UserData;

@Component
public class DemoItemWriter implements ItemWriter<UserData> {
  
  /** USER_DATAテーブルにアクセスするためのMapper */
  @Autowired
  private UserDataMapperBatch userDataMapperBatch;
  
  /** SQLセッションファクトリ */
  @Autowired
  @Qualifier("sqlSessionFactoryBatch")
  private SqlSessionFactory sqlSessionFactoryBatch;
  
  /** SQLセッションテンプレート */
  private SqlSessionTemplate sqlSessionTemplate  = null;
  
  /**
   * 更新件数を取得するため、SQLセッションテンプレートを初期化
   */
  @PostConstruct
  public void init() {
    sqlSessionTemplate = new SqlSessionTemplate(
        sqlSessionFactoryBatch, ExecutorType.BATCH);
  }
  
  /**
   * 読み込んだファイルのデータを、DBに書き込む処理を定義する.
   */
  @Override
  public void write(List<? extends UserData> items) {
    for(UserData item : items) {
      // 指定したIDをもつUserDataオブジェクトを、DBからロックをかけて取得する
      Integer id = Integer.parseInt(item.getId());
      UserData userData = userDataMapperBatch.findByIdRowLock(id);
      
      // id=8の場合、バージョンを更新し、楽観ロックエラーにする
      /* if(id == 8) {
        userDataMapperBatch.updateVersion(userData);
      } */
      
      // DBにデータが無ければ追加、あれば更新する
      if(userData == null) {
        userDataMapperBatch.insert(item);
      } else {
        userDataMapperBatch.update(item);
      }
      
      // 更新件数を確認し、1件でなければエラーとする
      List<BatchResult> results = sqlSessionTemplate.flushStatements();
      if(results == null || results.size() != 1) {
        throw new InvalidDataAccessResourceUsageException(
            "楽観ロックエラーが発生しました。");
      }
    }
  }

}

また、バッチ処理のMapperインタフェース・XMLファイルの内容は以下の通りで、ロックをかけないデータ取得・追加・更新処理を追加している。

package com.example.mybatis.batch;

import org.apache.ibatis.annotations.Mapper;
import com.example.mybatis.model.UserData;

@Mapper
public interface UserDataMapperBatch {

  /**
   * 引数のIDをキーにもつUserDataオブジェクトを行ロックをかけ取得する.
   * @param id ID
   * @return UserDataオブジェクト
   */
  UserData findByIdRowLock(Integer id);
  
  /**
   * 引数のUserDataオブジェクトのバージョンを更新する.
   * @param userData UserDataオブジェクト
   */
  void updateVersion(UserData userData);

  /**
   * 引数のIDをキーにもつUserDataオブジェクトを取得する.
   * @param id ID
   * @return UserDataオブジェクト
   */
  UserData findById(Integer id);
  
  /**
   * 引数のUserDataオブジェクトをDBに追加する.
   * @param userData UserDataオブジェクト
   */
  void insert(UserData userData);
  
  /**
   * DBにある引数のUserDataオブジェクトを更新する.
   * @param userData UserDataオブジェクト
   */
  void update(UserData userData);
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
    PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
    "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.mybatis.batch.UserDataMapperBatch">
  <select id="findByIdRowLock" parameterType="java.lang.Integer" 
        resultType="com.example.mybatis.model.UserData">
    SELECT id, name, birth_year, birth_month, birth_day, sex, memo, version 
    FROM USER_DATA WITH(ROWLOCK, UPDLOCK, NOWAIT) <!-- Azure SQL Databaseで行ロックをかける -->
    WHERE id = #{id} 
  </select>
  <update id="updateVersion" parameterType="com.example.mybatis.model.UserData">
    UPDATE USER_DATA SET version = #{version} + 2 WHERE id = #{id}
  </update>
  <select id="findById" parameterType="java.lang.Integer" 
        resultType="com.example.mybatis.model.UserData">
    SELECT id, name, birth_year, birth_month, birth_day, sex, memo, version 
    FROM USER_DATA
    WHERE id = #{id} 
  </select>
  <insert id="insert" parameterType="com.example.mybatis.model.UserData">
    INSERT INTO USER_DATA ( id, name, birth_year, birth_month, birth_day
        , sex, memo, version ) 
    VALUES (#{id}, #{name}, #{birth_year}, #{birth_month}, #{birth_day}
        , #{sex}, #{memo}, #{version})
  </insert>
  <update id="update" parameterType="com.example.mybatis.model.UserData">
    UPDATE USER_DATA SET name = #{name}, birth_year = #{birth_year}
      , birth_month = #{birth_month}, birth_day = #{birth_day}, sex = #{sex}
      , memo = #{memo}, version = #{version} + 1
    WHERE id = #{id} AND version = #{version}
  </update>
</mapper>

さらに、Spring Batchの定義クラスは以下の通りで、DB書き込み処理でDemoItemWriterクラスを利用するよう修正している。

package com.example.batch;

import java.net.MalformedURLException;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.UrlResource;

import com.example.mybatis.model.UserData;
import com.example.service.DemoBlobService;

import lombok.RequiredArgsConstructor;

@Configuration
@EnableBatchProcessing
@RequiredArgsConstructor
public class DemoChunkConfig {

  /** ジョブ生成ファクトリ */
  public final JobBuilderFactory jobBuilderFactory;

  /** ステップ生成ファクトリ */
  public final StepBuilderFactory stepBuilderFactory;

  /** データ加工処理 */
  private final DemoUpdateProcessor demoUpdateProcessor;
  
  /** データ加工前後処理 */
  private final DemoProcessorListener demoProcessorListener;
  
  /** データ書き込み前後処理 */
  private final DemoUpdateListener demoUpdateListener;
  
  /** データ書き込み処理 */
  private final DemoItemWriter demoItemWriter;
  
  /** ステップの前後処理 */
  private final DemoStepListener demoStepListener;
  
  /** BLOBへアクセスするサービス */
  @Autowired
  private DemoBlobService demoBlobService;

  /**
   * BlobStorageからファイルを読み込む.
   * @return 読み込みオブジェクト
   */
  @Bean
  public FlatFileItemReader<UserData> reader() {
    FlatFileItemReader<UserData> reader = new FlatFileItemReader<UserData>();
    try {
      // BlobStorageからファイル(user_data.csv)を読み込む際のURLをreaderに設定
      String url = demoBlobService.getBlobUrl("user_data.csv");
      reader.setResource(new UrlResource(url));
    } catch (MalformedURLException ex) {
      throw new RuntimeException(ex);
    }
    // ファイルを読み込む際の文字コード
    reader.setEncoding("UTF-8");
    // 1行目を読み飛ばす
    reader.setLinesToSkip(1);
    // ファイルから読み込んだデータをUserDataオブジェクトに格納
    reader.setLineMapper(new DefaultLineMapper<UserData>() {
      {
        setLineTokenizer(new DelimitedLineTokenizer() {
          {
            setNames(new String[] { "id", "name", "birth_year"
                , "birth_month", "birth_day", "sex", "memo" });
          }
        });
        setFieldSetMapper(new BeanWrapperFieldSetMapper<UserData>() {
          {
            setTargetType(UserData.class);
          }
        });
      }
    });
    return reader;
  }

  /**
   * Spring Batchのジョブ内で指定する処理単位(ステップ)を定義する.
   * @param reader 読み込みオブジェクト
   * @return Spring Batchのジョブ内で指定する処理単位
   */
  @Bean
  public Step step(ItemReader<UserData> reader) {
    // 生成するステップ内で読み込み/加工/加工後/書き込み/書き込み後/ステップ実行前後の処理の
    // 一連の流れを指定する
    // その際のチャンクサイズ(=何件読み込む毎にコミットするか)を3に指定している
    return stepBuilderFactory.get("step")
        .<UserData, UserData>chunk(3)
        .reader(reader)
        .processor(demoUpdateProcessor)
        .listener(demoProcessorListener)
        .writer(demoItemWriter)
        .listener(demoUpdateListener)
        .listener(demoStepListener)
        .build();
  }

  /**
   * Spring Batchのジョブを定義する.
   * @param jobListener 実行前後の処理(リスナ)
   * @param step        実行するステップ
   * @return Spring Batchのジョブ
   */
  @Bean
  public Job updateUserData(DemoJobListener jobListener, Step step) {
    // 生成するジョブ内で、実行前後の処理(リスナ)と処理単位(ステップ)を指定する
    return jobBuilderFactory
        .get("updateUserData")
        .incrementer(new RunIdIncrementer())
        .listener(jobListener)
        .flow(step)
        .end()
        .build();
  }
}

また、データ加工前後の処理を定義したクラスは以下の通りで、バージョンを取得する処理でロックをかけないようにしている。

package com.example.batch;

import org.springframework.batch.core.ItemProcessListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import com.example.mybatis.batch.UserDataMapperBatch;
import com.example.mybatis.model.UserData;

@Component
public class DemoProcessorListener implements ItemProcessListener<UserData, UserData> {

  @Autowired
  private UserDataMapperBatch userDataMapperBatch;
  
  /**
   * データ加工前の処理を定義する.
   */
  @Override
  public void beforeProcess(UserData item) {
    // 何もしない  
  }

  /**
   * データ加工後の処理を定義する.
   */
  @Override
  @Transactional
  public void afterProcess(UserData item, UserData result) {
    // 処理結果がnullの場合は、処理を終了する
    if(result == null) {
      return;
    }
    
    // 既に登録済データのVERSIONを取得する
    Integer id = Integer.parseInt(result.getId());
    UserData ud = userDataMapperBatch.findById(id);
    
    // バージョンの値を結果に設定する
    if(ud != null) {
      result.setVersion(ud.getVersion());
    } else {
      result.setVersion(0);
    }
  }

  /**
   * データ編集エラー後の処理を定義する.
   */
  @Override
  public void onProcessError(UserData item, Exception e) {
    // 何もしない
  }

}

その他のソースコード内容は、以下のサイトを参照のこと。
https://github.com/purin-it/azure/tree/master/azure-functions-customize-db-write/demoAzureFunc

サンプルプログラムの実行結果

サンプルプログラムの実行結果は、以下の記事の「サンプルプログラムの実行結果」と同じ内容となる。

Azure Function上でCSVファイルのデータをDBに取り込むプログラムで楽観ロックを使ってみたこれまでこのブログで、Spring BatchのChunkモデルを用いて、Blob上のCSVファイルをDBのテーブルに書き込む処理を作成...

要点まとめ

  • Spring BatchのChunkモデルを用いて、Blob上のCSVファイルをDBのテーブルに書き込む処理を、ItemWriterインタフェースを継承してカスタマイズすることができる。