Source code for openpmd_viewer.openpmd_timeseries.particle_tracker

"""
This file is part of the openPMD-viewer.

It defines the ParticleTracker class.

Copyright 2015-2016, openPMD-viewer contributors
Authors: Remi Lehe
License: 3-Clause-BSD-LBNL
"""
import numpy as np
from .numba_wrapper import jit

[docs] class ParticleTracker( object ): """ Class that allows to select particles at a given iteration (by initializing an instance of this class) and then to return the same particles at another iteration (by passing this instance as the argument `select` of the method `get_particle` of an `OpenPMDTimeSeries`) Usage ----- Here is a minimal example of how this class is used. In this example, all the particles in the simulation box at iteration 300 are selected, and then the position of the same particles at iteration 400 (or at least of those particles that remained in the simulation box at iteration 400) are returned. ``` >>> ts = OpenPMDTimeSeries('./hdf5_directory/') >>> pt = ParticleTracker( ts, iteration=300 ) >>> x, = ts.get_particle( ['x'], select=pt, iteration=400 ) ``` For more details on the API of ParticleTracker, see the docstring of the `__init__` method of this class. Note ---- `ParticleTracker` requires the `id` of the particles to be stored in the openPMD files. """ def __init__(self, ts, species=None, t=None, iteration=None, select=None, preserve_particle_index=False): """ Initialize an instance of `ParticleTracker`: select particles at a given iteration, so that they can be retrieved at a later iteration. Parameters ---------- ts: an OpenPMDTimeSeries object Contains the data on the particles species: string A string indicating the name of the species This is optional if there is only one species t : float (in seconds), optional Time at which to obtain the data (if this does not correspond to an existing iteration, the closest existing iteration will be used) Either `t` or `iteration` should be given by the user. iteration : int The iteration at which to obtain the data Either `t` or `iteration` should be given by the user. select: dict or 1darray of int, optional Either None or a dictionary of rules to select the particles, of the form 'x' : [-4., 10.] (Particles having x between -4 and 10) 'ux' : [-0.1, 0.1] (Particles having ux between -0.1 and 0.1 mc) 'uz' : [5., None] (Particles with uz above 5 mc). Can also be a 1d array of interegers corresponding to the selected particles `id` preserve_particle_index: bool, optional When retrieving particles at a several iterations, (for instance, with: ``` >>> x1, = ts.get_particle( ['x'], select=pt, iteration=400 ) >>> x2, = ts.get_particle( ['x'], select=pt, iteration=500 ) ``` it is sometimes important that the same individual particle has the same index in the array `x1` and `x2`. Using `preserve_particle_index=True` ensures that this is the case. However, this means that, for a particle that becomes absent at a later iteration, its index in the array has to be filled also. In this case, a NaN is returned at the index of this particle. When `preserve_particle_index=False`, no NaN is returned (the returned array is simply smaller when particles are absent) but then it is not garanteed that a given particle keeps the same index """ # Extract or load the particle id and sort them if (type(select) is dict) or (select is None): self.selected_pid, = ts.get_particle(['id'], species=species, select=select, t=t, iteration=iteration) elif (type(select) is np.ndarray): self.selected_pid = select self.selected_pid.sort() # Register a few metadata self.N_selected = len( self.selected_pid ) self.species = species self.preserve_particle_index = preserve_particle_index def extract_tracked_particles( self, iteration, data_reader, data_list, species, extensions ): """ Select the elements of each particle quantities in data_list, so as to only return those that correspond to the tracked particles Parameters ---------- iteration: int The iteration at which to extract the particles data_reader: a DataReader object Used in order to extract the macroparticle IDs data_list: list of 1darrays A list of arrays with one element per macroparticle, that represent different particle quantities species: string Name of the species being requested extensions: list of strings The extensions that the current OpenPMDTimeSeries complies with Returns ------- A list of 1darrays that correspond to data_list, but where only the particles that are tracked are kept. (NaNs may or may not be returned depending on whether `preserve_particle_index` was chosen at initialization) """ # Extract the particle id, and get the extraction indices pid = data_reader.read_species_data(iteration, species, 'id', extensions) selected_indices = self.get_extraction_indices( pid ) # For each particle quantity, select only the tracked particles for i in range(len(data_list)): if len(data_list[i]) > 1: # Do not apply selection on scalars data_list[i] = self.extract_quantity( data_list[i], selected_indices ) return( data_list ) def extract_quantity( self, q, selected_indices ): """ Select the elements of the array `q`, so as to only return those that correspond to the tracked particles. Parameters ---------- q: 1d array of floats or ints A particle quantity (one element per particle) selected_indices: 1d array of ints The indices (in array q) of the particles to be selected. If `preserve_particle_index` was selected to be True, this array contains -1 at the position of particles that are no longer present Returns ------- selected_q: 1d array of floats or ints A particle quantity (one element per particles) where only the tracked particles are kept """ # Extract the selected elements selected_q = q[ selected_indices ] # Handle the absent particles if self.preserve_particle_index: if q.dtype in [ np.float64, np.float32 ]: # Fill the position of absent particles by NaNs selected_q = np.where( selected_indices == -1, np.nan, selected_q) else: # The only non-float quantity in openPMD-viewer is particle id selected_q = self.selected_pid return( selected_q ) def get_extraction_indices( self, pid ): """ For each tracked particle (i.e. for each element of self.selected_pid) find the index of the same particle in the array `pid` Return these indices in an array, so that it can then be used to extract quantities (position, momentum, etc.) for the tracked particles Parameters ---------- pid: 1darray of ints The id of each particle (one element per particle) Returns ------- selected_indices: 1d array of ints The index of the tracked particles in the array `pid` If `preserve_particle_index` was selected to be True, this array contains -1 at the position of particles that are no longer present Note on the implementation -------------------------- This could be implemented in brute force (i.e. for each element of `self.selected_pid`, search the entire array `pid` for the same element), but would be very costly. Instead, we sort the array `pid` (and keep track of the original pid, so as to be able to retrieve the indices) and use the fact that it is sorted in order to rapidly extract the elements that are also in `self.selected_pid` (which is also sorted) """ # Sort the pid, and keep track of the original index # at which each pid was original_indices = pid.argsort().astype(np.int64) sorted_pid = pid[ original_indices ] # Extract only the indices for which sorted_pid is one of pid # in self.sselected_pid (i.e. which correpond to one # of the original particles) selected_indices = np.empty( self.N_selected, dtype=np.int64 ) N_extracted = extract_indices( original_indices, selected_indices, sorted_pid, self.selected_pid, self.preserve_particle_index ) # If there are less particles then self.N_selected # (i.e. not all the pid in self.selected_pid were in sorted_pid) if N_extracted < self.N_selected: if self.preserve_particle_index: # Finish filling the array with absent particles selected_indices[N_extracted:] = -1 else: # Resize the array selected_indices = \ selected_indices[:N_extracted] return( selected_indices )
@jit def extract_indices( original_indices, selected_indices, pid, selected_pid, preserve_particle_index ): """ Go through the sorted arrays `pid` and `selected_pid`, and record the indices (of the array `pid`) where they match, by storing them in the array `selected_indices` (this array is thus modified in-place) Return the number of elements that were filled in `selected_indices` """ i = 0 i_select = 0 i_fill = 0 N = len(pid) N_selected = len(selected_pid) # Go through both sorted arrays (pid and selected_pid) and match them. # i.e. whenever the same number appears in both arrays, # record the corresponding original index in selected_indices while i < N and i_select < N_selected: if pid[i] < selected_pid[i_select]: i += 1 elif pid[i] == selected_pid[i_select]: selected_indices[i_fill] = original_indices[i] i_fill += 1 i_select += 1 elif pid[i] > selected_pid[i_select]: i_select += 1 if preserve_particle_index: # Fill the index, to indicate that the particle is absent selected_indices[i_fill] = -1 i_fill += 1 return( i_fill )