pychemstation.utils.abc_tables.run
Abstract module containing shared logic for Method and Sequence tables.
Authors: Lucy Hao
"""
Abstract module containing shared logic for Method and Sequence tables.

Authors: Lucy Hao
"""

from __future__ import annotations

import abc
import math
import os
import time
import warnings
from typing import Dict, List, Optional, Tuple, Union, Set

import polling
import rainbow as rb
from result import Err, Ok, Result

from ..macro import HPLCRunningStatus, Command
from ..method_types import MethodDetails
from ..sequence_types import SequenceTable
from ..table_types import Table, T
from ...analysis.chromatogram import (
    AgilentChannelChromatogramData,
    AgilentHPLCChromatogram,
)

from .table import ABCTableController
from ...analysis.process_report import (
    ReportType,
    AgilentReport,
    CSVProcessor,
    TXTProcessor,
)
from ...control.controllers import CommunicationController

TableType = Union[MethodDetails, SequenceTable]


class RunController(ABCTableController, abc.ABC):
    """Abstract controller for all tables that can trigger runs on Chemstation.

    :param controller: the controller for sending MACROs, must be initialized for a run to be triggered.
    :param src: complete directory path where files containing run parameters are stored.
    :param data_dirs: list of complete directories that Chemstation will write data to.
    :param table: contains register keys for accessing table in Chemstation.
    :param offline: when True, the existence checks on ``src`` and ``data_dirs`` are skipped.
    """

    def __init__(
        self,
        controller: Optional[CommunicationController],
        src: Optional[str],
        data_dirs: Optional[List[str]],
        table: Table,
        offline: bool = False,
    ):
        """Set up run bookkeeping and, unless ``offline``, validate the directories.

        :raises FileNotFoundError: if ``src`` or an entry of ``data_dirs`` is not
            an existing directory (only checked when ``offline`` is False).
        :raises ValueError: if a data directory contains two consecutive
            backslashes (only checked when ``offline`` is False).
        """
        super().__init__(controller=controller, table=table)
        self.table_state: Optional[TableType] = None
        self.curr_run_starting_time: Optional[float] = None
        self.timeout: Optional[float] = None
        self.current_run_child_files: Set[str] = set()

        if not offline:
            if src and not os.path.isdir(src):
                raise FileNotFoundError(f"dir: {src} not found.")
            for data_dir in data_dirs or []:
                if not os.path.isdir(data_dir):
                    raise FileNotFoundError(f"dir: {data_dir} not found.")
                if r"\\" in data_dir:
                    raise ValueError("Data directories should not be raw strings!")
            # NOTE: ``src`` / ``data_dirs`` are only stored when both are given.
            if src and data_dirs:
                self.src: str = src
                self.data_dirs: List[str] = data_dirs

        # One empty chromatogram per detector channel "A" through "H".
        self.spectra: dict[str, AgilentHPLCChromatogram] = {
            channel: AgilentHPLCChromatogram() for channel in "ABCDEFGH"
        }
        self.uv: Dict[int, AgilentHPLCChromatogram] = {}
        self.data_files: List = []

    def __new__(cls, *args, **kwargs):
        """Refuse direct instantiation of ``RunController`` itself."""
        if cls is RunController:
            raise TypeError(f"only children of '{cls.__name__}' may be instantiated")
        return object.__new__(cls)

    @abc.abstractmethod
    def _fuzzy_match_most_recent_folder(self, most_recent_folder: T) -> Result[T, str]:
        """Subclass hook, called with the last entry of ``data_files``."""

    @abc.abstractmethod
    def get_data(
        self, custom_path: Optional[str] = None
    ) -> Union[List[AgilentChannelChromatogramData], AgilentChannelChromatogramData]:
        """Return channel chromatogram data (implemented by subclasses)."""

    @abc.abstractmethod
    def get_data_uv(
        self, custom_path: str | None = None
    ) -> Dict[int, AgilentHPLCChromatogram]:
        """Return chromatograms keyed by an integer (implemented by subclasses)."""

    @abc.abstractmethod
    def get_report(
        self, custom_path: str, report_type: ReportType = ReportType.TXT
    ) -> List[AgilentReport]:
        """Return the reports found for a run (implemented by subclasses)."""

    def check_hplc_is_running(self) -> bool:
        """Poll the controller until it reports a running status.

        The status is polled with ``step=1`` for at most 20 tries. When a running
        status is seen, the current time is stored in ``curr_run_starting_time``.

        :return: the poll result once a running status is seen, ``False`` if
            polling raised (the exception is printed, not re-raised).
        :raises ValueError: if no controller is attached.
        """
        if not self.controller:
            raise ValueError("Controller is offline")

        def status_is_running() -> bool:
            return isinstance(self.controller.get_status(), HPLCRunningStatus)

        try:
            started_running = polling.poll(status_is_running, step=1, max_tries=20)
        except Exception as e:
            print(e)
            return False
        if started_running:
            self.curr_run_starting_time = time.time()
        return started_running

    def check_hplc_run_finished(self) -> Tuple[float, bool]:
        """Report the time left on the current run and whether the HPLC has stopped.

        Also records the current run's sample file name (text before ``".D"``)
        in ``current_run_child_files`` when it can be read.

        :return: ``(time_left, done)``. ``time_left`` is ``timeout`` minus the
            elapsed time while the timeout has not been exceeded, otherwise 0.
            ``done`` is the controller's ``check_if_not_running()`` result.
        :raises ValueError: if no controller is attached.
        """
        if not self.controller:
            raise ValueError("Controller is offline!")

        try:
            _, current_run_file = self.get_current_run_data_dir_file()
            self.current_run_child_files.add(current_run_file.partition(".D")[0])
        except Exception:
            # Deliberate best effort: the data dir/file may not exist yet.
            pass

        # Query the instrument once so the reset decision and the returned flag agree.
        done_running = self.controller.check_if_not_running()
        if self.curr_run_starting_time and self.timeout:
            time_passed = time.time() - self.curr_run_starting_time
            if time_passed <= self.timeout:
                return self.timeout - time_passed, done_running
            if done_running:
                # Timeout exceeded and the instrument is idle: the run is over.
                self._reset_time()
        return 0, done_running

    def check_hplc_done_running(self) -> Ok[T] | Err[str]:
        """Checks if ChemStation has finished running and can read data back

        :return: ``Ok`` with the data file object for the most recent run, or
            ``Err`` with a message when completion could not be confirmed.
        :raises ValueError: if ``timeout`` is None.
        """
        self.current_run_child_files = set()
        if self.timeout is None:
            raise ValueError("Timeout value is None, no comparison can be made.")

        def run_is_over() -> bool:
            return self.check_hplc_run_finished()[1]

        poll_failures = (
            polling.TimeoutException,
            polling.PollingException,
            polling.MaxCallException,
        )
        finished_run = False
        minutes = math.ceil(self.timeout / 60)
        try:
            # NOTE(review): polling.poll only returns a truthy value, so the
            # negation leaves finished_run False after a successful first poll
            # and the confirmation poll below always runs. Kept as-is; confirm
            # whether this double check is intended.
            finished_run = not polling.poll(
                run_is_over, max_tries=minutes - 1, step=50
            )
        except poll_failures:
            try:
                finished_run = polling.poll(
                    run_is_over, timeout=self.timeout / 2, step=1
                )
            except poll_failures:
                pass

        if not self.data_files:
            # Nothing to match against; report it instead of raising IndexError.
            return Err("No data file is registered for the current run.")

        check_folder = self._fuzzy_match_most_recent_folder(self.data_files[-1])
        if not check_folder.is_ok():
            return Err("Run may not have completed.")
        if finished_run:
            return check_folder
        try:
            if polling.poll(run_is_over, max_tries=10, step=50):
                return check_folder
        except Exception:
            self._reset_time()
            # Wrap in Ok so callers always receive a Result.
            return Ok(self.data_files[-1])
        return Err("Run may not have completed.")

    def get_uv_spectrum(self, path: str):
        """Parse ``DAD1.UV`` under ``path`` and store one chromatogram per wavelength in ``uv``."""
        parsed = rb.agilent.chemstation.parse_file(os.path.join(path, "DAD1.UV"))
        times = parsed.xlabels
        wavelengths = parsed.ylabels
        # After transposing, row ``i`` is paired with ``wavelengths[i]``.
        absorbances = parsed.data.transpose()
        for row, wavelength in enumerate(wavelengths):
            self.uv[wavelength] = AgilentHPLCChromatogram()
            self.uv[wavelength].attach_spectrum(times, absorbances[row])

    def get_report_details(
        self, path: str, report_type: ReportType = ReportType.TXT
    ) -> AgilentReport:
        """Process the report at ``path`` with the processor matching ``report_type``.

        :raises ValueError: with the processor's error value when processing
            fails, or when ``report_type`` is neither TXT nor CSV.
        """
        if report_type is ReportType.TXT:
            processor = TXTProcessor(path)
        elif report_type is ReportType.CSV:
            processor = CSVProcessor(path)
        else:
            processor = None

        if processor is not None:
            outcome = processor.process_report()
            if outcome.is_ok():
                return outcome.ok_value
            if outcome.is_err():
                raise ValueError(outcome.err_value)
        raise ValueError("Expected one of ReportType.TXT or ReportType.CSV")

    def get_spectrum_at_channels(self, data_path: str):
        """Load chromatogram for any channel in spectra dictionary."""
        for channel_name, chromatogram in self.spectra.items():
            try:
                chromatogram.load_spectrum(data_path=data_path, channel=channel_name)
            except FileNotFoundError:
                # No file for this channel: reset it to an empty chromatogram.
                self.spectra[channel_name] = AgilentHPLCChromatogram()
                warnings.warn(f"No data at channel: {channel_name}")

    def _reset_time(self):
        """Forget the current run's start time and timeout."""
        self.curr_run_starting_time = None
        self.timeout = None

    def get_current_run_data_dir_file(self) -> Tuple[str, str]:
        """Ask Chemstation for the current run's data directory and data file.

        :return: ``(data_dir, data_file)`` as reported by Chemstation.
        :raises ValueError: if either reply is not ok, or if the directory or
            the file inside it is not an existing directory.
        """
        self.send(Command.GET_CURRENT_RUN_DATA_DIR)
        dir_reply = self.receive()
        self.send(Command.GET_CURRENT_RUN_DATA_FILE)
        file_reply = self.receive()

        if dir_reply.is_ok() and file_reply.is_ok():
            data_dir = dir_reply.ok_value.string_response
            data_file = file_reply.ok_value.string_response
            if os.path.isdir(data_dir) and os.path.isdir(
                os.path.join(data_dir, data_file)
            ):
                return data_dir, data_file
        raise ValueError("Couldn't read data dir and file or doesn't exist yet.")
TableType =
typing.Union[pychemstation.utils.method_types.MethodDetails, pychemstation.utils.sequence_types.SequenceTable]
class RunController(ABCTableController, abc.ABC):
    """Abstract controller for all tables that can trigger runs on Chemstation.

    :param controller: the controller for sending MACROs, must be initialized for a run to be triggered.
    :param src: complete directory path where files containing run parameters are stored.
    :param data_dirs: list of complete directories that Chemstation will write data to.
    :param table: contains register keys for accessing table in Chemstation.
    :param offline: when True, the existence checks on ``src`` and ``data_dirs`` are skipped.
    """

    def __init__(
        self,
        controller: Optional[CommunicationController],
        src: Optional[str],
        data_dirs: Optional[List[str]],
        table: Table,
        offline: bool = False,
    ):
        """Set up run bookkeeping and, unless ``offline``, validate the directories.

        :raises FileNotFoundError: if ``src`` or an entry of ``data_dirs`` is not
            an existing directory (only checked when ``offline`` is False).
        :raises ValueError: if a data directory contains two consecutive
            backslashes (only checked when ``offline`` is False).
        """
        super().__init__(controller=controller, table=table)
        self.table_state: Optional[TableType] = None
        self.curr_run_starting_time: Optional[float] = None
        self.timeout: Optional[float] = None
        self.current_run_child_files: Set[str] = set()

        if not offline:
            if src and not os.path.isdir(src):
                raise FileNotFoundError(f"dir: {src} not found.")
            for data_dir in data_dirs or []:
                if not os.path.isdir(data_dir):
                    raise FileNotFoundError(f"dir: {data_dir} not found.")
                if r"\\" in data_dir:
                    raise ValueError("Data directories should not be raw strings!")
            # NOTE: ``src`` / ``data_dirs`` are only stored when both are given.
            if src and data_dirs:
                self.src: str = src
                self.data_dirs: List[str] = data_dirs

        # One empty chromatogram per detector channel "A" through "H".
        self.spectra: dict[str, AgilentHPLCChromatogram] = {
            channel: AgilentHPLCChromatogram() for channel in "ABCDEFGH"
        }
        self.uv: Dict[int, AgilentHPLCChromatogram] = {}
        self.data_files: List = []

    def __new__(cls, *args, **kwargs):
        """Refuse direct instantiation of ``RunController`` itself."""
        if cls is RunController:
            raise TypeError(f"only children of '{cls.__name__}' may be instantiated")
        return object.__new__(cls)

    @abc.abstractmethod
    def _fuzzy_match_most_recent_folder(self, most_recent_folder: T) -> Result[T, str]:
        """Subclass hook, called with the last entry of ``data_files``."""

    @abc.abstractmethod
    def get_data(
        self, custom_path: Optional[str] = None
    ) -> Union[List[AgilentChannelChromatogramData], AgilentChannelChromatogramData]:
        """Return channel chromatogram data (implemented by subclasses)."""

    @abc.abstractmethod
    def get_data_uv(
        self, custom_path: str | None = None
    ) -> Dict[int, AgilentHPLCChromatogram]:
        """Return chromatograms keyed by an integer (implemented by subclasses)."""

    @abc.abstractmethod
    def get_report(
        self, custom_path: str, report_type: ReportType = ReportType.TXT
    ) -> List[AgilentReport]:
        """Return the reports found for a run (implemented by subclasses)."""

    def check_hplc_is_running(self) -> bool:
        """Poll the controller until it reports a running status.

        The status is polled with ``step=1`` for at most 20 tries. When a running
        status is seen, the current time is stored in ``curr_run_starting_time``.

        :return: the poll result once a running status is seen, ``False`` if
            polling raised (the exception is printed, not re-raised).
        :raises ValueError: if no controller is attached.
        """
        if not self.controller:
            raise ValueError("Controller is offline")

        def status_is_running() -> bool:
            return isinstance(self.controller.get_status(), HPLCRunningStatus)

        try:
            started_running = polling.poll(status_is_running, step=1, max_tries=20)
        except Exception as e:
            print(e)
            return False
        if started_running:
            self.curr_run_starting_time = time.time()
        return started_running

    def check_hplc_run_finished(self) -> Tuple[float, bool]:
        """Report the time left on the current run and whether the HPLC has stopped.

        Also records the current run's sample file name (text before ``".D"``)
        in ``current_run_child_files`` when it can be read.

        :return: ``(time_left, done)``. ``time_left`` is ``timeout`` minus the
            elapsed time while the timeout has not been exceeded, otherwise 0.
            ``done`` is the controller's ``check_if_not_running()`` result.
        :raises ValueError: if no controller is attached.
        """
        if not self.controller:
            raise ValueError("Controller is offline!")

        try:
            _, current_run_file = self.get_current_run_data_dir_file()
            self.current_run_child_files.add(current_run_file.partition(".D")[0])
        except Exception:
            # Deliberate best effort: the data dir/file may not exist yet.
            pass

        # Query the instrument once so the reset decision and the returned flag agree.
        done_running = self.controller.check_if_not_running()
        if self.curr_run_starting_time and self.timeout:
            time_passed = time.time() - self.curr_run_starting_time
            if time_passed <= self.timeout:
                return self.timeout - time_passed, done_running
            if done_running:
                # Timeout exceeded and the instrument is idle: the run is over.
                self._reset_time()
        return 0, done_running

    def check_hplc_done_running(self) -> Ok[T] | Err[str]:
        """Checks if ChemStation has finished running and can read data back

        :return: ``Ok`` with the data file object for the most recent run, or
            ``Err`` with a message when completion could not be confirmed.
        :raises ValueError: if ``timeout`` is None.
        """
        self.current_run_child_files = set()
        if self.timeout is None:
            raise ValueError("Timeout value is None, no comparison can be made.")

        def run_is_over() -> bool:
            return self.check_hplc_run_finished()[1]

        poll_failures = (
            polling.TimeoutException,
            polling.PollingException,
            polling.MaxCallException,
        )
        finished_run = False
        minutes = math.ceil(self.timeout / 60)
        try:
            # NOTE(review): polling.poll only returns a truthy value, so the
            # negation leaves finished_run False after a successful first poll
            # and the confirmation poll below always runs. Kept as-is; confirm
            # whether this double check is intended.
            finished_run = not polling.poll(
                run_is_over, max_tries=minutes - 1, step=50
            )
        except poll_failures:
            try:
                finished_run = polling.poll(
                    run_is_over, timeout=self.timeout / 2, step=1
                )
            except poll_failures:
                pass

        if not self.data_files:
            # Nothing to match against; report it instead of raising IndexError.
            return Err("No data file is registered for the current run.")

        check_folder = self._fuzzy_match_most_recent_folder(self.data_files[-1])
        if not check_folder.is_ok():
            return Err("Run may not have completed.")
        if finished_run:
            return check_folder
        try:
            if polling.poll(run_is_over, max_tries=10, step=50):
                return check_folder
        except Exception:
            self._reset_time()
            # Wrap in Ok so callers always receive a Result.
            return Ok(self.data_files[-1])
        return Err("Run may not have completed.")

    def get_uv_spectrum(self, path: str):
        """Parse ``DAD1.UV`` under ``path`` and store one chromatogram per wavelength in ``uv``."""
        parsed = rb.agilent.chemstation.parse_file(os.path.join(path, "DAD1.UV"))
        times = parsed.xlabels
        wavelengths = parsed.ylabels
        # After transposing, row ``i`` is paired with ``wavelengths[i]``.
        absorbances = parsed.data.transpose()
        for row, wavelength in enumerate(wavelengths):
            self.uv[wavelength] = AgilentHPLCChromatogram()
            self.uv[wavelength].attach_spectrum(times, absorbances[row])

    def get_report_details(
        self, path: str, report_type: ReportType = ReportType.TXT
    ) -> AgilentReport:
        """Process the report at ``path`` with the processor matching ``report_type``.

        :raises ValueError: with the processor's error value when processing
            fails, or when ``report_type`` is neither TXT nor CSV.
        """
        if report_type is ReportType.TXT:
            processor = TXTProcessor(path)
        elif report_type is ReportType.CSV:
            processor = CSVProcessor(path)
        else:
            processor = None

        if processor is not None:
            outcome = processor.process_report()
            if outcome.is_ok():
                return outcome.ok_value
            if outcome.is_err():
                raise ValueError(outcome.err_value)
        raise ValueError("Expected one of ReportType.TXT or ReportType.CSV")

    def get_spectrum_at_channels(self, data_path: str):
        """Load chromatogram for any channel in spectra dictionary."""
        for channel_name, chromatogram in self.spectra.items():
            try:
                chromatogram.load_spectrum(data_path=data_path, channel=channel_name)
            except FileNotFoundError:
                # No file for this channel: reset it to an empty chromatogram.
                self.spectra[channel_name] = AgilentHPLCChromatogram()
                warnings.warn(f"No data at channel: {channel_name}")

    def _reset_time(self):
        """Forget the current run's start time and timeout."""
        self.curr_run_starting_time = None
        self.timeout = None

    def get_current_run_data_dir_file(self) -> Tuple[str, str]:
        """Ask Chemstation for the current run's data directory and data file.

        :return: ``(data_dir, data_file)`` as reported by Chemstation.
        :raises ValueError: if either reply is not ok, or if the directory or
            the file inside it is not an existing directory.
        """
        self.send(Command.GET_CURRENT_RUN_DATA_DIR)
        dir_reply = self.receive()
        self.send(Command.GET_CURRENT_RUN_DATA_FILE)
        file_reply = self.receive()

        if dir_reply.is_ok() and file_reply.is_ok():
            data_dir = dir_reply.ok_value.string_response
            data_file = file_reply.ok_value.string_response
            if os.path.isdir(data_dir) and os.path.isdir(
                os.path.join(data_dir, data_file)
            ):
                return data_dir, data_file
        raise ValueError("Couldn't read data dir and file or doesn't exist yet.")
Abstract controller for all tables that can trigger runs on Chemstation.
Parameters
- controller: the controller for sending MACROs, must be initialized for a run to be triggered.
- src: complete directory path where files containing run parameters are stored.
- data_dirs: list of complete directories that Chemstation will write data to.
- table: contains register keys for accessing table in Chemstation.
- offline: when True, the existence checks on src and data_dirs are skipped.
table_state: Union[pychemstation.utils.method_types.MethodDetails, pychemstation.utils.sequence_types.SequenceTable, NoneType]
spectra: dict[str, pychemstation.analysis.AgilentHPLCChromatogram]
uv: Dict[int, pychemstation.analysis.AgilentHPLCChromatogram]
@abc.abstractmethod
def
get_data( self, custom_path: Optional[str] = None) -> Union[List[pychemstation.analysis.AgilentChannelChromatogramData], pychemstation.analysis.AgilentChannelChromatogramData]:
@abc.abstractmethod
def
get_data_uv( self, custom_path: str | None = None) -> Dict[int, pychemstation.analysis.AgilentHPLCChromatogram]:
@abc.abstractmethod
def
get_report( self, custom_path: str, report_type: pychemstation.analysis.process_report.ReportType = <ReportType.TXT: 0>) -> List[pychemstation.analysis.process_report.AgilentReport]:
def
check_hplc_is_running(self) -> bool:
def check_hplc_is_running(self) -> bool:
    """Poll the controller until it reports a running status.

    The status is polled with ``step=1`` for at most 20 tries. When a running
    status is seen, the current time is stored in ``curr_run_starting_time``.

    :return: the poll result once a running status is seen, ``False`` if
        polling raised (the exception is printed, not re-raised).
    :raises ValueError: if no controller is attached.
    """
    if not self.controller:
        raise ValueError("Controller is offline")

    def status_is_running() -> bool:
        return isinstance(self.controller.get_status(), HPLCRunningStatus)

    try:
        started_running = polling.poll(status_is_running, step=1, max_tries=20)
    except Exception as e:
        print(e)
        return False
    if started_running:
        self.curr_run_starting_time = time.time()
    return started_running
def
check_hplc_run_finished(self) -> Tuple[float, bool]:
def check_hplc_run_finished(self) -> Tuple[float, bool]:
    """Report the time left on the current run and whether the HPLC has stopped.

    Also records the current run's sample file name (text before ``".D"``)
    in ``current_run_child_files`` when it can be read.

    :return: ``(time_left, done)``. ``time_left`` is ``timeout`` minus the
        elapsed time while the timeout has not been exceeded, otherwise 0.
        ``done`` is the controller's ``check_if_not_running()`` result.
    :raises ValueError: if no controller is attached.
    """
    if not self.controller:
        raise ValueError("Controller is offline!")

    try:
        _, current_run_file = self.get_current_run_data_dir_file()
        self.current_run_child_files.add(current_run_file.partition(".D")[0])
    except Exception:
        # Deliberate best effort: the data dir/file may not exist yet.
        pass

    # Query the instrument once so the reset decision and the returned flag agree.
    done_running = self.controller.check_if_not_running()
    if self.curr_run_starting_time and self.timeout:
        time_passed = time.time() - self.curr_run_starting_time
        if time_passed <= self.timeout:
            return self.timeout - time_passed, done_running
        if done_running:
            # Timeout exceeded and the instrument is idle: the run is over.
            self._reset_time()
    return 0, done_running
def
check_hplc_done_running(self) -> Union[result.result.Ok[~T], result.result.Err[str]]:
def check_hplc_done_running(self) -> Ok[T] | Err[str]:
    """Checks if ChemStation has finished running and can read data back

    :return: ``Ok`` with the data file object for the most recent run, or
        ``Err`` with a message when completion could not be confirmed.
    :raises ValueError: if ``timeout`` is None.
    """
    self.current_run_child_files = set()
    if self.timeout is None:
        raise ValueError("Timeout value is None, no comparison can be made.")

    def run_is_over() -> bool:
        return self.check_hplc_run_finished()[1]

    poll_failures = (
        polling.TimeoutException,
        polling.PollingException,
        polling.MaxCallException,
    )
    finished_run = False
    minutes = math.ceil(self.timeout / 60)
    try:
        # NOTE(review): polling.poll only returns a truthy value, so the
        # negation leaves finished_run False after a successful first poll
        # and the confirmation poll below always runs. Kept as-is; confirm
        # whether this double check is intended.
        finished_run = not polling.poll(run_is_over, max_tries=minutes - 1, step=50)
    except poll_failures:
        try:
            finished_run = polling.poll(
                run_is_over, timeout=self.timeout / 2, step=1
            )
        except poll_failures:
            pass

    if not self.data_files:
        # Nothing to match against; report it instead of raising IndexError.
        return Err("No data file is registered for the current run.")

    check_folder = self._fuzzy_match_most_recent_folder(self.data_files[-1])
    if not check_folder.is_ok():
        return Err("Run may not have completed.")
    if finished_run:
        return check_folder
    try:
        if polling.poll(run_is_over, max_tries=10, step=50):
            return check_folder
    except Exception:
        self._reset_time()
        # Wrap in Ok so callers always receive a Result.
        return Ok(self.data_files[-1])
    return Err("Run may not have completed.")
Checks whether ChemStation has finished running so that data can be read back.
Returns
Data file object containing most recent run file information.
def
get_uv_spectrum(self, path: str):
def get_uv_spectrum(self, path: str):
    """Parse ``DAD1.UV`` under ``path`` and store one chromatogram per wavelength in ``uv``."""
    parsed = rb.agilent.chemstation.parse_file(os.path.join(path, "DAD1.UV"))
    times = parsed.xlabels
    wavelengths = parsed.ylabels
    # After transposing, row ``i`` is paired with ``wavelengths[i]``.
    absorbances = parsed.data.transpose()
    for row, wavelength in enumerate(wavelengths):
        self.uv[wavelength] = AgilentHPLCChromatogram()
        self.uv[wavelength].attach_spectrum(times, absorbances[row])
def
get_report_details( self, path: str, report_type: pychemstation.analysis.process_report.ReportType = <ReportType.TXT: 0>) -> pychemstation.analysis.process_report.AgilentReport:
def get_report_details(
    self, path: str, report_type: ReportType = ReportType.TXT
) -> AgilentReport:
    """Process the report at ``path`` with the processor matching ``report_type``.

    :raises ValueError: with the processor's error value when processing
        fails, or when ``report_type`` is neither TXT nor CSV.
    """
    if report_type is ReportType.TXT:
        processor = TXTProcessor(path)
    elif report_type is ReportType.CSV:
        processor = CSVProcessor(path)
    else:
        processor = None

    if processor is not None:
        outcome = processor.process_report()
        if outcome.is_ok():
            return outcome.ok_value
        if outcome.is_err():
            raise ValueError(outcome.err_value)
    raise ValueError("Expected one of ReportType.TXT or ReportType.CSV")
def
get_spectrum_at_channels(self, data_path: str):
def get_spectrum_at_channels(self, data_path: str):
    """Load chromatogram for any channel in spectra dictionary."""
    for channel_name, chromatogram in self.spectra.items():
        try:
            chromatogram.load_spectrum(data_path=data_path, channel=channel_name)
        except FileNotFoundError:
            # No file for this channel: reset it to an empty chromatogram.
            self.spectra[channel_name] = AgilentHPLCChromatogram()
            warnings.warn(f"No data at channel: {channel_name}")
Load chromatogram for any channel in spectra dictionary.
def
get_current_run_data_dir_file(self) -> Tuple[str, str]:
def get_current_run_data_dir_file(self) -> Tuple[str, str]:
    """Ask Chemstation for the current run's data directory and data file.

    :return: ``(data_dir, data_file)`` as reported by Chemstation.
    :raises ValueError: if either reply is not ok, or if the directory or
        the file inside it is not an existing directory.
    """
    self.send(Command.GET_CURRENT_RUN_DATA_DIR)
    dir_reply = self.receive()
    self.send(Command.GET_CURRENT_RUN_DATA_FILE)
    file_reply = self.receive()

    if dir_reply.is_ok() and file_reply.is_ok():
        data_dir = dir_reply.ok_value.string_response
        data_file = file_reply.ok_value.string_response
        if os.path.isdir(data_dir) and os.path.isdir(
            os.path.join(data_dir, data_file)
        ):
            return data_dir, data_file
    raise ValueError("Couldn't read data dir and file or doesn't exist yet.")