はじめに
こんにちは。
今回は、以前から気になっていた「美容商品の販売データ」の分析に挑戦することにしました。
テーマは、「過去の販売データ(CSV)」と「未来の天気予報(API)」の統合です。
本記事では、環境構築から、SQLでのデータ加工の裏側を解説します。
よろしくお願いします。
【使用したcsvファイル】
💄 Top Beauty & Cosmetics Products Worldwide 2024
Kaggle公式サイト:https://www.kaggle.com/
【環境】
- OS:MAC
- パッケージ・プロジェクトの管理ツール:poetry
- 言語:Python
- データベース:DuckDB
環境構築に時間をかけず、本質的なデータ分析にすぐ着手したかったのでDuckDBを使いました。
通常、DBを使うには「サーバーを立てる」「ユーザー権限を設定する」といった手間がかかりますが、DuckDBはPythonライブラリ(duckdb)をインポートするだけで動きます。
コード解説
main.pyをベースに解説していきます!
全体のコードはブログの一番下にまとめてあります。
全体のコードへジャンプ◾️実行用の関数を呼び出す
まず、run_analysis関数を呼び出します。
DBの管理全般をここで行います。
product_df = run_analysis()
◾️DBへの接続とSQLの読み込み
まず、DBへの接続とSQLの読み込みを行います。
処理はdb_utils.pyのget_connection関数で管理しています。
main.py
def run_analysis():
con = util.get_connection()
あちこちで同じコードを書くと修正が大変になるので、接続やファイルの読み込みといった共通モジュールとして集約しました。
db_utils.py
import duckdb
import os
def get_connection(db_path="local_test.db"):
"""DBへの接続を返す共通関数"""
return duckdb.connect(db_path)
def read_sql_file(file_path):
"""SQLファイルを読み込む共通関数"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"SQLファイルが見つかりません: {file_path}")
with open(file_path, 'r', encoding='utf-8') as f:
return f.read()
◾️テーブルを作る
分析で扱うデータを入れこむテーブルを作ります。
処理はcreate_table.pyのrun関数で管理しています。
main.py
create_table.run(con)
DuckDBのread_csv_auto関数を活用し、スキーマの自動推論とテーブル作成を実現しました。
また、 WHERE 1=0 とすることで、データのロード前に空の型定義だけを正確に作成しています。
これにより、もしCSV側のデータが壊れていても、テーブル作成自体が失敗してエラーで止まることが防げます。
変なデータがテーブルに入ってしまった場合、この後に実行する INSERT 処理でエラーとして検知できます。 いきなり全データをロードするのではなく、まずは”あるべき姿(スキーマ)”を確定させることで、不純なデータに対するガードを一段階高くしました。
create_table.py
import src.table_category as conf
def run(con):
con.execute(f"CREATE SCHEMA IF NOT EXISTS {conf.SCHEMA}")
con.execute(f"DROP TABLE IF EXISTS {conf.TABLE_BEAUTY_BASIC_INFO}")
con.execute(f"DROP TABLE IF EXISTS {conf.TABLE_BEAUTY_DETAILS_INFO}")
con.execute(f"CREATE TABLE {conf.TABLE_BEAUTY_BASIC_INFO} AS SELECT * FROM read_csv_auto('{conf.BASE_PATH}/product_basic_info.csv') WHERE 1=0")
con.execute(f"CREATE TABLE {conf.TABLE_BEAUTY_DETAILS_INFO} AS SELECT * FROM read_csv_auto('{conf.BASE_PATH}/productdetails.csv') WHERE 1=0")
print(f"--- [Success] Tables defined from CSV structure ---")
table_category.pyでは、
テーブル名やスキーマ定義をmain.pyにハードコードせず、外から参照するようにしました。
table_category.py
SCHEMA = 'main_schema'
TABLE_BEAUTY_BASIC_INFO = f'{SCHEMA}.beauty_product_basic_info'
TABLE_BEAUTY_DETAILS_INFO = f'{SCHEMA}.beauty_product_details_info'
BASE_PATH = "/opt/homebrew/bin/20260224/samplecsv"
◾️データを挿入する
続いて、テーブルにデータを挿入します。
main.py
insert_data.run(con)
データの流し込みはinsert_data.pyのrun関数で管理しています。
create_table.pyのテーブルの作成と同様、DuckDBのread_csv_auto関数を活用しています。
スキーマの判別をread_csv_autoに委ねることで、データ構造の変化にも柔軟に対応できる自動インサート処理を実現しました。
insert_data.py
import src.table_category as conf
def run(con):
con.execute(f"INSERT INTO {conf.TABLE_BEAUTY_BASIC_INFO} SELECT * FROM read_csv_auto('{conf.BASE_PATH}/product_basic_info.csv')")
con.execute(f"INSERT INTO {conf.TABLE_BEAUTY_DETAILS_INFO} SELECT * FROM read_csv_auto('{conf.BASE_PATH}/productdetails.csv')")
print("Data inserted.")
◾️SQLファイルをPythonにロード
ここでは、SQLをPythonコードの中に直接書くのではなく、
独立した .sql ファイルとして管理しています。
analysis_beauty_product.sql
select t1.Product_Name,t1.SALE_DATE,t1.Rating,t1.Price_USD,t2.Country_of_Origin from {TABLE_BEAUTY_BASIC_INFO} t1
join {TABLE_BEAUTY_DETAILS_INFO} t2 on t1.Product_ID = t2.Product_ID
WHERE Country_of_Origin = 'Japan';
変数にsqlファイルのパスを指定します。
db_util.pyのread_sql_file関数に変数を入れ込み、呼び出します。
main.py
sql_path = "src/sql/analysis_beauty_product.sql"
query = util.read_sql_file(sql_path)
db_util.py(再掲載)
def read_sql_file(file_path):
"""SQLファイルを読み込む共通関数"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"SQLファイルが見つかりません: {file_path}")
with open(file_path, 'r', encoding='utf-8') as f:
return f.read()
◾️SQLクエリのテンプレートに、実際のテーブル名を置き換える
設定ファイル(conf)からテーブル名を取得し、SQLテンプレート内のプレースホルダを置換します。
設定の集約化、可読性の向上、ミス防止などに効果的です。
main.py
formatted_query = query.replace("{TABLE_BEAUTY_BASIC_INFO}", conf.TABLE_BEAUTY_BASIC_INFO) \
.replace("{TABLE_BEAUTY_DETAILS_INFO}", conf.TABLE_BEAUTY_DETAILS_INFO)
◾️データフレーム(表)を作る。
df = con.execute(formatted_query).df()
出力結果:

最後に
今回はSQLの動的生成と文字列置換について解説しました。
次回は「API編」を予定しています。
外部システムとの連携で気をつけるポイントなどをまとめる予定ですので、ぜひお楽しみに!
今回登場したコード全体
main.py
import src.db_util as util
import src.table_category as conf
import src.create_table as create_table
import src.insert_data as insert_data
import pandas as pd
from src.weather_api_client import get_weather
def run_analysis():
con = util.get_connection()
create_table.run(con)
insert_data.run(con)
sql_path = "src/sql/analysis_beauty_product.sql"
query = util.read_sql_file(sql_path)
formatted_query = query.replace("{TABLE_BEAUTY_BASIC_INFO}", conf.TABLE_BEAUTY_BASIC_INFO) \
.replace("{TABLE_BEAUTY_DETAILS_INFO}", conf.TABLE_BEAUTY_DETAILS_INFO)
df = con.execute(formatted_query).df()
return df
def create_weather_frame(data):
weather_list = []
for forecast in data['list']:
weather_list.append({
"Date_Time": forecast['dt_txt'],
"Weather": forecast['weather'][0]['description'],
"Temp_Celsius": forecast['main']['temp']
})
return pd.DataFrame(weather_list)
if __name__ == "__main__":
product_df = run_analysis()
data = get_weather("Tokyo")
weather_df = create_weather_frame(data)
# strをdatetime64に変換
weather_df['Date_Time'] = pd.to_datetime(weather_df['Date_Time'])
# 時刻を00:00:00にリセット(normalize)して日付の粒度を揃える
weather_df['Date_Time'] = weather_df['Date_Time'].dt.normalize()
# 型の確認
# print(product_df['SALE_DATE'].dtype)
# print(weather_df['Date_Time'].dtype)
# 結合キーの値を数件表示して見比べる
# print(product_df['SALE_DATE'].head())
# print(weather_df['Date_Time'].head())
df_combined = pd.merge(
product_df,
weather_df,
left_on='SALE_DATE',
right_on='Date_Time',
how='left')
print(df_combined)
print(df_combined['Weather'].value_counts())
print(round(df_combined['Temp_Celsius'].describe()),2)
df_combined['Temp_Category'] = pd.cut(df_combined['Temp_Celsius'], bins=[-float('inf'), 7, float('inf')], labels=['寒い', '暖かい'])
print(df_combined.groupby(['Temp_Category', 'Product_Name']).size().unstack())
create_table.py
import src.table_category as conf
def run(con):
con.execute(f"CREATE SCHEMA IF NOT EXISTS {conf.SCHEMA}")
con.execute(f"DROP TABLE IF EXISTS {conf.TABLE_BEAUTY_BASIC_INFO}")
con.execute(f"DROP TABLE IF EXISTS {conf.TABLE_BEAUTY_DETAILS_INFO}")
con.execute(f"CREATE TABLE {conf.TABLE_BEAUTY_BASIC_INFO} AS SELECT * FROM read_csv_auto('{conf.BASE_PATH}/product_basic_info.csv') WHERE 1=0")
con.execute(f"CREATE TABLE {conf.TABLE_BEAUTY_DETAILS_INFO} AS SELECT * FROM read_csv_auto('{conf.BASE_PATH}/productdetails.csv') WHERE 1=0")
print(f"--- [Success] Tables defined from CSV structure ---")
db_util.py
import duckdb
import os
def get_connection(db_path="local_test.db"):
"""DBへの接続を返す共通関数"""
return duckdb.connect(db_path)
def read_sql_file(file_path):
"""SQLファイルを読み込む共通関数"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"SQLファイルが見つかりません: {file_path}")
with open(file_path, 'r', encoding='utf-8') as f:
return f.read()
insert_data.py
import src.table_category as conf
def run(con):
con.execute(f"INSERT INTO {conf.TABLE_BEAUTY_BASIC_INFO} SELECT * FROM read_csv_auto('{conf.BASE_PATH}/product_basic_info.csv')")
con.execute(f"INSERT INTO {conf.TABLE_BEAUTY_DETAILS_INFO} SELECT * FROM read_csv_auto('{conf.BASE_PATH}/productdetails.csv')")
print("Data inserted.")
table_category.py
SCHEMA = 'main_schema'
TABLE_BEAUTY_BASIC_INFO = f'{SCHEMA}.beauty_product_basic_info'
TABLE_BEAUTY_DETAILS_INFO = f'{SCHEMA}.beauty_product_details_info'
BASE_PATH = "your csv path"
analysis_beauty_product.sql
select t1.Product_Name,t1.SALE_DATE,t1.Rating,t1.Price_USD,t2.Country_of_Origin from {TABLE_BEAUTY_BASIC_INFO} t1
join {TABLE_BEAUTY_DETAILS_INFO} t2 on t1.Product_ID = t2.Product_ID
WHERE Country_of_Origin = 'Japan';
