diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index e8ca422c49855..63b87ae35b242 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -37,7 +37,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.metrics2.MetricsSource; -import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -58,7 +57,6 @@ import org.apache.spark.network.util.TransportConf; import org.apache.spark.network.yarn.util.HadoopConfigProvider; - /** * An external shuffle service used by Spark on Yarn. * @@ -116,6 +114,7 @@ public class YarnShuffleService extends AuxiliaryService { // The actual server that serves shuffle files private TransportServer shuffleServer = null; + private Configuration _conf = null; // The recovery path used to shuffle service recovery @@ -172,15 +171,15 @@ protected void serviceInit(Configuration conf) throws Exception { blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile); // register metrics on the block handler into the Node Manager's metrics system. - YarnShuffleServiceMetrics serviceMetrics = new YarnShuffleServiceMetrics(blockHandler.getAllMetrics()); - MetricsSystem defaultMetricsSystem = DefaultMetricsSystem.instance(); try { - MetricsSystemImpl metricsSystem = (MetricsSystemImpl) defaultMetricsSystem; + YarnShuffleServiceMetrics serviceMetrics = new YarnShuffleServiceMetrics( + blockHandler.getAllMetrics()); + MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance(); Method registerSourceMethod = metricsSystem.getClass().getDeclaredMethod("registerSource", String.class, String.class, MetricsSource.class); registerSourceMethod.setAccessible(true); - registerSourceMethod.invoke(metricsSystem, "shuffleservice", "Metrics on the Spark " + + registerSourceMethod.invoke(metricsSystem, "shuffleService", "Metrics on the Spark " + "Shuffle Service", serviceMetrics); logger.info("Registered metrics with Hadoop's DefaultMetricsSystem"); } catch (Exception e) { diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java index 4c104b47357bf..86cb07ae711ac 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java @@ -17,11 +17,8 @@ package org.apache.spark.network.yarn; -import com.codahale.metrics.Gauge; -import com.codahale.metrics.Meter; -import com.codahale.metrics.Metric; -import com.codahale.metrics.MetricSet; -import com.codahale.metrics.Timer; +import com.codahale.metrics.*; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.metrics2.MetricsCollector; import org.apache.hadoop.metrics2.MetricsInfo; import org.apache.hadoop.metrics2.MetricsRecordBuilder; @@ -34,83 +31,93 @@ */ public class YarnShuffleServiceMetrics implements MetricsSource { - private final MetricSet metricSet; + private final MetricSet metricSet; - public YarnShuffleServiceMetrics(MetricSet metricSet) { - this.metricSet = metricSet; - } + public YarnShuffleServiceMetrics(MetricSet metricSet) { + this.metricSet = metricSet; + } - /** - * Get metrics from the source - * - * @param collector to contain the resulting metrics snapshot - * @param all if true, return all metrics even if unchanged. - */ - @Override - public void getMetrics(MetricsCollector collector, boolean all) { - MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("shuffleService"); + /** + * Get metrics from the source + * + * @param collector to contain the resulting metrics snapshot + * @param all if true, return all metrics even if unchanged. + */ + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("shuffleService"); + + for (Map.Entry entry : metricSet.getMetrics().entrySet()) { + collectMetric(metricsRecordBuilder, entry.getKey(), entry.getValue()); + } + } - for (Map.Entry entry : metricSet.getMetrics().entrySet()) { - String name = entry.getKey(); + @VisibleForTesting + public static void collectMetric(MetricsRecordBuilder metricsRecordBuilder, String name, Metric metric) { - // The metric types used in ExternalShuffleBlockHandler.ShuffleMetrics - if (entry.getValue() instanceof Timer) { - Timer t = (Timer) entry.getValue(); - metricsRecordBuilder - .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name), - t.getCount()) - .addGauge(new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of timer " + name), - t.getFifteenMinuteRate()) - .addGauge(new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of timer " + name), - t.getFiveMinuteRate()) - .addGauge(new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of timer " + name), - t.getOneMinuteRate()) - .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name), - t.getMeanRate()); - } else if (entry.getValue() instanceof Meter) { - Meter m = (Meter) entry.getValue(); - metricsRecordBuilder - .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of meter " + name), - m.getCount()) - .addGauge(new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of meter " + name), - m.getFifteenMinuteRate()) - .addGauge(new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of meter " + name), - m.getFiveMinuteRate()) - .addGauge(new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of meter " + name), - m.getOneMinuteRate()) - .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of meter " + name), - m.getMeanRate()); - } else if (entry.getValue() instanceof Gauge) { - Gauge m = (Gauge) entry.getValue(); - Object gaugeValue = m.getValue(); - if (gaugeValue instanceof Integer) { - Integer intValue = (Integer) gaugeValue; - metricsRecordBuilder - .addGauge(new ShuffleServiceMetricsInfo(name, "Integer value of " + - "gauge " + name), intValue.intValue()); - } - } - } + // The metric types used in ExternalShuffleBlockHandler.ShuffleMetrics + if (metric instanceof Timer) { + Timer t = (Timer) metric; + metricsRecordBuilder + .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name), + t.getCount()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of timer " + name), + t.getFifteenMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of timer " + name), + t.getFiveMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of timer " + name), + t.getOneMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name), + t.getMeanRate()); + } else if (metric instanceof Meter) { + Meter m = (Meter) metric; + metricsRecordBuilder + .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of meter " + name), + m.getCount()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of meter " + name), + m.getFifteenMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of meter " + name), + m.getFiveMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of meter " + name), + m.getOneMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of meter " + name), + m.getMeanRate()); + } else if (metric instanceof Gauge) { + Gauge m = (Gauge) metric; + Object gaugeValue = m.getValue(); + if (gaugeValue instanceof Integer) { + Integer intValue = (Integer) gaugeValue; + metricsRecordBuilder + .addGauge(new ShuffleServiceMetricsInfo(name, "Integer value of " + + "gauge " + name), intValue.intValue()); + } } + } - private static class ShuffleServiceMetricsInfo implements MetricsInfo { + private static class ShuffleServiceMetricsInfo implements MetricsInfo { - private final String name; - private final String description; + private final String name; + private final String description; - ShuffleServiceMetricsInfo(String name, String description) { - this.name = name; - this.description = description; - } + ShuffleServiceMetricsInfo(String name, String description) { + this.name = name; + this.description = description; + } - @Override - public String name() { - return name; - } + @Override + public String name() { + return name; + } - @Override - public String description() { - return description; - } + @Override + public String description() { + return description; } + } } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala new file mode 100644 index 0000000000000..1330ceebf5087 --- /dev/null +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.network.yarn + +import org.apache.hadoop.metrics2.MetricsRecordBuilder +import org.mockito.Matchers._ +import org.mockito.Mockito.{mock, times, verify, when} +import org.scalatest.Matchers +import scala.collection.JavaConverters._ + +import org.apache.spark.network.server.OneForOneStreamManager +import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleBlockResolver} +import org.apache.spark.SparkFunSuite + +class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers { + + val streamManager = mock(classOf[OneForOneStreamManager]) + val blockResolver = mock(classOf[ExternalShuffleBlockResolver]) + when(blockResolver.getRegisteredExecutorsSize).thenReturn(42) + + val metrics = new ExternalShuffleBlockHandler(streamManager, blockResolver).getAllMetrics + + test("metrics named as expected") { + val allMetrics = Set( + "openBlockRequestLatencyMillis", "registerExecutorRequestLatencyMillis", + "blockTransferRateBytes", "registeredExecutorsSize") + + metrics.getMetrics.keySet().asScala should be (allMetrics) + } + + // these three metrics have the same effect on the collector + for (testname <- Seq("openBlockRequestLatencyMillis", + "registerExecutorRequestLatencyMillis", + "blockTransferRateBytes")) { + test(s"$testname - collector receives correct types") { + val builder = mock(classOf[MetricsRecordBuilder]) + when(builder.addCounter(any(), anyLong())).thenReturn(builder) + when(builder.addGauge(any(), anyDouble())).thenReturn(builder) + + YarnShuffleServiceMetrics.collectMetric(builder, testname, + metrics.getMetrics.get(testname)) + + verify(builder).addCounter(anyObject(), anyLong()) + verify(builder, times(4)).addGauge(anyObject(), anyDouble()) + } + } + + // this metric writes only one gauge to the collector + test("registeredExecutorsSize - collector receives correct types") { + val builder = mock(classOf[MetricsRecordBuilder]) + when(builder.addCounter(any(), anyLong())).thenReturn(builder) + when(builder.addGauge(any(), anyDouble())).thenReturn(builder) + + YarnShuffleServiceMetrics.collectMetric(builder, "registeredExecutorsSize", + metrics.getMetrics.get("registeredExecutorsSize")) + + // only one + verify(builder).addGauge(anyObject(), anyInt()) + } +}