組み込みのコネクタを使用して、Google Pub/Sub をサブスクライブします。 このコネクタは、サブスクライバーからのレコードに対して厳密に1回だけ実行される処理セマンティクスを提供します。
注
Pub/Sub では、重複するレコードが発行されたり、レコードがサブスクライバーに順不同で到着したりする可能性があります。 重複するレコードと順序が逸脱したレコードを処理するコードを記述します。
Pub/Sub ストリームを構成する
次のコード例は、Pub/Sub から読み取られた構造化ストリーミングを構成するための基本的な構文を示しています。
Python
auth_options = {
"clientId": client_id,
"clientEmail": client_email,
"privateKey": private_key,
"privateKeyId": private_key_id
}
query = (spark.readStream
.format("pubsub")
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(auth_options)
.load()
)
SQL
CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
subscriptionId => 'mysub',
projectId => 'myproject',
topicId => 'mytopic',
clientEmail => secret('pubsub-scope', 'clientEmail'),
clientId => secret('pubsub-scope', 'clientId'),
privateKeyId => secret('pubsub-scope', 'privateKeyId'),
privateKey => secret('pubsub-scope', 'privateKey')
);
Scala
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// Creates a Pub/Sub subscription if one does not already exist with this ID
.option("subscriptionId", "mysub")
.option("topicId", "mytopic")
.option("projectId", "myproject")
.options(authOptions)
.load()
その他の構成オプションについては、「Pub/Sub ストリーミング読み取りのオプションを構成する」を参照してください。
Pub/Sub へのアクセスを構成する
構成する資格情報には、次の役割を持つ必要があります。
| 役割 | 必須または省略可能 | ロールの使用方法 |
|---|---|---|
roles/pubsub.viewer または roles/viewer |
必須 | サブスクリプションが存在するかどうかを確認し、サブスクリプションを取得します。 |
roles/pubsub.subscriber |
必須 | サブスクリプションからデータを取得します。 |
roles/pubsub.editor または roles/editor |
省略可能 | サブスクリプションが存在しない場合はサブスクリプションの作成を有効にし、 deleteSubscriptionOnStreamStop を使用してストリーム終了時にサブスクリプションを削除できるようにします。 |
Databricks では、承認オプションを提供するときにシークレットを使用することをお勧めします。 接続を承認するには、次のオプションが必要です:
clientEmailclientIdprivateKeyprivateKeyId
Pub/Sub スキーマについて
ストリームのスキーマは、以下の表に示す通り、Pub/Sub からフェッチされたレコードと一致します。
| フィールド | タイプ |
|---|---|
messageId |
StringType |
payload |
ArrayType[ByteType] |
attributes |
StringType |
publishTimestampInMillis |
LongType |
Pub/Sub ストリーミング読み取りのオプションを構成する
次のテーブルで、Pub/Sub でサポートされているその他のオプションについて説明します。 すべてのオプションは、.option("<optionName>", "<optionValue>") 構文を使用して構造化ストリーミング読み取りの一部として構成されます。
注
一部の Pub/Sub 構成オプションでは、マイクロバッチ ではなく、フェッチ 概念を使用します。 これは内部実装の詳細を反映しており、オプションは他のStructured Streamingコネクタの補題と同様に機能しますが、レコードがフェッチされてから処理されるという違いがあります。
| オプション | 既定値 | 説明 |
|---|---|---|
numFetchPartitions |
ストリーム初期化時に存在する Executor の数の半分に設定します。 | サブスクリプションからレコードをフェッチする並列 Spark タスクの数。 |
deleteSubscriptionOnStreamStop |
false |
true の場合、ストリームに渡されたサブスクリプションは、ストリーミング ジョブの終了時に削除されます。 |
maxBytesPerTrigger |
none |
トリガーされる各マイクロバッチの間に処理されるバッチ サイズのソフト制限。 |
maxRecordsPerFetch |
1000 |
レコードを処理する前にタスクごとにフェッチするレコードの数。 |
maxFetchPeriod |
10s |
レコードを処理する前に取得する各タスクの時間。 期間文字列を受け入れます。たとえば、1 秒間は 1s 、1 分間は 1m 。 Databricks では、既定値を使用することが推奨されています。 |
Pub/Sub で増分バッチ処理を使用する
Trigger.AvailableNowを使用して、Pub/Sub ソースの使用可能なレコードを増分バッチとして使用できます。
Azure Databricks では、Trigger.AvailableNow 設定で読み取りを開始したときにタイムスタンプが記録されます。 バッチによって処理されるレコードには、以前にフェッチされたすべてのデータと、記録されたストリーム開始タイムスタンプより小さいタイムスタンプを持つ新しく発行されたレコードが含まれます。 詳細については、「 AvailableNow: 増分バッチ処理」を参照してください。
Pub/Sub ストリーミング メトリックを監視する
構造化ストリーミングの進行状況メトリックは、フェッチされて処理できるレコードの数、フェッチされて処理できる状態のレコードのサイズ、およびストリームの開始後に表示された重複の数を報告します。 これらのメトリックの例を次に示します:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
制限事項
Pub/Sub では、投機的実行 (spark.speculation) はサポートされていません。