pychemstation.control.controllers.data_aq

1from .method import MethodController
2from .sequence import SequenceController
3
4__all__ = ["MethodController", "SequenceController"]
class MethodController(pychemstation.utils.abc_tables.run.RunController):
 37class MethodController(RunController):
 38    """Class containing method related logic."""
 39
 40    def __init__(
 41        self,
 42        controller: Optional[CommunicationController],
 43        src: Optional[str],
 44        data_dirs: Optional[List[str]],
 45        table: Table,
 46        offline: bool,
 47        injector: InjectorController,
 48        pump: PumpController,
 49        dad: DADController,
 50        column: ColumnController,
 51        sample_info: SampleInfo,
 52    ):
 53        self.injector = injector
 54        self.pump = pump
 55        self.dad = dad
 56        self.column = column
 57        self.sample_info = sample_info
 58        self.data_files: List[str] = []
 59        super().__init__(
 60            controller=controller,
 61            src=src,
 62            data_dirs=data_dirs,
 63            table=table,
 64            offline=offline,
 65        )
 66
 67    def get_sample_location(self) -> Tray:
 68        return self.sample_info.get_location()
 69
 70    def get_current_method_name(self) -> str:
 71        self.sleepy_send(Command.GET_METHOD_CMD)
 72        res = self.receive()
 73        if res.is_ok():
 74            return res.ok_value.string_response
 75        return "ERROR"
 76
 77    def get_method_params(self) -> HPLCMethodParams:
 78        if self.controller:
 79            return HPLCMethodParams(
 80                organic_modifier=self.get_om(),
 81                flow=self.get_flow(),
 82            )
 83        raise ValueError("Communication controller is offline!")
 84
 85    def get_row(self, row: int) -> TimeTableEntry:
 86        function = self.get_text(row, RegisterFlag.FUNCTION)
 87        if function == RegisterFlag.FLOW.value:
 88            return TimeTableEntry(
 89                start_time=self.get_num(row, RegisterFlag.TIME),
 90                organic_modifer=None,
 91                flow=self.get_num(row, RegisterFlag.TIMETABLE_FLOW),
 92            )
 93        if function == RegisterFlag.SOLVENT_COMPOSITION.value:
 94            return TimeTableEntry(
 95                start_time=self.get_num(row, RegisterFlag.TIME),
 96                organic_modifer=self.get_num(
 97                    row, RegisterFlag.TIMETABLE_SOLVENT_B_COMPOSITION
 98                ),
 99                flow=None,
100            )
101        raise ValueError("Both flow and organic modifier are empty")
102
103    def get_timetable(self, rows: int):
104        uncoalesced_timetable_rows = [self.get_row(r + 1) for r in range(rows)]
105        timetable_rows: Dict[str, TimeTableEntry] = {}
106        for row in uncoalesced_timetable_rows:
107            time_key = str(row.start_time)
108            if time_key not in timetable_rows.keys():
109                timetable_rows[time_key] = TimeTableEntry(
110                    start_time=row.start_time,
111                    flow=row.flow,
112                    organic_modifer=row.organic_modifer,
113                )
114            else:
115                if row.flow:
116                    timetable_rows[time_key].flow = row.flow
117                if row.organic_modifer:
118                    timetable_rows[time_key].organic_modifer = row.organic_modifer
119        entries = list(timetable_rows.values())
120        entries.sort(key=lambda e: e.start_time)
121        return entries
122
123    def load(self) -> MethodDetails:
124        rows = self.get_row_count_safely()
125        method_name = self.get_method_name()
126        timetable_rows = self.get_timetable(rows)
127        params = self.get_method_params()
128        stop_time = self.get_stop_time()
129        post_time = self.get_post_time()
130        self.table_state = MethodDetails(
131            name=method_name,
132            timetable=timetable_rows,
133            stop_time=stop_time,
134            post_time=post_time,
135            params=params,
136        )
137        return self.table_state
138
139    def get_method_name(self):
140        self.send(Command.GET_METHOD_CMD)
141        res = self.receive()
142        method_name = res.ok_value.string_response
143        return method_name
144
145    def get_total_runtime(self) -> Union[int, float]:
146        """Returns total method runtime in minutes."""
147        return self.get_post_time() + self.get_stop_time()
148
149    def current_method(self, method_name: str):
150        """
151        Checks if a given method is already loaded into Chemstation. Method name does not need the ".M" extension.
152
153        :param method_name: a Chemstation method
154        :return: True if method is already loaded
155        """
156        self.send(Command.GET_METHOD_CMD)
157        parsed_response = self.receive()
158        return method_name in parsed_response
159
160    def switch(self, method_name: str, alt_method_dir: Optional[str] = None):
161        """
162        Allows the user to switch between pre-programmed methods. No need to append '.M'
163        to the end of the method name. For example. for the method named 'General-Poroshell.M',
164        only 'General-Poroshell' is needed.
165
166        :param method_name: any available method in Chemstation method directory
167        :param alt_method_dir: directory where the method resides
168        :raise IndexError: Response did not have expected format. Try again.
169        :raise AssertionError: The desired method is not selected. Try again.
170        """
171        method_dir = self.src if not alt_method_dir else alt_method_dir
172        self.send(
173            Command.SWITCH_METHOD_CMD_SPECIFIC.value.format(
174                method_dir=method_dir, method_name=method_name
175            )
176        )
177        time.sleep(2)
178        self.send(Command.GET_METHOD_CMD)
179        time.sleep(2)
180        res = self.receive()
181        if res.is_ok():
182            parsed_response = res.ok_value.string_response
183            assert parsed_response == f"{method_name}.M", "Switching Methods failed."
184        self.table_state = None
185
186    def edit(self, updated_method: MethodDetails, save: bool):
187        """Updated the currently loaded method in ChemStation with provided values.
188
189        :param updated_method: the method with updated values, to be sent to Chemstation to modify the currently loaded method.
190        :param save: if false only modifies the method, otherwise saves to disk
191        """
192        self.table_state = updated_method
193        # Method settings required for all runs
194        self.update_method_params(
195            new_flow=updated_method.params.flow,
196            new_initial_om=updated_method.params.organic_modifier,
197            new_stop_time=updated_method.stop_time,
198            new_post_time=updated_method.post_time,
199        )
200        self.validate_timetable(updated_method.timetable)
201        self.edit_method_timetable(updated_method.timetable)
202
203        if save:
204            self.save_method()
205
206    def save_method(self):
207        self.send(
208            Command.SAVE_METHOD_CMD.value.format(
209                commit_msg=f"saved method at {str(time.time())}"
210            )
211        )
212
213    def _validate_organic_modifier(self, new_om):
214        if not (isinstance(new_om, int) or isinstance(new_om, float)):
215            raise ValueError("Organic modifier must be int or float")
216        if new_om < 0:
217            raise ValueError("Organic modifier must be positive")
218        if new_om > 100:
219            raise ValueError("Organic modifer must be less than 100.")
220
221    def _validate_flow(self, new_flow):
222        if not (isinstance(new_flow, int) or isinstance(new_flow, float)):
223            raise ValueError("Flow must be int or float")
224        if new_flow < 0:
225            raise ValueError("Flow must be positive")
226        if new_flow >= 5.0:
227            raise ValueError("Flow must be less than 5.0")
228
229    def validate_stop_time(self, new_stop_time):
230        if not (isinstance(new_stop_time, int) or isinstance(new_stop_time, float)):
231            raise ValueError("Stop time must be int or float")
232        if new_stop_time < 0:
233            raise ValueError("Stop time must be positive")
234
235    def validate_post_time(self, new_post_time):
236        if not (isinstance(new_post_time, int) or isinstance(new_post_time, float)):
237            raise ValueError("Post time must be int or float")
238        if new_post_time < 0:
239            raise ValueError("Post time must be positive")
240
241    def update_method_params(
242        self,
243        new_flow: Union[int, float],
244        new_initial_om: Union[int, float],
245        new_stop_time: Union[int, float] | None,
246        new_post_time: Union[int, float] | None,
247    ):
248        self.delete_table()
249        self._validate_flow(new_flow)
250        self.validate_post_time(new_post_time)
251        self._validate_organic_modifier(new_initial_om)
252        self.validate_stop_time(new_stop_time)
253        self.edit_flow(new_flow)
254        self.edit_initial_om(new_initial_om)
255        self.edit_stop_time(new_stop_time)
256        self.edit_post_time(new_post_time)
257
258    def download(self):
259        self.sleepy_send("DownloadRCMethod PMP1")
260
261    def _edit_row(
262        self,
263        row: TimeTableEntry,
264        first_row: bool,
265        time_added: bool,
266        flow_added: bool,
267        om_added: bool,
268        function_added: bool,
269    ) -> Tuple[bool, bool, bool, bool]:
270        def add_time():
271            nonlocal time_added
272            nonlocal first_row
273            if not time_added:
274                self.add_new_col_num(col_name=RegisterFlag.TIME, val=row.start_time)
275                time_added = True
276            elif not first_row:
277                self._edit_row_num(col_name=RegisterFlag.TIME, val=row.start_time)
278
279        def add_flow():
280            nonlocal flow_added
281            nonlocal function_added
282            if not flow_added:
283                if not function_added:
284                    self.add_new_col_text(
285                        col_name=RegisterFlag.FUNCTION,
286                        val=RegisterFlag.FLOW.value,
287                    )
288                    function_added = True
289                else:
290                    self._edit_row_text(
291                        col_name=RegisterFlag.FUNCTION,
292                        val=RegisterFlag.FLOW.value,
293                    )
294                self.add_new_col_num(
295                    col_name=RegisterFlag.TIMETABLE_FLOW,
296                    val=row.flow,
297                )
298                flow_added = True
299            else:
300                self._edit_row_text(
301                    col_name=RegisterFlag.FUNCTION, val=RegisterFlag.FLOW.value
302                )
303                self._edit_row_num(col_name=RegisterFlag.TIMETABLE_FLOW, val=row.flow)
304
305        def add_om():
306            nonlocal om_added
307            nonlocal function_added
308            if not om_added:
309                if not function_added:
310                    self.add_new_col_text(
311                        col_name=RegisterFlag.FUNCTION,
312                        val=RegisterFlag.SOLVENT_COMPOSITION.value,
313                    )
314                    function_added = True
315                else:
316                    self._edit_row_text(
317                        col_name=RegisterFlag.FUNCTION,
318                        val=RegisterFlag.SOLVENT_COMPOSITION.value,
319                    )
320                self.add_new_col_num(
321                    col_name=RegisterFlag.TIMETABLE_SOLVENT_B_COMPOSITION,
322                    val=row.organic_modifer,
323                )
324                om_added = True
325            else:
326                self._edit_row_text(
327                    col_name=RegisterFlag.FUNCTION,
328                    val=RegisterFlag.SOLVENT_COMPOSITION.value,
329                )
330                self._edit_row_num(
331                    col_name=RegisterFlag.TIMETABLE_SOLVENT_B_COMPOSITION,
332                    val=row.organic_modifer,
333                )
334
335        if row.organic_modifer:
336            self.add_row()
337            add_om()
338            add_time()
339        if row.flow:
340            self.add_row()
341            add_flow()
342            add_time()
343        self.download()
344        return time_added, flow_added, om_added, function_added
345
346    def edit_method_timetable(self, timetable_rows: List[TimeTableEntry]):
347        self.get_num_rows()
348        self.delete_table()
349        res = self.get_num_rows()
350        while not res.is_err():
351            self.delete_table()
352            res = self.get_num_rows()
353
354        self.new_table()
355        num_rows = self.get_row_count_safely()
356        if num_rows != 0:
357            raise ValueError("Should be zero rows!")
358
359        time_added = False
360        flow_added = False
361        om_added = False
362        function_added = False
363        for i, row in enumerate(timetable_rows):
364            time_added, flow_added, om_added, function_added = self._edit_row(
365                row=row,
366                first_row=i == 0,
367                time_added=time_added,
368                flow_added=flow_added,
369                om_added=om_added,
370                function_added=function_added,
371            )
372
373    def stop(self):
374        """
375        Stops the method run. A dialog window will pop up and manual intervention may be required.
376        """
377        self.send(Command.STOP_METHOD_CMD)
378
379    def run(
380        self,
381        experiment_name: str,
382        add_timestamp: bool = True,
383        stall_while_running: bool = True,
384    ):
385        """
386        :param experiment_name: Name of the experiment
387        :param stall_while_running: whether to stall or immediately return
388        :param add_timestamp: if should add timestamp to experiment name
389        """
390        hplc_is_running = False
391        tries = 0
392        while tries < 10 and not hplc_is_running:
393            timestamp = time.strftime(TIME_FORMAT)
394            self.send(
395                Command.RUN_METHOD_CMD.value.format(
396                    data_dir=self.data_dirs[0],
397                    experiment_name=f"{experiment_name}_{timestamp}"
398                    if add_timestamp
399                    else experiment_name,
400                )
401            )
402
403            hplc_is_running = self.check_hplc_is_running()
404            tries += 1
405
406        data_dir, data_file = self.get_current_run_data_dir_file()
407        if not hplc_is_running:
408            raise RuntimeError("Method failed to start.")
409
410        self.data_files.append(os.path.join(os.path.normpath(data_dir), data_file))
411        self.timeout = (self.get_total_runtime()) * 60
412
413        if stall_while_running:
414            run_completed = self.check_hplc_done_running()
415            if run_completed.is_ok():
416                self.data_files[-1] = run_completed.ok_value
417            else:
418                warnings.warn(run_completed.err_value)
419        else:
420            folder = self._fuzzy_match_most_recent_folder(self.data_files[-1])
421            i = 0
422            while folder.is_err() and i < 10:
423                folder = self._fuzzy_match_most_recent_folder(self.data_files[-1])
424                i += 1
425            if folder.is_ok():
426                self.data_files[-1] = folder.ok_value
427            else:
428                warning = f"Data folder {self.data_files[-1]} may not exist, returning and will check again after run is done."
429                warnings.warn(warning)
430
431    def _fuzzy_match_most_recent_folder(
432        self, most_recent_folder: T
433    ) -> Result[str, str]:
434        if isinstance(most_recent_folder, str) or isinstance(most_recent_folder, bytes):
435            if os.path.exists(most_recent_folder):
436                return Ok(str(most_recent_folder))
437            return Err("Folder not found!")
438        raise ValueError("Folder is not a str or byte type.")
439
440    def get_data(
441        self, custom_path: Optional[str] = None
442    ) -> AgilentChannelChromatogramData:
443        custom_path = custom_path if custom_path else self.data_files[-1]
444        self.get_spectrum_at_channels(custom_path)
445        return AgilentChannelChromatogramData.from_dict(self.spectra)
446
447    def get_data_uv(
448        self, custom_path: Optional[str] = None
449    ) -> dict[int, AgilentHPLCChromatogram]:
450        custom_path = custom_path if custom_path else self.data_files[-1]
451        self.get_uv_spectrum(custom_path)
452        return self.uv
453
454    def get_report(
455        self,
456        custom_path: Optional[str] = None,
457        report_type: ReportType = ReportType.TXT,
458    ) -> List[AgilentReport]:
459        custom_path = self.data_files[-1] if not custom_path else custom_path
460        metd_report = self.get_report_details(custom_path, report_type)
461        chrom_data: List[AgilentHPLCChromatogram] = list(
462            self.get_data(custom_path).__dict__.values()
463        )
464        for i, signal in enumerate(metd_report.signals):
465            possible_data = chrom_data[i]
466            if len(possible_data.x) > 0:
467                signal.data = possible_data
468        return [metd_report]
469
470    def _validate_row(self, row: TimeTableEntry):
471        if not (row.flow or row.organic_modifer):
472            raise ValueError(
473                "Require one of flow or organic modifier for the method timetable entry!"
474            )
475        if row.flow:
476            self._validate_flow(row.flow)
477        if row.organic_modifer:
478            self._validate_organic_modifier(row.organic_modifer)
479
480    def validate_timetable(self, timetable: List[TimeTableEntry]):
481        start_time = 0.0
482        for i, row in enumerate(timetable):
483            if row.start_time > start_time:
484                start_time = row.start_time
485            elif row.start_time <= start_time:
486                raise ValueError(
487                    f"""Every row's start time must be larger than the previous start time. 
488                    Row {i + 1} ({timetable[i].start_time}) has a smaller or equal starttime than row {i} ({start_time})"""
489                )
490            self._validate_row(row)
491
492    def get_om(self):
493        return self._read_num_param(RegisterFlag.SOLVENT_B_COMPOSITION)
494
495    def get_flow(self):
496        return self._read_num_param(RegisterFlag.FLOW)
497
498    def get_post_time(self) -> Union[int, float]:
499        return self._read_num_param(RegisterFlag.POST_TIME)
500
501    def get_stop_time(self) -> Union[int, float]:
502        return self._read_num_param(RegisterFlag.MAX_TIME)
503
504    def edit_post_time(self, new_post_time: Optional[int | float]):
505        if new_post_time:
506            post_time: Param = Param(
507                val=new_post_time,
508                chemstation_key=RegisterFlag.POST_TIME,
509                ptype=PType.NUM,
510            )
511            self._update_param(
512                Param(
513                    val="Set",
514                    chemstation_key=RegisterFlag.POSTIME_MODE,
515                    ptype=PType.STR,
516                )
517            )
518            self._update_param(post_time)
519        else:
520            self._update_param(
521                Param(
522                    val="Off",
523                    chemstation_key=RegisterFlag.POSTIME_MODE,
524                    ptype=PType.STR,
525                )
526            )
527
528    def edit_stop_time(self, new_stop_time: Optional[int | float]):
529        if new_stop_time:
530            stop_time: Param = Param(
531                val=new_stop_time,
532                chemstation_key=RegisterFlag.MAX_TIME,
533                ptype=PType.NUM,
534            )
535            self._update_param(
536                Param(
537                    val="Set",
538                    chemstation_key=RegisterFlag.STOPTIME_MODE,
539                    ptype=PType.STR,
540                )
541            )
542            self._update_param(stop_time)
543        else:
544            self._update_param(
545                Param(
546                    val="Off",
547                    chemstation_key=RegisterFlag.STOPTIME_MODE,
548                    ptype=PType.STR,
549                )
550            )
551
552    def edit_flow(self, new_flow: Union[int, float]):
553        flow: Param = Param(
554            val=new_flow, chemstation_key=RegisterFlag.FLOW, ptype=PType.NUM
555        )
556        self._update_param(flow)
557
558    def edit_initial_om(self, new_om: Union[int, float]):
559        initial_organic_modifier: Param = Param(
560            val=new_om,
561            chemstation_key=RegisterFlag.SOLVENT_B_COMPOSITION,
562            ptype=PType.NUM,
563        )
564        self._update_param(initial_organic_modifier)

Class containing method related logic.

MethodController( controller: Optional[pychemstation.control.controllers.CommunicationController], src: Optional[str], data_dirs: Optional[List[str]], table: pychemstation.utils.table_types.Table, offline: bool, injector: pychemstation.control.controllers.devices.InjectorController, pump: pychemstation.control.controllers.devices.pump.PumpController, dad: pychemstation.control.controllers.devices.dad.DADController, column: pychemstation.control.controllers.devices.column.ColumnController, sample_info: pychemstation.control.controllers.devices.sample_info.SampleInfo)
40    def __init__(
41        self,
42        controller: Optional[CommunicationController],
43        src: Optional[str],
44        data_dirs: Optional[List[str]],
45        table: Table,
46        offline: bool,
47        injector: InjectorController,
48        pump: PumpController,
49        dad: DADController,
50        column: ColumnController,
51        sample_info: SampleInfo,
52    ):
53        self.injector = injector
54        self.pump = pump
55        self.dad = dad
56        self.column = column
57        self.sample_info = sample_info
58        self.data_files: List[str] = []
59        super().__init__(
60            controller=controller,
61            src=src,
62            data_dirs=data_dirs,
63            table=table,
64            offline=offline,
65        )
injector
pump
dad
column
sample_info
data_files: List[str]
67    def get_sample_location(self) -> Tray:
68        return self.sample_info.get_location()
def get_current_method_name(self) -> str:
70    def get_current_method_name(self) -> str:
71        self.sleepy_send(Command.GET_METHOD_CMD)
72        res = self.receive()
73        if res.is_ok():
74            return res.ok_value.string_response
75        return "ERROR"
def get_method_params(self) -> pychemstation.utils.method_types.HPLCMethodParams:
77    def get_method_params(self) -> HPLCMethodParams:
78        if self.controller:
79            return HPLCMethodParams(
80                organic_modifier=self.get_om(),
81                flow=self.get_flow(),
82            )
83        raise ValueError("Communication controller is offline!")
def get_row(self, row: int) -> pychemstation.utils.method_types.TimeTableEntry:
 85    def get_row(self, row: int) -> TimeTableEntry:
 86        function = self.get_text(row, RegisterFlag.FUNCTION)
 87        if function == RegisterFlag.FLOW.value:
 88            return TimeTableEntry(
 89                start_time=self.get_num(row, RegisterFlag.TIME),
 90                organic_modifer=None,
 91                flow=self.get_num(row, RegisterFlag.TIMETABLE_FLOW),
 92            )
 93        if function == RegisterFlag.SOLVENT_COMPOSITION.value:
 94            return TimeTableEntry(
 95                start_time=self.get_num(row, RegisterFlag.TIME),
 96                organic_modifer=self.get_num(
 97                    row, RegisterFlag.TIMETABLE_SOLVENT_B_COMPOSITION
 98                ),
 99                flow=None,
100            )
101        raise ValueError("Both flow and organic modifier are empty")
def get_timetable(self, rows: int):
103    def get_timetable(self, rows: int):
104        uncoalesced_timetable_rows = [self.get_row(r + 1) for r in range(rows)]
105        timetable_rows: Dict[str, TimeTableEntry] = {}
106        for row in uncoalesced_timetable_rows:
107            time_key = str(row.start_time)
108            if time_key not in timetable_rows.keys():
109                timetable_rows[time_key] = TimeTableEntry(
110                    start_time=row.start_time,
111                    flow=row.flow,
112                    organic_modifer=row.organic_modifer,
113                )
114            else:
115                if row.flow:
116                    timetable_rows[time_key].flow = row.flow
117                if row.organic_modifer:
118                    timetable_rows[time_key].organic_modifer = row.organic_modifer
119        entries = list(timetable_rows.values())
120        entries.sort(key=lambda e: e.start_time)
121        return entries
def load(self) -> pychemstation.utils.method_types.MethodDetails:
123    def load(self) -> MethodDetails:
124        rows = self.get_row_count_safely()
125        method_name = self.get_method_name()
126        timetable_rows = self.get_timetable(rows)
127        params = self.get_method_params()
128        stop_time = self.get_stop_time()
129        post_time = self.get_post_time()
130        self.table_state = MethodDetails(
131            name=method_name,
132            timetable=timetable_rows,
133            stop_time=stop_time,
134            post_time=post_time,
135            params=params,
136        )
137        return self.table_state
def get_method_name(self):
139    def get_method_name(self):
140        self.send(Command.GET_METHOD_CMD)
141        res = self.receive()
142        method_name = res.ok_value.string_response
143        return method_name
def get_total_runtime(self) -> Union[int, float]:
145    def get_total_runtime(self) -> Union[int, float]:
146        """Returns total method runtime in minutes."""
147        return self.get_post_time() + self.get_stop_time()

Returns total method runtime in minutes.

def current_method(self, method_name: str):
149    def current_method(self, method_name: str):
150        """
151        Checks if a given method is already loaded into Chemstation. Method name does not need the ".M" extension.
152
153        :param method_name: a Chemstation method
154        :return: True if method is already loaded
155        """
156        self.send(Command.GET_METHOD_CMD)
157        parsed_response = self.receive()
158        return method_name in parsed_response

Checks if a given method is already loaded into Chemstation. Method name does not need the ".M" extension.

Parameters
  • method_name: a Chemstation method
Returns

True if method is already loaded

def switch(self, method_name: str, alt_method_dir: Optional[str] = None):
160    def switch(self, method_name: str, alt_method_dir: Optional[str] = None):
161        """
162        Allows the user to switch between pre-programmed methods. No need to append '.M'
163        to the end of the method name. For example. for the method named 'General-Poroshell.M',
164        only 'General-Poroshell' is needed.
165
166        :param method_name: any available method in Chemstation method directory
167        :param alt_method_dir: directory where the method resides
168        :raise IndexError: Response did not have expected format. Try again.
169        :raise AssertionError: The desired method is not selected. Try again.
170        """
171        method_dir = self.src if not alt_method_dir else alt_method_dir
172        self.send(
173            Command.SWITCH_METHOD_CMD_SPECIFIC.value.format(
174                method_dir=method_dir, method_name=method_name
175            )
176        )
177        time.sleep(2)
178        self.send(Command.GET_METHOD_CMD)
179        time.sleep(2)
180        res = self.receive()
181        if res.is_ok():
182            parsed_response = res.ok_value.string_response
183            assert parsed_response == f"{method_name}.M", "Switching Methods failed."
184        self.table_state = None

Allows the user to switch between pre-programmed methods. No need to append '.M' to the end of the method name. For example, for the method named 'General-Poroshell.M', only 'General-Poroshell' is needed.

Parameters
  • method_name: any available method in Chemstation method directory
  • alt_method_dir: directory where the method resides
Raises
  • IndexError: Response did not have expected format. Try again.
  • AssertionError: The desired method is not selected. Try again.
def edit( self, updated_method: pychemstation.utils.method_types.MethodDetails, save: bool):
186    def edit(self, updated_method: MethodDetails, save: bool):
187        """Updated the currently loaded method in ChemStation with provided values.
188
189        :param updated_method: the method with updated values, to be sent to Chemstation to modify the currently loaded method.
190        :param save: if false only modifies the method, otherwise saves to disk
191        """
192        self.table_state = updated_method
193        # Method settings required for all runs
194        self.update_method_params(
195            new_flow=updated_method.params.flow,
196            new_initial_om=updated_method.params.organic_modifier,
197            new_stop_time=updated_method.stop_time,
198            new_post_time=updated_method.post_time,
199        )
200        self.validate_timetable(updated_method.timetable)
201        self.edit_method_timetable(updated_method.timetable)
202
203        if save:
204            self.save_method()

Updates the currently loaded method in ChemStation with the provided values.

Parameters
  • updated_method: the method with updated values, to be sent to Chemstation to modify the currently loaded method.
  • save: if false only modifies the method, otherwise saves to disk
def save_method(self):
206    def save_method(self):
207        self.send(
208            Command.SAVE_METHOD_CMD.value.format(
209                commit_msg=f"saved method at {str(time.time())}"
210            )
211        )
def validate_stop_time(self, new_stop_time):
229    def validate_stop_time(self, new_stop_time):
230        if not (isinstance(new_stop_time, int) or isinstance(new_stop_time, float)):
231            raise ValueError("Stop time must be int or float")
232        if new_stop_time < 0:
233            raise ValueError("Stop time must be positive")
def validate_post_time(self, new_post_time):
235    def validate_post_time(self, new_post_time):
236        if not (isinstance(new_post_time, int) or isinstance(new_post_time, float)):
237            raise ValueError("Post time must be int or float")
238        if new_post_time < 0:
239            raise ValueError("Post time must be positive")
def update_method_params( self, new_flow: Union[int, float], new_initial_om: Union[int, float], new_stop_time: Union[int, float, NoneType], new_post_time: Union[int, float, NoneType]):
241    def update_method_params(
242        self,
243        new_flow: Union[int, float],
244        new_initial_om: Union[int, float],
245        new_stop_time: Union[int, float] | None,
246        new_post_time: Union[int, float] | None,
247    ):
248        self.delete_table()
249        self._validate_flow(new_flow)
250        self.validate_post_time(new_post_time)
251        self._validate_organic_modifier(new_initial_om)
252        self.validate_stop_time(new_stop_time)
253        self.edit_flow(new_flow)
254        self.edit_initial_om(new_initial_om)
255        self.edit_stop_time(new_stop_time)
256        self.edit_post_time(new_post_time)
def download(self):
258    def download(self):
259        self.sleepy_send("DownloadRCMethod PMP1")
def edit_method_timetable( self, timetable_rows: List[pychemstation.utils.method_types.TimeTableEntry]):
346    def edit_method_timetable(self, timetable_rows: List[TimeTableEntry]):
347        self.get_num_rows()
348        self.delete_table()
349        res = self.get_num_rows()
350        while not res.is_err():
351            self.delete_table()
352            res = self.get_num_rows()
353
354        self.new_table()
355        num_rows = self.get_row_count_safely()
356        if num_rows != 0:
357            raise ValueError("Should be zero rows!")
358
359        time_added = False
360        flow_added = False
361        om_added = False
362        function_added = False
363        for i, row in enumerate(timetable_rows):
364            time_added, flow_added, om_added, function_added = self._edit_row(
365                row=row,
366                first_row=i == 0,
367                time_added=time_added,
368                flow_added=flow_added,
369                om_added=om_added,
370                function_added=function_added,
371            )
def stop(self):
373    def stop(self):
374        """
375        Stops the method run. A dialog window will pop up and manual intervention may be required.
376        """
377        self.send(Command.STOP_METHOD_CMD)

Stops the method run. A dialog window will pop up and manual intervention may be required.

def run( self, experiment_name: str, add_timestamp: bool = True, stall_while_running: bool = True):
379    def run(
380        self,
381        experiment_name: str,
382        add_timestamp: bool = True,
383        stall_while_running: bool = True,
384    ):
385        """
386        :param experiment_name: Name of the experiment
387        :param stall_while_running: whether to stall or immediately return
388        :param add_timestamp: if should add timestamp to experiment name
389        """
390        hplc_is_running = False
391        tries = 0
392        while tries < 10 and not hplc_is_running:
393            timestamp = time.strftime(TIME_FORMAT)
394            self.send(
395                Command.RUN_METHOD_CMD.value.format(
396                    data_dir=self.data_dirs[0],
397                    experiment_name=f"{experiment_name}_{timestamp}"
398                    if add_timestamp
399                    else experiment_name,
400                )
401            )
402
403            hplc_is_running = self.check_hplc_is_running()
404            tries += 1
405
406        data_dir, data_file = self.get_current_run_data_dir_file()
407        if not hplc_is_running:
408            raise RuntimeError("Method failed to start.")
409
410        self.data_files.append(os.path.join(os.path.normpath(data_dir), data_file))
411        self.timeout = (self.get_total_runtime()) * 60
412
413        if stall_while_running:
414            run_completed = self.check_hplc_done_running()
415            if run_completed.is_ok():
416                self.data_files[-1] = run_completed.ok_value
417            else:
418                warnings.warn(run_completed.err_value)
419        else:
420            folder = self._fuzzy_match_most_recent_folder(self.data_files[-1])
421            i = 0
422            while folder.is_err() and i < 10:
423                folder = self._fuzzy_match_most_recent_folder(self.data_files[-1])
424                i += 1
425            if folder.is_ok():
426                self.data_files[-1] = folder.ok_value
427            else:
428                warning = f"Data folder {self.data_files[-1]} may not exist, returning and will check again after run is done."
429                warnings.warn(warning)
Parameters
  • experiment_name: Name of the experiment
  • stall_while_running: whether to stall or immediately return
  • add_timestamp: if should add timestamp to experiment name
def get_data( self, custom_path: Optional[str] = None) -> pychemstation.analysis.AgilentChannelChromatogramData:
440    def get_data(
441        self, custom_path: Optional[str] = None
442    ) -> AgilentChannelChromatogramData:
443        custom_path = custom_path if custom_path else self.data_files[-1]
444        self.get_spectrum_at_channels(custom_path)
445        return AgilentChannelChromatogramData.from_dict(self.spectra)
def get_data_uv( self, custom_path: Optional[str] = None) -> dict[int, pychemstation.analysis.AgilentHPLCChromatogram]:
447    def get_data_uv(
448        self, custom_path: Optional[str] = None
449    ) -> dict[int, AgilentHPLCChromatogram]:
450        custom_path = custom_path if custom_path else self.data_files[-1]
451        self.get_uv_spectrum(custom_path)
452        return self.uv
def get_report( self, custom_path: Optional[str] = None, report_type: pychemstation.analysis.process_report.ReportType = <ReportType.TXT: 0>) -> List[pychemstation.analysis.process_report.AgilentReport]:
454    def get_report(
455        self,
456        custom_path: Optional[str] = None,
457        report_type: ReportType = ReportType.TXT,
458    ) -> List[AgilentReport]:
459        custom_path = self.data_files[-1] if not custom_path else custom_path
460        metd_report = self.get_report_details(custom_path, report_type)
461        chrom_data: List[AgilentHPLCChromatogram] = list(
462            self.get_data(custom_path).__dict__.values()
463        )
464        for i, signal in enumerate(metd_report.signals):
465            possible_data = chrom_data[i]
466            if len(possible_data.x) > 0:
467                signal.data = possible_data
468        return [metd_report]
def validate_timetable( self, timetable: List[pychemstation.utils.method_types.TimeTableEntry]):
480    def validate_timetable(self, timetable: List[TimeTableEntry]):
481        start_time = 0.0
482        for i, row in enumerate(timetable):
483            if row.start_time > start_time:
484                start_time = row.start_time
485            elif row.start_time <= start_time:
486                raise ValueError(
487                    f"""Every row's start time must be larger than the previous start time. 
488                    Row {i + 1} ({timetable[i].start_time}) has a smaller or equal starttime than row {i} ({start_time})"""
489                )
490            self._validate_row(row)
def get_om(self):
492    def get_om(self):
493        return self._read_num_param(RegisterFlag.SOLVENT_B_COMPOSITION)
def get_flow(self):
495    def get_flow(self):
496        return self._read_num_param(RegisterFlag.FLOW)
def get_post_time(self) -> Union[int, float]:
498    def get_post_time(self) -> Union[int, float]:
499        return self._read_num_param(RegisterFlag.POST_TIME)
def get_stop_time(self) -> Union[int, float]:
501    def get_stop_time(self) -> Union[int, float]:
502        return self._read_num_param(RegisterFlag.MAX_TIME)
def edit_post_time(self, new_post_time: Union[int, float, NoneType]):
504    def edit_post_time(self, new_post_time: Optional[int | float]):
505        if new_post_time:
506            post_time: Param = Param(
507                val=new_post_time,
508                chemstation_key=RegisterFlag.POST_TIME,
509                ptype=PType.NUM,
510            )
511            self._update_param(
512                Param(
513                    val="Set",
514                    chemstation_key=RegisterFlag.POSTIME_MODE,
515                    ptype=PType.STR,
516                )
517            )
518            self._update_param(post_time)
519        else:
520            self._update_param(
521                Param(
522                    val="Off",
523                    chemstation_key=RegisterFlag.POSTIME_MODE,
524                    ptype=PType.STR,
525                )
526            )
def edit_stop_time(self, new_stop_time: Union[int, float, NoneType]):
528    def edit_stop_time(self, new_stop_time: Optional[int | float]):
529        if new_stop_time:
530            stop_time: Param = Param(
531                val=new_stop_time,
532                chemstation_key=RegisterFlag.MAX_TIME,
533                ptype=PType.NUM,
534            )
535            self._update_param(
536                Param(
537                    val="Set",
538                    chemstation_key=RegisterFlag.STOPTIME_MODE,
539                    ptype=PType.STR,
540                )
541            )
542            self._update_param(stop_time)
543        else:
544            self._update_param(
545                Param(
546                    val="Off",
547                    chemstation_key=RegisterFlag.STOPTIME_MODE,
548                    ptype=PType.STR,
549                )
550            )
def edit_flow(self, new_flow: Union[int, float]):
552    def edit_flow(self, new_flow: Union[int, float]):
553        flow: Param = Param(
554            val=new_flow, chemstation_key=RegisterFlag.FLOW, ptype=PType.NUM
555        )
556        self._update_param(flow)
def edit_initial_om(self, new_om: Union[int, float]):
558    def edit_initial_om(self, new_om: Union[int, float]):
559        initial_organic_modifier: Param = Param(
560            val=new_om,
561            chemstation_key=RegisterFlag.SOLVENT_B_COMPOSITION,
562            ptype=PType.NUM,
563        )
564        self._update_param(initial_organic_modifier)
class SequenceController(pychemstation.utils.abc_tables.run.RunController):
 34class SequenceController(RunController):
 35    """
 36    Class containing sequence related logic
 37    """
 38
 39    def __init__(
 40        self,
 41        controller: Optional[CommunicationController],
 42        method_controller: MethodController,
 43        src: Optional[str],
 44        data_dirs: Optional[List[str]],
 45        table: Table,
 46        offline: bool,
 47    ):
 48        self.method_controller = method_controller
 49        self.data_files: List[SequenceDataFiles] = []
 50        super().__init__(
 51            controller=controller,
 52            src=src,
 53            data_dirs=data_dirs,
 54            table=table,
 55            offline=offline,
 56        )
 57
 58    def load(self) -> SequenceTable:
 59        rows = self.get_row_count_safely()
 60        self.send(Command.GET_SEQUENCE_CMD)
 61        seq_name = self.receive()
 62
 63        if seq_name.is_ok():
 64            self.table_state: SequenceTable = SequenceTable(
 65                name=seq_name.ok_value.string_response.partition(".S")[0],
 66                rows=[self.get_row(r + 1) for r in range(int(rows))],
 67            )
 68            return self.table_state
 69        else:
 70            raise RuntimeError(
 71                f"couldn't read rows or sequence name: {seq_name.err_value}"
 72            )
 73
 74    @staticmethod
 75    def try_int(val: Any) -> Optional[int]:
 76        try:
 77            return int(val)
 78        except ValueError:
 79            return None
 80
 81    @staticmethod
 82    def try_float(val: Any) -> Optional[float]:
 83        try:
 84            return float(val)
 85        except ValueError:
 86            return None
 87
 88    @staticmethod
 89    def try_vial_location(val: Any) -> Tray:
 90        try:
 91            return VialBar(val) if val <= 10 else FiftyFourVialPlate.from_int(num=val)
 92        except ValueError:
 93            raise ValueError("Expected vial location, is empty.")
 94
 95    def get_row(self, row: int) -> SequenceEntry:
 96        sample_name = self.get_sample_name(row)
 97        vial_location = self.get_vial_location(row)
 98        data_file = self.get_data_file(row)
 99        method = self.get_method(row)
100        num_inj = self.get_num_inj(row)
101        inj_vol = self.get_inj_vol(row)
102        inj_source = self.get_inj_source(row)
103        sample_type = self.get_sample_type(row)
104        return SequenceEntry(
105            sample_name=sample_name,
106            vial_location=vial_location,
107            method=None if len(method) == 0 else method,
108            num_inj=num_inj,
109            inj_vol=inj_vol,
110            inj_source=inj_source,
111            sample_type=sample_type,
112            data_file=data_file,
113        )
114
115    def get_sample_type(self, row):
116        return SampleType(self.get_num(row, RegisterFlag.SAMPLE_TYPE))
117
118    def get_inj_source(self, row):
119        return InjectionSource(self.get_text(row, RegisterFlag.INJ_SOR))
120
121    def get_inj_vol(self, row):
122        return self.try_float(self.get_text(row, RegisterFlag.INJ_VOL))
123
124    def get_num_inj(self, row):
125        return self.try_int(self.get_num(row, RegisterFlag.NUM_INJ))
126
127    def get_method(self, row):
128        return self.get_text(row, RegisterFlag.METHOD)
129
130    def get_data_file(self, row):
131        return self.get_text(row, RegisterFlag.DATA_FILE)
132
133    def get_vial_location(self, row) -> Tray:
134        return self.try_vial_location(
135            self.try_int(self.get_num(row, RegisterFlag.VIAL_LOCATION))
136        )
137
138    def get_sample_name(self, row):
139        return self.get_text(row, RegisterFlag.NAME)
140
141    def switch(self, seq_name: str):
142        """
143        Switch to the specified sequence. The sequence name does not need the '.S' extension.
144
145        :param seq_name: The name of the sequence file
146        """
147        self.send(f'_SeqFile$ = "{seq_name}.S"')
148        self.send(f'_SeqPath$ = "{self.src}"')
149        self.send(Command.SWITCH_SEQUENCE_CMD)
150        time.sleep(2)
151        parsed_response = self.get_current_sequence_name()
152
153        assert parsed_response == f"{seq_name}.S", "Switching sequence failed."
154
155    def get_current_sequence_name(self):
156        self.send(Command.GET_SEQUENCE_CMD)
157        time.sleep(2)
158        parsed_response = self.receive().ok_value.string_response
159        return parsed_response
160
161    def edit(self, sequence_table: SequenceTable):
162        """
163        Updates the currently loaded sequence table with the provided table. This method will delete the existing sequence table and remake it.
164        If you would only like to edit a single row of a sequence table, use `edit_sequence_table_row` instead.
165
166        :param sequence_table:
167        """
168        self.table_state = sequence_table
169        rows = self.get_row_count_safely()
170        existing_row_num = rows
171        wanted_row_num = len(sequence_table.rows)
172        for i in range(int(existing_row_num)):
173            self.delete_row(int(existing_row_num - i))
174            self.send(Command.SAVE_SEQUENCE_CMD)
175        for i in range(int(wanted_row_num)):
176            self.add_row()
177            self.download()
178        self.send(Command.SWITCH_SEQUENCE_CMD)
179        for i, row in enumerate(sequence_table.rows):
180            self._edit_row(row=row, row_num=i + 1)
181            self.sleep(1)
182        self.download()
183        self.send(Command.SWITCH_SEQUENCE_CMD)
184
185    def _edit_row(self, row: SequenceEntry, row_num: int):
186        """
187        Edits a row in the sequence table. If a row does NOT exist, a new one will be created.
188
189        :param row: sequence row entry with updated information
190        :param row_num: the row to edit, based on 1-based indexing
191        """
192        num_rows = self.get_row_count_safely()
193        while num_rows < row_num:
194            self.add_row()
195            self.download()
196            num_rows = self.get_row_count_safely()
197        if row.vial_location:
198            self.edit_vial_location(row.vial_location, row_num, save=False)
199        if row.method:
200            self.edit_method_name(row.method, row_num, save=False)
201        if row.num_inj:
202            self.edit_num_injections(row.num_inj, row_num, save=False)
203        if row.inj_vol:
204            self.edit_injection_volume(row.inj_vol, row_num, save=False)
205        if row.inj_source:
206            self.edit_injection_source(row.inj_source, row_num, save=False)
207        if row.sample_name:
208            self.edit_sample_name(row.sample_name, row_num, save=False)
209        if row.data_file:
210            self.edit_data_file(row.data_file, row_num, save=False)
211        if row.sample_type:
212            self.edit_sample_type(row.sample_type, row_num, save=False)
213        self.download()
214
215    def edit_sample_type(
216        self, sample_type: SampleType, row_num: int, save: bool = True
217    ):
218        if not isinstance(sample_type, SampleType):
219            raise ValueError("`sample_type` should be of type `SampleType`")
220        self._edit_row_num(
221            row=row_num,
222            col_name=RegisterFlag.SAMPLE_TYPE,
223            val=sample_type.value,
224        )
225        if save:
226            self.download()
227
228    def edit_data_file(self, data_file: str, row_num: int, save: bool = True):
229        self._edit_row_text(row=row_num, col_name=RegisterFlag.DATA_FILE, val=data_file)
230        if save:
231            self.download()
232
233    def edit_sample_name(self, sample_name: str, row_num: int, save: bool = True):
234        self._edit_row_text(row=row_num, col_name=RegisterFlag.NAME, val=sample_name)
235        if save:
236            self.download()
237
238    def edit_injection_source(
239        self, inj_source: InjectionSource, row_num: int, save: bool = True
240    ):
241        if not isinstance(inj_source, InjectionSource):
242            raise ValueError("`inj_source` should be of type `InjectionSource`")
243        self._edit_row_text(
244            row=row_num, col_name=RegisterFlag.INJ_SOR, val=inj_source.value
245        )
246        if save:
247            self.download()
248
249    def edit_injection_volume(
250        self, inj_vol: Union[int, float], row_num: int, save: bool = True
251    ):
252        self._edit_row_text(
253            row=row_num, col_name=RegisterFlag.INJ_VOL, val=str(inj_vol)
254        )
255        if save:
256            self.download()
257
258    def edit_num_injections(self, num_inj: int, row_num: int, save: bool = True):
259        self._edit_row_num(row=row_num, col_name=RegisterFlag.NUM_INJ, val=num_inj)
260        if save:
261            self.download()
262
263    def edit_method_name(
264        self, method: str, row_num: int, save: bool = True, override_check: bool = False
265    ):
266        method_dir = self.method_controller.src
267        possible_path = os.path.join(method_dir, method) + ".M\\"
268        if os.path.exists(possible_path):
269            method = os.path.join(method_dir, method)
270        elif not override_check:
271            raise ValueError(
272                "Method may not exist. If you would still like to use this method, set the `override_check` flag to `True`"
273            )
274        self._edit_row_text(row=row_num, col_name=RegisterFlag.METHOD, val=method)
275        if save:
276            self.download()
277
278    def edit_vial_location(self, loc: Tray, row_num: int, save: bool = True):
279        loc_num = -1
280        try:
281            previous_contents = self.get_row(row_num)
282            if (
283                isinstance(loc, VialBar)
284                and isinstance(previous_contents.vial_location, VialBar)
285                or isinstance(loc, FiftyFourVialPlate)
286                and isinstance(previous_contents.vial_location, FiftyFourVialPlate)
287            ):
288                if isinstance(loc, VialBar):
289                    loc_num = loc.value
290                elif isinstance(loc, FiftyFourVialPlate):
291                    loc_num = loc.value()
292                self._edit_row_num(
293                    row=row_num, col_name=RegisterFlag.VIAL_LOCATION, val=loc_num
294                )
295            elif isinstance(loc, VialBar) or isinstance(loc, FiftyFourVialPlate):
296                self.add_row()
297                previous_contents.vial_location = loc
298                num_rows = self.get_row_count_safely()
299                self._edit_row(previous_contents, num_rows)
300                self.move_row(int(num_rows), row_num)
301                self.delete_row(row_num + 1)
302                self.download()
303            else:
304                raise ValueError(
305                    "`loc` should be of type `VialBar`, `FiftyFourVialPlate`"
306                )
307        except Exception:
308            if not (isinstance(loc, VialBar) or isinstance(loc, FiftyFourVialPlate)):
309                raise ValueError(
310                    "`loc` should be of type `VialBar`, `FiftyFourVialPlate`"
311                )
312            if isinstance(loc, VialBar):
313                loc_num = loc.value
314            elif isinstance(loc, FiftyFourVialPlate):
315                loc_num = loc.value()
316            self._edit_row_num(
317                row=row_num, col_name=RegisterFlag.VIAL_LOCATION, val=loc_num
318            )
319        if save:
320            self.download()
321
322    def download(self):
323        self.send(Command.SAVE_SEQUENCE_CMD)
324
325    def run(self, stall_while_running: bool = True):
326        """
327        Starts the currently loaded sequence, storing data
328        under the <data_dir>/<sequence table name> folder.
329        Device must be ready.
330        """
331
332        current_sequence_name = self.get_current_sequence_name()
333        if not self.table_state or self.table_state.name not in current_sequence_name:
334            self.table_state = self.load()
335
336        total_runtime = 0.0
337        for entry in self.table_state.rows:
338            curr_method_runtime = self.method_controller.get_total_runtime()
339            loaded_method = self.method_controller.get_method_name().removesuffix(".M")
340            if entry.method:
341                method_path = entry.method.split(sep="\\")
342                method_name = method_path[-1]
343                if loaded_method != method_name:
344                    method_dir = (
345                        "\\".join(method_path[:-1]) + "\\"
346                        if len(method_path) > 1
347                        else None
348                    )
349                    self.method_controller.switch(
350                        method_name=method_name, alt_method_dir=method_dir
351                    )
352                    curr_method_runtime = self.method_controller.get_total_runtime()
353            total_runtime += curr_method_runtime
354
355        timestamp = time.strftime(SEQUENCE_TIME_FORMAT)
356        folder_name = f"{self.table_state.name} {timestamp}"
357
358        self.send(Command.SAVE_METHOD_CMD)
359        self.send(Command.SAVE_SEQUENCE_CMD)
360        self.send(Command.RUN_SEQUENCE_CMD.value)
361        self.timeout = total_runtime * 60
362
363        tries = 10
364        hplc_running = False
365        for _ in range(tries):
366            hplc_running = self.check_hplc_is_running()
367            if hplc_running:
368                break
369            else:
370                self.send(Command.RUN_SEQUENCE_CMD.value)
371
372        if hplc_running:
373            full_path_name, current_sample_file = self.try_getting_run_info(folder_name)
374            if full_path_name and current_sample_file:
375                data_file = SequenceDataFiles(
376                    sequence_name=self.table_state.name,
377                    dir=full_path_name,
378                    child_dirs=[os.path.join(full_path_name, current_sample_file)],
379                )
380                self.data_files.append(data_file)
381            else:
382                raise ValueError("Data directory for sequence was not found.")
383
384            if stall_while_running:
385                run_completed = self.check_hplc_done_running()
386                if run_completed.is_ok():
387                    self.data_files[-1] = run_completed.ok_value
388                else:
389                    warnings.warn(run_completed.err_value)
390        else:
391            raise RuntimeError("Sequence run may not have started.")
392
393    def try_getting_run_info(self, folder_name: str) -> Tuple[str, str | None]:
394        full_path_name, current_sample_file = None, None
395        for _ in range(5):
396            try:
397                full_path_name, current_sample_file = (
398                    self.get_current_run_data_dir_file()
399                )
400            except ValueError:
401                pass
402            if current_sample_file and full_path_name:
403                return full_path_name, current_sample_file
404            elif full_path_name:
405                return full_path_name, None
406        raise ValueError("Could not get sequence data folder")
407
408    @override
409    def _fuzzy_match_most_recent_folder(
410        self, most_recent_folder: T
411    ) -> Result[SequenceDataFiles, str]:
412        if isinstance(most_recent_folder, SequenceDataFiles):
413            try:
414                if most_recent_folder.dir and os.path.isdir(most_recent_folder.dir):
415                    subdirs = [x[0] for x in os.walk(most_recent_folder.dir)]
416                    most_recent_folder.child_dirs = [
417                        f
418                        for f in subdirs
419                        if most_recent_folder.dir in f and ".D" in f and f[-1] == "D"
420                    ]
421                    return Ok(most_recent_folder)
422                else:
423                    return Err("No sequence folder found, please give the full path.")
424            except Exception as e:
425                error = f"Failed to get sequence folder: {e}"
426                return Err(error)
427        return Err("Expected SequenceDataFile type.")
428
429    def get_data_mult_uv(self, custom_path: Optional[str] = None):
430        seq_data_dir = (
431            SequenceDataFiles(dir=custom_path, sequence_name="")
432            if custom_path
433            else self.data_files[-1]
434        )
435        search_folder = self._fuzzy_match_most_recent_folder(seq_data_dir)
436        if search_folder.is_ok():
437            seq_data_dir = search_folder.ok_value
438        else:
439            raise FileNotFoundError(search_folder.err_value)
440        all_w_spectra: List[Dict[int, AgilentHPLCChromatogram]] = []
441        for row in seq_data_dir.child_dirs:
442            all_w_spectra.append(self.get_data_uv(custom_path=row))
443        return all_w_spectra
444
445    def get_data_uv(
446        self, custom_path: Optional[str] = None
447    ) -> Dict[int, AgilentHPLCChromatogram]:
448        if isinstance(custom_path, str):
449            self.get_uv_spectrum(custom_path)
450            return self.uv
451        raise ValueError(
452            "Path should exist when calling from sequence. Provide a child path (contains the method)."
453        )
454
455    def get_data(
456        self, custom_path: Optional[str] = None
457    ) -> List[AgilentChannelChromatogramData]:
458        seq_file_dir = (
459            SequenceDataFiles(dir=custom_path, sequence_name="")
460            if custom_path
461            else self.data_files[-1]
462        )
463        self.data_files[-1] = self._fuzzy_match_most_recent_folder(
464            seq_file_dir
465        ).ok_value
466        spectra: List[AgilentChannelChromatogramData] = []
467        for row in self.data_files[-1].child_dirs:
468            self.get_spectrum_at_channels(row)
469            spectra.append(AgilentChannelChromatogramData.from_dict(self.spectra))
470        return spectra
471
472    def get_report(
473        self,
474        custom_path: Optional[str] = None,
475        report_type: ReportType = ReportType.TXT,
476    ) -> List[AgilentReport]:
477        if custom_path:
478            self.data_files.append(
479                self._fuzzy_match_most_recent_folder(
480                    most_recent_folder=SequenceDataFiles(
481                        dir=custom_path,
482                        sequence_name="NA",
483                    ),
484                ).ok_value
485            )
486        parent_dir = self.data_files[-1]
487        parent_dir = self._fuzzy_match_most_recent_folder(
488            most_recent_folder=parent_dir,
489        ).ok_value
490        assert len(parent_dir.child_dirs) != 0
491        spectra = self.get_data()
492        reports = []
493        for i, child_dir in enumerate(parent_dir.child_dirs):
494            metd_report = self.get_report_details(child_dir, report_type)
495            child_spectra: List[AgilentHPLCChromatogram] = list(
496                spectra[i].__dict__.values()
497            )
498            for j, signal in enumerate(metd_report.signals):
499                assert len(metd_report.signals) <= len(child_spectra)
500                try:
501                    possible_data = child_spectra[j]
502                    if len(possible_data.x) > 0:
503                        signal.data = possible_data
504                except IndexError:
505                    raise ValueError(j)
506            reports.append(metd_report)
507        return reports

Class containing sequence related logic

SequenceController( controller: Optional[pychemstation.control.controllers.CommunicationController], method_controller: MethodController, src: Optional[str], data_dirs: Optional[List[str]], table: pychemstation.utils.table_types.Table, offline: bool)
39    def __init__(
40        self,
41        controller: Optional[CommunicationController],
42        method_controller: MethodController,
43        src: Optional[str],
44        data_dirs: Optional[List[str]],
45        table: Table,
46        offline: bool,
47    ):
48        self.method_controller = method_controller
49        self.data_files: List[SequenceDataFiles] = []
50        super().__init__(
51            controller=controller,
52            src=src,
53            data_dirs=data_dirs,
54            table=table,
55            offline=offline,
56        )
method_controller
def load(self) -> pychemstation.utils.sequence_types.SequenceTable:
58    def load(self) -> SequenceTable:
59        rows = self.get_row_count_safely()
60        self.send(Command.GET_SEQUENCE_CMD)
61        seq_name = self.receive()
62
63        if seq_name.is_ok():
64            self.table_state: SequenceTable = SequenceTable(
65                name=seq_name.ok_value.string_response.partition(".S")[0],
66                rows=[self.get_row(r + 1) for r in range(int(rows))],
67            )
68            return self.table_state
69        else:
70            raise RuntimeError(
71                f"couldn't read rows or sequence name: {seq_name.err_value}"
72            )
@staticmethod
def try_int(val: Any) -> Optional[int]:
74    @staticmethod
75    def try_int(val: Any) -> Optional[int]:
76        try:
77            return int(val)
78        except ValueError:
79            return None
@staticmethod
def try_float(val: Any) -> Optional[float]:
81    @staticmethod
82    def try_float(val: Any) -> Optional[float]:
83        try:
84            return float(val)
85        except ValueError:
86            return None
88    @staticmethod
89    def try_vial_location(val: Any) -> Tray:
90        try:
91            return VialBar(val) if val <= 10 else FiftyFourVialPlate.from_int(num=val)
92        except ValueError:
93            raise ValueError("Expected vial location, is empty.")
def get_row(self, row: int) -> pychemstation.utils.sequence_types.SequenceEntry:
 95    def get_row(self, row: int) -> SequenceEntry:
 96        sample_name = self.get_sample_name(row)
 97        vial_location = self.get_vial_location(row)
 98        data_file = self.get_data_file(row)
 99        method = self.get_method(row)
100        num_inj = self.get_num_inj(row)
101        inj_vol = self.get_inj_vol(row)
102        inj_source = self.get_inj_source(row)
103        sample_type = self.get_sample_type(row)
104        return SequenceEntry(
105            sample_name=sample_name,
106            vial_location=vial_location,
107            method=None if len(method) == 0 else method,
108            num_inj=num_inj,
109            inj_vol=inj_vol,
110            inj_source=inj_source,
111            sample_type=sample_type,
112            data_file=data_file,
113        )
def get_sample_type(self, row):
115    def get_sample_type(self, row):
116        return SampleType(self.get_num(row, RegisterFlag.SAMPLE_TYPE))
def get_inj_source(self, row):
118    def get_inj_source(self, row):
119        return InjectionSource(self.get_text(row, RegisterFlag.INJ_SOR))
def get_inj_vol(self, row):
121    def get_inj_vol(self, row):
122        return self.try_float(self.get_text(row, RegisterFlag.INJ_VOL))
def get_num_inj(self, row):
124    def get_num_inj(self, row):
125        return self.try_int(self.get_num(row, RegisterFlag.NUM_INJ))
def get_method(self, row):
127    def get_method(self, row):
128        return self.get_text(row, RegisterFlag.METHOD)
def get_data_file(self, row):
130    def get_data_file(self, row):
131        return self.get_text(row, RegisterFlag.DATA_FILE)
133    def get_vial_location(self, row) -> Tray:
134        return self.try_vial_location(
135            self.try_int(self.get_num(row, RegisterFlag.VIAL_LOCATION))
136        )
def get_sample_name(self, row):
138    def get_sample_name(self, row):
139        return self.get_text(row, RegisterFlag.NAME)
def switch(self, seq_name: str):
141    def switch(self, seq_name: str):
142        """
143        Switch to the specified sequence. The sequence name does not need the '.S' extension.
144
145        :param seq_name: The name of the sequence file
146        """
147        self.send(f'_SeqFile$ = "{seq_name}.S"')
148        self.send(f'_SeqPath$ = "{self.src}"')
149        self.send(Command.SWITCH_SEQUENCE_CMD)
150        time.sleep(2)
151        parsed_response = self.get_current_sequence_name()
152
153        assert parsed_response == f"{seq_name}.S", "Switching sequence failed."

Switch to the specified sequence. The sequence name does not need the '.S' extension.

Parameters
  • seq_name: The name of the sequence file
def get_current_sequence_name(self):
155    def get_current_sequence_name(self):
156        self.send(Command.GET_SEQUENCE_CMD)
157        time.sleep(2)
158        parsed_response = self.receive().ok_value.string_response
159        return parsed_response
def edit( self, sequence_table: pychemstation.utils.sequence_types.SequenceTable):
161    def edit(self, sequence_table: SequenceTable):
162        """
163        Updates the currently loaded sequence table with the provided table. This method will delete the existing sequence table and remake it.
164        If you would only like to edit a single row of a sequence table, use `edit_sequence_table_row` instead.
165
166        :param sequence_table:
167        """
168        self.table_state = sequence_table
169        rows = self.get_row_count_safely()
170        existing_row_num = rows
171        wanted_row_num = len(sequence_table.rows)
172        for i in range(int(existing_row_num)):
173            self.delete_row(int(existing_row_num - i))
174            self.send(Command.SAVE_SEQUENCE_CMD)
175        for i in range(int(wanted_row_num)):
176            self.add_row()
177            self.download()
178        self.send(Command.SWITCH_SEQUENCE_CMD)
179        for i, row in enumerate(sequence_table.rows):
180            self._edit_row(row=row, row_num=i + 1)
181            self.sleep(1)
182        self.download()
183        self.send(Command.SWITCH_SEQUENCE_CMD)

Updates the currently loaded sequence table with the provided table. This method will delete the existing sequence table and remake it. If you would only like to edit a single row of a sequence table, use edit_sequence_table_row instead.

Parameters
  • sequence_table: The table whose rows replace the rows of the currently loaded sequence.
def edit_sample_type( self, sample_type: pychemstation.utils.sequence_types.SampleType, row_num: int, save: bool = True):
215    def edit_sample_type(
216        self, sample_type: SampleType, row_num: int, save: bool = True
217    ):
218        if not isinstance(sample_type, SampleType):
219            raise ValueError("`sample_type` should be of type `SampleType`")
220        self._edit_row_num(
221            row=row_num,
222            col_name=RegisterFlag.SAMPLE_TYPE,
223            val=sample_type.value,
224        )
225        if save:
226            self.download()
def edit_data_file(self, data_file: str, row_num: int, save: bool = True):
228    def edit_data_file(self, data_file: str, row_num: int, save: bool = True):
229        self._edit_row_text(row=row_num, col_name=RegisterFlag.DATA_FILE, val=data_file)
230        if save:
231            self.download()
def edit_sample_name(self, sample_name: str, row_num: int, save: bool = True):
233    def edit_sample_name(self, sample_name: str, row_num: int, save: bool = True):
234        self._edit_row_text(row=row_num, col_name=RegisterFlag.NAME, val=sample_name)
235        if save:
236            self.download()
def edit_injection_source( self, inj_source: pychemstation.utils.sequence_types.InjectionSource, row_num: int, save: bool = True):
238    def edit_injection_source(
239        self, inj_source: InjectionSource, row_num: int, save: bool = True
240    ):
241        if not isinstance(inj_source, InjectionSource):
242            raise ValueError("`inj_source` should be of type `InjectionSource`")
243        self._edit_row_text(
244            row=row_num, col_name=RegisterFlag.INJ_SOR, val=inj_source.value
245        )
246        if save:
247            self.download()
def edit_injection_volume(self, inj_vol: Union[int, float], row_num: int, save: bool = True):
249    def edit_injection_volume(
250        self, inj_vol: Union[int, float], row_num: int, save: bool = True
251    ):
252        self._edit_row_text(
253            row=row_num, col_name=RegisterFlag.INJ_VOL, val=str(inj_vol)
254        )
255        if save:
256            self.download()
def edit_num_injections(self, num_inj: int, row_num: int, save: bool = True):
258    def edit_num_injections(self, num_inj: int, row_num: int, save: bool = True):
259        self._edit_row_num(row=row_num, col_name=RegisterFlag.NUM_INJ, val=num_inj)
260        if save:
261            self.download()
def edit_method_name( self, method: str, row_num: int, save: bool = True, override_check: bool = False):
263    def edit_method_name(
264        self, method: str, row_num: int, save: bool = True, override_check: bool = False
265    ):
266        method_dir = self.method_controller.src
267        possible_path = os.path.join(method_dir, method) + ".M\\"
268        if os.path.exists(possible_path):
269            method = os.path.join(method_dir, method)
270        elif not override_check:
271            raise ValueError(
272                "Method may not exist. If you would still like to use this method, set the `override_check` flag to `True`"
273            )
274        self._edit_row_text(row=row_num, col_name=RegisterFlag.METHOD, val=method)
275        if save:
276            self.download()
def edit_vial_location( self, loc: Union[pychemstation.utils.tray_types.FiftyFourVialPlate, pychemstation.utils.tray_types.VialBar, pychemstation.utils.tray_types.LocationPlus], row_num: int, save: bool = True):
278    def edit_vial_location(self, loc: Tray, row_num: int, save: bool = True):
279        loc_num = -1
280        try:
281            previous_contents = self.get_row(row_num)
282            if (
283                isinstance(loc, VialBar)
284                and isinstance(previous_contents.vial_location, VialBar)
285                or isinstance(loc, FiftyFourVialPlate)
286                and isinstance(previous_contents.vial_location, FiftyFourVialPlate)
287            ):
288                if isinstance(loc, VialBar):
289                    loc_num = loc.value
290                elif isinstance(loc, FiftyFourVialPlate):
291                    loc_num = loc.value()
292                self._edit_row_num(
293                    row=row_num, col_name=RegisterFlag.VIAL_LOCATION, val=loc_num
294                )
295            elif isinstance(loc, VialBar) or isinstance(loc, FiftyFourVialPlate):
296                self.add_row()
297                previous_contents.vial_location = loc
298                num_rows = self.get_row_count_safely()
299                self._edit_row(previous_contents, num_rows)
300                self.move_row(int(num_rows), row_num)
301                self.delete_row(row_num + 1)
302                self.download()
303            else:
304                raise ValueError(
305                    "`loc` should be of type `VialBar`, `FiftyFourVialPlate`"
306                )
307        except Exception:
308            if not (isinstance(loc, VialBar) or isinstance(loc, FiftyFourVialPlate)):
309                raise ValueError(
310                    "`loc` should be of type `VialBar`, `FiftyFourVialPlate`"
311                )
312            if isinstance(loc, VialBar):
313                loc_num = loc.value
314            elif isinstance(loc, FiftyFourVialPlate):
315                loc_num = loc.value()
316            self._edit_row_num(
317                row=row_num, col_name=RegisterFlag.VIAL_LOCATION, val=loc_num
318            )
319        if save:
320            self.download()
def download(self):
322    def download(self):
323        self.send(Command.SAVE_SEQUENCE_CMD)
def run(self, stall_while_running: bool = True):
325    def run(self, stall_while_running: bool = True):
326        """
327        Starts the currently loaded sequence, storing data
328        under the <data_dir>/<sequence table name> folder.
329        Device must be ready.
330        """
331
332        current_sequence_name = self.get_current_sequence_name()
333        if not self.table_state or self.table_state.name not in current_sequence_name:
334            self.table_state = self.load()
335
336        total_runtime = 0.0
337        for entry in self.table_state.rows:
338            curr_method_runtime = self.method_controller.get_total_runtime()
339            loaded_method = self.method_controller.get_method_name().removesuffix(".M")
340            if entry.method:
341                method_path = entry.method.split(sep="\\")
342                method_name = method_path[-1]
343                if loaded_method != method_name:
344                    method_dir = (
345                        "\\".join(method_path[:-1]) + "\\"
346                        if len(method_path) > 1
347                        else None
348                    )
349                    self.method_controller.switch(
350                        method_name=method_name, alt_method_dir=method_dir
351                    )
352                    curr_method_runtime = self.method_controller.get_total_runtime()
353            total_runtime += curr_method_runtime
354
355        timestamp = time.strftime(SEQUENCE_TIME_FORMAT)
356        folder_name = f"{self.table_state.name} {timestamp}"
357
358        self.send(Command.SAVE_METHOD_CMD)
359        self.send(Command.SAVE_SEQUENCE_CMD)
360        self.send(Command.RUN_SEQUENCE_CMD.value)
361        self.timeout = total_runtime * 60
362
363        tries = 10
364        hplc_running = False
365        for _ in range(tries):
366            hplc_running = self.check_hplc_is_running()
367            if hplc_running:
368                break
369            else:
370                self.send(Command.RUN_SEQUENCE_CMD.value)
371
372        if hplc_running:
373            full_path_name, current_sample_file = self.try_getting_run_info(folder_name)
374            if full_path_name and current_sample_file:
375                data_file = SequenceDataFiles(
376                    sequence_name=self.table_state.name,
377                    dir=full_path_name,
378                    child_dirs=[os.path.join(full_path_name, current_sample_file)],
379                )
380                self.data_files.append(data_file)
381            else:
382                raise ValueError("Data directory for sequence was not found.")
383
384            if stall_while_running:
385                run_completed = self.check_hplc_done_running()
386                if run_completed.is_ok():
387                    self.data_files[-1] = run_completed.ok_value
388                else:
389                    warnings.warn(run_completed.err_value)
390        else:
391            raise RuntimeError("Sequence run may not have started.")

Starts the currently loaded sequence, storing data under the `<data_dir>/<sequence table name>` folder. Device must be ready.

def try_getting_run_info(self, folder_name: str) -> Tuple[str, Optional[str]]:
393    def try_getting_run_info(self, folder_name: str) -> Tuple[str, str | None]:
394        full_path_name, current_sample_file = None, None
395        for _ in range(5):
396            try:
397                full_path_name, current_sample_file = (
398                    self.get_current_run_data_dir_file()
399                )
400            except ValueError:
401                pass
402            if current_sample_file and full_path_name:
403                return full_path_name, current_sample_file
404            elif full_path_name:
405                return full_path_name, None
406        raise ValueError("Could not get sequence data folder")
def get_data_mult_uv(self, custom_path: Optional[str] = None):
429    def get_data_mult_uv(self, custom_path: Optional[str] = None):
430        seq_data_dir = (
431            SequenceDataFiles(dir=custom_path, sequence_name="")
432            if custom_path
433            else self.data_files[-1]
434        )
435        search_folder = self._fuzzy_match_most_recent_folder(seq_data_dir)
436        if search_folder.is_ok():
437            seq_data_dir = search_folder.ok_value
438        else:
439            raise FileNotFoundError(search_folder.err_value)
440        all_w_spectra: List[Dict[int, AgilentHPLCChromatogram]] = []
441        for row in seq_data_dir.child_dirs:
442            all_w_spectra.append(self.get_data_uv(custom_path=row))
443        return all_w_spectra
def get_data_uv( self, custom_path: Optional[str] = None) -> Dict[int, pychemstation.analysis.AgilentHPLCChromatogram]:
445    def get_data_uv(
446        self, custom_path: Optional[str] = None
447    ) -> Dict[int, AgilentHPLCChromatogram]:
448        if isinstance(custom_path, str):
449            self.get_uv_spectrum(custom_path)
450            return self.uv
451        raise ValueError(
452            "Path should exist when calling from sequence. Provide a child path (contains the method)."
453        )
def get_data( self, custom_path: Optional[str] = None) -> List[pychemstation.analysis.AgilentChannelChromatogramData]:
455    def get_data(
456        self, custom_path: Optional[str] = None
457    ) -> List[AgilentChannelChromatogramData]:
458        seq_file_dir = (
459            SequenceDataFiles(dir=custom_path, sequence_name="")
460            if custom_path
461            else self.data_files[-1]
462        )
463        self.data_files[-1] = self._fuzzy_match_most_recent_folder(
464            seq_file_dir
465        ).ok_value
466        spectra: List[AgilentChannelChromatogramData] = []
467        for row in self.data_files[-1].child_dirs:
468            self.get_spectrum_at_channels(row)
469            spectra.append(AgilentChannelChromatogramData.from_dict(self.spectra))
470        return spectra
def get_report( self, custom_path: Optional[str] = None, report_type: pychemstation.analysis.process_report.ReportType = <ReportType.TXT: 0>) -> List[pychemstation.analysis.process_report.AgilentReport]:
472    def get_report(
473        self,
474        custom_path: Optional[str] = None,
475        report_type: ReportType = ReportType.TXT,
476    ) -> List[AgilentReport]:
477        if custom_path:
478            self.data_files.append(
479                self._fuzzy_match_most_recent_folder(
480                    most_recent_folder=SequenceDataFiles(
481                        dir=custom_path,
482                        sequence_name="NA",
483                    ),
484                ).ok_value
485            )
486        parent_dir = self.data_files[-1]
487        parent_dir = self._fuzzy_match_most_recent_folder(
488            most_recent_folder=parent_dir,
489        ).ok_value
490        assert len(parent_dir.child_dirs) != 0
491        spectra = self.get_data()
492        reports = []
493        for i, child_dir in enumerate(parent_dir.child_dirs):
494            metd_report = self.get_report_details(child_dir, report_type)
495            child_spectra: List[AgilentHPLCChromatogram] = list(
496                spectra[i].__dict__.values()
497            )
498            for j, signal in enumerate(metd_report.signals):
499                assert len(metd_report.signals) <= len(child_spectra)
500                try:
501                    possible_data = child_spectra[j]
502                    if len(possible_data.x) > 0:
503                        signal.data = possible_data
504                except IndexError:
505                    raise ValueError(j)
506            reports.append(metd_report)
507        return reports