Skip to content

Update controller selection #22

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tactical-microgrid-standard/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ target_link_libraries(Controller PRIVATE Commands_Idl PowerSim_Idl)
add_executable(CLI
cli/main.cpp
cli/CLIClient.cpp
cli/ActiveMicrogridControllerStateDataReaderListenerImpl.cpp
)
target_include_directories(CLI PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
target_link_libraries(CLI PRIVATE Commands_Idl PowerSim_Idl)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#include "ActiveMicrogridControllerStateDataReaderListenerImpl.h"

void ActiveMicrogridControllerStateDataReaderListenerImpl::on_data_available(DDS::DataReader_ptr reader)
{
tms::ActiveMicrogridControllerStateSeq data;
DDS::SampleInfoSeq info_seq;
tms::ActiveMicrogridControllerStateDataReader_var typed_reader = tms::ActiveMicrogridControllerStateDataReader::_narrow(reader);
DDS::ReturnCode_t rc = typed_reader->take(data, info_seq, DDS::LENGTH_UNLIMITED,
DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);
if (rc != DDS::RETCODE_OK) {
ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: ActiveMicrogridControllerStateDataReaderListenerImpl::on_data_available: "
"take data failed: %C\n", OpenDDS::DCPS::retcode_to_string(rc)));
return;
}

for (CORBA::ULong i = 0; i < data.length(); ++i) {
if (info_seq[i].valid_data) {
const tms::Identity& device_id = data[i].deviceId();
auto master_id = data[i].masterId();
cli_client_.set_active_controller(device_id, master_id);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#ifndef CLI_ACTIVE_MICROGRID_CONTROLLER_STATE_DATA_READER_LISTENER_IMPL_H
#define CLI_ACTIVE_MICROGRID_CONTROLLER_STATE_DATA_READER_LISTENER_IMPL_H

#include "common/DataReaderListenerBase.h"
#include "CLIClient.h"

class ActiveMicrogridControllerStateDataReaderListenerImpl : public DataReaderListenerBase {
public:
explicit ActiveMicrogridControllerStateDataReaderListenerImpl(CLIClient& cli_client)
: DataReaderListenerBase("tms::ActiveMicrogridControllerState - DataReaderListenerImpl")
, cli_client_(cli_client) {}

virtual ~ActiveMicrogridControllerStateDataReaderListenerImpl() = default;

void on_data_available(DDS::DataReader_ptr reader) final;

private:
CLIClient& cli_client_;
};

#endif
73 changes: 70 additions & 3 deletions tactical-microgrid-standard/cli/CLIClient.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "CLIClient.h"
#include "common/QosHelper.h"
#include "common/Utils.h"
#include "ActiveMicrogridControllerStateDataReaderListenerImpl.h"

#include <dds/DCPS/PublisherImpl.h>
#include <dds/DCPS/SubscriberImpl.h>
Expand All @@ -9,6 +10,7 @@

#include <cctype>
#include <thread>
#include <iomanip>

CLIClient::CLIClient(const tms::Identity& id)
: handshaking_(id)
Expand Down Expand Up @@ -90,6 +92,46 @@ DDS::ReturnCode_t CLIClient::init_tms(DDS::DomainId_t domain_id, int argc, char*
return DDS::RETCODE_ERROR;
}

// Subscribe to the tms::ActiveMicrogridControllerState topic
tms::ActiveMicrogridControllerStateTypeSupport_var amcs_ts = new tms::ActiveMicrogridControllerStateTypeSupportImpl;
if (DDS::RETCODE_OK != amcs_ts->register_type(dp, "")) {
ACE_ERROR((LM_ERROR, "(%P|%t) CLIClient::init: register_type ActiveMicrogridControllerState failed\n"));
return DDS::RETCODE_ERROR;
}

CORBA::String_var amcs_type_name = amcs_ts->get_type_name();
DDS::Topic_var amcs_topic = dp->create_topic(tms::topic::TOPIC_ACTIVE_MICROGRID_CONTROLLER_STATE.c_str(),
amcs_type_name,
TOPIC_QOS_DEFAULT,
nullptr,
::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (!amcs_topic) {
ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: CLIClient::init: create_topic \"%C\" failed\n",
tms::topic::TOPIC_ACTIVE_MICROGRID_CONTROLLER_STATE.c_str()));
return DDS::RETCODE_ERROR;
}

const DDS::SubscriberQos tms_sub_qos = Qos::Subscriber::get_qos();
DDS::Subscriber_var tms_sub = dp->create_subscriber(tms_sub_qos,
nullptr,
::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (!tms_sub) {
ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: CLIClient::init: create_subscriber with TMS QoS failed\n"));
return DDS::RETCODE_ERROR;
}

const DDS::DataReaderQos& amcs_dr_qos = Qos::DataReader::fn_map.at(tms::topic::TOPIC_ACTIVE_MICROGRID_CONTROLLER_STATE)(device_id);
DDS::DataReaderListener_var amcs_listener(new ActiveMicrogridControllerStateDataReaderListenerImpl(*this));
DDS::DataReader_var amcs_dr_base = tms_sub->create_datareader(amcs_topic,
amcs_dr_qos,
amcs_listener,
::OpenDDS::DCPS::DEFAULT_STATUS_MASK);
if (!amcs_dr_base) {
ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: CLIClient::init: create_datareader for topic \"%C\" failed\n",
tms::topic::TOPIC_ACTIVE_MICROGRID_CONTROLLER_STATE.c_str()));
return DDS::RETCODE_ERROR;
}

return DDS::RETCODE_OK;
}

Expand Down Expand Up @@ -328,6 +370,18 @@ void CLIClient::run()
thr.join();
}

void CLIClient::set_active_controller(const tms::Identity& device_id,
const OPENDDS_OPTIONAL_NS::optional<tms::Identity>& master_id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One option here would be to require that we build with std::optional, so the macro wouldn't be needed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already require C++17, so we should have it.

{
std::lock_guard<std::mutex> guard(active_controllers_m_);
if (master_id.has_value()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

active_controllers_[device_id] = master_id.value();
} else {
// The device has lost its active controller or hasn't selected one yet.
active_controllers_[device_id] = "";
}
}

void CLIClient::tolower(std::string& s) const
{
for (size_t i = 0; i < s.size(); ++i) {
Expand Down Expand Up @@ -415,9 +469,22 @@ void CLIClient::display_power_devices() const
std::cout << "Number of Connected Power Devices: " << power_devices_.size() << std::endl;
size_t i = 1;
for (auto it = power_devices_.begin(); it != power_devices_.end(); ++it) {
std::cout << i << ". Device Id: " << it->first <<
". Type: " << Utils::device_role_to_string(it->second.device_info().role()) <<
". Energy Level: " << energy_level_to_string(it->second.essl()) << std::endl;
std::string selected_controller;
{
std::lock_guard<std::mutex> guard(active_controllers_m_);
auto ac_it = active_controllers_.find(it->first);
if (ac_it != active_controllers_.end()) {
selected_controller = "\"" + ac_it->second + "\"";
} else {
selected_controller = "\"Undetermined\"";
}
}
const std::string formated_id = "\"" + it->first + "\"";
std::cout << std::setfill(' ') << std::setw(3) << i++
<< ". Id: " << std::left << std::setw(15) << formated_id
<< "| Type: " << std::left << std::setw(18) << Utils::device_role_to_string(it->second.device_info().role())
<< "| Energy Level: " << std::left << std::setw(15) << energy_level_to_string(it->second.essl())
<< "| Active Controller: " << std::left << selected_controller << std::endl;
}
}

Expand Down
13 changes: 11 additions & 2 deletions tactical-microgrid-standard/cli/CLIClient.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef CONTROLLER_CLI_CLIENT_H
#define CONTROLLER_CLI_CLIENT_H
#ifndef CLI_CLI_CLIENT_H
#define CLI_CLI_CLIENT_H

#include "common/Handshaking.h"
#include "controller/Common.h"
Expand Down Expand Up @@ -36,8 +36,11 @@ class CLIClient : public TimerHandler<UnavailableController> {
~CLIClient() {}

DDS::ReturnCode_t init(DDS::DomainId_t domain_id, int argc = 0, char* argv[] = nullptr);

void run();

void set_active_controller(const tms::Identity& device_id, const OPENDDS_OPTIONAL_NS::optional<tms::Identity>& master_id);

private:
// Initialize DDS entities in the TMS domain
DDS::ReturnCode_t init_tms(DDS::DomainId_t tms_domain_id, int argc = 0, char* argv[] = nullptr);
Expand Down Expand Up @@ -127,6 +130,12 @@ class CLIClient : public TimerHandler<UnavailableController> {

// The current microgrid controller with which the CLI client is interacting
tms::Identity curr_controller_;

mutable std::mutex active_controllers_m_;

// Active controller selected by each power device (power device => its controller).
// Can be used to check that all power devices will eventually select the same active controller.
std::map<tms::Identity, tms::Identity> active_controllers_;
};

#endif
107 changes: 91 additions & 16 deletions tactical-microgrid-standard/common/ControllerSelector.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,36 @@
#include "ControllerSelector.h"

#include <ace/Timer_Hash.h>
#include <ace/Timer_Heap.h>
#include <ace/Select_Reactor.h>

// If caller passes a non-null reactor, use it unmodified.
// Otherwise, create another reactor for this class separated from the one for Handshaking.
ControllerSelector::ControllerSelector(const tms::Identity& device_id, ACE_Reactor* reactor)
: TimerHandler(reactor)
, device_id_(device_id)
{
if (!reactor) {
reactor_ = new ACE_Reactor;
Comment on lines +10 to +14
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the TimerHandler constructor (in the future) does anything with its parameter besides just assigning to reactor_ then this could lead to an inconsistency.

One option is to make the TimerHandler (or some other class) take charge of managing the reactor and timer queue, along with the own_reactor_ bit, so it's done in one place.


// We had an issue with using ACE_Reactor's default timer queue, which is
// ACE_Timer_Heap, when the rate of timer creation and cancellation is high
// for detecting missed heartbeat deadline from microgrid controllers.
// ACE_Timer_Hash seems working okay.
timer_queue_ = new ACE_Timer_Hash;
reactor_->timer_queue(timer_queue_);
own_reactor_ = true;
}
}

ControllerSelector::~ControllerSelector()
{
if (own_reactor_) {
delete timer_queue_;
delete reactor_;
}
}

void ControllerSelector::got_heartbeat(const tms::Heartbeat& hb)
{
Guard g(lock_);
Expand All @@ -10,15 +41,15 @@ void ControllerSelector::got_heartbeat(const tms::Heartbeat& hb)

if (selected_.empty()) {
if (!this->get_timer<NewController>()->active()) {
schedule_once(NewController{hb.deviceId()}, new_controller_delay);
schedule_once(NewController{hb.deviceId()}, new_active_controller_delay);
}
} else if (is_selected(hb.deviceId())) {
cancel<LostController>();
if (this->get_timer<MissedController>()->active()) {
reschedule<MissedController>();
if (this->get_timer<MissedHeartbeat>()->active()) {
reschedule<MissedHeartbeat>();
} else {
// MissedController was triggered, so we need to schedule it again.
schedule_once(MissedController{}, missed_controller_delay);
// MissedHeartbeat was triggered, so we need to schedule it again.
schedule_once(MissedHeartbeat{}, heartbeat_deadline);
}
}
}
Expand All @@ -39,19 +70,49 @@ void ControllerSelector::got_device_info(const tms::DeviceInfo& di)
void ControllerSelector::timer_fired(Timer<NewController>& timer)
{
Guard g(lock_);
const auto& id = timer.arg.id;
const auto& mc_id = timer.arg.id;
ACE_DEBUG((LM_INFO, "(%P|%t) INFO: ControllerSelector::timed_event(NewController): "
"\"%C\" -> \"%C\"\n", selected_.c_str(), id.c_str()));
select(id);
"\"%C\" -> \"%C\"\n", selected_.c_str(), mc_id.c_str()));

// The TMS spec isn't clear to whether the device needs to verify that the last
// heartbeat of this controller was received less than 3s (i.e., heartbeat deadline) ago.
// This check makes sense since if its last heartbeat was more than 3s ago, that means
// the controller is not available and should not be selected as the active controller.
const TimePoint now = Clock::now();
auto it = all_controllers_.find(mc_id);
if (it == all_controllers_.end()) {
ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ControllerSelector::timed_event(NewController): Controller \"%C\" not found!\n",
mc_id.c_str()));
return;
}

if (now - it->second < heartbeat_deadline) {
selected_ = mc_id;
send_controller_state();
}
}

void ControllerSelector::timer_fired(Timer<MissedController>&)
void ControllerSelector::timer_fired(Timer<MissedHeartbeat>& timer)
{
Guard g(lock_);
ACE_DEBUG((LM_INFO, "(%P|%t) INFO: ControllerSelector::timed_event(MissedController): "
"\"%C\"\n", selected_.c_str()));
schedule_once(LostController{}, lost_controller_delay);
schedule_once(NoControllers{}, no_controllers_delay);
const auto& timer_id = timer.id;
ACE_DEBUG((LM_INFO, "(%P|%t) INFO: ControllerSelector::timed_event(MissedHeartbeat): "
"\"%C\". Timer id: %d\n", selected_.c_str(), timer_id));
schedule_once(LostController{}, lost_active_controller_delay);

// Start a No MC timer if the device has missed heartbeats from all MCs
const TimePoint now = Clock::now();
bool no_avail_mc = true;
for (const auto& pair : all_controllers_) {
if (now - pair.second < heartbeat_deadline) {
no_avail_mc = false;
break;
}
}

if (no_avail_mc) {
schedule_once(NoControllers{}, no_controllers_delay);
}
}

void ControllerSelector::timer_fired(Timer<LostController>&)
Expand All @@ -66,7 +127,7 @@ void ControllerSelector::timer_fired(Timer<LostController>&)
const TimePoint now = Clock::now();
for (auto it = all_controllers_.begin(); it != all_controllers_.end(); ++it) {
const auto last_hb = now - it->second;
if (last_hb < missed_controller_delay) {
if (last_hb < heartbeat_deadline) {
select(it->first, std::chrono::duration_cast<Sec>(last_hb));
break;
}
Expand All @@ -84,6 +145,20 @@ void ControllerSelector::select(const tms::Identity& id, Sec last_hb)
{
ACE_DEBUG((LM_INFO, "(%P|%t) INFO: ControllerSelector::select: \"%C\"\n", id.c_str()));
selected_ = id;
schedule_once(MissedController{}, missed_controller_delay - last_hb);
// TODO: Send ActiveMicrogridControllerState
send_controller_state();
schedule_once(MissedHeartbeat{}, heartbeat_deadline - last_hb);
}

void ControllerSelector::send_controller_state()
{
tms::ActiveMicrogridControllerState amcs;
amcs.deviceId() = device_id_;
if (!selected_.empty()) {
amcs.masterId() = selected_;
}

const DDS::ReturnCode_t rc = amcs_dw_->write(amcs, DDS::HANDLE_NIL);
if (rc != DDS::RETCODE_OK) {
ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: ControllerSelector::send_controller_state: write ActiveMicrogridControllerState failed\n"));
}
}
Loading