|
13 | 13 | import org.elasticsearch.action.support.PlainActionFuture; |
14 | 14 | import org.elasticsearch.cluster.metadata.IndexMetadata; |
15 | 15 | import org.elasticsearch.common.settings.Settings; |
| 16 | +import org.elasticsearch.common.util.CollectionUtils; |
| 17 | +import org.elasticsearch.common.util.set.Sets; |
| 18 | +import org.elasticsearch.index.Index; |
| 19 | +import org.elasticsearch.index.IndexSettings; |
| 20 | +import org.elasticsearch.index.engine.EngineException; |
| 21 | +import org.elasticsearch.index.engine.EngineFactory; |
| 22 | +import org.elasticsearch.index.engine.InternalEngine; |
| 23 | +import org.elasticsearch.index.shard.ShardId; |
| 24 | +import org.elasticsearch.plugins.EnginePlugin; |
| 25 | +import org.elasticsearch.plugins.Plugin; |
16 | 26 | import org.elasticsearch.test.ESIntegTestCase; |
17 | 27 | import org.elasticsearch.xcontent.XContentBuilder; |
18 | 28 | import org.elasticsearch.xcontent.XContentFactory; |
| 29 | +import org.junit.Before; |
| 30 | + |
| 31 | +import java.util.Collection; |
| 32 | +import java.util.List; |
| 33 | +import java.util.Optional; |
| 34 | +import java.util.Set; |
| 35 | +import java.util.stream.Collectors; |
| 36 | +import java.util.stream.IntStream; |
19 | 37 |
|
20 | 38 | import static org.hamcrest.Matchers.equalTo; |
21 | 39 | import static org.hamcrest.Matchers.greaterThan; |
22 | 40 |
|
23 | 41 | public class IndexDiskUsageAnalyzerIT extends ESIntegTestCase { |
24 | 42 |
|
| 43 | + @Override |
| 44 | + protected boolean addMockInternalEngine() { |
| 45 | + return false; |
| 46 | + } |
| 47 | + |
| 48 | + @Override |
| 49 | + protected Collection<Class<? extends Plugin>> nodePlugins() { |
| 50 | + return CollectionUtils.appendToCopy(super.nodePlugins(), EngineTestPlugin.class); |
| 51 | + } |
| 52 | + |
| 53 | + private static final Set<ShardId> failOnFlushShards = Sets.newConcurrentHashSet(); |
| 54 | + |
| 55 | + public static class EngineTestPlugin extends Plugin implements EnginePlugin { |
| 56 | + @Override |
| 57 | + public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) { |
| 58 | + return Optional.of(config -> new InternalEngine(config) { |
| 59 | + @Override |
| 60 | + public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException { |
| 61 | + final ShardId shardId = config.getShardId(); |
| 62 | + if (failOnFlushShards.contains(shardId)) { |
| 63 | + throw new EngineException(shardId, "simulated IO"); |
| 64 | + } |
| 65 | + return super.flush(force, waitIfOngoing); |
| 66 | + } |
| 67 | + }); |
| 68 | + } |
| 69 | + } |
| 70 | + |
| 71 | + @Before |
| 72 | + public void resetFailOnFlush() throws Exception { |
| 73 | + failOnFlushShards.clear(); |
| 74 | + } |
| 75 | + |
25 | 76 | public void testSimple() throws Exception { |
26 | 77 | final XContentBuilder mapping = XContentFactory.jsonBuilder(); |
27 | 78 | mapping.startObject(); |
@@ -152,6 +203,43 @@ public void testGeoShape() throws Exception { |
152 | 203 | assertMetadataFields(stats); |
153 | 204 | } |
154 | 205 |
|
| 206 | + public void testFailOnFlush() throws Exception { |
| 207 | + final String indexName = "test-index"; |
| 208 | + int numberOfShards = between(1, 5); |
| 209 | + client().admin() |
| 210 | + .indices() |
| 211 | + .prepareCreate(indexName) |
| 212 | + .setSettings( |
| 213 | + Settings.builder() |
| 214 | + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards) |
| 215 | + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, between(0, 1)) |
| 216 | + ) |
| 217 | + .get(); |
| 218 | + ensureYellow(indexName); |
| 219 | + int numDocs = randomIntBetween(1, 10); |
| 220 | + for (int i = 0; i < numDocs; i++) { |
| 221 | + int value = randomIntBetween(1, 10); |
| 222 | + final XContentBuilder doc = XContentFactory.jsonBuilder() |
| 223 | + .startObject() |
| 224 | + .field("english_text", English.intToEnglish(value)) |
| 225 | + .field("value", value) |
| 226 | + .endObject(); |
| 227 | + client().prepareIndex(indexName, "_doc").setId("id-" + i).setSource(doc).get(); |
| 228 | + } |
| 229 | + Index index = clusterService().state().metadata().index(indexName).getIndex(); |
| 230 | + List<ShardId> failedShards = randomSubsetOf( |
| 231 | + between(1, numberOfShards), |
| 232 | + IntStream.range(0, numberOfShards).mapToObj(n -> new ShardId(index, n)).collect(Collectors.toList()) |
| 233 | + ); |
| 234 | + failOnFlushShards.addAll(failedShards); |
| 235 | + AnalyzeIndexDiskUsageResponse resp = client().execute( |
| 236 | + AnalyzeIndexDiskUsageAction.INSTANCE, |
| 237 | + new AnalyzeIndexDiskUsageRequest(new String[] { indexName }, AnalyzeIndexDiskUsageRequest.DEFAULT_INDICES_OPTIONS, true) |
| 238 | + ).actionGet(); |
| 239 | + assertThat(resp.getTotalShards(), equalTo(numberOfShards)); |
| 240 | + assertThat(resp.getFailedShards(), equalTo(failedShards.size())); |
| 241 | + } |
| 242 | + |
155 | 243 | void assertMetadataFields(IndexDiskUsageStats stats) { |
156 | 244 | final IndexDiskUsageStats.PerFieldDiskUsage sourceField = stats.getFields().get("_source"); |
157 | 245 | assertThat(sourceField.getInvertedIndexBytes(), equalTo(0L)); |
|
0 commit comments