Skip to content
This repository was archived by the owner on Nov 14, 2024. It is now read-only.

Commit caed2dd

Browse files
authored
Reduce allocations in VersionedEventStore#retentionEvents (#7118)
Reduce allocations in VersionedEventStore#retentionEvents
1 parent bc2f357 commit caed2dd

File tree

4 files changed

+43
-40
lines changed

4 files changed

+43
-40
lines changed

atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchEventLog.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ private LockWatchEventLog(ClientLockWatchSnapshot snapshot, CacheMetrics metrics
6262

6363
CacheUpdate processUpdate(LockWatchStateUpdate update) {
6464
if (latestVersion.isEmpty()
65-
|| !update.logId().equals(latestVersion.get().id())) {
65+
|| !update.logId().equals(latestVersion.orElseThrow().id())) {
6666
return update.accept(new NewLeaderVisitor());
6767
} else {
6868
return update.accept(new ProcessingVisitor());
@@ -134,11 +134,9 @@ private ClientLogEvents getEventsBetweenVersionsInternal(VersionBounds versionBo
134134
+ "transactions"));
135135
return ClientLogEvents.builder()
136136
.clearCache(false)
137-
.events(LockWatchEvents.builder()
138-
.addAllEvents(eventStore.getEventsBetweenVersionsInclusive(
139-
Optional.of(startVersion.get().version()),
140-
versionBounds.endVersion().version()))
141-
.build())
137+
.events(LockWatchEvents.of(eventStore.getEventsBetweenVersionsInclusive(
138+
Optional.of(startVersion.get().version()),
139+
versionBounds.endVersion().version())))
142140
.build();
143141
}
144142
}
@@ -174,8 +172,7 @@ private LockWatchEvent getCompressedSnapshot(VersionBounds versionBounds) {
174172
long snapshotVersion = versionBounds.snapshotVersion();
175173
Collection<LockWatchEvent> collapsibleEvents =
176174
eventStore.getEventsBetweenVersionsInclusive(Optional.empty(), snapshotVersion);
177-
LockWatchEvents events =
178-
LockWatchEvents.builder().addAllEvents(collapsibleEvents).build();
175+
LockWatchEvents events = LockWatchEvents.of(collapsibleEvents);
179176

180177
return LockWatchCreatedEvent.fromSnapshot(snapshot.getSnapshotWithEvents(events, versionBounds.leader()));
181178
}
@@ -211,7 +208,7 @@ private LockWatchVersion createStartVersion(LockWatchVersion startVersion) {
211208

212209
private LockWatchVersion getLatestVersionAndVerify(LockWatchVersion endVersion) {
213210
Preconditions.checkState(latestVersion.isPresent(), "Cannot get events when log does not know its version");
214-
LockWatchVersion currentVersion = latestVersion.get();
211+
LockWatchVersion currentVersion = latestVersion.orElseThrow();
215212
Preconditions.checkArgument(
216213
endVersion.version() <= currentVersion.version(),
217214
"Transactions' view of the world is more up-to-date than the log");
@@ -243,9 +240,8 @@ private void processSuccessInternal(LockWatchStateUpdate.Success success) {
243240
+ " should only happen very rarely.");
244241
}
245242

246-
if (success.lastKnownVersion() > latestVersion.get().version()) {
247-
LockWatchEvents events =
248-
LockWatchEvents.builder().events(success.events()).build();
243+
if (success.lastKnownVersion() > latestVersion.orElseThrow().version()) {
244+
LockWatchEvents events = LockWatchEvents.of(success.events());
249245
if (events.events().isEmpty()) {
250246
throw new TransactionLockWatchFailedException("Success event has a later version than the current "
251247
+ "version, but has no events to bridge the gap. The transaction should be retried, but this "

atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/LockWatchEvents.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.palantir.atlasdb.keyvalue.api.watch;
1818

19+
import com.google.common.collect.ImmutableList;
1920
import com.google.common.collect.Range;
2021
import com.palantir.lock.watch.LockWatchEvent;
2122
import com.palantir.lock.watch.LockWatchVersion;
@@ -27,6 +28,8 @@
2728

2829
@Value.Immutable
2930
public interface LockWatchEvents {
31+
32+
@Value.Parameter
3033
List<LockWatchEvent> events();
3134

3235
@Value.Derived
@@ -55,7 +58,7 @@ default void contiguousSequence() {
5558
@Value.Check
5659
default void rangeOnlyPresentIffEventsAre() {
5760
if (events().isEmpty()) {
58-
Preconditions.checkState(!versionRange().isPresent(), "Cannot have a version range with no events");
61+
Preconditions.checkState(versionRange().isEmpty(), "Cannot have a version range with no events");
5962
} else {
6063
Preconditions.checkState(versionRange().isPresent(), "Non-empty events must have a version range");
6164
}
@@ -67,7 +70,7 @@ default void assertNoEventsAreMissingAfterLatestVersion(Optional<LockWatchVersio
6770
}
6871

6972
if (latestVersion.isPresent()) {
70-
long firstVersion = versionRange().get().lowerEndpoint();
73+
long firstVersion = versionRange().orElseThrow().lowerEndpoint();
7174
Preconditions.checkArgument(
7275
firstVersion <= latestVersion.get().version()
7376
|| latestVersion.get().version() + 1 == firstVersion,
@@ -80,4 +83,12 @@ default void assertNoEventsAreMissingAfterLatestVersion(Optional<LockWatchVersio
8083
static ImmutableLockWatchEvents.Builder builder() {
8184
return ImmutableLockWatchEvents.builder();
8285
}
86+
87+
static LockWatchEvents of(Iterable<LockWatchEvent> events) {
88+
return ImmutableLockWatchEvents.of(events);
89+
}
90+
91+
static LockWatchEvents empty() {
92+
return of(ImmutableList.of());
93+
}
8394
}

atlasdb-impl-shared/src/main/java/com/palantir/atlasdb/keyvalue/api/watch/VersionedEventStore.java

Lines changed: 17 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@
2525
import com.palantir.logsafe.UnsafeArg;
2626
import java.util.ArrayList;
2727
import java.util.Collection;
28-
import java.util.List;
2928
import java.util.Map.Entry;
3029
import java.util.NavigableMap;
3130
import java.util.Optional;
3231
import java.util.concurrent.ConcurrentSkipListMap;
32+
import java.util.stream.Collectors;
33+
import java.util.stream.Stream;
3334

3435
final class VersionedEventStore {
3536
private static final boolean INCLUSIVE = true;
@@ -60,41 +61,31 @@ Collection<LockWatchEvent> getEventsBetweenVersionsInclusive(Optional<Long> mayb
6061
}
6162

6263
LockWatchEvents retentionEvents(Optional<Sequence> earliestSequenceToKeep) {
63-
if (eventMap.size() < minEvents) {
64-
return LockWatchEvents.builder().build();
64+
int numToRetention = eventMap.size() - minEvents;
65+
if (numToRetention <= 0) {
66+
return LockWatchEvents.empty();
6567
}
6668

67-
// Guarantees that we remove some events while still also potentially performing further retention - note
68-
// that each call to retentionEventsInternal modifies eventMap.
69+
Stream<LockWatchEvent> events = retentionEvents(numToRetention, earliestSequenceToKeep.orElse(MAX_VERSION));
70+
71+
// Guarantees that we remove some events while still also potentially performing further retention.
72+
// Note that consuming elements from retentionEvents stream removes them from eventMap.
6973
if (eventMap.size() > maxEvents) {
70-
List<LockWatchEvent> overMaxSizeEvents = retentionEventsInternal(eventMap.size() - maxEvents, MAX_VERSION);
71-
List<LockWatchEvent> restOfEvents =
72-
retentionEventsInternal(eventMap.size() - minEvents, earliestSequenceToKeep.orElse(MAX_VERSION));
73-
return ImmutableLockWatchEvents.builder()
74-
.addAllEvents(overMaxSizeEvents)
75-
.addAllEvents(restOfEvents)
76-
.build();
77-
} else {
78-
return ImmutableLockWatchEvents.builder()
79-
.addAllEvents(retentionEventsInternal(
80-
eventMap.size() - minEvents, earliestSequenceToKeep.orElse(MAX_VERSION)))
81-
.build();
74+
Stream<LockWatchEvent> overMaxSizeEvents = retentionEvents(eventMap.size() - maxEvents, MAX_VERSION);
75+
return ImmutableLockWatchEvents.of(Stream.concat(overMaxSizeEvents, events)
76+
.collect(Collectors.toCollection(() -> new ArrayList<>(maxEvents))));
8277
}
78+
return ImmutableLockWatchEvents.of(events.collect(Collectors.toCollection(() -> new ArrayList<>(minEvents))));
8379
}
8480

85-
private List<LockWatchEvent> retentionEventsInternal(int numToRetention, Sequence maxVersion) {
86-
List<LockWatchEvent> events = new ArrayList<>(numToRetention);
87-
81+
private Stream<LockWatchEvent> retentionEvents(int numToRetention, Sequence maxVersion) {
8882
// The correctness of this depends upon eventMap's entrySet returning entries in ascending sorted order.
89-
eventMap.entrySet().stream()
90-
.takeWhile(entry -> entry.getKey().value() < maxVersion.value())
83+
return eventMap.headMap(maxVersion).entrySet().stream()
9184
.limit(numToRetention)
92-
.forEachOrdered(entry -> {
85+
.map(entry -> {
9386
eventMap.remove(entry.getKey());
94-
events.add(entry.getValue());
87+
return entry.getValue();
9588
});
96-
97-
return events;
9889
}
9990

10091
boolean containsEntryLessThanOrEqualTo(long version) {
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
type: improvement
2+
improvement:
3+
description: Reduce allocations in VersionedEventStore#retentionEvents
4+
links:
5+
- https://github.com/palantir/atlasdb/pull/7118

0 commit comments

Comments
 (0)