Skip to content
This repository was archived by the owner on Aug 2, 2022. It is now read-only.

Commit a452cf3

Browse files
authored
Adds configurable index history shard/replica settings (#401)
* Adds configurable index history shard/replica settings * Only wait for config index shards in multiNode integ tests
1 parent 6932e39 commit a452cf3

File tree

9 files changed

+129
-25
lines changed

9 files changed

+129
-25
lines changed

src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/IndexManagementIndices.kt

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ package com.amazon.opendistroforelasticsearch.indexmanagement
1717

1818
import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
1919
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.suspendUntil
20+
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
21+
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN
22+
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_REPLICAS
23+
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_SHARDS
2024
import com.amazon.opendistroforelasticsearch.indexmanagement.util.IndexUtils
2125
import com.amazon.opendistroforelasticsearch.indexmanagement.util.OpenForTesting
2226
import com.amazon.opendistroforelasticsearch.indexmanagement.util._DOC
@@ -37,17 +41,30 @@ import org.elasticsearch.common.xcontent.XContentType
3741

3842
@OpenForTesting
3943
class IndexManagementIndices(
44+
settings: Settings,
4045
private val client: IndicesAdminClient,
4146
private val clusterService: ClusterService
4247
) {
4348

4449
private val logger = LogManager.getLogger(javaClass)
4550

51+
@Volatile private var historyNumberOfShards = ManagedIndexSettings.HISTORY_NUMBER_OF_SHARDS.get(settings)
52+
@Volatile private var historyNumberOfReplicas = ManagedIndexSettings.HISTORY_NUMBER_OF_REPLICAS.get(settings)
53+
54+
init {
55+
clusterService.clusterSettings.addSettingsUpdateConsumer(ManagedIndexSettings.HISTORY_NUMBER_OF_SHARDS) {
56+
historyNumberOfShards = it
57+
}
58+
clusterService.clusterSettings.addSettingsUpdateConsumer(ManagedIndexSettings.HISTORY_NUMBER_OF_REPLICAS) {
59+
historyNumberOfReplicas = it
60+
}
61+
}
62+
4663
fun checkAndUpdateIMConfigIndex(actionListener: ActionListener<AcknowledgedResponse>) {
4764
if (!indexManagementIndexExists()) {
4865
val indexRequest = CreateIndexRequest(INDEX_MANAGEMENT_INDEX)
4966
.mapping(_DOC, indexManagementMappings, XContentType.JSON)
50-
.settings(Settings.builder().put("index.hidden", true).build())
67+
.settings(Settings.builder().put(INDEX_HIDDEN, true).build())
5168
client.create(indexRequest, object : ActionListener<CreateIndexResponse> {
5269
override fun onFailure(e: Exception) {
5370
actionListener.onFailure(e)
@@ -118,8 +135,12 @@ class IndexManagementIndices(
118135

119136
val request = CreateIndexRequest(index)
120137
.mapping(_DOC, indexStateManagementHistoryMappings, XContentType.JSON)
121-
.settings(Settings.builder().put("index.hidden", true)
122-
.put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build())
138+
.settings(
139+
Settings.builder()
140+
.put(INDEX_HIDDEN, true)
141+
.put(INDEX_NUMBER_OF_SHARDS, historyNumberOfShards)
142+
.put(INDEX_NUMBER_OF_REPLICAS, historyNumberOfReplicas).build()
143+
)
123144
if (alias != null) request.alias(Alias(alias))
124145
return try {
125146
val createIndexResponse: CreateIndexResponse = client.suspendUntil { client.create(request, it) }

src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/IndexManagementPlugin.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act
240240
.registerConsumers()
241241
rollupInterceptor = RollupInterceptor(clusterService, settings, indexNameExpressionResolver)
242242
this.indexNameExpressionResolver = indexNameExpressionResolver
243-
indexManagementIndices = IndexManagementIndices(client.admin().indices(), clusterService)
243+
indexManagementIndices = IndexManagementIndices(settings, client.admin().indices(), clusterService)
244244
val indexStateManagementHistory =
245245
IndexStateManagementHistory(
246246
settings,
@@ -263,6 +263,8 @@ internal class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, Act
263263
ManagedIndexSettings.HISTORY_MAX_DOCS,
264264
ManagedIndexSettings.HISTORY_RETENTION_PERIOD,
265265
ManagedIndexSettings.HISTORY_ROLLOVER_CHECK_PERIOD,
266+
ManagedIndexSettings.HISTORY_NUMBER_OF_SHARDS,
267+
ManagedIndexSettings.HISTORY_NUMBER_OF_REPLICAS,
266268
ManagedIndexSettings.POLICY_ID,
267269
ManagedIndexSettings.ROLLOVER_ALIAS,
268270
ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED,

src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/IndexStateManagementHistory.kt

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.suspendU
2121
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
2222
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
2323
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.Step
24+
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN
25+
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_REPLICAS
26+
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_SHARDS
2427
import com.amazon.opendistroforelasticsearch.indexmanagement.util._DOC
2528
import org.apache.logging.log4j.LogManager
2629
import org.elasticsearch.action.DocWriteRequest
@@ -55,14 +58,12 @@ class IndexStateManagementHistory(
5558
private var scheduledRollover: Scheduler.Cancellable? = null
5659

5760
@Volatile private var historyEnabled = ManagedIndexSettings.HISTORY_ENABLED.get(settings)
58-
5961
@Volatile private var historyMaxDocs = ManagedIndexSettings.HISTORY_MAX_DOCS.get(settings)
60-
6162
@Volatile private var historyMaxAge = ManagedIndexSettings.HISTORY_INDEX_MAX_AGE.get(settings)
62-
6363
@Volatile private var historyRolloverCheckPeriod = ManagedIndexSettings.HISTORY_ROLLOVER_CHECK_PERIOD.get(settings)
64-
6564
@Volatile private var historyRetentionPeriod = ManagedIndexSettings.HISTORY_RETENTION_PERIOD.get(settings)
65+
@Volatile private var historyNumberOfShards = ManagedIndexSettings.HISTORY_NUMBER_OF_SHARDS.get(settings)
66+
@Volatile private var historyNumberOfReplicas = ManagedIndexSettings.HISTORY_NUMBER_OF_REPLICAS.get(settings)
6667

6768
init {
6869
clusterService.addLocalNodeMasterListener(this)
@@ -78,6 +79,12 @@ class IndexStateManagementHistory(
7879
clusterService.clusterSettings.addSettingsUpdateConsumer(ManagedIndexSettings.HISTORY_RETENTION_PERIOD) {
7980
historyRetentionPeriod = it
8081
}
82+
clusterService.clusterSettings.addSettingsUpdateConsumer(ManagedIndexSettings.HISTORY_NUMBER_OF_SHARDS) {
83+
historyNumberOfShards = it
84+
}
85+
clusterService.clusterSettings.addSettingsUpdateConsumer(ManagedIndexSettings.HISTORY_NUMBER_OF_REPLICAS) {
86+
historyNumberOfReplicas = it
87+
}
8188
}
8289

8390
override fun onMaster() {
@@ -119,7 +126,12 @@ class IndexStateManagementHistory(
119126
val request = RolloverRequest(IndexManagementIndices.HISTORY_WRITE_INDEX_ALIAS, null)
120127
request.createIndexRequest.index(IndexManagementIndices.HISTORY_INDEX_PATTERN)
121128
.mapping(_DOC, IndexManagementIndices.indexStateManagementHistoryMappings, XContentType.JSON)
122-
.settings(Settings.builder().put("index.hidden", true).put("index.number_of_shards", 1).put("index.number_of_replicas", 1))
129+
.settings(
130+
Settings.builder()
131+
.put(INDEX_HIDDEN, true)
132+
.put(INDEX_NUMBER_OF_SHARDS, historyNumberOfShards)
133+
.put(INDEX_NUMBER_OF_REPLICAS, historyNumberOfReplicas)
134+
)
123135
request.addMaxIndexDocsCondition(historyMaxDocs)
124136
request.addMaxIndexAgeCondition(historyMaxAge)
125137
val response = client.admin().indices().rolloverIndex(request).actionGet()

src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,20 @@ class ManagedIndexSettings {
115115
Setting.Property.Dynamic
116116
)
117117

118+
val HISTORY_NUMBER_OF_SHARDS: Setting<Int> = Setting.intSetting(
119+
"opendistro.index_state_management.history.number_of_shards",
120+
1,
121+
Setting.Property.NodeScope,
122+
Setting.Property.Dynamic
123+
)
124+
125+
val HISTORY_NUMBER_OF_REPLICAS: Setting<Int> = Setting.intSetting(
126+
"opendistro.index_state_management.history.number_of_replicas",
127+
1,
128+
Setting.Property.NodeScope,
129+
Setting.Property.Dynamic
130+
)
131+
118132
val ALLOW_LIST: Setting<List<String>> = Setting.listSetting(
119133
"opendistro.index_state_management.allow_list",
120134
ALLOW_LIST_ALL,

src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ const val DEFAULT_POLICY_SORT_FIELD = "policy.policy_id.keyword"
4545
const val DEFAULT_SORT_ORDER = "asc"
4646
const val DEFAULT_QUERY_STRING = "*"
4747

48+
const val INDEX_HIDDEN = "index.hidden"
49+
const val INDEX_NUMBER_OF_SHARDS = "index.number_of_shards"
50+
const val INDEX_NUMBER_OF_REPLICAS = "index.number_of_replicas"
51+
4852
fun buildInvalidIndexResponse(builder: XContentBuilder, failedIndices: List<FailedIndex>) {
4953
if (failedIndices.isNotEmpty()) {
5054
builder.field(FAILURES, true)

src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/IndexManagementIndicesIT.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
1212
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler.RestChangePolicyAction
1313
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.FAILED_INDICES
1414
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.FAILURES
15+
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN
1516
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.UPDATED_INDICES
1617
import org.apache.http.entity.ContentType
1718
import org.apache.http.entity.StringEntity
@@ -64,7 +65,7 @@ class IndexManagementIndicesIT : IndexStateManagementRestTestCase() {
6465
val mapping = indexManagementMappings.trim().trimStart('{').trimEnd('}')
6566
.replace("\"schema_version\": $configSchemaVersion", "\"schema_version\": 0")
6667

67-
createIndex(INDEX_MANAGEMENT_INDEX, Settings.builder().put("index.hidden", true).build(), mapping)
68+
createIndex(INDEX_MANAGEMENT_INDEX, Settings.builder().put(INDEX_HIDDEN, true).build(), mapping)
6869
assertIndexExists(INDEX_MANAGEMENT_INDEX)
6970
verifyIndexSchemaVersion(INDEX_MANAGEMENT_INDEX, 0)
7071

@@ -83,7 +84,7 @@ class IndexManagementIndicesIT : IndexStateManagementRestTestCase() {
8384
.replace("\"schema_version\": $historySchemaVersion", "\"schema_version\": 0")
8485

8586
val aliases = "\"$HISTORY_WRITE_INDEX_ALIAS\": { \"is_write_index\": true }"
86-
createIndex("$HISTORY_INDEX_BASE-1", Settings.builder().put("index.hidden", true).build(), mapping, aliases)
87+
createIndex("$HISTORY_INDEX_BASE-1", Settings.builder().put(INDEX_HIDDEN, true).build(), mapping, aliases)
8788
assertIndexExists(HISTORY_WRITE_INDEX_ALIAS)
8889
verifyIndexSchemaVersion(HISTORY_WRITE_INDEX_ALIAS, 0)
8990

src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
3737
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
3838
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.FAILED_INDICES
3939
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.FAILURES
40+
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_REPLICAS
41+
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_SHARDS
4042
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.UPDATED_INDICES
4143
import com.amazon.opendistroforelasticsearch.indexmanagement.makeRequest
4244
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.model.Rollup
@@ -173,16 +175,8 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase()
173175
} else {
174176
it.put(ManagedIndexSettings.ROLLOVER_ALIAS.key, alias)
175177
}
176-
if (replicas == null) {
177-
it.put("index.number_of_replicas", "1")
178-
} else {
179-
it.put("index.number_of_replicas", replicas)
180-
}
181-
if (shards == null) {
182-
it.put("index.number_of_shards", "1")
183-
} else {
184-
it.put("index.number_of_shards", shards)
185-
}
178+
it.put(INDEX_NUMBER_OF_REPLICAS, replicas ?: "1")
179+
it.put(INDEX_NUMBER_OF_SHARDS, shards ?: "1")
186180
}.build()
187181
val aliases = if (alias == null) "" else "\"$alias\": { \"is_write_index\": true }"
188182
createIndex(index, settings, mapping, aliases)
@@ -236,8 +230,9 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase()
236230
}
237231
}
238232
""".trimIndent()
239-
client().makeRequest("PUT", "_cluster/settings", emptyMap(),
233+
val res = client().makeRequest("PUT", "_cluster/settings", emptyMap(),
240234
StringEntity(request, APPLICATION_JSON))
235+
assertEquals("Request failed", RestStatus.OK, res.restStatus())
241236
}
242237

243238
protected fun getManagedIndexConfig(index: String): ManagedIndexConfig? {
@@ -318,7 +313,11 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase()
318313
if (isMultiNode) {
319314
waitFor {
320315
try {
321-
adminClient().makeRequest("GET", "_cluster/allocation/explain")
316+
adminClient().makeRequest(
317+
"GET",
318+
"_cluster/allocation/explain",
319+
StringEntity("{ \"index\": \"$INDEX_MANAGEMENT_INDEX\" }", APPLICATION_JSON)
320+
)
322321
fail("Expected 400 Bad Request when there are no unassigned shards to explain")
323322
} catch (e: ResponseException) {
324323
assertEquals(RestStatus.BAD_REQUEST, e.response.restStatus())
@@ -425,7 +424,13 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase()
425424
@Suppress("UNCHECKED_CAST")
426425
protected fun getNumberOfReplicasSetting(indexName: String): Int {
427426
val indexSettings = getIndexSettings(indexName) as Map<String, Map<String, Map<String, Any?>>>
428-
return (indexSettings[indexName]!!["settings"]!!["index.number_of_replicas"] as String).toInt()
427+
return (indexSettings[indexName]!!["settings"]!![INDEX_NUMBER_OF_REPLICAS] as String).toInt()
428+
}
429+
430+
@Suppress("UNCHECKED_CAST")
431+
protected fun getNumberOfShardsSetting(indexName: String): Int {
432+
val indexSettings = getIndexSettings(indexName) as Map<String, Map<String, Map<String, Any?>>>
433+
return (indexSettings[indexName]!!["settings"]!![INDEX_NUMBER_OF_SHARDS] as String).toInt()
429434
}
430435

431436
@Suppress("UNCHECKED_CAST")

src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/action/IndexStateManagementHistoryIT.kt

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,9 +399,53 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() {
399399
waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) }
400400
}
401401

402+
fun `test history shard settings`() {
403+
val indexName = "${testIndexName}_shard_settings"
404+
val policyID = "${testIndexName}_shard_settings_1"
405+
val actionConfig = ReadOnlyActionConfig(0)
406+
val states = listOf(State("ReadOnlyState", listOf(actionConfig), listOf()))
407+
408+
val policy = Policy(
409+
id = policyID,
410+
description = "$testIndexName description",
411+
schemaVersion = 1L,
412+
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
413+
errorNotification = randomErrorNotification(),
414+
defaultState = states[0].name,
415+
states = states
416+
)
417+
418+
createPolicy(policy, policyID)
419+
createIndex(indexName, policyID)
420+
resetHistorySetting()
421+
updateClusterSetting(ManagedIndexSettings.HISTORY_NUMBER_OF_SHARDS.key, "2")
422+
updateClusterSetting(ManagedIndexSettings.HISTORY_NUMBER_OF_REPLICAS.key, "3")
423+
424+
val managedIndexConfig = getExistingManagedIndexConfig(indexName)
425+
426+
// Change the start time so the job will trigger in 2 seconds.
427+
updateManagedIndexConfigStartTime(managedIndexConfig)
428+
429+
waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }
430+
431+
// Need to wait two cycles.
432+
// Change the start time so the job will trigger in 2 seconds.
433+
updateManagedIndexConfigStartTime(managedIndexConfig)
434+
waitFor {
435+
assertIndexExists(IndexManagementIndices.HISTORY_WRITE_INDEX_ALIAS)
436+
val indexSettings = getIndexSettings(IndexManagementIndices.HISTORY_WRITE_INDEX_ALIAS)
437+
val historyIndexName = indexSettings.keys.filter { it.startsWith(IndexManagementIndices.HISTORY_INDEX_BASE) }.firstOrNull()
438+
assertNotNull("Could not find a concrete history index", historyIndexName)
439+
assertEquals("Wrong number of shards", 2, getNumberOfShardsSetting(historyIndexName!!))
440+
assertEquals("Wrong number of replicas", 3, getNumberOfReplicasSetting(historyIndexName))
441+
}
442+
}
443+
402444
private fun resetHistorySetting() {
403445
updateClusterSetting(ManagedIndexSettings.HISTORY_ENABLED.key, "true")
404446
updateClusterSetting(ManagedIndexSettings.HISTORY_RETENTION_PERIOD.key, "60s")
405447
updateClusterSetting(ManagedIndexSettings.HISTORY_ROLLOVER_CHECK_PERIOD.key, "60s")
448+
updateClusterSetting(ManagedIndexSettings.HISTORY_NUMBER_OF_SHARDS.key, "1")
449+
updateClusterSetting(ManagedIndexSettings.HISTORY_NUMBER_OF_REPLICAS.key, "1")
406450
}
407451
}

src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/resthandler/ISMTemplateRestAPIIT.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
2323
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.randomErrorNotification
2424
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.randomPolicy
2525
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
26+
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN
2627
import com.amazon.opendistroforelasticsearch.indexmanagement.randomInstant
2728
import com.amazon.opendistroforelasticsearch.indexmanagement.waitFor
2829
import org.elasticsearch.client.ResponseException
@@ -98,7 +99,7 @@ class ISMTemplateRestAPIIT : IndexStateManagementRestTestCase() {
9899
createPolicy(policy, policyID)
99100

100101
createIndex(indexName2, null)
101-
createIndex(indexName3, Settings.builder().put("index.hidden", true).build())
102+
createIndex(indexName3, Settings.builder().put(INDEX_HIDDEN, true).build())
102103

103104
waitFor { assertNotNull(getManagedIndexConfig(indexName2)) }
104105

0 commit comments

Comments
 (0)