Skip to content
This repository was archived by the owner on Aug 2, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 25 additions & 16 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ apply plugin: 'org.jetbrains.kotlin.jvm'
apply plugin: 'org.jetbrains.kotlin.plugin.allopen'

def usingRemoteCluster = System.properties.containsKey('tests.rest.cluster') || System.properties.containsKey('tests.cluster')
def usingMultiNode = project.properties.containsKey('numNodes')
def usingMultiNode = System.getProperty("numNodes")
// Only apply jacoco test coverage if we are running a local single node cluster
if (!usingRemoteCluster && !usingMultiNode) {
apply from: 'build-tools/esplugin-coverage.gradle'
Expand Down Expand Up @@ -99,14 +99,26 @@ configurations.all {
}
}

ext {
projectSubstitutions = [:]
opendistroVersion = "${version}"
isSnapshot = "true" == System.getProperty("build.snapshot", "true")
licenseFile = rootProject.file('LICENSE')
noticeFile = rootProject.file('NOTICE')
}

group = "com.amazon.opendistroforelasticsearch"
version = "${opendistroVersion}.1"

dependencies {
compileOnly "org.elasticsearch:elasticsearch:${es_version}"
compileOnly "com.amazon.opendistroforelasticsearch:opendistro-job-scheduler-spi:1.12.0.0"
compileOnly "com.amazon.opendistroforelasticsearch:opendistro-job-scheduler-spi:${opendistroVersion}.0"
compile "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
compile "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.7'
compile "org.jetbrains:annotations:13.0"
compile "com.amazon.opendistroforelasticsearch:notification:1.12.0.0"
compile "com.amazon.opendistroforelasticsearch:notification:${opendistroVersion}.0"
compile "com.amazon.opendistroforelasticsearch:common-utils:${opendistroVersion}.2"

testCompile "org.elasticsearch.test:framework:${es_version}"
testCompile "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
Expand All @@ -121,17 +133,6 @@ repositories {
mavenLocal()
}

ext {
projectSubstitutions = [:]
opendistroVersion = "${version}"
isSnapshot = "true" == System.getProperty("build.snapshot", "true")
licenseFile = rootProject.file('LICENSE')
noticeFile = rootProject.file('NOTICE')
}

group = "com.amazon.opendistroforelasticsearch"
version = "${opendistroVersion}.1"

if (isSnapshot) {
version += "-SNAPSHOT"
}
Expand Down Expand Up @@ -191,7 +192,7 @@ test {
}

File repo = file("$buildDir/testclusters/repo")
def _numNodes = findProperty('numNodes') as Integer ?: 1
def _numNodes = System.getProperty("numNodes") as Integer ?: 1
testClusters.integTest {
plugin(project.tasks.bundlePlugin.archiveFile)
testDistribution = "OSS"
Expand Down Expand Up @@ -228,7 +229,8 @@ integTest {
systemProperty 'tests.security.manager', 'false'
systemProperty 'java.io.tmpdir', es_tmp_dir.absolutePath
systemProperty 'buildDir', buildDir.path
systemProperty "https", System.getProperty("https", securityEnabled.toString())
systemProperty "https", System.getProperty("https")
systemProperty "security", System.getProperty("security")
systemProperty "user", System.getProperty("user", "admin")
systemProperty "password", System.getProperty("password", "admin")
// Tell the test JVM if the cluster JVM is running under a debugger so that tests can use longer timeouts for
Expand All @@ -248,6 +250,13 @@ integTest {
if (System.getProperty("test.debug") != null) {
jvmArgs '-agentlib:jdwp=transport=dt_socket,server=n,suspend=y,address=8000'
}

// TODO: Fix running notification test against remote cluster with security plugin installed
if (System.getProperty("https") != null) {
filter {
excludeTestsMatching "com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.action.NotificationActionIT"
}
}
}

run {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class IndexManagementIndicesIT : IndexStateManagementRestTestCase() {
}

fun `test update management index mapping with new schema version`() {
wipeAllODFEIndices()
waitForPendingTasks(adminClient())
assertIndexDoesNotExist(INDEX_MANAGEMENT_INDEX)

val mapping = indexManagementMappings.trim().trimStart('{').trimEnd('}')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import org.elasticsearch.client.Response
import org.elasticsearch.client.RestClient
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.test.rest.ESRestTestCase
import org.junit.AfterClass
import org.junit.Before
import org.junit.rules.DisableOnDebug
Expand All @@ -34,7 +33,7 @@ import javax.management.ObjectName
import javax.management.remote.JMXConnectorFactory
import javax.management.remote.JMXServiceURL

abstract class IndexManagementRestTestCase : ESRestTestCase() {
abstract class IndexManagementRestTestCase : ODFERestTestCase() {

// Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as
// they do not go through the pending task queue. Ideally this should probably be written in a way to wait for the
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement

import com.amazon.opendistroforelasticsearch.commons.ConfigConstants.OPENDISTRO_SECURITY_SSL_HTTP_ENABLED
import com.amazon.opendistroforelasticsearch.commons.ConfigConstants.OPENDISTRO_SECURITY_SSL_HTTP_KEYSTORE_FILEPATH
import com.amazon.opendistroforelasticsearch.commons.ConfigConstants.OPENDISTRO_SECURITY_SSL_HTTP_KEYSTORE_KEYPASSWORD
import com.amazon.opendistroforelasticsearch.commons.ConfigConstants.OPENDISTRO_SECURITY_SSL_HTTP_KEYSTORE_PASSWORD
import com.amazon.opendistroforelasticsearch.commons.ConfigConstants.OPENDISTRO_SECURITY_SSL_HTTP_PEMCERT_FILEPATH
import com.amazon.opendistroforelasticsearch.commons.rest.SecureRestClientBuilder
import org.apache.http.HttpHost
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction
import org.elasticsearch.client.Request
import org.elasticsearch.client.RequestOptions
import org.elasticsearch.client.Response
import org.elasticsearch.client.RestClient
import org.elasticsearch.client.WarningsHandler
import org.elasticsearch.common.io.PathUtils
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.xcontent.DeprecationHandler
import org.elasticsearch.common.xcontent.NamedXContentRegistry
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.test.rest.ESRestTestCase
import org.junit.After
import java.io.IOException

abstract class ODFERestTestCase : ESRestTestCase() {

fun isHttps(): Boolean = System.getProperty("https", "false")!!.toBoolean()

fun securityEnabled(): Boolean = System.getProperty("security", "false")!!.toBoolean()

override fun getProtocol(): String = if (isHttps()) "https" else "http"

// override fun preserveIndicesUponCompletion(): Boolean = true

@Suppress("UNCHECKED_CAST")
@Throws(IOException::class)
private fun runningTasks(response: Response): MutableSet<String> {
val runningTasks: MutableSet<String> = HashSet()
val nodes = entityAsMap(response)["nodes"] as Map<String, Any>?
for ((_, value) in nodes!!) {
val nodeInfo = value as Map<String, Any>
val nodeTasks = nodeInfo["tasks"] as Map<String, Any>?
for ((_, value1) in nodeTasks!!) {
val task = value1 as Map<String, Any>
runningTasks.add(task["action"].toString())
}
}
return runningTasks
}

@After
fun waitForCleanup() {
waitFor {
waitForRunningTasks()
waitForThreadPools()
waitForPendingTasks(adminClient())
}
}

@Throws(IOException::class)
private fun waitForRunningTasks() {
val runningTasks: MutableSet<String> = runningTasks(adminClient().performRequest(Request("GET", "/_tasks")))
// Ignore the task list API - it doesn't count against us
runningTasks.remove(ListTasksAction.NAME)
runningTasks.remove(ListTasksAction.NAME + "[n]")
if (runningTasks.isEmpty()) {
return
}
val stillRunning = ArrayList<String>(runningTasks)
fail("There are still tasks running after this test that might break subsequent tests $stillRunning.")
}

private fun waitForThreadPools() {
waitFor {
val response = client().performRequest(Request("GET", "/_cat/thread_pool?format=json"))

val xContentType = XContentType.fromMediaTypeOrFormat(response.entity.contentType.value)
xContentType.xContent().createParser(
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
response.entity.content
).use { parser ->
for (index in parser.list()) {
val jsonObject: Map<*, *> = index as java.util.HashMap<*, *>
val active = (jsonObject["active"] as String).toInt()
val queue = (jsonObject["queue"] as String).toInt()
val name = jsonObject["name"]
val trueActive = if (name == "management") active - 1 else active
if (trueActive > 0 || queue > 0) {
fail("Still active threadpools in cluster: $jsonObject")
}
}
}
}
}

@Throws(IOException::class)
open fun wipeAllODFEIndices() {
val response = client().performRequest(Request("GET", "/_cat/indices?format=json&expand_wildcards=all"))

val xContentType = XContentType.fromMediaTypeOrFormat(response.entity.contentType.value)
xContentType.xContent().createParser(
NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
response.entity.content
).use { parser ->
for (index in parser.list()) {
val jsonObject: Map<*, *> = index as java.util.HashMap<*, *>
val indexName: String = jsonObject["index"] as String
// .opendistro_security isn't allowed to delete from cluster
if (".opendistro_security" != indexName) {
val request = Request("DELETE", "/$indexName")
// TODO: remove PERMISSIVE option after moving system index access to REST API call
val options = RequestOptions.DEFAULT.toBuilder()
options.setWarningsHandler(WarningsHandler.PERMISSIVE)
request.options = options.build()
adminClient().performRequest(request)
}
}
}
}
/**
* Returns the REST client settings used for super-admin actions like cleaning up after the test has completed.
*/
override fun restAdminSettings(): Settings {
return Settings
.builder()
.put("http.port", 9200)
.put(OPENDISTRO_SECURITY_SSL_HTTP_ENABLED, isHttps())
.put(OPENDISTRO_SECURITY_SSL_HTTP_PEMCERT_FILEPATH, "sample.pem")
.put(OPENDISTRO_SECURITY_SSL_HTTP_KEYSTORE_FILEPATH, "test-kirk.jks")
.put(OPENDISTRO_SECURITY_SSL_HTTP_KEYSTORE_PASSWORD, "changeit")
.put(OPENDISTRO_SECURITY_SSL_HTTP_KEYSTORE_KEYPASSWORD, "changeit")
.build()
}

@Throws(IOException::class)
override fun buildClient(settings: Settings, hosts: Array<HttpHost>): RestClient {
if (isHttps()) {
val keystore = settings.get(OPENDISTRO_SECURITY_SSL_HTTP_KEYSTORE_FILEPATH)
return when (keystore != null) {
true -> {
// create adminDN (super-admin) client
val uri = javaClass.classLoader.getResource("security/sample.pem").toURI()
val configPath = PathUtils.get(uri).parent.toAbsolutePath()
SecureRestClientBuilder(settings, configPath).setSocketTimeout(60000).build()
}
false -> {
// create client with passed user
val userName = System.getProperty("user")
val password = System.getProperty("password")
SecureRestClientBuilder(hosts, isHttps(), userName, password).setSocketTimeout(60000).build()
}
}
} else {
val builder = RestClient.builder(*hosts)
configureClient(builder, settings)
builder.setStrictDeprecationMode(true)
return builder.build()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ fun RestClient.makeRequest(
}

fun <T> waitFor(
timeout: Instant = Instant.ofEpochSecond(10),
timeout: Instant = Instant.ofEpochSecond(20),
block: () -> T
): T {
val startTime = Instant.now().toEpochMilli()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import org.elasticsearch.action.get.GetResponse
import org.elasticsearch.action.search.SearchResponse
import org.elasticsearch.client.Request
import org.elasticsearch.client.Response
import org.elasticsearch.client.ResponseException
import org.elasticsearch.cluster.metadata.IndexMetadata
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.unit.TimeValue
Expand Down Expand Up @@ -301,6 +302,18 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase()
}

protected fun updateManagedIndexConfigStartTime(update: ManagedIndexConfig, desiredStartTimeMillis: Long? = null) {
// Before updating start time of a job always make sure there are no unassigned shards that could cause the config
// index to move to a new node and negate this forced start
if (isMultiNode) {
waitFor {
try {
client().makeRequest("GET", "_cluster/allocation/explain")
fail("Expected 400 Bad Request when there are no unassigned shards to explain")
} catch (e: ResponseException) {
assertEquals(RestStatus.BAD_REQUEST, e.response.restStatus())
}
}
}
val intervalSchedule = (update.jobSchedule as IntervalSchedule)
val millis = Duration.of(intervalSchedule.interval.toLong(), intervalSchedule.unit).minusSeconds(2).toMillis()
val startTimeMillis = desiredStartTimeMillis ?: Instant.now().toEpochMilli() - millis
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ class NotificationActionIT : IndexStateManagementRestTestCase() {
chime = null,
slack = null,
customWebhook = CustomWebhook(
url = "http://$clusterUri/$notificationIndex/_doc",
url = "$protocol://$clusterUri/$notificationIndex/_doc",
scheme = null,
host = null,
port = -1,
path = null,
queryParams = emptyMap(),
headerParams = mapOf("Content-Type" to "application/json"),
username = null,
password = null
username = if (securityEnabled()) "admin" else null,
password = if (securityEnabled()) "admin" else null
)
)
val messageTemplate = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, "{ \"testing\": 5 }", emptyMap())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.model.dimens
import com.amazon.opendistroforelasticsearch.indexmanagement.util._ID
import com.amazon.opendistroforelasticsearch.indexmanagement.util._PRIMARY_TERM
import com.amazon.opendistroforelasticsearch.indexmanagement.util._SEQ_NO
import com.amazon.opendistroforelasticsearch.indexmanagement.waitFor
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.schedule.IntervalSchedule
import org.apache.http.HttpEntity
import org.apache.http.HttpHeaders
import org.apache.http.entity.ContentType.APPLICATION_JSON
import org.apache.http.entity.StringEntity
import org.apache.http.message.BasicHeader
import org.elasticsearch.client.Response
import org.elasticsearch.client.ResponseException
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler
import org.elasticsearch.common.xcontent.NamedXContentRegistry
Expand Down Expand Up @@ -185,6 +187,18 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() {
protected fun Rollup.toHttpEntity(): HttpEntity = StringEntity(toJsonString(), APPLICATION_JSON)

protected fun updateRollupStartTime(update: Rollup, desiredStartTimeMillis: Long? = null) {
// Before updating start time of a job always make sure there are no unassigned shards that could cause the config
// index to move to a new node and negate this forced start
if (isMultiNode) {
waitFor {
try {
client().makeRequest("GET", "_cluster/allocation/explain")
fail("Expected 400 Bad Request when there are no unassigned shards to explain")
} catch (e: ResponseException) {
assertEquals(RestStatus.BAD_REQUEST, e.response.restStatus())
}
}
}
val intervalSchedule = (update.jobSchedule as IntervalSchedule)
val millis = Duration.of(intervalSchedule.interval.toLong(), intervalSchedule.unit).minusSeconds(2).toMillis()
val startTimeMillis = desiredStartTimeMillis ?: Instant.now().toEpochMilli() - millis
Expand Down
Loading