pychemstation.utils.parsing

File parser for Chemstation files (*.ch). Basically a port of the Matlab script at: https://github.com/chemplexity/chromatography/blob/master/Development/File%20Conversion/ImportAgilentFID.m

This file is a standalone file to parse the binary files created by Chemstation

I use it for files with version 130, generated by an Agilent LC.

  1#!/usr/bin/python
  2# coding: utf-8
  3
  4"""
  5File parser for Chemstation files (*.ch)
  6Basically a port of the matlab script at:
  7https://github.com/chemplexity/chromatography/blob/master/Development/File%20Conversion/ImportAgilentFID.m
  8
  9This file is a standalone file to parse the binary files created by Chemstation
 10
 11I use it for files with version 130, generated by an Agilent LC.
 12"""
 13
 14import struct
 15from struct import unpack
 16
 17import numpy as np
 18
# Constants used for binary file parsing. All of them are struct format
# strings that start with the ">" (big-endian) byte-order prefix.
ENDIAN = ">"
STRING = ENDIAN + "{}s"  # template: fill in the byte count with .format()
UINT8 = ENDIAN + "B"  # unsigned 8-bit integer
UINT16 = ENDIAN + "H"  # unsigned 16-bit integer
INT16 = ENDIAN + "h"  # signed 16-bit integer
INT32 = ENDIAN + "i"  # signed 32-bit integer
UINT32 = ENDIAN + "I"  # unsigned 32-bit integer
 27
 28
 29def fread(fid, nelements, dtype):
 30    """Equivalent to Matlab fread function"""
 31
 32    if dtype is str:
 33        dt = np.uint8  # WARNING: assuming 8-bit ASCII for np.str!
 34    else:
 35        dt = dtype
 36
 37    data_array = np.fromfile(fid, dt, nelements)
 38    data_array.shape = (nelements, 1)
 39
 40    return data_array
 41
 42
 43def parse_utf16_string(file_, encoding="UTF16"):
 44    """Parse a pascal type UTF16 encoded string from a binary file object"""
 45
 46    # First read the expected number of CHARACTERS
 47    string_length = unpack(UINT8, file_.read(1))[0]
 48    # Then read and decode
 49    parsed = unpack(STRING.format(2 * string_length), file_.read(2 * string_length))
 50    return parsed[0].decode(encoding)
 51
 52
 53class cached_property(object):
 54    """A property that is only computed once per instance and then replaces
 55    itself with an ordinary attribute. Deleting the attribute resets the
 56    property.
 57
 58    https://github.com/bottlepy/bottle/commit/fa7733e075da0d790d809aa3d2f53071897e6f76
 59    """
 60
 61    def __init__(self, func):
 62        self.__doc__ = getattr(func, "__doc__")
 63        self.func = func
 64
 65    def __get__(self, obj, cls):
 66        if obj is None:
 67            return self
 68        value = obj.__dict__[self.func.__name__] = self.func(obj)
 69        return value
 70
 71
 72class CHFile(object):
 73    """Class that implementats the Agilent .ch file format version
 74    130. Warning: Not all aspects of the file header is understood,
 75    so there may and probably is information that is not parsed. See
 76    _parse_header_status for an overview of which parts of the header
 77    is understood.
 78
 79    Attributes:
 80        values (numpy.array): The internsity values (y-value) or the
 81        spectrum. The unit for the values is given in `metadata['units']`
 82
 83        metadata (dict): The extracted metadata
 84
 85        filepath (str): The filepath this object was loaded from
 86    """
 87
 88    # Fields is a table of name, offset and type. Types 'x-time' and 'utf16'
 89    # are specially handled, the rest are format arguments for struct unpack
 90    fields = (
 91        ("sequence_line_or_injection", 252, UINT16),
 92        ("injection_or_sequence_line", 256, UINT16),
 93        ("data_offset", 264, UINT32),
 94        ("start_time", 282, "x-time"),
 95        ("end_time", 286, "x-time"),
 96        ("version_string", 326, "utf16"),
 97        ("description", 347, "utf16"),
 98        ("sample", 858, "utf16"),
 99        ("operator", 1880, "utf16"),
100        ("date", 2391, "utf16"),
101        ("inlet", 2492, "utf16"),
102        ("instrument", 2533, "utf16"),
103        ("method", 2574, "utf16"),
104        ("software version", 3601, "utf16"),
105        ("software name", 3089, "utf16"),
106        ("software revision", 3802, "utf16"),
107        ("zero", 4110, INT32),
108        ("units", 4172, "utf16"),
109        ("detector", 4213, "utf16"),
110        ("yscaling", 4732, ENDIAN + "d"),
111    )
112
113    # The start position of the data
114    # Get it from metadata['data_offset'] * 512
115    data_start = 6144
116
117    # The versions of the file format supported by this implementation
118    supported_versions = {130}
119
120    def __init__(self, filepath):
121        self.filepath = filepath
122        self.metadata = {}
123        with open(self.filepath, "rb") as file_:
124            self._parse_header(file_)
125            self.values = self._parse_data(file_)
126
127    def _parse_header(self, file_):
128        """Parse the header"""
129
130        # Parse and check version
131        length = unpack(UINT8, file_.read(1))[0]
132        parsed = unpack(STRING.format(length), file_.read(length))
133        version = int(parsed[0])
134        if version not in self.supported_versions:
135            raise ValueError("Unsupported file version {}".format(version))
136        self.metadata["magic_number_version"] = version
137
138        # Parse all metadata fields
139        for name, offset, type_ in self.fields:
140            file_.seek(offset)
141            if type_ == "utf16":
142                self.metadata[name] = parse_utf16_string(file_)
143            elif type_ == "x-time":
144                self.metadata[name] = unpack(UINT32, file_.read(4))[0] / 60000
145            else:
146                self.metadata[name] = unpack(type_, file_.read(struct.calcsize(type_)))[
147                    0
148                ]
149
150    def _parse_header_status(self):
151        """Print known and unknown parts of the header"""
152
153        file_ = open(self.filepath, "rb")
154
155        print("Header parsing status")
156        # Map positions to fields for all the known fields
157        knowns = {item[1]: item for item in self.fields}
158        # A couple of places has a \x01 byte before a string, these we simply
159        # skip
160        skips = {325, 3600}
161        # Jump to after the magic number version
162        file_.seek(4)
163
164        # Initialize variables for unknown bytes
165        unknown_start = None
166        unknown_bytes = b""
167        # While we have not yet reached the data
168        while file_.tell() < self.data_start:
169            current_position = file_.tell()
170            # Just continue on skip bytes
171            if current_position in skips:
172                file_.read(1)
173                continue
174
175            # If we know about a data field that starts at this point
176            if current_position in knowns:
177                # If we have collected unknown bytes, print them out and reset
178                if unknown_bytes != b"":
179                    print(
180                        "Unknown at", unknown_start, repr(unknown_bytes.rstrip(b"\x00"))
181                    )
182                    unknown_bytes = b""
183                    unknown_start = None
184
185                # Print out the position, type, name and value of the known
186                # value
187                print("Known field at {: >4},".format(current_position), end=" ")
188                name, _, type_ = knowns[current_position]
189                if type_ == "x-time":
190                    print(
191                        'x-time, "{: <19}'.format(name + '"'),
192                        unpack(ENDIAN + "f", file_.read(4))[0] / 60000,
193                    )
194                elif type_ == "utf16":
195                    print(
196                        ' utf16, "{: <19}'.format(name + '"'), parse_utf16_string(file_)
197                    )
198                else:
199                    size = struct.calcsize(type_)
200                    print(
201                        '{: >6}, "{: <19}'.format(type_, name + '"'),
202                        unpack(type_, file_.read(size))[0],
203                    )
204
205            # We do not know about a data field at this position If we have
206            # already collected 4 zero bytes, assume that we are done with
207            # this unkonw field, print and reset
208            else:
209                if unknown_bytes[-4:] == b"\x00\x00\x00\x00":
210                    print(
211                        "Unknown at", unknown_start, repr(unknown_bytes.rstrip(b"\x00"))
212                    )
213                    unknown_bytes = b""
214                    unknown_start = None
215
216                # Read one byte and save it
217                one_byte = file_.read(1)
218                if unknown_bytes == b"":
219                    # Only start a new collection of unknown bytes, if this
220                    # byte is not a zero byte
221                    if one_byte != b"\x00":
222                        unknown_bytes = one_byte
223                        unknown_start = file_.tell() - 1
224                else:
225                    unknown_bytes += one_byte
226
227        file_.close()
228
229    def _parse_data(self, file_):
230        """Parse the data. Decompress the delta-encoded data, and scale them
231        with y-scaling"""
232
233        scaling = self.metadata["yscaling"]
234
235        # Go to the end of the file
236        file_.seek(0, 2)
237        stop = file_.tell()
238
239        # Go to the start point of the data
240        file_.seek(self.data_start)
241
242        signal = []
243
244        buff = [0, 0, 0, 0]
245
246        while file_.tell() < stop:
247            buff[0] = fread(file_, 1, INT16)[0][0]
248            buff[1] = buff[3]
249
250            if buff[0] << 12 == 0:
251                break
252
253            for i in range(buff[0] & 4095):
254                buff[2] = fread(file_, 1, INT16)[0][0]
255
256                if buff[2] != -32768:
257                    buff[1] = buff[1] + buff[2]
258                else:
259                    buff[1] = fread(file_, 1, INT32)[0][0]
260
261                signal.append(buff[1])
262
263            buff[3] = buff[1]
264
265        signal = np.array(signal)
266        signal = signal * scaling
267
268        return signal
269
270    @cached_property
271    def times(self):
272        """The time values (x-value) for the data set in minutes"""
273
274        return np.linspace(
275            self.metadata["start_time"], self.metadata["end_time"], len(self.values)
276        )
277
278
# Script entry point: parse the hard-coded relative path "lcdiag.reg"; the
# resulting CHFile object is not kept or printed.
if __name__ == "__main__":
    CHFile("lcdiag.reg")
# struct format strings; the ">" prefix selects big-endian byte order
ENDIAN = '>'
STRING = '>{}s'  # template: fill in the byte count with .format()
UINT8 = '>B'  # unsigned 8-bit integer
UINT16 = '>H'  # unsigned 16-bit integer
INT16 = '>h'  # signed 16-bit integer
INT32 = '>i'  # signed 32-bit integer
UINT32 = '>I'  # unsigned 32-bit integer
def fread(fid, nelements, dtype):
def fread(fid, nelements, dtype):
    """Read ``nelements`` items of ``dtype`` from ``fid`` as a column vector.

    Stand-in for Matlab's ``fread``: the items are read with
    ``numpy.fromfile`` and returned as an array of shape ``(nelements, 1)``.
    Passing ``str`` as ``dtype`` reads single bytes (``np.uint8``).
    """
    # WARNING: assuming 8-bit ASCII when str is requested
    element_type = np.uint8 if dtype is str else dtype

    column = np.fromfile(fid, element_type, nelements)
    column.shape = (nelements, 1)
    return column

Equivalent to Matlab fread function

def parse_utf16_string(file_, encoding='UTF16'):
def parse_utf16_string(file_, encoding="UTF16"):
    """Read a length-prefixed (pascal type) UTF16 string from a binary file.

    A single unsigned byte gives the expected number of CHARACTERS; twice
    that many bytes are then read from ``file_`` and decoded with
    ``encoding``.
    """
    # Length prefix: number of characters, not bytes
    (char_count,) = unpack(UINT8, file_.read(1))

    # Two bytes per character
    byte_count = 2 * char_count
    (raw,) = unpack(STRING.format(byte_count), file_.read(byte_count))
    return raw.decode(encoding)

Parse a pascal type UTF16 encoded string from a binary file object

class cached_property:
class cached_property(object):
    """Descriptor that evaluates the wrapped method once per instance.

    The first access calls the wrapped function and stores the result in the
    instance ``__dict__`` under the function's name, so that later lookups
    find the ordinary attribute. Deleting the attribute resets the property.

    https://github.com/bottlepy/bottle/commit/fa7733e075da0d790d809aa3d2f53071897e6f76
    """

    def __init__(self, func):
        self.func = func
        self.__doc__ = func.__doc__

    def __get__(self, obj, cls):
        if obj is not None:
            # Compute, then cache on the instance under the function's name
            result = self.func(obj)
            obj.__dict__[self.func.__name__] = result
            return result
        # Accessed on the class itself: hand back the descriptor
        return self

A property that is only computed once per instance and then replaces itself with an ordinary attribute. Deleting the attribute resets the property.

https://github.com/bottlepy/bottle/commit/fa7733e075da0d790d809aa3d2f53071897e6f76

cached_property(func)
    def __init__(self, func):
        """Keep ``func`` and copy its docstring onto the descriptor"""
        self.__doc__ = getattr(func, "__doc__")
        self.func = func
func
class CHFile:
 73class CHFile(object):
 74    """Class that implementats the Agilent .ch file format version
 75    130. Warning: Not all aspects of the file header is understood,
 76    so there may and probably is information that is not parsed. See
 77    _parse_header_status for an overview of which parts of the header
 78    is understood.
 79
 80    Attributes:
 81        values (numpy.array): The internsity values (y-value) or the
 82        spectrum. The unit for the values is given in `metadata['units']`
 83
 84        metadata (dict): The extracted metadata
 85
 86        filepath (str): The filepath this object was loaded from
 87    """
 88
 89    # Fields is a table of name, offset and type. Types 'x-time' and 'utf16'
 90    # are specially handled, the rest are format arguments for struct unpack
 91    fields = (
 92        ("sequence_line_or_injection", 252, UINT16),
 93        ("injection_or_sequence_line", 256, UINT16),
 94        ("data_offset", 264, UINT32),
 95        ("start_time", 282, "x-time"),
 96        ("end_time", 286, "x-time"),
 97        ("version_string", 326, "utf16"),
 98        ("description", 347, "utf16"),
 99        ("sample", 858, "utf16"),
100        ("operator", 1880, "utf16"),
101        ("date", 2391, "utf16"),
102        ("inlet", 2492, "utf16"),
103        ("instrument", 2533, "utf16"),
104        ("method", 2574, "utf16"),
105        ("software version", 3601, "utf16"),
106        ("software name", 3089, "utf16"),
107        ("software revision", 3802, "utf16"),
108        ("zero", 4110, INT32),
109        ("units", 4172, "utf16"),
110        ("detector", 4213, "utf16"),
111        ("yscaling", 4732, ENDIAN + "d"),
112    )
113
114    # The start position of the data
115    # Get it from metadata['data_offset'] * 512
116    data_start = 6144
117
118    # The versions of the file format supported by this implementation
119    supported_versions = {130}
120
121    def __init__(self, filepath):
122        self.filepath = filepath
123        self.metadata = {}
124        with open(self.filepath, "rb") as file_:
125            self._parse_header(file_)
126            self.values = self._parse_data(file_)
127
128    def _parse_header(self, file_):
129        """Parse the header"""
130
131        # Parse and check version
132        length = unpack(UINT8, file_.read(1))[0]
133        parsed = unpack(STRING.format(length), file_.read(length))
134        version = int(parsed[0])
135        if version not in self.supported_versions:
136            raise ValueError("Unsupported file version {}".format(version))
137        self.metadata["magic_number_version"] = version
138
139        # Parse all metadata fields
140        for name, offset, type_ in self.fields:
141            file_.seek(offset)
142            if type_ == "utf16":
143                self.metadata[name] = parse_utf16_string(file_)
144            elif type_ == "x-time":
145                self.metadata[name] = unpack(UINT32, file_.read(4))[0] / 60000
146            else:
147                self.metadata[name] = unpack(type_, file_.read(struct.calcsize(type_)))[
148                    0
149                ]
150
151    def _parse_header_status(self):
152        """Print known and unknown parts of the header"""
153
154        file_ = open(self.filepath, "rb")
155
156        print("Header parsing status")
157        # Map positions to fields for all the known fields
158        knowns = {item[1]: item for item in self.fields}
159        # A couple of places has a \x01 byte before a string, these we simply
160        # skip
161        skips = {325, 3600}
162        # Jump to after the magic number version
163        file_.seek(4)
164
165        # Initialize variables for unknown bytes
166        unknown_start = None
167        unknown_bytes = b""
168        # While we have not yet reached the data
169        while file_.tell() < self.data_start:
170            current_position = file_.tell()
171            # Just continue on skip bytes
172            if current_position in skips:
173                file_.read(1)
174                continue
175
176            # If we know about a data field that starts at this point
177            if current_position in knowns:
178                # If we have collected unknown bytes, print them out and reset
179                if unknown_bytes != b"":
180                    print(
181                        "Unknown at", unknown_start, repr(unknown_bytes.rstrip(b"\x00"))
182                    )
183                    unknown_bytes = b""
184                    unknown_start = None
185
186                # Print out the position, type, name and value of the known
187                # value
188                print("Known field at {: >4},".format(current_position), end=" ")
189                name, _, type_ = knowns[current_position]
190                if type_ == "x-time":
191                    print(
192                        'x-time, "{: <19}'.format(name + '"'),
193                        unpack(ENDIAN + "f", file_.read(4))[0] / 60000,
194                    )
195                elif type_ == "utf16":
196                    print(
197                        ' utf16, "{: <19}'.format(name + '"'), parse_utf16_string(file_)
198                    )
199                else:
200                    size = struct.calcsize(type_)
201                    print(
202                        '{: >6}, "{: <19}'.format(type_, name + '"'),
203                        unpack(type_, file_.read(size))[0],
204                    )
205
206            # We do not know about a data field at this position If we have
207            # already collected 4 zero bytes, assume that we are done with
208            # this unkonw field, print and reset
209            else:
210                if unknown_bytes[-4:] == b"\x00\x00\x00\x00":
211                    print(
212                        "Unknown at", unknown_start, repr(unknown_bytes.rstrip(b"\x00"))
213                    )
214                    unknown_bytes = b""
215                    unknown_start = None
216
217                # Read one byte and save it
218                one_byte = file_.read(1)
219                if unknown_bytes == b"":
220                    # Only start a new collection of unknown bytes, if this
221                    # byte is not a zero byte
222                    if one_byte != b"\x00":
223                        unknown_bytes = one_byte
224                        unknown_start = file_.tell() - 1
225                else:
226                    unknown_bytes += one_byte
227
228        file_.close()
229
230    def _parse_data(self, file_):
231        """Parse the data. Decompress the delta-encoded data, and scale them
232        with y-scaling"""
233
234        scaling = self.metadata["yscaling"]
235
236        # Go to the end of the file
237        file_.seek(0, 2)
238        stop = file_.tell()
239
240        # Go to the start point of the data
241        file_.seek(self.data_start)
242
243        signal = []
244
245        buff = [0, 0, 0, 0]
246
247        while file_.tell() < stop:
248            buff[0] = fread(file_, 1, INT16)[0][0]
249            buff[1] = buff[3]
250
251            if buff[0] << 12 == 0:
252                break
253
254            for i in range(buff[0] & 4095):
255                buff[2] = fread(file_, 1, INT16)[0][0]
256
257                if buff[2] != -32768:
258                    buff[1] = buff[1] + buff[2]
259                else:
260                    buff[1] = fread(file_, 1, INT32)[0][0]
261
262                signal.append(buff[1])
263
264            buff[3] = buff[1]
265
266        signal = np.array(signal)
267        signal = signal * scaling
268
269        return signal
270
271    @cached_property
272    def times(self):
273        """The time values (x-value) for the data set in minutes"""
274
275        return np.linspace(
276            self.metadata["start_time"], self.metadata["end_time"], len(self.values)
277        )

Class that implements the Agilent .ch file format version 130. Warning: Not all aspects of the file header are understood, so there may be, and probably is, information that is not parsed. See _parse_header_status for an overview of which parts of the header are understood.

Attributes: values (numpy.array): The intensity values (y-values) of the spectrum. The unit for the values is given in metadata['units']

metadata (dict): The extracted metadata

filepath (str): The filepath this object was loaded from
CHFile(filepath)
    def __init__(self, filepath):
        """Load ``filepath``: parse the header into ``metadata`` and the
        data into ``values``"""
        self.filepath = filepath
        self.metadata = {}
        # Header first: _parse_data reads metadata["yscaling"]
        with open(self.filepath, "rb") as file_:
            self._parse_header(file_)
            self.values = self._parse_data(file_)
fields = (('sequence_line_or_injection', 252, '>H'), ('injection_or_sequence_line', 256, '>H'), ('data_offset', 264, '>I'), ('start_time', 282, 'x-time'), ('end_time', 286, 'x-time'), ('version_string', 326, 'utf16'), ('description', 347, 'utf16'), ('sample', 858, 'utf16'), ('operator', 1880, 'utf16'), ('date', 2391, 'utf16'), ('inlet', 2492, 'utf16'), ('instrument', 2533, 'utf16'), ('method', 2574, 'utf16'), ('software version', 3601, 'utf16'), ('software name', 3089, 'utf16'), ('software revision', 3802, 'utf16'), ('zero', 4110, '>i'), ('units', 4172, 'utf16'), ('detector', 4213, 'utf16'), ('yscaling', 4732, '>d'))
data_start = 6144
supported_versions = {130}
filepath
metadata
def times(unknown):

The time values (x-value) for the data set in minutes