@@ -722,19 +722,7 @@ newForkerAtTarget ::
722722 ResourceRegistry m ->
723723 Target (Point blk ) ->
724724 m (Either GetForkerError (Forker m l blk ))
725- newForkerAtTarget h rr pt = getEnv h $ \ ldbEnv -> do
726- tv <- newTVarIO (pure () )
727- void $ allocate rr
728- (\ _ -> atomically $ do
729- -- Populate the tvar with the releasing action. Once we create the
730- -- forker the release will be done via `forkerClose` if an exception
731- -- makes us deallocate the forker
732- writeTVar tv (atomically $ unsafeReleaseReadAccess (ldbLock ldbEnv))
733- -- Acquire the read access
734- unsafeAcquireReadAccess (ldbLock ldbEnv)
735- )
736- (\ _ -> join $ readTVarIO tv)
737- runReadLocked (acquireAtTarget ldbEnv (Right pt) >>= traverse (newForker h ldbEnv tv rr))
725+ newForkerAtTarget h rr pt = withTransferrableReadAccess h rr (Right pt)
738726
739727newForkerByRollback ::
740728 ( HeaderHash l ~ HeaderHash blk
@@ -749,19 +737,41 @@ newForkerByRollback ::
749737 -- | How many blocks to rollback from the tip
750738 Word64 ->
751739 m (Either GetForkerError (Forker m l blk ))
752- newForkerByRollback h rr n = getEnv h $ \ ldbEnv -> do
740+ newForkerByRollback h rr n = withTransferrableReadAccess h rr (Left n)
741+
742+ -- | Acquire read access and then allocate a forker, acquiring it at the given
743+ -- point or rollback.
744+ withTransferrableReadAccess ::
745+ ( HeaderHash l ~ HeaderHash blk
746+ , IOLike m
747+ , IsLedger l
748+ , StandardHash l
749+ , HasLedgerTables l
750+ , LedgerSupportsProtocol blk
751+ ) =>
752+ LedgerDBHandle m l blk ->
753+ ResourceRegistry m ->
754+ Either Word64 (Target (Point blk )) ->
755+ m (Either GetForkerError (Forker m l blk ))
756+ withTransferrableReadAccess h rr f = getEnv h $ \ ldbEnv -> do
757+ -- This TVar will be used to maybe release the read lock by the resource
758+ -- registry. Once the forker was opened it will be emptied.
753759 tv <- newTVarIO (pure () )
754- void $ allocate rr
755- (\ _ -> atomically $ do
756- -- Populate the tvar with the releasing action. Once we create the
757- -- forker the release will be done via `forkerClose` if an exception
758- -- makes us deallocate the forker
759- writeTVar tv (atomically $ unsafeReleaseReadAccess (ldbLock ldbEnv))
760- -- Acquire the read access
761- unsafeAcquireReadAccess (ldbLock ldbEnv)
762- )
763- (\ _ -> join $ readTVarIO tv)
764- runReadLocked (acquireAtTarget ldbEnv (Left n) >>= traverse (newForker h ldbEnv tv rr))
760+ void $
761+ allocate
762+ rr
763+ ( \ _ -> atomically $ do
764+ -- Populate the tvar with the releasing action. Creating the forker will empty this
765+ writeTVar tv (atomically $ unsafeReleaseReadAccess (ldbLock ldbEnv))
766+ -- Acquire the read access
767+ unsafeAcquireReadAccess (ldbLock ldbEnv)
768+ )
769+ ( \ _ ->
770+ -- Run the contents of the releasing TVar which will be `pure ()` if
771+ -- the forker was opened.
772+ join $ readTVarIO tv
773+ )
774+ runReadLocked (acquireAtTarget ldbEnv f >>= traverse (newForker h ldbEnv tv rr))
765775
766776-- | Acquire both a value handle and a db changelog at the tip. Holds a read lock
767777-- while doing so.
@@ -819,6 +829,7 @@ acquireAtTarget ldbEnv target = readLocked $ runExceptT $ do
819829-------------------------------------------------------------------------------}
820830
821831newForker ::
832+ forall m l blk .
822833 ( IOLike m
823834 , HasLedgerTables l
824835 , LedgerSupportsProtocol blk
@@ -832,28 +843,36 @@ newForker ::
832843 ResourceRegistry m ->
833844 DbChangelog l ->
834845 ReadLocked m (Forker m l blk )
835- newForker h ldbEnv releaseVar rr dblog = readLocked $ do
836- dblogVar <- newTVarIO dblog
837- forkerKey <- atomically $ stateTVar (ldbNextForkerKey ldbEnv) $ \ r -> (r, r + 1 )
838- forkerMVar <- newMVar $ Left (ldbLock ldbEnv, ldbBackingStore ldbEnv, rr)
839- let forkerEnv =
840- ForkerEnv
841- { foeBackingStoreValueHandle = forkerMVar
842- , foeChangelog = dblogVar
843- , foeSwitchVar = ldbChangelog ldbEnv
844- , foeTracer =
845- LedgerDBForkerEvent . TraceForkerEventWithKey forkerKey >$< ldbTracer ldbEnv
846- }
847- atomically $ do
848- -- Note that we add the forkerEnv to the 'ldbForkers' so that an exception
849- -- which will close all the forkers, also closes this one, releasing the
850- -- read access we acquired above.
851- modifyTVar (ldbForkers ldbEnv) $ Map. insert forkerKey forkerEnv
852- -- And we tell the bracketOnError above to not release the lock as closing the
853- -- forker will.
854- writeTVar releaseVar (pure () )
855- traceWith (foeTracer forkerEnv) ForkerOpen
856- pure $ mkForker h (ldbQueryBatchSize ldbEnv) forkerKey forkerEnv
846+ newForker h ldbEnv releaseVar rr dblog =
847+ readLocked $
848+ fmap snd $
849+ allocate
850+ rr
851+ ( \ _ -> do
852+ dblogVar <- newTVarIO dblog
853+ forkerKey <- atomically $ stateTVar (ldbNextForkerKey ldbEnv) $ \ r -> (r, r + 1 )
854+ forkerMVar <- newMVar $ Left (ldbLock ldbEnv, ldbBackingStore ldbEnv, rr)
855+ let forkerEnv =
856+ ForkerEnv
857+ { foeBackingStoreValueHandle = forkerMVar
858+ , foeChangelog = dblogVar
859+ , foeSwitchVar = ldbChangelog ldbEnv
860+ , foeTracer =
861+ LedgerDBForkerEvent . TraceForkerEventWithKey forkerKey >$< ldbTracer ldbEnv
862+ }
863+ atomically $ do
864+ -- Note that we add the forkerEnv to the 'ldbForkers' so that an exception
865+ -- which will close all the forkers, also closes this one, releasing the
866+ -- read access we acquired above.
867+ modifyTVar (ldbForkers ldbEnv) $ Map. insert forkerKey forkerEnv
868+ -- Empty the tvar created for allocating the unsafe read access,
869+ -- so that it is the forker the one that takes care of releasing
870+ -- it.
871+ writeTVar releaseVar (pure () )
872+ traceWith (foeTracer forkerEnv) ForkerOpen
873+ pure $ (mkForker h (ldbQueryBatchSize ldbEnv) forkerKey forkerEnv)
874+ )
875+ forkerClose
857876
858877mkForker ::
859878 ( IOLike m
0 commit comments