From 817e7c173c4973e96488e3ecfea1a6a18a9f75dc Mon Sep 17 00:00:00 2001
From: QuanyiLi
Date: Mon, 8 May 2023 11:40:47 +0100
Subject: [PATCH] build new dataset from error logs

---
 scenarionet/builder/utils.py                  | 31 ++--
 scenarionet/converter/utils.py                | 23 +----
 scenarionet/{examples => scripts}/__init__.py |  0
 .../combine_dataset_and_run.py                |  0
 .../{examples => scripts}/convert_nuplan.py   |  0
 .../{examples => scripts}/convert_nuscenes.py |  0
 .../{examples => scripts}/convert_pg.py       |  0
 .../{examples => scripts}/convert_waymo.py    |  0
 scenarionet/verifier/error.py                 | 95 +++++++++++++++++++
 scenarionet/verifier/utils.py                 | 84 ++++++++--------
 10 files changed, 166 insertions(+), 67 deletions(-)
 rename scenarionet/{examples => scripts}/__init__.py (100%)
 rename scenarionet/{examples => scripts}/combine_dataset_and_run.py (100%)
 rename scenarionet/{examples => scripts}/convert_nuplan.py (100%)
 rename scenarionet/{examples => scripts}/convert_nuscenes.py (100%)
 rename scenarionet/{examples => scripts}/convert_pg.py (100%)
 rename scenarionet/{examples => scripts}/convert_waymo.py (100%)
 create mode 100644 scenarionet/verifier/error.py

diff --git a/scenarionet/builder/utils.py b/scenarionet/builder/utils.py
index 4ecf797..73f4e7d 100644
--- a/scenarionet/builder/utils.py
+++ b/scenarionet/builder/utils.py
@@ -7,6 +7,7 @@ import shutil
 from typing import Callable, List
 
 import metadrive.scenario.utils as sd_utils
+import numpy as np
 from metadrive.scenario.scenario_description import ScenarioDescription
 
 logger = logging.getLogger(__name__)
@@ -33,7 +34,8 @@ def try_generating_summary(file_folder):
 
 
 def combine_multiple_dataset(
-    output_path, *dataset_paths, force_overwrite=False, try_generate_missing_file=True, filters: List[Callable] = None
+    output_path, *dataset_paths, force_overwrite=False, try_generate_missing_file=True,
+    filters: List[Callable] = None
 ):
     """
     Combine multiple datasets. Each dataset should have a dataset_summary.pkl
@@ -99,10 +101,27 @@ def combine_multiple_dataset(
             summaries.pop(file)
             mappings.pop(file)
 
-    with open(osp.join(output_abs_path, ScenarioDescription.DATASET.SUMMARY_FILE), "wb+") as f:
-        pickle.dump(summaries, f)
-
-    with open(osp.join(output_abs_path, ScenarioDescription.DATASET.MAPPING_FILE), "wb+") as f:
-        pickle.dump(mappings, f)
+    summary_file = osp.join(output_abs_path, ScenarioDescription.DATASET.SUMMARY_FILE)
+    mapping_file = osp.join(output_abs_path, ScenarioDescription.DATASET.MAPPING_FILE)
+    save_summary_and_mapping(summary_file, mapping_file, summaries, mappings)
 
     return summaries, mappings
+
+
+def dict_recursive_remove_array_and_set(d):
+    if isinstance(d, np.ndarray):
+        return d.tolist()
+    if isinstance(d, set):
+        return tuple(d)
+    if isinstance(d, dict):
+        for k in d.keys():
+            d[k] = dict_recursive_remove_array_and_set(d[k])
+    return d
+
+
+def save_summary_and_mapping(summary_file_path, mapping_file_path, summary, mapping):
+    with open(summary_file_path, "wb") as file:
+        pickle.dump(dict_recursive_remove_array_and_set(summary), file)
+    with open(mapping_file_path, "wb") as file:
+        pickle.dump(mapping, file)
+    print("Dataset Summary and Mapping are saved at: {}".format(summary_file_path))
diff --git a/scenarionet/converter/utils.py b/scenarionet/converter/utils.py
index 428a7d1..0457301 100644
--- a/scenarionet/converter/utils.py
+++ b/scenarionet/converter/utils.py
@@ -6,7 +6,7 @@ import math
 import os
 import pickle
 import shutil
-
+from scenarionet.builder.utils import save_summary_and_mapping
 import numpy as np
 import tqdm
 from metadrive.scenario import ScenarioDescription as SD
@@ -46,17 +46,6 @@ def compute_angular_velocity(initial_heading, final_heading, dt):
     return angular_vel
 
 
-def dict_recursive_remove_array_and_set(d):
-    if isinstance(d, np.ndarray):
-        return d.tolist()
-    if isinstance(d, set):
-        return tuple(d)
-    if isinstance(d, dict):
-        for k in d.keys():
-            d[k] = dict_recursive_remove_array_and_set(d[k])
-    return d
-
-
 def mph_to_kmh(speed_in_mph: float):
     speed_in_kmh = speed_in_mph * 1.609344
     return speed_in_kmh
@@ -67,7 +56,7 @@ def contains_explicit_return(f):
 
 
 def write_to_directory(
-        convert_func, scenarios, output_path, dataset_version, dataset_name, force_overwrite=False, **kwargs
+    convert_func, scenarios, output_path, dataset_version, dataset_name, force_overwrite=False, **kwargs
 ):
     """
     Convert a batch of scenarios.
@@ -134,12 +123,8 @@ def write_to_directory(
         with open(p, "wb") as f:
             pickle.dump(sd_scenario, f)
 
-    # store summary file, which is human-readable
-    with open(summary_file_path, "wb") as file:
-        pickle.dump(dict_recursive_remove_array_and_set(summary), file)
-    with open(mapping_file_path, "wb") as file:
-        pickle.dump(mapping, file)
-    print("Dataset Summary and Mapping are saved at: {}".format(summary_file_path))
+    # store the summary and mapping files
+    save_summary_and_mapping(summary_file_path, mapping_file_path, summary, mapping)
 
     # rename and save
     if delay_remove is not None:
diff --git a/scenarionet/examples/__init__.py b/scenarionet/scripts/__init__.py
similarity index 100%
rename from scenarionet/examples/__init__.py
rename to scenarionet/scripts/__init__.py
diff --git a/scenarionet/examples/combine_dataset_and_run.py b/scenarionet/scripts/combine_dataset_and_run.py
similarity index 100%
rename from scenarionet/examples/combine_dataset_and_run.py
rename to scenarionet/scripts/combine_dataset_and_run.py
diff --git a/scenarionet/examples/convert_nuplan.py b/scenarionet/scripts/convert_nuplan.py
similarity index 100%
rename from scenarionet/examples/convert_nuplan.py
rename to scenarionet/scripts/convert_nuplan.py
diff --git a/scenarionet/examples/convert_nuscenes.py b/scenarionet/scripts/convert_nuscenes.py
similarity index 100%
rename from scenarionet/examples/convert_nuscenes.py
rename to scenarionet/scripts/convert_nuscenes.py
diff --git a/scenarionet/examples/convert_pg.py b/scenarionet/scripts/convert_pg.py
similarity index 100%
rename from scenarionet/examples/convert_pg.py
rename to scenarionet/scripts/convert_pg.py
diff --git a/scenarionet/examples/convert_waymo.py b/scenarionet/scripts/convert_waymo.py
similarity index 100%
rename from scenarionet/examples/convert_waymo.py
rename to scenarionet/scripts/convert_waymo.py
diff --git a/scenarionet/verifier/error.py b/scenarionet/verifier/error.py
new file mode 100644
index 0000000..607b7f4
--- /dev/null
+++ b/scenarionet/verifier/error.py
@@ -0,0 +1,95 @@
+import json
+import logging
+import os
+from typing import List
+
+from metadrive.scenario.scenario_description import ScenarioDescription as SD
+
+from scenarionet.builder.utils import read_dataset_summary
+from scenarionet.builder.utils import save_summary_and_mapping
+
+logger = logging.getLogger(__name__)
+
+
+class ErrorDescription:
+    INDEX = "scenario_index"
+    PATH = "file_path"
+    FILE_NAME = "file_name"
+    ERROR = "error_message"
+    METADATA = "metadata"
+
+    @classmethod
+    def make(cls, scenario_index, file_path, file_name, error):
+        logger.warning(
+            "\n Scenario Error, "
+            "scenario_index: {}, file_path: {}.\n Error message: {}".format(scenario_index, file_path, str(error))
+        )
+        return {cls.INDEX: scenario_index,
+                cls.PATH: file_path,
+                cls.FILE_NAME: file_name,
+                cls.ERROR: str(error)}
+
+
+class ErrorFile:
+    PREFIX = "error_scenarios_for"
+    DATASET = "dataset_path"
+    ERRORS = "errors"
+
+    @classmethod
+    def dump(cls, save_dir, errors: List, dataset_path):
+        """
+        Save the verification result.
+        :param save_dir: the directory to save this file in
+        :param errors: an error list, containing dicts built by ErrorDescription.make()
+        :param dataset_path: the dataset directory, i.e. the one containing dataset_summary.pkl
+        """
+        file_name = "{}_{}.json".format(cls.PREFIX, os.path.basename(dataset_path))
+        with open(os.path.join(save_dir, file_name), "w+") as f:
+            json.dump({cls.DATASET: dataset_path, cls.ERRORS: errors}, f, indent=4)
+
+    @classmethod
+    def generate_dataset(cls, error_file_path, new_dataset_path, force_overwrite=False, broken_scenario=False):
+        """
+        Generate a new dataset containing only the broken scenarios or only the good ones.
+        :param error_file_path: path of the error file
+        :param new_dataset_path: a directory where you want to store the new dataset
+        :param force_overwrite: if new_dataset_path exists, whether to overwrite it
+        :param broken_scenario: if True, collect the broken scenarios; such a dataset is useful for debugging
+        :return: dataset summary, dataset mapping
+        """
+        # TODO Add test!
+        new_dataset_path = os.path.abspath(new_dataset_path)
+        if os.path.exists(new_dataset_path) and not force_overwrite:
+            raise ValueError("Directory: {} already exists! "
+                             "Set force_overwrite=True to overwrite".format(new_dataset_path))
+        os.makedirs(new_dataset_path, exist_ok=True)
+
+        with open(error_file_path, "r+") as f:
+            error_file = json.load(f)
+        origin_dataset_path = error_file[cls.DATASET]
+        origin_summary, origin_list, origin_mapping = read_dataset_summary(origin_dataset_path)
+        errors = error_file[cls.ERRORS]
+
+        # make new summary and mapping
+        new_summary = {}
+        new_mapping = {}
+
+        new_summary_file_path = os.path.join(new_dataset_path, SD.DATASET.SUMMARY_FILE)
+        new_mapping_file_path = os.path.join(new_dataset_path, SD.DATASET.MAPPING_FILE)
+
+        if broken_scenario:
+            for error in errors:
+                file_name = error[ErrorDescription.FILE_NAME]
+                new_summary[file_name] = origin_summary[file_name]
+                scenario_dir = os.path.join(origin_dataset_path, origin_mapping[file_name])
+                new_mapping[file_name] = os.path.relpath(scenario_dir, new_dataset_path)
+        else:
+            error_scenario = [error[ErrorDescription.FILE_NAME] for error in errors]
+            for scenario in origin_summary:
+                if scenario in error_scenario:
+                    continue
+                new_summary[scenario] = origin_summary[scenario]
+                scenario_dir = os.path.join(origin_dataset_path, origin_mapping[scenario])
+                new_mapping[scenario] = os.path.relpath(scenario_dir, new_dataset_path)
+        save_summary_and_mapping(new_summary_file_path, new_mapping_file_path, new_summary, new_mapping)
+        return new_summary, new_mapping
diff --git a/scenarionet/verifier/utils.py b/scenarionet/verifier/utils.py
index 3d34e3f..e120de0 100644
--- a/scenarionet/verifier/utils.py
+++ b/scenarionet/verifier/utils.py
@@ -1,8 +1,12 @@
-import json
 import logging
 import multiprocessing
 import os
 
+from metadrive.scenario.scenario_description import ScenarioDescription as SD
+
+from scenarionet.verifier.error import ErrorDescription as ED
+from scenarionet.verifier.error import ErrorFile as EF
+
 logger = logging.getLogger(__name__)
 import tqdm
 from metadrive.envs.scenario_env import ScenarioEnv
@@ -12,15 +16,14 @@ from functools import partial
 
 
 def verify_loading_into_metadrive(dataset_path, result_save_dir, steps_to_run=1000, num_workers=8):
-    if result_save_dir is not None:
-        assert os.path.exists(result_save_dir
-                              ) and os.path.isdir(result_save_dir), "Argument result_save_dir must be an existing dir"
+    assert os.path.exists(result_save_dir) and os.path.isdir(result_save_dir), "result_save_dir must be an existing dir"
     num_scenario = get_number_of_scenarios(dataset_path)
     if num_scenario < num_workers:
         # single process
         logger.info("Use one worker, as num_scenario < num_workers:")
         num_workers = 1
+    # prepare arguments
     argument_list = []
     func = partial(loading_wrapper, dataset_path=dataset_path, steps_to_run=steps_to_run)
@@ -32,47 +35,46 @@ def verify_loading_into_metadrive(dataset_path, result_save_dir, steps_to_run=10
             scenario_num = num_scenario_each_worker
         argument_list.append([i * num_scenario_each_worker, scenario_num])
+    # run workers and process the results from each worker
     with multiprocessing.Pool(num_workers) as p:
         all_result = list(p.imap(func, argument_list))
     result = all([i[0] for i in all_result])
-    logs = []
-    for _, log in all_result:
-        logs += log
+    errors = []
+    for _, error in all_result:
+        errors += error
 
-    if result_save_dir is not None:
-        file_name = "error_scenarios_for_{}.json".format(os.path.basename(dataset_path))
-        with open(os.path.join(result_save_dir, file_name), "w+") as f:
-            json.dump(logs, f, indent=4)
+    # save result
+    EF.dump(result_save_dir, errors, dataset_path)
 
+    # logging
     if result:
         logger.info("All scenarios can be loaded successfully!")
     else:
         logger.info(
-            "Fail to load all scenarios, see log for more details! Number of failed scenarios: {}".format(len(logs)))
-    return result, logs
+            "Failed to load some scenarios, see the error file for details! Number of failed scenarios: {}".format(len(errors)))
+    return result, errors
 
 
-def loading_into_metadrive(start_scenario_index, num_scenario, dataset_path, steps_to_run):
+def loading_into_metadrive(start_scenario_index, num_scenario, dataset_path, steps_to_run, metadrive_config=None):
     logger.info(
         "================ Begin Scenario Loading Verification for scenario {}-{} ================ \n".format(
             start_scenario_index, num_scenario + start_scenario_index))
     success = True
-    env = ScenarioEnv(
-        {
-            "agent_policy": ReplayEgoCarPolicy,
-            "num_scenarios": num_scenario,
-            "horizon": 1000,
-            "start_scenario_index": start_scenario_index,
-            "no_static_vehicles": False,
-            "data_directory": dataset_path,
-        }
-    )
+    metadrive_config = metadrive_config or {}
+    metadrive_config.update({
+        "agent_policy": ReplayEgoCarPolicy,
+        "num_scenarios": num_scenario,
+        "horizon": 1000,
+        "start_scenario_index": start_scenario_index,
+        "no_static_vehicles": False,
+        "data_directory": dataset_path,
+    })
+    env = ScenarioEnv(metadrive_config)
     logging.disable(logging.INFO)
-    error_files = []
-    try:
-        for scenario_index in tqdm.tqdm(range(start_scenario_index, start_scenario_index + num_scenario),
-                                        desc="Scenarios: {}-{}".format(start_scenario_index,
-                                                                       start_scenario_index + num_scenario)):
+    error_msgs = []
+    desc = "Scenarios: {}-{}".format(start_scenario_index, start_scenario_index + num_scenario)
+    for scenario_index in tqdm.tqdm(range(start_scenario_index, start_scenario_index + num_scenario), desc=desc):
+        try:
             env.reset(force_seed=scenario_index)
             arrive = False
             for _ in range(steps_to_run):
@@ -80,19 +82,17 @@ def loading_into_metadrive(start_scenario_index, num_scenario, dataset_path, ste
                 if d and info["arrive_dest"]:
                     arrive = True
             assert arrive, "Can not arrive destination"
-    except Exception as e:
-        file_name = env.engine.data_manager.summary_lookup[scenario_index]
-        file_path = os.path.join(dataset_path, env.engine.data_manager.mapping[file_name], file_name)
-        error_file = {"scenario_index": scenario_index, "file_path": file_path, "error": str(e)}
-        error_files.append(error_file)
-        logger.warning(
-            "\n Scenario Error, "
-            "scenario_index: {}, file_path: {}.\n Error message: {}".format(scenario_index, file_path, str(e))
-        )
-        success = False
-    finally:
-        env.close()
-    return success, error_files
+        except Exception as e:
+            file_name = env.engine.data_manager.summary_lookup[scenario_index]
+            file_path = os.path.join(dataset_path, env.engine.data_manager.mapping[file_name], file_name)
+            error_msg = ED.make(scenario_index, file_path, file_name, str(e))
+            error_msgs.append(error_msg)
+            success = False
+            # proceed to the next scenario
+            continue
+
+    env.close()
+    return success, error_msgs
 
 
 def loading_wrapper(arglist, dataset_path, steps_to_run):
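
A minimal usage sketch of the verify-then-rebuild workflow this patch enables (not part of the diff): it assumes a combined dataset directory that already contains dataset_summary.pkl, and all paths below are hypothetical placeholders.

    import os

    from scenarionet.verifier.error import ErrorFile
    from scenarionet.verifier.utils import verify_loading_into_metadrive

    dataset_path = "/data/combined_dataset"  # hypothetical dir containing dataset_summary.pkl
    result_save_dir = "/data/error_logs"     # must be an existing directory

    # replay every scenario in MetaDrive; an error file is dumped to result_save_dir
    success, errors = verify_loading_into_metadrive(dataset_path, result_save_dir, steps_to_run=1000, num_workers=8)

    if not success:
        print("{} scenarios failed to replay".format(len(errors)))
        # ErrorFile.dump names the file "<PREFIX>_<dataset basename>.json"
        error_file = os.path.join(
            result_save_dir, "{}_{}.json".format(ErrorFile.PREFIX, os.path.basename(dataset_path))
        )
        # rebuild the dataset without the broken scenarios...
        ErrorFile.generate_dataset(error_file, "/data/clean_dataset", force_overwrite=True, broken_scenario=False)
        # ...or collect only the broken ones for debugging
        ErrorFile.generate_dataset(error_file, "/data/broken_dataset", force_overwrite=True, broken_scenario=True)

Note that generate_dataset only rewrites the summary and mapping files; the new mapping entries are built with os.path.relpath() pointing back into the original dataset, so the scenario .pkl files themselves are referenced, not copied.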