将自定义 Databricks 应用连接到 Lakebase

重要

Lakebase Autoscaling 是 Lakebase 的最新版本更新,具有自动缩放计算、缩放到零、分支和即时还原功能。 有关支持的区域,请参阅 区域可用性。 如果你是 Lakebase 预配的用户,请参阅 Lakebase 预配

本教程展示如何将 Databricks 应用程序连接到具有自动凭据轮换的 Lakebase 自动缩放功能。 应用在 Databricks 过期之前从 Databricks 生成新的数据库凭据。 此示例使用 Flask,但身份验证模式适用于任何框架。

工作原理

Databricks Apps 使用 OAuth 令牌向 Lakebase 进行身份验证,该令牌在一小时后过期。 若要处理此问题,请为应用 的服务主体创建 Postgres 角色,然后配置应用,以便在需要连接到数据库时自动生成新的令牌。 应用使用连接池。 根据需要,池会使用新令牌创建新连接,因此应用程序绝不会使用过期凭据。

将应用部署到Azure Databricks时,它将作为其服务主体运行,并为该标识生成令牌。 在本地测试时,应用将作为Azure Databricks用户帐户运行,并为你生成令牌。 两者都使用相同的令牌轮换代码。 仅身份验证上下文更改。

开始之前

若要完成本教程,需要:

  • 访问启用了 Lakebase Postgres 自动缩放的 Azure Databricks 工作区。 如果在应用切换器中看不到 Lakebase,请与工作区管理员联系。
  • 创建应用的权限
  • 基本熟悉Python和 SQL
  • 为本地开发安装的 Databricks CLI
  • 在本地安装 Python 3.9 或更高版本

步骤 1:创建应用和数据库

首先,创建 Databricks 应用和 Lakebase 项目。 应用会自动获取用于数据库身份验证的服务主体标识。

创建应用

使用 Flask Hello world 模板创建新的 Databricks 应用。 请参阅 从模板创建 Databricks 应用

创建应用后,转到应用的 “环境 ”选项卡并记下 DATABRICKS_CLIENT_ID 值(例如 6b215d2b-f099-4bdb-900a-60837201ececUUID 格式)。 这将成为应用用于 OAuth 身份验证的 Postgres 用户名。

注释

尚未部署应用。 首先配置数据库连接。

创建数据库

创建新的 Lakebase 自动缩放项目来托管数据库。 单击 “应用”图标。应用 切换器,选择 Lakebase Postgres,然后创建一个名称(例如 my-app-db)和 Postgres 版本(接受默认 Postgres 17)的新项目。 有关完整的设置详细信息,请参阅 “创建项目”。

等待计算资源变为活动状态(大约 1 分钟)后再继续操作。

步骤 2:配置数据库身份验证和架构

使用 OAuth 身份验证为应用的服务主体创建 Postgres 角色,然后创建一个包含要显示应用数据的示例表。

设置 OAuth 身份验证

在 Lakebase 项目中,打开 SQL 编辑器并运行这些命令。 该 databricks_auth 扩展启用 OAuth 身份验证。 有了它,Postgres 角色将接受 Databricks 令牌,而不是传统密码:

-- Enable the Databricks authentication extension
CREATE EXTENSION IF NOT EXISTS databricks_auth;

-- Create a Postgres role for your app's service principal
-- Replace the UUID below with your DATABRICKS_CLIENT_ID from Step 1
SELECT databricks_create_role('<DATABRICKS_CLIENT_ID>', 'service_principal');

-- Grant necessary permissions (use the same DATABRICKS_CLIENT_ID)
GRANT CONNECT ON DATABASE databricks_postgres TO "<DATABRICKS_CLIENT_ID>";
GRANT CREATE, USAGE ON SCHEMA public TO "<DATABRICKS_CLIENT_ID>";

<DATABRICKS_CLIENT_ID> 替换为应用的 DATABRICKS_CLIENT_ID 值。 现在,服务主体可以使用Azure Databricks自动管理的 OAuth 令牌进行身份验证。 有关详细信息,请参阅 为Azure Databricks标识创建 OAuth 角色

创建数据库架构

为服务主体创建具有显式权限的示例表(服务主体不继承默认架构权限):

-- Create a sample table
CREATE TABLE notes (
    id SERIAL PRIMARY KEY,
    content TEXT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Grant permissions to your app's service principal (use your DATABRICKS_CLIENT_ID)
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE notes TO "<DATABRICKS_CLIENT_ID>";

-- Insert sample data
INSERT INTO notes (content) VALUES
   ('Welcome to Lakebase Autoscaling!'),
   ('This app connects to Postgres'),
   ('Data fetched from your database');

<DATABRICKS_CLIENT_ID> 替换为 DATABRICKS_CLIENT_ID 值。

步骤 3:生成和配置应用程序

下载应用文件,使用自动 OAuth 令牌轮换配置数据库连接,并在部署之前在本地进行测试。

下载和配置应用文件

从工作区下载应用文件,方法是从应用的 “同步文件 ”部分复制导出命令:

databricks workspace export-dir /Workspace/Users/<your-email>/databricks_apps/<app-folder>/flask-hello-world-app .

编辑 app.yaml 以添加数据库连接详细信息。 通过从 Lakebase Connect 弹出框中仅选择 参数 来获取连接值:

command: ['flask', '--app', 'app.py', 'run', '--host', '0.0.0.0', '--port', '8000']

env:
  - name: PGHOST
    value: '<your-endpoint-hostname>'
  - name: PGDATABASE
    value: 'databricks_postgres'
  - name: PGUSER
    value: '<DATABRICKS_CLIENT_ID>'
  - name: PGPORT
    value: '5432'
  - name: PGSSLMODE
    value: 'require'
  - name: ENDPOINT_NAME
    value: 'projects/<project-id>/branches/<branch-id>/endpoints/<endpoint-id>'

替换占位符:

  • <your-endpoint-hostname>:从 Connect 模式复制 PGHOST 值(例如 ep-xyz.database.us-west-2.dev.databricks.com
  • <DATABRICKS_CLIENT_ID>:使用来自步骤 1 的 DATABRICKS_CLIENT_ID
  • projects/<project-id>/branches/<branch-id>/endpoints/<endpoint-id>:在 Lakebase 应用中,进入分支的Computes选项卡,单击您的计算资源的获取 ID,然后选择复制资源名称

实现 OAuth 令牌轮换和数据库查询

用该代码替换 app.py,此代码添加了自动 OAuth 令牌轮换,并且包含从步骤 2 提取笔记的数据库查询。

import os
from databricks.sdk import WorkspaceClient
import psycopg
from psycopg_pool import ConnectionPool
from flask import Flask

app = Flask(__name__)

# Initialize Databricks client for token generation
w = WorkspaceClient()

# Custom connection class that generates fresh OAuth tokens
class OAuthConnection(psycopg.Connection):
    @classmethod
    def connect(cls, conninfo='', **kwargs):
        # Generate a fresh OAuth token for each connection (tokens are workspace-scoped)
        endpoint_name = os.environ["ENDPOINT_NAME"]
        credential = w.postgres.generate_database_credential(endpoint=endpoint_name)
        kwargs['password'] = credential.token
        return super().connect(conninfo, **kwargs)

# Configure connection parameters
username = os.environ["PGUSER"]
host = os.environ["PGHOST"]
port = os.environ.get("PGPORT", "5432")
database = os.environ["PGDATABASE"]
sslmode = os.environ.get("PGSSLMODE", "require")

# Create connection pool with automatic token rotation
pool = ConnectionPool(
    conninfo=f"dbname={database} user={username} host={host} port={port} sslmode={sslmode}",
    connection_class=OAuthConnection,
    min_size=1,
    max_size=10,
    open=True
)

@app.route('/')
def hello_world():
    # Use connection from pool (automatically gets fresh token)
    with pool.connection() as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT content, created_at FROM notes ORDER BY created_at DESC LIMIT 5")
            notes = cur.fetchall()

    # Display results
    notes_html = "<ul>" + "".join([f"<li>{note[0]} - {note[1]}</li>" for note in notes]) + "</ul>"
    return f'<h1>Hello from Lakebase!</h1><h2>Recent Notes:</h2>{notes_html}'

if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000)

三个关键组件包括:

  • WorkspaceClient:使用 SDK 生成新的凭据。
  • OAuthConnection:一种自定义连接类,用于将新凭据注入每个连接。
  • ConnectionPool:根据需要管理连接并调用自定义类。

有关凭据轮换策略和错误处理的详细信息,请参阅 令牌轮换示例

更新 requirements.txt 以包含所需的包:

flask
psycopg[binary,pool]
databricks-sdk>=0.81.0

版本 0.81.0 或更高版本包括 generate_database_credential() 该方法。

本地测试

在本地测试应用,以在部署之前验证数据库连接是否正常工作。 在本地测试时,应用作为Azure Databricks用户帐户(而不是服务主体)运行,因此请将PGUSER更改为以下环境变量中的电子邮件地址。

对工作区进行身份验证并导出环境变量:

databricks auth login

export PGHOST="<your-endpoint-hostname>"
export PGDATABASE="databricks_postgres"
export PGUSER="your.email@company.com"  # Use YOUR email for local testing, not the service principal
export PGPORT="5432"
export PGSSLMODE="require"
export ENDPOINT_NAME="<your-endpoint-name>"

app.yaml 复制值,但请将 PGUSER 值(服务主体客户端 ID)替换为Azure Databricks电子邮件地址。

安装依赖项并运行应用:

pip3 install --upgrade -r requirements.txt
python3 app.py

在浏览器中打开 http://localhost:8000 。 您应该能看到“Hello from Lakebase!”,以及您的三个示例笔记。 连接池在创建新连接时自动生成新的 OAuth 令牌。 有关详细信息,请参阅 OAuth 令牌身份验证

本地应用程序输出“Hello from Lakebase!”,并显示最近的笔记

步骤 4:部署和验证

在本地测试后,将更改同步到工作区文件夹并从该位置部署:

# Upload files to workspace
databricks sync . /Workspace/Users/<your-email>/my-lakebase-app

# Deploy from the uploaded location
databricks apps deploy <app-name> --source-code-path /Workspace/Users/<your-email>/my-lakebase-app

<your-email> 替换为Azure Databricks电子邮件地址,并将 <app-name> 替换为应用名称。 该 --source-code-path 标志指示部署使用上传的文件,而不是应用的默认位置。

等待部署完成(2-3 分钟),然后通过提供的 URL 访问应用。 应会看到“Hello from Lakebase!”,其中包含示例说明。

另请参阅