diff --git a/codegenerator/cli/npm/envio/src/Batch.res b/codegenerator/cli/npm/envio/src/Batch.res index 3c4cfa43a..ca99426e9 100644 --- a/codegenerator/cli/npm/envio/src/Batch.res +++ b/codegenerator/cli/npm/envio/src/Batch.res @@ -84,7 +84,7 @@ let hasUnorderedReadyItem = (fetchStates: ChainMap.t) => { let hasMultichainReadyItem = ( fetchStates: ChainMap.t, - ~multichain: InternalConfig.multichain, + ~multichain: Config.multichain, ) => { switch multichain { | Ordered => hasOrderedReadyItem(fetchStates) @@ -496,7 +496,7 @@ let prepareUnorderedBatch = ( let make = ( ~checkpointIdBeforeBatch, ~chainsBeforeBatch: ChainMap.t, - ~multichain: InternalConfig.multichain, + ~multichain: Config.multichain, ~batchSizeTarget, ) => { if ( diff --git a/codegenerator/cli/npm/envio/src/Config.res b/codegenerator/cli/npm/envio/src/Config.res new file mode 100644 index 000000000..bd3505702 --- /dev/null +++ b/codegenerator/cli/npm/envio/src/Config.res @@ -0,0 +1,124 @@ +open Belt + +type ecosystem = | @as("evm") Evm | @as("fuel") Fuel + +type sourceSyncOptions = { + initialBlockInterval?: int, + backoffMultiplicative?: float, + accelerationAdditive?: int, + intervalCeiling?: int, + backoffMillis?: int, + queryTimeoutMillis?: int, + fallbackStallTimeout?: int, +} + +type contract = { + name: string, + abi: EvmTypes.Abi.t, + addresses: array, + events: array, + startBlock: option, +} + +type chain = { + id: int, + startBlock: int, + endBlock?: int, + maxReorgDepth: int, + contracts: array, + sources: array, +} + +type sourceSync = { + initialBlockInterval: int, + backoffMultiplicative: float, + accelerationAdditive: int, + intervalCeiling: int, + backoffMillis: int, + queryTimeoutMillis: int, + fallbackStallTimeout: int, +} + +type multichain = | @as("ordered") Ordered | @as("unordered") Unordered + +type t = { + shouldRollbackOnReorg: bool, + shouldSaveFullHistory: bool, + multichain: multichain, + chainMap: ChainMap.t, + defaultChain: option, + ecosystem: ecosystem, + 
enableRawEvents: bool, + preloadHandlers: bool, + maxAddrInPartition: int, + batchSize: int, + lowercaseAddresses: bool, + addContractNameToContractNameMapping: dict, +} + +let make = ( + ~shouldRollbackOnReorg=true, + ~shouldSaveFullHistory=false, + ~chains: array=[], + ~enableRawEvents=false, + ~preloadHandlers=false, + ~ecosystem=Evm, + ~batchSize=5000, + ~lowercaseAddresses=false, + ~multichain=Unordered, + ~shouldUseHypersyncClientDecoder=true, + ~maxAddrInPartition=5000, +) => { + // Validate that lowercase addresses is not used with viem decoder + if lowercaseAddresses && !shouldUseHypersyncClientDecoder { + Js.Exn.raiseError( + "lowercase addresses is not supported when event_decoder is 'viem'. Please set event_decoder to 'hypersync-client' or change address_format to 'checksum'.", + ) + } + + let chainMap = + chains + ->Js.Array2.map(n => { + (ChainMap.Chain.makeUnsafe(~chainId=n.id), n) + }) + ->ChainMap.fromArrayUnsafe + + // Build the contract name mapping for efficient lookup + let addContractNameToContractNameMapping = Js.Dict.empty() + chains->Array.forEach(chainConfig => { + chainConfig.contracts->Array.forEach(contract => { + let addKey = "add" ++ contract.name->Utils.String.capitalize + addContractNameToContractNameMapping->Js.Dict.set(addKey, contract.name) + }) + }) + + { + shouldRollbackOnReorg, + shouldSaveFullHistory, + multichain, + chainMap, + defaultChain: chains->Array.get(0), + enableRawEvents, + ecosystem, + maxAddrInPartition, + preloadHandlers, + batchSize, + lowercaseAddresses, + addContractNameToContractNameMapping, + } +} + +let shouldSaveHistory = (config, ~isInReorgThreshold) => + config.shouldSaveFullHistory || (config.shouldRollbackOnReorg && isInReorgThreshold) + +let shouldPruneHistory = (config, ~isInReorgThreshold) => + !config.shouldSaveFullHistory && (config.shouldRollbackOnReorg && isInReorgThreshold) + +let getChain = (config, ~chainId) => { + let chain = ChainMap.Chain.makeUnsafe(~chainId) + 
config.chainMap->ChainMap.has(chain) + ? chain + : Js.Exn.raiseError( + "No chain with id " ++ chain->ChainMap.Chain.toString ++ " found in config.yaml", + ) +} diff --git a/codegenerator/cli/npm/envio/src/EventRegister.res b/codegenerator/cli/npm/envio/src/EventRegister.res index 78407c98b..ecb8b4f52 100644 --- a/codegenerator/cli/npm/envio/src/EventRegister.res +++ b/codegenerator/cli/npm/envio/src/EventRegister.res @@ -4,8 +4,8 @@ type registrations = { } type activeRegistration = { - ecosystem: InternalConfig.ecosystem, - multichain: InternalConfig.multichain, + ecosystem: Config.ecosystem, + multichain: Config.multichain, preloadHandlers: bool, registrations: registrations, mutable finished: bool, diff --git a/codegenerator/cli/npm/envio/src/EventRegister.resi b/codegenerator/cli/npm/envio/src/EventRegister.resi index 330fa057e..254fe73cb 100644 --- a/codegenerator/cli/npm/envio/src/EventRegister.resi +++ b/codegenerator/cli/npm/envio/src/EventRegister.resi @@ -4,8 +4,8 @@ type registrations = { } let startRegistration: ( - ~ecosystem: InternalConfig.ecosystem, - ~multichain: InternalConfig.multichain, + ~ecosystem: Config.ecosystem, + ~multichain: Config.multichain, ~preloadHandlers: bool, ) => unit let finishRegistration: unit => registrations diff --git a/codegenerator/cli/npm/envio/src/Indexer.res b/codegenerator/cli/npm/envio/src/Indexer.res new file mode 100644 index 000000000..7668dcfb4 --- /dev/null +++ b/codegenerator/cli/npm/envio/src/Indexer.res @@ -0,0 +1,5 @@ +type t = { + config: Config.t, + registrations: EventRegister.registrations, + persistence: Persistence.t, +} diff --git a/codegenerator/cli/npm/envio/src/InternalConfig.res b/codegenerator/cli/npm/envio/src/InternalConfig.res deleted file mode 100644 index 447b999c1..000000000 --- a/codegenerator/cli/npm/envio/src/InternalConfig.res +++ /dev/null @@ -1,48 +0,0 @@ -// TODO: rename the file to Config.res after finishing the migration from codegen -// And turn it into PublicConfig instead -// 
For internal use we should create Indexer.res with a stateful type - -type ecosystem = | @as("evm") Evm | @as("fuel") Fuel - -type sourceSyncOptions = { - initialBlockInterval?: int, - backoffMultiplicative?: float, - accelerationAdditive?: int, - intervalCeiling?: int, - backoffMillis?: int, - queryTimeoutMillis?: int, - fallbackStallTimeout?: int, -} - -type historyFlag = FullHistory | MinHistory -type rollbackFlag = RollbackOnReorg | NoRollback -type historyConfig = {rollbackFlag: rollbackFlag, historyFlag: historyFlag} - -type contract = { - name: string, - abi: EvmTypes.Abi.t, - addresses: array, - events: array, - startBlock: option, -} - -type chain = { - id: int, - startBlock: int, - endBlock?: int, - maxReorgDepth: int, - contracts: array, - sources: array, -} - -type sourceSync = { - initialBlockInterval: int, - backoffMultiplicative: float, - accelerationAdditive: int, - intervalCeiling: int, - backoffMillis: int, - queryTimeoutMillis: int, - fallbackStallTimeout: int, -} - -type multichain = | @as("ordered") Ordered | @as("unordered") Unordered diff --git a/codegenerator/cli/npm/envio/src/Persistence.res b/codegenerator/cli/npm/envio/src/Persistence.res index f3449f3ff..1a95f7eda 100644 --- a/codegenerator/cli/npm/envio/src/Persistence.res +++ b/codegenerator/cli/npm/envio/src/Persistence.res @@ -43,7 +43,7 @@ type storage = { // Should initialize the storage so we can start interacting with it // Eg create connection, schema, tables, etc. 
initialize: ( - ~chainConfigs: array=?, + ~chainConfigs: array=?, ~entities: array=?, ~enums: array>=?, ) => promise, @@ -91,7 +91,10 @@ type t = { allEntities: array, allEnums: array>, mutable storageStatus: storageStatus, - storage: storage, + mutable storage: storage, + // FIXME: This is temporary to move it library + // Should be a part of the storage interface and db agnostic + mutable sql: Postgres.sql, } let entityHistoryActionEnumConfig: Internal.enumConfig = { @@ -106,6 +109,7 @@ let make = ( // TODO: Should only pass userEnums and create internal config in runtime ~allEnums, ~storage, + ~sql, ) => { let allEntities = userEntities->Js.Array2.concat([InternalTable.DynamicContractRegistry.config]) let allEnums = @@ -116,6 +120,7 @@ let make = ( allEnums, storageStatus: Unknown, storage, + sql, } } diff --git a/codegenerator/cli/npm/envio/src/TestIndexer.res b/codegenerator/cli/npm/envio/src/TestIndexer.res new file mode 100644 index 000000000..5e1164f8f --- /dev/null +++ b/codegenerator/cli/npm/envio/src/TestIndexer.res @@ -0,0 +1,208 @@ +type chainRange = {startBlock: int, endBlock: int} + +type state = Unititialized | Running | Finished + +type t = { + process: dict => promise, + history: unit => promise>>, +} + +let factory = ( + ~registerAllHandlers, + ~makeGeneratedConfig, + ~makePgClient, + ~makeStorage, + ~codegenPersistence, + ~createGlobalStateAndRun, +) => { + () => { + let state = ref(Unititialized) + + let registrations: EventRegister.registrations = registerAllHandlers() + let config: Config.t = { + let config = makeGeneratedConfig() + + { + ...config, + shouldRollbackOnReorg: true, + shouldSaveFullHistory: true, + } + } + + let sql = makePgClient() + let pgSchema = "envio_internal_test_indexer" + let storage = makeStorage(~sql, ~pgSchema, ~isHasuraEnabled=false) + let persistence: Persistence.t = { + ...codegenPersistence, + storageStatus: Persistence.Unknown, + storage, + sql, + } + + let process = async chainsToRun => { + switch 
state.contents { + | Unititialized => + state.contents = Running + + let runningChains = [] + + let chainsToRunKeys = chainsToRun->Js.Dict.keys + if chainsToRunKeys->Utils.Array.isEmpty { + Js.Exn.raiseError("No chains to run") + } + chainsToRunKeys->Js.Array2.forEach(key => { + switch key->Belt.Int.fromString { + | None => Js.Exn.raiseError("Invalid chain key") + | Some(chainId) => + // It'll throw with invalid chain Id + let _ = config->Config.getChain(~chainId) + runningChains + ->Js.Array2.push(ChainMap.Chain.makeUnsafe(~chainId)) + ->ignore + } + }) + + let chainMap = config.chainMap->ChainMap.mapWithKey((chain, chainConfig) => { + let chainToRun = chainsToRun->Utils.Dict.dangerouslyGetByIntNonOption(chainConfig.id) + + switch chainToRun { + | Some(chainToRun) => { + if chainConfig.startBlock > chainToRun.startBlock { + Js.Exn.raiseError("Start block is greater than the start block of the chain") + } + switch chainConfig.endBlock { + | Some(endBlock) => + if endBlock < chainToRun.endBlock { + Js.Exn.raiseError("End block is less than the end block of the chain") + } + | None => () + } + + { + ...chainConfig, + startBlock: chainToRun.startBlock, + endBlock: chainToRun.endBlock, + sources: chainConfig.sources, + maxReorgDepth: 0, // We want always be in reorg threshold + } + } + | None => { + ...chainConfig, + sources: [ + { + name: "MockSource", + sourceFor: Sync, + poweredByHyperSync: false, + chain, + pollingInterval: 1_000_000_000, + getBlockHashes: (~blockNumbers as _, ~logger as _) => { + Js.Exn.raiseError("Not implemented") + }, + getHeightOrThrow: () => Promise.make((_, _) => ()), + getItemsOrThrow: ( + ~fromBlock as _, + ~toBlock as _, + ~addressesByContractName as _, + ~indexingContracts as _, + ~currentBlockHeight as _, + ~partitionId as _, + ~selection as _, + ~retry as _, + ~logger as _, + ) => { + Js.Exn.raiseError("Not implemented") + }, + }, + ], + } + } + }) + + let config = { + ...config, + chainMap, + } + + await persistence->Persistence.init( 
+ ~chainConfigs=config.chainMap->ChainMap.values, + ~reset=true, + ) + + let indexer = { + Indexer.registrations, + config, + persistence, + } + + await createGlobalStateAndRun(~indexer, ~runningChains) + + state.contents = Finished + | Running => Js.Exn.raiseError("Test indexer is already running") + | Finished => Js.Exn.raiseError("Test indexer has already finished") + } + } + let history = async () => { + switch state.contents { + | Unititialized + | Running => + Js.Dict.empty() + | Finished => + let data = Js.Dict.fromArray([ + ( + "checkpoints", + await sql + ->Postgres.unsafe( + PgStorage.makeLoadAllQuery( + ~pgSchema, + ~tableName=InternalTable.Checkpoints.table.tableName, + ), + ) + ->(Utils.magic: promise => promise>), + ), + ]) + let _ = await Promise.all( + persistence.allEntities->Js.Array2.map(entityConfig => { + sql + ->Postgres.unsafe( + PgStorage.makeLoadAllQuery( + ~pgSchema, + ~tableName=entityConfig.entityHistory.table.tableName, + ), + ) + ->Promise.thenResolve(items => { + data->Js.Dict.set( + entityConfig.name, + items + ->S.parseOrThrow( + S.array( + S.union([ + entityConfig.entityHistory.setUpdateSchema, + S.object( + (s): EntityHistory.entityUpdate<'entity> => { + s.tag(EntityHistory.changeFieldName, EntityHistory.RowAction.DELETE) + { + entityId: s.field("id", S.string), + checkpointId: s.field(EntityHistory.checkpointIdFieldName, S.int), + entityUpdateAction: Delete, + } + }, + ), + ]), + ), + ) + ->( + Utils.magic: array> => array + ), + ) + }) + }), + ) + data + } + } + { + process, + history, + } + } +} diff --git a/codegenerator/cli/npm/envio/src/db/InternalTable.res b/codegenerator/cli/npm/envio/src/db/InternalTable.res index 105b1a5cd..bbaef394b 100644 --- a/codegenerator/cli/npm/envio/src/db/InternalTable.res +++ b/codegenerator/cli/npm/envio/src/db/InternalTable.res @@ -165,7 +165,7 @@ module Chains = { ], ) - let initialFromConfig = (chainConfig: InternalConfig.chain) => { + let initialFromConfig = (chainConfig: Config.chain) => 
{ { id: chainConfig.id, startBlock: chainConfig.startBlock, @@ -182,7 +182,7 @@ module Chains = { } } - let makeInitialValuesQuery = (~pgSchema, ~chainConfigs: array) => { + let makeInitialValuesQuery = (~pgSchema, ~chainConfigs: array) => { if chainConfigs->Array.length === 0 { None } else { diff --git a/codegenerator/cli/npm/envio/src/sources/RpcSource.res b/codegenerator/cli/npm/envio/src/sources/RpcSource.res index 12a3cf595..9d49f8055 100644 --- a/codegenerator/cli/npm/envio/src/sources/RpcSource.res +++ b/codegenerator/cli/npm/envio/src/sources/RpcSource.res @@ -218,7 +218,7 @@ let getNextPage = ( ~addresses, ~topicQuery, ~loadBlock, - ~syncConfig as sc: InternalConfig.sourceSync, + ~syncConfig as sc: Config.sourceSync, ~provider, ~mutSuggestedBlockIntervals, ~partitionId, @@ -465,7 +465,7 @@ let sanitizeUrl = (url: string) => { type options = { sourceFor: Source.sourceFor, - syncConfig: InternalConfig.sourceSync, + syncConfig: Config.sourceSync, url: string, chain: ChainMap.Chain.t, contracts: array, diff --git a/codegenerator/cli/src/hbs_templating/codegen_templates.rs b/codegenerator/cli/src/hbs_templating/codegen_templates.rs index 615f85d22..ad9cb303b 100644 --- a/codegenerator/cli/src/hbs_templating/codegen_templates.rs +++ b/codegenerator/cli/src/hbs_templating/codegen_templates.rs @@ -1098,7 +1098,7 @@ impl NetworkConfigTemplate { } MainEvmDataSource::Rpc(rpc) => { format!( - "Rpc({{syncConfig: Config.getSyncConfig({})}})", + "Rpc({{syncConfig: NetworkSources.getSyncConfig({})}})", rpc_to_sync_config_options(rpc) ) } @@ -1285,6 +1285,7 @@ pub struct ProjectTemplate { preload_handlers: bool, envio_version: String, types_code: String, + ts_types_code: String, //Used for the package.json reference to handlers in generated relative_path_to_root_from_generated: String, lowercase_addresses: bool, @@ -1379,12 +1380,43 @@ impl ProjectTemplate { type chainId = int @genType -type chain = [{chain_id_type}]"#, +type chain = [{chain_id_type}] + 
+@genType.import(("./Types.ts", "TestIndexerProcessOptions")) +type testIndexerProcessOptions = dict + +@genType +type rec testIndexer = {{ + process: testIndexerProcessOptions => promise, + history: unit => promise>>, +}} +"#, chain_id_type = chain_configs .iter() .map(|chain_config| format!("#{}", chain_config.network_config.id)) .collect::>() - .join(" | ") + .join(" | "), + ); + + let ts_types_code = format!( + r#" +export type TestIndexerChainRange = {{ + startBlock: number, + endBlock: number, +}} + +export type TestIndexerProcessOptions = {{ +{test_indexer_process_options_body} +}} +"#, + test_indexer_process_options_body = chain_configs + .iter() + .map(|chain_config| format!( + " {}?: TestIndexerChainRange", + chain_config.network_config.id + )) + .collect::>() + .join(",\n"), ); Ok(ProjectTemplate { @@ -1407,6 +1439,7 @@ type chain = [{chain_id_type}]"#, preload_handlers: cfg.preload_handlers, envio_version: get_envio_version()?, types_code, + ts_types_code, //Used for the package.json reference to handlers in generated relative_path_to_root_from_generated, lowercase_addresses: cfg.lowercase_addresses, @@ -1533,7 +1566,7 @@ mod test { codegen_contracts: vec![contract1], is_fuel: false, sources_code: "NetworkSources.evm(~chain, ~contracts=[{name: \"Contract1\",events: [Types.Contract1.NewGravatar.register(), Types.Contract1.UpdatedGravatar.register()],abi: Types.Contract1.abi}], ~hyperSync=None, ~allEventSignatures=[Types.Contract1.eventSignatures]->Belt.Array.concatMany, ~shouldUseHypersyncClientDecoder=true, ~rpcs=[{url: \"https://eth.com\", sourceFor: Sync, syncConfig: {accelerationAdditive: 2000,initialBlockInterval: 10000,backoffMultiplicative: 0.8,intervalCeiling: 10000,backoffMillis: 5000,queryTimeoutMillis: 20000,}}], ~lowercaseAddresses=false)".to_string(), - deprecated_sync_source_code: "Rpc({syncConfig: Config.getSyncConfig({accelerationAdditive: 2000,initialBlockInterval: 10000,backoffMultiplicative: 0.8,intervalCeiling: 10000,backoffMillis: 
5000,queryTimeoutMillis: 20000,})})".to_string(), + deprecated_sync_source_code: "Rpc({syncConfig: NetworkSources.getSyncConfig({accelerationAdditive: 2000,initialBlockInterval: 10000,backoffMultiplicative: 0.8,intervalCeiling: 10000,backoffMillis: 5000,queryTimeoutMillis: 20000,})})".to_string(), }; let expected_chain_configs = vec![chain_config_1]; @@ -1588,14 +1621,14 @@ mod test { codegen_contracts: vec![contract1], is_fuel: false, sources_code: "NetworkSources.evm(~chain, ~contracts=[{name: \"Contract1\",events: [Types.Contract1.NewGravatar.register(), Types.Contract1.UpdatedGravatar.register()],abi: Types.Contract1.abi}], ~hyperSync=None, ~allEventSignatures=[Types.Contract1.eventSignatures]->Belt.Array.concatMany, ~shouldUseHypersyncClientDecoder=true, ~rpcs=[{url: \"https://eth.com\", sourceFor: Sync, syncConfig: {accelerationAdditive: 2000,initialBlockInterval: 10000,backoffMultiplicative: 0.8,intervalCeiling: 10000,backoffMillis: 5000,queryTimeoutMillis: 20000,}}], ~lowercaseAddresses=false)".to_string(), - deprecated_sync_source_code: "Rpc({syncConfig: Config.getSyncConfig({accelerationAdditive: 2000,initialBlockInterval: 10000,backoffMultiplicative: 0.8,intervalCeiling: 10000,backoffMillis: 5000,queryTimeoutMillis: 20000,})})".to_string(), + deprecated_sync_source_code: "Rpc({syncConfig: NetworkSources.getSyncConfig({accelerationAdditive: 2000,initialBlockInterval: 10000,backoffMultiplicative: 0.8,intervalCeiling: 10000,backoffMillis: 5000,queryTimeoutMillis: 20000,})})".to_string(), }; let chain_config_2 = super::NetworkConfigTemplate { network_config: network2, codegen_contracts: vec![contract2], is_fuel: false, sources_code: "NetworkSources.evm(~chain, ~contracts=[{name: \"Contract2\",events: [Types.Contract2.NewGravatar.register(), Types.Contract2.UpdatedGravatar.register()],abi: Types.Contract2.abi}], ~hyperSync=None, ~allEventSignatures=[Types.Contract2.eventSignatures]->Belt.Array.concatMany, ~shouldUseHypersyncClientDecoder=true, ~rpcs=[{url: 
\"https://eth.com\", sourceFor: Sync, syncConfig: {accelerationAdditive: 2000,initialBlockInterval: 10000,backoffMultiplicative: 0.8,intervalCeiling: 10000,backoffMillis: 5000,queryTimeoutMillis: 20000,}}, {url: \"https://eth.com/fallback\", sourceFor: Sync, syncConfig: {accelerationAdditive: 2000,initialBlockInterval: 10000,backoffMultiplicative: 0.8,intervalCeiling: 10000,backoffMillis: 5000,queryTimeoutMillis: 20000,}}], ~lowercaseAddresses=false)".to_string(), - deprecated_sync_source_code: "Rpc({syncConfig: Config.getSyncConfig({accelerationAdditive: 2000,initialBlockInterval: 10000,backoffMultiplicative: 0.8,intervalCeiling: 10000,backoffMillis: 5000,queryTimeoutMillis: 20000,})})".to_string(), + deprecated_sync_source_code: "Rpc({syncConfig: NetworkSources.getSyncConfig({accelerationAdditive: 2000,initialBlockInterval: 10000,backoffMultiplicative: 0.8,intervalCeiling: 10000,backoffMillis: 5000,queryTimeoutMillis: 20000,})})".to_string(), }; let expected_chain_configs = vec![chain_config_1, chain_config_2]; diff --git a/codegenerator/cli/templates/dynamic/codegen/index.d.ts.hbs b/codegenerator/cli/templates/dynamic/codegen/index.d.ts.hbs index f73d8e008..6907a6d3f 100644 --- a/codegenerator/cli/templates/dynamic/codegen/index.d.ts.hbs +++ b/codegenerator/cli/templates/dynamic/codegen/index.d.ts.hbs @@ -10,15 +10,16 @@ import { {{contract.name.capitalized}}, {{/each}} MockDb, - Addresses + Addresses } from "./src/TestHelpers.gen"; +export { createTestIndexer } from "./src/TestHelpers.gen"; export const TestHelpers = { {{#each codegen_contracts as |contract|}} {{contract.name.capitalized}}, {{/each}} MockDb, - Addresses + Addresses }; export { diff --git a/codegenerator/cli/templates/dynamic/codegen/src/ConfigYAML.res.hbs b/codegenerator/cli/templates/dynamic/codegen/src/ConfigYAML.res.hbs index 53c41730b..b656ae63d 100644 --- a/codegenerator/cli/templates/dynamic/codegen/src/ConfigYAML.res.hbs +++ 
b/codegenerator/cli/templates/dynamic/codegen/src/ConfigYAML.res.hbs @@ -4,7 +4,7 @@ type hyperFuelConfig = {endpointUrl: string} @genType.opaque type rpcConfig = { - syncConfig: InternalConfig.sourceSync, + syncConfig: Config.sourceSync, } @genType diff --git a/codegenerator/cli/templates/dynamic/codegen/src/RegisterHandlers.res.hbs b/codegenerator/cli/templates/dynamic/codegen/src/Generated.res.hbs similarity index 51% rename from codegenerator/cli/templates/dynamic/codegen/src/RegisterHandlers.res.hbs rename to codegenerator/cli/templates/dynamic/codegen/src/Generated.res.hbs index 5ff0d00ce..757550e31 100644 --- a/codegenerator/cli/templates/dynamic/codegen/src/RegisterHandlers.res.hbs +++ b/codegenerator/cli/templates/dynamic/codegen/src/Generated.res.hbs @@ -29,7 +29,7 @@ let makeGeneratedConfig = () => { let contracts = [ {{#each chain_config.codegen_contracts as | contract |}} { - InternalConfig.name: "{{contract.name.capitalized}}", + Config.name: "{{contract.name.capitalized}}", abi: Types.{{contract.name.capitalized}}.abi, addresses: [ {{#each contract.addresses as | address |}} @@ -55,7 +55,7 @@ let makeGeneratedConfig = () => { ] let chain = ChainMap.Chain.makeUnsafe(~chainId={{chain_config.network_config.id}}) { - InternalConfig.maxReorgDepth: {{chain_config.network_config.confirmed_block_threshold}}, + Config.maxReorgDepth: {{chain_config.network_config.confirmed_block_threshold}}, startBlock: {{chain_config.network_config.start_block}}, {{#if chain_config.network_config.end_block}} endBlock: {{chain_config.network_config.end_block}}, @@ -71,7 +71,17 @@ let makeGeneratedConfig = () => { Config.make( ~shouldRollbackOnReorg={{should_rollback_on_reorg}}, ~shouldSaveFullHistory={{should_save_full_history}}, - ~isUnorderedMultichainMode={{is_unordered_multichain_mode}}, + ~multichain=if ( + Env.Configurable.isUnorderedMultichainMode->Belt.Option.getWithDefault( + Env.Configurable.unstable__temp_unordered_head_mode->Belt.Option.getWithDefault( + 
{{is_unordered_multichain_mode}}, + ), + ) + ) { + Unordered + } else { + Ordered + }, ~chains, ~enableRawEvents={{enable_raw_events}}, ~batchSize=?Env.batchSize, @@ -84,12 +94,9 @@ let makeGeneratedConfig = () => { ) } -%%private( - let config: ref> = ref(None) -) +let configWithoutRegistrations = makeGeneratedConfig() let registerAllHandlers = () => { - let configWithoutRegistrations = makeGeneratedConfig() EventRegister.startRegistration( ~ecosystem=configWithoutRegistrations.ecosystem, ~multichain=configWithoutRegistrations.multichain, @@ -104,22 +111,96 @@ let registerAllHandlers = () => { ) {{/each}} - let generatedConfig = { - // Need to recreate initial config one more time, - // since configWithoutRegistrations called register for event - // before they were ready - ...makeGeneratedConfig(), - registrations: Some(EventRegister.finishRegistration()), - } - config := Some(generatedConfig) - generatedConfig + EventRegister.finishRegistration() } -let getConfig = () => { - switch config.contents { - | Some(config) => config - | None => registerAllHandlers() - } +let initialSql = Db.makeClient() +let storagePgSchema = Env.Db.publicSchema +let makeStorage = (~sql, ~pgSchema=storagePgSchema, ~isHasuraEnabled=Env.Hasura.enabled) => { + PgStorage.make( + ~sql, + ~pgSchema, + ~pgHost=Env.Db.host, + ~pgUser=Env.Db.user, + ~pgPort=Env.Db.port, + ~pgDatabase=Env.Db.database, + ~pgPassword=Env.Db.password, + ~onInitialize=?{ + if isHasuraEnabled { + Some( + () => { + Hasura.trackDatabase( + ~endpoint=Env.Hasura.graphqlEndpoint, + ~auth={ + role: Env.Hasura.role, + secret: Env.Hasura.secret, + }, + ~pgSchema=storagePgSchema, + ~userEntities=Entities.userEntities, + ~responseLimit=Env.Hasura.responseLimit, + ~schema=Db.schema, + ~aggregateEntities=Env.Hasura.aggregateEntities, + )->Promise.catch(err => { + Logging.errorWithExn( + err->Utils.prettifyExn, + `EE803: Error tracking tables`, + )->Promise.resolve + }) + }, + ) + } else { + None + } + }, + ~onNewTables=?{ + if 
isHasuraEnabled { + Some( + (~tableNames) => { + Hasura.trackTables( + ~endpoint=Env.Hasura.graphqlEndpoint, + ~auth={ + role: Env.Hasura.role, + secret: Env.Hasura.secret, + }, + ~pgSchema=storagePgSchema, + ~tableNames, + )->Promise.catch(err => { + Logging.errorWithExn( + err->Utils.prettifyExn, + `EE804: Error tracking new tables`, + )->Promise.resolve + }) + }, + ) + } else { + None + } + }, + ~isHasuraEnabled, + ) } -let getConfigWithoutRegistrations = makeGeneratedConfig +let codegenPersistence = Persistence.make( + ~userEntities=Entities.userEntities, + ~allEnums=Enums.allEnums, + ~storage=makeStorage(~sql=initialSql), + ~sql=initialSql, +) + +%%private(let indexer: ref> = ref(None)) +let getIndexer = () => { + switch indexer.contents { + | Some(indexer) => indexer + | None => + let i = { + Indexer.registrations: registerAllHandlers(), + // Need to recreate initial config one more time, + // since configWithoutRegistrations called register for event + // before they were ready + config: makeGeneratedConfig(), + persistence: codegenPersistence, + } + indexer := Some(i) + i + } +} diff --git a/codegenerator/cli/templates/dynamic/codegen/src/Path.res.hbs b/codegenerator/cli/templates/dynamic/codegen/src/Path.res.hbs index 3afcbc9cb..2ca19e4ab 100644 --- a/codegenerator/cli/templates/dynamic/codegen/src/Path.res.hbs +++ b/codegenerator/cli/templates/dynamic/codegen/src/Path.res.hbs @@ -1 +1 @@ -let relativePathToRootFromGenerated = "{{relative_path_to_root_from_generated}}" \ No newline at end of file +let relativePathToRootFromGenerated = "{{relative_path_to_root_from_generated}}" diff --git a/codegenerator/cli/templates/dynamic/codegen/src/TestHelpers.res.hbs b/codegenerator/cli/templates/dynamic/codegen/src/TestHelpers.res.hbs index 34e17f5cc..d5ac4e7c6 100644 --- a/codegenerator/cli/templates/dynamic/codegen/src/TestHelpers.res.hbs +++ b/codegenerator/cli/templates/dynamic/codegen/src/TestHelpers.res.hbs @@ -121,14 +121,13 @@ module EventFunctions = { 
mockEventData->Belt.Option.getWithDefault({}) let block = block->Belt.Option.getWithDefault({})->MockBlock.toBlock let transaction = transaction->Belt.Option.getWithDefault({})->MockTransaction.toTransaction - let config = RegisterHandlers.getConfig() let event: Internal.event = { params, transaction, chainId: switch chainId { | Some(chainId) => chainId | None => - switch config.defaultChain { + switch Generated.configWithoutRegistrations.defaultChain { | Some(chainConfig) => chainConfig.id | None => Js.Exn.raiseError( @@ -214,3 +213,46 @@ module {{contract.name.capitalized}} = { } {{/each}} + +@genType +let createTestIndexer = TestIndexer.factory( + ~registerAllHandlers=Generated.registerAllHandlers, + ~makeGeneratedConfig=Generated.makeGeneratedConfig, + ~makePgClient=Db.makeClient, + ~makeStorage=(~sql, ~pgSchema, ~isHasuraEnabled) => + Generated.makeStorage(~sql, ~pgSchema, ~isHasuraEnabled), + ~codegenPersistence=Generated.codegenPersistence, + // Have it here, since the modules are not available from npm + ~createGlobalStateAndRun=(~indexer, ~runningChains) => { + Promise.makeAsync(async (resolve, _reject) => { + let chainManager = await ChainManager.makeFromDbState( + ~initialState=indexer.persistence->Persistence.getInitializedState, + ~config=indexer.config, + ~registrations=indexer.registrations, + ~persistence=indexer.persistence, + ) + let globalState = GlobalState.make(~indexer, ~chainManager, ~shouldUseTui=false) // FIXME: Should replace use TUI with keep alive on finish + let gsManager = globalState->GlobalStateManager.make + + gsManager->GlobalStateManager.dispatchTask(NextQuery(CheckAllChains)) + + while { + let state = gsManager->GlobalStateManager.getState + runningChains->Js.Array2.some(chain => { + let chainFetcher = state.chainManager.chainFetchers->ChainMap.get(chain) + chainFetcher.committedProgressBlockNumber < + chainFetcher.fetchState.endBlock->Belt.Option.getUnsafe + }) + } { + await Utils.delay(1) + } + let state = 
gsManager->GlobalStateManager.getState + // Finish it + gsManager->GlobalStateManager.setState({ + ...gsManager->GlobalStateManager.getState, + id: state.id + 1, + }) + resolve() + }) + }, +)->(Utils.magic: (unit => TestIndexer.t) => unit => Types.testIndexer) diff --git a/codegenerator/cli/templates/dynamic/codegen/src/TestHelpers_MockDb.res.hbs b/codegenerator/cli/templates/dynamic/codegen/src/TestHelpers_MockDb.res.hbs index 256c85c6a..c442a193a 100644 --- a/codegenerator/cli/templates/dynamic/codegen/src/TestHelpers_MockDb.res.hbs +++ b/codegenerator/cli/templates/dynamic/codegen/src/TestHelpers_MockDb.res.hbs @@ -42,7 +42,7 @@ let deleteDictKey: (dict<'a>, string) => unit = %raw(` } `) -let config = RegisterHandlers.getConfigWithoutRegistrations() +let config = Generated.configWithoutRegistrations EventRegister.startRegistration( ~ecosystem=config.ecosystem, ~multichain=config.multichain, @@ -322,17 +322,19 @@ and makeProcessEvents = (mockDb: t, ~chainId=?) => async ( } else { let itemsWithContractRegister = [] - let registrations = EventRegister.finishRegistration() + let registrations = EventRegister.finishRegistration() - let config = if ( + let generatedIndexer = Generated.getIndexer() + let indexer: Indexer.t = if ( registrations.hasEvents || !(registrations.onBlockByChainId->Utils.Dict.isEmpty) ) { { - ...RegisterHandlers.makeGeneratedConfig(), - registrations: Some(registrations), + ...generatedIndexer, + config: Generated.makeGeneratedConfig(), + registrations, } } else { - RegisterHandlers.registerAllHandlers() + generatedIndexer } let processingChainId = ref(chainId) @@ -390,6 +392,7 @@ and makeProcessEvents = (mockDb: t, ~chainId=?) => async ( let chainFetcher = ChainFetcher.makeFromConfig( config.chainMap->ChainMap.get(processingChain), ~config, + ~registrations=indexer.registrations, ~targetBufferSize=5000, ) @@ -402,7 +405,7 @@ and makeProcessEvents = (mockDb: t, ~chainId=?) 
=> async ( let inMemoryStore = InMemoryStore.make() let loadManager = LoadManager.make() let persistence = { - ...config.persistence, + ...indexer.persistence, storage: makeMockStorage(mockDb), storageStatus: Ready({ cleanRun: false, @@ -412,8 +415,8 @@ and makeProcessEvents = (mockDb: t, ~chainId=?) => async ( checkpointId: 0, }), } - let config = { - ...config, + let indexer = { + ...indexer, persistence, } @@ -505,11 +508,16 @@ and makeProcessEvents = (mockDb: t, ~chainId=?) => async ( chains->Js.Dict.set(processingChainId->Int.toString, {Internal.isReady: true}) try { - await batch->EventProcessing.preloadBatchOrThrow(~loadManager, ~persistence, ~inMemoryStore, ~chains) + await batch->EventProcessing.preloadBatchOrThrow( + ~loadManager, + ~persistence, + ~inMemoryStore, + ~chains, + ) await batch->EventProcessing.runBatchHandlersOrThrow( ~inMemoryStore, ~loadManager, - ~config, + ~indexer, ~shouldSaveHistory=false, ~shouldBenchmark=false, ~chains, @@ -602,7 +610,7 @@ writeFromMemoryStore = (mockDb: t, ~inMemoryStore: InMemoryStore.t) => { }, ) - Config.codegenPersistence.allEntities->Array.forEach(entityConfig => { + Generated.codegenPersistence.allEntities->Array.forEach(entityConfig => { mockDb->executeRowsEntity(~inMemoryStore, ~entityConfig) }) } diff --git a/codegenerator/cli/templates/dynamic/codegen/src/Types.ts.hbs b/codegenerator/cli/templates/dynamic/codegen/src/Types.ts.hbs index c72f5dc5b..1f6869588 100644 --- a/codegenerator/cli/templates/dynamic/codegen/src/Types.ts.hbs +++ b/codegenerator/cli/templates/dynamic/codegen/src/Types.ts.hbs @@ -121,3 +121,4 @@ export type HandlerContext = { {{/each}} }; {{/unless}} +{{ts_types_code}} \ No newline at end of file diff --git a/codegenerator/cli/templates/static/codegen/index.js b/codegenerator/cli/templates/static/codegen/index.js index 075fc5440..1259cce2d 100644 --- a/codegenerator/cli/templates/static/codegen/index.js +++ b/codegenerator/cli/templates/static/codegen/index.js @@ -8,6 +8,7 @@ const 
BigDecimal = require("bignumber.js"); module.exports = { ...handlers, + createTestIndexer: TestHelpers.createTestIndexer, BigDecimal, TestHelpers, }; diff --git a/codegenerator/cli/templates/static/codegen/src/Config.res b/codegenerator/cli/templates/static/codegen/src/Config.res deleted file mode 100644 index 41f480396..000000000 --- a/codegenerator/cli/templates/static/codegen/src/Config.res +++ /dev/null @@ -1,219 +0,0 @@ -open Belt - -let getSyncConfig = ( - { - ?initialBlockInterval, - ?backoffMultiplicative, - ?accelerationAdditive, - ?intervalCeiling, - ?backoffMillis, - ?queryTimeoutMillis, - ?fallbackStallTimeout, - }: InternalConfig.sourceSyncOptions, -): InternalConfig.sourceSync => { - let queryTimeoutMillis = queryTimeoutMillis->Option.getWithDefault(20_000) - { - initialBlockInterval: Env.Configurable.SyncConfig.initialBlockInterval->Option.getWithDefault( - initialBlockInterval->Option.getWithDefault(10_000), - ), - // After an RPC error, how much to scale back the number of blocks requested at once - backoffMultiplicative: Env.Configurable.SyncConfig.backoffMultiplicative->Option.getWithDefault( - backoffMultiplicative->Option.getWithDefault(0.8), - ), - // Without RPC errors or timeouts, how much to increase the number of blocks requested by for the next batch - accelerationAdditive: Env.Configurable.SyncConfig.accelerationAdditive->Option.getWithDefault( - accelerationAdditive->Option.getWithDefault(500), - ), - // Do not further increase the block interval past this limit - intervalCeiling: Env.Configurable.SyncConfig.intervalCeiling->Option.getWithDefault( - intervalCeiling->Option.getWithDefault(10_000), - ), - // After an error, how long to wait before retrying - backoffMillis: backoffMillis->Option.getWithDefault(5000), - // How long to wait before cancelling an RPC request - queryTimeoutMillis, - fallbackStallTimeout: fallbackStallTimeout->Option.getWithDefault(queryTimeoutMillis / 2), - } -} - -let storagePgSchema = Env.Db.publicSchema -let 
makeStorage = (~sql=Db.sql, ~pgSchema=storagePgSchema, ~isHasuraEnabled=Env.Hasura.enabled) => { - PgStorage.make( - ~sql, - ~pgSchema, - ~pgHost=Env.Db.host, - ~pgUser=Env.Db.user, - ~pgPort=Env.Db.port, - ~pgDatabase=Env.Db.database, - ~pgPassword=Env.Db.password, - ~onInitialize=?{ - if isHasuraEnabled { - Some( - () => { - Hasura.trackDatabase( - ~endpoint=Env.Hasura.graphqlEndpoint, - ~auth={ - role: Env.Hasura.role, - secret: Env.Hasura.secret, - }, - ~pgSchema=storagePgSchema, - ~userEntities=Entities.userEntities, - ~responseLimit=Env.Hasura.responseLimit, - ~schema=Db.schema, - ~aggregateEntities=Env.Hasura.aggregateEntities, - )->Promise.catch(err => { - Logging.errorWithExn( - err->Utils.prettifyExn, - `EE803: Error tracking tables`, - )->Promise.resolve - }) - }, - ) - } else { - None - } - }, - ~onNewTables=?{ - if isHasuraEnabled { - Some( - (~tableNames) => { - Hasura.trackTables( - ~endpoint=Env.Hasura.graphqlEndpoint, - ~auth={ - role: Env.Hasura.role, - secret: Env.Hasura.secret, - }, - ~pgSchema=storagePgSchema, - ~tableNames, - )->Promise.catch(err => { - Logging.errorWithExn( - err->Utils.prettifyExn, - `EE804: Error tracking new tables`, - )->Promise.resolve - }) - }, - ) - } else { - None - } - }, - ~isHasuraEnabled, - ) -} - -let codegenPersistence = Persistence.make( - ~userEntities=Entities.userEntities, - ~allEnums=Enums.allEnums, - ~storage=makeStorage(), -) - -type t = { - historyConfig: InternalConfig.historyConfig, - multichain: InternalConfig.multichain, - chainMap: ChainMap.t, - defaultChain: option, - ecosystem: InternalConfig.ecosystem, - enableRawEvents: bool, - preloadHandlers: bool, - persistence: Persistence.t, - addContractNameToContractNameMapping: dict, - maxAddrInPartition: int, - registrations: option, - batchSize: int, - lowercaseAddresses: bool, -} - -let make = ( - ~shouldRollbackOnReorg=true, - ~shouldSaveFullHistory=false, - ~isUnorderedMultichainMode=false, - ~chains: array=[], - ~enableRawEvents=false, - 
~preloadHandlers=false, - ~persistence=codegenPersistence, - ~ecosystem=InternalConfig.Evm, - ~registrations=?, - ~batchSize=5000, - ~lowercaseAddresses=false, - ~shouldUseHypersyncClientDecoder=true, -) => { - // Validate that lowercase addresses is not used with viem decoder - if lowercaseAddresses && !shouldUseHypersyncClientDecoder { - Js.Exn.raiseError( - "lowercase addresses is not supported when event_decoder is 'viem'. Please set event_decoder to 'hypersync-client' or change address_format to 'checksum'.", - ) - } - - let chainMap = - chains - ->Js.Array2.map(n => { - (ChainMap.Chain.makeUnsafe(~chainId=n.id), n) - }) - ->ChainMap.fromArrayUnsafe - - // Build the contract name mapping for efficient lookup - let addContractNameToContractNameMapping = Js.Dict.empty() - chains->Array.forEach(chainConfig => { - chainConfig.contracts->Array.forEach(contract => { - let addKey = "add" ++ contract.name->Utils.String.capitalize - addContractNameToContractNameMapping->Js.Dict.set(addKey, contract.name) - }) - }) - - { - historyConfig: { - rollbackFlag: shouldRollbackOnReorg ? RollbackOnReorg : NoRollback, - historyFlag: shouldSaveFullHistory ? 
FullHistory : MinHistory, - }, - multichain: if ( - Env.Configurable.isUnorderedMultichainMode->Option.getWithDefault( - Env.Configurable.unstable__temp_unordered_head_mode->Option.getWithDefault( - isUnorderedMultichainMode, - ), - ) - ) { - Unordered - } else { - Ordered - }, - chainMap, - defaultChain: chains->Array.get(0), - enableRawEvents, - persistence, - ecosystem, - addContractNameToContractNameMapping, - maxAddrInPartition: Env.maxAddrInPartition, - registrations, - preloadHandlers, - batchSize, - lowercaseAddresses, - } -} - -let shouldRollbackOnReorg = config => - switch config.historyConfig { - | {rollbackFlag: RollbackOnReorg} => true - | _ => false - } - -let shouldSaveHistory = (config, ~isInReorgThreshold) => - switch config.historyConfig { - | {rollbackFlag: RollbackOnReorg} if isInReorgThreshold => true - | {historyFlag: FullHistory} => true - | _ => false - } - -let shouldPruneHistory = (config, ~isInReorgThreshold) => - switch config.historyConfig { - | {rollbackFlag: RollbackOnReorg, historyFlag: MinHistory} if isInReorgThreshold => true - | _ => false - } - -let getChain = (config, ~chainId) => { - let chain = ChainMap.Chain.makeUnsafe(~chainId) - config.chainMap->ChainMap.has(chain) - ? 
chain - : Js.Exn.raiseError( - "No chain with id " ++ chain->ChainMap.Chain.toString ++ " found in config.yaml", - ) -} diff --git a/codegenerator/cli/templates/static/codegen/src/EventProcessing.res b/codegenerator/cli/templates/static/codegen/src/EventProcessing.res index c6896170c..bc394e254 100644 --- a/codegenerator/cli/templates/static/codegen/src/EventProcessing.res +++ b/codegenerator/cli/templates/static/codegen/src/EventProcessing.res @@ -162,7 +162,7 @@ let runHandlerOrThrow = async ( ~checkpointId, ~inMemoryStore, ~loadManager, - ~config: Config.t, + ~indexer: Indexer.t, ~shouldSaveHistory, ~shouldBenchmark, ~chains: Internal.chains, @@ -174,7 +174,7 @@ let runHandlerOrThrow = async ( item, inMemoryStore, loadManager, - persistence: config.persistence, + persistence: indexer.persistence, shouldSaveHistory, checkpointId, isPreload: false, @@ -211,7 +211,7 @@ let runHandlerOrThrow = async ( ~checkpointId, ~inMemoryStore, ~loadManager, - ~persistence=config.persistence, + ~persistence=indexer.persistence, ~shouldSaveHistory, ~shouldBenchmark, ~chains, @@ -219,7 +219,7 @@ let runHandlerOrThrow = async ( | None => () } - if config.enableRawEvents { + if indexer.config.enableRawEvents { item->Internal.castUnsafeEventItem->addItemToRawEvents(~inMemoryStore) } } @@ -314,7 +314,7 @@ let runBatchHandlersOrThrow = async ( batch: Batch.t, ~inMemoryStore, ~loadManager, - ~config, + ~indexer, ~shouldSaveHistory, ~shouldBenchmark, ~chains: Internal.chains, @@ -334,7 +334,7 @@ let runBatchHandlersOrThrow = async ( ~checkpointId, ~inMemoryStore, ~loadManager, - ~config, + ~indexer, ~shouldSaveHistory, ~shouldBenchmark, ~chains, @@ -374,7 +374,7 @@ let processEventBatch = async ( ~inMemoryStore: InMemoryStore.t, ~isInReorgThreshold, ~loadManager, - ~config: Config.t, + ~indexer: Indexer.t, ~chainFetchers: ChainMap.t, ) => { let totalBatchSize = batch.totalBatchSize @@ -399,7 +399,7 @@ let processEventBatch = async ( if batch.items->Utils.Array.notEmpty { await 
batch->preloadBatchOrThrow( ~loadManager, - ~persistence=config.persistence, + ~persistence=indexer.persistence, ~inMemoryStore, ~chains, ) @@ -411,8 +411,8 @@ let processEventBatch = async ( await batch->runBatchHandlersOrThrow( ~inMemoryStore, ~loadManager, - ~config, - ~shouldSaveHistory=config->Config.shouldSaveHistory(~isInReorgThreshold), + ~indexer, + ~shouldSaveHistory=indexer.config->Config.shouldSaveHistory(~isInReorgThreshold), ~shouldBenchmark=Env.Benchmark.shouldSaveData, ~chains, ) @@ -422,11 +422,11 @@ let processEventBatch = async ( timeRef->Hrtime.timeSince->Hrtime.toMillis->Hrtime.intFromMillis let rec executeBatch = async (~escapeTables=?) => { - switch await Db.sql->IO.executeBatch( + switch await indexer.persistence.sql->IO.executeBatch( ~batch, ~inMemoryStore, ~isInReorgThreshold, - ~config, + ~indexer, ~escapeTables?, ) { | exception Persistence.StorageError({message, reason}) => diff --git a/codegenerator/cli/templates/static/codegen/src/IO.res b/codegenerator/cli/templates/static/codegen/src/IO.res index bee5b3040..7c53195e0 100644 --- a/codegenerator/cli/templates/static/codegen/src/IO.res +++ b/codegenerator/cli/templates/static/codegen/src/IO.res @@ -17,10 +17,10 @@ let executeBatch = async ( ~batch: Batch.t, ~inMemoryStore: InMemoryStore.t, ~isInReorgThreshold, - ~config, + ~indexer: Indexer.t, ~escapeTables=?, ) => { - let shouldSaveHistory = config->Config.shouldSaveHistory(~isInReorgThreshold) + let shouldSaveHistory = indexer.config->Config.shouldSaveHistory(~isInReorgThreshold) let specificError = ref(None) @@ -31,7 +31,7 @@ let executeBatch = async ( ~items, ~table=InternalTable.RawEvents.table, ~itemSchema=InternalTable.RawEvents.schema, - ~pgSchema=Config.storagePgSchema, + ~pgSchema=Generated.storagePgSchema, ) }, ~items=inMemoryStore.rawEvents->InMemoryTable.values, @@ -150,7 +150,7 @@ let executeBatch = async ( ~items=entitiesToSet, ~table=entityConfig.table, ~itemSchema=entityConfig.schema, - 
~pgSchema=Config.storagePgSchema, + ~pgSchema=Generated.storagePgSchema, ), ) } @@ -298,7 +298,7 @@ let executeBatch = async ( ) }) Some( - config.persistence->Persistence.setEffectCacheOrThrow( + indexer.persistence->Persistence.setEffectCacheOrThrow( ~effect, ~items, ~invalidationsCount, @@ -326,7 +326,7 @@ let executeBatch = async ( } } -let prepareRollbackDiff = async (~rollbackTargetCheckpointId) => { +let prepareRollbackDiff = async (~persistence: Persistence.t, ~rollbackTargetCheckpointId) => { let inMemStore = InMemoryStore.make(~rollbackTargetCheckpointId) let deletedEntities = Js.Dict.empty() @@ -339,14 +339,14 @@ let prepareRollbackDiff = async (~rollbackTargetCheckpointId) => { let (removedIdsResult, restoredEntitiesResult) = await Promise.all2(( // Get IDs of entities that should be deleted (created after rollback target with no prior history) - Db.sql + persistence.sql ->Postgres.preparedUnsafe( entityConfig.entityHistory.makeGetRollbackRemovedIdsQuery(~pgSchema=Db.publicSchema), [rollbackTargetCheckpointId]->Utils.magic, ) ->(Utils.magic: promise => promise>), // Get entities that should be restored to their state at or before rollback target - Db.sql + persistence.sql ->Postgres.preparedUnsafe( entityConfig.entityHistory.makeGetRollbackRestoredEntitiesQuery( ~pgSchema=Db.publicSchema, diff --git a/codegenerator/cli/templates/static/codegen/src/Index.res b/codegenerator/cli/templates/static/codegen/src/Index.res index 1e538c249..bb933ac3d 100644 --- a/codegenerator/cli/templates/static/codegen/src/Index.res +++ b/codegenerator/cli/templates/static/codegen/src/Index.res @@ -54,7 +54,7 @@ let stateSchema = S.union([ })), ]) -let startServer = (~getState, ~config: Config.t, ~consoleBearerToken: option) => { +let startServer = (~getState, ~indexer: Indexer.t, ~consoleBearerToken: option) => { open Express let app = makeCjs() @@ -110,7 +110,7 @@ let startServer = (~getState, ~config: Config.t, ~consoleBearerToken: optionpost("/console/syncCache", (req, 
res) => { if req->checkIsAuthorizedConsole { - (config.persistence->Persistence.getInitializedStorageOrThrow).dumpEffectCache() + (indexer.persistence->Persistence.getInitializedStorageOrThrow).dumpEffectCache() ->Promise.thenResolve(_ => res->json(Boolean(true))) ->Promise.done } else { @@ -212,7 +212,7 @@ let makeAppState = (globalState: GlobalState.t): EnvioInkApp.appState => { ) }) { - config: globalState.config, + config: globalState.indexer.config, indexerStartTime: globalState.indexerStartTime, chains, } @@ -237,7 +237,7 @@ let main = async () => { let mainArgs: mainArgs = process->argv->Yargs.hideBin->Yargs.yargs->Yargs.argv let shouldUseTui = !(mainArgs.tuiOff->Belt.Option.getWithDefault(Env.tuiOffEnvVar)) - let config = RegisterHandlers.registerAllHandlers() + let indexer = Generated.getIndexer() let gsManagerRef = ref(None) @@ -248,10 +248,10 @@ let main = async () => { | Some(version) => Prometheus.Info.set(~version) | None => () } - Prometheus.RollbackEnabled.set(~enabled=config.historyConfig.rollbackFlag === RollbackOnReorg) + Prometheus.RollbackEnabled.set(~enabled=indexer.config.shouldRollbackOnReorg) startServer( - ~config, + ~indexer, ~consoleBearerToken={ // The most simple check to verify whether we are running in development mode // and prevent exposing the console to public, when creating a real deployment. 
@@ -305,8 +305,8 @@ let main = async () => { }), indexerStartTime: appState.indexerStartTime, isPreRegisteringDynamicContracts: false, - rollbackOnReorg: config.historyConfig.rollbackFlag === RollbackOnReorg, - isUnorderedMultichainMode: switch config.multichain { + rollbackOnReorg: indexer.config.shouldRollbackOnReorg, + isUnorderedMultichainMode: switch indexer.config.multichain { | Unordered => true | Ordered => false }, @@ -315,13 +315,17 @@ let main = async () => { }, ) - await config.persistence->Persistence.init(~chainConfigs=config.chainMap->ChainMap.values) + await indexer.persistence->Persistence.init( + ~chainConfigs=indexer.config.chainMap->ChainMap.values, + ) let chainManager = await ChainManager.makeFromDbState( - ~initialState=config.persistence->Persistence.getInitializedState, - ~config, + ~initialState=indexer.persistence->Persistence.getInitializedState, + ~config=indexer.config, + ~registrations=indexer.registrations, + ~persistence=indexer.persistence, ) - let globalState = GlobalState.make(~config, ~chainManager, ~shouldUseTui) + let globalState = GlobalState.make(~indexer, ~chainManager, ~shouldUseTui) let stateUpdatedHook = if shouldUseTui { let rerender = EnvioInkApp.startApp(makeAppState(globalState)) Some(globalState => globalState->makeAppState->rerender) diff --git a/codegenerator/cli/templates/static/codegen/src/db/Db.res b/codegenerator/cli/templates/static/codegen/src/db/Db.res index c3e7ae7d9..7c396d5e3 100644 --- a/codegenerator/cli/templates/static/codegen/src/db/Db.res +++ b/codegenerator/cli/templates/static/codegen/src/db/Db.res @@ -1,20 +1,27 @@ // This is a module with all the global configuration of the DB // Ideally it should be moved to the config and passed with it -let config: Postgres.poolConfig = { - host: Env.Db.host, - port: Env.Db.port, - username: Env.Db.user, - password: Env.Db.password, - database: Env.Db.database, - ssl: Env.Db.ssl, - // TODO: think how we want to pipe these logs to pino. 
- onnotice: ?(Env.userLogLevel == #warn || Env.userLogLevel == #error ? None : Some(_str => ())), - transform: {undefined: Null}, - max: 2, - // debug: (~connection, ~query, ~params as _, ~types as _) => Js.log2(connection, query), +let makeClient = () => { + Postgres.makeSql( + ~config={ + host: Env.Db.host, + port: Env.Db.port, + username: Env.Db.user, + password: Env.Db.password, + database: Env.Db.database, + ssl: Env.Db.ssl, + // TODO: think how we want to pipe these logs to pino. + onnotice: ?( + Env.userLogLevel == #warn || Env.userLogLevel == #error ? None : Some(_str => ()) + ), + transform: {undefined: Null}, + max: 2, + // debug: (~connection, ~query, ~params as _, ~types as _) => Js.log2(connection, query), + }, + ) } -let sql = Postgres.makeSql(~config) + +let sql = makeClient() let publicSchema = Env.Db.publicSchema let allEntityTables: array = Entities.allEntities->Belt.Array.map(entityConfig => { diff --git a/codegenerator/cli/templates/static/codegen/src/db/Migrations.res b/codegenerator/cli/templates/static/codegen/src/db/Migrations.res index d842bb58f..a293165c7 100644 --- a/codegenerator/cli/templates/static/codegen/src/db/Migrations.res +++ b/codegenerator/cli/templates/static/codegen/src/db/Migrations.res @@ -1,4 +1,3 @@ -let sql = Db.sql let unsafe = Postgres.unsafe let deleteAllTables: unit => promise = async () => { @@ -12,7 +11,7 @@ let deleteAllTables: unit => promise = async () => { GRANT ALL ON SCHEMA ${Env.Db.publicSchema} TO public; END $$;` - await sql->unsafe(query) + await Generated.codegenPersistence.sql->unsafe(query) } type t @@ -26,10 +25,9 @@ let runUpMigrations = async ( // Reset is used for db-setup ~reset=false, ) => { - let config = RegisterHandlers.getConfigWithoutRegistrations() - + let config = Generated.configWithoutRegistrations let exitCode = try { - await config.persistence->Persistence.init( + await Generated.codegenPersistence->Persistence.init( ~reset, ~chainConfigs=config.chainMap->ChainMap.values, ) diff --git 
a/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res index 79a06cff9..312cb2a37 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainFetcher.res @@ -11,7 +11,7 @@ type t = { logger: Pino.t, fetchState: FetchState.t, sourceManager: SourceManager.t, - chainConfig: InternalConfig.chain, + chainConfig: Config.chain, //The latest known block of the chain currentBlockHeight: int, isProgressAtHead: bool, @@ -26,13 +26,14 @@ type t = { //CONSTRUCTION let make = ( - ~chainConfig: InternalConfig.chain, + ~chainConfig: Config.chain, ~dynamicContracts: array, ~startBlock, ~endBlock, ~firstEventBlockNumber, ~progressBlockNumber, ~config: Config.t, + ~registrations: EventRegister.registrations, ~targetBufferSize, ~logger, ~timestampCaughtUpToHeadOrEndblock, @@ -123,37 +124,29 @@ let make = ( ) } - let onBlockConfigs = switch config.registrations { - | None => Js.Exn.raiseError("Indexer must be initialized with event registration finished.") - | Some(registrations) => - let onBlockConfigs = - registrations.onBlockByChainId->Utils.Dict.dangerouslyGetNonOption( - chainConfig.id->Int.toString, - ) - switch onBlockConfigs { - | Some(onBlockConfigs) => - // TODO: Move it to the EventRegister module - // so the error is thrown with better stack trace - onBlockConfigs->Array.forEach(onBlockConfig => { - if onBlockConfig.startBlock->Option.getWithDefault(startBlock) < startBlock { + let onBlockConfigs = + registrations.onBlockByChainId->Utils.Dict.dangerouslyGetNonOption(chainConfig.id->Int.toString) + switch onBlockConfigs { + | Some(onBlockConfigs) => + // TODO: Move it to the EventRegister module + // so the error is thrown with better stack trace + onBlockConfigs->Array.forEach(onBlockConfig => { + if onBlockConfig.startBlock->Option.getWithDefault(startBlock) < 
startBlock { + Js.Exn.raiseError( + `The start block for onBlock handler "${onBlockConfig.name}" is less than the chain start block (${startBlock->Belt.Int.toString}). This is not supported yet.`, + ) + } + switch endBlock { + | Some(chainEndBlock) => + if onBlockConfig.endBlock->Option.getWithDefault(chainEndBlock) > chainEndBlock { Js.Exn.raiseError( - `The start block for onBlock handler "${onBlockConfig.name}" is less than the chain start block (${startBlock->Belt.Int.toString}). This is not supported yet.`, + `The end block for onBlock handler "${onBlockConfig.name}" is greater than the chain end block (${chainEndBlock->Belt.Int.toString}). This is not supported yet.`, ) } - switch endBlock { - | Some(chainEndBlock) => - if onBlockConfig.endBlock->Option.getWithDefault(chainEndBlock) > chainEndBlock { - Js.Exn.raiseError( - `The end block for onBlock handler "${onBlockConfig.name}" is greater than the chain end block (${chainEndBlock->Belt.Int.toString}). This is not supported yet.`, - ) - } - | None => () - } - }) - | None => () - } - - onBlockConfigs + | None => () + } + }) + | None => () } let fetchState = FetchState.make( @@ -165,8 +158,9 @@ let make = ( ~eventConfigs, ~targetBufferSize, ~chainId=chainConfig.id, + // FIXME: Shouldn't set with full history ~blockLag=Pervasives.max( - !(config->Config.shouldRollbackOnReorg) || isInReorgThreshold ? 0 : chainConfig.maxReorgDepth, + !config.shouldRollbackOnReorg || isInReorgThreshold ? 
0 : chainConfig.maxReorgDepth, Env.indexingBlockLag->Option.getWithDefault(0), ), ~onBlockConfigs?, @@ -190,11 +184,11 @@ let make = ( reorgDetection: ReorgDetection.make( ~chainReorgCheckpoints, ~maxReorgDepth, - ~shouldRollbackOnReorg=config->Config.shouldRollbackOnReorg, + ~shouldRollbackOnReorg=config.shouldRollbackOnReorg, ), safeCheckpointTracking: SafeCheckpointTracking.make( ~maxReorgDepth, - ~shouldRollbackOnReorg=config->Config.shouldRollbackOnReorg, + ~shouldRollbackOnReorg=config.shouldRollbackOnReorg, ~chainReorgCheckpoints, ), currentBlockHeight: 0, @@ -208,12 +202,13 @@ let make = ( } } -let makeFromConfig = (chainConfig: InternalConfig.chain, ~config, ~targetBufferSize) => { +let makeFromConfig = (chainConfig: Config.chain, ~config, ~registrations, ~targetBufferSize) => { let logger = Logging.createChild(~params={"chainId": chainConfig.id}) make( ~chainConfig, ~config, + ~registrations, ~startBlock=chainConfig.startBlock, ~endBlock=chainConfig.endBlock, ~reorgCheckpoints=[], @@ -234,11 +229,12 @@ let makeFromConfig = (chainConfig: InternalConfig.chain, ~config, ~targetBufferS * This function allows a chain fetcher to be created from metadata, in particular this is useful for restarting an indexer and making sure it fetches blocks from the same place. 
*/ let makeFromDbState = async ( - chainConfig: InternalConfig.chain, + chainConfig: Config.chain, ~resumedChainState: Persistence.initialChainState, ~reorgCheckpoints, ~isInReorgThreshold, ~config, + ~registrations, ~targetBufferSize, ) => { let chainId = chainConfig.id @@ -258,6 +254,7 @@ let makeFromDbState = async ( ~startBlock=resumedChainState.startBlock, ~endBlock=resumedChainState.endBlock, ~config, + ~registrations, ~reorgCheckpoints, ~maxReorgDepth=resumedChainState.maxReorgDepth, ~firstEventBlockNumber=resumedChainState.firstEventBlockNumber, @@ -430,7 +427,7 @@ let getLastKnownValidBlock = async ( // Improtant: It's important to not include the reorg detection block number // because there might be different instances of the source // with mismatching hashes between them. - // So we MUST always rollback the block number where we detected a reorg. + // So we MUST always rollback the block number where we detected a reorg. let scannedBlockNumbers = chainFetcher.reorgDetection->ReorgDetection.getThresholdBlockNumbersBelowBlock( ~blockNumber=reorgBlockNumber, diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res index a3debe4df..11bce6b59 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res @@ -3,7 +3,7 @@ open Belt type t = { committedCheckpointId: int, chainFetchers: ChainMap.t, - multichain: InternalConfig.multichain, + multichain: Config.multichain, isInReorgThreshold: bool, } @@ -16,13 +16,15 @@ let calculateTargetBufferSize = (~activeChainsCount, ~config: Config.t) => { } } -let makeFromConfig = (~config: Config.t): t => { +let makeFromConfig = (~config: Config.t, ~registrations): t => { let targetBufferSize = calculateTargetBufferSize( ~activeChainsCount=config.chainMap->ChainMap.size, ~config, ) let chainFetchers = 
- config.chainMap->ChainMap.map(ChainFetcher.makeFromConfig(_, ~config, ~targetBufferSize)) + config.chainMap->ChainMap.map( + ChainFetcher.makeFromConfig(_, ~config, ~registrations, ~targetBufferSize), + ) { committedCheckpointId: 0, chainFetchers, @@ -31,7 +33,12 @@ let makeFromConfig = (~config: Config.t): t => { } } -let makeFromDbState = async (~initialState: Persistence.initialState, ~config: Config.t): t => { +let makeFromDbState = async ( + ~initialState: Persistence.initialState, + ~config: Config.t, + ~registrations, + ~persistence: Persistence.t, +): t => { let isInReorgThreshold = if initialState.cleanRun { false } else { @@ -41,7 +48,7 @@ let makeFromDbState = async (~initialState: Persistence.initialState, ~config: C // This rows check might incorrectly return false for recovering the isInReorgThreshold option. // But this is not a problem. There's no history anyways, and the indexer will be able to // correctly calculate isInReorgThreshold as it starts. - let hasStartedSavingHistory = await Db.sql->DbFunctions.EntityHistory.hasRows + let hasStartedSavingHistory = await persistence.sql->DbFunctions.EntityHistory.hasRows //If we have started saving history, continue to save history //as regardless of whether we are still in a reorg threshold @@ -73,6 +80,7 @@ let makeFromDbState = async (~initialState: Persistence.initialState, ~config: C ~isInReorgThreshold, ~targetBufferSize, ~config, + ~registrations, ), ) }) diff --git a/codegenerator/cli/templates/static/codegen/src/eventFetching/NetworkSources.res b/codegenerator/cli/templates/static/codegen/src/eventFetching/NetworkSources.res index 090e236ea..71756ae8e 100644 --- a/codegenerator/cli/templates/static/codegen/src/eventFetching/NetworkSources.res +++ b/codegenerator/cli/templates/static/codegen/src/eventFetching/NetworkSources.res @@ -3,7 +3,43 @@ open Belt type rpc = { url: string, sourceFor: Source.sourceFor, - syncConfig?: InternalConfig.sourceSyncOptions, + syncConfig?: 
Config.sourceSyncOptions, +} + +let getSyncConfig = ( + { + ?initialBlockInterval, + ?backoffMultiplicative, + ?accelerationAdditive, + ?intervalCeiling, + ?backoffMillis, + ?queryTimeoutMillis, + ?fallbackStallTimeout, + }: Config.sourceSyncOptions, +): Config.sourceSync => { + let queryTimeoutMillis = queryTimeoutMillis->Option.getWithDefault(20_000) + { + initialBlockInterval: Env.Configurable.SyncConfig.initialBlockInterval->Option.getWithDefault( + initialBlockInterval->Option.getWithDefault(10_000), + ), + // After an RPC error, how much to scale back the number of blocks requested at once + backoffMultiplicative: Env.Configurable.SyncConfig.backoffMultiplicative->Option.getWithDefault( + backoffMultiplicative->Option.getWithDefault(0.8), + ), + // Without RPC errors or timeouts, how much to increase the number of blocks requested by for the next batch + accelerationAdditive: Env.Configurable.SyncConfig.accelerationAdditive->Option.getWithDefault( + accelerationAdditive->Option.getWithDefault(500), + ), + // Do not further increase the block interval past this limit + intervalCeiling: Env.Configurable.SyncConfig.intervalCeiling->Option.getWithDefault( + intervalCeiling->Option.getWithDefault(10_000), + ), + // After an error, how long to wait before retrying + backoffMillis: backoffMillis->Option.getWithDefault(5000), + // How long to wait before cancelling an RPC request + queryTimeoutMillis, + fallbackStallTimeout: fallbackStallTimeout->Option.getWithDefault(queryTimeoutMillis / 2), + } } let evm = ( @@ -43,7 +79,7 @@ let evm = ( chain, sourceFor, contracts, - syncConfig: Config.getSyncConfig(syncConfig->Option.getWithDefault({})), + syncConfig: getSyncConfig(syncConfig->Option.getWithDefault({})), url, eventRouter, allEventSignatures, diff --git a/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res b/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res index f35117192..de95a3475 100644 --- 
a/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res +++ b/codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res @@ -40,7 +40,7 @@ module WriteThrottlers = { } type t = { - config: Config.t, + indexer: Indexer.t, chainManager: ChainManager.t, processedBatches: int, currentlyProcessingBatch: bool, @@ -54,9 +54,9 @@ type t = { id: int, } -let make = (~config: Config.t, ~chainManager: ChainManager.t, ~shouldUseTui=false) => { +let make = (~indexer: Indexer.t, ~chainManager: ChainManager.t, ~shouldUseTui=false) => { { - config, + indexer, currentlyProcessingBatch: false, processedBatches: 0, chainManager, @@ -157,7 +157,11 @@ let updateChainFetcherCurrentBlockHeight = (chainFetcher: ChainFetcher.t, ~curre } } -let updateChainMetadataTable = (cm: ChainManager.t, ~throttler: Throttler.t) => { +let updateChainMetadataTable = ( + cm: ChainManager.t, + ~persistence: Persistence.t, + ~throttler: Throttler.t, +) => { let chainsData: dict = Js.Dict.empty() cm.chainFetchers @@ -178,7 +182,7 @@ let updateChainMetadataTable = (cm: ChainManager.t, ~throttler: Throttler.t) => //Don't await this set, it can happen in its own time throttler->Throttler.schedule(() => - Db.sql + persistence.sql ->InternalTable.Chains.setMeta(~pgSchema=Db.publicSchema, ~chainsData) ->Promise.ignoreValue ) @@ -217,7 +221,7 @@ let updateProgressedChains = (chainManager: ChainManager.t, ~batch: Batch.t) => ~chainId=chain->ChainMap.Chain.toChainId, ) } - + // Calculate and set latency metrics switch batch->Batch.findLastEventItem(~chainId=chain->ChainMap.Chain.toChainId) { | Some(eventItem) => { @@ -225,12 +229,12 @@ let updateProgressedChains = (chainManager: ChainManager.t, ~batch: Batch.t) => let currentTimeMs = Js.Date.now()->Float.toInt let blockTimestampMs = blockTimestamp * 1000 let latencyMs = currentTimeMs - blockTimestampMs - + Prometheus.ProgressLatency.set(~latencyMs, ~chainId=chain->ChainMap.Chain.toChainId) } | None => () } - + { ...cf, // Since we 
process per chain always in order, @@ -403,7 +407,7 @@ let validatePartitionQueryResponse = ( | ReorgDetected(reorgDetected) => { chainFetcher.logger->Logging.childInfo( reorgDetected->ReorgDetection.reorgDetectedToLogParams( - ~shouldRollbackOnReorg=state.config->Config.shouldRollbackOnReorg, + ~shouldRollbackOnReorg=state.indexer.config.shouldRollbackOnReorg, ), ) Prometheus.ReorgCount.increment(~chain) @@ -411,7 +415,7 @@ let validatePartitionQueryResponse = ( ~blockNumber=reorgDetected.scannedBlock.blockNumber, ~chain, ) - if state.config->Config.shouldRollbackOnReorg { + if state.indexer.config.shouldRollbackOnReorg { Some(reorgDetected.scannedBlock.blockNumber) } else { None @@ -531,7 +535,7 @@ let processPartitionQueryResponse = async ( await ChainFetcher.runContractRegistersOrThrow( ~itemsWithContractRegister, ~chain, - ~config=state.config, + ~config=state.indexer.config, ) } @@ -590,7 +594,7 @@ let actionReducer = (state: t, action: action) => { switch action { | FinishWaitingForNewBlock({chain, currentBlockHeight}) => { let isBelowReorgThreshold = - !state.chainManager.isInReorgThreshold && state.config->Config.shouldRollbackOnReorg + !state.chainManager.isInReorgThreshold && state.indexer.config.shouldRollbackOnReorg let shouldEnterReorgThreshold = isBelowReorgThreshold && state.chainManager.chainFetchers @@ -635,7 +639,7 @@ let actionReducer = (state: t, action: action) => { ) | EventBatchProcessed({batch}) => let maybePruneEntityHistory = - state.config->Config.shouldPruneHistory( + state.indexer.config->Config.shouldPruneHistory( ~isInReorgThreshold=state.chainManager.isInReorgThreshold, ) ? [PruneStaleEntityHistory] @@ -655,7 +659,6 @@ let actionReducer = (state: t, action: action) => { state.chainManager.chainFetchers, ) ? 
{ - // state.config.persistence.storage Logging.info("All chains are caught up to end blocks.") // Keep the indexer process running in TUI mode @@ -812,7 +815,7 @@ let injectedTaskReducer = ( switch state.chainManager->ChainManager.getSafeCheckpointId { | None => () | Some(safeCheckpointId) => - await Db.sql->InternalTable.Checkpoints.pruneStaleCheckpoints( + await state.indexer.persistence.sql->InternalTable.Checkpoints.pruneStaleCheckpoints( ~pgSchema=Env.Db.publicSchema, ~safeCheckpointId, ) @@ -827,7 +830,7 @@ let injectedTaskReducer = ( let timeRef = Hrtime.makeTimer() try { let () = - await Db.sql->EntityHistory.pruneStaleEntityHistory( + await state.indexer.persistence.sql->EntityHistory.pruneStaleEntityHistory( ~entityName=entityConfig.name, ~entityIndex=entityConfig.index, ~pgSchema=Env.Db.publicSchema, @@ -858,10 +861,18 @@ let injectedTaskReducer = ( let {chainManager, writeThrottlers} = state switch shouldExit { | ExitWithSuccess => - updateChainMetadataTable(chainManager, ~throttler=writeThrottlers.chainMetaData) + updateChainMetadataTable( + chainManager, + ~throttler=writeThrottlers.chainMetaData, + ~persistence=state.indexer.persistence, + ) dispatchAction(SuccessExit) | NoExit => - updateChainMetadataTable(chainManager, ~throttler=writeThrottlers.chainMetaData)->ignore + updateChainMetadataTable( + chainManager, + ~throttler=writeThrottlers.chainMetaData, + ~persistence=state.indexer.persistence, + )->ignore } | NextQuery(chainCheck) => let fetchForChain = checkAndFetchForChain( @@ -885,16 +896,18 @@ let injectedTaskReducer = ( | ProcessEventBatch => if !state.currentlyProcessingBatch && !isPreparingRollback(state) { let batch = - state.chainManager->ChainManager.createBatch(~batchSizeTarget=state.config.batchSize) + state.chainManager->ChainManager.createBatch( + ~batchSizeTarget=state.indexer.config.batchSize, + ) let progressedChainsById = batch.progressedChainsById let totalBatchSize = batch.totalBatchSize let isInReorgThreshold = 
state.chainManager.isInReorgThreshold - let shouldSaveHistory = state.config->Config.shouldSaveHistory(~isInReorgThreshold) + let shouldSaveHistory = state.indexer.config->Config.shouldSaveHistory(~isInReorgThreshold) let isBelowReorgThreshold = - !state.chainManager.isInReorgThreshold && state.config->Config.shouldRollbackOnReorg + !state.chainManager.isInReorgThreshold && state.indexer.config.shouldRollbackOnReorg let shouldEnterReorgThreshold = isBelowReorgThreshold && state.chainManager.chainFetchers @@ -946,7 +959,7 @@ let injectedTaskReducer = ( ~inMemoryStore, ~isInReorgThreshold, ~loadManager=state.loadManager, - ~config=state.config, + ~indexer=state.indexer, ~chainFetchers=state.chainManager.chainFetchers, ) { | exception exn => @@ -1005,7 +1018,7 @@ let injectedTaskReducer = ( let reorgChainId = reorgChain->ChainMap.Chain.toChainId let rollbackTargetCheckpointId = { - switch await Db.sql->InternalTable.Checkpoints.getRollbackTargetCheckpoint( + switch await state.indexer.persistence.sql->InternalTable.Checkpoints.getRollbackTargetCheckpoint( ~pgSchema=Env.Db.publicSchema, ~reorgChainId, ~lastKnownValidBlockNumber=rollbackTargetBlockNumber, @@ -1021,7 +1034,7 @@ let injectedTaskReducer = ( { let rollbackProgressDiff = - await Db.sql->InternalTable.Checkpoints.getRollbackProgressDiff( + await state.indexer.persistence.sql->InternalTable.Checkpoints.getRollbackProgressDiff( ~pgSchema=Env.Db.publicSchema, ~rollbackTargetCheckpointId, ) @@ -1105,7 +1118,10 @@ let injectedTaskReducer = ( }) // Construct in Memory store with rollback diff - let diff = await IO.prepareRollbackDiff(~rollbackTargetCheckpointId) + let diff = await IO.prepareRollbackDiff( + ~rollbackTargetCheckpointId, + ~persistence=state.indexer.persistence, + ) let chainManager = { ...state.chainManager, diff --git a/codegenerator/cli/templates/static/codegen/src/globalState/GlobalStateManager.res b/codegenerator/cli/templates/static/codegen/src/globalState/GlobalStateManager.res index 
396c1eb9c..11fcc7880 100644 --- a/codegenerator/cli/templates/static/codegen/src/globalState/GlobalStateManager.res +++ b/codegenerator/cli/templates/static/codegen/src/globalState/GlobalStateManager.res @@ -10,15 +10,21 @@ module type State = { let getId: t => int } -let handleFatalError = e => { - e->ErrorHandling.make(~msg="Indexer has failed with an unexpected error")->ErrorHandling.log - NodeJs.process->NodeJs.exitWithCode(Failure) -} - module MakeManager = (S: State) => { - type t = {mutable state: S.t, stateUpdatedHook: option unit>} + type t = {mutable state: S.t, stateUpdatedHook: option unit>, onError: exn => unit} - let make = (~stateUpdatedHook: option unit>=?, state: S.t) => {state, stateUpdatedHook} + let make = ( + state: S.t, + ~stateUpdatedHook: option unit>=?, + ~onError=e => { + e->ErrorHandling.make(~msg="Indexer has failed with an unexpected error")->ErrorHandling.log + NodeJs.process->NodeJs.exitWithCode(Failure) + }, + ) => { + state, + stateUpdatedHook, + onError, + } let rec dispatchAction = (~stateId=0, self: t, action: S.action) => { try { @@ -37,7 +43,7 @@ module MakeManager = (S: State) => { self.state = nextState nextTasks->Array.forEach(task => dispatchTask(self, task)) } catch { - | e => e->handleFatalError + | e => e->self.onError } } and dispatchTask = (self, task: S.task) => { @@ -51,12 +57,12 @@ module MakeManager = (S: State) => { dispatchAction(~stateId, self, action) ) ->Promise.catch(e => { - e->handleFatalError + e->self.onError Promise.resolve() }) ->ignore } catch { - | e => e->handleFatalError + | e => e->self.onError } } }, 0)->ignore diff --git a/codegenerator/cli/templates/static/codegen/src/globalState/GlobalStateManager.resi b/codegenerator/cli/templates/static/codegen/src/globalState/GlobalStateManager.resi index 20dee4402..d9ff6d8c9 100644 --- a/codegenerator/cli/templates/static/codegen/src/globalState/GlobalStateManager.resi +++ b/codegenerator/cli/templates/static/codegen/src/globalState/GlobalStateManager.resi 
@@ -1,6 +1,6 @@ type t -let make: (~stateUpdatedHook: GlobalState.t => unit=?, GlobalState.t) => t +let make: (GlobalState.t, ~stateUpdatedHook: GlobalState.t => unit=?, ~onError: exn => unit=?) => t let dispatchAction: (~stateId: int=?, t, GlobalState.action) => unit let dispatchTask: (t, GlobalState.task) => unit let getState: t => GlobalState.t diff --git a/internal_docs/EventFetchers.md b/internal_docs/EventFetchers.md index ebbf85fd1..52556bc10 100644 --- a/internal_docs/EventFetchers.md +++ b/internal_docs/EventFetchers.md @@ -32,7 +32,7 @@ TODO: currently the ChainManager is passed directly to the `EventProcessor` as d classDiagram class ChainFetcher { fetchedEventQueue: ChainEventQueue.t, - chainConfig: InternalConfig.chain, + chainConfig: Config.chain, source: Source.source, startFetchingEvents(): promise diff --git a/scenarios/helpers/src/ChainMocking.res b/scenarios/helpers/src/ChainMocking.res index 699334fe0..813571eda 100644 --- a/scenarios/helpers/src/ChainMocking.res +++ b/scenarios/helpers/src/ChainMocking.res @@ -114,7 +114,7 @@ module Make = () => { } type t = { - chainConfig: InternalConfig.chain, + chainConfig: Config.chain, blocks: array, maxBlocksReturned: int, blockTimestampInterval: int, diff --git a/scenarios/test_codegen/test/ChainManager_test.res b/scenarios/test_codegen/test/ChainManager_test.res index cb8bb32e6..77ce23c08 100644 --- a/scenarios/test_codegen/test/ChainManager_test.res +++ b/scenarios/test_codegen/test/ChainManager_test.res @@ -2,7 +2,7 @@ open Belt open RescriptMocha let populateChainQueuesWithRandomEvents = (~runTime=1000, ~maxBlockTime=15, ()) => { - let config = RegisterHandlers.registerAllHandlers() + let config = Generated.configWithoutRegistrations let allEvents = [] let numberOfMockEventsCreated = ref(0) diff --git a/scenarios/test_codegen/test/E2EEthNode_test.res b/scenarios/test_codegen/test/E2EEthNode_test.res index a6a941826..ef04f07c1 100644 --- a/scenarios/test_codegen/test/E2EEthNode_test.res +++ 
b/scenarios/test_codegen/test/E2EEthNode_test.res @@ -15,10 +15,10 @@ describe("E2E Integration Test", () => { let contracts = await SetupRpcNode.deployContracts() await SetupRpcNode.runBasicGravatarTransactions(contracts.gravatar) - let localChainConfig: InternalConfig.chain = { + let localChainConfig: Config.chain = { let contracts = [ { - InternalConfig.name: "Gravatar", + Config.name: "Gravatar", abi: Types.Gravatar.abi, addresses: ["0x5FbDB2315678afecb367f032d93F642f64180aa3"->Address.Evm.fromStringOrThrow], events: [ @@ -69,11 +69,11 @@ describe("E2E Integration Test", () => { } } - let config = RegisterHandlers.registerAllHandlers() + let indexer = Generated.getIndexer() let chainManager = Integration_ts_helpers.makeChainManager(localChainConfig) - let globalState = GlobalState.make(~config, ~chainManager) + let globalState = GlobalState.make(~indexer, ~chainManager) let gsManager = globalState->GlobalStateManager.make diff --git a/scenarios/test_codegen/test/EventOrigin_test.res b/scenarios/test_codegen/test/EventOrigin_test.res index c8fdc7663..d963596ff 100644 --- a/scenarios/test_codegen/test/EventOrigin_test.res +++ b/scenarios/test_codegen/test/EventOrigin_test.res @@ -50,7 +50,7 @@ describe("Chains State", () => { let handlerContext = UserContext.getHandlerContext({ item, loadManager, - persistence: Config.codegenPersistence, + persistence: Generated.codegenPersistence, inMemoryStore, shouldSaveHistory: false, isPreload: false, diff --git a/scenarios/test_codegen/test/Integration_ts_helpers.gen.ts b/scenarios/test_codegen/test/Integration_ts_helpers.gen.ts index 4d26a2b47..2b27b66aa 100644 --- a/scenarios/test_codegen/test/Integration_ts_helpers.gen.ts +++ b/scenarios/test_codegen/test/Integration_ts_helpers.gen.ts @@ -15,4 +15,4 @@ export const getLocalChainConfig: (nftFactoryContractAddress:Address_t) => chain export const makeChainManager: (cfg:chainConfig) => chainManager = Integration_ts_helpersJS.makeChainManager as any; -export const 
startProcessing: (config:unknown, cfg:chainConfig, chainManager:chainManager) => void = Integration_ts_helpersJS.startProcessing as any; +export const startProcessing: (_config:T1, _cfg:chainConfig, _chainManager:chainManager) => void = Integration_ts_helpersJS.startProcessing as any; diff --git a/scenarios/test_codegen/test/Integration_ts_helpers.res b/scenarios/test_codegen/test/Integration_ts_helpers.res index 7263e6bf1..43c21db5b 100644 --- a/scenarios/test_codegen/test/Integration_ts_helpers.res +++ b/scenarios/test_codegen/test/Integration_ts_helpers.res @@ -1,11 +1,11 @@ @genType.opaque -type chainConfig = InternalConfig.chain +type chainConfig = Config.chain @genType let getLocalChainConfig = (nftFactoryContractAddress): chainConfig => { let contracts = [ { - InternalConfig.name: "NftFactory", + Config.name: "NftFactory", abi: Types.NftFactory.abi, addresses: [nftFactoryContractAddress], events: [(Types.NftFactory.SimpleNftCreated.register() :> Internal.eventConfig)], @@ -65,27 +65,23 @@ type chainManager = ChainManager.t let makeChainManager = (cfg: chainConfig): chainManager => { // FIXME: Should fork from the main ChainMap? 
ChainManager.makeFromConfig( - ~config=Config.make( - ~isUnorderedMultichainMode=true, - ~chains=[cfg], - ~registrations={onBlockByChainId: Js.Dict.empty(), hasEvents: false}, - ), + ~config=Config.make(~multichain=Unordered, ~chains=[cfg]), + ~registrations={onBlockByChainId: Js.Dict.empty(), hasEvents: false}, ) } @genType -let startProcessing = (config, cfg: chainConfig, chainManager: chainManager) => { - let globalState = GlobalState.make( - ~config=config->( - // Workaround for genType to treat the type as unknown, since we don't want to expose Config.t to TS users - Utils.magic: unknown => Config.t - ), - ~chainManager, - ) - - let gsManager = globalState->GlobalStateManager.make - - gsManager->GlobalStateManager.dispatchTask( - NextQuery(Chain(ChainMap.Chain.makeUnsafe(~chainId=cfg.id))), - ) +let startProcessing = (_config, _cfg: chainConfig, _chainManager: chainManager) => { + // let globalState = GlobalState.make( + // ~indexer=indexer->( + // // Workaround for genType to treat the type as unknown, since we don't want to expose Config.t to TS users + // Utils.magic: unknown => Config.t + // ), + // ~chainManager, + // ) + // let gsManager = globalState->GlobalStateManager.make + // gsManager->GlobalStateManager.dispatchTask( + // NextQuery(Chain(ChainMap.Chain.makeUnsafe(~chainId=cfg.id))), + // ) + () } diff --git a/scenarios/test_codegen/test/RpcSource_test.res b/scenarios/test_codegen/test/RpcSource_test.res index 49d527d48..3da040fac 100644 --- a/scenarios/test_codegen/test/RpcSource_test.res +++ b/scenarios/test_codegen/test/RpcSource_test.res @@ -29,7 +29,7 @@ describe("RpcSource - name", () => { contracts: [], eventRouter: EventRouter.empty(), sourceFor: Sync, - syncConfig: Config.getSyncConfig({}), + syncConfig: NetworkSources.getSyncConfig({}), allEventSignatures: [], shouldUseHypersyncClientDecoder: false, lowercaseAddresses: false, @@ -46,7 +46,7 @@ describe("RpcSource - getHeightOrThrow", () => { contracts: [], eventRouter: 
EventRouter.empty(), sourceFor: Sync, - syncConfig: Config.getSyncConfig({}), + syncConfig: NetworkSources.getSyncConfig({}), allEventSignatures: ["a", "b", "c"], shouldUseHypersyncClientDecoder: true, lowercaseAddresses: false, diff --git a/scenarios/test_codegen/test/__mocks__/MockConfig.res b/scenarios/test_codegen/test/__mocks__/MockConfig.res index 9f583ed8b..19c9b92cc 100644 --- a/scenarios/test_codegen/test/__mocks__/MockConfig.res +++ b/scenarios/test_codegen/test/__mocks__/MockConfig.res @@ -4,7 +4,7 @@ let chain1337 = ChainMap.Chain.makeUnsafe(~chainId=1337) let contracts = [ { - InternalConfig.name: "Gravatar", + Config.name: "Gravatar", abi: Types.Gravatar.abi, addresses: ["0x2B2f78c5BF6D9C12Ee1225D5F374aa91204580c3"->Address.Evm.fromStringOrThrow], events: [ @@ -38,7 +38,7 @@ let evmContracts = contracts->Js.Array2.map((contract): Internal.evmContractConf ), }) -let mockChainConfig: InternalConfig.chain = { +let mockChainConfig: Config.chain = { id: 1337, maxReorgDepth: 200, startBlock: 1, @@ -48,7 +48,7 @@ let mockChainConfig: InternalConfig.chain = { chain: chain1337, contracts: evmContracts, sourceFor: Sync, - syncConfig: Config.getSyncConfig({ + syncConfig: NetworkSources.getSyncConfig({ initialBlockInterval: 10000, backoffMultiplicative: 10000.0, accelerationAdditive: 10000, diff --git a/scenarios/test_codegen/test/helpers/DbHelpers.res b/scenarios/test_codegen/test/helpers/DbHelpers.res index 03e2b8630..8dc5f574d 100644 --- a/scenarios/test_codegen/test/helpers/DbHelpers.res +++ b/scenarios/test_codegen/test/helpers/DbHelpers.res @@ -1,9 +1,9 @@ -let _sqlConfig = Db.config - @@warning("-21") let resetPostgresClient: unit => unit = () => { // This is a hack to reset the postgres client between tests. postgres.js seems to cache some types, and if tests clear the DB you need to also reset sql. 
- %raw("require('../../generated/src/db/Db.res.js').sql = require('postgres')(_sqlConfig)") + let sql = Db.makeClient() + Generated.codegenPersistence.sql = sql + Generated.codegenPersistence.storage = Generated.makeStorage(~sql) } let runUpDownMigration = async () => { diff --git a/scenarios/test_codegen/test/helpers/Mock.res b/scenarios/test_codegen/test/helpers/Mock.res index 6dbd80b7c..ca453c0e5 100644 --- a/scenarios/test_codegen/test/helpers/Mock.res +++ b/scenarios/test_codegen/test/helpers/Mock.res @@ -13,7 +13,7 @@ module InMemoryStore = { checkpointId: 0, entityUpdateAction: Set(entity), }, - ~shouldSaveHistory=RegisterHandlers.getConfig()->Config.shouldSaveHistory( + ~shouldSaveHistory=Generated.configWithoutRegistrations->Config.shouldSaveHistory( ~isInReorgThreshold=false, ), ) @@ -47,7 +47,7 @@ module Storage = { resolveIsInitialized: bool => unit, initializeCalls: array<{ "entities": array, - "chainConfigs": array, + "chainConfigs": array, "enums": array>, }>, resolveInitialize: Persistence.initialState => unit, @@ -190,7 +190,7 @@ module Storage = { let toPersistence = (storageMock: t) => { { - ...Config.codegenPersistence, + ...Generated.codegenPersistence, storage: storageMock.storage, storageStatus: Ready({ cleanRun: false, @@ -227,77 +227,84 @@ module Indexer = { let rec make = async ( ~chains: array, - ~multichain=InternalConfig.Unordered, + ~multichain=Config.Unordered, ~saveFullHistory=false, // Reinit storage without Hasura // makes tests ~1.9 seconds faster ~enableHasura=false, ~reset=true, ) => { - DbHelpers.resetPostgresClient() // TODO: Should stop using global client PromClient.defaultRegister->PromClient.resetMetrics - let config = RegisterHandlers.registerAllHandlers() + let registrations = Generated.registerAllHandlers() - let chainMap = - chains - ->Js.Array2.map(chainConfig => { - let chain = ChainMap.Chain.makeUnsafe(~chainId=(chainConfig.chain :> int)) - let originalChainConfig = config.chainMap->ChainMap.get(chain) - ( - chain, 
- { - ...originalChainConfig, - sources: chainConfig.sources, - startBlock: chainConfig.startBlock->Option.getWithDefault( - originalChainConfig.startBlock, - ), - }, - ) - }) - ->ChainMap.fromArrayUnsafe + let config = { + let config = Generated.makeGeneratedConfig() - let graphqlClient = Rest.client(`${Env.Hasura.url}/v1/graphql`) - let graphqlRoute = Rest.route(() => { - method: Post, - path: "", - input: s => s.field("query", S.string), - responses: [s => s.data(S.unknown)], - }) + let chainMap = + chains + ->Js.Array2.map(chainConfig => { + let chain = ChainMap.Chain.makeUnsafe(~chainId=(chainConfig.chain :> int)) + let originalChainConfig = config.chainMap->ChainMap.get(chain) + ( + chain, + { + ...originalChainConfig, + sources: chainConfig.sources, + startBlock: chainConfig.startBlock->Option.getWithDefault( + originalChainConfig.startBlock, + ), + }, + ) + }) + ->ChainMap.fromArrayUnsafe + + { + ...config, + shouldRollbackOnReorg: true, + shouldSaveFullHistory: saveFullHistory, + enableRawEvents: false, + chainMap, + multichain, + } + } - let sql = Db.sql + let sql = Db.makeClient() let pgSchema = Env.Db.publicSchema - let storage = Config.makeStorage(~sql, ~pgSchema, ~isHasuraEnabled=enableHasura) + let storage = Generated.makeStorage(~sql, ~pgSchema, ~isHasuraEnabled=enableHasura) let persistence = { - ...config.persistence, + ...Generated.codegenPersistence, storageStatus: Persistence.Unknown, storage, + sql, } - let config: Config.t = { - ...config, - historyConfig: { - rollbackFlag: RollbackOnReorg, - historyFlag: saveFullHistory ? 
FullHistory : MinHistory, - }, + + let indexer = { + Indexer.registrations, + config, persistence, - enableRawEvents: false, - chainMap, - multichain, } + let graphqlClient = Rest.client(`${Env.Hasura.url}/v1/graphql`) + let graphqlRoute = Rest.route(() => { + method: Post, + path: "", + input: s => s.field("query", S.string), + responses: [s => s.data(S.unknown)], + }) + let gsManagerRef = ref(None) - await config.persistence->Persistence.init( - ~chainConfigs=config.chainMap->ChainMap.values, - ~reset, - ) + await persistence->Persistence.init(~chainConfigs=config.chainMap->ChainMap.values, ~reset) let chainManager = await ChainManager.makeFromDbState( - ~initialState=config.persistence->Persistence.getInitializedState, + ~initialState=persistence->Persistence.getInitializedState, ~config, + ~registrations, + ~persistence, ) - let globalState = GlobalState.make(~config, ~chainManager, ~shouldUseTui=false) + let globalState = GlobalState.make(~indexer, ~chainManager, ~shouldUseTui=false) // FIXME: Should replace use TUI with keep alive on finish let gsManager = globalState->GlobalStateManager.make gsManagerRef := Some(gsManager) gsManager->GlobalStateManager.dispatchTask(NextQuery(CheckAllChains)) @@ -335,7 +342,7 @@ module Indexer = { }, query: (type entity, entityMod) => { let entityConfig = entityMod->Entities.entityModToInternal - Db.sql + sql ->Postgres.unsafe( PgStorage.makeLoadAllQuery(~pgSchema, ~tableName=entityConfig.table.tableName), ) @@ -346,7 +353,7 @@ module Indexer = { }, queryHistory: (type entity, entityMod) => { let entityConfig = entityMod->Entities.entityModToInternal - Db.sql + sql ->Postgres.unsafe( PgStorage.makeLoadAllQuery( ~pgSchema, @@ -377,7 +384,7 @@ module Indexer = { ) }, queryCheckpoints: () => { - Db.sql + sql ->Postgres.unsafe( PgStorage.makeLoadAllQuery( ~pgSchema, @@ -387,7 +394,7 @@ module Indexer = { ->(Utils.magic: promise => promise>) }, queryEffectCache: (effectName: string) => { - Db.sql + sql ->Postgres.unsafe( 
PgStorage.makeLoadAllQuery(~pgSchema, ~tableName=Internal.cacheTablePrefix ++ effectName), ) diff --git a/scenarios/test_codegen/test/helpers/utils.ts b/scenarios/test_codegen/test/helpers/utils.ts index 1d47088ab..726feeb9e 100644 --- a/scenarios/test_codegen/test/helpers/utils.ts +++ b/scenarios/test_codegen/test/helpers/utils.ts @@ -1,8 +1,7 @@ import { runUpMigrations } from "../../generated/src/db/Migrations.res.js"; -import Postgres from "postgres"; -import { config } from "../../generated/src/db/Db.res.js"; +import { makeClient } from "../../generated/src/db/Db.res.js"; -export const createSql = () => Postgres(config); +export const createSql = makeClient; const originalConsoleLog = console.log; diff --git a/scenarios/test_codegen/test/integration-raw-events-test.ts b/scenarios/test_codegen/test/integration-raw-events-test.ts index 49c38bbf7..6b51d4977 100644 --- a/scenarios/test_codegen/test/integration-raw-events-test.ts +++ b/scenarios/test_codegen/test/integration-raw-events-test.ts @@ -1,150 +1,150 @@ -//LIBRARIES -import { expect } from "chai"; - -//CODEGEN -import { registerAllHandlers } from "../generated/src/RegisterHandlers.res.js"; -import { runDownMigrations } from "../generated/src/db/Migrations.res.js"; - -//HELPERS -import { - Users, - createNftFromFactory, - mintSimpleNft, -} from "./helpers/node-and-contracts"; -import { deployContracts } from "./helpers/setupNodeAndContracts"; - -import { runMigrationsNoLogs, createSql } from "./helpers/utils"; - -import { - getLocalChainConfig, - makeChainManager, - startProcessing, -} from "./Integration_ts_helpers.gen"; -// import { setLogLevel } from "generated/src/Logging.res.js"; -// require("mocha-reporter").hook(); //Outputs filename in error logs with mocha-reporter - -// TODO: I failed to connect RpcSource with hardhat to make the test work. -// Should rewrite it to use a real rpc or a completely mocked source. 
-describe.skip("Raw Events Integration", () => { - const sql = createSql(); - let simpleNftContractAddress: string; - let nftFactoryContractAddress: string; - let config: unknown; - - before(async function () { - this.timeout(30 * 1000); - // setLogLevel("trace"); - - await runMigrationsNoLogs(); - console.log("deploying Nft Factory"); - const deployedNftFactory = (await deployContracts()).nftFactory; - nftFactoryContractAddress = await deployedNftFactory.getAddress(); - console.log( - "Successfully deployed nftFactory at", - nftFactoryContractAddress - ); - - console.log("Creating Nft"); - const _createNftTx = await createNftFromFactory(deployedNftFactory, { - name: "test_name", - symbol: "t_sym", - supply: 200, - }); - - const simpleNftCreatedEventFilter = - deployedNftFactory.getEvent("SimpleNftCreated"); - const eventQuery = await deployedNftFactory.queryFilter( - simpleNftCreatedEventFilter - ); - const simplNftCreatedEvent = eventQuery[0]; - - const localChainConfig = getLocalChainConfig(nftFactoryContractAddress); - config = registerAllHandlers(); - - simpleNftContractAddress = simplNftCreatedEvent.args.contractAddress; - console.log("Created NFT at: ", simpleNftContractAddress); - - console.log("Minting Nft from user 1, 2 and 3"); - - const mintTxs = [ - { user: Users.User1, quantity: 1 }, - { user: Users.User2, quantity: 3 }, - { user: Users.User3, quantity: 5 }, - ].map((params) => - mintSimpleNft(params.user, simpleNftContractAddress, params.quantity) - ); - - await Promise.all(mintTxs); - - console.log("Successfully minted"); - console.log("Successfully processed events"); - - console.log("processing events after mint"); - const chainManager = makeChainManager(localChainConfig); - - startProcessing(config, localChainConfig, chainManager); - //Wait 0.5s for processing to occur it no longer finishes with a resolve - await new Promise((res) => - setTimeout(() => { - res(null); - }, 1000) - ); - }); - after(async () => { - await runMigrationsNoLogs(); - }); 
- - it("RawEvents table contains rows after indexer runs", async function () { - let rawEventsRows = await sql`SELECT * FROM public.raw_events`; - expect(rawEventsRows.count).to.be.gt(0); - }); - - it("should ensure Entites are created correctly", async function () { - let rowsNftCollection = await sql`SELECT * FROM public."NftCollection"`; - expect(rowsNftCollection.count).to.be.gt(0); - let rowsUsers = await sql`SELECT * FROM public."User"`; - expect(rowsUsers.count).to.be.gt(0); - let rowsToken = await sql`SELECT * FROM public."Token"`; - expect(rowsToken.count).to.be.gt(0); - }); - - it("should have 1 row in the dynamic_contract_registry table", async function () { - let rowsDCR = await sql`SELECT * FROM public.dynamic_contract_registry`; - console.log(rowsDCR); - expect(rowsDCR.count).to.be.eq(1); - }); - - // TODO: Fix this test. This test broke after rebasing the 'dev-mode' code on the lastest main with the restructiring and dynamic contracts code. - it.skip("Tracks dynamic contract on restart", async () => { - let beforeRawEventsRows = await sql`SELECT * FROM public.raw_events`; - //TODO: fix this test, This indicates this test is ineffective but the structure is what we want to test - // below show that the contract address store is still populated with the contract - console.log("new contract"); - - mintSimpleNft(Users.User1, simpleNftContractAddress, 1); - const localChainConfig = getLocalChainConfig(nftFactoryContractAddress); - const chainManager = makeChainManager(localChainConfig); - startProcessing(config, localChainConfig, chainManager); - - //Wait 2s for processing to occur - await new Promise((res) => - setTimeout(() => { - res(null); - }, 500) - ); - - let afterRawEventsRows = await sql`SELECT * FROM public.raw_events`; - expect(afterRawEventsRows.count).to.be.gt(beforeRawEventsRows.count); - }); - - it("RawEvents table does not exist after migration dropping raw events table", async function () { - await runDownMigrations(false); - let 
rawEventsRows = await sql` - SELECT EXISTS ( - SELECT FROM information_schema.tables - WHERE table_name = 'public.raw_events' - ); - `; - expect(rawEventsRows[0].exists).to.be.eq(false); - }); -}); +// //LIBRARIES +// import { expect } from "chai"; + +// //CODEGEN +// import { registerAllHandlers } from "../generated/src/RegisterHandlers.res.js"; +// import { runDownMigrations } from "../generated/src/db/Migrations.res.js"; + +// //HELPERS +// import { +// Users, +// createNftFromFactory, +// mintSimpleNft, +// } from "./helpers/node-and-contracts"; +// import { deployContracts } from "./helpers/setupNodeAndContracts"; + +// import { runMigrationsNoLogs, createSql } from "./helpers/utils"; + +// import { +// getLocalChainConfig, +// makeChainManager, +// startProcessing, +// } from "./Integration_ts_helpers.gen"; +// // import { setLogLevel } from "generated/src/Logging.res.js"; +// // require("mocha-reporter").hook(); //Outputs filename in error logs with mocha-reporter + +// // TODO: I failed to connect RpcSource with hardhat to make the test work. +// // Should rewrite it to use a real rpc or a completely mocked source. 
+// describe.skip("Raw Events Integration", () => { +// const sql = createSql(); +// let simpleNftContractAddress: string; +// let nftFactoryContractAddress: string; +// let config: unknown; + +// before(async function () { +// this.timeout(30 * 1000); +// // setLogLevel("trace"); + +// await runMigrationsNoLogs(); +// console.log("deploying Nft Factory"); +// const deployedNftFactory = (await deployContracts()).nftFactory; +// nftFactoryContractAddress = await deployedNftFactory.getAddress(); +// console.log( +// "Successfully deployed nftFactory at", +// nftFactoryContractAddress +// ); + +// console.log("Creating Nft"); +// const _createNftTx = await createNftFromFactory(deployedNftFactory, { +// name: "test_name", +// symbol: "t_sym", +// supply: 200, +// }); + +// const simpleNftCreatedEventFilter = +// deployedNftFactory.getEvent("SimpleNftCreated"); +// const eventQuery = await deployedNftFactory.queryFilter( +// simpleNftCreatedEventFilter +// ); +// const simplNftCreatedEvent = eventQuery[0]; + +// const localChainConfig = getLocalChainConfig(nftFactoryContractAddress); +// config = registerAllHandlers(); + +// simpleNftContractAddress = simplNftCreatedEvent.args.contractAddress; +// console.log("Created NFT at: ", simpleNftContractAddress); + +// console.log("Minting Nft from user 1, 2 and 3"); + +// const mintTxs = [ +// { user: Users.User1, quantity: 1 }, +// { user: Users.User2, quantity: 3 }, +// { user: Users.User3, quantity: 5 }, +// ].map((params) => +// mintSimpleNft(params.user, simpleNftContractAddress, params.quantity) +// ); + +// await Promise.all(mintTxs); + +// console.log("Successfully minted"); +// console.log("Successfully processed events"); + +// console.log("processing events after mint"); +// const chainManager = makeChainManager(localChainConfig); + +// startProcessing(config, localChainConfig, chainManager); +// //Wait 0.5s for processing to occur it no longer finishes with a resolve +// await new Promise((res) => +// setTimeout(() 
=> { +// res(null); +// }, 1000) +// ); +// }); +// after(async () => { +// await runMigrationsNoLogs(); +// }); + +// it("RawEvents table contains rows after indexer runs", async function () { +// let rawEventsRows = await sql`SELECT * FROM public.raw_events`; +// expect(rawEventsRows.count).to.be.gt(0); +// }); + +// it("should ensure Entities are created correctly", async function () { +// let rowsNftCollection = await sql`SELECT * FROM public."NftCollection"`; +// expect(rowsNftCollection.count).to.be.gt(0); +// let rowsUsers = await sql`SELECT * FROM public."User"`; +// expect(rowsUsers.count).to.be.gt(0); +// let rowsToken = await sql`SELECT * FROM public."Token"`; +// expect(rowsToken.count).to.be.gt(0); +// }); + +// it("should have 1 row in the dynamic_contract_registry table", async function () { +// let rowsDCR = await sql`SELECT * FROM public.dynamic_contract_registry`; +// console.log(rowsDCR); +// expect(rowsDCR.count).to.be.eq(1); +// }); + +// // TODO: Fix this test. This test broke after rebasing the 'dev-mode' code on the latest main with the restructuring and dynamic contracts code. 
+// it.skip("Tracks dynamic contract on restart", async () => { +// let beforeRawEventsRows = await sql`SELECT * FROM public.raw_events`; +// //TODO: fix this test, This indicates this test is ineffective but the structure is what we want to test +// // below show that the contract address store is still populated with the contract +// console.log("new contract"); + +// mintSimpleNft(Users.User1, simpleNftContractAddress, 1); +// const localChainConfig = getLocalChainConfig(nftFactoryContractAddress); +// const chainManager = makeChainManager(localChainConfig); +// startProcessing(config, localChainConfig, chainManager); + +// //Wait 2s for processing to occur +// await new Promise((res) => +// setTimeout(() => { +// res(null); +// }, 500) +// ); + +// let afterRawEventsRows = await sql`SELECT * FROM public.raw_events`; +// expect(afterRawEventsRows.count).to.be.gt(beforeRawEventsRows.count); +// }); + +// it("RawEvents table does not exist after migration dropping raw events table", async function () { +// await runDownMigrations(false); +// let rawEventsRows = await sql` +// SELECT EXISTS ( +// SELECT FROM information_schema.tables +// WHERE table_name = 'public.raw_events' +// ); +// `; +// expect(rawEventsRows[0].exists).to.be.eq(false); +// }); +// }); diff --git a/scenarios/test_codegen/test/lib_tests/Persistence_test.res b/scenarios/test_codegen/test/lib_tests/Persistence_test.res index d698036b8..9b7c91fb4 100644 --- a/scenarios/test_codegen/test/lib_tests/Persistence_test.res +++ b/scenarios/test_codegen/test/lib_tests/Persistence_test.res @@ -4,7 +4,12 @@ describe("Test Persistence layer init", () => { Async.it("Should initialize the persistence layer without the user entities", async () => { let storageMock = Mock.Storage.make([#isInitialized, #resumeInitialState, #initialize]) - let persistence = Persistence.make(~userEntities=[], ~allEnums=[], ~storage=storageMock.storage) + let persistence = Persistence.make( + ~userEntities=[], + ~allEnums=[], + 
~storage=storageMock.storage, + ~sql=Generated.codegenPersistence.sql, + ) Assert.deepEqual( persistence.allEntities, @@ -130,7 +135,12 @@ describe("Test Persistence layer init", () => { Async.it("Should skip initialization when storage is already initialized", async () => { let storageMock = Mock.Storage.make([#isInitialized, #resumeInitialState]) - let persistence = Persistence.make(~userEntities=[], ~allEnums=[], ~storage=storageMock.storage) + let persistence = Persistence.make( + ~userEntities=[], + ~allEnums=[], + ~storage=storageMock.storage, + ~sql=Generated.codegenPersistence.sql, + ) let p = persistence->Persistence.init(~chainConfigs=[]) // Additional calls to init should not do anything diff --git a/scenarios/test_codegen/test/lib_tests/PgStorage_test.res b/scenarios/test_codegen/test/lib_tests/PgStorage_test.res index 1df64d79b..b90289458 100644 --- a/scenarios/test_codegen/test/lib_tests/PgStorage_test.res +++ b/scenarios/test_codegen/test/lib_tests/PgStorage_test.res @@ -622,7 +622,7 @@ WHERE cp."block_hash" IS NOT NULL Async.it( "Should create correct SQL for single chain config", async () => { - let chainConfig: InternalConfig.chain = { + let chainConfig: Config.chain = { id: 1, startBlock: 100, endBlock: 200, @@ -650,7 +650,7 @@ VALUES (1, 100, 200, 5, 0, NULL, -1, -1, NULL, 0, false, 0);` Async.it( "Should create correct SQL for single chain config with no end block", async () => { - let chainConfig: InternalConfig.chain = { + let chainConfig: Config.chain = { id: 1, startBlock: 100, maxReorgDepth: 5, @@ -677,7 +677,7 @@ VALUES (1, 100, NULL, 5, 0, NULL, -1, -1, NULL, 0, false, 0);` Async.it( "Should create correct SQL for multiple chain configs", async () => { - let chainConfig1: InternalConfig.chain = { + let chainConfig1: Config.chain = { id: 1, startBlock: 100, endBlock: 200, @@ -686,7 +686,7 @@ VALUES (1, 100, NULL, 5, 0, NULL, -1, -1, NULL, 0, false, 0);` sources: [], } - let chainConfig2: InternalConfig.chain = { + let chainConfig2: 
Config.chain = { id: 42, startBlock: 500, maxReorgDepth: 0, diff --git a/scenarios/test_codegen/test/rollback/ChainDataHelpers.res b/scenarios/test_codegen/test/rollback/ChainDataHelpers.res index 449db8f2e..ab571b70f 100644 --- a/scenarios/test_codegen/test/rollback/ChainDataHelpers.res +++ b/scenarios/test_codegen/test/rollback/ChainDataHelpers.res @@ -14,8 +14,7 @@ let makeTransaction = (~transactionIndex, ~transactionHash) => module Gravatar = { let contractName = "Gravatar" - let chainConfig = - RegisterHandlers.registerAllHandlers().chainMap->ChainMap.get(MockConfig.chain1337) + let chainConfig = Generated.getIndexer().config.chainMap->ChainMap.get(MockConfig.chain1337) let contract = chainConfig.contracts->Js.Array2.find(c => c.name == contractName)->Option.getExn let defaultAddress = contract.addresses[0]->Option.getExn diff --git a/scenarios/test_codegen/test/rollback/MockChainData_test.res b/scenarios/test_codegen/test/rollback/MockChainData_test.res index 12e44ada2..bde923850 100644 --- a/scenarios/test_codegen/test/rollback/MockChainData_test.res +++ b/scenarios/test_codegen/test/rollback/MockChainData_test.res @@ -3,9 +3,7 @@ open RescriptMocha describe("Check that MockChainData works as expected", () => { let mockChainDataInit = MockChainData.make( - ~chainConfig=RegisterHandlers.registerAllHandlers().chainMap->ChainMap.get( - MockConfig.chain1337, - ), + ~chainConfig=Generated.getIndexer().config.chainMap->ChainMap.get(MockConfig.chain1337), ~maxBlocksReturned=3, ~blockTimestampInterval=25, ) diff --git a/scenarios/test_codegen/test/rollback/Rollback_test.res b/scenarios/test_codegen/test/rollback/Rollback_test.res index 54f0af712..903e1c262 100644 --- a/scenarios/test_codegen/test/rollback/Rollback_test.res +++ b/scenarios/test_codegen/test/rollback/Rollback_test.res @@ -1,404 +1,14 @@ open Belt open RescriptMocha -module M = Mock - -let config = RegisterHandlers.registerAllHandlers() -// Keep only chain1337 -let config = Config.make( - 
~shouldRollbackOnReorg=true, - ~shouldSaveFullHistory=false, - ~isUnorderedMultichainMode=false, - ~chains=config.chainMap - ->ChainMap.entries - ->Array.keepMap(((chain, config)) => chain == MockConfig.chain1337 ? Some(config) : None), - ~enableRawEvents=false, - ~registrations=?config.registrations, -) - -module Mock = { - let mockChainDataEmpty = MockChainData.make( - ~chainConfig=config.chainMap->ChainMap.get(MockConfig.chain1337), - ~maxBlocksReturned=3, - ~blockTimestampInterval=25, - ) - - open ChainDataHelpers.Gravatar - let blocksBase = [ - [], - [ - NewGravatar.mkEventConstr(MockEvents.newGravatar1), - NewGravatar.mkEventConstr(MockEvents.newGravatar2), - ], - ] - let blocksInitial = - blocksBase->Array.concat([ - [UpdatedGravatar.mkEventConstr(MockEvents.setGravatar1)], - [UpdatedGravatar.mkEventConstr(MockEvents.setGravatar2)], - [ - NewGravatar.mkEventConstr(MockEvents.newGravatar3), - UpdatedGravatar.mkEventConstr(MockEvents.setGravatar3), - ], - ]) - - let blocksReorg = - blocksBase->Array.concat([ - [UpdatedGravatar.mkEventConstr(MockEvents.setGravatar2)], - [UpdatedGravatar.mkEventConstr(MockEvents.setGravatar1)], - [ - NewGravatar.mkEventConstr(MockEvents.newGravatar3), - UpdatedGravatar.mkEventConstr(MockEvents.setGravatar3), - ], - ]) - - let applyBlocks = mcd => - mcd->Array.reduce(mockChainDataEmpty, (accum, next) => { - accum->MockChainData.addBlock(~makeLogConstructors=next) - }) - - let mockChainData = blocksInitial->applyBlocks - let mockChainDataReorg = blocksReorg->applyBlocks -} - -module Stubs = { - //Stub wait for new block - let waitForNewBlock = async (_sourceManager, ~currentBlockHeight) => { - currentBlockHeight->ignore - Mock.mockChainData->MockChainData.getHeight - } - - //Stub executePartitionQuery with mock data - let executePartitionQueryWithMockChainData = mockChainData => async ( - _, - ~query, - ~currentBlockHeight as _, - ) => { - mockChainData->MockChainData.executeQuery(query) - } - - //Stub for getting block hashes 
instead of the worker - let getBlockHashes = mockChainData => async (~blockNumbers, ~logger as _) => - mockChainData->MockChainData.getBlockHashes(~blockNumbers)->Ok - - //Hold next tasks temporarily here so they do not get actioned off automatically - let tasks = ref([]) - - //Stub dispatch action to set state and not dispatch task but store in - //the tasks ref - let dispatchAction = (gsManager, action) => { - let (nextState, nextTasks) = GlobalState.actionReducer( - gsManager->GlobalStateManager.getState, - action, - ) - gsManager->GlobalStateManager.setState(nextState) - tasks := tasks.contents->Array.concat(nextTasks) - } - - let dispatchTask = (gsManager, mockChainData, task) => { - GlobalState.injectedTaskReducer( - ~executeQuery=executePartitionQueryWithMockChainData(mockChainData), - ~waitForNewBlock, - ~getLastKnownValidBlock=(chainFetcher, ~reorgBlockNumber) => - chainFetcher->ChainFetcher.getLastKnownValidBlock( - ~reorgBlockNumber, - ~getBlockHashes=getBlockHashes(mockChainData), - ), - )( - ~dispatchAction=action => dispatchAction(gsManager, action), - gsManager->GlobalStateManager.getState, - task, - ) - } - - let dispatchAllTasks = async (gsManager, mockChainData) => { - let tasksToRun = tasks.contents - tasks := [] - for idx in 0 to tasksToRun->Array.length - 1 { - let taskToRun = tasksToRun->Array.getUnsafe(idx) - await dispatchTask(gsManager, mockChainData, taskToRun) - } - } -} - -module Sql = { - /** -NOTE: Do not use this for queries in the indexer - -Exposing -*/ - @send - external unsafe: (Postgres.sql, string) => promise<'a> = "unsafe" - - let query = unsafe(Db.sql, _) - - let getAllRowsInTable = tableName => query(`SELECT * FROM public."${tableName}";`) -} - -let setupDb = async () => { - open Migrations - Logging.info("Provisioning Database") - let _exitCodeUp = await runUpMigrations(~shouldExit=false, ~reset=true) -} - -describe("Single Chain Simple Rollback", () => { - Async.it("Detects reorgs and actions a rollback", async () => { - 
await setupDb() - - let chainManager = ChainManager.makeFromConfig(~config) - let initState = GlobalState.make(~config, ~chainManager) - let gsManager = initState->GlobalStateManager.make - let chain = MockConfig.chain1337 - let getState = () => gsManager->GlobalStateManager.getState - let getChainFetcher = () => getState().chainManager.chainFetchers->ChainMap.get(chain) - - open Stubs - let dispatchTaskInitalChain = dispatchTask(gsManager, Mock.mockChainData, ...) - let dispatchTaskReorgChain = dispatchTask(gsManager, Mock.mockChainDataReorg, ...) - let dispatchAllTasksInitalChain = () => dispatchAllTasks(gsManager, Mock.mockChainData) - tasks := [] - - await dispatchTaskInitalChain(NextQuery(Chain(chain))) - - Assert.deepEqual(tasks.contents, [NextQuery(CheckAllChains)]) - - await dispatchAllTasksInitalChain() - - Assert.deepEqual(tasks.contents->Utils.getVariantsTags, ["ProcessPartitionQueryResponse"]) - // Assert.deepEqual( - // tasks.contents->Js.Array2.unsafe_get(0), - // UpdateEndOfBlockRangeScannedData({ - // blockNumberThreshold: -198, - // chain: MockConfig.chain1337, - // nextEndOfBlockRangeScannedData: { - // blockHash: block2.blockHash, - // blockNumber: block2.blockNumber, - // chainId: 1337, - // }, - // }), - // ) - - await dispatchAllTasksInitalChain() - - Assert.deepEqual( - tasks.contents, - [UpdateChainMetaDataAndCheckForExit(NoExit), ProcessEventBatch, NextQuery(Chain(chain))], - ~message="should successfully have actioned batch", - ) - - Assert.equal( - getChainFetcher().fetchState->FetchState.bufferSize, - 3, - ~message="should have 3 events on the queue from the first 3 blocks of inital chainData", - ) - - tasks := [] - await dispatchTaskReorgChain(NextQuery(Chain(chain))) - Assert.deepEqual(tasks.contents, [Rollback], ~message="should detect rollback with reorg chain") - }) - - Async.it("Successfully rolls back single chain indexer to expected values", async () => { - await setupDb() - - let chainManager = { - 
...ChainManager.makeFromConfig(~config), - multichain: Unordered, - } - let initState = GlobalState.make(~config, ~chainManager) - let gsManager = initState->GlobalStateManager.make - let chain = MockConfig.chain1337 - let getState = () => gsManager->GlobalStateManager.getState - let getChainFetcher = () => getState().chainManager.chainFetchers->ChainMap.get(chain) - - open Stubs - let dispatchTaskInitalChain = dispatchTask(gsManager, Mock.mockChainData, ...) - let dispatchAllTasksInitalChain = () => dispatchAllTasks(gsManager, Mock.mockChainData, ...) - let dispatchAllTasksReorgChain = () => dispatchAllTasks(gsManager, Mock.mockChainDataReorg, ...) - tasks := [] - - await dispatchTaskInitalChain(NextQuery(Chain(chain))) - - Assert.deepEqual(tasks.contents, [NextQuery(CheckAllChains)]) - - await dispatchAllTasksInitalChain() - - Assert.deepEqual(tasks.contents->Utils.getVariantsTags, ["ProcessPartitionQueryResponse"]) - // Assert.deepEqual( - // tasks.contents->Js.Array2.unsafe_get(0), - // UpdateEndOfBlockRangeScannedData({ - // blockNumberThreshold: -198, - // chain: MockConfig.chain1337, - // nextEndOfBlockRangeScannedData: { - // blockHash: block2.blockHash, - // blockNumber: block2.blockNumber, - // chainId: 1337, - // }, - // }), - // ) - - await dispatchAllTasksInitalChain() - - Assert.deepEqual( - tasks.contents, - [UpdateChainMetaDataAndCheckForExit(NoExit), ProcessEventBatch, NextQuery(Chain(chain))], - ~message="should successfully have processed batch", - ) - - Assert.equal( - getChainFetcher().fetchState->FetchState.bufferSize, - 3, - ~message="should have 3 events on the queue from the first 3 blocks of inital chainData", - ) - - await dispatchAllTasksReorgChain() - - let getAllGravatars = async () => - (await Sql.getAllRowsInTable("Gravatar"))->Array.map( - S.parseJsonOrThrow(_, Entities.Gravatar.schema), - ) - - let gravatars = await getAllGravatars() - - let toBigInt = BigInt.fromInt - let toString = BigInt.toString - - let expectedGravatars: array 
= [ - { - displayName: MockEvents.setGravatar1.displayName, - id: MockEvents.setGravatar1.id->toString, - imageUrl: MockEvents.setGravatar1.imageUrl, - owner_id: MockEvents.setGravatar1.owner->Utils.magic, - size: MEDIUM, - updatesCount: 2->toBigInt, - }, - { - displayName: MockEvents.newGravatar2.displayName, - id: MockEvents.newGravatar2.id->toString, - imageUrl: MockEvents.newGravatar2.imageUrl, - owner_id: MockEvents.newGravatar2.owner->Utils.magic, - size: SMALL, - updatesCount: 1->toBigInt, - }, - ] - - Assert.deepEqual( - gravatars, - expectedGravatars, - ~message="2 Gravatars should have been set and the first one updated in the first 3 events", - ) - - Assert.deepEqual( - tasks.contents, - [ - GlobalState.NextQuery(CheckAllChains), - UpdateChainMetaDataAndCheckForExit(NoExit), - ProcessEventBatch, - PruneStaleEntityHistory, - Rollback, - ], - ~message="should detect rollback with reorg chain", - ) - - await dispatchAllTasksReorgChain() - - Assert.deepEqual( - tasks.contents, - [Rollback], - ~message="Should finishe processing current batch and fire rollback again", - ) - - await dispatchAllTasksReorgChain() - - Assert.deepEqual( - tasks.contents, - [GlobalState.NextQuery(CheckAllChains), ProcessEventBatch], - ~message="Rollback should have actioned, and now next queries and process event batch should action", - ) - - await dispatchAllTasksReorgChain() - - Assert.deepEqual(tasks.contents->Utils.getVariantsTags, ["ProcessPartitionQueryResponse"]) - // Assert.deepEqual( - // tasks.contents->Js.Array2.unsafe_get(0), - // GlobalState.UpdateEndOfBlockRangeScannedData({ - // blockNumberThreshold: -198, - // chain: MockConfig.chain1337, - // nextEndOfBlockRangeScannedData: { - // blockHash: block2.blockHash, - // blockNumber: block2.blockNumber, - // chainId: 1337, - // }, - // }), - // ) - - await dispatchAllTasksReorgChain() - - Assert.deepEqual( - tasks.contents, - [UpdateChainMetaDataAndCheckForExit(NoExit), ProcessEventBatch, NextQuery(Chain(chain))], - 
~message="Query should have returned with batch to process", - ) - - await dispatchAllTasksReorgChain() - - Assert.deepEqual( - tasks.contents->Utils.getVariantsTags, - [ - "NextQuery", - "UpdateChainMetaDataAndCheckForExit", - "ProcessEventBatch", - "PruneStaleEntityHistory", - "ProcessPartitionQueryResponse", - ], - ) - // Assert.deepEqual( - // tasks.contents->Js.Array2.unsafe_get(1), - // GlobalState.UpdateEndOfBlockRangeScannedData({ - // blockNumberThreshold: -196, - // chain: MockConfig.chain1337, - // nextEndOfBlockRangeScannedData: { - // blockHash: block4.blockHash, - // blockNumber: block4.blockNumber, - // chainId: 1337, - // }, - // }), - // ) - - let expectedGravatars: array = [ - { - displayName: MockEvents.newGravatar1.displayName, - id: MockEvents.newGravatar1.id->toString, - imageUrl: MockEvents.newGravatar1.imageUrl, - owner_id: MockEvents.newGravatar1.owner->Utils.magic, - size: SMALL, - updatesCount: 1->toBigInt, - }, - { - displayName: MockEvents.setGravatar2.displayName, - id: MockEvents.setGravatar2.id->toString, - imageUrl: MockEvents.setGravatar2.imageUrl, - owner_id: MockEvents.setGravatar2.owner->Utils.magic, - size: MEDIUM, - updatesCount: 2->toBigInt, - }, - ] - - let gravatars = await getAllGravatars() - Assert.deepEqual( - gravatars, - expectedGravatars, - ~message="First gravatar should roll back and change and second should have received an update", - ) - }) -}) - // A workaround for ReScript v11 issue, where it makes the field optional // instead of setting a value to undefined. It's fixed in v12. 
let undefined = (%raw(`undefined`): option<'a>) describe("E2E rollback tests", () => { let testSingleChainRollback = async ( - ~sourceMock: M.Source.t, - ~indexerMock: M.Indexer.t, + ~sourceMock: Mock.Source.t, + ~indexerMock: Mock.Indexer.t, ~firstHistoryCheckpointId=2, ) => { Assert.deepEqual( @@ -689,30 +299,30 @@ describe("E2E rollback tests", () => { } Async.it("Should re-enter reorg threshold on restart", async () => { - let sourceMock1337 = M.Source.make( + let sourceMock1337 = Mock.Source.make( [#getHeightOrThrow, #getItemsOrThrow, #getBlockHashes], ~chain=#1337, ) - let sourceMock100 = M.Source.make( + let sourceMock100 = Mock.Source.make( [#getHeightOrThrow, #getItemsOrThrow, #getBlockHashes], ~chain=#100, ) let chains = [ { - M.Indexer.chain: #1337, + Mock.Indexer.chain: #1337, sources: [sourceMock1337.source], }, { - M.Indexer.chain: #100, + Mock.Indexer.chain: #100, sources: [sourceMock100.source], }, ] - let indexerMock = await M.Indexer.make(~chains) + let indexerMock = await Mock.Indexer.make(~chains) await Utils.delay(0) let _ = await Promise.all2(( - M.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock=sourceMock1337), - M.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock=sourceMock100), + Mock.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock=sourceMock1337), + Mock.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock=sourceMock100), )) Assert.deepEqual( @@ -788,11 +398,11 @@ describe("E2E rollback tests", () => { }) Async.it("Rollback of a single chain indexer", async () => { - let sourceMock = M.Source.make( + let sourceMock = Mock.Source.make( [#getHeightOrThrow, #getItemsOrThrow, #getBlockHashes], ~chain=#1337, ) - let indexerMock = await M.Indexer.make( + let indexerMock = await Mock.Indexer.make( ~chains=[ { chain: #1337, @@ -802,18 +412,18 @@ describe("E2E rollback tests", () => { ) await Utils.delay(0) - await M.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock) + await 
Mock.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock) await testSingleChainRollback(~sourceMock, ~indexerMock) }) Async.it( "Stores checkpoints inside of the reorg threshold for batches without items", async () => { - let sourceMock = M.Source.make( + let sourceMock = Mock.Source.make( [#getHeightOrThrow, #getItemsOrThrow, #getBlockHashes], ~chain=#1337, ) - let indexerMock = await M.Indexer.make( + let indexerMock = await Mock.Indexer.make( ~chains=[ { chain: #1337, @@ -823,7 +433,7 @@ describe("E2E rollback tests", () => { ) await Utils.delay(0) - await M.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock) + await Mock.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock) sourceMock.resolveGetItemsOrThrow([], ~latestFetchedBlockNumber=102) @@ -846,11 +456,11 @@ describe("E2E rollback tests", () => { ) Async.it("Shouldn't detect reorg for rollbacked block", async () => { - let sourceMock = M.Source.make( + let sourceMock = Mock.Source.make( [#getHeightOrThrow, #getItemsOrThrow, #getBlockHashes], ~chain=#1337, ) - let indexerMock = await M.Indexer.make( + let indexerMock = await Mock.Indexer.make( ~chains=[ { chain: #1337, @@ -860,7 +470,7 @@ describe("E2E rollback tests", () => { ) await Utils.delay(0) - await M.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock) + await Mock.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock) sourceMock.resolveGetItemsOrThrow([], ~latestFetchedBlockNumber=102) await indexerMock.getBatchWritePromise() @@ -926,15 +536,15 @@ describe("E2E rollback tests", () => { Async.it( "Single chain rollback should also work for unordered multichain indexer when another chains are stale", async () => { - let sourceMock1 = M.Source.make( + let sourceMock1 = Mock.Source.make( [#getHeightOrThrow, #getItemsOrThrow, #getBlockHashes], ~chain=#1337, ) - let sourceMock2 = M.Source.make( + let sourceMock2 = Mock.Source.make( [#getHeightOrThrow, #getItemsOrThrow, #getBlockHashes], ~chain=#100, ) - let 
indexerMock = await M.Indexer.make( + let indexerMock = await Mock.Indexer.make( ~chains=[ { chain: #1337, @@ -949,8 +559,8 @@ describe("E2E rollback tests", () => { await Utils.delay(0) let _ = await Promise.all2(( - M.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock=sourceMock1), - M.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock=sourceMock2), + Mock.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock=sourceMock1), + Mock.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock=sourceMock2), )) await testSingleChainRollback( @@ -962,11 +572,11 @@ describe("E2E rollback tests", () => { ) Async.it("Rollback Dynamic Contract", async () => { - let sourceMock = M.Source.make( + let sourceMock = Mock.Source.make( [#getHeightOrThrow, #getItemsOrThrow, #getBlockHashes], ~chain=#1337, ) - let indexerMock = await M.Indexer.make( + let indexerMock = await Mock.Indexer.make( ~chains=[ { chain: #1337, @@ -976,7 +586,7 @@ describe("E2E rollback tests", () => { ) await Utils.delay(0) - await M.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock) + await Mock.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock) let calls = [] let handler: Types.HandlerTypes.loader = async ({event}) => { @@ -1212,15 +822,15 @@ This might be wrong after we start exposing a block hash for progress block.`, }) Async.it("Rollback of unordered multichain indexer (single entity id change)", async () => { - let sourceMock1337 = M.Source.make( + let sourceMock1337 = Mock.Source.make( [#getHeightOrThrow, #getItemsOrThrow, #getBlockHashes], ~chain=#1337, ) - let sourceMock100 = M.Source.make( + let sourceMock100 = Mock.Source.make( [#getHeightOrThrow, #getItemsOrThrow, #getBlockHashes], ~chain=#100, ) - let indexerMock = await M.Indexer.make( + let indexerMock = await Mock.Indexer.make( ~chains=[ { chain: #1337, @@ -1235,8 +845,8 @@ This might be wrong after we start exposing a block hash for progress block.`, await Utils.delay(0) let _ = await 
Promise.all2(( - M.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock=sourceMock1337), - M.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock=sourceMock100), + Mock.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock=sourceMock1337), + Mock.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock=sourceMock100), )) let callCount = ref(0) @@ -1605,15 +1215,15 @@ This might be wrong after we start exposing a block hash for progress block.`, Async.it( "Rollback of unordered multichain indexer (single entity id change + another entity on non-reorg chain)", async () => { - let sourceMock1337 = M.Source.make( + let sourceMock1337 = Mock.Source.make( [#getHeightOrThrow, #getItemsOrThrow, #getBlockHashes], ~chain=#1337, ) - let sourceMock100 = M.Source.make( + let sourceMock100 = Mock.Source.make( [#getHeightOrThrow, #getItemsOrThrow, #getBlockHashes], ~chain=#100, ) - let indexerMock = await M.Indexer.make( + let indexerMock = await Mock.Indexer.make( ~chains=[ { chain: #1337, @@ -1628,8 +1238,8 @@ This might be wrong after we start exposing a block hash for progress block.`, await Utils.delay(0) let _ = await Promise.all2(( - M.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock=sourceMock1337), - M.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock=sourceMock100), + Mock.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock=sourceMock1337), + Mock.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock=sourceMock100), )) let callCount = ref(0) @@ -2004,15 +1614,15 @@ This might be wrong after we start exposing a block hash for progress block.`, Async.it( "Rollback of ordered multichain indexer (single entity id change + another entity on non-reorg chain)", async () => { - let sourceMock1337 = M.Source.make( + let sourceMock1337 = Mock.Source.make( [#getHeightOrThrow, #getItemsOrThrow, #getBlockHashes], ~chain=#1337, ) - let sourceMock100 = M.Source.make( + let sourceMock100 = Mock.Source.make( [#getHeightOrThrow, 
#getItemsOrThrow, #getBlockHashes], ~chain=#100, ) - let indexerMock = await M.Indexer.make( + let indexerMock = await Mock.Indexer.make( ~chains=[ { chain: #1337, @@ -2028,8 +1638,8 @@ This might be wrong after we start exposing a block hash for progress block.`, await Utils.delay(0) let _ = await Promise.all2(( - M.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock=sourceMock1337), - M.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock=sourceMock100), + Mock.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock=sourceMock1337), + Mock.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock=sourceMock100), )) let callCount = ref(0) @@ -2407,11 +2017,11 @@ Sorted by timestamp and chain id`, ) Async.it("Double reorg should NOT cause negative event counter (regression test)", async () => { - let sourceMock = M.Source.make( + let sourceMock = Mock.Source.make( [#getHeightOrThrow, #getItemsOrThrow, #getBlockHashes], ~chain=#1337, ) - let indexerMock = await M.Indexer.make( + let indexerMock = await Mock.Indexer.make( ~chains=[ { chain: #1337, @@ -2421,7 +2031,7 @@ Sorted by timestamp and chain id`, ) await Utils.delay(0) - await M.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock) + await Mock.Helper.initialEnterReorgThreshold(~indexerMock, ~sourceMock) sourceMock.resolveGetItemsOrThrow([]) await indexerMock.getBatchWritePromise() diff --git a/scenarios/test_codegen/test/schema_types/BigDecimal_test.res b/scenarios/test_codegen/test/schema_types/BigDecimal_test.res index 95a50b729..1d68ee6f8 100644 --- a/scenarios/test_codegen/test/schema_types/BigDecimal_test.res +++ b/scenarios/test_codegen/test/schema_types/BigDecimal_test.res @@ -13,7 +13,7 @@ describe("Load and save an entity with a BigDecimal from DB", () => { Async.it("be able to set and read entities with BigDecimal from DB", async () => { This.timeout(5 * 1000) - let sql = Db.sql + let sql = Generated.codegenPersistence.sql /// Setup DB let testEntity1: 
Entities.EntityWithBigDecimal.t = { id: "testEntity", @@ -31,7 +31,7 @@ describe("Load and save an entity with a BigDecimal from DB", () => { ], ~table=Entities.EntityWithBigDecimal.table, ~itemSchema=Entities.EntityWithBigDecimal.schema, - ~pgSchema=Config.storagePgSchema, + ~pgSchema=Generated.storagePgSchema, ) let inMemoryStore = InMemoryStore.make() @@ -45,7 +45,7 @@ describe("Load and save an entity with a BigDecimal from DB", () => { let handlerContext = UserContext.getHandlerContext({ item, loadManager, - persistence: Config.codegenPersistence, + persistence: Generated.codegenPersistence, inMemoryStore, shouldSaveHistory: false, isPreload: false, diff --git a/scenarios/test_codegen/test/schema_types/Timestamp_test.res b/scenarios/test_codegen/test/schema_types/Timestamp_test.res index 582def98c..22e06be53 100644 --- a/scenarios/test_codegen/test/schema_types/Timestamp_test.res +++ b/scenarios/test_codegen/test/schema_types/Timestamp_test.res @@ -13,7 +13,7 @@ describe("Load and save an entity with a Timestamp from DB", () => { Async.it("be able to set and read entities with Timestamp from DB", async () => { This.timeout(5 * 1000) - let sql = Db.sql + let sql = Generated.codegenPersistence.sql /// Setup DB let testEntity: Entities.EntityWithTimestamp.t = { id: "testEntity", @@ -23,7 +23,7 @@ describe("Load and save an entity with a Timestamp from DB", () => { ~items=[testEntity->Entities.EntityWithTimestamp.castToInternal], ~table=Entities.EntityWithTimestamp.table, ~itemSchema=Entities.EntityWithTimestamp.schema, - ~pgSchema=Config.storagePgSchema, + ~pgSchema=Generated.storagePgSchema, ) let inMemoryStore = InMemoryStore.make() @@ -37,7 +37,7 @@ describe("Load and save an entity with a Timestamp from DB", () => { let handlerContext = UserContext.getHandlerContext({ item, loadManager, - persistence: Config.codegenPersistence, + persistence: Generated.codegenPersistence, inMemoryStore, shouldSaveHistory: false, isPreload: false,