diff --git a/cmd/main.go b/cmd/main.go index 3a30cd7..34cca70 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -13,6 +13,7 @@ import ( "github.com/dappnode/validator-tracker/internal/adapters/brain" "github.com/dappnode/validator-tracker/internal/adapters/dappmanager" "github.com/dappnode/validator-tracker/internal/adapters/notifier" + "github.com/dappnode/validator-tracker/internal/adapters/sqlite" "github.com/dappnode/validator-tracker/internal/application/domain" "github.com/dappnode/validator-tracker/internal/application/services" "github.com/dappnode/validator-tracker/internal/config" @@ -51,6 +52,12 @@ func main() { logger.Fatal("Failed to initialize beacon adapter. A live connection is required on startup: %v", err) } + // Initialize SQLite storage adapter + storage, err := sqlite.NewSQLiteStorage("./validator_tracker.db") + if err != nil { + logger.Fatal("Failed to initialize SQLite storage: %v", err) + } + // Prepare context and WaitGroup for graceful shutdown ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -66,6 +73,7 @@ func main() { SlashedNotified: make(map[domain.ValidatorIndex]bool), PreviouslyAllLive: true, // assume all validators were live at start PreviouslyOffline: false, + ValidatorStorage: storage, // <-- new field for storage } wg.Add(1) go func() { diff --git a/go.mod b/go.mod index a6dacce..2b59834 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.24.3 require ( github.com/attestantio/go-eth2-client v0.26.0 + github.com/mattn/go-sqlite3 v1.14.32 github.com/rs/zerolog v1.34.0 ) diff --git a/go.sum b/go.sum index 84f2f8c..42b39d1 100644 --- a/go.sum +++ b/go.sum @@ -64,6 +64,8 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/ github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-sqlite3 v1.14.32 h1:JD12Ag3oLy1zQA+BNn74xRgaBbdhbNIDYvQUEuuErjs= +github.com/mattn/go-sqlite3 v1.14.32/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM= diff --git a/internal/adapters/beacon/beacon.go b/internal/adapters/beacon/beacon.go index cf057a5..4ccc9d8 100644 --- a/internal/adapters/beacon/beacon.go +++ b/internal/adapters/beacon/beacon.go @@ -309,6 +309,28 @@ func (b *beaconAttestantClient) GetSlashedValidators(ctx context.Context, indice return slashedIndices, nil } +// GetSyncCommittee retrieves if the indices are in the current sync committee +func (b *beaconAttestantClient) GetSyncCommittee(ctx context.Context, epoch domain.Epoch, indices []domain.ValidatorIndex) (map[domain.ValidatorIndex]bool, error) { + if len(indices) == 0 { + logger.Debug("Called GetSyncCommittee with no validator indices, returning empty map. Nothing to check.") + return map[domain.ValidatorIndex]bool{}, nil + } + + epochVal := phase0.Epoch(epoch) + syncCommittee, err := b.client.SyncCommittee(ctx, &api.SyncCommitteeOpts{ + Epoch: &epochVal, + }) + if err != nil { + return nil, err + } + + syncCommitteeMap := make(map[domain.ValidatorIndex]bool) + for _, idx := range syncCommittee.Data.Validators { + syncCommitteeMap[domain.ValidatorIndex(idx)] = true + } + return syncCommitteeMap, nil +} + // enum for consensus client type ConsensusClient string diff --git a/internal/adapters/notifier/notifier.go b/internal/adapters/notifier/notifier.go index c670230..7b9deb7 100644 --- a/internal/adapters/notifier/notifier.go +++ b/internal/adapters/notifier/notifier.go @@ -211,6 +211,36 @@ func (n *Notifier) SendBlockProposalNot(validators []domain.ValidatorIndex, epoc return n.sendNotification(payload) } +// SendCommitteeNotification sends a notification when one or more validators have been in the sync committee. +func (n *Notifier) SendCommitteeNotification(validators []domain.ValidatorIndex, epoch domain.Epoch) error { + title := fmt.Sprintf("Validator(s) in Sync Committee: %s", indexesToString(validators, true)) + body := fmt.Sprintf("🟢 Validator(s) %s have been in the sync committee at epoch %d on %s.", indexesToString(validators, true), epoch, n.Network) + priority := Medium + status := Triggered + isBanner := true + correlationId := string(domain.Notifications.Committee) + beaconchaUrl := n.buildBeaconchaURL(validators) + var callToAction *CallToAction + if beaconchaUrl != "" { + callToAction = &CallToAction{ + Title: "Open in Explorer", + URL: beaconchaUrl, + } + } + payload := NotificationPayload{ + Title: title, + Body: body, + Category: &n.Category, + Priority: &priority, + IsBanner: &isBanner, + DnpName: &n.SignerDnpName, + Status: &status, + CorrelationId: &correlationId, + CallToAction: callToAction, + } + return n.sendNotification(payload) +} + // Helper to join validator indexes as comma-separated string // If truncate is true, only the first 10 are shown, then '...'. func indexesToString(indexes []domain.ValidatorIndex, truncate bool) string { diff --git a/internal/adapters/sqlite/sqlite_storage.go b/internal/adapters/sqlite/sqlite_storage.go new file mode 100644 index 0000000..740d163 --- /dev/null +++ b/internal/adapters/sqlite/sqlite_storage.go @@ -0,0 +1,109 @@ +package sqlite + +import ( + "context" + "database/sql" // basic sql + "fmt" + + _ "github.com/mattn/go-sqlite3" // additional driver for sqlite +) + +// Implements ports.ValidatorStoragePort + +type SQLiteStorage struct { + DB *sql.DB +} + +func NewSQLiteStorage(dbPath string) (*SQLiteStorage, error) { + db, err := sql.Open("sqlite3", dbPath) + if err != nil { + return nil, fmt.Errorf("failed to open sqlite db: %w", err) + } + if err := migrate(db); err != nil { + return nil, fmt.Errorf("failed to migrate sqlite db: %w", err) + } + return &SQLiteStorage{DB: db}, nil +} + +func migrate(db *sql.DB) error { + queries := []string{ + `CREATE TABLE IF NOT EXISTS validator_epoch_status ( + validator_index INTEGER NOT NULL, + epoch INTEGER NOT NULL, + liveness BOOLEAN, + in_sync_committee BOOLEAN, + sync_committee_reward INTEGER, + attestation_reward INTEGER, + slashed BOOLEAN, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (validator_index, epoch) + );`, + `CREATE TABLE IF NOT EXISTS validator_block_proposals ( + validator_index INTEGER NOT NULL, + slot INTEGER NOT NULL, + epoch INTEGER NOT NULL, + block_reward INTEGER, + PRIMARY KEY (validator_index, slot) + );`, + `CREATE TABLE IF NOT EXISTS validators ( + validator_index INTEGER PRIMARY KEY, + label TEXT, + added_at DATETIME DEFAULT CURRENT_TIMESTAMP + );`, + `CREATE INDEX IF NOT EXISTS idx_epoch ON validator_epoch_status(epoch);`, + `CREATE INDEX IF NOT EXISTS idx_validator_epoch ON validator_epoch_status(validator_index, epoch);`, + `CREATE INDEX IF NOT EXISTS idx_proposals_epoch ON validator_block_proposals(epoch);`, + `CREATE INDEX IF NOT EXISTS idx_proposals_slot ON validator_block_proposals(slot);`, + } + for _, q := range queries { + if _, err := db.Exec(q); err != nil { + return err + } + } + return nil +} + +// Upsert = Insert or Update. If a record with the same primary key exists, it updates the existing record. +// If the record does not exist, it inserts a new record. + +// UpsertValidatorEpochStatus inserts or updates validator epoch status. It will update fields if the record exists. +// If any of parameters are nil, the corresponding fields will be set to NULL in the database. +func (s *SQLiteStorage) UpsertValidatorEpochStatus(ctx context.Context, validatorIndex uint64, epoch uint64, liveness *bool, inSyncCommittee *bool, syncCommitteeReward *uint64, attestationReward *uint64, slashed *bool) error { + _, err := s.DB.ExecContext(ctx, + `INSERT INTO validator_epoch_status (validator_index, epoch, liveness, in_sync_committee, sync_committee_reward, attestation_reward, slashed) + VALUES (?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(validator_index, epoch) DO UPDATE SET + liveness=excluded.liveness, + in_sync_committee=excluded.in_sync_committee, + sync_committee_reward=excluded.sync_committee_reward, + attestation_reward=excluded.attestation_reward, + slashed=excluded.slashed, + updated_at=CURRENT_TIMESTAMP;`, + validatorIndex, epoch, liveness, inSyncCommittee, syncCommitteeReward, attestationReward, slashed, + ) + return err +} + +// UpsertValidatorBlockProposal inserts or updates a block proposal for a validator. It will update the block_reward if the record exists. +// If blockReward is nil, the block_reward field will be set to NULL in the database. +func (s *SQLiteStorage) UpsertValidatorBlockProposal(ctx context.Context, validatorIndex uint64, slot uint64, epoch uint64, blockReward *uint64) error { + _, err := s.DB.ExecContext(ctx, + `INSERT INTO validator_block_proposals (validator_index, slot, epoch, block_reward) + VALUES (?, ?, ?, ?) + ON CONFLICT(validator_index, slot) DO UPDATE SET + block_reward=excluded.block_reward;`, + validatorIndex, slot, epoch, blockReward, + ) + return err +} + +func (s *SQLiteStorage) UpsertValidatorMetadata(ctx context.Context, validatorIndex uint64, label *string) error { + _, err := s.DB.ExecContext(ctx, + `INSERT INTO validators (validator_index, label) + VALUES (?, ?) + ON CONFLICT(validator_index) DO UPDATE SET + label=excluded.label;`, + validatorIndex, label, + ) + return err +} diff --git a/internal/application/domain/notification.go b/internal/application/domain/notification.go index 08783f6..d7b6fe3 100644 --- a/internal/application/domain/notification.go +++ b/internal/application/domain/notification.go @@ -5,17 +5,19 @@ type ValidatorNotificationsEnabled map[ValidatorNotification]bool type ValidatorNotification string type validatorNotifications struct { - Liveness ValidatorNotification - Slashed ValidatorNotification - Proposal ValidatorNotification + Liveness ValidatorNotification + Slashed ValidatorNotification + Proposal ValidatorNotification + Committee ValidatorNotification } var Notifications validatorNotifications func InitNotifications(network string) { Notifications = validatorNotifications{ - Liveness: ValidatorNotification(network + "-validator-liveness"), - Slashed: ValidatorNotification(network + "-validator-slashed"), - Proposal: ValidatorNotification(network + "-block-proposal"), + Liveness: ValidatorNotification(network + "-validator-liveness"), + Slashed: ValidatorNotification(network + "-validator-slashed"), + Proposal: ValidatorNotification(network + "-block-proposal"), + Committee: ValidatorNotification(network + "-validator-committee"), } } diff --git a/internal/application/ports/beaconchain.go b/internal/application/ports/beaconchain.go index 60bd533..61dd78a 100644 --- a/internal/application/ports/beaconchain.go +++ b/internal/application/ports/beaconchain.go @@ -20,4 +20,5 @@ type BeaconChainAdapter interface { DidProposeBlock(ctx context.Context, slot domain.Slot) (bool, error) GetValidatorsLiveness(ctx context.Context, epoch domain.Epoch, indices []domain.ValidatorIndex) (map[domain.ValidatorIndex]bool, error) + GetSyncCommittee(ctx context.Context, epoch domain.Epoch, indices []domain.ValidatorIndex) (map[domain.ValidatorIndex]bool, error) } diff --git a/internal/application/ports/notifier.go b/internal/application/ports/notifier.go index d67a1f8..df36159 100644 --- a/internal/application/ports/notifier.go +++ b/internal/application/ports/notifier.go @@ -6,4 +6,5 @@ type NotifierPort interface { SendValidatorLivenessNot(validators []domain.ValidatorIndex, epoch domain.Epoch, live bool) error SendValidatorsSlashedNot(validators []domain.ValidatorIndex, epoch domain.Epoch) error SendBlockProposalNot(validators []domain.ValidatorIndex, epoch domain.Epoch, proposed bool) error + SendCommitteeNotification(validators []domain.ValidatorIndex, epoch domain.Epoch) error } diff --git a/internal/application/ports/validatorstorage.go b/internal/application/ports/validatorstorage.go new file mode 100644 index 0000000..e30e9bd --- /dev/null +++ b/internal/application/ports/validatorstorage.go @@ -0,0 +1,18 @@ +package ports + +import ( + "context" +) + +// ValidatorStoragePort defines methods for persisting validator duty and proposal data +// in a hexagonal architecture. +type ValidatorStoragePort interface { + // UpsertValidatorEpochStatus inserts or updates validator epoch status. + UpsertValidatorEpochStatus(ctx context.Context, valIndex uint64, epoch uint64, liveness *bool, inSyncCommittee *bool, syncCommitteeReward *uint64, attestationReward *uint64, slashed *bool) error + + // UpsertValidatorBlockProposal inserts or updates a block proposal for a validator. + UpsertValidatorBlockProposal(ctx context.Context, valIndex uint64, slot uint64, epoch uint64, blockReward *uint64) error + + // UpsertValidatorMetadata inserts or updates validator metadata. + UpsertValidatorMetadata(ctx context.Context, valIndex uint64, label *string) error +} diff --git a/internal/application/services/dutieschecker_service.go b/internal/application/services/dutieschecker_service.go index 10a670f..0a6b60f 100644 --- a/internal/application/services/dutieschecker_service.go +++ b/internal/application/services/dutieschecker_service.go @@ -2,6 +2,7 @@ package services import ( "context" + "slices" "time" "github.com/dappnode/validator-tracker/internal/application/domain" @@ -24,6 +25,8 @@ type DutiesChecker struct { // Tracking previous states for notifications PreviouslyAllLive bool PreviouslyOffline bool + + ValidatorStorage ports.ValidatorStoragePort // <-- added field for storage } func (a *DutiesChecker) Run(ctx context.Context) { @@ -119,20 +122,22 @@ func (a *DutiesChecker) performChecks(ctx context.Context, justifiedEpoch domain a.PreviouslyOffline = false } - // Check block proposals (successful or missed) - proposed, missed, err := a.checkProposals(ctx, justifiedEpoch, indices) + // Fetch sync committee membership for this epoch + syncCommitteeMap, err := a.Beacon.GetSyncCommittee(ctx, justifiedEpoch, indices) if err != nil { - logger.Error("Error checking block proposals: %v", err) - return err - } - if len(proposed) > 0 && notificationsEnabled[domain.Notifications.Proposal] { - if err := a.Notifier.SendBlockProposalNot(proposed, justifiedEpoch, true); err != nil { - logger.Warn("Error sending block proposal notification: %v", err) + logger.Warn("Error fetching sync committee membership: %v", err) + } else { + var inCommittee []domain.ValidatorIndex + for _, idx := range indices { + if syncCommitteeMap[idx] { + inCommittee = append(inCommittee, idx) + } } - } - if len(missed) > 0 && notificationsEnabled[domain.Notifications.Proposal] { - if err := a.Notifier.SendBlockProposalNot(missed, justifiedEpoch, false); err != nil { - logger.Warn("Error sending block proposal notification: %v", err) + if len(inCommittee) > 0 && notificationsEnabled[domain.Notifications.Committee] { + logger.Info("Sending committee notification for validators: %v", inCommittee) + if err := a.Notifier.SendCommitteeNotification(inCommittee, justifiedEpoch); err != nil { + logger.Warn("Error sending committee notification: %v", err) + } } } @@ -158,6 +163,73 @@ func (a *DutiesChecker) performChecks(ctx context.Context, justifiedEpoch domain } } + // Check block proposals (successful or missed) + proposed, missed, err := a.checkProposals(ctx, justifiedEpoch, indices) + if err != nil { + logger.Error("Error checking block proposals: %v", err) + return err + } + if len(proposed) > 0 && notificationsEnabled[domain.Notifications.Proposal] { + proposedIndices := make([]domain.ValidatorIndex, len(proposed)) + for i, p := range proposed { + proposedIndices[i] = p.ValidatorIndex + } + if err := a.Notifier.SendBlockProposalNot(proposedIndices, justifiedEpoch, true); err != nil { + logger.Warn("Error sending block proposal notification: %v", err) + } + } + if len(missed) > 0 && notificationsEnabled[domain.Notifications.Proposal] { + missedIndices := make([]domain.ValidatorIndex, len(missed)) + for i, m := range missed { + missedIndices[i] = m.ValidatorIndex + } + if err := a.Notifier.SendBlockProposalNot(missedIndices, justifiedEpoch, false); err != nil { + logger.Warn("Error sending block proposal notification: %v", err) + } + } + + // Persist block proposal data + for _, p := range proposed { + if err := a.ValidatorStorage.UpsertValidatorBlockProposal(ctx, uint64(p.ValidatorIndex), uint64(p.Slot), uint64(justifiedEpoch), nil); err != nil { + logger.Warn("Failed to persist block proposal for validator %d: %v", p.ValidatorIndex, err) + } + } + for _, m := range missed { + if err := a.ValidatorStorage.UpsertValidatorBlockProposal(ctx, uint64(m.ValidatorIndex), uint64(m.Slot), uint64(justifiedEpoch), nil); err != nil { + logger.Warn("Failed to persist missed proposal for validator %d: %v", m.ValidatorIndex, err) + } + } + + // Persist liveness, committee, attestation reward, and slashed status for all checked validators + for _, idx := range indices { + var liveness *bool + isLive := slices.Contains(online, idx) + liveness = new(bool) + *liveness = isLive + + var inSyncCommittee *bool + if syncCommitteeMap != nil { + val := syncCommitteeMap[idx] + inSyncCommittee = new(bool) + *inSyncCommittee = val + } + + var slashedFlag *bool + isSlashed := slices.Contains(slashed, idx) + slashedFlag = new(bool) + *slashedFlag = isSlashed + + var attestationReward *uint64 + var syncCommitteeReward *uint64 + // TODO: fetch attestation and sync committee rewards if available. For now, set to nil. + attestationReward = nil + syncCommitteeReward = nil + + if err := a.ValidatorStorage.UpsertValidatorEpochStatus(ctx, uint64(idx), uint64(justifiedEpoch), liveness, inSyncCommittee, syncCommitteeReward, attestationReward, slashedFlag); err != nil { + logger.Warn("Failed to persist epoch status for validator %d: %v", idx, err) + } + } + return nil } @@ -195,7 +267,7 @@ func (a *DutiesChecker) checkProposals( ctx context.Context, epochToTrack domain.Epoch, indices []domain.ValidatorIndex, -) (proposed []domain.ValidatorIndex, missed []domain.ValidatorIndex, err error) { +) (proposed []domain.ProposerDuty, missed []domain.ProposerDuty, err error) { proposerDuties, err := a.Beacon.GetProposerDuties(ctx, epochToTrack, indices) if err != nil { return nil, nil, err @@ -213,10 +285,10 @@ func (a *DutiesChecker) checkProposals( continue } if didPropose { - proposed = append(proposed, duty.ValidatorIndex) + proposed = append(proposed, duty) logger.Info("✅ Validator %d successfully proposed a block at slot %d", duty.ValidatorIndex, duty.Slot) } else { - missed = append(missed, duty.ValidatorIndex) + missed = append(missed, duty) logger.Warn("❌ Validator %d was scheduled to propose at slot %d but did not", duty.ValidatorIndex, duty.Slot) } }