33{-# LANGUAGE DeriveGeneric #-}
44{-# LANGUAGE DerivingVia #-}
55{-# LANGUAGE FlexibleContexts #-}
6- {-# LANGUAGE GADTs #-}
6+ {-# LANGUAGE FlexibleInstances #-}
77{-# LANGUAGE LambdaCase #-}
8+ {-# LANGUAGE MultiParamTypeClasses #-}
89{-# LANGUAGE NamedFieldPuns #-}
9- {-# LANGUAGE OverloadedStrings #-}
1010{-# LANGUAGE PatternSynonyms #-}
1111{-# LANGUAGE ScopedTypeVariables #-}
12+ {-# LANGUAGE TupleSections #-}
1213{-# LANGUAGE TypeApplications #-}
14+ {-# LANGUAGE TypeFamilies #-}
1315{-# LANGUAGE TypeOperators #-}
16+ {-# LANGUAGE UndecidableInstances #-}
1417
1518-- | A 'BackingStore' implementation based on [LMDB](http://www.lmdb.tech/doc/).
1619module Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore.Impl.LMDB
1720 ( -- * Opening a database
18- LMDBLimits (LMDBLimits , lmdbMapSize , lmdbMaxDatabases , lmdbMaxReaders )
19- , newLMDBBackingStore
20-
21- -- * Errors
21+ LMDB
22+ , Backend (.. )
23+ , Args (LMDBBackingStoreArgs )
24+ , LMDBLimits (LMDBLimits , lmdbMapSize , lmdbMaxDatabases , lmdbMaxReaders )
25+ , mkLMDBArgs
26+
27+ -- * Streaming
28+ , YieldArgs (YieldLMDB )
29+ , mkLMDBYieldArgs
30+ , SinkArgs (SinkLMDB )
31+ , mkLMDBSinkArgs
32+
33+ -- * Exposed for testing
2234 , LMDBErr (.. )
23-
24- -- * Internals exposed for @snapshot-converter@
25- , DbSeqNo (.. )
26- , LMDBMK (.. )
27- , getDb
28- , initLMDBTable
29- , withDbSeqNoRWMaybeNull
3035 ) where
3136
32- import Cardano.Slotting.Slot (SlotNo , WithOrigin (At ))
37+ import Cardano.Slotting.Slot (WithOrigin (At ))
3338import qualified Codec.Serialise as S (Serialise (.. ))
3439import qualified Control.Concurrent.Class.MonadSTM.TVar as IOLike
3540import Control.Monad (forM_ , unless , void , when )
3641import qualified Control.Monad.Class.MonadSTM as IOLike
3742import Control.Monad.IO.Class (MonadIO (liftIO ))
43+ import Control.Monad.Trans (lift )
44+ import Control.ResourceRegistry
3845import qualified Control.Tracer as Trace
3946import Data.Bifunctor (first )
4047import Data.Functor (($>) , (<&>) )
@@ -43,6 +50,7 @@ import Data.Map (Map)
4350import qualified Data.Map.Strict as Map
4451import Data.MemPack
4552import Data.Proxy
53+ import qualified Data.SOP.Dict as Dict
4654import qualified Data.Set as Set
4755import qualified Data.Text as Strict
4856import qualified Database.LMDB.Simple as LMDB
@@ -52,11 +60,17 @@ import qualified Database.LMDB.Simple.Internal as LMDB.Internal
5260import qualified Database.LMDB.Simple.TransactionHandle as TrH
5361import GHC.Generics (Generic )
5462import GHC.Stack (HasCallStack )
55- import Ouroboros.Consensus.Ledger.Tables
63+ import Ouroboros.Consensus.Block
64+ import Ouroboros.Consensus.Ledger.Basics
5665import qualified Ouroboros.Consensus.Ledger.Tables.Diff as Diff
66+ import Ouroboros.Consensus.Ledger.Tables.Utils (emptyLedgerTables )
67+ import Ouroboros.Consensus.Storage.LedgerDB.API
68+ import Ouroboros.Consensus.Storage.LedgerDB.Args
5769import Ouroboros.Consensus.Storage.LedgerDB.Snapshots
5870 ( SnapshotBackend (.. )
5971 )
72+ import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.Args as V1
73+ import Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore
6074import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore.API as API
6175import qualified Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore.Impl.LMDB.Bridge as Bridge
6276import Ouroboros.Consensus.Storage.LedgerDB.V1.BackingStore.Impl.LMDB.Status
@@ -70,10 +84,17 @@ import Ouroboros.Consensus.Util.IOLike
7084 , IOLike
7185 , MonadCatch (.. )
7286 , MonadThrow (.. )
87+ , PrimState
7388 , bracket
7489 )
7590import Ouroboros.Consensus.Util.IndexedMemPack
91+ import qualified Streaming as S
92+ import qualified Streaming.Prelude as S
93+ import System.Directory
7694import qualified System.FS.API as FS
95+ import System.FS.IO
96+ import qualified System.FilePath as FilePath
97+ import System.IO.Temp
7798
7899{- ------------------------------------------------------------------------------
79100 Database definition
@@ -793,3 +814,187 @@ prettyPrintLMDBErr = \case
793814 LMDBErrNotADir path ->
794815 " The path " <> show path <> " should be a directory but it is a file instead."
795816 LMDBErrClosed -> " The database has been closed."
817+
818+ {- ------------------------------------------------------------------------------
819+ Backend
820+ -------------------------------------------------------------------------------}
821+
822+ data LMDB
823+
824+ instance
825+ ( HasLedgerTables l
826+ , MonadIO m
827+ , IOLike m
828+ , MemPackIdx l EmptyMK ~ l EmptyMK
829+ ) =>
830+ Backend m LMDB l
831+ where
832+ data Args m LMDB
833+ = LMDBBackingStoreArgs FilePath LMDBLimits (Dict. Dict MonadIOPrim m )
834+ data Trace m LMDB
835+ = OnDiskBackingStoreInitialise LMDB. Limits
836+ | OnDiskBackingStoreTrace BackingStoreTrace
837+ deriving (Eq , Show )
838+
839+ isRightBackendForSnapshot _ _ UTxOHDLMDBSnapshot = True
840+ isRightBackendForSnapshot _ _ _ = False
841+
842+ newBackingStoreInitialiser trcr (LMDBBackingStoreArgs fs limits Dict. Dict ) =
843+ newLMDBBackingStore
844+ (SomeBackendTrace . OnDiskBackingStoreTrace >$< trcr)
845+ limits
846+ (LiveLMDBFS $ FS. SomeHasFS $ ioHasFS $ FS. MountPoint fs)
847+
848+ -- | Create arguments for initializing the LedgerDB using the LMDB backend.
849+ mkLMDBArgs ::
850+ ( MonadIOPrim m
851+ , HasLedgerTables (LedgerState blk )
852+ , IOLike m
853+ ) =>
854+ V1. FlushFrequency -> FilePath -> LMDBLimits -> a -> (LedgerDbBackendArgs m blk , a )
855+ mkLMDBArgs flushing lmdbPath limits =
856+ (,) $
857+ LedgerDbBackendArgsV1 $
858+ V1. V1Args flushing $
859+ SomeBackendArgs $
860+ LMDBBackingStoreArgs lmdbPath limits Dict. Dict
861+
862+ class (MonadIO m , PrimState m ~ PrimState IO ) => MonadIOPrim m
863+ instance (MonadIO m , PrimState m ~ PrimState IO ) => MonadIOPrim m
864+
865+ {- ------------------------------------------------------------------------------
866+ Streaming
867+ -------------------------------------------------------------------------------}
868+
869+ instance (Ord (TxIn l ), GetTip l , Monad m ) => StreamingBackend m LMDB l where
870+ data SinkArgs m LMDB l
871+ = SinkLMDB
872+ -- \| Chunk size
873+ Int
874+ -- \| bsWrite
875+ ( SlotNo ->
876+ (l EmptyMK , l EmptyMK ) ->
877+ LedgerTables l DiffMK ->
878+ m ()
879+ )
880+ (l EmptyMK -> m () )
881+
882+ data YieldArgs m LMDB l
883+ = YieldLMDB
884+ Int
885+ (LedgerBackingStoreValueHandle m l )
886+
887+ yield _ (YieldLMDB chunkSize valueHandle ) = yieldLmdbS chunkSize valueHandle
888+ sink _ (SinkLMDB chunkSize write copy ) = sinkLmdbS chunkSize write copy
889+
890+ sinkLmdbS ::
891+ forall m l .
892+ (Ord (TxIn l ), GetTip l , Monad m ) =>
893+ Int ->
894+ (SlotNo -> (l EmptyMK , l EmptyMK ) -> LedgerTables l DiffMK -> m () ) ->
895+ (l EmptyMK -> m () ) ->
896+ Sink m l
897+ sinkLmdbS writeChunkSize bs copyTo hint s = do
898+ r <- go writeChunkSize mempty s
899+ lift $ copyTo hint
900+ pure (fmap (,Nothing ) r)
901+ where
902+ sl = withOrigin (error " unreachable" ) id $ pointSlot $ getTip hint
903+
904+ go 0 m s' = do
905+ lift $ bs sl (hint, hint) (LedgerTables $ DiffMK $ Diff. fromMapInserts m)
906+ go writeChunkSize mempty s'
907+ go n m s' = do
908+ mbs <- S. uncons s'
909+ case mbs of
910+ Nothing -> do
911+ lift $ bs sl (hint, hint) (LedgerTables $ DiffMK $ Diff. fromMapInserts m)
912+ S. effects s'
913+ Just ((k, v), s'') ->
914+ go (n - 1 ) (Map. insert k v m) s''
915+
916+ yieldLmdbS ::
917+ Monad m =>
918+ Int ->
919+ LedgerBackingStoreValueHandle m l ->
920+ Yield m l
921+ yieldLmdbS readChunkSize bsvh hint k = do
922+ r <- k (go (RangeQuery Nothing readChunkSize))
923+ lift $ S. effects r
924+ where
925+ go p = do
926+ (LedgerTables (ValuesMK values), mx) <- lift $ S. lift $ bsvhRangeRead bsvh hint p
927+ case mx of
928+ Nothing -> pure $ pure Nothing
929+ Just x -> do
930+ S. each $ Map. toList values
931+ go (RangeQuery (Just . LedgerTables . KeysMK $ Set. singleton x) readChunkSize)
932+
933+ -- | Create Yield args for LMDB
934+ mkLMDBYieldArgs ::
935+ forall l .
936+ ( HasCallStack
937+ , HasLedgerTables l
938+ , MemPackIdx l EmptyMK ~ l EmptyMK
939+ ) =>
940+ FilePath ->
941+ LMDBLimits ->
942+ l EmptyMK ->
943+ ResourceRegistry IO ->
944+ IO (YieldArgs IO LMDB l )
945+ mkLMDBYieldArgs fp limits hint reg = do
946+ let (dbPath, snapName) = FilePath. splitFileName fp
947+ tempDir <- getCanonicalTemporaryDirectory
948+ let lmdbTemp = tempDir FilePath. </> " lmdb_streaming_in"
949+ removePathForcibly lmdbTemp
950+ _ <-
951+ allocate
952+ reg
953+ (\ _ -> createDirectory lmdbTemp)
954+ (\ _ -> removePathForcibly lmdbTemp)
955+ (_, bs) <-
956+ allocate
957+ reg
958+ ( \ _ -> do
959+ newLMDBBackingStore
960+ Trace. nullTracer
961+ limits
962+ (LiveLMDBFS $ FS. SomeHasFS $ ioHasFS $ FS. MountPoint lmdbTemp)
963+ (SnapshotsFS $ FS. SomeHasFS $ ioHasFS $ FS. MountPoint dbPath)
964+ (InitFromCopy hint (FS. mkFsPath [snapName]))
965+ )
966+ bsClose
967+ (_, bsvh) <- allocate reg (\ _ -> bsValueHandle bs) bsvhClose
968+ pure (YieldLMDB 1000 bsvh)
969+
970+ -- | Create Sink args for LMDB
971+ mkLMDBSinkArgs ::
972+ forall l .
973+ ( HasCallStack
974+ , HasLedgerTables l
975+ , MemPackIdx l EmptyMK ~ l EmptyMK
976+ ) =>
977+ FilePath ->
978+ LMDBLimits ->
979+ l EmptyMK ->
980+ ResourceRegistry IO ->
981+ IO (SinkArgs IO LMDB l )
982+ mkLMDBSinkArgs fp limits hint reg = do
983+ let (snapDir, snapName) = FilePath. splitFileName fp
984+ tempDir <- getCanonicalTemporaryDirectory
985+ let lmdbTemp = tempDir FilePath. </> " lmdb_streaming_out"
986+ removePathForcibly lmdbTemp
987+ _ <- allocate reg (\ _ -> createDirectory lmdbTemp) (\ _ -> removePathForcibly lmdbTemp)
988+ (_, bs) <-
989+ allocate
990+ reg
991+ ( \ _ ->
992+ newLMDBBackingStore
993+ Trace. nullTracer
994+ limits
995+ (LiveLMDBFS $ FS. SomeHasFS $ ioHasFS $ FS. MountPoint lmdbTemp)
996+ (SnapshotsFS $ FS. SomeHasFS $ ioHasFS $ FS. MountPoint snapDir)
997+ (InitFromValues (At 0 ) hint emptyLedgerTables)
998+ )
999+ bsClose
1000+ pure $ SinkLMDB 1000 (bsWrite bs) (\ h -> bsCopy bs h (FS. mkFsPath [snapName, " tables" ]))
0 commit comments