From c2b10514d6d595de1bff4c3a2d028b7366656b11 Mon Sep 17 00:00:00 2001
From: James
Date: Mon, 4 Aug 2025 12:47:44 -0400
Subject: [PATCH 1/5] feat: blob cache

---
 README.md                                   |  44 +++---
 crates/blobber/README.md                    |   6 +-
 crates/blobber/src/builder.rs               |  45 +++---
 crates/blobber/src/cache.rs                 | 138 +++++++++++++++++
 crates/blobber/src/config.rs                |   8 +-
 crates/blobber/src/error.rs                 |  30 ++--
 .../blobber/src/{block_data.rs => fetch.rs} | 139 ++++++++----------
 crates/blobber/src/lib.rs                   |  15 +-
 8 files changed, 289 insertions(+), 136 deletions(-)
 create mode 100644 crates/blobber/src/cache.rs
 rename crates/blobber/src/{block_data.rs => fetch.rs} (78%)

diff --git a/README.md b/README.md
index f21b46d..0464bc1 100644
--- a/README.md
+++ b/README.md
@@ -3,7 +3,32 @@
 A collection of components for building the Signet node. These components
 implement core node functionality, but are potentially independently useful.
 
-### What's new in Signet?
+## What's in the Components?
+
+- **signet-node-types** - Shim types wrapping reth's internal node types
+  system to make it more usable in Signet.
+- **signet-blobber** - Blob retrieval and parsing, using blob explorers,
+  Signet's Pylon, and the local node transaction API.
+- **signet-rpc** - An Ethereum JSON-RPC Server for Signet nodes. Makes heavy
+  use of reth internals.
+- **signet-db** - An extension of reth's database, providing a Signet-specific
+  database schema and utilities for working with Signet blocks and transactions.
+
+### Contributing to the Node Components
+
+Please see [CONTRIBUTING.md](CONTRIBUTING.md).
+
+[Signet docs]: https://docs.signet.sh
+
+## Note on Semver
+
+This repo is UNPUBLISHED and may NOT respect semantic versioning between tagged
+versions. In general, it is versioned to match the signet-sdk version with
+which it is compatible. I.e. `node-components@0.8.x` is expected to be
+compatible with any signet-sdk `0.8.x` version. However, a release of
+`node-components@0.8.1` may have breaking changes from `node-components@0.8.0`.
+
+## What's new in Signet?
 
 Signet is a pragmatic Ethereum rollup that offers a new set of ideas and aims
 to radically modernize rollup technology.
@@ -22,20 +47,3 @@ knowledge.
 Signet does not have a native token. Signet is just a rollup.
 
 See the [Signet docs] for more info.
-
-### What's in the Components?
-
-- **signet-node-types** - Shim types wrapping reth's internal node types
-  system to make it more usable in Signet.
-- **signet-blobber** - Blob retrieval and parsing, using blob explorers,
-  Signet's Pylon, and the local node transaction API.
-- **signet-rpc** - An Ethereum JSON-RPC Server for Signet nodes. Makes heavy
-  use of reth internals.
-- **signet-db** - An extension of reth's database, providing a Signet-specific
-  database schema and utilities for working with Signet blocks and transactions.
-
-### Contributing to the Node Components
-
-Please see [CONTRIBUTING.md](CONTRIBUTING.md).
-
-[Signet docs]: https://docs.signet.sh
diff --git a/crates/blobber/README.md b/crates/blobber/README.md
index 28e304c..717c64f 100644
--- a/crates/blobber/README.md
+++ b/crates/blobber/README.md
@@ -1,9 +1,13 @@
 # Block Extractor
 
-The [`BlockExtractor`] retrieves blobs from host chain blocks and parses them
+The [`BlobFetcher`] retrieves blobs from host chain blocks and parses them
 into [`ZenithBlock`]s. It is used by the node during notification processing
 when a [`Zenith::BlockSubmitted`] event is extracted from a host chain block.
 
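+A minimal construction sketch (mirroring the builder API introduced in this
+patch; the pool handle, slot calculator, and explorer URL are placeholders):
+
+```rust
+let fetcher = BlobFetcher::builder()
+    .with_pool(pool)
+    .with_explorer_url("https://api.holesky.blobscan.com/")
+    .with_client_builder(reqwest::Client::builder())
+    .unwrap()
+    .with_slot_calculator(slot_calculator)
+    .build()?;
+```
+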
+The [`BlobCacher`] is a wrapper around the [`BlobFetcher`] that caches blobs
+in an in-memory cache. It is used to avoid fetching the same blob more than
+once, and to manage retry logic during fetching.
+
 ## Data Sources
 
 The following sources can be configured:
diff --git a/crates/blobber/src/builder.rs b/crates/blobber/src/builder.rs
index 074a8bc..2ab29cf 100644
--- a/crates/blobber/src/builder.rs
+++ b/crates/blobber/src/builder.rs
@@ -1,10 +1,10 @@
-use crate::{BlockExtractorConfig, block_data::BlockExtractor};
+use crate::{BlobCacher, BlobFetcher, BlobFetcherConfig};
 use init4_bin_base::utils::calc::SlotCalculator;
 use reth::transaction_pool::TransactionPool;
 use url::Url;
 
-/// Errors that can occur while building the [`BlockExtractor`] with a
-/// [`BlockExtractorBuilder`].
+/// Errors that can occur while building the [`BlobFetcher`] with a
+/// [`BlobFetcherBuilder`].
 #[derive(Debug, thiserror::Error)]
 pub enum BuilderError {
     /// The transaction pool was not provided.
@@ -27,9 +27,9 @@ pub enum BuilderError {
     MissingSlotCalculator,
 }
 
-/// Builder for the [`BlockExtractor`].
+/// Builder for the [`BlobFetcher`].
 #[derive(Debug, Default, Clone)]
-pub struct BlockExtractorBuilder<Pool> {
+pub struct BlobFetcherBuilder<Pool> {
     pool: Option<Pool>,
     explorer_url: Option<String>,
     client: Option<reqwest::Client>,
@@ -38,10 +38,10 @@
-impl<Pool> BlockExtractorBuilder<Pool> {
+impl<Pool> BlobFetcherBuilder<Pool> {
     /// Set the transaction pool to use for the extractor.
-    pub fn with_pool<P2>(self, pool: P2) -> BlockExtractorBuilder<P2> {
-        BlockExtractorBuilder {
+    pub fn with_pool<P2>(self, pool: P2) -> BlobFetcherBuilder<P2> {
+        BlobFetcherBuilder {
             pool: Some(pool),
             explorer_url: self.explorer_url,
             client: self.client,
@@ -53,15 +53,13 @@ impl<Pool> BlockExtractorBuilder<Pool> {
 
     /// Set the transaction pool to use a mock test pool.
     #[cfg(feature = "test-utils")]
-    pub fn with_test_pool(
-        self,
-    ) -> BlockExtractorBuilder<TestPool> {
+    pub fn with_test_pool(self) -> BlobFetcherBuilder<TestPool> {
         self.with_pool(reth_transaction_pool::test_utils::testing_pool())
     }
 
     /// Set the CL url and pylon url from the provided
-    /// [`BlockExtractorConfig`].
-    pub fn with_config(self, config: &BlockExtractorConfig) -> Result<Self, BuilderError> {
+    /// [`BlobFetcherConfig`].
+    pub fn with_config(self, config: &BlobFetcherConfig) -> Result<Self, BuilderError> {
         let this = self.with_explorer_url(config.blob_explorer_url());
 
         let this = if let Some(cl_url) = config.cl_url() { this.with_cl_url(cl_url)? } else { this };
@@ -114,22 +112,22 @@ impl<Pool> BlockExtractorBuilder<Pool> {
     pub const fn with_slot_calculator(
         mut self,
         slot_calculator: SlotCalculator,
-    ) -> BlockExtractorBuilder<Pool> {
+    ) -> BlobFetcherBuilder<Pool> {
         self.slot_calculator = Some(slot_calculator);
         self
     }
 
     /// Set the slot calculator to use for the extractor, using the Pecorino
     /// host configuration.
-    pub const fn with_pecornino_slots(mut self) -> BlockExtractorBuilder<Pool> {
+    pub const fn with_pecornino_slots(mut self) -> BlobFetcherBuilder<Pool> {
         self.slot_calculator = Some(SlotCalculator::pecorino_host());
         self
     }
 }
 
-impl<Pool: TransactionPool> BlockExtractorBuilder<Pool> {
-    /// Build the [`BlockExtractor`] with the provided parameters.
-    pub fn build(self) -> Result<BlockExtractor<Pool>, BuilderError> {
+impl<Pool: TransactionPool> BlobFetcherBuilder<Pool> {
+    /// Build the [`BlobFetcher`] with the provided parameters.
+    pub fn build(self) -> Result<BlobFetcher<Pool>, BuilderError> {
         let pool = self.pool.ok_or(BuilderError::MissingPool)?;
 
         let explorer_url = self.explorer_url.ok_or(BuilderError::MissingExplorerUrl)?;
@@ -145,7 +143,16 @@
 
         let slot_calculator = self.slot_calculator.ok_or(BuilderError::MissingSlotCalculator)?;
 
-        Ok(BlockExtractor::new(pool, explorer, client, cl_url, pylon_url, slot_calculator))
+        Ok(BlobFetcher::new(pool, explorer, client, cl_url, pylon_url, slot_calculator))
+    }
+
+    /// Build a [`BlobCacher`] with the provided parameters.
+    pub fn build_cache(self) -> Result<BlobCacher<Pool>, BuilderError>
+    where
+        Pool: 'static,
+    {
+        let fetcher = self.build()?;
+        Ok(BlobCacher::new(fetcher))
     }
 }
diff --git a/crates/blobber/src/cache.rs b/crates/blobber/src/cache.rs
new file mode 100644
index 0000000..603e431
--- /dev/null
+++ b/crates/blobber/src/cache.rs
@@ -0,0 +1,138 @@
+use crate::{BlobFetcherError, Blobs, FetchResult};
+use alloy::primitives::B256;
+use reth::network::cache::LruMap;
+use reth::transaction_pool::TransactionPool;
+use std::{
+    sync::{Arc, Mutex},
+    time::Duration,
+};
+use tokio::sync::{mpsc, oneshot};
+use tracing::{error, info, instrument, warn};
+
+const BLOB_CACHE_SIZE: u32 = 144;
+const FETCH_RETRIES: usize = 3;
+const BETWEEN_RETRIES: Duration = Duration::from_millis(250);
+
+/// Instructions for the cache.
+///
+/// These instructions are sent to the cache handle to perform operations like
+/// retrieving blobs.
+#[derive(Debug)]
+enum CacheInst {
+    Retrieve { slot: u64, tx_hash: B256, version_hashes: Vec<B256>, resp: oneshot::Sender<Blobs> },
+}
+
+/// Handle for the cache.
+#[derive(Debug, Clone)]
+pub struct CacheHandle {
+    sender: mpsc::Sender<CacheInst>,
+}
+
+impl CacheHandle {
+    /// Sends a cache instruction.
+    async fn send(&self, inst: CacheInst) {
+        let _ = self.sender.send(inst).await;
+    }
+
+    /// Fetches blobs from the cache. This triggers a background task to
+    /// fetch blobs if they are not found in the cache.
+    pub async fn fetch_blobs(
+        &self,
+        slot: u64,
+        tx_hash: B256,
+        version_hashes: Vec<B256>,
+    ) -> FetchResult<Blobs> {
+        let (resp, receiver) = oneshot::channel();
+
+        self.send(CacheInst::Retrieve { slot, tx_hash, version_hashes, resp }).await;
+
+        receiver.await.map_err(|_| BlobFetcherError::missing_sidecar(tx_hash))
+    }
+}
+
+/// Retrieves blobs and stores them in a cache for later use.
+pub struct BlobCacher<Pool> {
+    fetcher: crate::BlobFetcher<Pool>,
+
+    cache: Mutex<LruMap<(u64, B256), Blobs>>,
+}
+
+impl<Pool: core::fmt::Debug> core::fmt::Debug for BlobCacher<Pool> {
+    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
+        f.debug_struct("BlobCacher").field("fetcher", &self.fetcher).finish_non_exhaustive()
+    }
+}
+
+impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
+    /// Creates a new `BlobCacher` wrapping the provided fetcher.
+    pub fn new(fetcher: crate::BlobFetcher<Pool>) -> Self {
+        Self { fetcher, cache: LruMap::new(BLOB_CACHE_SIZE).into() }
+    }
+
+    /// Fetches blobs for a given slot and transaction hash.
+    #[instrument(skip(self), target = "signet_blobber::BlobCacher", fields(retries = FETCH_RETRIES))]
+    async fn fetch_blobs(
+        &self,
+        slot: u64,
+        tx_hash: B256,
+        versioned_hashes: Vec<B256>,
+    ) -> FetchResult<Blobs> {
+        // Cache hit
+        if let Some(blobs) = self.cache.lock().unwrap().get(&(slot, tx_hash)) {
+            info!(target: "signet_blobber::BlobCacher", "Cache hit");
+            return Ok(blobs.clone());
+        }
+
+        // Cache miss, use the fetcher to retrieve blobs
+        // Retry fetching blobs up to `FETCH_RETRIES` times
+        for attempt in 1..=FETCH_RETRIES {
+            let blobs = self.fetcher.fetch_blobs(slot, tx_hash, &versioned_hashes).await;
+
+            match blobs {
+                Ok(blobs) => {
+                    self.cache.lock().unwrap().insert((slot, tx_hash), blobs.clone());
+                    return Ok(blobs);
+                }
+                Err(BlobFetcherError::Ignorable(e)) => {
+                    warn!(target: "signet_blobber::BlobCacher", attempt, %e, "Blob fetch attempt failed.");
+                    tokio::time::sleep(BETWEEN_RETRIES).await;
+                    continue;
+                }
+                Err(e) => return Err(e), // unrecoverable error
+            }
+        }
+        error!(target: "signet_blobber::BlobCacher", "All fetch attempts failed");
+        Err(BlobFetcherError::missing_sidecar(tx_hash))
+    }
+
+    /// Processes the cache instructions.
+    async fn handle_inst(self: Arc<Self>, inst: CacheInst) {
+        match inst {
+            CacheInst::Retrieve { slot, tx_hash, version_hashes, resp } => {
+                if let Ok(blobs) = self.fetch_blobs(slot, tx_hash, version_hashes).await {
+                    // if listener has gone away, that's okay, we just won't send the response
+                    let _ = resp.send(blobs);
+                }
+            }
+        }
+    }
+
+    async fn task_future(self: Arc<Self>, mut inst: mpsc::Receiver<CacheInst>) {
+        while let Some(inst) = inst.recv().await {
+            let this = Arc::clone(&self);
+            tokio::spawn(async move {
+                this.handle_inst(inst).await;
+            });
+        }
+    }
+
+    /// Spawns the cache task to handle incoming instructions.
+    ///
+    /// # Panics
+    /// This function will panic if the cache task fails to spawn.
+    pub fn spawn(self) -> CacheHandle {
+        let (sender, inst) = mpsc::channel(12);
+        tokio::spawn(Arc::new(self).task_future(inst));
+        CacheHandle { sender }
+    }
+}
diff --git a/crates/blobber/src/config.rs b/crates/blobber/src/config.rs
index 85673fe..3ece3cd 100644
--- a/crates/blobber/src/config.rs
+++ b/crates/blobber/src/config.rs
@@ -4,7 +4,7 @@ use std::borrow::Cow;
 /// Configuration for the block extractor.
 #[derive(Debug, Clone, serde::Deserialize, FromEnv)]
 #[serde(rename_all = "camelCase")]
-pub struct BlockExtractorConfig {
+pub struct BlobFetcherConfig {
     /// URL of the blob explorer.
     #[from_env(var = "BLOB_EXPLORER_URL", desc = "URL of the blob explorer", infallible)]
     blob_explorer_url: Cow<'static, str>,
@@ -18,8 +18,8 @@ pub struct BlockExtractorConfig {
     pylon_url: Option<Cow<'static, str>>,
 }
 
-impl BlockExtractorConfig {
-    /// Create a new `BlockExtractorConfig` with default values.
+impl BlobFetcherConfig {
+    /// Create a new `BlobFetcherConfig` with default values.
     pub const fn new(blob_explorer_url: Cow<'static, str>) -> Self {
         Self { blob_explorer_url, cl_url: None, pylon_url: None }
     }
@@ -39,7 +39,7 @@ impl BlockExtractorConfig {
         self.pylon_url = Some(pylon_url);
     }
 
-    /// Create a new `BlockExtractorConfig` with the provided CL URL, Pylon URL,
+    /// Returns the CL URL, if set.
     pub fn cl_url(&self) -> Option<&str> {
         self.cl_url.as_deref()
     }
diff --git a/crates/blobber/src/error.rs b/crates/blobber/src/error.rs
index 87d7668..47b9ad9 100644
--- a/crates/blobber/src/error.rs
+++ b/crates/blobber/src/error.rs
@@ -1,10 +1,10 @@
 use alloy::{eips::eip2718::Eip2718Error, primitives::B256};
 use reth::transaction_pool::BlobStoreError;
 
-/// Extraction Result
-pub type ExtractionResult<T> = std::result::Result<T, BlockExtractionError>;
+/// Fetch Result
+pub type FetchResult<T> = std::result::Result<T, BlobFetcherError>;
 
-/// Unrecoverable blob extraction errors. These result in the node shutting
+/// Unrecoverable blob fetching errors. These result in the node shutting
 /// down. They occur when the blobstore is down or the sidecar is unretrievable.
 #[derive(Debug, thiserror::Error)]
 pub enum UnrecoverableBlobError {
@@ -28,7 +28,7 @@ pub enum UnrecoverableBlobError {
     PylonClientUrlNotSet,
 }
 
-/// Ignorable blob extraction errors. These result in the block being skipped.
+/// Ignorable blob fetching errors. These result in the block being skipped.
 #[derive(Debug, thiserror::Error, Copy, Clone)]
 pub enum IgnorableBlobError {
     /// Incorrect transaction type error
@@ -48,18 +48,18 @@ pub enum IgnorableBlobError {
     BlockDecodeError(#[from] Eip2718Error),
 }
 
-/// Blob extraction errors
+/// Blob fetching errors
 #[derive(Debug, thiserror::Error)]
-pub enum BlockExtractionError {
-    /// Unrecoverable blob extraction error
+pub enum BlobFetcherError {
+    /// Unrecoverable blob fetching error
     #[error(transparent)]
     Unrecoverable(#[from] UnrecoverableBlobError),
-    /// Ignorable blob extraction error
+    /// Ignorable blob fetching error
     #[error(transparent)]
     Ignorable(#[from] IgnorableBlobError),
 }
 
-impl BlockExtractionError {
+impl BlobFetcherError {
     /// Returns true if the error is ignorable
     pub const fn is_ignorable(&self) -> bool {
         matches!(self, Self::Ignorable(_))
@@ -110,20 +110,26 @@
     }
 }
 
-impl From<BlobStoreError> for BlockExtractionError {
+impl From<BlobStoreError> for BlobFetcherError {
     fn from(err: BlobStoreError) -> Self {
         Self::Unrecoverable(err.into())
     }
 }
 
-impl From<reqwest::Error> for BlockExtractionError {
+impl From<reqwest::Error> for BlobFetcherError {
     fn from(err: reqwest::Error) -> Self {
         Self::Unrecoverable(err.into())
     }
 }
 
-impl From<Eip2718Error> for BlockExtractionError {
+impl From<Eip2718Error> for BlobFetcherError {
     fn from(err: Eip2718Error) -> Self {
         Self::Ignorable(err.into())
     }
 }
+
+impl From<url::ParseError> for BlobFetcherError {
+    fn from(err: url::ParseError) -> Self {
+        Self::Unrecoverable(UnrecoverableBlobError::UrlParse(err))
+    }
+}
diff --git a/crates/blobber/src/block_data.rs b/crates/blobber/src/fetch.rs
similarity index 78%
rename from crates/blobber/src/block_data.rs
rename to crates/blobber/src/fetch.rs
index 95ebc1a..499375d 100644
--- a/crates/blobber/src/block_data.rs
+++ b/crates/blobber/src/fetch.rs
@@ -1,9 +1,9 @@
 use crate::{
-    BlockExtractionError, BlockExtractorBuilder, ExtractionResult, error::UnrecoverableBlobError,
+    BlobFetcherBuilder, BlobFetcherError, FetchResult, error::UnrecoverableBlobError,
     shim::ExtractableChainShim,
 };
 use alloy::{
-    consensus::{Blob, SidecarCoder, SimpleCoder},
+    consensus::{Blob, SidecarCoder, SimpleCoder, Transaction as _},
     eips::eip7594::BlobTransactionSidecarVariant,
     primitives::{B256, TxHash, keccak256},
 };
@@ -23,7 +23,7 @@ use tracing::{error, instrument, trace};
 /// external source.
 ///
 /// The contents are arc-wrapped to allow for cheap cloning.
-#[derive(Debug, Clone)]
+#[derive(Hash, Debug, Clone, PartialEq, Eq)]
 pub enum Blobs {
     /// Local pooled transaction sidecar
     FromPool(Arc<BlobTransactionSidecarVariant>),
@@ -93,8 +93,7 @@ impl From<Vec<Blob>> for Blobs {
 /// transactions. Decoder attempts to fetch from the Pool first and then
 /// queries an explorer if it can't find the blob. When Decoder does find a
 /// blob, it decodes it and returns the decoded transactions.
-#[derive(Debug)]
-pub struct BlockExtractor<Pool> {
+pub struct BlobFetcher<Pool> {
     pool: Pool,
     explorer: foundry_blob_explorers::Client,
     client: reqwest::Client,
@@ -103,14 +102,25 @@
     slot_calculator: SlotCalculator,
 }
 
-impl BlockExtractor<()> {
-    /// Returns a new [`BlockExtractorBuilder`].
-    pub fn builder() -> BlockExtractorBuilder<()> {
-        BlockExtractorBuilder::default()
+impl<Pool: core::fmt::Debug> core::fmt::Debug for BlobFetcher<Pool> {
+    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
+        f.debug_struct("BlobFetcher")
+            .field("pool", &self.pool)
+            .field("explorer", &self.explorer.baseurl())
+            .field("cl_url", &self.cl_url)
+            .field("pylon_url", &self.pylon_url)
+            .finish_non_exhaustive()
     }
 }
 
-impl<Pool> BlockExtractor<Pool>
+impl BlobFetcher<()> {
+    /// Returns a new [`BlobFetcherBuilder`].
+    pub fn builder() -> BlobFetcherBuilder<()> {
+        BlobFetcherBuilder::default()
+    }
+}
+
+impl<Pool> BlobFetcher<Pool>
 where
     Pool: TransactionPool,
 {
@@ -132,60 +142,63 @@ where
         &self,
         extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
         slot: u64,
-    ) -> ExtractionResult<Vec<u8>> {
+    ) -> FetchResult<Vec<u8>> {
         debug_assert!(extract.tx.is_eip4844(), "Transaction must be of type EIP-4844");
         let hash = extract.tx.tx_hash();
 
-        let bz = self.fetch_blobs(extract, slot).await?;
+        let versioned_hashes = extract
+            .tx
+            .as_eip4844()
+            .expect("tx is eip4844")
+            .blob_versioned_hashes()
+            .expect("tx is eip4844");
+        let bz = self.fetch_blobs(slot, extract.tx_hash(), versioned_hashes).await?;
 
         SimpleCoder::default()
             .decode_all(bz.as_ref())
-            .ok_or_else(BlockExtractionError::blob_decode_error)?
+            .ok_or_else(BlobFetcherError::blob_decode_error)?
             .into_iter()
             .find(|data| keccak256(data) == extract.block_data_hash())
-            .ok_or_else(|| BlockExtractionError::block_data_not_found(*hash))
+            .ok_or_else(|| BlobFetcherError::block_data_not_found(*hash))
     }
 
     /// Fetch blobs from the local txpool, or fall back to remote sources
-    async fn fetch_blobs(
+    #[instrument(skip(self, versioned_hashes))]
+    pub(crate) async fn fetch_blobs(
         &self,
-        extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
         slot: u64,
-    ) -> ExtractionResult<Blobs> {
-        let hash = extract.tx_hash();
-
-        if let Ok(blobs) = self.get_blobs_from_pool(hash) {
+        tx_hash: B256,
+        versioned_hashes: &[B256],
+    ) -> FetchResult<Blobs> {
+        if let Ok(blobs) = self.get_blobs_from_pool(tx_hash) {
            return Ok(blobs);
         }
 
         // if the pool doesn't have it, reach out to other sources
         // and return the first successful response
         select! {
-            Ok(blobs) = self.get_blobs_from_explorer(hash) => {
+            Ok(blobs) = self.get_blobs_from_explorer(tx_hash) => {
                 Ok(blobs)
             }
-            Ok(blobs) = self.get_blobs_from_cl(extract, slot) => {
+            Ok(blobs) = self.get_blobs_from_cl(slot, versioned_hashes) => {
                 Ok(blobs)
             }
-            Ok(blobs) = self.get_blobs_from_pylon(hash) => {
+            Ok(blobs) = self.get_blobs_from_pylon(tx_hash) => {
                 Ok(blobs)
             }
             else => {
-                error!(%hash, "Blobs not available from any source");
-                Err(BlockExtractionError::missing_sidecar(hash))
+                error!(%tx_hash, "Blobs not available from any source");
+                Err(BlobFetcherError::missing_sidecar(tx_hash))
             }
         }
     }
 
     /// Return a blob from the local pool or an error
-    fn get_blobs_from_pool(&self, tx: TxHash) -> ExtractionResult<Blobs> {
-        self.pool
-            .get_blob(tx)?
-            .map(Into::into)
-            .ok_or_else(|| BlockExtractionError::missing_sidecar(tx))
+    fn get_blobs_from_pool(&self, tx: TxHash) -> FetchResult<Blobs> {
+        self.pool.get_blob(tx)?.map(Into::into).ok_or_else(|| BlobFetcherError::missing_sidecar(tx))
     }
 
     /// Returns the blob from the explorer
-    async fn get_blobs_from_explorer(&self, tx: TxHash) -> ExtractionResult<Blobs> {
+    async fn get_blobs_from_explorer(&self, tx: TxHash) -> FetchResult<Blobs> {
         let sidecar = self.explorer.transaction(tx).await?;
         let blobs: Blobs = sidecar.blobs.iter().map(|b| *b.data).collect();
         debug_assert!(!blobs.is_empty(), "Explorer returned no blobs");
@@ -194,11 +207,9 @@ where
 
     /// Returns the blob from the pylon blob indexer.
     #[instrument(skip_all, err)]
-    async fn get_blobs_from_pylon(&self, tx: TxHash) -> ExtractionResult<Blobs> {
+    async fn get_blobs_from_pylon(&self, tx: TxHash) -> FetchResult<Blobs> {
         if let Some(url) = &self.pylon_url {
-            let url = url.join(&format!("sidecar/{tx}")).map_err(|err| {
-                BlockExtractionError::Unrecoverable(UnrecoverableBlobError::UrlParse(err))
-            })?;
+            let url = url.join(&format!("sidecar/{tx}"))?;
 
             let response = self.client.get(url).header("accept", "application/json").send().await?;
             response
@@ -207,33 +218,25 @@
                 .map(Into::into)
                 .map_err(Into::into)
         } else {
-            Err(BlockExtractionError::Unrecoverable(
-                UnrecoverableBlobError::ConsensusClientUrlNotSet,
-            ))
+            Err(BlobFetcherError::Unrecoverable(UnrecoverableBlobError::PylonClientUrlNotSet))
         }
     }
 
     /// Queries the connected consensus client for the blob transaction
     #[instrument(skip_all, err)]
-    async fn get_blobs_from_cl(
-        &self,
-        extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
-        slot: u64,
-    ) -> ExtractionResult<Blobs> {
+    async fn get_blobs_from_cl(&self, slot: u64, versioned_hashes: &[B256]) -> FetchResult<Blobs> {
         if let Some(url) = &self.cl_url {
             let url = url.join(&format!("/eth/v1/beacon/blob_sidecars/{slot}")).map_err(|err| {
-                BlockExtractionError::Unrecoverable(UnrecoverableBlobError::UrlParse(err))
+                BlobFetcherError::Unrecoverable(UnrecoverableBlobError::UrlParse(err))
             })?;
 
             let response = self.client.get(url).header("accept", "application/json").send().await?;
 
             let response: BeaconBlobBundle = response.json().await?;
 
-            extract_blobs_from_bundle(response, extract)
+            extract_blobs_from_bundle(response, versioned_hashes)
         } else {
-            Err(BlockExtractionError::Unrecoverable(
-                UnrecoverableBlobError::ConsensusClientUrlNotSet,
-            ))
+            Err(BlobFetcherError::Unrecoverable(UnrecoverableBlobError::ConsensusClientUrlNotSet))
         }
     }
 
@@ -246,9 +249,9 @@ where
         extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
         host_block_number: u64,
         host_block_timestamp: u64,
-    ) -> ExtractionResult<ZenithBlock> {
+    ) -> FetchResult<ZenithBlock> {
         if !extract.is_eip4844() {
-            return Err(BlockExtractionError::non_4844_transaction());
+            return Err(BlobFetcherError::non_4844_transaction());
         }
 
         let header = extract.ru_header(host_block_number);
@@ -273,7 +276,7 @@ where
     pub async fn block_from_outputs(
         &self,
         outputs: &Extracts<'_, ExtractableChainShim<'_>>,
-    ) -> ExtractionResult<Option<ZenithBlock>> {
+    ) -> FetchResult<Option<ZenithBlock>> {
         if !outputs.contains_block() {
             return Ok(None);
         }
@@ -302,20 +305,17 @@
 /// This also dedups any duplicate blobs if a builder lands the same blob multiple times in a block.
 fn extract_blobs_from_bundle(
     bundle: BeaconBlobBundle,
-    extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
-) -> ExtractionResult<Blobs> {
+    versioned_hashes: &[B256],
+) -> FetchResult<Blobs> {
     let mut blobs = vec![];
     // NB: There can be, at most, 9 blobs per block from Pectra forwards. We'll never need more space than this, unless blob capacity is increased again or made dynamic.
     let mut seen_versioned_hashes: SmallVec<[B256; 9]> = SmallVec::new();
 
-    // NB: This is already checked and we know it's an EIP-4844 transaction.
-    let tx = extract.tx.as_eip4844().unwrap();
-
     for item in bundle.data.iter() {
         let versioned_hash =
             alloy::eips::eip4844::kzg_to_versioned_hash(item.kzg_commitment.as_ref());
 
-        if tx.tx().blob_versioned_hashes.contains(&versioned_hash)
+        if versioned_hashes.contains(&versioned_hash)
             && !seen_versioned_hashes.contains(&versioned_hash)
         {
             blobs.push(*item.blob);
@@ -331,15 +331,12 @@ mod tests {
     use super::*;
     use alloy::{
         consensus::{
-            BlobTransactionSidecar, SidecarBuilder, SignableTransaction, TxEip2930, TxEnvelope,
+            BlobTransactionSidecar, SidecarBuilder, SignableTransaction as _, TxEip2930, TxEnvelope,
         },
         eips::Encodable2718,
-        primitives::{Address, TxKind, U256, bytes},
-        rlp::encode,
-        signers::{SignerSync, local::PrivateKeySigner},
+        primitives::{TxKind, U256, bytes},
     };
     use foundry_blob_explorers::TransactionDetails;
-    use reth::primitives::{Transaction, TransactionSigned};
     use reth_transaction_pool::{
         PoolTransaction, TransactionOrigin,
         test_utils::{MockTransaction, testing_pool},
     };
 
     #[test]
     fn test_process_blob_extraction() {
         let bundle: BeaconBlobBundle = serde_json::from_str(CL_BLOB_RESPONSE).unwrap();
         let tx: TxEnvelope = serde_json::from_str::<TxEnvelope>(CL_BLOB_TX).unwrap();
         let tx: TransactionSigned = tx.into();
 
-        let extract = ExtractedEvent::<'_, Receipt, BlockSubmitted> {
-            tx: &tx,
-            receipt: &Receipt::default(),
-            log_index: 0,
-            event: BlockSubmitted {
-                sequencer: Address::ZERO,
-                rollupChainId: U256::ZERO,
-                gasLimit: U256::ZERO,
-                rewardAddress: Address::ZERO,
-                blockDataHash: B256::ZERO,
-            },
-        };
+        let versioned_hashes =
+            tx.blob_versioned_hashes().map(ToOwned::to_owned).unwrap_or_default();
 
         // Extract the blobs from the CL beacon blob bundle.
-        let cl_blobs = extract_blobs_from_bundle(bundle, &extract).unwrap();
+        let cl_blobs = extract_blobs_from_bundle(bundle, &versioned_hashes).unwrap();
         assert_eq!(cl_blobs.len(), 1);
 
         // Now, process the pylon blobs which come in a [`BlobTransactionSidecar`].
@@ -416,7 +403,7 @@
         let explorer_url = "https://api.holesky.blobscan.com/";
         let client = reqwest::Client::builder().use_rustls_tls();
 
-        let extractor = BlockExtractor::builder()
+        let extractor = BlobFetcher::builder()
             .with_pool(pool.clone())
             .with_explorer_url(explorer_url)
             .with_client_builder(client)
diff --git a/crates/blobber/src/lib.rs b/crates/blobber/src/lib.rs
index e01c823..8faf0da 100644
--- a/crates/blobber/src/lib.rs
+++ b/crates/blobber/src/lib.rs
@@ -11,17 +11,20 @@
 #![deny(unused_must_use, rust_2018_idioms)]
 #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
 
-mod block_data;
-pub use block_data::{Blobs, BlockExtractor};
-
 mod builder;
-pub use builder::{BlockExtractorBuilder, BuilderError as BlockExtractorBuilderError};
+pub use builder::{BlobFetcherBuilder, BuilderError as BlobFetcherBuilderError};
+
+mod cache;
+pub use cache::{BlobCacher, CacheHandle};
 
 mod config;
-pub use config::BlockExtractorConfig;
+pub use config::BlobFetcherConfig;
 
 mod error;
-pub use error::{BlockExtractionError, ExtractionResult};
+pub use error::{BlobFetcherError, FetchResult};
+
+mod fetch;
+pub use fetch::{BlobFetcher, Blobs};
 
 mod shim;
 pub use shim::ExtractableChainShim;

From 38f2e0c67087ada5d866664e93172b85760b2420 Mon Sep 17 00:00:00 2001
From: James
Date: Mon, 4 Aug 2025 13:09:26 -0400
Subject: [PATCH 2/5] feat: decoding methods

---
 crates/blobber/src/cache.rs | 56 +++++++++++++++++++++++++++++++++----
 crates/blobber/src/fetch.rs | 12 +++++---
 2 files changed, 58 insertions(+), 10 deletions(-)

diff --git a/crates/blobber/src/cache.rs b/crates/blobber/src/cache.rs
index 603e431..52ccdce 100644
--- a/crates/blobber/src/cache.rs
+++ b/crates/blobber/src/cache.rs
@@ -1,7 +1,11 @@
 use crate::{BlobFetcherError, Blobs, FetchResult};
-use alloy::primitives::B256;
-use reth::network::cache::LruMap;
+use alloy::consensus::{SidecarCoder, SimpleCoder, Transaction as _};
+use alloy::primitives::{keccak256, Bytes, B256};
 use reth::transaction_pool::TransactionPool;
+use reth::{network::cache::LruMap, primitives::Receipt};
+use signet_extract::ExtractedEvent;
+use signet_zenith::Zenith::BlockSubmitted;
+use signet_zenith::ZenithBlock;
 use std::{
     sync::{Arc, Mutex},
     time::Duration,
@@ -19,7 +23,7 @@ const BETWEEN_RETRIES: Duration = Duration::from_millis(250);
 /// retrieving blobs.
 #[derive(Debug)]
 enum CacheInst {
-    Retrieve { slot: u64, tx_hash: B256, version_hashes: Vec<B256>, resp: oneshot::Sender<Blobs> },
+    Retrieve { slot: usize, tx_hash: B256, version_hashes: Vec<B256>, resp: oneshot::Sender<Blobs> },
 }
 
 /// Handle for the cache.
@@ -38,7 +42,7 @@ impl CacheHandle {
     /// fetch blobs if they are not found in the cache.
     pub async fn fetch_blobs(
         &self,
-        slot: u64,
+        slot: usize,
         tx_hash: B256,
         version_hashes: Vec<B256>,
     ) -> FetchResult<Blobs> {
@@ -48,13 +52,53 @@ impl CacheHandle {
 
         receiver.await.map_err(|_| BlobFetcherError::missing_sidecar(tx_hash))
     }
+
+    /// Fetch the blobs using [`Self::fetch_blobs`] and decode them to get the
+    /// Zenith block data.
+    pub async fn fetch_and_decode(
+        &self,
+        slot: usize,
+        extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
+    ) -> FetchResult<Bytes> {
+        let tx_hash = extract.tx_hash();
+        let versioned_hashes = extract
+            .tx
+            .as_eip4844()
+            .ok_or_else(BlobFetcherError::non_4844_transaction)?
+            .blob_versioned_hashes()
+            .expect("tx is eip4844");
+
+        let blobs = self.fetch_blobs(slot, tx_hash, versioned_hashes.to_owned()).await?;
+
+        SimpleCoder::default()
+            .decode_all(blobs.as_ref())
+            .ok_or_else(BlobFetcherError::blob_decode_error)?
+            .into_iter()
+            .find(|data| keccak256(data) == extract.block_data_hash())
+            .map(Into::into)
+            .ok_or_else(|| BlobFetcherError::block_data_not_found(tx_hash))
+    }
+
+    /// Fetch the blobs, decode them, and construct a Zenith block from the
+    /// header and data.
+    pub async fn signet_block(
+        &self,
+        host_block_number: u64,
+        slot: usize,
+        extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
+    ) -> FetchResult<ZenithBlock> {
+        let header = extract.ru_header(host_block_number);
+        self.fetch_and_decode(slot, extract)
+            .await
+            .map(|buf| ZenithBlock::from_header_and_data(header, buf))
+    }
 }
 
 /// Retrieves blobs and stores them in a cache for later use.
 pub struct BlobCacher<Pool> {
     fetcher: crate::BlobFetcher<Pool>,
 
-    cache: Mutex<LruMap<(u64, B256), Blobs>>,
+    cache: Mutex<LruMap<(usize, B256), Blobs>>,
 }
 
 impl<Pool: core::fmt::Debug> core::fmt::Debug for BlobCacher<Pool> {
@@ -73,7 +117,7 @@ impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
     #[instrument(skip(self), target = "signet_blobber::BlobCacher", fields(retries = FETCH_RETRIES))]
     async fn fetch_blobs(
         &self,
-        slot: u64,
+        slot: usize,
         tx_hash: B256,
         versioned_hashes: Vec<B256>,
     ) -> FetchResult<Blobs> {
diff --git a/crates/blobber/src/fetch.rs b/crates/blobber/src/fetch.rs
index 499375d..1884c36 100644
--- a/crates/blobber/src/fetch.rs
+++ b/crates/blobber/src/fetch.rs
@@ -140,8 +140,8 @@ where
     /// searching for the expected hash
     async fn get_and_decode_blobs(
         &self,
+        slot: usize,
         extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
-        slot: u64,
     ) -> FetchResult<Vec<u8>> {
         debug_assert!(extract.tx.is_eip4844(), "Transaction must be of type EIP-4844");
         let hash = extract.tx.tx_hash();
@@ -165,7 +165,7 @@ where
     #[instrument(skip(self, versioned_hashes))]
     pub(crate) async fn fetch_blobs(
         &self,
-        slot: u64,
+        slot: usize,
         tx_hash: B256,
         versioned_hashes: &[B256],
     ) -> FetchResult<Blobs> {
@@ -224,7 +224,11 @@ where
 
     /// Queries the connected consensus client for the blob transaction
     #[instrument(skip_all, err)]
-    async fn get_blobs_from_cl(&self, slot: u64, versioned_hashes: &[B256]) -> FetchResult<Blobs> {
+    async fn get_blobs_from_cl(
+        &self,
+        slot: usize,
+        versioned_hashes: &[B256],
+    ) -> FetchResult<Blobs> {
         if let Some(url) = &self.cl_url {
             let url = url.join(&format!("/eth/v1/beacon/blob_sidecars/{slot}")).map_err(|err| {
                 BlobFetcherError::Unrecoverable(UnrecoverableBlobError::UrlParse(err))
@@ -261,7 +265,7 @@ where
             .slot_ending_at(host_block_timestamp)
             .expect("host chain has started");
 
-        let block_data = self.get_and_decode_blobs(extract, slot as u64).await?;
+        let block_data = self.get_and_decode_blobs(slot, extract).await?;
 
         Ok(ZenithBlock::from_header_and_data(header, block_data))
     }

From 466623d85c1c02137fac329bc4f2ffcbf44e8b4c Mon Sep 17 00:00:00 2001
From: James
Date: Mon, 4 Aug 2025 16:09:31 -0400
Subject: [PATCH 3/5] feat: coder abstraction

---
 crates/blobber/src/cache.rs | 37 ++++++++++++++++++++++++++++++-------
 1 file changed, 30 insertions(+), 7 deletions(-)

diff --git a/crates/blobber/src/cache.rs b/crates/blobber/src/cache.rs
index 52ccdce..0cdb9db 100644
--- a/crates/blobber/src/cache.rs
+++ b/crates/blobber/src/cache.rs
@@ -54,11 +54,12 @@ impl CacheHandle {
     }
 
     /// Fetch the blobs using [`Self::fetch_blobs`] and decode them to get the
-    /// Zenith block data.
-    pub async fn fetch_and_decode(
+    /// Zenith block data using the provided coder.
+    pub async fn fetch_and_decode_with_coder<C: SidecarCoder>(
         &self,
         slot: usize,
         extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
+        mut coder: C,
     ) -> FetchResult<Bytes> {
         let tx_hash = extract.tx_hash();
         let versioned_hashes = extract
@@ -70,7 +71,7 @@ impl CacheHandle {
 
         let blobs = self.fetch_blobs(slot, tx_hash, versioned_hashes.to_owned()).await?;
 
-        SimpleCoder::default()
+        coder
             .decode_all(blobs.as_ref())
             .ok_or_else(BlobFetcherError::blob_decode_error)?
             .into_iter()
@@ -79,19 +80,41 @@ impl CacheHandle {
             .ok_or_else(|| BlobFetcherError::block_data_not_found(tx_hash))
     }
 
-    /// Fetch the blobs, decode them, and construct a
-    /// Zenith block from the header and data.
-    pub async fn signet_block(
+    /// Fetch the blobs using [`Self::fetch_blobs`] and decode them using
+    /// [`SimpleCoder`] to get the Zenith block data.
+    pub async fn fetch_and_decode(
+        &self,
+        slot: usize,
+        extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
+    ) -> FetchResult<Bytes> {
+        self.fetch_and_decode_with_coder(slot, extract, SimpleCoder::default()).await
+    }
+
+    /// Fetch the blobs, decode them using the provided coder, and construct a
+    /// Zenith block from the header and data.
+    pub async fn signet_block_with_coder<C: SidecarCoder>(
         &self,
         host_block_number: u64,
         slot: usize,
         extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
+        coder: C,
     ) -> FetchResult<ZenithBlock> {
         let header = extract.ru_header(host_block_number);
-        self.fetch_and_decode(slot, extract)
+        self.fetch_and_decode_with_coder(slot, extract, coder)
             .await
             .map(|buf| ZenithBlock::from_header_and_data(header, buf))
     }
+
+    /// Fetch the blobs, decode them using [`SimpleCoder`], and construct a
+    /// Zenith block from the header and data.
+    pub async fn signet_block(
+        &self,
+        host_block_number: u64,
+        slot: usize,
+        extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
+    ) -> FetchResult<ZenithBlock> {
+        self.signet_block_with_coder(host_block_number, slot, extract, SimpleCoder::default()).await
+    }
 }

From c77ae5b9f8639277ac99d93a9990d3d95c999c37 Mon Sep 17 00:00:00 2001
From: James
Date: Tue, 5 Aug 2025 09:39:52 -0400
Subject: [PATCH 4/5] refactor: improve constants

---
 crates/blobber/src/cache.rs | 7 +++++--
 crates/blobber/src/fetch.rs | 5 ++++-
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/crates/blobber/src/cache.rs b/crates/blobber/src/cache.rs
index 0cdb9db..b24546d 100644
--- a/crates/blobber/src/cache.rs
+++ b/crates/blobber/src/cache.rs
@@ -1,5 +1,7 @@
 use crate::{BlobFetcherError, Blobs, FetchResult};
 use alloy::consensus::{SidecarCoder, SimpleCoder, Transaction as _};
+use alloy::eips::eip7691::MAX_BLOBS_PER_BLOCK_ELECTRA;
+use alloy::eips::merge::EPOCH_SLOTS;
 use alloy::primitives::{keccak256, Bytes, B256};
 use reth::transaction_pool::TransactionPool;
 use reth::{network::cache::LruMap, primitives::Receipt};
@@ -13,7 +15,8 @@ use std::{
 use tokio::sync::{mpsc, oneshot};
 use tracing::{error, info, instrument, warn};
 
-const BLOB_CACHE_SIZE: u32 = 144;
+const BLOB_CACHE_SIZE: u32 = (MAX_BLOBS_PER_BLOCK_ELECTRA * EPOCH_SLOTS) as u32;
+const CACHE_REQUEST_CHANNEL_SIZE: usize = (MAX_BLOBS_PER_BLOCK_ELECTRA * 2) as usize;
 const FETCH_RETRIES: usize = 3;
 const BETWEEN_RETRIES: Duration = Duration::from_millis(250);
 
@@ -198,7 +201,7 @@ impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
     /// # Panics
     /// This function will panic if the cache task fails to spawn.
     pub fn spawn(self) -> CacheHandle {
-        let (sender, inst) = mpsc::channel(12);
+        let (sender, inst) = mpsc::channel(CACHE_REQUEST_CHANNEL_SIZE);
         tokio::spawn(Arc::new(self).task_future(inst));
         CacheHandle { sender }
     }
diff --git a/crates/blobber/src/fetch.rs b/crates/blobber/src/fetch.rs
index 1884c36..9fba923 100644
--- a/crates/blobber/src/fetch.rs
+++ b/crates/blobber/src/fetch.rs
@@ -339,13 +339,16 @@ mod tests {
         },
         eips::Encodable2718,
         primitives::{TxKind, U256, bytes},
+        rlp::encode,
+        signers::{SignerSync, local::PrivateKeySigner},
     };
     use foundry_blob_explorers::TransactionDetails;
+    use reth::primitives::Transaction;
     use reth_transaction_pool::{
         PoolTransaction, TransactionOrigin,
         test_utils::{MockTransaction, testing_pool},
     };
-    use signet_types::constants::SignetSystemConstants;
+    use signet_types::{constants::SignetSystemConstants, primitives::TransactionSigned};

From 47e2ec860089ff4a8f8da90eb4157ffd35c14e00 Mon Sep 17 00:00:00 2001
From: James
Date: Tue, 5 Aug 2025 09:57:07 -0400
Subject: [PATCH 5/5] refactor: improve mod and test layout

---
 crates/blobber/src/cache.rs | 91 ++++++++++++++++++++++++++++++++++++-
 crates/blobber/src/fetch.rs | 80 +-------------------------------
 crates/blobber/src/lib.rs   | 14 ++++++
 crates/blobber/src/utils.rs | 87 +++++++++++++++++++++++++++++++++++
 4 files changed, 193 insertions(+), 79 deletions(-)
 create mode 100644 crates/blobber/src/utils.rs

diff --git a/crates/blobber/src/cache.rs b/crates/blobber/src/cache.rs
index b24546d..bdda09d 100644
--- a/crates/blobber/src/cache.rs
+++ b/crates/blobber/src/cache.rs
@@ -2,7 +2,7 @@ use crate::{BlobFetcherError, Blobs, FetchResult};
 use alloy::consensus::{SidecarCoder, SimpleCoder, Transaction as _};
 use alloy::eips::eip7691::MAX_BLOBS_PER_BLOCK_ELECTRA;
 use alloy::eips::merge::EPOCH_SLOTS;
-use alloy::primitives::{keccak256, Bytes, B256};
+use alloy::primitives::{B256, Bytes, keccak256};
 use reth::transaction_pool::TransactionPool;
 use reth::{network::cache::LruMap, primitives::Receipt};
 use signet_extract::ExtractedEvent;
@@ -206,3 +206,92 @@ impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
         CacheHandle { sender }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use crate::BlobFetcher;
+
+    use super::*;
+    use alloy::{
+        consensus::{SidecarBuilder, SignableTransaction as _, TxEip2930},
+        eips::Encodable2718,
+        primitives::{TxKind, U256, bytes},
+        rlp::encode,
+        signers::{SignerSync, local::PrivateKeySigner},
+    };
+    use init4_bin_base::utils::calc::SlotCalculator;
+    use reth::primitives::Transaction;
+    use reth_transaction_pool::{
+        PoolTransaction, TransactionOrigin,
+        test_utils::{MockTransaction, testing_pool},
+    };
+    use signet_types::{constants::SignetSystemConstants, primitives::TransactionSigned};
+
+    #[tokio::test]
+    async fn test_fetch_from_pool() -> eyre::Result<()> {
+        let wallet = PrivateKeySigner::random();
+        let pool = testing_pool();
+
+        let test = signet_constants::KnownChains::Test;
+
+        let constants: SignetSystemConstants = test.try_into().unwrap();
+        let calc = SlotCalculator::new(0, 0, 12);
+
+        let explorer_url = "https://api.holesky.blobscan.com/";
+        let client = reqwest::Client::builder().use_rustls_tls();
+
+        let tx = Transaction::Eip2930(TxEip2930 {
+            chain_id: 17001,
+            nonce: 2,
+            gas_limit: 50000,
+            gas_price: 1_500_000_000,
+            to: TxKind::Call(constants.host_zenith()),
+            value: U256::from(1_f64),
+            input: bytes!(""),
+            ..Default::default()
+        });
+
+        let encoded_transactions =
+            encode(vec![sign_tx_with_key_pair(wallet.clone(), tx).encoded_2718()]);
+
+        let result = SidecarBuilder::<SimpleCoder>::from_slice(&encoded_transactions).build();
+        assert!(result.is_ok());
+
+        let mut mock_transaction = MockTransaction::eip4844_with_sidecar(result.unwrap().into());
+        let transaction =
+            sign_tx_with_key_pair(wallet, Transaction::from(mock_transaction.clone()));
+
+        mock_transaction.set_hash(*transaction.hash());
+
+        pool.add_transaction(TransactionOrigin::Local, mock_transaction.clone()).await?;
+
+        // Spawn the cache
+        let cache = BlobFetcher::builder()
+            .with_pool(pool.clone())
+            .with_explorer_url(explorer_url)
+            .with_client_builder(client)
+            .unwrap()
+            .with_slot_calculator(calc)
+            .build_cache()?;
+        let handle = cache.spawn();
+
+        let got = handle
+            .fetch_blobs(
+                0, // this is ignored by the pool
+                *mock_transaction.hash(),
+                mock_transaction.blob_versioned_hashes().unwrap().to_owned(),
+            )
+            .await;
+        assert!(got.is_ok());
+
+        let got_blobs = got.unwrap();
+        assert!(got_blobs.len() == 1);
+
+        Ok(())
+    }
+
+    fn sign_tx_with_key_pair(wallet: PrivateKeySigner, tx: Transaction) -> TransactionSigned {
+        let signature = wallet.sign_hash_sync(&tx.signature_hash()).unwrap();
+        TransactionSigned::new_unhashed(tx, signature)
+    }
+}
diff --git a/crates/blobber/src/fetch.rs b/crates/blobber/src/fetch.rs
index 9fba923..8eefb55 100644
--- a/crates/blobber/src/fetch.rs
+++ b/crates/blobber/src/fetch.rs
@@ -1,6 +1,6 @@
 use crate::{
     BlobFetcherBuilder, BlobFetcherError, FetchResult, error::UnrecoverableBlobError,
-    shim::ExtractableChainShim,
+    shim::ExtractableChainShim, utils::extract_blobs_from_bundle,
 };
 use alloy::{
     consensus::{Blob, SidecarCoder, SimpleCoder, Transaction as _},
@@ -14,7 +14,6 @@ use reth::{
 };
 use signet_extract::{ExtractedEvent, Extracts};
 use signet_zenith::{Zenith::BlockSubmitted, ZenithBlock};
-use smallvec::SmallVec;
 use std::{ops::Deref, sync::Arc};
 use tokio::select;
 use tracing::{error, instrument, trace};
@@ -305,44 +304,16 @@ where
     }
 }
 
-/// Extracts the blobs from the [`BeaconBlobBundle`], and returns the blobs that match the versioned hashes in the transaction.
-/// This also dedups any duplicate blobs if a builder lands the same blob multiple times in a block.
-fn extract_blobs_from_bundle(
-    bundle: BeaconBlobBundle,
-    versioned_hashes: &[B256],
-) -> FetchResult<Blobs> {
-    let mut blobs = vec![];
-    // NB: There can be, at most, 9 blobs per block from Pectra forwards. We'll never need more space than this, unless blob capacity is increased again or made dynamic.
-    let mut seen_versioned_hashes: SmallVec<[B256; 9]> = SmallVec::new();
-
-    for item in bundle.data.iter() {
-        let versioned_hash =
-            alloy::eips::eip4844::kzg_to_versioned_hash(item.kzg_commitment.as_ref());
-
-        if versioned_hashes.contains(&versioned_hash)
-            && !seen_versioned_hashes.contains(&versioned_hash)
-        {
-            blobs.push(*item.blob);
-            seen_versioned_hashes.push(versioned_hash);
-        }
-    }
-
-    Ok(blobs.into())
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
     use alloy::{
-        consensus::{
-            BlobTransactionSidecar, SidecarBuilder, SignableTransaction as _, TxEip2930, TxEnvelope,
-        },
+        consensus::{SidecarBuilder, SignableTransaction as _, TxEip2930},
         eips::Encodable2718,
         primitives::{TxKind, U256, bytes},
         rlp::encode,
         signers::{SignerSync, local::PrivateKeySigner},
     };
-    use foundry_blob_explorers::TransactionDetails;
     use reth::primitives::Transaction;
     use reth_transaction_pool::{
         PoolTransaction, TransactionOrigin,
         test_utils::{MockTransaction, testing_pool},
     };
     use signet_types::{constants::SignetSystemConstants, primitives::TransactionSigned};
 
-    const BLOBSCAN_BLOB_RESPONSE: &str = include_str!("../../../tests/artifacts/blob.json");
-    /// Blob from Slot 2277733, corresponding to block 277722 on Pecorino host.
-    const CL_BLOB_RESPONSE: &str = include_str!("../../../tests/artifacts/cl_blob.json");
-    /// EIP4844 blob tx with hash 0x73d1c682fae85c761528a0a7ec22fac613b25ede87b80f0ac052107f3444324f,
-    /// corresponding to blob sent to block 277722 on Pecorino host.
-    const CL_BLOB_TX: &str = include_str!("../../../tests/artifacts/cl_blob_tx.json");
-    /// Blob sidecar from Pylon, corresponding to block 277722 on Pecorino host.
-    const PYLON_BLOB_RESPONSE: &str = include_str!("../../../tests/artifacts/pylon_blob.json");
-
-    #[test]
-    fn test_process_blob_extraction() {
-        let bundle: BeaconBlobBundle = serde_json::from_str(CL_BLOB_RESPONSE).unwrap();
-        let tx: TxEnvelope = serde_json::from_str::<TxEnvelope>(CL_BLOB_TX).unwrap();
-        let tx: TransactionSigned = tx.into();
-
-        let versioned_hashes =
-            tx.blob_versioned_hashes().map(ToOwned::to_owned).unwrap_or_default();
-
-        // Extract the blobs from the CL beacon blob bundle.
-        let cl_blobs = extract_blobs_from_bundle(bundle, &versioned_hashes).unwrap();
-        assert_eq!(cl_blobs.len(), 1);
-
-        // Now, process the pylon blobs which come in a [`BlobTransactionSidecar`].
-        // NB: this should be changes to `BlobTransactionSidecarVariant` in the
-        // future. After https://github.com/alloy-rs/alloy/pull/2713
-        // The json is definitely a `BlobTransactionSidecar`, so we can
-        // deserialize it directly and it doesn't really matter much.
-        let sidecar: BlobTransactionSidecar =
-            serde_json::from_str::<BlobTransactionSidecar>(PYLON_BLOB_RESPONSE).unwrap();
-        let pylon_blobs: Blobs = Arc::<BlobTransactionSidecarVariant>::new(sidecar.into()).into();
-
-        // Make sure that both blob sources have the same blobs after being processed.
-        assert_eq!(cl_blobs.len(), pylon_blobs.len());
-        assert_eq!(cl_blobs.as_slice(), pylon_blobs.as_slice());
-
-        // Make sure both can be decoded
-        let cl_decoded = SimpleCoder::default().decode_all(cl_blobs.as_ref()).unwrap();
-        let pylon_decoded = SimpleCoder::default().decode_all(pylon_blobs.as_ref()).unwrap();
-        assert_eq!(cl_decoded.len(), pylon_decoded.len());
-        assert_eq!(cl_decoded, pylon_decoded);
-    }
-
-    #[test]
-    fn test_deser_blob() {
-        let _: TransactionDetails = serde_json::from_str(BLOBSCAN_BLOB_RESPONSE).unwrap();
-    }
-
     #[tokio::test]
     async fn test_fetch_from_pool() -> eyre::Result<()> {
         let wallet = PrivateKeySigner::random();
diff --git a/crates/blobber/src/lib.rs b/crates/blobber/src/lib.rs
index 8faf0da..f4717f4 100644
--- a/crates/blobber/src/lib.rs
+++ b/crates/blobber/src/lib.rs
@@ -28,3 +28,17 @@ pub use fetch::{BlobFetcher, Blobs};
 
 mod shim;
 pub use shim::ExtractableChainShim;
+
+pub(crate) mod utils;
+
+#[cfg(test)]
+mod test {
+    use crate::utils::tests::BLOBSCAN_BLOB_RESPONSE;
+    use foundry_blob_explorers::TransactionDetails;
+
+    // Sanity check on dependency compatibility.
+    #[test]
+    fn test_deser_blob() {
+        let _: TransactionDetails = serde_json::from_str(BLOBSCAN_BLOB_RESPONSE).unwrap();
+    }
+}
diff --git a/crates/blobber/src/utils.rs b/crates/blobber/src/utils.rs
new file mode 100644
index 0000000..71d27be
--- /dev/null
+++ b/crates/blobber/src/utils.rs
@@ -0,0 +1,87 @@
+use crate::{Blobs, FetchResult};
+use alloy::{
+    eips::{eip4844::kzg_to_versioned_hash, eip7691::MAX_BLOBS_PER_BLOCK_ELECTRA},
+    primitives::B256,
+};
+use reth::rpc::types::beacon::sidecar::BeaconBlobBundle;
+use smallvec::SmallVec;
+
+/// Extracts the blobs from the [`BeaconBlobBundle`], and returns the blobs that match the versioned hashes in the transaction.
+/// This also dedups any duplicate blobs if a builder lands the same blob multiple times in a block.
+pub(crate) fn extract_blobs_from_bundle(
+    bundle: BeaconBlobBundle,
+    versioned_hashes: &[B256],
+) -> FetchResult<Blobs> {
+    let mut blobs = vec![];
+    // NB: There can be, at most, 9 blobs per block from Pectra forwards. We'll never need more space than this, unless blob capacity is increased again or made dynamic.
+    let mut seen_versioned_hashes: SmallVec<[B256; MAX_BLOBS_PER_BLOCK_ELECTRA as usize]> =
+        SmallVec::new();
+
+    for item in bundle.data.iter() {
+        let versioned_hash = kzg_to_versioned_hash(item.kzg_commitment.as_ref());
+
+        if versioned_hashes.contains(&versioned_hash)
+            && !seen_versioned_hashes.contains(&versioned_hash)
+        {
+            blobs.push(*item.blob);
+            seen_versioned_hashes.push(versioned_hash);
+        }
+    }
+
+    Ok(blobs.into())
+}
+
+#[cfg(test)]
+pub(crate) mod tests {
+
+    use super::*;
+    use alloy::{
+        consensus::{BlobTransactionSidecar, SidecarCoder, SimpleCoder, Transaction, TxEnvelope},
+        eips::eip7594::BlobTransactionSidecarVariant,
+    };
+    use signet_types::primitives::TransactionSigned;
+    use std::sync::Arc;
+
+    pub(crate) const BLOBSCAN_BLOB_RESPONSE: &str =
+        include_str!("../../../tests/artifacts/blob.json");
+    /// Blob from Slot 2277733, corresponding to block 277722 on Pecorino host.
+    pub(crate) const CL_BLOB_RESPONSE: &str = include_str!("../../../tests/artifacts/cl_blob.json");
+    /// EIP4844 blob tx with hash 0x73d1c682fae85c761528a0a7ec22fac613b25ede87b80f0ac052107f3444324f,
+    /// corresponding to blob sent to block 277722 on Pecorino host.
+    pub(crate) const CL_BLOB_TX: &str = include_str!("../../../tests/artifacts/cl_blob_tx.json");
+    /// Blob sidecar from Pylon, corresponding to block 277722 on Pecorino host.
+    pub(crate) const PYLON_BLOB_RESPONSE: &str =
+        include_str!("../../../tests/artifacts/pylon_blob.json");
+
+    #[test]
+    fn test_process_blob_extraction() {
+        let bundle: BeaconBlobBundle = serde_json::from_str(CL_BLOB_RESPONSE).unwrap();
+        let tx: TxEnvelope = serde_json::from_str::<TxEnvelope>(CL_BLOB_TX).unwrap();
+        let tx: TransactionSigned = tx.into();
+
+        let versioned_hashes = tx.blob_versioned_hashes().unwrap().to_owned();
+
+        // Extract the blobs from the CL beacon blob bundle.
+        let cl_blobs = extract_blobs_from_bundle(bundle, &versioned_hashes).unwrap();
+        assert_eq!(cl_blobs.len(), 1);
+
+        // Now, process the pylon blobs which come in a [`BlobTransactionSidecar`].
+        // NB: this should be changed to `BlobTransactionSidecarVariant` in the
+        // future. After https://github.com/alloy-rs/alloy/pull/2713
+        // The json is definitely a `BlobTransactionSidecar`, so we can
+        // deserialize it directly and it doesn't really matter much.
+        let sidecar: BlobTransactionSidecar =
+            serde_json::from_str::<BlobTransactionSidecar>(PYLON_BLOB_RESPONSE).unwrap();
+        let pylon_blobs: Blobs = Arc::<BlobTransactionSidecarVariant>::new(sidecar.into()).into();
+
+        // Make sure that both blob sources have the same blobs after being processed.
+        assert_eq!(cl_blobs.len(), pylon_blobs.len());
+        assert_eq!(cl_blobs.as_slice(), pylon_blobs.as_slice());
+
+        // Make sure both can be decoded
+        let cl_decoded = SimpleCoder::default().decode_all(cl_blobs.as_ref()).unwrap();
+        let pylon_decoded = SimpleCoder::default().decode_all(pylon_blobs.as_ref()).unwrap();
+        assert_eq!(cl_decoded.len(), pylon_decoded.len());
+        assert_eq!(cl_decoded, pylon_decoded);
+    }
+}
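Taken together, the series gives node code the following usage shape (a sketch
assuming the handle API added in these patches; the pool, slot calculator,
URL, and `extract` event are placeholders):

```rust
// Build the fetcher and wrap it in the cache (PATCH 1).
let cacher = BlobFetcher::builder()
    .with_pool(pool)
    .with_explorer_url("https://api.holesky.blobscan.com/")
    .with_client_builder(reqwest::Client::builder())
    .unwrap()
    .with_slot_calculator(slot_calculator)
    .build_cache()?;

// `spawn` starts the background task and returns a cloneable handle (PATCH 1).
let handle = cacher.spawn();

// Fetch (and cache) the blobs for a `BlockSubmitted` event, decode them with
// the default `SimpleCoder`, and assemble the Zenith block (PATCHes 2 and 3).
let block = handle.signet_block(host_block_number, slot, &extract).await?;
```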