pychemstation.analysis

 1from .process_report import CSVProcessor
 2from .process_report import TXTProcessor
 3from .chromatogram import AgilentChannelChromatogramData
 4from .chromatogram import AgilentHPLCChromatogram
 5
 6__all__ = [
 7    "CSVProcessor",
 8    "TXTProcessor",
 9    "AgilentChannelChromatogramData",
10    "AgilentHPLCChromatogram",
11]
class CSVProcessor(pychemstation.analysis.process_report.ReportProcessor):
 67class CSVProcessor(ReportProcessor):
 68    def __init__(self, path: str):
 69        """Class to process reports in CSV form.
 70
 71        :param path: the parent folder that contains the CSV report(s) to parse.
 72        """
 73        super().__init__(path)
 74
 75    def find_csv_prefix(self) -> str:
 76        files = [
 77            f
 78            for f in os.listdir(self.path)
 79            if os.path.isfile(os.path.join(self.path, f))
 80        ]
 81        for file in files:
 82            if "00" in file:
 83                name, _, file_extension = file.partition(".")
 84                if "00" in name and file_extension.lower() == "csv":
 85                    prefix, _, _ = name.partition("00")
 86                    return prefix
 87        raise FileNotFoundError(
 88            "Couldn't find the prefix for CSV, please make sure the post-run settings generate a CSV."
 89        )
 90
 91    def report_contains(self, labels: List[str], want: List[str]):
 92        for label in labels:
 93            if label in want:
 94                want.remove(label)
 95
 96        all_labels_seen = False
 97        if len(want) != 0:
 98            for want_label in want:
 99                label_seen = False
100                for label in labels:
101                    if want_label in label or want_label == label:
102                        label_seen = True
103                all_labels_seen = label_seen
104        else:
105            return True
106        return all_labels_seen
107
108    def process_report(self) -> Result[AgilentReport, AnyStr]:
109        """Method to parse details from CSV report.
110
111        :return: subset of complete report details, specifically the sample location, solvents in pumps,
112         and list of peaks at each wavelength channel.
113        """
114        prefix = self.find_csv_prefix()
115        labels = os.path.join(self.path, f"{prefix}00.CSV")
116        if not os.path.exists(labels):
117            raise ValueError(
118                "CSV reports do not exist, make sure to turn on the post run CSV report option!"
119            )
120        elif os.path.exists(labels):
121            LOCATION = "Location"
122            NUM_SIGNALS = "Number of Signals"
123            SOLVENT = "Solvent"
124            df_labels: Dict[int, Dict[int, str]] = pd.read_csv(
125                labels, encoding="utf-16", header=None
126            ).to_dict()
127            vial_location: str = ""
128            signals: Dict[int, list[AgilentPeak]] = {}
129            solvents: Dict[str, str] = {}
130            report_labels: Dict[int, str] = df_labels[0]
131
132            if not self.report_contains(
133                list(report_labels.values()), [LOCATION, NUM_SIGNALS, SOLVENT]
134            ):
135                return Err(f"Missing one of: {LOCATION}, {NUM_SIGNALS}, {SOLVENT}")
136
137            for pos, val in report_labels.items():
138                if val == "Location":
139                    vial_location = df_labels[1][pos]
140                elif "Solvent" in val:
141                    if val not in solvents.keys():
142                        solvents[val] = df_labels[2][pos]
143                elif val == "Number of Signals":
144                    num_signals = int(df_labels[1][pos])
145                    for s in range(1, num_signals + 1):
146                        try:
147                            df = pd.read_csv(
148                                os.path.join(self.path, f"{prefix}0{s}.CSV"),
149                                encoding="utf-16",
150                                header=None,
151                            )
152                            peaks = df.apply(lambda row: AgilentPeak(*row), axis=1)
153                        except EmptyDataError:
154                            peaks = []
155                        try:
156                            wavelength = df_labels[1][pos + s].partition(",4 Ref=off")[
157                                0
158                            ][-3:]
159                            signals[int(wavelength)] = list(peaks)
160                        except (IndexError, ValueError):
161                            # TODO: Ask about the MS signals
162                            pass
163                    break
164
165            return Ok(
166                AgilentReport(
167                    signals=[
168                        Signals(wavelength=w, peaks=s, data=None)
169                        for w, s in signals.items()
170                    ],
171                    vial_location=FiftyFourVialPlate.from_int(int(vial_location)),
172                    solvents=solvents,
173                )
174            )
175
176        return Err("No report found")

Processes Agilent ChemStation post-run reports exported as CSV files, extracting the sample location, pump solvents, and peaks per wavelength channel.

CSVProcessor(path: str)
68    def __init__(self, path: str):
69        """Class to process reports in CSV form.
70
71        :param path: the parent folder that contains the CSV report(s) to parse.
72        """
73        super().__init__(path)

Class to process reports in CSV form.

Parameters
  • path: the parent folder that contains the CSV report(s) to parse.
def find_csv_prefix(self) -> str:
75    def find_csv_prefix(self) -> str:
76        files = [
77            f
78            for f in os.listdir(self.path)
79            if os.path.isfile(os.path.join(self.path, f))
80        ]
81        for file in files:
82            if "00" in file:
83                name, _, file_extension = file.partition(".")
84                if "00" in name and file_extension.lower() == "csv":
85                    prefix, _, _ = name.partition("00")
86                    return prefix
87        raise FileNotFoundError(
88            "Couldn't find the prefix for CSV, please make sure the post-run settings generate a CSV."
89        )
def report_contains(self, labels: List[str], want: List[str]):
 91    def report_contains(self, labels: List[str], want: List[str]):
 92        for label in labels:
 93            if label in want:
 94                want.remove(label)
 95
 96        all_labels_seen = False
 97        if len(want) != 0:
 98            for want_label in want:
 99                label_seen = False
100                for label in labels:
101                    if want_label in label or want_label == label:
102                        label_seen = True
103                all_labels_seen = label_seen
104        else:
105            return True
106        return all_labels_seen
def process_report( self) -> Union[result.result.Ok[pychemstation.analysis.process_report.AgilentReport], result.result.Err[~AnyStr]]:
108    def process_report(self) -> Result[AgilentReport, AnyStr]:
109        """Method to parse details from CSV report.
110
111        :return: subset of complete report details, specifically the sample location, solvents in pumps,
112         and list of peaks at each wavelength channel.
113        """
114        prefix = self.find_csv_prefix()
115        labels = os.path.join(self.path, f"{prefix}00.CSV")
116        if not os.path.exists(labels):
117            raise ValueError(
118                "CSV reports do not exist, make sure to turn on the post run CSV report option!"
119            )
120        elif os.path.exists(labels):
121            LOCATION = "Location"
122            NUM_SIGNALS = "Number of Signals"
123            SOLVENT = "Solvent"
124            df_labels: Dict[int, Dict[int, str]] = pd.read_csv(
125                labels, encoding="utf-16", header=None
126            ).to_dict()
127            vial_location: str = ""
128            signals: Dict[int, list[AgilentPeak]] = {}
129            solvents: Dict[str, str] = {}
130            report_labels: Dict[int, str] = df_labels[0]
131
132            if not self.report_contains(
133                list(report_labels.values()), [LOCATION, NUM_SIGNALS, SOLVENT]
134            ):
135                return Err(f"Missing one of: {LOCATION}, {NUM_SIGNALS}, {SOLVENT}")
136
137            for pos, val in report_labels.items():
138                if val == "Location":
139                    vial_location = df_labels[1][pos]
140                elif "Solvent" in val:
141                    if val not in solvents.keys():
142                        solvents[val] = df_labels[2][pos]
143                elif val == "Number of Signals":
144                    num_signals = int(df_labels[1][pos])
145                    for s in range(1, num_signals + 1):
146                        try:
147                            df = pd.read_csv(
148                                os.path.join(self.path, f"{prefix}0{s}.CSV"),
149                                encoding="utf-16",
150                                header=None,
151                            )
152                            peaks = df.apply(lambda row: AgilentPeak(*row), axis=1)
153                        except EmptyDataError:
154                            peaks = []
155                        try:
156                            wavelength = df_labels[1][pos + s].partition(",4 Ref=off")[
157                                0
158                            ][-3:]
159                            signals[int(wavelength)] = list(peaks)
160                        except (IndexError, ValueError):
161                            # TODO: Ask about the MS signals
162                            pass
163                    break
164
165            return Ok(
166                AgilentReport(
167                    signals=[
168                        Signals(wavelength=w, peaks=s, data=None)
169                        for w, s in signals.items()
170                    ],
171                    vial_location=FiftyFourVialPlate.from_int(int(vial_location)),
172                    solvents=solvents,
173                )
174            )
175
176        return Err("No report found")

Method to parse details from CSV report.

Returns

subset of complete report details, specifically the sample location, solvents in pumps, and list of peaks at each wavelength channel.

class TXTProcessor(pychemstation.analysis.process_report.ReportProcessor):
class TXTProcessor(ReportProcessor):
    """Process Agilent post-run reports saved as plain-text ``REPORT.TXT`` files.

    Regex matches for column and unit combinations, courtesy of Veronica Lai.
    """

    # Maps table header -> {unit -> capture regex} for peak-table columns.
    # Raw strings keep "\d"/"\s" as regex escapes without triggering
    # invalid-escape-sequence warnings on modern Python.
    _column_re_dictionary = {
        "Peak": {  # peak index
            "#": r"[ ]+(?P<Peak>[\d]+)",  # number
        },
        "RetTime": {  # retention time
            "[min]": r"(?P<RetTime>[\d]+.[\d]+)",  # minutes
        },
        "Type": {  # peak type
            "": r"(?P<Type>[A-Z]{1,3}(?: [A-Z]{1,2})*)",  # todo this is different from <4.8.8 aghplc tools
        },
        "Width": {  # peak width
            "[min]": r"(?P<Width>[\d]+.[\d]+[e+-]*[\d]+)",
        },
        "Area": {  # peak area
            "[mAU*s]": r"(?P<Area>[\d]+.[\d]+[e+-]*[\d]+)",  # area units
            "%": r"(?P<percent>[\d]+.[\d]+[e+-]*[\d]+)",  # percent
        },
        "Height": {  # peak height
            "[mAU]": r"(?P<Height>[\d]+.[\d]+[e+-]*[\d]+)",
        },
        "Name": {
            "": r"(?P<Name>[^\s]+(?:\s[^\s]+)*)",  # peak name
        },
    }

    def __init__(
        self,
        path: str,
        min_ret_time: int = 0,
        max_ret_time: int = 999,
        target_wavelength_range=None,
    ):
        """Class to process reports in TXT form.

        :param path: the parent folder that contains the TXT report (``REPORT.TXT``) to parse.
        :param min_ret_time: peaks after this value (min) will be returned
        :param max_ret_time: peaks will only be returned up to this time (min)
        :param target_wavelength_range: range of wavelengths to return; defaults to 200-299 nm
        """
        if target_wavelength_range is None:
            target_wavelength_range = list(range(200, 300))
        self.target_wavelength_range = target_wavelength_range
        self.min_ret_time = min_ret_time
        self.max_ret_time = max_ret_time
        super().__init__(path)

    def process_report(self) -> Result[AgilentReport, Union[AnyStr, Exception]]:
        """Method to parse details from the TXT report (``REPORT.TXT``).
        If you want more functionality, use `aghplctools`.
        `from aghplctools.ingestion.text import pull_hplc_area_from_txt`
        `signals = pull_hplc_area_from_txt(file_path)`

        :return: subset of complete report details, specifically the sample location, solvents in pumps,
         and list of peaks at each wavelength channel.
        """
        try:
            with open(
                os.path.join(self.path, "REPORT.TXT"), "r", encoding="utf-16"
            ) as openfile:
                text = openfile.read()

            try:
                signals = self.parse_area_report(text)
            except ValueError as e:
                return Err("No peaks found: " + str(e))

            # Keep only the wavelengths the caller asked for.
            signals = {
                key: signals[key]
                for key in self.target_wavelength_range
                if key in signals
            }

            parsed_signals = []
            for wavelength, wavelength_dict in signals.items():
                current_wavelength_signals = Signals(
                    wavelength=int(wavelength), peaks=[], data=None
                )
                for ret_time, ret_time_dict in wavelength_dict.items():
                    # Filter peaks to the configured retention-time window.
                    if self.min_ret_time <= ret_time <= self.max_ret_time:
                        current_wavelength_signals.peaks.append(
                            AgilentPeak(
                                retention_time=ret_time,
                                area=ret_time_dict["Area"],
                                width=ret_time_dict["Width"],
                                height=ret_time_dict["Height"],
                                peak_number=None,
                                peak_type=ret_time_dict["Type"],
                                area_percent=None,
                            )
                        )
                parsed_signals.append(current_wavelength_signals)

            return Ok(
                AgilentReport(vial_location=None, solvents=None, signals=parsed_signals)
            )
        except Exception as e:
            # Deliberate boundary catch: any parsing failure is surfaced as Err
            # rather than crashing the caller.
            return Err(e)

    def parse_area_report(self, report_text: str) -> Dict:
        """Interprets report text and parses the area report section, converting it to dictionary.
        Courtesy of Veronica Lai.

        :param report_text: plain text version of the report.
        :raises ValueError: if there are no peaks defined in the report text file
        :return: dictionary of signals in the form
            dict[wavelength][retention time (float)][Width/Area/Height/etc.]

        If you want more functionality, use `aghplctools`.
        should be able to use the `parse_area_report` method of aghplctools v4.8.8
        """
        if re.search(_no_peaks_re, report_text):  # There are no peaks in Report.txt
            raise ValueError("No peaks found in Report.txt")
        blocks = _header_block_re.split(report_text)
        signals: Dict[int, dict] = {}  # output dictionary
        for ind, block in enumerate(blocks):
            # area report block
            if _area_report_re.match(block):  # match area report block
                # break into signal blocks
                signal_blocks = _signal_table_re.split(blocks[ind + 1])
                # iterate over signal blocks
                for table in signal_blocks:
                    si = _signal_info_re.match(table)
                    if si is not None:
                        # some error state (e.g. 'not found')
                        if si.group("error") != "":
                            continue
                        wavelength = int(si.group("wavelength"))
                        if wavelength in signals:
                            # placeholder error raise just in case (this probably won't happen)
                            raise KeyError(
                                f"The wavelength {float(si.group('wavelength'))} is already in the signals dictionary"
                            )
                        signals[wavelength] = {}
                        # build peak regex
                        peak_re = self.build_peak_regex(table)
                        if (
                            peak_re is None
                        ):  # if there are no columns (empty table), continue
                            continue
                        for line in table.split("\n"):
                            peak = peak_re.match(line)
                            if peak is not None:
                                signals[wavelength][float(peak.group("RetTime"))] = {}
                                current = signals[wavelength][
                                    float(peak.group("RetTime"))
                                ]
                                for key in self._column_re_dictionary:
                                    if key in peak.re.groupindex:
                                        try:  # try float conversion, otherwise continue
                                            current[key] = float(peak.group(key))
                                        except ValueError:
                                            current[key] = peak.group(key)
                                    else:  # ensures defined
                                        current[key] = None
        return signals

    def build_peak_regex(self, signal_table: str) -> Pattern[str] | None:
        """Builds a peak regex from a signal table. Courtesy of Veronica Lai.

        :param signal_table: block of lines associated with an area table
        :return: peak line regex object (<=3.6 _sre.SRE_PATTERN, >=3.7 re.Pattern)
        """
        split_table = signal_table.split("\n")
        if len(split_table) <= 4:  # catch peak table with no values
            return None
        # todo verify that these indicies are always true
        column_line = split_table[2]  # table column line
        unit_line = split_table[3]  # column unit line
        length_line = [len(val) + 1 for val in split_table[4].split("|")]  # length line

        # iterate over header values and units to build peak table regex
        peak_re_string = []
        for header, unit in zip(
            chunk_string(column_line, length_line), chunk_string(unit_line, length_line)
        ):
            if header == "":  # todo create a better catch for an undefined header
                continue
            try:
                peak_re_string.append(
                    self._column_re_dictionary[header][
                        unit
                    ]  # append the appropriate regex
                )
            except KeyError:  # catch for undefined regexes (need to be built)
                raise KeyError(
                    f'The header/unit combination "{header}" "{unit}" is not defined in the peak regex '
                    f"dictionary. Let Lars know."
                )

        return re.compile(
            "[ ]+".join(
                peak_re_string
            )  # constructed string delimited by 1 or more spaces
            + r"[\s]*"  # and any remaining white space
        )

Regex matches for column and unit combinations, courtesy of Veronica Lai.

TXTProcessor( path: str, min_ret_time: int = 0, max_ret_time: int = 999, target_wavelength_range=None)
207    def __init__(
208        self,
209        path: str,
210        min_ret_time: int = 0,
211        max_ret_time: int = 999,
212        target_wavelength_range=None,
213    ):
214        """Class to process reports in CSV form.
215
216        :param path: the parent folder that contains the CSV report(s) to parse.
217        :param min_ret_time: peaks after this value (min) will be returned
218        :param max_ret_time: peaks will only be returned up to this time (min)
219        :param target_wavelength_range: range of wavelengths to return
220        """
221        if target_wavelength_range is None:
222            target_wavelength_range = list(range(200, 300))
223        self.target_wavelength_range = target_wavelength_range
224        self.min_ret_time = min_ret_time
225        self.max_ret_time = max_ret_time
226        super().__init__(path)

Class to process reports in TXT form.

Parameters
  • path: the parent folder that contains the CSV report(s) to parse.
  • min_ret_time: peaks after this value (min) will be returned
  • max_ret_time: peaks will only be returned up to this time (min)
  • target_wavelength_range: range of wavelengths to return
target_wavelength_range
min_ret_time
max_ret_time
def process_report( self) -> Union[result.result.Ok[pychemstation.analysis.process_report.AgilentReport], result.result.Err[Union[~AnyStr, Exception]]]:
228    def process_report(self) -> Result[AgilentReport, Union[AnyStr, Exception]]:
229        """Method to parse details from CSV report.
230        If you want more functionality, use `aghplctools`.
231        `from aghplctools.ingestion.text import pull_hplc_area_from_txt`
232        `signals = pull_hplc_area_from_txt(file_path)`
233
234        :return: subset of complete report details, specifically the sample location, solvents in pumps,
235         and list of peaks at each wavelength channel.
236        """
237        try:
238            with open(
239                os.path.join(self.path, "REPORT.TXT"), "r", encoding="utf-16"
240            ) as openfile:
241                text = openfile.read()
242
243            try:
244                signals = self.parse_area_report(text)
245            except ValueError as e:
246                return Err("No peaks found: " + str(e))
247
248            signals = {
249                key: signals[key]
250                for key in self.target_wavelength_range
251                if key in signals
252            }
253
254            parsed_signals = []
255            for wavelength, wavelength_dict in signals.items():
256                current_wavelength_signals = Signals(
257                    wavelength=int(wavelength), peaks=[], data=None
258                )
259                for ret_time, ret_time_dict in wavelength_dict.items():
260                    if self.min_ret_time <= ret_time <= self.max_ret_time:
261                        current_wavelength_signals.peaks.append(
262                            AgilentPeak(
263                                retention_time=ret_time,
264                                area=ret_time_dict["Area"],
265                                width=ret_time_dict["Width"],
266                                height=ret_time_dict["Height"],
267                                peak_number=None,
268                                peak_type=ret_time_dict["Type"],
269                                area_percent=None,
270                            )
271                        )
272                parsed_signals.append(current_wavelength_signals)
273
274            return Ok(
275                AgilentReport(vial_location=None, solvents=None, signals=parsed_signals)
276            )
277        except Exception as e:
278            return Err(e)

Method to parse details from the TXT report (REPORT.TXT). If you want more functionality, use aghplctools. from aghplctools.ingestion.text import pull_hplc_area_from_txt signals = pull_hplc_area_from_txt(file_path)

Returns

subset of complete report details, specifically the sample location, solvents in pumps, and list of peaks at each wavelength channel.

def parse_area_report(self, report_text: str) -> Dict:
280    def parse_area_report(self, report_text: str) -> Dict:
281        """Interprets report text and parses the area report section, converting it to dictionary.
282        Courtesy of Veronica Lai.
283
284        :param report_text: plain text version of the report.
285        :raises ValueError: if there are no peaks defined in the report text file
286        :return: dictionary of signals in the form
287            dict[wavelength][retention time (float)][Width/Area/Height/etc.]
288
289        If you want more functionality, use `aghplctools`.
290        should be able to use the `parse_area_report` method of aghplctools v4.8.8
291        """
292        if re.search(_no_peaks_re, report_text):  # There are no peaks in Report.txt
293            raise ValueError("No peaks found in Report.txt")
294        blocks = _header_block_re.split(report_text)
295        signals: Dict[int, dict] = {}  # output dictionary
296        for ind, block in enumerate(blocks):
297            # area report block
298            if _area_report_re.match(block):  # match area report block
299                # break into signal blocks
300                signal_blocks = _signal_table_re.split(blocks[ind + 1])
301                # iterate over signal blocks
302                for table in signal_blocks:
303                    si = _signal_info_re.match(table)
304                    if si is not None:
305                        # some error state (e.g. 'not found')
306                        if si.group("error") != "":
307                            continue
308                        wavelength = int(si.group("wavelength"))
309                        if wavelength in signals:
310                            # placeholder error raise just in case (this probably won't happen)
311                            raise KeyError(
312                                f"The wavelength {float(si.group('wavelength'))} is already in the signals dictionary"
313                            )
314                        signals[wavelength] = {}
315                        # build peak regex
316                        peak_re = self.build_peak_regex(table)
317                        if (
318                            peak_re is None
319                        ):  # if there are no columns (empty table), continue
320                            continue
321                        for line in table.split("\n"):
322                            peak = peak_re.match(line)
323                            if peak is not None:
324                                signals[wavelength][float(peak.group("RetTime"))] = {}
325                                current = signals[wavelength][
326                                    float(peak.group("RetTime"))
327                                ]
328                                for key in self._column_re_dictionary:
329                                    if key in peak.re.groupindex:
330                                        try:  # try float conversion, otherwise continue
331                                            current[key] = float(peak.group(key))
332                                        except ValueError:
333                                            current[key] = peak.group(key)
334                                    else:  # ensures defined
335                                        current[key] = None
336        return signals

Interprets report text and parses the area report section, converting it to dictionary. Courtesy of Veronica Lai.

Parameters
  • report_text: plain text version of the report.
Raises
  • ValueError: if there are no peaks defined in the report text file
Returns

dictionary of signals in the form dict[wavelength][retention time (float)][Width/Area/Height/etc.]

If you want more functionality, use aghplctools. should be able to use the parse_area_report method of aghplctools v4.8.8

def build_peak_regex(self, signal_table: str) -> Optional[Pattern[str]]:
338    def build_peak_regex(self, signal_table: str) -> Pattern[str] | None:
339        """Builds a peak regex from a signal table. Courtesy of Veronica Lai.
340
341        :param signal_table: block of lines associated with an area table
342        :return: peak line regex object (<=3.6 _sre.SRE_PATTERN, >=3.7 re.Pattern)
343        """
344        split_table = signal_table.split("\n")
345        if len(split_table) <= 4:  # catch peak table with no values
346            return None
347        # todo verify that these indicies are always true
348        column_line = split_table[2]  # table column line
349        unit_line = split_table[3]  # column unit line
350        length_line = [len(val) + 1 for val in split_table[4].split("|")]  # length line
351
352        # iterate over header values and units to build peak table regex
353        peak_re_string = []
354        for header, unit in zip(
355            chunk_string(column_line, length_line), chunk_string(unit_line, length_line)
356        ):
357            if header == "":  # todo create a better catch for an undefined header
358                continue
359            try:
360                peak_re_string.append(
361                    self._column_re_dictionary[header][
362                        unit
363                    ]  # append the appropriate regex
364                )
365            except KeyError:  # catch for undefined regexes (need to be built)
366                raise KeyError(
367                    f'The header/unit combination "{header}" "{unit}" is not defined in the peak regex '
368                    f"dictionary. Let Lars know."
369                )
370
371        return re.compile(
372            "[ ]+".join(
373                peak_re_string
374            )  # constructed string delimited by 1 or more spaces
375            + "[\s]*"  # and any remaining white space
376        )

Builds a peak regex from a signal table. Courtesy of Veronica Lai.

Parameters
  • signal_table: block of lines associated with an area table
Returns

peak line regex object (<=3.6 _sre.SRE_PATTERN, >=3.7 re.Pattern)

@dataclass
class AgilentChannelChromatogramData:
@dataclass
class AgilentChannelChromatogramData:
    """Bundle of chromatograms for the eight detector channels A-H."""

    A: AgilentHPLCChromatogram
    B: AgilentHPLCChromatogram
    C: AgilentHPLCChromatogram
    D: AgilentHPLCChromatogram
    E: AgilentHPLCChromatogram
    F: AgilentHPLCChromatogram
    G: AgilentHPLCChromatogram
    H: AgilentHPLCChromatogram

    @classmethod
    def from_dict(cls, chroms: Dict[str, AgilentHPLCChromatogram]):
        """Build an instance from a channel-letter -> chromatogram mapping.

        :param chroms: mapping whose keys must be exactly "A" through "H".
        :return: a populated instance of ``cls``.
        :raises KeyError: if the mapping's keys do not match the channel names.
        """
        keys = chroms.keys()
        # Use ``cls`` (not the hard-coded class name) so subclasses construct
        # instances of themselves.
        class_keys = cls.__annotations__.keys()
        if set(class_keys) != set(keys):
            err = f"{keys} don't match {class_keys}"
            raise KeyError(err)
        return cls(**{channel: chroms[channel] for channel in class_keys})
@classmethod
def from_dict( cls, chroms: Dict[str, AgilentHPLCChromatogram]):
119    @classmethod
120    def from_dict(cls, chroms: Dict[str, AgilentHPLCChromatogram]):
121        keys = chroms.keys()
122        class_keys = vars(AgilentChannelChromatogramData)["__annotations__"].keys()
123        if set(class_keys) == set(keys):
124            return AgilentChannelChromatogramData(
125                A=chroms["A"],
126                B=chroms["B"],
127                C=chroms["C"],
128                D=chroms["D"],
129                E=chroms["E"],
130                F=chroms["F"],
131                G=chroms["G"],
132                H=chroms["H"],
133            )
134        else:
135            err = f"{keys} don't match {class_keys}"
136            raise KeyError(err)
class AgilentHPLCChromatogram(AbstractSpectrum):
    """Class for HPLC spectrum (chromatogram) loading and handling."""

    # Axis labels: x in minutes, y in milli-absorbance units.
    AXIS_MAPPING = {"x": "min", "y": "mAu"}

    # Properties kept internal; not part of the serialized state.
    INTERNAL_PROPERTIES = {
        "baseline",
        "parameters",
        "data_path",
    }

    # set of properties to be saved
    PUBLIC_PROPERTIES = {
        "x",
        "y",
        "peaks",
        "timestamp",
    }

    def __init__(self, path=None, autosaving=False):
        """Create a chromatogram container.

        :param path: folder to save spectra into; created if missing.
        :param autosaving: if True, the currently held spectrum is saved
            before a new one is loaded.
        """
        if path is not None:
            os.makedirs(path, exist_ok=True)
            self.path = path
        else:
            # NOTE(review): relative fallback path — resolves against the
            # process working directory, not this module; confirm intended.
            self.path = os.path.join("../utils", "hplc_data")
            os.makedirs(self.path, exist_ok=True)

        super().__init__(path=path, autosaving=autosaving)

    def attach_spectrum(self, x, y):
        """Attach already-extracted x/y data directly (no timestamp available)."""
        super().load_spectrum(x, y, timestamp="NA")

    def load_spectrum(self, data_path, channel="A"):
        """Loads the spectra from the given folder.

        Args:
            data_path (str): Path where HPLC data has been saved.
            channel (str): detector channel letter (reads DAD1<channel> files).
        """

        # to avoid dropping parameters when called in parent class:
        # persist the current spectrum before it is replaced
        if self.x is not None and self.autosaving:
            self.save_data(filename=f"{data_path}_{channel}")
            self._dump()

        # get raw data
        x, y = self.extract_rawdata(data_path, channel)

        # timestamp, if present, is the last "_"-separated token of the path stem
        tstr = data_path.split(".")[0].split("_")[-1]
        timestamp = "NA"
        try:
            timestamp = time.mktime(time.strptime(tstr, TIME_FORMAT))
        except ValueError:
            # not every data path carries a parseable timestamp; keep "NA"
            pass

        # loading all data
        super().load_spectrum(x, y, timestamp)

    ### PUBLIC METHODS TO LOAD RAW DATA ###

    def extract_rawdata(
        self, experiment_dir: str, channel: str
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Reads raw data from Chemstation .CH files.

        :param experiment_dir: .D directory with the .CH files
        :param channel: detector channel letter

        :returns: np.array(times), np.array(values)   Raw chromatogram data
        """
        filename = os.path.join(experiment_dir, f"DAD1{channel}")
        npz_file = filename + ".npz"

        if os.path.exists(npz_file):
            # already processed: reuse the cached arrays.
            # FIX: close the NpzFile handle instead of leaking it.
            with np.load(npz_file) as data:
                return data["times"], data["values"]

        self.logger.debug("NPZ file not found. First time loading data.")
        ch_file = filename + ".ch"
        data = CHFile(ch_file)
        # cache the arrays so subsequent loads skip .ch parsing
        np.savez_compressed(npz_file, times=data.times, values=data.values)
        return np.array(data.times), np.array(data.values)

Class for HPLC spectrum (chromatogram) loading and handling.

AgilentHPLCChromatogram(path=None, autosaving=False)
41    def __init__(self, path=None, autosaving=False):
42        if path is not None:
43            os.makedirs(path, exist_ok=True)
44            self.path = path
45        else:
46            self.path = os.path.join("../utils", "hplc_data")
47            os.makedirs(self.path, exist_ok=True)
48
49        super().__init__(path=path, autosaving=autosaving)

Default constructor; loads properties into the instance namespace.

Can be redefined in subclasses.

Parameters
  • path: Valid path to save data to. If omitted, uses ".//spectrum". If False - no folder created.
  • autosaving: If True (default), the current spectrum is saved when a new one is loaded; otherwise it is dropped.
AXIS_MAPPING = {'x': 'min', 'y': 'mAu'}
INTERNAL_PROPERTIES = {'baseline', 'data_path', 'parameters'}
PUBLIC_PROPERTIES = {'peaks', 'timestamp', 'x', 'y'}
def attach_spectrum(self, x, y):
51    def attach_spectrum(self, x, y):
52        # loading all data
53        super().load_spectrum(x, y, timestamp="NA")
def load_spectrum(self, data_path, channel='A'):
55    def load_spectrum(self, data_path, channel="A"):
56        """Loads the spectra from the given folder.
57
58        Args:
59            data_path (str): Path where HPLC data has been saved.
60        """
61
62        # to avoid dropping parameters when called in parent class
63        if self.x is not None:
64            if self.autosaving:
65                self.save_data(filename=f"{data_path}_{channel}")
66                self._dump()
67
68        # get raw data
69        x, y = self.extract_rawdata(data_path, channel)
70
71        # get timestamp
72        tstr = data_path.split(".")[0].split("_")[-1]
73        timestamp = "NA"
74        try:
75            timestamp = time.mktime(time.strptime(tstr, TIME_FORMAT))
76        except ValueError:
77            pass
78
79        # loading all data
80        super().load_spectrum(x, y, timestamp)

Loads the spectra from the given folder.

Args: data_path (str): Path where HPLC data has been saved.

def extract_rawdata( self, experiment_dir: str, channel: str) -> Tuple[numpy.ndarray, numpy.ndarray]:
 84    def extract_rawdata(
 85        self, experiment_dir: str, channel: str
 86    ) -> Tuple[np.ndarray, np.ndarray]:
 87        """Reads raw data from Chemstation .CH files.
 88
 89        :param experiment_dir: .D directory with the .CH files
 90
 91        :returns: np.array(times), np.array(values)   Raw chromatogram data
 92        """
 93        filename = os.path.join(experiment_dir, f"DAD1{channel}")
 94        npz_file = filename + ".npz"
 95
 96        if os.path.exists(npz_file):
 97            # already processed
 98            data = np.load(npz_file)
 99            return data["times"], data["values"]
100        else:
101            self.logger.debug("NPZ file not found. First time loading data.")
102            ch_file = filename + ".ch"
103            data = CHFile(ch_file)
104            np.savez_compressed(npz_file, times=data.times, values=data.values)
105            return np.array(data.times), np.array(data.values)

Reads raw data from Chemstation .CH files.

Parameters
  • experiment_dir: .D directory with the .CH files

:returns: np.array(times), np.array(values) Raw chromatogram data