Skip to content

Commit a22dfb2

Browse files
committed
*: Optimize the logic of creating sqlrunner. #229
1 parent 718ff33 commit a22dfb2

File tree

9 files changed

+263
-88
lines changed

9 files changed

+263
-88
lines changed

cluster/cluster.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ import (
2828
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2929
"k8s.io/apimachinery/pkg/labels"
3030
"k8s.io/apimachinery/pkg/runtime"
31+
"k8s.io/apimachinery/pkg/types"
32+
"sigs.k8s.io/controller-runtime/pkg/client"
3133
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
3234

3335
apiv1alpha1 "github.com/radondb/radondb-mysql-kubernetes/api/v1alpha1"
@@ -331,3 +333,19 @@ func sizeToBytes(s string) (uint64, error) {
331333
}
332334
return 0, fmt.Errorf("'%s' format error, must be a positive integer with a unit of measurement like K, M or G", s)
333335
}
336+
337+
// GetClusterKey returns the MysqlUser's Cluster key.
338+
func (c *Cluster) GetClusterKey() client.ObjectKey {
339+
return client.ObjectKey{
340+
Name: c.Name,
341+
Namespace: c.Namespace,
342+
}
343+
}
344+
345+
// GetKey return the user key. Usually used for logging or for runtime.Client.Get as key.
346+
func (c *Cluster) GetKey() client.ObjectKey {
347+
return types.NamespacedName{
348+
Namespace: c.Namespace,
349+
Name: c.Name,
350+
}
351+
}

cluster/syncer/statefulset.go

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,13 @@ type StatefulSetSyncer struct {
6363

6464
// Secret resourceVersion.
6565
sctRev string
66+
67+
// Mysql query runner.
68+
internal.SQLRunnerFactory
6669
}
6770

6871
// NewStatefulSetSyncer returns a pointer to StatefulSetSyncer.
69-
func NewStatefulSetSyncer(cli client.Client, c *cluster.Cluster, cmRev, sctRev string) *StatefulSetSyncer {
72+
func NewStatefulSetSyncer(cli client.Client, c *cluster.Cluster, cmRev, sctRev string, sqlRunnerFactory internal.SQLRunnerFactory) *StatefulSetSyncer {
7073
return &StatefulSetSyncer{
7174
Cluster: c,
7275
cli: cli,
@@ -80,8 +83,9 @@ func NewStatefulSetSyncer(cli client.Client, c *cluster.Cluster, cmRev, sctRev s
8083
Namespace: c.Namespace,
8184
},
8285
},
83-
cmRev: cmRev,
84-
sctRev: sctRev,
86+
cmRev: cmRev,
87+
sctRev: sctRev,
88+
SQLRunnerFactory: sqlRunnerFactory,
8589
}
8690
}
8791

@@ -258,6 +262,13 @@ func (s *StatefulSetSyncer) updatePod(ctx context.Context) error {
258262
// 5. Check followerHost current role.
259263
// 6. If followerHost is not leader, switch it to leader through xenon.
260264
func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower string) error {
265+
leaderRunner, closeConn, err := s.SQLRunnerFactory(internal.NewConfigFromClusterKey(
266+
s.cli, s.Cluster.GetClusterKey(), utils.OperatorUser, utils.LeaderHost))
267+
if err != nil {
268+
return err
269+
}
270+
defer closeConn()
271+
261272
// Status.Replicas indicate the number of Pod has been created.
262273
// So sfs.Spec.Replicas is 2, May be sfs.Status.Replicas maybe are 3, 5 ,
263274
// because it do not update the pods, so it is still the last status.
@@ -272,7 +283,6 @@ func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower stri
272283
defer utils.RemoveUpdateFile()
273284
sctName := s.GetNameForResource(utils.Secret)
274285
svcName := s.GetNameForResource(utils.HeadlessSVC)
275-
port := utils.MysqlPort
276286
nameSpace := s.Namespace
277287

278288
// Get secrets.
@@ -286,36 +296,21 @@ func (s *StatefulSetSyncer) preUpdate(ctx context.Context, leader, follower stri
286296
); err != nil {
287297
return fmt.Errorf("failed to get the secret: %s", sctName)
288298
}
289-
user, ok := secret.Data["operator-user"]
290-
if !ok {
291-
return fmt.Errorf("failed to get the user: %s", user)
292-
}
293-
password, ok := secret.Data["operator-password"]
294-
if !ok {
295-
return fmt.Errorf("failed to get the password: %s", password)
296-
}
299+
297300
rootPasswd, ok := secret.Data["root-password"]
298301
if !ok {
299302
return fmt.Errorf("failed to get the root password: %s", rootPasswd)
300303
}
301304

302-
leaderHost := fmt.Sprintf("%s.%s.%s", leader, svcName, nameSpace)
303-
leaderRunner, err := internal.NewSQLRunner(utils.BytesToString(user), utils.BytesToString(password), leaderHost, port)
304-
if err != nil {
305-
log.Error(err, "failed to connect the mysql", "node", leader)
306-
return err
307-
}
308-
defer leaderRunner.Close()
309-
310305
if err = retry(time.Second*2, time.Duration(waitLimit)*time.Second, func() (bool, error) {
311306
// Set leader read only.
312-
if err = leaderRunner.RunQuery("SET GLOBAL super_read_only=on;"); err != nil {
307+
if err = leaderRunner.QueryExec(internal.NewQuery("SET GLOBAL super_read_only=on;")); err != nil {
313308
log.Error(err, "failed to set leader read only", "node", leader)
314309
return false, err
315310
}
316311

317312
// Make sure the master has sent all binlog to slave.
318-
success, err := leaderRunner.CheckProcesslist()
313+
success, err := internal.CheckProcesslist(leaderRunner)
319314
if err != nil {
320315
return false, err
321316
}

cluster/syncer/status.go

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,17 @@ type StatusSyncer struct {
4848
*cluster.Cluster
4949

5050
cli client.Client
51+
52+
// Mysql query runner.
53+
internal.SQLRunnerFactory
5154
}
5255

5356
// NewStatusSyncer returns a pointer to StatusSyncer.
54-
func NewStatusSyncer(c *cluster.Cluster, cli client.Client) *StatusSyncer {
57+
func NewStatusSyncer(c *cluster.Cluster, cli client.Client, sqlRunnerFactory internal.SQLRunnerFactory) *StatusSyncer {
5558
return &StatusSyncer{
56-
Cluster: c,
57-
cli: cli,
59+
Cluster: c,
60+
cli: cli,
61+
SQLRunnerFactory: sqlRunnerFactory,
5862
}
5963
}
6064

@@ -144,7 +148,6 @@ func (s *StatusSyncer) Sync(ctx context.Context) (syncer.SyncResult, error) {
144148
func (s *StatusSyncer) updateNodeStatus(ctx context.Context, cli client.Client, pods []corev1.Pod) error {
145149
sctName := s.GetNameForResource(utils.Secret)
146150
svcName := s.GetNameForResource(utils.HeadlessSVC)
147-
port := utils.MysqlPort
148151
nameSpace := s.Namespace
149152

150153
secret := &corev1.Secret{}
@@ -158,14 +161,7 @@ func (s *StatusSyncer) updateNodeStatus(ctx context.Context, cli client.Client,
158161
log.V(1).Info("secret not found", "name", sctName)
159162
return nil
160163
}
161-
user, ok := secret.Data["operator-user"]
162-
if !ok {
163-
return fmt.Errorf("failed to get the user: %s", user)
164-
}
165-
password, ok := secret.Data["operator-password"]
166-
if !ok {
167-
return fmt.Errorf("failed to get the password: %s", password)
168-
}
164+
169165
rootPasswd, ok := secret.Data["root-password"]
170166
if !ok {
171167
return fmt.Errorf("failed to get the root password: %s", rootPasswd)
@@ -187,18 +183,20 @@ func (s *StatusSyncer) updateNodeStatus(ctx context.Context, cli client.Client,
187183
s.updateNodeCondition(node, int(apiv1alpha1.IndexLeader), isLeader)
188184

189185
isLagged, isReplicating, isReadOnly := corev1.ConditionUnknown, corev1.ConditionUnknown, corev1.ConditionUnknown
190-
runner, err := internal.NewSQLRunner(utils.BytesToString(user), utils.BytesToString(password), host, port)
186+
sqlRunner, closeConn, err := s.SQLRunnerFactory(internal.NewConfigFromClusterKey(
187+
s.cli, s.Cluster.GetClusterKey(), utils.OperatorUser, host))
188+
defer closeConn()
191189
if err != nil {
192190
log.Error(err, "failed to connect the mysql", "node", node.Name)
193191
node.Message = err.Error()
194192
} else {
195-
isLagged, isReplicating, err = runner.CheckSlaveStatusWithRetry(checkNodeStatusRetry)
193+
isLagged, isReplicating, err = internal.CheckSlaveStatusWithRetry(sqlRunner, checkNodeStatusRetry)
196194
if err != nil {
197195
log.Error(err, "failed to check slave status", "node", node.Name)
198196
node.Message = err.Error()
199197
}
200198

201-
isReadOnly, err = runner.CheckReadOnly()
199+
isReadOnly, err = internal.CheckReadOnly(sqlRunner)
202200
if err != nil {
203201
log.Error(err, "failed to check read only", "node", node.Name)
204202
node.Message = err.Error()
@@ -208,15 +206,11 @@ func (s *StatusSyncer) updateNodeStatus(ctx context.Context, cli client.Client,
208206
isLeader == corev1.ConditionTrue &&
209207
isReadOnly != corev1.ConditionFalse {
210208
log.V(1).Info("try to correct the leader writeable", "node", node.Name)
211-
runner.RunQuery("SET GLOBAL read_only=off")
212-
runner.RunQuery("SET GLOBAL super_read_only=off")
209+
sqlRunner.QueryExec(internal.NewQuery("SET GLOBAL read_only=off"))
210+
sqlRunner.QueryExec(internal.NewQuery("SET GLOBAL super_read_only=off"))
213211
}
214212
}
215213

216-
if runner != nil {
217-
runner.Close()
218-
}
219-
220214
// update apiv1alpha1.NodeConditionLagged.
221215
s.updateNodeCondition(node, int(apiv1alpha1.IndexLagged), isLagged)
222216
// update apiv1alpha1.NodeConditionReplicating.

cmd/manager/main.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333

3434
mysqlv1alpha1 "github.com/radondb/radondb-mysql-kubernetes/api/v1alpha1"
3535
"github.com/radondb/radondb-mysql-kubernetes/controllers"
36+
"github.com/radondb/radondb-mysql-kubernetes/internal"
3637
//+kubebuilder:scaffold:imports
3738
)
3839

@@ -79,17 +80,19 @@ func main() {
7980
}
8081

8182
if err = (&controllers.ClusterReconciler{
82-
Client: mgr.GetClient(),
83-
Scheme: mgr.GetScheme(),
84-
Recorder: mgr.GetEventRecorderFor("controller.cluster"),
83+
Client: mgr.GetClient(),
84+
Scheme: mgr.GetScheme(),
85+
Recorder: mgr.GetEventRecorderFor("controller.cluster"),
86+
SQLRunnerFactory: internal.NewSQLRunner,
8587
}).SetupWithManager(mgr); err != nil {
8688
setupLog.Error(err, "unable to create controller", "controller", "Cluster")
8789
os.Exit(1)
8890
}
8991
if err = (&controllers.StatusReconciler{
90-
Client: mgr.GetClient(),
91-
Scheme: mgr.GetScheme(),
92-
Recorder: mgr.GetEventRecorderFor("controller.status"),
92+
Client: mgr.GetClient(),
93+
Scheme: mgr.GetScheme(),
94+
Recorder: mgr.GetEventRecorderFor("controller.status"),
95+
SQLRunnerFactory: internal.NewSQLRunner,
9396
}).SetupWithManager(mgr); err != nil {
9497
setupLog.Error(err, "unable to create controller", "controller", "Status")
9598
os.Exit(1)

controllers/cluster_controller.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,17 @@ import (
3535
apiv1alpha1 "github.com/radondb/radondb-mysql-kubernetes/api/v1alpha1"
3636
"github.com/radondb/radondb-mysql-kubernetes/cluster"
3737
clustersyncer "github.com/radondb/radondb-mysql-kubernetes/cluster/syncer"
38+
"github.com/radondb/radondb-mysql-kubernetes/internal"
3839
)
3940

4041
// ClusterReconciler reconciles a Cluster object
4142
type ClusterReconciler struct {
4243
client.Client
4344
Scheme *runtime.Scheme
4445
Recorder record.EventRecorder
46+
47+
// Mysql query runner.
48+
internal.SQLRunnerFactory
4549
}
4650

4751
// +kubebuilder:rbac:groups=mysql.radondb.com,resources=clusters,verbs=get;list;watch;create;update;patch;delete
@@ -114,7 +118,7 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
114118
clustersyncer.NewHeadlessSVCSyncer(r.Client, instance),
115119
clustersyncer.NewLeaderSVCSyncer(r.Client, instance),
116120
clustersyncer.NewFollowerSVCSyncer(r.Client, instance),
117-
clustersyncer.NewStatefulSetSyncer(r.Client, instance, cmRev, sctRev),
121+
clustersyncer.NewStatefulSetSyncer(r.Client, instance, cmRev, sctRev, r.SQLRunnerFactory),
118122
clustersyncer.NewPDBSyncer(r.Client, instance),
119123
}
120124

controllers/status_controller.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
apiv1alpha1 "github.com/radondb/radondb-mysql-kubernetes/api/v1alpha1"
4141
"github.com/radondb/radondb-mysql-kubernetes/cluster"
4242
clustersyncer "github.com/radondb/radondb-mysql-kubernetes/cluster/syncer"
43+
"github.com/radondb/radondb-mysql-kubernetes/internal"
4344
)
4445

4546
// reconcileTimePeriod represents the time in which a cluster should be reconciled
@@ -50,6 +51,9 @@ type StatusReconciler struct {
5051
client.Client
5152
Scheme *runtime.Scheme
5253
Recorder record.EventRecorder
54+
55+
// Mysql query runner.
56+
internal.SQLRunnerFactory
5357
}
5458

5559
// Reconcile is part of the main kubernetes reconciliation loop which aims to
@@ -88,7 +92,7 @@ func (r *StatusReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
8892
}
8993
}()
9094

91-
statusSyncer := clustersyncer.NewStatusSyncer(instance, r.Client)
95+
statusSyncer := clustersyncer.NewStatusSyncer(instance, r.Client, r.SQLRunnerFactory)
9296
if err := syncer.Sync(ctx, statusSyncer, r.Recorder); err != nil {
9397
return ctrl.Result{}, err
9498
}

internal/query.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
Copyright 2021 RadonDB.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package internal
18+
19+
import (
20+
"strings"
21+
"errors"
22+
)
23+
24+
// Query contains a escaped query string with variables marked with a question mark (?) and a slice
25+
// of positional arguments.
26+
type Query struct {
27+
escapedQuery string
28+
args []interface{}
29+
}
30+
31+
// String representation of the query.
32+
func (q *Query) String() string {
33+
return q.escapedQuery
34+
}
35+
36+
// Args is used in test.
37+
func (q *Query) Args() []interface{} {
38+
return q.args
39+
}
40+
41+
// NewQuery returns a new Query object.
42+
func NewQuery(q string, args ...interface{}) Query {
43+
if q == "" {
44+
internalLog.Error(errors.New("SQLError"), "sql cannot be empty")
45+
}
46+
47+
if !strings.HasSuffix(q, ";") {
48+
q += ";"
49+
}
50+
51+
return Query{
52+
escapedQuery: q,
53+
args: args,
54+
}
55+
}

0 commit comments

Comments
 (0)