diff --git a/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/impl/DefaultAggregateTaskExplorer.java b/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/impl/DefaultAggregateTaskExplorer.java index 7a7046f72d..a933e1c498 100644 --- a/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/impl/DefaultAggregateTaskExplorer.java +++ b/spring-cloud-dataflow-aggregate-task/src/main/java/org/springframework/cloud/dataflow/aggregate/task/impl/DefaultAggregateTaskExplorer.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -254,21 +255,36 @@ public Set getJobExecutionIdsByTaskExecutionId(long taskExecutionId, Strin Assert.notNull(taskExplorer, "Expected TaskExplorer for " + schemaTarget); return taskExplorer.getJobExecutionIdsByTaskExecutionId(taskExecutionId); } - + private static void add(Map> setMap, String key, String value) { + Set set = setMap.computeIfAbsent(key, (v) -> new HashSet<>()); + set.add(value); + } @Override public List getLatestTaskExecutionsByTaskNames(String... taskNames) { List result = new ArrayList<>(); + Map> targetToTaskNames = new HashMap<>(); + Map taskNamePlatform = new HashMap<>(); for (String taskName : taskNames) { SchemaVersionTarget target = aggregateExecutionSupport.findSchemaVersionTarget(taskName, taskDefinitionReader); String platformName = getPlatformName(taskName); Assert.notNull(target, "Expected to find SchemaVersionTarget for " + taskName); - TaskExplorer taskExplorer = taskExplorers.get(target.getName()); - Assert.notNull(taskExplorer, "Expected TaskExplorer for " + target.getName()); - List taskExecutions = taskExplorer.getLatestTaskExecutionsByTaskNames(taskNames) + add(targetToTaskNames, target.getName(), taskName); + if(platformName != null) { + taskNamePlatform.put(taskName, platformName); + } + } + for(String target : targetToTaskNames.keySet()) { + Set tasks = targetToTaskNames.get(target); + if(!tasks.isEmpty()) { + TaskExplorer taskExplorer = taskExplorers.get(target); + Assert.notNull(taskExplorer, "Expected TaskExplorer for " + target); + List taskExecutions = taskExplorer + .getLatestTaskExecutionsByTaskNames(tasks.toArray(new String[0])) .stream() - .map(execution -> aggregateExecutionSupport.from(execution, target.getName(), platformName)) + .map(execution -> aggregateExecutionSupport.from(execution, target, taskNamePlatform.get(execution.getTaskName()))) .collect(Collectors.toList()); - result.addAll(taskExecutions); + result.addAll(taskExecutions); + } } return result; } diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/JobDependencies.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/JobDependencies.java index 5022cff975..1ea7a4f909 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/JobDependencies.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/configuration/JobDependencies.java @@ -69,11 +69,14 @@ import org.springframework.cloud.dataflow.server.controller.JobStepExecutionProgressController; import org.springframework.cloud.dataflow.server.controller.RestControllerAdvice; import org.springframework.cloud.dataflow.server.controller.SchemaController; +import org.springframework.cloud.dataflow.server.controller.TaskDefinitionController; import org.springframework.cloud.dataflow.server.controller.TaskExecutionController; import org.springframework.cloud.dataflow.server.controller.TaskExecutionThinController; import org.springframework.cloud.dataflow.server.controller.TaskLogsController; import org.springframework.cloud.dataflow.server.controller.TaskPlatformController; import org.springframework.cloud.dataflow.server.controller.TasksInfoController; +import org.springframework.cloud.dataflow.server.controller.assembler.DefaultTaskDefinitionAssemblerProvider; +import org.springframework.cloud.dataflow.server.controller.assembler.TaskDefinitionAssemblerProvider; import org.springframework.cloud.dataflow.server.job.LauncherRepository; import org.springframework.cloud.dataflow.server.repository.DataflowJobExecutionDaoContainer; import org.springframework.cloud.dataflow.server.repository.DataflowTaskExecutionDaoContainer; @@ -409,7 +412,28 @@ public TaskExecutionInfoService taskDefinitionRetriever( composedTaskRunnerConfigurationProperties ); } - + @Bean + public TaskDefinitionAssemblerProvider taskDefinitionAssemblerProvider( + TaskExecutionService taskExecutionService, + TaskJobService taskJobService, + AggregateTaskExplorer taskExplorer, + AggregateExecutionSupport aggregateExecutionSupport + ) { + return new DefaultTaskDefinitionAssemblerProvider(taskExecutionService, taskJobService, taskExplorer, aggregateExecutionSupport); + } + @Bean + public TaskDefinitionController taskDefinitionController( + AggregateTaskExplorer explorer, TaskDefinitionRepository repository, + TaskSaveService taskSaveService, TaskDeleteService taskDeleteService, + TaskDefinitionAssemblerProvider taskDefinitionAssemblerProvider + ) { + return new TaskDefinitionController(explorer, + repository, + taskSaveService, + taskDeleteService, + taskDefinitionAssemblerProvider + ); + } @Bean @Primary public PlatformTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) { diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionControllerTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionControllerTests.java index b5d165d632..a4bcbf1302 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionControllerTests.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/controller/TaskExecutionControllerTests.java @@ -57,9 +57,11 @@ import org.springframework.cloud.dataflow.schema.AppBootSchemaVersion; import org.springframework.cloud.dataflow.schema.SchemaVersionTarget; import org.springframework.cloud.dataflow.schema.service.SchemaService; +import org.springframework.cloud.dataflow.server.config.DataFlowControllerAutoConfiguration; import org.springframework.cloud.dataflow.server.config.DataflowAsyncAutoConfiguration; import org.springframework.cloud.dataflow.server.config.apps.CommonApplicationProperties; import org.springframework.cloud.dataflow.server.configuration.JobDependencies; +import org.springframework.cloud.dataflow.server.configuration.TestDependencies; import org.springframework.cloud.dataflow.server.job.LauncherRepository; import org.springframework.cloud.dataflow.server.repository.JobRepositoryContainer; import org.springframework.cloud.dataflow.server.repository.TaskBatchDaoContainer; @@ -109,9 +111,13 @@ * @author Chris Bono * @author Corneil du Plessis */ -@SpringBootTest( - classes = { JobDependencies.class, TaskExecutionAutoConfiguration.class, DataflowAsyncAutoConfiguration.class, - PropertyPlaceholderAutoConfiguration.class, BatchProperties.class}) +@SpringBootTest(classes = { + JobDependencies.class, + TaskExecutionAutoConfiguration.class, + DataflowAsyncAutoConfiguration.class, + PropertyPlaceholderAutoConfiguration.class, + BatchProperties.class +}) @EnableConfigurationProperties({CommonApplicationProperties.class}) @DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD) @AutoConfigureTestDatabase(replace = Replace.ANY) @@ -509,6 +515,13 @@ void getExecutionsByName() throws Exception { .andExpect(jsonPath("$._embedded.taskExecutionResourceList", hasSize(2))); } + @Test + void getDefinitionsWithLastExecution() throws Exception { + mockMvc.perform(get("/tasks/definitions").accept(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andDo(print()) + .andExpect(jsonPath("$._embedded.taskDefinitionResourceList", hasSize(1))); + } @Test void getExecutionsByNameNotFound() throws Exception { mockMvc.perform(get("/tasks/executions/").param("name", "BAZ").accept(MediaType.APPLICATION_JSON)) diff --git a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskExecutionServiceTests.java b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskExecutionServiceTests.java index abc17093d1..deb08991b2 100644 --- a/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskExecutionServiceTests.java +++ b/spring-cloud-dataflow-server-core/src/test/java/org/springframework/cloud/dataflow/server/service/impl/DefaultTaskExecutionServiceTests.java @@ -208,8 +208,7 @@ public abstract class DefaultTaskExecutionServiceTests { ApplicationContext applicationContext; @AutoConfigureTestDatabase(replace = Replace.ANY) - @Nested - public class SimpleDefaultPlatformTests extends DefaultTaskExecutionServiceTests { + public static class SimpleDefaultPlatformTests extends DefaultTaskExecutionServiceTests { @Autowired DataSource dataSource; @@ -295,9 +294,8 @@ public void setupTest(DataSource dataSource) { } @AutoConfigureTestDatabase(replace = Replace.ANY) - @Nested @TestPropertySource(properties = {"spring.cloud.dataflow.task.use-kubernetes-secrets-for-db-credentials=true"}) - public class SimpleDefaultPlatformForKubernetesTests extends DefaultTaskExecutionServiceTests { + public static class SimpleDefaultPlatformForKubernetesTests extends DefaultTaskExecutionServiceTests { @Autowired DataSource dataSource; @@ -333,8 +331,7 @@ public void executeSingleTaskDefaultsToExistingSinglePlatformTestForKubernetes() @TestPropertySource(properties = {"spring.cloud.dataflow.task.maximum-concurrent-tasks=10"}) @AutoConfigureTestDatabase(replace = Replace.ANY) - @Nested - public class CICDTaskTests extends DefaultTaskExecutionServiceTests { + public static class CICDTaskTests extends DefaultTaskExecutionServiceTests { private Launcher launcher; @@ -817,8 +814,7 @@ public void testUpgradeFailureTaskCurrentlyRunning() throws MalformedURLExceptio @TestPropertySource(properties = {"spring.cloud.dataflow.task.maximum-concurrent-tasks=10"}) @AutoConfigureTestDatabase(replace = Replace.ANY) - @Nested - public class SimpleTaskTests extends DefaultTaskExecutionServiceTests { + public static class SimpleTaskTests extends DefaultTaskExecutionServiceTests { @Autowired TaskDefinitionReader taskDefinitionReader; @@ -1245,8 +1241,7 @@ public void validateNullResourceTaskTest() { @TestPropertySource(properties = {"spring.cloud.dataflow.task.auto-create-task-definitions=true"}) @AutoConfigureTestDatabase(replace = Replace.ANY) - @Nested - public class AutoCreateTaskDefinitionTests extends DefaultTaskExecutionServiceTests { + public static class AutoCreateTaskDefinitionTests extends DefaultTaskExecutionServiceTests { @Autowired TaskDefinitionRepository taskDefinitionRepository; @@ -1272,8 +1267,7 @@ public void executeTaskWithNullDefinitionCreatesDefinitionIfConfigured() { @TestPropertySource(properties = {"spring.cloud.dataflow.applicationProperties.task.globalkey=globalvalue", "spring.cloud.dataflow.applicationProperties.stream.globalstreamkey=nothere"}) @AutoConfigureTestDatabase(replace = Replace.ANY) - @Nested - public class Boot3TaskTests extends DefaultTaskExecutionServiceTests { + public static class Boot3TaskTests extends DefaultTaskExecutionServiceTests { public static final String TIMESTAMP_3 = "timestamp3"; @@ -1387,9 +1381,7 @@ public void launchBoot3WithVersion() throws IOException { "spring.cloud.dataflow.applicationProperties.stream.globalstreamkey=nothere" }) @AutoConfigureTestDatabase(replace = Replace.ANY) - @Nested - - public class ComposedTaskTests extends DefaultTaskExecutionServiceTests { + public static class ComposedTaskTests extends DefaultTaskExecutionServiceTests { @Autowired TaskRepositoryContainer taskRepositoryContainer; @@ -1962,8 +1954,7 @@ public void createDuplicateChildTaskComposedTask() { @TestPropertySource(properties = {"spring.cloud.dataflow.applicationProperties.task.globalkey=globalvalue", "spring.cloud.dataflow.applicationProperties.stream.globalstreamkey=nothere", "spring.cloud.dataflow.task.useUserAccessToken=true"}) @AutoConfigureTestDatabase(replace = Replace.ANY) - @Nested - public class ComposedTaskWithSystemUseUserAccessTokenTests extends DefaultTaskExecutionServiceTests { + public static class ComposedTaskWithSystemUseUserAccessTokenTests extends DefaultTaskExecutionServiceTests { @Autowired TaskRepositoryContainer taskRepositoryContainer;