18
18
package org .apache .spark .examples .pythonconverters
19
19
20
20
import scala .collection .JavaConversions ._
21
+ import scala .util .parsing .json ._
21
22
22
23
import org .apache .spark .api .python .Converter
23
24
import org .apache .hadoop .hbase .client .{Put , Result }
@@ -28,22 +29,23 @@ import org.apache.hadoop.hbase.CellUtil
28
29
29
30
/**
 * Implementation of [[org.apache.spark.api.python.Converter]] that converts all
 * the records in an HBase Result to an Array[String]
 */
class HBaseResultToStringConverter extends Converter[Any, Array[String]] {

  /**
   * Renders every cell of the given HBase [[Result]] as one JSON string.
   *
   * @param obj expected to be an HBase `Result`; the Python converter API is
   *            untyped, so the cast happens here
   * @return one JSON object per cell, carrying the cell's coordinates
   *         (row, column family, qualifier, timestamp, type) and its value
   */
  override def convert(obj: Any): Array[String] = {
    import collection.JavaConverters._
    val result = obj.asInstanceOf[Result]
    // Byte-array fields go through Bytes.toStringBinary so non-printable
    // bytes survive the round-trip into a JSON string.
    val jsonCells = result.listCells.asScala.map { cell =>
      val fields = Map(
        "row" -> Bytes.toStringBinary(CellUtil.cloneRow(cell)),
        "columnFamily" -> Bytes.toStringBinary(CellUtil.cloneFamily(cell)),
        "qualifier" -> Bytes.toStringBinary(CellUtil.cloneQualifier(cell)),
        "timestamp" -> cell.getTimestamp.toString,
        "type" -> Type.codeToType(cell.getTypeByte).toString,
        "value" -> Bytes.toStringBinary(CellUtil.cloneValue(cell))
      )
      JSONObject(fields).toString()
    }
    jsonCells.toArray
  }
}
49
51
0 commit comments