|
| 1 | +--- |
| 2 | +sidebar_position: 2 |
| 3 | +--- |
| 4 | + |
| 5 | +# Insert data into a table |
| 6 | + |
| 7 | +This recipe demonstrates how to insert data into a Databricks [Unity Catalog table](https://docs.databricks.com/aws/en/tables/) from a FastAPI application using the [Databricks SQL Connector](https://docs.databricks.com/en/dev-tools/python-sql-connector.html). |
| 8 | + |
| 9 | +:::info |
| 10 | +In this example, we set up our API to be called using the `POST` HTTP method, which is the standard choice for creating new resources in REST APIs as defined in RFC 7231. |
| 11 | +Unlike `GET`, `POST` is not idempotent: making the same request multiple times may create multiple resources. |
| 12 | + |
| 13 | +POST requests are typically used when submitting data to be processed or when creating new resources, with the request body containing the data to be added. |
| 14 | + |
| 15 | +For detailed specifications, refer to [RFC 7231 Section 4.3.3](https://datatracker.ietf.org/doc/html/rfc7231#section-4.3.3) which defines the POST method's semantics and requirements. |
| 16 | + |
| 17 | +::: |
| 18 | + |
| 19 | +## Code snippet |
| 20 | + |
| 21 | +```python title="app.py" |
| 22 | +import os |
| 23 | +from typing import Dict, List |
| 24 | + |
| 25 | +from fastapi import FastAPI, Request |
| 26 | + |
| 27 | +from databricks import sql |
| 28 | +from databricks.sdk.core import Config |
| 29 | + |
# Target SQL warehouse ID, read from the environment; an unset or empty
# value collapses to None.
DATABRICKS_WAREHOUSE_ID = os.getenv("DATABRICKS_WAREHOUSE_ID") or None

# FastAPI application object and Databricks SDK configuration used below.
app = FastAPI()
databricks_cfg = Config()
| 34 | + |
| 35 | + |
def get_connection(warehouse_id: str):
    """Open a Databricks SQL connection to the given SQL warehouse.

    Args:
        warehouse_id: ID of the SQL warehouse to connect to.

    Returns:
        The connection object returned by ``sql.connect``. The caller is
        responsible for closing it.

    Raises:
        ValueError: If ``warehouse_id`` is empty or None.
    """
    if not warehouse_id:
        # Fail early instead of building a path such as ".../warehouses/None".
        raise ValueError("A SQL warehouse ID is required to open a connection")

    # Use the argument rather than the module-level constant so callers can
    # actually choose which warehouse to target.
    http_path = f"/sql/1.0/warehouses/{warehouse_id}"
    return sql.connect(
        server_hostname=databricks_cfg.host,
        http_path=http_path,
        # The connector expects a zero-argument callable here; it returns the
        # SDK config's `authenticate` callable.
        credentials_provider=lambda: databricks_cfg.authenticate,
    )
| 43 | + |
| 44 | + |
def _quote_identifier(name: str) -> str:
    """Backtick-quote one SQL identifier, doubling any embedded backticks."""
    return "`" + str(name).replace("`", "``") + "`"


def insert_data(table_path: str, data: List[Dict], warehouse_id: str) -> int:
    """Insert ``data`` into ``table_path`` using one multi-row INSERT statement.

    Args:
        table_path: Unquoted, dot-separated table name, e.g.
            ``catalog.schema.table``. Each part is backtick-quoted here.
        data: Records to insert. The column list is taken from the keys of
            the first record; every record must provide those keys.
        warehouse_id: ID of the SQL warehouse that runs the statement.

    Returns:
        ``cursor.rowcount`` after the insert, or 0 when ``data`` is empty.
        The visible caller guards against a negative value, so treat the
        count as best-effort.

    Raises:
        Exception: If building or executing the statement fails; the
            original error is chained as the cause.
    """
    if not data:
        # Nothing to insert: skip the warehouse round trip entirely.
        return 0

    conn = get_connection(warehouse_id)
    try:
        with conn.cursor() as cursor:
            columns = list(data[0].keys())

            # Identifiers cannot be bound as parameters, so quote them rather
            # than interpolating caller-supplied text verbatim.
            table_sql = ".".join(
                _quote_identifier(part) for part in table_path.split(".")
            )
            columns_sql = ", ".join(_quote_identifier(col) for col in columns)

            # One "(?, ?, ...)" group per record, with the values flattened in
            # the same record/column order.
            row_placeholder = "(" + ", ".join(["?"] * len(columns)) + ")"
            values_sql = ", ".join([row_placeholder] * len(data))
            all_values = [record[col] for record in data for col in columns]

            insert_query = f"""
                INSERT INTO {table_sql} ({columns_sql})
                VALUES {values_sql}
            """

            # Execute the insert with all values in a single statement
            print(f"Executing query: {insert_query}")
            cursor.execute(insert_query, all_values)
            return cursor.rowcount
    except Exception as e:
        # Keep the original error as the cause so the traceback stays useful.
        raise Exception(f"Failed to insert data: {str(e)}") from e
    finally:
        # Always release the connection, on success and on failure.
        conn.close()
| 74 | + |
| 75 | + |
@app.post("/api/v1/table")
async def insert_table_data(request: Request):
    """Insert the rows from a JSON request body into a Unity Catalog table.

    The body must be a JSON object with the keys ``catalog``, ``schema``,
    ``table`` and ``data`` (a list of records).

    Returns:
        ``{"results": {"data": <records>, "count": <rows inserted>}}``.

    Raises:
        HTTPException: 400 when the body is not an object with the required
            keys or ``data`` is not a list.
        Exception: When the insert itself fails; the original error is
            chained as the cause.
    """
    # Local import: the module-level fastapi import only brings in FastAPI
    # and Request.
    from fastapi import HTTPException

    payload = await request.json()

    # Validate the payload up front so client mistakes produce a 400 with a
    # readable message instead of an opaque server error.
    try:
        table_path = f"{payload['catalog']}.{payload['schema']}.{payload['table']}"
        records = payload["data"]
    except (KeyError, TypeError) as e:
        raise HTTPException(
            status_code=400,
            detail="Request body must be a JSON object with 'catalog', "
            "'schema', 'table' and 'data' keys",
        ) from e
    if not isinstance(records, list):
        raise HTTPException(status_code=400, detail="'data' must be a list of records")

    try:
        records_inserted = insert_data(
            table_path=table_path,
            data=records,
            warehouse_id=DATABRICKS_WAREHOUSE_ID,
        )
    except Exception as e:
        raise Exception(f"FastAPI Request Failed: {str(e)}") from e

    # Ensure records_inserted is not negative (DBSQL Side Effect); also fall
    # back to the payload size if no count was reported at all.
    if records_inserted is None or records_inserted < 0:
        records_inserted = len(records)

    return {"results": {"data": records, "count": records_inserted}}
| 102 | +``` |
| 103 | + |
| 104 | +:::warning |
| 105 | + |
| 106 | +The above example is shortened for brevity and not suitable for production use. |
| 107 | +You can find a more advanced sample in the databricks-apps-cookbook GitHub repository. |
| 108 | + |
| 109 | +::: |
| 110 | + |
| 111 | +### Example Usage |
| 112 | + |
| 113 | +In this example, we will create the following Unity Catalog table called `my_catalog.my_schema.trips` via Databricks SQL Editor. |
| 114 | +It is assumed that the user/service principal identity has the appropriate Unity Catalog grants to make changes as required. |
| 115 | + |
| 116 | +```sql |
| 117 | +CREATE OR REPLACE TABLE my_catalog.my_schema.trips ( |
| 118 | + trip_id INT, |
| 119 | + passenger_count INT, |
| 120 | + trip_distance FLOAT, |
| 121 | + pickup_datetime TIMESTAMP, |
| 122 | + dropoff_datetime TIMESTAMP, |
| 123 | + payment_type STRING, |
| 124 | + fare_amount FLOAT, |
| 125 | + tip_amount FLOAT |
| 126 | +) |
| 127 | +``` |
| 128 | + |
| 129 | +Once the table has been created, you can provide data (list of dicts) to be inserted using the API example provided above. |
| 130 | +To highlight this, please consult the example Python script below, noting the `POST` verb and `JSON` data payload. |
| 131 | + |
| 132 | +```python title="insert_data_into_table.py" |
| 133 | +from databricks.sdk.core import Config |
| 134 | +import requests |
| 135 | + |
# Build the SDK configuration from the "my-env" profile and obtain an OAuth
# access token to authenticate against the app.
config = Config(profile="my-env")
token = config.oauth_token().access_token

# Rows to insert; the keys match the columns of the `trips` table.
rows_to_be_inserted = [
    {
        "trip_id": 1,
        "passenger_count": 1,
        "trip_distance": 10.0,
        "pickup_datetime": "2024-01-01 12:00:00",
        "dropoff_datetime": "2024-01-01 12:10:00",
        "payment_type": "credit_card",
        "fare_amount": 15.0,
        "tip_amount": 2.0,
    },
    {
        "trip_id": 2,
        "passenger_count": 1,
        "trip_distance": 86.0,
        "pickup_datetime": "2024-01-01 14:00:00",
        "dropoff_datetime": "2024-01-01 15:13:00",
        "payment_type": "cash",
        "fare_amount": 15.0,
        "tip_amount": 3.0,
    },
    {
        "trip_id": 3,
        "passenger_count": 1,
        "trip_distance": 6.0,
        "pickup_datetime": "2024-01-01 15:31:00",
        "dropoff_datetime": "2024-01-01 15:45:00",
        "payment_type": "cash",
        "fare_amount": 15.0,
        "tip_amount": 3.0,
    },
]

response = requests.post(
    "https://<your-app-url>.databricksapps.com/api/v1/table",
    headers={"Authorization": f"Bearer {token}"},
    json={
        "catalog": "my_catalog",
        "schema": "my_schema",
        "table": "trips",
        "data": rows_to_be_inserted,
    },
    # Without a timeout, requests can wait indefinitely on an unresponsive app.
    timeout=30,
)
# Surface HTTP errors explicitly instead of failing later while decoding a
# non-JSON error body.
response.raise_for_status()
print(response.json())
| 183 | +``` |
| 184 | + |
| 185 | +If the request was successful, you will get the following output in your terminal: |
| 186 | + |
| 187 | +```shell |
| 188 | +{'results': {'data': [{'trip_id': 1, 'passenger_count': 1, 'trip_distance': 10.0, 'pickup_datetime': '2024-01-01 12:00:00', 'dropoff_datetime': '2024-01-01 12:10:00', 'payment_type': 'credit_card', 'fare_amount': 15.0, 'tip_amount': 2.0}, {'trip_id': 2, 'passenger_count': 1, 'trip_distance': 86.0, 'pickup_datetime': '2024-01-01 14:00:00', 'dropoff_datetime': '2024-01-01 15:13:00', 'payment_type': 'cash', 'fare_amount': 15.0, 'tip_amount': 3.0}, {'trip_id': 3, 'passenger_count': 1, 'trip_distance': 6.0, 'pickup_datetime': '2024-01-01 15:31:00', 'dropoff_datetime': '2024-01-01 15:45:00', 'payment_type': 'cash', 'fare_amount': 15.0, 'tip_amount': 3.0}], 'count': 3}} |
| 189 | +``` |
| 190 | + |
| 191 | +## Resources |
| 192 | + |
| 193 | +- [SQL warehouse](https://docs.databricks.com/aws/en/compute/sql-warehouse/) |
| 194 | +- [Unity Catalog table](https://docs.databricks.com/aws/en/tables/) |
| 195 | + |
| 196 | +## Permissions |
| 197 | + |
| 198 | +Your [app service principal](https://docs.databricks.com/aws/en/dev-tools/databricks-apps/#how-does-databricks-apps-manage-authorization) needs the following permissions: |
| 199 | + |
| 200 | +- `SELECT` and `MODIFY` on the Unity Catalog table |
| 201 | +- `CAN USE` on the SQL warehouse |
| 202 | + |
| 203 | +See [Unity Catalog privileges and securable objects](https://docs.databricks.com/aws/en/data-governance/unity-catalog/manage-privileges/privileges) for more information. |
| 204 | + |
| 205 | +## Dependencies |
| 206 | + |
| 207 | +- [Databricks SDK for Python](https://pypi.org/project/databricks-sdk/) - `databricks-sdk` |
| 208 | +- [Databricks SQL Connector for Python](https://pypi.org/project/databricks-sql-connector/) - `databricks-sql-connector` |
| 209 | +- [FastAPI](https://pypi.org/project/fastapi/) - `fastapi` |
| 210 | +- [uvicorn](https://pypi.org/project/uvicorn/) - `uvicorn` |
| 211 | + |
| 212 | +```text title="requirements.txt" |
| 213 | +databricks-sdk |
| 214 | +databricks-sql-connector |
| 215 | +fastapi |
| 216 | +uvicorn |
| 217 | +``` |
0 commit comments