add new test

This commit is contained in:
QuanyiLi
2023-05-08 12:07:09 +01:00
parent 817e7c173c
commit 104f03fbf4
11 changed files with 43 additions and 3 deletions

View File

@@ -19,7 +19,7 @@ def test_filter_dataset():
# nuscenes data has no light # nuscenes data has no light
# light_condition = ScenarioFilter.make(ScenarioFilter.has_traffic_light) # light_condition = ScenarioFilter.make(ScenarioFilter.has_traffic_light)
sdc_driving_condition = ScenarioFilter.make(ScenarioFilter.sdc_moving_dist, target_dist=30, condition="smaller") sdc_driving_condition = ScenarioFilter.make(ScenarioFilter.sdc_moving_dist, target_dist=30, condition="smaller")
answer = ['scene-0553', 'scene-0757', 'scene-1100'] answer = ['sd_nuscenes_v1.0-mini_scene-0553.pkl', '0.pkl', 'sd_nuscenes_v1.0-mini_scene-1100.pkl']
summary, mapping = combine_multiple_dataset( summary, mapping = combine_multiple_dataset(
output_path, output_path,
*dataset_paths, *dataset_paths,
@@ -34,7 +34,7 @@ def test_filter_dataset():
if a in s: if a in s:
in_ = True in_ = True
break break
assert in_ assert in_, summary.keys()
sdc_driving_condition = ScenarioFilter.make(ScenarioFilter.sdc_moving_dist, target_dist=5, condition="greater") sdc_driving_condition = ScenarioFilter.make(ScenarioFilter.sdc_moving_dist, target_dist=5, condition="greater")
summary, mapping = combine_multiple_dataset( summary, mapping = combine_multiple_dataset(

View File

@@ -0,0 +1,29 @@
import os
import os.path
from scenarionet import SCENARIONET_PACKAGE_PATH
from scenarionet.builder.utils import combine_multiple_dataset, read_dataset_summary, read_scenario
from scenarionet.verifier.utils import verify_loading_into_metadrive, set_random_drop
def test_combine_multiple_dataset():
set_random_drop(True)
dataset_name = "nuscenes"
original_dataset_path = os.path.join(SCENARIONET_PACKAGE_PATH, "tests", "test_dataset", dataset_name)
dataset_paths = [original_dataset_path + "_{}".format(i) for i in range(5)]
output_path = os.path.join(SCENARIONET_PACKAGE_PATH, "tests", "combine")
combine_multiple_dataset(output_path, *dataset_paths, force_overwrite=True, try_generate_missing_file=True)
dataset_paths.append(output_path)
for dataset_path in dataset_paths:
summary, sorted_scenarios, mapping = read_dataset_summary(dataset_path)
for scenario_file in sorted_scenarios:
read_scenario(os.path.join(dataset_path, mapping[scenario_file], scenario_file))
success, result = verify_loading_into_metadrive(
dataset_path, result_save_dir="test_dataset", steps_to_run=1000, num_workers=4)
assert success
set_random_drop(False)
if __name__ == '__main__':
test_combine_multiple_dataset()

View File

@@ -2,7 +2,7 @@ import logging
import multiprocessing import multiprocessing
import os import os
from metadrive.scenario.scenario_description import ScenarioDescription as SD import numpy as np
from scenarionet.verifier.error import ErrorDescription as ED from scenarionet.verifier.error import ErrorDescription as ED
from scenarionet.verifier.error import ErrorFile as EF from scenarionet.verifier.error import ErrorFile as EF
@@ -14,6 +14,14 @@ from metadrive.policy.replay_policy import ReplayEgoCarPolicy
from metadrive.scenario.utils import get_number_of_scenarios from metadrive.scenario.utils import get_number_of_scenarios
from functools import partial from functools import partial
# this global variable is for generating broken scenarios for testing
RANDOM_DROP = False
def set_random_drop(drop):
global RANDOM_DROP
RANDOM_DROP = drop
def verify_loading_into_metadrive(dataset_path, result_save_dir, steps_to_run=1000, num_workers=8): def verify_loading_into_metadrive(dataset_path, result_save_dir, steps_to_run=1000, num_workers=8):
assert os.path.exists(result_save_dir) and os.path.isdir(result_save_dir), "result_save_dir must be an existing dir" assert os.path.exists(result_save_dir) and os.path.isdir(result_save_dir), "result_save_dir must be an existing dir"
@@ -56,6 +64,7 @@ def verify_loading_into_metadrive(dataset_path, result_save_dir, steps_to_run=10
def loading_into_metadrive(start_scenario_index, num_scenario, dataset_path, steps_to_run, metadrive_config=None): def loading_into_metadrive(start_scenario_index, num_scenario, dataset_path, steps_to_run, metadrive_config=None):
global RANDOM_DROP
logger.info( logger.info(
"================ Begin Scenario Loading Verification for scenario {}-{} ================ \n".format( "================ Begin Scenario Loading Verification for scenario {}-{} ================ \n".format(
start_scenario_index, num_scenario + start_scenario_index)) start_scenario_index, num_scenario + start_scenario_index))
@@ -77,6 +86,8 @@ def loading_into_metadrive(start_scenario_index, num_scenario, dataset_path, ste
try: try:
env.reset(force_seed=scenario_index) env.reset(force_seed=scenario_index)
arrive = False arrive = False
if RANDOM_DROP and np.random.rand() < 0.5:
raise ValueError("Random Drop")
for _ in range(steps_to_run): for _ in range(steps_to_run):
o, r, d, info = env.step([0, 0]) o, r, d, info = env.step([0, 0])
if d and info["arrive_dest"]: if d and info["arrive_dest"]: