1
1
package com.mapr.db.spark.streaming.sink
2
2
3
3
4
- import com.mapr.db.spark.sql._
4
+ import com.mapr.db.spark._
5
5
import com.mapr.db.spark.streaming.MapRDBSourceConfig
6
6
import org.apache.spark.internal.Logging
7
7
import org.apache.spark.sql.DataFrame
8
+ import org.apache.spark.sql.catalyst.encoders.RowEncoder
9
+ import org.apache.spark.sql.catalyst.plans.logical.{Command, LocalRelation, LogicalPlan, Union}
8
10
import org.apache.spark.sql.execution.streaming.Sink
9
11
import org.ojai.DocumentConstants
10
12
@@ -26,9 +28,26 @@ private[streaming] class MapRDBSink(parameters: Map[String, String]) extends Sin
26
28
val createTable = parameters.getOrElse(MapRDBSourceConfig.CreateTableOption, "false").toBoolean
27
29
val bulkInsert = parameters.getOrElse(MapRDBSourceConfig.BulkModeOption, "false").toBoolean
28
30
29
- data.saveToMapRDB(tablePath.get, idFieldPath, createTable, bulkInsert)
31
+ val logicalPlan: LogicalPlan = {
32
+ // For various commands (like DDL) and queries with side effects, we force query execution
33
+ // to happen right away to let these side effects take place eagerly.
34
+ data.queryExecution.analyzed match {
35
+ case c: Command =>
36
+ LocalRelation(c.output, data.queryExecution.executedPlan.executeCollect())
37
+ case u @ Union(children) if children.forall(_.isInstanceOf[Command]) =>
38
+ LocalRelation(u.output, data.queryExecution.executedPlan.executeCollect())
39
+ case _ =>
40
+ data.queryExecution.analyzed
41
+ }
42
+ }
43
+
44
+ val encoder = RowEncoder(data.schema).resolveAndBind(
45
+ logicalPlan.output,
46
+ data.sparkSession.sessionState.analyzer)
47
+ data.queryExecution.toRdd.map(encoder.fromRow).saveToMapRDB(tablePath.get, createTable, bulkInsert, idFieldPath)
30
48
31
49
latestBatchId = batchId
32
50
}
33
51
}
52
+
34
53
}
0 commit comments