diff --git a/README.md b/README.md index f21b46d..0464bc1 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,32 @@ A collection of components for building the Signet node. These components implement core node functionality, but are potentially indepedently useful. -### What's new in Signet? +## What's in the Components? + +- **signet-node-types** - Shim types wrapping reth's internal node types + system to make it more usable in Signet. +- **signet-blobber** - Blob retrieval and parsing, using blob explorers, + Signet's Pylon, and the local node transaction API. +- **signet-rpc** - An Ethereum JSON-RPC Server for Signet nodes. Makes heavy + use of reth internals. +- **signet-db** - An extension of reth's database, providing a Signet-specific + database schema and utilities for working with Signet blocks and transactions. + +### Contributing to the Node Components + +Please see [CONTRIBUTING.md](CONTRIBUTING.md). + +[Signet docs]: https://docs.signet.sh + +## Note on Semver + +This repo is UNPUBLISHED and may NOT respect semantic versioning between tagged +versions. In general, it is versioned to match the signet-sdk version with +which it is compatible. I.e. `node-components@0.8.x` is expected to be +compatible with any signet-sdk `0.8.x` version. However, a release of +`node-components@0.8.1` may have breaking changes from `node-components@0.8.0`. + +## What's new in Signet? Signet is a pragmatic Ethereum rollup that offers a new set of ideas and aims to radically modernize rollup technology. @@ -22,20 +47,3 @@ knowledge. Signet does not have a native token. Signet is just a rollup. See the [Signet docs] for more info. - -### What's in the Components? - -- **signet-node-types** - Shim types wrapping reth's internal node types - system to make it more usable in Signet. -- **signet-blobber** - Blob retrieval and parsing, using blob explorers, - Signet's Pylon, and the local node transaction API. -- **signet-rpc** - An Ethereum JSON-RPC Server for Signet nodes. Makes heavy - use of reth internals. -- **signet-db** - An extension of reth's database, providing a Signet-specific - database schema and utilities for working with Signet blocks and transactions. - -### Contributing to the Node Components - -Please see [CONTRIBUTING.md](CONTRIBUTING.md). - -[Signet docs]: https://docs.signet.sh diff --git a/crates/blobber/README.md b/crates/blobber/README.md index 28e304c..717c64f 100644 --- a/crates/blobber/README.md +++ b/crates/blobber/README.md @@ -1,9 +1,13 @@ # Block Extractor -The [`BlockExtractor`] retrieves blobs from host chain blocks and parses them +The [`BlobFetcher`] retrieves blobs from host chain blocks and parses them into [`ZenithBlock`]s. It is used by the node during notification processing when a [`Zenith::BlockSubmitted`] event is extracted from a host chain block. +The [`BlobCacher`] is a wrapper around the [`BlobFetcher`] that caches +blobs in an in-memory cache. It is used to avoid fetching the same blob and to +manage retry logic during fetching. + ## Data Sources The following sources can be configured: diff --git a/crates/blobber/src/builder.rs b/crates/blobber/src/builder.rs index 074a8bc..2ab29cf 100644 --- a/crates/blobber/src/builder.rs +++ b/crates/blobber/src/builder.rs @@ -1,10 +1,10 @@ -use crate::{BlockExtractorConfig, block_data::BlockExtractor}; +use crate::{BlobCacher, BlobFetcher, BlobFetcherConfig}; use init4_bin_base::utils::calc::SlotCalculator; use reth::transaction_pool::TransactionPool; use url::Url; -/// Errors that can occur while building the [`BlockExtractor`] with a -/// [`BlockExtractorBuilder`]. +/// Errors that can occur while building the [`BlobFetcher`] with a +/// [`BlobFetcherBuilder`]. #[derive(Debug, thiserror::Error)] pub enum BuilderError { /// The transaction pool was not provided. @@ -27,9 +27,9 @@ pub enum BuilderError { MissingSlotCalculator, } -/// Builder for the [`BlockExtractor`]. +/// Builder for the [`BlobFetcher`]. #[derive(Debug, Default, Clone)] -pub struct BlockExtractorBuilder { +pub struct BlobFetcherBuilder { pool: Option, explorer_url: Option, client: Option, @@ -38,10 +38,10 @@ pub struct BlockExtractorBuilder { slot_calculator: Option, } -impl BlockExtractorBuilder { +impl BlobFetcherBuilder { /// Set the transaction pool to use for the extractor. - pub fn with_pool(self, pool: P2) -> BlockExtractorBuilder { - BlockExtractorBuilder { + pub fn with_pool(self, pool: P2) -> BlobFetcherBuilder { + BlobFetcherBuilder { pool: Some(pool), explorer_url: self.explorer_url, client: self.client, @@ -53,15 +53,13 @@ impl BlockExtractorBuilder { /// Set the transaction pool to use a mock test pool. #[cfg(feature = "test-utils")] - pub fn with_test_pool( - self, - ) -> BlockExtractorBuilder { + pub fn with_test_pool(self) -> BlobFetcherBuilder { self.with_pool(reth_transaction_pool::test_utils::testing_pool()) } /// Set the configuration for the CL url, pylon url, from the provided - /// [`BlockExtractorConfig`]. - pub fn with_config(self, config: &BlockExtractorConfig) -> Result { + /// [`BlobFetcherConfig`]. + pub fn with_config(self, config: &BlobFetcherConfig) -> Result { let this = self.with_explorer_url(config.blob_explorer_url()); let this = if let Some(cl_url) = config.cl_url() { this.with_cl_url(cl_url)? } else { this }; @@ -114,22 +112,22 @@ impl BlockExtractorBuilder { pub const fn with_slot_calculator( mut self, slot_calculator: SlotCalculator, - ) -> BlockExtractorBuilder { + ) -> BlobFetcherBuilder { self.slot_calculator = Some(slot_calculator); self } /// Set the slot calculator to use for the extractor, using the Pecornino /// host configuration. - pub const fn with_pecornino_slots(mut self) -> BlockExtractorBuilder { + pub const fn with_pecornino_slots(mut self) -> BlobFetcherBuilder { self.slot_calculator = Some(SlotCalculator::pecorino_host()); self } } -impl BlockExtractorBuilder { - /// Build the [`BlockExtractor`] with the provided parameters. - pub fn build(self) -> Result, BuilderError> { +impl BlobFetcherBuilder { + /// Build the [`BlobFetcher`] with the provided parameters. + pub fn build(self) -> Result, BuilderError> { let pool = self.pool.ok_or(BuilderError::MissingPool)?; let explorer_url = self.explorer_url.ok_or(BuilderError::MissingExplorerUrl)?; @@ -145,7 +143,16 @@ impl BlockExtractorBuilder { let slot_calculator = self.slot_calculator.ok_or(BuilderError::MissingSlotCalculator)?; - Ok(BlockExtractor::new(pool, explorer, client, cl_url, pylon_url, slot_calculator)) + Ok(BlobFetcher::new(pool, explorer, client, cl_url, pylon_url, slot_calculator)) + } + + /// Build a [`BlobCacher`] with the provided parameters. + pub fn build_cache(self) -> Result, BuilderError> + where + Pool: 'static, + { + let fetcher = self.build()?; + Ok(BlobCacher::new(fetcher)) } } diff --git a/crates/blobber/src/cache.rs b/crates/blobber/src/cache.rs new file mode 100644 index 0000000..bdda09d --- /dev/null +++ b/crates/blobber/src/cache.rs @@ -0,0 +1,297 @@ +use crate::{BlobFetcherError, Blobs, FetchResult}; +use alloy::consensus::{SidecarCoder, SimpleCoder, Transaction as _}; +use alloy::eips::eip7691::MAX_BLOBS_PER_BLOCK_ELECTRA; +use alloy::eips::merge::EPOCH_SLOTS; +use alloy::primitives::{B256, Bytes, keccak256}; +use reth::transaction_pool::TransactionPool; +use reth::{network::cache::LruMap, primitives::Receipt}; +use signet_extract::ExtractedEvent; +use signet_zenith::Zenith::BlockSubmitted; +use signet_zenith::ZenithBlock; +use std::{ + sync::{Arc, Mutex}, + time::Duration, +}; +use tokio::sync::{mpsc, oneshot}; +use tracing::{error, info, instrument, warn}; + +const BLOB_CACHE_SIZE: u32 = (MAX_BLOBS_PER_BLOCK_ELECTRA * EPOCH_SLOTS) as u32; +const CACHE_REQUEST_CHANNEL_SIZE: usize = (MAX_BLOBS_PER_BLOCK_ELECTRA * 2) as usize; +const FETCH_RETRIES: usize = 3; +const BETWEEN_RETRIES: Duration = Duration::from_millis(250); + +/// Instructions for the cache. +/// +/// These instructions are sent to the cache handle to perform operations like +/// retrieving blobs. +#[derive(Debug)] +enum CacheInst { + Retrieve { slot: usize, tx_hash: B256, version_hashes: Vec, resp: oneshot::Sender }, +} + +/// Handle for the cache. +#[derive(Debug, Clone)] +pub struct CacheHandle { + sender: mpsc::Sender, +} + +impl CacheHandle { + /// Sends a cache instruction. + async fn send(&self, inst: CacheInst) { + let _ = self.sender.send(inst).await; + } + + /// Fetches blobs from the cache. This triggers a background task to + /// fetch blobs if they are not found in the cache. + pub async fn fetch_blobs( + &self, + slot: usize, + tx_hash: B256, + version_hashes: Vec, + ) -> FetchResult { + let (resp, receiver) = oneshot::channel(); + + self.send(CacheInst::Retrieve { slot, tx_hash, version_hashes, resp }).await; + + receiver.await.map_err(|_| BlobFetcherError::missing_sidecar(tx_hash)) + } + + /// Fetch the blobs using [`Self::fetch_blobs`] and decode them to get the + /// Zenith block data using the provided coder. + pub async fn fetch_and_decode_with_coder( + &self, + slot: usize, + extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>, + mut coder: C, + ) -> FetchResult { + let tx_hash = extract.tx_hash(); + let versioned_hashes = extract + .tx + .as_eip4844() + .ok_or_else(BlobFetcherError::non_4844_transaction)? + .blob_versioned_hashes() + .expect("tx is eip4844"); + + let blobs = self.fetch_blobs(slot, tx_hash, versioned_hashes.to_owned()).await?; + + coder + .decode_all(blobs.as_ref()) + .ok_or_else(BlobFetcherError::blob_decode_error)? + .into_iter() + .find(|data| keccak256(data) == extract.block_data_hash()) + .map(Into::into) + .ok_or_else(|| BlobFetcherError::block_data_not_found(tx_hash)) + } + + /// Fetch the blobs using [`Self::fetch_blobs`] and decode them using + /// [`SimpleCoder`] to get the Zenith block data. + pub async fn fech_and_decode( + &self, + slot: usize, + extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>, + ) -> FetchResult { + self.fetch_and_decode_with_coder(slot, extract, SimpleCoder::default()).await + } + + /// Fetch the blobs, decode them using the provided coder, and construct a + /// Zenith block from the header and data. + pub async fn signet_block_with_coder( + &self, + host_block_number: u64, + slot: usize, + extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>, + coder: C, + ) -> FetchResult { + let header = extract.ru_header(host_block_number); + self.fetch_and_decode_with_coder(slot, extract, coder) + .await + .map(|buf| ZenithBlock::from_header_and_data(header, buf)) + } + + /// Fetch the blobs, decode them using [`SimpleCoder`], and construct a + /// Zenith block from the header and data. + pub async fn signet_block( + &self, + host_block_number: u64, + slot: usize, + extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>, + ) -> FetchResult { + self.signet_block_with_coder(host_block_number, slot, extract, SimpleCoder::default()).await + } +} + +/// Retrieves blobs and stores them in a cache for later use. +pub struct BlobCacher { + fetcher: crate::BlobFetcher, + + cache: Mutex>, +} + +impl core::fmt::Debug for BlobCacher { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("BlobCacher").field("fetcher", &self.fetcher).finish_non_exhaustive() + } +} + +impl BlobCacher { + /// Creates a new `BlobCacher` with the provided extractor and cache size. + pub fn new(fetcher: crate::BlobFetcher) -> Self { + Self { fetcher, cache: LruMap::new(BLOB_CACHE_SIZE).into() } + } + + /// Fetches blobs for a given slot and transaction hash. + #[instrument(skip(self), target = "signet_blobber::BlobCacher", fields(retries = FETCH_RETRIES))] + async fn fetch_blobs( + &self, + slot: usize, + tx_hash: B256, + versioned_hashes: Vec, + ) -> FetchResult { + // Cache hit + if let Some(blobs) = self.cache.lock().unwrap().get(&(slot, tx_hash)) { + info!(target: "signet_blobber::BlobCacher", "Cache hit"); + return Ok(blobs.clone()); + } + + // Cache miss, use the fetcher to retrieve blobs + // Retry fetching blobs up to `FETCH_RETRIES` times + for attempt in 1..=FETCH_RETRIES { + let blobs = self.fetcher.fetch_blobs(slot, tx_hash, &versioned_hashes).await; + + match blobs { + Ok(blobs) => { + self.cache.lock().unwrap().insert((slot, tx_hash), blobs.clone()); + return Ok(blobs); + } + Err(BlobFetcherError::Ignorable(e)) => { + warn!(target: "signet_blobber::BlobCacher", attempt, %e, "Blob fetch attempt failed."); + tokio::time::sleep(BETWEEN_RETRIES).await; + continue; + } + Err(e) => return Err(e), // unrecoverable error + } + } + error!(target: "signet_blobber::BlobCacher", "All fetch attempts failed"); + Err(BlobFetcherError::missing_sidecar(tx_hash)) + } + + /// Processes the cache instructions. + async fn handle_inst(self: Arc, inst: CacheInst) { + match inst { + CacheInst::Retrieve { slot, tx_hash, version_hashes, resp } => { + if let Ok(blobs) = self.fetch_blobs(slot, tx_hash, version_hashes).await { + // if listener has gone away, that's okay, we just won't send the response + let _ = resp.send(blobs); + } + } + } + } + + async fn task_future(self: Arc, mut inst: mpsc::Receiver) { + while let Some(inst) = inst.recv().await { + let this = Arc::clone(&self); + tokio::spawn(async move { + this.handle_inst(inst).await; + }); + } + } + + /// Spawns the cache task to handle incoming instructions. + /// + /// # Panics + /// This function will panic if the cache task fails to spawn. + pub fn spawn(self) -> CacheHandle { + let (sender, inst) = mpsc::channel(CACHE_REQUEST_CHANNEL_SIZE); + tokio::spawn(Arc::new(self).task_future(inst)); + CacheHandle { sender } + } +} + +#[cfg(test)] +mod tests { + use crate::BlobFetcher; + + use super::*; + use alloy::{ + consensus::{SidecarBuilder, SignableTransaction as _, TxEip2930}, + eips::Encodable2718, + primitives::{TxKind, U256, bytes}, + rlp::encode, + signers::{SignerSync, local::PrivateKeySigner}, + }; + use init4_bin_base::utils::calc::SlotCalculator; + use reth::primitives::Transaction; + use reth_transaction_pool::{ + PoolTransaction, TransactionOrigin, + test_utils::{MockTransaction, testing_pool}, + }; + use signet_types::{constants::SignetSystemConstants, primitives::TransactionSigned}; + + #[tokio::test] + async fn test_fetch_from_pool() -> eyre::Result<()> { + let wallet = PrivateKeySigner::random(); + let pool = testing_pool(); + + let test = signet_constants::KnownChains::Test; + + let constants: SignetSystemConstants = test.try_into().unwrap(); + let calc = SlotCalculator::new(0, 0, 12); + + let explorer_url = "https://api.holesky.blobscan.com/"; + let client = reqwest::Client::builder().use_rustls_tls(); + + let tx = Transaction::Eip2930(TxEip2930 { + chain_id: 17001, + nonce: 2, + gas_limit: 50000, + gas_price: 1_500_000_000, + to: TxKind::Call(constants.host_zenith()), + value: U256::from(1_f64), + input: bytes!(""), + ..Default::default() + }); + + let encoded_transactions = + encode(vec![sign_tx_with_key_pair(wallet.clone(), tx).encoded_2718()]); + + let result = SidecarBuilder::::from_slice(&encoded_transactions).build(); + assert!(result.is_ok()); + + let mut mock_transaction = MockTransaction::eip4844_with_sidecar(result.unwrap().into()); + let transaction = + sign_tx_with_key_pair(wallet, Transaction::from(mock_transaction.clone())); + + mock_transaction.set_hash(*transaction.hash()); + + pool.add_transaction(TransactionOrigin::Local, mock_transaction.clone()).await?; + + // Spawn the cache + let cache = BlobFetcher::builder() + .with_pool(pool.clone()) + .with_explorer_url(explorer_url) + .with_client_builder(client) + .unwrap() + .with_slot_calculator(calc) + .build_cache()?; + let handle = cache.spawn(); + + let got = handle + .fetch_blobs( + 0, // this is ignored by the pool + *mock_transaction.hash(), + mock_transaction.blob_versioned_hashes().unwrap().to_owned(), + ) + .await; + assert!(got.is_ok()); + + let got_blobs = got.unwrap(); + assert!(got_blobs.len() == 1); + + Ok(()) + } + + fn sign_tx_with_key_pair(wallet: PrivateKeySigner, tx: Transaction) -> TransactionSigned { + let signature = wallet.sign_hash_sync(&tx.signature_hash()).unwrap(); + TransactionSigned::new_unhashed(tx, signature) + } +} diff --git a/crates/blobber/src/config.rs b/crates/blobber/src/config.rs index 85673fe..3ece3cd 100644 --- a/crates/blobber/src/config.rs +++ b/crates/blobber/src/config.rs @@ -4,7 +4,7 @@ use std::borrow::Cow; /// Configuration for the block extractor. #[derive(Debug, Clone, serde::Deserialize, FromEnv)] #[serde(rename_all = "camelCase")] -pub struct BlockExtractorConfig { +pub struct BlobFetcherConfig { /// URL of the blob explorer. #[from_env(var = "BLOB_EXPLORER_URL", desc = "URL of the blob explorer", infallible)] blob_explorer_url: Cow<'static, str>, @@ -18,8 +18,8 @@ pub struct BlockExtractorConfig { pylon_url: Option>, } -impl BlockExtractorConfig { - /// Create a new `BlockExtractorConfig` with default values. +impl BlobFetcherConfig { + /// Create a new `BlobFetcherConfig` with default values. pub const fn new(blob_explorer_url: Cow<'static, str>) -> Self { Self { blob_explorer_url, cl_url: None, pylon_url: None } } @@ -39,7 +39,7 @@ impl BlockExtractorConfig { self.pylon_url = Some(pylon_url); } - /// Create a new `BlockExtractorConfig` with the provided CL URL, Pylon URL, + /// Create a new `BlobFetcherConfig` with the provided CL URL, Pylon URL, pub fn cl_url(&self) -> Option<&str> { self.cl_url.as_deref() } diff --git a/crates/blobber/src/error.rs b/crates/blobber/src/error.rs index 87d7668..47b9ad9 100644 --- a/crates/blobber/src/error.rs +++ b/crates/blobber/src/error.rs @@ -1,10 +1,10 @@ use alloy::{eips::eip2718::Eip2718Error, primitives::B256}; use reth::transaction_pool::BlobStoreError; -/// Extraction Result -pub type ExtractionResult = std::result::Result; +/// Fetch Result +pub type FetchResult = std::result::Result; -/// Unrecoverable blob extraction errors. These result in the node shutting +/// Unrecoverable blob fetching errors. These result in the node shutting /// down. They occur when the blobstore is down or the sidecar is unretrievable. #[derive(Debug, thiserror::Error)] pub enum UnrecoverableBlobError { @@ -28,7 +28,7 @@ pub enum UnrecoverableBlobError { PylonClientUrlNotSet, } -/// Ignorable blob extraction errors. These result in the block being skipped. +/// Ignorable blob fetching errors. These result in the block being skipped. #[derive(Debug, thiserror::Error, Copy, Clone)] pub enum IgnorableBlobError { /// Incorrect transaction type error @@ -48,18 +48,18 @@ pub enum IgnorableBlobError { BlockDecodeError(#[from] Eip2718Error), } -/// Blob extraction errors +/// Blob fetching errors #[derive(Debug, thiserror::Error)] -pub enum BlockExtractionError { - /// Unrecoverable blob extraction error +pub enum BlobFetcherError { + /// Unrecoverable blob fetching error #[error(transparent)] Unrecoverable(#[from] UnrecoverableBlobError), - /// Ignorable blob extraction error + /// Ignorable blob fetching error #[error(transparent)] Ignorable(#[from] IgnorableBlobError), } -impl BlockExtractionError { +impl BlobFetcherError { /// Returns true if the error is ignorable pub const fn is_ignorable(&self) -> bool { matches!(self, Self::Ignorable(_)) @@ -110,20 +110,26 @@ impl From for UnrecoverableBlobError { } } -impl From for BlockExtractionError { +impl From for BlobFetcherError { fn from(err: BlobStoreError) -> Self { Self::Unrecoverable(err.into()) } } -impl From for BlockExtractionError { +impl From for BlobFetcherError { fn from(err: reqwest::Error) -> Self { Self::Unrecoverable(err.into()) } } -impl From for BlockExtractionError { +impl From for BlobFetcherError { fn from(err: Eip2718Error) -> Self { Self::Ignorable(err.into()) } } + +impl From for BlobFetcherError { + fn from(err: url::ParseError) -> Self { + Self::Unrecoverable(UnrecoverableBlobError::UrlParse(err)) + } +} diff --git a/crates/blobber/src/block_data.rs b/crates/blobber/src/fetch.rs similarity index 58% rename from crates/blobber/src/block_data.rs rename to crates/blobber/src/fetch.rs index 95ebc1a..8eefb55 100644 --- a/crates/blobber/src/block_data.rs +++ b/crates/blobber/src/fetch.rs @@ -1,9 +1,9 @@ use crate::{ - BlockExtractionError, BlockExtractorBuilder, ExtractionResult, error::UnrecoverableBlobError, - shim::ExtractableChainShim, + BlobFetcherBuilder, BlobFetcherError, FetchResult, error::UnrecoverableBlobError, + shim::ExtractableChainShim, utils::extract_blobs_from_bundle, }; use alloy::{ - consensus::{Blob, SidecarCoder, SimpleCoder}, + consensus::{Blob, SidecarCoder, SimpleCoder, Transaction as _}, eips::eip7594::BlobTransactionSidecarVariant, primitives::{B256, TxHash, keccak256}, }; @@ -14,7 +14,6 @@ use reth::{ }; use signet_extract::{ExtractedEvent, Extracts}; use signet_zenith::{Zenith::BlockSubmitted, ZenithBlock}; -use smallvec::SmallVec; use std::{ops::Deref, sync::Arc}; use tokio::select; use tracing::{error, instrument, trace}; @@ -23,7 +22,7 @@ use tracing::{error, instrument, trace}; /// external source. /// /// The contents are arc-wrapped to allow for cheap cloning. -#[derive(Debug, Clone)] +#[derive(Hash, Debug, Clone, PartialEq, Eq)] pub enum Blobs { /// Local pooled transaction sidecar FromPool(Arc), @@ -93,8 +92,7 @@ impl From> for Blobs { /// transactions. Decoder attempts to fetch from the Pool first and then /// queries an explorer if it can't find the blob. When Decoder does find a /// blob, it decodes it and returns the decoded transactions. -#[derive(Debug)] -pub struct BlockExtractor { +pub struct BlobFetcher { pool: Pool, explorer: foundry_blob_explorers::Client, client: reqwest::Client, @@ -103,14 +101,25 @@ pub struct BlockExtractor { slot_calculator: SlotCalculator, } -impl BlockExtractor<()> { - /// Returns a new [`BlockExtractorBuilder`]. - pub fn builder() -> BlockExtractorBuilder<()> { - BlockExtractorBuilder::default() +impl core::fmt::Debug for BlobFetcher { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct("BlobFetcher") + .field("pool", &self.pool) + .field("explorer", &self.explorer.baseurl()) + .field("cl_url", &self.cl_url) + .field("pylon_url", &self.pylon_url) + .finish_non_exhaustive() } } -impl BlockExtractor +impl BlobFetcher<()> { + /// Returns a new [`BlobFetcherBuilder`]. + pub fn builder() -> BlobFetcherBuilder<()> { + BlobFetcherBuilder::default() + } +} + +impl BlobFetcher where Pool: TransactionPool, { @@ -130,62 +139,65 @@ where /// searching for the expected hash async fn get_and_decode_blobs( &self, + slot: usize, extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>, - slot: u64, - ) -> ExtractionResult> { + ) -> FetchResult> { debug_assert!(extract.tx.is_eip4844(), "Transaction must be of type EIP-4844"); let hash = extract.tx.tx_hash(); - let bz = self.fetch_blobs(extract, slot).await?; + let versioned_hashes = extract + .tx + .as_eip4844() + .expect("tx is eip4844") + .blob_versioned_hashes() + .expect("tx is eip4844"); + let bz = self.fetch_blobs(slot, extract.tx_hash(), versioned_hashes).await?; SimpleCoder::default() .decode_all(bz.as_ref()) - .ok_or_else(BlockExtractionError::blob_decode_error)? + .ok_or_else(BlobFetcherError::blob_decode_error)? .into_iter() .find(|data| keccak256(data) == extract.block_data_hash()) - .ok_or_else(|| BlockExtractionError::block_data_not_found(*hash)) + .ok_or_else(|| BlobFetcherError::block_data_not_found(*hash)) } /// Fetch blobs from the local txpool, or fall back to remote sources - async fn fetch_blobs( + #[instrument(skip(self, versioned_hashes))] + pub(crate) async fn fetch_blobs( &self, - extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>, - slot: u64, - ) -> ExtractionResult { - let hash = extract.tx_hash(); - - if let Ok(blobs) = self.get_blobs_from_pool(hash) { + slot: usize, + tx_hash: B256, + versioned_hashes: &[B256], + ) -> FetchResult { + if let Ok(blobs) = self.get_blobs_from_pool(tx_hash) { return Ok(blobs); } // if the pool doesn't have it, reach out to other sources // and return the first successful response select! { - Ok(blobs) = self.get_blobs_from_explorer(hash) => { + Ok(blobs) = self.get_blobs_from_explorer(tx_hash) => { Ok(blobs) } - Ok(blobs) = self.get_blobs_from_cl(extract, slot) => { + Ok(blobs) = self.get_blobs_from_cl(slot, versioned_hashes) => { Ok(blobs) } - Ok(blobs) = self.get_blobs_from_pylon(hash) => { + Ok(blobs) = self.get_blobs_from_pylon(tx_hash) => { Ok(blobs) } else => { - error!(%hash, "Blobs not available from any source"); - Err(BlockExtractionError::missing_sidecar(hash)) + error!(%tx_hash, "Blobs not available from any source"); + Err(BlobFetcherError::missing_sidecar(tx_hash)) } } } /// Return a blob from the local pool or an error - fn get_blobs_from_pool(&self, tx: TxHash) -> ExtractionResult { - self.pool - .get_blob(tx)? - .map(Into::into) - .ok_or_else(|| BlockExtractionError::missing_sidecar(tx)) + fn get_blobs_from_pool(&self, tx: TxHash) -> FetchResult { + self.pool.get_blob(tx)?.map(Into::into).ok_or_else(|| BlobFetcherError::missing_sidecar(tx)) } /// Returns the blob from the explorer - async fn get_blobs_from_explorer(&self, tx: TxHash) -> ExtractionResult { + async fn get_blobs_from_explorer(&self, tx: TxHash) -> FetchResult { let sidecar = self.explorer.transaction(tx).await?; let blobs: Blobs = sidecar.blobs.iter().map(|b| *b.data).collect(); debug_assert!(!blobs.is_empty(), "Explorer returned no blobs"); @@ -194,11 +206,9 @@ where /// Returns the blob from the pylon blob indexer. #[instrument(skip_all, err)] - async fn get_blobs_from_pylon(&self, tx: TxHash) -> ExtractionResult { + async fn get_blobs_from_pylon(&self, tx: TxHash) -> FetchResult { if let Some(url) = &self.pylon_url { - let url = url.join(&format!("sidecar/{tx}")).map_err(|err| { - BlockExtractionError::Unrecoverable(UnrecoverableBlobError::UrlParse(err)) - })?; + let url = url.join(&format!("sidecar/{tx}"))?; let response = self.client.get(url).header("accept", "application/json").send().await?; response @@ -207,9 +217,7 @@ where .map(Into::into) .map_err(Into::into) } else { - Err(BlockExtractionError::Unrecoverable( - UnrecoverableBlobError::ConsensusClientUrlNotSet, - )) + Err(BlobFetcherError::Unrecoverable(UnrecoverableBlobError::ConsensusClientUrlNotSet)) } } @@ -217,23 +225,21 @@ where #[instrument(skip_all, err)] async fn get_blobs_from_cl( &self, - extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>, - slot: u64, - ) -> ExtractionResult { + slot: usize, + versioned_hashes: &[B256], + ) -> FetchResult { if let Some(url) = &self.cl_url { let url = url.join(&format!("/eth/v1/beacon/blob_sidecars/{slot}")).map_err(|err| { - BlockExtractionError::Unrecoverable(UnrecoverableBlobError::UrlParse(err)) + BlobFetcherError::Unrecoverable(UnrecoverableBlobError::UrlParse(err)) })?; let response = self.client.get(url).header("accept", "application/json").send().await?; let response: BeaconBlobBundle = response.json().await?; - extract_blobs_from_bundle(response, extract) + extract_blobs_from_bundle(response, versioned_hashes) } else { - Err(BlockExtractionError::Unrecoverable( - UnrecoverableBlobError::ConsensusClientUrlNotSet, - )) + Err(BlobFetcherError::Unrecoverable(UnrecoverableBlobError::ConsensusClientUrlNotSet)) } } @@ -246,9 +252,9 @@ where extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>, host_block_number: u64, host_block_timestamp: u64, - ) -> ExtractionResult { + ) -> FetchResult { if !extract.is_eip4844() { - return Err(BlockExtractionError::non_4844_transaction()); + return Err(BlobFetcherError::non_4844_transaction()); } let header = extract.ru_header(host_block_number); @@ -258,7 +264,7 @@ where .slot_ending_at(host_block_timestamp) .expect("host chain has started"); - let block_data = self.get_and_decode_blobs(extract, slot as u64).await?; + let block_data = self.get_and_decode_blobs(slot, extract).await?; Ok(ZenithBlock::from_header_and_data(header, block_data)) } @@ -273,7 +279,7 @@ where pub async fn block_from_outputs( &self, outputs: &Extracts<'_, ExtractableChainShim<'_>>, - ) -> ExtractionResult> { + ) -> FetchResult> { if !outputs.contains_block() { return Ok(None); } @@ -298,110 +304,22 @@ where } } -/// Extracts the blobs from the [`BeaconBlobBundle`], and returns the blobs that match the versioned hashes in the transaction. -/// This also dedups any duplicate blobs if a builder lands the same blob multiple times in a block. -fn extract_blobs_from_bundle( - bundle: BeaconBlobBundle, - extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>, -) -> ExtractionResult { - let mut blobs = vec![]; - // NB: There can be, at most, 9 blobs per block from Pectra forwards. We'll never need more space than this, unless blob capacity is increased again or made dynamic. - let mut seen_versioned_hashes: SmallVec<[B256; 9]> = SmallVec::new(); - - // NB: This is already checked and we know it's an EIP-4844 transaction. - let tx = extract.tx.as_eip4844().unwrap(); - - for item in bundle.data.iter() { - let versioned_hash = - alloy::eips::eip4844::kzg_to_versioned_hash(item.kzg_commitment.as_ref()); - - if tx.tx().blob_versioned_hashes.contains(&versioned_hash) - && !seen_versioned_hashes.contains(&versioned_hash) - { - blobs.push(*item.blob); - seen_versioned_hashes.push(versioned_hash); - } - } - - Ok(blobs.into()) -} - #[cfg(test)] mod tests { use super::*; use alloy::{ - consensus::{ - BlobTransactionSidecar, SidecarBuilder, SignableTransaction, TxEip2930, TxEnvelope, - }, + consensus::{SidecarBuilder, SignableTransaction as _, TxEip2930}, eips::Encodable2718, - primitives::{Address, TxKind, U256, bytes}, + primitives::{TxKind, U256, bytes}, rlp::encode, signers::{SignerSync, local::PrivateKeySigner}, }; - use foundry_blob_explorers::TransactionDetails; - use reth::primitives::{Transaction, TransactionSigned}; + use reth::primitives::Transaction; use reth_transaction_pool::{ PoolTransaction, TransactionOrigin, test_utils::{MockTransaction, testing_pool}, }; - use signet_types::constants::SignetSystemConstants; - - const BLOBSCAN_BLOB_RESPONSE: &str = include_str!("../../../tests/artifacts/blob.json"); - /// Blob from Slot 2277733, corresponding to block 277722 on Pecorino host. - const CL_BLOB_RESPONSE: &str = include_str!("../../../tests/artifacts/cl_blob.json"); - /// EIP4844 blob tx with hash 0x73d1c682fae85c761528a0a7ec22fac613b25ede87b80f0ac052107f3444324f, - /// corresponding to blob sent to block 277722 on Pecorino host. - const CL_BLOB_TX: &str = include_str!("../../../tests/artifacts/cl_blob_tx.json"); - /// Blob sidecar from Pylon, corresponding to block 277722 on Pecorino host. - const PYLON_BLOB_RESPONSE: &str = include_str!("../../../tests/artifacts/pylon_blob.json"); - - #[test] - fn test_process_blob_extraction() { - let bundle: BeaconBlobBundle = serde_json::from_str(CL_BLOB_RESPONSE).unwrap(); - let tx: TxEnvelope = serde_json::from_str::(CL_BLOB_TX).unwrap(); - let tx: TransactionSigned = tx.into(); - - let extract = ExtractedEvent::<'_, Receipt, BlockSubmitted> { - tx: &tx, - receipt: &Receipt::default(), - log_index: 0, - event: BlockSubmitted { - sequencer: Address::ZERO, - rollupChainId: U256::ZERO, - gasLimit: U256::ZERO, - rewardAddress: Address::ZERO, - blockDataHash: B256::ZERO, - }, - }; - - // Extract the blobs from the CL beacon blob bundle. - let cl_blobs = extract_blobs_from_bundle(bundle, &extract).unwrap(); - assert_eq!(cl_blobs.len(), 1); - - // Now, process the pylon blobs which come in a [`BlobTransactionSidecar`]. - // NB: this should be changes to `BlobTransactionSidecarVariant` in the - // future. After https://github.com/alloy-rs/alloy/pull/2713 - // The json is definitely a `BlobTransactionSidecar`, so we can - // deserialize it directly and it doesn't really matter much. - let sidecar: BlobTransactionSidecar = - serde_json::from_str::(PYLON_BLOB_RESPONSE).unwrap(); - let pylon_blobs: Blobs = Arc::::new(sidecar.into()).into(); - - // Make sure that both blob sources have the same blobs after being processed. - assert_eq!(cl_blobs.len(), pylon_blobs.len()); - assert_eq!(cl_blobs.as_slice(), pylon_blobs.as_slice()); - - // Make sure both can be decoded - let cl_decoded = SimpleCoder::default().decode_all(cl_blobs.as_ref()).unwrap(); - let pylon_decoded = SimpleCoder::default().decode_all(pylon_blobs.as_ref()).unwrap(); - assert_eq!(cl_decoded.len(), pylon_decoded.len()); - assert_eq!(cl_decoded, pylon_decoded); - } - - #[test] - fn test_deser_blob() { - let _: TransactionDetails = serde_json::from_str(BLOBSCAN_BLOB_RESPONSE).unwrap(); - } + use signet_types::{constants::SignetSystemConstants, primitives::TransactionSigned}; #[tokio::test] async fn test_fetch_from_pool() -> eyre::Result<()> { @@ -416,7 +334,7 @@ mod tests { let explorer_url = "https://api.holesky.blobscan.com/"; let client = reqwest::Client::builder().use_rustls_tls(); - let extractor = BlockExtractor::builder() + let extractor = BlobFetcher::builder() .with_pool(pool.clone()) .with_explorer_url(explorer_url) .with_client_builder(client) diff --git a/crates/blobber/src/lib.rs b/crates/blobber/src/lib.rs index e01c823..f4717f4 100644 --- a/crates/blobber/src/lib.rs +++ b/crates/blobber/src/lib.rs @@ -11,17 +11,34 @@ #![deny(unused_must_use, rust_2018_idioms)] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -mod block_data; -pub use block_data::{Blobs, BlockExtractor}; - mod builder; -pub use builder::{BlockExtractorBuilder, BuilderError as BlockExtractorBuilderError}; +pub use builder::{BlobFetcherBuilder, BuilderError as BlobFetcherBuilderError}; + +mod cache; +pub use cache::{BlobCacher, CacheHandle}; mod config; -pub use config::BlockExtractorConfig; +pub use config::BlobFetcherConfig; mod error; -pub use error::{BlockExtractionError, ExtractionResult}; +pub use error::{BlobFetcherError, FetchResult}; + +mod fetch; +pub use fetch::{BlobFetcher, Blobs}; mod shim; pub use shim::ExtractableChainShim; + +pub(crate) mod utils; + +#[cfg(test)] +mod test { + use crate::utils::tests::BLOBSCAN_BLOB_RESPONSE; + use foundry_blob_explorers::TransactionDetails; + + // Sanity check on dependency compatibility. + #[test] + fn test_deser_blob() { + let _: TransactionDetails = serde_json::from_str(BLOBSCAN_BLOB_RESPONSE).unwrap(); + } +} diff --git a/crates/blobber/src/utils.rs b/crates/blobber/src/utils.rs new file mode 100644 index 0000000..71d27be --- /dev/null +++ b/crates/blobber/src/utils.rs @@ -0,0 +1,87 @@ +use crate::{Blobs, FetchResult}; +use alloy::{ + eips::{eip4844::kzg_to_versioned_hash, eip7691::MAX_BLOBS_PER_BLOCK_ELECTRA}, + primitives::B256, +}; +use reth::rpc::types::beacon::sidecar::BeaconBlobBundle; +use smallvec::SmallVec; + +/// Extracts the blobs from the [`BeaconBlobBundle`], and returns the blobs that match the versioned hashes in the transaction. +/// This also dedups any duplicate blobs if a builder lands the same blob multiple times in a block. +pub(crate) fn extract_blobs_from_bundle( + bundle: BeaconBlobBundle, + versioned_hashes: &[B256], +) -> FetchResult { + let mut blobs = vec![]; + // NB: There can be, at most, 9 blobs per block from Pectra forwards. We'll never need more space than this, unless blob capacity is increased again or made dynamic. + let mut seen_versioned_hashes: SmallVec<[B256; MAX_BLOBS_PER_BLOCK_ELECTRA as usize]> = + SmallVec::new(); + + for item in bundle.data.iter() { + let versioned_hash = kzg_to_versioned_hash(item.kzg_commitment.as_ref()); + + if versioned_hashes.contains(&versioned_hash) + && !seen_versioned_hashes.contains(&versioned_hash) + { + blobs.push(*item.blob); + seen_versioned_hashes.push(versioned_hash); + } + } + + Ok(blobs.into()) +} + +#[cfg(test)] +pub(crate) mod tests { + + use super::*; + use alloy::{ + consensus::{BlobTransactionSidecar, SidecarCoder, SimpleCoder, Transaction, TxEnvelope}, + eips::eip7594::BlobTransactionSidecarVariant, + }; + use signet_types::primitives::TransactionSigned; + use std::sync::Arc; + + pub(crate) const BLOBSCAN_BLOB_RESPONSE: &str = + include_str!("../../../tests/artifacts/blob.json"); + /// Blob from Slot 2277733, corresponding to block 277722 on Pecorino host. + pub(crate) const CL_BLOB_RESPONSE: &str = include_str!("../../../tests/artifacts/cl_blob.json"); + /// EIP4844 blob tx with hash 0x73d1c682fae85c761528a0a7ec22fac613b25ede87b80f0ac052107f3444324f, + /// corresponding to blob sent to block 277722 on Pecorino host. + pub(crate) const CL_BLOB_TX: &str = include_str!("../../../tests/artifacts/cl_blob_tx.json"); + /// Blob sidecar from Pylon, corresponding to block 277722 on Pecorino host. + pub(crate) const PYLON_BLOB_RESPONSE: &str = + include_str!("../../../tests/artifacts/pylon_blob.json"); + + #[test] + fn test_process_blob_extraction() { + let bundle: BeaconBlobBundle = serde_json::from_str(CL_BLOB_RESPONSE).unwrap(); + let tx: TxEnvelope = serde_json::from_str::(CL_BLOB_TX).unwrap(); + let tx: TransactionSigned = tx.into(); + + let versioned_hashes = tx.blob_versioned_hashes().unwrap().to_owned(); + + // Extract the blobs from the CL beacon blob bundle. + let cl_blobs = extract_blobs_from_bundle(bundle, &versioned_hashes).unwrap(); + assert_eq!(cl_blobs.len(), 1); + + // Now, process the pylon blobs which come in a [`BlobTransactionSidecar`]. + // NB: this should be changes to `BlobTransactionSidecarVariant` in the + // future. After https://github.com/alloy-rs/alloy/pull/2713 + // The json is definitely a `BlobTransactionSidecar`, so we can + // deserialize it directly and it doesn't really matter much. + let sidecar: BlobTransactionSidecar = + serde_json::from_str::(PYLON_BLOB_RESPONSE).unwrap(); + let pylon_blobs: Blobs = Arc::::new(sidecar.into()).into(); + + // Make sure that both blob sources have the same blobs after being processed. + assert_eq!(cl_blobs.len(), pylon_blobs.len()); + assert_eq!(cl_blobs.as_slice(), pylon_blobs.as_slice()); + + // Make sure both can be decoded + let cl_decoded = SimpleCoder::default().decode_all(cl_blobs.as_ref()).unwrap(); + let pylon_decoded = SimpleCoder::default().decode_all(pylon_blobs.as_ref()).unwrap(); + assert_eq!(cl_decoded.len(), pylon_decoded.len()); + assert_eq!(cl_decoded, pylon_decoded); + } +}