Skip to content

Commit 301e099

Browse files
authored
Merge pull request #34375 from def-/pr-scalability-password-sasl
scalability framework: Introduce password + sasl connection tests
2 parents 47eff9e + f185862 commit 301e099

File tree

6 files changed

+151
-18
lines changed

6 files changed

+151
-18
lines changed

ci/nightly/pipeline.template.yml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,8 @@ steps:
215215
- id: scalability-benchmark-connection
216216
label: "Scalability benchmark (connection) against merge base or 'latest'"
217217
depends_on: build-x86_64
218-
timeout_in_minutes: 180
218+
# TODO: Lower the timeout again once password/sasl auth is performant: https://github.com/MaterializeInc/database-issues/issues/9956
219+
timeout_in_minutes: 1440
219220
agents:
220221
# Larger instance is more stable in performance
221222
queue: hetzner-x86-64-dedi-16cpu-64gb
@@ -232,9 +233,9 @@ steps:
232233
- --workload-group-marker
233234
- "ConnectionWorkload"
234235
- --exponent-base
235-
- "2.5"
236+
- "2"
236237
- --max-concurrency
237-
- "2048"
238+
- "512"
238239
- id: parallel-benchmark
239240
label: "Parallel Benchmark"
240241
depends_on: build-x86_64

misc/python/materialize/scalability/endpoint/endpoint.py

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,18 @@
77
# the Business Source License, use of this software will be governed
88
# by the Apache License, Version 2.0.
99

10+
from enum import Enum, auto
1011
from typing import Any
1112

1213
import psycopg
1314

1415

16+
class ConnectionKind(Enum):
17+
Plain = auto()
18+
Password = auto()
19+
Sasl = auto()
20+
21+
1522
class Endpoint:
1623

1724
_version: str | None = None
@@ -20,19 +27,26 @@ def __init__(self, specified_target: str):
2027
self._specified_target = specified_target
2128

2229
def sql_connection(
23-
self, quiet: bool = False
30+
self, quiet: bool = False, kind: ConnectionKind = ConnectionKind.Plain
2431
) -> psycopg.connection.Connection[tuple[Any, ...]]:
2532
if not quiet:
2633
print(f"Connecting to URL: {self.url()}")
2734

28-
conn = psycopg.connect(self.url())
35+
conn = psycopg.connect(self.url(kind))
2936
conn.autocommit = True
3037
return conn
3138

32-
def url(self) -> str:
33-
return (
34-
f"postgresql://{self.user()}:{self.password()}@{self.host()}:{self.port()}"
39+
def url(self, kind: ConnectionKind = ConnectionKind.Plain) -> str:
40+
port = (
41+
self.port()
42+
if kind == ConnectionKind.Plain
43+
else (
44+
self.port_password()
45+
if kind == ConnectionKind.Password
46+
else self.port_sasl()
47+
)
3548
)
49+
return f"postgresql://{self.user()}:{self.password()}@{self.host()}:{port}/{self.database()}"
3650

3751
def specified_target(self) -> str:
3852
return self._specified_target
@@ -49,9 +63,18 @@ def user(self) -> str:
4963
def password(self) -> str:
5064
raise NotImplementedError
5165

66+
def database(self) -> str:
67+
raise NotImplementedError
68+
5269
def port(self) -> int:
5370
raise NotImplementedError
5471

72+
def port_password(self) -> int:
73+
raise NotImplementedError
74+
75+
def port_sasl(self) -> int:
76+
raise NotImplementedError
77+
5578
def up(self) -> None:
5679
raise NotImplementedError
5780

misc/python/materialize/scalability/endpoint/endpoints.py

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
import psycopg
1111

12-
from materialize import git
12+
from materialize import MZ_ROOT, git
1313
from materialize.mzcompose import ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS
1414
from materialize.mzcompose.composition import Composition
1515
from materialize.mzcompose.services.materialized import Materialized
@@ -60,6 +60,9 @@ def user(self) -> str:
6060
def password(self) -> str:
6161
return "postgres"
6262

63+
def database(self) -> str:
64+
return "postgres"
65+
6366
def up(self) -> None:
6467
self.composition.down(destroy_volumes=True)
6568
with self.composition.override(Postgres()):
@@ -83,12 +86,6 @@ def host(self) -> str:
8386
def internal_host(self) -> str:
8487
return "localhost"
8588

86-
def user(self) -> str:
87-
return "materialize"
88-
89-
def password(self) -> str:
90-
return "materialize"
91-
9289
def internal_port(self) -> int:
9390
raise NotImplementedError
9491

@@ -115,6 +112,15 @@ def port(self) -> int:
115112
def internal_port(self) -> int:
116113
return 6877
117114

115+
def user(self) -> str:
116+
return "materialize"
117+
118+
def password(self) -> str:
119+
return "materialize"
120+
121+
def database(self) -> str:
122+
return "materialize"
123+
118124
def up(self) -> None:
119125
self.lift_limits()
120126

@@ -138,17 +144,36 @@ def __init__(
138144
alternative_image if image != alternative_image else None
139145
)
140146
self._port: int | None = None
147+
self._port_password: int | None = None
148+
self._port_sasl: int | None = None
141149
self._resolved_target = resolved_target
142150
self.use_balancerd = use_balancerd
143151
super().__init__(specified_target)
144152

145153
def resolved_target(self) -> str | None:
146154
return self._resolved_target
147155

156+
def user(self) -> str:
157+
return "user1"
158+
159+
def password(self) -> str:
160+
return "password"
161+
162+
def database(self) -> str:
163+
return "materialize"
164+
148165
def port(self) -> int:
149166
assert self._port is not None
150167
return self._port
151168

169+
def port_password(self) -> int:
170+
assert self._port_password is not None
171+
return self._port_password
172+
173+
def port_sasl(self) -> int:
174+
assert self._port_sasl is not None
175+
return self._port_sasl
176+
152177
def internal_port(self) -> int:
153178
return self.composition.port("materialized", 6877)
154179

@@ -182,16 +207,24 @@ def up_internal(self) -> None:
182207
additional_system_parameter_defaults=ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS,
183208
external_metadata_store=True,
184209
metadata_store="cockroach",
210+
listeners_config_path=f"{MZ_ROOT}/src/materialized/ci/listener_configs/testdrive_sasl.json",
185211
)
186212
):
187213
self.composition.up("materialized")
188214
self.composition.verify_build_profile()
215+
self.composition.sql(
216+
"create role \"user1\" with login password 'password';"
217+
)
218+
self.composition.sql("grant materialize to user1;")
189219

190220
if self.use_balancerd:
191221
self.composition.up("balancerd")
192222
self._port = self.composition.default_port("balancerd")
193223
else:
194224
self._port = self.composition.default_port("materialized")
225+
# TODO: Also make it work through balancerd
226+
self._port_password = self.composition.port("materialized", 6880)
227+
self._port_sasl = self.composition.port("materialized", 6881)
195228

196229
def __str__(self) -> str:
197230
return f"MaterializeContainer ({self.image} specified as {self.specified_target()})"

misc/python/materialize/scalability/operation/operations/operations.py

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
# by the Apache License, Version 2.0.
99
from psycopg import Connection
1010

11-
from materialize.scalability.endpoint.endpoint import Endpoint
11+
from materialize.scalability.endpoint.endpoint import ConnectionKind, Endpoint
1212
from materialize.scalability.operation.operation_data import OperationData
1313
from materialize.scalability.operation.scalability_operation import (
1414
Operation,
@@ -201,7 +201,55 @@ def _execute(self, data: OperationData) -> OperationData:
201201
endpoint: Endpoint = data.get("endpoint")
202202
schema: Schema = data.get("schema")
203203

204-
connection = endpoint.sql_connection(quiet=True)
204+
connection = endpoint.sql_connection(quiet=True, kind=ConnectionKind.Plain)
205+
connection.autocommit = True
206+
cursor = connection.cursor()
207+
208+
# this sets the database schema
209+
for connect_sql in schema.connect_sqls():
210+
cursor.execute(connect_sql.encode("utf8"))
211+
212+
data.push("connection", connection)
213+
data.push("cursor", cursor)
214+
return data
215+
216+
217+
class ConnectPassword(Operation):
218+
def required_keys(self) -> set[str]:
219+
return {"endpoint", "schema"}
220+
221+
def produced_keys(self) -> set[str]:
222+
return {"connection", "cursor"}
223+
224+
def _execute(self, data: OperationData) -> OperationData:
225+
endpoint: Endpoint = data.get("endpoint")
226+
schema: Schema = data.get("schema")
227+
228+
connection = endpoint.sql_connection(quiet=True, kind=ConnectionKind.Password)
229+
connection.autocommit = True
230+
cursor = connection.cursor()
231+
232+
# this sets the database schema
233+
for connect_sql in schema.connect_sqls():
234+
cursor.execute(connect_sql.encode("utf8"))
235+
236+
data.push("connection", connection)
237+
data.push("cursor", cursor)
238+
return data
239+
240+
241+
class ConnectSasl(Operation):
242+
def required_keys(self) -> set[str]:
243+
return {"endpoint", "schema"}
244+
245+
def produced_keys(self) -> set[str]:
246+
return {"connection", "cursor"}
247+
248+
def _execute(self, data: OperationData) -> OperationData:
249+
endpoint: Endpoint = data.get("endpoint")
250+
schema: Schema = data.get("schema")
251+
252+
connection = endpoint.sql_connection(quiet=True, kind=ConnectionKind.Sasl)
205253
connection.autocommit = True
206254
cursor = connection.cursor()
207255

misc/python/materialize/scalability/workload/workloads/connection_workloads.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
from materialize.scalability.operation.operation_data import OperationData
1010
from materialize.scalability.operation.operations.operations import (
1111
Connect,
12+
ConnectPassword,
13+
ConnectSasl,
1214
Disconnect,
1315
SelectOne,
1416
)
@@ -28,3 +30,29 @@ def amend_data_before_execution(self, data: OperationData) -> None:
2830

2931
def operations(self) -> list["Operation"]:
3032
return [OperationChainWithDataExchange([Connect(), SelectOne(), Disconnect()])]
33+
34+
35+
class EstablishPasswordConnectionWorkload(WorkloadWithContext, ConnectionWorkload):
36+
def amend_data_before_execution(self, data: OperationData) -> None:
37+
data.push("endpoint", self.endpoint)
38+
data.push("schema", self.schema)
39+
data.remove("cursor")
40+
41+
def operations(self) -> list["Operation"]:
42+
return [
43+
OperationChainWithDataExchange(
44+
[ConnectPassword(), SelectOne(), Disconnect()]
45+
)
46+
]
47+
48+
49+
class EstablishSaslConnectionWorkload(WorkloadWithContext, ConnectionWorkload):
50+
def amend_data_before_execution(self, data: OperationData) -> None:
51+
data.push("endpoint", self.endpoint)
52+
data.push("schema", self.schema)
53+
data.remove("cursor")
54+
55+
def operations(self) -> list["Operation"]:
56+
return [
57+
OperationChainWithDataExchange([ConnectSasl(), SelectOne(), Disconnect()])
58+
]

test/scalability/mzcompose.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@
101101
def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
102102
parser.add_argument(
103103
"--target",
104-
help="Target for the benchmark: 'HEAD', 'local', 'remote', 'common-ancestor', 'Postgres', or a DockerHub tag",
104+
help="Target for the benchmark: 'HEAD', 'local', 'remote', 'common-ancestor', 'postgres', or a DockerHub tag",
105105
action="append",
106106
default=[],
107107
)

0 commit comments

Comments
 (0)