Skip to content

Commit 41854dc

Browse files
Merge pull request #37 from RedisBloom/support_TDigest
Add support for T-Digest sketch to the client
2 parents 65c8c3a + 180f2cb commit 41854dc

File tree

2 files changed

+274
-0
lines changed

2 files changed

+274
-0
lines changed

client.go

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,52 @@ type Client struct {
1818
Name string
1919
}
2020

21+
// TDigestInfo holds the properties of a T-Digest sketch.
//
// All fields are unexported; they are filled in by ParseTDigestInfo and read
// through the accessor methods below.
type TDigestInfo struct {
	compression       int64
	capacity          int64
	mergedNodes       int64
	unmergedNodes     int64
	mergedWeight      float64
	unmergedWeight    float64
	totalCompressions int64
}

// Compression returns the compression recorded in t.
func (t *TDigestInfo) Compression() int64 { return t.compression }

// Capacity returns the capacity recorded in t.
func (t *TDigestInfo) Capacity() int64 { return t.capacity }

// MergedNodes returns the number of merged nodes recorded in t.
func (t *TDigestInfo) MergedNodes() int64 { return t.mergedNodes }

// UnmergedNodes returns the number of unmerged nodes recorded in t.
func (t *TDigestInfo) UnmergedNodes() int64 { return t.unmergedNodes }

// MergedWeight returns the merged weight recorded in t.
func (t *TDigestInfo) MergedWeight() float64 { return t.mergedWeight }

// UnmergedWeight returns the unmerged weight recorded in t.
func (t *TDigestInfo) UnmergedWeight() float64 { return t.unmergedWeight }

// TotalCompressions returns the total number of compressions recorded in t.
func (t *TDigestInfo) TotalCompressions() int64 { return t.totalCompressions }
66+
2167
// NewClient creates a new client connecting to the redis host, and using the given name as key prefix.
2268
// Addr can be a single host:port pair, or a comma separated list of host:port,host:port...
2369
// In the case of multiple hosts we create a multi-pool and select connections at random
@@ -443,6 +489,76 @@ func (client *Client) CfInfo(key string) (map[string]int64, error) {
443489
return ParseInfoReply(redis.Values(conn.Do("CF.INFO", key)))
444490
}
445491

492+
// TdCreate - Allocate the memory and initialize the t-digest
493+
func (client *Client) TdCreate(key string, compression int64) (string, error) {
494+
conn := client.Pool.Get()
495+
defer conn.Close()
496+
return redis.String(conn.Do("TDIGEST.CREATE", key, compression))
497+
}
498+
499+
// TdReset - Reset the sketch to zero - empty out the sketch and re-initialize it
500+
func (client *Client) TdReset(key string) (string, error) {
501+
conn := client.Pool.Get()
502+
defer conn.Close()
503+
return redis.String(conn.Do("TDIGEST.RESET", key))
504+
}
505+
506+
// TdAdd - Adds one or more samples to a sketch
507+
func (client *Client) TdAdd(key string, samples map[float64]float64) (string, error) {
508+
conn := client.Pool.Get()
509+
defer conn.Close()
510+
args := redis.Args{key}
511+
for k, v := range samples {
512+
args = args.Add(k, v)
513+
}
514+
reply, err := conn.Do("TDIGEST.ADD", args...)
515+
return redis.String(reply, err)
516+
}
517+
518+
// TdMerge - Merges all of the values from 'from' to 'this' sketch
519+
func (client *Client) TdMerge(toKey string, fromKey string) (string, error) {
520+
conn := client.Pool.Get()
521+
defer conn.Close()
522+
return redis.String(conn.Do("TDIGEST.MERGE", toKey, fromKey))
523+
}
524+
525+
// TdMin - Get minimum value from the sketch. Will return DBL_MAX if the sketch is empty
526+
func (client *Client) TdMin(key string) (float64, error) {
527+
conn := client.Pool.Get()
528+
defer conn.Close()
529+
return redis.Float64(conn.Do("TDIGEST.MIN", key))
530+
}
531+
532+
// TdMax - Get maximum value from the sketch. Will return DBL_MIN if the sketch is empty
533+
func (client *Client) TdMax(key string) (float64, error) {
534+
conn := client.Pool.Get()
535+
defer conn.Close()
536+
return redis.Float64(conn.Do("TDIGEST.MAX", key))
537+
}
538+
539+
// TdQuantile - Returns an estimate of the cutoff such that a specified fraction of the data added
540+
// to this TDigest would be less than or equal to the cutoff
541+
func (client *Client) TdQuantile(key string, quantile float64) (float64, error) {
542+
conn := client.Pool.Get()
543+
defer conn.Close()
544+
return redis.Float64(conn.Do("TDIGEST.QUANTILE", key, quantile))
545+
}
546+
547+
// TdCdf - Returns the fraction of all points added which are <= value
548+
func (client *Client) TdCdf(key string, value float64) (float64, error) {
549+
conn := client.Pool.Get()
550+
defer conn.Close()
551+
return redis.Float64(conn.Do("TDIGEST.CDF", key, value))
552+
}
553+
554+
// TdInfo - Returns compression, capacity, total merged and unmerged nodes, the total
555+
// compressions made up to date on that key, and merged and unmerged weight.
556+
func (client *Client) TdInfo(key string) (TDigestInfo, error) {
557+
conn := client.Pool.Get()
558+
defer conn.Close()
559+
return ParseTDigestInfo(redis.Values(conn.Do("TDIGEST.INFO", key)))
560+
}
561+
446562
func ParseInfoReply(values []interface{}, err error) (map[string]int64, error) {
447563
if err != nil {
448564
return nil, err
@@ -456,3 +572,38 @@ func ParseInfoReply(values []interface{}, err error) (map[string]int64, error) {
456572
}
457573
return m, err
458574
}
575+
576+
func ParseTDigestInfo(result interface{}, err error) (info TDigestInfo, outErr error) {
577+
values, outErr := redis.Values(result, err)
578+
if outErr != nil {
579+
return TDigestInfo{}, err
580+
}
581+
if len(values)%2 != 0 {
582+
return TDigestInfo{}, errors.New("ParseInfo expects even number of values result")
583+
}
584+
var key string
585+
for i := 0; i < len(values); i += 2 {
586+
key, outErr = redis.String(values[i], nil)
587+
switch key {
588+
case "Compression":
589+
info.compression, outErr = redis.Int64(values[i+1], nil)
590+
case "Capacity":
591+
info.capacity, outErr = redis.Int64(values[i+1], nil)
592+
case "Merged nodes":
593+
info.mergedNodes, outErr = redis.Int64(values[i+1], nil)
594+
case "Unmerged nodes":
595+
info.unmergedNodes, outErr = redis.Int64(values[i+1], nil)
596+
case "Merged weight":
597+
info.mergedWeight, outErr = redis.Float64(values[i+1], nil)
598+
case "Unmerged weight":
599+
info.unmergedWeight, outErr = redis.Float64(values[i+1], nil)
600+
case "Total compressions":
601+
info.totalCompressions, outErr = redis.Int64(values[i+1], nil)
602+
}
603+
if outErr != nil {
604+
return TDigestInfo{}, outErr
605+
}
606+
}
607+
608+
return info, nil
609+
}

client_test.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,5 +527,128 @@ func TestClient_BfScanDump(t *testing.T) {
527527
assert.Nil(t, err)
528528
_, _, err = client.BfScanDump(notBfKey, 0)
529529
assert.Equal(t, err.Error(), "WRONGTYPE Operation against a key holding the wrong kind of value")
530+
}
531+
532+
func TestClient_TdReset(t *testing.T) {
533+
client.FlushAll()
534+
key := "test_td"
535+
ret, err := client.TdCreate(key, 100)
536+
assert.Nil(t, err)
537+
assert.Equal(t, "OK", ret)
538+
539+
ret, err = client.TdReset(key)
540+
assert.Nil(t, err)
541+
assert.Equal(t, "OK", ret)
542+
543+
samples := map[float64]float64{1.0: 1.0, 2.0: 2.0}
544+
ret, err = client.TdAdd(key, samples)
545+
assert.Nil(t, err)
546+
assert.Equal(t, "OK", ret)
547+
548+
ret, err = client.TdReset(key)
549+
assert.Nil(t, err)
550+
assert.Equal(t, "OK", ret)
551+
552+
info, err := client.TdInfo(key)
553+
assert.Nil(t, err)
554+
assert.Equal(t, 0.0, info.UnmergedWeight())
555+
assert.Equal(t, int64(0), info.TotalCompressions())
556+
assert.Equal(t, int64(100), info.Compression())
557+
assert.Equal(t, int64(610), info.Capacity())
558+
}
530559

560+
func TestClient_TdMerge(t *testing.T) {
561+
key1 := "toKey"
562+
key2 := "fromKey"
563+
ret, err := client.TdCreate(key1, 10)
564+
assert.Nil(t, err)
565+
assert.Equal(t, "OK", ret)
566+
ret, err = client.TdCreate(key2, 10)
567+
assert.Nil(t, err)
568+
assert.Equal(t, "OK", ret)
569+
570+
//Add values
571+
samples1 := map[float64]float64{1.0: 1.0, 2.0: 2.0}
572+
samples2 := map[float64]float64{3.0: 3.0, 4.0: 4.0}
573+
ret, err = client.TdAdd(key1, samples1)
574+
assert.Nil(t, err)
575+
assert.Equal(t, "OK", ret)
576+
ret, err = client.TdAdd(key2, samples2)
577+
assert.Nil(t, err)
578+
assert.Equal(t, "OK", ret)
579+
580+
//Merge
581+
ret, err = client.TdMerge(key1, key2)
582+
assert.Nil(t, err)
583+
assert.Equal(t, "OK", ret)
584+
585+
// we should now have 10 weight on to-histogram
586+
info, err := client.TdInfo(key1)
587+
assert.Nil(t, err)
588+
assert.Equal(t, 10.0, info.UnmergedWeight()+info.MergedWeight())
589+
assert.Equal(t, int64(2), info.UnmergedNodes())
590+
assert.Equal(t, int64(2), info.MergedNodes())
591+
}
592+
593+
func TestClient_TdMinMax(t *testing.T) {
594+
client.FlushAll()
595+
key := "test_td"
596+
ret, err := client.TdCreate(key, 10)
597+
assert.Nil(t, err)
598+
assert.Equal(t, "OK", ret)
599+
600+
samples := map[float64]float64{1.0: 1.0, 2.0: 2.0, 3.0: 3.0}
601+
ret, err = client.TdAdd(key, samples)
602+
assert.Nil(t, err)
603+
assert.Equal(t, "OK", ret)
604+
605+
ans, err := client.TdMin(key)
606+
assert.Nil(t, err)
607+
assert.Equal(t, 1.0, ans)
608+
609+
ans, err = client.TdMax(key)
610+
assert.Nil(t, err)
611+
assert.Equal(t, 3.0, ans)
612+
}
613+
614+
func TestClient_TdQuantile(t *testing.T) {
615+
client.FlushAll()
616+
key := "test_td"
617+
ret, err := client.TdCreate(key, 10)
618+
assert.Nil(t, err)
619+
assert.Equal(t, "OK", ret)
620+
621+
samples := map[float64]float64{1.0: 1.0, 2.0: 1.0, 3.0: 1.0}
622+
ret, err = client.TdAdd(key, samples)
623+
assert.Nil(t, err)
624+
assert.Equal(t, "OK", ret)
625+
626+
ans, err := client.TdQuantile(key, 1.0)
627+
assert.Nil(t, err)
628+
assert.Equal(t, 3.0, ans)
629+
630+
ans, err = client.TdQuantile(key, 0.0)
631+
assert.Nil(t, err)
632+
assert.Equal(t, 1.0, ans)
633+
}
634+
635+
// TestClient_TdCdf adds three samples to a fresh sketch and checks TdCdf on
// either side of them: 10.0 must give 1.0 and 0.0 must give 0.0.
//
// It uses the package-level client and calls client.FlushAll() first.
func TestClient_TdCdf(t *testing.T) {
	client.FlushAll()
	const key = "test_td"

	// expectOK asserts that a command succeeded with the "OK" status reply.
	expectOK := func(status string, err error) {
		assert.Nil(t, err)
		assert.Equal(t, "OK", status)
	}

	expectOK(client.TdCreate(key, 10))
	expectOK(client.TdAdd(key, map[float64]float64{1.0: 1.0, 2.0: 1.0, 3.0: 1.0}))

	above, err := client.TdCdf(key, 10.0)
	assert.Nil(t, err)
	assert.Equal(t, 1.0, above)

	below, err := client.TdCdf(key, 0.0)
	assert.Nil(t, err)
	assert.Equal(t, 0.0, below)
}

0 commit comments

Comments
 (0)