
Commit a48ea3c

mengxr authored and Andrew Or committed
[SPARK-4509] Revert EC2 tag-based cluster membership patch
This PR reverts changes related to tag-based cluster membership. As discussed in SPARK-3332, we didn't figure out a safe strategy to use tags to determine cluster membership, because tagging is not atomic. The following changes are reverted:

SPARK-2333: 94053a7
SPARK-3213: 7faf755
SPARK-3608: 78d4220

I tested launch, login, and destroy. It is easy to check the diff by comparing it to Josh's patch for branch-1.1: https://github.com/apache/spark/pull/2225/files

JoshRosen I sent the PR to master. It might be easier for us to keep master and branch-1.2 the same at this time. We can always re-apply the patch once we figure out a stable solution.

Author: Xiangrui Meng <[email protected]>

Closes apache#3453 from mengxr/SPARK-4509 and squashes the following commits:

f0b708b [Xiangrui Meng] revert 94053a7
4298ea5 [Xiangrui Meng] revert 7faf755
35963a1 [Xiangrui Meng] Revert "SPARK-3608 Break if the instance tag naming succeeds"

(cherry picked from commit 7eba0fb)
Signed-off-by: Andrew Or <[email protected]>
1 parent 93b914d commit a48ea3c
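The rationale above (tagging is not atomic) can be sanity-checked with a small pure-Python simulation: an EC2 instance is visible to a describe call before its `Name` tag has been applied, while its security groups are fixed at launch. The `Instance`, `launch`, and `tag` names below are hypothetical stand-ins for illustration, not boto or spark-ec2 code.

```python
# Simulation of the race: instances exist (and are listable) before they
# are tagged, so a tag-based membership scan can miss cluster members.
# Security-group membership, in contrast, is set atomically at launch.

class Instance:
    def __init__(self, groups):
        self.groups = groups   # fixed at launch time
        self.tags = {}         # applied later, in a separate API call

def launch(groups):
    return Instance(groups)   # step 1: instance is now visible

def tag(instance, name):
    instance.tags["Name"] = name   # step 2: may happen much later, or fail

def members_by_tag(instances, cluster_name):
    return [i for i in instances
            if i.tags.get("Name", "").startswith(cluster_name)]

def slaves_by_group(instances, cluster_name):
    return [i for i in instances if cluster_name + "-slaves" in i.groups]

fleet = [launch(["test-slaves"]) for _ in range(3)]
# A scan between launch and tagging sees no tagged members...
assert members_by_tag(fleet, "test") == []
# ...but group-based membership is already correct.
assert len(slaves_by_group(fleet, "test")) == 3
```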

File tree

2 files changed: +28 −69 lines changed


docs/ec2-scripts.md
Lines changed: 6 additions & 8 deletions

```diff
@@ -12,16 +12,14 @@ on the [Amazon Web Services site](http://aws.amazon.com/).
 
 `spark-ec2` is designed to manage multiple named clusters. You can
 launch a new cluster (telling the script its size and giving it a name),
-shutdown an existing cluster, or log into a cluster. Each cluster
-launches a set of instances, which are tagged with the cluster name,
-and placed into EC2 security groups. If you don't specify a security
-group, the `spark-ec2` script will create security groups based on the
-cluster name you request. For example, a cluster named
+shutdown an existing cluster, or log into a cluster. Each cluster is
+identified by placing its machines into EC2 security groups whose names
+are derived from the name of the cluster. For example, a cluster named
 `test` will contain a master node in a security group called
 `test-master`, and a number of slave nodes in a security group called
-`test-slaves`. You can also specify a security group prefix to be used
-in place of the cluster name. Machines in a cluster can be identified
-by looking for the "Name" tag of the instance in the Amazon EC2 Console.
+`test-slaves`. The `spark-ec2` script will create these security groups
+for you based on the cluster name you request. You can also use them to
+identify machines belonging to each cluster in the Amazon EC2 Console.
 
 
 # Before You Start
```

ec2/spark_ec2.py
Lines changed: 22 additions & 61 deletions

```diff
@@ -138,7 +138,7 @@ def parse_args():
         help="The SSH user you want to connect as (default: %default)")
     parser.add_option(
         "--delete-groups", action="store_true", default=False,
-        help="When destroying a cluster, delete the security groups that were created.")
+        help="When destroying a cluster, delete the security groups that were created")
     parser.add_option(
         "--use-existing-master", action="store_true", default=False,
         help="Launch fresh slaves, but use an existing stopped master if possible")
```
```diff
@@ -152,9 +152,6 @@ def parse_args():
     parser.add_option(
         "--user-data", type="string", default="",
         help="Path to a user-data file (most AMI's interpret this as an initialization script)")
-    parser.add_option(
-        "--security-group-prefix", type="string", default=None,
-        help="Use this prefix for the security group rather than the cluster name.")
     parser.add_option(
         "--authorized-address", type="string", default="0.0.0.0/0",
         help="Address to authorize on created security groups (default: %default)")
```
```diff
@@ -305,12 +302,8 @@ def launch_cluster(conn, opts, cluster_name):
             user_data_content = user_data_file.read()
 
     print "Setting up security groups..."
-    if opts.security_group_prefix is None:
-        master_group = get_or_make_group(conn, cluster_name + "-master")
-        slave_group = get_or_make_group(conn, cluster_name + "-slaves")
-    else:
-        master_group = get_or_make_group(conn, opts.security_group_prefix + "-master")
-        slave_group = get_or_make_group(conn, opts.security_group_prefix + "-slaves")
+    master_group = get_or_make_group(conn, cluster_name + "-master")
+    slave_group = get_or_make_group(conn, cluster_name + "-slaves")
     authorized_address = opts.authorized_address
     if master_group.rules == []:  # Group was just now created
         master_group.authorize(src_group=master_group)
```
```diff
@@ -335,11 +328,12 @@ def launch_cluster(conn, opts, cluster_name):
         slave_group.authorize('tcp', 60060, 60060, authorized_address)
         slave_group.authorize('tcp', 60075, 60075, authorized_address)
 
-    # Check if instances are already running with the cluster name
+    # Check if instances are already running in our groups
     existing_masters, existing_slaves = get_existing_cluster(conn, opts, cluster_name,
                                                              die_on_error=False)
     if existing_slaves or (existing_masters and not opts.use_existing_master):
-        print >> stderr, ("ERROR: There are already instances for name: %s " % cluster_name)
+        print >> stderr, ("ERROR: There are already instances running in " +
+                          "group %s or %s" % (master_group.name, slave_group.name))
         sys.exit(1)
 
     # Figure out Spark AMI
```
```diff
@@ -413,13 +407,9 @@ def launch_cluster(conn, opts, cluster_name):
             for r in reqs:
                 id_to_req[r.id] = r
             active_instance_ids = []
-            outstanding_request_ids = []
             for i in my_req_ids:
-                if i in id_to_req:
-                    if id_to_req[i].state == "active":
-                        active_instance_ids.append(id_to_req[i].instance_id)
-                    else:
-                        outstanding_request_ids.append(i)
+                if i in id_to_req and id_to_req[i].state == "active":
+                    active_instance_ids.append(id_to_req[i].instance_id)
             if len(active_instance_ids) == opts.slaves:
                 print "All %d slaves granted" % opts.slaves
                 reservations = conn.get_all_instances(active_instance_ids)
@@ -428,8 +418,8 @@ def launch_cluster(conn, opts, cluster_name):
                     slave_nodes += r.instances
                 break
             else:
-                print "%d of %d slaves granted, waiting longer for request ids including %s" % (
-                    len(active_instance_ids), opts.slaves, outstanding_request_ids[0:10])
+                print "%d of %d slaves granted, waiting longer" % (
+                    len(active_instance_ids), opts.slaves)
         except:
             print "Canceling spot instance requests"
             conn.cancel_spot_instance_requests(my_req_ids)
```
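The simplified spot-request polling in the hunk above can be isolated as a pure function: count only the requests that are both known and active, with no separate bookkeeping of outstanding request ids. This is a Python 3 sketch; `SpotRequest` and `granted_instance_ids` are hypothetical stand-ins for boto's spot request objects, not spark-ec2 code.

```python
from collections import namedtuple

# Stand-in for a boto spot instance request object.
SpotRequest = namedtuple("SpotRequest", ["id", "state", "instance_id"])

def granted_instance_ids(my_req_ids, requests):
    """Return instance ids for requests of ours that are already active."""
    id_to_req = {r.id: r for r in requests}
    return [id_to_req[i].instance_id
            for i in my_req_ids
            if i in id_to_req and id_to_req[i].state == "active"]

reqs = [SpotRequest("sir-1", "active", "i-aaa"),
        SpotRequest("sir-2", "open", None)]
# sir-2 is still open and sir-3 is unknown, so only sir-1 counts.
assert granted_instance_ids(["sir-1", "sir-2", "sir-3"], reqs) == ["i-aaa"]
```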
```diff
@@ -488,72 +478,47 @@ def launch_cluster(conn, opts, cluster_name):
 
     # Give the instances descriptive names
     for master in master_nodes:
-        name = '{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id)
-        tag_instance(master, name)
-
+        master.add_tag(
+            key='Name',
+            value='{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id))
     for slave in slave_nodes:
-        name = '{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id)
-        tag_instance(slave, name)
+        slave.add_tag(
+            key='Name',
+            value='{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id))
 
     # Return all the instances
     return (master_nodes, slave_nodes)
 
 
-def tag_instance(instance, name):
-    for i in range(0, 5):
-        try:
-            instance.add_tag(key='Name', value=name)
-            break
-        except:
-            print "Failed attempt %i of 5 to tag %s" % ((i + 1), name)
-            if i == 5:
-                raise "Error - failed max attempts to add name tag"
-            time.sleep(5)
-
 # Get the EC2 instances in an existing cluster if available.
 # Returns a tuple of lists of EC2 instance objects for the masters and slaves
 
 
 def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
     print "Searching for existing cluster " + cluster_name + "..."
-    # Search all the spot instance requests, and copy any tags from the spot
-    # instance request to the cluster.
-    spot_instance_requests = conn.get_all_spot_instance_requests()
-    for req in spot_instance_requests:
-        if req.state != u'active':
-            continue
-        name = req.tags.get(u'Name', "")
-        if name.startswith(cluster_name):
-            reservations = conn.get_all_instances(instance_ids=[req.instance_id])
-            for res in reservations:
-                active = [i for i in res.instances if is_active(i)]
-                for instance in active:
-                    if instance.tags.get(u'Name') is None:
-                        tag_instance(instance, name)
-    # Now proceed to detect master and slaves instances.
     reservations = conn.get_all_instances()
     master_nodes = []
     slave_nodes = []
     for res in reservations:
         active = [i for i in res.instances if is_active(i)]
         for inst in active:
-            name = inst.tags.get(u'Name', "")
-            if name.startswith(cluster_name + "-master"):
+            group_names = [g.name for g in inst.groups]
+            if group_names == [cluster_name + "-master"]:
                 master_nodes.append(inst)
-            elif name.startswith(cluster_name + "-slave"):
+            elif group_names == [cluster_name + "-slaves"]:
                 slave_nodes.append(inst)
     if any((master_nodes, slave_nodes)):
         print "Found %d master(s), %d slaves" % (len(master_nodes), len(slave_nodes))
     if master_nodes != [] or not die_on_error:
         return (master_nodes, slave_nodes)
     else:
         if master_nodes == [] and slave_nodes != []:
-            print >> sys.stderr, "ERROR: Could not find master in with name " + \
-                cluster_name + "-master"
+            print >> sys.stderr, "ERROR: Could not find master in group " + cluster_name + "-master"
         else:
             print >> sys.stderr, "ERROR: Could not find any existing cluster"
         sys.exit(1)
 
+
 # Deploy configuration files and run setup scripts on a newly launched
 # or started EC2 cluster.
```
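The group-based membership test this patch restores can be sketched in isolation: an instance belongs to the cluster only if its security groups are exactly `[<cluster>-master]` or `[<cluster>-slaves]`. Python 3 sketch; `classify` and the `(instance_id, group_names)` pairs are hypothetical stand-ins for boto instance objects.

```python
def classify(instances, cluster_name):
    """Split (instance_id, [group names]) pairs into masters and slaves.

    Note the exact-match semantics: an instance in extra groups beyond the
    cluster's own group is not counted as a member.
    """
    masters, slaves = [], []
    for inst_id, group_names in instances:
        if group_names == [cluster_name + "-master"]:
            masters.append(inst_id)
        elif group_names == [cluster_name + "-slaves"]:
            slaves.append(inst_id)
    return masters, slaves

fleet = [("i-1", ["test-master"]),
         ("i-2", ["test-slaves"]),
         ("i-3", ["test-slaves", "other"]),   # extra group: not counted
         ("i-4", ["prod-slaves"])]            # different cluster
assert classify(fleet, "test") == (["i-1"], ["i-2"])
```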

```diff
@@ -984,11 +949,7 @@ def real_main():
         # Delete security groups as well
         if opts.delete_groups:
             print "Deleting security groups (this will take some time)..."
-            if opts.security_group_prefix is None:
-                group_names = [cluster_name + "-master", cluster_name + "-slaves"]
-            else:
-                group_names = [opts.security_group_prefix + "-master",
-                               opts.security_group_prefix + "-slaves"]
+            group_names = [cluster_name + "-master", cluster_name + "-slaves"]
             wait_for_cluster_state(
                 cluster_instances=(master_nodes + slave_nodes),
                 cluster_state='terminated',
```
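Incidentally, the reverted `tag_instance` helper retried tagging up to five times, but its `if i == 5` guard could never fire inside `range(0, 5)` (where `i` peaks at 4), and it raised a bare string, which is not a valid exception. A corrected bounded-retry sketch is below; `tag_with_retry` and `fake_tag` are hypothetical names for illustration, not part of this patch.

```python
import time

def tag_with_retry(apply_tag, name, attempts=5, delay=5.0):
    """Call apply_tag() until it succeeds, retrying up to `attempts` times.

    apply_tag is a zero-argument callable that tags the instance and may
    raise on transient failures; the last failure is re-raised properly.
    """
    for i in range(attempts):
        try:
            apply_tag()
            return
        except Exception:
            print("Failed attempt %d of %d to tag %s" % (i + 1, attempts, name))
            if i == attempts - 1:   # last attempt: give up loudly
                raise RuntimeError("failed max attempts to add name tag")
            time.sleep(delay)

attempts_made = []
def fake_tag():
    attempts_made.append(1)
    if len(attempts_made) < 2:
        raise OSError("throttled")   # simulated transient EC2 error

tag_with_retry(fake_tag, "test-master-i-abc", delay=0)
assert len(attempts_made) == 2
```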
