32
32
import tempfile
33
33
import time
34
34
import urllib2
35
+ import warnings
35
36
from optparse import OptionParser
36
37
from sys import stderr
37
38
import boto
@@ -60,11 +61,9 @@ def parse_args():
60
61
parser .add_option (
61
62
"-s" , "--slaves" , type = "int" , default = 1 ,
62
63
help = "Number of slaves to launch (default: %default)" )
63
- # NOTE: For strict "API" compatibility, we should probably leave this in
64
- # and just mark it as deprecated / not used.
65
- # parser.add_option(
66
- # "-w", "--wait", type="int", default=120,
67
- # help="Seconds to wait for nodes to start (default: %default)")
64
+ parser .add_option (
65
+ "-w" , "--wait" , type = "int" ,
66
+ help = "DEPRECATED - Seconds to wait for nodes to start" )
68
67
parser .add_option (
69
68
"-k" , "--key-pair" ,
70
69
help = "Key pair to use on instances" )
@@ -194,18 +193,6 @@ def get_or_make_group(conn, name):
194
193
return conn .create_security_group (name , "Spark EC2 group" )
195
194
196
195
197
- # Wait for a set of launched instances to exit the "pending" state
198
- # (i.e. either to start running or to fail and be terminated)
199
- def wait_for_instances (conn , instances ):
200
- while True :
201
- for i in instances :
202
- i .update ()
203
- if len ([i for i in instances if i .state == 'pending' ]) > 0 :
204
- time .sleep (5 )
205
- else :
206
- return
207
-
208
-
209
196
# Check whether a given EC2 instance object is in a state we consider active,
210
197
# i.e. not terminating or terminated. We count both stopping and stopped as
211
198
# active since we can restart stopped clusters.
@@ -610,16 +597,6 @@ def setup_spark_cluster(master, opts):
610
597
print "Ganglia started at http://%s:5080/ganglia" % master
611
598
612
599
613
- # Wait for a whole cluster (masters, slaves and ZooKeeper) to start up
614
- def wait_for_cluster (conn , wait_secs , master_nodes , slave_nodes ):
615
- print "Waiting for instances to start up..."
616
- time .sleep (5 )
617
- wait_for_instances (conn , master_nodes )
618
- wait_for_instances (conn , slave_nodes )
619
- print "Waiting %d more seconds..." % wait_secs
620
- time .sleep (wait_secs )
621
-
622
-
623
600
def is_ssh_available (host , opts ):
624
601
"Checks if SSH is available on the host."
625
602
try :
@@ -633,10 +610,7 @@ def is_ssh_available(host, opts):
633
610
stdout = devnull ,
634
611
stderr = devnull
635
612
)
636
- if ret == 0 :
637
- return True
638
- else :
639
- return False
613
+ return ret == 0
640
614
except subprocess .CalledProcessError as e :
641
615
return False
642
616
@@ -653,23 +627,24 @@ def wait_for_cluster_state(cluster_instances, cluster_state, opts):
653
627
"Waiting for all instances in cluster to enter '{s}' state." .format (s = cluster_state )
654
628
)
655
629
sys .stdout .flush ()
630
+
656
631
while True :
657
632
for i in cluster_instances :
658
633
s = i .update () # capture output to suppress print to screen in newer versions of boto
659
634
# print "{instance}: {state}".format(instance=i.id, state=i.state)
660
635
if cluster_state == 'ssh-ready' :
661
636
if all (i .state == 'running' for i in cluster_instances ) and \
662
637
all (is_ssh_available (host = i .ip_address , opts = opts ) for i in cluster_instances ):
663
- print "" # so that next line of output starts on new line
664
- return
638
+ break
665
639
else :
666
640
if all (i .state == cluster_state for i in cluster_instances ):
667
- print "" # so that next line of output starts on new line
668
- return
641
+ break
669
642
sys .stdout .write ("." )
670
643
sys .stdout .flush ()
671
644
time .sleep (3 )
672
645
646
+ print "" # so that next line of output starts on new line
647
+
673
648
674
649
# Get number of local disks available for a given EC2 instance type.
675
650
def get_num_disks (instance_type ):
@@ -903,6 +878,16 @@ def real_main():
903
878
(opts , action , cluster_name ) = parse_args ()
904
879
905
880
# Input parameter validation
881
+ if opts .wait is not None :
882
+ # NOTE: DeprecationWarnings are silent in 2.7+ by default.
883
+ # To show them, run Python with the -Wdefault switch.
884
+ # See: https://docs.python.org/3.5/whatsnew/2.7.html
885
+ warnings .warn (
886
+ "This option is deprecated and has no effect. "
887
+ "spark-ec2 automatically waits as long as necessary for clusters to startup." ,
888
+ DeprecationWarning
889
+ )
890
+
906
891
if opts .ebs_vol_num > 8 :
907
892
print >> stderr , "ebs-vol-num cannot be greater than 8"
908
893
sys .exit (1 )
@@ -925,9 +910,8 @@ def real_main():
925
910
(master_nodes , slave_nodes ) = get_existing_cluster (conn , opts , cluster_name )
926
911
else :
927
912
(master_nodes , slave_nodes ) = launch_cluster (conn , opts , cluster_name )
928
- # wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
929
- # NOTE: This next line means if we have a terminally broken cluster, (e.gl for a resume)
930
- # we'll keep waiting until the user exits.
913
+ # NOTE: This next line means if we have a terminally broken cluster,
914
+ # (e.g. during a --resume) we'll keep waiting until the user exits.
931
915
wait_for_cluster_state (
932
916
cluster_instances = (master_nodes + slave_nodes ),
933
917
cluster_state = 'ssh-ready' ,
@@ -1051,7 +1035,6 @@ def real_main():
1051
1035
for inst in master_nodes :
1052
1036
if inst .state not in ["shutting-down" , "terminated" ]:
1053
1037
inst .start ()
1054
- # wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
1055
1038
wait_for_cluster_state (
1056
1039
cluster_instances = (master_nodes + slave_nodes ),
1057
1040
cluster_state = 'ssh-ready' ,
0 commit comments