pychemstation.utils.abc_tables.run

Abstract module containing shared logic for Method and Sequence tables.

Authors: Lucy Hao

  1"""
  2Abstract module containing shared logic for Method and Sequence tables.
  3
  4Authors: Lucy Hao
  5"""
  6
  7from __future__ import annotations
  8
  9import abc
 10import math
 11import os
 12import time
 13import warnings
 14from typing import Dict, List, Optional, Tuple, Union, Set
 15
 16import polling
 17import rainbow as rb
 18from result import Err, Ok, Result
 19
 20from ..macro import HPLCRunningStatus, Command
 21from ..method_types import MethodDetails
 22from ..sequence_types import SequenceTable
 23from ..table_types import Table, T
 24from ...analysis.chromatogram import (
 25    AgilentChannelChromatogramData,
 26    AgilentHPLCChromatogram,
 27)
 28
 29from .table import ABCTableController
 30from ...analysis.process_report import (
 31    ReportType,
 32    AgilentReport,
 33    CSVProcessor,
 34    TXTProcessor,
 35)
 36from ...control.controllers import CommunicationController
 37
 38TableType = Union[MethodDetails, SequenceTable]
 39
 40
 41class RunController(ABCTableController, abc.ABC):
 42    """Abstract controller for all tables that can trigger runs on Chemstation.
 43
 44    :param controller: the controller for sending MACROs, must be initialized for a run to be triggered.
 45    :param src: complete directory path where files containing run parameters are stored.
 46    :param data_dirs: list of complete directories that Chemstation will write data to.
 47    :param table: contains register keys for accessing table in Chemstation.
 48    :param offline: whether the communication controller is online.
 49    """
 50
 51    def __init__(
 52        self,
 53        controller: Optional[CommunicationController],
 54        src: Optional[str],
 55        data_dirs: Optional[List[str]],
 56        table: Table,
 57        offline: bool = False,
 58    ):
 59        super().__init__(controller=controller, table=table)
 60        self.table_state: Optional[TableType] = None
 61        self.curr_run_starting_time: Optional[float] = None
 62        self.timeout: Optional[float] = None
 63        self.current_run_child_files: Set[str] = set()
 64
 65        if not offline:
 66            if src and not os.path.isdir(src):
 67                raise FileNotFoundError(f"dir: {src} not found.")
 68            if data_dirs:
 69                for d in data_dirs:
 70                    if not os.path.isdir(d):
 71                        raise FileNotFoundError(f"dir: {d} not found.")
 72                    if r"\\" in d:
 73                        raise ValueError("Data directories should not be raw strings!")
 74            if src and data_dirs:
 75                self.src: str = src
 76                self.data_dirs: List[str] = data_dirs
 77
 78        self.spectra: dict[str, AgilentHPLCChromatogram] = {
 79            "A": AgilentHPLCChromatogram(),
 80            "B": AgilentHPLCChromatogram(),
 81            "C": AgilentHPLCChromatogram(),
 82            "D": AgilentHPLCChromatogram(),
 83            "E": AgilentHPLCChromatogram(),
 84            "F": AgilentHPLCChromatogram(),
 85            "G": AgilentHPLCChromatogram(),
 86            "H": AgilentHPLCChromatogram(),
 87        }
 88        self.uv: Dict[int, AgilentHPLCChromatogram] = {}
 89        self.data_files: List = []
 90
 91    def __new__(cls, *args, **kwargs):
 92        if cls is RunController:
 93            raise TypeError(f"only children of '{cls.__name__}' may be instantiated")
 94        return object.__new__(cls)
 95
 96    @abc.abstractmethod
 97    def _fuzzy_match_most_recent_folder(self, most_recent_folder: T) -> Result[T, str]:
 98        pass
 99
100    @abc.abstractmethod
101    def get_data(
102        self, custom_path: Optional[str] = None
103    ) -> Union[List[AgilentChannelChromatogramData], AgilentChannelChromatogramData]:
104        pass
105
106    @abc.abstractmethod
107    def get_data_uv(
108        self, custom_path: str | None = None
109    ) -> Dict[int, AgilentHPLCChromatogram]:
110        pass
111
112    @abc.abstractmethod
113    def get_report(
114        self, custom_path: str, report_type: ReportType = ReportType.TXT
115    ) -> List[AgilentReport]:
116        pass
117
118    def check_hplc_is_running(self) -> bool:
119        if self.controller:
120            try:
121                started_running = polling.poll(
122                    lambda: isinstance(self.controller.get_status(), HPLCRunningStatus),
123                    step=1,
124                    max_tries=20,
125                )
126            except Exception as e:
127                print(e)
128                return False
129            if started_running:
130                self.curr_run_starting_time = time.time()
131            return started_running
132        else:
133            raise ValueError("Controller is offline")
134
135    def check_hplc_run_finished(self) -> Tuple[float, bool]:
136        if self.controller:
137            try:
138                _, current_run_file = self.get_current_run_data_dir_file()
139                sample_file, extension, _ = current_run_file.partition(".D")
140                self.current_run_child_files.add(sample_file)
141            except Exception:
142                pass
143            done_running = self.controller.check_if_not_running()
144            if self.curr_run_starting_time and self.timeout:
145                time_passed = time.time() - self.curr_run_starting_time
146                if time_passed > self.timeout:
147                    enough_time_passed = time_passed >= self.timeout
148                    run_finished = enough_time_passed and done_running
149                    if run_finished:
150                        self._reset_time()
151                        return 0, run_finished
152                else:
153                    time_left = self.timeout - time_passed
154                    return time_left, self.controller.check_if_not_running()
155            return 0, self.controller.check_if_not_running()
156        raise ValueError("Controller is offline!")
157
158    def check_hplc_done_running(self) -> Ok[T] | Err[str]:
159        """Checks if ChemStation has finished running and can read data back
160
161        :return: Data file object containing most recent run file information.
162        """
163        self.current_run_child_files = set()
164        if self.timeout is not None:
165            finished_run = False
166            minutes = math.ceil(self.timeout / 60)
167            try:
168                finished_run = not polling.poll(
169                    lambda: self.check_hplc_run_finished()[1],
170                    max_tries=minutes - 1,
171                    step=50,
172                )
173            except (
174                polling.TimeoutException,
175                polling.PollingException,
176                polling.MaxCallException,
177            ):
178                try:
179                    finished_run = polling.poll(
180                        lambda: self.check_hplc_run_finished()[1],
181                        timeout=self.timeout / 2,
182                        step=1,
183                    )
184                except (
185                    polling.TimeoutException,
186                    polling.PollingException,
187                    polling.MaxCallException,
188                ):
189                    pass
190        else:
191            raise ValueError("Timeout value is None, no comparison can be made.")
192
193        check_folder = self._fuzzy_match_most_recent_folder(self.data_files[-1])
194        if check_folder.is_ok() and finished_run:
195            return check_folder
196        elif check_folder.is_ok():
197            try:
198                finished_run = polling.poll(
199                    lambda: self.check_hplc_run_finished()[1], max_tries=10, step=50
200                )
201                if finished_run:
202                    return check_folder
203            except Exception:
204                self._reset_time()
205                return self.data_files[-1]
206        return Err("Run not may not have completed.")
207
208    def get_uv_spectrum(self, path: str):
209        data_uv = rb.agilent.chemstation.parse_file(os.path.join(path, "DAD1.UV"))
210        times = data_uv.xlabels
211        wavelengths = data_uv.ylabels
212        absorbances = data_uv.data.transpose()
213        for i, w in enumerate(wavelengths):
214            self.uv[w] = AgilentHPLCChromatogram()
215            self.uv[w].attach_spectrum(times, absorbances[i])
216
217    def get_report_details(
218        self, path: str, report_type: ReportType = ReportType.TXT
219    ) -> AgilentReport:
220        if report_type is ReportType.TXT:
221            txt_report = TXTProcessor(path).process_report()
222            if txt_report.is_ok():
223                return txt_report.ok_value
224            elif txt_report.is_err():
225                raise ValueError(txt_report.err_value)
226        if report_type is ReportType.CSV:
227            csv_report = CSVProcessor(path).process_report()
228            if csv_report.is_ok():
229                return csv_report.ok_value
230            elif csv_report.is_err():
231                raise ValueError(csv_report.err_value)
232        raise ValueError("Expected one of ReportType.TXT or ReportType.CSV")
233
234    def get_spectrum_at_channels(self, data_path: str):
235        """Load chromatogram for any channel in spectra dictionary."""
236        for channel, spec in self.spectra.items():
237            try:
238                spec.load_spectrum(data_path=data_path, channel=channel)
239            except FileNotFoundError:
240                self.spectra[channel] = AgilentHPLCChromatogram()
241                warning = f"No data at channel: {channel}"
242                warnings.warn(warning)
243
244    def _reset_time(self):
245        self.curr_run_starting_time = None
246        self.timeout = None
247
248    def get_current_run_data_dir_file(self) -> Tuple[str, str]:
249        self.send(Command.GET_CURRENT_RUN_DATA_DIR)
250        full_path_name = self.receive()
251        self.send(Command.GET_CURRENT_RUN_DATA_FILE)
252        current_sample_file = self.receive()
253        if full_path_name.is_ok() and current_sample_file.is_ok():
254            if os.path.isdir(full_path_name.ok_value.string_response) and os.path.isdir(
255                os.path.join(
256                    full_path_name.ok_value.string_response,
257                    current_sample_file.ok_value.string_response,
258                )
259            ):
260                return (
261                    full_path_name.ok_value.string_response,
262                    current_sample_file.ok_value.string_response,
263                )
264        raise ValueError("Couldn't read data dir and file or doesn't exist yet.")
class RunController(pychemstation.utils.abc_tables.table.ABCTableController, abc.ABC):
 42class RunController(ABCTableController, abc.ABC):
 43    """Abstract controller for all tables that can trigger runs on Chemstation.
 44
 45    :param controller: the controller for sending MACROs, must be initialized for a run to be triggered.
 46    :param src: complete directory path where files containing run parameters are stored.
 47    :param data_dirs: list of complete directories that Chemstation will write data to.
 48    :param table: contains register keys for accessing table in Chemstation.
 49    :param offline: whether the communication controller is online.
 50    """
 51
 52    def __init__(
 53        self,
 54        controller: Optional[CommunicationController],
 55        src: Optional[str],
 56        data_dirs: Optional[List[str]],
 57        table: Table,
 58        offline: bool = False,
 59    ):
 60        super().__init__(controller=controller, table=table)
 61        self.table_state: Optional[TableType] = None
 62        self.curr_run_starting_time: Optional[float] = None
 63        self.timeout: Optional[float] = None
 64        self.current_run_child_files: Set[str] = set()
 65
 66        if not offline:
 67            if src and not os.path.isdir(src):
 68                raise FileNotFoundError(f"dir: {src} not found.")
 69            if data_dirs:
 70                for d in data_dirs:
 71                    if not os.path.isdir(d):
 72                        raise FileNotFoundError(f"dir: {d} not found.")
 73                    if r"\\" in d:
 74                        raise ValueError("Data directories should not be raw strings!")
 75            if src and data_dirs:
 76                self.src: str = src
 77                self.data_dirs: List[str] = data_dirs
 78
 79        self.spectra: dict[str, AgilentHPLCChromatogram] = {
 80            "A": AgilentHPLCChromatogram(),
 81            "B": AgilentHPLCChromatogram(),
 82            "C": AgilentHPLCChromatogram(),
 83            "D": AgilentHPLCChromatogram(),
 84            "E": AgilentHPLCChromatogram(),
 85            "F": AgilentHPLCChromatogram(),
 86            "G": AgilentHPLCChromatogram(),
 87            "H": AgilentHPLCChromatogram(),
 88        }
 89        self.uv: Dict[int, AgilentHPLCChromatogram] = {}
 90        self.data_files: List = []
 91
 92    def __new__(cls, *args, **kwargs):
 93        if cls is RunController:
 94            raise TypeError(f"only children of '{cls.__name__}' may be instantiated")
 95        return object.__new__(cls)
 96
 97    @abc.abstractmethod
 98    def _fuzzy_match_most_recent_folder(self, most_recent_folder: T) -> Result[T, str]:
 99        pass
100
101    @abc.abstractmethod
102    def get_data(
103        self, custom_path: Optional[str] = None
104    ) -> Union[List[AgilentChannelChromatogramData], AgilentChannelChromatogramData]:
105        pass
106
107    @abc.abstractmethod
108    def get_data_uv(
109        self, custom_path: str | None = None
110    ) -> Dict[int, AgilentHPLCChromatogram]:
111        pass
112
113    @abc.abstractmethod
114    def get_report(
115        self, custom_path: str, report_type: ReportType = ReportType.TXT
116    ) -> List[AgilentReport]:
117        pass
118
119    def check_hplc_is_running(self) -> bool:
120        if self.controller:
121            try:
122                started_running = polling.poll(
123                    lambda: isinstance(self.controller.get_status(), HPLCRunningStatus),
124                    step=1,
125                    max_tries=20,
126                )
127            except Exception as e:
128                print(e)
129                return False
130            if started_running:
131                self.curr_run_starting_time = time.time()
132            return started_running
133        else:
134            raise ValueError("Controller is offline")
135
136    def check_hplc_run_finished(self) -> Tuple[float, bool]:
137        if self.controller:
138            try:
139                _, current_run_file = self.get_current_run_data_dir_file()
140                sample_file, extension, _ = current_run_file.partition(".D")
141                self.current_run_child_files.add(sample_file)
142            except Exception:
143                pass
144            done_running = self.controller.check_if_not_running()
145            if self.curr_run_starting_time and self.timeout:
146                time_passed = time.time() - self.curr_run_starting_time
147                if time_passed > self.timeout:
148                    enough_time_passed = time_passed >= self.timeout
149                    run_finished = enough_time_passed and done_running
150                    if run_finished:
151                        self._reset_time()
152                        return 0, run_finished
153                else:
154                    time_left = self.timeout - time_passed
155                    return time_left, self.controller.check_if_not_running()
156            return 0, self.controller.check_if_not_running()
157        raise ValueError("Controller is offline!")
158
159    def check_hplc_done_running(self) -> Ok[T] | Err[str]:
160        """Checks if ChemStation has finished running and can read data back
161
162        :return: Data file object containing most recent run file information.
163        """
164        self.current_run_child_files = set()
165        if self.timeout is not None:
166            finished_run = False
167            minutes = math.ceil(self.timeout / 60)
168            try:
169                finished_run = not polling.poll(
170                    lambda: self.check_hplc_run_finished()[1],
171                    max_tries=minutes - 1,
172                    step=50,
173                )
174            except (
175                polling.TimeoutException,
176                polling.PollingException,
177                polling.MaxCallException,
178            ):
179                try:
180                    finished_run = polling.poll(
181                        lambda: self.check_hplc_run_finished()[1],
182                        timeout=self.timeout / 2,
183                        step=1,
184                    )
185                except (
186                    polling.TimeoutException,
187                    polling.PollingException,
188                    polling.MaxCallException,
189                ):
190                    pass
191        else:
192            raise ValueError("Timeout value is None, no comparison can be made.")
193
194        check_folder = self._fuzzy_match_most_recent_folder(self.data_files[-1])
195        if check_folder.is_ok() and finished_run:
196            return check_folder
197        elif check_folder.is_ok():
198            try:
199                finished_run = polling.poll(
200                    lambda: self.check_hplc_run_finished()[1], max_tries=10, step=50
201                )
202                if finished_run:
203                    return check_folder
204            except Exception:
205                self._reset_time()
206                return self.data_files[-1]
207        return Err("Run not may not have completed.")
208
209    def get_uv_spectrum(self, path: str):
210        data_uv = rb.agilent.chemstation.parse_file(os.path.join(path, "DAD1.UV"))
211        times = data_uv.xlabels
212        wavelengths = data_uv.ylabels
213        absorbances = data_uv.data.transpose()
214        for i, w in enumerate(wavelengths):
215            self.uv[w] = AgilentHPLCChromatogram()
216            self.uv[w].attach_spectrum(times, absorbances[i])
217
218    def get_report_details(
219        self, path: str, report_type: ReportType = ReportType.TXT
220    ) -> AgilentReport:
221        if report_type is ReportType.TXT:
222            txt_report = TXTProcessor(path).process_report()
223            if txt_report.is_ok():
224                return txt_report.ok_value
225            elif txt_report.is_err():
226                raise ValueError(txt_report.err_value)
227        if report_type is ReportType.CSV:
228            csv_report = CSVProcessor(path).process_report()
229            if csv_report.is_ok():
230                return csv_report.ok_value
231            elif csv_report.is_err():
232                raise ValueError(csv_report.err_value)
233        raise ValueError("Expected one of ReportType.TXT or ReportType.CSV")
234
235    def get_spectrum_at_channels(self, data_path: str):
236        """Load chromatogram for any channel in spectra dictionary."""
237        for channel, spec in self.spectra.items():
238            try:
239                spec.load_spectrum(data_path=data_path, channel=channel)
240            except FileNotFoundError:
241                self.spectra[channel] = AgilentHPLCChromatogram()
242                warning = f"No data at channel: {channel}"
243                warnings.warn(warning)
244
245    def _reset_time(self):
246        self.curr_run_starting_time = None
247        self.timeout = None
248
249    def get_current_run_data_dir_file(self) -> Tuple[str, str]:
250        self.send(Command.GET_CURRENT_RUN_DATA_DIR)
251        full_path_name = self.receive()
252        self.send(Command.GET_CURRENT_RUN_DATA_FILE)
253        current_sample_file = self.receive()
254        if full_path_name.is_ok() and current_sample_file.is_ok():
255            if os.path.isdir(full_path_name.ok_value.string_response) and os.path.isdir(
256                os.path.join(
257                    full_path_name.ok_value.string_response,
258                    current_sample_file.ok_value.string_response,
259                )
260            ):
261                return (
262                    full_path_name.ok_value.string_response,
263                    current_sample_file.ok_value.string_response,
264                )
265        raise ValueError("Couldn't read data dir and file or doesn't exist yet.")

Abstract controller for all tables that can trigger runs on Chemstation.

Parameters
  • controller: the controller for sending MACROs, must be initialized for a run to be triggered.
  • src: complete directory path where files containing run parameters are stored.
  • data_dirs: list of complete directories that Chemstation will write data to.
  • table: contains register keys for accessing table in Chemstation.
  • offline: whether the communication controller is online.
curr_run_starting_time: Optional[float]
timeout: Optional[float]
current_run_child_files: Set[str]
data_files: List
@abc.abstractmethod
def get_data( self, custom_path: Optional[str] = None) -> Union[List[pychemstation.analysis.AgilentChannelChromatogramData], pychemstation.analysis.AgilentChannelChromatogramData]:
101    @abc.abstractmethod
102    def get_data(
103        self, custom_path: Optional[str] = None
104    ) -> Union[List[AgilentChannelChromatogramData], AgilentChannelChromatogramData]:
105        pass
@abc.abstractmethod
def get_data_uv( self, custom_path: str | None = None) -> Dict[int, pychemstation.analysis.AgilentHPLCChromatogram]:
107    @abc.abstractmethod
108    def get_data_uv(
109        self, custom_path: str | None = None
110    ) -> Dict[int, AgilentHPLCChromatogram]:
111        pass
@abc.abstractmethod
def get_report( self, custom_path: str, report_type: pychemstation.analysis.process_report.ReportType = <ReportType.TXT: 0>) -> List[pychemstation.analysis.process_report.AgilentReport]:
113    @abc.abstractmethod
114    def get_report(
115        self, custom_path: str, report_type: ReportType = ReportType.TXT
116    ) -> List[AgilentReport]:
117        pass
def check_hplc_is_running(self) -> bool:
119    def check_hplc_is_running(self) -> bool:
120        if self.controller:
121            try:
122                started_running = polling.poll(
123                    lambda: isinstance(self.controller.get_status(), HPLCRunningStatus),
124                    step=1,
125                    max_tries=20,
126                )
127            except Exception as e:
128                print(e)
129                return False
130            if started_running:
131                self.curr_run_starting_time = time.time()
132            return started_running
133        else:
134            raise ValueError("Controller is offline")
def check_hplc_run_finished(self) -> Tuple[float, bool]:
136    def check_hplc_run_finished(self) -> Tuple[float, bool]:
137        if self.controller:
138            try:
139                _, current_run_file = self.get_current_run_data_dir_file()
140                sample_file, extension, _ = current_run_file.partition(".D")
141                self.current_run_child_files.add(sample_file)
142            except Exception:
143                pass
144            done_running = self.controller.check_if_not_running()
145            if self.curr_run_starting_time and self.timeout:
146                time_passed = time.time() - self.curr_run_starting_time
147                if time_passed > self.timeout:
148                    enough_time_passed = time_passed >= self.timeout
149                    run_finished = enough_time_passed and done_running
150                    if run_finished:
151                        self._reset_time()
152                        return 0, run_finished
153                else:
154                    time_left = self.timeout - time_passed
155                    return time_left, self.controller.check_if_not_running()
156            return 0, self.controller.check_if_not_running()
157        raise ValueError("Controller is offline!")
def check_hplc_done_running(self) -> Union[result.result.Ok[~T], result.result.Err[str]]:
159    def check_hplc_done_running(self) -> Ok[T] | Err[str]:
160        """Checks if ChemStation has finished running and can read data back
161
162        :return: Data file object containing most recent run file information.
163        """
164        self.current_run_child_files = set()
165        if self.timeout is not None:
166            finished_run = False
167            minutes = math.ceil(self.timeout / 60)
168            try:
169                finished_run = not polling.poll(
170                    lambda: self.check_hplc_run_finished()[1],
171                    max_tries=minutes - 1,
172                    step=50,
173                )
174            except (
175                polling.TimeoutException,
176                polling.PollingException,
177                polling.MaxCallException,
178            ):
179                try:
180                    finished_run = polling.poll(
181                        lambda: self.check_hplc_run_finished()[1],
182                        timeout=self.timeout / 2,
183                        step=1,
184                    )
185                except (
186                    polling.TimeoutException,
187                    polling.PollingException,
188                    polling.MaxCallException,
189                ):
190                    pass
191        else:
192            raise ValueError("Timeout value is None, no comparison can be made.")
193
194        check_folder = self._fuzzy_match_most_recent_folder(self.data_files[-1])
195        if check_folder.is_ok() and finished_run:
196            return check_folder
197        elif check_folder.is_ok():
198            try:
199                finished_run = polling.poll(
200                    lambda: self.check_hplc_run_finished()[1], max_tries=10, step=50
201                )
202                if finished_run:
203                    return check_folder
204            except Exception:
205                self._reset_time()
206                return self.data_files[-1]
207        return Err("Run not may not have completed.")

Checks if ChemStation has finished running and can read data back

Returns

Data file object containing most recent run file information.

def get_uv_spectrum(self, path: str):
209    def get_uv_spectrum(self, path: str):
210        data_uv = rb.agilent.chemstation.parse_file(os.path.join(path, "DAD1.UV"))
211        times = data_uv.xlabels
212        wavelengths = data_uv.ylabels
213        absorbances = data_uv.data.transpose()
214        for i, w in enumerate(wavelengths):
215            self.uv[w] = AgilentHPLCChromatogram()
216            self.uv[w].attach_spectrum(times, absorbances[i])
def get_report_details( self, path: str, report_type: pychemstation.analysis.process_report.ReportType = <ReportType.TXT: 0>) -> pychemstation.analysis.process_report.AgilentReport:
218    def get_report_details(
219        self, path: str, report_type: ReportType = ReportType.TXT
220    ) -> AgilentReport:
221        if report_type is ReportType.TXT:
222            txt_report = TXTProcessor(path).process_report()
223            if txt_report.is_ok():
224                return txt_report.ok_value
225            elif txt_report.is_err():
226                raise ValueError(txt_report.err_value)
227        if report_type is ReportType.CSV:
228            csv_report = CSVProcessor(path).process_report()
229            if csv_report.is_ok():
230                return csv_report.ok_value
231            elif csv_report.is_err():
232                raise ValueError(csv_report.err_value)
233        raise ValueError("Expected one of ReportType.TXT or ReportType.CSV")
def get_spectrum_at_channels(self, data_path: str):
235    def get_spectrum_at_channels(self, data_path: str):
236        """Load chromatogram for any channel in spectra dictionary."""
237        for channel, spec in self.spectra.items():
238            try:
239                spec.load_spectrum(data_path=data_path, channel=channel)
240            except FileNotFoundError:
241                self.spectra[channel] = AgilentHPLCChromatogram()
242                warning = f"No data at channel: {channel}"
243                warnings.warn(warning)

Load chromatogram for any channel in spectra dictionary.

def get_current_run_data_dir_file(self) -> Tuple[str, str]:
249    def get_current_run_data_dir_file(self) -> Tuple[str, str]:
250        self.send(Command.GET_CURRENT_RUN_DATA_DIR)
251        full_path_name = self.receive()
252        self.send(Command.GET_CURRENT_RUN_DATA_FILE)
253        current_sample_file = self.receive()
254        if full_path_name.is_ok() and current_sample_file.is_ok():
255            if os.path.isdir(full_path_name.ok_value.string_response) and os.path.isdir(
256                os.path.join(
257                    full_path_name.ok_value.string_response,
258                    current_sample_file.ok_value.string_response,
259                )
260            ):
261                return (
262                    full_path_name.ok_value.string_response,
263                    current_sample_file.ok_value.string_response,
264                )
265        raise ValueError("Couldn't read data dir and file or doesn't exist yet.")