Skip to content

fix(server): delete scheduler-xxx while deleting a project #579

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ public class Neo4jSinkWriter extends BaseSinkWriter<Neo4jSinkNodeConfig> {
private Project project;
private static final String DOT = ".";

private static RejectedExecutionHandler handler =
private static final RejectedExecutionHandler handler =
(r, executor) -> {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
};
private static ExecutorService executor =
private static final ExecutorService executor =
new ThreadPoolExecutor(
NUM_THREADS,
NUM_THREADS,
Expand Down Expand Up @@ -169,7 +169,7 @@ private void awaitAllTasks(List<Future<Void>> futures)

private void writeNode(SubGraphRecord.Node node) {
try {
Long statr = System.currentTimeMillis();
long start = System.currentTimeMillis();
RecordAlterOperationEnum operation = context.getOperation();
if (StringUtils.isBlank(node.getId())
|| StringUtils.isBlank(node.getName())
Expand All @@ -184,7 +184,7 @@ private void writeNode(SubGraphRecord.Node node) {
List<LPGPropertyRecord> properties = Lists.newArrayList();
for (Map.Entry<String, Object> entry : node.getProperties().entrySet()) {
Object entryValue = entry.getValue();
if (!TypeChecker.isArrayOrCollectionOfPrimitives(entryValue)) {
if (!TypeChecker.isBasicType(entryValue)) {
entryValue = JSON.toJSONString(entryValue);
}
properties.add(new LPGPropertyRecord(entry.getKey(), entryValue));
Expand All @@ -199,7 +199,7 @@ private void writeNode(SubGraphRecord.Node node) {
log.info(
String.format(
"write Node succeed id:%s cons:%s",
node.getId(), System.currentTimeMillis() - statr));
node.getId(), System.currentTimeMillis() - start));
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -214,7 +214,7 @@ private String labelPrefix(String label) {

private void writeEdge(SubGraphRecord.Edge edge) {
try {
Long statr = System.currentTimeMillis();
long start = System.currentTimeMillis();
RecordAlterOperationEnum operation = context.getOperation();
if (StringUtils.isBlank(edge.getFrom())
|| StringUtils.isBlank(edge.getTo())
Expand All @@ -226,7 +226,7 @@ private void writeEdge(SubGraphRecord.Edge edge) {
List<LPGPropertyRecord> properties = Lists.newArrayList();
for (Map.Entry<String, Object> entry : edge.getProperties().entrySet()) {
Object entryValue = entry.getValue();
if (!TypeChecker.isArrayOrCollectionOfPrimitives(entryValue)) {
if (!TypeChecker.isBasicType(entryValue)) {
entryValue = JSON.toJSONString(entryValue);
}
properties.add(new LPGPropertyRecord(entry.getKey(), entryValue));
Expand All @@ -244,9 +244,10 @@ private void writeEdge(SubGraphRecord.Edge edge) {
client.deleteEdge(edge.getLabel(), edgeRecords);
}
log.info(
String.format(
"write Edge succeed from:%s to:%s cons:%s",
edge.getFrom(), edge.getTo(), System.currentTimeMillis() - statr));
"write Edge succeed from:{} to:{} cons:{}",
edge.getFrom(),
edge.getTo(),
System.currentTimeMillis() - start);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,36 @@ public static boolean isArrayOrCollectionOfPrimitives(Object obj) {
}
return false;
}

public static boolean isBasicType(Object obj) {
if (obj == null) {
return false;
}
if (obj instanceof Object[]) {
for (Object element : (Object[]) obj) {
if (!isBasicType(element)) {
return false;
}
}
return true;

} else if (obj instanceof Collection<?>) {
for (Object element : (Collection<?>) obj) {
if (!isBasicType(element)) {
return false;
}
}
return true;
}

return obj.getClass().isPrimitive()
|| obj instanceof Integer
|| obj instanceof Double
|| obj instanceof Float
|| obj instanceof Long
|| obj instanceof Byte
|| obj instanceof Boolean
|| obj instanceof Character
|| obj instanceof CharSequence;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ public SchedulerEnum.TaskStatus submit(TaskExecuteContext context) {
context.addTraceLog("update index schema index_ids:%s", retrievals);
List<Long> retrievalList = JSON.parseObject(retrievals, new TypeReference<List<Long>>() {});
Project project = projectService.queryById(job.getProjectId());
if (project == null) {
context.addTraceLog("project not exist");
return SchedulerEnum.TaskStatus.FINISH;
}
for (Long id : retrievalList) {
Retrieval retrieval = retrievalService.getById(id);
context.addTraceLog("update index(%s) schema", retrieval.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ public void check(BaseAdvancedType advancedType, SchemaCheckContext context) {
}

this.checkBasicInfo(spgTypeIdentifier, property, context);
this.checkBuiltInProperty(advancedType.getSpgTypeEnum(), property);
if (!advancedType.isUpdate()) {
this.checkBuiltInProperty(advancedType.getSpgTypeEnum(), property);
}
this.checkConstraint(property);

if (property.getLogicalRule() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ public interface ProjectDOMapper {

void deleteFromKgBuilderJob(Long id);

void deleteFromKgSchedulerInstance(Long id);

void deleteFromKgSchedulerJob(Long id);

void deleteFromKgSchedulerTask(Long id);

void deleteFromKgResourcePermission(Long projectId);

int insert(ProjectDO record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ public Integer deleteById(Long projectId) {
projectDOMapper.deleteFromKgReasonTask(projectId);
projectDOMapper.deleteFromKgReasonTutorial(projectId);
projectDOMapper.deleteFromKgBuilderJob(projectId);
projectDOMapper.deleteFromKgSchedulerInstance(projectId);
projectDOMapper.deleteFromKgSchedulerJob(projectId);
projectDOMapper.deleteFromKgSchedulerTask(projectId);
projectDOMapper.deleteFromKgResourcePermission(projectId);
return projectDOMapper.deleteByPrimaryKey(projectId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,15 @@
<delete id="deleteFromKgBuilderJob" parameterType="java.lang.Long">
DELETE FROM kg_builder_job WHERE project_id = #{id,jdbcType=BIGINT};
</delete>
<delete id="deleteFromKgSchedulerInstance" parameterType="java.lang.Long">
DELETE FROM kg_scheduler_instance WHERE project_id = #{id,jdbcType=BIGINT};
</delete>
<delete id="deleteFromKgSchedulerJob" parameterType="java.lang.Long">
DELETE FROM kg_scheduler_job WHERE project_id = #{id,jdbcType=BIGINT};
</delete>
<delete id="deleteFromKgSchedulerTask" parameterType="java.lang.Long">
DELETE FROM kg_scheduler_task WHERE project_id = #{id,jdbcType=BIGINT};
</delete>
<delete id="deleteFromKgResourcePermission" parameterType="java.lang.Long">
DELETE FROM kg_resource_permission WHERE resource_id = #{id,jdbcType=BIGINT} and resource_tag = 'KNOWLEDGE_BASE';
</delete>
Expand Down
Loading