Coverage Control Library
Loading...
Searching...
No Matches
data_loader_utils.py
Go to the documentation of this file.
1# This file is part of the CoverageControl library
2#
3# Author: Saurav Agarwal
4# Contact: sauravag@seas.upenn.edu, agr.saurav1@gmail.com
5# Repository: https://github.com/KumarRobotics/CoverageControl
6#
7# Copyright (c) 2024, Saurav Agarwal
8#
9# The CoverageControl library is free software: you can redistribute it and/or
10# modify it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or (at your
12# option) any later version.
13#
14# The CoverageControl library is distributed in the hope that it will be
15# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
17# Public License for more details.
18#
19# You should have received a copy of the GNU General Public License along with
20# CoverageControl library. If not, see <https://www.gnu.org/licenses/>.
21
22import torch
23import torch_geometric
24from coverage_control import IOUtils
25
26__all__ = ["DataLoaderUtils"]
27
28
29
class DataLoaderUtils:
    """
    Class to provide utility functions to load dataset tensors and to convert
    them into PyTorch Geometric data objects.

    All members are static methods; the class only serves as a namespace.
    Every loader reads its tensors through ``IOUtils.load_tensor``.
    """

    @staticmethod
    def load_maps(path: str, use_comm_map: bool = False) -> torch.Tensor:
        """
        Function to load maps stored as tensors

        The maps are stored as tensors in the following format:
        - {path}/local_maps.pt: Local maps
        - {path}/obstacle_maps.pt: Obstacle maps
        - {path}/comm_maps.pt: Communication maps (if use_comm_map is True)

        The local and obstacle maps are converted to dense tensors and get a
        new axis inserted at dimension 2. All maps are then concatenated along
        dimension 2 in the order: local, communication (optional), obstacle.

        Args:
            path (str): Path to the directory containing the maps
            use_comm_map (bool): Whether to load the communication map

        Returns:
            maps: The loaded maps, concatenated along dimension 2

        """
        local_maps = IOUtils.load_tensor(f"{path}/local_maps.pt")
        local_maps = local_maps.to_dense().unsqueeze(2)
        obstacle_maps = IOUtils.load_tensor(f"{path}/obstacle_maps.pt")
        obstacle_maps = obstacle_maps.to_dense().unsqueeze(2)

        if not use_comm_map:
            return torch.cat([local_maps, obstacle_maps], 2)

        # The communication maps are concatenated without unsqueeze(2);
        # presumably they already carry that axis on disk -- TODO confirm.
        comm_maps = IOUtils.load_tensor(f"{path}/comm_maps.pt")
        comm_maps = comm_maps.to_dense()

        return torch.cat([local_maps, comm_maps, obstacle_maps], 2)

    @staticmethod
    def load_features(
        path: str, output_dim: int = None
    ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """
        Function to load normalized features

        The features are stored as tensors in the following format:
        - {path}/normalized_coverage_features.pt: Normalized coverage features
        - {path}/../coverage_features_mean.pt: Mean of the coverage features
        - {path}/../coverage_features_std.pt: Standard deviation of the coverage features

        Note that the mean and standard deviation are read from the parent
        directory of ``path``.

        Args:
            path (str): Path to the directory containing the features
            output_dim (int): If not None, only the first ``output_dim``
                entries along dimension 2 of the features are kept. The mean
                and standard deviation are returned unsliced.

        Returns:
            features: The loaded features
            features_mean: Mean of the features
            features_std: Standard deviation of the features
        """
        normalized_coverage_features = IOUtils.load_tensor(
            f"{path}/normalized_coverage_features.pt"
        )
        coverage_features_mean = IOUtils.load_tensor(
            f"{path}/../coverage_features_mean.pt"
        )
        coverage_features_std = IOUtils.load_tensor(
            f"{path}/../coverage_features_std.pt"
        )

        if output_dim is not None:
            normalized_coverage_features = normalized_coverage_features[
                :, :, :output_dim
            ]

        return (
            normalized_coverage_features,
            coverage_features_mean,
            coverage_features_std,
        )

    @staticmethod
    def load_actions(path: str) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """
        Function to load normalized actions

        The actions are stored as tensors in the following format:
        - {path}/normalized_actions.pt: Normalized actions
        - {path}/../actions_mean.pt: Mean of the actions
        - {path}/../actions_std.pt: Standard deviation of the actions

        Note that the mean and standard deviation are read from the parent
        directory of ``path``.

        Args:
            path (str): Path to the directory containing the actions

        Returns:
            actions: The loaded actions
            actions_mean: Mean of the actions
            actions_std: Standard deviation of the actions

        """
        actions = IOUtils.load_tensor(f"{path}/normalized_actions.pt")
        actions_mean = IOUtils.load_tensor(f"{path}/../actions_mean.pt")
        actions_std = IOUtils.load_tensor(f"{path}/../actions_std.pt")

        return actions, actions_mean, actions_std

    @staticmethod
    def load_robot_positions(path: str) -> torch.Tensor:
        """
        Function to load robot positions

        The robot positions are stored as tensors in the following format:
        - {path}/robot_positions.pt: Robot positions

        Args:
            path (str): Path to the directory containing the robot positions

        Returns:
            robot_positions: The loaded robot positions

        """
        return IOUtils.load_tensor(f"{path}/robot_positions.pt")

    @staticmethod
    def load_edge_weights(path: str) -> torch.Tensor:
        """
        Function to load edge weights

        The edge weights are stored as tensors in the following format:
        - {path}/edge_weights.pt: Edge weights

        Args:
            path (str): Path to the directory containing the edge weights

        Returns:
            edge_weights: The loaded edge weights as a dense tensor

        """
        edge_weights = IOUtils.load_tensor(f"{path}/edge_weights.pt")

        # Tensor.to_dense() is not an in-place operation: its result has to be
        # kept, otherwise the tensor is returned exactly as it was loaded.
        return edge_weights.to_dense()

    @staticmethod
    def to_torch_geometric_data(
        feature: torch.Tensor, edge_weights: torch.Tensor, pos: torch.Tensor = None
    ) -> torch_geometric.data.Data:
        """
        The function converts the feature, edge_weights and pos to a torch_geometric.data.Data object
        This is essential for using the data with the PyTorch Geometric library

        The edge weights are converted to a coalesced sparse tensor; its
        indices become the edge index and its values the edge weights.

        Args:
            feature (torch.Tensor): The feature tensor
            edge_weights (torch.Tensor): The edge weights tensor
            pos (torch.Tensor): The position tensor

        Returns:
            data: The torch_geometric.data.Data object
                data.x: The feature tensor
                data.edge_index: The edge index tensor (int64)
                data.edge_weight: The edge weight tensor (float32)
                data.pos: The position tensor (if pos is not None)

        """
        sparse_weights = edge_weights.to_sparse().coalesce()

        # Clones keep the Data object from sharing storage with the sparse
        # tensor or with the caller's position tensor.
        fields = {
            "x": feature,
            "edge_index": sparse_weights.indices().long().clone(),
            "edge_weight": sparse_weights.values().float().clone(),
        }

        if pos is not None:
            fields["pos"] = pos.clone()

        return torch_geometric.data.Data(**fields)
Class to provide utility functions to load tensors and configuration files.
torch.tensor load_edge_weights(str path)
Function to load edge weights.
torch.tensor load_robot_positions(str path)
Function to load robot positions.
tuple[torch.tensor, torch.tensor, torch.tensor] load_actions(str path)
Function to load normalized actions.
torch_geometric.data.Data to_torch_geometric_data(torch.tensor feature, torch.tensor edge_weights, torch.tensor pos=None)
The function converts the feature, edge_weights and pos to a torch_geometric.data.Data object.
torch.tensor load_maps(str path, bool use_comm_map=False)
Function to load maps stored as tensors.
tuple[torch.tensor, torch.tensor, torch.tensor] load_features(str path, int output_dim=None)
Function to load normalized features.