Skip to content

Commit 84b2b09

Browse files
committed
Add Streamlit OLTP
1 parent a6d4f5a commit 84b2b09

File tree

3 files changed

+209
-1
lines changed

3 files changed

+209
-1
lines changed

streamlit/requirements.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
databricks-connect==16.0.0
2-
databricks-sdk[openai]==0.46.0
2+
databricks-sdk[openai]==0.60.0
33
databricks-sql-connector==4.0.0
44
pandas==2.2.3
55
streamlit==1.41.1
6+
psycopg[binary]==3.2.9
7+
psycopg-pool==3.2.6

streamlit/view_groups.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@
1212
{
1313
"title": "Tables",
1414
"views": [
15+
{
16+
"label": "Connect an OLTP database",
17+
"help": "Query an OLTP database instance table.",
18+
"page": "views/oltp_database_connect.py",
19+
"icon": ":material/database:",
20+
},
1521
{
1622
"label": "Read a table",
1723
"help": "Query a Unity Catalog Delta table.",
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
import uuid
2+
import streamlit as st
3+
import pandas as pd
4+
5+
from databricks.sdk import WorkspaceClient
6+
7+
import psycopg
8+
from psycopg_pool import ConnectionPool
9+
10+
11+
st.header("OLTP Database", divider=True)
12+
st.subheader("Connect a table")
13+
st.write(
14+
"This app connects to a [Databricks Lakebase](https://docs.databricks.com/aws/en/oltp/) OLTP database instance. "
15+
"Provide the instance name, database, schema, and table."
16+
)
17+
18+
19+
w = WorkspaceClient()
20+
21+
22+
def generate_token(instance_name: str) -> str:
23+
cred = w.database.generate_database_credential(
24+
request_id=str(uuid.uuid4()), instance_names=[instance_name]
25+
)
26+
return cred.token
27+
28+
29+
class RotatingTokenConnection(psycopg.Connection):
30+
"""psycopg3 Connection that injects a fresh OAuth token as the password."""
31+
32+
@classmethod
33+
def connect(cls, conninfo: str = "", **kwargs):
34+
instance_name = kwargs.pop("_instance_name")
35+
kwargs["password"] = generate_token(instance_name)
36+
kwargs.setdefault("sslmode", "require")
37+
return super().connect(conninfo, **kwargs)
38+
39+
40+
@st.cache_resource
41+
def build_pool(*, instance_name: str, host: str, user: str, database: str) -> ConnectionPool:
42+
conninfo = f"host={host} dbname={database} user={user}"
43+
return ConnectionPool(
44+
conninfo=conninfo,
45+
connection_class=RotatingTokenConnection,
46+
kwargs={"_instance_name": instance_name},
47+
min_size=1,
48+
max_size=10,
49+
open=True,
50+
)
51+
52+
53+
def query_df(pool: ConnectionPool, sql: str) -> pd.DataFrame:
54+
with pool.connection() as conn:
55+
with conn.cursor() as cur:
56+
cur.execute(sql)
57+
cols = [d.name for d in cur.description]
58+
rows = cur.fetchall()
59+
return pd.DataFrame(rows, columns=cols)
60+
61+
62+
tab_try, tab_code, tab_reqs = st.tabs(["**Try it**", "**Code snippet**", "**Requirements**"])
63+
64+
with tab_try:
65+
instance_names = [i.name for i in w.database.list_database_instances()]
66+
instance_name = st.selectbox("Database instance:", instance_names)
67+
database = st.text_input("Database:", placeholder="customer_database")
68+
table = st.text_input("Table in a database schema:", placeholder="customer_core.customers_oltp")
69+
limit = st.text_input("Limit:", value=10)
70+
71+
user = w.current_user.me().user_name
72+
host = ""
73+
if instance_name:
74+
host = w.database.get_database_instance(name=instance_name).read_write_dns
75+
76+
if st.button("Run a query"):
77+
if not all([instance_name, host, database, table]):
78+
st.error("Please provide instance, database, and schema-table.")
79+
else:
80+
pool = build_pool(instance_name=instance_name, host=host, user=user, database=database)
81+
sql = f"SELECT * FROM {table} LIMIT {int(limit)};"
82+
df = query_df(pool, sql)
83+
st.dataframe(df, use_container_width=True)
84+
85+
with tab_code:
86+
st.code(
87+
'''
88+
import uuid
89+
import streamlit as st
90+
import pandas as pd
91+
92+
from databricks.sdk import WorkspaceClient
93+
94+
import psycopg
95+
from psycopg_pool import ConnectionPool
96+
97+
98+
w = WorkspaceClient()
99+
100+
101+
def generate_token(instance_name: str) -> str:
102+
cred = w.database.generate_database_credential(
103+
request_id=str(uuid.uuid4()), instance_names=[instance_name]
104+
)
105+
106+
return cred.token
107+
108+
109+
class RotatingTokenConnection(psycopg.Connection):
110+
@classmethod
111+
def connect(cls, conninfo: str = "", **kwargs):
112+
instance_name = kwargs.pop("_instance_name")
113+
kwargs["password"] = generate_token(instance_name)
114+
kwargs.setdefault("sslmode", "require")
115+
return super().connect(conninfo, **kwargs)
116+
117+
118+
@st.cache_resource
119+
def build_pool(instance_name: str, host: str, user: str, database: str) -> ConnectionPool:
120+
return ConnectionPool(
121+
conninfo=f"host={host} dbname={database} user={user}",
122+
connection_class=RotatingTokenConnection,
123+
kwargs={"_instance_name": instance_name},
124+
min_size=1,
125+
max_size=10,
126+
open=True,
127+
)
128+
129+
130+
def query_df(pool: ConnectionPool, sql: str) -> pd.DataFrame:
131+
with pool.connection() as conn:
132+
with conn.cursor() as cur:
133+
cur.execute(sql)
134+
cols = [d.name for d in cur.description]
135+
rows = cur.fetchall()
136+
137+
return pd.DataFrame(rows, columns=cols)
138+
139+
140+
instance_name = "dbase_instance"
141+
database = "customer_database"
142+
table = "customer_core.customers_oltp"
143+
user = w.current_user.me().user_name
144+
host = w.database.get_database_instance(name=instance_name).read_write_dns
145+
146+
pool = build_pool(instance_name, host, user, database)
147+
df = query_df(pool, f'SELECT * FROM {table} LIMIT 100')
148+
st.dataframe(df)
149+
''',
150+
language="python",
151+
)
152+
153+
with tab_reqs:
154+
col1, col2, col3 = st.columns(3)
155+
156+
with col1:
157+
st.markdown(
158+
"""
159+
**Permissions (app service principal)**
160+
* A PostgreSQL role for the service principal is **required**.
161+
See [this guide](https://docs.databricks.com/aws/en/oltp/pg-roles?language=PostgreSQL#create-postgres-roles-and-grant-privileges-for-databricks-identities).
162+
* The PostgreSQL service principal role should have these example grants:
163+
"""
164+
)
165+
st.code(
166+
'''
167+
GRANT CONNECT ON DATABASE customer_database TO "<YOUR-SERVICE-PRINCIPAL-ID>";
168+
GRANT USAGE ON SCHEMA customer_core TO "<YOUR-SERVICE-PRINCIPAL-ID>";
169+
GRANT SELECT ON TABLE customers_oltp TO "<YOUR-SERVICE-PRINCIPAL-ID>";
170+
''',
171+
language="sql",
172+
)
173+
st.caption(
174+
"[This guide](https://learn.microsoft.com/en-us/azure/databricks/oltp/query/sql-editor#create-a-new-query) "
175+
"shows you how to query your Lakebase."
176+
)
177+
178+
with col2:
179+
st.markdown(
180+
"""
181+
**Databricks resources**
182+
* [Lakebase](https://docs.databricks.com/aws/en/oltp/) database instance (PostgreSQL).
183+
* Target PostgreSQL database/schema/table.
184+
"""
185+
)
186+
187+
with col3:
188+
st.markdown(
189+
"""
190+
**Dependencies**
191+
* [Databricks SDK](https://pypi.org/project/databricks-sdk/) - `databricks-sdk`
192+
* [`psycopg[binary]`](https://pypi.org/project/psycopg/), [`psycopg-pool`](https://pypi.org/project/psycopg-pool/)
193+
* [Pandas](https://pypi.org/project/pandas/) - `pandas`
194+
* [Streamlit](https://pypi.org/project/streamlit/) - `streamlit`
195+
"""
196+
)
197+
198+
st.caption(
199+
"Tokens expire periodically; this app refreshes on each new connection and enforces TLS (sslmode=require)."
200+
)

0 commit comments

Comments
 (0)