Commit 82b33cf

Merge branch 'master' into spark-shell-docs-2
2 parents: 7ba025f + bab0584

61 files changed (+1073, -724 lines)


Makefile

Lines changed: 10 additions & 1 deletion
@@ -85,11 +85,20 @@ stop:
 	make stop-standalone
 	make stop-cluster
 
+restart:
+	make stop
+	make start
+
 test:
 	make start
 	# with --batch-mode maven doesn't print 'Progress: 125/150kB', the progress lines take up 90% of the log and causes
 	# Travis build to fail with 'The job exceeded the maximum log length, and has been terminated'
-	mvn --batch-mode -Dtest=${TEST} clean compile test
+	mvn clean test -B
+	make stop
+
+benchmark:
+	make start
+	mvn clean test -B -Pbenchmark
 	make stop
 
 deploy:

doc/configuration.md

Lines changed: 2 additions & 2 deletions
@@ -8,8 +8,8 @@ topology from the initial node, so there is no need to provide the rest of the c
 * `spark.redis.auth` - the initial node's AUTH password
 * `spark.redis.db` - optional DB number. Avoid using this, especially in cluster mode.
 * `spark.redis.timeout` - connection timeout in ms, 2000 ms by default
-* `spark.redis.max.pipeline.size` - the maximum number of commands per pipeline (used to batch commands). The default value is 10000.
-* `spark.redis.scan.count` - count option of SCAN command (used to iterate over keys). The default value is 10000.
+* `spark.redis.max.pipeline.size` - the maximum number of commands per pipeline (used to batch commands). The default value is 100.
+* `spark.redis.scan.count` - count option of SCAN command (used to iterate over keys). The default value is 100.
doc/dataframe.md

Lines changed: 98 additions & 32 deletions
@@ -28,12 +28,13 @@ object DataFrameExample {
   case class Person(name: String, age: Int)
 
   def main(args: Array[String]): Unit = {
-    val conf = new SparkConf().setAppName("redis-df")
-      .setMaster("local[*]")
-      .set("spark.redis.host", "localhost")
-      .set("spark.redis.port", "6379")
-
-    val spark = SparkSession.builder().config(conf).getOrCreate()
+    val spark = SparkSession
+      .builder()
+      .appName("redis-df")
+      .master("local[*]")
+      .config("spark.redis.host", "localhost")
+      .config("spark.redis.port", "6379")
+      .getOrCreate()
 
     val personSeq = Seq(Person("John", 30), Person("Peter", 45))
     val df = spark.createDataFrame(personSeq)
@@ -93,6 +94,25 @@ The keys in Redis:
 2) "person:Peter"
 ```
 
+The key column values are not persisted as fields of the Redis hashes:
+
+```bash
+127.0.0.1:6379> hgetall person:John
+1) "age"
+2) "30"
+```
+
+To load the keys back, you also need to specify
+the key column parameter when reading:
+
+```scala
+val df = spark.read
+  .format("org.apache.spark.sql.redis")
+  .option("table", "person")
+  .option("key.column", "name")
+  .load()
+```
+
 ### Save Modes
 
 Spark-redis supports all DataFrame [SaveMode](https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes)s: `Append`,
@@ -142,8 +162,8 @@ It also enables projection query optimization when only a small subset of column
 a limitation with Hash model - it doesn't support nested DataFrame schema. One option to overcome it is making your DataFrame schema flat.
 If it is not possible due to some constraints, you may consider using Binary persistence model.
 
-With the Binary persistence model the DataFrame row is serialized into a byte array and stored as a string in Redis. This implies that
-storage model is private to spark-redis library and data cannot be easily queried from non-Spark environments. Another drawback
+With the Binary persistence model the DataFrame row is serialized into a byte array and stored as a string in Redis (the default Java serialization is used).
+This implies that the storage model is private to the spark-redis library and data cannot be easily queried from non-Spark environments. Another drawback
 of Binary model is a larger memory footprint.
 
 To enable Binary model use `option("model", "binary")`, e.g.
@@ -171,17 +191,18 @@ There are two options how you can read a DataFrame:
 To read a previously saved DataFrame, specify the table name that was used for saving. Example:
 
 ```scala
-object DataFrameTests {
+object DataFrameExample {
 
   case class Person(name: String, age: Int)
 
   def main(args: Array[String]): Unit = {
-    val conf = new SparkConf().setAppName("redis-df")
-      .setMaster("local[*]")
-      .set("spark.redis.host", "localhost")
-      .set("spark.redis.port", "6379")
-
-    val spark = SparkSession.builder().config(conf).getOrCreate()
+    val spark = SparkSession
+      .builder()
+      .appName("redis-df")
+      .master("local[*]")
+      .config("spark.redis.host", "localhost")
+      .config("spark.redis.port", "6379")
+      .getOrCreate()
 
     val personSeq = Seq(Person("John", 30), Person("Peter", 45))
     val df = spark.createDataFrame(personSeq)
@@ -213,9 +234,11 @@ root
 +-----+---+
 | John| 30|
 |Peter| 45|
-+-----+---+
++-----+---+
 ```
 
+If the `key.column` option was used for writing, then it should also be used when reading the table back. See [Specifying Redis key](#specifying-redis-key) for details.
+
 To read with Spark SQL:
 
 ```scala
@@ -230,22 +253,63 @@ val loadedDf = spark.sql(s"SELECT * FROM person")
 
 To read Redis Hashes you have to provide a keys pattern with the `.option("keys.pattern", keysPattern)` option. The DataFrame schema should be explicitly specified or can be inferred from a random row.
 
-An example of explicit schema:
+```bash
+hset person:1 name John age 30
+hset person:2 name Peter age 45
+```
+
+An example of providing an explicit schema and specifying `key.column`:
 
 ```scala
-val df = spark.read
-  .format("org.apache.spark.sql.redis")
-  .schema(
-    StructType(Array(
-      StructField("name", StringType),
-      StructField("age", IntegerType))
-    )
-  )
-  .option("keys.pattern", "person:*")
-  .load()
+val df = spark.read
+  .format("org.apache.spark.sql.redis")
+  .schema(
+    StructType(Array(
+      StructField("id", IntegerType),
+      StructField("name", StringType),
+      StructField("age", IntegerType))
+    )
+  )
+  .option("keys.pattern", "person:*")
+  .option("key.column", "id")
+  .load()
+
+df.show()
 ```
 
-Another option is to let spark-redis automatically infer schema based on a random row. In this case all columns will have `String` type. Example:
+```bash
++---+-----+---+
+| id| name|age|
++---+-----+---+
+|  1| John| 30|
+|  2|Peter| 45|
++---+-----+---+
+```
+
+Spark-Redis tries to extract the key based on the key pattern:
+- if the pattern ends with `*` and it's the only wildcard, the trailing substring will be extracted
+- otherwise there is no extraction - the key is kept as is, e.g.
+
+```scala
+val df = // code omitted...
+  .option("keys.pattern", "p*:*")
+  .option("key.column", "id")
+  .load()
+df.show()
+```
+
+```bash
++-----+---+------------+
+| name|age|          id|
++-----+---+------------+
+| John| 30| person:John|
+|Peter| 45|person:Peter|
++-----+---+------------+
+```
+
+Another option is to let spark-redis automatically infer the schema based on a random row. In this case all columns will have `String` type.
+We also don't specify the `key.column` option in this example, so the column `_id` will be created.
+Example:
 
 ```scala
 val df = spark.read
@@ -262,21 +326,23 @@ The output is:
 root
  |-- name: string (nullable = true)
  |-- age: string (nullable = true)
+ |-- _id: string (nullable = true)
 ```
 
+
 ## DataFrame options
 
 | Name              | Description                                                                                 | Type                  | Default |
 | ----------------- | ------------------------------------------------------------------------------------------ | --------------------- | ------- |
 | model             | defines Redis model used to persist DataFrame, see [Persistence model](#persistence-model)  | `enum [binary, hash]` | `hash`  |
 | partitions.number | number of partitions (applies only when reading dataframe)                                  | `Int`                 | `3`     |
-| key.column        | specify unique column used as a Redis key, by default a key is auto-generated               | `String`              | -       |
+| key.column        | when writing - specifies the unique column used as a Redis key (by default a key is auto-generated). <br/> When reading - specifies the column name to store the hash key in | `String` | - |
 | ttl               | data time to live in `seconds`. Data doesn't expire if `ttl` is less than `1`               | `Int`                 | `0`     |
 | infer.schema      | infer schema from random row, all columns will have `String` type                           | `Boolean`             | `false` |
-| max.pipeline.size | maximum number of commands per pipeline (used to batch commands)                            | `Int`                 | 10000   |
-| scan.count        | count option of SCAN command (used to iterate over keys)                                    | `Int`                 | 10000   |
+| max.pipeline.size | maximum number of commands per pipeline (used to batch commands)                            | `Int`                 | 100     |
+| scan.count        | count option of SCAN command (used to iterate over keys)                                    | `Int`                 | 100     |
 
 
 ## Known limitations
 
-- Nested DataFrame fields are not currently supported with Hash model. Consider making DataFrame schema flat or using Binary persistence model.
+- Nested DataFrame fields are not currently supported with Hash model. Consider making DataFrame schema flat or using Binary persistence model.
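To round out the read examples above, the write side follows the same `key.column` contract; a minimal sketch assuming the `Person` case class from the docs and a local Redis:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

case class Person(name: String, age: Int)

val spark = SparkSession
  .builder()
  .appName("redis-df")
  .master("local[*]")
  .config("spark.redis.host", "localhost")
  .config("spark.redis.port", "6379")
  .getOrCreate()

// "name" becomes part of the Redis key (person:John, person:Peter) and is
// not stored as a hash field, so key.column must be passed on read as well.
val df = spark.createDataFrame(Seq(Person("John", 30), Person("Peter", 45)))
df.write
  .format("org.apache.spark.sql.redis")
  .option("table", "person")
  .option("key.column", "name")
  .mode(SaveMode.Overwrite)
  .save()
```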

doc/java.md

Lines changed: 5 additions & 5 deletions
@@ -81,11 +81,11 @@ Dataset<Row> df = spark.createDataFrame(Arrays.asList(
         new Person("Peter", 40)), Person.class);
 
 df.write()
-    .format("org.apache.spark.sql.redis")
-    .option("table", "person")
-    .option("key.column", "name")
-    .mode(SaveMode.Overwrite)
-    .save();
+        .format("org.apache.spark.sql.redis")
+        .option("table", "person")
+        .option("key.column", "name")
+        .mode(SaveMode.Overwrite)
+        .save();
 ```
 
 ## Streaming

pom.xml

Lines changed: 18 additions & 1 deletion
@@ -51,6 +51,7 @@
         <scala.complete.version>${scala.major.version}.12</scala.complete.version>
         <jedis.version>2.9.0</jedis.version>
         <spark.version>2.3.1</spark.version>
+        <plugins.scalatest.version>1.0</plugins.scalatest.version>
     </properties>
 
     <distributionManagement>
@@ -220,12 +221,13 @@
             <plugin>
                 <groupId>org.scalatest</groupId>
                 <artifactId>scalatest-maven-plugin</artifactId>
-                <version>1.0</version>
+                <version>${plugins.scalatest.version}</version>
                 <configuration>
                     <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
                     <junitxml>.</junitxml>
                     <filereports>WDF TestSuite.txt</filereports>
                     <argLine>-XX:MaxPermSize=256m -Xmx2g</argLine>
+                    <tagsToExclude>com.redislabs.provider.redis.util.BenchmarkTest</tagsToExclude>
                 </configuration>
                 <executions>
                     <execution>
@@ -303,6 +305,21 @@
         </dependency>
     </dependencies>
     <profiles>
+        <profile>
+            <id>benchmark</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.scalatest</groupId>
+                        <artifactId>scalatest-maven-plugin</artifactId>
+                        <version>${plugins.scalatest.version}</version>
+                        <configuration combine.self="override">
+                            <tagsToInclude>com.redislabs.provider.redis.util.BenchmarkTest</tagsToInclude>
+                        </configuration>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
         <profile>
             <id>release</id>
             <build>
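ScalaTest matches `tagsToInclude`/`tagsToExclude` against tag names, so the profile above implies a tag object named `com.redislabs.provider.redis.util.BenchmarkTest`; a sketch of how such a tag is typically defined and applied (the suite and test names here are illustrative, not from the commit):

```scala
package com.redislabs.provider.redis.util

import org.scalatest.{FunSuite, Tag}

// The tag name must match the value used in <tagsToInclude>/<tagsToExclude>.
object BenchmarkTest extends Tag("com.redislabs.provider.redis.util.BenchmarkTest")

class WriteBenchmarkSuite extends FunSuite {

  // Runs under `make benchmark` (-Pbenchmark) and is skipped by `make test`.
  test("bulk write throughput", BenchmarkTest) {
    // ... benchmark body ...
  }
}
```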

src/main/scala/com/redislabs/provider/redis/RedisConfig.scala

Lines changed: 2 additions & 2 deletions
@@ -86,11 +86,11 @@ case class ReadWriteConfig(scanCount: Int, maxPipelineSize: Int)
 object ReadWriteConfig {
   /** maximum number of commands per pipeline **/
   val MaxPipelineSizeConfKey = "spark.redis.max.pipeline.size"
-  val MaxPipelineSizeDefault = 10000
+  val MaxPipelineSizeDefault = 100
 
   /** count option of SCAN command **/
   val ScanCountConfKey = "spark.redis.scan.count"
-  val ScanCountDefault = 10000
+  val ScanCountDefault = 100
 
   val Default: ReadWriteConfig = ReadWriteConfig(ScanCountDefault, MaxPipelineSizeDefault)
src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala

Lines changed: 5 additions & 0 deletions
@@ -475,3 +475,8 @@ trait Keys {
     keys.zip(types).filter(x => x._2 == t).map(x => x._1)
   }
 }
+
+/**
+  * Key utilities to avoid serialization issues.
+  */
+object Keys extends Keys
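The companion-style object matters for Spark closures: calling a trait method requires an instance, and capturing that instance can drag a non-serializable enclosing class into the closure, whereas a top-level `object` is resolved on the executor without being shipped. A generic illustration of the idiom (names hypothetical):

```scala
trait KeyUtils {
  // Returns the part of the key before the first ':' separator.
  def keyPrefix(key: String): String = key.takeWhile(_ != ':')
}

// Serialization-safe call site: a closure referencing KeyUtils.keyPrefix
// captures no enclosing instance.
object KeyUtils extends KeyUtils

// e.g. rdd.map(KeyUtils.keyPrefix) instead of mixing KeyUtils into the RDD class.
```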
src/main/scala/com/redislabs/provider/redis/util/ConnectionUtils.scala

Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
+package com.redislabs.provider.redis.util
+
+import redis.clients.jedis.Jedis
+
+/**
+  * @author The Viet Nguyen
+  */
+object ConnectionUtils {
+
+  def withConnection[A](conn: Jedis)(body: Jedis => A): A = {
+    val res = body(conn)
+    conn.close()
+    res
+  }
+}
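`withConnection` is a small loan pattern: the caller borrows the `Jedis` handle and the helper closes it after the body returns (note there is no try/finally, so the connection leaks if the body throws). A usage sketch against the `person:John` hash from the DataFrame docs, assuming a local Redis:

```scala
import com.redislabs.provider.redis.util.ConnectionUtils.withConnection
import redis.clients.jedis.Jedis

// Borrow a connection, read one hash field, and let the helper close it.
val age: String = withConnection(new Jedis("localhost", 6379)) { jedis =>
  jedis.hget("person:John", "age")
}
```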

src/main/scala/org/apache/spark/sql/redis/BinaryRedisPersistence.scala

Lines changed: 3 additions & 2 deletions
@@ -25,13 +25,14 @@ class BinaryRedisPersistence extends RedisPersistence[Array[Byte]] {
   override def load(pipeline: Pipeline, key: String, requiredColumns: Seq[String]): Unit =
     pipeline.get(key.getBytes(UTF_8))
 
-  override def encodeRow(value: Row): Array[Byte] = {
+  override def encodeRow(keyName: String, value: Row): Array[Byte] = {
     val fields = value.schema.fields.map(_.name)
     val valuesArray = fields.map(f => value.getAs[Any](f))
     SerializationUtils.serialize(valuesArray)
   }
 
-  override def decodeRow(value: Array[Byte], schema: => StructType, inferSchema: Boolean): Row = {
+  override def decodeRow(keyMap: (String, String), value: Array[Byte], schema: StructType,
+                         requiredColumns: Seq[String]): Row = {
     val valuesArray: Array[Any] = SerializationUtils.deserialize(value)
     new GenericRowWithSchema(valuesArray, schema)
   }
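This encode/decode pair backs the Binary persistence model described in doc/dataframe.md: the whole row is serialized with Java serialization rather than mapped to hash fields. It is selected per write; a sketch assuming a `df` as in the earlier examples:

```scala
// Store rows as serialized byte arrays instead of Redis hashes.
df.write
  .format("org.apache.spark.sql.redis")
  .option("table", "person")
  .option("model", "binary")
  .save()
```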
