Creare un modello di apprendimento automatico con MLlib di Apache Spark

Questo articolo illustra come usare Apache Spark MLlib per creare un'applicazione di Machine Learning che gestisce l'analisi predittiva in un set di dati aperto Azure. Spark offre librerie di apprendimento automatico predefinite. Questo esempio usa la classificazione tramite regressione logistica.

La guida illustra questi passaggi:

  • Configura il notebook e gli import
  • Caricare e campionare i dati dei taxi di NYC
  • Preparare e progettare le funzionalità
  • Codificare le funzionalità categoriche
  • Addestrare il modello di regressione logistica
  • Valutare e visualizzare i risultati

SparkML e MLlib, librerie Spark di base, offrono diverse utilità in grado di agevolare le attività di apprendimento automatico. Queste utilità sono adatte per:

  • Classificazione
  • Clustering
  • Verifica delle ipotesi e calcolo delle statistiche campionarie
  • Regressione
  • Scomposizione di valori singolari e analisi in componenti principali
  • Modellazione di argomenti

Prerequisiti

  • Abbonati a Microsoft Fabric. Oppure, registrati per una versione di prova gratuita di Microsoft Fabric.

  • Accedi a Microsoft Fabric.

  • Passare a Fabric usando il selettore di esperienza nell'angolo in basso a sinistra della home page.

    Screenshot che mostra la selezione di

Informazioni sulla classificazione e la regressione logistica

Classificazione, un'attività comune di apprendimento automatico, prevede il processo di ordinamento dei dati in categorie. Un algoritmo di classificazione illustra come assegnare etichette ai dati di input forniti. Ad esempio, un algoritmo di Machine Learning potrebbe accettare informazioni sulle azioni come input e dividere il titolo in due categorie: azioni da vendere e azioni da conservare.

L'algoritmo di regressione logistica è utile per la classificazione. L'API di regressione logistica di Spark è utile per la classificazione binaria, ovvero la classificazione di dati di input in uno di due gruppi. Per altre informazioni sulla regressione logistica, vedere Wikipedia.

La regressione logistica produce una funzione logistica che stima la probabilità che un vettore di input appartenga a un gruppo o all'altro.

Esempio di analisi predittiva di dati relativi alla rete taxi di New York

I dati sono disponibili tramite risorse di set di dati aperti di Azure. Questo sottoinsieme di set di dati ospita informazioni sulle corse in taxi gialle, inclusi gli orari di inizio, le ore di fine, le posizioni di inizio, le posizioni di fine, i costi delle corse e altri attributi.

Questa esercitazione usa Apache Spark per eseguire analisi sui dati relativi alle mance dei taxi di New York e sviluppare un modello per stimare se una determinata corsa include una mancia.

Creare un modello di Machine Learning con Apache Spark

  1. Creare un notebook PySpark. Per altre informazioni, vedere Creare un notebook.

    Dopo aver creato il notebook, collegalo a un lakehouse selezionando Aggiungi lakehouse nel pannello a sinistra.

  2. Importare i tipi necessari per questo notebook. Incollare il codice seguente nella prima cella ed eseguirlo.

    import matplotlib.pyplot as plt
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    

    Verifica: la compilazione della cella si completa senza ImportError. Se viene visualizzato un errore, verificare che il notebook usi il runtime PySpark.

  3. Usare MLflow per tenere traccia degli esperimenti di Machine Learning e delle esecuzioni corrispondenti. Se la registrazione automatica di Microsoft Fabric è abilitata, vengono acquisite automaticamente le metriche e i parametri corrispondenti.

    import mlflow
    

    Verificare: il completamento della cella avviene senza errori. Eseguire print(mlflow.__version__) per verificare che MLflow sia disponibile.

Creare il DataFrame di input

Questo esempio carica i dati dall'archiviazione set di dati aperti di Azure in un dataframe Apache Spark. Si applicano quindi operazioni Spark per pulire e filtrare il set di dati.

  1. Incollare il codice seguente in una nuova cella ed eseguirlo per creare un dataframe Spark. Questo passaggio recupera i dati dei taxi gialli di New York filtrati a maggio 2018.

    blob_account_name = "azureopendatastorage"
    blob_container_name = "nyctlc"
    blob_relative_path = "yellow"
    wasbs_path = f"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}"
    
    nyc_tlc_df = spark.read.parquet(wasbs_path) \
        .filter((col("tpepPickupDateTime") >= "2018-05-01") & (col("tpepPickupDateTime") < "2018-06-01")) \
        .repartition(20)
    

    Verifica: eseguire la cella seguente per verificare che i dati vengano caricati correttamente.

    print(f"Loaded {nyc_tlc_df.count()} rows")
    # Expected output: Loaded approximately 9,000,000+ rows
    
  2. Esempio del set di dati per velocizzare lo sviluppo e il training.

    # Sample without replacement to avoid duplicates
    sampled_taxi_df = nyc_tlc_df.sample(False, 0.001, seed=1234)
    

    Verifica: verificare che le dimensioni del campione siano gestibili.

    print(f"Sampled {sampled_taxi_df.count()} rows")
    # Expected output: Sampled approximately 9,000-10,000 rows
    
  3. Visualizzare i dati usando il comando predefinito display() per esplorare l'esempio di dati.

    display(sampled_taxi_df.limit(10))
    

    Verifica: viene visualizzata una tabella con 10 righe che mostra colonne come tpepPickupDateTime, fareAmounttipAmount, e tripDistance.

Preparare i dati

La preparazione dei dati è un passaggio fondamentale nel processo di Machine Learning. Comporta la pulizia, la trasformazione e l'organizzazione dei dati non elaborati per renderli adatti per l'analisi e la modellazione. In questa sezione eseguire diversi passaggi di preparazione dei dati:

  • Filtrare il set di dati per rimuovere outlier e valori non corretti.
  • Rimuovere colonne non necessarie per il training del modello.
  • Creare nuove colonne dai dati non elaborati.
  • Generare un'etichetta per determinare se una determinata corsa in taxi prevede una mancia.

Eseguire il codice seguente per selezionare colonne pertinenti, calcolare le funzionalità derivate e filtrare gli outlier:

taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount',
                    'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime',
                    date_format('tpepPickupDateTime', 'HH').cast('integer').alias('pickupHour'),
                    date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString'),
                    (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs'),
                    (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                    ) \
            .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)
                    & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)
                    & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)
                    & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)
                    & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)
                    & (sampled_taxi_df.rateCodeId <= 5)
                    & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                    )

Importante

La date_format funzione usa il modello 'HH' (formato di 24 ore, valori 0-23) anziché 'hh' (formato di 12 ore, valori 1-12). Il formato a 24 ore è necessario per la logica successiva di raggruppamento per fasce orarie.

Successivamente, aggiungi la funzionalità delle fasce orarie del traffico in base all'ora del giorno:

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount',
                                    'tripDistance', 'weekdayString', 'pickupHour', 'tripTimeSecs', 'tipped',
                                    when((col('pickupHour') <= 6) | (col('pickupHour') >= 20), "Night")
                                    .when((col('pickupHour') >= 7) & (col('pickupHour') <= 10), "AMRush")
                                    .when((col('pickupHour') >= 11) & (col('pickupHour') <= 15), "Afternoon")
                                    .when((col('pickupHour') >= 16) & (col('pickupHour') <= 19), "PMRush")
                                    .otherwise("Other").alias('trafficTimeBins')
                                    ) \
                            .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Verifica: verificare che gli intervalli temporali del traffico siano distribuiti correttamente.

taxi_featurised_df.groupBy('trafficTimeBins').count().show()
# Expected output: Shows counts for Night, AMRush, Afternoon, PMRush categories

Creare un modello di regressione logistica

L'ultima attività converte i dati con etichetta in un formato che possa essere gestito dalla regressione logistica. L'input per un algoritmo di regressione logistica deve avere una struttura di coppie etichetta-vettore di funzionalità, dove il vettore di funzionalità è un vettore di numeri che rappresenta il punto di ingresso.

Convertire le colonne trafficTimeBins categoriche e weekdayString in rappresentazioni integer usando l'approccio OneHotEncoder seguente:

# Convert categorical features into numeric representations
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(inputCol="weekdayIndex", outputCol="weekdayVec")

# Apply the encodings to create a new DataFrame
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Verifica: verificare che il dataframe codificato contenga le nuove colonne previste.

print("Columns:", encoded_final_df.columns)
print(f"Row count: {encoded_final_df.count()}")
# Expected output: Columns list includes 'trafficTimeBinsVec' and 'weekdayVec'

Addestrare un modello di regressione logistica

Suddividere il set di dati in un set di training (70%) e un set di test (30%):

# Split the DataFrame into training and test sets
trainingFraction = 0.7
testingFraction = (1 - trainingFraction)
seed = 1234

train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Verifica: conferma che la suddivisione abbia prodotto dimensioni adeguate.

print(f"Training rows: {train_data_df.count()}, Test rows: {test_data_df.count()}")
# Expected output: Approximately 70%/30% split of the encoded data

Crea la formula del modello, addestra il modello di regressione logistica e valutalo utilizzando l'area sotto la curva ROC (Receiver Operating Characteristic):

# Create a logistic regression model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol='label')

# Define the formula: 'tipped' is the response variable, right-hand side are predictors
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType + trafficTimeBinsVec")

# Train the model using a pipeline
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

# Generate predictions on the test dataset
predictions = lrModel.transform(test_data_df)

# Evaluate using Area Under ROC
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Area under ROC = {auc}")

Verifica: l'output mostra un valore AUC. Un modello con prestazioni buone produce un valore vicino a 1,0.

Area under ROC = 0.97 (approximately)

Note

Il valore esatto dell'AUC varia a seconda dell'esempio di dati. I valori superiori a 0,90 indicano prestazioni predittive avanzate per questo set di dati.

Creare una rappresentazione visiva della stima

Creare una visualizzazione finale per interpretare i risultati del modello. Una curva ROC presenta il compromesso tra il tasso positivo vero e il tasso falso positivo.

# Plot the ROC curve from the model training summary
modelSummary = lrModel.stages[-1].summary

# Extract FPR and TPR values as plain lists
roc_data = modelSummary.roc.select('FPR', 'TPR').toPandas()

plt.figure(figsize=(8, 6))
plt.plot([0, 1], [0, 1], 'r--', label='Random classifier')
plt.plot(roc_data['FPR'], roc_data['TPR'], label=f'Logistic Regression (AUC = {auc:.4f})')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve - NYC Taxi Tip Prediction')
plt.legend(loc='lower right')
plt.show()

Verifica: viene visualizzato un tracciato che mostra la curva ROC sopra la linea diagonale tratteggiata rossa. La curva deve inchinarsi verso l'angolo superiore sinistro, a indicare prestazioni di classificazione avanzate.

Grafico che mostra la curva ROC per la regressione logistica nel modello di mancia.

Pulire le risorse

Dopo aver completato questa esercitazione, elimina il notebook e il lakehouse per liberare la capacità dell'area di lavoro:

  1. Nell'area di lavoro fare clic con il pulsante destro del mouse sul notebook e scegliere Elimina.
  2. Se è stata creata una lakehouse specificamente per questa esercitazione, fare clic con il pulsante destro del mouse su di essa e selezionare Elimina.

Per conservare il modello addestrato per un utilizzo futuro, aggiungere il codice seguente prima della fase di pulizia:

# Save the model to the lakehouse
model_path = "abfss://<your-workspace>@onelake.dfs.fabric.microsoft.com/<your-lakehouse>.Lakehouse/Files/models/taxi_tip_model"
lrModel.write().overwrite().save(model_path)
print(f"Model saved to: {model_path}")

Troubleshooting

Issue Motivo Soluzione
Py4JJavaError durante la lettura di Parquet Connettività di rete all'archiviazione BLOB di Azure Verificare che l'area di lavoro Fabric abbia accesso a Internet in uscita. Prova a riavviare la sessione Spark.
AnalysisException: cannot resolve column Errore di ortografia del nome della colonna o mancata corrispondenza dello schema Eseguire nyc_tlc_df.printSchema() per esaminare le colonne disponibili. Lo schema del set di dati nyc taxi può cambiare tra anni.
DataFrame vuoto dopo il filtraggio Condizioni di filtro troppo restrittive per la finestra dati Aumentare l'intervallo di date o controllare sampled_taxi_df.count() prima del filtro.
IllegalArgumentException in StringIndexer Etichette non visibili durante la trasformazione Aggiungere handleInvalid="skip" alle StringIndexer chiamate: StringIndexer(inputCol="...", outputCol="...", handleInvalid="skip")
AUC basso (inferiore a 0,6) Dati insufficienti o progettazione di funzionalità non corrette Aumentare la frazione di campione (ad esempio, 0.01 anziché 0.001) e verificare che trafficTimeBins le categorie siano bilanciate.
OutOfMemoryError Set di dati troppo grande per la capacità disponibile Riduci la frazione di campionamento o aumenta il livello di capacità di Fabric.
Grafico ROC non visualizzato Problema del back-end Matplotlib nel notebook Aggiungere %matplotlib inline nella parte superiore del notebook.