Spark/Databricks における “キュー(データスキュー)” とは何か?

まず、「キュー」あるいは 「データスキュー (data skew)」という言葉は、分散処理システム(Spark/Databricks 等)でしばしば問題になる現象を指します。

データスキューを解決するための代表的な方法がいくつか存在しますので、それぞれの対応内容を簡単にみていきましょう!

データスキューの定義と問題点

  • Spark や Databricks では、大量データを複数のパーティションに分散して並列処理を行います。
  • しかし、あるキー(たとえば user_idcustomer_id など)にデータが極端に集中していると、そのキーを扱うパーティションだけ負荷が偏ります。
  • この偏りが「データスキュー」です。
  • スキューが起きると、その重いパーティションを処理するタスクだけが遅くなり、全体のジョブ完了時間を引き延ばす、あるいはメモリ不足で処理が失敗することもあります。
    • Spark UI で「タスク間の処理時間のばらつき」「スパークステージにおける一部タスクの極端な遅さ」が見られることが典型兆候です。
  • 特に、ジョイン集約 (groupBy / reduce) など、データをシャッフル再配置する操作で顕著に表れます。

つまり、スキュー = 分散処理が “負荷バランスを失った状態” と捉えるとわかりやすいです。

スキューを緩和する代表的な手法

以下の 3 つと、補足として Broadcast(ブロードキャスト)について整理します。

1. Salting(ソルト処理・プレフィックス付与)

概要
スキューを引き起こしているキーにランダムな「塩 (salt)」を付与し、元のキーを複数バリエーションに分散させて別のパーティションへ散らす方法です。 saturncloud.io+2Medium+2

実装の流れ(例)

  1. スキューキー(例:user_id = “power_user”)を特定
  2. そのキーの値に対して、乱数を付与して user_id_0, user_id_1 … のように複数に振り分け
  3. 対応する結合側データ(プロファイル側など)にも同じルールで salt を付与
  4. ジョインを行い、結合後に salt を取り除く/集約し直す

注意点・限界

  • salt を付与するバリエーション数(塩の数)が少なすぎると効果が薄い
  • salt を付与しすぎるとオーバーヘッドになる
  • Spark のデフォルトハッシュ方式と salt が衝突するケースもあり、必ずしも万能とは言えない Reddit+1
  • salt 処理/逆変換処理が必要なので実装がやや煩雑

Salting は「ホットキーを意図的に分散させる」アプローチの典型例です。

2. 再パーティション (Repartition)

概要
repartition()repartitionByRange() などを使って、パーティション数を増やしたり、分散方法(ソート範囲など)を変えたりして、データの偏りを緩和する手法です。

利点

  • パーティション数を増やすことで、処理をより多くのタスクに分散できる
  • 並列度を上げられる

制約・限界

  • 単にパーティション数を増やすだけでは、スキューキー自体の偏りは解消されない
  • 再パーティション自体もシャッフルを伴うためコストがかかる
  • 適切なパーティション数設定(CPU数、データ量)を見極める必要がある

Spark/Databricks では、/*+ REPARTITION */ のようなヒント句も使えます。

3. Skewed Keys の分離処理(ホットキー分割・個別処理)

概要
スキューを引き起こす “頻出キー(ホットキー)” をあらかじめ抽出し、それらを別ジョブや別処理パスで扱う方法です。

実装例

  • スキューキーを抽出(例:user_id = 12345 が極端に多い)
  • そのキーに該当するレコードを別の DataFrame / ジョブに切り出す
  • 残りのデータを通常ルートで処理
  • 最後に両者をマージ/結合結果を統合

メリット

  • ホットキーがジョイン処理全体に与える影響を最小化できる
  • ホットキー処理に特化したリソース配分や最適化が可能

注意点

  • 分離処理のロジック設計が必要
  • キー判定基準が変動するデータでは維持コストがかかる

ブロードキャスト (Broadcast) について

概要
小さいテーブル(ルックアップテーブルなど)をすべての executor ノードに配布し、シャッフル(ネットワーク通信)なしに結合を行う方式です。

仕組み

  • 結合側の片方をすべてのノードにキャッシュ的に配布
  • 各 executor はローカルにその結合先テーブルを持つため、結合時にデータ移動を最小化できる

使うべき場面

  • 片方のテーブルが非常に小さい(例:マスタ参照テーブル、コードマスタなど)
  • 大規模テーブルとの結合で shuffle を回避したいとき

制約・注意点

  • ブロードキャスト先のテーブルが大きすぎると、各 executor のメモリを圧迫
  • ブロードキャストしてもスキューキー自体の偏りは解消しない
  • スキューキーをブロードキャストしても、重い処理は避けられない

実際、問題文で「スキューキーをブロードキャストする」は不適切な対策とされています。

まとめ:対策の選択と使い分け

手法用途 / 強み制約・注意点
Salting偏ったキーを複数に分散、スキュー軽減Salt設計・逆変換が必要、衝突リスク
Repartition並列性を上げる/細分化偏りそのものは解消しない、追加シャッフル
スキューキー分離処理ホットキーを隔離して影響を抑えるロジック設計・動的キー変動に注意
Broadcast小さい参照テーブルを結合時に配布して shuffle 避け大きすぎるとメモリ圧迫、スキューは解消不可

たとえば、クリックストリーム + ユーザープロファイル結合という文脈では:

  • プロファイルテーブルが非常に小さいなら Broadcast Join を使って結合を避けることもできる
  • ただし今回の問題では、「ブロードキャストをスキューキーに対して使う」は誤り
  • よって、Salting や再パーティション、スキューキーの分離処理 を組み合わせて使うのが現実的な対策

Spark/Databricks では Adaptive Query Execution(AQE) がスキューの自動補正を支援する機能もありますが、完全ではなく手動対策を併用すべきです。

最新情報をチェックしよう!