GCP

GCSとBigQueryで連動するプログラムを作成してみた(3)

今回も引き続き、GCSとBigQueryで連動するプログラムについて述べる。ここでは、GCS(Google Cloud Storage)とBigQueryで連動するPythonプログラムを作成する前に行った環境構築と、作成したPython 2.7のプログラムの紹介を行う。

前提条件

以下の記事での環境構築が完了していること

GCSとBigQueryで連動するプログラムを作成してみた(2)今回も引き続き、GCSとBigQueryで連動するプログラムについて述べる。ここでは、Windows 10が入っているローカル環境にて、...

環境構築

ここでは、前提条件で記載した以外の環境構築手順について述べる。

1) コマンドプロンプト上で、「pip install (–upgrade) google-cloud-storage」を実行し、GCSを利用するためのライブラリをインストールする
Python用環境構築1
なお、–upgradeオプションを指定すると、インストール済ライブラリを最新化できる

2) コマンドプロンプト上で、「pip install (–upgrade) google-cloud-bigquery」を実行し、BigQueryを利用するためのライブラリをインストールする
Python用環境構築2

作成したPython 2.7のプログラム

ここでは、作成したPython 2.7のプログラムを紹介する。

1) GCS上のCSVファイル(insert_bigquery_sales.csv)のデータを、BigQueryのsalesテーブルに追加するプログラム (プログラム名:insert_into_sales.py)

# -*- coding: utf-8 -*-
import sys
from google.cloud import bigquery

def insert_into_sales():
    # 文字コードをUTF-8に設定
    reload(sys)
    sys.setdefaultencoding('utf-8')

    # ロードするBigQuery上のデータセットとテーブルの設定
    dataset_id = 'bigquery_purin_it'
    table_id = 'sales'
    client = bigquery.Client()
    dataset_ref = client.dataset(dataset_id)

    # BigQuery上のテーブルにロードするジョブの設定
    # ここでは、テーブルデータを全て削除してからロードしている
    job_config = bigquery.LoadJobConfig()
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
    job_config.schema = [
          bigquery.SchemaField('sale_date', 'DATE')
        , bigquery.SchemaField('product_name', 'STRING')
        , bigquery.SchemaField('place_name', 'STRING')
        , bigquery.SchemaField('sales_amount', 'INTEGER')
    ]
    job_config.skip_leading_rows = 1
    job_config.source_format = bigquery.SourceFormat.CSV    
    load_job = client.load_table_from_uri(
                     'gs://test_purin_bucket/insert_bigquery_sales.csv'
                     , dataset_ref.table(table_id)
                     , job_config = job_config)
    
    # ジョブを実行し、終了を待つ
    print('Starting job {}'.format(load_job.job_id))
    load_job.result()
    print('Job Finished.')

if __name__ == '__main__':
    insert_into_sales()

2) BigQueryのsalesテーブルのデータを、JSONファイル(sales.json)に出力するプログラム (プログラム名:select_from_sales.py)

# -*- coding: utf-8 -*-
import sys
import json
import collections as cl
from google.cloud import bigquery

def query_from_sales():
    # 文字コードをUTF-8に設定
    reload(sys)
    sys.setdefaultencoding('utf-8')

    # BigQuery salesテーブルからデータを抽出
    client = bigquery.Client()
    query_job = client.query("""
       SELECT 
           sale_date
         , product_name
         , place_name
         , sales_amount
       FROM `(GCPのプロジェクト名).bigquery_purin_it.sales`
     """)
    results = query_job.result()
    
    # 抽出した結果をJSONファイルに出力
    ys = cl.OrderedDict()
    i = 0
    for row in results:
        data = cl.OrderedDict()
        data["sale_date"] = row.sale_date.strftime('%Y-%m-%d')
        data["product_name"] = row.product_name
        data["place_name"] = row.place_name
        data["sales_amount"] = row.sales_amount
        ys[i + 1] = data
        i = i + 1
        
    fw = open('C:\work\gcp\sales.json', 'w')
    json.dump(ys, fw, ensure_ascii=False, indent=4)
    fw.close()
    print('JSONファイル出力完了'.decode('utf-8'))

if __name__ == '__main__':
    query_from_sales()
「HD Video Converter Factory Pro」は動画の形式変換や編集・録画等を行える便利ツールだった動画の形式変換や編集・録画等を行える便利ツールの一つに、「HD Video Converter Factory Pro」があります。ここ...

作成したPython 2.7のプログラムの実行結果

以下の記事を参照のこと。

GCSとBigQueryで連動するプログラムを作成してみた(1)GCS(Google Cloud Storage)とBigQueryで連動するプログラムを作成したので共有する。 GCSは、ファイ...