Commit 3405711

Merge pull request #19 from antonbricks/lakebase-streamlit
Add Streamlit app to query Databricks Lakehouse OLTP tables
2 parents: a6d4f5a + 259cab0

File tree: 6 files changed, +325 −3 lines
Lines changed: 113 additions & 0 deletions (new file)

---
sidebar_position: 1
---

# Connect an OLTP database

This app connects to a [Databricks Lakebase](https://docs.databricks.com/aws/en/oltp/) OLTP database instance. Provide the instance name, database, schema, and table.

## Code snippet

```python title="app.py"
import uuid
import streamlit as st
import pandas as pd

from databricks.sdk import WorkspaceClient

import psycopg
from psycopg_pool import ConnectionPool


w = WorkspaceClient()


def _generate_token(instance_name: str) -> str:
    cred = w.database.generate_database_credential(
        request_id=str(uuid.uuid4()), instance_names=[instance_name]
    )
    return cred.token


class RotatingTokenConnection(psycopg.Connection):
    @classmethod
    def connect(cls, conninfo: str = "", **kwargs):
        instance_name = kwargs.pop("_instance_name")
        kwargs["password"] = _generate_token(instance_name)
        kwargs.setdefault("sslmode", "require")
        return super().connect(conninfo, **kwargs)


@st.cache_resource
def build_pool(instance_name: str, host: str, user: str, database: str) -> ConnectionPool:
    return ConnectionPool(
        conninfo=f"host={host} dbname={database} user={user}",
        connection_class=RotatingTokenConnection,
        kwargs={"_instance_name": instance_name},
        min_size=1,
        max_size=10,
        open=True,
    )


def query_df(pool: ConnectionPool, sql: str) -> pd.DataFrame:
    with pool.connection() as conn:
        with conn.cursor() as cur:
            cur.execute(sql)
            cols = [d.name for d in cur.description]
            rows = cur.fetchall()

    return pd.DataFrame(rows, columns=cols)


instance_name = "dbase_instance"
database = "customer_database"
table = "customer_core.customers_oltp"
user = w.current_user.me().user_name
host = w.database.get_database_instance(name=instance_name).read_write_dns

pool = build_pool(instance_name, host, user, database)
df = query_df(pool, f'SELECT * FROM {table} LIMIT 100')
st.dataframe(df)
```
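The snippet interpolates `table` directly into the SQL string; identifiers cannot be bound as query parameters. If the table name ever comes from user input, a minimal stdlib-only guard (the `safe_identifier` helper below is hypothetical, not part of the sample) keeps that interpolation safe:

```python
import re

# Accept only plain or schema-qualified identifiers, e.g. "customers"
# or "customer_core.customers_oltp", before interpolating into SQL.
_IDENT = r"[A-Za-z_][A-Za-z0-9_]*"

def safe_identifier(name: str) -> str:
    if not re.fullmatch(rf"{_IDENT}(\.{_IDENT})?", name):
        raise ValueError(f"unsafe identifier: {name!r}")
    return name
```

With this in place, `query_df(pool, f"SELECT * FROM {safe_identifier(table)} LIMIT 100")` fails fast on anything that is not a bare identifier.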
:::info

This sample uses Streamlit's [st.cache_resource](https://docs.streamlit.io/develop/concepts/architecture/caching#stcache_resource) to cache the database connection pool across users, sessions, and reruns. Use Streamlit's caching decorators to implement a caching strategy that fits your use case.

:::
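In plain Python terms, `st.cache_resource` behaves roughly like memoization keyed on the function arguments: the first call builds the pool, and every later call with the same instance/host/user/database returns the same object. A stdlib-only sketch of that behavior (the real decorator additionally shares the object across sessions and reruns):

```python
import functools

@functools.lru_cache(maxsize=None)
def build_pool_cached(instance_name: str, host: str, user: str, database: str):
    # Stand-in for ConnectionPool(...): built once per distinct argument
    # tuple, then returned as the identical object on repeat calls.
    return object()
```

Calling it twice with the same arguments returns the identical object (`is`-equal), so reruns share one pool instead of opening new connections each time.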
## Resources

- [Lakebase](https://docs.databricks.com/aws/en/oltp/) database instance (PostgreSQL).
- Target PostgreSQL database/schema/table.

## Permissions

First, the database instance must be specified in your [**App resources**](https://docs.databricks.com/aws/en/dev-tools/databricks-apps/resources).

Then, your [app service principal](https://docs.databricks.com/aws/en/dev-tools/databricks-apps/#how-does-databricks-apps-manage-authorization) needs grants like the following:

```sql
GRANT CONNECT ON DATABASE databricks_postgres TO "099f0306-9e29-4a87-84c0-3046e4bcea02";
GRANT USAGE ON SCHEMA public TO "099f0306-9e29-4a87-84c0-3046e4bcea02";
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE quotes_history TO "099f0306-9e29-4a87-84c0-3046e4bcea02";
```

See [this guide](https://docs.databricks.com/aws/en/oltp/pg-roles?language=PostgreSQL#create-postgres-roles-and-grant-privileges-for-databricks-identities) for creating PostgreSQL roles and granting privileges for Databricks identities.

[This guide](https://learn.microsoft.com/en-us/azure/databricks/oltp/query/sql-editor#create-a-new-query) shows how to query your Lakebase from the SQL editor.

## Dependencies

- [Databricks SDK](https://pypi.org/project/databricks-sdk/) - `databricks-sdk`
- [`psycopg[binary]`](https://pypi.org/project/psycopg/), [`psycopg-pool`](https://pypi.org/project/psycopg-pool/)
- [Pandas](https://pypi.org/project/pandas/) - `pandas`
- [Streamlit](https://pypi.org/project/streamlit/) - `streamlit`

```text title="requirements.txt"
databricks-sdk
pandas
streamlit
psycopg[binary]
psycopg-pool
```

docs/docs/streamlit/tables/tables_edit.mdx

Lines changed: 1 addition & 1 deletion

```diff
@@ -1,5 +1,5 @@
 ---
-sidebar_position: 2
+sidebar_position: 3
 ---

 # Edit a table
```

docs/docs/streamlit/tables/tables_read.mdx

Lines changed: 1 addition & 1 deletion

```diff
@@ -1,5 +1,5 @@
 ---
-sidebar_position: 1
+sidebar_position: 2
 ---

 # Read a table
```

streamlit/requirements.txt

Lines changed: 3 additions & 1 deletion

```diff
@@ -1,5 +1,7 @@
 databricks-connect==16.0.0
-databricks-sdk[openai]==0.46.0
+databricks-sdk[openai]==0.60.0
 databricks-sql-connector==4.0.0
 pandas==2.2.3
 streamlit==1.41.1
+psycopg[binary]==3.2.9
+psycopg-pool==3.2.6
```

streamlit/view_groups.py

Lines changed: 6 additions & 0 deletions

```diff
@@ -12,6 +12,12 @@
     {
         "title": "Tables",
         "views": [
+            {
+                "label": "Connect an OLTP database",
+                "help": "Query an OLTP database instance table.",
+                "page": "views/oltp_database_connect.py",
+                "icon": ":material/database:",
+            },
             {
                 "label": "Read a table",
                 "help": "Query a Unity Catalog Delta table.",
```
Lines changed: 201 additions & 0 deletions (new file)

```python
import uuid
import streamlit as st
import pandas as pd

from databricks.sdk import WorkspaceClient

import psycopg
from psycopg_pool import ConnectionPool


st.header("OLTP Database", divider=True)
st.subheader("Connect a table")
st.write(
    "This app connects to a [Databricks Lakebase](https://docs.databricks.com/aws/en/oltp/) OLTP database instance. "
    "Provide the instance name, database, schema, and table."
)

w = WorkspaceClient()


def generate_token(instance_name: str) -> str:
    cred = w.database.generate_database_credential(
        request_id=str(uuid.uuid4()), instance_names=[instance_name]
    )
    return cred.token


class RotatingTokenConnection(psycopg.Connection):
    """psycopg3 Connection that injects a fresh OAuth token as the password."""

    @classmethod
    def connect(cls, conninfo: str = "", **kwargs):
        instance_name = kwargs.pop("_instance_name")
        kwargs["password"] = generate_token(instance_name)
        kwargs.setdefault("sslmode", "require")
        return super().connect(conninfo, **kwargs)


@st.cache_resource
def build_pool(*, instance_name: str, host: str, user: str, database: str) -> ConnectionPool:
    conninfo = f"host={host} dbname={database} user={user}"
    return ConnectionPool(
        conninfo=conninfo,
        connection_class=RotatingTokenConnection,
        kwargs={"_instance_name": instance_name},
        min_size=1,
        max_size=10,
        open=True,
    )


def query_df(pool: ConnectionPool, sql: str) -> pd.DataFrame:
    with pool.connection() as conn:
        with conn.cursor() as cur:
            cur.execute(sql)
            cols = [d.name for d in cur.description]
            rows = cur.fetchall()
    return pd.DataFrame(rows, columns=cols)


tab_try, tab_code, tab_reqs = st.tabs(["**Try it**", "**Code snippet**", "**Requirements**"])

with tab_try:
    instance_names = [i.name for i in w.database.list_database_instances()]
    instance_name = st.selectbox("Database instance:", instance_names)
    database = st.text_input("Database:", placeholder="customer_database")
    table = st.text_input("Table in a database schema:", placeholder="customer_core.customers_oltp")
    limit = st.text_input("Limit:", value="10")

    user = w.current_user.me().user_name
    host = ""
    if instance_name:
        host = w.database.get_database_instance(name=instance_name).read_write_dns

    if st.button("Run a query"):
        if not all([instance_name, host, database, table]):
            st.error("Please provide instance, database, and table (schema.table).")
        else:
            pool = build_pool(instance_name=instance_name, host=host, user=user, database=database)
            sql = f"SELECT * FROM {table} LIMIT {int(limit)};"
            df = query_df(pool, sql)
            st.dataframe(df, use_container_width=True)

with tab_code:
    st.code(
        '''
import uuid
import streamlit as st
import pandas as pd

from databricks.sdk import WorkspaceClient

import psycopg
from psycopg_pool import ConnectionPool


w = WorkspaceClient()


def generate_token(instance_name: str) -> str:
    cred = w.database.generate_database_credential(
        request_id=str(uuid.uuid4()), instance_names=[instance_name]
    )

    return cred.token


class RotatingTokenConnection(psycopg.Connection):
    @classmethod
    def connect(cls, conninfo: str = "", **kwargs):
        instance_name = kwargs.pop("_instance_name")
        kwargs["password"] = generate_token(instance_name)
        kwargs.setdefault("sslmode", "require")
        return super().connect(conninfo, **kwargs)


@st.cache_resource
def build_pool(instance_name: str, host: str, user: str, database: str) -> ConnectionPool:
    return ConnectionPool(
        conninfo=f"host={host} dbname={database} user={user}",
        connection_class=RotatingTokenConnection,
        kwargs={"_instance_name": instance_name},
        min_size=1,
        max_size=10,
        open=True,
    )


def query_df(pool: ConnectionPool, sql: str) -> pd.DataFrame:
    with pool.connection() as conn:
        with conn.cursor() as cur:
            cur.execute(sql)
            cols = [d.name for d in cur.description]
            rows = cur.fetchall()

    return pd.DataFrame(rows, columns=cols)


instance_name = "dbase_instance"
database = "customer_database"
table = "customer_core.customers_oltp"
user = w.current_user.me().user_name
host = w.database.get_database_instance(name=instance_name).read_write_dns

pool = build_pool(instance_name, host, user, database)
df = query_df(pool, f'SELECT * FROM {table} LIMIT 100')
st.dataframe(df)
''',
        language="python",
    )

with tab_reqs:
    col1, col2, col3 = st.columns(3)

    with col1:
        st.markdown(
            """
            **Permissions (app service principal)**
            * The database instance should be specified in your [**App resources**](https://docs.databricks.com/aws/en/dev-tools/databricks-apps/resources).
            * A PostgreSQL role for the service principal is **required**.
            See [this guide](https://docs.databricks.com/aws/en/oltp/pg-roles?language=PostgreSQL#create-postgres-roles-and-grant-privileges-for-databricks-identities).
            * The PostgreSQL service principal role should have these example grants:
            """
        )
        st.code(
            '''
GRANT CONNECT ON DATABASE databricks_postgres TO "099f0306-9e29-4a87-84c0-3046e4bcea02";
GRANT USAGE ON SCHEMA public TO "099f0306-9e29-4a87-84c0-3046e4bcea02";
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE quotes_history TO "099f0306-9e29-4a87-84c0-3046e4bcea02";
''',
            language="sql",
        )
        st.caption(
            "[This guide](https://learn.microsoft.com/en-us/azure/databricks/oltp/query/sql-editor#create-a-new-query) "
            "shows you how to query your Lakebase."
        )

    with col2:
        st.markdown(
            """
            **Databricks resources**
            * [Lakebase](https://docs.databricks.com/aws/en/oltp/) database instance (PostgreSQL).
            * Target PostgreSQL database/schema/table.
            """
        )

    with col3:
        st.markdown(
            """
            **Dependencies**
            * [Databricks SDK](https://pypi.org/project/databricks-sdk/) - `databricks-sdk`
            * [`psycopg[binary]`](https://pypi.org/project/psycopg/), [`psycopg-pool`](https://pypi.org/project/psycopg-pool/)
            * [Pandas](https://pypi.org/project/pandas/) - `pandas`
            * [Streamlit](https://pypi.org/project/streamlit/) - `streamlit`
            """
        )

    st.caption(
        "Tokens expire periodically; this app refreshes on each new connection and enforces TLS (sslmode=require)."
    )
```
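The closing caption is the heart of the pattern: the pool stores only a conninfo string, so the short-lived token must be injected each time a connection is opened. A stdlib-only mock of that flow (`fake_generate_token` and `MockConnection` are illustrative stand-ins for the SDK credential call and `RotatingTokenConnection`):

```python
import itertools

_counter = itertools.count()

def fake_generate_token(instance_name: str) -> str:
    # Stand-in for w.database.generate_database_credential(...).token,
    # which mints a short-lived credential on every call.
    return f"token-{instance_name}-{next(_counter)}"

class MockConnection:
    def __init__(self, **kwargs):
        self.kwargs = kwargs

def connect(instance_name: str, **kwargs) -> MockConnection:
    # Mirrors RotatingTokenConnection.connect: every (re)opened pooled
    # connection gets a freshly generated password, never a cached one,
    # and TLS is enforced by default.
    kwargs["password"] = fake_generate_token(instance_name)
    kwargs.setdefault("sslmode", "require")
    return MockConnection(**kwargs)
```

Two successive calls yield different passwords, which is exactly why a pool that reconnects after token expiry keeps working.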
