from __future__ import annotations
from typing import Literal, Any
import pathlib as plib
import numpy as np
import pandas as pd
from scipy.signal import savgol_filter
from lmfit.models import GaussianModel, LinearModel
from tga_data_analysis.measure import Measure
from myfigure.myfigure import MyFigure, colors, linestyles
[docs]
class Project:
"""
Represents a project (identified by the folder where the data is stored)
for TGA data analysis.
"""
def __init__(
self,
folder_path: plib.Path | str,
name: str | None = None,
column_name_mapping: dict[str, str] | None = None,
load_skiprows: int = 0,
time_moist: float = 38.0,
time_vm: float = 147.0,
temp_initial_celsius: float = 40,
temp_lim_dtg_celsius: tuple[float] | None = None,
temp_unit: Literal["C", "K"] = "C",
plot_font: Literal["Dejavu Sans", "Times New Roman"] = "Dejavu Sans",
temp_i_temp_b_threshold: float = 0.01, # % of the peak that is used for Ti and Tb
soliddist_steps_min: list[int] | None = None,
resolution_sec_deg_dtg: int = 5,
dtg_window_filter: int | None = 51,
plot_grid: bool = False,
auto_save_reports: bool = True,
):
"""
Initialize a new Project instance with various parameters for analysis.
:param folder_path: The path to the folder containing the project data.
:type folder_path: plib.Path
:param name: The name of the project. Defaults to the last part of the folder path if None.
:type name: str, optional
:param temp_unit: The unit of temperature used in the project ('C':Celsius, 'K':Kelvin).
:type temp_unit: Literal["C", "K"]
:param plot_font: The font used in plots, either 'Dejavu Sans' or 'Times New Roman'.
:type plot_font: Literal["Dejavu Sans", "Times New Roman"]
:param temp_i_temp_b_threshold: The threshold for Ti and Tb calculation in DTG analysis.
:type temp_i_temp_b_threshold: float
:param soliddist_steps_min: Temperature steps (in minutes) at which the weight loss is calculated. If None, default steps are used.
:type soliddist_steps_min: list[float], optional
:param resolution_sec_deg_dtg: The resolution in seconds or degrees for DTG analysis.
:type resolution_sec_deg_dtg: int
:param dtg_window_filter: The window size for the Savitzky-Golay filter in DTG analysis.
:type dtg_window_filter: int
:param plot_grid: Whether to display a grid in the plots.
:type plot_grid: bool
:param column_name_mapping: Mapping of column names from file to standard names used in the analysis.
:type column_name_mapping: dict[str, str], optional
:param load_skiprows: The number of rows to skip when loading data files.
:type load_skiprows: int
:param time_moist: The time considered for the moisture analysis.
:type time_moist: float
:param time_vm: The time considered for the volatile matter analysis.
:type time_vm: float
:param temp_initial_celsius: The initial temperature for certain calculations, in Celsius.
:type temp_initial_celsius: float
:param temp_lim_dtg_celsius: The temperature limits for DTG analysis, in Celsius.
:type temp_lim_dtg_celsius: tuple[float], optional
:param auto_save_reports: Whether to automatically save generated reports.
:type auto_save_reports: bool
"""
self.folder_path = plib.Path(folder_path)
self.out_path = plib.Path(self.folder_path, "output")
if name is None:
self.name = self.folder_path.parts[-1]
else:
self.name = name
self.temp_unit = temp_unit
self.plot_font = plot_font
self.plot_grid = plot_grid
self.temp_i_temp_b_threshold = temp_i_temp_b_threshold
self.resolution_sec_deg_dtg = resolution_sec_deg_dtg
self.dtg_window_filter = dtg_window_filter
self.load_skiprows = load_skiprows
self.time_moist = time_moist
self.time_vm = time_vm
self.temp_initial_celsius = temp_initial_celsius
self.auto_save_reports = auto_save_reports
if self.temp_unit == "C":
self.temp_symbol = "°C"
elif self.temp_unit == "K":
self.temp_symbol = "K"
self.tg_label = "TG [wt%]"
self.dtg_label = "DTG [wt%/min]"
if temp_lim_dtg_celsius is None:
self.temp_lim_dtg_celsius = (120, 880)
else:
self.temp_lim_dtg_celsius = temp_lim_dtg_celsius
if self.temp_unit == "C":
self.temp_lim_dtg = self.temp_lim_dtg_celsius
elif self.temp_unit == "K":
self.temp_lim_dtg = [t + 273.15 for t in self.temp_lim_dtg_celsius]
else:
raise ValueError(f"{self.temp_unit = } is not acceptable")
if soliddist_steps_min is None:
self.soliddist_steps_min = [40, 70, 100, 130, 160, 190]
else:
self.soliddist_steps_min = soliddist_steps_min
if column_name_mapping is None:
self.column_name_mapping = {
"Time": "t_min",
"Temperature": "T_C",
"Weight": "m_p",
"Weight.1": "m_mg",
"Heat Flow": "heatflow_mW",
"##Temp./>C": "T_C",
"Time/min": "t_min",
"Mass/%": "m_p",
"Segment": "segment",
}
else:
self.column_name_mapping = column_name_mapping
#
self.samples: dict[str, Sample] = {}
self.samplenames: list[str] = []
self.multireports: dict[str, pd.DataFrame] = {}
self.multireport_types_computed: list[str] = []
[docs]
def add_sample(self, samplename: str, sample: Sample):
"""
Add a sample to the project.
:param samplename: The name of the sample to add.
:type samplename: str
:param sample: The sample object to add.
:type sample: Sample
"""
if samplename not in self.samplenames:
self.samplenames.append(samplename)
self.samples[samplename] = sample
else:
print(f"{samplename = } already present in project. Sample not added.")
[docs]
def multireport(
self,
samples: list[Sample] | None = None,
labels: list[str] | None = None,
report_type: Literal[
"proximate", "oxidation", "oxidation_extended", "soliddist", "soliddist_extended"
] = "proximate",
report_style: Literal["repl_ave_std", "ave_std", "ave_pm_std"] = "ave_std",
decimals_in_ave_pm_std: int = 2,
filename: str | None = None,
) -> pd.DataFrame:
"""
Generate a multi-sample report based on the specified report type and style.
:param samples: A list of Sample objects to include in the report. If None, uses all samples in the project.
:type samples: list[Sample], optional
:param labels: A list of labels corresponding to each sample. If None, sample names are used as labels.
:type labels: list[str], optional
:param report_type: The type of report to generate, choices include 'proximate', 'oxidation', 'oxidation_extended', 'soliddist', and 'soliddist_extended'.
:type report_type: Literal["proximate", "oxidation", "oxidation_extended", "soliddist", "soliddist_extended"]
:param report_style: The style of the report, choices are 'repl_ave_std', 'ave_std', and 'ave_pm_std'.
:type report_style: Literal["repl_ave_std", "ave_std", "ave_pm_std"]
:param decimals_in_ave_pm_std: The number of decimal places to use for the average plus-minus standard deviation format.
:type decimals_in_ave_pm_std: int
:param filename: The name of the file to save the report. If None, the report is not saved.
:type filename: str, optional
:return: A pandas DataFrame containing the generated report.
:rtype: pd.DataFrame
"""
if samples is None:
samples = list(self.samples.values())
samplenames = [sample.name for sample in samples]
if labels is None:
labels = samplenames
for sample in samples:
if report_type not in sample.report_types_computed:
sample.report(report_type)
if report_type == "soliddist":
reports = [sample.reports[report_type] for sample in samples]
reports = self._reformat_ave_std_columns(reports)
elif report_type == "soliddist_extended":
raise ValueError(
f"{report_type = } not allowed for multireport, use 'soliddist' instead"
)
else:
reports = [sample.reports[report_type] for sample in samples]
if report_style == "repl_ave_std":
# Concatenate all individual reports
report = pd.concat(reports, keys=labels)
report.index.names = [None, None] # Remove index names
elif report_style == "ave_std":
# Keep only the average and standard deviation
ave_std_dfs = []
for label, report in zip(labels, reports):
ave_std_dfs.append(report.loc[["ave", "std"]])
report = pd.concat(ave_std_dfs, keys=labels)
report.index.names = [None, None] # Remove index names
elif report_style == "ave_pm_std":
# Format as "ave ± std" and use sample name as the index
rows = []
for label, report in zip(labels, reports):
row = {
col: f"{report.at['ave', col]:.{decimals_in_ave_pm_std}f} ± {report.at['std', col]:.{decimals_in_ave_pm_std}f}"
for col in report.columns
}
rows.append(pd.Series(row, name=label))
report = pd.DataFrame(rows)
else:
raise ValueError(f"{report_style = } is not a valid option")
self.multireport_types_computed.append(report_type)
self.multireports[report_type] = report
if self.auto_save_reports:
out_path = plib.Path(self.out_path, "multireports")
out_path.mkdir(parents=True, exist_ok=True)
if filename is None:
filename = f"{self.name}_{report_type}_{report_style}.xlsx"
else:
filename = filename + ".xlsx"
report.to_excel(plib.Path(out_path, filename))
return report
[docs]
def plot_multireport(
self,
filename: str = "plot",
samples: list[Sample] | None = None,
labels: list[str] | None = None,
report_type: Literal["proximate", "oxidation", "soliddist"] = "proximate",
bar_labels: list[str] | None = None,
**kwargs,
) -> MyFigure:
"""
Generate a plot for the multi-sample report.
:param filename: The name of the file to save the plot. Defaults to "plot".
:type filename: str
:param samples: A list of Sample objects to include in the plot. If None, uses all samples in the project.
:type samples: list[Sample], optional
:param labels: A list of labels corresponding to each sample. If None, sample names are used as labels.
:type labels: list[str], optional
:param report_type: The type of report to plot, choices include 'proximate', 'oxidation', and 'soliddist'.
:type report_type: Literal["proximate", "oxidation", "soliddist"]
:param bar_labels: Labels for the bars in the plot. If None, default labels based on the report type are used.
:type bar_labels: list[str], optional
:param kwargs: Additional keyword arguments to pass to the plotting function.
:type kwargs: dict
:return: An instance of MyFigure containing the generated plot.
:rtype: MyFigure
"""
if samples is None:
samples = list(self.samples.values())
samplenames = [sample.name for sample in samples]
if labels is None:
labels = samplenames
df = self.multireport(samples, labels, report_type, report_style="ave_std")
df_ave = df.xs("ave", level=1, drop_level=False)
df_std = df.xs("std", level=1, drop_level=False)
# drop the multi-level index to simplify the DataFrame
df_ave = df_ave.droplevel(1)
df_std = df_std.droplevel(1)
if report_type == "proximate":
if bar_labels is None:
vars_bar = ["moisture (stb)", "VM (db)", "FC (db)", "ash (db)"]
else:
vars_bar = bar_labels
df_ave.columns = vars_bar
df_std.columns = vars_bar
bar_yaxis = vars_bar
bar_ytaxis = None
twinx = None
y_lab = self.tg_label
yt_lab = None
elif report_type == "oxidation":
if bar_labels is None:
vars_bar = ["T$_i$", "T$_p$", "T$_b$", "S"]
else:
vars_bar = bar_labels
df_ave.columns = vars_bar
df_std.columns = vars_bar
bar_yaxis = vars_bar[:3]
bar_ytaxis = vars_bar[-1]
twinx = True
y_lab = f"T [{self.temp_symbol}]"
yt_lab = "S (comb. index)"
elif report_type == "soliddist":
if bar_labels is None:
vars_bar = [f"{col.split(' ')[0]} {col.split(' ')[-1]}" for col in df_ave.columns]
else:
vars_bar = bar_labels
df_ave.columns = vars_bar
df_std.columns = vars_bar
bar_yaxis = vars_bar
bar_ytaxis = None
twinx = None
y_lab = "step mass loss [wt%]"
yt_lab = None
out_path = plib.Path(self.out_path, "multireport_plots")
out_path.mkdir(parents=True, exist_ok=True)
default_kwargs = {
"filename": filename + report_type,
"out_path": out_path,
"height": 3.2,
"width": 3.2,
"grid": self.plot_grid,
"text_font": self.plot_font,
}
# Update kwargs with the default key-value pairs if the key is not present in kwargs
kwargs = {**default_kwargs, **kwargs}
myfig = MyFigure(
rows=1,
cols=1,
twinx=twinx,
y_lab=y_lab,
yt_lab=yt_lab,
**kwargs,
)
df_ave[bar_yaxis].plot(
ax=myfig.axs[0],
kind="bar",
yerr=df_std[bar_yaxis],
capsize=2,
width=0.85,
ecolor="k",
edgecolor="black",
)
if bar_ytaxis is not None:
myfig.axts[0].scatter(
df_ave.index,
df_ave[bar_ytaxis],
label=bar_ytaxis,
edgecolor="black",
color=colors[3],
# s=100
)
myfig.axts[0].errorbar(
df_ave.index,
df_ave[bar_ytaxis],
yerr=df_std[bar_ytaxis],
ecolor="k",
linestyle="None",
capsize=2,
)
myfig.save_figure()
return myfig
[docs]
def plot_multi_tg(
self,
filename: str = "plot",
samples: list[Sample] | None = None,
labels: list[str] | None = None,
**kwargs,
) -> MyFigure:
"""
Plot multiple thermogravimetric (TG) curves for the given samples.
:param filename: The name of the file to save the plot. Defaults to "plot".
:type filename: str
:param samples: A list of Sample objects to be plotted. If None, plots all samples in the project.
:type samples: list[Sample], optional
:param labels: Labels for each sample in the plot. If None, sample names are used.
:type labels: list[str], optional
:param kwargs: Additional keyword arguments for plotting customization.
:type kwargs: dict
:return: A MyFigure instance containing the plot.
:rtype: MyFigure
"""
if samples is None:
samples = list(self.samples.values())
samplenames = [sample.name for sample in samples]
if labels is None:
labels = samplenames
for sample in samples:
if not sample.proximate_computed:
sample.proximate_analysis()
out_path = plib.Path(self.out_path, "multisample_plots")
out_path.mkdir(parents=True, exist_ok=True)
default_kwargs = {
"filename": filename + "_tg",
"out_path": out_path,
"height": 3.2,
"width": 3.2,
"grid": self.plot_grid,
"text_font": self.plot_font,
"x_lab": f"T [{self.temp_symbol}]",
"y_lab": self.tg_label,
}
# Update kwargs with the default key-value pairs if the key is not present in kwargs
kwargs = {**default_kwargs, **kwargs}
myfig = MyFigure(
rows=1,
cols=1,
**kwargs,
)
for i, sample in enumerate(samples):
myfig.axs[0].plot(
sample.temp.ave(),
sample.mp_db.ave(),
color=colors[i],
linestyle=linestyles[i],
label=labels[i],
)
myfig.axs[0].fill_between(
sample.temp.ave(),
sample.mp_db.ave() - sample.mp_db.std(),
sample.mp_db.ave() + sample.mp_db.std(),
color=colors[i],
alpha=0.3,
)
myfig.save_figure()
return myfig
[docs]
def plot_multi_dtg(
self,
filename: str = "plot",
samples: list[Sample] | None = None,
labels: list[str] | None = None,
**kwargs,
) -> MyFigure:
"""
Plot multiple derivative thermogravimetric (DTG) curves for the given samples.
:param filename: The name of the file to save the plot. Defaults to "plot".
:type filename: str
:param samples: A list of Sample objects to be plotted. If None, plots all samples in the project.
:type samples: list[Sample], optional
:param labels: Labels for each sample in the plot. If None, sample names are used.
:type labels: list[str], optional
:param kwargs: Additional keyword arguments for plotting customization.
:type kwargs: dict
:return: A MyFigure instance containing the plot.
:rtype: MyFigure
"""
if samples is None:
samples = list(self.samples.values())
samplenames = [sample.name for sample in samples]
if labels is None:
labels = samplenames
for sample in samples:
if not sample.dtg_computed:
sample.dtg_analysis()
out_path = plib.Path(self.out_path, "multisample_plots")
out_path.mkdir(parents=True, exist_ok=True)
default_kwargs = {
"filename": filename + "_dtg",
"out_path": out_path,
"height": 3.2,
"width": 3.2,
"grid": self.plot_grid,
"text_font": self.plot_font,
"x_lab": f"T [{self.temp_symbol}]",
"y_lab": self.dtg_label,
"x_lim": self.temp_lim_dtg,
}
# Update kwargs with the default key-value pairs if the key is not present in kwargs
kwargs = {**default_kwargs, **kwargs}
myfig = MyFigure(
rows=1,
cols=1,
**kwargs,
)
for i, sample in enumerate(samples):
myfig.axs[0].plot(
sample.temp_dtg.ave(),
sample.dtg_db.ave(),
color=colors[i],
linestyle=linestyles[i],
label=labels[i],
)
myfig.axs[0].fill_between(
sample.temp_dtg.ave(),
sample.dtg_db.ave() - sample.dtg_db.std(),
sample.dtg_db.ave() + sample.dtg_db.std(),
color=colors[i],
alpha=0.3,
)
myfig.save_figure()
return myfig
[docs]
def plot_multi_soliddist(
self,
filename: str = "plot",
samples: list[Sample] | None = None,
labels: list[str] | None = None,
cut_curves_at_last_step: bool = True,
**kwargs,
) -> MyFigure:
"""
Plot multiple solid distribution curves for the given samples.
:param filename: The name of the file to save the plot. Defaults to "plot".
:type filename: str
:param samples: A list of Sample objects to be plotted. If None, plots all samples in the project.
:type samples: list[Sample], optional
:param labels: Labels for each sample in the plot. If None, sample names are used.
:type labels: list[str], optional
:param cut_curves_at_last_step: whether to cut the dtg curves at the end of the last segment.
:type cut_curves_at_last_step: bool, optional
:param kwargs: Additional keyword arguments for plotting customization.
:type kwargs: dict
:return: A MyFigure instance containing the plot.
:rtype: MyFigure
"""
if samples is None:
samples = list(self.samples.values())
samplenames = [sample.name for sample in samples]
if labels is None:
labels = samplenames
for sample in samples:
if not sample.dtg_computed:
sample.dtg_analysis()
if cut_curves_at_last_step is True:
index_end = np.argmax(samples[0].time.ave() > samples[0].soliddist_steps_min[-1])
else:
index_end = -1
out_path = plib.Path(self.out_path, "multisample_plots")
out_path.mkdir(parents=True, exist_ok=True)
default_kwargs = {
"filename": filename + "_soliddist",
"out_path": out_path,
"height": 3.2,
"width": 3.2,
"grid": self.plot_grid,
"text_font": self.plot_font,
"x_lab": "time [min]",
"y_lab": self.tg_label,
"yt_lab": f"T [{self.temp_symbol}]",
"legend_loc": "center left",
}
# Update kwargs with the default key-value pairs if the key is not present in kwargs
kwargs = {**default_kwargs, **kwargs}
myfig = MyFigure(
rows=1,
twinx=True,
**kwargs,
)
myfig.axts[0].plot(
samples[0].time.ave()[0:index_end],
samples[0].temp.ave()[0:index_end],
color="k",
linestyle=linestyles[1],
label="T",
)
for i, sample in enumerate(samples):
myfig.axs[0].plot(
sample.time.ave()[0:index_end],
sample.mp_db.ave()[0:index_end],
color=colors[i],
linestyle=linestyles[i],
label=labels[i],
)
myfig.axs[0].fill_between(
sample.time.ave()[0:index_end],
sample.mp_db.ave()[0:index_end] - sample.mp_db.std()[0:index_end],
sample.mp_db.ave()[0:index_end] + sample.mp_db.std()[0:index_end],
color=colors[i],
alpha=0.3,
)
myfig.save_figure()
return myfig
def _reformat_ave_std_columns(self, reports):
"""
Reformat the columns of the given reports to have standard deviation and average values.
This method is intended to be used internally within the Project class to standardize
the report dataframes before generating multi-sample reports.
:param reports: A list of report DataFrames to reformat.
:type reports: list[pd.DataFrame]
:return: A list of reformatted DataFrames.
:rtype: list[pd.DataFrame]
"""
# Check that all reports have the same number of columns
num_columns = len(reports[0].columns)
if not all(len(report.columns) == num_columns for report in reports):
raise ValueError("All reports must have the same number of columns.")
# Initialize a list to hold the new formatted column names
formatted_column_names = []
# Iterate over each column index
for i in range(num_columns):
# Extract the numeric part of the column name (assume it ends with ' K' or ' C')
column_values = [float(report.columns[i].split()[0]) for report in reports]
ave = np.mean(column_values)
std = np.std(column_values)
# Determine the unit (assuming all columns have the same unit)
unit = reports[0].columns[i].split()[-1]
# Create the new column name with the unit
formatted_column_name = f"{ave:.0f} ± {std:.0f} {unit}"
formatted_column_names.append(formatted_column_name)
# Rename the columns in each report using the new formatted names
for report in reports:
report.columns = formatted_column_names
return reports
[docs]
class Sample:
"""
A class representing a sample in the project, containing methods for loading, processing,
and analyzing thermogravimetric analysis (TGA) data associated with the sample.
"""
def __init__(
self,
project: Project,
name: str,
filenames: list[str],
folder_path: plib.Path | None = None,
label: str | None = None,
correct_ash_mg: list[float] | None = None,
correct_ash_fr: list[float] | None = None,
column_name_mapping: dict[str:str] | None = None,
load_skiprows: int | None = None,
time_moist: float | None = None,
time_vm: float | None = None,
heating_rate_deg_min: float | None = None,
temp_i_temp_b_threshold: float | None = None,
soliddist_steps_min: list[float] | None = None,
):
"""
Initialize a new Sample instance with parameters for TGA data analysis.
:param project: The Project object to which this sample belongs.
:type project: Project
:param name: The name of the sample.
:type name: str
:param filenames: A list of filenames associated with the sample.
:type filenames: list[str]
:param folder_path: The path to the folder containing the sample data. If None, the project's folder path is used.
:type folder_path: plib.Path, optional
:param label: A label for the sample. If None, the sample's name is used as the label.
:type label: str, optional
:param correct_ash_mg: A list of ash correction values in milligrams, one per file.
:type correct_ash_mg: list[float], optional
:param correct_ash_fr: A list of ash correction values as a fraction, one per file.
:type correct_ash_fr: list[float], optional
:param column_name_mapping: A dictionary mapping file column names to standardized column names for analysis.
:type column_name_mapping: dict[str, str], optional
:param load_skiprows: The number of rows to skip at the beginning of the files when loading.
:type load_skiprows: int
:param time_moist: The time considered for the moisture analysis.
:type time_moist: float
:param time_vm: The time considered for the volatile matter analysis.
:type time_vm: float
:param heating_rate_deg_min: The heating rate in degrees per minute, used for certain calculations.
:type heating_rate_deg_min: float, optional
:param temp_i_temp_b_threshold: The threshold percentage used for calculating initial and final temperatures in DTG analysis.
:type temp_i_temp_b_threshold: float, optional
:param soliddist_steps_min: Temperature steps (in minutes) at which the weight loss is calculated. If None, default steps are used.
:type soliddist_steps_min: list[float], optional
"""
# store the sample in the project
self.project_name = project.name
project.add_sample(name, self)
# prject defaults unless specified
self.out_path = project.out_path
self.temp_unit = project.temp_unit
self.temp_symbol = project.temp_symbol
self.tg_label = project.tg_label
self.dtg_label = project.dtg_label
self.plot_font = project.plot_font
self.plot_grid = project.plot_grid
self.temp_lim_dtg = project.temp_lim_dtg
self.auto_save_reports = project.auto_save_reports
self.resolution_sec_deg_dtg = project.resolution_sec_deg_dtg
self.dtg_window_filter = project.dtg_window_filter
self.temp_initial_celsius = project.temp_initial_celsius
self.dtg_window_filter = project.dtg_window_filter
if folder_path is None:
self.folder_path = project.folder_path
else:
self.folder_path = folder_path
if column_name_mapping is None:
self.column_name_mapping = project.column_name_mapping
else:
self.column_name_mapping = column_name_mapping
if load_skiprows is None:
self.load_skiprows = project.load_skiprows
else:
self.load_skiprows = load_skiprows
if time_moist is None:
self.time_moist = project.time_moist
else:
self.time_moist = time_moist
if time_vm is None:
self.time_vm = project.time_vm
else:
self.time_vm = time_vm
if temp_i_temp_b_threshold is None:
self.temp_i_temp_b_threshold = project.temp_i_temp_b_threshold
else:
self.temp_i_temp_b_threshold = temp_i_temp_b_threshold
if soliddist_steps_min is None:
self.soliddist_steps_min = project.soliddist_steps_min
else:
self.soliddist_steps_min = soliddist_steps_min
# sample default
self.name = name
self.filenames = filenames
self.n_repl = len(self.filenames)
self.heating_rate_deg_min = heating_rate_deg_min
self.correct_ash_mg = self._broadcast_value_prop(correct_ash_mg)
self.correct_ash_fr = self._broadcast_value_prop(correct_ash_fr)
if not label:
self.label = name
else:
self.label = label
# for variables and computations
self.files: dict[str : pd.DataFrame] = {}
self.len_files: dict[str : pd.DataFrame] = {}
self.len_sample: int = 0
# proximate
self.temp: Measure = Measure(name="temp_" + self.temp_unit)
self.time: Measure = Measure(name="time")
self.m_ar: Measure = Measure(name="m_ar")
self.mp_ar: Measure = Measure(name="mp_ar")
self.idx_moist: Measure = Measure(name="idx_moist")
self.idx_vm: Measure = Measure(name="idx_vm")
self.moist_ar: Measure = Measure(name="moist_ar")
self.ash_ar: Measure = Measure(name="ash_ar")
self.fc_ar: Measure = Measure(name="fc_ar")
self.vm_ar: Measure = Measure(name="vm_ar")
self.mp_db: Measure = Measure(name="mp_db")
self.ash_db: Measure = Measure(name="ash_db")
self.fc_db: Measure = Measure(name="fc_db")
self.vm_db: Measure = Measure(name="vm_db")
self.mp_daf: Measure = Measure(name="mp_daf")
self.fc_daf: Measure = Measure(name="fc_daf")
self.vm_daf: Measure = Measure(name="vm_daf")
self.temp_dtg: Measure = Measure(name="temp_dtg" + self.temp_unit)
self.time_dtg: Measure = Measure(name="time_dtg")
self.mp_db_dtg: Measure = Measure(name="mp_db_dtg")
self.dtg_db: Measure = Measure(name="dtg_db")
self.ave_dev_tga_perc: float | None = None
# oxidation
self.temp_i_idx: Measure = Measure(name="temp_i_idx")
self.temp_i: Measure = Measure(name="temp_i_" + self.temp_unit)
self.temp_p_idx: Measure = Measure(name="temp_p_idx")
self.temp_p: Measure = Measure(name="temp_p_" + self.temp_unit)
self.temp_b_idx: Measure = Measure(name="temp_b_idx")
self.temp_b: Measure = Measure(name="temp_b_" + self.temp_unit)
self.dwdtemp_max: Measure = Measure(name="dwdtemp_max")
self.dwdtemp_mean: Measure = Measure(name="dwdtemp_mean")
self.s_combustion_index: Measure = Measure(name="s_combustion_index")
# soliddist
self.temp_soliddist: Measure = Measure(name="temp_dist_" + self.temp_unit)
self.time_soliddist: Measure = Measure(name="time_dist")
self.dmp_soliddist: Measure = Measure(name="dmp_dist")
self.loc_soliddist: Measure = Measure(name="loc_dist")
# deconvolution
self.dcv_best_fit: Measure = Measure(name="dcv_best_fit")
self.dcv_r2: Measure = Measure(name="dcv_r2")
self.dcv_peaks: list[Measure] = []
# Flag to track if data is loaded
self.proximate_computed = False
self.dtg_computed = False
self.files_loaded = False
self.oxidation_computed = False
self.soliddist_computed = False
self.deconv_computed = False
# for reports
self.reports: dict[str, pd.DataFrame] = {}
self.report_types_computed: list[str] = []
self.load_files()
def _broadcast_value_prop(self, prop: list | str | float | int | bool) -> list:
"""
Broadcast a single value or a list of values to match the number of replicates.
This method is used internally to ensure that properties like corrections have a value
for each replicate, even if a single value is provided for all.
:param prop: A single value or a list of values to be broadcasted.
:type prop: list | float | int | bool
:return: A list of values with length equal to the number of replicates.
:rtype: list
"""
if prop is None:
broad_prop = [None] * self.n_repl
if isinstance(prop, (list, tuple)):
# If it's a list or tuple, but we're not expecting pairs, it's a single value per axis.
if len(prop) == self.n_repl:
broad_prop = prop
else:
raise ValueError(
f"The size of the property '{prop}' does not match the number of replicates."
)
if isinstance(prop, (str, float, int, bool)):
broad_prop = [prop] * self.n_repl
return broad_prop
[docs]
def load_single_file(
self,
filename: str,
folder_path: plib.Path | None = None,
load_skiprows: int | None = None,
column_name_mapping: dict | None = None,
) -> pd.DataFrame:
"""
Load data from a single file associated with the sample.
:param filename: The name of the file to be loaded.
:type filename: str
:param folder_path: The folder path where the file is located. If None, uses the sample's folder path.
:type folder_path: plib.Path, optional
:param load_skiprows: The number of rows to skip at the beginning of the file. If None, uses the sample's default.
:type load_skiprows: int, optional
:param column_name_mapping: A mapping of file column names to standardized column names. If None, uses the sample's default.
:type column_name_mapping: dict, optional
:return: The loaded data as a pandas DataFrame.
:rtype: pd.DataFrame
"""
if column_name_mapping is None:
column_name_mapping = self.column_name_mapping
if folder_path is None:
folder_path = self.folder_path
if load_skiprows is None:
load_skiprows = self.load_skiprows
file_path = plib.Path(folder_path, filename + ".txt")
if not file_path.is_file():
file_path = plib.Path(folder_path, filename + ".csv")
file = pd.read_csv(file_path, sep="\t", skiprows=load_skiprows)
if file.shape[1] < 3:
file = pd.read_csv(file_path, sep=",", skiprows=load_skiprows)
file = file.rename(columns={col: column_name_mapping.get(col, col) for col in file.columns})
for column in file.columns:
file[column] = pd.to_numeric(file[column], errors="coerce")
file.dropna(inplace=True)
return file
[docs]
def correct_file_values(
self,
file: pd.DataFrame,
correct_ash_fr: float | None,
correct_ash_mg: float | None,
):
"""
Apply corrections to the loaded file data.
This method adjusts the mass measurements in the file based on provided ash correction values.
:param file: The data file to be corrected.
:type file: pd.DataFrame
:param correct_ash_mg: The correction value for ash in milligrams. If None, no correction is applied.
:type correct_ash_mg: float, optional
:param correct_ash_fr: The correction value for ash as a fraction. If None, no correction is applied.
:type correct_ash_fr: float, optional
:return: The corrected data as a pandas DataFrame.
:rtype: pd.DataFrame
"""
if "m_mg" not in file.columns:
file["m_mg"] = file["m_p"] # this avoids keyerrors
if correct_ash_mg is not None:
file["m_mg"] = file["m_mg"] - np.min(file["m_mg"]) + correct_ash_mg
try:
if file["m_mg"].iloc[-1] < 0:
print(
"neg. mass correction: Max [mg]",
np.round(np.max(file["m_mg"]), 3),
"; Min [mg]",
np.round(np.min(file["m_mg"]), 3),
)
file["m_mg"] = file["m_mg"] - np.min(file["m_mg"])
except KeyError:
file["m_mg"] = file["m_p"]
file["m_p"] = file["m_mg"] / np.max(file["m_mg"]) * 100
if correct_ash_fr is not None:
# set the ash value to zero
file["m_p"] = file["m_p"] - np.min(file["m_p"])
# shift the non-ash fraction up by enough to have ash_ar=correct_ash_fr
file["m_p"] = file["m_p"] + np.max(file["m_p"]) * (
correct_ash_fr / (1 - correct_ash_fr)
)
# scale everything to 100 %
file["m_p"] = file["m_p"] / np.max(file["m_p"]) * 100
if "T_C" not in file.columns and "T_K" in file.columns:
file["T_C"] = file["T_K"] - 273.15
else:
file["T_K"] = file["T_C"] + 273.15
file = file[file["T_C"] >= self.temp_initial_celsius].copy()
return file
[docs]
def load_files(self):
"""
Load all files associated with this sample, applying necessary corrections and adjustments.
This method loads and processes each file, ensuring consistent data structure and applying
corrections such as ash content adjustments.
:return: A dictionary where keys are filenames and values are the corresponding corrected data as pandas DataFrames.
:rtype: dict[str, pd.DataFrame]
"""
print("\n" + self.name)
# import files and makes sure that replicates have the same size
for f, filename in enumerate(self.filenames):
print(filename)
file_to_correct = self.load_single_file(filename)
file = self.correct_file_values(
file_to_correct, self.correct_ash_fr[f], self.correct_ash_fr[f]
)
# FILE CORRECTION
self.files[filename] = file
self.len_files[filename] = max(file.shape)
self.len_sample = min(self.len_files.values())
# keep the shortest vector size for all replicates, create the object
for filename in self.filenames:
self.files[filename] = self.files[filename].head(self.len_sample)
self.files_loaded = True # Flag to track if data is loaded
return self.files
[docs]
def proximate_analysis(self):
"""
Perform proximate analysis on the loaded data for the sample.
This analysis calculates moisture content, ash content, volatile matter, and fixed carbon
based on the thermogravimetric data. The results are stored in the instance's attributes for later use.
"""
if not self.files_loaded:
self.load_files()
for f, file in enumerate(self.files.values()):
if self.temp_unit == "C":
self.temp.add(f, file["T_C"])
elif self.temp_unit == "K":
self.temp.add(f, file["T_K"])
self.time.add(f, file["t_min"])
self.m_ar.add(f, file["m_mg"])
self.mp_ar.add(f, file["m_p"])
self.idx_moist.add(f, np.argmax(self.time.stk(f) > self.time_moist + 0.01))
self.moist_ar.add(f, 100 - self.mp_ar.stk(f)[self.idx_moist.stk(f)])
self.ash_ar.add(f, self.mp_ar.stk(f)[-1])
self.mp_db.add(f, self.mp_ar.stk(f) * 100 / (100 - self.moist_ar.stk(f)))
self.mp_db.add(f, np.where(self.mp_db.stk(f) > 100, 100, self.mp_db.stk(f)))
self.ash_db.add(f, self.ash_ar.stk(f) * 100 / (100 - self.moist_ar.stk(f)))
self.mp_daf.add(
f, ((self.mp_db.stk(f) - self.ash_db.stk(f)) * 100 / (100 - self.ash_db.stk(f)))
)
if self.time_vm is not None:
self.idx_vm.add(f, np.argmax(self.time.stk(f) > self.time_vm))
self.fc_ar.add(f, self.mp_ar.stk(f)[self.idx_vm.stk(f)] - self.ash_ar.stk(f))
self.vm_ar.add(
f, (100 - self.moist_ar.stk(f) - self.ash_ar.stk(f) - self.fc_ar.stk(f))
)
self.vm_db.add(f, self.vm_ar.stk(f) * 100 / (100 - self.moist_ar.stk(f)))
self.fc_db.add(f, self.fc_ar.stk(f) * 100 / (100 - self.moist_ar.stk(f)))
self.vm_daf.add(
f, ((self.vm_db.stk(f) - self.ash_db.stk(f)) * 100 / (100 - self.ash_db.stk(f)))
)
self.fc_daf.add(
f, ((self.fc_db.stk(f) - self.ash_db.stk(f)) * 100 / (100 - self.ash_db.stk(f)))
)
self.proximate_computed = True
[docs]
def dtg_analysis(self):
"""
Compute the derivative thermogravimetric (DTG) data for the sample.
This method calculates the DTG data based on the thermogravimetric data and stores the results
in the instance's attributes for later use.
"""
if not self.proximate_computed:
self.proximate_analysis()
idxs_in_dtg = []
idxs_fin_dtg = []
vector_len_dtg = []
for f in range(self.n_repl):
idxs_in_dtg.append(np.argmax(self.temp.stk(f) > self.temp_lim_dtg[0]))
idxs_fin_dtg.append(np.argmax(self.temp.stk(f) > self.temp_lim_dtg[1]))
vector_len_dtg.append(idxs_fin_dtg[f] - idxs_in_dtg[f])
min_vector_len_dtg = min(vector_len_dtg)
idxs_fin_dtg = [initial + min_vector_len_dtg for initial in idxs_in_dtg]
for f in range(self.n_repl):
time_dtg = self.time.stk(f)[idxs_in_dtg[f] : idxs_fin_dtg[f]]
self.time_dtg.add(f, time_dtg - np.min(time_dtg)) # starts from 0
self.temp_dtg.add(f, self.temp.stk(f)[idxs_in_dtg[f] : idxs_fin_dtg[f]])
self.mp_db_dtg.add(f, self.mp_db.stk(f)[idxs_in_dtg[f] : idxs_fin_dtg[f]])
dtg_db = np.gradient(self.mp_db_dtg.stk(f), self.time_dtg.stk(f))
if self.dtg_window_filter is not None:
self.dtg_db.add(f, savgol_filter(dtg_db, self.dtg_window_filter, 1))
else:
self.dtg_db.add(f, dtg_db)
# average
self.ave_dev_tga_perc = np.average(self.mp_db_dtg.std())
print(f"Average TG [%] St. Dev. for replicates: {self.ave_dev_tga_perc:0.2f} %")
self.dtg_computed = True
[docs]
def oxidation_analysis(self):
"""
Conduct oxidation analysis on the sample's data.
This method calculates various oxidation parameters such as the initial oxidation temperature (Ti),
peak oxidation temperature (Tp), and final oxidation temperature (Tb). It also computes derivative
parameters like maximum and average rates of weight loss. The results are stored in the instance's
attributes for further analysis.
"""
if not self.dtg_computed:
self.dtg_analysis()
for f in range(self.n_repl):
threshold: float = np.max(np.abs(self.dtg_db.stk(f))) * self.temp_i_temp_b_threshold
# Ti = T at which dtg > Ti_thresh wt%/min after moisture removal
self.temp_i_idx.add(f, int(np.argmax(np.abs(self.dtg_db.stk(f)) > threshold)))
self.temp_i.add(f, self.temp_dtg.stk(f)[self.temp_i_idx.stk(f)])
# Tp is the T of max abs(dtg)
self.temp_p_idx.add(f, int(np.argmax(np.abs(self.dtg_db.stk(f)))))
self.temp_p.add(f, self.temp_dtg.stk(f)[self.temp_p_idx.stk(f)])
# Tb reaches < 1 wt%/min at end of curve
try:
self.temp_b_idx.add(f, int(np.flatnonzero(self.dtg_db.stk(f) < -threshold)[-1]))
except IndexError: # the curve nevers goes above 1%
self.temp_b_idx.add(f, 0)
self.temp_b.add(f, self.temp_dtg.stk(f)[self.temp_b_idx.stk(f)])
self.dwdtemp_max.add(f, np.max(np.abs(self.dtg_db.stk(f))))
self.dwdtemp_mean.add(f, np.average(np.abs(self.dtg_db.stk(f))))
# combustion index
self.s_combustion_index.add(
f,
(
self.dwdtemp_max.stk(f)
* self.dwdtemp_mean.stk(f)
/ self.temp_i.stk(f)
/ self.temp_i.stk(f)
/ self.temp_b.stk(f)
),
)
# # average
self.oxidation_computed = True
[docs]
def soliddist_analysis(self):
"""
Perform solid distribution analysis on the sample's data.
This analysis calculates the weight loss at specified temperature steps, providing insight into
the solid decomposition process. The results are used for generating solid distribution plots.
"""
if not self.dtg_computed:
self.dtg_analysis()
for f in range(self.n_repl):
idxs = []
for step in self.soliddist_steps_min:
idxs.append(np.argmax(self.time.stk(f) > step))
self.temp_soliddist.add(f, self.temp.stk(f)[idxs])
self.time_soliddist.add(f, self.time.stk(f)[idxs])
self.dmp_soliddist.add(f, -np.diff(self.mp_db.stk(f)[idxs], prepend=100))
self.loc_soliddist.add(
f, np.convolve(np.insert(self.mp_db.stk(f)[idxs], 0, 100), [0.5, 0.5], mode="valid")
)
self.soliddist_computed = True
[docs]
def deconv_analysis(
self,
centers: list[float],
sigmas: list[float] | None = None,
amplitudes: list[float] | None = None,
center_mins: list[float] | None = None,
center_maxs: list[float] | None = None,
sigma_mins: list[float] | None = None,
sigma_maxs: list[float] | None = None,
amplitude_mins: list[float] | None = None,
amplitude_maxs: list[float] | None = None,
):
"""
Perform deconvolution analysis on the sample's DTG data.
This method fits multiple Gaussian peaks to the DTG curve to identify and analyze individual
decomposition steps within the sample.
:param centers: Initial guesses for the centers of the Gaussian peaks.
:type centers: list[float]
:param sigmas: Initial guesses for the standard deviations of the Gaussian peaks. If None, defaults are used.
:type sigmas: list[float], optional
:param amplitudes: Initial guesses for the amplitudes of the Gaussian peaks. If None, defaults are used.
:type amplitudes: list[float], optional
:param center_mins: Minimum allowed values for the centers of the Gaussian peaks. If None, no bounds are applied.
:type center_mins: list[float], optional
:param center_maxs: Maximum allowed values for the centers of the Gaussian peaks. If None, no bounds are applied.
:type center_maxs: list[float], optional
:param sigma_mins: Minimum allowed values for the standard deviations of the Gaussian peaks. If None, no bounds are applied.
:type sigma_mins: list[float], optional
:param sigma_maxs: Maximum allowed values for the standard deviations of the Gaussian peaks. If None, no bounds are applied.
:type sigma_maxs: list[float], optional
:param amplitude_mins: Minimum allowed values for the amplitudes of the Gaussian peaks. If None, no bounds are applied.
:type amplitude_mins: list[float], optional
:param amplitude_maxs: Maximum allowed values for the amplitudes of the Gaussian peaks. If None, no bounds are applied.
:type amplitude_maxs: list[float], optional
"""
if not self.proximate_computed:
self.dtg_analysis()
n_peaks = len(centers)
# self.dcv_best_fit_stk = np.zeros((self.len_dtg_db, self.n_repl))
# self.dcv_r2_stk = np.zeros(self.n_repl)
# self.dcv_peaks_stk = np.zeros((self.len_dtg_db, self.n_repl, n_peaks))
if sigmas is None:
sigmas = [1] * n_peaks
if amplitudes is None:
amplitudes = [10] * n_peaks
if center_mins is None:
center_mins = [None] * n_peaks
if center_maxs is None:
center_maxs = [None] * n_peaks
if sigma_mins is None:
sigma_mins = [None] * n_peaks
if sigma_maxs is None:
sigma_maxs = [None] * n_peaks
if amplitude_mins is None:
amplitude_mins = [0] * n_peaks
if amplitude_maxs is None:
amplitude_maxs = [None] * n_peaks
for f in range(self.n_repl):
y = np.abs(self.dtg_db.stk(f))
model = LinearModel(prefix="bkg_")
params = model.make_params(intercept=0, slope=0, vary=False)
for p in range(n_peaks):
prefix = f"peak_{p}"
self.dcv_peaks.append(Measure(name=prefix))
peak_model = GaussianModel(prefix=prefix)
pars = peak_model.make_params()
pars[prefix + "center"].set(
value=centers[p], min=center_mins[p], max=center_maxs[p]
)
pars[prefix + "sigma"].set(value=sigmas[p], min=sigma_mins[p], max=sigma_maxs[p])
pars[prefix + "amplitude"].set(
value=amplitudes[p], min=amplitude_mins[p], max=amplitude_maxs[p]
)
model += peak_model
params.update(pars)
result = model.fit(y, params=params, x=self.temp_dtg.stk(f))
self.dcv_best_fit.add(f, -result.best_fit)
self.dcv_r2.add(f, 1 - result.residual.var() / np.var(y))
components = result.eval_components(x=self.temp_dtg.stk(f))
for p in range(n_peaks):
prefix = f"peak_{p}"
if prefix in components:
self.dcv_peaks[p].add(f, -components[prefix])
else:
self.dcv_peaks[p].add(f, 0)
self.deconv_computed = True
[docs]
def report(
self,
report_type: Literal[
"proximate", "oxidation", "oxidation_extended", "soliddist", "soliddist_extended"
] = "proximate",
) -> pd.DataFrame:
"""
Generate a report based on the specified analysis type.
This method provides detailed insights into the sample's properties, such as proximate analysis,
oxidation characteristics, and solid distribution, based on the selected report type.
:param report_type: The type of report to generate. Options include 'proximate', 'oxidation',
'oxidation_extended', 'soliddist', and 'soliddist_extended'.
:type report_type: Literal["proximate", "oxidation", "oxidation_extended", "soliddist", "soliddist_extended"]
:return: A pandas DataFrame containing the analysis results.
:rtype: pd.DataFrame
"""
if report_type == "proximate":
if not self.proximate_computed:
self.proximate_analysis()
variables = [self.moist_ar, self.vm_db, self.fc_db, self.ash_db]
elif report_type == "oxidation" or report_type == "oxidation_extended":
if not self.oxidation_computed:
self.oxidation_analysis()
variables = [self.temp_i, self.temp_p, self.temp_b, self.s_combustion_index]
if report_type == "oxidation_extended":
variables += [self.dwdtemp_max, self.dwdtemp_mean]
elif report_type == "soliddist" or report_type == "soliddist_extended":
if not self.soliddist_computed:
self.soliddist_analysis()
if report_type == "soliddist":
variables = []
for p in self.temp_soliddist.ave():
variables.append(Measure(name=f"{p:0.0f} {self.temp_unit}"))
for f in range(self.n_repl):
for t, dmp in enumerate(self.dmp_soliddist.stk(f)):
variables[t].add(f, dmp)
elif report_type == "soliddist_extended":
variables = [self.temp_soliddist, self.time_soliddist, self.dmp_soliddist]
else:
raise ValueError(f"{report_type = } is not a valid option")
var_names = [var.name for var in variables]
index = [f"repl_{f}" for f in range(self.n_repl)] + ["ave", "std"]
rep_data = []
for f in range(self.n_repl):
rep_data.append([var.stk(f) for var in variables])
rep_data.append([var.ave() for var in variables])
rep_data.append([var.std() for var in variables])
report = pd.DataFrame(data=rep_data, columns=var_names, index=index)
report.index.name = self.name
self.report_types_computed.append(report_type)
self.reports[report_type] = report
if self.auto_save_reports:
out_path = plib.Path(self.out_path, "single_sample_reports")
out_path.mkdir(parents=True, exist_ok=True)
report.to_excel(plib.Path(out_path, f"{self.name}_{report_type}.xlsx"))
return report
[docs]
def plot_tg_dtg(self, **kwargs: dict[str, Any]) -> MyFigure:
"""
Generate a plot combining thermogravimetric (TG) and derivative thermogravimetric (DTG) data.
This method creates a figure showing the TG and DTG curves, providing a visual representation
of the sample's thermal decomposition behavior.
:param kwargs: Additional keyword arguments for plot customization.
:type kwargs: dict
:return: A MyFigure instance containing the generated plot.
:rtype: MyFigure
"""
if not self.dtg_computed:
self.dtg_analysis()
out_path = plib.Path(self.out_path, "single_sample_plots")
out_path.mkdir(parents=True, exist_ok=True)
default_kwargs = {
"filename": self.name + "_tg_dtg",
"out_path": out_path,
"height": 8.53,
"width": 6.4,
"x_lab": "time [min]",
"y_lab": [
f"T [{self.temp_symbol}]",
f"T [{self.temp_symbol}]",
f"{self.tg_label} (stb)",
f"{self.tg_label} (db)",
f"{self.tg_label} (db)",
f"{self.dtg_label} (db)",
],
"grid": self.plot_grid,
"text_font": self.plot_font,
}
# Update kwargs with the default key-value pairs if the key is not present in kwargs
kwargs = {**default_kwargs, **kwargs}
mf = MyFigure(
rows=3,
cols=2,
**kwargs,
)
# tg plot 0, 2, 4 on the left
for f in range(self.n_repl):
mf.axs[0].plot(
self.time.stk(f),
self.temp.stk(f),
color=colors[f],
linestyle=linestyles[f],
label=self.filenames[f],
)
mf.axs[2].plot(
self.time.stk(f),
self.mp_ar.stk(f),
color=colors[f],
linestyle=linestyles[f],
label=self.filenames[f],
)
mf.axs[4].plot(
self.time.stk(f),
self.mp_db.stk(f),
color=colors[f],
linestyle=linestyles[f],
label=self.filenames[f],
)
if self.time_moist:
mf.axs[0].plot(
self.time.stk(f)[self.idx_moist.stk(f)],
self.temp.stk(f)[self.idx_moist.stk(f)],
marker="x",
linestyle="None",
color=colors[f],
)
mf.axs[2].plot(
self.time.stk(f)[self.idx_moist.stk(f)],
self.mp_ar.stk(f)[self.idx_moist.stk(f)],
marker="x",
linestyle="None",
color=colors[f],
)
if f == self.n_repl - 1: # only add at the end
mf.axs[0].plot(
[], [], marker="x", linestyle="None", color="grey", label="moist_loc"
)
mf.axs[2].plot(
[], [], marker="x", linestyle="None", color="grey", label="moist_loc"
)
# only try to plot VM is the analysis includes it
if self.time_vm:
mf.axs[0].plot(
self.time.stk(f)[self.idx_vm.stk(f)],
self.temp.stk(f)[self.idx_vm.stk(f)],
marker="+",
linestyle="None",
color=colors[f],
)
mf.axs[4].plot(
self.time.stk(f)[self.idx_vm.stk(f)],
self.mp_db.stk(f)[self.idx_vm.stk(f)],
marker="+",
linestyle="None",
color=colors[f],
)
if f == self.n_repl - 1: # only add at the end
mf.axs[0].plot(
[], [], marker="+", linestyle="None", color="grey", label="vm_loc"
)
mf.axs[4].plot(
[], [], marker="+", linestyle="None", color="grey", label="vm_loc"
)
# tg plot 1, 3, 5 on the right
mf.axs[1].plot(
self.time_dtg.stk(f),
self.temp_dtg.stk(f),
color=colors[f],
linestyle=linestyles[f],
label=self.filenames[f],
)
mf.axs[3].plot(
self.time_dtg.stk(f),
self.mp_db_dtg.stk(f),
color=colors[f],
linestyle=linestyles[f],
label=self.filenames[f],
)
mf.axs[5].plot(
self.time_dtg.stk(f),
self.dtg_db.stk(f),
color=colors[f],
linestyle=linestyles[f],
label=self.filenames[f],
)
#
if self.oxidation_computed:
mf.axs[5].plot(
self.time_dtg.stk(f)[self.temp_i_idx.stk(f)],
self.dtg_db.stk(f)[self.temp_i_idx.stk(f)],
marker="x",
linestyle="None",
color=colors[f],
)
mf.axs[5].plot(
self.time_dtg.stk(f)[self.temp_p_idx.stk(f)],
self.dtg_db.stk(f)[self.temp_p_idx.stk(f)],
marker="+",
linestyle="None",
color=colors[f],
)
mf.axs[5].plot(
self.time_dtg.stk(f)[self.temp_b_idx.stk(f)],
self.dtg_db.stk(f)[self.temp_b_idx.stk(f)],
marker="1",
linestyle="None",
color=colors[f],
)
if f == self.n_repl - 1: # only add at the end
mf.axs[5].plot(
[], [], marker="x", linestyle="None", color="grey", label="Ti_loc"
)
mf.axs[5].plot(
[], [], marker="+", linestyle="None", color="grey", label="Tp_loc"
)
mf.axs[5].plot(
[], [], marker="1", linestyle="None", color="grey", label="Tb_loc"
)
mf.save_figure()
return mf
[docs]
def plot_soliddist(self, **kwargs: dict[str, Any]) -> MyFigure:
"""
Generate a plot illustrating the solid distribution analysis results.
This method plots the weight loss of the sample at specified temperature steps, showing the
distribution of solid components within the sample.
:param kwargs: Additional keyword arguments for plot customization.
:type kwargs: dict
:return: A MyFigure instance containing the generated plot.
:rtype: MyFigure
"""
# slightly different plotting behaviour (uses averages)
if not self.soliddist_computed:
self.soliddist_analysis()
out_path = plib.Path(self.out_path, "single_sample_plots")
out_path.mkdir(parents=True, exist_ok=True)
default_kwargs = {
"filename": self.name + "_soliddist",
"out_path": out_path,
"height": 3.2,
"width": 3.2,
"x_lab": "time [min]",
"y_lab": f"{self.tg_label} (db)",
"yt_lab": f"T [{self.temp_symbol}]",
"legend_loc": "center left",
"grid": self.plot_grid,
"text_font": self.plot_font,
}
# Update kwargs with the default key-value pairs if the key is not present in kwargs
kwargs = {**default_kwargs, **kwargs}
mf = MyFigure(
rows=1,
cols=1,
twinx=True,
**kwargs,
)
for f in range(self.n_repl):
mf.axs[0].plot(
self.time.stk(f),
self.mp_db.stk(f),
color=colors[f],
linestyle=linestyles[f],
label=self.filenames[f],
)
mf.axts[0].plot(self.time.stk(f), self.temp.stk(f))
for tm, mp, dmp in zip(self.time_soliddist(), self.loc_soliddist(), self.dmp_soliddist()):
mf.axs[0].annotate(
f"{dmp:0.0f}%", ha="center", va="top", xy=(tm - 10, mp + 1), fontsize=9
)
mf.save_figure()
return mf
[docs]
def plot_deconv(self, **kwargs: dict[str, Any]) -> MyFigure:
"""
Generate a plot showing the deconvolution analysis results.
This method visualizes the Gaussian peak fitting performed on the DTG data, illustrating the
identified decomposition steps within the sample.
:param kwargs: Additional keyword arguments for plot customization.
:type kwargs: dict
:return: A MyFigure instance containing the deconvolution analysis plot.
:rtype: MyFigure
"""
if not self.deconv_computed:
raise ValueError("Deconvolution analysis not computed")
out_path = plib.Path(self.out_path, "single_sample_plots")
out_path.mkdir(parents=True, exist_ok=True)
default_kwargs = {
"filename": self.name + "_deconv",
"out_path": out_path,
"height": 8.53,
"width": 3.2,
"x_lab": f"T [{self.temp_symbol}]",
"y_lab": f"{self.dtg_label} (db)",
"grid": self.plot_grid,
"text_font": self.plot_font,
}
# Update kwargs with the default key-value pairs if the key is not present in kwargs
kwargs = {**default_kwargs, **kwargs}
mf = MyFigure(
rows=self.n_repl,
cols=1,
**kwargs,
)
# Plot DTG data
for f in range(self.n_repl):
mf.axs[f].plot(
self.temp_dtg.stk(f),
self.dtg_db.stk(f),
color="black",
label="DTG",
)
# Plot best fit and individual peaks
mf.axs[f].plot(
self.temp_dtg.stk(f),
self.dcv_best_fit.stk(f),
label="best fit",
color="red",
linestyle="--",
)
colors_p = colors[:3] + colors[5:] # avoid using red
for p, peak in enumerate(self.dcv_peaks):
if peak.stk(f) is not None:
mf.axs[f].plot(
self.temp_dtg.stk(f),
peak.stk(f),
label=peak.name,
color=colors_p[p],
linestyle=linestyles[p],
)
mf.axs[f].annotate(
f"r$^2$={self.dcv_r2.stk(f):.2f}",
xycoords="axes fraction",
xy=(0.85, 0.96),
size="x-small",
)
mf.save_figure()
return mf