diff --git a/ml-agents-envs/mlagents_envs/base_env.py b/ml-agents-envs/mlagents_envs/base_env.py index fd98546418..12edfaa9eb 100644 --- a/ml-agents-envs/mlagents_envs/base_env.py +++ b/ml-agents-envs/mlagents_envs/base_env.py @@ -34,6 +34,7 @@ from mlagents_envs.exception import UnityActionException AgentId = int +GroupId = int BehaviorName = str @@ -172,7 +173,7 @@ class TerminalStep(NamedTuple): reward: float interrupted: bool agent_id: AgentId - group_id: int + group_id: GroupId group_reward: float diff --git a/ml-agents/mlagents/trainers/agent_processor.py b/ml-agents/mlagents/trainers/agent_processor.py index fb77df0795..37477b7f0e 100644 --- a/ml-agents/mlagents/trainers/agent_processor.py +++ b/ml-agents/mlagents/trainers/agent_processor.py @@ -1,4 +1,5 @@ import sys +import numpy as np from typing import List, Dict, TypeVar, Generic, Tuple, Any, Union from collections import defaultdict, Counter import queue @@ -14,12 +15,17 @@ StatsAggregationMethod, EnvironmentStats, ) -from mlagents.trainers.trajectory import Trajectory, AgentExperience +from mlagents.trainers.trajectory import AgentStatus, Trajectory, AgentExperience from mlagents.trainers.policy import Policy from mlagents.trainers.action_info import ActionInfo, ActionInfoOutputs from mlagents.trainers.torch.action_log_probs import LogProbsTuple from mlagents.trainers.stats import StatsReporter -from mlagents.trainers.behavior_id_utils import get_global_agent_id +from mlagents.trainers.behavior_id_utils import ( + get_global_agent_id, + get_global_group_id, + GlobalAgentId, + GlobalGroupId, +) T = TypeVar("T") @@ -47,20 +53,36 @@ def __init__( :param max_trajectory_length: Maximum length of a trajectory before it is added to the trainer. :param stats_category: The category under which to write the stats. Usually, this comes from the Trainer. """ - self.experience_buffers: Dict[str, List[AgentExperience]] = defaultdict(list) - self.last_step_result: Dict[str, Tuple[DecisionStep, int]] = {} + self._experience_buffers: Dict[ + GlobalAgentId, List[AgentExperience] + ] = defaultdict(list) + self._last_step_result: Dict[GlobalAgentId, Tuple[DecisionStep, int]] = {} + # current_group_obs is used to collect the current (i.e. the most recently seen) + # obs of all the agents in the same group, and assemble the group obs. + # It is a dictionary of GlobalGroupId to dictionaries of GlobalAgentId to observation. + self._current_group_obs: Dict[ + GlobalGroupId, Dict[GlobalAgentId, List[np.ndarray]] + ] = defaultdict(lambda: defaultdict(list)) + # group_status is used to collect the current, most recently seen + # group status of all the agents in the same group, and assemble the group's status. + # It is a dictionary of GlobalGroupId to dictionaries of GlobalAgentId to AgentStatus. + self._group_status: Dict[ + GlobalGroupId, Dict[GlobalAgentId, AgentStatus] + ] = defaultdict(lambda: defaultdict(None)) # last_take_action_outputs stores the action a_t taken before the current observation s_(t+1), while # grabbing previous_action from the policy grabs the action PRIOR to that, a_(t-1). 
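# --- editor's aside: illustrative sketch, not part of the patch --------------------
# The two nested dictionaries introduced above (_current_group_obs and _group_status)
# are keyed first by a worker-unique group id, then by a worker-unique agent id. A
# minimal, self-contained sketch of that bookkeeping, reusing the id format that
# behavior_id_utils.py adopts later in this diff (the values are simplified stand-ins
# for AgentStatus / observation lists):
from collections import defaultdict
from typing import Dict, List

import numpy as np


def get_global_agent_id(worker_id: int, agent_id: int) -> str:
    return f"agent_{worker_id}-{agent_id}"


def get_global_group_id(worker_id: int, group_id: int) -> str:
    return f"group_{worker_id}-{group_id}"


current_group_obs: Dict[str, Dict[str, List[np.ndarray]]] = defaultdict(dict)

# Two agents from worker 0, both in group 1, report an observation.
for agent_id in (3, 4):
    g_agent = get_global_agent_id(0, agent_id)
    g_group = get_global_group_id(0, 1)
    current_group_obs[g_group][g_agent] = [np.zeros(8, dtype=np.float32)]

# When an agent terminates, it is removed and the group entry is pruned once empty,
# mirroring _clear_group_status_and_obs / _delete_in_nested_dict below.
current_group_obs["group_0-1"].pop("agent_0-3", None)
if not current_group_obs["group_0-1"]:
    del current_group_obs["group_0-1"]
# ------------------------------------------------------------------------------------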
- self.last_take_action_outputs: Dict[str, ActionInfoOutputs] = {} + self._last_take_action_outputs: Dict[GlobalAgentId, ActionInfoOutputs] = {} + + self._episode_steps: Counter = Counter() + self._episode_rewards: Dict[GlobalAgentId, float] = defaultdict(float) + self._stats_reporter = stats_reporter + self._max_trajectory_length = max_trajectory_length + self._trajectory_queues: List[AgentManagerQueue[Trajectory]] = [] + self._behavior_id = behavior_id + # Note: In the future this policy reference will be the policy of the env_manager and not the trainer. # We can in that case just grab the action from the policy rather than having it passed in. self.policy = policy - self.episode_steps: Counter = Counter() - self.episode_rewards: Dict[str, float] = defaultdict(float) - self.stats_reporter = stats_reporter - self.max_trajectory_length = max_trajectory_length - self.trajectory_queues: List[AgentManagerQueue[Trajectory]] = [] - self.behavior_id = behavior_id def add_experiences( self, @@ -78,55 +100,124 @@ def add_experiences( take_action_outputs = previous_action.outputs if take_action_outputs: for _entropy in take_action_outputs["entropy"]: - self.stats_reporter.add_stat("Policy/Entropy", _entropy) + self._stats_reporter.add_stat("Policy/Entropy", _entropy) # Make unique agent_ids that are global across workers action_global_agent_ids = [ get_global_agent_id(worker_id, ag_id) for ag_id in previous_action.agent_ids ] for global_id in action_global_agent_ids: - if global_id in self.last_step_result: # Don't store if agent just reset - self.last_take_action_outputs[global_id] = take_action_outputs + if global_id in self._last_step_result: # Don't store if agent just reset + self._last_take_action_outputs[global_id] = take_action_outputs - # Iterate over all the terminal steps + # Iterate over all the terminal steps, first gather all the group obs + # and then create the AgentExperiences/Trajectories. _add_to_group_status + # stores Group statuses in a common data structure self.group_status + for terminal_step in terminal_steps.values(): + self._add_group_status_and_obs(terminal_step, worker_id) for terminal_step in terminal_steps.values(): local_id = terminal_step.agent_id global_id = get_global_agent_id(worker_id, local_id) self._process_step( - terminal_step, global_id, terminal_steps.agent_id_to_index[local_id] + terminal_step, worker_id, terminal_steps.agent_id_to_index[local_id] ) - # Iterate over all the decision steps + # Clear the last seen group obs when agents die. + self._clear_group_status_and_obs(global_id) + + # Iterate over all the decision steps, first gather all the group obs + # and then create the trajectories. _add_to_group_status + # stores Group statuses in a common data structure self.group_status + for ongoing_step in decision_steps.values(): + self._add_group_status_and_obs(ongoing_step, worker_id) for ongoing_step in decision_steps.values(): local_id = ongoing_step.agent_id - global_id = get_global_agent_id(worker_id, local_id) self._process_step( - ongoing_step, global_id, decision_steps.agent_id_to_index[local_id] + ongoing_step, worker_id, decision_steps.agent_id_to_index[local_id] ) for _gid in action_global_agent_ids: # If the ID doesn't have a last step result, the agent just reset, # don't store the action. 
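# --- editor's aside: illustrative sketch, not part of the patch --------------------
# Why the loops above run in two passes: every agent's experience needs the current
# status of *all* of its groupmates, so statuses are gathered for the whole batch of
# steps before any single experience is assembled. Simplified, with plain rewards
# standing in for AgentStatus:
step_rewards = {"agent_0-1": 1.0, "agent_0-2": 0.5, "agent_0-3": -0.2}

# Pass 1: collect everyone's status for this batch.
group_status = dict(step_rewards)

# Pass 2: build each agent's view of its groupmates, excluding itself.
groupmate_views = {
    agent: {mate: r for mate, r in group_status.items() if mate != agent}
    for agent in step_rewards
}
assert groupmate_views["agent_0-1"] == {"agent_0-2": 0.5, "agent_0-3": -0.2}
# ------------------------------------------------------------------------------------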
- if _gid in self.last_step_result: + if _gid in self._last_step_result: if "action" in take_action_outputs: self.policy.save_previous_action( [_gid], take_action_outputs["action"] ) + def _add_group_status_and_obs( + self, step: Union[TerminalStep, DecisionStep], worker_id: int + ) -> None: + """ + Takes a TerminalStep or DecisionStep and adds the information in it + to self.group_status. This information can then be retrieved + when constructing trajectories to get the status of group mates. Also stores the current + observation into current_group_obs, to be used to get the next group observations + for bootstrapping. + :param step: TerminalStep or DecisionStep + :param worker_id: Worker ID of this particular environment. Used to generate a + global group id. + """ + global_agent_id = get_global_agent_id(worker_id, step.agent_id) + stored_decision_step, idx = self._last_step_result.get( + global_agent_id, (None, None) + ) + stored_take_action_outputs = self._last_take_action_outputs.get( + global_agent_id, None + ) + if stored_decision_step is not None and stored_take_action_outputs is not None: + # 0, the default group_id, means that the agent doesn't belong to an agent group. + # If 0, don't add any groupmate information. + if step.group_id > 0: + global_group_id = get_global_group_id(worker_id, step.group_id) + stored_actions = stored_take_action_outputs["action"] + action_tuple = ActionTuple( + continuous=stored_actions.continuous[idx], + discrete=stored_actions.discrete[idx], + ) + group_status = AgentStatus( + obs=stored_decision_step.obs, + reward=step.reward, + action=action_tuple, + done=isinstance(step, TerminalStep), + ) + self._group_status[global_group_id][global_agent_id] = group_status + self._current_group_obs[global_group_id][global_agent_id] = step.obs + + def _clear_group_status_and_obs(self, global_id: GlobalAgentId) -> None: + """ + Clears an agent from self._group_status and self._current_group_obs. 
+ """ + self._delete_in_nested_dict(self._current_group_obs, global_id) + self._delete_in_nested_dict(self._group_status, global_id) + + def _delete_in_nested_dict(self, nested_dict: Dict[str, Any], key: str) -> None: + for _manager_id in list(nested_dict.keys()): + _team_group = nested_dict[_manager_id] + self._safe_delete(_team_group, key) + if not _team_group: # if dict is empty + self._safe_delete(nested_dict, _manager_id) + def _process_step( - self, step: Union[TerminalStep, DecisionStep], global_id: str, index: int + self, step: Union[TerminalStep, DecisionStep], worker_id: int, index: int ) -> None: terminated = isinstance(step, TerminalStep) - stored_decision_step, idx = self.last_step_result.get(global_id, (None, None)) - stored_take_action_outputs = self.last_take_action_outputs.get(global_id, None) + global_agent_id = get_global_agent_id(worker_id, step.agent_id) + global_group_id = get_global_group_id(worker_id, step.group_id) + stored_decision_step, idx = self._last_step_result.get( + global_agent_id, (None, None) + ) + stored_take_action_outputs = self._last_take_action_outputs.get( + global_agent_id, None + ) if not terminated: # Index is needed to grab from last_take_action_outputs - self.last_step_result[global_id] = (step, index) + self._last_step_result[global_agent_id] = (step, index) # This state is the consequence of a past action if stored_decision_step is not None and stored_take_action_outputs is not None: obs = stored_decision_step.obs if self.policy.use_recurrent: - memory = self.policy.retrieve_previous_memories([global_id])[0, :] + memory = self.policy.retrieve_previous_memories([global_agent_id])[0, :] else: memory = None done = terminated # Since this is an ongoing step @@ -143,7 +234,14 @@ def _process_step( discrete=stored_action_probs.discrete[idx], ) action_mask = stored_decision_step.action_mask - prev_action = self.policy.retrieve_previous_action([global_id])[0, :] + prev_action = self.policy.retrieve_previous_action([global_agent_id])[0, :] + + # Assemble teammate_obs. If none saved, then it will be an empty list. 
+ group_statuses = [] + for _id, _mate_status in self._group_status[global_group_id].items(): + if _id != global_agent_id: + group_statuses.append(_mate_status) + experience = AgentExperience( obs=obs, reward=step.reward, @@ -154,45 +252,54 @@ def _process_step( prev_action=prev_action, interrupted=interrupted, memory=memory, + group_status=group_statuses, + group_reward=step.group_reward, ) # Add the value outputs if needed - self.experience_buffers[global_id].append(experience) - self.episode_rewards[global_id] += step.reward + self._experience_buffers[global_agent_id].append(experience) + self._episode_rewards[global_agent_id] += step.reward if not terminated: - self.episode_steps[global_id] += 1 + self._episode_steps[global_agent_id] += 1 # Add a trajectory segment to the buffer if terminal or the length has reached the time horizon if ( - len(self.experience_buffers[global_id]) >= self.max_trajectory_length + len(self._experience_buffers[global_agent_id]) + >= self._max_trajectory_length or terminated ): - # Make next AgentExperience next_obs = step.obs + next_group_obs = [] + for _id, _obs in self._current_group_obs[global_group_id].items(): + if _id != global_agent_id: + next_group_obs.append(_obs) + trajectory = Trajectory( - steps=self.experience_buffers[global_id], - agent_id=global_id, + steps=self._experience_buffers[global_agent_id], + agent_id=global_agent_id, next_obs=next_obs, - behavior_id=self.behavior_id, + next_group_obs=next_group_obs, + behavior_id=self._behavior_id, ) - for traj_queue in self.trajectory_queues: + for traj_queue in self._trajectory_queues: traj_queue.put(trajectory) - self.experience_buffers[global_id] = [] + self._experience_buffers[global_agent_id] = [] if terminated: # Record episode length. - self.stats_reporter.add_stat( - "Environment/Episode Length", self.episode_steps.get(global_id, 0) + self._stats_reporter.add_stat( + "Environment/Episode Length", + self._episode_steps.get(global_agent_id, 0), ) - self._clean_agent_data(global_id) + self._clean_agent_data(global_agent_id) - def _clean_agent_data(self, global_id: str) -> None: + def _clean_agent_data(self, global_id: GlobalAgentId) -> None: """ Removes the data for an Agent. """ - self._safe_delete(self.experience_buffers, global_id) - self._safe_delete(self.last_take_action_outputs, global_id) - self._safe_delete(self.last_step_result, global_id) - self._safe_delete(self.episode_steps, global_id) - self._safe_delete(self.episode_rewards, global_id) + self._safe_delete(self._experience_buffers, global_id) + self._safe_delete(self._last_take_action_outputs, global_id) + self._safe_delete(self._last_step_result, global_id) + self._safe_delete(self._episode_steps, global_id) + self._safe_delete(self._episode_rewards, global_id) self.policy.remove_previous_action([global_id]) self.policy.remove_memories([global_id]) @@ -212,14 +319,14 @@ def publish_trajectory_queue( assembles a Trajectory :param trajectory_queue: Trajectory queue to publish to. """ - self.trajectory_queues.append(trajectory_queue) + self._trajectory_queues.append(trajectory_queue) def end_episode(self) -> None: """ Ends the episode, terminating the current trajectory and stopping stats collection for that episode. Used for forceful reset (e.g. in curriculum or generalization training.) 
""" - all_gids = list(self.experience_buffers.keys()) # Need to make copy + all_gids = list(self._experience_buffers.keys()) # Need to make copy for _gid in all_gids: self._clean_agent_data(_gid) @@ -304,12 +411,12 @@ def __init__( super().__init__(policy, behavior_id, stats_reporter, max_trajectory_length) trajectory_queue_len = 20 if threaded else 0 self.trajectory_queue: AgentManagerQueue[Trajectory] = AgentManagerQueue( - self.behavior_id, maxlen=trajectory_queue_len + self._behavior_id, maxlen=trajectory_queue_len ) # NOTE: we make policy queues of infinite length to avoid lockups of the trainers. # In the environment manager, we make sure to empty the policy queue before continuing to produce steps. self.policy_queue: AgentManagerQueue[Policy] = AgentManagerQueue( - self.behavior_id, maxlen=0 + self._behavior_id, maxlen=0 ) self.publish_trajectory_queue(self.trajectory_queue) @@ -328,11 +435,11 @@ def record_environment_stats( for stat_name, value_list in env_stats.items(): for val, agg_type in value_list: if agg_type == StatsAggregationMethod.AVERAGE: - self.stats_reporter.add_stat(stat_name, val, agg_type) + self._stats_reporter.add_stat(stat_name, val, agg_type) elif agg_type == StatsAggregationMethod.SUM: - self.stats_reporter.add_stat(stat_name, val, agg_type) + self._stats_reporter.add_stat(stat_name, val, agg_type) elif agg_type == StatsAggregationMethod.MOST_RECENT: # In order to prevent conflicts between multiple environments, # only stats from the first environment are recorded. if worker_id == 0: - self.stats_reporter.set_stat(stat_name, val) + self._stats_reporter.set_stat(stat_name, val) diff --git a/ml-agents/mlagents/trainers/behavior_id_utils.py b/ml-agents/mlagents/trainers/behavior_id_utils.py index 78a3a6ed1c..a9a0b5a9e7 100644 --- a/ml-agents/mlagents/trainers/behavior_id_utils.py +++ b/ml-agents/mlagents/trainers/behavior_id_utils.py @@ -1,5 +1,9 @@ from typing import NamedTuple from urllib.parse import urlparse, parse_qs +from mlagents_envs.base_env import AgentId, GroupId + +GlobalGroupId = str +GlobalAgentId = str class BehaviorIdentifiers(NamedTuple): @@ -46,8 +50,15 @@ def create_name_behavior_id(name: str, team_id: int) -> str: return name + "?team=" + str(team_id) -def get_global_agent_id(worker_id: int, agent_id: int) -> str: +def get_global_agent_id(worker_id: int, agent_id: AgentId) -> GlobalAgentId: """ Create an agent id that is unique across environment workers using the worker_id. """ - return f"${worker_id}-{agent_id}" + return f"agent_{worker_id}-{agent_id}" + + +def get_global_group_id(worker_id: int, group_id: GroupId) -> GlobalGroupId: + """ + Create a group id that is unique across environment workers when using the worker_id. + """ + return f"group_{worker_id}-{group_id}" diff --git a/ml-agents/mlagents/trainers/buffer.py b/ml-agents/mlagents/trainers/buffer.py index 302fe418a0..de30542a35 100644 --- a/ml-agents/mlagents/trainers/buffer.py +++ b/ml-agents/mlagents/trainers/buffer.py @@ -9,6 +9,10 @@ from mlagents_envs.exception import UnityException +# Elements in the buffer can be np.ndarray, or in the case of teammate obs, actions, rewards, +# a List of np.ndarray. This is done so that we don't have duplicated np.ndarrays, only references. 
+BufferEntry = Union[np.ndarray, List[np.ndarray]] + class BufferException(UnityException): """ @@ -21,8 +25,10 @@ class BufferException(UnityException): class BufferKey(enum.Enum): ACTION_MASK = "action_mask" CONTINUOUS_ACTION = "continuous_action" + NEXT_CONT_ACTION = "next_continuous_action" CONTINUOUS_LOG_PROBS = "continuous_log_probs" DISCRETE_ACTION = "discrete_action" + NEXT_DISC_ACTION = "next_discrete_action" DISCRETE_LOG_PROBS = "discrete_log_probs" DONE = "done" ENVIRONMENT_REWARDS = "environment_rewards" @@ -34,11 +40,22 @@ class BufferKey(enum.Enum): ADVANTAGES = "advantages" DISCOUNTED_RETURNS = "discounted_returns" + GROUP_DONES = "group_dones" + GROUPMATE_REWARDS = "groupmate_reward" + GROUP_REWARD = "group_reward" + GROUP_CONTINUOUS_ACTION = "group_continuous_action" + GROUP_DISCRETE_ACTION = "group_discrete_aaction" + GROUP_NEXT_CONT_ACTION = "group_next_cont_action" + GROUP_NEXT_DISC_ACTION = "group_next_disc_action" + class ObservationKeyPrefix(enum.Enum): OBSERVATION = "obs" NEXT_OBSERVATION = "next_obs" + GROUP_OBSERVATION = "group_obs" + NEXT_GROUP_OBSERVATION = "next_group_obs" + class RewardSignalKeyPrefix(enum.Enum): # Reward signals @@ -73,16 +90,23 @@ def advantage_key(name: str) -> AgentBufferKey: class AgentBufferField(list): """ - AgentBufferField is a list of numpy arrays. When an agent collects a field, you can add it to its - AgentBufferField with the append method. + AgentBufferField is a list of numpy arrays, or List[np.ndarray] for group entries. + When an agent collects a field, you can add it to its AgentBufferField with the append method. """ - def __init__(self): + def __init__(self, *args, **kwargs): self.padding_value = 0 - super().__init__() + super().__init__(*args, **kwargs) - def __str__(self): - return str(np.array(self).shape) + def __str__(self) -> str: + return f"AgentBufferField: {super().__str__()}" + + def __getitem__(self, index): + return_data = super().__getitem__(index) + if isinstance(return_data, list): + return AgentBufferField(return_data) + else: + return return_data def append(self, element: np.ndarray, padding_value: float = 0.0) -> None: """ @@ -95,31 +119,20 @@ def append(self, element: np.ndarray, padding_value: float = 0.0) -> None: super().append(element) self.padding_value = padding_value - def extend(self, data: np.ndarray) -> None: - """ - Adds a list of np.arrays to the end of the list of np.arrays. - :param data: The np.array list to append. - """ - self += list(np.array(data, dtype=np.float32)) - - def set(self, data): + def set(self, data: List[BufferEntry]) -> None: """ - Sets the list of np.array to the input data - :param data: The np.array list to be set. + Sets the list of BufferEntry to the input data + :param data: The BufferEntry list to be set. 
""" - # Make sure we convert incoming data to float32 if it's a float - dtype = None - if data is not None and len(data) and isinstance(data[0], float): - dtype = np.float32 self[:] = [] - self[:] = list(np.array(data, dtype=dtype)) + self[:] = data def get_batch( self, batch_size: int = None, training_length: Optional[int] = 1, sequential: bool = True, - ) -> np.ndarray: + ) -> List[BufferEntry]: """ Retrieve the last batch_size elements of length training_length from the list of np.array @@ -150,13 +163,10 @@ def get_batch( ) if batch_size * training_length > len(self): padding = np.array(self[-1], dtype=np.float32) * self.padding_value - return np.array( - [padding] * (training_length - leftover) + self[:], dtype=np.float32 - ) + return [padding] * (training_length - leftover) + self[:] + else: - return np.array( - self[len(self) - batch_size * training_length :], dtype=np.float32 - ) + return self[len(self) - batch_size * training_length :] else: # The sequences will have overlapping elements if batch_size is None: @@ -172,7 +182,7 @@ def get_batch( tmp_list: List[np.ndarray] = [] for end in range(len(self) - batch_size + 1, len(self) + 1): tmp_list += self[end - training_length : end] - return np.array(tmp_list, dtype=np.float32) + return tmp_list def reset_field(self) -> None: """ @@ -180,6 +190,44 @@ def reset_field(self) -> None: """ self[:] = [] + def padded_to_batch( + self, pad_value: np.float = 0, dtype: np.dtype = np.float32 + ) -> Union[np.ndarray, List[np.ndarray]]: + """ + Converts this AgentBufferField (which is a List[BufferEntry]) into a numpy array + with first dimension equal to the length of this AgentBufferField. If this AgentBufferField + contains a List[List[BufferEntry]] (i.e., in the case of group observations), return a List + containing numpy arrays or tensors, of length equal to the maximum length of an entry. Missing + For entries with less than that length, the array will be padded with pad_value. + :param pad_value: Value to pad List AgentBufferFields, when there are less than the maximum + number of agents present. + :param dtype: Dtype of output numpy array. + :return: Numpy array or List of numpy arrays representing this AgentBufferField, where the first + dimension is equal to the length of the AgentBufferField. + """ + if len(self) > 0 and not isinstance(self[0], list): + return np.asanyarray(self, dytpe=dtype) + + shape = None + for _entry in self: + # _entry could be an empty list if there are no group agents in this + # step. Find the first non-empty list and use that shape. + if _entry: + shape = _entry[0].shape + break + # If there were no groupmate agents in the entire batch, return an empty List. 
+ if shape is None: + return [] + + # Convert to numpy array while padding with 0's + new_list = list( + map( + lambda x: np.asanyarray(x, dtype=dtype), + itertools.zip_longest(*self, fillvalue=np.full(shape, pad_value)), + ) + ) + return new_list + class AgentBuffer(MutableMapping): """ diff --git a/ml-agents/mlagents/trainers/policy/policy.py b/ml-agents/mlagents/trainers/policy/policy.py index 068c989d85..dbf899af7f 100644 --- a/ml-agents/mlagents/trainers/policy/policy.py +++ b/ml-agents/mlagents/trainers/policy/policy.py @@ -8,6 +8,7 @@ from mlagents.trainers.action_info import ActionInfo from mlagents.trainers.settings import TrainerSettings, NetworkSettings from mlagents.trainers.buffer import AgentBuffer +from mlagents.trainers.behavior_id_utils import GlobalAgentId class UnityPolicyException(UnityException): @@ -68,7 +69,7 @@ def make_empty_memory(self, num_agents): return np.zeros((num_agents, self.m_size), dtype=np.float32) def save_memories( - self, agent_ids: List[str], memory_matrix: Optional[np.ndarray] + self, agent_ids: List[GlobalAgentId], memory_matrix: Optional[np.ndarray] ) -> None: if memory_matrix is None: return @@ -81,21 +82,21 @@ def save_memories( for index, agent_id in enumerate(agent_ids): self.memory_dict[agent_id] = memory_matrix[index, :] - def retrieve_memories(self, agent_ids: List[str]) -> np.ndarray: + def retrieve_memories(self, agent_ids: List[GlobalAgentId]) -> np.ndarray: memory_matrix = np.zeros((len(agent_ids), self.m_size), dtype=np.float32) for index, agent_id in enumerate(agent_ids): if agent_id in self.memory_dict: memory_matrix[index, :] = self.memory_dict[agent_id] return memory_matrix - def retrieve_previous_memories(self, agent_ids: List[str]) -> np.ndarray: + def retrieve_previous_memories(self, agent_ids: List[GlobalAgentId]) -> np.ndarray: memory_matrix = np.zeros((len(agent_ids), self.m_size), dtype=np.float32) for index, agent_id in enumerate(agent_ids): if agent_id in self.previous_memory_dict: memory_matrix[index, :] = self.previous_memory_dict[agent_id] return memory_matrix - def remove_memories(self, agent_ids): + def remove_memories(self, agent_ids: List[GlobalAgentId]) -> None: for agent_id in agent_ids: if agent_id in self.memory_dict: self.memory_dict.pop(agent_id) @@ -113,19 +114,19 @@ def make_empty_previous_action(self, num_agents: int) -> np.ndarray: ) def save_previous_action( - self, agent_ids: List[str], action_tuple: ActionTuple + self, agent_ids: List[GlobalAgentId], action_tuple: ActionTuple ) -> None: for index, agent_id in enumerate(agent_ids): self.previous_action_dict[agent_id] = action_tuple.discrete[index, :] - def retrieve_previous_action(self, agent_ids: List[str]) -> np.ndarray: + def retrieve_previous_action(self, agent_ids: List[GlobalAgentId]) -> np.ndarray: action_matrix = self.make_empty_previous_action(len(agent_ids)) for index, agent_id in enumerate(agent_ids): if agent_id in self.previous_action_dict: action_matrix[index, :] = self.previous_action_dict[agent_id] return action_matrix - def remove_previous_action(self, agent_ids): + def remove_previous_action(self, agent_ids: List[GlobalAgentId]) -> None: for agent_id in agent_ids: if agent_id in self.previous_action_dict: self.previous_action_dict.pop(agent_id) diff --git a/ml-agents/mlagents/trainers/ppo/trainer.py b/ml-agents/mlagents/trainers/ppo/trainer.py index 5025adb500..c2aa20687f 100644 --- a/ml-agents/mlagents/trainers/ppo/trainer.py +++ b/ml-agents/mlagents/trainers/ppo/trainer.py @@ -180,7 +180,7 @@ def _update_policy(self): 
int(self.hyperparameters.batch_size / self.policy.sequence_length), 1 ) - advantages = self.update_buffer[BufferKey.ADVANTAGES].get_batch() + advantages = np.array(self.update_buffer[BufferKey.ADVANTAGES].get_batch()) self.update_buffer[BufferKey.ADVANTAGES].set( (advantages - advantages.mean()) / (advantages.std() + 1e-10) ) diff --git a/ml-agents/mlagents/trainers/tests/mock_brain.py b/ml-agents/mlagents/trainers/tests/mock_brain.py index a19b5046e5..a876f09d9d 100644 --- a/ml-agents/mlagents/trainers/tests/mock_brain.py +++ b/ml-agents/mlagents/trainers/tests/mock_brain.py @@ -3,7 +3,7 @@ from mlagents.trainers.buffer import AgentBuffer, AgentBufferKey from mlagents.trainers.torch.action_log_probs import LogProbsTuple -from mlagents.trainers.trajectory import Trajectory, AgentExperience +from mlagents.trainers.trajectory import AgentStatus, Trajectory, AgentExperience from mlagents_envs.base_env import ( DecisionSteps, TerminalSteps, @@ -20,6 +20,7 @@ def create_mock_steps( observation_specs: List[ObservationSpec], action_spec: ActionSpec, done: bool = False, + grouped: bool = False, ) -> Tuple[DecisionSteps, TerminalSteps]: """ Creates a mock Tuple[DecisionSteps, TerminalSteps] with observations. @@ -43,7 +44,8 @@ def create_mock_steps( reward = np.array(num_agents * [1.0], dtype=np.float32) interrupted = np.array(num_agents * [False], dtype=np.bool) agent_id = np.arange(num_agents, dtype=np.int32) - group_id = np.array(num_agents * [0], dtype=np.int32) + _gid = 1 if grouped else 0 + group_id = np.array(num_agents * [_gid], dtype=np.int32) group_reward = np.array(num_agents * [0.0], dtype=np.float32) behavior_spec = BehaviorSpec(observation_specs, action_spec) if done: @@ -78,6 +80,7 @@ def make_fake_trajectory( action_spec: ActionSpec, max_step_complete: bool = False, memory_size: int = 10, + num_other_agents_in_group: int = 0, ) -> Trajectory: """ Makes a fake trajectory of length length. 
If max_step_complete, @@ -117,6 +120,9 @@ def make_fake_trajectory( memory = np.ones(memory_size, dtype=np.float32) agent_id = "test_agent" behavior_id = "test_brain" + group_status = [] + for _ in range(num_other_agents_in_group): + group_status.append(AgentStatus(obs, reward, action, done)) experience = AgentExperience( obs=obs, reward=reward, @@ -127,6 +133,8 @@ def make_fake_trajectory( prev_action=prev_action, interrupted=max_step, memory=memory, + group_status=group_status, + group_reward=0, ) steps_list.append(experience) obs = [] @@ -142,10 +150,16 @@ def make_fake_trajectory( prev_action=prev_action, interrupted=max_step_complete, memory=memory, + group_status=group_status, + group_reward=0, ) steps_list.append(last_experience) return Trajectory( - steps=steps_list, agent_id=agent_id, behavior_id=behavior_id, next_obs=obs + steps=steps_list, + agent_id=agent_id, + behavior_id=behavior_id, + next_obs=obs, + next_group_obs=[obs] * num_other_agents_in_group, ) diff --git a/ml-agents/mlagents/trainers/tests/test_agent_processor.py b/ml-agents/mlagents/trainers/tests/test_agent_processor.py index 9b8affcc22..b0c446e974 100644 --- a/ml-agents/mlagents/trainers/tests/test_agent_processor.py +++ b/ml-agents/mlagents/trainers/tests/test_agent_processor.py @@ -1,5 +1,6 @@ from unittest import mock import pytest +from typing import List import mlagents.trainers.tests.mock_brain as mb import numpy as np from mlagents.trainers.agent_processor import ( @@ -27,6 +28,28 @@ def create_mock_policy(): return mock_policy +def _create_action_info(num_agents: int, agent_ids: List[str]) -> ActionInfo: + fake_action_outputs = { + "action": ActionTuple( + continuous=np.array([[0.1]] * num_agents, dtype=np.float32) + ), + "entropy": np.array([1.0], dtype=np.float32), + "learning_rate": 1.0, + "log_probs": LogProbsTuple( + continuous=np.array([[0.1]] * num_agents, dtype=np.float32) + ), + } + fake_action_info = ActionInfo( + action=ActionTuple(continuous=np.array([[0.1]] * num_agents, dtype=np.float32)), + env_action=ActionTuple( + continuous=np.array([[0.1]] * num_agents, dtype=np.float32) + ), + outputs=fake_action_outputs, + agent_ids=agent_ids, + ) + return fake_action_info + + @pytest.mark.parametrize("num_vis_obs", [0, 1, 2], ids=["vec", "1 viz", "2 viz"]) def test_agentprocessor(num_vis_obs): policy = create_mock_policy() @@ -39,14 +62,6 @@ def test_agentprocessor(num_vis_obs): stats_reporter=StatsReporter("testcat"), ) - fake_action_outputs = { - "action": ActionTuple(continuous=np.array([[0.1], [0.1]], dtype=np.float32)), - "entropy": np.array([1.0], dtype=np.float32), - "learning_rate": 1.0, - "log_probs": LogProbsTuple( - continuous=np.array([[0.1], [0.1]], dtype=np.float32) - ), - } mock_decision_steps, mock_terminal_steps = mb.create_mock_steps( num_agents=2, observation_specs=create_observation_specs_with_shapes( @@ -54,12 +69,7 @@ def test_agentprocessor(num_vis_obs): ), action_spec=ActionSpec.create_continuous(2), ) - fake_action_info = ActionInfo( - action=ActionTuple(continuous=np.array([[0.1], [0.1]], dtype=np.float32)), - env_action=ActionTuple(continuous=np.array([[0.1], [0.1]], dtype=np.float32)), - outputs=fake_action_outputs, - agent_ids=mock_decision_steps.agent_id, - ) + fake_action_info = _create_action_info(2, mock_decision_steps.agent_id) processor.publish_trajectory_queue(tqueue) # This is like the initial state after the env reset processor.add_experiences( @@ -76,9 +86,12 @@ def test_agentprocessor(num_vis_obs): # Assert that the trajectory is of length 5 trajectory = 
tqueue.put.call_args_list[0][0][0] assert len(trajectory.steps) == 5 + # Make sure ungrouped agents don't have team obs + for step in trajectory.steps: + assert len(step.group_status) == 0 # Assert that the AgentProcessor is empty - assert len(processor.experience_buffers[0]) == 0 + assert len(processor._experience_buffers[0]) == 0 # Test empty steps mock_decision_steps, mock_terminal_steps = mb.create_mock_steps( @@ -92,7 +105,66 @@ def test_agentprocessor(num_vis_obs): mock_decision_steps, mock_terminal_steps, 0, ActionInfo.empty() ) # Assert that the AgentProcessor is still empty - assert len(processor.experience_buffers[0]) == 0 + assert len(processor._experience_buffers[0]) == 0 + + +def test_group_statuses(): + policy = create_mock_policy() + tqueue = mock.Mock() + name_behavior_id = "test_brain_name" + processor = AgentProcessor( + policy, + name_behavior_id, + max_trajectory_length=5, + stats_reporter=StatsReporter("testcat"), + ) + + mock_decision_steps, mock_terminal_steps = mb.create_mock_steps( + num_agents=4, + observation_specs=create_observation_specs_with_shapes([(8,)]), + action_spec=ActionSpec.create_continuous(2), + grouped=True, + ) + fake_action_info = _create_action_info(4, mock_decision_steps.agent_id) + processor.publish_trajectory_queue(tqueue) + # This is like the initial state after the env reset + processor.add_experiences( + mock_decision_steps, mock_terminal_steps, 0, ActionInfo.empty() + ) + for _ in range(2): + processor.add_experiences( + mock_decision_steps, mock_terminal_steps, 0, fake_action_info + ) + + # Make terminal steps for some dead agents + mock_decision_steps_2, mock_terminal_steps_2 = mb.create_mock_steps( + num_agents=2, + observation_specs=create_observation_specs_with_shapes([(8,)]), + action_spec=ActionSpec.create_continuous(2), + done=True, + grouped=True, + ) + + processor.add_experiences( + mock_decision_steps_2, mock_terminal_steps_2, 0, fake_action_info + ) + fake_action_info = _create_action_info(4, mock_decision_steps.agent_id) + for _ in range(3): + processor.add_experiences( + mock_decision_steps, mock_terminal_steps, 0, fake_action_info + ) + + # Assert that four trajectories have been added to the Trainer + assert len(tqueue.put.call_args_list) == 4 + # Last trajectory should be the longest + trajectory = tqueue.put.call_args_list[0][0][-1] + + # Make sure trajectory has the right Groupmate Experiences + for step in trajectory.steps[0:3]: + assert len(step.group_status) == 3 + # After 2 agents has died + for step in trajectory.steps[3:]: + assert len(step.group_status) == 1 def test_agent_deletion(): @@ -156,21 +228,21 @@ def test_agent_deletion(): policy.save_previous_action.assert_has_calls(add_calls) policy.remove_previous_action.assert_has_calls(remove_calls) # Check that there are no experiences left - assert len(processor.experience_buffers.keys()) == 0 - assert len(processor.last_take_action_outputs.keys()) == 0 - assert len(processor.episode_steps.keys()) == 0 - assert len(processor.episode_rewards.keys()) == 0 - assert len(processor.last_step_result.keys()) == 0 + assert len(processor._experience_buffers.keys()) == 0 + assert len(processor._last_take_action_outputs.keys()) == 0 + assert len(processor._episode_steps.keys()) == 0 + assert len(processor._episode_rewards.keys()) == 0 + assert len(processor._last_step_result.keys()) == 0 # check that steps with immediate dones don't add to dicts processor.add_experiences( mock_done_decision_step, mock_done_terminal_step, 0, ActionInfo.empty() ) - assert 
len(processor.experience_buffers.keys()) == 0 - assert len(processor.last_take_action_outputs.keys()) == 0 - assert len(processor.episode_steps.keys()) == 0 - assert len(processor.episode_rewards.keys()) == 0 - assert len(processor.last_step_result.keys()) == 0 + assert len(processor._experience_buffers.keys()) == 0 + assert len(processor._last_take_action_outputs.keys()) == 0 + assert len(processor._episode_steps.keys()) == 0 + assert len(processor._episode_rewards.keys()) == 0 + assert len(processor._last_step_result.keys()) == 0 def test_end_episode(): @@ -222,10 +294,10 @@ def test_end_episode(): # Check that we removed every agent policy.remove_previous_action.assert_has_calls(remove_calls) # Check that there are no experiences left - assert len(processor.experience_buffers.keys()) == 0 - assert len(processor.last_take_action_outputs.keys()) == 0 - assert len(processor.episode_steps.keys()) == 0 - assert len(processor.episode_rewards.keys()) == 0 + assert len(processor._experience_buffers.keys()) == 0 + assert len(processor._last_take_action_outputs.keys()) == 0 + assert len(processor._episode_steps.keys()) == 0 + assert len(processor._episode_rewards.keys()) == 0 def test_agent_manager(): @@ -237,8 +309,8 @@ def test_agent_manager(): max_trajectory_length=5, stats_reporter=StatsReporter("testcat"), ) - assert len(manager.trajectory_queues) == 1 - assert isinstance(manager.trajectory_queues[0], AgentManagerQueue) + assert len(manager._trajectory_queues) == 1 + assert isinstance(manager._trajectory_queues[0], AgentManagerQueue) def test_agent_manager_queue(): diff --git a/ml-agents/mlagents/trainers/tests/test_buffer.py b/ml-agents/mlagents/trainers/tests/test_buffer.py index 5a3dbba7da..6d99cb0e88 100644 --- a/ml-agents/mlagents/trainers/tests/test_buffer.py +++ b/ml-agents/mlagents/trainers/tests/test_buffer.py @@ -21,14 +21,35 @@ def construct_fake_buffer(fake_agent_id): b = AgentBuffer() for step in range(9): b[ObsUtil.get_name_at(0)].append( - [ - 100 * fake_agent_id + 10 * step + 1, - 100 * fake_agent_id + 10 * step + 2, - 100 * fake_agent_id + 10 * step + 3, - ] + np.array( + [ + 100 * fake_agent_id + 10 * step + 1, + 100 * fake_agent_id + 10 * step + 2, + 100 * fake_agent_id + 10 * step + 3, + ], + dtype=np.float32, + ) ) b[BufferKey.CONTINUOUS_ACTION].append( - [100 * fake_agent_id + 10 * step + 4, 100 * fake_agent_id + 10 * step + 5] + np.array( + [ + 100 * fake_agent_id + 10 * step + 4, + 100 * fake_agent_id + 10 * step + 5, + ], + dtype=np.float32, + ) + ) + b[BufferKey.GROUP_CONTINUOUS_ACTION].append( + [ + np.array( + [ + 100 * fake_agent_id + 10 * step + 4, + 100 * fake_agent_id + 10 * step + 5, + ], + dtype=np.float32, + ) + ] + * 3 ) return b @@ -37,10 +58,16 @@ def test_buffer(): agent_1_buffer = construct_fake_buffer(1) agent_2_buffer = construct_fake_buffer(2) agent_3_buffer = construct_fake_buffer(3) + + # Test get_batch a = agent_1_buffer[ObsUtil.get_name_at(0)].get_batch( batch_size=2, training_length=1, sequential=True ) - assert_array(np.array(a), np.array([[171, 172, 173], [181, 182, 183]])) + assert_array( + np.array(a), np.array([[171, 172, 173], [181, 182, 183]], dtype=np.float32) + ) + + # Test get_batch a = agent_2_buffer[ObsUtil.get_name_at(0)].get_batch( batch_size=2, training_length=3, sequential=True ) @@ -54,7 +81,8 @@ def test_buffer(): [261, 262, 263], [271, 272, 273], [281, 282, 283], - ] + ], + dtype=np.float32, ), ) a = agent_2_buffer[ObsUtil.get_name_at(0)].get_batch( @@ -73,6 +101,13 @@ def test_buffer(): ] ), ) + # Test group entries return 
Lists of Lists + a = agent_2_buffer[BufferKey.GROUP_CONTINUOUS_ACTION].get_batch( + batch_size=2, training_length=1, sequential=True + ) + for _group_entry in a: + assert len(_group_entry) == 3 + agent_1_buffer.reset_agent() assert agent_1_buffer.num_experiences == 0 update_buffer = AgentBuffer() @@ -88,9 +123,38 @@ def test_buffer(): c = update_buffer.make_mini_batch(start=0, end=1) assert c.keys() == update_buffer.keys() + # Make sure the values of c are AgentBufferField + for val in c.values(): + assert isinstance(val, AgentBufferField) assert np.array(c[BufferKey.CONTINUOUS_ACTION]).shape == (1, 2) +def test_agentbufferfield(): + # Test constructor + a = AgentBufferField([0, 1, 2]) + for i, num in enumerate(a): + assert num == i + # Test indexing + assert a[i] == num + + # Test slicing + b = a[1:3] + assert b == [1, 2] + assert isinstance(b, AgentBufferField) + + # Test padding + c = AgentBufferField() + for _ in range(2): + c.append([np.array(1), np.array(2)]) + + for _ in range(2): + c.append([np.array(1)]) + + padded = c.padded_to_batch(pad_value=3) + assert np.array_equal(padded[0], np.array([1, 1, 1, 1])) + assert np.array_equal(padded[1], np.array([2, 2, 3, 3])) + + def fakerandint(values): return 19 diff --git a/ml-agents/mlagents/trainers/tests/test_trajectory.py b/ml-agents/mlagents/trainers/tests/test_trajectory.py index 5484f7da53..8d39478a9e 100644 --- a/ml-agents/mlagents/trainers/tests/test_trajectory.py +++ b/ml-agents/mlagents/trainers/tests/test_trajectory.py @@ -1,7 +1,10 @@ +import numpy as np + from mlagents.trainers.tests.mock_brain import make_fake_trajectory from mlagents.trainers.tests.dummy_config import create_observation_specs_with_shapes +from mlagents.trainers.trajectory import GroupObsUtil from mlagents_envs.base_env import ActionSpec -from mlagents.trainers.buffer import BufferKey, ObservationKeyPrefix +from mlagents.trainers.buffer import AgentBuffer, BufferKey, ObservationKeyPrefix VEC_OBS_SIZE = 6 ACTION_SIZE = 4 @@ -9,6 +12,7 @@ def test_trajectory_to_agentbuffer(): length = 15 + # These keys should be of type np.ndarray wanted_keys = [ (ObservationKeyPrefix.OBSERVATION, 0), (ObservationKeyPrefix.OBSERVATION, 1), @@ -24,14 +28,25 @@ def test_trajectory_to_agentbuffer(): BufferKey.ACTION_MASK, BufferKey.PREV_ACTION, BufferKey.ENVIRONMENT_REWARDS, + BufferKey.GROUP_REWARD, + ] + # These keys should be of type List + wanted_group_keys = [ + BufferKey.GROUPMATE_REWARDS, + BufferKey.GROUP_CONTINUOUS_ACTION, + BufferKey.GROUP_DISCRETE_ACTION, + BufferKey.GROUP_DONES, + BufferKey.GROUP_NEXT_CONT_ACTION, + BufferKey.GROUP_NEXT_DISC_ACTION, ] - wanted_keys = set(wanted_keys) + wanted_keys = set(wanted_keys + wanted_group_keys) trajectory = make_fake_trajectory( length=length, observation_specs=create_observation_specs_with_shapes( [(VEC_OBS_SIZE,), (84, 84, 3)] ), action_spec=ActionSpec.create_continuous(ACTION_SIZE), + num_other_agents_in_group=4, ) agentbuffer = trajectory.to_agentbuffer() seen_keys = set() @@ -39,4 +54,37 @@ def test_trajectory_to_agentbuffer(): assert len(field) == length seen_keys.add(key) - assert seen_keys == wanted_keys + assert seen_keys.issuperset(wanted_keys) + + for _key in wanted_group_keys: + for step in agentbuffer[_key]: + assert len(step) == 4 + + +def test_obsutil_group_from_buffer(): + buff = AgentBuffer() + # Create some obs + for _ in range(3): + buff[GroupObsUtil.get_name_at(0)].append(3 * [np.ones((5,), dtype=np.float32)]) + # Some agents have died + for _ in range(2): + buff[GroupObsUtil.get_name_at(0)].append(1 * 
[np.ones((5,), dtype=np.float32)]) + + # Get the group obs, which will be a List of Lists of np.ndarray, where each element is the same + # length as the AgentBuffer but contains only one agent's obs. Dead agents are padded by + # NaNs. + gobs = GroupObsUtil.from_buffer(buff, 1) + # Agent 0 is full + agent_0_obs = gobs[0] + for obs in agent_0_obs: + assert obs.shape == (buff.num_experiences, 5) + assert not np.isnan(obs).any() + + agent_1_obs = gobs[1] + for obs in agent_1_obs: + assert obs.shape == (buff.num_experiences, 5) + for i, _exp_obs in enumerate(obs): + if i >= 3: + assert np.isnan(_exp_obs).all() + else: + assert not np.isnan(_exp_obs).any() diff --git a/ml-agents/mlagents/trainers/tests/torch/test_agent_action.py b/ml-agents/mlagents/trainers/tests/torch/test_agent_action.py new file mode 100644 index 0000000000..cb17f805bd --- /dev/null +++ b/ml-agents/mlagents/trainers/tests/torch/test_agent_action.py @@ -0,0 +1,63 @@ +import numpy as np +from mlagents.torch_utils import torch + +from mlagents.trainers.buffer import AgentBuffer, BufferKey +from mlagents.trainers.torch.agent_action import AgentAction + + +def test_agent_action_group_from_buffer(): + buff = AgentBuffer() + # Create some actions + for _ in range(3): + buff[BufferKey.GROUP_CONTINUOUS_ACTION].append( + 3 * [np.ones((5,), dtype=np.float32)] + ) + buff[BufferKey.GROUP_DISCRETE_ACTION].append( + 3 * [np.ones((4,), dtype=np.float32)] + ) + # Some agents have died + for _ in range(2): + buff[BufferKey.GROUP_CONTINUOUS_ACTION].append( + 1 * [np.ones((5,), dtype=np.float32)] + ) + buff[BufferKey.GROUP_DISCRETE_ACTION].append( + 1 * [np.ones((4,), dtype=np.float32)] + ) + + # Get the group actions, which will be a List of Lists of AgentAction, where each element is the same + # length as the AgentBuffer but contains only one agent's obs. Dead agents are padded by + # NaNs. 
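+    # (Unlike group observations, which GroupObsUtil pads with NaN, the group actions
+    # retrieved here are padded with zeros, as the assertions below check.)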
+ gact = AgentAction.group_from_buffer(buff) + # Agent 0 is full + agent_0_act = gact[0] + assert agent_0_act.continuous_tensor.shape == (buff.num_experiences, 5) + assert agent_0_act.discrete_tensor.shape == (buff.num_experiences, 4) + + agent_1_act = gact[1] + assert agent_1_act.continuous_tensor.shape == (buff.num_experiences, 5) + assert agent_1_act.discrete_tensor.shape == (buff.num_experiences, 4) + assert (agent_1_act.continuous_tensor[0:3] > 0).all() + assert (agent_1_act.continuous_tensor[3:] == 0).all() + assert (agent_1_act.discrete_tensor[0:3] > 0).all() + assert (agent_1_act.discrete_tensor[3:] == 0).all() + + +def test_to_flat(): + # Both continuous and discrete + aa = AgentAction( + torch.tensor([[1.0, 1.0, 1.0]]), [torch.tensor([2]), torch.tensor([1])] + ) + flattened_actions = aa.to_flat([3, 3]) + assert torch.eq( + flattened_actions, torch.tensor([[1, 1, 1, 0, 0, 1, 0, 1, 0]]) + ).all() + + # Just continuous + aa = AgentAction(torch.tensor([[1.0, 1.0, 1.0]]), None) + flattened_actions = aa.to_flat([]) + assert torch.eq(flattened_actions, torch.tensor([1, 1, 1])).all() + + # Just discrete + aa = AgentAction(torch.tensor([]), [torch.tensor([2]), torch.tensor([1])]) + flattened_actions = aa.to_flat([3, 3]) + assert torch.eq(flattened_actions, torch.tensor([0, 0, 1, 0, 1, 0])).all() diff --git a/ml-agents/mlagents/trainers/torch/agent_action.py b/ml-agents/mlagents/trainers/torch/agent_action.py index 9d13d6b488..e862145392 100644 --- a/ml-agents/mlagents/trainers/torch/agent_action.py +++ b/ml-agents/mlagents/trainers/torch/agent_action.py @@ -1,4 +1,6 @@ from typing import List, Optional, NamedTuple +import itertools +import numpy as np from mlagents.torch_utils import torch from mlagents.trainers.buffer import AgentBuffer, BufferKey @@ -19,11 +21,14 @@ class AgentAction(NamedTuple): discrete_list: Optional[List[torch.Tensor]] @property - def discrete_tensor(self): + def discrete_tensor(self) -> torch.Tensor: """ Returns the discrete action list as a stacked tensor """ - return torch.stack(self.discrete_list, dim=-1) + if self.discrete_list is not None and len(self.discrete_list) > 0: + return torch.stack(self.discrete_list, dim=-1) + else: + return torch.empty(0) def to_action_tuple(self, clip: bool = False) -> ActionTuple: """ @@ -59,3 +64,80 @@ def from_buffer(buff: AgentBuffer) -> "AgentAction": discrete_tensor[..., i] for i in range(discrete_tensor.shape[-1]) ] return AgentAction(continuous, discrete) + + @staticmethod + def _group_agent_action_from_buffer( + buff: AgentBuffer, cont_action_key: BufferKey, disc_action_key: BufferKey + ) -> List["AgentAction"]: + """ + Extracts continuous and discrete groupmate actions, as specified by BufferKey, and + returns a List of AgentActions that correspond to the groupmate's actions. List will + be of length equal to the maximum number of groupmates in the buffer. Any spots where + there are less agents than maximum, the actions will be padded with 0's. 
+ """ + continuous_tensors: List[torch.Tensor] = [] + discrete_tensors: List[torch.Tensor] = [] + if cont_action_key in buff: + padded_batch = buff[cont_action_key].padded_to_batch() + continuous_tensors = [ + ModelUtils.list_to_tensor(arr) for arr in padded_batch + ] + if disc_action_key in buff: + padded_batch = buff[disc_action_key].padded_to_batch(dtype=np.long) + discrete_tensors = [ + ModelUtils.list_to_tensor(arr, dtype=torch.long) for arr in padded_batch + ] + + actions_list = [] + for _cont, _disc in itertools.zip_longest( + continuous_tensors, discrete_tensors, fillvalue=None + ): + if _disc is not None: + _disc = [_disc[..., i] for i in range(_disc.shape[-1])] + actions_list.append(AgentAction(_cont, _disc)) + return actions_list + + @staticmethod + def group_from_buffer(buff: AgentBuffer) -> List["AgentAction"]: + """ + A static method that accesses next group continuous and discrete action fields in an AgentBuffer + and constructs a padded List of AgentActions that represent the group agent actions. + The List is of length equal to max number of groupmate agents in the buffer, and the AgentBuffer iss + of the same length as the buffer. Empty spots (e.g. when agents die) are padded with 0. + :param buff: AgentBuffer of a batch or trajectory + :return: List of groupmate's AgentActions + """ + return AgentAction._group_agent_action_from_buffer( + buff, BufferKey.GROUP_CONTINUOUS_ACTION, BufferKey.GROUP_DISCRETE_ACTION + ) + + @staticmethod + def group_from_buffer_next(buff: AgentBuffer) -> List["AgentAction"]: + """ + A static method that accesses next group continuous and discrete action fields in an AgentBuffer + and constructs a padded List of AgentActions that represent the next group agent actions. + The List is of length equal to max number of groupmate agents in the buffer, and the AgentBuffer iss + of the same length as the buffer. Empty spots (e.g. when agents die) are padded with 0. + :param buff: AgentBuffer of a batch or trajectory + :return: List of groupmate's AgentActions + """ + return AgentAction._group_agent_action_from_buffer( + buff, BufferKey.GROUP_NEXT_CONT_ACTION, BufferKey.GROUP_NEXT_DISC_ACTION + ) + + def to_flat(self, discrete_branches: List[int]) -> torch.Tensor: + """ + Flatten this AgentAction into a single torch Tensor of dimension (batch, num_continuous + num_one_hot_discrete). + Discrete actions are converted into one-hot and concatenated with continuous actions. + :param discrete_branches: List of sizes for discrete actions. + :return: Tensor of flattened actions. + """ + # if there are any discrete actions, create one-hot + if self.discrete_list is not None and self.discrete_list: + discrete_oh = ModelUtils.actions_to_onehot( + self.discrete_tensor, discrete_branches + ) + discrete_oh = torch.cat(discrete_oh, dim=1) + else: + discrete_oh = torch.empty(0) + return torch.cat([self.continuous_tensor, discrete_oh], dim=-1) diff --git a/ml-agents/mlagents/trainers/torch/utils.py b/ml-agents/mlagents/trainers/torch/utils.py index 342a1fdde2..a5c6cfece7 100644 --- a/ml-agents/mlagents/trainers/torch/utils.py +++ b/ml-agents/mlagents/trainers/torch/utils.py @@ -221,6 +221,18 @@ def list_to_tensor( """ return torch.as_tensor(np.asanyarray(ndarray_list), dtype=dtype) + @staticmethod + def list_to_tensor_list( + ndarray_list: List[np.ndarray], dtype: Optional[torch.dtype] = torch.float32 + ) -> torch.Tensor: + """ + Converts a list of numpy arrays into a list of tensors. MUCH faster than + calling as_tensor on the list directly. 
+ """ + return [ + torch.as_tensor(np.asanyarray(_arr), dtype=dtype) for _arr in ndarray_list + ] + @staticmethod def to_numpy(tensor: torch.Tensor) -> np.ndarray: """ diff --git a/ml-agents/mlagents/trainers/trajectory.py b/ml-agents/mlagents/trainers/trajectory.py index ffb28ef973..efd59a68b9 100644 --- a/ml-agents/mlagents/trainers/trajectory.py +++ b/ml-agents/mlagents/trainers/trajectory.py @@ -11,7 +11,25 @@ from mlagents.trainers.torch.action_log_probs import LogProbsTuple +class AgentStatus(NamedTuple): + """ + Stores observation, action, and reward for an agent. Does not have additional + fields that are present in AgentExperience. + """ + + obs: List[np.ndarray] + reward: float + action: ActionTuple + done: bool + + class AgentExperience(NamedTuple): + """ + Stores the full amount of data for an agent in one timestep. Includes + the status' of group mates and the group reward, as well as the probabilities + outputted by the policy. + """ + obs: List[np.ndarray] reward: float done: bool @@ -21,6 +39,8 @@ class AgentExperience(NamedTuple): prev_action: np.ndarray interrupted: bool memory: np.ndarray + group_status: List[AgentStatus] + group_reward: float class ObsUtil: @@ -59,11 +79,66 @@ def from_buffer_next(batch: AgentBuffer, num_obs: int) -> List[np.array]: return result +class GroupObsUtil: + @staticmethod + def get_name_at(index: int) -> AgentBufferKey: + """ + returns the name of the observation given the index of the observation + """ + return ObservationKeyPrefix.GROUP_OBSERVATION, index + + @staticmethod + def get_name_at_next(index: int) -> AgentBufferKey: + """ + returns the name of the next team observation given the index of the observation + """ + return ObservationKeyPrefix.NEXT_GROUP_OBSERVATION, index + + @staticmethod + def _transpose_list_of_lists( + list_list: List[List[np.ndarray]], + ) -> List[List[np.ndarray]]: + return list(map(list, zip(*list_list))) + + @staticmethod + def from_buffer(batch: AgentBuffer, num_obs: int) -> List[np.array]: + """ + Creates the list of observations from an AgentBuffer + """ + separated_obs: List[np.array] = [] + for i in range(num_obs): + separated_obs.append( + batch[GroupObsUtil.get_name_at(i)].padded_to_batch(pad_value=np.nan) + ) + # separated_obs contains a List(num_obs) of Lists(num_agents), we want to flip + # that and get a List(num_agents) of Lists(num_obs) + result = GroupObsUtil._transpose_list_of_lists(separated_obs) + return result + + @staticmethod + def from_buffer_next(batch: AgentBuffer, num_obs: int) -> List[np.array]: + """ + Creates the list of observations from an AgentBuffer + """ + separated_obs: List[np.array] = [] + for i in range(num_obs): + separated_obs.append( + batch[GroupObsUtil.get_name_at_next(i)].padded_to_batch( + pad_value=np.nan + ) + ) + # separated_obs contains a List(num_obs) of Lists(num_agents), we want to flip + # that and get a List(num_agents) of Lists(num_obs) + result = GroupObsUtil._transpose_list_of_lists(separated_obs) + return result + + class Trajectory(NamedTuple): steps: List[AgentExperience] next_obs: List[ np.ndarray ] # Observation following the trajectory, for bootstrapping + next_group_obs: List[List[np.ndarray]] agent_id: str behavior_id: str @@ -78,7 +153,8 @@ def to_agentbuffer(self) -> AgentBuffer: agent_buffer_trajectory = AgentBuffer() obs = self.steps[0].obs for step, exp in enumerate(self.steps): - if step < len(self.steps) - 1: + is_last_step = step == len(self.steps) - 1 + if not is_last_step: next_obs = self.steps[step + 1].obs else: next_obs = self.next_obs 
@@ -88,11 +164,79 @@ def to_agentbuffer(self) -> AgentBuffer: agent_buffer_trajectory[ObsUtil.get_name_at(i)].append(obs[i]) agent_buffer_trajectory[ObsUtil.get_name_at_next(i)].append(next_obs[i]) + # Take care of teammate obs and actions + teammate_continuous_actions, teammate_discrete_actions, teammate_rewards = ( + [], + [], + [], + ) + for group_status in exp.group_status: + teammate_rewards.append(group_status.reward) + teammate_continuous_actions.append(group_status.action.continuous) + teammate_discrete_actions.append(group_status.action.discrete) + + # Team actions + agent_buffer_trajectory[BufferKey.GROUP_CONTINUOUS_ACTION].append( + teammate_continuous_actions + ) + agent_buffer_trajectory[BufferKey.GROUP_DISCRETE_ACTION].append( + teammate_discrete_actions + ) + agent_buffer_trajectory[BufferKey.GROUPMATE_REWARDS].append( + teammate_rewards + ) + agent_buffer_trajectory[BufferKey.GROUP_REWARD].append(exp.group_reward) + + # Next actions + teammate_cont_next_actions = [] + teammate_disc_next_actions = [] + if not is_last_step: + next_exp = self.steps[step + 1] + for group_status in next_exp.group_status: + teammate_cont_next_actions.append(group_status.action.continuous) + teammate_disc_next_actions.append(group_status.action.discrete) + else: + for group_status in exp.group_status: + teammate_cont_next_actions.append(group_status.action.continuous) + teammate_disc_next_actions.append(group_status.action.discrete) + + agent_buffer_trajectory[BufferKey.GROUP_NEXT_CONT_ACTION].append( + teammate_cont_next_actions + ) + agent_buffer_trajectory[BufferKey.GROUP_NEXT_DISC_ACTION].append( + teammate_disc_next_actions + ) + + for i in range(num_obs): + ith_group_obs = [] + for _group_status in exp.group_status: + # Assume teammates have same obs space + ith_group_obs.append(_group_status.obs[i]) + agent_buffer_trajectory[GroupObsUtil.get_name_at(i)].append( + ith_group_obs + ) + + ith_group_obs_next = [] + if is_last_step: + for _obs in self.next_group_obs: + ith_group_obs_next.append(_obs[i]) + else: + next_group_status = self.steps[step + 1].group_status + for _group_status in next_group_status: + # Assume teammates have same obs space + ith_group_obs_next.append(_group_status.obs[i]) + agent_buffer_trajectory[GroupObsUtil.get_name_at_next(i)].append( + ith_group_obs_next + ) + if exp.memory is not None: agent_buffer_trajectory[BufferKey.MEMORY].append(exp.memory) agent_buffer_trajectory[BufferKey.MASKS].append(1.0) agent_buffer_trajectory[BufferKey.DONE].append(exp.done) + agent_buffer_trajectory[BufferKey.GROUP_DONES].append( + [_status.done for _status in exp.group_status] + ) # Adds the log prob and action of continuous/discrete separately agent_buffer_trajectory[BufferKey.CONTINUOUS_ACTION].append( @@ -101,6 +245,22 @@ def to_agentbuffer(self) -> AgentBuffer: agent_buffer_trajectory[BufferKey.DISCRETE_ACTION].append( exp.action.discrete ) + + cont_next_actions = np.zeros_like(exp.action.continuous) + disc_next_actions = np.zeros_like(exp.action.discrete) + + if not is_last_step: + next_action = self.steps[step + 1].action + cont_next_actions = next_action.continuous + disc_next_actions = next_action.discrete + + agent_buffer_trajectory[BufferKey.NEXT_CONT_ACTION].append( + cont_next_actions + ) + agent_buffer_trajectory[BufferKey.NEXT_DISC_ACTION].append( + disc_next_actions + ) + agent_buffer_trajectory[BufferKey.CONTINUOUS_LOG_PROBS].append( exp.action_probs.continuous ) @@ -137,6 +297,14 @@ def done_reached(self) -> bool: """ return self.steps[-1].done + @property + def 
teammate_dones_reached(self) -> bool: + """ + Returns true if all teammates are done at the end of the trajectory. + Combine with done_reached to check if the whole team is done. + """ + return all(_status.done for _status in self.steps[-1].group_status) + @property def interrupted(self) -> bool: """
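# --- editor's aside: illustrative sketch, not part of the patch --------------------
# Putting the pieces together: the test helper modified in this diff can build a
# trajectory with groupmates, and to_agentbuffer() then emits the new group keys.
# (Assumes a development install of ml-agents so the test utilities are importable.)
from mlagents.trainers.buffer import BufferKey
from mlagents.trainers.tests.dummy_config import create_observation_specs_with_shapes
from mlagents.trainers.tests.mock_brain import make_fake_trajectory
from mlagents_envs.base_env import ActionSpec

traj = make_fake_trajectory(
    length=10,
    observation_specs=create_observation_specs_with_shapes([(6,)]),
    action_spec=ActionSpec.create_continuous(2),
    num_other_agents_in_group=3,
)
buff = traj.to_agentbuffer()

# Every group entry holds one value per groupmate for that step.
assert all(len(step_entry) == 3 for step_entry in buff[BufferKey.GROUPMATE_REWARDS])
# done_reached and teammate_dones_reached together tell whether the whole group ended.
whole_group_done = traj.done_reached and traj.teammate_dones_reached
# ------------------------------------------------------------------------------------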