SSAS

GCS(Google Cloud Storage)のファイルをSQLServerに取り込んでデータ分析を行った(5)

GCS(Google Cloud Storage)に配置したファイルを、SQL Server搭載済の仮想マシンに転送し、SQL Serverにデータロードし、データロードしたデータを利用してデータ分析用のデータ(ディメンション・キューブ)であるSSASプロジェクトを、SQL Server Analysis Serverに配置するところまで、一括で実施できるJavaプログラムを作成したため、その内容について共有する。
今回は、Javaプログラムのソースコードについて述べる。

今回作成したJavaプログラムの構成は以下の通り。
Javaソース構造

下記記事のように、Mavenプロジェクトを作成している。

JavaでSQL Serverにデータロードするプログラムを作成した(ソースコード編)今回も引き続き、Javaで作成したSQL Serverにデータロードするプログラムについて記載する。ここでは、JavaでSQL Serv...

また、pom.xmlの内容は以下の通り。

<project xmlns="http://maven.apache.org/POM/4.0.0" 
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
       http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>test.sample</groupId>
  <artifactId>test.sqlserver-analysis</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <!-- 文字コードとJavaのバージョンの設定 -->
  <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <java.version>1.8</java.version>
  </properties>

  <!-- プラグインの設定 -->
  <build>
      <plugins>
          <!-- Javaファイルのコンパイラの設定 -->
          <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-compiler-plugin</artifactId>
              <version>3.6.1</version>
              <configuration>
                  <source>${java.version}</source>
                  <target>${java.version}</target>
              </configuration>
         </plugin>
         <!-- プロジェクトと依存するライブラリを1つにまとめる設定 -->
         <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-assembly-plugin</artifactId>
              <version>2.6</version>
              <configuration>
                  <descriptorRefs>
                       <descriptorRef>jar-with-dependencies</descriptorRef>
                  </descriptorRefs>
                  <archive>
                       <manifest>
                             <!-- メインプログラムとして実行するクラスの指定 -->
                             <mainClass>test.TestSqlserverAnalysis</mainClass>
                       </manifest>
                  </archive>
              </configuration>
              <executions>
                  <execution>
                       <phase>package</phase>
                       <goals>
                             <goal>single</goal>
                       </goals>
                 </execution>
             </executions>
          </plugin>
      </plugins>
  </build>

  <!-- ライブラリ依存関係の設定 -->
  <dependencies>
      <dependency>
          <!-- SQL ServerへのJDBCドライバを追加する設定 -->
          <groupId>com.microsoft.sqlserver</groupId>
          <artifactId>mssql-jdbc</artifactId>
          <version>7.0.0.jre8</version>
      </dependency>
  </dependencies>

</project>

また、sqlserver_analysis.propertiesの内容は以下の通り。

copyFromGcsCmd=C:\\work\\copy_from_gcs.cmd
serverName=localhost
dbName=model
tblName=dbo.sales
tsvFilePath=c:\\work\\sqlserver\\
deleteSSASFile=c:\\work\\delete_SSASProject1.xmla
deploySSASExe=c:\\Program Files (x86)\\Microsoft SQL Server\\140\\Tools\\Binn\\ManagementStudio\\Microsoft.AnalysisServices.Deployment.exe
deploySSASAsdb=c:\\Users\\sql_server\\source\\repos\\SSASProject1Solution\\SSASProject1\\bin\\SSASProject1.asdatabase

さらに、TestSqlserverAnalysis.javaの内容は以下の通り。

package test;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Date;
import java.util.Properties;

public class TestSqlserverAnalysis {

        private static String copyFromGcsCmd = null;
        private static String serverName = null;
        private static String dbName = null;
        private static String tblName = null;
        private static String tsvFilePath = null;
        private static String deleteSSASFile = null;
        private static String deploySSASExe = null;
        private static String deploySSASAsdb = null;

        /**
         * メインクラス
         *
         * @param args 引数
         */
        public static void main(String[] args) {
              System.out.println(
                   "*** TestSqlServerAnalysis Started. started time : "
                   + new Date() + " ***");
              long startTime = System.currentTimeMillis();

              try {
                       // プロパティファイルの値を読み込む
                       initValuables();

                       // GCSのファイルをコピーする
                       doOsCommand(new ProcessBuilder(copyFromGcsCmd)
                                     , "Get File From GCS");

                       // コピーしたGCSのファイルをSQLServerへのロードする
                       loadToSqlServer();

                       // SqlServerAnalysisServicesにデプロイ済のSSASプロジェクトを削除する
                       doOsCommand(new ProcessBuilder("powershell", "-Command"
                            , "Invoke-ASCmd", "-InputFile:" + deleteSSASFile
                            , "-Server:" + serverName), "Delete SSAS Project");

                       // SqlServerAnalysisServicesにSSASプロジェクトをデプロイする
                       doOsCommand(new ProcessBuilder(deploySSASExe, deploySSASAsdb
                            , "/s"), "Deploy SSAS Project");

               } catch (Exception e) {
                     System.out.println(e);
               } finally {
                     long endTime = System.currentTimeMillis();
                     System.out.println(
                        "*** TestSqlServerAnalysis Ended. ended time : " 
                         + new Date() + " ***");
                     System.out.println(
                        "*** execute time : " 
                         + (endTime - startTime) / 1000.0 + "[s] ***");
               }

         }

        /**
         * プロパティファイルの値を読み込む
         *
         * @throws IOException 入出力例外
         */
         private static void initValuables() throws IOException {

                System.out.println("--- Init Valuables Started. ---");

                // プロパティファイルの値を読み込む
                Properties properties = new Properties();
                InputStream stream = TestSqlserverAnalysis.class.getClassLoader()
                             .getResourceAsStream("sqlserver_analysis.properties");
                properties.load(stream);

                // プロパティファイルの値を設定する
                copyFromGcsCmd = properties.getProperty("copyFromGcsCmd");
                serverName = properties.getProperty("serverName");
                dbName = properties.getProperty("dbName");
                tblName = properties.getProperty("tblName");
                tsvFilePath = properties.getProperty("tsvFilePath");
                deleteSSASFile = properties.getProperty("deleteSSASFile");
                deploySSASExe = properties.getProperty("deploySSASExe");
                deploySSASAsdb = properties.getProperty("deploySSASAsdb");

                System.out.println("--- Init Valuables Ended. ---");
        }

        /**
         * OSコマンドを実行する
         *
         * @param pb プロセス生成オブジェクト
         * @throws IOException 入出力例外
         * @throws InterruptedException 割り込み例外
         */
        private static void doOsCommand(ProcessBuilder pb, String cmdName) 
                                       throws IOException, InterruptedException {

                System.out.println("--- " + cmdName + " Started.---");

                // 標準出力と標準エラー出力を統合
                pb.redirectErrorStream(true);

                // OSコマンドを実行する
                Process process = pb.start();

                // OSコマンドの標準出力を出力する
                InputStream is = process.getInputStream();
                BufferedReader br = new BufferedReader(new InputStreamReader(is));
                while (true) {
                        String line = br.readLine();
                        if (line == null) {
                               break;
                        }
                        System.out.println(line);
                }

                // OSコマンドが完了するまで待つ
                process.waitFor();

                System.out.println("--- " + cmdName + " Ended.---");
        }

        /**
         * SQLServerへのデータロード処理を実行する
         *
         * @throws ClassNotFoundException
         * @throws SQLException
         */
        private static void loadToSqlServer() 
                    throws ClassNotFoundException, SQLException {

                System.out.println("--- Load to SqlServer Started. ---");

                // SQL Serverに接続開始
                Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
                Connection con = DriverManager.getConnection(
                      "jdbc:sqlserver://" + serverName + ";databaseName=" + dbName
                             + ";integratedSecurity=true;"); // Windows認証で接続

                // tblNameで指定したテーブルのデータを一括削除
                PreparedStatement ps 
                   = con.prepareStatement("TRUNCATE TABLE " + tblName);
                ps.executeUpdate();

                // tblNameで指定したテーブルに、TSVファイルのデータを追加
                int updCnt = 0;
                File[] files = new File(tsvFilePath).listFiles();
                for(int i = 0; i < files.length; i++) {
                    String fileName = files[i].getName();
                    if(fileName.startsWith("insert_sales_") 
                              && fileName.endsWith(".tsv")) {
                        ps = con.prepareStatement("BULK INSERT " + tblName
                            // 読み込みファイルを指定
                            + " FROM '" + tsvFilePath + fileName + "' " 
                            + " WITH (FIRSTROW = 2 " // 読み込み開始行(2行目)を指定
                            + ", FIELDTERMINATOR = '\\t'" // 区切り文字(タブ)を指定
                            + ", ROWTERMINATOR = '\\n'" // 改行文字(\n)を指定
                            + ", CODEPAGE = '65001'" // 文字コード(UTF-8)を指定
                            + ", DATAFILETYPE = 'Char')"); // データ形式(文字形式)を指定
                        updCnt += ps.executeUpdate();
                   }
                }

                // 後処理
                if(ps != null) {
                        ps.close();
                }
                if(con != null) {
                       con.close();
                }

                System.out.println("update count :" + updCnt);
                System.out.println("--- Load to SqlServer Ended. ---");
       }

}

なお、上記Javaのプログラムから呼ばれる「copy_from_gcs.cmd」の内容は以下の通り。
GCSからのファイルコピー
Javaプログラムから直接gsutilコマンドを呼ぼうとした際エラーになってしまったため、別のバッチファイルとして作成している。ちなみに、「gsutil -m cp」と「-m」というオプションを付与することで、コピー処理の並列実行ができる。

本プログラムの実行結果は、以下の記事を参照のこと。

GCS(Google Cloud Storage)のファイルをSQLServerに取り込んでデータ分析を行った(1)GCS(Google Cloud Storage)に配置したファイルを、SQL Server搭載済の仮想マシンに転送し、SQL Serv...