Add nuplan route info (#98)
This commit is contained in:
3
.github/workflows/main.yml
vendored
3
.github/workflows/main.yml
vendored
@@ -38,7 +38,6 @@ jobs:
|
||||
pip install tensorflow==2.11.0
|
||||
pip install protobuf==3.20
|
||||
pip install cython
|
||||
pip install numpy
|
||||
pip install -e .
|
||||
pip install pytest
|
||||
pip install pytest-cov
|
||||
@@ -48,7 +47,7 @@ jobs:
|
||||
cd metadrive
|
||||
pip install -e .
|
||||
cd ../
|
||||
|
||||
pip install numpy==1.26.4
|
||||
cd scenarionet/
|
||||
pytest --cov=./ --cov-config=.coveragerc --cov-report=xml -sv tests
|
||||
|
||||
|
||||
143
scenarionet/converter/nuplan/block_utils/bfs_roadblock.py
Normal file
143
scenarionet/converter/nuplan/block_utils/bfs_roadblock.py
Normal file
@@ -0,0 +1,143 @@
|
||||
from collections import deque
|
||||
from typing import Dict, Optional, Tuple, Union, List
|
||||
|
||||
try:
|
||||
from nuplan.common.maps.abstract_map import AbstractMap
|
||||
from nuplan.common.maps.abstract_map_objects import RoadBlockGraphEdgeMapObject
|
||||
except ImportError:
|
||||
AbstractMap = None
|
||||
RoadBlockGraphEdgeMapObject = None
|
||||
|
||||
|
||||
class BreadthFirstSearchRoadBlock:
|
||||
"""
|
||||
A class that performs iterative breadth first search. The class operates on the roadblock graph.
|
||||
"""
|
||||
def __init__(self, start_roadblock_id: int, map_api: Optional[AbstractMap], forward_search: str = True):
|
||||
"""
|
||||
Constructor of BreadthFirstSearchRoadBlock class
|
||||
:param start_roadblock_id: roadblock id where graph starts
|
||||
:param map_api: map class in nuPlan
|
||||
:param forward_search: whether to search in driving direction, defaults to True
|
||||
"""
|
||||
self._map_api: Optional[AbstractMap] = map_api
|
||||
self._queue = deque([self.id_to_roadblock(start_roadblock_id), None])
|
||||
self._parent: Dict[str, Optional[RoadBlockGraphEdgeMapObject]] = dict()
|
||||
self._forward_search = forward_search
|
||||
|
||||
# lazy loaded
|
||||
self._target_roadblock_ids: List[str] = None
|
||||
|
||||
def search(self, target_roadblock_id: Union[str, List[str]],
|
||||
max_depth: int) -> Tuple[List[RoadBlockGraphEdgeMapObject], bool]:
|
||||
"""
|
||||
Apply BFS to find route to target roadblock.
|
||||
:param target_roadblock_id: id of target roadblock
|
||||
:param max_depth: maximum search depth
|
||||
:return: tuple of route and whether a path was found
|
||||
"""
|
||||
|
||||
if isinstance(target_roadblock_id, str):
|
||||
target_roadblock_id = [target_roadblock_id]
|
||||
self._target_roadblock_ids = target_roadblock_id
|
||||
|
||||
start_edge = self._queue[0]
|
||||
|
||||
# Initial search states
|
||||
path_found: bool = False
|
||||
end_edge: RoadBlockGraphEdgeMapObject = start_edge
|
||||
end_depth: int = 1
|
||||
depth: int = 1
|
||||
|
||||
self._parent[start_edge.id + f"_{depth}"] = None
|
||||
|
||||
while self._queue:
|
||||
current_edge = self._queue.popleft()
|
||||
|
||||
# Early exit condition
|
||||
if self._check_end_condition(depth, max_depth):
|
||||
break
|
||||
|
||||
# Depth tracking
|
||||
if current_edge is None:
|
||||
depth += 1
|
||||
self._queue.append(None)
|
||||
if self._queue[0] is None:
|
||||
break
|
||||
continue
|
||||
|
||||
# Goal condition
|
||||
if self._check_goal_condition(current_edge, depth, max_depth):
|
||||
end_edge = current_edge
|
||||
end_depth = depth
|
||||
path_found = True
|
||||
break
|
||||
|
||||
neighbors = (current_edge.outgoing_edges if self._forward_search else current_edge.incoming_edges)
|
||||
|
||||
# Populate queue
|
||||
for next_edge in neighbors:
|
||||
# if next_edge.id in self._candidate_lane_edge_ids_old:
|
||||
self._queue.append(next_edge)
|
||||
self._parent[next_edge.id + f"_{depth + 1}"] = current_edge
|
||||
end_edge = next_edge
|
||||
end_depth = depth + 1
|
||||
|
||||
return self._construct_path(end_edge, end_depth), path_found
|
||||
|
||||
def id_to_roadblock(self, id: str) -> RoadBlockGraphEdgeMapObject:
|
||||
"""
|
||||
Retrieves roadblock from map-api based on id
|
||||
:param id: id of roadblock
|
||||
:return: roadblock class
|
||||
"""
|
||||
block = self._map_api._get_roadblock(id)
|
||||
block = block or self._map_api._get_roadblock_connector(id)
|
||||
return block
|
||||
|
||||
@staticmethod
|
||||
def _check_end_condition(depth: int, max_depth: int) -> bool:
|
||||
"""
|
||||
Check if the search should end regardless if the goal condition is met.
|
||||
:param depth: The current depth to check.
|
||||
:param target_depth: The target depth to check against.
|
||||
:return: whether depth exceeds the target depth.
|
||||
"""
|
||||
return depth > max_depth
|
||||
|
||||
def _check_goal_condition(
|
||||
self,
|
||||
current_edge: RoadBlockGraphEdgeMapObject,
|
||||
depth: int,
|
||||
max_depth: int,
|
||||
) -> bool:
|
||||
"""
|
||||
Check if the current edge is at the target roadblock at the given depth.
|
||||
:param current_edge: edge to check.
|
||||
:param depth: current depth to check.
|
||||
:param max_depth: maximum depth the edge should be at.
|
||||
:return: True if the lane edge is contain the in the target roadblock. False, otherwise.
|
||||
"""
|
||||
return current_edge.id in self._target_roadblock_ids and depth <= max_depth
|
||||
|
||||
def _construct_path(self, end_edge: RoadBlockGraphEdgeMapObject, depth: int) -> List[RoadBlockGraphEdgeMapObject]:
|
||||
"""
|
||||
Constructs a path when goal was found.
|
||||
:param end_edge: The end edge to start back propagating back to the start edge.
|
||||
:param depth: The depth of the target edge.
|
||||
:return: The constructed path as a list of RoadBlockGraphEdgeMapObject
|
||||
"""
|
||||
path = [end_edge]
|
||||
path_id = [end_edge.id]
|
||||
|
||||
while self._parent[end_edge.id + f"_{depth}"] is not None:
|
||||
path.append(self._parent[end_edge.id + f"_{depth}"])
|
||||
path_id.append(path[-1].id)
|
||||
end_edge = self._parent[end_edge.id + f"_{depth}"]
|
||||
depth -= 1
|
||||
|
||||
if self._forward_search:
|
||||
path.reverse()
|
||||
path_id.reverse()
|
||||
|
||||
return (path, path_id)
|
||||
149
scenarionet/converter/nuplan/block_utils/dijkstra.py
Normal file
149
scenarionet/converter/nuplan/block_utils/dijkstra.py
Normal file
@@ -0,0 +1,149 @@
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
import numpy as np
|
||||
try:
|
||||
from nuplan.common.maps.abstract_map_objects import (
|
||||
LaneGraphEdgeMapObject,
|
||||
RoadBlockGraphEdgeMapObject,
|
||||
)
|
||||
finally:
|
||||
pass
|
||||
|
||||
|
||||
class Dijkstra:
|
||||
"""
|
||||
A class that performs dijkstra's shortest path. The class operates on lane level graph search.
|
||||
The goal condition is specified to be if the lane can be found at the target roadblock or roadblock connector.
|
||||
"""
|
||||
def __init__(self, start_edge: LaneGraphEdgeMapObject, candidate_lane_edge_ids: List[str]):
|
||||
"""
|
||||
Constructor for the Dijkstra class.
|
||||
:param start_edge: The starting edge for the search
|
||||
:param candidate_lane_edge_ids: The candidates lane ids that can be included in the search.
|
||||
"""
|
||||
self._queue = list([start_edge])
|
||||
self._parent: Dict[str, Optional[LaneGraphEdgeMapObject]] = dict()
|
||||
self._candidate_lane_edge_ids = candidate_lane_edge_ids
|
||||
|
||||
def search(self, target_roadblock: RoadBlockGraphEdgeMapObject) -> Tuple[List[LaneGraphEdgeMapObject], bool]:
|
||||
"""
|
||||
Performs dijkstra's shortest path to find a route to the target roadblock.
|
||||
:param target_roadblock: The target roadblock the path should end at.
|
||||
:return:
|
||||
- A route starting from the given start edge
|
||||
- A bool indicating if the route is successfully found. Successful means that there exists a path
|
||||
from the start edge to an edge contained in the end roadblock.
|
||||
If unsuccessful the shortest deepest path is returned.
|
||||
"""
|
||||
start_edge = self._queue[0]
|
||||
|
||||
# Initial search states
|
||||
path_found: bool = False
|
||||
end_edge: LaneGraphEdgeMapObject = start_edge
|
||||
|
||||
self._parent[start_edge.id] = None
|
||||
self._frontier = [start_edge.id]
|
||||
self._dist = [1]
|
||||
self._depth = [1]
|
||||
|
||||
self._expanded = []
|
||||
self._expanded_id = []
|
||||
self._expanded_dist = []
|
||||
self._expanded_depth = []
|
||||
|
||||
while len(self._queue) > 0:
|
||||
dist, idx = min((val, idx) for (idx, val) in enumerate(self._dist))
|
||||
current_edge = self._queue[idx]
|
||||
current_depth = self._depth[idx]
|
||||
|
||||
del self._dist[idx], self._queue[idx], self._frontier[idx], self._depth[idx]
|
||||
|
||||
if self._check_goal_condition(current_edge, target_roadblock):
|
||||
end_edge = current_edge
|
||||
path_found = True
|
||||
break
|
||||
|
||||
self._expanded.append(current_edge)
|
||||
self._expanded_id.append(current_edge.id)
|
||||
self._expanded_dist.append(dist)
|
||||
self._expanded_depth.append(current_depth)
|
||||
|
||||
# Populate queue
|
||||
for next_edge in current_edge.outgoing_edges:
|
||||
if not next_edge.id in self._candidate_lane_edge_ids:
|
||||
continue
|
||||
|
||||
alt = dist + self._edge_cost(next_edge)
|
||||
if next_edge.id not in self._expanded_id and next_edge.id not in self._frontier:
|
||||
self._parent[next_edge.id] = current_edge
|
||||
self._queue.append(next_edge)
|
||||
self._frontier.append(next_edge.id)
|
||||
self._dist.append(alt)
|
||||
self._depth.append(current_depth + 1)
|
||||
end_edge = next_edge
|
||||
|
||||
elif next_edge.id in self._frontier:
|
||||
next_edge_idx = self._frontier.index(next_edge.id)
|
||||
current_cost = self._dist[next_edge_idx]
|
||||
if alt < current_cost:
|
||||
self._parent[next_edge.id] = current_edge
|
||||
self._dist[next_edge_idx] = alt
|
||||
self._depth[next_edge_idx] = current_depth + 1
|
||||
|
||||
if not path_found:
|
||||
# filter max depth
|
||||
max_depth = max(self._expanded_depth)
|
||||
idx_max_depth = list(np.where(np.array(self._expanded_depth) == max_depth)[0])
|
||||
dist_at_max_depth = [self._expanded_dist[i] for i in idx_max_depth]
|
||||
|
||||
dist, _idx = min((val, idx) for (idx, val) in enumerate(dist_at_max_depth))
|
||||
end_edge = self._expanded[idx_max_depth[_idx]]
|
||||
|
||||
return self._construct_path(end_edge), path_found
|
||||
|
||||
@staticmethod
|
||||
def _edge_cost(lane: LaneGraphEdgeMapObject) -> float:
|
||||
"""
|
||||
Edge cost of given lane.
|
||||
:param lane: lane class
|
||||
:return: length of lane
|
||||
"""
|
||||
return lane.baseline_path.length
|
||||
|
||||
@staticmethod
|
||||
def _check_end_condition(depth: int, target_depth: int) -> bool:
|
||||
"""
|
||||
Check if the search should end regardless if the goal condition is met.
|
||||
:param depth: The current depth to check.
|
||||
:param target_depth: The target depth to check against.
|
||||
:return: True if:
|
||||
- The current depth exceeds the target depth.
|
||||
"""
|
||||
return depth > target_depth
|
||||
|
||||
@staticmethod
|
||||
def _check_goal_condition(
|
||||
current_edge: LaneGraphEdgeMapObject,
|
||||
target_roadblock: RoadBlockGraphEdgeMapObject,
|
||||
) -> bool:
|
||||
"""
|
||||
Check if the current edge is at the target roadblock at the given depth.
|
||||
:param current_edge: The edge to check.
|
||||
:param target_roadblock: The target roadblock the edge should be contained in.
|
||||
:return: whether the current edge is in the target roadblock
|
||||
"""
|
||||
return current_edge.get_roadblock_id() == target_roadblock.id
|
||||
|
||||
def _construct_path(self, end_edge: LaneGraphEdgeMapObject) -> List[LaneGraphEdgeMapObject]:
|
||||
"""
|
||||
:param end_edge: The end edge to start back propagating back to the start edge.
|
||||
:param depth: The depth of the target edge.
|
||||
:return: The constructed path as a list of LaneGraphEdgeMapObject
|
||||
"""
|
||||
path = [end_edge]
|
||||
while self._parent[end_edge.id] is not None:
|
||||
node = self._parent[end_edge.id]
|
||||
path.append(node)
|
||||
end_edge = node
|
||||
path.reverse()
|
||||
|
||||
return path
|
||||
230
scenarionet/converter/nuplan/block_utils/route_utils.py
Normal file
230
scenarionet/converter/nuplan/block_utils/route_utils.py
Normal file
@@ -0,0 +1,230 @@
|
||||
from typing import Dict, List, Tuple
|
||||
|
||||
import numpy as np
|
||||
try:
|
||||
from nuplan.common.actor_state.ego_state import EgoState
|
||||
from nuplan.common.actor_state.state_representation import StateSE2
|
||||
from nuplan.common.maps.abstract_map import AbstractMap
|
||||
from nuplan.common.maps.abstract_map_objects import RoadBlockGraphEdgeMapObject
|
||||
from nuplan.common.maps.maps_datatypes import SemanticMapLayer
|
||||
from nuplan.planning.simulation.occupancy_map.strtree_occupancy_map import (
|
||||
STRTreeOccupancyMapFactory,
|
||||
)
|
||||
finally:
|
||||
pass
|
||||
|
||||
from scenarionet.converter.nuplan.block_utils.bfs_roadblock import BreadthFirstSearchRoadBlock
|
||||
|
||||
|
||||
def normalize_angle(angle: float) -> float:
|
||||
return (angle + np.pi) % (2 * np.pi) - np.pi
|
||||
|
||||
|
||||
def get_current_roadblock_candidates(
|
||||
ego_state: EgoState,
|
||||
map_api: AbstractMap,
|
||||
route_roadblocks_dict: Dict[str, RoadBlockGraphEdgeMapObject],
|
||||
heading_error_thresh: float = np.pi / 4,
|
||||
displacement_error_thresh: float = 3,
|
||||
) -> Tuple[RoadBlockGraphEdgeMapObject, List[RoadBlockGraphEdgeMapObject]]:
|
||||
"""
|
||||
Determines a set of roadblock candidate where ego is located
|
||||
:param ego_state: class containing ego state
|
||||
:param map_api: map object
|
||||
:param route_roadblocks_dict: dictionary of on-route roadblocks
|
||||
:param heading_error_thresh: maximum heading error, defaults to np.pi/4
|
||||
:param displacement_error_thresh: maximum displacement, defaults to 3
|
||||
:return: tuple of most promising roadblock and other candidates
|
||||
"""
|
||||
ego_pose: StateSE2 = ego_state.rear_axle
|
||||
roadblock_candidates = []
|
||||
|
||||
layers = [SemanticMapLayer.ROADBLOCK, SemanticMapLayer.ROADBLOCK_CONNECTOR]
|
||||
roadblock_dict = map_api.get_proximal_map_objects(point=ego_pose.point, radius=2.5, layers=layers)
|
||||
roadblock_candidates = (
|
||||
roadblock_dict[SemanticMapLayer.ROADBLOCK] + roadblock_dict[SemanticMapLayer.ROADBLOCK_CONNECTOR]
|
||||
)
|
||||
|
||||
if not roadblock_candidates:
|
||||
for layer in layers:
|
||||
roadblock_id_, distance = map_api.get_distance_to_nearest_map_object(point=ego_pose.point, layer=layer)
|
||||
roadblock = map_api.get_map_object(roadblock_id_, layer)
|
||||
|
||||
if roadblock:
|
||||
roadblock_candidates.append(roadblock)
|
||||
|
||||
on_route_candidates, on_route_candidate_displacement_errors = [], []
|
||||
candidates, candidate_displacement_errors = [], []
|
||||
|
||||
roadblock_displacement_errors = []
|
||||
roadblock_heading_errors = []
|
||||
|
||||
for idx, roadblock in enumerate(roadblock_candidates):
|
||||
lane_displacement_error, lane_heading_error = np.inf, np.inf
|
||||
|
||||
for lane in roadblock.interior_edges:
|
||||
lane_discrete_path: List[StateSE2] = lane.baseline_path.discrete_path
|
||||
lane_discrete_points = np.array([state.point.array for state in lane_discrete_path], dtype=np.float64)
|
||||
lane_state_distances = ((lane_discrete_points - ego_pose.point.array[None, ...])**2.0).sum(axis=-1)**0.5
|
||||
argmin = np.argmin(lane_state_distances)
|
||||
|
||||
heading_error = np.abs(normalize_angle(lane_discrete_path[argmin].heading - ego_pose.heading))
|
||||
displacement_error = lane_state_distances[argmin]
|
||||
|
||||
if displacement_error < lane_displacement_error:
|
||||
lane_heading_error, lane_displacement_error = (
|
||||
heading_error,
|
||||
displacement_error,
|
||||
)
|
||||
|
||||
if (heading_error < heading_error_thresh and displacement_error < displacement_error_thresh):
|
||||
if roadblock.id in route_roadblocks_dict.keys():
|
||||
on_route_candidates.append(roadblock)
|
||||
on_route_candidate_displacement_errors.append(displacement_error)
|
||||
else:
|
||||
candidates.append(roadblock)
|
||||
candidate_displacement_errors.append(displacement_error)
|
||||
|
||||
roadblock_displacement_errors.append(lane_displacement_error)
|
||||
roadblock_heading_errors.append(lane_heading_error)
|
||||
|
||||
if on_route_candidates: # prefer on-route roadblocks
|
||||
return (
|
||||
on_route_candidates[np.argmin(on_route_candidate_displacement_errors)],
|
||||
on_route_candidates,
|
||||
)
|
||||
elif candidates: # fallback to most promising candidate
|
||||
return candidates[np.argmin(candidate_displacement_errors)], candidates
|
||||
|
||||
# otherwise, just find any close roadblock
|
||||
return (
|
||||
roadblock_candidates[np.argmin(roadblock_displacement_errors)],
|
||||
roadblock_candidates,
|
||||
)
|
||||
|
||||
|
||||
def route_roadblock_correction(
|
||||
ego_state: EgoState,
|
||||
map_api: AbstractMap,
|
||||
route_roadblock_ids: List[str],
|
||||
search_depth_backward: int = 15,
|
||||
search_depth_forward: int = 30,
|
||||
) -> List[str]:
|
||||
"""
|
||||
Applies several methods to correct route roadblocks.
|
||||
:param ego_state: class containing ego state
|
||||
:param map_api: map object
|
||||
:param route_roadblocks_dict: dictionary of on-route roadblocks
|
||||
:param search_depth_backward: depth of forward BFS search, defaults to 15
|
||||
:param search_depth_forward: depth of backward BFS search, defaults to 30
|
||||
:return: list of roadblock id's of corrected route
|
||||
"""
|
||||
|
||||
route_roadblock_dict = {}
|
||||
for id_ in route_roadblock_ids:
|
||||
block = map_api.get_map_object(id_, SemanticMapLayer.ROADBLOCK)
|
||||
block = block or map_api.get_map_object(id_, SemanticMapLayer.ROADBLOCK_CONNECTOR)
|
||||
route_roadblock_dict[id_] = block
|
||||
|
||||
starting_block, starting_block_candidates = get_current_roadblock_candidates(
|
||||
ego_state, map_api, route_roadblock_dict
|
||||
)
|
||||
starting_block_ids = [roadblock.id for roadblock in starting_block_candidates]
|
||||
|
||||
route_roadblocks = list(route_roadblock_dict.values())
|
||||
route_roadblock_ids = list(route_roadblock_dict.keys())
|
||||
|
||||
# Fix 1: when agent starts off-route
|
||||
if starting_block.id not in route_roadblock_ids:
|
||||
# Backward search if current roadblock not in route
|
||||
graph_search = BreadthFirstSearchRoadBlock(route_roadblock_ids[0], map_api, forward_search=False)
|
||||
(path, path_id), path_found = graph_search.search(starting_block_ids, max_depth=search_depth_backward)
|
||||
|
||||
if path_found:
|
||||
route_roadblocks[:0] = path[:-1]
|
||||
route_roadblock_ids[:0] = path_id[:-1]
|
||||
|
||||
else:
|
||||
# Forward search to any route roadblock
|
||||
graph_search = BreadthFirstSearchRoadBlock(starting_block.id, map_api, forward_search=True)
|
||||
(path, path_id), path_found = graph_search.search(route_roadblock_ids[:3], max_depth=search_depth_forward)
|
||||
|
||||
if path_found:
|
||||
end_roadblock_idx = np.argmax(np.array(route_roadblock_ids) == path_id[-1])
|
||||
|
||||
route_roadblocks = route_roadblocks[end_roadblock_idx + 1:]
|
||||
route_roadblock_ids = route_roadblock_ids[end_roadblock_idx + 1:]
|
||||
|
||||
route_roadblocks[:0] = path
|
||||
route_roadblock_ids[:0] = path_id
|
||||
|
||||
# Fix 2: check if roadblocks are linked, search for links if not
|
||||
roadblocks_to_append = {}
|
||||
for i in range(len(route_roadblocks) - 1):
|
||||
next_incoming_block_ids = [_roadblock.id for _roadblock in route_roadblocks[i + 1].incoming_edges]
|
||||
is_incoming = route_roadblock_ids[i] in next_incoming_block_ids
|
||||
|
||||
if is_incoming:
|
||||
continue
|
||||
|
||||
graph_search = BreadthFirstSearchRoadBlock(route_roadblock_ids[i], map_api, forward_search=True)
|
||||
(path, path_id), path_found = graph_search.search(route_roadblock_ids[i + 1], max_depth=search_depth_forward)
|
||||
|
||||
if path_found and path and len(path) >= 3:
|
||||
path, path_id = path[1:-1], path_id[1:-1]
|
||||
roadblocks_to_append[i] = (path, path_id)
|
||||
|
||||
# append missing intermediate roadblocks
|
||||
offset = 1
|
||||
for i, (path, path_id) in roadblocks_to_append.items():
|
||||
route_roadblocks[i + offset:i + offset] = path
|
||||
route_roadblock_ids[i + offset:i + offset] = path_id
|
||||
offset += len(path)
|
||||
|
||||
# Fix 3: cut route-loops
|
||||
route_roadblocks, route_roadblock_ids = remove_route_loops(route_roadblocks, route_roadblock_ids)
|
||||
|
||||
return route_roadblock_ids
|
||||
|
||||
|
||||
def remove_route_loops(
|
||||
route_roadblocks: List[RoadBlockGraphEdgeMapObject],
|
||||
route_roadblock_ids: List[str],
|
||||
) -> Tuple[List[str], List[RoadBlockGraphEdgeMapObject]]:
|
||||
"""
|
||||
Remove ending of route, if the roadblock are intersecting the route (forming a loop).
|
||||
:param route_roadblocks: input route roadblocks
|
||||
:param route_roadblock_ids: input route roadblocks ids
|
||||
:return: tuple of ids and roadblocks of route without loops
|
||||
"""
|
||||
|
||||
roadblock_occupancy_map = None
|
||||
loop_idx = None
|
||||
|
||||
for idx, roadblock in enumerate(route_roadblocks):
|
||||
# loops only occur at intersection, thus searching for roadblock-connectors.
|
||||
if str(roadblock.__class__.__name__) == "NuPlanRoadBlockConnector":
|
||||
if not roadblock_occupancy_map:
|
||||
roadblock_occupancy_map = STRTreeOccupancyMapFactory.get_from_geometry(
|
||||
[roadblock.polygon], [roadblock.id]
|
||||
)
|
||||
continue
|
||||
|
||||
strtree, index_by_id = roadblock_occupancy_map._build_strtree()
|
||||
indices = strtree.query(roadblock.polygon)
|
||||
if len(indices) > 0:
|
||||
for geom in strtree.geometries.take(indices):
|
||||
area = geom.intersection(roadblock.polygon).area
|
||||
if area > 1:
|
||||
loop_idx = idx
|
||||
break
|
||||
if loop_idx:
|
||||
break
|
||||
|
||||
roadblock_occupancy_map.insert(roadblock.id, roadblock.polygon)
|
||||
|
||||
if loop_idx:
|
||||
route_roadblocks = route_roadblocks[:loop_idx]
|
||||
route_roadblock_ids = route_roadblock_ids[:loop_idx]
|
||||
|
||||
return route_roadblocks, route_roadblock_ids
|
||||
@@ -4,7 +4,7 @@ import tempfile
|
||||
from dataclasses import dataclass
|
||||
from os.path import join
|
||||
from typing import Union
|
||||
|
||||
from scenarionet.converter.nuplan.block_utils.route_utils import route_roadblock_correction
|
||||
import numpy as np
|
||||
from metadrive.scenario import ScenarioDescription as SD
|
||||
from metadrive.type import MetaDriveType
|
||||
@@ -69,7 +69,8 @@ def get_nuplan_scenarios(data_root, map_root, logs: Union[list, None] = None, bu
|
||||
# filter
|
||||
"scenario_filter=all_scenarios", # simulate only one log
|
||||
"scenario_filter.remove_invalid_goals=true",
|
||||
"scenario_filter.shuffle=true",
|
||||
"scenario_filter.expand_scenarios=false",
|
||||
"scenario_filter.shuffle=false",
|
||||
"scenario_filter.log_names=[{}]".format(log_string),
|
||||
# "scenario_filter.scenario_types={}".format(all_scenario_types),
|
||||
# "scenario_filter.scenario_tokens=[]",
|
||||
@@ -78,7 +79,7 @@ def get_nuplan_scenarios(data_root, map_root, logs: Union[list, None] = None, bu
|
||||
# "scenario_filter.limit_total_scenarios=1000",
|
||||
# "scenario_filter.expand_scenarios=true",
|
||||
# "scenario_filter.limit_scenarios_per_type=10", # use 10 scenarios per scenario type
|
||||
"scenario_filter.timestamp_threshold_s=20", # minial scenario duration (s)
|
||||
"scenario_filter.timestamp_threshold_s=10", # minial scenario duration (s)
|
||||
]
|
||||
|
||||
base_config_path = os.path.join(nuplan_package_path, "planning", "script")
|
||||
@@ -175,7 +176,7 @@ def get_line_type(nuplan_type):
|
||||
raise ValueError("Unknown line tyep: {}".format(nuplan_type))
|
||||
|
||||
|
||||
def extract_map_features(map_api, center, radius=500):
|
||||
def extract_map_features(map_api, center, route_block_ids, radius=500):
|
||||
ret = {}
|
||||
np.seterr(all='ignore')
|
||||
# Center is Important !
|
||||
@@ -235,7 +236,9 @@ def extract_map_features(map_api, center, radius=500):
|
||||
if layer == SemanticMapLayer.ROADBLOCK else [],
|
||||
SD.RIGHT_NEIGHBORS: [edge.id for edge in block.interior_edges[index + 1:]] \
|
||||
if layer == SemanticMapLayer.ROADBLOCK else [],
|
||||
SD.POLYGON: polygon
|
||||
SD.POLYGON: polygon,
|
||||
"is_sdc_route": lane_meta_data.get_roadblock_id() in route_block_ids,
|
||||
"speed_limit_mps": lane_meta_data.speed_limit_mps,
|
||||
}
|
||||
if layer == SemanticMapLayer.ROADBLOCK_CONNECTOR:
|
||||
continue
|
||||
@@ -513,8 +516,15 @@ def convert_nuplan_scenario(scenario: NuPlanScenario, version):
|
||||
# traffic light
|
||||
result[SD.DYNAMIC_MAP_STATES] = extract_traffic_light(scenario, scenario_center)
|
||||
|
||||
# route
|
||||
route_block_ids = scenario.get_route_roadblock_ids()
|
||||
try:
|
||||
route_block_ids = route_roadblock_correction(state, scenario.map_api, route_block_ids)
|
||||
except Exception as e:
|
||||
logger.error("Route correction failed: {}".format(e))
|
||||
|
||||
# map
|
||||
result[SD.MAP_FEATURES] = extract_map_features(scenario.map_api, scenario_center)
|
||||
result[SD.MAP_FEATURES] = extract_map_features(scenario.map_api, scenario_center, route_block_ids)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
Reference in New Issue
Block a user