Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
501a914
feat(graph): add Nozzle Flight service client
isum Sep 25, 2025
941f4ec
feat(graph): add Nozzle stream aggregator
isum Sep 25, 2025
08d7a21
feat(graph): add Nozzle data decoder
isum Sep 25, 2025
0dd263b
feat(graph): add SQL query parser, resolver and validator
isum Sep 25, 2025
6429c19
feat(graph): use a new identifier type in Nozzle related modules
isum Sep 25, 2025
0e8314b
feat(graph): add Nozzle Subgraph schema generation
isum Sep 25, 2025
01d2596
feat(graph): add Nozzle Subgraph manifest
isum Sep 25, 2025
e7e11f8
feat(graph): add reorg handling to the Nozzle FlightClient
isum Oct 28, 2025
be604db
feat(graph, core): extend SubgraphInstanceManager trait
isum Oct 28, 2025
3615301
feat(core, graph, node): allow multiple subgraph instance managers
isum Oct 28, 2025
efb7c59
fix(graph): update deterministic error patterns in Nozzle Flight client
isum Oct 28, 2025
5a2c3af
feat(graph): add Nozzle related ENV variables
isum Oct 28, 2025
e5b7898
fix(graph): make block range filter return a new query
isum Oct 28, 2025
71829d4
feat(graph): add decoding utilities
isum Oct 28, 2025
b0d0bcd
fix(graph): use decoding utilities in the stream aggregator
isum Oct 28, 2025
6a0930e
feat(graph): add more details to Nozzle data sources
isum Oct 28, 2025
19cf6dd
feat(core, graph, node): add Nozzle subgraph deployment
isum Oct 28, 2025
9a661b2
feat(graph): add a dedicated Nozzle manifest resolver
isum Oct 28, 2025
f7cc3ba
feat(node): add shutdown token
isum Oct 28, 2025
4c747b9
feat(core, graph): add Nozzle subgraph runner
isum Oct 28, 2025
2df58ab
chore(all): rename Nozzle to Amp
isum Oct 29, 2025
82cf29a
fix(graph): produce consistent query hashes for logging
isum Nov 5, 2025
762e27f
fix(core, graph): simplify SQL query requirements
isum Nov 5, 2025
4cadade
chore(graph): fix typos
isum Nov 5, 2025
4d74833
fix(graph): use nozzle-resume header name
isum Nov 5, 2025
5a8688c
fix(graph): extend common column aliases
isum Nov 6, 2025
3e71ed3
fix(core, graph): use named streams in the stream aggregator
isum Nov 6, 2025
a02db82
fix(core, graph): simplify working with identifiers
isum Nov 6, 2025
74c9357
fix(graph): validate query output column names
isum Nov 6, 2025
cd8f962
fix(graph): support all versions of the Amp server
isum Nov 6, 2025
c3fcb3b
fix(graph): extend the list of common column aliases
isum Nov 11, 2025
2c68f4f
test(graph): add decoder unit-tests
isum Nov 11, 2025
c57a959
feat(core, graph): add Amp subgraph metrics
isum Nov 18, 2025
eb2bf43
fix(graph): allow more complex dataset and table names
isum Nov 20, 2025
e4d71e8
fix(graph): remove CTE name requirements
isum Nov 20, 2025
459028c
fix(graph, node): add option to authenticate Flight service requests
isum Nov 20, 2025
b7c720b
fix(graph): update temporary predefined list of source context tables
isum Nov 21, 2025
3ebfc27
docs: add docs for Amp-powered subgraphs
isum Nov 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,382 changes: 1,326 additions & 56 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,20 @@ substreams = "=0.6.0"
substreams-entity-change = "2"
substreams-near-core = "=0.10.2"
rand = { version = "0.9.2", features = ["os_rng"] }
prometheus = "0.14.0"

# Dependencies related to Amp subgraphs
ahash = "0.8.11"
alloy = { version = "1.0.12", default-features = false, features = ["json-abi", "serde"] }
arrow = { version = "=55.0.0" }
arrow-flight = { version = "=55.0.0", features = ["flight-sql-experimental"] }
futures = "0.3.31"
half = "2.7.1"
indoc = "2.0.7"
lazy-regex = "3.4.1"
parking_lot = "0.12.4"
sqlparser-latest = { version = "0.57.0", package = "sqlparser", features = ["visitor"] }
tokio-util = "0.7.15"

# Incremental compilation on Rust 1.58 causes an ICE on build. As soon as graph node builds again, these can be removed.
[profile.test]
Expand Down
1 change: 1 addition & 0 deletions chain/ethereum/src/runtime/runtime_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ impl blockchain::RuntimeAdapter<Chain> for RuntimeAdapter {
create_host_fns(abis, archive, call_cache, eth_adapters, eth_call_gas)
}
data_source::DataSource::Offchain(_) => vec![],
data_source::DataSource::Amp(_) => vec![],
};

Ok(host_fns)
Expand Down
14 changes: 14 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,20 @@ thiserror = { workspace = true }
cid = "0.11.1"
anyhow = "1.0"

# Dependencies related to Amp subgraphs
alloy.workspace = true
arrow.workspace = true
chrono.workspace = true
futures.workspace = true
indoc.workspace = true
itertools.workspace = true
parking_lot.workspace = true
prometheus.workspace = true
slog.workspace = true
strum.workspace = true
tokio-util.workspace = true
tokio.workspace = true

[dev-dependencies]
tower-test = { git = "https://github.com/tower-rs/tower.git" }
wiremock = "0.6.5"
171 changes: 171 additions & 0 deletions core/src/amp_subgraph/manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
use std::sync::Arc;

use alloy::primitives::BlockNumber;
use anyhow::Context;
use async_trait::async_trait;
use graph::{
amp,
components::{
link_resolver::{LinkResolver, LinkResolverContext},
metrics::MetricsRegistry,
store::{DeploymentLocator, SubgraphStore},
subgraph::SubgraphInstanceManager,
},
env::EnvVars,
log::factory::LoggerFactory,
prelude::CheapClone,
};
use slog::{debug, error};
use tokio_util::sync::CancellationToken;

use super::{runner, Metrics, Monitor};

/// Manages Amp subgraph runner futures.
///
/// Creates and schedules Amp subgraph runner futures for execution on demand.
/// Also handles stopping previously started Amp subgraph runners.
pub struct Manager<SS, NC> {
logger_factory: LoggerFactory,
metrics_registry: Arc<MetricsRegistry>,
env_vars: Arc<EnvVars>,
monitor: Monitor,
subgraph_store: Arc<SS>,
link_resolver: Arc<dyn LinkResolver>,
amp_client: Arc<NC>,
}

impl<SS, NC> Manager<SS, NC>
where
SS: SubgraphStore,
NC: amp::Client,
{
/// Creates a new Amp subgraph manager.
pub fn new(
logger_factory: &LoggerFactory,
metrics_registry: Arc<MetricsRegistry>,
env_vars: Arc<EnvVars>,
cancel_token: &CancellationToken,
subgraph_store: Arc<SS>,
link_resolver: Arc<dyn LinkResolver>,
amp_client: Arc<NC>,
) -> Self {
let logger = logger_factory.component_logger("AmpSubgraphManager", None);
let logger_factory = logger_factory.with_parent(logger);

let monitor = Monitor::new(&logger_factory, cancel_token);

Self {
logger_factory,
metrics_registry,
env_vars,
monitor,
subgraph_store,
link_resolver,
amp_client,
}
}
}

#[async_trait]
impl<SS, NC> SubgraphInstanceManager for Manager<SS, NC>
where
SS: SubgraphStore,
NC: amp::Client + Send + Sync + 'static,
{
async fn start_subgraph(
self: Arc<Self>,
deployment: DeploymentLocator,
stop_block: Option<i32>,
) {
let manager = self.cheap_clone();

self.monitor.start(
deployment.cheap_clone(),
Box::new(move |cancel_token| {
Box::pin(async move {
let logger = manager.logger_factory.subgraph_logger(&deployment);

let store = manager
.subgraph_store
.cheap_clone()
.writable(logger.cheap_clone(), deployment.id, Vec::new().into())
.await
.context("failed to create writable store")?;

let metrics = Metrics::new(
&logger,
manager.metrics_registry.cheap_clone(),
store.cheap_clone(),
deployment.hash.cheap_clone(),
);

let link_resolver = manager
.link_resolver
.for_manifest(&deployment.hash.to_string())
.context("failed to create link resolver")?;

let manifest_bytes = link_resolver
.cat(
&LinkResolverContext::new(&deployment.hash, &logger),
&deployment.hash.to_ipfs_link(),
)
.await
.context("failed to load subgraph manifest")?;

let raw_manifest = serde_yaml::from_slice(&manifest_bytes)
.context("failed to parse subgraph manifest")?;

let mut manifest = amp::Manifest::resolve::<graph_chain_ethereum::Chain, _>(
&logger,
manager.link_resolver.cheap_clone(),
manager.amp_client.cheap_clone(),
manager.env_vars.max_spec_version.cheap_clone(),
deployment.hash.cheap_clone(),
raw_manifest,
)
.await?;

if let Some(stop_block) = stop_block {
for data_source in manifest.data_sources.iter_mut() {
data_source.source.end_block = stop_block as BlockNumber;
}
}

store
.start_subgraph_deployment(&logger)
.await
.context("failed to start subgraph deployment")?;

let runner_context = runner::Context::new(
&logger,
&manager.env_vars.amp,
manager.amp_client.cheap_clone(),
store,
deployment.hash.cheap_clone(),
manifest,
metrics,
);

let runner_result = runner::new_runner(runner_context)(cancel_token).await;

match manager.subgraph_store.stop_subgraph(&deployment).await {
Ok(()) => {
debug!(logger, "Subgraph writer stopped");
}
Err(e) => {
error!(logger, "Failed to stop subgraph writer";
"e" => ?e
);
}
}

runner_result
})
}),
);
}

async fn stop_subgraph(&self, deployment: DeploymentLocator) {
self.monitor.stop(deployment);
}
}
Loading
Loading