Skip to content

Commit 487a581

Browse files
committed
add support for dumping large database test
Signed-off-by: Avi Deitcher <avi@deitcher.net>
1 parent 777a1b0 commit 487a581

File tree

10 files changed

+368
-85
lines changed

10 files changed

+368
-85
lines changed

pkg/core/dump.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ func (e *Executor) Dump(ctx context.Context, opts DumpOptions) (DumpResults, err
111111
Routines: routines,
112112
SuppressUseDatabase: suppressUseDatabase,
113113
MaxAllowedPacket: maxAllowedPacket,
114+
PostDumpDelay: opts.PostDumpDelay,
114115
}, dw); err != nil {
115116
dbDumpSpan.SetStatus(codes.Error, err.Error())
116117
dbDumpSpan.End()

pkg/core/dumpoptions.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package core
22

33
import (
4+
"time"
5+
46
"github.com/databacker/mysql-backup/pkg/compression"
57
"github.com/databacker/mysql-backup/pkg/database"
68
"github.com/databacker/mysql-backup/pkg/encrypt"
@@ -25,4 +27,6 @@ type DumpOptions struct {
2527
MaxAllowedPacket int
2628
Run uuid.UUID
2729
FilenamePattern string
30+
// PostDumpDelay inafter each dump is complete, while holding connection open. Do not use outside of tests.
31+
PostDumpDelay time.Duration
2832
}

pkg/database/connection.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@ import (
88
)
99

1010
type Connection struct {
11-
User string
12-
Pass string
13-
Host string
14-
Port int
11+
User string
12+
Pass string
13+
Host string
14+
Port int
15+
MultiStatements bool
1516
}
1617

1718
func (c Connection) MySQL() string {
@@ -27,5 +28,6 @@ func (c Connection) MySQL() string {
2728
}
2829
config.ParseTime = true
2930
config.TLSConfig = "preferred"
31+
config.MultiStatements = c.MultiStatements
3032
return config.FormatDSN()
3133
}

pkg/database/dump.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"database/sql"
66
"fmt"
7+
"time"
78

89
"github.com/databacker/mysql-backup/pkg/database/mysql"
910
)
@@ -14,6 +15,8 @@ type DumpOpts struct {
1415
Routines bool
1516
SuppressUseDatabase bool
1617
MaxAllowedPacket int
18+
// PostDumpDelay after each dump is complete, while holding connection open. Do not use outside of tests.
19+
PostDumpDelay time.Duration
1720
}
1821

1922
func Dump(ctx context.Context, dbconn Connection, opts DumpOpts, writers []DumpWriter) error {
@@ -42,6 +45,7 @@ func Dump(ctx context.Context, dbconn Connection, opts DumpOpts, writers []DumpW
4245
Routines: opts.Routines,
4346
SuppressUseDatabase: opts.SuppressUseDatabase,
4447
MaxAllowedPacket: opts.MaxAllowedPacket,
48+
PostDumpDelay: opts.PostDumpDelay,
4549
}
4650
if err := dumper.Dump(); err != nil {
4751
return fmt.Errorf("failed to dump database %s: %v", schema, err)

pkg/database/mysql/dump.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ type Data struct {
5252
SuppressUseDatabase bool
5353
Charset string
5454
Collation string
55+
PostDumpDelay time.Duration
5556

5657
tx *sql.Tx
5758
headerTmpl *template.Template
@@ -254,6 +255,9 @@ func (data *Data) Dump() error {
254255
return data.err
255256
}
256257

258+
if data.PostDumpDelay > 0 {
259+
time.Sleep(data.PostDumpDelay)
260+
}
257261
meta.CompleteTime = time.Now().UTC().Format("2006-01-02 15:04:05")
258262
return data.footerTmpl.Execute(data.Out, meta)
259263
}

test/README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,3 +71,11 @@ base/ - base of the backup area
7171
backups/ - the directory where backups are stored
7272
15674832/ - one target's backup
7373
88725436/ - another target's backup
74+
75+
## Databases in Parallel
76+
77+
The behaviour of backing up multiple databases in parallel can have an impact on connections.
78+
79+
To test management, there is a database test `TestIntegration/parallel_databases`. It sets up a database server
80+
with multiples databases, each with a table of a MB or a few. It then injects a delay on each backup of 10
81+
seconds, and then exits. Every second during the backup, it reports the number of connections.

test/backup_test.go

Lines changed: 109 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import (
3131

3232
"github.com/docker/docker/api/types"
3333
"github.com/docker/docker/api/types/container"
34-
imagetypes "github.com/docker/docker/api/types/image"
3534
"github.com/docker/docker/client"
3635
"github.com/docker/docker/pkg/stdcopy"
3736
"github.com/docker/go-connections/nat"
@@ -546,44 +545,12 @@ func setup(dc *dockerContext, base, backupFile, compactBackupFile string) (mysql
546545
s3server := httptest.NewServer(s3.Server())
547546
s3url = s3server.URL
548547

549-
// start the mysql container; configure it for lots of debug logging, in case we need it
550-
mysqlConf := `
551-
[mysqld]
552-
log_error =/var/log/mysql/mysql_error.log
553-
general_log_file=/var/log/mysql/mysql.log
554-
general_log =1
555-
slow_query_log =1
556-
slow_query_log_file=/var/log/mysql/mysql_slow.log
557-
long_query_time =2
558-
log_queries_not_using_indexes = 1
559-
`
560-
confFile := filepath.Join(base, "log.cnf")
561-
if err := os.WriteFile(confFile, []byte(mysqlConf), 0644); err != nil {
562-
return mysql, smb, s3url, s3backend, fmt.Errorf("failed to write mysql config file: %v", err)
563-
}
564-
logDir := filepath.Join(base, "mysql_logs")
565-
if err := os.Mkdir(logDir, 0755); err != nil {
566-
return mysql, smb, s3url, s3backend, fmt.Errorf("failed to create mysql log directory: %v", err)
567-
}
568-
// ensure we have mysql image
569-
resp, err := dc.cli.ImagePull(context.Background(), mysqlImage, imagetypes.PullOptions{})
570-
if err != nil {
571-
return mysql, smb, s3url, s3backend, fmt.Errorf("failed to pull mysql image: %v", err)
572-
}
573-
_, _ = io.Copy(os.Stdout, resp)
574-
_ = resp.Close()
575-
mysqlCID, mysqlPort, err := dc.startContainer(mysqlImage, "mysql", "3306/tcp", []string{fmt.Sprintf("%s:/etc/mysql/conf.d/log.conf:ro", confFile), fmt.Sprintf("%s:/var/log/mysql", logDir)}, nil, []string{
576-
fmt.Sprintf("MYSQL_ROOT_PASSWORD=%s", mysqlRootPass),
577-
"MYSQL_DATABASE=tester",
578-
fmt.Sprintf("MYSQL_USER=%s", mysqlUser),
579-
fmt.Sprintf("MYSQL_PASSWORD=%s", mysqlPass),
580-
})
548+
mysql, err = startDatabase(dc, base, mysqlImage, "mysql")
581549
if err != nil {
582-
return
550+
return mysql, smb, s3url, s3backend, fmt.Errorf("failed to start mysql container: %v", err)
583551
}
584-
mysql = containerPort{name: "mysql", id: mysqlCID, port: mysqlPort}
585552

586-
if err = dc.waitForDBConnectionAndGrantPrivileges(mysqlCID, mysqlRootUser, mysqlRootPass); err != nil {
553+
if err = dc.waitForDBConnectionAndGrantPrivileges(mysql.id, mysqlRootUser, mysqlRootPass); err != nil {
587554
return
588555
}
589556

@@ -890,58 +857,119 @@ func populatePrePost(base string, targets []backupTarget) (err error) {
890857
return nil
891858
}
892859

893-
func startDatabase(dc *dockerContext, baseDir, image, name string) (containerPort, error) {
894-
resp, err := dc.cli.ImagePull(context.Background(), image, imagetypes.PullOptions{})
895-
if err != nil {
896-
return containerPort{}, fmt.Errorf("failed to pull mysql image: %v", err)
897-
}
898-
_, _ = io.Copy(os.Stdout, resp)
899-
_ = resp.Close()
900-
901-
// start the mysql container; configure it for lots of debug logging, in case we need it
902-
mysqlConf := `
903-
[mysqld]
904-
log_error =/var/log/mysql/mysql_error.log
905-
general_log_file=/var/log/mysql/mysql.log
906-
general_log =1
907-
slow_query_log =1
908-
slow_query_log_file=/var/log/mysql/mysql_slow.log
909-
long_query_time =2
910-
log_queries_not_using_indexes = 1
911-
`
912-
if err := os.Mkdir(baseDir, 0o755); err != nil {
913-
return containerPort{}, fmt.Errorf("failed to create mysql base directory: %v", err)
914-
}
915-
confFile := filepath.Join(baseDir, "log.cnf")
916-
if err := os.WriteFile(confFile, []byte(mysqlConf), 0644); err != nil {
917-
return containerPort{}, fmt.Errorf("failed to write mysql config file: %v", err)
918-
}
919-
logDir := filepath.Join(baseDir, "mysql_logs")
920-
if err := os.Mkdir(logDir, 0755); err != nil {
921-
return containerPort{}, fmt.Errorf("failed to create mysql log directory: %v", err)
922-
}
923-
924-
// start mysql
925-
cid, port, err := dc.startContainer(
926-
image, name, "3306/tcp", []string{fmt.Sprintf("%s:/etc/mysql/conf.d/log.conf:ro", confFile), fmt.Sprintf("%s:/var/log/mysql", logDir)}, nil, []string{
927-
fmt.Sprintf("MYSQL_ROOT_PASSWORD=%s", mysqlRootPass),
928-
"MYSQL_DATABASE=tester",
929-
fmt.Sprintf("MYSQL_USER=%s", mysqlUser),
930-
fmt.Sprintf("MYSQL_PASSWORD=%s", mysqlPass),
931-
})
932-
if err != nil {
933-
return containerPort{}, fmt.Errorf("failed to start mysql container: %v", err)
934-
}
935-
return containerPort{name: name, id: cid, port: port}, nil
936-
}
937-
938860
func TestIntegration(t *testing.T) {
939861
CheckSkipIntegration(t, "integration")
940862
syscall.Umask(0)
941863
dc, err := getDockerContext()
942864
if err != nil {
943865
t.Fatalf("failed to get docker client: %v", err)
944866
}
867+
t.Run("parallel databases", func(t *testing.T) {
868+
base := t.TempDir()
869+
mysql, err := startDatabase(dc, base, mysqlImage, "mysql-parallel")
870+
defer func() {
871+
// log the results before tearing down, if requested
872+
if err := logContainers(dc, mysql.id); err != nil {
873+
log.Errorf("failed to get logs from service containers: %v", err)
874+
}
875+
876+
// tear everything down
877+
if err := teardown(dc, mysql.id); err != nil {
878+
log.Errorf("failed to teardown test: %v", err)
879+
}
880+
}()
881+
882+
if err != nil {
883+
t.Fatalf("failed to start large mysql database: %v", err)
884+
}
885+
if err = dc.waitForDBConnectionAndGrantPrivileges(mysql.id, mysqlRootUser, mysqlRootPass); err != nil {
886+
t.Fatalf("failed to wait for DB connection: %v", err)
887+
}
888+
dbconn := database.Connection{
889+
User: mysqlRootUser,
890+
Pass: mysqlRootPass,
891+
Host: "localhost",
892+
Port: mysql.port,
893+
MultiStatements: true,
894+
}
895+
896+
db, err := sql.Open("mysql", dbconn.MySQL())
897+
if err != nil {
898+
t.Fatalf("failed to open connection to database: %v", err)
899+
}
900+
901+
var dbCount uint = 10
902+
var tableSize uint64 = 1
903+
t.Logf("Setting up database server with %d databases, with one table of %d MB each", dbCount, tableSize)
904+
if err := setupLargeDatabase(db, dbCount, tableSize*1024*1024); err != nil {
905+
t.Fatalf("failed to setup large database: %v", err)
906+
}
907+
// now just run a backup to a temporary directory
908+
t.Logf("Running backup for large database")
909+
backupDir := t.TempDir()
910+
executor := &core.Executor{}
911+
executor.SetLogger(log.New())
912+
913+
store, err := storage.ParseURL(backupDir, credentials.Creds{})
914+
if err != nil {
915+
t.Fatalf("invalid target url: %v", err)
916+
}
917+
918+
dumpOptions := core.DumpOptions{
919+
Compressor: &compression.GzipCompressor{},
920+
DBConn: database.Connection{
921+
User: mysqlRootUser,
922+
Pass: mysqlRootPass,
923+
Host: "localhost",
924+
Port: mysql.port,
925+
},
926+
Targets: []storage.Storage{store},
927+
PostDumpDelay: 5 * time.Second, // for testing only, make them delay 10 seconds
928+
}
929+
ctx := context.Background()
930+
start := time.Now()
931+
errChan := make(chan error, 1)
932+
t.Logf("Starting dump test for large database at %s", start)
933+
go func() {
934+
_, err := executor.Dump(ctx, dumpOptions)
935+
errChan <- err
936+
}()
937+
ticker := time.NewTicker(1 * time.Second)
938+
defer ticker.Stop()
939+
940+
loop:
941+
for {
942+
select {
943+
case err := <-errChan:
944+
if err != nil {
945+
t.Fatalf("failed to run dump test: %v", err)
946+
}
947+
ticker.Stop()
948+
break loop
949+
case <-ticker.C:
950+
// every interval, report how many connections are running to the database
951+
tr, err := getStatus(db, "Threads_running")
952+
if err != nil {
953+
t.Fatalf("failed to get Threads_running status: %v", err)
954+
}
955+
956+
tc, err := getStatus(db, "Threads_connected")
957+
if err != nil {
958+
t.Fatalf("failed to get Threads_connected status: %v", err)
959+
}
960+
uTotal, uActive, err := getProcesslistCounts(db)
961+
if err != nil {
962+
t.Fatalf("failed to get processlist counts: %v", err)
963+
}
964+
965+
t.Logf("[%s]\tthreads_running=%d\tthreads_connected=%d\topen_user=%d\tactive_user=%d\n",
966+
time.Now().Format("15:04:05"),
967+
tr, tc, uTotal, uActive)
968+
969+
}
970+
}
971+
t.Logf("Dump completed at %s in %s", time.Now(), time.Since(start))
972+
})
945973
t.Run("dump", func(t *testing.T) {
946974
var (
947975
err error

0 commit comments

Comments
 (0)