Source code for gammalearn.tests.test_utils

import os
import unittest
import torch
import tempfile
import pandas as pd
import numpy as np
from copy import deepcopy

from ctapipe.instrument import CameraGeometry
import astropy.units as u
from astropy.table import Table

import gammalearn.utils as utils


THIS_DIR = os.path.dirname(os.path.abspath(__file__))


[docs]class MockLSTDataset(object): def __init__(self): self.images = np.array([np.full(1855, 0.001), np.full(1855, 1), np.full(1855, 0.0001), np.full(1855, 0.1)]) self.images[3, 903:910] = 30 self.images[2, 1799:1806] = 10 # for cleaning and leakage self.camera_type = 'LST_LSTCam' self.group_by = 'image' self.original_geometry = CameraGeometry.from_name('LSTCam') self.simu = True self.dl1_params = { 'event_id': np.array([0, 0, 1, 2]), 'mc_type': np.array([1, 0, 0, 0]), 'mc_energy': np.array([0.010, 2.5, 0.12, 0.8]), 'log_mc_energy': np.log10(np.array([0.010, 2.5, 0.12, 0.8])), 'mc_alt_tel': np.full(4, np.deg2rad(70)), 'mc_az_tel': np.full(4, np.deg2rad(180)), 'mc_alt': np.deg2rad([71, 75, 68, 69]), 'mc_az': np.deg2rad([180, 180, 179.5, 175]), 'mc_core_x': np.array([50.3, -150, -100, 100])/1000, 'mc_core_y': np.array([48, -51, 0, 0])/1000, 'tel_id': np.array([2, 1, 3, 1]), 'tel_pos_x': np.array([75.28, -70.93, -70.93, -70.93])/1000, 'tel_pos_y': np.array([50.46, -52.07, 53.1, -52.07])/1000, } def __len__(self): return len(self.images)
[docs]class TestUtils(unittest.TestCase):
[docs] def setUp(self) -> None: self.intensity_filter_parameters = [300, np.inf] self.intensity_true_mask = [False, True, False, True] self.cleaning_filter_parameters = {'picture_thresh': 6, 'boundary_thresh': 3, 'keep_isolated_pixels': False, 'min_number_picture_neighbors': 2} self.cleaning_true_mask = [False, False, True, True] self.leakage_parameters = {'leakage2_cut': 0.2, 'picture_thresh': 6, 'boundary_thresh': 3, 'keep_isolated_pixels': False, 'min_number_picture_neighbors': 2} self.leakage_true_mask = [False, False, False, True] self.energy_parameters = {'energy': [0.02, 2], 'filter_only_gammas': True} self.energy_true_mask = [True, False, True, True] self.emission_cone_parameters = {'max_angle': np.deg2rad(4.)} self.emission_cone_true_mask = [True, False, True, True] self.impact_distance_parameters = {'max_distance': 0.05} self.impact_distance_true_mask = [True, False, False, False] self.multiplicity_parameters = {'multiplicity': 2} self.multiplicity_true_mask = [True, True, False, False] self.multiplicity_strict_parameters = {'multiplicity': 1, 'strict': True} self.multiplicity_strict_true_mask = [False, False, True, True] self.multiplicity_strict_true_trig_energies = np.array([0.010, 0.12, 0.8]) self.net_parameters_dic = { 'model': 'GammaPhysNet', 'parameters': { 'backbone': { 'model': 'ResNetAttentionIndexed', 'parameters': { 'num_layers': 3, 'initialization': (torch.nn.init.kaiming_uniform_, {'mode': 'fan_out'}), 'normalization': (torch.nn.BatchNorm2d, {}), 'num_channels': 2, 'block_features': [16, 32, 64], 'attention_layer': ('DualAttention', {'ratio': 16}), } }, 'fc_width': 256, 'last_bias_init': None, } }
[docs] def test_emission_cone(self): self.dataset = MockLSTDataset() assert np.all(utils.emission_cone_filter(self.dataset, **self.emission_cone_parameters) == self.emission_cone_true_mask)
[docs] def test_impact_distance(self): self.dataset = MockLSTDataset() assert np.all(utils.impact_distance_filter(self.dataset, **self.impact_distance_parameters) == self.impact_distance_true_mask)
[docs] def test_multiplicity(self): self.dataset = MockLSTDataset() assert np.all(utils.telescope_multiplicity_filter(self.dataset, **self.multiplicity_parameters) == self.multiplicity_true_mask)
[docs] def test_multiplicity_strict(self): self.dataset = MockLSTDataset() assert np.all(utils.telescope_multiplicity_filter(self.dataset, **self.multiplicity_strict_parameters) == self.multiplicity_strict_true_mask)
[docs] def test_energy(self): self.dataset_lst = MockLSTDataset() assert np.all(utils.energyband_filter(self.dataset_lst, **self.energy_parameters) == self.energy_true_mask)
[docs] def test_leakage(self): self.dataset = MockLSTDataset() assert np.all(utils.leakage_filter(self.dataset, **self.leakage_parameters) == self.leakage_true_mask)
[docs] def test_cleaning(self): self.dataset = MockLSTDataset() assert np.all(utils.cleaning_filter(self.dataset, **self.cleaning_filter_parameters) == self.cleaning_true_mask)
[docs] def test_rotated_indices(self): pix_pos = np.transpose([[0, 1, 0, -1], [1, 0, -1, 0]]) np.testing.assert_array_equal(utils.rotated_indices(pix_pos, 0), [0, 1, 2, 3]) np.testing.assert_array_equal(utils.rotated_indices(pix_pos, np.pi/2), [1, 2, 3, 0]) np.testing.assert_array_equal(utils.rotated_indices(pix_pos, -np.pi / 2), [3, 0, 1, 2])
[docs] def test_nets_definition_path(self): path = utils.nets_definition_path() assert os.path.exists(path)
[docs] def test_inject_geometry_into_parameters(self): self.net_parameters_dic = utils.inject_geometry_into_parameters(self.net_parameters_dic, 'LSTCam_geometry') assert self.net_parameters_dic['parameters']['backbone']['parameters']['camera_geometry'] == 'LSTCam_geometry'
[docs]class TestIndexMatrix(unittest.TestCase):
[docs] def setUp(self): lstcam_geom = CameraGeometry.from_name('LSTCam-002') self.minihex = deepcopy(lstcam_geom) self.minihex.camera_name = 'minihex'
[docs] def test_get_index_matrix_from_geom_7(self): """ Test the converter with a simple 7 pixels geometry Hexa: ``` 0 1 2 3 4 5 6 ``` Square: ``` 0 1 -1 2 3 4 -1 5 6 ``` """ minihex = self.minihex # make a test geometry with 7 pixels minihex.n_pixels = 7 short_dist = np.sqrt(3) / 2 minihex.pix_x = 0.05 * np.array([-0.5, 0.5, -1, 0, 1, -0.5, 0.5]) * u.m minihex.pix_y = 0.05 * np.array([short_dist, short_dist, 0, 0, 0, -short_dist, -short_dist]) * u.m minihex.pix_rotation *= 0 minihex.neighbors = minihex.neighbors[:7] minihex.pix_id = minihex.pix_id[:7] minihex.pixel_width = minihex.pixel_width[:7] # fix the neighbors minihex.neighbors = [ [1, 2, 3], [0, 3, 4], [0, 3, 5], [0, 1, 2, 4, 5, 6], [1, 3, 6], [2, 3, 6], [4, 3, 5] ] idx_mat = utils.get_index_matrix_from_geom(minihex) np.testing.assert_array_equal(idx_mat, [[ 0, 1, -1], [ 2, 3, 4], [-1, 5, 6]])
[docs] def test_get_index_matrix_from_geom_19(self): """ Test the converter with a simple 19 pixels geometry Hexa: ``` 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 ``` Square: ``` 0 1 2 -1 -1 3 4 5 6 -1 7 8 9 10 11 -1 12 13 14 15 -1 -1 16 17 18 ``` """ minihex = self.minihex minihex.n_pixels = 19 short_dist = np.sqrt(3) / 2 minihex.pix_x = 0.05 * np.array([-1, 0, 1, -1.5, -0.5, 0.5, 1.5, -2, -1, 0, 1, 2, -1.5, -0.5, 0.5, 1.5, -1, 0, 1, ]) * u.m minihex.pix_y = 0.05 * np.array([2 * short_dist, 2 * short_dist, 2 * short_dist, short_dist, short_dist, short_dist, short_dist, 0, 0, 0, 0, 0, -short_dist, -short_dist, -short_dist, -short_dist, -2 * short_dist, -2 * short_dist, -2 * short_dist, ]) * u.m minihex.pix_rotation *= 0 minihex.neighbors = minihex.neighbors[:minihex.n_pixels] minihex.pix_id = minihex.pix_id[:minihex.n_pixels] minihex.pixel_width = minihex.pixel_width[:minihex.n_pixels] # fix the neighbors minihex.neighbors = [ [1, 3, 4], [0, 4, 5, 2], [1, 5, 6], [0, 4, 7, 8], [0, 1, 5, 9, 8, 3], [1, 2, 6, 10, 9, 4], [2, 5, 10, 11], [7, 8, 12], [3, 4, 9, 13, 12, 7], [4, 5, 10, 14, 13, 8], [5, 6, 11, 15, 14, 9], [6, 10, 15], [7, 8, 13, 16], [8, 9, 14, 17, 16, 12], [9, 10, 15, 18, 17, 13], [10, 11, 14, 18], [12, 13, 17], [13, 14, 16, 18], [14, 15, 17], ] idx_mat = utils.get_index_matrix_from_geom(minihex) np.testing.assert_array_equal(idx_mat, [[ 0, 1, 2, -1, -1], [ 3, 4, 5, 6, -1], [ 7, 8, 9, 10, 11], [-1, 12, 13, 14, 15], [-1, -1, 16, 17, 18]])
[docs] def test_compare_indexedconv_method(self): """ Test that the new method gives the same result as the previous one for the LSTCam geometry """ import indexedconv.utils as cvutils from gammalearn.utils import load_camera_parameters lst_params = load_camera_parameters('LST_LSTCam') idx_mat = cvutils.create_index_matrix(lst_params['nbRow'], lst_params['nbCol'], lst_params['injTable']) lst_geom = CameraGeometry.from_name('LSTCam') new_idx_mat, camera_layout = utils.get_camera_layout_from_geom(lst_geom) assert camera_layout == 'Hex' np.testing.assert_array_equal(idx_mat.numpy(), new_idx_mat.numpy())
[docs]class TestTransformerUtils(unittest.TestCase):
[docs] def test_patches_and_centroids_LSTCam(self): geom_LSTCam = CameraGeometry.from_name('LSTCam') patch_indices_LSTCam, patch_centroids_LSTCam = utils.get_patch_indices_and_centroids_from_geometry(geom_LSTCam) utils.check_patches(patch_indices_LSTCam, patch_centroids_LSTCam, geom_LSTCam)
[docs] def test_patches_and_centroids_lstchain_07_MC(self): hdf5_file_path = os.path.join(THIS_DIR, '../../share/data/MC_data/dl1_gamma_example.h5') geom_table = Table.read(hdf5_file_path, path='/configuration/instrument/telescope/camera/geometry_LSTCam') geometry = CameraGeometry.from_table(geom_table) patch_indices, patch_centroids = utils.get_patch_indices_and_centroids_from_geometry(geometry) utils.check_patches(patch_indices, patch_centroids, geometry)
[docs] def test_patches_and_centroids_lstchain_07_real(self): hdf5_file_path = os.path.join(THIS_DIR, '../../share/data/real_data/dl1_realdata_example.h5') geom_table = Table.read(hdf5_file_path, path='/configuration/instrument/telescope/camera/geometry_LSTCam') geometry = CameraGeometry.from_table(geom_table) patch_indices, patch_centroids = utils.get_patch_indices_and_centroids_from_geometry(geometry) utils.check_patches(patch_indices, patch_centroids, geometry)
[docs] def test_2d_sincos_pos_embedding(self): geom_LSTCam = CameraGeometry.from_name('LSTCam') patch_indices_LSTCam, patch_centroids_LSTCam = utils.get_patch_indices_and_centroids_from_geometry(geom_LSTCam) pos_emb = utils.get_2d_sincos_pos_embedding_from_patch_centroids(patch_centroids_LSTCam, 1024, ['class']) assert pos_emb.shape == (266, 1024) assert torch.all(pos_emb[0] == 0) pos_emb = utils.get_2d_sincos_pos_embedding_from_patch_centroids(patch_centroids_LSTCam, 1024, ['class', 'energy']) assert pos_emb.shape == (267, 1024) assert torch.all(pos_emb[1] == 1)
[docs]class TestWrite(unittest.TestCase):
[docs] def test_write_dl2_dataframe(self): data = {'col1': [1, 2, 3], 'col2': [4.0, 5, 6]} df = pd.DataFrame(data) # Write the DataFrame to a temporary file and read back with tempfile.NamedTemporaryFile() as f: output_path = f.name utils.write_dl2_dataframe(df, output_path) df_read = pd.read_hdf(output_path, key=utils.dl2_params_lstcam_key) self.assertTrue(df.equals(df_read))
if __name__ == '__main__': unittest.main()