TimerTrigger/SpringBatch

Azure Function上でバッチモードとそれ以外のデータソースを混在させてみた

Azure Function上で楽観ロックを実装する場合、DB接続する際のSqlSessionを生成する際に、Spring BatchのChunkモデルを用いたバッチ処理ではバッチモード(ExecutorType.BATCH)を利用し、それ以外のオンラインからの呼出ではExecutorType.SIMPLEを利用する必要がある。

今回は、バッチモードとそれ以外で、DB接続設定を使い分けてみたので、そのサンプルプログラムを共有する。

前提条件

下記記事のサンプルプログラムを作成済であること。

Azure Function上でCSVファイルのデータをDBに取り込むプログラムで楽観ロックを使ってみたこれまでこのブログで、Spring BatchのChunkモデルを用いて、Blob上のCSVファイルをDBのテーブルに書き込む処理を作成...

サンプルプログラムの作成

作成したサンプルプログラムの構成は、以下の通り。
サンプルプログラムの構成
なお、上記の赤枠は、今回追加・変更したプログラムである。

バッチ処理のDB接続設定の内容は以下の通りで、SQLセッションファクトリを生成する際に、バッチモードに設定する処理を追加している。

package com.example.config;

import org.apache.ibatis.mapping.Environment;
import org.springframework.context.annotation.Configuration;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

@Configuration
@MapperScan(basePackages = {"com.example.mybatis.batch"}
    , sqlSessionFactoryRef = "sqlSessionFactoryBatch")
public class DemoBatchDataSourceConfig {

  /**
   * バッチで利用するためのデータソースプロパティを生成する
   * @return バッチで利用するためのデータソースプロパティ
   */
  @Bean(name = {"datasourceBatchProperties"})
  @Primary
  @ConfigurationProperties(prefix = "spring.datasource")
  public DataSourceProperties datasourceBatchProperties() {
    return new DataSourceProperties();
  }

  /**
   * バッチで利用するためのデータソースを生成する
   * @param properties バッチで利用するためのデータソースプロパティ
   * @return バッチで利用するためのデータソース
   */
  @Bean(name = {"dataSourceBatch"})
  @Primary
  public DataSource datasourceBatch(
      @Qualifier("datasourceBatchProperties") DataSourceProperties properties) {
    return properties.initializeDataSourceBuilder().build();
  }

  /**
   * バッチで利用するためのトランザクションマネージャを生成する
   * @param dataSourceBatch バッチで利用するためのデータソース
   * @return バッチで利用するためのトランザクションマネージャ
   */
  @Bean(name = {"txManagerBatch"})
  @Primary
  public PlatformTransactionManager txManagerBatch(
      @Qualifier("dataSourceBatch") DataSource dataSourceBatch) {
    return new DataSourceTransactionManager(dataSourceBatch);
  }

  /**
   * バッチで利用するためのSQLセッションファクトリを生成する
   * @param dataSourceBatch バッチで利用するためのデータソース
   * @return バッチで利用するためのSQLセッションファクトリ
   * @throws Exception 任意例外
   */
  @Bean(name = {"sqlSessionFactoryBatch"})
  @Primary
  public SqlSessionFactory sqlSessionFactory(
      @Qualifier("dataSourceBatch") DataSource dataSourceBatch) throws Exception {
    SqlSessionFactoryBean sqlSessionFactory = new SqlSessionFactoryBean();
    sqlSessionFactory.setDataSource(dataSourceBatch);

    // MyBatisでバッチモードで処理できるよう設定を変更
    Environment env = new Environment("development"
      , new JdbcTransactionFactory(), dataSourceBatch);
    org.apache.ibatis.session.Configuration config 
      = new org.apache.ibatis.session.Configuration(env);
    config.setDefaultExecutorType(ExecutorType.BATCH);
    sqlSessionFactory.setConfiguration(config);
    
    return sqlSessionFactory.getObject();
  }
}

また、オンライン処理のDB接続設定の内容は以下の通りで、こちらはバッチモードの設定を含めていない。

package com.example.config;

import org.springframework.context.annotation.Configuration;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

@Configuration
@MapperScan(basePackages = {"com.example.mybatis.online"}
    , sqlSessionFactoryRef = "sqlSessionFactoryOnline")
public class DemoOnlineDataSourceConfig {

  /**
   * オンラインで利用するためのデータソースプロパティを生成する
   * @return オンラインで利用するためのデータソースプロパティ
   */
  @Bean(name = {"datasourceOnlineProperties"})
  @ConfigurationProperties(prefix = "spring.datasource.online")
  public DataSourceProperties datasourceOnlineProperties() {
    return new DataSourceProperties();
  }

  /**
   * オンラインで利用するためのデータソースを生成する
   * @param properties オンラインで利用するためのデータソースプロパティ
   * @return オンラインで利用するためのデータソース
   */
  @Bean(name = {"dataSourceOnline"})
  public DataSource datasourceOnline(
      @Qualifier("datasourceOnlineProperties") DataSourceProperties properties) {
    return properties.initializeDataSourceBuilder().build();
  }

  /**
   * オンラインで利用するためのトランザクションマネージャを生成する
   * @param dataSourceOnline オンラインで利用するためのデータソース
   * @return オンラインで利用するためのトランザクションマネージャ
   */
  @Bean(name = {"txManagerOnline"})
  public PlatformTransactionManager txManagerOnline(
      @Qualifier("dataSourceOnline") DataSource dataSourceOnline) {
    return new DataSourceTransactionManager(dataSourceOnline);
  }

  /**
   * オンラインで利用するためのSQLセッションファクトリを生成する
   * @param dataSourceOnline オンラインで利用するためのデータソース
   * @return オンラインで利用するためのSQLセッションファクトリ
   * @throws Exception 任意例外
   */
  @Bean(name = {"sqlSessionFactoryOnline"})
  public SqlSessionFactory sqlSessionFactory(
      @Qualifier("dataSourceOnline") DataSource dataSourceOnline) throws Exception {
    SqlSessionFactoryBean sqlSessionFactory = new SqlSessionFactoryBean();
    sqlSessionFactory.setDataSource(dataSourceOnline);
    return sqlSessionFactory.getObject();
  }
}

さらに、application.propertiesの設定は以下の通りで、オンラインのDB接続設定を追加し、バッチモードの設定をコメントアウトしている。

# Azure Storageの接続先
azure.storage.accountName=azureblobpurinit
azure.storage.accessKey=(Azure Blob Storageのアクセスキー)
azure.storage.blob-endpoint=https://azureblobpurinit.blob.core.windows.net
azure.storage.containerName=blobcontainer

# DB接続設定(バッチ)
spring.datasource.url=jdbc:sqlserver://azure-db-purinit.database.windows.net:1433;database=azureSqlDatabase
spring.datasource.username=purinit@azure-db-purinit
spring.datasource.password=(DBのパスワード)
spring.datasource.driverClassName=com.microsoft.sqlserver.jdbc.SQLServerDriver

# DB接続設定(オンライン)
spring.datasource.online.url=jdbc:sqlserver://azure-db-purinit.database.windows.net:1433;database=azureSqlDatabase
spring.datasource.online.username=purinit@azure-db-purinit
spring.datasource.online.password=(DBのパスワード)
spring.datasource.online.driverClassName=com.microsoft.sqlserver.jdbc.SQLServerDriver

# MyBatisでバッチモードで処理できるよう設定を変更
#mybatis.executor-type=BATCH
#mybatis.configuration.default-executor-type=BATCH

また、pom.xmlの内容は以下の通りで、@ConfigurationPropertiesアノテーションを利用するための設定を追加している。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>demoAzureFunc</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <name>Hello Spring Function on Azure</name>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.4.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <azure.functions.maven.plugin.version>1.9.0</azure.functions.maven.plugin.version>

    <!-- customize those properties. The functionAppName should be unique across Azure -->
    <functionResourceGroup>azureAppDemo</functionResourceGroup>
    <functionAppName>azureFuncDemoApp</functionAppName>
    <functionAppServicePlan>ASP-azureAppDemo-8679</functionAppServicePlan>
    <functionPricingTier>B1</functionPricingTier>

    <functionAppRegion>japaneast</functionAppRegion>
    <stagingDirectory>${project.build.directory}/azure-functions/${functionAppName}</stagingDirectory>
    <start-class>com.example.DemoAzureFunction</start-class>
    <spring.boot.wrapper.version>1.0.25.RELEASE</spring.boot.wrapper.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-function-adapter-azure</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-function-web</artifactId>
      <scope>provided</scope>
    </dependency>
    <!-- lombokを利用するための設定 -->
		<dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <scope>provided</scope>
    </dependency>
    <!-- Spring Batchを利用するための設定 -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <!-- Azure Storageの設定 -->
    <dependency>
      <groupId>com.microsoft.azure</groupId>
      <artifactId>azure-storage</artifactId>
      <version>8.3.0</version>
    </dependency>
    <!-- Azure StorageでSASトークンを利用するための設定 -->
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-storage-blob</artifactId>
      <version>12.10.0</version>
    </dependency>
    <!-- SQL Serverを利用するための設定 -->
    <dependency>
      <groupId>com.microsoft.sqlserver</groupId>
      <artifactId>mssql-jdbc</artifactId>
    </dependency>
    <!-- MyBatisを利用するための設定 -->
    <dependency>
      <groupId>org.mybatis.spring.boot</groupId>
      <artifactId>mybatis-spring-boot-starter</artifactId>
      <version>2.1.1</version>
    </dependency>
    <!-- @ConfigurationPropertiesアノテーションを利用するための設定 -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-configuration-processor</artifactId>
      <optional>true</optional>
    </dependency>
    <!-- Test -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-function-dependencies</artifactId>
        <version>3.1.2</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>

  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>com.microsoft.azure</groupId>
          <artifactId>azure-functions-maven-plugin</artifactId>
          <version>${azure.functions.maven.plugin.version}</version>
        </plugin>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-dependency-plugin</artifactId>
          <version>3.1.2</version>
        </plugin>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
      </plugins>
    </pluginManagement>

    <plugins>
      <plugin>
        <groupId>com.microsoft.azure</groupId>
        <artifactId>azure-functions-maven-plugin</artifactId>
        <configuration>
          <resourceGroup>${functionResourceGroup}</resourceGroup>
          <appName>${functionAppName}</appName>
          <appServicePlanName>${functionAppServicePlan}</appServicePlanName>
          <region>${functionAppRegion}</region>
          <pricingTier>${functionPricingTier}</pricingTier>
          <runtime>
          	<os>Linux</os>
          	<javaVersion>8</javaVersion>
          </runtime>
          <appSettings>
            <!-- Run Azure Function from package file by default -->
            <property>
              <name>WEBSITE_RUN_FROM_PACKAGE</name>
              <value>1</value>
            </property>
            <property>
              <name>FUNCTIONS_EXTENSION_VERSION</name>
              <value>~3</value>
            </property>
            <property>
              <name>FUNCTIONS_WORKER_RUNTIME</name>
              <value>java</value>
            </property>
          </appSettings>
        </configuration>
        <executions>
          <execution>
            <id>package-functions</id>
            <goals>
              <goal>package</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-resources-plugin</artifactId>
        <executions>
          <execution>
            <id>copy-resources</id>
            <phase>package</phase>
            <goals>
              <goal>copy-resources</goal>
            </goals>
            <configuration>
              <overwrite>true</overwrite>
              <outputDirectory>
                ${project.build.directory}/azure-functions/${functionAppName}
              </outputDirectory>
              <resources>
                <resource>
                  <directory>${project.basedir}/src/main/azure
                  </directory>
                  <includes>
                    <include>**</include>
                  </includes>
                </resource>
              </resources>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-dependency-plugin</artifactId>
        <executions>
          <execution>
            <id>copy-dependencies</id>
            <phase>prepare-package</phase>
            <goals>
              <goal>copy-dependencies</goal>
            </goals>
            <configuration>
              <outputDirectory>${stagingDirectory}/lib</outputDirectory>
              <overWriteReleases>false</overWriteReleases>
              <overWriteSnapshots>false</overWriteSnapshots>
              <overWriteIfNewer>true</overWriteIfNewer>
              <includeScope>runtime</includeScope>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <!--Remove obj folder generated by .NET SDK in maven clean-->
      <plugin>
        <artifactId>maven-clean-plugin</artifactId>
        <configuration>
          <filesets>
            <fileset>
              <directory>obj</directory>
            </fileset>
          </filesets>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <dependencies>
          <dependency>
            <groupId>org.springframework.boot.experimental</groupId>
            <artifactId>spring-boot-thin-layout</artifactId>
            <version>${spring.boot.wrapper.version}</version>
          </dependency>
        </dependencies>
      </plugin>
    </plugins>
  </build>

  <repositories>
    <repository>
      <id>spring-snapshots</id>
      <name>Spring Snapshots</name>
      <url>https://repo.spring.io/plugins-snapshot</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
      <releases>
        <enabled>false</enabled>
      </releases>
    </repository>
    <repository>
      <id>spring-milestones</id>
      <name>Spring Milestones</name>
      <url>https://repo.spring.io/plugins-milestone</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
  </repositories>
  <pluginRepositories>
    <pluginRepository>
      <id>spring-snapshots</id>
      <name>Spring Snapshots</name>
      <url>https://repo.spring.io/plugins-snapshot</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
      <releases>
        <enabled>false</enabled>
      </releases>
    </pluginRepository>
    <pluginRepository>
      <id>spring-milestones</id>
      <name>Spring Milestones</name>
      <url>https://repo.spring.io/plugins-milestone</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </pluginRepository>
  </pluginRepositories>
</project>

さらに、バッチ処理のMapperインタフェース・XMLファイルの内容は以下の通りで、いずれも「com.example.mybatis.batch」フォルダ下に設定している。

package com.example.mybatis.batch;

import org.apache.ibatis.annotations.Mapper;
import com.example.mybatis.model.UserData;

@Mapper
public interface UserDataMapperBatch {

  /**
   * DBにUserDataオブジェクトがあれば更新し、なければ追加する.
   * @param userData UserDataオブジェクト
   */
  void upsert(UserData userData);

  /**
   * 引数のIDをキーにもつUserDataオブジェクトを行ロックをかけ取得する.
   * @param id ID
   * @return UserDataオブジェクト
   */
  UserData findByIdRowLock(Integer id);
  
  /**
   * 引数のUserDataオブジェクトのバージョンを更新する.
   * @param userData UserDataオブジェクト
   */
  void updateVersion(UserData userData);
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
    PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
    "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.mybatis.batch.UserDataMapperBatch">
  <update id="upsert" parameterType="com.example.mybatis.model.UserData">
    MERGE INTO USER_DATA AS u 
    USING ( SELECT #{id} id, #{version} version ) s
    ON ( u.id = s.id ) 
    WHEN MATCHED AND u.version = s.version THEN
       UPDATE SET name = #{name}, birth_year = #{birth_year}
         , birth_month = #{birth_month}, birth_day = #{birth_day}
         , sex = #{sex}, memo = #{memo}, version = #{version} + 1
    WHEN NOT MATCHED THEN
       INSERT ( id, name, birth_year, birth_month, birth_day
              , sex, memo, version ) 
       VALUES (#{id}, #{name}, #{birth_year}, #{birth_month}, #{birth_day}
              , #{sex}, #{memo}, #{version})
    ;
  </update>
  <select id="findByIdRowLock" parameterType="java.lang.Integer" 
      resultType="com.example.mybatis.model.UserData">
    SELECT id, name, birth_year, birth_month, birth_day, sex, memo, version 
    FROM USER_DATA WITH(ROWLOCK, UPDLOCK, NOWAIT) <!-- Azure SQL Databaseで行ロックをかける -->
    WHERE id = #{id} 
  </select>
  <update id="updateVersion" parameterType="com.example.mybatis.model.UserData">
    UPDATE USER_DATA SET version = #{version} + 2 WHERE id = #{id}
  </update>
</mapper>

また、オンライン処理のMapperインタフェース・XMLファイルの内容は以下の通りで、いずれも「com.example.mybatis.online」フォルダ下に設定している。

package com.example.mybatis.online;

import org.apache.ibatis.annotations.Mapper;
import com.example.mybatis.model.UserData;

@Mapper
public interface UserDataMapperOnline {

  /**
   * 引数のIDをキーにもつUserDataオブジェクトのバージョン番号を更新する.
   * @param id ID
   * @return 更新件数
   */
  int updateVersion(Integer id);
  
  /**
   * 引数のIDをキーにもつUserDataオブジェクトを取得する.
   * @param id ID
   * @return UserDataオブジェクト
   */
  UserData findById(Integer id);
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
    PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
    "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.mybatis.online.UserDataMapperOnline">
  <update id="updateVersion" parameterType="java.lang.Integer">
    UPDATE USER_DATA SET version = version + 1 WHERE id = #{id}
  </update>
  <select id="findById" parameterType="java.lang.Integer" 
      resultType="com.example.mybatis.model.UserData">
    SELECT id, name, birth_year, birth_month, birth_day, sex, memo, version 
    FROM USER_DATA
    WHERE id = #{id}
  </select>
</mapper>

さらに、バッチ処理のMapperインタフェースを呼び出す箇所を、それぞれ変更後のパスに変更している。

package com.example.batch;

import java.net.MalformedURLException;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisBatchItemWriter;
import org.mybatis.spring.batch.builder.MyBatisBatchItemWriterBuilder;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.UrlResource;

import com.example.mybatis.model.UserData;
import com.example.service.DemoBlobService;

import lombok.RequiredArgsConstructor;

@Configuration
@EnableBatchProcessing
@RequiredArgsConstructor
public class DemoChunkConfig {

  /** ジョブ生成ファクトリ */
  public final JobBuilderFactory jobBuilderFactory;

  /** ステップ生成ファクトリ */
  public final StepBuilderFactory stepBuilderFactory;

  /** SQLセッションファクトリ */
  public final SqlSessionFactory sqlSessionFactoryBatch;

  /** データ加工処理 */
  private final DemoUpdateProcessor demoUpdateProcessor;
  
  /** データ加工前後処理 */
  private final DemoProcessorListener demoProcessorListener;
  
  /** データ書き込み前後処理 */
  private final DemoUpdateListener demoUpdateListener;
  
  /** ステップの前後処理 */
  private final DemoStepListener demoStepListener;
  
  /** BLOBへアクセスするサービス */
  @Autowired
  private DemoBlobService demoBlobService;

  /**
   * BlobStorageからファイルを読み込む.
   * @return 読み込みオブジェクト
   */
  @Bean
  public FlatFileItemReader<UserData> reader() {
    FlatFileItemReader<UserData> reader = new FlatFileItemReader<UserData>();
    try {
      // BlobStorageからファイル(user_data.csv)を読み込む際のURLをreaderに設定
      String url = demoBlobService.getBlobUrl("user_data.csv");
      reader.setResource(new UrlResource(url));
    } catch (MalformedURLException ex) {
      throw new RuntimeException(ex);
    }
    // ファイルを読み込む際の文字コード
    reader.setEncoding("UTF-8");
    // 1行目を読み飛ばす
    reader.setLinesToSkip(1);
    // ファイルから読み込んだデータをUserDataオブジェクトに格納
    reader.setLineMapper(new DefaultLineMapper<UserData>() {
      {
        setLineTokenizer(new DelimitedLineTokenizer() {
          {
            setNames(new String[] { "id", "name", "birth_year"
                , "birth_month", "birth_day", "sex", "memo" });
          }
        });
        setFieldSetMapper(new BeanWrapperFieldSetMapper<UserData>() {
          {
            setTargetType(UserData.class);
          }
        });
      }
    });
    return reader;
  }

  /**
   * 読み込んだファイルのデータを、DBに書き込む.
   * @return 書き込みオブジェクト
   */
  @Bean
  public MyBatisBatchItemWriter<UserData> writer() {
    return new MyBatisBatchItemWriterBuilder<UserData>()
        .sqlSessionFactory(sqlSessionFactoryBatch)
        .statementId("com.example.mybatis.batch.UserDataMapperBatch.upsert")
        .assertUpdates(true)  // 楽観ロックエラーを有効にする
        .build();
  }

  /**
   * Spring Batchのジョブ内で指定する処理単位(ステップ)を定義する.
   * @param reader 読み込みオブジェクト
   * @param writer 書き込みオブジェクト
   * @return Spring Batchのジョブ内で指定する処理単位
   */
  @Bean
  public Step step(ItemReader<UserData> reader, ItemWriter<UserData> writer) {
    // 生成するステップ内で読み込み/加工/加工後/書き込み/書き込み後/ステップ実行前後の処理の
    // 一連の流れを指定する
    // その際のチャンクサイズ(=何件読み込む毎にコミットするか)を3に指定している
    return stepBuilderFactory.get("step")
        .<UserData, UserData>chunk(3)
        .reader(reader)
        .processor(demoUpdateProcessor)
        .listener(demoProcessorListener)
        .writer(writer)
        .listener(demoUpdateListener)
        .listener(demoStepListener)
        .build();
  }

  /**
   * Spring Batchのジョブを定義する.
   * @param jobListener 実行前後の処理(リスナ)
   * @param step        実行するステップ
   * @return Spring Batchのジョブ
   */
  @Bean
  public Job updateUserData(DemoJobListener jobListener, Step step) {
    // 生成するジョブ内で、実行前後の処理(リスナ)と処理単位(ステップ)を指定する
    return jobBuilderFactory
        .get("updateUserData")
        .incrementer(new RunIdIncrementer())
        .listener(jobListener)
        .flow(step)
        .end()
        .build();
  }
}
package com.example.batch;

import org.springframework.batch.core.ItemProcessListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import com.example.mybatis.batch.UserDataMapperBatch;
import com.example.mybatis.model.UserData;

@Component
public class DemoProcessorListener implements ItemProcessListener<UserData, UserData> {

  @Autowired
  private UserDataMapperBatch userDataMapperBatch;
  
  /**
   * データ加工前の処理を定義する.
   */
  @Override
  public void beforeProcess(UserData item) {
    // 何もしない  
  }

  /**
   * データ加工後の処理を定義する.
   */
  @Override
  @Transactional
  public void afterProcess(UserData item, UserData result) {
    // 処理結果がnullの場合は、処理を終了する
    if(result == null) {
      return;
    }
    
    // 既に登録済データのVERSIONを取得する
    Integer id = Integer.parseInt(result.getId());
    UserData ud = userDataMapperBatch.findByIdRowLock(id);
    
    // バージョンの値を結果に設定する
    if(ud != null) {
      result.setVersion(ud.getVersion());
    } else {
      result.setVersion(0);
    }
    
    // id=8の場合、バージョンを更新し、楽観ロックエラーにする
    /* if(id == 8) {
      userDataMapperBatch.updateVersion(result);
    } */
  }

  /**
   * データ編集エラー後の処理を定義する.
   */
  @Override
  public void onProcessError(UserData item, Exception e) {
    // 何もしない
  }

}

また、オンライン処理用のHandler、Service、Param、Resultをそれぞれ追加している。

package com.example;

import java.util.Optional;

import org.springframework.cloud.function.adapter.azure.FunctionInvoker;

import com.example.model.OnlineServiceParam;
import com.example.model.OnlineServiceResult;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.HttpMethod;
import com.microsoft.azure.functions.HttpRequestMessage;
import com.microsoft.azure.functions.HttpResponseMessage;
import com.microsoft.azure.functions.HttpStatus;
import com.microsoft.azure.functions.annotation.AuthorizationLevel;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.annotation.HttpTrigger;

public class OnlineHandler extends  FunctionInvoker<OnlineServiceParam, OnlineServiceResult> {

  /**
   * HTTP要求に応じて、HelloFunctionクラスのonlineメソッドを呼び出し、その戻り値をボディに設定したレスポンスを返す
   * @param request リクエストオブジェクト
   * @param context コンテキストオブジェクト
   * @return レスポンスオブジェクト
   */
  @FunctionName("online")
  public HttpResponseMessage execute(@HttpTrigger(name = "request", methods = HttpMethod.GET
          , authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request
      , ExecutionContext context) {
    // リクエストパラメータからidの値を取得
    String paramId = request.getQueryParameters().get("id");
    
    // オンラインサービス呼出用Paramを生成
    OnlineServiceParam onlineServiceParam = new OnlineServiceParam();
    onlineServiceParam.setId(paramId);
    
    // handleRequestメソッド内でHelloFunctionクラスのonlineメソッドを呼び出し、
    // その戻り値をボディに設定したレスポンスを、JSON形式で返す
    return request.createResponseBuilder(HttpStatus.OK)
        .body(handleRequest(onlineServiceParam, context))
        .header("Content-Type", "text/json").build();
  }
}
package com.example.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.example.model.OnlineServiceParam;
import com.example.model.OnlineServiceResult;
import com.example.mybatis.model.UserData;
import com.example.mybatis.online.UserDataMapperOnline;

@Service
public class OnlineService {

  /* Spring Bootでログ出力するためのLogbackのクラスを生成 */
  private static final Logger LOGGER = LoggerFactory.getLogger(OnlineService.class);
  
  @Autowired
  private UserDataMapperOnline userDataMapperOnline;
  
  /**
   * 指定されたIDのバージョンを更新し、更新後データを返却するサービス.
   * @param onlineServiceParam オンラインサービス呼出用Param
   * @return オンラインサービスの処理結果
   */
  public OnlineServiceResult online(OnlineServiceParam onlineServiceParam) {
    OnlineServiceResult result = new OnlineServiceResult();
    Integer tmpId = null;
    
    // 引数のIDが数値でなければ、処理を終了
    try {
      tmpId = Integer.parseInt(onlineServiceParam.getId());
    }catch(Exception ex) {
      return result;
    }
    
    // 指定されたIDのバージョンを更新
    int updCnt = userDataMapperOnline.updateVersion(tmpId);
    LOGGER.info("更新ID : " + tmpId + ", 更新件数 : " + updCnt);
    
    // 指定されたIDのUserDataオブジェクトを返却
    UserData userData = userDataMapperOnline.findById(tmpId);
    if(userData == null) {
      userData = new UserData();
    }
    result.setUserData(userData.toString());
    return result;
  }
}
package com.example.model;

import lombok.Data;

@Data
public class OnlineServiceParam {

  /** ID */
  private String id;
  
}
package com.example.model;

import lombok.Data;

@Data
public class OnlineServiceResult {

  /** ユーザ情報 */
  private String userData;
  
}

さらに、Functionのメインクラスに、関数onlineを追加している。

package com.example;

import java.util.function.Function;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import com.example.model.OnlineServiceParam;
import com.example.model.OnlineServiceResult;
import com.example.model.TimerTriggerParam;
import com.example.model.TimerTriggerResult;
import com.example.service.OnlineService;
import com.example.service.TimerTriggerService;

@SpringBootApplication
public class DemoAzureFunction {

  /** タイマートリガーのテストを行うサービスクラスのオブジェクト */
  @Autowired
  private TimerTriggerService timerTriggerService;
  
  /** オンライン処理を行うサービスクラスのオブジェクト */
  @Autowired
  private OnlineService onlineService;

  public static void main(String[] args) throws Exception {
    SpringApplication.run(DemoAzureFunction.class, args);
  }

  /**
   * タイマートリガーのテストを行い結果を返却する関数
   * @return タイマートリガーのテストを行うサービスクラスの呼出結果
   */
  @Bean
  public Function<TimerTriggerParam, TimerTriggerResult> timerTriggerTest() {
    return timerTriggerParam -> timerTriggerService.timerTriggerTest(timerTriggerParam);
  }

  /**
   * オンライン処理を行い結果を返却する関数
   * @return オンライン処理を行うサービスクラスの呼出結果
   */
  @Bean
  public Function<OnlineServiceParam, OnlineServiceResult> online(){
    return onlineServiceParam -> onlineService.online(onlineServiceParam);
  }
}

その他のソースコード内容は、以下のサイトを参照のこと。
https://github.com/purin-it/azure/tree/master/azure-functions-db-batch-online/demoAzureFunc



「Envader」はLinuxコマンドやDatabase SQL等のスキルを、環境構築不要で習得できる学習サイトだった「Envader」は、ITエンジニアとしてよく使うLinuxコマンドやDatabase SQL等のスキルを、解説を読んだ上で、問題を解き...

サンプルプログラムの実行結果

サンプルプログラムの実行結果は、以下の通り。

1) 以下のサイトの「サンプルプログラムの実行結果(ローカル)」「サンプルプログラムの実行結果(Azure上)」に記載の手順で、サンプルプログラムをAzure Functionsにデプロイする。

Azure Functions上でTimerTriggerによって動作するJavaアプリケーション(Spring Boot上)を作成してみたこれまでは、HTTPリクエストによりAzure Functionsが動作するアプリケーションのみ作成してきたが、Timer Trigge...

2) バッチ処理の実行結果は、以下の記事の「サンプルプログラムの実行結果」を参照のこと。

Azure Function上でCSVファイルのデータをDBに取り込むプログラムで楽観ロックを使ってみたこれまでこのブログで、Spring BatchのChunkモデルを用いて、Blob上のCSVファイルをDBのテーブルに書き込む処理を作成...

3) OnlineHandlerクラスのAPIを呼び出す前のデータベースの状態は、以下の通り。

SELECT * FROM dbo.USER_DATA ORDER BY ID ASC
サンプルプログラムの実行結果_3

4) 「https://azurefuncdemoapp.azurewebsites.net/api/online?id=5」とアクセスし、OnlineHandlerクラスのAPIを呼び出すと、以下の戻り値が返却される。
サンプルプログラムの実行結果_4_1

なお、「https://azurefuncdemoapp.azurewebsites.net」の部分は、Azure Portalで確認した、Azure Functionsの以下の赤枠のURLから確認できる。
サンプルプログラムの実行結果_4_2

5) OnlineHandlerクラスのAPIを呼び出した後のデータベースの状態は以下の通りで、id=5のVERSIONが1増加していることが確認できる。

SELECT * FROM dbo.USER_DATA ORDER BY ID ASC
サンプルプログラムの実行結果_5

6) OnlineHandlerクラスのAPIを呼び出した際の、ログの出力結果は以下の通りで、ID=5を更新した際の更新件数が1であることが確認できる。
サンプルプログラムの実行結果_6

要点まとめ

  • Azure Function上で楽観ロックを実装する際、DB接続する際のSqlSessionを生成する際、Spring BatchのChunkモデルを用いたバッチ処理ではバッチモード(ExecutorType.BATCH)を利用し、それ以外のオンラインからの呼出ではExecutorType.SIMPLEを利用する必要がある。その使い分けをするには、DB接続設定を分ければよい。