Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/dappnode/validator-tracker/internal/adapters/brain"
"github.com/dappnode/validator-tracker/internal/adapters/dappmanager"
"github.com/dappnode/validator-tracker/internal/adapters/notifier"
"github.com/dappnode/validator-tracker/internal/adapters/sqlite"
"github.com/dappnode/validator-tracker/internal/application/domain"
"github.com/dappnode/validator-tracker/internal/application/services"
"github.com/dappnode/validator-tracker/internal/config"
Expand Down Expand Up @@ -51,6 +52,12 @@ func main() {
logger.Fatal("Failed to initialize beacon adapter. A live connection is required on startup: %v", err)
}

// Initialize SQLite storage adapter
storage, err := sqlite.NewSQLiteStorage("./validator_tracker.db")
if err != nil {
logger.Fatal("Failed to initialize SQLite storage: %v", err)
}

// Prepare context and WaitGroup for graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -66,6 +73,7 @@ func main() {
SlashedNotified: make(map[domain.ValidatorIndex]bool),
PreviouslyAllLive: true, // assume all validators were live at start
PreviouslyOffline: false,
ValidatorStorage: storage, // <-- new field for storage
}
wg.Add(1)
go func() {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.24.3

require (
github.com/attestantio/go-eth2-client v0.26.0
github.com/mattn/go-sqlite3 v1.14.32
github.com/rs/zerolog v1.34.0
)

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-sqlite3 v1.14.32 h1:JD12Ag3oLy1zQA+BNn74xRgaBbdhbNIDYvQUEuuErjs=
github.com/mattn/go-sqlite3 v1.14.32/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM=
Expand Down
22 changes: 22 additions & 0 deletions internal/adapters/beacon/beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,28 @@ func (b *beaconAttestantClient) GetSlashedValidators(ctx context.Context, indice
return slashedIndices, nil
}

// GetSyncCommittee retrieves if the indices are in the current sync committee
func (b *beaconAttestantClient) GetSyncCommittee(ctx context.Context, epoch domain.Epoch, indices []domain.ValidatorIndex) (map[domain.ValidatorIndex]bool, error) {
if len(indices) == 0 {
logger.Debug("Called GetSyncCommittee with no validator indices, returning empty map. Nothing to check.")
return map[domain.ValidatorIndex]bool{}, nil
}

epochVal := phase0.Epoch(epoch)
syncCommittee, err := b.client.SyncCommittee(ctx, &api.SyncCommitteeOpts{
Epoch: &epochVal,
})
if err != nil {
return nil, err
}

syncCommitteeMap := make(map[domain.ValidatorIndex]bool)
for _, idx := range syncCommittee.Data.Validators {
syncCommitteeMap[domain.ValidatorIndex(idx)] = true
}
return syncCommitteeMap, nil
Comment on lines +327 to +331
Copy link

Copilot AI Sep 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function creates a map of all sync committee members but doesn't filter for the requested validator indices. This returns membership status for all validators in the sync committee instead of just the ones being tracked. The function should only return results for validators in the indices parameter.

Suggested change
syncCommitteeMap := make(map[domain.ValidatorIndex]bool)
for _, idx := range syncCommittee.Data.Validators {
syncCommitteeMap[domain.ValidatorIndex(idx)] = true
}
return syncCommitteeMap, nil
// Build a set of sync committee members for fast lookup
syncCommitteeSet := make(map[phase0.ValidatorIndex]struct{}, len(syncCommittee.Data.Validators))
for _, idx := range syncCommittee.Data.Validators {
syncCommitteeSet[idx] = struct{}{}
}
// Build result map for only the requested indices
result := make(map[domain.ValidatorIndex]bool, len(indices))
for _, idx := range indices {
_, exists := syncCommitteeSet[phase0.ValidatorIndex(idx)]
result[idx] = exists
}
return result, nil

Copilot uses AI. Check for mistakes.
}

// enum for consensus client
type ConsensusClient string

Expand Down
30 changes: 30 additions & 0 deletions internal/adapters/notifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,36 @@ func (n *Notifier) SendBlockProposalNot(validators []domain.ValidatorIndex, epoc
return n.sendNotification(payload)
}

// SendCommitteeNotification sends a notification when one or more validators have been in the sync committee.
func (n *Notifier) SendCommitteeNotification(validators []domain.ValidatorIndex, epoch domain.Epoch) error {
title := fmt.Sprintf("Validator(s) in Sync Committee: %s", indexesToString(validators, true))
body := fmt.Sprintf("🟢 Validator(s) %s have been in the sync committee at epoch %d on %s.", indexesToString(validators, true), epoch, n.Network)
priority := Medium
status := Triggered
isBanner := true
correlationId := string(domain.Notifications.Committee)
beaconchaUrl := n.buildBeaconchaURL(validators)
var callToAction *CallToAction
if beaconchaUrl != "" {
callToAction = &CallToAction{
Title: "Open in Explorer",
URL: beaconchaUrl,
}
}
payload := NotificationPayload{
Title: title,
Body: body,
Category: &n.Category,
Priority: &priority,
IsBanner: &isBanner,
DnpName: &n.SignerDnpName,
Status: &status,
CorrelationId: &correlationId,
CallToAction: callToAction,
}
return n.sendNotification(payload)
}

// Helper to join validator indexes as comma-separated string
// If truncate is true, only the first 10 are shown, then '...'.
func indexesToString(indexes []domain.ValidatorIndex, truncate bool) string {
Expand Down
109 changes: 109 additions & 0 deletions internal/adapters/sqlite/sqlite_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package sqlite

import (
"context"
"database/sql" // basic sql
"fmt"

_ "github.com/mattn/go-sqlite3" // additional driver for sqlite
)

// Implements ports.ValidatorStoragePort

type SQLiteStorage struct {
DB *sql.DB
}

func NewSQLiteStorage(dbPath string) (*SQLiteStorage, error) {
db, err := sql.Open("sqlite3", dbPath)
if err != nil {
return nil, fmt.Errorf("failed to open sqlite db: %w", err)
}
if err := migrate(db); err != nil {
return nil, fmt.Errorf("failed to migrate sqlite db: %w", err)
}
return &SQLiteStorage{DB: db}, nil
}

func migrate(db *sql.DB) error {
queries := []string{
`CREATE TABLE IF NOT EXISTS validator_epoch_status (
validator_index INTEGER NOT NULL,
epoch INTEGER NOT NULL,
liveness BOOLEAN,
in_sync_committee BOOLEAN,
sync_committee_reward INTEGER,
attestation_reward INTEGER,
slashed BOOLEAN,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (validator_index, epoch)
);`,
`CREATE TABLE IF NOT EXISTS validator_block_proposals (
validator_index INTEGER NOT NULL,
slot INTEGER NOT NULL,
epoch INTEGER NOT NULL,
block_reward INTEGER,
PRIMARY KEY (validator_index, slot)
);`,
`CREATE TABLE IF NOT EXISTS validators (
validator_index INTEGER PRIMARY KEY,
label TEXT,
added_at DATETIME DEFAULT CURRENT_TIMESTAMP
);`,
`CREATE INDEX IF NOT EXISTS idx_epoch ON validator_epoch_status(epoch);`,
`CREATE INDEX IF NOT EXISTS idx_validator_epoch ON validator_epoch_status(validator_index, epoch);`,
`CREATE INDEX IF NOT EXISTS idx_proposals_epoch ON validator_block_proposals(epoch);`,
`CREATE INDEX IF NOT EXISTS idx_proposals_slot ON validator_block_proposals(slot);`,
}
for _, q := range queries {
if _, err := db.Exec(q); err != nil {
return err
}
}
Comment on lines +58 to +62
Copy link

Copilot AI Sep 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Migration statements are executed individually without a transaction, so a mid-sequence failure can leave the schema in a partially applied state. Wrap the loop in a BEGIN/COMMIT transaction (rolling back on error) to ensure atomic schema setup.

Suggested change
for _, q := range queries {
if _, err := db.Exec(q); err != nil {
return err
}
}
tx, err := db.Begin()
if err != nil {
return err
}
for _, q := range queries {
if _, err := tx.Exec(q); err != nil {
tx.Rollback()
return err
}
}
if err := tx.Commit(); err != nil {
return err
}

Copilot uses AI. Check for mistakes.
return nil
}

// Upsert = Insert or Update. If a record with the same primary key exists, it updates the existing record.
// If the record does not exist, it inserts a new record.

// UpsertValidatorEpochStatus inserts or updates validator epoch status. It will update fields if the record exists.
// If any of parameters are nil, the corresponding fields will be set to NULL in the database.
func (s *SQLiteStorage) UpsertValidatorEpochStatus(ctx context.Context, validatorIndex uint64, epoch uint64, liveness *bool, inSyncCommittee *bool, syncCommitteeReward *uint64, attestationReward *uint64, slashed *bool) error {
_, err := s.DB.ExecContext(ctx,
`INSERT INTO validator_epoch_status (validator_index, epoch, liveness, in_sync_committee, sync_committee_reward, attestation_reward, slashed)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(validator_index, epoch) DO UPDATE SET
liveness=excluded.liveness,
in_sync_committee=excluded.in_sync_committee,
sync_committee_reward=excluded.sync_committee_reward,
attestation_reward=excluded.attestation_reward,
slashed=excluded.slashed,
updated_at=CURRENT_TIMESTAMP;`,
validatorIndex, epoch, liveness, inSyncCommittee, syncCommitteeReward, attestationReward, slashed,
)
return err
}

// UpsertValidatorBlockProposal inserts or updates a block proposal for a validator. It will update the block_reward if the record exists.
// If blockReward is nil, the block_reward field will be set to NULL in the database.
func (s *SQLiteStorage) UpsertValidatorBlockProposal(ctx context.Context, validatorIndex uint64, slot uint64, epoch uint64, blockReward *uint64) error {
_, err := s.DB.ExecContext(ctx,
`INSERT INTO validator_block_proposals (validator_index, slot, epoch, block_reward)
VALUES (?, ?, ?, ?)
ON CONFLICT(validator_index, slot) DO UPDATE SET
block_reward=excluded.block_reward;`,
validatorIndex, slot, epoch, blockReward,
)
return err
}

func (s *SQLiteStorage) UpsertValidatorMetadata(ctx context.Context, validatorIndex uint64, label *string) error {
_, err := s.DB.ExecContext(ctx,
`INSERT INTO validators (validator_index, label)
VALUES (?, ?)
ON CONFLICT(validator_index) DO UPDATE SET
label=excluded.label;`,
validatorIndex, label,
)
return err
}
14 changes: 8 additions & 6 deletions internal/application/domain/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@ type ValidatorNotificationsEnabled map[ValidatorNotification]bool
type ValidatorNotification string

type validatorNotifications struct {
Liveness ValidatorNotification
Slashed ValidatorNotification
Proposal ValidatorNotification
Liveness ValidatorNotification
Slashed ValidatorNotification
Proposal ValidatorNotification
Committee ValidatorNotification
}

var Notifications validatorNotifications

func InitNotifications(network string) {
Notifications = validatorNotifications{
Liveness: ValidatorNotification(network + "-validator-liveness"),
Slashed: ValidatorNotification(network + "-validator-slashed"),
Proposal: ValidatorNotification(network + "-block-proposal"),
Liveness: ValidatorNotification(network + "-validator-liveness"),
Slashed: ValidatorNotification(network + "-validator-slashed"),
Proposal: ValidatorNotification(network + "-block-proposal"),
Committee: ValidatorNotification(network + "-validator-committee"),
}
}
1 change: 1 addition & 0 deletions internal/application/ports/beaconchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ type BeaconChainAdapter interface {
DidProposeBlock(ctx context.Context, slot domain.Slot) (bool, error)

GetValidatorsLiveness(ctx context.Context, epoch domain.Epoch, indices []domain.ValidatorIndex) (map[domain.ValidatorIndex]bool, error)
GetSyncCommittee(ctx context.Context, epoch domain.Epoch, indices []domain.ValidatorIndex) (map[domain.ValidatorIndex]bool, error)
}
1 change: 1 addition & 0 deletions internal/application/ports/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ type NotifierPort interface {
SendValidatorLivenessNot(validators []domain.ValidatorIndex, epoch domain.Epoch, live bool) error
SendValidatorsSlashedNot(validators []domain.ValidatorIndex, epoch domain.Epoch) error
SendBlockProposalNot(validators []domain.ValidatorIndex, epoch domain.Epoch, proposed bool) error
SendCommitteeNotification(validators []domain.ValidatorIndex, epoch domain.Epoch) error
}
18 changes: 18 additions & 0 deletions internal/application/ports/validatorstorage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package ports

import (
"context"
)

// ValidatorStoragePort defines methods for persisting validator duty and proposal data
// in a hexagonal architecture.
type ValidatorStoragePort interface {
// UpsertValidatorEpochStatus inserts or updates validator epoch status.
UpsertValidatorEpochStatus(ctx context.Context, valIndex uint64, epoch uint64, liveness *bool, inSyncCommittee *bool, syncCommitteeReward *uint64, attestationReward *uint64, slashed *bool) error

// UpsertValidatorBlockProposal inserts or updates a block proposal for a validator.
UpsertValidatorBlockProposal(ctx context.Context, valIndex uint64, slot uint64, epoch uint64, blockReward *uint64) error

// UpsertValidatorMetadata inserts or updates validator metadata.
UpsertValidatorMetadata(ctx context.Context, valIndex uint64, label *string) error
}
Loading