Skip to content
This repository was archived by the owner on May 14, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -254,21 +255,36 @@ public Set<Long> getJobExecutionIdsByTaskExecutionId(long taskExecutionId, Strin
Assert.notNull(taskExplorer, "Expected TaskExplorer for " + schemaTarget);
return taskExplorer.getJobExecutionIdsByTaskExecutionId(taskExecutionId);
}

private static void add(Map<String, Set<String>> setMap, String key, String value) {
Set<String> set = setMap.computeIfAbsent(key, (v) -> new HashSet<>());
set.add(value);
}
@Override
public List<AggregateTaskExecution> getLatestTaskExecutionsByTaskNames(String... taskNames) {
List<AggregateTaskExecution> result = new ArrayList<>();
Map<String, Set<String>> targetToTaskNames = new HashMap<>();
Map<String, String> taskNamePlatform = new HashMap<>();
for (String taskName : taskNames) {
SchemaVersionTarget target = aggregateExecutionSupport.findSchemaVersionTarget(taskName, taskDefinitionReader);
String platformName = getPlatformName(taskName);
Assert.notNull(target, "Expected to find SchemaVersionTarget for " + taskName);
TaskExplorer taskExplorer = taskExplorers.get(target.getName());
Assert.notNull(taskExplorer, "Expected TaskExplorer for " + target.getName());
List<AggregateTaskExecution> taskExecutions = taskExplorer.getLatestTaskExecutionsByTaskNames(taskNames)
add(targetToTaskNames, target.getName(), taskName);
if(platformName != null) {
taskNamePlatform.put(taskName, platformName);
}
}
for(String target : targetToTaskNames.keySet()) {
Set<String> tasks = targetToTaskNames.get(target);
if(!tasks.isEmpty()) {
TaskExplorer taskExplorer = taskExplorers.get(target);
Assert.notNull(taskExplorer, "Expected TaskExplorer for " + target);
List<AggregateTaskExecution> taskExecutions = taskExplorer
.getLatestTaskExecutionsByTaskNames(tasks.toArray(new String[0]))
.stream()
.map(execution -> aggregateExecutionSupport.from(execution, target.getName(), platformName))
.map(execution -> aggregateExecutionSupport.from(execution, target, taskNamePlatform.get(execution.getTaskName())))
.collect(Collectors.toList());
result.addAll(taskExecutions);
result.addAll(taskExecutions);
}
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,14 @@
import org.springframework.cloud.dataflow.server.controller.JobStepExecutionProgressController;
import org.springframework.cloud.dataflow.server.controller.RestControllerAdvice;
import org.springframework.cloud.dataflow.server.controller.SchemaController;
import org.springframework.cloud.dataflow.server.controller.TaskDefinitionController;
import org.springframework.cloud.dataflow.server.controller.TaskExecutionController;
import org.springframework.cloud.dataflow.server.controller.TaskExecutionThinController;
import org.springframework.cloud.dataflow.server.controller.TaskLogsController;
import org.springframework.cloud.dataflow.server.controller.TaskPlatformController;
import org.springframework.cloud.dataflow.server.controller.TasksInfoController;
import org.springframework.cloud.dataflow.server.controller.assembler.DefaultTaskDefinitionAssemblerProvider;
import org.springframework.cloud.dataflow.server.controller.assembler.TaskDefinitionAssemblerProvider;
import org.springframework.cloud.dataflow.server.job.LauncherRepository;
import org.springframework.cloud.dataflow.server.repository.DataflowJobExecutionDaoContainer;
import org.springframework.cloud.dataflow.server.repository.DataflowTaskExecutionDaoContainer;
Expand Down Expand Up @@ -409,7 +412,28 @@ public TaskExecutionInfoService taskDefinitionRetriever(
composedTaskRunnerConfigurationProperties
);
}

@Bean
public TaskDefinitionAssemblerProvider taskDefinitionAssemblerProvider(
TaskExecutionService taskExecutionService,
TaskJobService taskJobService,
AggregateTaskExplorer taskExplorer,
AggregateExecutionSupport aggregateExecutionSupport
) {
return new DefaultTaskDefinitionAssemblerProvider(taskExecutionService, taskJobService, taskExplorer, aggregateExecutionSupport);
}
@Bean
public TaskDefinitionController taskDefinitionController(
AggregateTaskExplorer explorer, TaskDefinitionRepository repository,
TaskSaveService taskSaveService, TaskDeleteService taskDeleteService,
TaskDefinitionAssemblerProvider taskDefinitionAssemblerProvider
) {
return new TaskDefinitionController(explorer,
repository,
taskSaveService,
taskDeleteService,
taskDefinitionAssemblerProvider
);
}
@Bean
@Primary
public PlatformTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@
import org.springframework.cloud.dataflow.schema.AppBootSchemaVersion;
import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.cloud.dataflow.schema.service.SchemaService;
import org.springframework.cloud.dataflow.server.config.DataFlowControllerAutoConfiguration;
import org.springframework.cloud.dataflow.server.config.DataflowAsyncAutoConfiguration;
import org.springframework.cloud.dataflow.server.config.apps.CommonApplicationProperties;
import org.springframework.cloud.dataflow.server.configuration.JobDependencies;
import org.springframework.cloud.dataflow.server.configuration.TestDependencies;
import org.springframework.cloud.dataflow.server.job.LauncherRepository;
import org.springframework.cloud.dataflow.server.repository.JobRepositoryContainer;
import org.springframework.cloud.dataflow.server.repository.TaskBatchDaoContainer;
Expand Down Expand Up @@ -109,9 +111,13 @@
* @author Chris Bono
* @author Corneil du Plessis
*/
@SpringBootTest(
classes = { JobDependencies.class, TaskExecutionAutoConfiguration.class, DataflowAsyncAutoConfiguration.class,
PropertyPlaceholderAutoConfiguration.class, BatchProperties.class})
@SpringBootTest(classes = {
JobDependencies.class,
TaskExecutionAutoConfiguration.class,
DataflowAsyncAutoConfiguration.class,
PropertyPlaceholderAutoConfiguration.class,
BatchProperties.class
})
@EnableConfigurationProperties({CommonApplicationProperties.class})
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@AutoConfigureTestDatabase(replace = Replace.ANY)
Expand Down Expand Up @@ -509,6 +515,13 @@ void getExecutionsByName() throws Exception {
.andExpect(jsonPath("$._embedded.taskExecutionResourceList", hasSize(2)));
}

@Test
void getDefinitionsWithLastExecution() throws Exception {
mockMvc.perform(get("/tasks/definitions").accept(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andDo(print())
.andExpect(jsonPath("$._embedded.taskDefinitionResourceList", hasSize(1)));
}
@Test
void getExecutionsByNameNotFound() throws Exception {
mockMvc.perform(get("/tasks/executions/").param("name", "BAZ").accept(MediaType.APPLICATION_JSON))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,7 @@ public abstract class DefaultTaskExecutionServiceTests {
ApplicationContext applicationContext;

@AutoConfigureTestDatabase(replace = Replace.ANY)
@Nested
public class SimpleDefaultPlatformTests extends DefaultTaskExecutionServiceTests {
public static class SimpleDefaultPlatformTests extends DefaultTaskExecutionServiceTests {

@Autowired
DataSource dataSource;
Expand Down Expand Up @@ -295,9 +294,8 @@ public void setupTest(DataSource dataSource) {
}

@AutoConfigureTestDatabase(replace = Replace.ANY)
@Nested
@TestPropertySource(properties = {"spring.cloud.dataflow.task.use-kubernetes-secrets-for-db-credentials=true"})
public class SimpleDefaultPlatformForKubernetesTests extends DefaultTaskExecutionServiceTests {
public static class SimpleDefaultPlatformForKubernetesTests extends DefaultTaskExecutionServiceTests {

@Autowired
DataSource dataSource;
Expand Down Expand Up @@ -333,8 +331,7 @@ public void executeSingleTaskDefaultsToExistingSinglePlatformTestForKubernetes()

@TestPropertySource(properties = {"spring.cloud.dataflow.task.maximum-concurrent-tasks=10"})
@AutoConfigureTestDatabase(replace = Replace.ANY)
@Nested
public class CICDTaskTests extends DefaultTaskExecutionServiceTests {
public static class CICDTaskTests extends DefaultTaskExecutionServiceTests {

private Launcher launcher;

Expand Down Expand Up @@ -817,8 +814,7 @@ public void testUpgradeFailureTaskCurrentlyRunning() throws MalformedURLExceptio

@TestPropertySource(properties = {"spring.cloud.dataflow.task.maximum-concurrent-tasks=10"})
@AutoConfigureTestDatabase(replace = Replace.ANY)
@Nested
public class SimpleTaskTests extends DefaultTaskExecutionServiceTests {
public static class SimpleTaskTests extends DefaultTaskExecutionServiceTests {

@Autowired
TaskDefinitionReader taskDefinitionReader;
Expand Down Expand Up @@ -1245,8 +1241,7 @@ public void validateNullResourceTaskTest() {

@TestPropertySource(properties = {"spring.cloud.dataflow.task.auto-create-task-definitions=true"})
@AutoConfigureTestDatabase(replace = Replace.ANY)
@Nested
public class AutoCreateTaskDefinitionTests extends DefaultTaskExecutionServiceTests {
public static class AutoCreateTaskDefinitionTests extends DefaultTaskExecutionServiceTests {

@Autowired
TaskDefinitionRepository taskDefinitionRepository;
Expand All @@ -1272,8 +1267,7 @@ public void executeTaskWithNullDefinitionCreatesDefinitionIfConfigured() {

@TestPropertySource(properties = {"spring.cloud.dataflow.applicationProperties.task.globalkey=globalvalue", "spring.cloud.dataflow.applicationProperties.stream.globalstreamkey=nothere"})
@AutoConfigureTestDatabase(replace = Replace.ANY)
@Nested
public class Boot3TaskTests extends DefaultTaskExecutionServiceTests {
public static class Boot3TaskTests extends DefaultTaskExecutionServiceTests {

public static final String TIMESTAMP_3 = "timestamp3";

Expand Down Expand Up @@ -1387,9 +1381,7 @@ public void launchBoot3WithVersion() throws IOException {
"spring.cloud.dataflow.applicationProperties.stream.globalstreamkey=nothere"
})
@AutoConfigureTestDatabase(replace = Replace.ANY)
@Nested

public class ComposedTaskTests extends DefaultTaskExecutionServiceTests {
public static class ComposedTaskTests extends DefaultTaskExecutionServiceTests {

@Autowired
TaskRepositoryContainer taskRepositoryContainer;
Expand Down Expand Up @@ -1962,8 +1954,7 @@ public void createDuplicateChildTaskComposedTask() {

@TestPropertySource(properties = {"spring.cloud.dataflow.applicationProperties.task.globalkey=globalvalue", "spring.cloud.dataflow.applicationProperties.stream.globalstreamkey=nothere", "spring.cloud.dataflow.task.useUserAccessToken=true"})
@AutoConfigureTestDatabase(replace = Replace.ANY)
@Nested
public class ComposedTaskWithSystemUseUserAccessTokenTests extends DefaultTaskExecutionServiceTests {
public static class ComposedTaskWithSystemUseUserAccessTokenTests extends DefaultTaskExecutionServiceTests {

@Autowired
TaskRepositoryContainer taskRepositoryContainer;
Expand Down