pychemstation.utils.parsing
File parser for Chemstation files (*.ch) Basically a port of the matlab script at: https://github.com/chemplexity/chromatography/blob/master/Development/File%20Conversion/ImportAgilentFID.m
This file is a standalone file to parse the binary files created by Chemstation
I use it for files with version 130, generated by an Agilent LC.
#!/usr/bin/python
# coding: utf-8

"""
File parser for Chemstation files (*.ch)
Basically a port of the matlab script at:
https://github.com/chemplexity/chromatography/blob/master/Development/File%20Conversion/ImportAgilentFID.m

This file is a standalone file to parse the binary files created by Chemstation

I use it for files with version 130, generated by an Agilent LC.
"""

import struct
from struct import unpack

import numpy as np

# Constants used for binary file parsing. Everything in a .ch file is
# big-endian, hence the ">" prefix on all struct format strings.
ENDIAN = ">"
STRING = ENDIAN + "{}s"
UINT8 = ENDIAN + "B"
UINT16 = ENDIAN + "H"
INT16 = ENDIAN + "h"
INT32 = ENDIAN + "i"
UINT32 = ENDIAN + "I"


def fread(fid, nelements, dtype):
    """Equivalent to Matlab fread function

    Args:
        fid: Open binary file object to read from
        nelements (int): Number of elements to read
        dtype: numpy dtype (or the builtin ``str``, read as raw 8-bit bytes)

    Returns:
        numpy.ndarray: The values read, as a (nelements, 1) column array
    """
    if dtype is str:
        dt = np.uint8  # WARNING: assuming 8-bit ASCII for np.str!
    else:
        dt = dtype

    data_array = np.fromfile(fid, dt, nelements)
    # Matlab's fread returns a column vector, so mirror that shape
    data_array.shape = (nelements, 1)

    return data_array


def parse_utf16_string(file_, encoding="UTF16"):
    """Parse a pascal type UTF16 encoded string from a binary file object

    The on-disk layout is a single length byte (a CHARACTER count) followed
    by two bytes per character.

    Args:
        file_: Open binary file object positioned at the length byte
        encoding (str): Codec used to decode the raw bytes

    Returns:
        str: The decoded string
    """
    # First read the expected number of CHARACTERS
    string_length = unpack(UINT8, file_.read(1))[0]
    # Then read and decode (two bytes per UTF16 character)
    parsed = unpack(STRING.format(2 * string_length), file_.read(2 * string_length))
    return parsed[0].decode(encoding)


class cached_property(object):
    """A property that is only computed once per instance and then replaces
    itself with an ordinary attribute. Deleting the attribute resets the
    property.

    https://github.com/bottlepy/bottle/commit/fa7733e075da0d790d809aa3d2f53071897e6f76
    """

    def __init__(self, func):
        self.__doc__ = getattr(func, "__doc__")
        self.func = func

    def __get__(self, obj, cls):
        # Accessed on the class itself: return the descriptor
        if obj is None:
            return self
        # Cache the result in the instance __dict__; instance attributes
        # shadow non-data descriptors, so this method runs at most once per
        # instance (until the cached attribute is deleted)
        value = obj.__dict__[self.func.__name__] = self.func(obj)
        return value


class CHFile(object):
    """Class that implements the Agilent .ch file format version
    130. Warning: Not all aspects of the file header are understood,
    so there may be (and probably is) information that is not parsed. See
    _parse_header_status for an overview of which parts of the header
    are understood.

    Attributes:
        values (numpy.array): The intensity values (y-value) of the
            spectrum. The unit for the values is given in `metadata['units']`

        metadata (dict): The extracted metadata

        filepath (str): The filepath this object was loaded from
    """

    # Fields is a table of name, offset and type. Types 'x-time' and 'utf16'
    # are specially handled, the rest are format arguments for struct unpack
    fields = (
        ("sequence_line_or_injection", 252, UINT16),
        ("injection_or_sequence_line", 256, UINT16),
        ("data_offset", 264, UINT32),
        ("start_time", 282, "x-time"),
        ("end_time", 286, "x-time"),
        ("version_string", 326, "utf16"),
        ("description", 347, "utf16"),
        ("sample", 858, "utf16"),
        ("operator", 1880, "utf16"),
        ("date", 2391, "utf16"),
        ("inlet", 2492, "utf16"),
        ("instrument", 2533, "utf16"),
        ("method", 2574, "utf16"),
        ("software version", 3601, "utf16"),
        ("software name", 3089, "utf16"),
        ("software revision", 3802, "utf16"),
        ("zero", 4110, INT32),
        ("units", 4172, "utf16"),
        ("detector", 4213, "utf16"),
        ("yscaling", 4732, ENDIAN + "d"),
    )

    # The start position of the data
    # Get it from metadata['data_offset'] * 512
    data_start = 6144

    # The versions of the file format supported by this implementation
    supported_versions = {130}

    def __init__(self, filepath):
        """Parse the .ch file at ``filepath``

        Args:
            filepath (str): Path of the .ch file to parse

        Raises:
            ValueError: If the file version is not supported
        """
        self.filepath = filepath
        self.metadata = {}
        with open(self.filepath, "rb") as file_:
            self._parse_header(file_)
            self.values = self._parse_data(file_)

    def _parse_header(self, file_):
        """Parse the header and fill in ``self.metadata``

        Args:
            file_: Open binary file object positioned at the start

        Raises:
            ValueError: If the file version is not supported
        """
        # Parse and check version (a pascal type ASCII string at offset 0)
        length = unpack(UINT8, file_.read(1))[0]
        parsed = unpack(STRING.format(length), file_.read(length))
        version = int(parsed[0])
        if version not in self.supported_versions:
            raise ValueError("Unsupported file version {}".format(version))
        self.metadata["magic_number_version"] = version

        # Parse all metadata fields
        for name, offset, type_ in self.fields:
            file_.seek(offset)
            if type_ == "utf16":
                self.metadata[name] = parse_utf16_string(file_)
            elif type_ == "x-time":
                # Times are stored in milliseconds; convert to minutes
                self.metadata[name] = unpack(UINT32, file_.read(4))[0] / 60000
            else:
                self.metadata[name] = unpack(
                    type_, file_.read(struct.calcsize(type_))
                )[0]

    def _parse_header_status(self):
        """Print known and unknown parts of the header (debugging helper)"""
        # Use a context manager so the handle is closed even if a read or an
        # unpack raises half way through (the original open()/close() pair
        # leaked the handle on errors)
        with open(self.filepath, "rb") as file_:
            print("Header parsing status")
            # Map positions to fields for all the known fields
            knowns = {item[1]: item for item in self.fields}
            # A couple of places has a \x01 byte before a string, these we
            # simply skip
            skips = {325, 3600}
            # Jump to after the magic number version
            file_.seek(4)

            # Initialize variables for unknown bytes
            unknown_start = None
            unknown_bytes = b""
            # While we have not yet reached the data
            while file_.tell() < self.data_start:
                current_position = file_.tell()
                # Just continue on skip bytes
                if current_position in skips:
                    file_.read(1)
                    continue

                # If we know about a data field that starts at this point
                if current_position in knowns:
                    # If we have collected unknown bytes, print them out and
                    # reset
                    if unknown_bytes != b"":
                        print(
                            "Unknown at",
                            unknown_start,
                            repr(unknown_bytes.rstrip(b"\x00")),
                        )
                        unknown_bytes = b""
                        unknown_start = None

                    # Print out the position, type, name and value of the
                    # known value
                    print("Known field at {: >4},".format(current_position), end=" ")
                    name, _, type_ = knowns[current_position]
                    if type_ == "x-time":
                        # NOTE(review): reads a big-endian float here, whereas
                        # _parse_header reads a UINT32 for the same fields --
                        # verify which interpretation is correct
                        print(
                            'x-time, "{: <19}'.format(name + '"'),
                            unpack(ENDIAN + "f", file_.read(4))[0] / 60000,
                        )
                    elif type_ == "utf16":
                        print(
                            ' utf16, "{: <19}'.format(name + '"'),
                            parse_utf16_string(file_),
                        )
                    else:
                        size = struct.calcsize(type_)
                        print(
                            '{: >6}, "{: <19}'.format(type_, name + '"'),
                            unpack(type_, file_.read(size))[0],
                        )

                # We do not know about a data field at this position. If we
                # have already collected 4 zero bytes, assume that we are
                # done with this unknown field, print and reset
                else:
                    if unknown_bytes[-4:] == b"\x00\x00\x00\x00":
                        print(
                            "Unknown at",
                            unknown_start,
                            repr(unknown_bytes.rstrip(b"\x00")),
                        )
                        unknown_bytes = b""
                        unknown_start = None

                    # Read one byte and save it
                    one_byte = file_.read(1)
                    if unknown_bytes == b"":
                        # Only start a new collection of unknown bytes, if
                        # this byte is not a zero byte
                        if one_byte != b"\x00":
                            unknown_bytes = one_byte
                            unknown_start = file_.tell() - 1
                    else:
                        unknown_bytes += one_byte

    def _parse_data(self, file_):
        """Parse the data. Decompress the delta-encoded data, and scale them
        with y-scaling

        Args:
            file_: Open binary file object (the header must already be
                parsed, since ``metadata['yscaling']`` is used)

        Returns:
            numpy.ndarray: The scaled intensity values
        """
        scaling = self.metadata["yscaling"]

        # Go to the end of the file to find where the data stops
        file_.seek(0, 2)
        stop = file_.tell()

        # Go to the start point of the data
        file_.seek(self.data_start)

        signal = []

        # buff[0]: segment header word, buff[1]: current value,
        # buff[2]: last delta read, buff[3]: value carried between segments
        buff = [0, 0, 0, 0]

        while file_.tell() < stop:
            buff[0] = fread(file_, 1, INT16)[0][0]
            buff[1] = buff[3]

            # NOTE(review): buff[0] is a numpy int16 (from fread), so the
            # left shift wraps at 16 bits like Matlab's int16 bitshift; with
            # a plain Python int this test would only be true for zero --
            # confirm before changing fread's return type
            if buff[0] << 12 == 0:
                break

            # The low 12 bits of the segment header hold the point count
            for _ in range(buff[0] & 4095):
                buff[2] = fread(file_, 1, INT16)[0][0]

                # -32768 marks an absolute INT32 value instead of a delta
                if buff[2] != -32768:
                    buff[1] = buff[1] + buff[2]
                else:
                    buff[1] = fread(file_, 1, INT32)[0][0]

                signal.append(buff[1])

                buff[3] = buff[1]

        signal = np.array(signal)
        signal = signal * scaling

        return signal

    @cached_property
    def times(self):
        """The time values (x-value) for the data set in minutes"""

        return np.linspace(
            self.metadata["start_time"], self.metadata["end_time"], len(self.values)
        )


if __name__ == "__main__":
    CHFile("lcdiag.reg")
def fread(fid, nelements, dtype):
    """Read ``nelements`` values of ``dtype`` from the open file ``fid``.

    Works like Matlab's ``fread``: the result is always returned as a
    (nelements, 1) column array.
    """
    # The builtin ``str`` means "raw 8-bit characters": read unsigned bytes
    # (WARNING: assumes 8-bit ASCII)
    read_as = np.uint8 if dtype is str else dtype

    column = np.fromfile(fid, read_as, nelements)
    column.shape = (nelements, 1)
    return column
Equivalent to Matlab fread function
def parse_utf16_string(file_, encoding="UTF16"):
    """Read a pascal style UTF16 string from the binary file object ``file_``.

    The on-disk layout is one length byte (a character count) followed by
    two bytes per character.
    """
    # The prefix byte counts CHARACTERS, so twice as many bytes follow
    char_count = unpack(UINT8, file_.read(1))[0]
    byte_count = 2 * char_count
    (raw,) = unpack(STRING.format(byte_count), file_.read(byte_count))
    return raw.decode(encoding)
Parse a pascal type UTF16 encoded string from a binary file object
class cached_property(object):
    """Descriptor caching a computed value once per instance.

    The first access calls the wrapped function and stores the result as an
    ordinary attribute on the instance; because instance attributes shadow
    non-data descriptors, later accesses never reach this descriptor again.
    Deleting the attribute resets the cache.

    https://github.com/bottlepy/bottle/commit/fa7733e075da0d790d809aa3d2f53071897e6f76
    """

    def __init__(self, func):
        self.func = func
        self.__doc__ = getattr(func, "__doc__")

    def __get__(self, obj, cls):
        # Class-level access: hand back the descriptor itself
        if obj is None:
            return self
        result = self.func(obj)
        obj.__dict__[self.func.__name__] = result
        return result
A property that is only computed once per instance and then replaces itself with an ordinary attribute. Deleting the attribute resets the property.
https://github.com/bottlepy/bottle/commit/fa7733e075da0d790d809aa3d2f53071897e6f76
class CHFile(object):
    """Class that implements the Agilent .ch file format version
    130. Warning: Not all aspects of the file header are understood,
    so there may be (and probably is) information that is not parsed. See
    _parse_header_status for an overview of which parts of the header
    are understood.

    Attributes:
        values (numpy.array): The intensity values (y-value) of the
            spectrum. The unit for the values is given in `metadata['units']`

        metadata (dict): The extracted metadata

        filepath (str): The filepath this object was loaded from
    """

    # Fields is a table of name, offset and type. Types 'x-time' and 'utf16'
    # are specially handled, the rest are format arguments for struct unpack
    fields = (
        ("sequence_line_or_injection", 252, UINT16),
        ("injection_or_sequence_line", 256, UINT16),
        ("data_offset", 264, UINT32),
        ("start_time", 282, "x-time"),
        ("end_time", 286, "x-time"),
        ("version_string", 326, "utf16"),
        ("description", 347, "utf16"),
        ("sample", 858, "utf16"),
        ("operator", 1880, "utf16"),
        ("date", 2391, "utf16"),
        ("inlet", 2492, "utf16"),
        ("instrument", 2533, "utf16"),
        ("method", 2574, "utf16"),
        ("software version", 3601, "utf16"),
        ("software name", 3089, "utf16"),
        ("software revision", 3802, "utf16"),
        ("zero", 4110, INT32),
        ("units", 4172, "utf16"),
        ("detector", 4213, "utf16"),
        ("yscaling", 4732, ENDIAN + "d"),
    )

    # The start position of the data
    # Get it from metadata['data_offset'] * 512
    data_start = 6144

    # The versions of the file format supported by this implementation
    supported_versions = {130}

    def __init__(self, filepath):
        """Parse the .ch file at ``filepath``

        Args:
            filepath (str): Path of the .ch file to parse
        """
        self.filepath = filepath
        self.metadata = {}
        with open(self.filepath, "rb") as file_:
            self._parse_header(file_)
            self.values = self._parse_data(file_)

    def _parse_header(self, file_):
        """Parse the header

        Fills ``self.metadata`` from the ``fields`` table.

        Raises:
            ValueError: If the file version is not in ``supported_versions``
        """

        # Parse and check version (a pascal type ASCII string at offset 0)
        length = unpack(UINT8, file_.read(1))[0]
        parsed = unpack(STRING.format(length), file_.read(length))
        version = int(parsed[0])
        if version not in self.supported_versions:
            raise ValueError("Unsupported file version {}".format(version))
        self.metadata["magic_number_version"] = version

        # Parse all metadata fields
        for name, offset, type_ in self.fields:
            file_.seek(offset)
            if type_ == "utf16":
                self.metadata[name] = parse_utf16_string(file_)
            elif type_ == "x-time":
                # Times are stored in milliseconds; converted to minutes here
                self.metadata[name] = unpack(UINT32, file_.read(4))[0] / 60000
            else:
                self.metadata[name] = unpack(type_, file_.read(struct.calcsize(type_)))[
                    0
                ]

    def _parse_header_status(self):
        """Print known and unknown parts of the header (debugging helper)"""

        # NOTE(review): this handle leaks if an exception is raised before
        # close() -- consider a with-statement
        file_ = open(self.filepath, "rb")

        print("Header parsing status")
        # Map positions to fields for all the known fields
        knowns = {item[1]: item for item in self.fields}
        # A couple of places has a \x01 byte before a string, these we simply
        # skip
        skips = {325, 3600}
        # Jump to after the magic number version
        file_.seek(4)

        # Initialize variables for unknown bytes
        unknown_start = None
        unknown_bytes = b""
        # While we have not yet reached the data
        while file_.tell() < self.data_start:
            current_position = file_.tell()
            # Just continue on skip bytes
            if current_position in skips:
                file_.read(1)
                continue

            # If we know about a data field that starts at this point
            if current_position in knowns:
                # If we have collected unknown bytes, print them out and reset
                if unknown_bytes != b"":
                    print(
                        "Unknown at", unknown_start, repr(unknown_bytes.rstrip(b"\x00"))
                    )
                    unknown_bytes = b""
                    unknown_start = None

                # Print out the position, type, name and value of the known
                # value
                print("Known field at {: >4},".format(current_position), end=" ")
                name, _, type_ = knowns[current_position]
                if type_ == "x-time":
                    # NOTE(review): reads a big-endian float here, whereas
                    # _parse_header reads a UINT32 for the same fields --
                    # verify which interpretation is correct
                    print(
                        'x-time, "{: <19}'.format(name + '"'),
                        unpack(ENDIAN + "f", file_.read(4))[0] / 60000,
                    )
                elif type_ == "utf16":
                    print(
                        ' utf16, "{: <19}'.format(name + '"'), parse_utf16_string(file_)
                    )
                else:
                    size = struct.calcsize(type_)
                    print(
                        '{: >6}, "{: <19}'.format(type_, name + '"'),
                        unpack(type_, file_.read(size))[0],
                    )

            # We do not know about a data field at this position. If we have
            # already collected 4 zero bytes, assume that we are done with
            # this unknown field, print and reset
            else:
                if unknown_bytes[-4:] == b"\x00\x00\x00\x00":
                    print(
                        "Unknown at", unknown_start, repr(unknown_bytes.rstrip(b"\x00"))
                    )
                    unknown_bytes = b""
                    unknown_start = None

                # Read one byte and save it
                one_byte = file_.read(1)
                if unknown_bytes == b"":
                    # Only start a new collection of unknown bytes, if this
                    # byte is not a zero byte
                    if one_byte != b"\x00":
                        unknown_bytes = one_byte
                        unknown_start = file_.tell() - 1
                else:
                    unknown_bytes += one_byte

        file_.close()

    def _parse_data(self, file_):
        """Parse the data. Decompress the delta-encoded data, and scale them
        with y-scaling"""

        scaling = self.metadata["yscaling"]

        # Go to the end of the file to find where the data stops
        file_.seek(0, 2)
        stop = file_.tell()

        # Go to the start point of the data
        file_.seek(self.data_start)

        signal = []

        # buff[0]: segment header word, buff[1]: current value,
        # buff[2]: last delta read, buff[3]: value carried between segments
        buff = [0, 0, 0, 0]

        while file_.tell() < stop:
            buff[0] = fread(file_, 1, INT16)[0][0]
            buff[1] = buff[3]

            # NOTE(review): buff[0] is a numpy int16 (from fread), so this
            # shift wraps at 16 bits like Matlab's int16 bitshift; with a
            # plain Python int it would only ever be true for zero -- confirm
            if buff[0] << 12 == 0:
                break

            # The low 12 bits of the segment header hold the point count
            for i in range(buff[0] & 4095):
                buff[2] = fread(file_, 1, INT16)[0][0]

                # -32768 marks an absolute INT32 value instead of a delta
                if buff[2] != -32768:
                    buff[1] = buff[1] + buff[2]
                else:
                    buff[1] = fread(file_, 1, INT32)[0][0]

                signal.append(buff[1])

                buff[3] = buff[1]

        signal = np.array(signal)
        signal = signal * scaling

        return signal

    @cached_property
    def times(self):
        """The time values (x-value) for the data set in minutes"""

        return np.linspace(
            self.metadata["start_time"], self.metadata["end_time"], len(self.values)
        )
Class that implements the Agilent .ch file format version
- Warning: Not all aspects of the file header are understood, so there may be (and probably is) information that is not parsed. See _parse_header_status for an overview of which parts of the header are understood.
Attributes:
values (numpy.array): The intensity values (y-value) of the
spectrum. The unit for the values is given in metadata['units']
metadata (dict): The extracted metadata
filepath (str): The filepath this object was loaded from