import math
import os
import sys
import numpy as np
import datetime
import json
import logging
from numba import jit
from abc import ABC, abstractmethod
from timeit import default_timer as timer
from gtsimulation.ElectricFields import GeneralFieldE
from gtsimulation.Global import Constants, Units, Regions, BreakCode, BreakIndex, SaveCode, SaveDef, BreakDef, vecRotMat
from gtsimulation.Interaction import NuclearInteraction, G4Decay, SynchCounter, RadLossStep
from gtsimulation.MagneticFields import AbsBfield
from gtsimulation.MagneticFields.Magnetosphere import Functions, Additions
from gtsimulation.Medium import GTGeneralMedium
from gtsimulation.Particle import ConvertT2R, GetAntiParticle, Flux
from gtsimulation.Particle.Generators import Distributions, Spectrums
from gtsimulation import functions
[docs]
class GTSimulator(ABC):
"""
Description
:param Region: The region of the in which the simulation is taken place. The parameter may have values as
`Global.Region.Magnetosphere`, `Global.Region.Heliosphere`, `Global.Region.Galaxy`.
See :py:mod:`Global.regions`. See also :py:mod:`Global.regions._AbsRegion.set_params` for additional
energy losses.
Example: one mau need to take into account the adiabatic energy losses in the heliosphere.
In that case they call `set_params(True)`.
:type Region: :py:mod:`Global.regions.Regions`
:param Bfield: The magentic field object
:type Bfield: :py:mod:`MagneticFields.magnetic_field.AbsBfield`
:param Efield: Electrical field. Similar to :ref:`Bfield`
:type Efield: None
:param Medium: The medium where particles may go into an interaction. See :py:mod:`Medium`.
:type Medium: :py:mod:`Medium.general_medium.GTGeneralMedium`
:param Date: Date that is used to initialize the fields
:type Date: datetime.datetime
:param RadLosses: a `bool` flag that turns the calculations of radiation losses of the particles on.
:type RadLosses: bool
# TODO add about the synchrotron emission
:param Particles: The parameter is responsible for the initial particle flux generation. It defines the initial
particle spectrum, distribution and chemical composition. See
:py:mod:`Particle.Generators.Spectrums` and :py:mod:`Particle.Generators.Distributions` for
available initial spectra and distributions respectively. For more information regarding flux
also see :py:mod:`Particle.Flux`.
:type Particles: :py:mod:`Particle.Flux`
:param TrackParams: a 'bool' flag that turns the calculations of additional parameters in given region.
If 'True' all the additional parameters will be calculated. If one needs to calculate specific parameter,
he may pass a 'dict' instead of 'bool' with the names of a needed parts and the values of 'True'.
Example: {'Invariants': True, 'GuidingCenter': True}.
See :py:mod:`Global.regions._AbsRegion.SaveAdd` for available parameters to a given region.
**Note**: to calculate some of the additional parameters others must be calculated as well.
:type TrackParams: bool or dict
:param ParticleOrigin: a 'bool' flag that turns the calculations of particle's origin through the backtracing
:type ParticleOrigin: bool
:param IsFirstRun:
:param ForwardTrck: 1 refers to forward tracing, and -1 to the backtracing
:type ForwardTrck: 1 or -1
:param Save: The number of steps that are saved. If the value is 0, then only staring
and finishing points are saved. The default parameters that are saved are `Coordinates` and
`Velocities`. Other parameters that one needs to save as well, can be turned on by passing a `list`
instead of `int` where the second element is a `dict` that's key is the parameter name and value is
`True`. Example `[10, {"Clock": True}]`. To available parameters are listed in
:py:mod:`Global.codes.SaveCode`.
:type Save: int or list
:param Num: The number of simulation steps
:type Num: int
:param Step: The time step of simulation in seconds. If `dict` one should pass:\n
1. `UseAdaptiveStep`: `True`/`False` --- whether to use adaptive time step\n
2. `InitialStep`: `float` --- The initial time step in seconds\n
3. `MinLarmorRad`: `int` --- The minimal number of points during on the Larmor radius\n
4. `MaxLarmorRad`: `int` --- The maximal number of points during on the Larmor radius\n
5. `LarmorRad`: `int` --- The fixed number of points, in case when the user needs to update time step during each step
:type Step: float or dict
:param Nfiles: Number of files if `int`, otherwise the `list` of file numbers (e.g. `Nfiles = [5, 10, 20]`, then
3 files are numerated as 5, 10, 20). If :ref:`Particles` creates a flux of `Nevents` particles then
the total number of particles that are going to be simulated is `Nevents`x`Nfiles`.
:type Nfiles: int or list
:param Output: If `None` no files are saved. Otherwise, the name of the saved *.npy* file. If :ref:`Nfiles` is
greater than 1. Then the names of the saved files will have the following form `"Output"_i.npy`
:type Output: str or None
:param Verbose: 0 - no output, 1 - short output, 2 - verbose output
:type Verbose: int
:param BreakCondition: If `None` no break conditions are applied. In case of a `dict` with a key corresponding to
the `BreakCondition` name and value corresponding to its value is passed.
Example: `{"Rmax": 10}`. In the example the maximal radius of the particle is 10
(in :py:mod:`MagneticFields` distance units). See the full list of break conditions
:py:mod:`Global.codes.BreakCode`.
If `list`, the first parameter is the `dict`, the second parameter describes the break
condition center, i.e. A 3d array of point (in :py:mod:`MagneticFields` distance units). It
represents the **(0, 0, 0)** relative to which `Rmax/Rmin, Xmax/Xmin, Ymax/Ymin, Zmax/Zmin`
are calculated. Default `np.array([0, 0, 0])`.
:type BreakCondition: dict or list or None
:param UseDecay: If `True` the particles may decay. Otherwise, not.
:type UseDecay: bool
:param InteractNUC:
# TODO describe the InteractNUC parameter
:return: dict
A dictionary is saved. It has the following keys.
1. Track: The parameters that are saved along the trajectory. See :py:mod:`Global.codes.SaveCode`.
2. BC: The parameters regarding the simulation end.
2.1. WOut: The code of break. See :py:mod:`Global.codes.BreakIndex`
3. Particle: Information about the particle
3.1. PDG: Its PDG code
3.2. M: The mass in MeV
3.3. Z: The charge of the particle in e units.
3.4. T0: Its initial kinetic energy in MeV
3.5. Gen: Its generation
4. Additions: Additional parameters calculated in defined region. See :py:mod:`Global.regions._AbsRegion.SaveAdd`
4.1. In magnetosphere:
Invariants:
I1: First adiabatic invariant along the trajectory of a particle
I2: Second adiabatic invariant between each pair of reflections at mirror points
PitchAngles:
Pitch: Pitch angles along the trajectory of a particle
PitchEq: Equatorial pitch angles
MirrorPoints:
NumMirror: Indexes of trajectory where particle turns to be at a mirror point
NumEqPitch: Indexes of trajectory where particle crosses the magnetic equator
NumB0: An array of trajectory points with the minimum value of the magnetic field strength
Hmirr: The value of the magnetic field at the mirror points
Heq: The value of the magnetic field at the magnetic equator
L-shell:
L: L-shell calculated on the basis of second invariant and the field at the mirror point
Lgen: L-shell calculated at every magnetic equator point
LonTotal: the angle of rotation of the particle around the Earth
GuidingCenter:
LR: Larmor radius of a particle
LRNit:
Rline: Coordinates of the field line of the guiding centre of the particle
Bline: The value of the magnetic field of the field line of the guiding centre of the particle
Req: Coordinates of the guiding centre of the particle calculated from the field line
Beq: The value of the magnetic field at the magnetic equator calculated from the field line
BB0: The ratio of the value of the magnetic field at the position of the guiding center corresponding to
the initial value of the coordinate and at the magnetic equator
L: L-shell calculated from the field line of the guiding centre
parReq: Coordinates of the guiding centre of the particle from the local field line
parBeq: The value of the magnetic field of the local field line of the particle
parBB0: The ratio of the value of the magnetic field at the position of
parL: L-shell calculated from the local field line
4.2. In heliosphere:
4.3. In galaxy:
5. Child: List of secondary particles. They have the same parameters.
"""
def __init__(
self,
Bfield: None | AbsBfield = None,
Efield: None | GeneralFieldE = None,
Region: Regions = Regions.Undefined,
Medium: None | GTGeneralMedium = None,
Date=datetime.datetime(2008, 1, 1),
RadLosses: bool | list = False,
Particles: None | Flux = None,
TrackParams=False,
ParticleOrigin=False,
IsFirstRun=True,
ForwardTrck=None,
Save: int | list = 1,
Num: int = 1e6,
Step: float = 1,
Nfiles=1,
Output=None,
Verbose: int = 1,
BreakCondition: None | dict = None,
UseDecay=False,
InteractNUC: None | NuclearInteraction = None,
):
self.ParamDict = locals().copy()
del self.ParamDict['self']
self.logger = logging.getLogger(__name__)
if Verbose < 1:
self.logger.setLevel(logging.WARNING)
elif Verbose == 1:
self.logger.setLevel(logging.INFO)
else:
self.logger.setLevel(logging.DEBUG)
if Verbose > 0 and not self.logger.hasHandlers():
h = logging.StreamHandler(stream=sys.stdout)
h.setLevel(self.logger.level)
h.setFormatter(logging.Formatter("%(message)s"))
self.logger.addHandler(h)
self.logger.propagate = False
self.logger.debug("Creating simulator object...")
self.Date = Date
self.logger.debug("Date: %s", self.Date)
self.StepParams = Step
self.Step = None
self.UseAdaptiveStep = False
self.__SetStep(Step)
self.Num = int(Num)
self.logger.debug("Number of steps: %d", self.Num)
self.__SetUseRadLosses(RadLosses)
self.Region = Region
self.logger.debug("Region: %s", self.Region.name)
self.logger.debug("%s", self.Region.value.ret_str())
self.ParticleOrigin = ParticleOrigin
self.ParticleOriginIsOn = bool(self.ParticleOrigin)
if self.ParticleOrigin:
TrackParams = True
self.TrackParamsIsOn = False
self.TrackParams = self.Region.value.SaveAdd
self.__SetAdditions(TrackParams, Save)
self.IsFirstRun = IsFirstRun
self.Nfiles = 1 if Nfiles is None or Nfiles == 0 else Nfiles
self.Output = Output
self.Npts = 2
self.Save = SaveDef.copy()
self.SaveCode = dict([(key, SaveCode[key][1]) for key in SaveCode.keys()])
self.SaveColumnLen = 22
self.logger.debug("Number of files: %s", self.Nfiles)
self.logger.debug("Output file name: %s_num.npy", self.Output)
if BreakCondition is not None and hasattr(BreakCondition, 'keys') and 'MaxRev' in BreakCondition.keys():
if not isinstance(Save, list):
Save = [Save, {'GuidingCenter': True, 'PitchAngles': True}]
else:
Save[1] = Save[1] | {'GuidingCenter': True, 'PitchAngles': True}
self.__SetSave(Save)
self.Efield = Efield
self.logger.debug("Electric field: %s", self.Efield)
self.Bfield = Bfield
self.logger.debug("Magnetic field: %s", self.Bfield)
self.Medium = Medium
self.logger.debug("Medium: %s", self.Medium)
self.UseDecay = UseDecay
self.logger.debug("Decay: %s", self.UseDecay)
if self.Medium is None and InteractNUC is not None:
raise ValueError('Nuclear Interaction is enabled but Medium is not set')
self.nuclear_interaction = InteractNUC
self.logger.debug("Nuclear Interactions: %s", self.nuclear_interaction)
self.__gen = 1
# self.UseDecay = False
# self.nuclear_interaction = None
# self.__set_nuclear_interaction(UseDecay, InteractNUC)
self.Particles = None
self.ForwardTracing = 1
self.__SetFlux(Particles, ForwardTrck)
self.__brck_index = BreakCode.copy()
self.__brck_index.pop("Loop")
self.__index_brck = BreakIndex.copy()
self.__brck_arr = BreakDef.copy()
self.__set_break_condition(BreakCondition)
self.index = 0
self.logger.debug("Simulator object created!\n")
def __SetStep(self, Step):
if isinstance(Step, (int, float)):
self.Step = Step
self.logger.debug("Time step: %s seconds", self.Step)
elif isinstance(Step, dict):
self.UseAdaptiveStep = Step.get("UseAdaptiveStep", False)
self.Step = Step.get("InitialStep", 1)
self.logger.debug("Using adaptive time step: %s", self.UseAdaptiveStep)
self.logger.debug("Initial time step: %f seconds", self.Step)
N = Step.get("LarmorRad", None)
if N is not None:
self.N1 = self.N2 = N
self.logger.debug("Steps per Larmor radius: %d", N)
else:
self.N1 = Step.get("MinLarmorRad", 600)
self.N2 = Step.get("MaxLarmorRad", 600)
self.logger.debug("Min steps per Larmor radius: %d", self.N1)
self.logger.debug("Max steps per Larmor radius: %d", self.N2)
assert isinstance(self.UseAdaptiveStep, bool)
assert isinstance(self.Step, (int, float))
assert isinstance(self.N1, int) and isinstance(self.N2, int)
assert self.N1 <= self.N2
else:
raise Exception("Step should be numeric or dict")
def __SetUseRadLosses(self, RadLosses):
if isinstance(RadLosses, bool):
self.UseRadLosses = [RadLosses, False]
if isinstance(RadLosses, list) and (RadLosses[0] == True) and (RadLosses[1]["Photons"] == False):
self.UseRadLosses = [True, False]
if isinstance(RadLosses, list) and (RadLosses[0] == True) and (RadLosses[1]["Photons"] == True):
MinMax = np.array([0, np.inf])
if "MinE" in RadLosses[1]:
if RadLosses[1]["MinE"] > 0:
MinMax[0] = RadLosses[1]["MinE"]
if "MaxE" in RadLosses[1]:
if RadLosses[1]["MaxE"] > 0:
MinMax[1] = RadLosses[1]["MaxE"]
self.UseRadLosses = [True, True, MinMax]
self.logger.debug("Radiation Losses: %s", self.UseRadLosses[0])
self.logger.debug("Synchrotron Emission: %s", self.UseRadLosses[1])
def __SetAdditions(self, TrackParams, Save):
# Change save settings due to dependencies
if isinstance(TrackParams, dict):
if "GuidingCenter" in TrackParams.keys() and TrackParams["GuidingCenter"]:
TrackParams["PitchAngles"] = True
if not isinstance(Save, list):
Save = [Save, {"GuidingCenter": True, "PitchAngles": True}]
else:
Save[1] = Save[1] | {"GuidingCenter": True, "PitchAngles": True}
if "Lshell" in TrackParams.keys() and TrackParams["Lshell"]:
TrackParams["Invariants"] = True
if "Invariants" in TrackParams.keys() and TrackParams["Invariants"]:
TrackParams["MirrorPoints"] = True
if "MirrorPoints" in TrackParams.keys() and TrackParams["MirrorPoints"]:
TrackParams["PitchAngles"] = True
if not isinstance(Save, list):
Save = [Save, {"PitchAngles": True}]
else:
Save[1] = Save[1] | {"PitchAngles": True}
if isinstance(TrackParams, bool):
self.TrackParamsIsOn = TrackParams
if self.TrackParamsIsOn:
self.TrackParams.update((key, True) for key in self.TrackParams)
elif isinstance(TrackParams, dict):
self.TrackParamsIsOn = True
for add in TrackParams.keys():
assert add in self.TrackParams.keys(), f'No such option as "{add}" is allowed'
self.TrackParams[add] = TrackParams[add]
if self.TrackParamsIsOn:
if not isinstance(Save, list):
Save = [Save, {"Bfield": True}]
else:
Save[1] = Save[1] | {"Bfield": True}
# def __set_nuclear_interaction(self, UseDecay, UseInteractNUC):
# self.UseDecay = UseDecay
# if self.Medium is None and UseInteractNUC is not None:
# raise ValueError('Nuclear Interaction is enabled but Medium is not set')
# self.nuclear_interaction = UseInteractNUC
# if self.nuclear_interaction is not None and 'l' in self.nuclear_interaction.get("ExcludeParticleList", []):
# self.nuclear_interaction['ExcludeParticleList'].extend([11, 12, 13, 14, 15, 16, 17, 18,
# -11, -12, -13, -14, -15, -16, -17, -18])
# self.logger.debug("Decay: %s", self.UseDecay)
# self.logger.debug("Nuclear Interactions: %s", self.nuclear_interaction)
def __set_break_condition(self, Brck):
center = np.array([0, 0, 0])
if Brck is not None:
if isinstance(Brck, list):
center = Brck[1]
assert isinstance(center, np.ndarray) and center.shape == (3,)
Brck = Brck[0]
assert isinstance(Brck, dict)
for key in Brck.keys():
self.__brck_arr[self.__brck_index[key]] = Brck[key]
self.logger.debug("Break Conditions:")
for key in self.__brck_index.keys():
self.logger.debug("\t%s: %s", key, self.__brck_arr[self.__brck_index[key]])
self.logger.debug("BC center: %s", center)
self.BCcenter = center
def __SetFlux(self, flux, forward_trck):
assert flux is not None
self.Particles = flux
self.logger.debug("Flux: %s", self.Particles)
if forward_trck is not None:
self.ForwardTracing = forward_trck
return
self.ForwardTracing = self.Particles.Mode.value
self.logger.debug("Tracing: %s", "Inward" if self.ForwardTracing == 1 else "Outward")
def __SetSave(self, Save):
Nsave = Save if not isinstance(Save, list) else Save[0]
self.Region.value.checkSave(self, Nsave)
self.Npts = math.ceil(self.Num / Nsave) if Nsave != 0 else 1
self.Nsave = Nsave
self.logger.debug("Save every %s step of:", self.Nsave)
if isinstance(Save, list):
for saves in Save[1].keys():
self.Save[saves] = Save[1][saves]
sorted_keys = sorted(SaveCode, key = lambda x: SaveCode[x][0])
for i, key in enumerate(sorted_keys):
if not self.Save[key]:
val = SaveCode[key][1]
num = 0
self.SaveCode[key] = None
if isinstance(val, int):
num = 1
elif isinstance(val, slice):
num = val.stop - val.start
self.SaveColumnLen -= num
for j in range(i + 1, len(sorted_keys)):
val_ = self.SaveCode[sorted_keys[j]]
if isinstance(val_, int):
val_ -= num
elif isinstance(val_, slice):
val_ = np.s_[val_.start - num:val_.stop - num:1]
self.SaveCode[sorted_keys[j]] = val_
for saves in self.Save.keys():
self.logger.debug("\t%s: %s", saves, self.Save[saves])
def __call__(self):
Track = []
self.logger.debug("Launching simulation...\n")
file_nums = np.arange(self.Nfiles) if isinstance(self.Nfiles, int) else self.Nfiles
for (idx, i) in enumerate(file_nums):
if self.IsFirstRun:
self.logger.info("File %d/%d started", idx + 1, len(file_nums))
self.logger.debug("")
if self.Output is not None:
file = self.Output.split(os.sep)
folder = os.sep.join(file[:-1])
if len(file) != 1 and not os.path.isdir(folder):
os.mkdir(folder)
def custom_serializer(obj):
if isinstance(obj, (AbsBfield, GTGeneralMedium)):
lines = [el.strip() for el in str(obj).strip().split('\n')]
return [lines[0], dict([el.split(': ') for el in lines[1:]])]
if isinstance(obj, Flux):
return dict([el.strip().split(': ') for el in str(obj).strip().split('\n')])
if isinstance(obj, Regions):
return str(obj)
if isinstance(obj, datetime.datetime):
return obj.isoformat()
raise TypeError(f"Type {type(obj)} not serializable")
def custom_serializer_safe(obj):
return str(obj)
with open(f'{self.Output}_params.json', 'w') as file:
try:
json.dump(self.ParamDict, file, default=custom_serializer, indent=4)
except:
json.dump(self.ParamDict, file, default=custom_serializer_safe, indent=4)
RetArr = self.CallOneFile()
if self.Output is not None:
if self.Nfiles == 1:
np.save(f"{self.Output}.npy", RetArr)
else:
np.save(f"{self.Output}_{i}.npy", RetArr)
self.logger.info("File %d/%d saved\n", idx + 1, len(file_nums))
RetArr.clear()
else:
Track.append(RetArr)
self.logger.info("Simulation completed!")
if self.Output is None:
return Track
[docs]
def CallOneFile(self):
self.Particles.generate()
RetArr = []
SaveR = self.Save["Coordinates"]
SaveV = self.Save["Velocities"]
SaveE = self.Save["Efield"]
SaveB = self.Save["Bfield"]
SaveA = self.Save["Angles"]
SaveP = self.Save["Path"]
SaveD = self.Save["Density"]
SaveC = self.Save["Clock"]
SaveT = self.Save["Energy"]
SavePA = self.Save["PitchAngles"]
SaveLR = self.Save["LarmorRadii"]
SaveGC = self.Save["GuidingCenter"]
Gen = self.__gen
GenMax = 1 if self.nuclear_interaction is None else self.nuclear_interaction.max_generations
UseAdditionalEnergyLosses = self.Region.value.CalcAdditional()
n_events = len(self.Particles)
progress_step = self.Num // 10
for self.index in range(n_events):
self.logger.debug("Event %d/%d started", self.index + 1, n_events)
TotTime, TotPathLen, TotPathDen = 0, 0, 0
if self.Medium is not None and self.nuclear_interaction is not None:
local_den, n_local, local_path_den = 0, 0, 0
local_chem_comp = np.zeros(len(self.Medium.get_element_list()))
local_path_den_vector = []
local_coordinate = []
local_velocity = []
lon_total, lon_prev, full_revolutions = np.array([[0.]]), np.array([[0.]]), np.array([[0.]])
particle = self.Particles[self.index]
Saves = []
BrckArr = self.__brck_arr
BCcenter = self.BCcenter
tau = particle.tau
rnd_dec = 0
self.IsPrimDeath = False
prod_tracks = []
if self.UseDecay:
rnd_dec = np.random.rand()
self.logger.debug("Use Decay: %s", self.UseDecay)
self.logger.debug("Decay rnd: %f", rnd_dec)
if self.ForwardTracing == -1:
self.logger.debug("Backtracing mode is ON")
self.logger.debug("Redefinition of particle to antiparticle")
GetAntiParticle(particle)
particle.velocities = -particle.velocities
Q = particle.Z * Constants.e
M = particle.M
m = M*Units.MeV2kg
T = particle.T
V_normalized = np.array(particle.velocities) # unit vector of velocity (beta vector)
V_norm = Constants.c * np.sqrt(particle.E ** 2 - M ** 2) / particle.E # scalar speed [m/s]
Vm = V_norm * V_normalized # vector of velocity [m/s]
r0 = np.array(particle.coordinates)
r = np.array(particle.coordinates)
r_old = r
B = np.array(self.Bfield.GetBfield(*r)) if self.Bfield is not None else np.zeros(3)
E = np.array(self.Efield.GetEfield(*r)) if self.Efield is not None else np.zeros(3)
Step = self.Step
if Q == 0:
Step *= 1e2
self.logger.debug(
"Particle: %s (M = %f [MeV/c2], Z = %d)",
particle.Name, M, self.Particles[self.index].Z
)
self.logger.debug(
"Energy: %f [MeV], Rigidity: %f [GV]",
T, ConvertT2R(T, M, particle.A, particle.Z) / 1000 if particle.Z != 0 else np.inf
)
self.logger.debug("Coordinates: %s [m]", r)
self.logger.debug("Velocity: %s", V_normalized)
self.logger.debug("Beta: %s", V_norm / Constants.c)
self.logger.debug("Beta * dt: %f [m]", V_norm * Step)
# Calculation of EAS for magnetosphere
self.Region.value.do_before_loop(self, Gen, prod_tracks)
q = Step * Q / 2 / m if M != 0 else 0
brk = BreakCode["Loop"]
Num = self.Num
Nsave = self.Nsave if self.Nsave != 0 else Num + 1
i_save = 0
st = timer()
if self.UseRadLosses[1]:
synch_record = SynchCounter()
else:
synch_record = 0
self.logger.debug("Calculating:")
PitchAngle = None
LarmorRadius = None
GuidingCenter = None
for i in range(Num):
if SavePA:
PitchAngle = functions.CalcPitchAngles(B, Vm)
if SaveLR:
LarmorRadius = functions.CalcLarmorRadii(np.linalg.norm(B), T, PitchAngle, M, particle.Z)
if SaveGC:
GuidingCenter = functions.CalcGuidingCenter(r, Vm, B, T, PitchAngle, M, particle.Z)
if i % Nsave == 0 or i == Num - 1 or i_save == 0:
self.SaveStep(
r_old, V_norm, TotPathLen, TotPathDen, TotTime, Vm, r, T, E, B,
PitchAngle, LarmorRadius, GuidingCenter, Saves, self.SaveColumnLen,
self.SaveCode["Coordinates"], self.SaveCode["Velocities"], self.SaveCode["Efield"],
self.SaveCode["Bfield"], self.SaveCode["Angles"], self.SaveCode["Path"],
self.SaveCode["Density"], self.SaveCode["Clock"], self.SaveCode["Energy"],
self.SaveCode["PitchAngles"], self.SaveCode["LarmorRadii"], self.SaveCode["GuidingCenter"],
SaveR, SaveV, SaveE, SaveB, SaveA, SaveP, SaveD, SaveC, SaveT, SavePA, SaveLR, SaveGC
)
i_save += 1
if self.UseAdaptiveStep:
Step = self.AdaptStep(Q, m, B, Vm, T, M, Step, self.N1, self.N2)
if i == 0:
self.Step = Step
q = Step * Q / 2 / m
PathLen = V_norm * Step
Vp, Yp, Ya = self.AlgoStep(T, M, q, Vm, r, B, E)
if self.UseRadLosses[1]:
synch_record.add_iteration(T, B, Vm, Step)
if self.UseRadLosses[0]:
Vm, T, new_photons, synch_record = RadLossStep.MakeRadLossStep(
Vp, Vm, Yp, Ya, M, Q, r, Step,
self.ForwardTracing, self.UseRadLosses[1:], particle, Gen, Constants, synch_record
)
prod_tracks.extend(new_photons)
elif M > 0:
T = M * (Yp - 1)
Vm = Vp
if UseAdditionalEnergyLosses:
Vm, T = self.Region.value.AdditionalEnergyLosses(r, Vm, T, M, Step, self.ForwardTracing,
Constants.c)
r_old = r
V_norm, r, TotPathLen, TotTime = self.Update(PathLen, Step, TotPathLen, TotTime, Vm, r)
# Medium
if self.Medium is not None:
self.Medium.calculate_model(*r)
Den = self.Medium.get_density() # kg/m3
PathDen = (Den * 1e-3) * (PathLen * 1e2) # g/cm2
TotPathDen += PathDen # g/cm2
if self.nuclear_interaction is not None and Den > 0:
local_den += Den
local_chem_comp += self.Medium.get_element_abundance()
n_local += 1
local_path_den += PathDen
local_path_den_vector.append(local_path_den)
local_coordinate.append(r)
local_velocity.append(Vm)
# Decay
if self.UseDecay and not self.IsPrimDeath:
lifetime = tau * (T / M + 1) if M > 0 else np.inf
if rnd_dec > np.exp(-TotTime / lifetime):
self.__Decay(Gen, GenMax, T, TotTime, V_norm, Vm, particle, prod_tracks, r)
self.IsPrimDeath = True
# Nuclear Interaction
check_interaction = (
self.nuclear_interaction is not None
and local_path_den > self.nuclear_interaction.grammage_threshold
and not self.IsPrimDeath
)
if check_interaction:
# Construct Rotation Matrix & Save velocity before possible interaction
rotationMatrix = vecRotMat(np.array([0, 0, 1]), Vm / V_norm)
primary, secondary = self.nuclear_interaction.run_matter_layer(
pdg=particle.PDG,
energy=T,
mass=local_path_den,
density=(local_den * 1e-3) / n_local,
element_name=self.Medium.get_element_list(),
element_abundance=local_chem_comp / n_local
)
T = primary['KineticEnergy']
if T > 0 and T > 1: # Cut particles with T < 1 MeV
# Only ionization losses
V_norm = Constants.c * np.sqrt(1 - (M / (T + M)) ** 2)
Vm = V_norm * rotationMatrix @ primary['MomentumDirection']
local_den, n_local, local_path_den = 0, 0, 0
local_chem_comp = np.zeros(len(self.Medium.get_element_list()))
local_path_den_vector.clear()
local_coordinate.clear()
local_velocity.clear()
else:
# Death due to ionization losses or nuclear interaction
self.IsPrimDeath = True
if secondary.size > 0 and Gen < GenMax:
self.logger.debug(
"Nuclear interaction %s: %d secondaries, total energy %f MeV",
primary["LastProcess"], secondary.size, np.sum(secondary["KineticEnergy"]),
)
self.logger.debug("%s", secondary)
# Coordinates of interaction point in XYZ
local_path_den_vector = np.array(local_path_den_vector)
path_den_cylinder = (np.linalg.norm(primary['Position']) * 1e2) * (local_den * 1e-3 / n_local) # Path in cylinder [g/cm2]
r_interaction = np.array(local_coordinate)[np.argmax(local_path_den_vector > path_den_cylinder), :]
v_interaction = np.array(local_velocity)[np.argmax(local_path_den_vector > path_den_cylinder), :]
rotationMatrix = vecRotMat(np.array([0, 0, 1]), v_interaction / np.linalg.norm(v_interaction))
for p in secondary:
V_p = rotationMatrix @ p['MomentumDirection']
T_p = p['KineticEnergy']
PDGcode_p = p["PDGcode"]
# Parameters for recursive call of GT
params = self.ParamDict.copy()
params["Date"] += datetime.timedelta(seconds=TotTime)
params["Particles"] = Flux(
Distribution=Distributions.UserInput(R0=r_interaction, V0=V_p),
Spectrum=Spectrums.UserInput(energy=T_p),
PDGcode=PDGcode_p
)
# if (PDGcode_p in self.nuclear_interaction.get("ExcludeParticleList", [])
# or T_p < self.nuclear_interaction.get("Emin", 0)):
# params["Num"] = 1
# params["UseDecay"] = False
# params["InteractNUC"] = None
if PDGcode_p in [12, 14, 16, 18, -12, -14, -16, -18]:
params["Medium"] = None
params["InteractNUC"] = None
new_process = self.__class__(**params)
new_process.__gen = Gen + 1
prod_tracks.append(new_process.CallOneFile()[0])
B = np.array(self.Bfield.GetBfield(*r)) if self.Bfield is not None else np.zeros(3)
E = np.array(self.Efield.GetEfield(*r)) if self.Efield is not None else np.zeros(3)
# TODO the code is region specific
# Full revolution
if self.Region == Regions.Magnetosphere:
if self.ParticleOriginIsOn or self.__brck_arr[self.__brck_index["MaxRev"]] != BreakDef[-1]:
a_, b_, _ = Functions.transformations.geo2mag_eccentric(GuidingCenter[0][0],
GuidingCenter[0][1],
GuidingCenter[0][2],
1,
self.Bfield.g,
self.Bfield.h)
lon_total, lon_prev, full_revolutions = Additions.AddLon(lon_total, lon_prev, full_revolutions,
i, a_, b_)
brck = self.CheckBreak(r, r0, BCcenter, TotPathLen, TotTime, full_revolutions, BrckArr)
brk = brck[1]
if brck[0] or self.IsPrimDeath:
if SavePA:
PitchAngle = functions.CalcPitchAngles(B, Vm)
if SaveLR:
LarmorRadius = functions.CalcLarmorRadii(np.linalg.norm(B), T, PitchAngle, M, particle.Z)
if SaveGC:
GuidingCenter = functions.CalcGuidingCenter(r, Vm, B, T, PitchAngle, M, particle.Z)
if brk != -1:
self.SaveStep(
r_old, V_norm, TotPathLen, TotPathDen, TotTime, Vm, r, T, E, B, PitchAngle, LarmorRadius,
GuidingCenter, Saves, self.SaveColumnLen,
self.SaveCode["Coordinates"], self.SaveCode["Velocities"], self.SaveCode["Efield"],
self.SaveCode["Bfield"], self.SaveCode["Angles"], self.SaveCode["Path"],
self.SaveCode["Density"], self.SaveCode["Clock"], self.SaveCode["Energy"],
self.SaveCode["PitchAngles"], self.SaveCode["LarmorRadii"], self.SaveCode["GuidingCenter"],
SaveR, SaveV, SaveE, SaveB, SaveA, SaveP, SaveD, SaveC, SaveT, SavePA, SaveLR, SaveGC
)
i_save += 1
if self.IsPrimDeath:
brk = self.__brck_index["Death"]
self.logger.debug("### Break due to %s ###", self.__index_brck[brk])
break
if i % progress_step == 0:
self.logger.debug("\tProgress: %d%%", int(i / self.Num * 100))
self.logger.debug("\tProgress: 100%")
if self.IsFirstRun:
self.logger.info(
"Event %d/%d finished in %.3f seconds",
self.index + 1, n_events, timer() - st,
)
self.logger.debug("")
Saves = np.array(Saves)
track = {}
if SaveR:
track['Coordinates'] = Saves[:, self.SaveCode["Coordinates"]]
if SaveV:
track["Velocities"] = Saves[:, self.SaveCode["Velocities"]]
if SaveE:
track["Efield"] = Saves[:, self.SaveCode["Efield"]]
if SaveB:
track["Bfield"] = Saves[:, self.SaveCode["Bfield"]]
if SaveA:
track["Angles"] = Saves[:, self.SaveCode["Angles"]]
if SaveP:
track["Path"] = Saves[:, self.SaveCode["Path"]]
if SaveC:
track["Clock"] = Saves[:, self.SaveCode["Clock"]]
if SaveT:
track["Energy"] = Saves[:, self.SaveCode["Energy"]]
if SaveD:
track["Density"] = Saves[:, self.SaveCode["Density"]]
if SavePA:
track["PitchAngles"] = Saves[:, self.SaveCode["PitchAngles"]]
if SaveLR:
track["LarmorRadii"] = Saves[:, self.SaveCode["LarmorRadii"]]
if SaveGC:
track["GuidingCenter"] = Saves[:, self.SaveCode["GuidingCenter"]]
RetArr.append({"Track": track,
"BC": {"WOut": brk},
"Particle": {"PDG": particle.PDG, "M": M, "Ze": particle.Z, "Gen": Gen,
"R0": particle.coordinates, "V0": particle.velocities, "T0": particle.T},
"Child": prod_tracks})
# TODO refactor
if self.Region == Regions.Magnetosphere:
# Particles in magnetosphere (Part 1)
if self.TrackParamsIsOn:
self.logger.debug("Calculating additional parameters ...")
TrackParams_i = Additions.GetTrackParams(self, RetArr[self.index])
if self.__brck_arr[self.__brck_index["MaxRev"]] != BreakDef[-1]:
TrackParams_i["LonTotal"] = lon_total
RetArr[self.index]["Additions"] = TrackParams_i
# Particles in magnetosphere (Part 2)
if self.ParticleOriginIsOn and self.IsFirstRun:
self.logger.debug("Finding particle origin ...")
origin = Additions.FindParticleOrigin(self, RetArr[self.index])
RetArr[self.index]["Additions"]["ParticleOrigin"] = origin
self.logger.debug("Particle origin: %s", origin.name)
return RetArr
def __Decay(self, Gen, GenMax, T, TotTime, V_norm, Vm, particle, prod_tracks, r):
if Gen < GenMax:
secondary = G4Decay(particle.PDG, T)
rotationMatrix = vecRotMat(np.array([0, 0, 1]), Vm / V_norm)
for p in secondary:
V_p = rotationMatrix @ p['MomentumDirection']
r_p = r
T_p = p['KineticEnergy']
PDGcode_p = p["PDGcode"]
params = self.ParamDict.copy()
params["Particles"] = Flux(
Distribution=Distributions.UserInput(R0=r_p, V0=V_p),
Spectrum=Spectrums.UserInput(energy=T_p),
PDGcode=PDGcode_p
)
params["Date"] = params["Date"] + datetime.timedelta(seconds=TotTime)
if PDGcode_p in [12, 14, 16, 18, -12, -14, -16, -18]:
params["Medium"] = None
params["InteractNUC"] = None
new_process = self.__class__(**params)
new_process.__gen = Gen + 1
prod_tracks.append(new_process.CallOneFile()[0])
[docs]
@staticmethod
@jit(fastmath=True, nopython=True)
def CheckBreak(r, r0, center, TotPath, TotTime, full_revolutions, Brck):
radi = np.linalg.norm(r - center)
dst2path = np.linalg.norm(r - r0) / TotPath
cond = np.concatenate((np.array([*np.abs(r), radi, dst2path]) < Brck[:5],
np.array([*np.abs(r), radi, TotPath, TotTime]) > Brck[5:-1],
np.array([full_revolutions[0][0]]) >= Brck[-1]))
if np.any(cond):
return True, np.where(cond)[0][0]
return False, -1
[docs]
@staticmethod
@jit(fastmath=True, nopython=True)
def Update(PathLen, Step, TotPathLen, TotTime, Vm, r):
V_norm = np.linalg.norm(Vm)
r_new = r + Vm * Step
TotTime += Step
TotPathLen += PathLen
return V_norm, r_new, TotPathLen, TotTime
[docs]
@staticmethod
# @jit(fastmath=True, nopython=True)
def SaveStep(r_old, V_norm, TotPathLen, TotPathDen, TotTime, Vm, r, T, E, B,
PitchAngles, LarmorRadii, GuidingCenter,
Saves, ColLen,
RCode, VCode, ECode, BCode, ACode, PCode, DCode, CCode, TCode, PACode, LRCode, GCCode,
SaveR, SaveV, SaveE, SaveB, SaveA, SaveP, SaveD, SaveC, SaveT, SavePA, SaveLR, SaveGC):
sv = np.zeros(ColLen)
if SaveR:
sv[RCode] = r
if SaveV:
sv[VCode] = Vm / V_norm
if SaveE:
sv[ECode] = E
if SaveB:
sv[BCode] = B
if SaveA:
sv[ACode] = np.arctan2(np.linalg.norm(np.cross(r_old, r)), np.dot(r, r_old))
if SaveP:
sv[PCode] = TotPathLen
if SaveD:
sv[DCode] = TotPathDen
if SaveC:
sv[CCode] = TotTime
if SaveT:
sv[TCode] = T
if SavePA:
sv[PACode] = PitchAngles
if SaveLR:
sv[LRCode] = LarmorRadii
if SaveGC:
sv[GCCode] = GuidingCenter
Saves.append(sv)
[docs]
@staticmethod
@jit(nopython=True, fastmath=True)
def RadLossStep(Vp, Vm, Yp, Ya, M, Q, UseRadLosses, Step, ForwardTracing, c, e):
if not UseRadLosses:
T = M * (Yp - 1)
return Vp, T
acc = (Vp - Vm) / Step
Vn = np.linalg.norm(Vp + Vm)
Vinter = (Vp + Vm) / Vn
acc_par = np.dot(acc, Vinter)
acc_per = np.sqrt(np.linalg.norm(acc) ** 2 - acc_par ** 2)
dE = Step * ((2 / (3 * 4 * np.pi * 8.854187e-12) * Q ** 2 * Ya ** 4 / c ** 3) *
(acc_per ** 2 + acc_par ** 2 * Ya ** 2) / e / 1e6)
T = M * (Yp - 1) - ForwardTracing * np.abs(dE)
V = c * np.sqrt((T + M) ** 2 - M ** 2) / (T + M)
Vn = np.linalg.norm(Vp)
Vm = V * Vp / Vn
return Vm, T
[docs]
@staticmethod
@jit(nopython=True, fastmath=True)
def AdaptStep(q, m, B, V, T, M, dt, N1, N2):
Y = T / M + 1
B_n = np.linalg.norm(B)
cos_theta = B @ V / (np.linalg.norm(V) * B_n)
sin_theta = np.sqrt(1 - cos_theta ** 2)
T = Y * m * sin_theta / (np.abs(q) * B_n)
if N1 <= T / dt <= N2:
return dt
return T / np.sqrt(N1*N2)
[docs]
@abstractmethod
def AlgoStep(self, T, M, q, Vm, r, H, E):
pass