Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Questo articolo illustra come usare Apache Spark MLlib per creare un'applicazione di Machine Learning che gestisce l'analisi predittiva in un set di dati aperto Azure. Spark offre librerie di apprendimento automatico predefinite. Questo esempio usa la classificazione tramite regressione logistica.
La guida illustra questi passaggi:
- Configura il notebook e gli import
- Caricare e campionare i dati dei taxi di NYC
- Preparare e progettare le funzionalità
- Codificare le funzionalità categoriche
- Addestrare il modello di regressione logistica
- Valutare e visualizzare i risultati
SparkML e MLlib, librerie Spark di base, offrono diverse utilità in grado di agevolare le attività di apprendimento automatico. Queste utilità sono adatte per:
- Classificazione
- Clustering
- Verifica delle ipotesi e calcolo delle statistiche campionarie
- Regressione
- Scomposizione di valori singolari e analisi in componenti principali
- Modellazione di argomenti
Prerequisiti
Abbonati a Microsoft Fabric. Oppure, registrati per una versione di prova gratuita di Microsoft Fabric.
Accedi a Microsoft Fabric.
Passare a Fabric usando il selettore di esperienza nell'angolo in basso a sinistra della home page.
- Se necessario, crea un Microsoft Fabric Lakehouse come descritto in Crea un Lakehouse in Microsoft Fabric.
- Crea un nuovo notebook nell'area di lavoro selezionando + e quindi Notebook. Per altre informazioni, vedere Creare un notebook.
Informazioni sulla classificazione e la regressione logistica
Classificazione, un'attività comune di apprendimento automatico, prevede il processo di ordinamento dei dati in categorie. Un algoritmo di classificazione illustra come assegnare etichette ai dati di input forniti. Ad esempio, un algoritmo di Machine Learning potrebbe accettare informazioni sulle azioni come input e dividere il titolo in due categorie: azioni da vendere e azioni da conservare.
L'algoritmo di regressione logistica è utile per la classificazione. L'API di regressione logistica di Spark è utile per la classificazione binaria, ovvero la classificazione di dati di input in uno di due gruppi. Per altre informazioni sulla regressione logistica, vedere Wikipedia.
La regressione logistica produce una funzione logistica che stima la probabilità che un vettore di input appartenga a un gruppo o all'altro.
Esempio di analisi predittiva di dati relativi alla rete taxi di New York
I dati sono disponibili tramite risorse di set di dati aperti di Azure. Questo sottoinsieme di set di dati ospita informazioni sulle corse in taxi gialle, inclusi gli orari di inizio, le ore di fine, le posizioni di inizio, le posizioni di fine, i costi delle corse e altri attributi.
Questa esercitazione usa Apache Spark per eseguire analisi sui dati relativi alle mance dei taxi di New York e sviluppare un modello per stimare se una determinata corsa include una mancia.
Creare un modello di Machine Learning con Apache Spark
Creare un notebook PySpark. Per altre informazioni, vedere Creare un notebook.
Dopo aver creato il notebook, collegalo a un lakehouse selezionando Aggiungi lakehouse nel pannello a sinistra.
Importare i tipi necessari per questo notebook. Incollare il codice seguente nella prima cella ed eseguirlo.
import matplotlib.pyplot as plt from pyspark.sql.functions import unix_timestamp, date_format, col, when from pyspark.ml import Pipeline from pyspark.ml.feature import RFormula from pyspark.ml.feature import OneHotEncoder, StringIndexer from pyspark.ml.classification import LogisticRegression from pyspark.ml.evaluation import BinaryClassificationEvaluatorVerifica: la compilazione della cella si completa senza
ImportError. Se viene visualizzato un errore, verificare che il notebook usi il runtime PySpark.Usare MLflow per tenere traccia degli esperimenti di Machine Learning e delle esecuzioni corrispondenti. Se la registrazione automatica di Microsoft Fabric è abilitata, vengono acquisite automaticamente le metriche e i parametri corrispondenti.
import mlflowVerificare: il completamento della cella avviene senza errori. Eseguire
print(mlflow.__version__)per verificare che MLflow sia disponibile.
Creare il DataFrame di input
Questo esempio carica i dati dall'archiviazione set di dati aperti di Azure in un dataframe Apache Spark. Si applicano quindi operazioni Spark per pulire e filtrare il set di dati.
Incollare il codice seguente in una nuova cella ed eseguirlo per creare un dataframe Spark. Questo passaggio recupera i dati dei taxi gialli di New York filtrati a maggio 2018.
blob_account_name = "azureopendatastorage" blob_container_name = "nyctlc" blob_relative_path = "yellow" wasbs_path = f"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}" nyc_tlc_df = spark.read.parquet(wasbs_path) \ .filter((col("tpepPickupDateTime") >= "2018-05-01") & (col("tpepPickupDateTime") < "2018-06-01")) \ .repartition(20)Verifica: eseguire la cella seguente per verificare che i dati vengano caricati correttamente.
print(f"Loaded {nyc_tlc_df.count()} rows") # Expected output: Loaded approximately 9,000,000+ rowsEsempio del set di dati per velocizzare lo sviluppo e il training.
# Sample without replacement to avoid duplicates sampled_taxi_df = nyc_tlc_df.sample(False, 0.001, seed=1234)Verifica: verificare che le dimensioni del campione siano gestibili.
print(f"Sampled {sampled_taxi_df.count()} rows") # Expected output: Sampled approximately 9,000-10,000 rowsVisualizzare i dati usando il comando predefinito
display()per esplorare l'esempio di dati.display(sampled_taxi_df.limit(10))Verifica: viene visualizzata una tabella con 10 righe che mostra colonne come
tpepPickupDateTime,fareAmounttipAmount, etripDistance.
Preparare i dati
La preparazione dei dati è un passaggio fondamentale nel processo di Machine Learning. Comporta la pulizia, la trasformazione e l'organizzazione dei dati non elaborati per renderli adatti per l'analisi e la modellazione. In questa sezione eseguire diversi passaggi di preparazione dei dati:
- Filtrare il set di dati per rimuovere outlier e valori non corretti.
- Rimuovere colonne non necessarie per il training del modello.
- Creare nuove colonne dai dati non elaborati.
- Generare un'etichetta per determinare se una determinata corsa in taxi prevede una mancia.
Eseguire il codice seguente per selezionare colonne pertinenti, calcolare le funzionalità derivate e filtrare gli outlier:
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount',
'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime',
date_format('tpepPickupDateTime', 'HH').cast('integer').alias('pickupHour'),
date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString'),
(unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs'),
(when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
) \
.filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)
& (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)
& (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)
& (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)
& (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)
& (sampled_taxi_df.rateCodeId <= 5)
& (sampled_taxi_df.paymentType.isin({"1", "2"}))
)
Importante
La date_format funzione usa il modello 'HH' (formato di 24 ore, valori 0-23) anziché 'hh' (formato di 12 ore, valori 1-12). Il formato a 24 ore è necessario per la logica successiva di raggruppamento per fasce orarie.
Successivamente, aggiungi la funzionalità delle fasce orarie del traffico in base all'ora del giorno:
taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount',
'tripDistance', 'weekdayString', 'pickupHour', 'tripTimeSecs', 'tipped',
when((col('pickupHour') <= 6) | (col('pickupHour') >= 20), "Night")
.when((col('pickupHour') >= 7) & (col('pickupHour') <= 10), "AMRush")
.when((col('pickupHour') >= 11) & (col('pickupHour') <= 15), "Afternoon")
.when((col('pickupHour') >= 16) & (col('pickupHour') <= 19), "PMRush")
.otherwise("Other").alias('trafficTimeBins')
) \
.filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))
Verifica: verificare che gli intervalli temporali del traffico siano distribuiti correttamente.
taxi_featurised_df.groupBy('trafficTimeBins').count().show()
# Expected output: Shows counts for Night, AMRush, Afternoon, PMRush categories
Creare un modello di regressione logistica
L'ultima attività converte i dati con etichetta in un formato che possa essere gestito dalla regressione logistica. L'input per un algoritmo di regressione logistica deve avere una struttura di coppie etichetta-vettore di funzionalità, dove il vettore di funzionalità è un vettore di numeri che rappresenta il punto di ingresso.
Convertire le colonne trafficTimeBins categoriche e weekdayString in rappresentazioni integer usando l'approccio OneHotEncoder seguente:
# Convert categorical features into numeric representations
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(inputCol="weekdayIndex", outputCol="weekdayVec")
# Apply the encodings to create a new DataFrame
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)
Verifica: verificare che il dataframe codificato contenga le nuove colonne previste.
print("Columns:", encoded_final_df.columns)
print(f"Row count: {encoded_final_df.count()}")
# Expected output: Columns list includes 'trafficTimeBinsVec' and 'weekdayVec'
Addestrare un modello di regressione logistica
Suddividere il set di dati in un set di training (70%) e un set di test (30%):
# Split the DataFrame into training and test sets
trainingFraction = 0.7
testingFraction = (1 - trainingFraction)
seed = 1234
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)
Verifica: conferma che la suddivisione abbia prodotto dimensioni adeguate.
print(f"Training rows: {train_data_df.count()}, Test rows: {test_data_df.count()}")
# Expected output: Approximately 70%/30% split of the encoded data
Crea la formula del modello, addestra il modello di regressione logistica e valutalo utilizzando l'area sotto la curva ROC (Receiver Operating Characteristic):
# Create a logistic regression model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol='label')
# Define the formula: 'tipped' is the response variable, right-hand side are predictors
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType + trafficTimeBinsVec")
# Train the model using a pipeline
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)
# Generate predictions on the test dataset
predictions = lrModel.transform(test_data_df)
# Evaluate using Area Under ROC
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Area under ROC = {auc}")
Verifica: l'output mostra un valore AUC. Un modello con prestazioni buone produce un valore vicino a 1,0.
Area under ROC = 0.97 (approximately)
Note
Il valore esatto dell'AUC varia a seconda dell'esempio di dati. I valori superiori a 0,90 indicano prestazioni predittive avanzate per questo set di dati.
Creare una rappresentazione visiva della stima
Creare una visualizzazione finale per interpretare i risultati del modello. Una curva ROC presenta il compromesso tra il tasso positivo vero e il tasso falso positivo.
# Plot the ROC curve from the model training summary
modelSummary = lrModel.stages[-1].summary
# Extract FPR and TPR values as plain lists
roc_data = modelSummary.roc.select('FPR', 'TPR').toPandas()
plt.figure(figsize=(8, 6))
plt.plot([0, 1], [0, 1], 'r--', label='Random classifier')
plt.plot(roc_data['FPR'], roc_data['TPR'], label=f'Logistic Regression (AUC = {auc:.4f})')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve - NYC Taxi Tip Prediction')
plt.legend(loc='lower right')
plt.show()
Verifica: viene visualizzato un tracciato che mostra la curva ROC sopra la linea diagonale tratteggiata rossa. La curva deve inchinarsi verso l'angolo superiore sinistro, a indicare prestazioni di classificazione avanzate.
Pulire le risorse
Dopo aver completato questa esercitazione, elimina il notebook e il lakehouse per liberare la capacità dell'area di lavoro:
- Nell'area di lavoro fare clic con il pulsante destro del mouse sul notebook e scegliere Elimina.
- Se è stata creata una lakehouse specificamente per questa esercitazione, fare clic con il pulsante destro del mouse su di essa e selezionare Elimina.
Per conservare il modello addestrato per un utilizzo futuro, aggiungere il codice seguente prima della fase di pulizia:
# Save the model to the lakehouse
model_path = "abfss://<your-workspace>@onelake.dfs.fabric.microsoft.com/<your-lakehouse>.Lakehouse/Files/models/taxi_tip_model"
lrModel.write().overwrite().save(model_path)
print(f"Model saved to: {model_path}")
Troubleshooting
| Issue | Motivo | Soluzione |
|---|---|---|
Py4JJavaError durante la lettura di Parquet |
Connettività di rete all'archiviazione BLOB di Azure | Verificare che l'area di lavoro Fabric abbia accesso a Internet in uscita. Prova a riavviare la sessione Spark. |
AnalysisException: cannot resolve column |
Errore di ortografia del nome della colonna o mancata corrispondenza dello schema | Eseguire nyc_tlc_df.printSchema() per esaminare le colonne disponibili. Lo schema del set di dati nyc taxi può cambiare tra anni. |
| DataFrame vuoto dopo il filtraggio | Condizioni di filtro troppo restrittive per la finestra dati | Aumentare l'intervallo di date o controllare sampled_taxi_df.count() prima del filtro. |
IllegalArgumentException in StringIndexer |
Etichette non visibili durante la trasformazione | Aggiungere handleInvalid="skip" alle StringIndexer chiamate: StringIndexer(inputCol="...", outputCol="...", handleInvalid="skip") |
| AUC basso (inferiore a 0,6) | Dati insufficienti o progettazione di funzionalità non corrette | Aumentare la frazione di campione (ad esempio, 0.01 anziché 0.001) e verificare che trafficTimeBins le categorie siano bilanciate. |
OutOfMemoryError |
Set di dati troppo grande per la capacità disponibile | Riduci la frazione di campionamento o aumenta il livello di capacità di Fabric. |
| Grafico ROC non visualizzato | Problema del back-end Matplotlib nel notebook | Aggiungere %matplotlib inline nella parte superiore del notebook. |
Contenuto correlato
- Usare esempi di intelligenza artificiale per creare modelli di Machine Learning: Usare esempi di intelligenza artificiale
- Tenere traccia delle esecuzioni di Machine Learning con Esperimenti: esperimenti di Machine Learning