Coverage Control Library
Loading...
Searching...
No Matches
coverage_env_utils.py
Go to the documentation of this file.
1# This file is part of the CoverageControl library
2#
3# Author: Saurav Agarwal
4# Contact: sauravag@seas.upenn.edu, agr.saurav1@gmail.com
5# Repository: https://github.com/KumarRobotics/CoverageControl
6#
7# Copyright (c) 2024, Saurav Agarwal
8#
9# The CoverageControl library is free software: you can redistribute it and/or
10# modify it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or (at your
12# option) any later version.
13#
14# The CoverageControl library is distributed in the hope that it will be
15# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
17# Public License for more details.
18#
19# You should have received a copy of the GNU General Public License along with
20# CoverageControl library. If not, see <https://www.gnu.org/licenses/>.
21
22"""
23Utility functions for coverage environment
24"""
25
26
28
29import math
30
31import numpy
32
33# import cv2
34import torch
35import torch_geometric
36import torchvision
37
38from .core import CoverageSystem, DblVector, DblVectorVector, Parameters, PointVector
39
40
41
43 """
44 Class for utility functions for coverage environment
45 """
46
@staticmethod
def to_tensor(data: object) -> torch.Tensor:
    """
    Converts various types of data to torch.Tensor

    Can accept the following types:
    - numpy.ndarray: cast to float32 into a fresh buffer
    - PointVector: one row of two values per point
    - DblVectorVector: one row per inner vector (2-D result)
    - DblVector: 1-D result

    Args:
        data: input data

    Returns:
        torch.Tensor: converted data

    Raises:
        ValueError: if data type is not supported

    """

    if isinstance(data, numpy.ndarray):
        # astype() already returns a newly allocated array, so the tensor
        # never shares memory with the caller's array; no extra copy needed.
        return torch.from_numpy(data.astype(numpy.float32))

    if isinstance(data, PointVector):
        points = torch.empty((len(data), 2))

        for i, point in enumerate(data):
            points[i] = CoverageEnvUtils.to_tensor(point)

        return points

    if isinstance(data, DblVectorVector):
        # Each inner vector becomes one row. The previous implementation
        # allocated a 1-D tensor and assigned whole rows into scalar slots,
        # which raises for any row with more than one element.
        rows = [CoverageEnvUtils.to_tensor(row) for row in data]

        if not rows:
            # torch.stack rejects an empty list; keep returning an empty tensor.
            return torch.empty(0)

        return torch.stack(rows)

    if isinstance(data, DblVector):
        return torch.tensor([float(value) for value in data])

    raise ValueError(f"Unknown data type: {type(data)}")
96
@staticmethod
def get_raw_local_maps(env: CoverageSystem, params: Parameters) -> torch.Tensor:
    """
    Collect the local map of every robot into a single tensor

    Slot ``r`` of the result holds ``env.GetRobotLocalMap(r)`` converted
    with ``CoverageEnvUtils.to_tensor``.

    Args:
        env: coverage environment
        params: parameters (``pLocalMapSize`` gives the side of each map)

    Returns:
        torch.Tensor: tensor of shape (num_robots, pLocalMapSize, pLocalMapSize)

    """
    side = params.pLocalMapSize
    robot_count = env.GetNumRobots()
    stacked = torch.zeros((robot_count, side, side))

    for robot_id in range(robot_count):
        robot_map = env.GetRobotLocalMap(robot_id)
        stacked[robot_id] = CoverageEnvUtils.to_tensor(robot_map)

    return stacked
118
@staticmethod
def get_raw_obstacle_maps(env: CoverageSystem, params: Parameters) -> torch.Tensor:
    """
    Collect the obstacle map of every robot into a single tensor

    Slot ``r`` of the result holds ``env.GetRobotObstacleMap(r)`` converted
    with ``CoverageEnvUtils.to_tensor``.

    Args:
        env: coverage environment
        params: parameters (``pLocalMapSize`` gives the side of each map)

    Returns:
        torch.Tensor: tensor of shape (num_robots, pLocalMapSize, pLocalMapSize)

    """
    side = params.pLocalMapSize
    robot_count = env.GetNumRobots()
    stacked = torch.zeros((robot_count, side, side))

    for robot_id in range(robot_count):
        robot_map = env.GetRobotObstacleMap(robot_id)
        stacked[robot_id] = CoverageEnvUtils.to_tensor(robot_map)

    return stacked
142
@staticmethod
def get_communication_maps(
    env: CoverageSystem, params: Parameters, map_size: int
) -> torch.Tensor:
    """
    Generate communication maps from positions

    Communication maps are composed of two channels.
    Each channel has non-zero values for cells that correspond to the relative positions of the neighbors.
    For the first channel, the value is the x-coordinate of the relative position divided by the communication range.
    Similarly, the y-coordinate is used for the second channel.

    Args:
        env: coverage environment
        params: parameters
        map_size: size of the map

    Returns:
        torch.Tensor: communication maps of shape (num_robots, 2, map_size, map_size)
    """
    num_robots = env.GetNumRobots()

    comm_maps = torch.zeros((num_robots, 2, map_size, map_size))

    # Loop-invariant scalars of the position -> cell-index mapping.
    index_divisor = params.pCommunicationRange * params.pResolution * 2.0
    index_offset = map_size / 2.0 - params.pResolution / 2.0
    dense_size = torch.Size([map_size, map_size])

    for r_idx in range(num_robots):
        neighbors_pos = CoverageEnvUtils.to_tensor(
            env.GetRelativePositonsNeighbors(r_idx)
        )
        # Scale and shift the relative positions, then round to the nearest cell.
        scaled_indices = torch.round(
            neighbors_pos * map_size / index_divisor + index_offset
        )
        # sparse_coo_tensor expects indices as (2, num_neighbors):
        # row 0 is the x-derived index, row 1 the y-derived index.
        indices = torch.transpose(scaled_indices, 1, 0).long()
        values = neighbors_pos / params.pCommunicationRange
        # NOTE(review): indices are not clamped to [0, map_size) here;
        # presumably neighbors always fall inside the map -- confirm.
        # Per the PyTorch sparse documentation, neighbors that land in the
        # same cell are summed when the tensor is densified.
        comm_maps[r_idx][0] = torch.sparse_coo_tensor(
            indices, values[:, 0], dense_size
        ).to_dense()
        comm_maps[r_idx][1] = torch.sparse_coo_tensor(
            indices, values[:, 1], dense_size
        ).to_dense()

    return comm_maps
192 # positions = env.GetRobotPositions()
193 # robot_positions = CoverageEnvUtils.to_tensor(env.GetRobotPositions())
194 # relative_pos = robot_positions.unsqueeze(0) - robot_positions.unsqueeze(1)
195 # scaled_relative_pos = torch.round(relative_pos * map_size / (params.pCommunicationRange * params.pResolution * 2.) + (map_size / 2. - params.pResolution / 2.))
196 # relative_dist = relative_pos.norm(2, 2)
197 # diagonal_mask = torch.eye(num_robots).to(torch.bool)
198 # relative_dist.masked_fill_(diagonal_mask, params.pCommunicationRange + 1)
199
@staticmethod
def resize_maps(maps: torch.Tensor, resized_map_size: int) -> torch.Tensor:
    """
    Resize maps to a given size
    Uses bilinear interpolation from torchvision.transforms.functional.resize
    Options: antialias=True

    The last two dimensions are treated as the map; all leading dimensions
    are flattened for the resize call and restored afterwards.

    Args:
        maps: input maps, with at least two dimensions
        resized_map_size: size of the resized maps

    Returns:
        torch.Tensor: resized maps with the same leading dimensions as the
        input and trailing dimensions (resized_map_size, resized_map_size)

    """
    original_shape = maps.shape
    # reshape (rather than view) so non-contiguous inputs, e.g. slices or
    # transposes, are accepted; contiguous inputs are still only viewed.
    flat_maps = maps.reshape(-1, original_shape[-2], original_shape[-1])
    resized = torchvision.transforms.functional.resize(
        flat_maps,
        (resized_map_size, resized_map_size),
        interpolation=torchvision.transforms.InterpolationMode.BILINEAR,
        antialias=True,
    )

    return resized.reshape(original_shape[:-2] + resized.shape[-2:])
226
@staticmethod
def get_maps(
    env: CoverageSystem,
    params: Parameters,
    resized_map_size: int,
    use_comm_map: bool,
) -> torch.Tensor:
    """
    Get maps for the coverage environment

    The channels are concatenated along dimension 1 in this order:
    resized local map, the two communication-map channels (only when
    ``use_comm_map`` is true), resized obstacle map.

    Args:
        env: coverage environment
        params: parameters
        resized_map_size: size of the resized maps
        use_comm_map: whether to use communication maps

    Returns:
        torch.Tensor: maps with 4 channels per robot when ``use_comm_map``
        is true, otherwise 2 channels per robot

    """
    resized_local_maps = CoverageEnvUtils.resize_maps(
        CoverageEnvUtils.get_raw_local_maps(env, params), resized_map_size
    )
    resized_obstacle_maps = CoverageEnvUtils.resize_maps(
        CoverageEnvUtils.get_raw_obstacle_maps(env, params), resized_map_size
    )

    channels = [resized_local_maps.unsqueeze(1)]

    if use_comm_map:
        # The communication maps are built from relative neighbor positions.
        # A previous extra call to env.GetCommunicationMaps() was dropped
        # because its result was overwritten before being used.
        # NOTE(review): assumes that call has no side effects -- confirm.
        channels.append(
            CoverageEnvUtils.get_communication_maps(env, params, resized_map_size)
        )

    channels.append(resized_obstacle_maps.unsqueeze(1))

    return torch.cat(channels, 1)
278
@staticmethod
def get_voronoi_features(env: CoverageSystem) -> torch.Tensor:
    """
    Convert the per-robot voronoi features of the environment to a tensor

    Args:
        env: coverage environment

    Returns:
        torch.Tensor: one row per robot; the row width is taken from the
        first robot's feature vector
    """
    raw_features = env.GetRobotVoronoiFeatures()
    num_rows = len(raw_features)
    num_cols = len(raw_features[0])
    feature_matrix = torch.zeros((num_rows, num_cols))

    for row in range(num_rows):
        feature_matrix[row] = CoverageEnvUtils.to_tensor(raw_features[row])

    return feature_matrix
297
@staticmethod
def get_robot_positions(env: CoverageSystem) -> torch.Tensor:
    """
    Return the robot positions reported by the environment as a tensor

    Args:
        env: coverage environment

    Returns:
        torch.Tensor: ``env.GetRobotPositions()`` converted with ``to_tensor``
    """
    return CoverageEnvUtils.to_tensor(env.GetRobotPositions())
312
@staticmethod
def get_weights(env: CoverageSystem, params: Parameters) -> torch.Tensor:
    """
    Compute the dense edge-weight matrix of the communication graph

    The weight between two robots at Euclidean distance ``d`` is
    ``exp(-d^2 / pCommunicationRange^2)``. Weights below ``1/e`` and the
    diagonal are set to zero.

    Args:
        env: coverage environment
        params: parameters

    Returns:
        torch.Tensor: square matrix of edge weights, one row/column per robot
    """
    positions = CoverageEnvUtils.to_tensor(env.GetRobotPositions())
    comm_range_sq = params.pCommunicationRange * params.pCommunicationRange
    squared_distances = torch.cdist(positions, positions, 2).square()
    weights = torch.exp(-squared_distances / comm_range_sq)

    # exp(-1) is the weight at a distance of exactly one communication range.
    cutoff = 1.0 / math.exp(1.0)
    weights.masked_fill_(weights < cutoff, 0)
    weights.fill_diagonal_(0)

    return weights
336
@staticmethod
def get_torch_geometric_data(
    env: CoverageSystem,
    params: Parameters,
    use_cnn: bool,
    use_comm_map: bool,
    map_size: int,
) -> torch_geometric.data.Data:
    """
    Get torch geometric data

    Node features are the maps from ``get_maps`` when ``use_cnn`` is true,
    otherwise the voronoi features. Edges are the non-zero entries of the
    matrix returned by ``get_weights``; the edge weights are those values
    (``exp(-d^2 / pCommunicationRange^2)``), i.e. they are not binary.
    Positions are shifted by half the world map size and divided by the
    world map size.

    Args:
        env: coverage environment
        params: parameters
        use_cnn: whether to use CNN
        use_comm_map: whether to use communication maps
        map_size: size of the maps

    Returns:
        torch_geometric.data.Data: torch geometric data

    """

    if use_cnn:
        features = CoverageEnvUtils.get_maps(env, params, map_size, use_comm_map)
    else:
        features = CoverageEnvUtils.get_voronoi_features(env)

    # The sparse form keeps only the non-zero weights, i.e. the graph edges.
    edge_weights = CoverageEnvUtils.get_weights(env, params).to_sparse().coalesce()
    edge_index = edge_weights.indices().long()
    weights = edge_weights.values().float()

    pos = CoverageEnvUtils.get_robot_positions(env)
    pos = (pos + params.pWorldMapSize / 2.0) / params.pWorldMapSize

    data = torch_geometric.data.Data(
        x=features,
        edge_index=edge_index.clone().detach(),
        edge_weight=weights.clone().detach(),
        pos=pos.clone().detach(),
    )

    return data
378
379 # Legacy maps which give decent results
380 # Trying to move away from this
381 # @staticmethod
382 # def get_stable_maps(env, params, resized_map_size):
383 # robot_positions = CoverageEnvUtils.to_tensor(env.GetRobotPositions())
384 # num_robots = env.GetNumRobots()
385 # maps = torch.empty((num_robots, 4, resized_map_size, resized_map_size))
386 # h_vals = torch.linspace(1.0, -1.0, maps.shape[-2]+1)
387 # h_vals = (h_vals[1:] + h_vals[:-1])/2
388 # w_vals = torch.linspace(-1.0, 1.0, maps.shape[-1]+1)
389 # w_vals = (w_vals[1:] + w_vals[:-1])/2
390 # heatmap_x = torch.stack([h_vals] * maps.shape[-1], dim=1)/100
391 # heatmap_y = torch.stack([w_vals] * maps.shape[-2], dim=0)/100
392 # for r_idx in range(num_robots):
393 # local_map = env.GetRobotLocalMap(r_idx)
394 # resized_local_map = cv2.resize(local_map, dsize=(resized_map_size, resized_map_size), interpolation=cv2.INTER_AREA)
395 # maps[r_idx][0] = torch.tensor(resized_local_map).float()
396
397 # comm_map = env.GetCommunicationMap(r_idx)
398 # filtered_comm_map = gaussian_filter(comm_map, sigma=(3,3), order=0)
399 # resized_comm_map = torch.tensor(cv2.resize(numpy.array(filtered_comm_map), dsize=(resized_map_size, resized_map_size), interpolation=cv2.INTER_AREA)).float()
400 # maps[r_idx][1] = resized_comm_map
401
402 # maps[r_idx][2] = heatmap_x
403 # maps[r_idx][3] = heatmap_y
404
405 # return maps
406
407
408 # Legacy edge weights used in previous research
409 # The weights are proportional to the distance
410 # Trying to move away from this
411 # @staticmethod
412 # def robot_positions_to_edge_weights(
413 # robot_positions: PointVector, world_map_size: int, comm_range: float
414 # ) -> torch.Tensor:
415 # """
416 # Convert robot positions to edge weights
417
418 # Args:
419 # robot_positions: robot positions
420 # world_map_size: size of the world map
421 # comm_range: communication range
422
423 # Returns:
424 # torch.Tensor: edge weights
425 # """
426 # x = numpy.array(robot_positions)
427 # s_mat = distance_matrix(x, x)
428 # s_mat[s_mat > comm_range] = 0
429 # c_mat = (world_map_size**2) / (s_mat.shape[0] ** 2)
430 # c_mat = 3 / c_mat
431 # graph_obs = c_mat * s_mat
432
433 # return graph_obs
434
Class for utility functions for coverage environment.
torch.Tensor get_weights(CoverageSystem env, Parameters params)
Get edge weights for the communication graph.
torch.Tensor resize_maps(torch.Tensor maps, int resized_map_size)
Resize maps to a given size Uses bilinear interpolation from torchvision.transforms....
torch.Tensor get_voronoi_features(CoverageSystem env)
Get voronoi features.
torch.Tensor to_tensor(object data)
Converts various types of data to torch.Tensor.
torch.Tensor get_communication_maps(CoverageSystem env, Parameters params, int map_size)
Generate communication maps from positions.
torch_geometric.data.Data get_torch_geometric_data(CoverageSystem env, Parameters params, bool use_cnn, bool use_comm_map, int map_size)
Get torch geometric data. The edge weights are taken from get_weights (exp(-d^2 / pCommunicationRange^2), zeroed below 1/e and on the diagonal), so they are not binary.
torch.Tensor get_raw_local_maps(CoverageSystem env, Parameters params)
Get raw local maps.
torch.Tensor get_robot_positions(CoverageSystem env)
Get robot positions.
torch.Tensor get_raw_obstacle_maps(CoverageSystem env, Parameters params)
Get raw obstacle maps.
torch.Tensor get_maps(CoverageSystem env, Parameters params, int resized_map_size, bool use_comm_map)
Get maps for the coverage environment.