pychemstation.control.controllers.data_aq
class MethodController(RunController):
    """Class containing method related logic.

    Reads, edits, saves and runs the method currently loaded in ChemStation
    through the helpers inherited from ``RunController``.
    """

    def __init__(
        self,
        controller: Optional[CommunicationController],
        src: Optional[str],
        data_dirs: Optional[List[str]],
        table: Table,
        offline: bool,
        injector: InjectorController,
        pump: PumpController,
        dad: DADController,
        column: ColumnController,
        sample_info: SampleInfo,
    ):
        """Store the module controllers, then initialise the base controller.

        :param controller: communication controller, forwarded to the parent initialiser
        :param src: forwarded to the parent initialiser; ``switch`` uses it as the default method directory
        :param data_dirs: forwarded to the parent initialiser; ``run`` writes into ``data_dirs[0]``
        :param table: forwarded to the parent initialiser
        :param offline: forwarded to the parent initialiser
        :param injector: injector controller kept on the instance
        :param pump: pump controller kept on the instance
        :param dad: DAD controller kept on the instance
        :param column: column controller kept on the instance
        :param sample_info: sample info controller, used by ``get_sample_location``
        """
        self.injector = injector
        self.pump = pump
        self.dad = dad
        self.column = column
        self.sample_info = sample_info
        # Data paths of the runs started by `run`, most recent last.
        self.data_files: List[str] = []
        super().__init__(
            controller=controller,
            src=src,
            data_dirs=data_dirs,
            table=table,
            offline=offline,
        )

    def get_sample_location(self) -> Tray:
        """Return the location reported by the stored sample info controller."""
        return self.sample_info.get_location()

    def get_current_method_name(self) -> str:
        """Ask ChemStation for the name of the loaded method.

        :return: the reported name, or the literal ``"ERROR"`` when the response is not ok
        """
        self.sleepy_send(Command.GET_METHOD_CMD)
        res = self.receive()
        if res.is_ok():
            return res.ok_value.string_response
        return "ERROR"

    def get_method_params(self) -> HPLCMethodParams:
        """Read the initial organic modifier and flow of the loaded method.

        :raise ValueError: no communication controller is set
        """
        if self.controller:
            return HPLCMethodParams(
                organic_modifier=self.get_om(),
                flow=self.get_flow(),
            )
        raise ValueError("Communication controller is offline!")

    def get_row(self, row: int) -> TimeTableEntry:
        """Read one timetable row as either a flow entry or an organic modifier entry.

        :param row: row number passed to ``get_text`` / ``get_num``
        :raise ValueError: the row's function is neither flow nor solvent composition
        """
        function = self.get_text(row, RegisterFlag.FUNCTION)
        if function == RegisterFlag.FLOW.value:
            return TimeTableEntry(
                start_time=self.get_num(row, RegisterFlag.TIME),
                organic_modifer=None,
                flow=self.get_num(row, RegisterFlag.TIMETABLE_FLOW),
            )
        if function == RegisterFlag.SOLVENT_COMPOSITION.value:
            return TimeTableEntry(
                start_time=self.get_num(row, RegisterFlag.TIME),
                organic_modifer=self.get_num(
                    row, RegisterFlag.TIMETABLE_SOLVENT_B_COMPOSITION
                ),
                flow=None,
            )
        raise ValueError("Both flow and organic modifier are empty")

    def get_timetable(self, rows: int):
        """Read rows ``1..rows`` and merge rows that share a start time.

        :param rows: number of timetable rows to read
        :return: merged entries sorted by start time
        """
        uncoalesced_timetable_rows = [self.get_row(r + 1) for r in range(rows)]
        timetable_rows: Dict[str, TimeTableEntry] = {}
        for row in uncoalesced_timetable_rows:
            time_key = str(row.start_time)
            if time_key not in timetable_rows:
                timetable_rows[time_key] = TimeTableEntry(
                    start_time=row.start_time,
                    flow=row.flow,
                    organic_modifer=row.organic_modifer,
                )
                continue
            # Compare against None: a value of 0 is real data and must not
            # be dropped as if the field were missing.
            if row.flow is not None:
                timetable_rows[time_key].flow = row.flow
            if row.organic_modifer is not None:
                timetable_rows[time_key].organic_modifer = row.organic_modifer
        return sorted(timetable_rows.values(), key=lambda e: e.start_time)

    def load(self) -> MethodDetails:
        """Read the loaded method into a ``MethodDetails`` and cache it as ``table_state``."""
        rows = self.get_row_count_safely()
        method_name = self.get_method_name()
        timetable_rows = self.get_timetable(rows)
        params = self.get_method_params()
        stop_time = self.get_stop_time()
        post_time = self.get_post_time()
        self.table_state = MethodDetails(
            name=method_name,
            timetable=timetable_rows,
            stop_time=stop_time,
            post_time=post_time,
            params=params,
        )
        return self.table_state

    def get_method_name(self):
        """Return the method name reported by ChemStation.

        NOTE(review): unlike ``get_current_method_name`` this does not check
        ``is_ok()`` before reading ``ok_value``.
        """
        self.send(Command.GET_METHOD_CMD)
        res = self.receive()
        method_name = res.ok_value.string_response
        return method_name

    def get_total_runtime(self) -> Union[int, float]:
        """Returns total method runtime in minutes."""
        return self.get_post_time() + self.get_stop_time()

    def current_method(self, method_name: str):
        """
        Checks if a given method is already loaded into Chemstation. Method name does not need the ".M" extension.

        :param method_name: a Chemstation method
        :return: True if method is already loaded, False otherwise (including when the response is not ok)
        """
        self.send(Command.GET_METHOD_CMD)
        res = self.receive()
        if not res.is_ok():
            return False
        # Test against the response text, not the Result wrapper around it.
        return method_name in res.ok_value.string_response

    def switch(self, method_name: str, alt_method_dir: Optional[str] = None):
        """
        Allows the user to switch between pre-programmed methods. No need to append '.M'
        to the end of the method name. For example, for the method named 'General-Poroshell.M',
        only 'General-Poroshell' is needed.

        :param method_name: any available method in Chemstation method directory
        :param alt_method_dir: directory where the method resides
        :raise AssertionError: The desired method is not selected. Try again.
        """
        method_dir = self.src if not alt_method_dir else alt_method_dir
        self.send(
            Command.SWITCH_METHOD_CMD_SPECIFIC.value.format(
                method_dir=method_dir, method_name=method_name
            )
        )
        time.sleep(2)
        self.send(Command.GET_METHOD_CMD)
        time.sleep(2)
        res = self.receive()
        if res.is_ok():
            parsed_response = res.ok_value.string_response
            # Raised explicitly so the check is not stripped under `python -O`.
            if parsed_response != f"{method_name}.M":
                raise AssertionError("Switching Methods failed.")
        # The cached method no longer describes what is loaded.
        self.table_state = None

    def edit(self, updated_method: MethodDetails, save: bool):
        """Updates the currently loaded method in ChemStation with provided values.

        The timetable is validated before anything is sent, so an invalid
        timetable leaves the loaded method and ``table_state`` untouched.

        :param updated_method: the method with updated values, to be sent to Chemstation to modify the currently loaded method.
        :param save: if false only modifies the method, otherwise saves to disk
        :raise ValueError: a parameter or timetable row failed validation
        """
        self.validate_timetable(updated_method.timetable)
        self.table_state = updated_method
        # Method settings required for all runs
        self.update_method_params(
            new_flow=updated_method.params.flow,
            new_initial_om=updated_method.params.organic_modifier,
            new_stop_time=updated_method.stop_time,
            new_post_time=updated_method.post_time,
        )
        self.edit_method_timetable(updated_method.timetable)

        if save:
            self.save_method()

    def save_method(self):
        """Send the save-method command with a timestamped commit message."""
        self.send(
            Command.SAVE_METHOD_CMD.value.format(
                commit_msg=f"saved method at {str(time.time())}"
            )
        )

    def _validate_organic_modifier(self, new_om):
        """Raise ``ValueError`` unless ``new_om`` is an int/float within 0..100."""
        if not isinstance(new_om, (int, float)):
            raise ValueError("Organic modifier must be int or float")
        if new_om < 0:
            raise ValueError("Organic modifier must be positive")
        if new_om > 100:
            raise ValueError("Organic modifer must be less than 100.")

    def _validate_flow(self, new_flow):
        """Raise ``ValueError`` unless ``new_flow`` is an int/float with 0 <= flow < 5.0."""
        if not isinstance(new_flow, (int, float)):
            raise ValueError("Flow must be int or float")
        if new_flow < 0:
            raise ValueError("Flow must be positive")
        if new_flow >= 5.0:
            raise ValueError("Flow must be less than 5.0")

    def validate_stop_time(self, new_stop_time):
        """Raise ``ValueError`` unless ``new_stop_time`` is a non-negative int/float."""
        if not isinstance(new_stop_time, (int, float)):
            raise ValueError("Stop time must be int or float")
        if new_stop_time < 0:
            raise ValueError("Stop time must be positive")

    def validate_post_time(self, new_post_time):
        """Raise ``ValueError`` unless ``new_post_time`` is a non-negative int/float."""
        if not isinstance(new_post_time, (int, float)):
            raise ValueError("Post time must be int or float")
        if new_post_time < 0:
            raise ValueError("Post time must be positive")

    def update_method_params(
        self,
        new_flow: Union[int, float],
        new_initial_om: Union[int, float],
        new_stop_time: Union[int, float] | None,
        new_post_time: Union[int, float] | None,
    ):
        """Validate and write the method-wide parameters.

        All values are validated before the table is deleted, so invalid
        input raises without modifying the loaded method.

        :param new_flow: flow to set
        :param new_initial_om: initial organic modifier to set
        :param new_stop_time: stop time; ``None`` switches the stop time mode to "Off"
        :param new_post_time: post time; ``None`` switches the post time mode to "Off"
        :raise ValueError: a value failed validation
        """
        self._validate_flow(new_flow)
        # None is an accepted value (see the edit_* methods), so only real
        # numbers are range-checked.
        if new_post_time is not None:
            self.validate_post_time(new_post_time)
        self._validate_organic_modifier(new_initial_om)
        if new_stop_time is not None:
            self.validate_stop_time(new_stop_time)
        self.delete_table()
        self.edit_flow(new_flow)
        self.edit_initial_om(new_initial_om)
        self.edit_stop_time(new_stop_time)
        self.edit_post_time(new_post_time)

    def download(self):
        """Send the ``DownloadRCMethod PMP1`` command."""
        self.sleepy_send("DownloadRCMethod PMP1")

    def _edit_row(
        self,
        row: TimeTableEntry,
        first_row: bool,
        time_added: bool,
        flow_added: bool,
        om_added: bool,
        function_added: bool,
    ) -> Tuple[bool, bool, bool, bool]:
        """Append table rows for one timetable entry.

        One table row is added for the organic modifier and one for the flow,
        whichever are set. The ``*_added`` flags record which columns have
        already been created: a column is created with ``add_new_col_*`` the
        first time and edited with ``_edit_row_*`` afterwards.

        :return: the updated ``(time_added, flow_added, om_added, function_added)`` flags
        """

        def add_time():
            nonlocal time_added
            nonlocal first_row
            if not time_added:
                self.add_new_col_num(col_name=RegisterFlag.TIME, val=row.start_time)
                time_added = True
            elif not first_row:
                self._edit_row_num(col_name=RegisterFlag.TIME, val=row.start_time)

        def add_flow():
            nonlocal flow_added
            nonlocal function_added
            if not flow_added:
                if not function_added:
                    self.add_new_col_text(
                        col_name=RegisterFlag.FUNCTION,
                        val=RegisterFlag.FLOW.value,
                    )
                    function_added = True
                else:
                    self._edit_row_text(
                        col_name=RegisterFlag.FUNCTION,
                        val=RegisterFlag.FLOW.value,
                    )
                self.add_new_col_num(
                    col_name=RegisterFlag.TIMETABLE_FLOW,
                    val=row.flow,
                )
                flow_added = True
            else:
                self._edit_row_text(
                    col_name=RegisterFlag.FUNCTION, val=RegisterFlag.FLOW.value
                )
                self._edit_row_num(col_name=RegisterFlag.TIMETABLE_FLOW, val=row.flow)

        def add_om():
            nonlocal om_added
            nonlocal function_added
            if not om_added:
                if not function_added:
                    self.add_new_col_text(
                        col_name=RegisterFlag.FUNCTION,
                        val=RegisterFlag.SOLVENT_COMPOSITION.value,
                    )
                    function_added = True
                else:
                    self._edit_row_text(
                        col_name=RegisterFlag.FUNCTION,
                        val=RegisterFlag.SOLVENT_COMPOSITION.value,
                    )
                self.add_new_col_num(
                    col_name=RegisterFlag.TIMETABLE_SOLVENT_B_COMPOSITION,
                    val=row.organic_modifer,
                )
                om_added = True
            else:
                self._edit_row_text(
                    col_name=RegisterFlag.FUNCTION,
                    val=RegisterFlag.SOLVENT_COMPOSITION.value,
                )
                self._edit_row_num(
                    col_name=RegisterFlag.TIMETABLE_SOLVENT_B_COMPOSITION,
                    val=row.organic_modifer,
                )

        # Compare against None so a validated value of 0 is still written.
        if row.organic_modifer is not None:
            self.add_row()
            add_om()
            add_time()
        if row.flow is not None:
            self.add_row()
            add_flow()
            add_time()
        self.download()
        return time_added, flow_added, om_added, function_added

    def edit_method_timetable(self, timetable_rows: List[TimeTableEntry]):
        """Replace the method timetable with ``timetable_rows``.

        :raise ValueError: the freshly created table does not report zero rows
        """
        self.get_num_rows()
        self.delete_table()
        res = self.get_num_rows()
        # Keep deleting until the row count can no longer be read.
        while not res.is_err():
            self.delete_table()
            res = self.get_num_rows()

        self.new_table()
        num_rows = self.get_row_count_safely()
        if num_rows != 0:
            raise ValueError("Should be zero rows!")

        time_added = False
        flow_added = False
        om_added = False
        function_added = False
        for i, row in enumerate(timetable_rows):
            time_added, flow_added, om_added, function_added = self._edit_row(
                row=row,
                first_row=i == 0,
                time_added=time_added,
                flow_added=flow_added,
                om_added=om_added,
                function_added=function_added,
            )

    def stop(self):
        """
        Stops the method run. A dialog window will pop up and manual intervention may be required.
        """
        self.send(Command.STOP_METHOD_CMD)

    def run(
        self,
        experiment_name: str,
        add_timestamp: bool = True,
        stall_while_running: bool = True,
    ):
        """Start the loaded method and record where its data is written.

        :param experiment_name: Name of the experiment
        :param stall_while_running: whether to stall or immediately return
        :param add_timestamp: whether to append a timestamp to the experiment name
        :raise RuntimeError: the HPLC was not running after 10 start attempts
        """
        hplc_is_running = False
        tries = 0
        while tries < 10 and not hplc_is_running:
            # Recomputed per attempt so each retry gets its own timestamp.
            timestamp = time.strftime(TIME_FORMAT)
            self.send(
                Command.RUN_METHOD_CMD.value.format(
                    data_dir=self.data_dirs[0],
                    experiment_name=f"{experiment_name}_{timestamp}"
                    if add_timestamp
                    else experiment_name,
                )
            )

            hplc_is_running = self.check_hplc_is_running()
            tries += 1

        data_dir, data_file = self.get_current_run_data_dir_file()
        if not hplc_is_running:
            raise RuntimeError("Method failed to start.")

        self.data_files.append(os.path.join(os.path.normpath(data_dir), data_file))
        # get_total_runtime is in minutes; the timeout is kept in seconds.
        self.timeout = (self.get_total_runtime()) * 60

        if stall_while_running:
            run_completed = self.check_hplc_done_running()
            if run_completed.is_ok():
                self.data_files[-1] = run_completed.ok_value
            else:
                warnings.warn(run_completed.err_value)
        else:
            folder = self._fuzzy_match_most_recent_folder(self.data_files[-1])
            i = 0
            while folder.is_err() and i < 10:
                folder = self._fuzzy_match_most_recent_folder(self.data_files[-1])
                i += 1
            if folder.is_ok():
                self.data_files[-1] = folder.ok_value
            else:
                warning = f"Data folder {self.data_files[-1]} may not exist, returning and will check again after run is done."
                warnings.warn(warning)

    def _fuzzy_match_most_recent_folder(
        self, most_recent_folder: T
    ) -> Result[str, str]:
        """Return ``Ok(path)`` when the given path exists, ``Err`` otherwise.

        :raise ValueError: ``most_recent_folder`` is neither ``str`` nor ``bytes``
        """
        if isinstance(most_recent_folder, (str, bytes)):
            if os.path.exists(most_recent_folder):
                return Ok(str(most_recent_folder))
            return Err("Folder not found!")
        raise ValueError("Folder is not a str or byte type.")

    def get_data(
        self, custom_path: Optional[str] = None
    ) -> AgilentChannelChromatogramData:
        """Load channel spectra from ``custom_path``, or from the most recent run when omitted."""
        custom_path = custom_path if custom_path else self.data_files[-1]
        self.get_spectrum_at_channels(custom_path)
        return AgilentChannelChromatogramData.from_dict(self.spectra)

    def get_data_uv(
        self, custom_path: Optional[str] = None
    ) -> dict[int, AgilentHPLCChromatogram]:
        """Load the UV spectrum from ``custom_path``, or from the most recent run when omitted."""
        custom_path = custom_path if custom_path else self.data_files[-1]
        self.get_uv_spectrum(custom_path)
        return self.uv

    def get_report(
        self,
        custom_path: Optional[str] = None,
        report_type: ReportType = ReportType.TXT,
    ) -> List[AgilentReport]:
        """Build the report for a run and attach chromatogram data to its signals.

        :param custom_path: data path to read; defaults to the most recent run
        :param report_type: report format passed to ``get_report_details``
        :return: a single-element list holding the report
        """
        custom_path = self.data_files[-1] if not custom_path else custom_path
        metd_report = self.get_report_details(custom_path, report_type)
        chrom_data: List[AgilentHPLCChromatogram] = list(
            self.get_data(custom_path).__dict__.values()
        )
        # Signals are paired with channel data by position.
        for i, signal in enumerate(metd_report.signals):
            possible_data = chrom_data[i]
            if len(possible_data.x) > 0:
                signal.data = possible_data
        return [metd_report]

    def _validate_row(self, row: TimeTableEntry):
        """Require at least one of flow / organic modifier and range-check those set."""
        # Compare against None: 0 is accepted by both range validators and
        # must not be mistaken for a missing value.
        if row.flow is None and row.organic_modifer is None:
            raise ValueError(
                "Require one of flow or organic modifier for the method timetable entry!"
            )
        if row.flow is not None:
            self._validate_flow(row.flow)
        if row.organic_modifer is not None:
            self._validate_organic_modifier(row.organic_modifer)

    def validate_timetable(self, timetable: List[TimeTableEntry]):
        """Require strictly increasing start times (above 0) and valid rows.

        :raise ValueError: a start time is not larger than the previous one, or a row is invalid
        """
        start_time = 0.0
        for i, row in enumerate(timetable):
            if row.start_time > start_time:
                start_time = row.start_time
            elif row.start_time <= start_time:
                raise ValueError(
                    f"""Every row's start time must be larger than the previous start time.
                    Row {i + 1} ({timetable[i].start_time}) has a smaller or equal starttime than row {i} ({start_time})"""
                )
            self._validate_row(row)

    def get_om(self):
        """Read the solvent B composition parameter."""
        return self._read_num_param(RegisterFlag.SOLVENT_B_COMPOSITION)

    def get_flow(self):
        """Read the flow parameter."""
        return self._read_num_param(RegisterFlag.FLOW)

    def get_post_time(self) -> Union[int, float]:
        """Read the post time parameter."""
        return self._read_num_param(RegisterFlag.POST_TIME)

    def get_stop_time(self) -> Union[int, float]:
        """Read the stop time (``MAX_TIME``) parameter."""
        return self._read_num_param(RegisterFlag.MAX_TIME)

    def edit_post_time(self, new_post_time: Optional[int | float]):
        """Set the post time, or switch the post time mode to "Off" for a falsy value."""
        if new_post_time:
            post_time: Param = Param(
                val=new_post_time,
                chemstation_key=RegisterFlag.POST_TIME,
                ptype=PType.NUM,
            )
            self._update_param(
                Param(
                    val="Set",
                    chemstation_key=RegisterFlag.POSTIME_MODE,
                    ptype=PType.STR,
                )
            )
            self._update_param(post_time)
        else:
            self._update_param(
                Param(
                    val="Off",
                    chemstation_key=RegisterFlag.POSTIME_MODE,
                    ptype=PType.STR,
                )
            )

    def edit_stop_time(self, new_stop_time: Optional[int | float]):
        """Set the stop time, or switch the stop time mode to "Off" for a falsy value."""
        if new_stop_time:
            stop_time: Param = Param(
                val=new_stop_time,
                chemstation_key=RegisterFlag.MAX_TIME,
                ptype=PType.NUM,
            )
            self._update_param(
                Param(
                    val="Set",
                    chemstation_key=RegisterFlag.STOPTIME_MODE,
                    ptype=PType.STR,
                )
            )
            self._update_param(stop_time)
        else:
            self._update_param(
                Param(
                    val="Off",
                    chemstation_key=RegisterFlag.STOPTIME_MODE,
                    ptype=PType.STR,
                )
            )

    def edit_flow(self, new_flow: Union[int, float]):
        """Write the flow parameter."""
        flow: Param = Param(
            val=new_flow, chemstation_key=RegisterFlag.FLOW, ptype=PType.NUM
        )
        self._update_param(flow)

    def edit_initial_om(self, new_om: Union[int, float]):
        """Write the initial solvent B composition parameter."""
        initial_organic_modifier: Param = Param(
            val=new_om,
            chemstation_key=RegisterFlag.SOLVENT_B_COMPOSITION,
            ptype=PType.NUM,
        )
        self._update_param(initial_organic_modifier)
Class containing method related logic.
def __init__(
    self,
    controller: Optional[CommunicationController],
    src: Optional[str],
    data_dirs: Optional[List[str]],
    table: Table,
    offline: bool,
    injector: InjectorController,
    pump: PumpController,
    dad: DADController,
    column: ColumnController,
    sample_info: SampleInfo,
):
    """Keep the module controllers on the instance, then set up the base controller.

    :param controller: forwarded to the parent initialiser
    :param src: forwarded to the parent initialiser
    :param data_dirs: forwarded to the parent initialiser
    :param table: forwarded to the parent initialiser
    :param offline: forwarded to the parent initialiser
    :param injector: stored as ``self.injector``
    :param pump: stored as ``self.pump``
    :param dad: stored as ``self.dad``
    :param column: stored as ``self.column``
    :param sample_info: stored as ``self.sample_info``
    """
    self.injector, self.pump = injector, pump
    self.dad, self.column = dad, column
    self.sample_info = sample_info
    # No data files are known until a run has been started.
    self.data_files: List[str] = []
    base_kwargs = dict(
        controller=controller,
        src=src,
        data_dirs=data_dirs,
        table=table,
        offline=offline,
    )
    super().__init__(**base_kwargs)
def get_row(self, row: int) -> TimeTableEntry:
    """Read a single timetable row.

    The row's function text decides whether it is read as a flow entry or as
    an organic modifier entry; the other field is left as ``None``.

    :param row: row number passed to ``get_text`` / ``get_num``
    :raise ValueError: the row's function is neither flow nor solvent composition
    """
    kind = self.get_text(row, RegisterFlag.FUNCTION)
    is_flow = kind == RegisterFlag.FLOW.value
    if not is_flow and kind != RegisterFlag.SOLVENT_COMPOSITION.value:
        raise ValueError("Both flow and organic modifier are empty")

    # The start time is read before the value column in both cases.
    begins_at = self.get_num(row, RegisterFlag.TIME)
    if is_flow:
        flow_value = self.get_num(row, RegisterFlag.TIMETABLE_FLOW)
        return TimeTableEntry(
            start_time=begins_at, organic_modifer=None, flow=flow_value
        )
    om_value = self.get_num(row, RegisterFlag.TIMETABLE_SOLVENT_B_COMPOSITION)
    return TimeTableEntry(start_time=begins_at, organic_modifer=om_value, flow=None)
def get_timetable(self, rows: int):
    """Read rows ``1..rows`` of the timetable and merge rows sharing a start time.

    Each table row carries either a flow or an organic modifier (see
    ``get_row``); rows with the same start time are combined into one entry.

    :param rows: number of timetable rows to read
    :return: merged entries sorted by start time
    """
    raw_rows = [self.get_row(index + 1) for index in range(rows)]
    merged: Dict[str, TimeTableEntry] = {}
    for raw in raw_rows:
        key = str(raw.start_time)
        existing = merged.get(key)
        if existing is None:
            merged[key] = TimeTableEntry(
                start_time=raw.start_time,
                flow=raw.flow,
                organic_modifer=raw.organic_modifer,
            )
            continue
        # Compare against None rather than truthiness: a value of 0 is real
        # data and must not be dropped as if the field were missing.
        if raw.flow is not None:
            existing.flow = raw.flow
        if raw.organic_modifer is not None:
            existing.organic_modifer = raw.organic_modifer
    return sorted(merged.values(), key=lambda entry: entry.start_time)
def load(self) -> MethodDetails:
    """Read the loaded method from ChemStation and cache it as ``table_state``.

    :return: the freshly built method details
    """
    row_count = self.get_row_count_safely()
    # Keyword arguments are evaluated left to right, which keeps the reads in
    # this order: name, timetable, params, stop time, post time.
    details = MethodDetails(
        name=self.get_method_name(),
        timetable=self.get_timetable(row_count),
        params=self.get_method_params(),
        stop_time=self.get_stop_time(),
        post_time=self.get_post_time(),
    )
    self.table_state = details
    return self.table_state
def get_total_runtime(self) -> Union[int, float]:
    """Return the total method runtime in minutes (post time plus stop time)."""
    post_minutes = self.get_post_time()
    stop_minutes = self.get_stop_time()
    return post_minutes + stop_minutes
Returns total method runtime in minutes.
def current_method(self, method_name: str):
    """
    Checks if a given method is already loaded into Chemstation. Method name does not need the ".M" extension.

    :param method_name: a Chemstation method
    :return: True if method is already loaded, False otherwise (including when the response is not ok)
    """
    self.send(Command.GET_METHOD_CMD)
    res = self.receive()
    if not res.is_ok():
        return False
    # Test against the response text, not the Result wrapper around it.
    return method_name in res.ok_value.string_response
Checks if a given method is already loaded into Chemstation. Method name does not need the ".M" extension.
Parameters
- method_name: a Chemstation method
Returns
True if method is already loaded
def switch(self, method_name: str, alt_method_dir: Optional[str] = None):
    """
    Allows the user to switch between pre-programmed methods. No need to append '.M'
    to the end of the method name. For example, for the method named 'General-Poroshell.M',
    only 'General-Poroshell' is needed.

    If the follow-up name query does not return an ok response, the name
    check is skipped.

    :param method_name: any available method in Chemstation method directory
    :param alt_method_dir: directory where the method resides; defaults to ``self.src``
    :raise AssertionError: The desired method is not selected. Try again.
    """
    method_dir = alt_method_dir if alt_method_dir else self.src
    self.send(
        Command.SWITCH_METHOD_CMD_SPECIFIC.value.format(
            method_dir=method_dir, method_name=method_name
        )
    )
    time.sleep(2)
    self.send(Command.GET_METHOD_CMD)
    time.sleep(2)
    res = self.receive()
    if res.is_ok():
        parsed_response = res.ok_value.string_response
        # Raised explicitly (same exception type as before) so the check is
        # not stripped when Python runs with -O.
        if parsed_response != f"{method_name}.M":
            raise AssertionError("Switching Methods failed.")
    # The cached method no longer describes what is loaded.
    self.table_state = None
Allows the user to switch between pre-programmed methods. No need to append '.M' to the end of the method name. For example, for the method named 'General-Poroshell.M', only 'General-Poroshell' is needed.
Parameters
- method_name: any available method in Chemstation method directory
- alt_method_dir: directory where the method resides
Raises
- IndexError: Response did not have expected format. Try again.
- AssertionError: The desired method is not selected. Try again.
def edit(self, updated_method: MethodDetails, save: bool):
    """Updates the currently loaded method in ChemStation with provided values.

    The timetable is validated before anything is sent, so an invalid
    timetable leaves both the loaded method and ``table_state`` untouched.

    :param updated_method: the method with updated values, to be sent to Chemstation to modify the currently loaded method.
    :param save: if false only modifies the method, otherwise saves to disk
    :raise ValueError: propagated from timetable or parameter validation
    """
    # Fail fast: validation only inspects the argument and sends nothing.
    self.validate_timetable(updated_method.timetable)
    self.table_state = updated_method
    # Method settings required for all runs
    self.update_method_params(
        new_flow=updated_method.params.flow,
        new_initial_om=updated_method.params.organic_modifier,
        new_stop_time=updated_method.stop_time,
        new_post_time=updated_method.post_time,
    )
    self.edit_method_timetable(updated_method.timetable)

    if save:
        self.save_method()
Updates the currently loaded method in ChemStation with the provided values.
Parameters
- updated_method: the method with updated values, to be sent to Chemstation to modify the currently loaded method.
- save: if false only modifies the method, otherwise saves to disk
def update_method_params(
    self,
    new_flow: Union[int, float],
    new_initial_om: Union[int, float],
    new_stop_time: Union[int, float] | None,
    new_post_time: Union[int, float] | None,
):
    """Validate and write the method-wide parameters.

    Every value is validated before the table is deleted, so invalid input
    raises without modifying the loaded method.

    :param new_flow: flow to set
    :param new_initial_om: initial organic modifier to set
    :param new_stop_time: stop time, or ``None`` (passed through to ``edit_stop_time``)
    :param new_post_time: post time, or ``None`` (passed through to ``edit_post_time``)
    :raise ValueError: propagated from the validators
    """
    self._validate_flow(new_flow)
    # None is allowed by the signature, so it skips the numeric validators
    # (which reject anything that is not an int or float).
    if new_post_time is not None:
        self.validate_post_time(new_post_time)
    self._validate_organic_modifier(new_initial_om)
    if new_stop_time is not None:
        self.validate_stop_time(new_stop_time)

    self.delete_table()
    self.edit_flow(new_flow)
    self.edit_initial_om(new_initial_om)
    self.edit_stop_time(new_stop_time)
    self.edit_post_time(new_post_time)
def edit_method_timetable(self, timetable_rows: List[TimeTableEntry]):
    """Replace the method timetable with ``timetable_rows``.

    The existing table is deleted repeatedly until ``get_num_rows`` returns
    an error result, a new table is created, and each entry is written with
    ``_edit_row``.

    :param timetable_rows: entries to write, in order
    :raise ValueError: the freshly created table does not report zero rows
    """
    self.get_num_rows()  # result intentionally unused, as in the original call sequence
    while True:
        self.delete_table()
        if self.get_num_rows().is_err():
            break

    self.new_table()
    if self.get_row_count_safely() != 0:
        raise ValueError("Should be zero rows!")

    # (time_added, flow_added, om_added, function_added), threaded from row to row.
    flags = (False, False, False, False)
    for index, entry in enumerate(timetable_rows):
        time_added, flow_added, om_added, function_added = flags
        flags = self._edit_row(
            row=entry,
            first_row=index == 0,
            time_added=time_added,
            flow_added=flow_added,
            om_added=om_added,
            function_added=function_added,
        )
def stop(self):
    """Send the stop-method command to ChemStation.

    A dialog window will pop up and manual intervention may be required.
    """
    stop_command = Command.STOP_METHOD_CMD
    self.send(stop_command)
Stops the method run. A dialog window will pop up and manual intervention may be required.
def run(
    self,
    experiment_name: str,
    add_timestamp: bool = True,
    stall_while_running: bool = True,
):
    """Start the loaded method and record where its data is written.

    The run command is sent up to 10 times until the HPLC reports that it is
    running. The data path of the run is appended to ``self.data_files`` and
    ``self.timeout`` is set to the total runtime in seconds.

    :param experiment_name: Name of the experiment
    :param stall_while_running: whether to stall or immediately return
    :param add_timestamp: if should add timestamp to experiment name
    :raise RuntimeError: the HPLC was not running after all attempts
    """
    started = False
    for _ in range(10):
        # A new timestamp per attempt, so each retry gets its own name.
        stamp = time.strftime(TIME_FORMAT)
        label = f"{experiment_name}_{stamp}" if add_timestamp else experiment_name
        command = Command.RUN_METHOD_CMD.value.format(
            data_dir=self.data_dirs[0],
            experiment_name=label,
        )
        self.send(command)
        started = self.check_hplc_is_running()
        if started:
            break

    data_dir, data_file = self.get_current_run_data_dir_file()
    if not started:
        raise RuntimeError("Method failed to start.")

    run_path = os.path.join(os.path.normpath(data_dir), data_file)
    self.data_files.append(run_path)
    # get_total_runtime is in minutes; the timeout is kept in seconds.
    self.timeout = (self.get_total_runtime()) * 60

    if stall_while_running:
        outcome = self.check_hplc_done_running()
        if outcome.is_ok():
            self.data_files[-1] = outcome.ok_value
        else:
            warnings.warn(outcome.err_value)
        return

    # Not stalling: look for the data folder now, retrying up to 10 times.
    located = self._fuzzy_match_most_recent_folder(self.data_files[-1])
    for _ in range(10):
        if not located.is_err():
            break
        located = self._fuzzy_match_most_recent_folder(self.data_files[-1])
    if located.is_ok():
        self.data_files[-1] = located.ok_value
    else:
        warning = f"Data folder {self.data_files[-1]} may not exist, returning and will check again after run is done."
        warnings.warn(warning)
Parameters
- experiment_name: Name of the experiment
- stall_while_running: whether to stall or immediately return
- add_timestamp: whether to append a timestamp to the experiment name
def get_report(
    self,
    custom_path: Optional[str] = None,
    report_type: ReportType = ReportType.TXT,
) -> List[AgilentReport]:
    """Build the report for a run and attach chromatogram data to its signals.

    :param custom_path: data path to read; the most recent entry of ``self.data_files`` when omitted
    :param report_type: report format passed to ``get_report_details``
    :return: a single-element list holding the report
    """
    path = custom_path or self.data_files[-1]
    report = self.get_report_details(path, report_type)
    channel_data = self.get_data(path)
    chromatograms: List[AgilentHPLCChromatogram] = list(
        channel_data.__dict__.values()
    )
    # Signals are paired with channel data by position; empty channels are skipped.
    for position, signal in enumerate(report.signals):
        candidate = chromatograms[position]
        if len(candidate.x) > 0:
            signal.data = candidate
    return [report]
def validate_timetable(self, timetable: List[TimeTableEntry]):
    """Check that start times strictly increase (starting above 0) and that every row is valid.

    :param timetable: entries to check, in order
    :raise ValueError: a start time is not larger than the previous one, or ``_validate_row`` rejects a row
    """
    latest = 0.0
    for index, entry in enumerate(timetable):
        if entry.start_time <= latest:
            raise ValueError(
                f"""Every row's start time must be larger than the previous start time.
                    Row {index + 1} ({timetable[index].start_time}) has a smaller or equal starttime than row {index} ({latest})"""
            )
        if entry.start_time > latest:
            latest = entry.start_time
        self._validate_row(entry)
def edit_post_time(self, new_post_time: Optional[int | float]):
    """Set the post time, or switch the post time mode off.

    :param new_post_time: post time to set; a falsy value (``None`` or 0) sets the mode to "Off"
    """
    if not new_post_time:
        mode_off = Param(
            val="Off",
            chemstation_key=RegisterFlag.POSTIME_MODE,
            ptype=PType.STR,
        )
        self._update_param(mode_off)
        return

    # The value parameter is built first, then the mode is set, then the value is written.
    value_param: Param = Param(
        val=new_post_time,
        chemstation_key=RegisterFlag.POST_TIME,
        ptype=PType.NUM,
    )
    mode_set = Param(
        val="Set",
        chemstation_key=RegisterFlag.POSTIME_MODE,
        ptype=PType.STR,
    )
    self._update_param(mode_set)
    self._update_param(value_param)
def edit_stop_time(self, new_stop_time: Optional[int | float]):
    """Set the stop time, or switch the stop time mode off.

    :param new_stop_time: stop time to set; a falsy value (``None`` or 0) sets the mode to "Off"
    """
    if not new_stop_time:
        mode_off = Param(
            val="Off",
            chemstation_key=RegisterFlag.STOPTIME_MODE,
            ptype=PType.STR,
        )
        self._update_param(mode_off)
        return

    # The value parameter is built first, then the mode is set, then the value is written.
    value_param: Param = Param(
        val=new_stop_time,
        chemstation_key=RegisterFlag.MAX_TIME,
        ptype=PType.NUM,
    )
    mode_set = Param(
        val="Set",
        chemstation_key=RegisterFlag.STOPTIME_MODE,
        ptype=PType.STR,
    )
    self._update_param(mode_set)
    self._update_param(value_param)
Inherited Members
34class SequenceController(RunController): 35 """ 36 Class containing sequence related logic 37 """ 38 39 def __init__( 40 self, 41 controller: Optional[CommunicationController], 42 method_controller: MethodController, 43 src: Optional[str], 44 data_dirs: Optional[List[str]], 45 table: Table, 46 offline: bool, 47 ): 48 self.method_controller = method_controller 49 self.data_files: List[SequenceDataFiles] = [] 50 super().__init__( 51 controller=controller, 52 src=src, 53 data_dirs=data_dirs, 54 table=table, 55 offline=offline, 56 ) 57 58 def load(self) -> SequenceTable: 59 rows = self.get_row_count_safely() 60 self.send(Command.GET_SEQUENCE_CMD) 61 seq_name = self.receive() 62 63 if seq_name.is_ok(): 64 self.table_state: SequenceTable = SequenceTable( 65 name=seq_name.ok_value.string_response.partition(".S")[0], 66 rows=[self.get_row(r + 1) for r in range(int(rows))], 67 ) 68 return self.table_state 69 else: 70 raise RuntimeError( 71 f"couldn't read rows or sequence name: {seq_name.err_value}" 72 ) 73 74 @staticmethod 75 def try_int(val: Any) -> Optional[int]: 76 try: 77 return int(val) 78 except ValueError: 79 return None 80 81 @staticmethod 82 def try_float(val: Any) -> Optional[float]: 83 try: 84 return float(val) 85 except ValueError: 86 return None 87 88 @staticmethod 89 def try_vial_location(val: Any) -> Tray: 90 try: 91 return VialBar(val) if val <= 10 else FiftyFourVialPlate.from_int(num=val) 92 except ValueError: 93 raise ValueError("Expected vial location, is empty.") 94 95 def get_row(self, row: int) -> SequenceEntry: 96 sample_name = self.get_sample_name(row) 97 vial_location = self.get_vial_location(row) 98 data_file = self.get_data_file(row) 99 method = self.get_method(row) 100 num_inj = self.get_num_inj(row) 101 inj_vol = self.get_inj_vol(row) 102 inj_source = self.get_inj_source(row) 103 sample_type = self.get_sample_type(row) 104 return SequenceEntry( 105 sample_name=sample_name, 106 vial_location=vial_location, 107 method=None if len(method) == 0 
else method, 108 num_inj=num_inj, 109 inj_vol=inj_vol, 110 inj_source=inj_source, 111 sample_type=sample_type, 112 data_file=data_file, 113 ) 114 115 def get_sample_type(self, row): 116 return SampleType(self.get_num(row, RegisterFlag.SAMPLE_TYPE)) 117 118 def get_inj_source(self, row): 119 return InjectionSource(self.get_text(row, RegisterFlag.INJ_SOR)) 120 121 def get_inj_vol(self, row): 122 return self.try_float(self.get_text(row, RegisterFlag.INJ_VOL)) 123 124 def get_num_inj(self, row): 125 return self.try_int(self.get_num(row, RegisterFlag.NUM_INJ)) 126 127 def get_method(self, row): 128 return self.get_text(row, RegisterFlag.METHOD) 129 130 def get_data_file(self, row): 131 return self.get_text(row, RegisterFlag.DATA_FILE) 132 133 def get_vial_location(self, row) -> Tray: 134 return self.try_vial_location( 135 self.try_int(self.get_num(row, RegisterFlag.VIAL_LOCATION)) 136 ) 137 138 def get_sample_name(self, row): 139 return self.get_text(row, RegisterFlag.NAME) 140 141 def switch(self, seq_name: str): 142 """ 143 Switch to the specified sequence. The sequence name does not need the '.S' extension. 144 145 :param seq_name: The name of the sequence file 146 """ 147 self.send(f'_SeqFile$ = "{seq_name}.S"') 148 self.send(f'_SeqPath$ = "{self.src}"') 149 self.send(Command.SWITCH_SEQUENCE_CMD) 150 time.sleep(2) 151 parsed_response = self.get_current_sequence_name() 152 153 assert parsed_response == f"{seq_name}.S", "Switching sequence failed." 154 155 def get_current_sequence_name(self): 156 self.send(Command.GET_SEQUENCE_CMD) 157 time.sleep(2) 158 parsed_response = self.receive().ok_value.string_response 159 return parsed_response 160 161 def edit(self, sequence_table: SequenceTable): 162 """ 163 Updates the currently loaded sequence table with the provided table. This method will delete the existing sequence table and remake it. 164 If you would only like to edit a single row of a sequence table, use `edit_sequence_table_row` instead. 
165 166 :param sequence_table: 167 """ 168 self.table_state = sequence_table 169 rows = self.get_row_count_safely() 170 existing_row_num = rows 171 wanted_row_num = len(sequence_table.rows) 172 for i in range(int(existing_row_num)): 173 self.delete_row(int(existing_row_num - i)) 174 self.send(Command.SAVE_SEQUENCE_CMD) 175 for i in range(int(wanted_row_num)): 176 self.add_row() 177 self.download() 178 self.send(Command.SWITCH_SEQUENCE_CMD) 179 for i, row in enumerate(sequence_table.rows): 180 self._edit_row(row=row, row_num=i + 1) 181 self.sleep(1) 182 self.download() 183 self.send(Command.SWITCH_SEQUENCE_CMD) 184 185 def _edit_row(self, row: SequenceEntry, row_num: int): 186 """ 187 Edits a row in the sequence table. If a row does NOT exist, a new one will be created. 188 189 :param row: sequence row entry with updated information 190 :param row_num: the row to edit, based on 1-based indexing 191 """ 192 num_rows = self.get_row_count_safely() 193 while num_rows < row_num: 194 self.add_row() 195 self.download() 196 num_rows = self.get_row_count_safely() 197 if row.vial_location: 198 self.edit_vial_location(row.vial_location, row_num, save=False) 199 if row.method: 200 self.edit_method_name(row.method, row_num, save=False) 201 if row.num_inj: 202 self.edit_num_injections(row.num_inj, row_num, save=False) 203 if row.inj_vol: 204 self.edit_injection_volume(row.inj_vol, row_num, save=False) 205 if row.inj_source: 206 self.edit_injection_source(row.inj_source, row_num, save=False) 207 if row.sample_name: 208 self.edit_sample_name(row.sample_name, row_num, save=False) 209 if row.data_file: 210 self.edit_data_file(row.data_file, row_num, save=False) 211 if row.sample_type: 212 self.edit_sample_type(row.sample_type, row_num, save=False) 213 self.download() 214 215 def edit_sample_type( 216 self, sample_type: SampleType, row_num: int, save: bool = True 217 ): 218 if not isinstance(sample_type, SampleType): 219 raise ValueError("`sample_type` should be of type `SampleType`") 
220 self._edit_row_num( 221 row=row_num, 222 col_name=RegisterFlag.SAMPLE_TYPE, 223 val=sample_type.value, 224 ) 225 if save: 226 self.download() 227 228 def edit_data_file(self, data_file: str, row_num: int, save: bool = True): 229 self._edit_row_text(row=row_num, col_name=RegisterFlag.DATA_FILE, val=data_file) 230 if save: 231 self.download() 232 233 def edit_sample_name(self, sample_name: str, row_num: int, save: bool = True): 234 self._edit_row_text(row=row_num, col_name=RegisterFlag.NAME, val=sample_name) 235 if save: 236 self.download() 237 238 def edit_injection_source( 239 self, inj_source: InjectionSource, row_num: int, save: bool = True 240 ): 241 if not isinstance(inj_source, InjectionSource): 242 raise ValueError("`inj_source` should be of type `InjectionSource`") 243 self._edit_row_text( 244 row=row_num, col_name=RegisterFlag.INJ_SOR, val=inj_source.value 245 ) 246 if save: 247 self.download() 248 249 def edit_injection_volume( 250 self, inj_vol: Union[int, float], row_num: int, save: bool = True 251 ): 252 self._edit_row_text( 253 row=row_num, col_name=RegisterFlag.INJ_VOL, val=str(inj_vol) 254 ) 255 if save: 256 self.download() 257 258 def edit_num_injections(self, num_inj: int, row_num: int, save: bool = True): 259 self._edit_row_num(row=row_num, col_name=RegisterFlag.NUM_INJ, val=num_inj) 260 if save: 261 self.download() 262 263 def edit_method_name( 264 self, method: str, row_num: int, save: bool = True, override_check: bool = False 265 ): 266 method_dir = self.method_controller.src 267 possible_path = os.path.join(method_dir, method) + ".M\\" 268 if os.path.exists(possible_path): 269 method = os.path.join(method_dir, method) 270 elif not override_check: 271 raise ValueError( 272 "Method may not exist. 
If you would still like to use this method, set the `override_check` flag to `True`" 273 ) 274 self._edit_row_text(row=row_num, col_name=RegisterFlag.METHOD, val=method) 275 if save: 276 self.download() 277 278 def edit_vial_location(self, loc: Tray, row_num: int, save: bool = True): 279 loc_num = -1 280 try: 281 previous_contents = self.get_row(row_num) 282 if ( 283 isinstance(loc, VialBar) 284 and isinstance(previous_contents.vial_location, VialBar) 285 or isinstance(loc, FiftyFourVialPlate) 286 and isinstance(previous_contents.vial_location, FiftyFourVialPlate) 287 ): 288 if isinstance(loc, VialBar): 289 loc_num = loc.value 290 elif isinstance(loc, FiftyFourVialPlate): 291 loc_num = loc.value() 292 self._edit_row_num( 293 row=row_num, col_name=RegisterFlag.VIAL_LOCATION, val=loc_num 294 ) 295 elif isinstance(loc, VialBar) or isinstance(loc, FiftyFourVialPlate): 296 self.add_row() 297 previous_contents.vial_location = loc 298 num_rows = self.get_row_count_safely() 299 self._edit_row(previous_contents, num_rows) 300 self.move_row(int(num_rows), row_num) 301 self.delete_row(row_num + 1) 302 self.download() 303 else: 304 raise ValueError( 305 "`loc` should be of type `VialBar`, `FiftyFourVialPlate`" 306 ) 307 except Exception: 308 if not (isinstance(loc, VialBar) or isinstance(loc, FiftyFourVialPlate)): 309 raise ValueError( 310 "`loc` should be of type `VialBar`, `FiftyFourVialPlate`" 311 ) 312 if isinstance(loc, VialBar): 313 loc_num = loc.value 314 elif isinstance(loc, FiftyFourVialPlate): 315 loc_num = loc.value() 316 self._edit_row_num( 317 row=row_num, col_name=RegisterFlag.VIAL_LOCATION, val=loc_num 318 ) 319 if save: 320 self.download() 321 322 def download(self): 323 self.send(Command.SAVE_SEQUENCE_CMD) 324 325 def run(self, stall_while_running: bool = True): 326 """ 327 Starts the currently loaded sequence, storing data 328 under the <data_dir>/<sequence table name> folder. 329 Device must be ready. 
330 """ 331 332 current_sequence_name = self.get_current_sequence_name() 333 if not self.table_state or self.table_state.name not in current_sequence_name: 334 self.table_state = self.load() 335 336 total_runtime = 0.0 337 for entry in self.table_state.rows: 338 curr_method_runtime = self.method_controller.get_total_runtime() 339 loaded_method = self.method_controller.get_method_name().removesuffix(".M") 340 if entry.method: 341 method_path = entry.method.split(sep="\\") 342 method_name = method_path[-1] 343 if loaded_method != method_name: 344 method_dir = ( 345 "\\".join(method_path[:-1]) + "\\" 346 if len(method_path) > 1 347 else None 348 ) 349 self.method_controller.switch( 350 method_name=method_name, alt_method_dir=method_dir 351 ) 352 curr_method_runtime = self.method_controller.get_total_runtime() 353 total_runtime += curr_method_runtime 354 355 timestamp = time.strftime(SEQUENCE_TIME_FORMAT) 356 folder_name = f"{self.table_state.name} {timestamp}" 357 358 self.send(Command.SAVE_METHOD_CMD) 359 self.send(Command.SAVE_SEQUENCE_CMD) 360 self.send(Command.RUN_SEQUENCE_CMD.value) 361 self.timeout = total_runtime * 60 362 363 tries = 10 364 hplc_running = False 365 for _ in range(tries): 366 hplc_running = self.check_hplc_is_running() 367 if hplc_running: 368 break 369 else: 370 self.send(Command.RUN_SEQUENCE_CMD.value) 371 372 if hplc_running: 373 full_path_name, current_sample_file = self.try_getting_run_info(folder_name) 374 if full_path_name and current_sample_file: 375 data_file = SequenceDataFiles( 376 sequence_name=self.table_state.name, 377 dir=full_path_name, 378 child_dirs=[os.path.join(full_path_name, current_sample_file)], 379 ) 380 self.data_files.append(data_file) 381 else: 382 raise ValueError("Data directory for sequence was not found.") 383 384 if stall_while_running: 385 run_completed = self.check_hplc_done_running() 386 if run_completed.is_ok(): 387 self.data_files[-1] = run_completed.ok_value 388 else: 389 
warnings.warn(run_completed.err_value) 390 else: 391 raise RuntimeError("Sequence run may not have started.") 392 393 def try_getting_run_info(self, folder_name: str) -> Tuple[str, str | None]: 394 full_path_name, current_sample_file = None, None 395 for _ in range(5): 396 try: 397 full_path_name, current_sample_file = ( 398 self.get_current_run_data_dir_file() 399 ) 400 except ValueError: 401 pass 402 if current_sample_file and full_path_name: 403 return full_path_name, current_sample_file 404 elif full_path_name: 405 return full_path_name, None 406 raise ValueError("Could not get sequence data folder") 407 408 @override 409 def _fuzzy_match_most_recent_folder( 410 self, most_recent_folder: T 411 ) -> Result[SequenceDataFiles, str]: 412 if isinstance(most_recent_folder, SequenceDataFiles): 413 try: 414 if most_recent_folder.dir and os.path.isdir(most_recent_folder.dir): 415 subdirs = [x[0] for x in os.walk(most_recent_folder.dir)] 416 most_recent_folder.child_dirs = [ 417 f 418 for f in subdirs 419 if most_recent_folder.dir in f and ".D" in f and f[-1] == "D" 420 ] 421 return Ok(most_recent_folder) 422 else: 423 return Err("No sequence folder found, please give the full path.") 424 except Exception as e: 425 error = f"Failed to get sequence folder: {e}" 426 return Err(error) 427 return Err("Expected SequenceDataFile type.") 428 429 def get_data_mult_uv(self, custom_path: Optional[str] = None): 430 seq_data_dir = ( 431 SequenceDataFiles(dir=custom_path, sequence_name="") 432 if custom_path 433 else self.data_files[-1] 434 ) 435 search_folder = self._fuzzy_match_most_recent_folder(seq_data_dir) 436 if search_folder.is_ok(): 437 seq_data_dir = search_folder.ok_value 438 else: 439 raise FileNotFoundError(search_folder.err_value) 440 all_w_spectra: List[Dict[int, AgilentHPLCChromatogram]] = [] 441 for row in seq_data_dir.child_dirs: 442 all_w_spectra.append(self.get_data_uv(custom_path=row)) 443 return all_w_spectra 444 445 def get_data_uv( 446 self, custom_path: 
Optional[str] = None 447 ) -> Dict[int, AgilentHPLCChromatogram]: 448 if isinstance(custom_path, str): 449 self.get_uv_spectrum(custom_path) 450 return self.uv 451 raise ValueError( 452 "Path should exist when calling from sequence. Provide a child path (contains the method)." 453 ) 454 455 def get_data( 456 self, custom_path: Optional[str] = None 457 ) -> List[AgilentChannelChromatogramData]: 458 seq_file_dir = ( 459 SequenceDataFiles(dir=custom_path, sequence_name="") 460 if custom_path 461 else self.data_files[-1] 462 ) 463 self.data_files[-1] = self._fuzzy_match_most_recent_folder( 464 seq_file_dir 465 ).ok_value 466 spectra: List[AgilentChannelChromatogramData] = [] 467 for row in self.data_files[-1].child_dirs: 468 self.get_spectrum_at_channels(row) 469 spectra.append(AgilentChannelChromatogramData.from_dict(self.spectra)) 470 return spectra 471 472 def get_report( 473 self, 474 custom_path: Optional[str] = None, 475 report_type: ReportType = ReportType.TXT, 476 ) -> List[AgilentReport]: 477 if custom_path: 478 self.data_files.append( 479 self._fuzzy_match_most_recent_folder( 480 most_recent_folder=SequenceDataFiles( 481 dir=custom_path, 482 sequence_name="NA", 483 ), 484 ).ok_value 485 ) 486 parent_dir = self.data_files[-1] 487 parent_dir = self._fuzzy_match_most_recent_folder( 488 most_recent_folder=parent_dir, 489 ).ok_value 490 assert len(parent_dir.child_dirs) != 0 491 spectra = self.get_data() 492 reports = [] 493 for i, child_dir in enumerate(parent_dir.child_dirs): 494 metd_report = self.get_report_details(child_dir, report_type) 495 child_spectra: List[AgilentHPLCChromatogram] = list( 496 spectra[i].__dict__.values() 497 ) 498 for j, signal in enumerate(metd_report.signals): 499 assert len(metd_report.signals) <= len(child_spectra) 500 try: 501 possible_data = child_spectra[j] 502 if len(possible_data.x) > 0: 503 signal.data = possible_data 504 except IndexError: 505 raise ValueError(j) 506 reports.append(metd_report) 507 return reports
Class containing sequence related logic
def __init__(
    self,
    controller: Optional[CommunicationController],
    method_controller: MethodController,
    src: Optional[str],
    data_dirs: Optional[List[str]],
    table: Table,
    offline: bool,
):
    """
    Set up the sequence controller.

    Stores the method controller, starts with an empty list of sequence
    data files, and forwards the remaining arguments to the parent
    initializer.

    :param controller: forwarded to the parent initializer
    :param method_controller: kept on ``self.method_controller``
    :param src: forwarded to the parent initializer
    :param data_dirs: forwarded to the parent initializer
    :param table: forwarded to the parent initializer
    :param offline: forwarded to the parent initializer
    """
    # Attributes are assigned before the parent initializer runs, as in the
    # original ordering.
    self.method_controller = method_controller
    self.data_files: List[SequenceDataFiles] = []
    parent_kwargs = {
        "controller": controller,
        "src": src,
        "data_dirs": data_dirs,
        "table": table,
        "offline": offline,
    }
    super().__init__(**parent_kwargs)
def load(self) -> SequenceTable:
    """
    Read the currently loaded sequence and cache it in ``self.table_state``.

    :return: a ``SequenceTable`` built from the received sequence name and
        one ``get_row`` call per row
    :raises RuntimeError: if the sequence name response is not OK
    """
    row_count = self.get_row_count_safely()
    self.send(Command.GET_SEQUENCE_CMD)
    name_response = self.receive()

    if not name_response.is_ok():
        raise RuntimeError(
            f"couldn't read rows or sequence name: {name_response.err_value}"
        )

    # Keep only the text before the first ".S" in the reported name.
    sequence_name, _, _ = name_response.ok_value.string_response.partition(".S")
    # Rows are requested with 1-based numbers.
    entries = [self.get_row(number) for number in range(1, int(row_count) + 1)]
    self.table_state: SequenceTable = SequenceTable(
        name=sequence_name,
        rows=entries,
    )
    return self.table_state
def get_row(self, row: int) -> SequenceEntry:
    """
    Read every field of one sequence-table row into a ``SequenceEntry``.

    :param row: row number handed to each per-field getter
    :return: the row's contents; an empty method string is reported as ``None``
    """
    # Field reads happen in the same order as before.
    name = self.get_sample_name(row)
    location = self.get_vial_location(row)
    data_file_name = self.get_data_file(row)
    method_text = self.get_method(row)
    injections = self.get_num_inj(row)
    volume = self.get_inj_vol(row)
    source = self.get_inj_source(row)
    kind = self.get_sample_type(row)

    method_or_none = method_text if len(method_text) != 0 else None
    return SequenceEntry(
        sample_name=name,
        vial_location=location,
        method=method_or_none,
        num_inj=injections,
        inj_vol=volume,
        inj_source=source,
        sample_type=kind,
        data_file=data_file_name,
    )
def switch(self, seq_name: str):
    """
    Switch to the specified sequence. The sequence name does not need the '.S' extension.

    :param seq_name: The name of the sequence file
    :raises AssertionError: if the sequence name reported after the switch
        is not ``"<seq_name>.S"``
    """
    expected_file = f"{seq_name}.S"
    self.send(f'_SeqFile$ = "{expected_file}"')
    self.send(f'_SeqPath$ = "{self.src}"')
    self.send(Command.SWITCH_SEQUENCE_CMD)
    # Fixed pause before reading the sequence name back.
    time.sleep(2)
    parsed_response = self.get_current_sequence_name()

    # Raised explicitly rather than via `assert` so the verification still
    # runs under `python -O`; the exception type and message are unchanged.
    if parsed_response != expected_file:
        raise AssertionError("Switching sequence failed.")
Switch to the specified sequence. The sequence name does not need the '.S' extension.
Parameters
- seq_name: The name of the sequence file
def edit(self, sequence_table: SequenceTable):
    """
    Updates the currently loaded sequence table with the provided table. This method will delete the existing sequence table and remake it.
    If you would only like to edit a single row of a sequence table, use `edit_sequence_table_row` instead.

    :param sequence_table: the table whose rows are written into the rebuilt sequence table;
        it is also stored on ``self.table_state``
    """
    # NOTE(review): this listing was flattened, so which of the save/download/
    # sleep calls belong inside the loops below is reconstructed, not certain.
    # The per-iteration reading is used here - confirm against the real file.
    self.table_state = sequence_table
    rows = self.get_row_count_safely()
    existing_row_num = rows
    wanted_row_num = len(sequence_table.rows)
    # Delete existing rows from the last row number down to 1.
    for i in range(int(existing_row_num)):
        self.delete_row(int(existing_row_num - i))
        self.send(Command.SAVE_SEQUENCE_CMD)
    # Add one row per entry in the provided table.
    for i in range(int(wanted_row_num)):
        self.add_row()
        self.download()
    self.send(Command.SWITCH_SEQUENCE_CMD)
    # Fill each row; row numbers passed to _edit_row are 1-based.
    for i, row in enumerate(sequence_table.rows):
        self._edit_row(row=row, row_num=i + 1)
        self.sleep(1)
    self.download()
    self.send(Command.SWITCH_SEQUENCE_CMD)
Updates the currently loaded sequence table with the provided table. This method will delete the existing sequence table and remake it.
If you would only like to edit a single row of a sequence table, use edit_sequence_table_row instead.
Parameters
- sequence_table: the sequence table whose rows replace the currently loaded table
def edit_sample_type(
    self, sample_type: SampleType, row_num: int, save: bool = True
):
    """
    Write the sample type of one sequence-table row.

    :param sample_type: must be a ``SampleType``; its ``value`` is written
    :param row_num: the row to edit
    :param save: when true, call ``download()`` after the edit
    :raises ValueError: if ``sample_type`` is not a ``SampleType``
    """
    type_ok = isinstance(sample_type, SampleType)
    if not type_ok:
        raise ValueError("`sample_type` should be of type `SampleType`")

    self._edit_row_num(
        row=row_num, col_name=RegisterFlag.SAMPLE_TYPE, val=sample_type.value
    )
    if not save:
        return
    self.download()
def edit_injection_source(
    self, inj_source: InjectionSource, row_num: int, save: bool = True
):
    """
    Write the injection source of one sequence-table row.

    :param inj_source: must be an ``InjectionSource``; its ``value`` is written as text
    :param row_num: the row to edit
    :param save: when true, call ``download()`` after the edit
    :raises ValueError: if ``inj_source`` is not an ``InjectionSource``
    """
    source_ok = isinstance(inj_source, InjectionSource)
    if not source_ok:
        raise ValueError("`inj_source` should be of type `InjectionSource`")

    self._edit_row_text(
        row=row_num, col_name=RegisterFlag.INJ_SOR, val=inj_source.value
    )
    if not save:
        return
    self.download()
def edit_method_name(
    self, method: str, row_num: int, save: bool = True, override_check: bool = False
):
    """
    Write the method name of one sequence-table row.

    The name is looked up under the method controller's ``src`` directory
    as ``<src>/<method>.M\\``. If that path exists, the joined path (without
    the ``.M`` suffix) is written; otherwise the name is written unchanged,
    but only when ``override_check`` is set.

    :param method: method name to write
    :param row_num: the row to edit
    :param save: when true, call ``download()`` after the edit
    :param override_check: write the name even if the method path was not found
    :raises ValueError: if the method path does not exist and ``override_check`` is false
    """
    candidate = os.path.join(self.method_controller.src, method)
    if os.path.exists(candidate + ".M\\"):
        method = candidate
    elif not override_check:
        raise ValueError(
            "Method may not exist. If you would still like to use this method, set the `override_check` flag to `True`"
        )

    self._edit_row_text(row=row_num, col_name=RegisterFlag.METHOD, val=method)
    if not save:
        return
    self.download()
def edit_vial_location(self, loc: Tray, row_num: int, save: bool = True):
    """
    Write the vial location of one sequence-table row.

    If the row currently holds the same kind of tray as ``loc``, the number
    is written in place. If it holds the other kind, a new row is appended,
    filled with the old contents plus the new location, moved to ``row_num``,
    and the row at ``row_num + 1`` is deleted. If any of that raises, the
    number is written in place as a fallback.

    :param loc: a ``VialBar`` or ``FiftyFourVialPlate``
    :param row_num: the row to edit
    :param save: when true, call ``download()`` at the end
    :raises ValueError: if ``loc`` is neither ``VialBar`` nor ``FiftyFourVialPlate``
    """
    is_bar = isinstance(loc, VialBar)
    is_plate = isinstance(loc, FiftyFourVialPlate)
    loc_num = -1
    try:
        current = self.get_row(row_num)
        same_tray_kind = (
            is_bar and isinstance(current.vial_location, VialBar)
        ) or (
            is_plate and isinstance(current.vial_location, FiftyFourVialPlate)
        )
        if same_tray_kind:
            # VialBar exposes `value` as an attribute, the plate as a method.
            loc_num = loc.value if is_bar else loc.value()
            self._edit_row_num(
                row=row_num, col_name=RegisterFlag.VIAL_LOCATION, val=loc_num
            )
        elif is_bar or is_plate:
            # Different tray kind: rebuild the row at the end of the table,
            # then move it into place and delete the row it displaced.
            self.add_row()
            current.vial_location = loc
            last_row = self.get_row_count_safely()
            self._edit_row(current, last_row)
            self.move_row(int(last_row), row_num)
            self.delete_row(row_num + 1)
            self.download()
        else:
            raise ValueError(
                "`loc` should be of type `VialBar`, `FiftyFourVialPlate`"
            )
    except Exception:
        # Fallback for any failure above (including the ValueError just
        # raised): re-validate the type, then write the number directly.
        if not (is_bar or is_plate):
            raise ValueError(
                "`loc` should be of type `VialBar`, `FiftyFourVialPlate`"
            )
        loc_num = loc.value if is_bar else loc.value()
        self._edit_row_num(
            row=row_num, col_name=RegisterFlag.VIAL_LOCATION, val=loc_num
        )
    if save:
        self.download()
def run(self, stall_while_running: bool = True):
    """
    Starts the currently loaded sequence, storing data
    under the <data_dir>/<sequence table name> folder.
    Device must be ready.

    :param stall_while_running: when true, call ``check_hplc_done_running()`` after the
        run has started and store its OK value as the latest entry of ``self.data_files``
    :raises ValueError: if the run started but no data directory and sample file were found
    :raises RuntimeError: if the HPLC was never seen running after the retries
    """
    # NOTE(review): this listing was flattened; the nesting below (in particular
    # which `if` each `else` belongs to) is reconstructed - confirm against the real file.

    # Reload the table if none is cached or the cached name is not part of the
    # currently loaded sequence name.
    current_sequence_name = self.get_current_sequence_name()
    if not self.table_state or self.table_state.name not in current_sequence_name:
        self.table_state = self.load()

    # Sum the runtime of every row's method, switching the loaded method
    # whenever a row names a different one.
    total_runtime = 0.0
    for entry in self.table_state.rows:
        curr_method_runtime = self.method_controller.get_total_runtime()
        loaded_method = self.method_controller.get_method_name().removesuffix(".M")
        if entry.method:
            # Row methods may be stored as a backslash-separated path.
            method_path = entry.method.split(sep="\\")
            method_name = method_path[-1]
            if loaded_method != method_name:
                method_dir = (
                    "\\".join(method_path[:-1]) + "\\"
                    if len(method_path) > 1
                    else None
                )
                self.method_controller.switch(
                    method_name=method_name, alt_method_dir=method_dir
                )
                curr_method_runtime = self.method_controller.get_total_runtime()
        total_runtime += curr_method_runtime

    timestamp = time.strftime(SEQUENCE_TIME_FORMAT)
    folder_name = f"{self.table_state.name} {timestamp}"

    self.send(Command.SAVE_METHOD_CMD)
    self.send(Command.SAVE_SEQUENCE_CMD)
    self.send(Command.RUN_SEQUENCE_CMD.value)
    # presumably converts a runtime in minutes to seconds - TODO confirm units
    self.timeout = total_runtime * 60

    # Poll up to `tries` times; re-send the run command after each failed check.
    tries = 10
    hplc_running = False
    for _ in range(tries):
        hplc_running = self.check_hplc_is_running()
        if hplc_running:
            break
        else:
            self.send(Command.RUN_SEQUENCE_CMD.value)

    if hplc_running:
        full_path_name, current_sample_file = self.try_getting_run_info(folder_name)
        if full_path_name and current_sample_file:
            # Record the run with the first sample directory as its only child so far.
            data_file = SequenceDataFiles(
                sequence_name=self.table_state.name,
                dir=full_path_name,
                child_dirs=[os.path.join(full_path_name, current_sample_file)],
            )
            self.data_files.append(data_file)
        else:
            raise ValueError("Data directory for sequence was not found.")

        if stall_while_running:
            run_completed = self.check_hplc_done_running()
            if run_completed.is_ok():
                # Replace the provisional record with the completed one.
                self.data_files[-1] = run_completed.ok_value
            else:
                warnings.warn(run_completed.err_value)
    else:
        raise RuntimeError("Sequence run may not have started.")
Starts the currently loaded sequence, storing data
under the `<data_dir>/<sequence table name>` folder. Device must be ready.
def try_getting_run_info(self, folder_name: str) -> Tuple[str, str | None]:
    """
    Ask for the current run's data directory and sample file, retrying up to five times.

    :param folder_name: not used by this method; kept so existing callers keep working
    :return: ``(data directory, sample file)``; the sample file is ``None`` when only
        the directory could be determined
    :raises ValueError: if no data directory was obtained after all attempts; the
        last ``ValueError`` from the lookup, if any, is attached as the cause
    """
    last_error = None
    full_path_name, current_sample_file = None, None
    for _ in range(5):
        try:
            full_path_name, current_sample_file = (
                self.get_current_run_data_dir_file()
            )
        except ValueError as err:
            # Best-effort retry: remember the failure instead of discarding it.
            last_error = err
        else:
            last_error = None
        if current_sample_file and full_path_name:
            return full_path_name, current_sample_file
        elif full_path_name:
            return full_path_name, None
    # Chain the last lookup failure so the reason for giving up is visible.
    raise ValueError("Could not get sequence data folder") from last_error
def get_data_mult_uv(self, custom_path: Optional[str] = None):
    """
    Collect the UV data of every child directory of a sequence data folder.

    :param custom_path: sequence data folder to read; when not given, the most
        recent entry of ``self.data_files`` is used
    :return: one ``get_data_uv`` result per child directory, in order
    :raises FileNotFoundError: if the folder lookup does not succeed
    """
    if custom_path:
        folder = SequenceDataFiles(dir=custom_path, sequence_name="")
    else:
        folder = self.data_files[-1]

    match_result = self._fuzzy_match_most_recent_folder(folder)
    if not match_result.is_ok():
        raise FileNotFoundError(match_result.err_value)

    matched = match_result.ok_value
    return [self.get_data_uv(custom_path=child) for child in matched.child_dirs]
def get_data_uv(
    self, custom_path: Optional[str] = None
) -> Dict[int, AgilentHPLCChromatogram]:
    """
    Load the UV spectrum found at ``custom_path`` and return ``self.uv``.

    :param custom_path: path handed to ``get_uv_spectrum``; must be a string
    :return: ``self.uv`` after ``get_uv_spectrum`` has run
    :raises ValueError: if ``custom_path`` is not a string
    """
    if not isinstance(custom_path, str):
        raise ValueError(
            "Path should exist when calling from sequence. Provide a child path (contains the method)."
        )
    self.get_uv_spectrum(custom_path)
    return self.uv
def get_data(
    self, custom_path: Optional[str] = None
) -> List[AgilentChannelChromatogramData]:
    """
    Load the channel chromatograms of every child directory of a sequence data folder.

    The matched folder becomes the latest entry of ``self.data_files``: it
    replaces the last entry when one exists and is appended otherwise.

    :param custom_path: sequence data folder to read; when not given, the most
        recent entry of ``self.data_files`` is used
    :return: one ``AgilentChannelChromatogramData`` per child directory, in order
    :raises FileNotFoundError: if the folder lookup does not succeed
    """
    if custom_path:
        seq_file_dir = SequenceDataFiles(dir=custom_path, sequence_name="")
    else:
        seq_file_dir = self.data_files[-1]

    # Check the lookup result before unwrapping it, as get_data_mult_uv does,
    # so a missing folder surfaces as FileNotFoundError with the lookup message.
    match_result = self._fuzzy_match_most_recent_folder(seq_file_dir)
    if not match_result.is_ok():
        raise FileNotFoundError(match_result.err_value)
    matched = match_result.ok_value

    # With a custom path and no recorded run there is no last entry to
    # replace; append instead of failing with IndexError.
    if self.data_files:
        self.data_files[-1] = matched
    else:
        self.data_files.append(matched)

    spectra: List[AgilentChannelChromatogramData] = []
    for child_dir in matched.child_dirs:
        # get_spectrum_at_channels is expected to refresh self.spectra for this directory.
        self.get_spectrum_at_channels(child_dir)
        spectra.append(AgilentChannelChromatogramData.from_dict(self.spectra))
    return spectra
def get_report(
    self,
    custom_path: Optional[str] = None,
    report_type: ReportType = ReportType.TXT,
) -> List[AgilentReport]:
    """
    Build one report per child directory of the latest sequence data folder.

    When ``custom_path`` is given, its matched folder is first appended to
    ``self.data_files``. Each report's signals are then paired by position
    with the chromatograms returned by ``get_data()``; a signal receives a
    chromatogram only if that chromatogram has at least one x value.

    :param custom_path: sequence data folder to report on instead of the latest run
    :param report_type: passed through to ``get_report_details``
    :return: the reports, in child-directory order
    :raises ValueError: if a signal index has no matching chromatogram
    """
    if custom_path:
        custom_folder = SequenceDataFiles(
            dir=custom_path,
            sequence_name="NA",
        )
        custom_match = self._fuzzy_match_most_recent_folder(
            most_recent_folder=custom_folder,
        )
        self.data_files.append(custom_match.ok_value)

    latest = self.data_files[-1]
    parent_dir = self._fuzzy_match_most_recent_folder(
        most_recent_folder=latest,
    ).ok_value
    assert len(parent_dir.child_dirs) != 0

    spectra = self.get_data()
    reports = []
    for child_idx, child_dir in enumerate(parent_dir.child_dirs):
        report = self.get_report_details(child_dir, report_type)
        # Chromatograms of this child, in attribute order of the data object.
        channel_data: List[AgilentHPLCChromatogram] = list(
            spectra[child_idx].__dict__.values()
        )
        for signal_idx, signal in enumerate(report.signals):
            assert len(report.signals) <= len(channel_data)
            try:
                candidate = channel_data[signal_idx]
                if len(candidate.x) > 0:
                    signal.data = candidate
            except IndexError:
                raise ValueError(signal_idx)
        reports.append(report)
    return reports