From cebcbe6700bb9a909f8e511221c7ea188ee803d0 Mon Sep 17 00:00:00 2001 From: Quanyi Li Date: Wed, 5 Mar 2025 16:42:20 +0000 Subject: [PATCH] Add nuplan route info (#98) --- .github/workflows/main.yml | 3 +- .../converter/nuplan/block_utils/__init__.py | 0 .../nuplan/block_utils/bfs_roadblock.py | 143 +++++++++++ .../converter/nuplan/block_utils/dijkstra.py | 149 ++++++++++++ .../nuplan/block_utils/route_utils.py | 230 ++++++++++++++++++ scenarionet/converter/nuplan/utils.py | 22 +- 6 files changed, 539 insertions(+), 8 deletions(-) create mode 100644 scenarionet/converter/nuplan/block_utils/__init__.py create mode 100644 scenarionet/converter/nuplan/block_utils/bfs_roadblock.py create mode 100644 scenarionet/converter/nuplan/block_utils/dijkstra.py create mode 100644 scenarionet/converter/nuplan/block_utils/route_utils.py diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 5444369..d569605 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -38,7 +38,6 @@ jobs: pip install tensorflow==2.11.0 pip install protobuf==3.20 pip install cython - pip install numpy pip install -e . pip install pytest pip install pytest-cov @@ -48,7 +47,7 @@ jobs: cd metadrive pip install -e . cd ../ - + pip install numpy==1.26.4 cd scenarionet/ pytest --cov=./ --cov-config=.coveragerc --cov-report=xml -sv tests diff --git a/scenarionet/converter/nuplan/block_utils/__init__.py b/scenarionet/converter/nuplan/block_utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scenarionet/converter/nuplan/block_utils/bfs_roadblock.py b/scenarionet/converter/nuplan/block_utils/bfs_roadblock.py new file mode 100644 index 0000000..5a3e6c4 --- /dev/null +++ b/scenarionet/converter/nuplan/block_utils/bfs_roadblock.py @@ -0,0 +1,143 @@ +from collections import deque +from typing import Dict, Optional, Tuple, Union, List + +try: + from nuplan.common.maps.abstract_map import AbstractMap + from nuplan.common.maps.abstract_map_objects import RoadBlockGraphEdgeMapObject +except ImportError: + AbstractMap = None + RoadBlockGraphEdgeMapObject = None + + +class BreadthFirstSearchRoadBlock: + """ + A class that performs iterative breadth first search. The class operates on the roadblock graph. + """ + def __init__(self, start_roadblock_id: int, map_api: Optional[AbstractMap], forward_search: str = True): + """ + Constructor of BreadthFirstSearchRoadBlock class + :param start_roadblock_id: roadblock id where graph starts + :param map_api: map class in nuPlan + :param forward_search: whether to search in driving direction, defaults to True + """ + self._map_api: Optional[AbstractMap] = map_api + self._queue = deque([self.id_to_roadblock(start_roadblock_id), None]) + self._parent: Dict[str, Optional[RoadBlockGraphEdgeMapObject]] = dict() + self._forward_search = forward_search + + # lazy loaded + self._target_roadblock_ids: List[str] = None + + def search(self, target_roadblock_id: Union[str, List[str]], + max_depth: int) -> Tuple[List[RoadBlockGraphEdgeMapObject], bool]: + """ + Apply BFS to find route to target roadblock. + :param target_roadblock_id: id of target roadblock + :param max_depth: maximum search depth + :return: tuple of route and whether a path was found + """ + + if isinstance(target_roadblock_id, str): + target_roadblock_id = [target_roadblock_id] + self._target_roadblock_ids = target_roadblock_id + + start_edge = self._queue[0] + + # Initial search states + path_found: bool = False + end_edge: RoadBlockGraphEdgeMapObject = start_edge + end_depth: int = 1 + depth: int = 1 + + self._parent[start_edge.id + f"_{depth}"] = None + + while self._queue: + current_edge = self._queue.popleft() + + # Early exit condition + if self._check_end_condition(depth, max_depth): + break + + # Depth tracking + if current_edge is None: + depth += 1 + self._queue.append(None) + if self._queue[0] is None: + break + continue + + # Goal condition + if self._check_goal_condition(current_edge, depth, max_depth): + end_edge = current_edge + end_depth = depth + path_found = True + break + + neighbors = (current_edge.outgoing_edges if self._forward_search else current_edge.incoming_edges) + + # Populate queue + for next_edge in neighbors: + # if next_edge.id in self._candidate_lane_edge_ids_old: + self._queue.append(next_edge) + self._parent[next_edge.id + f"_{depth + 1}"] = current_edge + end_edge = next_edge + end_depth = depth + 1 + + return self._construct_path(end_edge, end_depth), path_found + + def id_to_roadblock(self, id: str) -> RoadBlockGraphEdgeMapObject: + """ + Retrieves roadblock from map-api based on id + :param id: id of roadblock + :return: roadblock class + """ + block = self._map_api._get_roadblock(id) + block = block or self._map_api._get_roadblock_connector(id) + return block + + @staticmethod + def _check_end_condition(depth: int, max_depth: int) -> bool: + """ + Check if the search should end regardless if the goal condition is met. + :param depth: The current depth to check. + :param target_depth: The target depth to check against. + :return: whether depth exceeds the target depth. + """ + return depth > max_depth + + def _check_goal_condition( + self, + current_edge: RoadBlockGraphEdgeMapObject, + depth: int, + max_depth: int, + ) -> bool: + """ + Check if the current edge is at the target roadblock at the given depth. + :param current_edge: edge to check. + :param depth: current depth to check. + :param max_depth: maximum depth the edge should be at. + :return: True if the lane edge is contain the in the target roadblock. False, otherwise. + """ + return current_edge.id in self._target_roadblock_ids and depth <= max_depth + + def _construct_path(self, end_edge: RoadBlockGraphEdgeMapObject, depth: int) -> List[RoadBlockGraphEdgeMapObject]: + """ + Constructs a path when goal was found. + :param end_edge: The end edge to start back propagating back to the start edge. + :param depth: The depth of the target edge. + :return: The constructed path as a list of RoadBlockGraphEdgeMapObject + """ + path = [end_edge] + path_id = [end_edge.id] + + while self._parent[end_edge.id + f"_{depth}"] is not None: + path.append(self._parent[end_edge.id + f"_{depth}"]) + path_id.append(path[-1].id) + end_edge = self._parent[end_edge.id + f"_{depth}"] + depth -= 1 + + if self._forward_search: + path.reverse() + path_id.reverse() + + return (path, path_id) diff --git a/scenarionet/converter/nuplan/block_utils/dijkstra.py b/scenarionet/converter/nuplan/block_utils/dijkstra.py new file mode 100644 index 0000000..3296916 --- /dev/null +++ b/scenarionet/converter/nuplan/block_utils/dijkstra.py @@ -0,0 +1,149 @@ +from typing import Dict, List, Optional, Tuple +import numpy as np +try: + from nuplan.common.maps.abstract_map_objects import ( + LaneGraphEdgeMapObject, + RoadBlockGraphEdgeMapObject, + ) +finally: + pass + + +class Dijkstra: + """ + A class that performs dijkstra's shortest path. The class operates on lane level graph search. + The goal condition is specified to be if the lane can be found at the target roadblock or roadblock connector. + """ + def __init__(self, start_edge: LaneGraphEdgeMapObject, candidate_lane_edge_ids: List[str]): + """ + Constructor for the Dijkstra class. + :param start_edge: The starting edge for the search + :param candidate_lane_edge_ids: The candidates lane ids that can be included in the search. + """ + self._queue = list([start_edge]) + self._parent: Dict[str, Optional[LaneGraphEdgeMapObject]] = dict() + self._candidate_lane_edge_ids = candidate_lane_edge_ids + + def search(self, target_roadblock: RoadBlockGraphEdgeMapObject) -> Tuple[List[LaneGraphEdgeMapObject], bool]: + """ + Performs dijkstra's shortest path to find a route to the target roadblock. + :param target_roadblock: The target roadblock the path should end at. + :return: + - A route starting from the given start edge + - A bool indicating if the route is successfully found. Successful means that there exists a path + from the start edge to an edge contained in the end roadblock. + If unsuccessful the shortest deepest path is returned. + """ + start_edge = self._queue[0] + + # Initial search states + path_found: bool = False + end_edge: LaneGraphEdgeMapObject = start_edge + + self._parent[start_edge.id] = None + self._frontier = [start_edge.id] + self._dist = [1] + self._depth = [1] + + self._expanded = [] + self._expanded_id = [] + self._expanded_dist = [] + self._expanded_depth = [] + + while len(self._queue) > 0: + dist, idx = min((val, idx) for (idx, val) in enumerate(self._dist)) + current_edge = self._queue[idx] + current_depth = self._depth[idx] + + del self._dist[idx], self._queue[idx], self._frontier[idx], self._depth[idx] + + if self._check_goal_condition(current_edge, target_roadblock): + end_edge = current_edge + path_found = True + break + + self._expanded.append(current_edge) + self._expanded_id.append(current_edge.id) + self._expanded_dist.append(dist) + self._expanded_depth.append(current_depth) + + # Populate queue + for next_edge in current_edge.outgoing_edges: + if not next_edge.id in self._candidate_lane_edge_ids: + continue + + alt = dist + self._edge_cost(next_edge) + if next_edge.id not in self._expanded_id and next_edge.id not in self._frontier: + self._parent[next_edge.id] = current_edge + self._queue.append(next_edge) + self._frontier.append(next_edge.id) + self._dist.append(alt) + self._depth.append(current_depth + 1) + end_edge = next_edge + + elif next_edge.id in self._frontier: + next_edge_idx = self._frontier.index(next_edge.id) + current_cost = self._dist[next_edge_idx] + if alt < current_cost: + self._parent[next_edge.id] = current_edge + self._dist[next_edge_idx] = alt + self._depth[next_edge_idx] = current_depth + 1 + + if not path_found: + # filter max depth + max_depth = max(self._expanded_depth) + idx_max_depth = list(np.where(np.array(self._expanded_depth) == max_depth)[0]) + dist_at_max_depth = [self._expanded_dist[i] for i in idx_max_depth] + + dist, _idx = min((val, idx) for (idx, val) in enumerate(dist_at_max_depth)) + end_edge = self._expanded[idx_max_depth[_idx]] + + return self._construct_path(end_edge), path_found + + @staticmethod + def _edge_cost(lane: LaneGraphEdgeMapObject) -> float: + """ + Edge cost of given lane. + :param lane: lane class + :return: length of lane + """ + return lane.baseline_path.length + + @staticmethod + def _check_end_condition(depth: int, target_depth: int) -> bool: + """ + Check if the search should end regardless if the goal condition is met. + :param depth: The current depth to check. + :param target_depth: The target depth to check against. + :return: True if: + - The current depth exceeds the target depth. + """ + return depth > target_depth + + @staticmethod + def _check_goal_condition( + current_edge: LaneGraphEdgeMapObject, + target_roadblock: RoadBlockGraphEdgeMapObject, + ) -> bool: + """ + Check if the current edge is at the target roadblock at the given depth. + :param current_edge: The edge to check. + :param target_roadblock: The target roadblock the edge should be contained in. + :return: whether the current edge is in the target roadblock + """ + return current_edge.get_roadblock_id() == target_roadblock.id + + def _construct_path(self, end_edge: LaneGraphEdgeMapObject) -> List[LaneGraphEdgeMapObject]: + """ + :param end_edge: The end edge to start back propagating back to the start edge. + :param depth: The depth of the target edge. + :return: The constructed path as a list of LaneGraphEdgeMapObject + """ + path = [end_edge] + while self._parent[end_edge.id] is not None: + node = self._parent[end_edge.id] + path.append(node) + end_edge = node + path.reverse() + + return path diff --git a/scenarionet/converter/nuplan/block_utils/route_utils.py b/scenarionet/converter/nuplan/block_utils/route_utils.py new file mode 100644 index 0000000..b29d809 --- /dev/null +++ b/scenarionet/converter/nuplan/block_utils/route_utils.py @@ -0,0 +1,230 @@ +from typing import Dict, List, Tuple + +import numpy as np +try: + from nuplan.common.actor_state.ego_state import EgoState + from nuplan.common.actor_state.state_representation import StateSE2 + from nuplan.common.maps.abstract_map import AbstractMap + from nuplan.common.maps.abstract_map_objects import RoadBlockGraphEdgeMapObject + from nuplan.common.maps.maps_datatypes import SemanticMapLayer + from nuplan.planning.simulation.occupancy_map.strtree_occupancy_map import ( + STRTreeOccupancyMapFactory, + ) +finally: + pass + +from scenarionet.converter.nuplan.block_utils.bfs_roadblock import BreadthFirstSearchRoadBlock + + +def normalize_angle(angle: float) -> float: + return (angle + np.pi) % (2 * np.pi) - np.pi + + +def get_current_roadblock_candidates( + ego_state: EgoState, + map_api: AbstractMap, + route_roadblocks_dict: Dict[str, RoadBlockGraphEdgeMapObject], + heading_error_thresh: float = np.pi / 4, + displacement_error_thresh: float = 3, +) -> Tuple[RoadBlockGraphEdgeMapObject, List[RoadBlockGraphEdgeMapObject]]: + """ + Determines a set of roadblock candidate where ego is located + :param ego_state: class containing ego state + :param map_api: map object + :param route_roadblocks_dict: dictionary of on-route roadblocks + :param heading_error_thresh: maximum heading error, defaults to np.pi/4 + :param displacement_error_thresh: maximum displacement, defaults to 3 + :return: tuple of most promising roadblock and other candidates + """ + ego_pose: StateSE2 = ego_state.rear_axle + roadblock_candidates = [] + + layers = [SemanticMapLayer.ROADBLOCK, SemanticMapLayer.ROADBLOCK_CONNECTOR] + roadblock_dict = map_api.get_proximal_map_objects(point=ego_pose.point, radius=2.5, layers=layers) + roadblock_candidates = ( + roadblock_dict[SemanticMapLayer.ROADBLOCK] + roadblock_dict[SemanticMapLayer.ROADBLOCK_CONNECTOR] + ) + + if not roadblock_candidates: + for layer in layers: + roadblock_id_, distance = map_api.get_distance_to_nearest_map_object(point=ego_pose.point, layer=layer) + roadblock = map_api.get_map_object(roadblock_id_, layer) + + if roadblock: + roadblock_candidates.append(roadblock) + + on_route_candidates, on_route_candidate_displacement_errors = [], [] + candidates, candidate_displacement_errors = [], [] + + roadblock_displacement_errors = [] + roadblock_heading_errors = [] + + for idx, roadblock in enumerate(roadblock_candidates): + lane_displacement_error, lane_heading_error = np.inf, np.inf + + for lane in roadblock.interior_edges: + lane_discrete_path: List[StateSE2] = lane.baseline_path.discrete_path + lane_discrete_points = np.array([state.point.array for state in lane_discrete_path], dtype=np.float64) + lane_state_distances = ((lane_discrete_points - ego_pose.point.array[None, ...])**2.0).sum(axis=-1)**0.5 + argmin = np.argmin(lane_state_distances) + + heading_error = np.abs(normalize_angle(lane_discrete_path[argmin].heading - ego_pose.heading)) + displacement_error = lane_state_distances[argmin] + + if displacement_error < lane_displacement_error: + lane_heading_error, lane_displacement_error = ( + heading_error, + displacement_error, + ) + + if (heading_error < heading_error_thresh and displacement_error < displacement_error_thresh): + if roadblock.id in route_roadblocks_dict.keys(): + on_route_candidates.append(roadblock) + on_route_candidate_displacement_errors.append(displacement_error) + else: + candidates.append(roadblock) + candidate_displacement_errors.append(displacement_error) + + roadblock_displacement_errors.append(lane_displacement_error) + roadblock_heading_errors.append(lane_heading_error) + + if on_route_candidates: # prefer on-route roadblocks + return ( + on_route_candidates[np.argmin(on_route_candidate_displacement_errors)], + on_route_candidates, + ) + elif candidates: # fallback to most promising candidate + return candidates[np.argmin(candidate_displacement_errors)], candidates + + # otherwise, just find any close roadblock + return ( + roadblock_candidates[np.argmin(roadblock_displacement_errors)], + roadblock_candidates, + ) + + +def route_roadblock_correction( + ego_state: EgoState, + map_api: AbstractMap, + route_roadblock_ids: List[str], + search_depth_backward: int = 15, + search_depth_forward: int = 30, +) -> List[str]: + """ + Applies several methods to correct route roadblocks. + :param ego_state: class containing ego state + :param map_api: map object + :param route_roadblocks_dict: dictionary of on-route roadblocks + :param search_depth_backward: depth of forward BFS search, defaults to 15 + :param search_depth_forward: depth of backward BFS search, defaults to 30 + :return: list of roadblock id's of corrected route + """ + + route_roadblock_dict = {} + for id_ in route_roadblock_ids: + block = map_api.get_map_object(id_, SemanticMapLayer.ROADBLOCK) + block = block or map_api.get_map_object(id_, SemanticMapLayer.ROADBLOCK_CONNECTOR) + route_roadblock_dict[id_] = block + + starting_block, starting_block_candidates = get_current_roadblock_candidates( + ego_state, map_api, route_roadblock_dict + ) + starting_block_ids = [roadblock.id for roadblock in starting_block_candidates] + + route_roadblocks = list(route_roadblock_dict.values()) + route_roadblock_ids = list(route_roadblock_dict.keys()) + + # Fix 1: when agent starts off-route + if starting_block.id not in route_roadblock_ids: + # Backward search if current roadblock not in route + graph_search = BreadthFirstSearchRoadBlock(route_roadblock_ids[0], map_api, forward_search=False) + (path, path_id), path_found = graph_search.search(starting_block_ids, max_depth=search_depth_backward) + + if path_found: + route_roadblocks[:0] = path[:-1] + route_roadblock_ids[:0] = path_id[:-1] + + else: + # Forward search to any route roadblock + graph_search = BreadthFirstSearchRoadBlock(starting_block.id, map_api, forward_search=True) + (path, path_id), path_found = graph_search.search(route_roadblock_ids[:3], max_depth=search_depth_forward) + + if path_found: + end_roadblock_idx = np.argmax(np.array(route_roadblock_ids) == path_id[-1]) + + route_roadblocks = route_roadblocks[end_roadblock_idx + 1:] + route_roadblock_ids = route_roadblock_ids[end_roadblock_idx + 1:] + + route_roadblocks[:0] = path + route_roadblock_ids[:0] = path_id + + # Fix 2: check if roadblocks are linked, search for links if not + roadblocks_to_append = {} + for i in range(len(route_roadblocks) - 1): + next_incoming_block_ids = [_roadblock.id for _roadblock in route_roadblocks[i + 1].incoming_edges] + is_incoming = route_roadblock_ids[i] in next_incoming_block_ids + + if is_incoming: + continue + + graph_search = BreadthFirstSearchRoadBlock(route_roadblock_ids[i], map_api, forward_search=True) + (path, path_id), path_found = graph_search.search(route_roadblock_ids[i + 1], max_depth=search_depth_forward) + + if path_found and path and len(path) >= 3: + path, path_id = path[1:-1], path_id[1:-1] + roadblocks_to_append[i] = (path, path_id) + + # append missing intermediate roadblocks + offset = 1 + for i, (path, path_id) in roadblocks_to_append.items(): + route_roadblocks[i + offset:i + offset] = path + route_roadblock_ids[i + offset:i + offset] = path_id + offset += len(path) + + # Fix 3: cut route-loops + route_roadblocks, route_roadblock_ids = remove_route_loops(route_roadblocks, route_roadblock_ids) + + return route_roadblock_ids + + +def remove_route_loops( + route_roadblocks: List[RoadBlockGraphEdgeMapObject], + route_roadblock_ids: List[str], +) -> Tuple[List[str], List[RoadBlockGraphEdgeMapObject]]: + """ + Remove ending of route, if the roadblock are intersecting the route (forming a loop). + :param route_roadblocks: input route roadblocks + :param route_roadblock_ids: input route roadblocks ids + :return: tuple of ids and roadblocks of route without loops + """ + + roadblock_occupancy_map = None + loop_idx = None + + for idx, roadblock in enumerate(route_roadblocks): + # loops only occur at intersection, thus searching for roadblock-connectors. + if str(roadblock.__class__.__name__) == "NuPlanRoadBlockConnector": + if not roadblock_occupancy_map: + roadblock_occupancy_map = STRTreeOccupancyMapFactory.get_from_geometry( + [roadblock.polygon], [roadblock.id] + ) + continue + + strtree, index_by_id = roadblock_occupancy_map._build_strtree() + indices = strtree.query(roadblock.polygon) + if len(indices) > 0: + for geom in strtree.geometries.take(indices): + area = geom.intersection(roadblock.polygon).area + if area > 1: + loop_idx = idx + break + if loop_idx: + break + + roadblock_occupancy_map.insert(roadblock.id, roadblock.polygon) + + if loop_idx: + route_roadblocks = route_roadblocks[:loop_idx] + route_roadblock_ids = route_roadblock_ids[:loop_idx] + + return route_roadblocks, route_roadblock_ids diff --git a/scenarionet/converter/nuplan/utils.py b/scenarionet/converter/nuplan/utils.py index b45753e..f3e3cb8 100644 --- a/scenarionet/converter/nuplan/utils.py +++ b/scenarionet/converter/nuplan/utils.py @@ -4,7 +4,7 @@ import tempfile from dataclasses import dataclass from os.path import join from typing import Union - +from scenarionet.converter.nuplan.block_utils.route_utils import route_roadblock_correction import numpy as np from metadrive.scenario import ScenarioDescription as SD from metadrive.type import MetaDriveType @@ -69,7 +69,8 @@ def get_nuplan_scenarios(data_root, map_root, logs: Union[list, None] = None, bu # filter "scenario_filter=all_scenarios", # simulate only one log "scenario_filter.remove_invalid_goals=true", - "scenario_filter.shuffle=true", + "scenario_filter.expand_scenarios=false", + "scenario_filter.shuffle=false", "scenario_filter.log_names=[{}]".format(log_string), # "scenario_filter.scenario_types={}".format(all_scenario_types), # "scenario_filter.scenario_tokens=[]", @@ -78,7 +79,7 @@ def get_nuplan_scenarios(data_root, map_root, logs: Union[list, None] = None, bu # "scenario_filter.limit_total_scenarios=1000", # "scenario_filter.expand_scenarios=true", # "scenario_filter.limit_scenarios_per_type=10", # use 10 scenarios per scenario type - "scenario_filter.timestamp_threshold_s=20", # minial scenario duration (s) + "scenario_filter.timestamp_threshold_s=10", # minial scenario duration (s) ] base_config_path = os.path.join(nuplan_package_path, "planning", "script") @@ -175,7 +176,7 @@ def get_line_type(nuplan_type): raise ValueError("Unknown line tyep: {}".format(nuplan_type)) -def extract_map_features(map_api, center, radius=500): +def extract_map_features(map_api, center, route_block_ids, radius=500): ret = {} np.seterr(all='ignore') # Center is Important ! @@ -235,7 +236,9 @@ def extract_map_features(map_api, center, radius=500): if layer == SemanticMapLayer.ROADBLOCK else [], SD.RIGHT_NEIGHBORS: [edge.id for edge in block.interior_edges[index + 1:]] \ if layer == SemanticMapLayer.ROADBLOCK else [], - SD.POLYGON: polygon + SD.POLYGON: polygon, + "is_sdc_route": lane_meta_data.get_roadblock_id() in route_block_ids, + "speed_limit_mps": lane_meta_data.speed_limit_mps, } if layer == SemanticMapLayer.ROADBLOCK_CONNECTOR: continue @@ -513,8 +516,15 @@ def convert_nuplan_scenario(scenario: NuPlanScenario, version): # traffic light result[SD.DYNAMIC_MAP_STATES] = extract_traffic_light(scenario, scenario_center) + # route + route_block_ids = scenario.get_route_roadblock_ids() + try: + route_block_ids = route_roadblock_correction(state, scenario.map_api, route_block_ids) + except Exception as e: + logger.error("Route correction failed: {}".format(e)) + # map - result[SD.MAP_FEATURES] = extract_map_features(scenario.map_api, scenario_center) + result[SD.MAP_FEATURES] = extract_map_features(scenario.map_api, scenario_center, route_block_ids) return result