Coverage Control Library
Loading...
Searching...
No Matches
coverage_env_utils.py
Go to the documentation of this file.
1# This file is part of the CoverageControl library
2#
3# Author: Saurav Agarwal
4# Contact: sauravag@seas.upenn.edu, agr.saurav1@gmail.com
5# Repository: https://github.com/KumarRobotics/CoverageControl
6#
7# Copyright (c) 2024, Saurav Agarwal
8#
9# The CoverageControl library is free software: you can redistribute it and/or
10# modify it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or (at your
12# option) any later version.
13#
14# The CoverageControl library is distributed in the hope that it will be
15# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
17# Public License for more details.
18#
19# You should have received a copy of the GNU General Public License along with
20# CoverageControl library. If not, see <https://www.gnu.org/licenses/>.
21
22"""
23Utility functions for coverage environment
24"""
25
26
28
29import math
30
31import numpy
32
33# import cv2
34import torch
35import torch_geometric
36import torchvision
37from scipy.spatial import distance_matrix
38
39from ...core import CoverageSystem, DblVector, DblVectorVector, Parameters, PointVector
40
41
42
class CoverageEnvUtils:
    """
    Class for utility functions for coverage environment

    Every member is a static method; the class is only used as a namespace.
    """

    @staticmethod
    def to_tensor(data: object) -> torch.Tensor:
        """
        Converts various types of data to torch.Tensor

        Can accept the following types:
        - numpy.ndarray
        - PointVector
        - DblVectorVector
        - DblVector

        Args:
            data: input data

        Returns:
            torch.Tensor: converted data. A numpy.ndarray is converted to
            float32; a PointVector becomes a (len(data), 2) tensor; a
            DblVectorVector becomes one stacked row per inner vector; a
            DblVector becomes a 1-D tensor.

        Raises:
            ValueError: if data type is not supported
            RuntimeError: raised by torch.stack if the rows of a
                DblVectorVector do not all have the same length

        """

        if isinstance(data, numpy.ndarray):
            # astype() already returns a fresh array, so the resulting tensor
            # never shares memory with the caller's array.
            return torch.from_numpy(data.astype(numpy.float32))

        if isinstance(data, PointVector):
            data_tensor = torch.empty(len(data), 2)

            for i, point in enumerate(data):
                data_tensor[i] = CoverageEnvUtils.to_tensor(point)

            return data_tensor

        if isinstance(data, DblVectorVector):
            rows = [CoverageEnvUtils.to_tensor(row) for row in data]

            # torch.stack rejects an empty list; keep the empty 1-D result
            # the pre-allocating implementation produced for empty input.
            if not rows:
                return torch.empty(0)

            # Each row is itself a 1-D tensor, so the rows are stacked into a
            # 2-D tensor instead of being assigned into scalar slots.
            return torch.stack(rows)

        if isinstance(data, DblVector):
            data_tensor = torch.empty(len(data))

            for i, value in enumerate(data):
                data_tensor[i] = float(value)

            return data_tensor
        raise ValueError(f"Unknown data type: {type(data)}")

    @staticmethod
    def get_raw_local_maps(env: CoverageSystem, params: Parameters) -> torch.Tensor:
        """
        Get raw local maps

        Args:
            env: coverage environment
            params: parameters

        Returns:
            torch.Tensor: raw local maps, one
            (pLocalMapSize, pLocalMapSize) slice per robot

        """
        num_robots = env.GetNumRobots()
        local_maps = torch.zeros(
            (num_robots, params.pLocalMapSize, params.pLocalMapSize)
        )

        for r_idx in range(num_robots):
            local_maps[r_idx] = CoverageEnvUtils.to_tensor(env.GetRobotLocalMap(r_idx))

        return local_maps

    @staticmethod
    def get_raw_obstacle_maps(env: CoverageSystem, params: Parameters) -> torch.Tensor:
        """
        Get raw obstacle maps

        Args:
            env: coverage environment
            params: parameters

        Returns:
            torch.Tensor: raw obstacle maps, one
            (pLocalMapSize, pLocalMapSize) slice per robot

        """
        num_robots = env.GetNumRobots()
        obstacle_maps = torch.zeros(
            (num_robots, params.pLocalMapSize, params.pLocalMapSize)
        )

        for r_idx in range(num_robots):
            obstacle_maps[r_idx] = CoverageEnvUtils.to_tensor(
                env.GetRobotObstacleMap(r_idx)
            )

        return obstacle_maps

    @staticmethod
    def get_communication_maps(
        env: CoverageSystem, params: Parameters, map_size: int
    ) -> torch.Tensor:
        """
        Generate communication maps from positions

        Communication maps are composed of two channels.
        Each channel has non-zero values for cells that correspond to the relative positions of the neighbors.
        For the first channel, the value is the x-coordinate of the relative position divided by the communication range.
        Similarly, the y-coordinate is used for the second channel.

        Args:
            env: coverage environment
            params: parameters
            map_size: size of the map

        Returns:
            torch.Tensor: communication maps of shape
            (num_robots, 2, map_size, map_size)
        """
        num_robots = env.GetNumRobots()

        comm_maps = torch.zeros((num_robots, 2, map_size, map_size))
        dense_shape = torch.Size([map_size, map_size])

        for r_idx in range(num_robots):
            # NOTE: "Positons" is the spelling of the CoverageSystem method.
            neighbors_pos = CoverageEnvUtils.to_tensor(
                env.GetRelativePositonsNeighbors(r_idx)
            )
            # Scale the relative positions to cell units and shift them by
            # (map_size / 2 - pResolution / 2).
            scaled_indices = torch.round(
                neighbors_pos
                * map_size
                / (params.pCommunicationRange * params.pResolution * 2.0)
                + (map_size / 2.0 - params.pResolution / 2.0)
            )
            indices = torch.transpose(scaled_indices, 1, 0).long()
            # Rounding can land exactly on map_size (e.g. a neighbor at
            # +pCommunicationRange along an axis), which sparse_coo_tensor
            # rejects as out of bounds; keep such neighbors in the edge cell.
            indices = indices.clamp(0, map_size - 1)
            values = neighbors_pos / params.pCommunicationRange
            # Neighbors that fall in the same cell are summed by to_dense().
            comm_maps[r_idx][0] = torch.sparse_coo_tensor(
                indices, values[:, 0], dense_shape
            ).to_dense()
            comm_maps[r_idx][1] = torch.sparse_coo_tensor(
                indices, values[:, 1], dense_shape
            ).to_dense()

        return comm_maps

    @staticmethod
    def resize_maps(maps: torch.Tensor, resized_map_size: int) -> torch.Tensor:
        """
        Resize maps to a given size
        Uses bilinear interpolation from torchvision.transforms.functional.resize
        Options: antialias=True

        Args:
            maps: input maps; the last two dimensions are resized
            resized_map_size: size of the resized maps

        Returns:
            torch.Tensor: resized maps with the leading dimensions of the
            input and the last two dimensions equal to resized_map_size

        """
        leading_shape = maps.shape[:-2]
        # Flatten all leading dimensions into one so resize sees (N, H, W).
        # reshape() is used instead of view() so non-contiguous inputs work.
        flat_maps = maps.reshape(-1, maps.shape[-2], maps.shape[-1])
        flat_maps = torchvision.transforms.functional.resize(
            flat_maps,
            (resized_map_size, resized_map_size),
            interpolation=torchvision.transforms.InterpolationMode.BILINEAR,
            antialias=True,
        )

        return flat_maps.reshape(leading_shape + flat_maps.shape[-2:])

    @staticmethod
    def get_maps(
        env: CoverageSystem,
        params: Parameters,
        resized_map_size: int,
        use_comm_map: bool,
    ) -> torch.Tensor:
        """
        Get maps for the coverage environment

        Args:
            env: coverage environment
            params: parameters
            resized_map_size: size of the resized maps
            use_comm_map: whether to use communication maps

        Returns:
            torch.Tensor: maps concatenated along dimension 1 in the order
            local map, communication maps (two channels, only when
            use_comm_map is set), obstacle map

        """
        local_channel = CoverageEnvUtils.resize_maps(
            CoverageEnvUtils.get_raw_local_maps(env, params), resized_map_size
        ).unsqueeze(1)
        obstacle_channel = CoverageEnvUtils.resize_maps(
            CoverageEnvUtils.get_raw_obstacle_maps(env, params), resized_map_size
        ).unsqueeze(1)

        if use_comm_map:
            # Communication maps are generated directly at the resized size.
            comm_maps = CoverageEnvUtils.get_communication_maps(
                env, params, resized_map_size
            )
            channels = [local_channel, comm_maps, obstacle_channel]
        else:
            channels = [local_channel, obstacle_channel]

        return torch.cat(channels, 1)

    @staticmethod
    def get_voronoi_features(env: CoverageSystem) -> torch.Tensor:
        """
        Get voronoi features

        Args:
            env: coverage environment

        Returns:
            torch.Tensor: voronoi features, one row per entry returned by
            env.GetRobotVoronoiFeatures(); an empty (0, 0) tensor when there
            are no entries
        """
        features = env.GetRobotVoronoiFeatures()

        # features[0] below would raise IndexError on an empty container.
        if len(features) == 0:
            return torch.zeros((0, 0))

        tensor_features = torch.zeros((len(features), len(features[0])))

        for r_idx, robot_features in enumerate(features):
            tensor_features[r_idx] = CoverageEnvUtils.to_tensor(robot_features)

        return tensor_features

    @staticmethod
    def get_robot_positions(env: CoverageSystem) -> torch.Tensor:
        """
        Get robot positions

        Args:
            env: coverage environment

        Returns:
            torch.Tensor: robot positions
        """
        return CoverageEnvUtils.to_tensor(env.GetRobotPositions())

    @staticmethod
    def get_weights(env: CoverageSystem, params: Parameters) -> torch.Tensor:
        """
        Get edge weights for the communication graph

        The weight between two robots at distance d is
        exp(-d^2 / pCommunicationRange^2). Weights below 1/e (i.e. pairs
        farther apart than pCommunicationRange) and the diagonal are set
        to zero.

        Args:
            env: coverage environment
            params: parameters

        Returns:
            torch.Tensor: dense square matrix of edge weights
        """
        onebyexp = 1.0 / math.exp(1.0)
        robot_positions = CoverageEnvUtils.get_robot_positions(env)
        pairwise_distances = torch.cdist(robot_positions, robot_positions, 2)
        edge_weights = torch.exp(
            -(pairwise_distances.square())
            / (params.pCommunicationRange * params.pCommunicationRange)
        )
        edge_weights.masked_fill_(edge_weights < onebyexp, 0)
        edge_weights.fill_diagonal_(0)

        return edge_weights

    # Legacy edge weights used in previous research
    # The weights are proportional to the distance
    # Trying to move away from this
    @staticmethod
    def robot_positions_to_edge_weights(
        robot_positions: PointVector, world_map_size: int, comm_range: float
    ) -> numpy.ndarray:
        """
        Convert robot positions to edge weights

        Pairwise distances larger than comm_range are zeroed and the
        remaining distances are multiplied by
        3 / (world_map_size^2 / num_robots^2).

        Args:
            robot_positions: robot positions
            world_map_size: size of the world map
            comm_range: communication range

        Returns:
            numpy.ndarray: square matrix of edge weights (a NumPy array,
            not a torch.Tensor)
        """
        x = numpy.array(robot_positions)
        s_mat = distance_matrix(x, x)
        s_mat[s_mat > comm_range] = 0
        c_mat = (world_map_size**2) / (s_mat.shape[0] ** 2)
        c_mat = 3 / c_mat
        graph_obs = c_mat * s_mat

        return graph_obs

    @staticmethod
    def get_torch_geometric_data(
        env: CoverageSystem,
        params: Parameters,
        use_cnn: bool,
        use_comm_map: bool,
        map_size: int,
    ) -> torch_geometric.data.Data:
        """
        Get torch geometric data
        The edges and their weights are the non-zero entries of get_weights

        Args:
            env: coverage environment
            params: parameters
            use_cnn: whether to use CNN; if set the node features are the
                maps from get_maps, otherwise the voronoi features
            use_comm_map: whether to use communication maps
            map_size: size of the maps

        Returns:
            torch_geometric.data.Data: torch geometric data with fields
            x, edge_index, edge_weight and pos

        """

        if use_cnn:
            features = CoverageEnvUtils.get_maps(env, params, map_size, use_comm_map)
        else:
            features = CoverageEnvUtils.get_voronoi_features(env)
        # The sparse form of the dense weight matrix lists exactly the
        # non-zero entries, which become the graph edges.
        edge_weights = CoverageEnvUtils.get_weights(env, params).to_sparse().coalesce()
        edge_index = edge_weights.indices().long()
        weights = edge_weights.values().float()
        pos = CoverageEnvUtils.get_robot_positions(env)
        # Shift by half the world size and divide by the world size.
        pos = (pos + params.pWorldMapSize / 2.0) / params.pWorldMapSize
        data = torch_geometric.data.Data(
            x=features,
            edge_index=edge_index.clone().detach(),
            edge_weight=weights.clone().detach(),
            pos=pos.clone().detach(),
        )

        return data
404
405 # Legacy maps which gives decent results
406 # Trying to move away from this
407 # @staticmethod
408 # def get_stable_maps(env, params, resized_map_size):
409 # robot_positions = CoverageEnvUtils.to_tensor(env.GetRobotPositions())
410 # num_robots = env.GetNumRobots()
411 # maps = torch.empty((num_robots, 4, resized_map_size, resized_map_size))
412 # h_vals = torch.linspace(1.0, -1.0, maps.shape[-2]+1)
413 # h_vals = (h_vals[1:] + h_vals[:-1])/2
414 # w_vals = torch.linspace(-1.0, 1.0, maps.shape[-1]+1)
415 # w_vals = (w_vals[1:] + w_vals[:-1])/2
416 # heatmap_x = torch.stack([h_vals] * maps.shape[-1], dim=1)/100
417 # heatmap_y = torch.stack([w_vals] * maps.shape[-2], dim=0)/100
418 # for r_idx in range(num_robots):
419 # local_map = env.GetRobotLocalMap(r_idx)
420 # resized_local_map = cv2.resize(local_map, dsize=(resized_map_size, resized_map_size), interpolation=cv2.INTER_AREA)
421 # maps[r_idx][0] = torch.tensor(resized_local_map).float()
422
423 # comm_map = env.GetCommunicationMap(r_idx)
424 # filtered_comm_map = gaussian_filter(comm_map, sigma=(3,3), order=0)
425 # resized_comm_map = torch.tensor(cv2.resize(numpy.array(filtered_comm_map), dsize=(resized_map_size, resized_map_size), interpolation=cv2.INTER_AREA)).float()
426 # maps[r_idx][1] = resized_comm_map
427
428 # maps[r_idx][2] = heatmap_x
429 # maps[r_idx][3] = heatmap_y
430
431 # return maps
torch.Tensor get_weights(CoverageSystem env, Parameters params)
Get edge weights for the communication graph.
torch.Tensor resize_maps(torch.Tensor maps, int resized_map_size)
Resize maps to a given size Uses bilinear interpolation from torchvision.transforms....
torch.Tensor robot_positions_to_edge_weights(PointVector robot_positions, int world_map_size, float comm_range)
Convert robot positions to edge weights.
torch.Tensor get_voronoi_features(CoverageSystem env)
Get voronoi features.
torch.Tensor to_tensor(object data)
Converts various types of data to torch.Tensor.
torch.Tensor get_communication_maps(CoverageSystem env, Parameters params, int map_size)
Generate communication maps from positions.
torch_geometric.data.Data get_torch_geometric_data(CoverageSystem env, Parameters params, bool use_cnn, bool use_comm_map, int map_size)
Get torch geometric data. The edges and their weights are the non-zero entries returned by get_weights.
torch.Tensor get_raw_local_maps(CoverageSystem env, Parameters params)
Get raw local maps.
torch.Tensor get_robot_positions(CoverageSystem env)
Get robot positions.
torch.Tensor get_raw_obstacle_maps(CoverageSystem env, Parameters params)
Get raw obstacle maps.
torch.Tensor get_maps(CoverageSystem env, Parameters params, int resized_map_size, bool use_comm_map)
Get maps for the coverage environment.