今回も引き続き、GCSとBigQueryで連動するプログラムについて述べる。ここでは、GCS(Google Cloud Storage)とBigQueryで連動するPythonプログラムを作成する前に行った環境構築と、作成したPython 2.7のプログラムの紹介を行う。
前提条件
以下の記事での環境構築が完了していること
GCSとBigQueryで連動するプログラムを作成してみた(2)今回も引き続き、GCSとBigQueryで連動するプログラムについて述べる。ここでは、Windows 10が入っているローカル環境にて、...
環境構築
ここでは、前提条件で記載した以外の環境構築手順について述べる。
1) コマンドプロンプト上で、「pip install (–upgrade) google-cloud-storage」を実行し、GCSを利用するためのライブラリをインストールする
なお、–upgradeオプションを指定すると、インストール済ライブラリを最新化できる
2) コマンドプロンプト上で、「pip install (–upgrade) google-cloud-bigquery」を実行し、BigQueryを利用するためのライブラリをインストールする
作成したPython 2.7のプログラム
ここでは、作成したPython 2.7のプログラムを紹介する。
1) GCS上のCSVファイル(insert_bigquery_sales.csv)のデータを、BigQueryのsalesテーブルに追加するプログラム (プログラム名:insert_into_sales.py)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | # -*- coding: utf-8 -*- import sys from google.cloud import bigquery def insert_into_sales(): # 文字コードをUTF-8に設定 reload(sys) sys.setdefaultencoding('utf-8') # ロードするBigQuery上のデータセットとテーブルの設定 dataset_id = 'bigquery_purin_it' table_id = 'sales' client = bigquery.Client() dataset_ref = client.dataset(dataset_id) # BigQuery上のテーブルにロードするジョブの設定 # ここでは、テーブルデータを全て削除してからロードしている job_config = bigquery.LoadJobConfig() job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE job_config.schema = [ bigquery.SchemaField('sale_date', 'DATE') , bigquery.SchemaField('product_name', 'STRING') , bigquery.SchemaField('place_name', 'STRING') , bigquery.SchemaField('sales_amount', 'INTEGER') ] job_config.skip_leading_rows = 1 job_config.source_format = bigquery.SourceFormat.CSV load_job = client.load_table_from_uri( 'gs://test_purin_bucket/insert_bigquery_sales.csv' , dataset_ref.table(table_id) , job_config = job_config) # ジョブを実行し、終了を待つ print('Starting job {}'.format(load_job.job_id)) load_job.result() print('Job Finished.') if __name__ == '__main__': insert_into_sales() |
2) BigQueryのsalesテーブルのデータを、JSONファイル(sales.json)に出力するプログラム (プログラム名:select_from_sales.py)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | # -*- coding: utf-8 -*- import sys import json import collections as cl from google.cloud import bigquery def query_from_sales(): # 文字コードをUTF-8に設定 reload(sys) sys.setdefaultencoding('utf-8') # BigQuery salesテーブルからデータを抽出 client = bigquery.Client() query_job = client.query(""" SELECT sale_date , product_name , place_name , sales_amount FROM `(GCPのプロジェクト名).bigquery_purin_it.sales` """) results = query_job.result() # 抽出した結果をJSONファイルに出力 ys = cl.OrderedDict() i = 0 for row in results: data = cl.OrderedDict() data["sale_date"] = row.sale_date.strftime('%Y-%m-%d') data["product_name"] = row.product_name data["place_name"] = row.place_name data["sales_amount"] = row.sales_amount ys[i + 1] = data i = i + 1 fw = open('C:\work\gcp\sales.json', 'w') json.dump(ys, fw, ensure_ascii=False, indent=4) fw.close() print('JSONファイル出力完了'.decode('utf-8')) if __name__ == '__main__': query_from_sales() |
「HD Video Converter Factory Pro」は動画の形式変換や編集・録画等を行える便利ツールだった動画の形式変換や編集・録画等を行える便利ツールの一つに、「HD Video Converter Factory Pro」があります。ここ...
作成したPython 2.7のプログラムの実行結果
以下の記事を参照のこと。
GCSとBigQueryで連動するプログラムを作成してみた(1)GCS(Google Cloud Storage)とBigQueryで連動するプログラムを作成したので共有する。 GCSは、ファイ...