ストリーミング テーブル

ストリーミング テーブルは、ストリーミングまたは増分データ処理の追加サポートを備えた Delta テーブルです。 ストリーミング テーブルは、パイプライン内の 1 つ以上のフローの対象にすることができます。

ストリーミング テーブルは、次の理由からデータ インジェストに適しています。

  • 各入力行は 1 回だけ処理されます。これは、インジェスト ワークロードの大部分をモデル化します (つまり、行をテーブルに追加またはアップサートすることによって)。
  • 追記可能な大量のデータを処理できます。

また、ストリーミング テーブルは、行や期間を超えて推論したり、大量のデータを処理したり、待ち時間の短い処理を提供したりできるため、待機時間の短いストリーミング変換にも適しています。

次の図は、フローがストリーミング ソースから読み取り、パイプライン内のストリーミング テーブルに増分的に書き込む方法を示しています。

ストリーミング テーブルを含むパイプラインに新しいデータを読み取る個々のフローによって接続された S3、Kafka、および Pub/Sub ストリーミング ソースを示す図。

更新のたびに、ストリーミング テーブルに関連付けられているフローは、ストリーミング ソース内の変更された情報を読み取り、そのテーブルに新しい情報を追加します。

ストリーミング テーブルは、1 つのパイプラインによって所有および更新されます。 パイプラインのソース コードでストリーミング テーブルを明示的に定義します。 パイプラインによって定義されたテーブルは、他のパイプラインでは変更または更新できません。 1 つのストリーミング テーブルに追加する複数のフローを定義できます。

Azure Databricksは、ストリーミング テーブル処理をサポートする内部テーブルを作成します。 これらのテーブルは system.information_schema.tables に表示されますが、カタログ エクスプローラーやその他のワークスペース UI ページには表示されません。

Databricks SQL を使用してパイプラインの外部にストリーミング テーブルを作成する場合、Azure Databricks はテーブルを更新するために使用されるパイプラインを作成します。 ワークスペースの左側のナビゲーションから [ジョブ] と [パイプライン ] を選択すると、パイプラインを表示できます。 [ パイプラインの種類 ] 列をビューに追加できます。 パイプラインで定義されているストリーミング テーブルには、 ETLの種類があります。 Databricks SQL で作成されたストリーミング テーブルには、 MV/STの種類があります。

フローの詳細については、「 Lakeflow Spark 宣言型パイプライン フローを使用したデータの増分読み込みと処理」を参照してください。

インジェスト用のストリーミング テーブル

ストリーミング テーブルは、追加専用のデータ ソース用に設計され、入力を 1 回だけ処理します。 これにより、データが継続的に到着するインジェスト ワークロードに適しており、既存のレコードを再処理することなく確実にキャプチャする必要があります。 Azure Databricksでは、クラウド ストレージとストリーミング メッセージ バスからのデータの取り込みがサポートされています。

クラウド ストレージからファイルを取り込む

ストリーミング テーブルを使用して、クラウド ストレージから新しいファイルを取り込むことができます。 これらの例では、自動ローダーを使用して、新しいファイルが到着すると増分処理されます。

Python

from pyspark import pipelines as dp

# Create a streaming table
@dp.table
def customers_bronze():
  return (
    spark.readStream.format("cloudFiles")
     .option("cloudFiles.format", "json")
     .option("cloudFiles.inferColumnTypes", "true")
     .load("/Volumes/path/to/files")
  )

ストリーミング テーブルを作成するには、データ セット定義がストリーム型である必要があります。 データ セット定義で spark.readStream 関数を使用すると、ストリーミング データセットが返されます。

SQL

-- Create a streaming table
CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM STREAM read_files(
  "/volumes/path/to/files",
  format => "json"
);

ストリーミング テーブルにはストリーミング データセットが必要です。 STREAM前の read_files キーワードは、データ セットをストリームとして扱うようにクエリに指示します。

ストリーミング メッセージを取り込む

ストリーミング テーブルを使用して、メッセージ バスからデータを取り込むこともできます。 次の例では、Pub/Sub トピックから読み取るストリーミング テーブルを作成する方法を示します。

Python

@dp.table
def pubsub_raw():
  auth_options = {
    "clientId": client_id,
    "clientEmail": client_email,
    "privateKey": private_key,
    "privateKeyId": private_key_id
  }
  return (
    spark.readStream
      .format("pubsub")
      .option("subscriptionId", "my-subscription")
      .option("topicId", "my-topic")
      .option("projectId", "my-project")
      .options(auth_options)
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
  subscriptionId => 'my-subscription',
  projectId => 'my-project',
  topicId => 'my-topic',
  clientEmail => secret('pubsub-scope', 'clientEmail'),
  clientId => secret('pubsub-scope', 'clientId'),
  privateKeyId => secret('pubsub-scope', 'privateKeyId'),
  privateKey => secret('pubsub-scope', 'privateKey')
);

Databricks では、承認オプションを提供するときにシークレットを使用することをお勧めします。 すべての認証オプションについては、 Pub/Sub へのアクセスの構成 を参照してください。

ストリーミング テーブルへのデータの読み込みの詳細については、「 パイプラインでのデータの読み込み」を参照してください。

次の図は、追加専用ストリーミング テーブルのしくみを示しています。

追加専用ストリーミング テーブルのしくみを示す図

ストリーミング テーブルに既に追加されている行は、パイプラインに対する後の更新では再クエリされません。 クエリを変更する場合 (たとえば、 SELECT LOWER (name) から SELECT UPPER (name))、既存の行は大文字に更新されませんが、新しい行は大文字になります。 完全更新をトリガーして、ソース テーブルから以前のすべてのデータを再クエリして、ストリーミング テーブル内のすべての行を更新できます。

ストリーミング テーブルと低遅延ストリーミング

ストリーミング テーブルは、境界付き状態での待機時間の短いストリーミング用に設計されています。 ストリーミング テーブルではチェックポイント管理が使用されるため、待機時間の短いストリーミングに適しています。 ただし、そのために、ストリームが自然に区切られていること、またはウォーターマークによって区切られていることが想定されています。

自然に境界付けられたストリームは、明確に定義された開始と終了を持つストリーミング データ ソースによって生成されます。 自然境界ストリームの例として、ファイルの最初のバッチが配置された後に新しいファイルが追加されないファイルのディレクトリからデータを読み取る方法があります。 ストリームは、ファイルの数が有限であり、すべてのファイルが処理された後にストリームが終了するため、境界付けされたと見なされます。

ウォーターマークを使用してストリームをバインドすることもできます。 構造化ストリーミングにおけるウォーターマークは、遅延データを処理する際に、システムが遅れたイベントを待機する期間を指定することで時間枠を完了と見なすメカニズムです。 ウォーターマークがない無制限のストリームは、メモリ圧力によりパイプラインが失敗する可能性があります。

ステートフル ストリーム処理の詳細については、「 ウォーターマークを使用したステートフル処理の最適化」を参照してください。

ストリーム スナップショット結合

ストリーム スナップショット結合は、ストリーミング データセットを、ストリームの開始時にスナップショットが作成されたディメンション テーブルに接続します。 ディメンション テーブルはその時点で固定として扱われるため、ストリームの開始後に行われた変更は結合に反映されません。 これは、小さな不一致が重要でない場合 (たとえば、トランザクションの数が顧客の数よりも桁違いに大きい場合など) に許容されます。

次のコード サンプルでは、 customers と呼ばれる 2 つの行を持つディメンション テーブルを、増え続けるデータ セット ( transactions) と結合します。 sales_reportと呼ばれるテーブル内のこれら 2 つのデータセット間の結合が具体化されます。 外部プロセスが新しい行 (customer_id=3, name=Zoya) を追加して顧客テーブルを更新した場合、静的ディメンション テーブルはストリームの開始時にスナップショット化されたため、この新しい行は結合に存在しません。

from pyspark import pipelines as dp

@dp.temporary_view
# assume this table contains an append-only stream of rows about transactions
# (customer_id=1, value=100)
# (customer_id=2, value=150)
# (customer_id=3, value=299)
# ... <and so on> ...
def v_transactions():
  return spark.readStream.table("transactions")

# assume this table contains only these two rows about customers
# (customer_id=1, name=Bilal)
# (customer_id=2, name=Olga)
@dp.temporary_view
def v_customers():
  return spark.read.table("customers")

@dp.table
def sales_report():
  facts = spark.readStream.table("v_transactions")
  dims = spark.read.table("v_customers")

  return facts.join(dims, on="customer_id", how="inner")

ストリーミング テーブルの制限事項

ストリーミング テーブルには、次の制限があります。

  • 限られた進化: データ セット全体を再計算することなく、クエリを変更できます。 完全な更新を行わないと、ストリーミング テーブルでは各行が 1 回しか表示されないため、異なるクエリで異なる行が処理されます。 たとえば、クエリのフィールドに UPPER() を追加した場合、変更後に処理された行のみが大文字になります。 つまり、データ セットで実行されているすべての以前のバージョンのクエリに注意する必要があります。 変更前に処理された既存の行を再処理するには、完全な更新が必要です。
  • 状態管理: ストリーミング テーブルは待機時間が短く、自然に境界付けられたり、透かしで囲まれたりするストリームが必要です。 詳細については、「 ウォーターマークを使用したステートフル処理の最適化」を参照してください。
  • 結合は再計算されません。 ストリーミング テーブルの結合は、ディメンションが変更されたときに再計算されません。 この特性は、"高速だが間違った" シナリオに適している可能性があります。 ビューを常に正しくする場合は、具体化されたビューを使用できます。 具体化されたビューは、ディメンションが変更されたときに自動的に結合を再計算するため、常に正しいです。 詳細については、「 具体化されたビュー」を参照してください。