Skip to content

Commit 3545f3c

Browse files
committed
Merge pull request #24 in BACIBBD/spline from feature/SL-59-updating-Atlas-persistence-layer to feature/SL-43-new-data-model
* commit '947e742dabbeedc49df5b118beb1d71029e2f590': SL-59 Adding comment parameters SL-59 Refactoring Atlas persistence layer according to the latest Spline data model. SL-59 Renaming AtlasDataLineagePersistor to AtlasDataLineageWriter
2 parents b6e9186 + 947e742 commit 3545f3c

File tree

11 files changed

+189
-157
lines changed

11 files changed

+189
-157
lines changed

persistence/atlas/src/main/atlas/spline-meta-model.json

Lines changed: 18 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,14 @@
104104
"Process", "spark_entity"
105105
],
106106
"typeVersion": "1.0",
107+
"attributeDefs": []
108+
},
109+
{
110+
"name": "spark_generic_operation",
111+
"superTypes": [
112+
"Process", "spark_operation"
113+
],
114+
"typeVersion": "1.0",
107115
"attributeDefs": [
108116
{
109117
"name": "rawString",
@@ -200,7 +208,7 @@
200208
"attributeDefs": [
201209
{
202210
"name": "attributes",
203-
"typeName": "array<spark_dataset_attribute>",
211+
"typeName": "array<spark_attribute>",
204212
"cardinality": "LIST",
205213
"constraints": [
206214
{
@@ -260,34 +268,6 @@
260268
"schemaElementsAttribute": "attributes"
261269
}
262270
},
263-
{
264-
"name": "spark_dataset_attribute",
265-
"superTypes": [
266-
"spark_attribute"
267-
],
268-
"typeVersion": "1.0",
269-
"attributeDefs": [
270-
{
271-
"name": "dataset",
272-
"typeName": "spark_dataset",
273-
"cardinality": "SINGLE",
274-
"constraints": [
275-
{
276-
"type": "inverseRef",
277-
"params": {
278-
"attribute": "attributes"
279-
}
280-
}
281-
],
282-
"isIndexable": false,
283-
"isOptional": true,
284-
"isUnique": false
285-
}
286-
],
287-
"options": {
288-
"schemaAttributes": "[\"name\", \"type\"]"
289-
}
290-
},
291271
{
292272
"name": "spark_data_type",
293273
"superTypes": [
@@ -467,7 +447,7 @@
467447
"attributeDefs": [
468448
{
469449
"name": "attributeId",
470-
"typeName": "long",
450+
"typeName": "string",
471451
"cardinality": "SINGLE",
472452
"isIndexable": false,
473453
"isOptional": true,
@@ -480,6 +460,14 @@
480460
"isIndexable": false,
481461
"isOptional": false,
482462
"isUnique": false
463+
},
464+
{
465+
"name": "attribute",
466+
"typeName": "spark_attribute",
467+
"cardinality": "SINGLE",
468+
"isIndexable": false,
469+
"isOptional": false,
470+
"isUnique": false
483471
}
484472
]
485473
},

persistence/atlas/src/main/scala/za/co/absa/spline/persistence/atlas/conversion/AttributeConverter.scala

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,46 +16,22 @@
1616

1717
package za.co.absa.spline.persistence.atlas.conversion
1818

19-
import za.co.absa.spline.model.Schema
2019
import za.co.absa.spline.persistence.atlas.model._
21-
import za.co.absa.spline.persistence.atlas.model.Dataset
2220

2321
/**
  * The object is responsible for conversion of [[za.co.absa.spline.model.Attribute Spline attributes]]
  * to [[za.co.absa.spline.persistence.atlas.model.Attribute Atlas attributes]].
  */
object AttributeConverter {

  /**
    * The method converts an [[za.co.absa.spline.model.Attribute Spline attribute]] to an
    * [[za.co.absa.spline.persistence.atlas.model.Attribute Atlas attribute]].
    *
    * @param splineAttribute An input Spline attribute
    * @return An Atlas attribute
    */
  def convert(splineAttribute: za.co.absa.spline.model.Attribute): Attribute = {
    // The attribute id serves as the globally unique qualified name on the Atlas side.
    val qualifiedName = splineAttribute.id.toString
    val convertedDataType = DataTypeConverter.convert(splineAttribute.dataType, qualifiedName)
    new Attribute(splineAttribute.name, splineAttribute.id, convertedDataType)
  }

}

persistence/atlas/src/main/scala/za/co/absa/spline/persistence/atlas/conversion/DataLineageToTypeSystemConverter.scala

Lines changed: 10 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package za.co.absa.spline.persistence.atlas.conversion
1818

1919
import org.apache.atlas.typesystem.Referenceable
20-
import za.co.absa.spline.model._
20+
import za.co.absa.spline.model.DataLineage
2121
import za.co.absa.spline.persistence.atlas.model._
2222

2323
/**
@@ -31,15 +31,16 @@ object DataLineageToTypeSystemConverter {
3131
* @return Atlas entities
3232
*/
3333
/**
  * The method converts a [[za.co.absa.spline.model.DataLineage Spline data lineage]] to a sequence of Atlas entities.
  *
  * @param lineage An input Spline data lineage
  * @return Atlas entities: attributes, data sets, operations and the overall process, in that order
  */
def convert(lineage: DataLineage): Seq[Referenceable] = {
  val attributes = lineage.attributes map AttributeConverter.convert
  // Lookup tables from Spline ids (exposed as qualified names) to Atlas internal ids,
  // used to wire cross-references between the converted entities.
  val attributeIdMap = (attributes map (a => a.qualifiedName -> a.getId)).toMap
  val datasets = DatasetConverter.convert(lineage.operations, lineage.datasets, attributeIdMap)
  val datasetIdMap = (datasets map (d => d.qualifiedName -> d.getId)).toMap
  val operations = OperationConverter.convert(lineage.operations, datasetIdMap, attributeIdMap)
  val process = createProcess(lineage, operations, datasets, attributes)
  attributes ++ datasets ++ operations :+ process
}
4142

42-
private def createProcess(lineage: DataLineage, hashSuffix: String, operations : Seq[Operation] , datasets : Seq[Dataset]) : Referenceable = {
43+
private def createProcess(lineage: DataLineage, operations : Seq[Operation] , datasets : Seq[Dataset], attributes: Seq[Attribute]) : Referenceable = {
4344
val (inputDatasets, outputDatasets) = datasets
4445
.filter(_.isInstanceOf[EndpointDataset])
4546
.map(_.asInstanceOf[EndpointDataset])
@@ -48,7 +49,7 @@ object DataLineageToTypeSystemConverter {
4849
new Job(
4950
lineage.id.toString,
5051
lineage.appName,
51-
lineage.appName + hashSuffix,
52+
lineage.id.toString,
5253
operations.map(_.getId),
5354
datasets.map(_.getId),
5455
inputDatasets.map(_.getId),
@@ -57,52 +58,4 @@ object DataLineageToTypeSystemConverter {
5758
outputDatasets.map(_.endpoint.getId)
5859
)
5960
}
60-
private def createDatasets(nodesWithIndexes: Seq[Tuple2[op.Operation,Int]], hashSuffix: String) : Seq[Dataset] =
61-
nodesWithIndexes.map(i =>
62-
{
63-
val datasetSuffix = "_Dataset"
64-
val name = i._1.mainProps.name + datasetSuffix
65-
val operationIdSuffix = "_op" + i._2.toString
66-
val qualifiedName = name + operationIdSuffix + hashSuffix
67-
val dataset = i._1 match {
68-
case op.Source(m, st, paths) =>
69-
val path = paths.mkString(", ")
70-
new EndpointDataset(name, qualifiedName, new FileEndpoint(path, path), EndpointType.file, EndpointDirection.input, st)
71-
case op.Destination(m, dt, path) => new EndpointDataset(name, qualifiedName, new FileEndpoint(path, path), EndpointType.file, EndpointDirection.output, dt)
72-
case _ => new Dataset(name, qualifiedName)
73-
}
74-
??? //dataset.addAttributes(AttributeConverter.convert(i._1.mainProps.output, dataset))
75-
dataset
76-
})
77-
78-
private def createOperations(nodesWithIndexes: Seq[Tuple2[op.Operation,Int]], hashSuffix: String, datasets : Seq[Dataset]) : Seq[Operation] = {
79-
val operations = nodesWithIndexes.map(i =>
80-
{
81-
val operationIdSuffix = "_op" + i._2
82-
val commonProperties = OperationCommonProperties(
83-
i._2,
84-
i._1.mainProps.name,
85-
i._1.mainProps.name + operationIdSuffix + hashSuffix,
86-
???,
87-
??? //i._1.mainProps.childRefs
88-
)
89-
i._1 match {
90-
case op.Join(_, c, t) => new JoinOperation(commonProperties, t, c.map(j => ExpressionConverter.convert(commonProperties.qualifiedName, j)).get)
91-
case op.Filter(_, c) => new FilterOperation(commonProperties, ExpressionConverter.convert(commonProperties.qualifiedName, c))
92-
case op.Projection(_, t) => new ProjectOperation(commonProperties, t.zipWithIndex.map(j => ExpressionConverter.convert(commonProperties.qualifiedName + "@" + j._2, j._1)))
93-
case op.Alias(_, a) => new AliasOperation(commonProperties, a)
94-
case _ => new Operation(commonProperties)
95-
}
96-
}
97-
)
98-
99-
val datasetIds = datasets.map(_.getId)
100-
101-
for(o <- operations){
102-
o.resolveInputDatasets(datasetIds)
103-
o.resolveOutputDatasets(datasetIds)
104-
}
105-
106-
operations
10761
}
108-
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2017 Barclays Africa Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.spline.persistence.atlas.conversion
18+
19+
import java.util.UUID
20+
21+
import org.apache.atlas.typesystem.persistence.Id
22+
import za.co.absa.spline.model.{MetaDataset, op}
23+
import za.co.absa.spline.persistence.atlas.model._
24+
25+
/**
  * The object is responsible for conversion of [[za.co.absa.spline.model.MetaDataset Spline meta data sets]]
  * to [[za.co.absa.spline.persistence.atlas.model.Dataset Atlas data sets]].
  */
object DatasetConverter {

  // Suffix appended to the producing operation's name to form the data set name.
  val datasetSuffix = "_Dataset"

  /**
    * The method converts [[za.co.absa.spline.model.MetaDataset Spline meta data sets]]
    * to [[za.co.absa.spline.persistence.atlas.model.Dataset Atlas data sets]].
    *
    * @param operations A sequence of [[za.co.absa.spline.model.op.Operation Spline operations]]
    * @param datasets A sequence of [[za.co.absa.spline.model.MetaDataset Spline meta data sets]]
    * @param attributeIdMap A map of Spline attribute ids to Atlas ids
    * @return A sequence of [[za.co.absa.spline.persistence.atlas.model.Dataset Atlas data sets]]
    */
  def convert(operations: Seq[op.Operation], datasets: Seq[MetaDataset], attributeIdMap: Map[UUID, Id]): Seq[Dataset] =
    for {
      operation <- operations
      // Pair each operation with the meta data set it outputs.
      dataset <- datasets if dataset.id == operation.mainProps.output
    } yield {
      val name = operation.mainProps.name + datasetSuffix
      val qualifiedName = dataset.id
      // NOTE(review): assumes every schema attribute id is present in attributeIdMap — a missing key throws. TODO confirm upstream guarantee.
      val attributes = dataset.schema.attrs map attributeIdMap
      operation match {
        case op.Source(_, sourceType, paths) =>
          val path = paths mkString ", "
          new EndpointDataset(name, qualifiedName, attributes, new FileEndpoint(path, path), EndpointType.file, EndpointDirection.input, sourceType)
        case op.Destination(_, destinationType, path) =>
          new EndpointDataset(name, qualifiedName, attributes, new FileEndpoint(path, path), EndpointType.file, EndpointDirection.output, destinationType)
        case _ =>
          new Dataset(name, qualifiedName, attributes)
      }
    }
}

persistence/atlas/src/main/scala/za/co/absa/spline/persistence/atlas/conversion/ExpressionConverter.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
package za.co.absa.spline.persistence.atlas.conversion
1818

19+
import java.util.UUID
20+
21+
import org.apache.atlas.typesystem.persistence.Id
1922
import za.co.absa.spline.model._
2023
import za.co.absa.spline.persistence.atlas.model._
2124

@@ -29,11 +32,12 @@ object ExpressionConverter {
2932
*
3033
* @param qualifiedNamePrefix A prefix helping to ensure uniqueness of qualified names of created expressions
3134
* @param expression An input Spline expression
35+
* @param attributeIdMap A map of Spline attribute ids to Atlas ids
3236
* @return An Atlas expression
3337
*/
34-
def convert(qualifiedNamePrefix: String, expression: expr.Expression): Expression = {
38+
def convert(qualifiedNamePrefix: String, expression: expr.Expression, attributeIdMap: Map[UUID, Id]): Expression = {
3539
val qualifiedName = qualifiedNamePrefix + "@" + expression.text
36-
val children = expression.children.zipWithIndex.map(i => convert(qualifiedName + "@" + i._2, i._1))
40+
val children = expression.children.zipWithIndex.map(i => convert(qualifiedName + "@" + i._2, i._1, attributeIdMap))
3741
val mainProperties = ExpressionCommonProperties(
3842
qualifiedName,
3943
expression.text,
@@ -44,7 +48,7 @@ object ExpressionConverter {
4448

4549
expression match {
4650
case expr.Binary(_, symbol, _, _, _) => new BinaryExpression(mainProperties, symbol)
47-
case expr.AttributeReference(attributeId, attributeName, _, _) => new AttributeReferenceExpression(mainProperties, attributeId, attributeName)
51+
case expr.AttributeReference(attributeId, attributeName, _, _) => new AttributeReferenceExpression(mainProperties, attributeId, attributeName, attributeIdMap(attributeId))
4852
case expr.UserDefinedFunction(name, _, _, _) => new UserDefinedFunctionExpression(mainProperties, name)
4953
case _ => new Expression(mainProperties)
5054
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2017 Barclays Africa Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.spline.persistence.atlas.conversion
18+
19+
import java.util.UUID
20+
21+
import org.apache.atlas.typesystem.persistence.Id
22+
import za.co.absa.spline.model.op
23+
import za.co.absa.spline.persistence.atlas.model._
24+
25+
/**
  * The object is responsible for conversion of [[za.co.absa.spline.model.op.Operation Spline operations]]
  * to [[za.co.absa.spline.persistence.atlas.model.Operation Atlas operations]].
  */
object OperationConverter {

  /**
    * The method converts [[za.co.absa.spline.model.op.Operation Spline operations]]
    * to [[za.co.absa.spline.persistence.atlas.model.Operation Atlas operations]].
    *
    * @param operations A sequence of [[za.co.absa.spline.model.op.Operation Spline operations]]
    * @param datasetIdMap A map of Spline data set ids to Atlas ids
    * @param attributeIdMap A map of Spline attribute ids to Atlas ids
    * @return A sequence of [[za.co.absa.spline.persistence.atlas.model.Operation Atlas operations]]
    */
  def convert(operations: Seq[op.Operation], datasetIdMap: Map[UUID, Id], attributeIdMap: Map[UUID, Id]): Seq[Operation] =
    operations map { operation =>
      val commonProperties = OperationCommonProperties(
        operation.mainProps.name,
        // The operation id doubles as the Atlas qualified name.
        operation.mainProps.id.toString,
        operation.mainProps.inputs map datasetIdMap,
        Seq(datasetIdMap(operation.mainProps.output))
      )
      operation match {
        case op.Join(_, condition, joinType) =>
          // NOTE(review): `.get` on the join condition throws if it is None — behavior kept as-is; confirm a join always carries a condition.
          new JoinOperation(commonProperties, joinType, condition.map(c => ExpressionConverter.convert(commonProperties.qualifiedName, c, attributeIdMap)).get)
        case op.Filter(_, condition) =>
          new FilterOperation(commonProperties, ExpressionConverter.convert(commonProperties.qualifiedName, condition, attributeIdMap))
        case op.Projection(_, transformations) =>
          // Index makes each transformation expression's qualified name unique within the operation.
          new ProjectOperation(commonProperties, transformations.zipWithIndex map { case (t, i) => ExpressionConverter.convert(commonProperties.qualifiedName + "@" + i, t, attributeIdMap) })
        case op.Alias(_, alias) =>
          new AliasOperation(commonProperties, alias)
        case op.Generic(_, rawString) =>
          new GenericOperation(commonProperties, rawString)
        case _ =>
          new Operation(commonProperties)
      }
    }
}

persistence/atlas/src/main/scala/za/co/absa/spline/persistence/atlas/model/Attribute.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package za.co.absa.spline.persistence.atlas.model
1818

19+
import java.util.UUID
20+
1921
import org.apache.atlas.AtlasClient
2022
import org.apache.atlas.typesystem.persistence.Id
2123
import org.apache.atlas.typesystem.{Referenceable, Struct}
@@ -25,15 +27,13 @@ import org.apache.atlas.typesystem.{Referenceable, Struct}
2527
* @param name A name
2628
* @param qualifiedName An unique identifier
2729
* @param dataType A data type
28-
* @param dataset A data set that the attribute is part of
2930
*/
30-
class Attribute(val name: String, val qualifiedName: UUID, dataType: DataType) extends Referenceable(
  SparkDataTypes.Attribute,
  // Anonymous HashMap subclass populates the Referenceable's attribute values at construction time.
  new java.util.HashMap[String, Object] {
    put(AtlasClient.NAME, name)
    // Atlas stores qualified names as strings; the attribute's UUID is used for global uniqueness.
    put(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, qualifiedName.toString)
    put("type", dataType.name)
    put("typeRef", dataType)
  }
)

0 commit comments

Comments
 (0)