Skip to content
This repository was archived by the owner on Aug 2, 2022. It is now read-only.

Commit e29e3de

Browse files
rollup security (#394)
* rollup security * change getRollups action name to search * change runBlocking to withContext * ISM security comments * self review * show roles from explain API but not get API in rollup * ktlint * address comments * address comments * modify explain and get output * address comments
1 parent a452cf3 commit e29e3de

33 files changed

+536
-318
lines changed

src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/elasticapi/ElasticExtensions.kt

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,21 @@
1313
* permissions and limitations under the License.
1414
*/
1515

16-
@file:Suppress("TooManyFunctions")
16+
@file:Suppress("TooManyFunctions", "MatchingDeclarationName")
1717

1818
package com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi
1919

20-
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ISMTemplate
21-
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.Policy
2220
import com.amazon.opendistroforelasticsearch.commons.InjectSecurity
2321
import com.amazon.opendistroforelasticsearch.commons.authuser.User
22+
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ISMTemplate
23+
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.Policy
2424
import com.amazon.opendistroforelasticsearch.indexmanagement.util.NO_ID
2525
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.utils.LockService
2626
import kotlinx.coroutines.CoroutineScope
2727
import kotlinx.coroutines.ThreadContextElement
2828
import kotlinx.coroutines.delay
2929
import kotlinx.coroutines.withContext
30+
import org.apache.logging.log4j.LogManager
3031
import org.apache.logging.log4j.Logger
3132
import org.elasticsearch.ElasticsearchException
3233
import org.elasticsearch.ExceptionsHelper
@@ -38,6 +39,8 @@ import org.elasticsearch.common.bytes.BytesReference
3839
import org.elasticsearch.common.settings.Settings
3940
import org.elasticsearch.common.unit.TimeValue
4041
import org.elasticsearch.common.util.concurrent.ThreadContext
42+
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler
43+
import org.elasticsearch.common.xcontent.NamedXContentRegistry
4144
import org.elasticsearch.common.xcontent.ToXContent
4245
import org.elasticsearch.common.xcontent.XContentBuilder
4346
import org.elasticsearch.common.xcontent.XContentHelper
@@ -56,6 +59,12 @@ import kotlin.coroutines.resume
5659
import kotlin.coroutines.resumeWithException
5760
import kotlin.coroutines.suspendCoroutine
5861

62+
fun contentParser(bytesReference: BytesReference): XContentParser {
63+
return XContentHelper.createParser(
64+
NamedXContentRegistry.EMPTY,
65+
LoggingDeprecationHandler.INSTANCE, bytesReference, XContentType.JSON)
66+
}
67+
5968
/** Convert an object to maps and lists representation */
6069
fun ToXContent.convertToMap(): Map<String, Any> {
6170
val bytesReference = XContentHelper.toXContent(this, XContentType.JSON, false)
@@ -195,11 +204,19 @@ fun <T> XContentParser.parseWithType(
195204
return parsed
196205
}
197206

207+
val log = LogManager.getLogger("IndexManagementElasticExtention")
208+
const val INDEX_MANAGEMENT_PLUGIN_INTERNAL = "index_management_plugin_internal"
209+
/**
210+
* @param internalReq: used as flag to indicate if the request is from
211+
* outside user or plugin runner. if the value of this element is true
212+
* then we will not update user object.
213+
*/
198214
class InjectorContextElement(
199-
id: String,
215+
private val id: String,
200216
settings: Settings,
201-
threadContext: ThreadContext,
202-
private val roles: List<String>?
217+
private val threadContext: ThreadContext,
218+
private val roles: List<String>?,
219+
private val internalReq: Boolean = false
203220
) : ThreadContextElement<Unit> {
204221

205222
companion object Key : CoroutineContext.Key<InjectorContextElement>
@@ -210,10 +227,21 @@ class InjectorContextElement(
210227

211228
override fun updateThreadContext(context: CoroutineContext) {
212229
rolesInjectorHelper.injectRoles(roles)
230+
if (threadContext.getTransient<Boolean>(INDEX_MANAGEMENT_PLUGIN_INTERNAL) != internalReq) {
231+
threadContext.putTransient(INDEX_MANAGEMENT_PLUGIN_INTERNAL, internalReq)
232+
log.debug("Job [$id], rollup internal request: $internalReq;" +
233+
" Thread: ${Thread.currentThread().name}")
234+
} else {
235+
log.error("Job [$id], rollup internal request [$internalReq] not cleaned up;" +
236+
" Thread: ${Thread.currentThread().name}")
237+
}
213238
}
214239

215240
override fun restoreThreadContext(context: CoroutineContext, oldState: Unit) {
216241
rolesInjectorHelper.close()
242+
log.debug("Job [$id], rollup internal request cleaned: " +
243+
"${threadContext.getTransient<Boolean>(INDEX_MANAGEMENT_PLUGIN_INTERNAL)};" +
244+
" Thread: ${Thread.currentThread().name}")
217245
}
218246
}
219247

src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,13 @@ package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanageme
1818
import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementIndices
1919
import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin
2020
import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
21+
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.contentParser
2122
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.parseWithType
2223
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getManagedIndexMetaData
2324
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.retry
2425
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.filterNotNullValues
25-
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getManagedIndexMetaData
2626
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getPolicyToTemplateMap
2727
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.suspendUntil
28-
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.contentParser
2928
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ISMTemplate
3029
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
3130
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData

src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -209,17 +209,12 @@ object ManagedIndexRunner : ScheduledJobRunner,
209209
/*
210210
* We need to handle 3 cases:
211211
* 1. ISM jobs that are created by older versions and never updated wont have User details in the
212-
* job object. `managedIndexConfig.user` will be null. Insert `all_access, AmazonES_all_access` role.
212+
* job object. `managedIndexConfig.user` will be null. Insert `all_access` role.
213213
* 2. ISM jobs that are created when security plugin is disabled will have empty User object.
214214
* (`managedIndexConfig.user.name`, `managedIndexConfig.user.roles` are empty )
215215
* 3. ISM jobs that are created when security plugin is enabled will have an User object.
216216
*/
217-
val roles = if (managedIndexConfig.user == null) {
218-
// fixme: discuss and remove hardcoded to settings?
219-
settings.getAsList("", listOf("all_access", "AmazonES_all_access"))
220-
} else {
221-
managedIndexConfig.user.roles
222-
}
217+
val roles = managedIndexConfig.getRoles()
223218
logger.debug("Running ISM job: ${managedIndexConfig.name} with roles: $roles Thread: ${Thread.currentThread().name}")
224219

225220
// Get current IndexMetaData and ManagedIndexMetaData

src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/elasticapi/ElasticExtensions.kt

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,9 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
2525
import org.elasticsearch.cluster.ClusterState
2626
import org.elasticsearch.action.search.SearchResponse
2727
import org.elasticsearch.cluster.metadata.IndexMetadata
28-
import org.elasticsearch.common.bytes.BytesReference
2928
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler
3029
import org.elasticsearch.common.xcontent.NamedXContentRegistry
3130
import org.elasticsearch.common.xcontent.XContentFactory
32-
import org.elasticsearch.common.xcontent.XContentHelper
33-
import org.elasticsearch.common.xcontent.XContentParser
3431
import org.elasticsearch.common.xcontent.XContentType
3532

3633
/**
@@ -87,8 +84,3 @@ fun getPolicyToTemplateMap(response: SearchResponse, xContentRegistry: NamedXCon
8784
@Suppress("UNCHECKED_CAST")
8885
fun <K, V> Map<K, V?>.filterNotNullValues(): Map<K, V> =
8986
filterValues { it != null } as Map<K, V>
90-
91-
fun contentParser(bytesReference: BytesReference): XContentParser {
92-
return XContentHelper.createParser(NamedXContentRegistry.EMPTY,
93-
LoggingDeprecationHandler.INSTANCE, bytesReference, XContentType.JSON)
94-
}

src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.instant
2020
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.optionalTimeField
2121
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.optionalUserField
2222
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE
23+
import com.amazon.opendistroforelasticsearch.indexmanagement.util.ALL_ACCESS_ROLE
2324
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobParameter
2425
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.schedule.Schedule
2526
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.schedule.ScheduleParser
@@ -93,6 +94,12 @@ data class ManagedIndexConfig(
9394
return builder
9495
}
9596

97+
fun getRoles(): List<String> {
98+
return if (user == null) {
99+
ALL_ACCESS_ROLE
100+
} else user.roles
101+
}
102+
96103
companion object {
97104
const val MANAGED_INDEX_TYPE = "managed_index"
98105
const val NO_ID = ""

src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ import java.time.Instant
3131

3232
const val WITH_TYPE = "with_type"
3333
val XCONTENT_WITHOUT_TYPE = ToXContent.MapParams(mapOf(WITH_TYPE to "false"))
34+
const val HAS_USER = "has_user"
35+
val XCONTENT_HAS_USER = ToXContent.MapParams(mapOf(HAS_USER to "true"))
36+
val XCONTENT_WITHOUT_TYPE_HAS_USER = ToXContent.MapParams(
37+
mapOf(WITH_TYPE to "false",
38+
HAS_USER to "true")
39+
)
3440

3541
const val FAILURES = "failures"
3642
const val FAILED_INDICES = "failed_indices"

src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/rollup/RollupRunner.kt

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515

1616
package com.amazon.opendistroforelasticsearch.indexmanagement.rollup
1717

18+
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.InjectorContextElement
1819
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.retry
1920
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.suspendUntil
21+
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.withCloseableContext
2022
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.action.get.GetRollupAction
2123
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.action.get.GetRollupRequest
2224
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.action.get.GetRollupResponse
@@ -277,19 +279,32 @@ object RollupRunner : ScheduledJobRunner,
277279
while (rollupSearchService.shouldProcessRollup(updatableJob, metadata)) {
278280
do {
279281
try {
280-
val rollupResult = when (val rollupSearchResult = rollupSearchService.executeCompositeSearch(updatableJob, metadata)) {
281-
is RollupSearchResult.Success -> {
282-
val compositeRes: InternalComposite = rollupSearchResult.searchResponse.aggregations.get(updatableJob.id)
283-
metadata = metadata.incrementStats(rollupSearchResult.searchResponse, compositeRes)
284-
when (val rollupIndexResult = rollupIndexer.indexRollups(updatableJob, compositeRes)) {
285-
is RollupIndexResult.Success -> RollupResult.Success(compositeRes, rollupIndexResult.stats)
286-
is RollupIndexResult.Failure -> RollupResult.Failure(rollupIndexResult.message, rollupIndexResult.cause)
282+
val roles = job.getRoles()
283+
val rollupResult = withCloseableContext(InjectorContextElement(job.id, settings, threadPool.threadContext, roles)) {
284+
when (val rollupSearchResult =
285+
rollupSearchService.executeCompositeSearch(updatableJob, metadata)) {
286+
is RollupSearchResult.Success -> {
287+
val compositeRes: InternalComposite =
288+
rollupSearchResult.searchResponse.aggregations.get(updatableJob.id)
289+
metadata = metadata.incrementStats(rollupSearchResult.searchResponse, compositeRes)
290+
return@withCloseableContext when (val rollupIndexResult =
291+
rollupIndexer.indexRollups(updatableJob, compositeRes)) {
292+
is RollupIndexResult.Success -> RollupResult.Success(
293+
compositeRes,
294+
rollupIndexResult.stats
295+
)
296+
is RollupIndexResult.Failure -> RollupResult.Failure(
297+
rollupIndexResult.message,
298+
rollupIndexResult.cause
299+
)
300+
}
301+
}
302+
is RollupSearchResult.Failure -> {
303+
return@withCloseableContext RollupResult.Failure(rollupSearchResult.message, rollupSearchResult.cause)
287304
}
288-
}
289-
is RollupSearchResult.Failure -> {
290-
RollupResult.Failure(rollupSearchResult.message, rollupSearchResult.cause)
291305
}
292306
}
307+
293308
when (rollupResult) {
294309
is RollupResult.Success -> {
295310
metadata = rollupMetadataService.updateMetadata(updatableJob,
@@ -362,7 +377,12 @@ object RollupRunner : ScheduledJobRunner,
362377
private suspend fun updateRollupJob(job: Rollup, metadata: RollupMetadata): RollupJobResult {
363378
try {
364379
val req = IndexRollupRequest(rollup = job, refreshPolicy = WriteRequest.RefreshPolicy.IMMEDIATE)
365-
val res: IndexRollupResponse = client.suspendUntil { execute(IndexRollupAction.INSTANCE, req, it) }
380+
val roles = job.getRoles()
381+
val res = withCloseableContext(InjectorContextElement(job.id, settings, threadPool.threadContext, roles, true)) {
382+
return@withCloseableContext client.suspendUntil<Client, IndexRollupResponse> {
383+
execute(IndexRollupAction.INSTANCE, req, it)
384+
}
385+
}
366386
// TODO: Verify the seqNo/primterm got updated
367387
return RollupJobResult.Success(res.rollup)
368388
} catch (e: Exception) {

src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/rollup/action/delete/TransportDeleteRollupAction.kt

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
package com.amazon.opendistroforelasticsearch.indexmanagement.rollup.action.delete
1717

1818
import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
19+
import com.amazon.opendistroforelasticsearch.indexmanagement.util.use
1920
import org.elasticsearch.action.ActionListener
21+
import org.elasticsearch.action.delete.DeleteRequest
2022
import org.elasticsearch.action.delete.DeleteResponse
2123
import org.elasticsearch.action.support.ActionFilters
2224
import org.elasticsearch.action.support.HandledTransportAction
@@ -34,15 +36,11 @@ class TransportDeleteRollupAction @Inject constructor(
3436
) {
3537

3638
override fun doExecute(task: Task, request: DeleteRollupRequest, actionListener: ActionListener<DeleteResponse>) {
37-
request.index(INDEX_MANAGEMENT_INDEX)
38-
client.delete(request, object : ActionListener<DeleteResponse> {
39-
override fun onResponse(response: DeleteResponse) {
40-
actionListener.onResponse(response)
41-
}
39+
val deleteRequest = DeleteRequest(INDEX_MANAGEMENT_INDEX, request.id())
40+
.setRefreshPolicy(request.refreshPolicy)
4241

43-
override fun onFailure(t: Exception) {
44-
actionListener.onFailure(t)
45-
}
46-
})
42+
client.threadPool().threadContext.stashContext().use {
43+
client.delete(deleteRequest, actionListener)
44+
}
4745
}
4846
}

src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/rollup/action/explain/ExplainRollupResponse.kt

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
package com.amazon.opendistroforelasticsearch.indexmanagement.rollup.action.explain
1717

18+
import com.amazon.opendistroforelasticsearch.commons.authuser.User.ROLES_FIELD
1819
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.model.ExplainRollup
1920
import org.elasticsearch.action.ActionResponse
2021
import org.elasticsearch.common.io.stream.StreamInput
@@ -26,9 +27,14 @@ import java.io.IOException
2627

2728
class ExplainRollupResponse : ActionResponse, ToXContentObject {
2829
val idsToExplain: Map<String, ExplainRollup?>
30+
val rolesMap: Map<String, List<String>?>
2931

30-
constructor(idsToExplain: Map<String, ExplainRollup?>) : super() {
32+
constructor(
33+
idsToExplain: Map<String, ExplainRollup?>,
34+
rolesMap: Map<String, List<String>?>
35+
) : super() {
3136
this.idsToExplain = idsToExplain
37+
this.rolesMap = rolesMap
3238
}
3339

3440
internal fun getIdsToExplain(): Map<String, ExplainRollup?> {
@@ -44,7 +50,8 @@ class ExplainRollupResponse : ActionResponse, ToXContentObject {
4450
idsToExplain[it.readString()] = if (sin.readBoolean()) ExplainRollup(it) else null
4551
}
4652
idsToExplain.toMap()
47-
}
53+
},
54+
rolesMap = sin.readMap() as Map<String, List<String>?>
4855
)
4956

5057
@Throws(IOException::class)
@@ -55,13 +62,22 @@ class ExplainRollupResponse : ActionResponse, ToXContentObject {
5562
out.writeBoolean(metadata != null)
5663
metadata?.writeTo(out)
5764
}
65+
out.writeMap(rolesMap)
5866
}
5967

6068
@Throws(IOException::class)
6169
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
6270
builder.startObject()
6371
idsToExplain.entries.forEach { (id, explain) ->
64-
builder.field(id, explain)
72+
if (explain != null) {
73+
builder.startObject(id)
74+
explain.toXContent(builder, ToXContent.EMPTY_PARAMS)
75+
val roles = rolesMap[id]
76+
if (roles != null && roles.isNotEmpty()) {
77+
builder.field(ROLES_FIELD, roles)
78+
}
79+
builder.endObject()
80+
} else builder.nullField(id)
6581
}
6682
return builder.endObject()
6783
}

0 commit comments

Comments
 (0)