Source code for gammalearn.tests.test_datasets

import unittest
import os
import collections
import numpy as np
import gammalearn.datasets as dsets
import gammalearn.utils as utils
from gammalearn.logging import LOGGING_CONFIG
from gammalearn.constants import GAMMA_ID, PROTON_ID, ELECTRON_ID
import logging

import warnings
warnings.filterwarnings("ignore")

THIS_DIR = os.path.dirname(os.path.abspath(__file__))

logging.config.dictConfig(LOGGING_CONFIG)


[docs]class TestLSTDataset(unittest.TestCase):
[docs] def setUp(self) -> None: self.data_file = os.path.join(THIS_DIR, '../../share/data/MC_data/dl1_gamma_example.h5') self.camera_type = 'LST_LSTCam' self.particle_dict = {GAMMA_ID: 1, ELECTRON_ID: 0, PROTON_ID: 0} self.subarray = [1] self.targets = collections.OrderedDict({ 'energy': { 'output_shape': 1, }, 'impact': { 'output_shape': 2, }, 'direction': { 'output_shape': 2, }, 'class': { 'output_shape': 2, 'label_shape': 1, }, }) self.group_by_image = { 0: { 'image_0': np.float32(1.5583277), 'time_0': np.float32(-0.31744528), 'labels': { 'energy': np.float32(0.3544642), 'corex': np.float32(-158.21386719), 'corey': np.float32(188.8341217), 'alt': np.float32(1.22173047), 'az': np.float32(3.14159274) }, 'telescope': { 'alt': np.float32(1.2217305), 'az': np.float32(3.1415927), 'position': np.array([-70.93, -52.07, 43.0]) } }, 2: { 'image_0': np.float32(3.4182658), 'time_0': np.float32(26.897005), 'labels': { 'energy': np.float32(0.12193353), 'corex': np.float32(-36.47265244), 'corey': np.float32(-294.10467529), 'alt': np.float32(1.22173047), 'az': np.float32(3.14159274) }, 'telescope': { 'alt': np.float32(1.2217305), 'az': np.float32(3.1415927), 'position': np.array([-70.93, -52.07, 43.0]) } }, } self.energy_filter_parameters = {'energy': [0.1, np.inf], 'filter_only_gammas': True} self.energy_filter_true_events = 7 self.group_by_image_energy = { 1: { 'image_0': np.float32(3.4182658), 'time_0': np.float32(26.897005), } } self.intensity_filter_parameters = {'intensity': [500, np.inf], 'cleaning': True, 'picture_thresh': 8, 'boundary_thresh': 4, 'keep_isolated_pixels': False, 'min_number_picture_neighbors': 1 } self.group_by_image_intensity = { 1: { 'image_0': np.float32(2.447867), 'time_0': np.float32(7.753113), } } self.intensity_lstchain_filter_parameters = {'intensity': [500, np.inf], 'dl1': True} self.group_by_image_intensity_energy = { 1: { 'image_0': np.float32(2.447867), 'time_0': np.float32(7.753113), } } self.len_trig_energies = 17
[docs] def test_mono_memory(self): dataset = dsets.MemoryLSTDataset(self.data_file, self.camera_type, 'image', self.targets, self.particle_dict, use_time=True, subarray=self.subarray) sample_0 = dataset[0] assert sample_0['image'][0, 0] == self.group_by_image[0]['image_0'] assert sample_0['image'][1, 0] == self.group_by_image[0]['time_0'] assert np.isclose(sample_0['label']['energy'], np.log10(self.group_by_image[0]['labels']['energy'])) assert np.isclose(sample_0['label']['impact'][0], (self.group_by_image[0]['labels']['corex'] - self.group_by_image[0]['telescope']['position'][0])/1000) assert np.isclose(sample_0['label']['impact'][1], (self.group_by_image[0]['labels']['corey'] - self.group_by_image[0]['telescope']['position'][1]) / 1000) assert np.isclose(sample_0['label']['direction'][0], (self.group_by_image[0]['labels']['alt'] - self.group_by_image[0]['telescope']['alt'])) assert np.isclose(sample_0['label']['direction'][1], (self.group_by_image[0]['labels']['az'] - self.group_by_image[0]['telescope']['az'])) sample_2 = dataset[2] assert sample_2['image'][0, 0] == self.group_by_image[2]['image_0'] assert sample_2['image'][1, 0] == self.group_by_image[2]['time_0'] assert np.isclose(sample_2['label']['energy'], np.log10(self.group_by_image[2]['labels']['energy'])) assert np.isclose(sample_2['label']['impact'][0], (self.group_by_image[2]['labels']['corex'] - self.group_by_image[2]['telescope']['position'][0]) / 1000) assert np.isclose(sample_2['label']['impact'][1], (self.group_by_image[2]['labels']['corey'] - self.group_by_image[2]['telescope']['position'][1]) / 1000) assert np.isclose(sample_2['label']['direction'][0], (self.group_by_image[2]['labels']['alt'] - self.group_by_image[2]['telescope']['alt'])) assert np.isclose(sample_2['label']['direction'][1], (self.group_by_image[2]['labels']['az'] - self.group_by_image[2]['telescope']['az']))
[docs] def test_mono_memory_test_mode(self): dataset = dsets.MemoryLSTDataset(self.data_file, self.camera_type, 'image', self.targets, self.particle_dict, use_time=True, train=False, subarray=self.subarray) sample_0 = dataset[0] assert sample_0['image'][0, 0] == self.group_by_image[0]['image_0'] assert sample_0['image'][1, 0] == self.group_by_image[0]['time_0'] assert np.isclose(sample_0['dl1_params']['mc_energy'], self.group_by_image[0]['labels']['energy']) assert np.isclose(sample_0['dl1_params']['mc_core_x'], self.group_by_image[0]['labels']['corex']/1000) assert np.isclose(sample_0['dl1_params']['mc_alt'], self.group_by_image[0]['labels']['alt']) sample_2 = dataset[2] assert sample_2['image'][0, 0] == self.group_by_image[2]['image_0'] assert sample_2['image'][1, 0] == self.group_by_image[2]['time_0'] assert np.isclose(sample_2['dl1_params']['mc_energy'], self.group_by_image[2]['labels']['energy']) assert np.isclose(sample_2['dl1_params']['mc_core_x'], self.group_by_image[2]['labels']['corex']/1000) assert np.isclose(sample_2['dl1_params']['mc_alt'], self.group_by_image[2]['labels']['alt'])
[docs] def test_mono_file(self): dataset = dsets.FileLSTDataset(self.data_file, self.camera_type, 'image', self.targets, self.particle_dict, use_time=True, subarray=self.subarray) sample_0 = dataset[0] assert sample_0['image'][0, 0] == self.group_by_image[0]['image_0'] assert sample_0['image'][1, 0] == self.group_by_image[0]['time_0'] assert np.isclose(sample_0['label']['energy'], np.log10(self.group_by_image[0]['labels']['energy'])) assert np.isclose(sample_0['label']['impact'][0], (self.group_by_image[0]['labels']['corex'] - self.group_by_image[0]['telescope']['position'][0]) / 1000) assert np.isclose(sample_0['label']['impact'][1], (self.group_by_image[0]['labels']['corey'] - self.group_by_image[0]['telescope']['position'][1]) / 1000) assert np.isclose(sample_0['label']['direction'][0], (self.group_by_image[0]['labels']['alt'] - self.group_by_image[0]['telescope']['alt'])) assert np.isclose(sample_0['label']['direction'][1], (self.group_by_image[0]['labels']['az'] - self.group_by_image[0]['telescope']['az'])) sample_2 = dataset[2] assert sample_2['image'][0, 0] == self.group_by_image[2]['image_0'] assert sample_2['image'][1, 0] == self.group_by_image[2]['time_0'] assert np.isclose(sample_2['label']['energy'], np.log10(self.group_by_image[2]['labels']['energy'])) assert np.isclose(sample_2['label']['impact'][0], (self.group_by_image[2]['labels']['corex'] - self.group_by_image[2]['telescope']['position'][0]) / 1000) assert np.isclose(sample_2['label']['impact'][1], (self.group_by_image[2]['labels']['corey'] - self.group_by_image[2]['telescope']['position'][1]) / 1000) assert np.isclose(sample_2['label']['direction'][0], (self.group_by_image[2]['labels']['alt'] - self.group_by_image[2]['telescope']['alt'])) assert np.isclose(sample_2['label']['direction'][1], (self.group_by_image[2]['labels']['az'] - self.group_by_image[2]['telescope']['az']))
[docs] def test_energy_filter_file(self): dataset_mono = dsets.FileLSTDataset(self.data_file, self.camera_type, 'image', self.targets, self.particle_dict, use_time=True, subarray=self.subarray) dataset_mono.filter_event({utils.energyband_filter: self.energy_filter_parameters}) assert len(dataset_mono.dl1_params['mc_energy']) == self.energy_filter_true_events sample_1 = dataset_mono[1] assert sample_1['image'][0, 0] == self.group_by_image_energy[1]['image_0'] assert sample_1['image'][1, 0] == self.group_by_image_energy[1]['time_0']
[docs] def test_energy_filter_memory(self): dataset_mono = dsets.MemoryLSTDataset(self.data_file, self.camera_type, 'image', self.targets, self.particle_dict, use_time=True, subarray=self.subarray) dataset_mono.filter_event({utils.energyband_filter: self.energy_filter_parameters}) assert len(dataset_mono.dl1_params['mc_energy']) == self.energy_filter_true_events sample_1 = dataset_mono[1] assert sample_1['image'][0, 0] == self.group_by_image_energy[1]['image_0'] assert sample_1['image'][1, 0] == self.group_by_image_energy[1]['time_0']
[docs] def test_intensity_filter_memory(self): dataset_mono = dsets.MemoryLSTDataset(self.data_file, self.camera_type, 'image', self.targets, self.particle_dict, use_time=True, subarray=self.subarray) dataset_mono.filter_image({utils.intensity_filter: self.intensity_filter_parameters}) sample_0 = dataset_mono[1] assert sample_0['image'][0, 0] == self.group_by_image_intensity[1]['image_0'] assert sample_0['image'][1, 0] == self.group_by_image_intensity[1]['time_0']
[docs] def test_intensity_filter_file(self): dataset_mono = dsets.FileLSTDataset(self.data_file, self.camera_type, 'image', self.targets, self.particle_dict, use_time=True, subarray=self.subarray) dataset_mono.filter_image({utils.intensity_filter: self.intensity_filter_parameters}) sample_0 = dataset_mono[1] assert sample_0['image'][0, 0] == self.group_by_image_intensity[1]['image_0'] assert sample_0['image'][1, 0] == self.group_by_image_intensity[1]['time_0']
[docs] def test_intensity_lstchain_filter_memory(self): dataset_mono = dsets.MemoryLSTDataset(self.data_file, self.camera_type, 'image', self.targets, self.particle_dict, use_time=True, subarray=self.subarray) dataset_mono.filter_image({utils.intensity_filter: self.intensity_lstchain_filter_parameters}) sample_0 = dataset_mono[1] assert sample_0['image'][0, 0] == self.group_by_image_intensity[1]['image_0'] assert sample_0['image'][1, 0] == self.group_by_image_intensity[1]['time_0']
[docs] def test_intensity_lstchain_filter_file(self): dataset_mono = dsets.FileLSTDataset(self.data_file, self.camera_type, 'image', self.targets, self.particle_dict, use_time=True, subarray=self.subarray) dataset_mono.filter_image({utils.intensity_filter: self.intensity_lstchain_filter_parameters}) sample_0 = dataset_mono[1] assert sample_0['image'][0, 0] == self.group_by_image_intensity[1]['image_0'] assert sample_0['image'][1, 0] == self.group_by_image_intensity[1]['time_0']
[docs] def test_intensity_energy_filter_memory(self): dataset_mono = dsets.MemoryLSTDataset(self.data_file, self.camera_type, 'image', self.targets, self.particle_dict, use_time=True, subarray=self.subarray) dataset_mono.filter_image({utils.intensity_filter: self.intensity_filter_parameters}) dataset_mono.filter_event({utils.energyband_filter: self.energy_filter_parameters}) sample_2 = dataset_mono[1] assert sample_2['image'][0, 0] == self.group_by_image_intensity_energy[1]['image_0'] assert sample_2['image'][1, 0] == self.group_by_image_intensity_energy[1]['time_0']
[docs] def test_intensity_energy_filter_file(self): dataset_mono = dsets.FileLSTDataset(self.data_file, self.camera_type, 'image', self.targets, self.particle_dict, use_time=True, subarray=self.subarray) dataset_mono.filter_image({utils.intensity_filter: self.intensity_filter_parameters}) dataset_mono.filter_event({utils.energyband_filter: self.energy_filter_parameters}) sample_2 = dataset_mono[1] assert sample_2['image'][0, 0] == self.group_by_image_intensity_energy[1]['image_0'] assert sample_2['image'][1, 0] == self.group_by_image_intensity_energy[1]['time_0']
[docs] def test_subarray(self): dataset_mono = dsets.MemoryLSTDataset(self.data_file, self.camera_type, 'image', self.targets, self.particle_dict, use_time=True, subarray=self.subarray) assert len(dataset_mono.trig_energies) == self.len_trig_energies assert len(dataset_mono.images) == self.len_trig_energies
# TODO test stereo
[docs]class TestLSTRealDataset(unittest.TestCase):
[docs] def setUp(self) -> None: self.data_file = os.path.join(THIS_DIR, '../../share/data/real_data/dl1_realdata_example.h5') self.camera_type = 'LST_LSTCam' self.subarray = [1] self.group_by_image = { 0: { 'image_0': np.float32(2.8471088), 'time_0': np.float32(8.674426), 'telescope': { 'alt': np.float32(1.25764907), 'az': np.float32(0.80768447), 'position': np.array([50., 50., 16.]) } }, 2: { 'image_0': np.float32(0.822368), 'time_0': np.float32(31.549051), 'telescope': { 'alt': np.float32(1.25764908), 'az': np.float32(0.80768444), 'position': np.array([50., 50., 16.]) } }, } self.intensity_filter_parameters = {'intensity': [0, 250], 'cleaning': True, 'picture_thresh': 8, 'boundary_thresh': 4, 'keep_isolated_pixels': False, 'min_number_picture_neighbors': 1 } self.group_by_image_intensity = { 0: { 'image_0': np.float32(3.4779184), 'time_0': np.float32(19.180153), } } self.intensity_lstchain_filter_parameters = {'intensity': [0, 250], 'dl1': True}
[docs] def test_mono_memory(self): dataset = dsets.MemoryLSTDataset(self.data_file, self.camera_type, 'image', use_time=True, subarray=self.subarray) assert dataset.trig_energies is None sample_0 = dataset[0] assert sample_0['image'][0, 0] == self.group_by_image[0]['image_0'] assert sample_0['image'][1, 0] == self.group_by_image[0]['time_0'] sample_2 = dataset[2] assert sample_2['image'][0, 0] == self.group_by_image[2]['image_0'] assert sample_2['image'][1, 0] == self.group_by_image[2]['time_0']
[docs] def test_mono_memory_test_mode(self): dataset = dsets.MemoryLSTDataset(self.data_file, self.camera_type, 'image', use_time=True, train=False, subarray=self.subarray) sample_0 = dataset[0] assert sample_0['image'][0, 0] == self.group_by_image[0]['image_0'] assert sample_0['image'][1, 0] == self.group_by_image[0]['time_0'] sample_2 = dataset[2] assert sample_2['image'][0, 0] == self.group_by_image[2]['image_0'] assert sample_2['image'][1, 0] == self.group_by_image[2]['time_0']
[docs] def test_mono_file(self): dataset = dsets.FileLSTDataset(self.data_file, self.camera_type, 'image', use_time=True, subarray=self.subarray) sample_0 = dataset[0] assert sample_0['image'][0, 0] == self.group_by_image[0]['image_0'] assert sample_0['image'][1, 0] == self.group_by_image[0]['time_0'] sample_2 = dataset[2] assert sample_2['image'][0, 0] == self.group_by_image[2]['image_0'] assert sample_2['image'][1, 0] == self.group_by_image[2]['time_0']
[docs] def test_intensity_filter_memory(self): dataset_mono = dsets.MemoryLSTDataset(self.data_file, self.camera_type, 'image', use_time=True, subarray=self.subarray) dataset_mono.filter_image({utils.intensity_filter: self.intensity_filter_parameters}) sample_0 = dataset_mono[0] assert sample_0['image'][0, 0] == self.group_by_image_intensity[0]['image_0'] assert sample_0['image'][1, 0] == self.group_by_image_intensity[0]['time_0']
[docs] def test_intensity_filter_file(self): dataset_mono = dsets.FileLSTDataset(self.data_file, self.camera_type, 'image', use_time=True, subarray=self.subarray) dataset_mono.filter_image({utils.intensity_filter: self.intensity_filter_parameters}) sample_0 = dataset_mono[0] assert sample_0['image'][0, 0] == self.group_by_image_intensity[0]['image_0'] assert sample_0['image'][1, 0] == self.group_by_image_intensity[0]['time_0']
[docs] def test_intensity_lstchain_filter_memory(self): dataset_mono = dsets.MemoryLSTDataset(self.data_file, self.camera_type, 'image', use_time=True, subarray=self.subarray) dataset_mono.filter_image({utils.intensity_filter: self.intensity_lstchain_filter_parameters}) sample_0 = dataset_mono[0] assert sample_0['image'][0, 0] == self.group_by_image_intensity[0]['image_0'] assert sample_0['image'][1, 0] == self.group_by_image_intensity[0]['time_0']
[docs] def test_intensity_lstchain_filter_file(self): dataset_mono = dsets.FileLSTDataset(self.data_file, self.camera_type, 'image', use_time=True, subarray=self.subarray) dataset_mono.filter_image({utils.intensity_filter: self.intensity_lstchain_filter_parameters}) sample_0 = dataset_mono[0] assert sample_0['image'][0, 0] == self.group_by_image_intensity[0]['image_0'] assert sample_0['image'][1, 0] == self.group_by_image_intensity[0]['time_0']
# TODO test stereo
[docs]class TestDL1Parameters(unittest.TestCase):
[docs] def setUp(self) -> None: self.lst1_file = os.path.join(THIS_DIR, '../../share/data/real_data/dl1_realdata_example.h5') self.mc_file = os.path.join(THIS_DIR, '../../share/data/MC_data/dl1_gamma_example.h5') self.camera_type = 'LST_LSTCam' self.group_by = 'image' self.targets = [] self.particle_dict = {GAMMA_ID: 1, PROTON_ID: 0}
[docs] def test_train_dl1_parameters(self): mc_dataset = dsets.MemoryLSTDataset(self.mc_file, camera_type=self.camera_type, group_by=self.group_by, targets=self.targets, particle_dict=self.particle_dict) lst1_dataset = dsets.MemoryLSTDataset(self.lst1_file, camera_type=self.camera_type, group_by=self.group_by, targets=self.targets, particle_dict=self.particle_dict) assert mc_dataset[0]['dl1_params'].keys() == lst1_dataset[0]['dl1_params'].keys()
[docs] def test_test_dl1_parameters(self): mc_dataset = dsets.MemoryLSTDataset(self.mc_file, camera_type=self.camera_type, group_by=self.group_by, targets=self.targets, particle_dict=self.particle_dict, train=False) lst1_dataset = dsets.MemoryLSTDataset(self.lst1_file, camera_type=self.camera_type, group_by=self.group_by, targets=self.targets, particle_dict=self.particle_dict, train=False) assert list(mc_dataset[0]['dl1_params'].keys()) == mc_dataset.dl1_param_names assert list(lst1_dataset[0]['dl1_params'].keys()) == lst1_dataset.dl1_param_names
if __name__ == '__main__': unittest.main()