Azure Function上で楽観ロックを実装する場合、DB接続する際のSqlSessionを生成する際に、Spring BatchのChunkモデルを用いたバッチ処理ではバッチモード(ExecutorType.BATCH)を利用し、それ以外のオンラインからの呼出ではExecutorType.SIMPLEを利用する必要がある。
今回は、バッチモードとそれ以外で、DB接続設定を使い分けてみたので、そのサンプルプログラムを共有する。
前提条件
下記記事のサンプルプログラムを作成済であること。
サンプルプログラムの作成
作成したサンプルプログラムの構成は、以下の通り。
なお、上記の赤枠は、今回追加・変更したプログラムである。
バッチ処理のDB接続設定の内容は以下の通りで、SQLセッションファクトリを生成する際に、バッチモードに設定する処理を追加している。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 | package com.example.config; import org.apache.ibatis.mapping.Environment; import org.springframework.context.annotation.Configuration; import org.apache.ibatis.session.ExecutorType; import org.apache.ibatis.session.SqlSessionFactory; import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory; import org.mybatis.spring.SqlSessionFactoryBean; import org.mybatis.spring.annotation.MapperScan; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Primary; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.transaction.PlatformTransactionManager; import javax.sql.DataSource; @Configuration @MapperScan(basePackages = {"com.example.mybatis.batch"} , sqlSessionFactoryRef = "sqlSessionFactoryBatch") public class DemoBatchDataSourceConfig { /** * バッチで利用するためのデータソースプロパティを生成する * @return バッチで利用するためのデータソースプロパティ */ @Bean(name = {"datasourceBatchProperties"}) @Primary @ConfigurationProperties(prefix = "spring.datasource") public DataSourceProperties datasourceBatchProperties() { return new DataSourceProperties(); } /** * バッチで利用するためのデータソースを生成する * @param properties バッチで利用するためのデータソースプロパティ * @return バッチで利用するためのデータソース */ @Bean(name = {"dataSourceBatch"}) @Primary public DataSource datasourceBatch( @Qualifier("datasourceBatchProperties") DataSourceProperties properties) { return properties.initializeDataSourceBuilder().build(); } /** * バッチで利用するためのトランザクションマネージャを生成する * @param dataSourceBatch バッチで利用するためのデータソース * @return バッチで利用するためのトランザクションマネージャ */ @Bean(name = {"txManagerBatch"}) @Primary public PlatformTransactionManager txManagerBatch( @Qualifier("dataSourceBatch") DataSource dataSourceBatch) { return new DataSourceTransactionManager(dataSourceBatch); } /** * バッチで利用するためのSQLセッションファクトリを生成する * @param dataSourceBatch バッチで利用するためのデータソース * @return バッチで利用するためのSQLセッションファクトリ * @throws Exception 任意例外 */ @Bean(name = {"sqlSessionFactoryBatch"}) @Primary public SqlSessionFactory sqlSessionFactory( @Qualifier("dataSourceBatch") DataSource dataSourceBatch) throws Exception { SqlSessionFactoryBean sqlSessionFactory = new SqlSessionFactoryBean(); sqlSessionFactory.setDataSource(dataSourceBatch); // MyBatisでバッチモードで処理できるよう設定を変更 Environment env = new Environment("development" , new JdbcTransactionFactory(), dataSourceBatch); org.apache.ibatis.session.Configuration config = new org.apache.ibatis.session.Configuration(env); config.setDefaultExecutorType(ExecutorType.BATCH); sqlSessionFactory.setConfiguration(config); return sqlSessionFactory.getObject(); } } |
また、オンライン処理のDB接続設定の内容は以下の通りで、こちらはバッチモードの設定を含めていない。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 | package com.example.config; import org.springframework.context.annotation.Configuration; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.SqlSessionFactoryBean; import org.mybatis.spring.annotation.MapperScan; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.transaction.PlatformTransactionManager; import javax.sql.DataSource; @Configuration @MapperScan(basePackages = {"com.example.mybatis.online"} , sqlSessionFactoryRef = "sqlSessionFactoryOnline") public class DemoOnlineDataSourceConfig { /** * オンラインで利用するためのデータソースプロパティを生成する * @return オンラインで利用するためのデータソースプロパティ */ @Bean(name = {"datasourceOnlineProperties"}) @ConfigurationProperties(prefix = "spring.datasource.online") public DataSourceProperties datasourceOnlineProperties() { return new DataSourceProperties(); } /** * オンラインで利用するためのデータソースを生成する * @param properties オンラインで利用するためのデータソースプロパティ * @return オンラインで利用するためのデータソース */ @Bean(name = {"dataSourceOnline"}) public DataSource datasourceOnline( @Qualifier("datasourceOnlineProperties") DataSourceProperties properties) { return properties.initializeDataSourceBuilder().build(); } /** * オンラインで利用するためのトランザクションマネージャを生成する * @param dataSourceOnline オンラインで利用するためのデータソース * @return オンラインで利用するためのトランザクションマネージャ */ @Bean(name = {"txManagerOnline"}) public PlatformTransactionManager txManagerOnline( @Qualifier("dataSourceOnline") DataSource dataSourceOnline) { return new DataSourceTransactionManager(dataSourceOnline); } /** * オンラインで利用するためのSQLセッションファクトリを生成する * @param dataSourceOnline オンラインで利用するためのデータソース * @return オンラインで利用するためのSQLセッションファクトリ * @throws Exception 任意例外 */ @Bean(name = {"sqlSessionFactoryOnline"}) public SqlSessionFactory sqlSessionFactory( @Qualifier("dataSourceOnline") DataSource dataSourceOnline) throws Exception { SqlSessionFactoryBean sqlSessionFactory = new SqlSessionFactoryBean(); sqlSessionFactory.setDataSource(dataSourceOnline); return sqlSessionFactory.getObject(); } } |
さらに、application.propertiesの設定は以下の通りで、オンラインのDB接続設定を追加し、バッチモードの設定をコメントアウトしている。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | # Azure Storageの接続先 azure.storage.accountName=azureblobpurinit azure.storage.accessKey=(Azure Blob Storageのアクセスキー) azure.storage.blob-endpoint=https://azureblobpurinit.blob.core.windows.net azure.storage.containerName=blobcontainer # DB接続設定(バッチ) spring.datasource.url=jdbc:sqlserver://azure-db-purinit.database.windows.net:1433;database=azureSqlDatabase spring.datasource.username=purinit@azure-db-purinit spring.datasource.password=(DBのパスワード) spring.datasource.driverClassName=com.microsoft.sqlserver.jdbc.SQLServerDriver # DB接続設定(オンライン) spring.datasource.online.url=jdbc:sqlserver://azure-db-purinit.database.windows.net:1433;database=azureSqlDatabase spring.datasource.online.username=purinit@azure-db-purinit spring.datasource.online.password=(DBのパスワード) spring.datasource.online.driverClassName=com.microsoft.sqlserver.jdbc.SQLServerDriver # MyBatisでバッチモードで処理できるよう設定を変更 #mybatis.executor-type=BATCH #mybatis.configuration.default-executor-type=BATCH |
また、pom.xmlの内容は以下の通りで、@ConfigurationPropertiesアノテーションを利用するための設定を追加している。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 | <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>demoAzureFunc</artifactId> <version>0.0.1-SNAPSHOT</version> <name>Hello Spring Function on Azure</name> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.4.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <azure.functions.maven.plugin.version>1.9.0</azure.functions.maven.plugin.version> <!-- customize those properties. The functionAppName should be unique across Azure --> <functionResourceGroup>azureAppDemo</functionResourceGroup> <functionAppName>azureFuncDemoApp</functionAppName> <functionAppServicePlan>ASP-azureAppDemo-8679</functionAppServicePlan> <functionPricingTier>B1</functionPricingTier> <functionAppRegion>japaneast</functionAppRegion> <stagingDirectory>${project.build.directory}/azure-functions/${functionAppName}</stagingDirectory> <start-class>com.example.DemoAzureFunction</start-class> <spring.boot.wrapper.version>1.0.25.RELEASE</spring.boot.wrapper.version> </properties> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-function-adapter-azure</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-function-web</artifactId> <scope>provided</scope> </dependency> <!-- lombokを利用するための設定 --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <scope>provided</scope> </dependency> <!-- Spring Batchを利用するための設定 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <!-- Azure Storageの設定 --> <dependency> <groupId>com.microsoft.azure</groupId> <artifactId>azure-storage</artifactId> <version>8.3.0</version> </dependency> <!-- Azure StorageでSASトークンを利用するための設定 --> <dependency> <groupId>com.azure</groupId> <artifactId>azure-storage-blob</artifactId> <version>12.10.0</version> </dependency> <!-- SQL Serverを利用するための設定 --> <dependency> <groupId>com.microsoft.sqlserver</groupId> <artifactId>mssql-jdbc</artifactId> </dependency> <!-- MyBatisを利用するための設定 --> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>2.1.1</version> </dependency> <!-- @ConfigurationPropertiesアノテーションを利用するための設定 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <!-- Test --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-function-dependencies</artifactId> <version>3.1.2</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <pluginManagement> <plugins> <plugin> <groupId>com.microsoft.azure</groupId> <artifactId>azure-functions-maven-plugin</artifactId> <version>${azure.functions.maven.plugin.version}</version> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-resources-plugin</artifactId> <version>3.1.0</version> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> <version>3.1.2</version> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-clean-plugin</artifactId> <version>3.1.0</version> </plugin> </plugins> </pluginManagement> <plugins> <plugin> <groupId>com.microsoft.azure</groupId> <artifactId>azure-functions-maven-plugin</artifactId> <configuration> <resourceGroup>${functionResourceGroup}</resourceGroup> <appName>${functionAppName}</appName> <appServicePlanName>${functionAppServicePlan}</appServicePlanName> <region>${functionAppRegion}</region> <pricingTier>${functionPricingTier}</pricingTier> <runtime> <os>Linux</os> <javaVersion>8</javaVersion> </runtime> <appSettings> <!-- Run Azure Function from package file by default --> <property> <name>WEBSITE_RUN_FROM_PACKAGE</name> <value>1</value> </property> <property> <name>FUNCTIONS_EXTENSION_VERSION</name> <value>~3</value> </property> <property> <name>FUNCTIONS_WORKER_RUNTIME</name> <value>java</value> </property> </appSettings> </configuration> <executions> <execution> <id>package-functions</id> <goals> <goal>package</goal> </goals> </execution> </executions> </plugin> <plugin> <artifactId>maven-resources-plugin</artifactId> <executions> <execution> <id>copy-resources</id> <phase>package</phase> <goals> <goal>copy-resources</goal> </goals> <configuration> <overwrite>true</overwrite> <outputDirectory> ${project.build.directory}/azure-functions/${functionAppName} </outputDirectory> <resources> <resource> <directory>${project.basedir}/src/main/azure </directory> <includes> <include>**</include> </includes> </resource> </resources> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> <executions> <execution> <id>copy-dependencies</id> <phase>prepare-package</phase> <goals> <goal>copy-dependencies</goal> </goals> <configuration> <outputDirectory>${stagingDirectory}/lib</outputDirectory> <overWriteReleases>false</overWriteReleases> <overWriteSnapshots>false</overWriteSnapshots> <overWriteIfNewer>true</overWriteIfNewer> <includeScope>runtime</includeScope> </configuration> </execution> </executions> </plugin> <!--Remove obj folder generated by .NET SDK in maven clean--> <plugin> <artifactId>maven-clean-plugin</artifactId> <configuration> <filesets> <fileset> <directory>obj</directory> </fileset> </filesets> </configuration> </plugin> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <dependencies> <dependency> <groupId>org.springframework.boot.experimental</groupId> <artifactId>spring-boot-thin-layout</artifactId> <version>${spring.boot.wrapper.version}</version> </dependency> </dependencies> </plugin> </plugins> </build> <repositories> <repository> <id>spring-snapshots</id> <name>Spring Snapshots</name> <url>https://repo.spring.io/plugins-snapshot</url> <snapshots> <enabled>true</enabled> </snapshots> <releases> <enabled>false</enabled> </releases> </repository> <repository> <id>spring-milestones</id> <name>Spring Milestones</name> <url>https://repo.spring.io/plugins-milestone</url> <snapshots> <enabled>false</enabled> </snapshots> </repository> </repositories> <pluginRepositories> <pluginRepository> <id>spring-snapshots</id> <name>Spring Snapshots</name> <url>https://repo.spring.io/plugins-snapshot</url> <snapshots> <enabled>true</enabled> </snapshots> <releases> <enabled>false</enabled> </releases> </pluginRepository> <pluginRepository> <id>spring-milestones</id> <name>Spring Milestones</name> <url>https://repo.spring.io/plugins-milestone</url> <snapshots> <enabled>false</enabled> </snapshots> </pluginRepository> </pluginRepositories> </project> |
さらに、バッチ処理のMapperインタフェース・XMLファイルの内容は以下の通りで、いずれも「com.example.mybatis.batch」フォルダ下に設定している。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | package com.example.mybatis.batch; import org.apache.ibatis.annotations.Mapper; import com.example.mybatis.model.UserData; @Mapper public interface UserDataMapperBatch { /** * DBにUserDataオブジェクトがあれば更新し、なければ追加する. * @param userData UserDataオブジェクト */ void upsert(UserData userData); /** * 引数のIDをキーにもつUserDataオブジェクトを行ロックをかけ取得する. * @param id ID * @return UserDataオブジェクト */ UserData findByIdRowLock(Integer id); /** * 引数のUserDataオブジェクトのバージョンを更新する. * @param userData UserDataオブジェクト */ void updateVersion(UserData userData); } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | <?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.example.mybatis.batch.UserDataMapperBatch"> <update id="upsert" parameterType="com.example.mybatis.model.UserData"> MERGE INTO USER_DATA AS u USING ( SELECT #{id} id, #{version} version ) s ON ( u.id = s.id ) WHEN MATCHED AND u.version = s.version THEN UPDATE SET name = #{name}, birth_year = #{birth_year} , birth_month = #{birth_month}, birth_day = #{birth_day} , sex = #{sex}, memo = #{memo}, version = #{version} + 1 WHEN NOT MATCHED THEN INSERT ( id, name, birth_year, birth_month, birth_day , sex, memo, version ) VALUES (#{id}, #{name}, #{birth_year}, #{birth_month}, #{birth_day} , #{sex}, #{memo}, #{version}) ; </update> <select id="findByIdRowLock" parameterType="java.lang.Integer" resultType="com.example.mybatis.model.UserData"> SELECT id, name, birth_year, birth_month, birth_day, sex, memo, version FROM USER_DATA WITH(ROWLOCK, UPDLOCK, NOWAIT) <!-- Azure SQL Databaseで行ロックをかける --> WHERE id = #{id} </select> <update id="updateVersion" parameterType="com.example.mybatis.model.UserData"> UPDATE USER_DATA SET version = #{version} + 2 WHERE id = #{id} </update> </mapper> |
また、オンライン処理のMapperインタフェース・XMLファイルの内容は以下の通りで、いずれも「com.example.mybatis.online」フォルダ下に設定している。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | package com.example.mybatis.online; import org.apache.ibatis.annotations.Mapper; import com.example.mybatis.model.UserData; @Mapper public interface UserDataMapperOnline { /** * 引数のIDをキーにもつUserDataオブジェクトのバージョン番号を更新する. * @param id ID * @return 更新件数 */ int updateVersion(Integer id); /** * 引数のIDをキーにもつUserDataオブジェクトを取得する. * @param id ID * @return UserDataオブジェクト */ UserData findById(Integer id); } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | <?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.example.mybatis.online.UserDataMapperOnline"> <update id="updateVersion" parameterType="java.lang.Integer"> UPDATE USER_DATA SET version = version + 1 WHERE id = #{id} </update> <select id="findById" parameterType="java.lang.Integer" resultType="com.example.mybatis.model.UserData"> SELECT id, name, birth_year, birth_month, birth_day, sex, memo, version FROM USER_DATA WHERE id = #{id} </select> </mapper> |
さらに、バッチ処理のMapperインタフェースを呼び出す箇所を、それぞれ変更後のパスに変更している。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 | package com.example.batch; import java.net.MalformedURLException; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.batch.MyBatisBatchItemWriter; import org.mybatis.spring.batch.builder.MyBatisBatchItemWriterBuilder; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper; import org.springframework.batch.item.file.mapping.DefaultLineMapper; import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.UrlResource; import com.example.mybatis.model.UserData; import com.example.service.DemoBlobService; import lombok.RequiredArgsConstructor; @Configuration @EnableBatchProcessing @RequiredArgsConstructor public class DemoChunkConfig { /** ジョブ生成ファクトリ */ public final JobBuilderFactory jobBuilderFactory; /** ステップ生成ファクトリ */ public final StepBuilderFactory stepBuilderFactory; /** SQLセッションファクトリ */ public final SqlSessionFactory sqlSessionFactoryBatch; /** データ加工処理 */ private final DemoUpdateProcessor demoUpdateProcessor; /** データ加工前後処理 */ private final DemoProcessorListener demoProcessorListener; /** データ書き込み前後処理 */ private final DemoUpdateListener demoUpdateListener; /** ステップの前後処理 */ private final DemoStepListener demoStepListener; /** BLOBへアクセスするサービス */ @Autowired private DemoBlobService demoBlobService; /** * BlobStorageからファイルを読み込む. * @return 読み込みオブジェクト */ @Bean public FlatFileItemReader<UserData> reader() { FlatFileItemReader<UserData> reader = new FlatFileItemReader<UserData>(); try { // BlobStorageからファイル(user_data.csv)を読み込む際のURLをreaderに設定 String url = demoBlobService.getBlobUrl("user_data.csv"); reader.setResource(new UrlResource(url)); } catch (MalformedURLException ex) { throw new RuntimeException(ex); } // ファイルを読み込む際の文字コード reader.setEncoding("UTF-8"); // 1行目を読み飛ばす reader.setLinesToSkip(1); // ファイルから読み込んだデータをUserDataオブジェクトに格納 reader.setLineMapper(new DefaultLineMapper<UserData>() { { setLineTokenizer(new DelimitedLineTokenizer() { { setNames(new String[] { "id", "name", "birth_year" , "birth_month", "birth_day", "sex", "memo" }); } }); setFieldSetMapper(new BeanWrapperFieldSetMapper<UserData>() { { setTargetType(UserData.class); } }); } }); return reader; } /** * 読み込んだファイルのデータを、DBに書き込む. * @return 書き込みオブジェクト */ @Bean public MyBatisBatchItemWriter<UserData> writer() { return new MyBatisBatchItemWriterBuilder<UserData>() .sqlSessionFactory(sqlSessionFactoryBatch) .statementId("com.example.mybatis.batch.UserDataMapperBatch.upsert") .assertUpdates(true) // 楽観ロックエラーを有効にする .build(); } /** * Spring Batchのジョブ内で指定する処理単位(ステップ)を定義する. * @param reader 読み込みオブジェクト * @param writer 書き込みオブジェクト * @return Spring Batchのジョブ内で指定する処理単位 */ @Bean public Step step(ItemReader<UserData> reader, ItemWriter<UserData> writer) { // 生成するステップ内で読み込み/加工/加工後/書き込み/書き込み後/ステップ実行前後の処理の // 一連の流れを指定する // その際のチャンクサイズ(=何件読み込む毎にコミットするか)を3に指定している return stepBuilderFactory.get("step") .<UserData, UserData>chunk(3) .reader(reader) .processor(demoUpdateProcessor) .listener(demoProcessorListener) .writer(writer) .listener(demoUpdateListener) .listener(demoStepListener) .build(); } /** * Spring Batchのジョブを定義する. * @param jobListener 実行前後の処理(リスナ) * @param step 実行するステップ * @return Spring Batchのジョブ */ @Bean public Job updateUserData(DemoJobListener jobListener, Step step) { // 生成するジョブ内で、実行前後の処理(リスナ)と処理単位(ステップ)を指定する return jobBuilderFactory .get("updateUserData") .incrementer(new RunIdIncrementer()) .listener(jobListener) .flow(step) .end() .build(); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 | package com.example.batch; import org.springframework.batch.core.ItemProcessListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import com.example.mybatis.batch.UserDataMapperBatch; import com.example.mybatis.model.UserData; @Component public class DemoProcessorListener implements ItemProcessListener<UserData, UserData> { @Autowired private UserDataMapperBatch userDataMapperBatch; /** * データ加工前の処理を定義する. */ @Override public void beforeProcess(UserData item) { // 何もしない } /** * データ加工後の処理を定義する. */ @Override @Transactional public void afterProcess(UserData item, UserData result) { // 処理結果がnullの場合は、処理を終了する if(result == null) { return; } // 既に登録済データのVERSIONを取得する Integer id = Integer.parseInt(result.getId()); UserData ud = userDataMapperBatch.findByIdRowLock(id); // バージョンの値を結果に設定する if(ud != null) { result.setVersion(ud.getVersion()); } else { result.setVersion(0); } // id=8の場合、バージョンを更新し、楽観ロックエラーにする /* if(id == 8) { userDataMapperBatch.updateVersion(result); } */ } /** * データ編集エラー後の処理を定義する. */ @Override public void onProcessError(UserData item, Exception e) { // 何もしない } } |
また、オンライン処理用のHandler、Service、Param、Resultをそれぞれ追加している。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 | package com.example; import java.util.Optional; import org.springframework.cloud.function.adapter.azure.FunctionInvoker; import com.example.model.OnlineServiceParam; import com.example.model.OnlineServiceResult; import com.microsoft.azure.functions.ExecutionContext; import com.microsoft.azure.functions.HttpMethod; import com.microsoft.azure.functions.HttpRequestMessage; import com.microsoft.azure.functions.HttpResponseMessage; import com.microsoft.azure.functions.HttpStatus; import com.microsoft.azure.functions.annotation.AuthorizationLevel; import com.microsoft.azure.functions.annotation.FunctionName; import com.microsoft.azure.functions.annotation.HttpTrigger; public class OnlineHandler extends FunctionInvoker<OnlineServiceParam, OnlineServiceResult> { /** * HTTP要求に応じて、HelloFunctionクラスのonlineメソッドを呼び出し、その戻り値をボディに設定したレスポンスを返す * @param request リクエストオブジェクト * @param context コンテキストオブジェクト * @return レスポンスオブジェクト */ @FunctionName("online") public HttpResponseMessage execute(@HttpTrigger(name = "request", methods = HttpMethod.GET , authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request , ExecutionContext context) { // リクエストパラメータからidの値を取得 String paramId = request.getQueryParameters().get("id"); // オンラインサービス呼出用Paramを生成 OnlineServiceParam onlineServiceParam = new OnlineServiceParam(); onlineServiceParam.setId(paramId); // handleRequestメソッド内でHelloFunctionクラスのonlineメソッドを呼び出し、 // その戻り値をボディに設定したレスポンスを、JSON形式で返す return request.createResponseBuilder(HttpStatus.OK) .body(handleRequest(onlineServiceParam, context)) .header("Content-Type", "text/json").build(); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | package com.example.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.example.model.OnlineServiceParam; import com.example.model.OnlineServiceResult; import com.example.mybatis.model.UserData; import com.example.mybatis.online.UserDataMapperOnline; @Service public class OnlineService { /* Spring Bootでログ出力するためのLogbackのクラスを生成 */ private static final Logger LOGGER = LoggerFactory.getLogger(OnlineService.class); @Autowired private UserDataMapperOnline userDataMapperOnline; /** * 指定されたIDのバージョンを更新し、更新後データを返却するサービス. * @param onlineServiceParam オンラインサービス呼出用Param * @return オンラインサービスの処理結果 */ public OnlineServiceResult online(OnlineServiceParam onlineServiceParam) { OnlineServiceResult result = new OnlineServiceResult(); Integer tmpId = null; // 引数のIDが数値でなければ、処理を終了 try { tmpId = Integer.parseInt(onlineServiceParam.getId()); }catch(Exception ex) { return result; } // 指定されたIDのバージョンを更新 int updCnt = userDataMapperOnline.updateVersion(tmpId); LOGGER.info("更新ID : " + tmpId + ", 更新件数 : " + updCnt); // 指定されたIDのUserDataオブジェクトを返却 UserData userData = userDataMapperOnline.findById(tmpId); if(userData == null) { userData = new UserData(); } result.setUserData(userData.toString()); return result; } } |
1 2 3 4 5 6 7 8 9 10 11 | package com.example.model; import lombok.Data; @Data public class OnlineServiceParam { /** ID */ private String id; } |
1 2 3 4 5 6 7 8 9 10 11 | package com.example.model; import lombok.Data; @Data public class OnlineServiceResult { /** ユーザ情報 */ private String userData; } |
さらに、Functionのメインクラスに、関数onlineを追加している。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 | package com.example; import java.util.function.Function; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import com.example.model.OnlineServiceParam; import com.example.model.OnlineServiceResult; import com.example.model.TimerTriggerParam; import com.example.model.TimerTriggerResult; import com.example.service.OnlineService; import com.example.service.TimerTriggerService; @SpringBootApplication public class DemoAzureFunction { /** タイマートリガーのテストを行うサービスクラスのオブジェクト */ @Autowired private TimerTriggerService timerTriggerService; /** オンライン処理を行うサービスクラスのオブジェクト */ @Autowired private OnlineService onlineService; public static void main(String[] args) throws Exception { SpringApplication.run(DemoAzureFunction.class, args); } /** * タイマートリガーのテストを行い結果を返却する関数 * @return タイマートリガーのテストを行うサービスクラスの呼出結果 */ @Bean public Function<TimerTriggerParam, TimerTriggerResult> timerTriggerTest() { return timerTriggerParam -> timerTriggerService.timerTriggerTest(timerTriggerParam); } /** * オンライン処理を行い結果を返却する関数 * @return オンライン処理を行うサービスクラスの呼出結果 */ @Bean public Function<OnlineServiceParam, OnlineServiceResult> online(){ return onlineServiceParam -> onlineService.online(onlineServiceParam); } } |
その他のソースコード内容は、以下のサイトを参照のこと。
https://github.com/purin-it/azure/tree/master/azure-functions-db-batch-online/demoAzureFunc
サンプルプログラムの実行結果
サンプルプログラムの実行結果は、以下の通り。
1) 以下のサイトの「サンプルプログラムの実行結果(ローカル)」「サンプルプログラムの実行結果(Azure上)」に記載の手順で、サンプルプログラムをAzure Functionsにデプロイする。
2) バッチ処理の実行結果は、以下の記事の「サンプルプログラムの実行結果」を参照のこと。
3) OnlineHandlerクラスのAPIを呼び出す前のデータベースの状態は、以下の通り。
1 | SELECT * FROM dbo.USER_DATA ORDER BY ID ASC |
4) 「https://azurefuncdemoapp.azurewebsites.net/api/online?id=5」とアクセスし、OnlineHandlerクラスのAPIを呼び出すと、以下の戻り値が返却される。
なお、「https://azurefuncdemoapp.azurewebsites.net」の部分は、Azure Portalで確認した、Azure Functionsの以下の赤枠のURLから確認できる。
5) OnlineHandlerクラスのAPIを呼び出した後のデータベースの状態は以下の通りで、id=5のVERSIONが1増加していることが確認できる。
1 | SELECT * FROM dbo.USER_DATA ORDER BY ID ASC |
6) OnlineHandlerクラスのAPIを呼び出した際の、ログの出力結果は以下の通りで、ID=5を更新した際の更新件数が1であることが確認できる。
要点まとめ
- Azure Function上で楽観ロックを実装する際、DB接続する際のSqlSessionを生成する際、Spring BatchのChunkモデルを用いたバッチ処理ではバッチモード(ExecutorType.BATCH)を利用し、それ以外のオンラインからの呼出ではExecutorType.SIMPLEを利用する必要がある。その使い分けをするには、DB接続設定を分ければよい。