Selektives Überschreiben von Daten mit Delta Lake

Delta Lake verfügt über die folgenden unterschiedlichen Optionen für selektive Überschreibungen:

Auswahl Anwendungsfall Unterstützte Computetypen Mindestversion
REPLACE WHERE Zeilen, die einem Prädikat entsprechen, atomar überschreiben. Wird für Ersetzungen mit einer festen Übereinstimmungsbedingung verwendet, wie colA = 5 oder int_col IN (1, 2, 3). Alle Computetypen. SQL in Databricks Runtime 12.2 LTS und höher. Python und Scala in Databricks Runtime 9.1 LTS und höher.
REPLACE USING Dynamische Daten überschreiben. Ersetzt alle Zeilen, die den angegebenen Spalten entsprechen, basierend auf dem Gleichheitsvergleich der Spaltenwerte im bereitgestellten Dataset. Alle Computetypen. SQL in Databricks Runtime 16.3 und höher. Python und Scala in Databricks Runtime 18.2 und höher.
REPLACE ON Dynamisches Überschreiben von Daten durch booleschen Ausdruck. Wird für Ersetzungen mit einer komplexen oder NULL-sicheren Abgleichsbedingung verwendet, wie z. B. s.colA <=> t.colA AND s.colB <=> t.colB. Alle Computetypen. SQL in Databricks Runtime 17.1 und höher. Python und Scala in Databricks Runtime 18.2 und höher.
partitionOverwriteMode Veraltetes dynamisches Überschreiben von Partitionen, das alle vorhandenen Daten in jeder Partition überschreibt, in die der Schreibvorgang neue Daten schreibt. Für neue Workloads nicht empfohlen. SQL unterstützt nur klassische Rechenleistung. Python und Scala unterstützt alle Computetypen. SQL, Python und Scala in Databricks Runtime 11.3 LTS und höher.

Für die meisten Anwendungsfälle empfiehlt Databricks die Verwendung REPLACE USING oder REPLACE WHERE. Verwenden Sie REPLACE ON nur, wenn ihr Anwendungsfall komplexe oder NULL-sichere Abgleichsbedingungen erfordert.

Ausführliche Informationen zum Ersatzverhalten der einzelnen Optionen finden Sie unter INSERT. Eine vollständige Liste der DataFrameWriter Delta Lake-Optionen finden Sie unter Delta Lake und Apache Iceberg.

In Scala und Python können replaceOn und replaceUsing nicht in Kombination mit replaceWhere, partitionOverwriteMode oder overwriteSchema verwendet werden.

Bei leeren Quellabfragen löschen sowohl REPLACE USING als auch REPLACE ON keine Daten, REPLACE WHERE könnte jedoch Daten löschen.

Important

Wenn Daten versehentlich überschrieben wurden, können Sie die Änderung mithilfe der Wiederherstellung rückgängig machen.

REPLACE WHERE

Sie können selektiv nur die Daten überschreiben, die einem beliebigen Ausdruck entsprechen.REPLACE WHERE

Important

Um bei der Ausführung von REPLACE WHERE von der inkrementellen Aktualisierung zu profitieren, verwenden Sie REPLACE-Abläufe WHERE in Spark Declarative Pipelines (SDP). Siehe Batchverarbeitung mit REPLACE-FlüssenWHERE.

Um Ereignisse im Januar in der Zieltabelle, die nach start_date partitioniert ist, atomisch durch die Daten in replace_data zu ersetzen:

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .saveAsTable("events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .saveAsTable("events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

Dieser Beispielcode schreibt die Daten aus replace_data, überprüft, ob alle Zeilen mit dem Prädikat übereinstimmen, und führt mithilfe der overwrite-Semantik eine atomische Ersetzung durch. Wenn Werte im Vorgang außerhalb des Prädikats liegen, schlägt dieser Vorgang standardmäßig mit einem Fehler fehl.

Wenn Sie bei Classic Compute dieses Verhalten so ändern möchten, dass overwrite Werte innerhalb des Prädikatbereichs und insert Datensätze außerhalb des angegebenen Bereichs verwendet werden, entfernen Sie die Einschränkungsprüfung, indem Sie spark.databricks.delta.replaceWhere.constraintCheck.enabled auf false festlegen:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

Note

REPLACE WHERE akzeptiert ein boolean_expression mit einigen Einschränkungen. Siehe INSERT in der SQL-Sprachreferenz.

REPLACE WHERE kann bei leeren Quellabfragen Tabellenzeilen löschen.

altes Verhalten

Legacy replaceWhere ist nur auf der klassischen Berechnung verfügbar. Siehe klassische Berechnungsübersicht.

Wenn Sie das Legacyverhalten von replaceWhereAbfragen verwenden, überschreiben Abfragen Daten, die einem Prädikat nur über Partitionsspalten entsprechen. Der folgende Befehl würde den Monat Januar in der Zieltabelle, die durch date partitioniert ist, atomisch mit den Daten in df ersetzen.

Python
(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .saveAsTable("people10m")
)
Scala
df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .saveAsTable("people10m")

Um das Legacy-Verhalten zu verwenden, setzen Sie spark.databricks.delta.replaceWhere.dataColumns.enabled auf false:

Python
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

Dynamische Datenüberschreibungen

Dynamische Daten überschreiben selektiv Daten, die mit den angegebenen Schlüsselspalten oder dem booleschen Ausdruck übereinstimmen, wobei alle anderen Daten unverändert bleiben. Partitionierte Tabellen, nicht partitionierte Tabellen und Tabellen mit flüssigem Clustering werden alle unterstützt.

Dynamische Partitionsüberschreibungen sind eine Teilmenge des Dynamischen Datenüberschreibverhaltens. Dynamische Partitionsüberschreibungen ersetzen alle vorhandenen Daten in jeder Partition, in die neue Daten geschrieben werden, und lassen alle anderen Partitionen unverändert. Es werden nur partitionierte Tabellen unterstützt.

REPLACE USING

SQL wird in Databricks Runtime 16.3 und höher unterstützt. Python und Scala werden in Databricks Runtime 18.2 und höher unterstützt. Informationen zu Verhaltensunterschieden in Databricks Runtime 16.3 bis 17.1 finden Sie unter Legacy-Verhalten.

REPLACE USING ermöglicht ein compute-unabhängiges, atomares Überschreibverhalten, das in Databricks SQL-Warehouses, serverlosem Computing und klassischem Computing funktioniert. REPLACE USING erfordert nicht, dass Sie eine Spark-Sitzungskonfiguration festlegen.

REPLACE USING ersetzt Zeilen, wenn die angegebenen Spalten im Gleichheitsvergleich übereinstimmen. Alle anderen Daten bleiben unverändert.

Verwenden Sie dynamisches Überschreiben von Daten mit REPLACE USING:

Python

(sourceDataDF.write
  .mode("overwrite")
  .option("replaceUsing", "event_id, start_date")
  .saveAsTable("events")
)

Scala

sourceDataDF.write
  .mode("overwrite")
  .option("replaceUsing", "event_id, start_date")
  .saveAsTable("events")

SQL

INSERT INTO TABLE events
  REPLACE USING (event_id, start_date)
  SELECT * FROM source_data

Bei leeren Quellabfragen REPLACE USING werden keine Tabellenzeilen gelöscht.

Verwenden Sie REPLACE ON stattdessen für komplexe oder NULL-sichere Abgleichsbedingungen. Siehe REPLACE ON.

Siehe INSERT in der SQL-Sprachreferenz.

Legacyverhalten

In Databricks Runtime 16.3 bis 17.1 REPLACE USING verwendet Legacyverhalten und lässt nur dynamische Partitionsüberschreibungen zu, während Databricks Runtime 17.2 und höher dynamische Datenüberschreibungen zulässt.

Beachten Sie die folgenden Einschränkungen und Verhaltensweisen für das REPLACE USING Legacy-Verhalten:

  • Sie müssen den vollständigen Satz der Partitionsspalten der Tabelle in der USING Klausel angeben.
  • Überprüfen Sie immer, ob die geschriebenen Daten nur die erwarteten Partitionen berühren. Eine einzelne Zeile in der falschen Partition kann unbeabsichtigt die gesamte Partition überschreiben.

REPLACE ON

SQL wird in Databricks Runtime 17.1 und höher unterstützt. Python und Scala werden in Databricks Runtime 18.2 und höher unterstützt.

REPLACE ON ersetzt Zeilen, wenn sie eine benutzerdefinierte Bedingung erfüllen, im Gegensatz zu REPLACE USING, das Zeilen ersetzt, wenn die angegebenen Spalten bei einem Gleichheitsvergleich als gleich gelten. Verwenden Sie diese Funktion REPLACE ON , wenn Sie übereinstimmende Logik benötigen, die REPLACE USING nicht unterstützt wird, z. B. das Behandeln von NULL Werten als gleich.

Verwenden Sie optional die Option targetAlias, um einen Alias für die Zieltabelle anzugeben, sowie die APIs .as() oder .alias(), um einen Alias für die Quelldaten anzugeben.

Informationen zur SQL-Syntax finden Sie unter INSERT.

Python

(sourceDataDF.alias("s")
  .write
  .mode("overwrite")
  .option("targetAlias", "t")
  .option("replaceOn", "s.event_id <=> t.event_id AND s.start_date <=> t.start_date")
  .saveAsTable("events")
)

Scala

sourceDataDF.as("s")
  .write
  .mode("overwrite")
  .option("targetAlias", "t")
  .option("replaceOn", "s.event_id <=> t.event_id AND s.start_date <=> t.start_date")
  .saveAsTable("events")

SQL

INSERT INTO TABLE events AS t
  REPLACE ON (s.event_id <=> t.event_id AND s.start_date <=> t.start_date)
  (SELECT * FROM source_data) AS s

Bei leeren Quellabfragen REPLACE ON werden keine Tabellenzeilen gelöscht.

Dynamisches Überschreiben von Partitionen mit partitionOverwriteMode (legacy)

Important

Dieses Feature befindet sich in der Public Preview.

Databricks Runtime 11.3 LTS und höher unterstützt das dynamische Überschreiben von Partitionen bei partitionierten Tabellen im Überschreibmodus: entweder INSERT OVERWRITE in SQL oder mittels eines DataFrame-Schreibzugriffs mit df.write.mode("overwrite"). Diese Art der Überschreibung ist nur für klassische Rechenleistung verfügbar und nicht für Databricks SQL Warehouses oder serverlose Rechenleistung.

Warnung

Verwenden Sie INSERT REPLACE USING nach Möglichkeit anstelle von Partitionsüberschreibung INSERT OVERWRITE PARTITION und spark.sql.sources.partitionOverwriteMode=dynamic. Das Überschreiben von Partitionen kann veraltete Daten verwenden, wenn sich die Partitionierung ändert.

Wenn Sie den Modus für dynamische Partitionsüberschreibung verwenden möchten, legen Sie die Spark-Sitzungskonfiguration spark.sql.sources.partitionOverwriteMode auf dynamic. Sie können alternativ die DataFrameWriter Option partitionOverwriteMode auf dynamic setzen. Wenn vorhanden, setzt die abfragespezifische Option den in der Sitzungskonfiguration definierten Modus außer Kraft. Der Standardwert für spark.sql.sources.partitionOverwriteMode ist static.

Im folgenden Beispiel wird partitionOverwriteMode verwendet:

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

Beachten Sie die folgenden Einschränkungen und Verhaltensweisen für partitionOverwriteMode:

  • Sie können overwriteSchema nicht auf true setzen.
  • Sie können nicht sowohl partitionOverwriteMode als auch replaceWhere im selben DataFrameWriter-Vorgang angeben.
  • Wenn Sie eine replaceWhere Bedingung mithilfe einer DataFrameWriter Option angeben, wendet Delta Lake diese Bedingung an, um zu steuern, welche Daten überschrieben werden. Diese Option hat Vorrang vor der partitionOverwriteMode Konfiguration auf Sitzungsebene.
  • Überprüfen Sie immer, ob die geschriebenen Daten nur die erwarteten Partitionen berühren. Eine einzelne Zeile in der falschen Partition kann unbeabsichtigt die gesamte Partition überschreiben.