Skip to content

Commit c852201

Browse files
colorantaarondav
authored andcommitted
For SPARK-1082, Use Curator for ZK interaction in standalone cluster
Author: Raymond Liu <[email protected]> Closes #611 from colorant/curator and squashes the following commits: 7556aa1 [Raymond Liu] Address review comments af92e1f [Raymond Liu] Fix coding style 964f3c2 [Raymond Liu] Ignore NodeExists exception 6df2966 [Raymond Liu] Rewrite zookeeper client code with curator
1 parent 1f4c7f7 commit c852201

File tree

9 files changed

+99
-300
lines changed

9 files changed

+99
-300
lines changed

core/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@
5555
<artifactId>avro-ipc</artifactId>
5656
</dependency>
5757
<dependency>
58-
<groupId>org.apache.zookeeper</groupId>
59-
<artifactId>zookeeper</artifactId>
58+
<groupId>org.apache.curator</groupId>
59+
<artifactId>curator-recipes</artifactId>
6060
</dependency>
6161
<dependency>
6262
<groupId>org.eclipse.jetty</groupId>

core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import org.apache.spark.deploy.master.MasterMessages.ElectedLeader
3030
* [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]]
3131
*/
3232
private[spark] trait LeaderElectionAgent extends Actor {
33+
//TODO: LeaderElectionAgent does not necessary to be an Actor anymore, need refactoring.
3334
val masterActor: ActorRef
3435
}
3536

core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,6 @@ private[master] object MasterMessages {
2828

2929
case object RevokedLeadership
3030

31-
// Actor System to LeaderElectionAgent
32-
33-
case object CheckLeader
34-
3531
// Actor System to Master
3632

3733
case object CheckForWorkerTimeOut
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.master
19+
20+
import org.apache.spark.{SparkConf, Logging}
21+
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
22+
import org.apache.curator.retry.ExponentialBackoffRetry
23+
import org.apache.zookeeper.KeeperException
24+
25+
26+
object SparkCuratorUtil extends Logging {
27+
28+
val ZK_CONNECTION_TIMEOUT_MILLIS = 15000
29+
val ZK_SESSION_TIMEOUT_MILLIS = 60000
30+
val RETRY_WAIT_MILLIS = 5000
31+
val MAX_RECONNECT_ATTEMPTS = 3
32+
33+
def newClient(conf: SparkConf): CuratorFramework = {
34+
val ZK_URL = conf.get("spark.deploy.zookeeper.url")
35+
val zk = CuratorFrameworkFactory.newClient(ZK_URL,
36+
ZK_SESSION_TIMEOUT_MILLIS, ZK_CONNECTION_TIMEOUT_MILLIS,
37+
new ExponentialBackoffRetry(RETRY_WAIT_MILLIS, MAX_RECONNECT_ATTEMPTS))
38+
zk.start()
39+
zk
40+
}
41+
42+
def mkdir(zk: CuratorFramework, path: String) {
43+
if (zk.checkExists().forPath(path) == null) {
44+
try {
45+
zk.create().creatingParentsIfNeeded().forPath(path)
46+
} catch {
47+
case nodeExist: KeeperException.NodeExistsException =>
48+
// do nothing, ignore node existing exception.
49+
case e: Exception => throw e
50+
}
51+
}
52+
}
53+
}

core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala

Lines changed: 0 additions & 205 deletions
This file was deleted.

0 commit comments

Comments
 (0)