Azure Function上で楽観ロックを実装する場合、DB接続する際のSqlSessionを生成する際に、Spring BatchのChunkモデルを用いたバッチ処理ではバッチモード(ExecutorType.BATCH)を利用し、それ以外のオンラインからの呼出ではExecutorType.SIMPLEを利用する必要がある。
今回は、バッチモードとそれ以外で、DB接続設定を使い分けてみたので、そのサンプルプログラムを共有する。
前提条件
下記記事のサンプルプログラムを作成済であること。
サンプルプログラムの作成
作成したサンプルプログラムの構成は、以下の通り。
なお、上記の赤枠は、今回追加・変更したプログラムである。
バッチ処理のDB接続設定の内容は以下の通りで、SQLセッションファクトリを生成する際に、バッチモードに設定する処理を追加している。
package com.example.config; import org.apache.ibatis.mapping.Environment; import org.springframework.context.annotation.Configuration; import org.apache.ibatis.session.ExecutorType; import org.apache.ibatis.session.SqlSessionFactory; import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory; import org.mybatis.spring.SqlSessionFactoryBean; import org.mybatis.spring.annotation.MapperScan; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Primary; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.transaction.PlatformTransactionManager; import javax.sql.DataSource; @Configuration @MapperScan(basePackages = {"com.example.mybatis.batch"} , sqlSessionFactoryRef = "sqlSessionFactoryBatch") public class DemoBatchDataSourceConfig { /** * バッチで利用するためのデータソースプロパティを生成する * @return バッチで利用するためのデータソースプロパティ */ @Bean(name = {"datasourceBatchProperties"}) @Primary @ConfigurationProperties(prefix = "spring.datasource") public DataSourceProperties datasourceBatchProperties() { return new DataSourceProperties(); } /** * バッチで利用するためのデータソースを生成する * @param properties バッチで利用するためのデータソースプロパティ * @return バッチで利用するためのデータソース */ @Bean(name = {"dataSourceBatch"}) @Primary public DataSource datasourceBatch( @Qualifier("datasourceBatchProperties") DataSourceProperties properties) { return properties.initializeDataSourceBuilder().build(); } /** * バッチで利用するためのトランザクションマネージャを生成する * @param dataSourceBatch バッチで利用するためのデータソース * @return バッチで利用するためのトランザクションマネージャ */ @Bean(name = {"txManagerBatch"}) @Primary public PlatformTransactionManager txManagerBatch( @Qualifier("dataSourceBatch") DataSource dataSourceBatch) { return new DataSourceTransactionManager(dataSourceBatch); } /** * バッチで利用するためのSQLセッションファクトリを生成する * @param dataSourceBatch バッチで利用するためのデータソース * @return バッチで利用するためのSQLセッションファクトリ * @throws Exception 任意例外 */ @Bean(name = {"sqlSessionFactoryBatch"}) @Primary public SqlSessionFactory sqlSessionFactory( @Qualifier("dataSourceBatch") DataSource dataSourceBatch) throws Exception { SqlSessionFactoryBean sqlSessionFactory = new SqlSessionFactoryBean(); sqlSessionFactory.setDataSource(dataSourceBatch); // MyBatisでバッチモードで処理できるよう設定を変更 Environment env = new Environment("development" , new JdbcTransactionFactory(), dataSourceBatch); org.apache.ibatis.session.Configuration config = new org.apache.ibatis.session.Configuration(env); config.setDefaultExecutorType(ExecutorType.BATCH); sqlSessionFactory.setConfiguration(config); return sqlSessionFactory.getObject(); } }
また、オンライン処理のDB接続設定の内容は以下の通りで、こちらはバッチモードの設定を含めていない。
package com.example.config; import org.springframework.context.annotation.Configuration; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.SqlSessionFactoryBean; import org.mybatis.spring.annotation.MapperScan; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.transaction.PlatformTransactionManager; import javax.sql.DataSource; @Configuration @MapperScan(basePackages = {"com.example.mybatis.online"} , sqlSessionFactoryRef = "sqlSessionFactoryOnline") public class DemoOnlineDataSourceConfig { /** * オンラインで利用するためのデータソースプロパティを生成する * @return オンラインで利用するためのデータソースプロパティ */ @Bean(name = {"datasourceOnlineProperties"}) @ConfigurationProperties(prefix = "spring.datasource.online") public DataSourceProperties datasourceOnlineProperties() { return new DataSourceProperties(); } /** * オンラインで利用するためのデータソースを生成する * @param properties オンラインで利用するためのデータソースプロパティ * @return オンラインで利用するためのデータソース */ @Bean(name = {"dataSourceOnline"}) public DataSource datasourceOnline( @Qualifier("datasourceOnlineProperties") DataSourceProperties properties) { return properties.initializeDataSourceBuilder().build(); } /** * オンラインで利用するためのトランザクションマネージャを生成する * @param dataSourceOnline オンラインで利用するためのデータソース * @return オンラインで利用するためのトランザクションマネージャ */ @Bean(name = {"txManagerOnline"}) public PlatformTransactionManager txManagerOnline( @Qualifier("dataSourceOnline") DataSource dataSourceOnline) { return new DataSourceTransactionManager(dataSourceOnline); } /** * オンラインで利用するためのSQLセッションファクトリを生成する * @param dataSourceOnline オンラインで利用するためのデータソース * @return オンラインで利用するためのSQLセッションファクトリ * @throws Exception 任意例外 */ @Bean(name = {"sqlSessionFactoryOnline"}) public SqlSessionFactory sqlSessionFactory( @Qualifier("dataSourceOnline") DataSource dataSourceOnline) throws Exception { SqlSessionFactoryBean sqlSessionFactory = new SqlSessionFactoryBean(); sqlSessionFactory.setDataSource(dataSourceOnline); return sqlSessionFactory.getObject(); } }
さらに、application.propertiesの設定は以下の通りで、オンラインのDB接続設定を追加し、バッチモードの設定をコメントアウトしている。
# Azure Storageの接続先 azure.storage.accountName=azureblobpurinit azure.storage.accessKey=(Azure Blob Storageのアクセスキー) azure.storage.blob-endpoint=https://azureblobpurinit.blob.core.windows.net azure.storage.containerName=blobcontainer # DB接続設定(バッチ) spring.datasource.url=jdbc:sqlserver://azure-db-purinit.database.windows.net:1433;database=azureSqlDatabase spring.datasource.username=purinit@azure-db-purinit spring.datasource.password=(DBのパスワード) spring.datasource.driverClassName=com.microsoft.sqlserver.jdbc.SQLServerDriver # DB接続設定(オンライン) spring.datasource.online.url=jdbc:sqlserver://azure-db-purinit.database.windows.net:1433;database=azureSqlDatabase spring.datasource.online.username=purinit@azure-db-purinit spring.datasource.online.password=(DBのパスワード) spring.datasource.online.driverClassName=com.microsoft.sqlserver.jdbc.SQLServerDriver # MyBatisでバッチモードで処理できるよう設定を変更 #mybatis.executor-type=BATCH #mybatis.configuration.default-executor-type=BATCH
また、pom.xmlの内容は以下の通りで、@ConfigurationPropertiesアノテーションを利用するための設定を追加している。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>demoAzureFunc</artifactId> <version>0.0.1-SNAPSHOT</version> <name>Hello Spring Function on Azure</name> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.4.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <azure.functions.maven.plugin.version>1.9.0</azure.functions.maven.plugin.version> <!-- customize those properties. The functionAppName should be unique across Azure --> <functionResourceGroup>azureAppDemo</functionResourceGroup> <functionAppName>azureFuncDemoApp</functionAppName> <functionAppServicePlan>ASP-azureAppDemo-8679</functionAppServicePlan> <functionPricingTier>B1</functionPricingTier> <functionAppRegion>japaneast</functionAppRegion> <stagingDirectory>${project.build.directory}/azure-functions/${functionAppName}</stagingDirectory> <start-class>com.example.DemoAzureFunction</start-class> <spring.boot.wrapper.version>1.0.25.RELEASE</spring.boot.wrapper.version> </properties> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-function-adapter-azure</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-function-web</artifactId> <scope>provided</scope> </dependency> <!-- lombokを利用するための設定 --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <scope>provided</scope> </dependency> <!-- Spring Batchを利用するための設定 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <!-- Azure Storageの設定 --> <dependency> <groupId>com.microsoft.azure</groupId> <artifactId>azure-storage</artifactId> <version>8.3.0</version> </dependency> <!-- Azure StorageでSASトークンを利用するための設定 --> <dependency> <groupId>com.azure</groupId> <artifactId>azure-storage-blob</artifactId> <version>12.10.0</version> </dependency> <!-- SQL Serverを利用するための設定 --> <dependency> <groupId>com.microsoft.sqlserver</groupId> <artifactId>mssql-jdbc</artifactId> </dependency> <!-- MyBatisを利用するための設定 --> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>2.1.1</version> </dependency> <!-- @ConfigurationPropertiesアノテーションを利用するための設定 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <!-- Test --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-function-dependencies</artifactId> <version>3.1.2</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <pluginManagement> <plugins> <plugin> <groupId>com.microsoft.azure</groupId> <artifactId>azure-functions-maven-plugin</artifactId> <version>${azure.functions.maven.plugin.version}</version> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-resources-plugin</artifactId> <version>3.1.0</version> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> <version>3.1.2</version> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-clean-plugin</artifactId> <version>3.1.0</version> </plugin> </plugins> </pluginManagement> <plugins> <plugin> <groupId>com.microsoft.azure</groupId> <artifactId>azure-functions-maven-plugin</artifactId> <configuration> <resourceGroup>${functionResourceGroup}</resourceGroup> <appName>${functionAppName}</appName> <appServicePlanName>${functionAppServicePlan}</appServicePlanName> <region>${functionAppRegion}</region> <pricingTier>${functionPricingTier}</pricingTier> <runtime> <os>Linux</os> <javaVersion>8</javaVersion> </runtime> <appSettings> <!-- Run Azure Function from package file by default --> <property> <name>WEBSITE_RUN_FROM_PACKAGE</name> <value>1</value> </property> <property> <name>FUNCTIONS_EXTENSION_VERSION</name> <value>~3</value> </property> <property> <name>FUNCTIONS_WORKER_RUNTIME</name> <value>java</value> </property> </appSettings> </configuration> <executions> <execution> <id>package-functions</id> <goals> <goal>package</goal> </goals> </execution> </executions> </plugin> <plugin> <artifactId>maven-resources-plugin</artifactId> <executions> <execution> <id>copy-resources</id> <phase>package</phase> <goals> <goal>copy-resources</goal> </goals> <configuration> <overwrite>true</overwrite> <outputDirectory> ${project.build.directory}/azure-functions/${functionAppName} </outputDirectory> <resources> <resource> <directory>${project.basedir}/src/main/azure </directory> <includes> <include>**</include> </includes> </resource> </resources> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> <executions> <execution> <id>copy-dependencies</id> <phase>prepare-package</phase> <goals> <goal>copy-dependencies</goal> </goals> <configuration> <outputDirectory>${stagingDirectory}/lib</outputDirectory> <overWriteReleases>false</overWriteReleases> <overWriteSnapshots>false</overWriteSnapshots> <overWriteIfNewer>true</overWriteIfNewer> <includeScope>runtime</includeScope> </configuration> </execution> </executions> </plugin> <!--Remove obj folder generated by .NET SDK in maven clean--> <plugin> <artifactId>maven-clean-plugin</artifactId> <configuration> <filesets> <fileset> <directory>obj</directory> </fileset> </filesets> </configuration> </plugin> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <dependencies> <dependency> <groupId>org.springframework.boot.experimental</groupId> <artifactId>spring-boot-thin-layout</artifactId> <version>${spring.boot.wrapper.version}</version> </dependency> </dependencies> </plugin> </plugins> </build> <repositories> <repository> <id>spring-snapshots</id> <name>Spring Snapshots</name> <url>https://repo.spring.io/plugins-snapshot</url> <snapshots> <enabled>true</enabled> </snapshots> <releases> <enabled>false</enabled> </releases> </repository> <repository> <id>spring-milestones</id> <name>Spring Milestones</name> <url>https://repo.spring.io/plugins-milestone</url> <snapshots> <enabled>false</enabled> </snapshots> </repository> </repositories> <pluginRepositories> <pluginRepository> <id>spring-snapshots</id> <name>Spring Snapshots</name> <url>https://repo.spring.io/plugins-snapshot</url> <snapshots> <enabled>true</enabled> </snapshots> <releases> <enabled>false</enabled> </releases> </pluginRepository> <pluginRepository> <id>spring-milestones</id> <name>Spring Milestones</name> <url>https://repo.spring.io/plugins-milestone</url> <snapshots> <enabled>false</enabled> </snapshots> </pluginRepository> </pluginRepositories> </project>
さらに、バッチ処理のMapperインタフェース・XMLファイルの内容は以下の通りで、いずれも「com.example.mybatis.batch」フォルダ下に設定している。
package com.example.mybatis.batch; import org.apache.ibatis.annotations.Mapper; import com.example.mybatis.model.UserData; @Mapper public interface UserDataMapperBatch { /** * DBにUserDataオブジェクトがあれば更新し、なければ追加する. * @param userData UserDataオブジェクト */ void upsert(UserData userData); /** * 引数のIDをキーにもつUserDataオブジェクトを行ロックをかけ取得する. * @param id ID * @return UserDataオブジェクト */ UserData findByIdRowLock(Integer id); /** * 引数のUserDataオブジェクトのバージョンを更新する. * @param userData UserDataオブジェクト */ void updateVersion(UserData userData); }
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.example.mybatis.batch.UserDataMapperBatch"> <update id="upsert" parameterType="com.example.mybatis.model.UserData"> MERGE INTO USER_DATA AS u USING ( SELECT #{id} id, #{version} version ) s ON ( u.id = s.id ) WHEN MATCHED AND u.version = s.version THEN UPDATE SET name = #{name}, birth_year = #{birth_year} , birth_month = #{birth_month}, birth_day = #{birth_day} , sex = #{sex}, memo = #{memo}, version = #{version} + 1 WHEN NOT MATCHED THEN INSERT ( id, name, birth_year, birth_month, birth_day , sex, memo, version ) VALUES (#{id}, #{name}, #{birth_year}, #{birth_month}, #{birth_day} , #{sex}, #{memo}, #{version}) ; </update> <select id="findByIdRowLock" parameterType="java.lang.Integer" resultType="com.example.mybatis.model.UserData"> SELECT id, name, birth_year, birth_month, birth_day, sex, memo, version FROM USER_DATA WITH(ROWLOCK, UPDLOCK, NOWAIT) <!-- Azure SQL Databaseで行ロックをかける --> WHERE id = #{id} </select> <update id="updateVersion" parameterType="com.example.mybatis.model.UserData"> UPDATE USER_DATA SET version = #{version} + 2 WHERE id = #{id} </update> </mapper>
また、オンライン処理のMapperインタフェース・XMLファイルの内容は以下の通りで、いずれも「com.example.mybatis.online」フォルダ下に設定している。
package com.example.mybatis.online; import org.apache.ibatis.annotations.Mapper; import com.example.mybatis.model.UserData; @Mapper public interface UserDataMapperOnline { /** * 引数のIDをキーにもつUserDataオブジェクトのバージョン番号を更新する. * @param id ID * @return 更新件数 */ int updateVersion(Integer id); /** * 引数のIDをキーにもつUserDataオブジェクトを取得する. * @param id ID * @return UserDataオブジェクト */ UserData findById(Integer id); }
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.example.mybatis.online.UserDataMapperOnline"> <update id="updateVersion" parameterType="java.lang.Integer"> UPDATE USER_DATA SET version = version + 1 WHERE id = #{id} </update> <select id="findById" parameterType="java.lang.Integer" resultType="com.example.mybatis.model.UserData"> SELECT id, name, birth_year, birth_month, birth_day, sex, memo, version FROM USER_DATA WHERE id = #{id} </select> </mapper>
さらに、バッチ処理のMapperインタフェースを呼び出す箇所を、それぞれ変更後のパスに変更している。
package com.example.batch; import java.net.MalformedURLException; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.batch.MyBatisBatchItemWriter; import org.mybatis.spring.batch.builder.MyBatisBatchItemWriterBuilder; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper; import org.springframework.batch.item.file.mapping.DefaultLineMapper; import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.UrlResource; import com.example.mybatis.model.UserData; import com.example.service.DemoBlobService; import lombok.RequiredArgsConstructor; @Configuration @EnableBatchProcessing @RequiredArgsConstructor public class DemoChunkConfig { /** ジョブ生成ファクトリ */ public final JobBuilderFactory jobBuilderFactory; /** ステップ生成ファクトリ */ public final StepBuilderFactory stepBuilderFactory; /** SQLセッションファクトリ */ public final SqlSessionFactory sqlSessionFactoryBatch; /** データ加工処理 */ private final DemoUpdateProcessor demoUpdateProcessor; /** データ加工前後処理 */ private final DemoProcessorListener demoProcessorListener; /** データ書き込み前後処理 */ private final DemoUpdateListener demoUpdateListener; /** ステップの前後処理 */ private final DemoStepListener demoStepListener; /** BLOBへアクセスするサービス */ @Autowired private DemoBlobService demoBlobService; /** * BlobStorageからファイルを読み込む. * @return 読み込みオブジェクト */ @Bean public FlatFileItemReader<UserData> reader() { FlatFileItemReader<UserData> reader = new FlatFileItemReader<UserData>(); try { // BlobStorageからファイル(user_data.csv)を読み込む際のURLをreaderに設定 String url = demoBlobService.getBlobUrl("user_data.csv"); reader.setResource(new UrlResource(url)); } catch (MalformedURLException ex) { throw new RuntimeException(ex); } // ファイルを読み込む際の文字コード reader.setEncoding("UTF-8"); // 1行目を読み飛ばす reader.setLinesToSkip(1); // ファイルから読み込んだデータをUserDataオブジェクトに格納 reader.setLineMapper(new DefaultLineMapper<UserData>() { { setLineTokenizer(new DelimitedLineTokenizer() { { setNames(new String[] { "id", "name", "birth_year" , "birth_month", "birth_day", "sex", "memo" }); } }); setFieldSetMapper(new BeanWrapperFieldSetMapper<UserData>() { { setTargetType(UserData.class); } }); } }); return reader; } /** * 読み込んだファイルのデータを、DBに書き込む. * @return 書き込みオブジェクト */ @Bean public MyBatisBatchItemWriter<UserData> writer() { return new MyBatisBatchItemWriterBuilder<UserData>() .sqlSessionFactory(sqlSessionFactoryBatch) .statementId("com.example.mybatis.batch.UserDataMapperBatch.upsert") .assertUpdates(true) // 楽観ロックエラーを有効にする .build(); } /** * Spring Batchのジョブ内で指定する処理単位(ステップ)を定義する. * @param reader 読み込みオブジェクト * @param writer 書き込みオブジェクト * @return Spring Batchのジョブ内で指定する処理単位 */ @Bean public Step step(ItemReader<UserData> reader, ItemWriter<UserData> writer) { // 生成するステップ内で読み込み/加工/加工後/書き込み/書き込み後/ステップ実行前後の処理の // 一連の流れを指定する // その際のチャンクサイズ(=何件読み込む毎にコミットするか)を3に指定している return stepBuilderFactory.get("step") .<UserData, UserData>chunk(3) .reader(reader) .processor(demoUpdateProcessor) .listener(demoProcessorListener) .writer(writer) .listener(demoUpdateListener) .listener(demoStepListener) .build(); } /** * Spring Batchのジョブを定義する. * @param jobListener 実行前後の処理(リスナ) * @param step 実行するステップ * @return Spring Batchのジョブ */ @Bean public Job updateUserData(DemoJobListener jobListener, Step step) { // 生成するジョブ内で、実行前後の処理(リスナ)と処理単位(ステップ)を指定する return jobBuilderFactory .get("updateUserData") .incrementer(new RunIdIncrementer()) .listener(jobListener) .flow(step) .end() .build(); } }
package com.example.batch; import org.springframework.batch.core.ItemProcessListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import com.example.mybatis.batch.UserDataMapperBatch; import com.example.mybatis.model.UserData; @Component public class DemoProcessorListener implements ItemProcessListener<UserData, UserData> { @Autowired private UserDataMapperBatch userDataMapperBatch; /** * データ加工前の処理を定義する. */ @Override public void beforeProcess(UserData item) { // 何もしない } /** * データ加工後の処理を定義する. */ @Override @Transactional public void afterProcess(UserData item, UserData result) { // 処理結果がnullの場合は、処理を終了する if(result == null) { return; } // 既に登録済データのVERSIONを取得する Integer id = Integer.parseInt(result.getId()); UserData ud = userDataMapperBatch.findByIdRowLock(id); // バージョンの値を結果に設定する if(ud != null) { result.setVersion(ud.getVersion()); } else { result.setVersion(0); } // id=8の場合、バージョンを更新し、楽観ロックエラーにする /* if(id == 8) { userDataMapperBatch.updateVersion(result); } */ } /** * データ編集エラー後の処理を定義する. */ @Override public void onProcessError(UserData item, Exception e) { // 何もしない } }
また、オンライン処理用のHandler、Service、Param、Resultをそれぞれ追加している。
package com.example; import java.util.Optional; import org.springframework.cloud.function.adapter.azure.FunctionInvoker; import com.example.model.OnlineServiceParam; import com.example.model.OnlineServiceResult; import com.microsoft.azure.functions.ExecutionContext; import com.microsoft.azure.functions.HttpMethod; import com.microsoft.azure.functions.HttpRequestMessage; import com.microsoft.azure.functions.HttpResponseMessage; import com.microsoft.azure.functions.HttpStatus; import com.microsoft.azure.functions.annotation.AuthorizationLevel; import com.microsoft.azure.functions.annotation.FunctionName; import com.microsoft.azure.functions.annotation.HttpTrigger; public class OnlineHandler extends FunctionInvoker<OnlineServiceParam, OnlineServiceResult> { /** * HTTP要求に応じて、HelloFunctionクラスのonlineメソッドを呼び出し、その戻り値をボディに設定したレスポンスを返す * @param request リクエストオブジェクト * @param context コンテキストオブジェクト * @return レスポンスオブジェクト */ @FunctionName("online") public HttpResponseMessage execute(@HttpTrigger(name = "request", methods = HttpMethod.GET , authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request , ExecutionContext context) { // リクエストパラメータからidの値を取得 String paramId = request.getQueryParameters().get("id"); // オンラインサービス呼出用Paramを生成 OnlineServiceParam onlineServiceParam = new OnlineServiceParam(); onlineServiceParam.setId(paramId); // handleRequestメソッド内でHelloFunctionクラスのonlineメソッドを呼び出し、 // その戻り値をボディに設定したレスポンスを、JSON形式で返す return request.createResponseBuilder(HttpStatus.OK) .body(handleRequest(onlineServiceParam, context)) .header("Content-Type", "text/json").build(); } }
package com.example.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.example.model.OnlineServiceParam; import com.example.model.OnlineServiceResult; import com.example.mybatis.model.UserData; import com.example.mybatis.online.UserDataMapperOnline; @Service public class OnlineService { /* Spring Bootでログ出力するためのLogbackのクラスを生成 */ private static final Logger LOGGER = LoggerFactory.getLogger(OnlineService.class); @Autowired private UserDataMapperOnline userDataMapperOnline; /** * 指定されたIDのバージョンを更新し、更新後データを返却するサービス. * @param onlineServiceParam オンラインサービス呼出用Param * @return オンラインサービスの処理結果 */ public OnlineServiceResult online(OnlineServiceParam onlineServiceParam) { OnlineServiceResult result = new OnlineServiceResult(); Integer tmpId = null; // 引数のIDが数値でなければ、処理を終了 try { tmpId = Integer.parseInt(onlineServiceParam.getId()); }catch(Exception ex) { return result; } // 指定されたIDのバージョンを更新 int updCnt = userDataMapperOnline.updateVersion(tmpId); LOGGER.info("更新ID : " + tmpId + ", 更新件数 : " + updCnt); // 指定されたIDのUserDataオブジェクトを返却 UserData userData = userDataMapperOnline.findById(tmpId); if(userData == null) { userData = new UserData(); } result.setUserData(userData.toString()); return result; } }
package com.example.model; import lombok.Data; @Data public class OnlineServiceParam { /** ID */ private String id; }
package com.example.model; import lombok.Data; @Data public class OnlineServiceResult { /** ユーザ情報 */ private String userData; }
さらに、Functionのメインクラスに、関数onlineを追加している。
package com.example; import java.util.function.Function; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import com.example.model.OnlineServiceParam; import com.example.model.OnlineServiceResult; import com.example.model.TimerTriggerParam; import com.example.model.TimerTriggerResult; import com.example.service.OnlineService; import com.example.service.TimerTriggerService; @SpringBootApplication public class DemoAzureFunction { /** タイマートリガーのテストを行うサービスクラスのオブジェクト */ @Autowired private TimerTriggerService timerTriggerService; /** オンライン処理を行うサービスクラスのオブジェクト */ @Autowired private OnlineService onlineService; public static void main(String[] args) throws Exception { SpringApplication.run(DemoAzureFunction.class, args); } /** * タイマートリガーのテストを行い結果を返却する関数 * @return タイマートリガーのテストを行うサービスクラスの呼出結果 */ @Bean public Function<TimerTriggerParam, TimerTriggerResult> timerTriggerTest() { return timerTriggerParam -> timerTriggerService.timerTriggerTest(timerTriggerParam); } /** * オンライン処理を行い結果を返却する関数 * @return オンライン処理を行うサービスクラスの呼出結果 */ @Bean public Function<OnlineServiceParam, OnlineServiceResult> online(){ return onlineServiceParam -> onlineService.online(onlineServiceParam); } }
その他のソースコード内容は、以下のサイトを参照のこと。
https://github.com/purin-it/azure/tree/master/azure-functions-db-batch-online/demoAzureFunc
サンプルプログラムの実行結果
サンプルプログラムの実行結果は、以下の通り。
1) 以下のサイトの「サンプルプログラムの実行結果(ローカル)」「サンプルプログラムの実行結果(Azure上)」に記載の手順で、サンプルプログラムをAzure Functionsにデプロイする。
2) バッチ処理の実行結果は、以下の記事の「サンプルプログラムの実行結果」を参照のこと。
3) OnlineHandlerクラスのAPIを呼び出す前のデータベースの状態は、以下の通り。
SELECT * FROM dbo.USER_DATA ORDER BY ID ASC
4) 「https://azurefuncdemoapp.azurewebsites.net/api/online?id=5」とアクセスし、OnlineHandlerクラスのAPIを呼び出すと、以下の戻り値が返却される。
なお、「https://azurefuncdemoapp.azurewebsites.net」の部分は、Azure Portalで確認した、Azure Functionsの以下の赤枠のURLから確認できる。
5) OnlineHandlerクラスのAPIを呼び出した後のデータベースの状態は以下の通りで、id=5のVERSIONが1増加していることが確認できる。
SELECT * FROM dbo.USER_DATA ORDER BY ID ASC
6) OnlineHandlerクラスのAPIを呼び出した際の、ログの出力結果は以下の通りで、ID=5を更新した際の更新件数が1であることが確認できる。
要点まとめ
- Azure Function上で楽観ロックを実装する際、DB接続する際のSqlSessionを生成する際、Spring BatchのChunkモデルを用いたバッチ処理ではバッチモード(ExecutorType.BATCH)を利用し、それ以外のオンラインからの呼出ではExecutorType.SIMPLEを利用する必要がある。その使い分けをするには、DB接続設定を分ければよい。