Watermerken toepassen om drempelwaarden voor gegevensverwerking te beheren

Op deze pagina worden concepten voor watermerken beschreven en worden aanbevelingen weergegeven voor het gebruik van watermerken in algemene stateful streamingbewerkingen.

Streamingquery's verzamelen statusgegevens in de loop van de tijd. Watermerken verwijderen automatisch oude statusgegevens om geheugenfouten en verhoogde verwerkingslatentie te voorkomen.

Wat is een watermerk?

Tijdens de verwerking behoudt Structured Streaming de status tussen microbatches. Streamingquery's gebruiken de status om de resultaten incrementeel bij te werken in plaats van alles na elke microbatch opnieuw te compileren. Watermerken bepalen de drempelwaarde voor wanneer een query stopt met het verwerken van een statusentiteit.

Algemene voorbeelden van overheidsentiteiten zijn:

  • Aggregaties in een tijdvenster.
  • Unieke sleutels in een join tussen twee streams.

Als u een watermerk wilt declareren voor een streaming DataFrame, geeft u een tijdstempelveld en een drempelwaarde voor late tijd op. Wanneer er nieuwe gegevens binnenkomen, houdt de statusbeheerder de meest recente tijdstempel in het opgegeven veld bij en verwerkt deze alleen records binnen de drempelwaarde voor late tijd.

Query's verwerken altijd records die binnen de drempelwaarde binnenkomen. Query’s kunnen records die buiten de drempelwaarde binnenkomen mogelijk nog steeds verwerken, maar dit is niet gegarandeerd.

In het volgende voorbeeld wordt een drempelwaarde van 10 minuten toegepast op een venstertelling:

Python

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

Scala

import org.apache.spark.sql.functions.window

df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()

In dit voorbeeld:

  • De kolom event_time wordt gebruikt om een watermerk van 10 minuten en een tumblingvenster van 5 minuten te definiëren.
  • Er wordt voor elke waargenomen id een telling verzameld voor elk niet-overlappend venster van 5 minuten.
  • Statusinformatie wordt voor elke telling bijgehouden totdat het einde van het venster meer dan 10 minuten vóór de laatst waargenomen event_time ligt.

Belangrijk

Verwijs in een groupBy() en window() bewerking naar kolommen op naam "<colName>" of col("<colName>")om ervoor te zorgen dat de gebeurtenistijdmarkering behouden blijft. In Scala kunt u ook gebruiken $colName.

Hoe beïnvloeden watermerken de verwerkingstijd en doorvoer?

Uitvoermodi bepalen wanneer een query met watermerken gegevens naar de sink schrijft. Watermerken zijn essentieel voor doorvoerbeheer in stateful streaming, omdat ze de totale hoeveelheid statusinformatie in het geheugen verminderen. Niet alle uitvoermodi worden ondersteund voor alle stateful bewerkingen. Zie de watermerken en uitvoermodus voor aggregaties in vensters.

Het selecteren van een watermerkduur heeft compromissen:

  • Kortere watermerken verlagen de querylatentie omdat query's minder statusinformatie opslaan en resultaten schrijven nadat elke watermerkduur is voltooid. Korte watermerken hebben echter een lage tolerantie voor late gegevens.
  • Langere watermerken hebben een hoge tolerantie voor late gegevens. Lange watermerken verhogen echter de querylatentie omdat query's meer statusinformatie moeten opslaan en moeten wachten om resultaten te schrijven na een langere watermerkduur.

Watermerken en uitvoerinstelling voor vensteraggregaties

In de volgende tabel ziet u het verwerkingsgedrag voor query's met aggregatie op een tijdstempel en een watermerk:

Uitvoermodus Gedrag
Toevoegen De query schrijft rijen naar de doeltabel nadat de drempelwaarde voor het watermerk is verstreken. Alle schrijfopdrachten worden vertraagd op basis van de achterstandsdrempel. De oude aggregatiestatus wordt verbroken nadat de drempelwaarde is verstreken.
Bijwerken De query schrijft rijen naar de doeltabel terwijl de resultaten worden berekend en de query kan rijen bijwerken en overschrijven wanneer nieuwe gegevens binnenkomen. De oude aggregatiestatus wordt verbroken nadat de drempelwaarde is verstreken.
Voltooid De aggregatietoestand gaat niet verloren. De query herschrijft de doeltabel voor elke trigger.

Watermerken en uitvoermodi voor joins tussen streams

Samenvoegingen van meerdere streams ondersteunen alleen de toevoegmodus. Query's schrijven per batch de overeenkomende records weg.

Voor inner joins raadt Databricks u aan een grenswaarde in te stellen voor elke streaminggegevensbron, zodat de query statusinformatie voor oude records kan negeren. Zonder watermerken probeert Structured Streaming elke sleutel van beide zijden van de join toe te voegen aan elke trigger, wat van invloed kan zijn op de prestaties.

Voor outer joins is het gebruik van watermarking verplicht. Wanneer een record niet overeenkomt, schrijft de query een null-waarde voor die sleutel. Omdat joins alleen de append-modus ondersteunen, worden records zonder match pas weggeschreven nadat de laatheidsdrempel is verstreken.

De drempelwaarde voor late gegevens beheren met een beleid voor meerdere watermerken

Voor meerdere structured streaming-invoer kunt u meerdere watermerken instellen om tolerantiedrempels voor late gegevens te beheren. Met watermerken kunt u statusinformatie en latentie beheren.

Een streamingquery kan meerdere invoerstromen hebben die worden verenigd of samengevoegd. Voor stateful bewerkingen vereist elk van de invoerstromen mogelijk een andere drempelwaarde voor late gegevenstolerantie. Geef deze drempelwaarden op met withWatermark("eventTime", delay) voor elke invoerstroom. Hier volgt een voorbeeldquery met stream-stream joins.

Python

input_stream1 = ...      # delays up to 1 hour
input_stream2 = ...      # delays up to 2 hours

(input_stream1.withWatermark("eventTime1", "1 hour")
  .join(
    input_stream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)
)

Scala

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

Tijdens het uitvoeren van de query met stateful bewerkingen houdt Structured Streaming de maximale gebeurtenistijd voor elke invoerstroom bij, berekent watermerken op basis van de bijbehorende vertraging en bepaalt één globaal watermerk. Structured Streaming maakt standaard gebruik van het minimum als het algemene watermerk. Als een stroom achterloopt op de andere stromen, voorkomt een minimaal globaal watermerk dat de query gegevens per ongeluk als laat markeert. Dit kan bijvoorbeeld gebeuren wanneer een van de streams geen gegevens meer ontvangt vanwege stroomopwaartse fouten. Het globale watermerk beweegt zich veilig voort in het tempo van de langzaamste stroom en vertraagt zo nodig de uitvoer van de query.

Als u de latentie wilt verminderen, stelt u in op spark.sql.streaming.multipleWatermarkPolicymax (standaard) minom het watermerk van de snelste stroom te gebruiken als globaal watermerk. Deze configuratie verwijdert echter gegevens uit de langzaamste stromen. Databricks raadt u aan deze configuratie met voorzichtigheid toe te passen.

Watermerken toepassen op afzonderlijke bewerkingen

Met de distinct bewerking wordt elke unieke record in de status bijgehouden. Zonder watermerk groeit de status voor onbepaalde tijd en kan dit geheugenproblemen veroorzaken. Geef een watermark op voor een tijdstempelveld om de toestand te begrenzen en oude records te verwijderen zodra de drempel is overschreden.

In het volgende voorbeeld wordt een watermerk toegepast op een distinct bewerking:

Python

streamingDf = spark.readStream. ...  # columns: eventTime, id, value, ...

# Apply watermark before distinct operation
(streamingDf
  .withWatermark("eventTime", "1 hour")
  .distinct()
)

Scala

val streamingDf = spark.readStream. ...  // columns: eventTime, id, value, ...

// Apply watermark before distinct operation
streamingDf
  .withWatermark("eventTime", "1 hour")
  .distinct()

In dit voorbeeld verwijdert de streamingquery dubbele records die binnen 1 uur na de meest recent waargenomen eventTime binnenkomen. De query verwijdert statusinformatie voor ontdubbeling nadat de drempelwaarde is verstreken.

Belangrijk

Als u specifieke kolommen wilt ontdubbelen in plaats van alle kolommen, gebruikt dropDuplicates() u of dropDuplicatesWithinWatermark() in plaats van distinct. Zie Dubbele waarden in watermerk verwijderen.

dubbele waarden binnen het watermerk verwijderen

In Databricks Runtime 13.3 LTS of hoger kunt u een unieke id gebruiken om records binnen een grensdrempel te ontdubbelen.

Structured Streaming garandeert exactly-once-verwerking, maar verwijdert geen duplicaten uit records van gegevensbronnen. Gebruik dropDuplicatesWithinWatermark om duplicaten op elk veld te verwijderen, zelfs wanneer velden verschillen tussen dubbele records, zoals gebeurtenistijd of aankomsttijd.

Met dropDuplicatesWithinWatermark ontdubbelen query’s altijd records die binnen de watermarkdrempel binnenkomen. Query's kunnen ook dubbele records verwijderen die buiten de drempel binnenkomen, maar dat is niet gegarandeerd. Als u wilt garanderen dat query's alle duplicaten verwijderen, stelt u de drempelwaarde voor het watermerk in op groter dan het maximale tijdstempelverschil tussen dubbele gebeurtenissen.

U moet een watermerk opgeven om de dropDuplicatesWithinWatermark methode te gebruiken:

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(Seq("guid"))

Voorbeelden van toepassingen

In de volgende voorbeelden ziet u geavanceerde gebruiksvoorbeelden voor vensters:

Gebruik de tuimelvensters om de totale omzet per uur te berekenen

Tumbling windows hebben een vaste grootte en overlappen elkaar niet. Elke invoerrij behoort tot precies één venster. Gebruik tumbling windows om afzonderlijke aggregaties per tijdsinterval te berekenen, zoals verkooptotalen per uur:

Python

from pyspark.sql.functions import window, sum

hourly_sales = (orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window("timestamp", "1 hour"))
  .agg(sum("amount").alias("total_sales"))
)

Scala

import org.apache.spark.sql.functions.{window, sum}

val hourlySales = orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window($"timestamp", "1 hour"))
  .agg(sum($"amount").alias("total_sales"))

In dit voorbeeld:

  • window("timestamp", "1 hour") groeperen orders in niet-overlappende intervallen van 1 uur, zoals 5 tot 6:00 uur en 6 tot 7:00 uur.
  • withWatermark("timestamp", "1 hour") behoudt het aggregaat van elk venster in de status totdat het eindtijdstempel van het venster 1 uur ouder is dan de maximale ordertimestamp.

Gebruik glijdende vensters om voortschrijdende aggregaties te berekenen

Schuifvensters hebben een vaste grootte met intervallen die elkaar kunnen overlappen. Eén rij kan tot meerdere vensters behoren. Gebruik schuifvensters om voortschrijdende aggregaties te berekenen, zoals verkopen over een voortschrijdende periode van 6 uur:

Python

from pyspark.sql.functions import window, sum

rolling_sales = (orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window("timestamp", "6 hours", slideDuration="1 hour"))
  .agg(sum("amount").alias("total_sales"))
)

Scala

import org.apache.spark.sql.functions.{window, sum}

val rollingSales = orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window($"timestamp", "6 hours", "1 hour"))
  .agg(sum($"amount").alias("total_sales"))

In dit voorbeeld:

  • window("timestamp", "6 hours", slideDuration="1 hour") groeperen orders in intervallen van 6 uur die met 1 uur vooruitgaan, bijvoorbeeld 5 tot 11:00 uur en 6:00 tot 12:00 uur.
  • withWatermark("timestamp", "1 hour") behoudt de aggregatiewaarde van elk venster in de status totdat de eindtijdstempel van het venster 1 uur ouder is dan de maximale ordertijdstempel.
  • slideDuration moet kleiner zijn dan of gelijk zijn aan de windowDuration.

Sessievensters gebruiken om de gebruikersactiviteit te controleren

Sessievensters hebben geen vaste grootte. Er wordt een venster geopend wanneer een rij binnenkomt en sluit na een tussenpoos waarin geen nieuwe rijen binnenkomen. Gebruik sessievensters om activiteitspieken te aggregeren tussen lange niet-actieve perioden, zoals de paginaweergaven van een gebruiker binnen een periode van 30 minuten:

Python

from pyspark.sql.functions import session_window, sum

sessionized_page_views = (activity
  .withWatermark("timestamp", "1 hour")
  .groupBy("user_id", session_window("timestamp", gapDuration="30 minutes"))
  .agg(sum("page_views").alias("total_page_views"))
)

Scala

import org.apache.spark.sql.functions.{session_window, sum}

val sessionizedPageViews = activity
  .withWatermark("timestamp", "1 hour")
  .groupBy($"user_id", session_window($"timestamp", "30 minutes"))
  .agg(sum($"page_views").alias("total_page_views"))

In dit voorbeeld:

  • session_window("timestamp", gapDuration="30 minutes") opent een venster wanneer de eerste paginaweergave binnenkomt. Elke volgende paginaweergave die binnen 30 minuten binnenkomt, breidt het venster uit. Wanneer er binnen 30 minuten geen paginaweergave binnenkomt, wordt het venster gesloten en start de volgende paginaweergave een nieuw venster.
  • withWatermark("timestamp", "1 hour") behoudt de statistische status van elke sessie totdat het tijdstempel voor het einde van het venster 1 uur ouder is dan de maximale tijdstempel van de paginaweergave.
  • Het timeColumn argument voor window() en session_window() moet van TimestampType of TimestampNTZTypezijn.
  • Gebruik current_timestamp() dit om vensters te definiëren op basis van verwerkingstijd in plaats van gebeurtenistijd.
  • U kunt vensterduur instellen van microseconden tot dagen. Duren van een maand of langer worden niet ondersteund.
  • Gebruik complete de uitvoermodus met vensteraggregaties om alle vensterstatus voor onbepaalde tijd te behouden. Gebruik append de uitvoermodus met een geschikt watermerk om de statusgroei te binden en geheugenproblemen voor grote gegevenssets te voorkomen. Zie Watermerken en uitvoermodus voor vensteraggregaties voor meer informatie over het gedrag van de uitvoermodus.