pychemstation.analysis
1from .process_report import CSVProcessor 2from .process_report import TXTProcessor 3from .chromatogram import AgilentChannelChromatogramData 4from .chromatogram import AgilentHPLCChromatogram 5 6__all__ = [ 7 "CSVProcessor", 8 "TXTProcessor", 9 "AgilentChannelChromatogramData", 10 "AgilentHPLCChromatogram", 11]
class CSVProcessor(ReportProcessor):
    """Processor for Agilent ChemStation post-run CSV reports."""

    def __init__(self, path: str):
        """Class to process reports in CSV form.

        :param path: the parent folder that contains the CSV report(s) to parse.
        """
        super().__init__(path)

    def find_csv_prefix(self) -> str:
        """Locate the filename prefix shared by the generated CSV reports.

        Report files are named ``<prefix>00.CSV``, ``<prefix>01.CSV``, ...;
        the prefix is everything before the first "00" in the label file name.

        :raises FileNotFoundError: if no matching ``*00*.csv`` file exists.
        :return: the common filename prefix.
        """
        files = [
            f
            for f in os.listdir(self.path)
            if os.path.isfile(os.path.join(self.path, f))
        ]
        for file in files:
            if "00" in file:
                name, _, file_extension = file.partition(".")
                if "00" in name and file_extension.lower() == "csv":
                    prefix, _, _ = name.partition("00")
                    return prefix
        raise FileNotFoundError(
            "Couldn't find the prefix for CSV, please make sure the post-run settings generate a CSV."
        )

    def report_contains(self, labels: List[str], want: List[str]) -> bool:
        """Check that every wanted label is present in the report labels.

        A wanted label counts as present when it appears in ``labels`` exactly
        or as a substring of one of them (e.g. "Solvent" matches "Solvent A").

        :param labels: label column read from the report CSV.
        :param want: labels that must all be present.
        :return: True only if every entry of ``want`` was found.
        """
        # Fixed: the previous version mutated `want` in place and returned the
        # visibility of only the LAST remaining label instead of all of them.
        missing = [label for label in want if label not in labels]
        return all(
            any(wanted in label for label in labels) for wanted in missing
        )

    def process_report(self) -> Result[AgilentReport, AnyStr]:
        """Method to parse details from CSV report.

        :return: subset of complete report details, specifically the sample
            location, solvents in pumps, and list of peaks at each wavelength
            channel.
        :raises ValueError: if the label file (``<prefix>00.CSV``) is missing.
        """
        prefix = self.find_csv_prefix()
        labels = os.path.join(self.path, f"{prefix}00.CSV")
        if not os.path.exists(labels):
            raise ValueError(
                "CSV reports do not exist, make sure to turn on the post run CSV report option!"
            )

        LOCATION = "Location"
        NUM_SIGNALS = "Number of Signals"
        SOLVENT = "Solvent"
        # The label file is UTF-16 encoded: column 0 holds field names,
        # columns 1 and 2 hold the corresponding values.
        df_labels: Dict[int, Dict[int, str]] = pd.read_csv(
            labels, encoding="utf-16", header=None
        ).to_dict()
        vial_location: str = ""
        signals: Dict[int, list[AgilentPeak]] = {}
        solvents: Dict[str, str] = {}
        report_labels: Dict[int, str] = df_labels[0]

        if not self.report_contains(
            list(report_labels.values()), [LOCATION, NUM_SIGNALS, SOLVENT]
        ):
            return Err(f"Missing one of: {LOCATION}, {NUM_SIGNALS}, {SOLVENT}")

        for pos, val in report_labels.items():
            if val == LOCATION:
                vial_location = df_labels[1][pos]
            elif SOLVENT in val:
                if val not in solvents:
                    solvents[val] = df_labels[2][pos]
            elif val == NUM_SIGNALS:
                num_signals = int(df_labels[1][pos])
                # One CSV file per signal: <prefix>01.CSV, <prefix>02.CSV, ...
                for s in range(1, num_signals + 1):
                    try:
                        df = pd.read_csv(
                            os.path.join(self.path, f"{prefix}0{s}.CSV"),
                            encoding="utf-16",
                            header=None,
                        )
                        peaks = df.apply(lambda row: AgilentPeak(*row), axis=1)
                    except EmptyDataError:
                        # Signal file exists but contains no peak rows.
                        peaks = []
                    try:
                        # Label looks like "...=nnn,4 Ref=off": keep the three
                        # digits right before ",4 Ref=off" — presumably the
                        # detection wavelength. TODO confirm for other settings.
                        wavelength = df_labels[1][pos + s].partition(",4 Ref=off")[
                            0
                        ][-3:]
                        signals[int(wavelength)] = list(peaks)
                    except (IndexError, ValueError):
                        # TODO: Ask about the MS signals
                        pass
                break  # "Number of Signals" is the last field we need

        return Ok(
            AgilentReport(
                signals=[
                    Signals(wavelength=w, peaks=s, data=None)
                    for w, s in signals.items()
                ],
                vial_location=FiftyFourVialPlate.from_int(int(vial_location)),
                solvents=solvents,
            )
        )
Helper class that provides a standard way to create an ABC using inheritance.
    def __init__(self, path: str):
        """Class to process reports in CSV form.

        :param path: the parent folder that contains the CSV report(s) to parse.
        """
        # Path storage is delegated to the ReportProcessor base class.
        super().__init__(path)
Class to process reports in CSV form.
Parameters
- path: the parent folder that contains the CSV report(s) to parse.
75 def find_csv_prefix(self) -> str: 76 files = [ 77 f 78 for f in os.listdir(self.path) 79 if os.path.isfile(os.path.join(self.path, f)) 80 ] 81 for file in files: 82 if "00" in file: 83 name, _, file_extension = file.partition(".") 84 if "00" in name and file_extension.lower() == "csv": 85 prefix, _, _ = name.partition("00") 86 return prefix 87 raise FileNotFoundError( 88 "Couldn't find the prefix for CSV, please make sure the post-run settings generate a CSV." 89 )
91 def report_contains(self, labels: List[str], want: List[str]): 92 for label in labels: 93 if label in want: 94 want.remove(label) 95 96 all_labels_seen = False 97 if len(want) != 0: 98 for want_label in want: 99 label_seen = False 100 for label in labels: 101 if want_label in label or want_label == label: 102 label_seen = True 103 all_labels_seen = label_seen 104 else: 105 return True 106 return all_labels_seen
    def process_report(self) -> Result[AgilentReport, AnyStr]:
        """Method to parse details from CSV report.

        :return: subset of complete report details, specifically the sample location, solvents in pumps,
            and list of peaks at each wavelength channel.
        :raises ValueError: if the label file (``<prefix>00.CSV``) does not exist.
        """
        prefix = self.find_csv_prefix()
        labels = os.path.join(self.path, f"{prefix}00.CSV")
        if not os.path.exists(labels):
            raise ValueError(
                "CSV reports do not exist, make sure to turn on the post run CSV report option!"
            )
        elif os.path.exists(labels):
            LOCATION = "Location"
            NUM_SIGNALS = "Number of Signals"
            SOLVENT = "Solvent"
            # Label file is UTF-16 encoded; column 0 holds field names,
            # columns 1/2 hold the corresponding values.
            df_labels: Dict[int, Dict[int, str]] = pd.read_csv(
                labels, encoding="utf-16", header=None
            ).to_dict()
            vial_location: str = ""
            signals: Dict[int, list[AgilentPeak]] = {}
            solvents: Dict[str, str] = {}
            report_labels: Dict[int, str] = df_labels[0]

            if not self.report_contains(
                list(report_labels.values()), [LOCATION, NUM_SIGNALS, SOLVENT]
            ):
                return Err(f"Missing one of: {LOCATION}, {NUM_SIGNALS}, {SOLVENT}")

            for pos, val in report_labels.items():
                if val == "Location":
                    vial_location = df_labels[1][pos]
                elif "Solvent" in val:
                    if val not in solvents.keys():
                        solvents[val] = df_labels[2][pos]
                elif val == "Number of Signals":
                    num_signals = int(df_labels[1][pos])
                    # One CSV file per signal: <prefix>01.CSV, <prefix>02.CSV, ...
                    for s in range(1, num_signals + 1):
                        try:
                            df = pd.read_csv(
                                os.path.join(self.path, f"{prefix}0{s}.CSV"),
                                encoding="utf-16",
                                header=None,
                            )
                            peaks = df.apply(lambda row: AgilentPeak(*row), axis=1)
                        except EmptyDataError:
                            # Signal file exists but contains no peak rows.
                            peaks = []
                        try:
                            # Label looks like "...=nnn,4 Ref=off": keep the three
                            # digits right before ",4 Ref=off" — presumably the
                            # detection wavelength. TODO confirm for other settings.
                            wavelength = df_labels[1][pos + s].partition(",4 Ref=off")[
                                0
                            ][-3:]
                            signals[int(wavelength)] = list(peaks)
                        except (IndexError, ValueError):
                            # TODO: Ask about the MS signals
                            pass
                    # "Number of Signals" is the last field we need; stop scanning.
                    break

            return Ok(
                AgilentReport(
                    signals=[
                        Signals(wavelength=w, peaks=s, data=None)
                        for w, s in signals.items()
                    ],
                    vial_location=FiftyFourVialPlate.from_int(int(vial_location)),
                    solvents=solvents,
                )
            )

        return Err("No report found")
Method to parse details from CSV report.
Returns
subset of complete report details, specifically the sample location, solvents in pumps, and list of peaks at each wavelength channel.
class TXTProcessor(ReportProcessor):
    """Processor for plain-text (REPORT.TXT) ChemStation reports.

    Regex matches for column and unit combinations, courtesy of Veronica Lai.
    """

    # Raw strings throughout: "\d"/"\s" inside a plain string literal is an
    # invalid escape sequence (SyntaxWarning from Python 3.12 onwards).
    _column_re_dictionary = {
        "Peak": {  # peak index
            "#": r"[ ]+(?P<Peak>[\d]+)",  # number
        },
        "RetTime": {  # retention time
            "[min]": r"(?P<RetTime>[\d]+.[\d]+)",  # minutes
        },
        "Type": {  # peak type
            "": r"(?P<Type>[A-Z]{1,3}(?: [A-Z]{1,2})*)",  # todo this is different from <4.8.8 aghplc tools
        },
        "Width": {  # peak width
            "[min]": r"(?P<Width>[\d]+.[\d]+[e+-]*[\d]+)",
        },
        "Area": {  # peak area
            "[mAU*s]": r"(?P<Area>[\d]+.[\d]+[e+-]*[\d]+)",  # area units
            "%": r"(?P<percent>[\d]+.[\d]+[e+-]*[\d]+)",  # percent
        },
        "Height": {  # peak height
            "[mAU]": r"(?P<Height>[\d]+.[\d]+[e+-]*[\d]+)",
        },
        "Name": {
            "": r"(?P<Name>[^\s]+(?:\s[^\s]+)*)",  # peak name
        },
    }

    def __init__(
        self,
        path: str,
        min_ret_time: int = 0,
        max_ret_time: int = 999,
        target_wavelength_range=None,
    ):
        """Class to process reports in TXT form.

        :param path: the parent folder that contains the REPORT.TXT to parse.
        :param min_ret_time: peaks after this value (min) will be returned
        :param max_ret_time: peaks will only be returned up to this time (min)
        :param target_wavelength_range: range of wavelengths to return; defaults
            to 200-299 nm when omitted.
        """
        if target_wavelength_range is None:
            target_wavelength_range = list(range(200, 300))
        self.target_wavelength_range = target_wavelength_range
        self.min_ret_time = min_ret_time
        self.max_ret_time = max_ret_time
        super().__init__(path)

    def process_report(self) -> Result[AgilentReport, Union[AnyStr, Exception]]:
        """Method to parse details from TXT report.
        If you want more functionality, use `aghplctools`.
        `from aghplctools.ingestion.text import pull_hplc_area_from_txt`
        `signals = pull_hplc_area_from_txt(file_path)`

        :return: subset of complete report details, specifically the list of
            peaks at each wavelength channel (vial location and solvents are
            not available from the TXT report, so they are returned as None).
        """
        try:
            # REPORT.TXT is written by ChemStation in UTF-16.
            with open(
                os.path.join(self.path, "REPORT.TXT"), "r", encoding="utf-16"
            ) as openfile:
                text = openfile.read()

            try:
                signals = self.parse_area_report(text)
            except ValueError as e:
                return Err("No peaks found: " + str(e))

            # Keep only the wavelengths the caller asked for.
            signals = {
                key: signals[key]
                for key in self.target_wavelength_range
                if key in signals
            }

            parsed_signals = []
            for wavelength, wavelength_dict in signals.items():
                current_wavelength_signals = Signals(
                    wavelength=int(wavelength), peaks=[], data=None
                )
                for ret_time, ret_time_dict in wavelength_dict.items():
                    # Filter peaks to the configured retention-time window.
                    if self.min_ret_time <= ret_time <= self.max_ret_time:
                        current_wavelength_signals.peaks.append(
                            AgilentPeak(
                                retention_time=ret_time,
                                area=ret_time_dict["Area"],
                                width=ret_time_dict["Width"],
                                height=ret_time_dict["Height"],
                                peak_number=None,
                                peak_type=ret_time_dict["Type"],
                                area_percent=None,
                            )
                        )
                parsed_signals.append(current_wavelength_signals)

            return Ok(
                AgilentReport(vial_location=None, solvents=None, signals=parsed_signals)
            )
        except Exception as e:
            # Best-effort boundary: surface any unexpected failure as Err.
            return Err(e)

    def parse_area_report(self, report_text: str) -> Dict:
        """Interprets report text and parses the area report section, converting it to dictionary.
        Courtesy of Veronica Lai.

        :param report_text: plain text version of the report.
        :raises ValueError: if there are no peaks defined in the report text file
        :return: dictionary of signals in the form
            dict[wavelength][retention time (float)][Width/Area/Height/etc.]

        If you want more functionality, use `aghplctools`.
        should be able to use the `parse_area_report` method of aghplctools v4.8.8
        """
        if re.search(_no_peaks_re, report_text):  # There are no peaks in Report.txt
            raise ValueError("No peaks found in Report.txt")
        blocks = _header_block_re.split(report_text)
        signals: Dict[int, dict] = {}  # output dictionary
        for ind, block in enumerate(blocks):
            # area report block
            if _area_report_re.match(block):  # match area report block
                # break into signal blocks
                signal_blocks = _signal_table_re.split(blocks[ind + 1])
                # iterate over signal blocks
                for table in signal_blocks:
                    si = _signal_info_re.match(table)
                    if si is not None:
                        # some error state (e.g. 'not found')
                        if si.group("error") != "":
                            continue
                        wavelength = int(si.group("wavelength"))
                        if wavelength in signals:
                            # placeholder error raise just in case (this probably won't happen)
                            raise KeyError(
                                f"The wavelength {float(si.group('wavelength'))} is already in the signals dictionary"
                            )
                        signals[wavelength] = {}
                        # build peak regex
                        peak_re = self.build_peak_regex(table)
                        if (
                            peak_re is None
                        ):  # if there are no columns (empty table), continue
                            continue
                        for line in table.split("\n"):
                            peak = peak_re.match(line)
                            if peak is not None:
                                signals[wavelength][float(peak.group("RetTime"))] = {}
                                current = signals[wavelength][
                                    float(peak.group("RetTime"))
                                ]
                                for key in self._column_re_dictionary:
                                    if key in peak.re.groupindex:
                                        try:  # try float conversion, otherwise continue
                                            current[key] = float(peak.group(key))
                                        except ValueError:
                                            current[key] = peak.group(key)
                                    else:  # ensures defined
                                        current[key] = None
        return signals

    def build_peak_regex(self, signal_table: str) -> Pattern[str] | None:
        """Builds a peak regex from a signal table. Courtesy of Veronica Lai.

        :param signal_table: block of lines associated with an area table
        :return: peak line regex object (<=3.6 _sre.SRE_PATTERN, >=3.7 re.Pattern),
            or None when the table has no value rows.
        :raises KeyError: if a header/unit pair has no regex in ``_column_re_dictionary``.
        """
        split_table = signal_table.split("\n")
        if len(split_table) <= 4:  # catch peak table with no values
            return None
        # todo verify that these indicies are always true
        column_line = split_table[2]  # table column line
        unit_line = split_table[3]  # column unit line
        length_line = [len(val) + 1 for val in split_table[4].split("|")]  # length line

        # iterate over header values and units to build peak table regex
        peak_re_string = []
        for header, unit in zip(
            chunk_string(column_line, length_line), chunk_string(unit_line, length_line)
        ):
            if header == "":  # todo create a better catch for an undefined header
                continue
            try:
                peak_re_string.append(
                    self._column_re_dictionary[header][
                        unit
                    ]  # append the appropriate regex
                )
            except KeyError:  # catch for undefined regexes (need to be built)
                raise KeyError(
                    f'The header/unit combination "{header}" "{unit}" is not defined in the peak regex '
                    f"dictionary. Let Lars know."
                )

        return re.compile(
            "[ ]+".join(
                peak_re_string
            )  # constructed string delimited by 1 or more spaces
            + r"[\s]*"  # and any remaining white space
        )
Regex matches for column and unit combinations, courtesy of Veronica Lai.
    def __init__(
        self,
        path: str,
        min_ret_time: int = 0,
        max_ret_time: int = 999,
        target_wavelength_range=None,
    ):
        """Class to process reports in TXT form.

        :param path: the parent folder that contains the report(s) to parse.
        :param min_ret_time: peaks after this value (min) will be returned
        :param max_ret_time: peaks will only be returned up to this time (min)
        :param target_wavelength_range: range of wavelengths to return
        """
        # Default window covers the commonly monitored UV range (200-299 nm).
        if target_wavelength_range is None:
            target_wavelength_range = list(range(200, 300))
        self.target_wavelength_range = target_wavelength_range
        self.min_ret_time = min_ret_time
        self.max_ret_time = max_ret_time
        super().__init__(path)
Class to process reports in TXT form.
Parameters
- path: the parent folder that contains the CSV report(s) to parse.
- min_ret_time: peaks after this value (min) will be returned
- max_ret_time: peaks will only be returned up to this time (min)
- target_wavelength_range: range of wavelengths to return
    def process_report(self) -> Result[AgilentReport, Union[AnyStr, Exception]]:
        """Method to parse details from TXT report.
        If you want more functionality, use `aghplctools`.
        `from aghplctools.ingestion.text import pull_hplc_area_from_txt`
        `signals = pull_hplc_area_from_txt(file_path)`

        :return: subset of complete report details, specifically the list of peaks
            at each wavelength channel (vial location and solvents are returned
            as None — they are not read from the TXT report).
        """
        try:
            # REPORT.TXT is written by ChemStation in UTF-16.
            with open(
                os.path.join(self.path, "REPORT.TXT"), "r", encoding="utf-16"
            ) as openfile:
                text = openfile.read()

            try:
                signals = self.parse_area_report(text)
            except ValueError as e:
                return Err("No peaks found: " + str(e))

            # Keep only the wavelengths the caller asked for.
            signals = {
                key: signals[key]
                for key in self.target_wavelength_range
                if key in signals
            }

            parsed_signals = []
            for wavelength, wavelength_dict in signals.items():
                current_wavelength_signals = Signals(
                    wavelength=int(wavelength), peaks=[], data=None
                )
                for ret_time, ret_time_dict in wavelength_dict.items():
                    # Filter peaks to the configured retention-time window.
                    if self.min_ret_time <= ret_time <= self.max_ret_time:
                        current_wavelength_signals.peaks.append(
                            AgilentPeak(
                                retention_time=ret_time,
                                area=ret_time_dict["Area"],
                                width=ret_time_dict["Width"],
                                height=ret_time_dict["Height"],
                                peak_number=None,
                                peak_type=ret_time_dict["Type"],
                                area_percent=None,
                            )
                        )
                parsed_signals.append(current_wavelength_signals)

            return Ok(
                AgilentReport(vial_location=None, solvents=None, signals=parsed_signals)
            )
        except Exception as e:
            # Best-effort boundary: surface any unexpected failure as Err.
            return Err(e)
Method to parse details from TXT report.
If you want more functionality, use aghplctools.
from aghplctools.ingestion.text import pull_hplc_area_from_txt
signals = pull_hplc_area_from_txt(file_path)
Returns
subset of complete report details, specifically the sample location, solvents in pumps, and list of peaks at each wavelength channel.
    def parse_area_report(self, report_text: str) -> Dict:
        """Interprets report text and parses the area report section, converting it to dictionary.
        Courtesy of Veronica Lai.

        :param report_text: plain text version of the report.
        :raises ValueError: if there are no peaks defined in the report text file
        :return: dictionary of signals in the form
            dict[wavelength][retention time (float)][Width/Area/Height/etc.]

        If you want more functionality, use `aghplctools`.
        should be able to use the `parse_area_report` method of aghplctools v4.8.8
        """
        if re.search(_no_peaks_re, report_text):  # There are no peaks in Report.txt
            raise ValueError("No peaks found in Report.txt")
        blocks = _header_block_re.split(report_text)
        signals: Dict[int, dict] = {}  # output dictionary
        for ind, block in enumerate(blocks):
            # area report block
            if _area_report_re.match(block):  # match area report block
                # break into signal blocks
                signal_blocks = _signal_table_re.split(blocks[ind + 1])
                # iterate over signal blocks
                for table in signal_blocks:
                    si = _signal_info_re.match(table)
                    if si is not None:
                        # some error state (e.g. 'not found')
                        if si.group("error") != "":
                            continue
                        wavelength = int(si.group("wavelength"))
                        if wavelength in signals:
                            # placeholder error raise just in case (this probably won't happen)
                            raise KeyError(
                                f"The wavelength {float(si.group('wavelength'))} is already in the signals dictionary"
                            )
                        signals[wavelength] = {}
                        # build peak regex
                        peak_re = self.build_peak_regex(table)
                        if (
                            peak_re is None
                        ):  # if there are no columns (empty table), continue
                            continue
                        # One peak row per line: key every column value by the
                        # peak's retention time.
                        for line in table.split("\n"):
                            peak = peak_re.match(line)
                            if peak is not None:
                                signals[wavelength][float(peak.group("RetTime"))] = {}
                                current = signals[wavelength][
                                    float(peak.group("RetTime"))
                                ]
                                for key in self._column_re_dictionary:
                                    if key in peak.re.groupindex:
                                        try:  # try float conversion, otherwise continue
                                            current[key] = float(peak.group(key))
                                        except ValueError:
                                            current[key] = peak.group(key)
                                    else:  # ensures defined
                                        current[key] = None
        return signals
Interprets report text and parses the area report section, converting it to dictionary. Courtesy of Veronica Lai.
Parameters
- report_text: plain text version of the report.
Raises
- ValueError: if there are no peaks defined in the report text file
Returns
dictionary of signals in the form dict[wavelength][retention time (float)][Width/Area/Height/etc.]
If you want more functionality, use aghplctools.
You should be able to use the parse_area_report method of aghplctools v4.8.8.
338 def build_peak_regex(self, signal_table: str) -> Pattern[str] | None: 339 """Builds a peak regex from a signal table. Courtesy of Veronica Lai. 340 341 :param signal_table: block of lines associated with an area table 342 :return: peak line regex object (<=3.6 _sre.SRE_PATTERN, >=3.7 re.Pattern) 343 """ 344 split_table = signal_table.split("\n") 345 if len(split_table) <= 4: # catch peak table with no values 346 return None 347 # todo verify that these indicies are always true 348 column_line = split_table[2] # table column line 349 unit_line = split_table[3] # column unit line 350 length_line = [len(val) + 1 for val in split_table[4].split("|")] # length line 351 352 # iterate over header values and units to build peak table regex 353 peak_re_string = [] 354 for header, unit in zip( 355 chunk_string(column_line, length_line), chunk_string(unit_line, length_line) 356 ): 357 if header == "": # todo create a better catch for an undefined header 358 continue 359 try: 360 peak_re_string.append( 361 self._column_re_dictionary[header][ 362 unit 363 ] # append the appropriate regex 364 ) 365 except KeyError: # catch for undefined regexes (need to be built) 366 raise KeyError( 367 f'The header/unit combination "{header}" "{unit}" is not defined in the peak regex ' 368 f"dictionary. Let Lars know." 369 ) 370 371 return re.compile( 372 "[ ]+".join( 373 peak_re_string 374 ) # constructed string delimited by 1 or more spaces 375 + "[\s]*" # and any remaining white space 376 )
Builds a peak regex from a signal table. Courtesy of Veronica Lai.
Parameters
- signal_table: block of lines associated with an area table
Returns
peak line regex object (<=3.6 _sre.SRE_PATTERN, >=3.7 re.Pattern)
@dataclass
class AgilentChannelChromatogramData:
    """Chromatogram data for each of the eight detector channels (A-H)."""

    A: AgilentHPLCChromatogram
    B: AgilentHPLCChromatogram
    C: AgilentHPLCChromatogram
    D: AgilentHPLCChromatogram
    E: AgilentHPLCChromatogram
    F: AgilentHPLCChromatogram
    G: AgilentHPLCChromatogram
    H: AgilentHPLCChromatogram

    @classmethod
    def from_dict(cls, chroms: Dict[str, AgilentHPLCChromatogram]):
        """Build an instance from a channel-letter -> chromatogram mapping.

        :param chroms: mapping whose keys must be exactly the field names A-H.
        :raises KeyError: if the mapping's keys do not match the field names.
        """
        keys = chroms.keys()
        # Use cls rather than the hard-coded class name so the alternate
        # constructor keeps working in subclasses.
        class_keys = cls.__annotations__.keys()
        if set(class_keys) != set(keys):
            raise KeyError(f"{keys} don't match {class_keys}")
        # Keys were just verified to match the fields exactly.
        return cls(**{key: chroms[key] for key in class_keys})
119 @classmethod 120 def from_dict(cls, chroms: Dict[str, AgilentHPLCChromatogram]): 121 keys = chroms.keys() 122 class_keys = vars(AgilentChannelChromatogramData)["__annotations__"].keys() 123 if set(class_keys) == set(keys): 124 return AgilentChannelChromatogramData( 125 A=chroms["A"], 126 B=chroms["B"], 127 C=chroms["C"], 128 D=chroms["D"], 129 E=chroms["E"], 130 F=chroms["F"], 131 G=chroms["G"], 132 H=chroms["H"], 133 ) 134 else: 135 err = f"{keys} don't match {class_keys}" 136 raise KeyError(err)
class AgilentHPLCChromatogram(AbstractSpectrum):
    """Class for HPLC spectrum (chromatogram) loading and handling."""

    # x axis is elution time in minutes, y axis is absorbance in mAU
    AXIS_MAPPING = {"x": "min", "y": "mAu"}

    # properties kept internal (not persisted)
    INTERNAL_PROPERTIES = {
        "baseline",
        "parameters",
        "data_path",
    }

    # set of properties to be saved
    PUBLIC_PROPERTIES = {
        "x",
        "y",
        "peaks",
        "timestamp",
    }

    def __init__(self, path=None, autosaving=False):
        # Ensure the save directory exists up-front; when no path is given,
        # fall back to a relative "../utils/hplc_data" folder.
        if path is not None:
            os.makedirs(path, exist_ok=True)
            self.path = path
        else:
            self.path = os.path.join("../utils", "hplc_data")
            os.makedirs(self.path, exist_ok=True)

        super().__init__(path=path, autosaving=autosaving)

    def attach_spectrum(self, x, y):
        """Attach pre-extracted x/y arrays as the current spectrum."""
        # loading all data
        super().load_spectrum(x, y, timestamp="NA")

    def load_spectrum(self, data_path, channel="A"):
        """Loads the spectra from the given folder.

        Args:
            data_path (str): Path where HPLC data has been saved.
            channel (str): Detector channel letter used in the file name.
        """

        # to avoid dropping parameters when called in parent class
        if self.x is not None:
            if self.autosaving:
                self.save_data(filename=f"{data_path}_{channel}")
            self._dump()

        # get raw data
        x, y = self.extract_rawdata(data_path, channel)

        # get timestamp
        # NOTE(review): assumes the directory name ends in _<timestamp> in
        # TIME_FORMAT; silently falls back to "NA" when it does not parse.
        tstr = data_path.split(".")[0].split("_")[-1]
        timestamp = "NA"
        try:
            timestamp = time.mktime(time.strptime(tstr, TIME_FORMAT))
        except ValueError:
            pass

        # loading all data
        super().load_spectrum(x, y, timestamp)

    ### PUBLIC METHODS TO LOAD RAW DATA ###

    def extract_rawdata(
        self, experiment_dir: str, channel: str
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Reads raw data from Chemstation .CH files.

        :param experiment_dir: .D directory with the .CH files
        :param channel: channel letter used in the DAD1<channel>.ch filename

        :returns: np.array(times), np.array(values) Raw chromatogram data
        """
        filename = os.path.join(experiment_dir, f"DAD1{channel}")
        npz_file = filename + ".npz"

        if os.path.exists(npz_file):
            # already processed: load the cached NumPy archive
            data = np.load(npz_file)
            return data["times"], data["values"]
        else:
            self.logger.debug("NPZ file not found. First time loading data.")
            ch_file = filename + ".ch"
            data = CHFile(ch_file)
            # cache as .npz so subsequent loads skip the .CH parser
            np.savez_compressed(npz_file, times=data.times, values=data.values)
            return np.array(data.times), np.array(data.values)
Class for HPLC spectrum (chromatogram) loading and handling.
41 def __init__(self, path=None, autosaving=False): 42 if path is not None: 43 os.makedirs(path, exist_ok=True) 44 self.path = path 45 else: 46 self.path = os.path.join("../utils", "hplc_data") 47 os.makedirs(self.path, exist_ok=True) 48 49 super().__init__(path=path, autosaving=autosaving)
Default constructor, loads properties into instance namespace.
Can be overridden in subclasses.
Parameters
- path: Valid path to save data to. If omitted, uses ".//spectrum". If False - no folder created.
- autosaving: If the True (default) will save the spectrum when the new one is loaded. Will drop otherwise.
    def load_spectrum(self, data_path, channel="A"):
        """Loads the spectra from the given folder.

        Args:
            data_path (str): Path where HPLC data has been saved.
            channel (str): Detector channel letter used in the file name.
        """

        # to avoid dropping parameters when called in parent class
        if self.x is not None:
            if self.autosaving:
                self.save_data(filename=f"{data_path}_{channel}")
            self._dump()

        # get raw data
        x, y = self.extract_rawdata(data_path, channel)

        # get timestamp
        # NOTE(review): assumes the directory name ends in _<timestamp> in
        # TIME_FORMAT; silently falls back to "NA" when it does not parse.
        tstr = data_path.split(".")[0].split("_")[-1]
        timestamp = "NA"
        try:
            timestamp = time.mktime(time.strptime(tstr, TIME_FORMAT))
        except ValueError:
            pass

        # loading all data
        super().load_spectrum(x, y, timestamp)
Loads the spectra from the given folder.
Args: data_path (str): Path where HPLC data has been saved.
    def extract_rawdata(
        self, experiment_dir: str, channel: str
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Reads raw data from Chemstation .CH files.

        :param experiment_dir: .D directory with the .CH files
        :param channel: channel letter used in the DAD1<channel>.ch filename

        :returns: np.array(times), np.array(values) Raw chromatogram data
        """
        filename = os.path.join(experiment_dir, f"DAD1{channel}")
        npz_file = filename + ".npz"

        if os.path.exists(npz_file):
            # already processed: load the cached NumPy archive
            data = np.load(npz_file)
            return data["times"], data["values"]
        else:
            self.logger.debug("NPZ file not found. First time loading data.")
            ch_file = filename + ".ch"
            data = CHFile(ch_file)
            # cache as .npz so subsequent loads skip the .CH parser
            np.savez_compressed(npz_file, times=data.times, values=data.values)
            return np.array(data.times), np.array(data.values)
Reads raw data from Chemstation .CH files.
Parameters
- experiment_dir: .D directory with the .CH files
Returns: `np.array(times)`, `np.array(values)` — raw chromatogram data.