
[cherry-pick] Fix group rewards for POCA, add warning for non-POCA trainers #5120


Merged: 2 commits, Mar 16, 2021
9 changes: 9 additions & 0 deletions ml-agents/mlagents/trainers/poca/trainer.py
@@ -245,6 +245,15 @@ def _update_policy(self):
self._clear_update_buffer()
return True

def end_episode(self) -> None:
"""
A signal that the Episode has ended. The buffer must be reset.
Gets called only when the academy resets. For POCA, we should
also zero out the group rewards.
"""
super().end_episode()
self.collected_group_rewards.clear()

def create_torch_policy(
self, parsed_behavior_id: BehaviorIdentifiers, behavior_spec: BehaviorSpec
) -> TorchPolicy:
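For context, here is a minimal sketch (not the actual POCATrainer implementation) of the bookkeeping this change implies, assuming collected_group_rewards is a per-agent running total that trajectory processing fills in and that end_episode() must now clear; the class and method names below are illustrative only:

```python
from collections import defaultdict


class GroupRewardTracker:
    """Toy stand-in for the group-reward bookkeeping hinted at by this diff."""

    def __init__(self):
        # agent_id -> cumulative group reward for the current episode
        self.collected_group_rewards = defaultdict(float)

    def process_trajectory(self, agent_id, step_group_rewards):
        # Accumulate the per-step group rewards delivered in one trajectory slice.
        self.collected_group_rewards[agent_id] += sum(step_group_rewards)

    def end_episode(self):
        # Mirrors the new POCATrainer.end_episode(): an academy reset invalidates
        # the per-agent running totals, so they are zeroed out.
        self.collected_group_rewards.clear()


# Matches the expectation in the new test below: 10 steps with group_reward=1.0
# accumulate to 10.0, and end_episode() empties the dictionary.
tracker = GroupRewardTracker()
tracker.process_trajectory("agent_0", [1.0] * 10)
assert tracker.collected_group_rewards["agent_0"] == 10.0
tracker.end_episode()
assert len(tracker.collected_group_rewards) == 0
```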
3 changes: 3 additions & 0 deletions ml-agents/mlagents/trainers/ppo/trainer.py
@@ -68,6 +68,9 @@ def _process_trajectory(self, trajectory: Trajectory) -> None:
agent_id = trajectory.agent_id # All the agents should have the same ID

agent_buffer_trajectory = trajectory.to_agentbuffer()
# Check if we used group rewards, warn if so.
self._warn_if_group_reward(agent_buffer_trajectory)

# Update the normalization
if self.is_training:
self.policy.update_normalization(agent_buffer_trajectory)
2 changes: 2 additions & 0 deletions ml-agents/mlagents/trainers/sac/trainer.py
@@ -131,6 +131,8 @@ def _process_trajectory(self, trajectory: Trajectory) -> None:
agent_id = trajectory.agent_id # All the agents should have the same ID

agent_buffer_trajectory = trajectory.to_agentbuffer()
# Check if we used group rewards, warn if so.
self._warn_if_group_reward(agent_buffer_trajectory)

# Update the normalization
if self.is_training:
15 changes: 11 additions & 4 deletions ml-agents/mlagents/trainers/tests/mock_brain.py
@@ -81,6 +81,8 @@ def make_fake_trajectory(
max_step_complete: bool = False,
memory_size: int = 10,
num_other_agents_in_group: int = 0,
group_reward: float = 0.0,
is_terminal: bool = True,
) -> Trajectory:
"""
Makes a fake trajectory of length length. If max_step_complete,
@@ -134,24 +136,29 @@
interrupted=max_step,
memory=memory,
group_status=group_status,
group_reward=0,
group_reward=group_reward,
)
steps_list.append(experience)
obs = []
for obs_spec in observation_specs:
obs.append(np.ones(obs_spec.shape, dtype=np.float32))
last_group_status = []
for _ in range(num_other_agents_in_group):
last_group_status.append(
AgentStatus(obs, reward, action, not max_step_complete and is_terminal)
)
last_experience = AgentExperience(
obs=obs,
reward=reward,
done=not max_step_complete,
done=not max_step_complete and is_terminal,
action=action,
action_probs=action_probs,
action_mask=action_mask,
prev_action=prev_action,
interrupted=max_step_complete,
memory=memory,
group_status=group_status,
group_reward=0,
group_status=last_group_status,
group_reward=group_reward,
)
steps_list.append(last_experience)
return Trajectory(
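As a usage illustration (an assumption-laden sketch, not code from this PR), the extended helper can now build a non-terminal trajectory in which every step carries a group reward; this mirrors the call in the new test_poca_end_episode test further down, and the import paths are taken from the test imports in this diff:

```python
import mlagents.trainers.tests.mock_brain as mb
from mlagents.trainers.tests.dummy_config import create_observation_specs_with_shapes
from mlagents_envs.base_env import ActionSpec

# A 10-step group trajectory that never terminates: is_terminal=False keeps the
# final step's done flag False (unless max_step_complete interrupts it), and
# group_reward=1.0 attaches a group reward to every step.
trajectory = mb.make_fake_trajectory(
    length=10,
    observation_specs=create_observation_specs_with_shapes([(1,)]),
    action_spec=ActionSpec.create_discrete((2,)),
    max_step_complete=False,
    num_other_agents_in_group=2,
    group_reward=1.0,
    is_terminal=False,
)
```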
62 changes: 55 additions & 7 deletions ml-agents/mlagents/trainers/tests/torch/test_poca.py
@@ -1,9 +1,14 @@
from mlagents.trainers.behavior_id_utils import BehaviorIdentifiers
import pytest

import numpy as np
import attr

# Import to avoid circular import
from mlagents.trainers.trainer.trainer_factory import TrainerFactory # noqa F401

from mlagents.trainers.poca.optimizer_torch import TorchPOCAOptimizer
from mlagents.trainers.poca.trainer import POCATrainer
from mlagents.trainers.settings import RewardSignalSettings, RewardSignalType

from mlagents.trainers.policy.torch_policy import TorchPolicy
@@ -12,19 +17,21 @@
from mlagents.trainers.tests.test_trajectory import make_fake_trajectory
from mlagents.trainers.settings import NetworkSettings
from mlagents.trainers.tests.dummy_config import ( # noqa: F401
ppo_dummy_config,
create_observation_specs_with_shapes,
poca_dummy_config,
curiosity_dummy_config,
gail_dummy_config,
)
from mlagents.trainers.agent_processor import AgentManagerQueue
from mlagents.trainers.settings import TrainerSettings

from mlagents_envs.base_env import ActionSpec
from mlagents_envs.base_env import ActionSpec, BehaviorSpec
from mlagents.trainers.buffer import BufferKey, RewardSignalUtil


@pytest.fixture
def dummy_config():
# poca has the same hyperparameters as ppo for now
return ppo_dummy_config()
return poca_dummy_config()


VECTOR_ACTION_SPACE = 2
@@ -188,7 +195,7 @@ def test_poca_get_value_estimates(dummy_config, rnn, visual, discrete):
@pytest.mark.parametrize("visual", [True, False], ids=["visual", "vector"])
@pytest.mark.parametrize("rnn", [True, False], ids=["rnn", "no_rnn"])
# We need to test this separately from test_reward_signals.py to ensure no interactions
def test_ppo_optimizer_update_curiosity(
def test_poca_optimizer_update_curiosity(
dummy_config, curiosity_dummy_config, rnn, visual, discrete # noqa: F811
):
# Test evaluate
@@ -230,10 +237,10 @@ def test_ppo_optimizer_update_curiosity(


# We need to test this separately from test_reward_signals.py to ensure no interactions
def test_ppo_optimizer_update_gail(gail_dummy_config, dummy_config): # noqa: F811
def test_poca_optimizer_update_gail(gail_dummy_config, dummy_config): # noqa: F811
# Test evaluate
dummy_config.reward_signals = gail_dummy_config
config = ppo_dummy_config()
config = poca_dummy_config()
optimizer = create_test_poca_optimizer(
config, use_rnn=False, use_discrete=False, use_visual=False
)
@@ -286,5 +293,46 @@ def test_ppo_optimizer_update_gail(gail_dummy_config, dummy_config): # noqa: F8
)


def test_poca_end_episode():
name_behavior_id = "test_trainer"
trainer = POCATrainer(
name_behavior_id,
10,
TrainerSettings(max_steps=100, checkpoint_interval=10, summary_freq=20),
True,
False,
0,
"mock_model_path",
)
behavior_spec = BehaviorSpec(
create_observation_specs_with_shapes([(1,)]), ActionSpec.create_discrete((2,))
)
parsed_behavior_id = BehaviorIdentifiers.from_name_behavior_id(name_behavior_id)
mock_policy = trainer.create_policy(parsed_behavior_id, behavior_spec)
trainer.add_policy(parsed_behavior_id, mock_policy)
trajectory_queue = AgentManagerQueue("testbrain")
policy_queue = AgentManagerQueue("testbrain")
trainer.subscribe_trajectory_queue(trajectory_queue)
trainer.publish_policy_queue(policy_queue)
time_horizon = 10
trajectory = mb.make_fake_trajectory(
length=time_horizon,
observation_specs=behavior_spec.observation_specs,
max_step_complete=False,
action_spec=behavior_spec.action_spec,
num_other_agents_in_group=2,
group_reward=1.0,
is_terminal=False,
)
trajectory_queue.put(trajectory)
trainer.advance()
# Test that the trajectory has been ingested: each of the 10 steps carries
# a group_reward of 1.0, so every agent's running total should be 10.
for reward in trainer.collected_group_rewards.values():
assert reward == 10
# Test end episode
trainer.end_episode()
assert len(trainer.collected_group_rewards.keys()) == 0


if __name__ == "__main__":
pytest.main()
16 changes: 15 additions & 1 deletion ml-agents/mlagents/trainers/trainer/rl_trainer.py
@@ -4,6 +4,7 @@
import abc
import time
import attr
import numpy as np
from mlagents_envs.side_channel.stats_side_channel import StatsAggregationMethod

from mlagents.trainers.policy.checkpoint_manager import (
@@ -13,7 +14,7 @@
from mlagents_envs.logging_util import get_logger
from mlagents_envs.timers import timed
from mlagents.trainers.optimizer import Optimizer
from mlagents.trainers.buffer import AgentBuffer
from mlagents.trainers.buffer import AgentBuffer, BufferKey
from mlagents.trainers.trainer import Trainer
from mlagents.trainers.torch.components.reward_providers.base_reward_provider import (
BaseRewardProvider,
@@ -58,6 +59,7 @@ def __init__(self, *args, **kwargs):
self.model_saver = self.create_model_saver(
self.trainer_settings, self.artifact_path, self.load
)
self._has_warned_group_rewards = False

def end_episode(self) -> None:
"""
@@ -256,6 +258,18 @@ def _maybe_save_model(self, step_after_process: int) -> None:
if step_after_process >= self._next_save_step and self.get_step != 0:
self._checkpoint()

def _warn_if_group_reward(self, buffer: AgentBuffer) -> None:
"""
Warn if the trainer receives a Group Reward but isn't a multiagent trainer (e.g. POCA).
"""
if not self._has_warned_group_rewards:
Contributor:

If there are no group_rewards at all, this will be checked on every processed trajectory. I wonder if we should only check the first n trajectories (although that might have its own issues).

Contributor Author:

Yeah, we could set n to a large number (like 1000) and it should cover most of the cases. It wouldn't catch situations where the group reward is sparse and doesn't occur until halfway through the training session. But then again, in that case is the warning actually useful? 🤔

Contributor Author:

a = np.ones(10240)

%timeit np.sum(np.abs(a))    # 11.8 us
%timeit np.any(a)            # 7.51 us
%timeit np.count_nonzero(a)  # 23.2 us

It looks like if we go with np.any() we can speed this up a bit. For a trajectory of 1000 elements (the longest we have) each check costs us about 3.5 us. np.count_nonzero() is also fast for shorter arrays (fewer than roughly 2000 elements) but doesn't scale as well.

if np.any(buffer[BufferKey.GROUP_REWARD]):
logger.warning(
"An agent received a Group Reward, but you are not using a multi-agent trainer. "
"Please use the POCA trainer for best results."
)
self._has_warned_group_rewards = True

def advance(self) -> None:
"""
Steps the trainer, taking in trajectories and updates if ready.
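To make the guard added above concrete, here is a self-contained sketch of the warn-once pattern that _has_warned_group_rewards implements, with a plain dict standing in for AgentBuffer and illustrative class names; the warning fires at most once per trainer instance, however many later trajectories carry group rewards:

```python
import logging

import numpy as np

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger("group_reward_warning_demo")


class NonGroupTrainerStub:
    def __init__(self):
        self._has_warned_group_rewards = False

    def _warn_if_group_reward(self, buffer):
        # Warn the first time a non-zero group reward shows up, then stay silent.
        if not self._has_warned_group_rewards:
            if np.any(buffer["group_reward"]):
                logger.warning(
                    "An agent received a Group Reward, but you are not using a "
                    "multi-agent trainer. Please use the POCA trainer for best results."
                )
                self._has_warned_group_rewards = True


trainer = NonGroupTrainerStub()
trainer._warn_if_group_reward({"group_reward": np.zeros(100)})  # silent: no group reward yet
trainer._warn_if_group_reward({"group_reward": np.ones(100)})   # warns once
trainer._warn_if_group_reward({"group_reward": np.ones(100)})   # silent: already warned
```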