Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 67 additions & 131 deletions poetry.lock

Large diffs are not rendered by default.

28 changes: 28 additions & 0 deletions protocol/innpv.proto
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ message FromServer {
BreakdownResponse breakdown = 8;

HabitatResponse habitat = 9;
EnergyResponse energy = 10;
}

// Deprecated messages
Expand All @@ -97,6 +98,33 @@ message HabitatResponse {
repeated HabitatDevicePrediction predictions = 1;
}

// Energy messages
// =======================================

// Result of one energy measurement, with an optional history of earlier results.
message EnergyResponse {
// Total energy consumption. The server-side energy_compute() fills this with
// the measured total (joules) divided by the number of profiled iterations.
float total_consumption = 1;
// Per-component breakdown of the consumption (one entry per component type).
repeated EnergyConsumptionComponent components = 2;

// A list of past energy measurements
// NOTE(review): nothing in the visible server code populates this field yet.
repeated EnergyResponse past_measurements = 3;
}

// Reports the energy consumption of one system component (e.g. CPU+DRAM or GPU)
enum EnergyConsumptionComponentType {
// Zero value: no component type was specified.
ENERGY_UNSPECIFIED = 0;
// CPU together with DRAM.
ENERGY_CPU_DRAM = 1;
// NVIDIA GPU.
ENERGY_NVIDIA = 2;
}

// Energy attributed to a single system component.
message EnergyConsumptionComponent {
// Which component this entry describes.
EnergyConsumptionComponentType component_type = 1;
// Energy consumed by that component, in joules.
float consumption_joules = 2;
}

// Records past experiments



// =======================================

message InitializeResponse {
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ numpy = "^1.22"
torch = "*"
nvidia-ml-py3 = "*"
toml = "^0.10.2"
pyRAPL = "^0.2.3"

[tool.poetry.dev-dependencies]

Expand Down
31 changes: 28 additions & 3 deletions skyline/analysis/request_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def _handle_analysis_request(self, analysis_request, context):

# Abort early if the connection has been closed
if not context.state.connected:
logger.debug(
logger.error(
'Aborting request %d from (%s:%d) early '
'because the client has disconnected.',
context.sequence_number,
Expand All @@ -75,7 +75,7 @@ def _handle_analysis_request(self, analysis_request, context):
)

if not context.state.connected:
logger.debug(
logger.error(
'Aborting request %d from (%s:%d) early '
'because the client has disconnected.',
context.sequence_number,
Expand All @@ -92,7 +92,7 @@ def _handle_analysis_request(self, analysis_request, context):

# send habitat response
if not context.state.connected:
logger.debug(
logger.error(
'Aborting request %d from (%s:%d) early '
'because the client has disconnected.',
context.sequence_number,
Expand All @@ -107,6 +107,23 @@ def _handle_analysis_request(self, analysis_request, context):
context,
)

# send energy response
if not context.state.connected:
logger.error(
'Aborting request %d from (%s:%d) early '
'because the client has disconnected.',
context.sequence_number,
*(context.address),
)
return

energy_resp = next(analyzer)
self._enqueue_response(
self._send_energy_response,
energy_resp,
context,
)

elapsed_time = time.perf_counter() - start_time
logger.debug(
'Processed analysis request %d from (%s:%d) in %.4f seconds.',
Expand Down Expand Up @@ -174,3 +191,11 @@ def _send_habitat_response(self, habitat_resp, context):
except:
logger.exception(
'Exception occurred when sending a habitat response.')

def _send_energy_response(self, energy_resp, context):
    """Hand an energy response to the message sender, logging any failure.

    Called from the main executor. Do not call directly!

    Errors raised while sending are logged (with traceback) and not
    propagated. Only ``Exception`` subclasses are intercepted so that
    ``KeyboardInterrupt`` and ``SystemExit`` are never swallowed here.

    Args:
        energy_resp: The energy response produced by the analyzer.
        context: The request context the response belongs to.
    """
    try:
        self._message_sender.send_energy_response(energy_resp, context)
    except Exception:
        logger.exception(
            'Exception occurred when sending an energy response.')
3 changes: 3 additions & 0 deletions skyline/analysis/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ def analyze_project(project_root, entry_point, nvml):
print("analyze_project: running habitat_predict()")
yield session.habitat_predict()

print("analyze_project: running energy_compute()")
yield session.energy_compute()


def main():
# This is used for development and debugging purposes
Expand Down
33 changes: 33 additions & 0 deletions skyline/analysis/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import skyline.protocol_gen.innpv_pb2 as pm
from skyline.analysis.static import StaticAnalyzer
from skyline.energy.measurer import EnergyMeasurer
from skyline.exceptions import AnalysisError, exceptions_as_analysis_errors
from skyline.profiler.iteration import IterationProfiler
from skyline.tracking.tracker import Tracker
Expand Down Expand Up @@ -131,6 +132,38 @@ def new_from(cls, project_root, entry_point):
StaticAnalyzer(entry_point_code, entry_point_ast),
)

def energy_compute(self) -> pm.EnergyResponse:
energy_measurer = EnergyMeasurer()

model = self._model_provider()
inputs = self._input_provider()
iteration = self._iteration_provider(model)
resp = pm.EnergyResponse()
try:
energy_measurer.begin_measurement()
iterations = 20
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can declare this variable outside the try block

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason that it should be outside the try block? I'm not sure I understand why

Copy link

@ssaini4 ssaini4 Jan 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it will be easier to read if all the variables are declared at the beginning of the function. At least the ones that can be

for _ in range(iterations):
iteration(*inputs)
energy_measurer.end_measurement()

resp.total_consumption = energy_measurer.total_energy()/float(iterations)

cpu_component = pm.EnergyConsumptionComponent()
cpu_component.component_type = pm.ENERGY_CPU_DRAM
cpu_component.consumption_joules = energy_measurer.cpu_energy()/float(iterations)

gpu_component = pm.EnergyConsumptionComponent()
gpu_component.component_type = pm.ENERGY_NVIDIA
gpu_component.consumption_joules = energy_measurer.gpu_energy()/float(iterations)

resp.components.extend([cpu_component, gpu_component])

#TODO save each response to a database and get the past energy measurements
except PermissionError as err:
# Remind user to set their CPU permissions
print(err)
return resp

def habitat_compute_threshold(self, runnable, context):
tracker = habitat.OperationTracker(context.origin_device)
with tracker.track():
Expand Down
Empty file added skyline/energy/__init__.py
Empty file.
117 changes: 117 additions & 0 deletions skyline/energy/measurer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import time
from threading import Thread
import numpy as np

import pynvml as N
from pyRAPL import Sensor

class CPUMeasurer:
    """Track CPU energy use by differencing successive RAPL counter readings.

    Each call to ``measurer_measure`` converts the energy consumed since the
    previous reading into an average power value (mW) over ``interval``
    seconds and appends it to ``power``.

    The sensor's ``energy()`` result is treated as an interleaved sequence:
    even positions are CPU readings and odd positions are DRAM readings,
    both in microjoules.
    """

    def __init__(self, interval):
        # Sampling period in seconds; used to convert energy <-> power.
        self.interval = interval
        # One average-power sample (mW) per measurement.
        self.power = []
        # Counter values seen at the previous reading.
        self.last_cpu = None
        self.last_dram = None

    @staticmethod
    def _split_readings(readings):
        """Split interleaved readings into (cpu, dram) numpy arrays."""
        return np.array(readings[0::2]), np.array(readings[1::2])

    def measurer_init(self):
        """Create the sensor and record the baseline counter values."""
        self.sensor = Sensor()
        baseline = self.sensor.energy()
        self.last_cpu, self.last_dram = self._split_readings(baseline)

    def measurer_measure(self):
        """Record the average CPU power drawn since the previous reading."""
        # Counters are cumulative (energy consumed so far).
        cpu_now, dram_now = self._split_readings(self.sensor.energy())

        # Energy used since the last reading, per counter.
        cpu_delta = cpu_now - self.last_cpu
        # NOTE(review): the DRAM delta is computed but is not added to the
        # recorded power; only the CPU readings contribute. Confirm intent.
        dram_delta = dram_now - self.last_dram  # noqa: F841

        # uJ -> J is a division by 1e6; J/s -> mW is a multiplication by 1000.
        cpu_microjoules = np.sum(cpu_delta)
        milliwatts = 1000 * (cpu_microjoules / 1e6) / self.interval
        self.power.append(milliwatts)

        # The current reading becomes the baseline for the next one.
        self.last_cpu, self.last_dram = cpu_now, dram_now

    def measurer_deallocate(self):
        """Nothing to release for the CPU sensor."""
        pass

    def total_energy(self):
        """Return the energy in joules represented by all recorded samples."""
        # J = W * s and 1 W = 1000 mW.
        milliwatt_sum = sum(self.power)
        return self.interval * milliwatt_sum / 1000.0

class GPUMeasurer:
    """Sample GPU power draw through NVML and integrate it into energy.

    Only the device at NVML index 0 is sampled. Each sample is stored as
    returned by ``nvmlDeviceGetPowerUsage`` and is treated as milliwatts
    when converting to joules.
    """

    def __init__(self, interval):
        # Raw power samples, one per measurement.
        self.power = []
        # Sampling period in seconds; each sample is assumed to span it.
        self.interval = interval

    def measurer_init(self):
        """Initialise NVML and look up the handle of device 0."""
        N.nvmlInit()
        self.device_handle = N.nvmlDeviceGetHandleByIndex(0)

    def measurer_measure(self):
        """Append the device's current power usage to ``power``."""
        self.power.append(N.nvmlDeviceGetPowerUsage(self.device_handle))

    def measurer_deallocate(self):
        """Shut NVML down again."""
        N.nvmlShutdown()

    def total_energy(self):
        """Return the energy in joules represented by all recorded samples."""
        # J = W * s and 1 W = 1000 mW.
        milliwatt_sum = sum(self.power)
        return self.interval * milliwatt_sum / 1000.0

class EnergyMeasurer:
    """Run the CPU and GPU measurers on a background sampling thread.

    Usage: call ``begin_measurement()``, run the workload, then call
    ``end_measurement()`` and read the ``*_energy()`` accessors.

    Errors raised on the sampling thread (for example a ``PermissionError``
    while opening the CPU energy counters) cannot propagate to the caller on
    their own, so they are captured and re-raised from ``end_measurement()``.
    """

    def __init__(self):
        # Seconds to sleep between two rounds of samples.
        self.sleep_interval = 0.1
        # Flag polled by the sampling thread; cleared to stop it.
        self.measuring = False
        self.measure_thread = None
        # First exception seen on the sampling thread, if any.
        self._error = None

        self.measurers = {
            "cpu": CPUMeasurer(self.sleep_interval),
            "gpu": GPUMeasurer(self.sleep_interval),
        }

    def run_measure(self):
        """Sampling-thread body: initialise, sample until stopped, clean up.

        Any exception is stored for ``end_measurement()`` instead of being
        raised here. Only measurers whose initialisation succeeded are
        sampled and deallocated, so cleanup never touches a resource that
        was not acquired.
        """
        initialized = []
        try:
            # Initialize
            for measurer in self.measurers.values():
                measurer.measurer_init()
                initialized.append(measurer)

            # Run measurement loop
            while self.measuring:
                for measurer in initialized:
                    measurer.measurer_measure()
                time.sleep(self.sleep_interval)
        except Exception as err:
            self._error = err
        finally:
            # Cleanup runs even when initialisation or sampling failed.
            for measurer in initialized:
                try:
                    measurer.measurer_deallocate()
                except Exception as err:
                    # Keep the earliest failure; it is the most informative.
                    if self._error is None:
                        self._error = err

    def begin_measurement(self):
        """Start the sampling thread.

        Raises:
            RuntimeError: If a measurement is already in progress.
        """
        # An explicit check is used because ``assert`` is stripped under -O.
        if self.measure_thread is not None:
            raise RuntimeError("A measurement is already in progress.")
        self._error = None
        self.measure_thread = Thread(target=self.run_measure)
        self.measuring = True
        self.measure_thread.start()

    def end_measurement(self):
        """Stop the sampling thread and wait for it to finish.

        Does nothing when no measurement is in progress.

        Raises:
            Exception: Whatever exception occurred on the sampling thread
                during initialisation, sampling, or cleanup.
        """
        thread = self.measure_thread
        if thread is None:
            return
        self.measuring = False
        thread.join()
        self.measure_thread = None

        error, self._error = self._error, None
        if error is not None:
            raise error

    def total_energy(self):
        """Return the summed energy (joules) reported by every measurer."""
        return sum(
            (measurer.total_energy() for measurer in self.measurers.values()),
            0.0,
        )

    def cpu_energy(self):
        """Return the energy (joules) reported by the CPU measurer."""
        return self.measurers["cpu"].total_energy()

    def gpu_energy(self):
        """Return the energy (joules) reported by the GPU measurer."""
        return self.measurers["gpu"].total_energy()
3 changes: 3 additions & 0 deletions skyline/protocol/message_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ def send_throughput_response(self, throughput, context):

def send_habitat_response(self, habitat_resp, context):
    """Send ``habitat_resp`` through ``_send_message`` as the 'habitat' payload."""
    self._send_message(
        message=habitat_resp,
        payload_name='habitat',
        context=context,
    )

def send_energy_response(self, energy_resp, context):
    """Send ``energy_resp`` through ``_send_message`` as the 'energy' payload."""
    self._send_message(
        message=energy_resp,
        payload_name='energy',
        context=context,
    )

def _send_message(self, message, payload_name, context):
try:
Expand Down
Loading