diff --git a/config/sac_trainer_config.yaml b/config/sac_trainer_config.yaml index 612b7f5675..7b353b63f8 100644 --- a/config/sac_trainer_config.yaml +++ b/config/sac_trainer_config.yaml @@ -7,7 +7,7 @@ default: init_entcoef: 1.0 learning_rate: 3.0e-4 learning_rate_schedule: constant - max_steps: 5.0e4 + max_steps: 5.0e5 memory_size: 256 normalize: false num_update: 1 @@ -15,7 +15,7 @@ default: num_layers: 2 time_horizon: 64 sequence_length: 64 - summary_freq: 1000 + summary_freq: 10000 tau: 0.005 use_recurrent: false vis_encode_type: simple @@ -28,65 +28,65 @@ FoodCollector: normalize: false batch_size: 256 buffer_size: 500000 - max_steps: 1.0e5 + max_steps: 2.0e6 init_entcoef: 0.05 train_interval: 1 Bouncer: normalize: true - max_steps: 5.0e5 + max_steps: 2.0e7 num_layers: 2 hidden_units: 64 - summary_freq: 1000 + summary_freq: 20000 PushBlock: - max_steps: 5.0e4 + max_steps: 1.5e7 init_entcoef: 0.05 hidden_units: 256 - summary_freq: 2000 + summary_freq: 60000 time_horizon: 64 num_layers: 2 SmallWallJump: - max_steps: 1.0e6 + max_steps: 3e7 hidden_units: 256 - summary_freq: 2000 + summary_freq: 20000 time_horizon: 128 init_entcoef: 0.1 num_layers: 2 normalize: false BigWallJump: - max_steps: 1.0e6 + max_steps: 3e7 hidden_units: 256 - summary_freq: 2000 + summary_freq: 20000 time_horizon: 128 num_layers: 2 init_entcoef: 0.1 normalize: false Striker: - max_steps: 5.0e5 + max_steps: 5.0e6 learning_rate: 1e-3 hidden_units: 256 - summary_freq: 2000 + summary_freq: 20000 time_horizon: 128 init_entcoef: 0.1 num_layers: 2 normalize: false Goalie: - max_steps: 5.0e5 + max_steps: 5.0e6 learning_rate: 1e-3 hidden_units: 256 - summary_freq: 2000 + summary_freq: 20000 time_horizon: 128 init_entcoef: 0.1 num_layers: 2 normalize: false Pyramids: - summary_freq: 2000 + summary_freq: 30000 time_horizon: 128 batch_size: 128 buffer_init_steps: 10000 @@ -94,7 +94,7 @@ Pyramids: hidden_units: 256 num_layers: 2 init_entcoef: 0.01 - max_steps: 5.0e5 + max_steps: 1.0e7 sequence_length: 16 tau: 0.01 use_recurrent: false @@ -115,7 +115,7 @@ VisualPyramids: hidden_units: 256 buffer_init_steps: 1000 num_layers: 1 - max_steps: 5.0e5 + max_steps: 1.0e7 buffer_size: 500000 init_entcoef: 0.01 tau: 0.01 @@ -134,7 +134,7 @@ VisualPyramids: normalize: true batch_size: 64 buffer_size: 12000 - summary_freq: 1000 + summary_freq: 12000 time_horizon: 1000 hidden_units: 64 init_entcoef: 0.5 @@ -142,13 +142,13 @@ VisualPyramids: 3DBallHard: normalize: true batch_size: 256 - summary_freq: 1000 + summary_freq: 12000 time_horizon: 1000 Tennis: buffer_size: 500000 normalize: true - max_steps: 2e5 + max_steps: 4e6 CrawlerStatic: normalize: true @@ -157,8 +157,8 @@ CrawlerStatic: train_interval: 2 buffer_size: 500000 buffer_init_steps: 2000 - max_steps: 5e5 - summary_freq: 3000 + max_steps: 5e6 + summary_freq: 30000 init_entcoef: 1.0 num_layers: 3 hidden_units: 512 @@ -172,10 +172,10 @@ CrawlerDynamic: time_horizon: 1000 batch_size: 256 buffer_size: 500000 - summary_freq: 3000 + summary_freq: 30000 train_interval: 2 num_layers: 3 - max_steps: 1e6 + max_steps: 1e7 hidden_units: 512 reward_signals: extrinsic: @@ -187,8 +187,8 @@ Walker: time_horizon: 1000 batch_size: 256 buffer_size: 500000 - max_steps: 2e6 - summary_freq: 3000 + max_steps: 2e7 + summary_freq: 30000 num_layers: 4 train_interval: 2 hidden_units: 512 @@ -202,8 +202,8 @@ Reacher: time_horizon: 1000 batch_size: 128 buffer_size: 500000 - max_steps: 2e5 - summary_freq: 3000 + max_steps: 2e7 + summary_freq: 60000 Hallway: sequence_length: 32 @@ -211,7 +211,7 @@ Hallway: 
hidden_units: 128 memory_size: 256 init_entcoef: 0.1 - max_steps: 5.0e5 + max_steps: 1.0e7 summary_freq: 1000 time_horizon: 64 use_recurrent: true @@ -223,8 +223,7 @@ VisualHallway: memory_size: 256 gamma: 0.99 batch_size: 64 - max_steps: 5.0e5 - summary_freq: 1000 + max_steps: 1.0e7 time_horizon: 64 use_recurrent: true @@ -237,8 +236,8 @@ VisualPushBlock: gamma: 0.99 buffer_size: 1024 batch_size: 64 - max_steps: 5.0e5 - summary_freq: 1000 + max_steps: 3.0e6 + summary_freq: 60000 time_horizon: 64 GridWorld: @@ -249,8 +248,8 @@ GridWorld: init_entcoef: 0.5 buffer_init_steps: 1000 buffer_size: 50000 - max_steps: 50000 - summary_freq: 2000 + max_steps: 500000 + summary_freq: 20000 time_horizon: 5 reward_signals: extrinsic: diff --git a/config/trainer_config.yaml b/config/trainer_config.yaml index db2cbab016..cd0bafcd75 100644 --- a/config/trainer_config.yaml +++ b/config/trainer_config.yaml @@ -8,14 +8,14 @@ default: lambd: 0.95 learning_rate: 3.0e-4 learning_rate_schedule: linear - max_steps: 5.0e4 + max_steps: 5.0e5 memory_size: 256 normalize: false num_epoch: 3 num_layers: 2 time_horizon: 64 sequence_length: 64 - summary_freq: 1000 + summary_freq: 10000 use_recurrent: false vis_encode_type: simple reward_signals: @@ -28,81 +28,81 @@ FoodCollector: beta: 5.0e-3 batch_size: 1024 buffer_size: 10240 - max_steps: 1.0e5 + max_steps: 2.0e6 Bouncer: normalize: true - max_steps: 1.0e6 + max_steps: 2.0e7 num_layers: 2 hidden_units: 64 PushBlock: - max_steps: 5.0e4 + max_steps: 1.5e7 batch_size: 128 buffer_size: 2048 beta: 1.0e-2 hidden_units: 256 - summary_freq: 2000 + summary_freq: 60000 time_horizon: 64 num_layers: 2 SmallWallJump: - max_steps: 1.0e6 + max_steps: 3e7 batch_size: 128 buffer_size: 2048 beta: 5.0e-3 hidden_units: 256 - summary_freq: 2000 + summary_freq: 20000 time_horizon: 128 num_layers: 2 normalize: false BigWallJump: - max_steps: 1.0e6 + max_steps: 3e7 batch_size: 128 buffer_size: 2048 beta: 5.0e-3 hidden_units: 256 - summary_freq: 2000 + summary_freq: 20000 time_horizon: 128 num_layers: 2 normalize: false Striker: - max_steps: 5.0e5 + max_steps: 5.0e6 learning_rate: 1e-3 batch_size: 128 num_epoch: 3 buffer_size: 2000 beta: 1.0e-2 hidden_units: 256 - summary_freq: 2000 + summary_freq: 20000 time_horizon: 128 num_layers: 2 normalize: false Goalie: - max_steps: 5.0e5 + max_steps: 5.0e6 learning_rate: 1e-3 batch_size: 320 num_epoch: 3 buffer_size: 2000 beta: 1.0e-2 hidden_units: 256 - summary_freq: 2000 + summary_freq: 20000 time_horizon: 128 num_layers: 2 normalize: false Pyramids: - summary_freq: 2000 + summary_freq: 30000 time_horizon: 128 batch_size: 128 buffer_size: 2048 hidden_units: 512 num_layers: 2 beta: 1.0e-2 - max_steps: 5.0e5 + max_steps: 1.0e7 num_epoch: 3 reward_signals: extrinsic: @@ -120,7 +120,7 @@ VisualPyramids: hidden_units: 256 num_layers: 1 beta: 1.0e-2 - max_steps: 5.0e5 + max_steps: 1.0e7 num_epoch: 3 reward_signals: extrinsic: @@ -135,7 +135,7 @@ VisualPyramids: normalize: true batch_size: 64 buffer_size: 12000 - summary_freq: 1000 + summary_freq: 12000 time_horizon: 1000 lambd: 0.99 beta: 0.001 @@ -144,7 +144,7 @@ VisualPyramids: normalize: true batch_size: 1200 buffer_size: 12000 - summary_freq: 1000 + summary_freq: 12000 time_horizon: 1000 max_steps: 5.0e5 beta: 0.001 @@ -155,7 +155,7 @@ VisualPyramids: Tennis: normalize: true - max_steps: 2e5 + max_steps: 4e6 CrawlerStatic: normalize: true @@ -163,8 +163,8 @@ CrawlerStatic: time_horizon: 1000 batch_size: 2024 buffer_size: 20240 - max_steps: 1e6 - summary_freq: 3000 + max_steps: 1e7 + summary_freq: 30000 
num_layers: 3 hidden_units: 512 reward_signals: @@ -178,8 +178,8 @@ CrawlerDynamic: time_horizon: 1000 batch_size: 2024 buffer_size: 20240 - max_steps: 1e6 - summary_freq: 3000 + max_steps: 1e7 + summary_freq: 30000 num_layers: 3 hidden_units: 512 reward_signals: @@ -193,8 +193,8 @@ Walker: time_horizon: 1000 batch_size: 2048 buffer_size: 20480 - max_steps: 2e6 - summary_freq: 3000 + max_steps: 2e7 + summary_freq: 30000 num_layers: 3 hidden_units: 512 reward_signals: @@ -208,8 +208,8 @@ Reacher: time_horizon: 1000 batch_size: 2024 buffer_size: 20240 - max_steps: 1e6 - summary_freq: 3000 + max_steps: 2e7 + summary_freq: 60000 reward_signals: extrinsic: strength: 1.0 @@ -225,8 +225,8 @@ Hallway: num_epoch: 3 buffer_size: 1024 batch_size: 128 - max_steps: 5.0e5 - summary_freq: 1000 + max_steps: 1.0e7 + summary_freq: 10000 time_horizon: 64 VisualHallway: @@ -239,8 +239,8 @@ VisualHallway: num_epoch: 3 buffer_size: 1024 batch_size: 64 - max_steps: 5.0e5 - summary_freq: 1000 + max_steps: 1.0e7 + summary_freq: 10000 time_horizon: 64 VisualPushBlock: @@ -253,8 +253,8 @@ VisualPushBlock: num_epoch: 3 buffer_size: 1024 batch_size: 64 - max_steps: 5.0e5 - summary_freq: 1000 + max_steps: 3.0e6 + summary_freq: 60000 time_horizon: 64 GridWorld: @@ -264,8 +264,8 @@ GridWorld: hidden_units: 256 beta: 5.0e-3 buffer_size: 256 - max_steps: 50000 - summary_freq: 2000 + max_steps: 500000 + summary_freq: 20000 time_horizon: 5 reward_signals: extrinsic: diff --git a/docs/Migrating.md b/docs/Migrating.md index 3f22cc7a53..7a5da4b6bd 100644 --- a/docs/Migrating.md +++ b/docs/Migrating.md @@ -7,7 +7,15 @@ The versions can be found in # Migrating -## Migrating from 0.12 to latest +## Migrating from 0.13 to latest + +### Important changes +* Trainer steps are now counted per-Agent, not per-environment as in previous versions. For instance, if you have 10 Agents in the scene, 20 environment steps now correspond to 200 steps as printed in the terminal and in Tensorboard. + +### Steps to Migrate +* Multiply `max_steps` and `summary_freq` in your `trainer_config.yaml` by the number of Agents in the scene. + +## Migrating from ML-Agents toolkit v0.12.0 to v0.13.0 ### Important changes * The low level Python API has changed. You can look at the document [Low Level Python API documentation](Python-API.md) for more information. This should only affect you if you're writing a custom trainer; if you use `mlagents-learn` for training, this should be a transparent change.
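The migration step above (multiplying `max_steps` and `summary_freq` by the Agent count) can also be scripted. The sketch below is illustrative only, assuming PyYAML is installed and a config laid out like the files in this diff; the helper name and the agent count are hypothetical, not part of the toolkit.

import yaml

def scale_step_counts(config_path: str, num_agents: int) -> None:
    # Hypothetical helper: multiply max_steps and summary_freq for the default
    # section and every behavior section by the number of Agents in the scene.
    with open(config_path) as f:
        config = yaml.safe_load(f)
    for settings in config.values():
        if not isinstance(settings, dict):
            continue
        for key in ("max_steps", "summary_freq"):
            if key in settings:
                # Values such as "5.0e5" may load as strings, so convert via float.
                settings[key] = int(float(settings[key]) * num_agents)
    with open(config_path, "w") as f:
        yaml.safe_dump(config, f, default_flow_style=False)

# e.g. scale_step_counts("config/trainer_config.yaml", num_agents=10)

Note that yaml.safe_dump does not preserve comments or key order, so editing the values by hand may be preferable for a real config.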
diff --git a/ml-agents/mlagents/trainers/agent_processor.py b/ml-agents/mlagents/trainers/agent_processor.py index 9db4d31a73..f82a5b5f55 100644 --- a/ml-agents/mlagents/trainers/agent_processor.py +++ b/ml-agents/mlagents/trainers/agent_processor.py @@ -1,14 +1,16 @@ import sys -from typing import List, Dict -from collections import defaultdict, Counter +from typing import List, Dict, Deque, TypeVar, Generic +from collections import defaultdict, Counter, deque -from mlagents.trainers.trainer import Trainer from mlagents.trainers.trajectory import Trajectory, AgentExperience from mlagents.trainers.brain import BrainInfo from mlagents.trainers.tf_policy import TFPolicy +from mlagents.trainers.policy import Policy from mlagents.trainers.action_info import ActionInfoOutputs from mlagents.trainers.stats import StatsReporter +T = TypeVar("T") + class AgentProcessor: """ @@ -19,7 +21,6 @@ class AgentProcessor: def __init__( self, - trainer: Trainer, policy: TFPolicy, behavior_id: str, stats_reporter: StatsReporter, @@ -43,8 +44,8 @@ def __init__( self.episode_steps: Counter = Counter() self.episode_rewards: Dict[str, float] = defaultdict(float) self.stats_reporter = stats_reporter - self.trainer = trainer self.max_trajectory_length = max_trajectory_length + self.trajectory_queues: List[AgentManagerQueue[Trajectory]] = [] self.behavior_id = behavior_id def add_experiences( @@ -137,8 +138,8 @@ def add_experiences( next_obs=next_obs, behavior_id=self.behavior_id, ) - # This will eventually be replaced with a queue - self.trainer.process_trajectory(trajectory) + for traj_queue in self.trajectory_queues: + traj_queue.put(trajectory) self.experience_buffers[agent_id] = [] if next_info.local_done[next_idx]: self.stats_reporter.add_stat( @@ -156,3 +157,71 @@ def add_experiences( self.policy.save_previous_action( curr_info.agents, take_action_outputs["action"] ) + + def publish_trajectory_queue( + self, trajectory_queue: "AgentManagerQueue[Trajectory]" + ) -> None: + """ + Adds a trajectory queue to the list of queues to publish to when this AgentProcessor + assembles a Trajectory + :param trajectory_queue: Trajectory queue to publish to. + """ + self.trajectory_queues.append(trajectory_queue) + + +class AgentManagerQueue(Generic[T]): + """ + Queue used by the AgentManager. Note that we make our own class here because in most implementations + deque is sufficient and faster. However, if we want to switch to multiprocessing, we'll need to change + out this implementation. + """ + + class Empty(Exception): + """ + Exception for when the queue is empty. + """ + + pass + + def __init__(self, behavior_id: str): + """ + Initializes an AgentManagerQueue. Note that we can give it a behavior_id so that it can be identified + separately from an AgentManager. + """ + self.queue: Deque[T] = deque() + self.behavior_id = behavior_id + + def empty(self) -> bool: + return len(self.queue) == 0 + + def get_nowait(self) -> T: + try: + return self.queue.popleft() + except IndexError: + raise self.Empty("The AgentManagerQueue is empty.") + + def put(self, item: T) -> None: + self.queue.append(item) + + +class AgentManager(AgentProcessor): + """ + An AgentManager is an AgentProcessor that also holds a single trajectory and policy queue. + Note: this leaves room for adding AgentProcessors that publish multiple trajectory queues. 
+ """ + + def __init__( + self, + policy: TFPolicy, + behavior_id: str, + stats_reporter: StatsReporter, + max_trajectory_length: int = sys.maxsize, + ): + super().__init__(policy, behavior_id, stats_reporter, max_trajectory_length) + self.trajectory_queue: AgentManagerQueue[Trajectory] = AgentManagerQueue( + self.behavior_id + ) + self.policy_queue: AgentManagerQueue[Policy] = AgentManagerQueue( + self.behavior_id + ) + self.publish_trajectory_queue(self.trajectory_queue) diff --git a/ml-agents/mlagents/trainers/ppo/trainer.py b/ml-agents/mlagents/trainers/ppo/trainer.py index 445a7f4c0c..4cf5434f98 100644 --- a/ml-agents/mlagents/trainers/ppo/trainer.py +++ b/ml-agents/mlagents/trainers/ppo/trainer.py @@ -66,18 +66,19 @@ def __init__( "model_path", "reward_signals", ] - self.check_param_keys() + self._check_param_keys() self.load = load self.multi_gpu = multi_gpu self.seed = seed self.policy: PPOPolicy = None # type: ignore - def process_trajectory(self, trajectory: Trajectory) -> None: + def _process_trajectory(self, trajectory: Trajectory) -> None: """ Takes a trajectory and processes it, putting it into the update buffer. Processing involves calculating value and advantage targets for model updating step. :param trajectory: The Trajectory tuple containing the steps to be processed. """ + super()._process_trajectory(trajectory) agent_id = trajectory.agent_id # All the agents should have the same ID # Add to episode_steps @@ -160,7 +161,7 @@ def process_trajectory(self, trajectory: Trajectory) -> None: agent_id, self.get_policy(trajectory.behavior_id) ) - def is_ready_update(self): + def _is_ready_update(self): """ Returns whether or not the trainer has enough elements to run update model :return: A boolean corresponding to whether or not update_model() can be run @@ -168,7 +169,7 @@ def is_ready_update(self): size_of_buffer = self.update_buffer.num_experiences return size_of_buffer > self.trainer_parameters["buffer_size"] - def update_policy(self): + def _update_policy(self): """ Uses demonstration_buffer to update the policy. The reward signal generators must be updated in this method at their own pace. diff --git a/ml-agents/mlagents/trainers/rl_trainer.py b/ml-agents/mlagents/trainers/rl_trainer.py index d31353c3d9..12f0ab475c 100644 --- a/ml-agents/mlagents/trainers/rl_trainer.py +++ b/ml-agents/mlagents/trainers/rl_trainer.py @@ -13,10 +13,9 @@ RewardSignalResults = Dict[str, RewardSignalResult] -class RLTrainer(Trainer): +class RLTrainer(Trainer): # pylint: disable=abstract-method """ This class is the base class for trainers that use Reward Signals. - Contains methods for adding BrainInfos to the Buffer. """ def __init__(self, *args, **kwargs): @@ -65,7 +64,14 @@ def _update_end_episode_stats(self, agent_id: str, policy: TFPolicy) -> None: def clear_update_buffer(self) -> None: """ - Clear the buffers that have been built up during inference. If - we're not training, this should be called instead of update_policy. + Clear the buffers that have been built up during inference. 
""" self.update_buffer.reset_agent() + + def advance(self) -> None: + """ + Steps the trainer, taking in trajectories and updates if ready + """ + super().advance() + if not self.is_training: + self.clear_update_buffer() diff --git a/ml-agents/mlagents/trainers/sac/trainer.py b/ml-agents/mlagents/trainers/sac/trainer.py index b64a919ef1..7387fa0760 100644 --- a/ml-agents/mlagents/trainers/sac/trainer.py +++ b/ml-agents/mlagents/trainers/sac/trainer.py @@ -74,7 +74,7 @@ def __init__( "vis_encode_type", ] - self.check_param_keys() + self._check_param_keys() self.load = load self.seed = seed self.policy: SACPolicy = None # type: ignore @@ -133,10 +133,11 @@ def load_replay_buffer(self) -> None: ) ) - def process_trajectory(self, trajectory: Trajectory) -> None: + def _process_trajectory(self, trajectory: Trajectory) -> None: """ Takes a trajectory and processes it, putting it into the replay buffer. """ + super()._process_trajectory(trajectory) last_step = trajectory.steps[-1] agent_id = trajectory.agent_id # All the agents should have the same ID @@ -191,7 +192,7 @@ def process_trajectory(self, trajectory: Trajectory) -> None: agent_id, self.get_policy(trajectory.behavior_id) ) - def is_ready_update(self) -> bool: + def _is_ready_update(self) -> bool: """ Returns whether or not the trainer has enough elements to run update model :return: A boolean corresponding to whether or not update_model() can be run @@ -202,7 +203,7 @@ def is_ready_update(self) -> bool: ) @timed - def update_policy(self) -> None: + def _update_policy(self) -> None: """ If train_interval is met, update the SAC policy given the current reward signals. If reward_signal_train_interval is met, update the reward signals from the buffer. diff --git a/ml-agents/mlagents/trainers/stats.py b/ml-agents/mlagents/trainers/stats.py index 7403f775e5..fe6ca6a924 100644 --- a/ml-agents/mlagents/trainers/stats.py +++ b/ml-agents/mlagents/trainers/stats.py @@ -139,18 +139,25 @@ def add_writer(writer: StatsWriter) -> None: def add_stat(self, key: str, value: float) -> None: """ Add a float value stat to the StatsReporter. - :param category: The highest categorization of the statistic, e.g. behavior name. :param key: The type of statistic, e.g. Environment/Reward. :param value: the value of the statistic. """ StatsReporter.stats_dict[self.category][key].append(value) + def set_stat(self, key: str, value: float) -> None: + """ + Sets a stat value to a float. This is for values that we don't want to average, and just + want the latest. + :param key: The type of statistic, e.g. Environment/Reward. + :param value: the value of the statistic. + """ + StatsReporter.stats_dict[self.category][key] = [value] + def write_stats(self, step: int) -> None: """ Write out all stored statistics that fall under the category specified. The currently stored values will be averaged, written out as a single value, and the buffer cleared. - :param category: The category which to write out the stats. :param step: Training step which to write these stats as. """ values: Dict[str, StatsSummary] = {} @@ -165,7 +172,6 @@ def write_stats(self, step: int) -> None: def write_text(self, text: str, step: int) -> None: """ Write out some text. - :param category: The highest categorization of the statistic, e.g. behavior name. :param text: The text to write out. :param step: Training step which to write these stats as. 
""" @@ -175,7 +181,6 @@ def write_text(self, text: str, step: int) -> None: def get_stats_summaries(self, key: str) -> StatsSummary: """ Get the mean, std, and count of a particular statistic, since last write. - :param category: The highest categorization of the statistic, e.g. behavior name. :param key: The type of statistic, e.g. Environment/Reward. :returns: A StatsSummary NamedTuple containing (mean, std, count). """ diff --git a/ml-agents/mlagents/trainers/tests/test_agent_processor.py b/ml-agents/mlagents/trainers/tests/test_agent_processor.py index 52ed36499a..b443078ee4 100644 --- a/ml-agents/mlagents/trainers/tests/test_agent_processor.py +++ b/ml-agents/mlagents/trainers/tests/test_agent_processor.py @@ -2,7 +2,12 @@ import pytest import mlagents.trainers.tests.mock_brain as mb import numpy as np -from mlagents.trainers.agent_processor import AgentProcessor +from mlagents.trainers.agent_processor import ( + AgentProcessor, + AgentManager, + AgentManagerQueue, +) +from mlagents.trainers.trajectory import Trajectory from mlagents.trainers.stats import StatsReporter @@ -29,10 +34,9 @@ def create_mock_policy(): @pytest.mark.parametrize("num_vis_obs", [0, 1, 2], ids=["vec", "1 viz", "2 viz"]) def test_agentprocessor(num_vis_obs): policy = create_mock_policy() - trainer = mock.Mock() + tqueue = mock.Mock() name_behavior_id = "test_brain_name" processor = AgentProcessor( - trainer, policy, name_behavior_id, max_trajectory_length=5, @@ -51,15 +55,40 @@ def test_agentprocessor(num_vis_obs): num_vector_acts=2, num_vis_observations=num_vis_obs, ) + processor.publish_trajectory_queue(tqueue) for _ in range(5): processor.add_experiences(mock_braininfo, mock_braininfo, fake_action_outputs) # Assert that two trajectories have been added to the Trainer - assert len(trainer.process_trajectory.call_args_list) == 2 + assert len(tqueue.put.call_args_list) == 2 # Assert that the trajectory is of length 5 - trajectory = trainer.process_trajectory.call_args_list[0][0][0] + trajectory = tqueue.put.call_args_list[0][0][0] assert len(trajectory.steps) == 5 # Assert that the AgentProcessor is empty assert len(processor.experience_buffers[0]) == 0 + + +def test_agent_manager(): + policy = create_mock_policy() + name_behavior_id = "test_brain_name" + manager = AgentManager( + policy, + name_behavior_id, + max_trajectory_length=5, + stats_reporter=StatsReporter("testcat"), + ) + assert len(manager.trajectory_queues) == 1 + assert isinstance(manager.trajectory_queues[0], AgentManagerQueue) + + +def test_agent_manager_queue(): + queue = AgentManagerQueue(behavior_id="testbehavior") + trajectory = mock.Mock(spec=Trajectory) + assert queue.empty() + queue.put(trajectory) + assert not queue.empty() + queue_traj = queue.get_nowait() + assert isinstance(queue_traj, Trajectory) + assert queue.empty() diff --git a/ml-agents/mlagents/trainers/tests/test_ppo.py b/ml-agents/mlagents/trainers/tests/test_ppo.py index e269cb03a8..4f24ca3b28 100644 --- a/ml-agents/mlagents/trainers/tests/test_ppo.py +++ b/ml-agents/mlagents/trainers/tests/test_ppo.py @@ -12,6 +12,7 @@ from mlagents.trainers.models import EncoderType, LearningModel from mlagents.trainers.trainer import UnityTrainerException from mlagents.trainers.brain import BrainParameters, CameraResolution +from mlagents.trainers.agent_processor import AgentManagerQueue from mlagents_envs.environment import UnityEnvironment from mlagents_envs.mock_communicator import MockCommunicator from mlagents.trainers.tests import mock_brain as mb @@ -332,15 +333,14 @@ def 
test_trainer_increment_step(dummy_config): trainer = PPOTrainer( brain_params.brain_name, 0, trainer_params, True, False, 0, "0", False ) - policy_mock = mock.Mock() + policy_mock = mock.Mock(spec=PPOPolicy) step_count = ( 5 - ) # 10 hacked becausee this function is no longer called through trainer + ) # 10 hacked because this function is no longer called through trainer policy_mock.increment_step = mock.Mock(return_value=step_count) - trainer.policy = policy_mock + trainer.add_policy("testbehavior", policy_mock) - trainer.increment_step(5) - print(trainer.policy.increment_step(5)) + trainer._increment_step(5, "testbehavior") policy_mock.increment_step.assert_called_with(5) assert trainer.step == step_count @@ -383,13 +383,13 @@ def test_trainer_update_policy(mock_env, dummy_config, use_discrete): buffer["curiosity_value_estimates"] = buffer["rewards"] trainer.update_buffer = buffer - trainer.update_policy() + trainer._update_policy() # Make batch length a larger multiple of sequence length trainer.trainer_parameters["batch_size"] = 128 - trainer.update_policy() + trainer._update_policy() # Make batch length a larger non-multiple of sequence length trainer.trainer_parameters["batch_size"] = 100 - trainer.update_policy() + trainer._update_policy() def test_process_trajectory(dummy_config): @@ -403,9 +403,11 @@ def test_process_trajectory(dummy_config): ) dummy_config["summary_path"] = "./summaries/test_trainer_summary" dummy_config["model_path"] = "./models/test_trainer_models/TestModel" - trainer = PPOTrainer( - brain_params.brain_name, 0, dummy_config, True, False, 0, "0", False - ) + trainer = PPOTrainer(brain_params, 0, dummy_config, True, False, 0, "0", False) + policy = trainer.create_policy(brain_params) + trainer.add_policy(brain_params.brain_name, policy) + trajectory_queue = AgentManagerQueue("testbrain") + trainer.subscribe_trajectory_queue(trajectory_queue) time_horizon = 15 trajectory = make_fake_trajectory( length=time_horizon, @@ -414,9 +416,8 @@ def test_process_trajectory(dummy_config): num_vis_obs=0, action_space=2, ) - policy = trainer.create_policy(brain_params) - trainer.add_policy(brain_params.brain_name, policy) - trainer.process_trajectory(trajectory) + trajectory_queue.put(trajectory) + trainer.advance() # Check that trainer put trajectory in update buffer assert trainer.update_buffer.num_experiences == 15 @@ -440,7 +441,8 @@ def test_process_trajectory(dummy_config): num_vis_obs=0, action_space=2, ) - trainer.process_trajectory(trajectory) + trajectory_queue.put(trajectory) + trainer.advance() # Check that the stats are reset as episode is finished for reward in trainer.collected_rewards.values(): @@ -477,7 +479,7 @@ def test_normalization(dummy_config): policy = trainer.create_policy(brain_params) trainer.add_policy(brain_params.brain_name, policy) - trainer.process_trajectory(trajectory) + trainer._process_trajectory(trajectory) # Check that the running mean and variance is correct steps, mean, variance = trainer.policy.sess.run( @@ -503,7 +505,7 @@ def test_normalization(dummy_config): num_vis_obs=0, action_space=2, ) - trainer.process_trajectory(trajectory) + trainer._process_trajectory(trajectory) # Check that the running mean and variance is correct steps, mean, variance = trainer.policy.sess.run( diff --git a/ml-agents/mlagents/trainers/tests/test_rl_trainer.py b/ml-agents/mlagents/trainers/tests/test_rl_trainer.py index b912d8ff47..986746ea74 100644 --- a/ml-agents/mlagents/trainers/tests/test_rl_trainer.py +++ 
b/ml-agents/mlagents/trainers/tests/test_rl_trainer.py @@ -1,7 +1,6 @@ -import unittest.mock as mock import yaml +from unittest import mock import mlagents.trainers.tests.mock_brain as mb -import numpy as np from mlagents.trainers.rl_trainer import RLTrainer from mlagents.trainers.tests.test_buffer import construct_fake_buffer @@ -10,6 +9,7 @@ def dummy_config(): return yaml.safe_load( """ summary_path: "test/" + summary_freq: 1000 reward_signals: extrinsic: strength: 1.0 @@ -28,24 +28,31 @@ def create_mock_brain(): return mock_brain -def create_rl_trainer(): - mock_brainparams = create_mock_brain() - trainer = RLTrainer(mock_brainparams.brain_name, dummy_config(), True, 0) - return trainer +# Add concrete implementations of abstract methods +class FakeTrainer(RLTrainer): + def get_policy(self, name_behavior_id): + return mock.Mock() + def _is_ready_update(self): + return True -def create_mock_all_brain_info(brain_info): - return {"MockBrain": brain_info} + def _update_policy(self): + pass + def add_policy(self): + pass -def create_mock_policy(): - mock_policy = mock.Mock() - mock_policy.reward_signals = {} - mock_policy.retrieve_memories.return_value = np.zeros((1, 1), dtype=np.float32) - mock_policy.retrieve_previous_action.return_value = np.zeros( - (1, 1), dtype=np.float32 - ) - return mock_policy + def create_policy(self): + return mock.Mock() + + def _process_trajectory(self, trajectory): + super()._process_trajectory(trajectory) + + +def create_rl_trainer(): + mock_brainparams = create_mock_brain() + trainer = FakeTrainer(mock_brainparams, dummy_config(), True, 0) + return trainer def test_rl_trainer(): diff --git a/ml-agents/mlagents/trainers/tests/test_sac.py b/ml-agents/mlagents/trainers/tests/test_sac.py index b2acda3436..24a36f2f48 100644 --- a/ml-agents/mlagents/trainers/tests/test_sac.py +++ b/ml-agents/mlagents/trainers/tests/test_sac.py @@ -9,6 +9,7 @@ from mlagents.trainers.sac.models import SACModel from mlagents.trainers.sac.policy import SACPolicy from mlagents.trainers.sac.trainer import SACTrainer +from mlagents.trainers.agent_processor import AgentManagerQueue from mlagents.trainers.tests import mock_brain as mb from mlagents.trainers.tests.mock_brain import make_brain_parameters from mlagents.trainers.tests.test_trajectory import make_fake_trajectory @@ -368,10 +369,14 @@ def test_process_trajectory(dummy_config): policy = trainer.create_policy(brain_params) trainer.add_policy(brain_params.brain_name, policy) + trajectory_queue = AgentManagerQueue("testbrain") + trainer.subscribe_trajectory_queue(trajectory_queue) + trajectory = make_fake_trajectory( length=15, max_step_complete=True, vec_obs_size=6, num_vis_obs=0, action_space=2 ) - trainer.process_trajectory(trajectory) + trajectory_queue.put(trajectory) + trainer.advance() # Check that trainer put trajectory in update buffer assert trainer.update_buffer.num_experiences == 15 @@ -389,7 +394,8 @@ def test_process_trajectory(dummy_config): num_vis_obs=0, action_space=2, ) - trainer.process_trajectory(trajectory) + trajectory_queue.put(trajectory) + trainer.advance() # Check that the stats are reset as episode is finished for reward in trainer.collected_rewards.values(): diff --git a/ml-agents/mlagents/trainers/tests/test_trainer_controller.py b/ml-agents/mlagents/trainers/tests/test_trainer_controller.py index 1b6fac0b93..9b7d8c24e1 100644 --- a/ml-agents/mlagents/trainers/tests/test_trainer_controller.py +++ b/ml-agents/mlagents/trainers/tests/test_trainer_controller.py @@ -2,7 +2,7 @@ import pytest from 
mlagents.tf_utils import tf -from mlagents.trainers.trainer_controller import TrainerController, AgentManager +from mlagents.trainers.trainer_controller import TrainerController from mlagents.trainers.subprocess_env_manager import EnvironmentStep from mlagents.trainers.sampler_class import SamplerManager @@ -48,6 +48,7 @@ def trainer_controller_with_start_learning_mocks(basic_trainer_controller): trainer_mock = MagicMock() trainer_mock.get_step = 0 trainer_mock.get_max_steps = 5 + trainer_mock.should_still_train = True trainer_mock.parameters = {"some": "parameter"} trainer_mock.write_tensorboard_text = MagicMock() @@ -59,6 +60,11 @@ def trainer_controller_with_start_learning_mocks(basic_trainer_controller): def take_step_sideeffect(env): tc.trainers["testbrain"].get_step += 1 + if ( + not tc.trainers["testbrain"].get_step + <= tc.trainers["testbrain"].get_max_steps + ): + tc.trainers["testbrain"].should_still_train = False if tc.trainers["testbrain"].get_step > 10: raise KeyboardInterrupt return 1 @@ -120,11 +126,9 @@ def trainer_controller_with_take_step_mocks(basic_trainer_controller): trainer_mock.parameters = {"some": "parameter"} trainer_mock.write_tensorboard_text = MagicMock() - processor_mock = MagicMock() - tc = basic_trainer_controller tc.trainers = {"testbrain": trainer_mock} - tc.managers = {"testbrain": AgentManager(processor=processor_mock)} + tc.managers = {"testbrain": MagicMock()} return tc, trainer_mock @@ -140,7 +144,7 @@ def test_take_step_adds_experiences_to_trainer_and_trains( brain_info_dict = {brain_name: Mock()} old_step_info = EnvironmentStep(brain_info_dict, brain_info_dict, action_info_dict) new_step_info = EnvironmentStep(brain_info_dict, brain_info_dict, action_info_dict) - trainer_mock.is_ready_update = MagicMock(return_value=True) + trainer_mock._is_ready_update = MagicMock(return_value=True) env_mock = MagicMock() env_mock.step.return_value = [new_step_info] @@ -153,14 +157,14 @@ def test_take_step_adds_experiences_to_trainer_and_trains( env_mock.reset.assert_not_called() env_mock.step.assert_called_once() - processor_mock = tc.managers[brain_name].processor - processor_mock.add_experiences.assert_called_once_with( + manager_mock = tc.managers[brain_name] + manager_mock.add_experiences.assert_called_once_with( new_step_info.previous_all_brain_info[brain_name], new_step_info.current_all_brain_info[brain_name], new_step_info.brain_name_to_action_info[brain_name].outputs, ) - trainer_mock.update_policy.assert_called_once() - trainer_mock.increment_step.assert_called_once() + + trainer_mock.advance.assert_called_once() def test_take_step_if_not_training(trainer_controller_with_take_step_mocks): @@ -174,7 +178,7 @@ def test_take_step_if_not_training(trainer_controller_with_take_step_mocks): old_step_info = EnvironmentStep(brain_info_dict, brain_info_dict, action_info_dict) new_step_info = EnvironmentStep(brain_info_dict, brain_info_dict, action_info_dict) - trainer_mock.is_ready_update = MagicMock(return_value=False) + trainer_mock._is_ready_update = MagicMock(return_value=False) env_mock = MagicMock() env_mock.step.return_value = [new_step_info] @@ -185,8 +189,8 @@ def test_take_step_if_not_training(trainer_controller_with_take_step_mocks): tc.advance(env_mock) env_mock.reset.assert_not_called() env_mock.step.assert_called_once() - processor_mock = tc.managers[brain_name].processor - processor_mock.add_experiences.assert_called_once_with( + manager_mock = tc.managers[brain_name] + manager_mock.add_experiences.assert_called_once_with( 
new_step_info.previous_all_brain_info[brain_name], new_step_info.current_all_brain_info[brain_name], new_step_info.brain_name_to_action_info[brain_name].outputs, diff --git a/ml-agents/mlagents/trainers/trainer.py b/ml-agents/mlagents/trainers/trainer.py index c34825fffb..05ed0d8b90 100644 --- a/ml-agents/mlagents/trainers/trainer.py +++ b/ml-agents/mlagents/trainers/trainer.py @@ -1,6 +1,8 @@ # # Unity ML-Agents Toolkit import logging from typing import Dict, List, Deque, Any +import time +import abc from mlagents.tf_utils import tf @@ -11,7 +13,10 @@ from mlagents.trainers.tf_policy import TFPolicy from mlagents.trainers.stats import StatsReporter from mlagents.trainers.trajectory import Trajectory +from mlagents.trainers.agent_processor import AgentManagerQueue from mlagents.trainers.brain import BrainParameters +from mlagents.trainers.policy import Policy +from mlagents_envs.timers import hierarchical_timer LOGGER = logging.getLogger("mlagents.trainers") @@ -24,7 +29,7 @@ class UnityTrainerException(UnityException): pass -class Trainer(object): +class Trainer(abc.ABC): """This class is the base class for the mlagents_envs.trainers""" def __init__( @@ -52,9 +57,14 @@ def __init__( self.cumulative_returns_since_policy_update: List[float] = [] self.is_training = training self._reward_buffer: Deque[float] = deque(maxlen=reward_buff_cap) + self.policy_queues: List[AgentManagerQueue[Policy]] = [] + self.trajectory_queues: List[AgentManagerQueue[Trajectory]] = [] self.step: int = 0 + self.training_start_time = time.time() + self.summary_freq = self.trainer_parameters["summary_freq"] + self.next_update_step = self.summary_freq - def check_param_keys(self): + def _check_param_keys(self): for k in self.param_keys: if k not in self.trainer_parameters: raise UnityTrainerException( @@ -83,7 +93,7 @@ def write_tensorboard_text(self, key: str, input_dict: Dict[str, Any]) -> None: LOGGER.info("Could not write text summary for Tensorboard.") pass - def dict_to_str(self, param_dict: Dict[str, Any], num_tabs: int) -> str: + def _dict_to_str(self, param_dict: Dict[str, Any], num_tabs: int) -> str: """ Takes a parameter dictionary and converts it to a human-readable string. Recurses if there are multiple levels of dict. Used to print out hyperaparameters. @@ -99,7 +109,7 @@ def dict_to_str(self, param_dict: Dict[str, Any], num_tabs: int) -> str: "\t" + " " * num_tabs + "{0}:\t{1}".format( - x, self.dict_to_str(param_dict[x], num_tabs + 1) + x, self._dict_to_str(param_dict[x], num_tabs + 1) ) for x in param_dict ] @@ -109,7 +119,7 @@ def __str__(self) -> str: return """Hyperparameters for the {0} of brain {1}: \n{2}""".format( self.__class__.__name__, self.brain_name, - self.dict_to_str(self.trainer_parameters, 0), + self._dict_to_str(self.trainer_parameters, 0), ) @property @@ -120,12 +130,12 @@ def parameters(self) -> Dict[str, Any]: return self.trainer_parameters @property - def get_max_steps(self) -> float: + def get_max_steps(self) -> int: """ Returns the maximum number of steps. Is used to know when the trainer should be stopped. :return: The maximum number of steps of the trainer """ - return float(self.trainer_parameters["max_steps"]) + return int(float(self.trainer_parameters["max_steps"])) @property def get_step(self) -> int: @@ -135,6 +145,15 @@ def get_step(self) -> int: """ return self.step + @property + def should_still_train(self) -> bool: + """ + Returns whether or not the trainer should train. A Trainer could + stop training if it wasn't training to begin with, or if max_steps + is reached. 
+ """ + return self.is_training and self.get_step <= self.get_max_steps + @property def reward_buffer(self) -> Deque[float]: """ @@ -145,13 +164,18 @@ def reward_buffer(self) -> Deque[float]: """ return self._reward_buffer - def increment_step(self, n_steps: int) -> None: + def _increment_step(self, n_steps: int, name_behavior_id: str) -> None: """ Increment the step count of the trainer - :param n_steps: number of steps to increment the step count by """ self.step += n_steps + self.next_update_step = self.step + ( + self.summary_freq - self.step % self.summary_freq + ) + p = self.get_policy(name_behavior_id) + if p: + p.increment_step(n_steps) def save_model(self, name_behavior_id: str) -> None: """ @@ -165,97 +189,133 @@ def export_model(self, name_behavior_id: str) -> None: """ self.get_policy(name_behavior_id).export_model() - def write_summary(self, global_step: int, delta_train_start: float) -> None: + def _write_summary(self, step: int) -> None: """ Saves training statistics to Tensorboard. - :param delta_train_start: Time elapsed since training started. - :param global_step: The number of steps the simulation has been going for - """ - if ( - global_step % self.trainer_parameters["summary_freq"] == 0 - and global_step != 0 - ): - is_training = ( - "Training." - if self.is_training and self.get_step <= self.get_max_steps - else "Not Training." - ) - step = min(self.get_step, self.get_max_steps) - stats_summary = self.stats_reporter.get_stats_summaries( - "Environment/Cumulative Reward" - ) - if stats_summary.num > 0: - LOGGER.info( - " {}: {}: Step: {}. " - "Time Elapsed: {:0.3f} s " - "Mean " - "Reward: {:0.3f}" - ". Std of Reward: {:0.3f}. {}".format( - self.run_id, - self.brain_name, - step, - delta_train_start, - stats_summary.mean, - stats_summary.std, - is_training, - ) + """ + is_training = "Training." if self.should_still_train else "Not Training." + stats_summary = self.stats_reporter.get_stats_summaries( + "Environment/Cumulative Reward" + ) + if stats_summary.num > 0: + LOGGER.info( + " {}: {}: Step: {}. " + "Time Elapsed: {:0.3f} s " + "Mean " + "Reward: {:0.3f}" + ". Std of Reward: {:0.3f}. {}".format( + self.run_id, + self.brain_name, + step, + time.time() - self.training_start_time, + stats_summary.mean, + stats_summary.std, + is_training, ) - set_gauge(f"{self.brain_name}.mean_reward", stats_summary.mean) - else: - LOGGER.info( - " {}: {}: Step: {}. No episode was completed since last summary. {}".format( - self.run_id, self.brain_name, step, is_training - ) + ) + set_gauge(f"{self.brain_name}.mean_reward", stats_summary.mean) + else: + LOGGER.info( + " {}: {}: Step: {}. No episode was completed since last summary. {}".format( + self.run_id, self.brain_name, step, is_training ) - self.stats_reporter.write_stats(int(step)) + ) + self.stats_reporter.write_stats(int(step)) - def process_trajectory(self, trajectory: Trajectory) -> None: + @abc.abstractmethod + def _process_trajectory(self, trajectory: Trajectory) -> None: """ Takes a trajectory and processes it, putting it into the update buffer. - Processing involves calculating value and advantage targets for model updating step. :param trajectory: The Trajectory tuple containing the steps to be processed. """ - raise UnityTrainerException( - "The process_experiences method was not implemented." - ) - - def end_episode(self): - """ - A signal that the Episode has ended. The buffer must be reset. - Get only called when the academy resets. 
- """ - raise UnityTrainerException("The end_episode method was not implemented.") + self._maybe_write_summary(self.get_step + len(trajectory.steps)) + self._increment_step(len(trajectory.steps), trajectory.behavior_id) - def is_ready_update(self): + def _maybe_write_summary(self, step_after_process: int) -> None: """ - Returns whether or not the trainer has enough elements to run update model - :return: A boolean corresponding to wether or not update_model() can be run + If processing the trajectory will make the step exceed the next summary write, + write the summary. This logic ensures summaries are written on the update step and not in between. + :param step_after_process: the step count after processing the next trajectory. """ - raise UnityTrainerException("The is_ready_update method was not implemented.") + if step_after_process >= self.next_update_step and self.get_step != 0: + self._write_summary(self.next_update_step) - def update_policy(self): + @abc.abstractmethod + def end_episode(self): """ - Uses demonstration_buffer to update model. + A signal that the Episode has ended. The buffer must be reset. + Only called when the academy resets. """ - raise UnityTrainerException("The update_model method was not implemented.") + pass + @abc.abstractmethod def create_policy(self, brain_parameters: BrainParameters) -> TFPolicy: """ Creates policy """ - raise UnityTrainerException("The create_policy method was not implemented.") + pass + @abc.abstractmethod def add_policy(self, name_behavior_id: str, policy: TFPolicy) -> None: """ Adds policy to trainer """ - raise UnityTrainerException("The add_policy method was not implemented") + pass + @abc.abstractmethod def get_policy(self, name_behavior_id: str) -> TFPolicy: """ Gets policy from trainer """ - raise UnityTrainerException("The get_policy method was not implemented.") + pass - def advance(self) -> None: + @abc.abstractmethod + def _is_ready_update(self): + """ + Returns whether or not the trainer has enough elements to run update model + :return: A boolean corresponding to whether or not update_model() can be run + """ + return False + + @abc.abstractmethod + def _update_policy(self): + """ + Uses demonstration_buffer to update model. + """ pass + + def advance(self) -> None: + """ + Steps the trainer, taking in trajectories and updates if ready. + """ + with hierarchical_timer("process_trajectory"): + for traj_queue in self.trajectory_queues: + try: + t = traj_queue.get_nowait() + self._process_trajectory(t) + except AgentManagerQueue.Empty: + pass + if self.should_still_train: + if self._is_ready_update(): + with hierarchical_timer("_update_policy"): + self._update_policy() + for q in self.policy_queues: + # Get policies that correspond to the policy queue in question + q.put(self.get_policy(q.behavior_id)) + + def publish_policy_queue(self, policy_queue: AgentManagerQueue[Policy]) -> None: + """ + Adds a policy queue to the list of queues to publish to when this Trainer + makes a policy update + :param policy_queue: Policy queue to publish to. + """ + self.policy_queues.append(policy_queue) + + def subscribe_trajectory_queue( + self, trajectory_queue: AgentManagerQueue[Trajectory] + ) -> None: + """ + Adds a trajectory queue to the list of queues the trainer ingests Trajectories from. + :param trajectory_queue: Trajectory queue to read from.
+ """ + self.trajectory_queues.append(trajectory_queue) diff --git a/ml-agents/mlagents/trainers/trainer_controller.py b/ml-agents/mlagents/trainers/trainer_controller.py index 7647f2c105..d003eac61a 100644 --- a/ml-agents/mlagents/trainers/trainer_controller.py +++ b/ml-agents/mlagents/trainers/trainer_controller.py @@ -6,12 +6,11 @@ import sys import json import logging -from typing import Dict, List, Optional, Set, NamedTuple +from typing import Dict, List, Optional, Set from collections import defaultdict import numpy as np from mlagents.tf_utils import tf -from time import time from mlagents.trainers.env_manager import EnvManager, EnvironmentStep from mlagents_envs.exception import ( @@ -23,11 +22,7 @@ from mlagents.trainers.trainer import Trainer from mlagents.trainers.meta_curriculum import MetaCurriculum from mlagents.trainers.trainer_util import TrainerFactory -from mlagents.trainers.agent_processor import AgentProcessor - - -class AgentManager(NamedTuple): - processor: AgentProcessor +from mlagents.trainers.agent_processor import AgentManager, AgentManagerQueue class TrainerController(object): @@ -66,7 +61,6 @@ def __init__( self.save_freq = save_freq self.train_model = train self.meta_curriculum = meta_curriculum - self.training_start_time = time() self.sampler_manager = sampler_manager self.resampling_interval = resampling_interval np.random.seed(training_seed) @@ -83,9 +77,8 @@ def _get_measure_vals(self): if brain_name not in self.trainers: continue if curriculum.measure == "progress": - measure_val = ( - self.trainers[brain_name].get_step - / self.trainers[brain_name].get_max_steps + measure_val = self.trainers[brain_name].get_step / float( + self.trainers[brain_name].get_max_steps ) brain_names_to_measure_vals[brain_name] = measure_val elif curriculum.measure == "reward": @@ -164,23 +157,43 @@ def _should_save_model(self, global_step: int) -> bool: def _not_done_training(self) -> bool: return ( - any(t.get_step <= t.get_max_steps for k, t in self.trainers.items()) + any(t.should_still_train for t in self.trainers.values()) or not self.train_model ) or len(self.trainers) == 0 - def write_to_tensorboard(self, global_step: int) -> None: - for brain_name, trainer in self.trainers.items(): - # Write training statistics to Tensorboard. 
- delta_train_start = time() - self.training_start_time - if ( - self.meta_curriculum - and brain_name in self.meta_curriculum.brains_to_curricula - ): - lesson_num = self.meta_curriculum.brains_to_curricula[ - brain_name - ].lesson_num - trainer.stats_reporter.add_stat("Environment/Lesson", lesson_num) - trainer.write_summary(global_step, delta_train_start) + def _create_trainer_and_manager( + self, env_manager: EnvManager, name_behavior_id: str + ) -> None: + try: + brain_name, _ = name_behavior_id.split("?") + except ValueError: + brain_name = name_behavior_id + + try: + trainer = self.trainers[brain_name] + except KeyError: + trainer = self.trainer_factory.generate(brain_name) + self.trainers[brain_name] = trainer + self.logger.info(trainer) + if self.train_model: + trainer.write_tensorboard_text("Hyperparameters", trainer.parameters) + + policy = trainer.create_policy(env_manager.external_brains[name_behavior_id]) + trainer.add_policy(name_behavior_id, policy) + + env_manager.set_policy(name_behavior_id, policy) + + self.brain_name_to_identifier[brain_name].add(name_behavior_id) + + agent_manager = AgentManager( + policy, + name_behavior_id, + trainer.stats_reporter, + trainer.parameters.get("time_horizon", sys.maxsize), + ) + trainer.publish_policy_queue(agent_manager.policy_queue) + trainer.subscribe_trajectory_queue(agent_manager.trajectory_queue) + self.managers[name_behavior_id] = agent_manager def start_learning(self, env_manager: EnvManager) -> None: self._create_model_path(self.model_path) @@ -193,44 +206,8 @@ def start_learning(self, env_manager: EnvManager) -> None: external_brain_behavior_ids = set(env_manager.external_brains.keys()) new_behavior_ids = external_brain_behavior_ids - last_brain_behavior_ids for name_behavior_id in new_behavior_ids: - try: - brain_name, _ = name_behavior_id.split("?") - except ValueError: - brain_name = name_behavior_id - - try: - trainer = self.trainers[brain_name] - except KeyError: - trainer = self.trainer_factory.generate(brain_name) - self.trainers[brain_name] = trainer - self.logger.info(trainer) - if self.train_model: - trainer.write_tensorboard_text( - "Hyperparameters", trainer.parameters - ) - - policy = trainer.create_policy( - env_manager.external_brains[name_behavior_id] - ) - trainer.add_policy(name_behavior_id, policy) - - env_manager.set_policy(name_behavior_id, policy) - - self.brain_name_to_identifier[brain_name].add(name_behavior_id) - - agent_manager = AgentManager( - processor=AgentProcessor( - trainer, - policy, - name_behavior_id, - trainer.stats_reporter, - trainer.parameters.get("time_horizon", sys.maxsize), - ) - ) - self.managers[name_behavior_id] = agent_manager - + self._create_trainer_and_manager(env_manager, name_behavior_id) last_brain_behavior_ids = external_brain_behavior_ids - n_steps = self.advance(env_manager) for _ in range(n_steps): global_step += 1 @@ -238,7 +215,6 @@ def start_learning(self, env_manager: EnvManager) -> None: if self._should_save_model(global_step): # Save Tensorflow model self._save_model() - self.write_to_tensorboard(global_step) # Final save Tensorflow model if global_step != 0 and self.train_model: self._save_model() @@ -289,41 +265,50 @@ def reset_env_if_ready(self, env: EnvManager, steps: int) -> None: if meta_curriculum_reset or generalization_reset: self.end_trainer_episodes(env, lessons_incremented) - @timed - def advance(self, env: EnvManager) -> int: + def _get_and_process_experiences(self, env: EnvManager) -> int: with hierarchical_timer("env_step"): + # Get new policies if 
found + for brain_name in self.trainers.keys(): + for name_behavior_id in self.brain_name_to_identifier[brain_name]: + try: + _policy = self.managers[ + name_behavior_id + ].policy_queue.get_nowait() + env.set_policy(name_behavior_id, _policy) + except AgentManagerQueue.Empty: + pass + # Step the environment new_step_infos = env.step() + # Add to AgentProcessor for step_info in new_step_infos: for brain_name in self.trainers.keys(): for name_behavior_id in self.brain_name_to_identifier[brain_name]: if step_info.has_actions_for_brain(name_behavior_id): - _processor = self.managers[name_behavior_id].processor - _processor.add_experiences( + self.managers[name_behavior_id].add_experiences( step_info.previous_all_brain_info[name_behavior_id], step_info.current_all_brain_info[name_behavior_id], step_info.brain_name_to_action_info[ name_behavior_id ].outputs, ) + return len(new_step_infos) - for brain_name, trainer in self.trainers.items(): - if self.train_model and trainer.get_step <= trainer.get_max_steps: - n_steps = len(new_step_infos) - trainer.increment_step(n_steps) - for name_behavior_id in self.brain_name_to_identifier[brain_name]: - trainer.get_policy(name_behavior_id).increment_step(n_steps) - if trainer.is_ready_update(): - # Perform gradient descent with experience buffer - with hierarchical_timer("update_policy"): - trainer.update_policy() - for name_behavior_id in self.brain_name_to_identifier[brain_name]: - env.set_policy( - name_behavior_id, trainer.get_policy(name_behavior_id) - ) - else: - # Avoid memory leak during inference - # Eventually this whole block will take place in advance() - # But currently this only calls clear_update_buffer() in RLTrainer - # and nothing in the base class + @timed + def advance(self, env: EnvManager) -> int: + # Get steps + num_steps = self._get_and_process_experiences(env) + + # Report current lesson + if self.meta_curriculum: + for brain_name, curr in self.meta_curriculum.brains_to_curricula.items(): + if brain_name in self.trainers: + self.trainers[brain_name].stats_reporter.set_stat( + "Environment/Lesson", curr.lesson_num + ) + + # Advance trainers. This can be done in a separate loop in the future. + with hierarchical_timer("trainer_advance"): + for trainer in self.trainers.values(): trainer.advance() - return len(new_step_infos) + + return num_steps
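The hand-off introduced in this change is a plain publish/subscribe pattern built on AgentManagerQueue: each AgentManager publishes Trajectory objects that its Trainer drains in advance(), and the Trainer publishes updated policies back on the manager's policy_queue (see the wiring in TrainerController._create_trainer_and_manager, which calls trainer.publish_policy_queue(agent_manager.policy_queue) and trainer.subscribe_trajectory_queue(agent_manager.trajectory_queue)). A minimal usage sketch of the queue API defined in agent_processor.py, assuming this revision of mlagents is installed; the behavior id and payloads are placeholders:

from mlagents.trainers.agent_processor import AgentManagerQueue

queue = AgentManagerQueue(behavior_id="ExampleBehavior")

# Producer side: AgentProcessor.publish_trajectory_queue registers queues that
# receive assembled trajectories via put().
queue.put("trajectory-0")
queue.put("trajectory-1")

# Consumer side: the same drain-until-Empty loop that Trainer.advance() uses.
while True:
    try:
        item = queue.get_nowait()
        print(item)
    except AgentManagerQueue.Empty:
        break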
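StatsReporter.set_stat is added because some values, such as the lesson number reported in TrainerController.advance, should be written out as-is rather than averaged the way add_stat values are. A small sketch of the difference, again assuming this revision of mlagents is installed:

from mlagents.trainers.stats import StatsReporter

reporter = StatsReporter("ExampleBehavior")

reporter.add_stat("Environment/Cumulative Reward", 1.0)
reporter.add_stat("Environment/Cumulative Reward", 3.0)
# add_stat accumulates values; they are averaged when summarized (mean 2.0 here).
print(reporter.get_stats_summaries("Environment/Cumulative Reward").mean)

reporter.set_stat("Environment/Lesson", 1.0)
reporter.set_stat("Environment/Lesson", 2.0)
# set_stat keeps only the latest value, which is what the lesson report needs.
print(reporter.get_stats_summaries("Environment/Lesson").mean)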