diff --git a/config/settings.go b/config/settings.go index 9224072..c19cf1c 100644 --- a/config/settings.go +++ b/config/settings.go @@ -24,6 +24,9 @@ type Settings struct { Prewarm bool `json:"prewarm,omitempty"` RampUp bool `json:"rampUp,omitempty"` ReportPath string `json:"reportPath,omitempty"` + TxsDir string `json:"txsDir,omitempty"` + TargetGas uint64 `json:"targetGas,omitempty"` + NumBlocksToWrite int `json:"numBlocksToWrite,omitempty"` } // DefaultSettings returns the default configuration values @@ -41,6 +44,9 @@ func DefaultSettings() Settings { Prewarm: false, RampUp: false, ReportPath: "", + TxsDir: "", + TargetGas: 10_000_000, + NumBlocksToWrite: 100, } } @@ -60,6 +66,9 @@ func InitializeViper(cmd *cobra.Command) error { "workers": "workers", "rampUp": "ramp-up", "reportPath": "report-path", + "txsDir": "txs-dir", + "targetGas": "target-gas", + "numBlocksToWrite": "num-blocks-to-write", } for viperKey, flagName := range flagBindings { @@ -82,6 +91,9 @@ func InitializeViper(cmd *cobra.Command) error { viper.SetDefault("workers", defaults.Workers) viper.SetDefault("rampUp", defaults.RampUp) viper.SetDefault("reportPath", defaults.ReportPath) + viper.SetDefault("txsDir", defaults.TxsDir) + viper.SetDefault("targetGas", defaults.TargetGas) + viper.SetDefault("numBlocksToWrite", defaults.NumBlocksToWrite) return nil } @@ -120,5 +132,8 @@ func ResolveSettings() Settings { Prewarm: viper.GetBool("prewarm"), RampUp: viper.GetBool("rampUp"), ReportPath: viper.GetString("reportPath"), + TxsDir: viper.GetString("txsDir"), + TargetGas: viper.GetUint64("targetGas"), + NumBlocksToWrite: viper.GetInt("numBlocksToWrite"), } } diff --git a/config/settings_test.go b/config/settings_test.go index 95f54ae..cd8ccb2 100644 --- a/config/settings_test.go +++ b/config/settings_test.go @@ -93,6 +93,9 @@ func TestArgumentPrecedence(t *testing.T) { cmd.Flags().Int("buffer-size", 0, "Buffer size") cmd.Flags().Bool("ramp-up", false, "Ramp up loadtest") cmd.Flags().String("report-path", "", "Report path") + cmd.Flags().String("txs-dir", "", "Txs dir") + cmd.Flags().Uint64("target-gas", 0, "Target gas") + cmd.Flags().Int("num-blocks-to-write", 0, "Number of blocks to write") // Parse CLI args if len(tt.cliArgs) > 0 { @@ -133,6 +136,9 @@ func TestDefaultSettings(t *testing.T) { Prewarm: false, RampUp: false, ReportPath: "", + TxsDir: "", + TargetGas: 10_000_000, + NumBlocksToWrite: 100, } if defaults != expected { diff --git a/main.go b/main.go index 67111bc..cf61350 100644 --- a/main.go +++ b/main.go @@ -12,6 +12,7 @@ import ( "syscall" "time" + "github.com/ethereum/go-ethereum/ethclient" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spf13/cobra" "go.opentelemetry.io/otel" @@ -65,6 +66,9 @@ func init() { rootCmd.Flags().String("metricsListenAddr", "0.0.0.0:9090", "The ip:port on which to export prometheus metrics.") rootCmd.Flags().Bool("ramp-up", false, "Ramp up loadtest") rootCmd.Flags().String("report-path", "", "Path to save the report") + rootCmd.Flags().String("txs-dir", "", "Path to save the transactions") + rootCmd.Flags().Uint64("target-gas", 10_000_000, "Target gas per block") + rootCmd.Flags().Int("num-blocks-to-write", 100, "Number of blocks to write") // Initialize Viper with proper error handling if err := config.InitializeViper(rootCmd); err != nil { @@ -169,12 +173,6 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { sharedLimiter = rate.NewLimiter(rate.Inf, 1) } - // Create the sender from the config struct - snd, err := sender.NewShardedSender(cfg, settings.BufferSize, settings.Workers, sharedLimiter) - if err != nil { - return fmt.Errorf("failed to create sender: %w", err) - } - // Create and start block collector if endpoints are available var blockCollector *stats.BlockCollector if len(cfg.Endpoints) > 0 && settings.TrackBlocks { @@ -207,6 +205,12 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { }) } + // Create the sender from the config struct + snd, err := sender.NewShardedSender(cfg, settings.BufferSize, settings.Workers, sharedLimiter) + if err != nil { + return fmt.Errorf("failed to create sender: %w", err) + } + // Enable dry-run mode in sender if specified if settings.DryRun { snd.SetDryRun(true) @@ -225,7 +229,25 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { snd.SetStatsCollector(collector, logger) // Create dispatcher - dispatcher := sender.NewDispatcher(gen, snd) + var dispatcher *sender.Dispatcher + if settings.TxsDir != "" { + // get latest height + ethclient, err := ethclient.Dial(cfg.Endpoints[0]) + if err != nil { + return fmt.Errorf("failed to create ethclient: %w", err) + } + latestHeight, err := ethclient.BlockNumber(ctx) + if err != nil { + return fmt.Errorf("failed to get latest height: %w", err) + } + numBlocksToWrite := settings.NumBlocksToWrite + writerHeight := latestHeight + 10 // some buffer + log.Printf("🔍 Latest height: %d, writer start height: %d", latestHeight, writerHeight) + writer := sender.NewTxsWriter(settings.TargetGas, settings.TxsDir, writerHeight, uint64(numBlocksToWrite)) + dispatcher = sender.NewDispatcher(gen, writer) + } else { + dispatcher = sender.NewDispatcher(gen, snd) + } // Set statistics collector for dispatcher dispatcher.SetStatsCollector(collector) @@ -239,10 +261,11 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error { log.Printf("📝 Prewarm mode: Accounts will be prewarmed") } - // Start the sender (starts all workers) - s.SpawnBgNamed("sender", func() error { return snd.Run(ctx) }) - log.Printf("✅ Connected to %d endpoints", snd.GetNumShards()) - + if settings.TxsDir == "" { + // Start the sender (starts all workers) + s.SpawnBgNamed("sender", func() error { return snd.Run(ctx) }) + log.Printf("✅ Connected to %d endpoints", snd.GetNumShards()) + } // Perform prewarming if enabled (before starting logger to avoid logging prewarm transactions) if settings.Prewarm { if err := dispatcher.Prewarm(ctx); err != nil { diff --git a/profiles/conflict.json b/profiles/conflict.json new file mode 100644 index 0000000..6eca68a --- /dev/null +++ b/profiles/conflict.json @@ -0,0 +1,30 @@ +{ + "chainId": 713714, + "seiChainId": "sei-chain", + "endpoints": [ + "http://127.0.0.1:8545" + ], + "accounts": { + "count": 5000, + "newAccountRate": 0.0 + }, + "scenarios": [ + { + "name": "ERC20Conflict", + "weight": 1 + } + ], + "settings": { + "workers": 1, + "tps": 0, + "statsInterval": "5s", + "bufferSize": 1000, + "dryRun": false, + "debug": false, + "trackReceipts": false, + "trackBlocks": false, + "trackUserLatency": false, + "prewarm": false, + "rampUp": false + } +} \ No newline at end of file diff --git a/profiles/evm_transfer_write_file.json b/profiles/evm_transfer_write_file.json new file mode 100644 index 0000000..1fc9795 --- /dev/null +++ b/profiles/evm_transfer_write_file.json @@ -0,0 +1,33 @@ +{ + "chainId": 713714, + "seiChainId": "sei-chain", + "endpoints": [ + "http://127.0.0.1:8545" + ], + "accounts": { + "count": 5000, + "newAccountRate": 0.0 + }, + "scenarios": [ + { + "name": "EVMTransfer", + "weight": 1 + } + ], + "settings": { + "workers": 1, + "tps": 0, + "statsInterval": "5s", + "bufferSize": 1000, + "dryRun": false, + "debug": false, + "trackReceipts": false, + "trackBlocks": false, + "trackUserLatency": false, + "prewarm": false, + "rampUp": false, + "txsDir": "/root/load_txs", + "targetGas": 30000000, + "numBlocksToWrite": 1000 + } + } diff --git a/sender/writer.go b/sender/writer.go new file mode 100644 index 0000000..8be5542 --- /dev/null +++ b/sender/writer.go @@ -0,0 +1,98 @@ +package sender + +import ( + "context" + "encoding/json" + "fmt" + "log" + "os" + "path/filepath" + + "github.com/sei-protocol/sei-load/types" +) + +// implements `Send` + +type TxsWriter struct { + gasPerBlock uint64 + nextHeight uint64 + txsDir string + blocksGenerated uint64 + numBlocks uint64 + + bufferGas uint64 + txBuffer []*types.LoadTx +} + +func NewTxsWriter(gasPerBlock uint64, txsDir string, startHeight uint64, numBlocks uint64) *TxsWriter { + // what height to start at? + return &TxsWriter{ + gasPerBlock: gasPerBlock, + nextHeight: startHeight, + txsDir: txsDir, + blocksGenerated: 0, + numBlocks: numBlocks, + + bufferGas: 0, + txBuffer: make([]*types.LoadTx, 0), + } +} + +// Send writes the transaction to the writer +func (w *TxsWriter) Send(ctx context.Context, tx *types.LoadTx) error { + // if bwe would exceed gasPerBlock, flush + if w.bufferGas+tx.EthTx.Gas() > w.gasPerBlock { + if err := w.Flush(); err != nil { + return err + } + } + + // add to buffer + w.txBuffer = append(w.txBuffer, tx) + w.bufferGas += tx.EthTx.Gas() + return nil +} + +type TxWriteData struct { + TxPayloads [][]byte `json:"tx_payloads"` +} + +func (w *TxsWriter) Flush() error { + defer func() { + // clear buffer and reset bufferGas and increment nextHeight + w.txBuffer = make([]*types.LoadTx, 0) + w.bufferGas = 0 + w.nextHeight++ + w.blocksGenerated++ + }() + // write to dir `~/load_txs` + // make dir if it doesn't exist + err := os.MkdirAll(w.txsDir, 0755) + if err != nil { + return err + } + txsFile := filepath.Join(w.txsDir, fmt.Sprintf("%d_txs.json", w.nextHeight)) + txData := TxWriteData{ + TxPayloads: make([][]byte, 0), + } + for _, tx := range w.txBuffer { + txData.TxPayloads = append(txData.TxPayloads, tx.Payload) + } + + txDataJSON, err := json.Marshal(txData) + if err != nil { + return err + } + + if err := os.WriteFile(txsFile, txDataJSON, 0644); err != nil { + return err + } + + log.Printf("Flushed %d transactions to %s", len(w.txBuffer), txsFile) + + if w.blocksGenerated >= w.numBlocks { + return fmt.Errorf("reached max number of blocks: %d", w.numBlocks) + } + + return nil +} diff --git a/sender/writer_test.go b/sender/writer_test.go new file mode 100644 index 0000000..e0247cc --- /dev/null +++ b/sender/writer_test.go @@ -0,0 +1,58 @@ +package sender + +import ( + "context" + "testing" + + "github.com/sei-protocol/sei-load/config" + "github.com/sei-protocol/sei-load/generator" + "github.com/sei-protocol/sei-load/generator/scenarios" + "github.com/sei-protocol/sei-load/types" + "github.com/stretchr/testify/require" +) + +func TestTxsWriter_Flush(t *testing.T) { + // two evm transfer txs + writer := NewTxsWriter(42000, "/tmp", 1, 1) + + loadConfig := &config.LoadConfig{ + ChainID: 7777, + } + + sharedAccounts := types.NewAccountPool(&types.AccountConfig{ + Accounts: types.GenerateAccounts(10), + NewAccountRate: 0.0, + }) + + evmScenario := scenarios.CreateScenario(config.Scenario{ + Name: "EVMTransfer", + Weight: 1, + }) + evmScenario.Deploy(loadConfig, sharedAccounts.NextAccount()) + + generator := generator.NewScenarioGenerator(sharedAccounts, evmScenario) + + txs := generator.GenerateN(3) + + err := writer.Send(context.Background(), txs[0]) + require.NoError(t, err) + require.Equal(t, uint64(1), writer.nextHeight) + require.Equal(t, uint64(21000), writer.bufferGas) + require.Len(t, writer.txBuffer, 1) + require.Equal(t, txs[0], writer.txBuffer[0]) + + err = writer.Send(context.Background(), txs[1]) + require.NoError(t, err) + require.Equal(t, uint64(1), writer.nextHeight) + require.Equal(t, uint64(42000), writer.bufferGas) + require.Len(t, writer.txBuffer, 2) + require.Equal(t, txs[1], writer.txBuffer[1]) + + err = writer.Send(context.Background(), txs[2]) + require.NoError(t, err) + // now should be flushed and have the new tx + require.Equal(t, uint64(2), writer.nextHeight) + require.Equal(t, uint64(21000), writer.bufferGas) + require.Len(t, writer.txBuffer, 1) + +}