pychemstation.utils.spec_utils

Module contains various utility function for spectral data processing and analysis.

  1"""
  2Module contains various utility function for spectral data processing and
  3analysis.
  4"""
  5
  6from typing import Optional
  7
  8import numpy as np
  9import scipy
 10
 11from ..utils.num_utils import find_nearest_value_index
 12
 13
 14def create_binary_peak_map(data: np.ndarray) -> np.ndarray:
 15    """Return binary map of the peaks within data points.
 16
 17    True values are assigned to potential peak points, False - to baseline.
 18
 19    :param data: 1D array with data points.
 20
 21    :return: Mapping of data points, where True is potential peak region point, False - baseline.
 22    """
 23    # copying array
 24    data_c = np.copy(data)
 25
 26    # placeholder for the peak mapping
 27    peak_map = np.full_like(data_c, False, dtype=bool)
 28
 29    for _ in range(100500):  # shouldn't take more iterations
 30        # looking for peaks
 31        peaks_found = np.logical_or(
 32            data_c > np.mean(data_c) + np.std(data_c) * 3,
 33            data_c < np.mean(data_c) - np.std(data_c) * 3,
 34        )
 35
 36        # merging with peak mapping
 37        np.logical_or(peak_map, peaks_found, out=peak_map)
 38
 39        # if no peaks found - break
 40        if not peaks_found.any():
 41            break
 42
 43        # setting values to 0 and iterating again
 44        data_c[peaks_found] = 0
 45
 46    return peak_map
 47
 48
 49def combine_map_to_regions(mapping: np.ndarray) -> np.ndarray:
 50    """Combine True values into their indexes arrays.
 51
 52    :param mapping: Boolean mapping array to extract the indexes from.
 53    :return: 2D array with left and right borders of regions, where mapping is True.
 54
 55    Example:
 56    >>> combine_map_to_regions(np.array([True, True, False, True, False]))
 57    ...     array([[0, 1], [3, 3]])
 58    """
 59
 60    # No peaks identified, i.e. mapping is all False
 61    if not mapping.any():
 62        return np.array([], dtype="int64")
 63
 64    # region borders
 65    region_borders = np.diff(mapping)
 66
 67    # corresponding indexes
 68    border_indexes = np.argwhere(region_borders)
 69
 70    lefts = border_indexes[::2] + 1  # because diff was used to get the index
 71
 72    # edge case, where first peak doesn't have left border
 73    if mapping[border_indexes][0]:
 74        # just preppend 0 as first left border
 75        # mind the vstack, as np.argwhere produces a vector array
 76        lefts = np.vstack((0, lefts))
 77
 78    rights = border_indexes[1::2]
 79
 80    # another edge case, where last peak doesn't have a right border
 81    if mapping[-1]:  # True if last point identified as potential peak
 82        # just append -1 as last peak right border
 83        rights = np.vstack((rights, -1))
 84
 85    # columns as borders, rows as regions, i.e.
 86    # :output:[0] -> first peak region
 87    return np.hstack((lefts, rights))
 88
 89
 90def filter_regions(x_data: np.ndarray, peaks_regions: np.ndarray) -> np.ndarray:
 91    """Filter peak regions.
 92
 93    Peak regions are filtered to remove potential false positives (e.g. noise
 94        spikes).
 95
 96    :param x_data: X data points, needed to pick up the data resolution and map the region indexes to the corresponding data
 97            points.
 98    :param peaks_regions: 2D Nx2 array with peak regions indexes (rows) as left and right borders (columns).
 99
100    :return: 2D Mx2 array with filtered peak regions indexes(rows) as
101            left and right borders (columns).
102    """
103
104    # filter peaks where region is smaller than spectrum resolution
105    # like single spikes, e.g. noise
106    # compute the regions first
107    x_data_regions = np.copy(x_data[peaks_regions])
108
109    # get arguments where absolute difference is greater than data resolution
110    resolution = np.absolute(np.mean(np.diff(x_data)))
111
112    # (N, 1) array!
113    valid_regions_map = np.absolute(np.diff(x_data_regions)) > resolution
114
115    # get their indexes, mind the flattening of all arrays!
116    valid_regions_indexes = np.argwhere(valid_regions_map.flatten()).flatten()
117
118    # filtering!
119    peaks_regions = peaks_regions[valid_regions_indexes]
120
121    return peaks_regions
122
123
124def filter_noisy_regions(y_data: np.ndarray, peaks_regions: np.ndarray) -> np.ndarray:
125    """Remove noisy regions from given regions array.
126
127    Peak regions are filtered to remove false positive noise regions, e.g.
128        incorrectly assigned due to curvy baseline. Filtering is performed by
129        computing average peak points/data points ratio.
130
131    :param y_data: Y data points, needed to validate if the peaks
132            are actually present in the region and remove invalid regions.
133    :param peaks_regions: 2D Nx2 array with peak regions indexes
134            (rows) as left and right borders (columns).
135
136    :return: 2D Mx2 array with filtered peak regions indexes(rows) as
137            left and right borders (columns).
138    """
139
140    # compute the actual regions data points
141    y_data_regions = []
142    for region in peaks_regions:
143        y_data_regions.append(y_data[region[0] : region[-1]])
144
145    # compute noise data regions, i.e. in between peak regions
146    noise_data_regions = []
147    for row, _ in enumerate(peaks_regions):
148        try:
149            noise_data_regions.append(
150                y_data[peaks_regions[row][1] : peaks_regions[row + 1][0]]
151            )
152        except IndexError:
153            # exception for the last row -> discard
154            pass
155
156    # compute average peaks/data points ratio for noisy regions
157    noise_peaks_ratio = []
158    for region in noise_data_regions:
159        # protection from empty regions
160        if region.size != 0:
161            # minimum height is pretty low to ensure enough noise is picked
162            peaks, _ = scipy.signal.find_peaks(region, height=region.max() * 0.2)
163            noise_peaks_ratio.append(peaks.size / region.size)
164
165    # compute average with weights equal to the region length
166    noise_peaks_ratio = np.average(
167        noise_peaks_ratio, weights=[region.size for region in noise_data_regions]
168    )
169
170    # filtering!
171    valid_regions_indexes = []
172    for row, region in enumerate(y_data_regions):
173        peaks, _ = scipy.signal.find_peaks(region, height=region.max() * 0.2)
174        if peaks.size != 0 and peaks.size / region.size < noise_peaks_ratio:
175            valid_regions_indexes.append(row)
176
177    # protecting from complete cleaning
178    if not valid_regions_indexes:
179        return peaks_regions
180
181    peaks_regions = peaks_regions[np.array(valid_regions_indexes)]
182
183    return peaks_regions
184
185
186def merge_regions(
187    x_data: np.ndarray,
188    peaks_regions: np.ndarray,
189    d_merge: float,
190    recursively: Optional[bool] = True,
191):
192    """Merge peak regions if distance between is less than delta.
193
194    :param x_data: X data points.
195    :param peaks_regions: 2D Nx2 array with peak regions indexes
196            (rows) as left and right borders (columns).
197    :param d_merge: Minimum distance in X data points to merge two or more
198            regions together.
199    :param recursively: If True - will repeat the procedure until
200            all regions with distance < than d_merge will merge.
201
202    :return: 2D Mx2 array with peak regions indexes (rows) as left and
203            right borders (columns), merged according to predefined minimal
204            distance.
205
206    Example:
207    >>> regions = np.array(
208    ...     [[1, 10], [11, 20], [25, 45], [50, 75], [100, 120], [122, 134]]
209    ... )
210    >>> data = np.ones_like(regions)  # ones as example
211    >>> merge_regions(data, regions, 1)
212    ...     array([[1, 20],  [ 25,  45],  [ 50,  75],  [100, 120],  [122, 134]])
213    >>> merge_regions(data, regions, 20, True)
214    ...     array([[1, 75], [100, 134]])
215    """
216    # the code is pretty ugly but works
217    merged_regions = []
218
219    # converting to list to drop the data of the fly
220    regions = peaks_regions.tolist()
221
222    for i, _ in enumerate(regions):
223        try:
224            # check left border of i regions with right of i+1
225            if abs(x_data[regions[i][-1]] - x_data[regions[i + 1][0]]) <= d_merge:
226                # if lower append merge the regions
227                merged_regions.append([regions[i][0], regions[i + 1][-1]])
228                # drop the merged one
229                regions.pop(i + 1)
230            else:
231                # if nothing to merge, just append the current region
232                merged_regions.append(regions[i])
233        except IndexError:
234            # last row
235            merged_regions.append(regions[i])
236
237    merged_regions = np.array(merged_regions)
238
239    if not recursively:
240        return merged_regions
241
242    # if recursively, check for the difference
243    if (merged_regions == regions).all():
244        # done
245        return merged_regions
246
247    return merge_regions(x_data, merged_regions, d_merge, recursively=True)
248
249
250def expand_regions(x_data: np.ndarray, peaks_regions: np.ndarray, d_expand: float):
251    """Expand the peak regions by the desired value.
252
253    :param x_data: X data points.
254    :param peaks_regions: 2D Nx2 array with peak regions indexes
255            (rows) as left and right borders (columns).
256    :param d_expand: Value to expand borders to (in X data scale).
257
258    :return: 2D Nx2 array with expanded peak regions indexes (rows) as
259            left and right borders (columns).
260    """
261
262    data_regions = np.copy(x_data[peaks_regions])
263
264    # determine scale orientation, i.e. decreasing (e.g. ppm on NMR spectrum)
265    # or increasing (e.g. wavelength on UV spectrum)
266    if (data_regions[:, 0] - data_regions[:, 1]).mean() > 0:
267        # ppm-like scale
268        data_regions[:, 0] += d_expand
269        data_regions[:, -1] -= d_expand
270    else:
271        # wavelength-like scale
272        data_regions[:, 0] -= d_expand
273        data_regions[:, -1] += d_expand
274
275    # converting new values to new indexes
276    for index_, value in np.ndenumerate(data_regions):
277        data_regions[index_] = find_nearest_value_index(x_data, value)[1]
278
279    return data_regions.astype(int)
def create_binary_peak_map(data: numpy.ndarray) -> numpy.ndarray:
15def create_binary_peak_map(data: np.ndarray) -> np.ndarray:
16    """Return binary map of the peaks within data points.
17
18    True values are assigned to potential peak points, False - to baseline.
19
20    :param data: 1D array with data points.
21
22    :return: Mapping of data points, where True is potential peak region point, False - baseline.
23    """
24    # copying array
25    data_c = np.copy(data)
26
27    # placeholder for the peak mapping
28    peak_map = np.full_like(data_c, False, dtype=bool)
29
30    for _ in range(100500):  # shouldn't take more iterations
31        # looking for peaks
32        peaks_found = np.logical_or(
33            data_c > np.mean(data_c) + np.std(data_c) * 3,
34            data_c < np.mean(data_c) - np.std(data_c) * 3,
35        )
36
37        # merging with peak mapping
38        np.logical_or(peak_map, peaks_found, out=peak_map)
39
40        # if no peaks found - break
41        if not peaks_found.any():
42            break
43
44        # setting values to 0 and iterating again
45        data_c[peaks_found] = 0
46
47    return peak_map

Return binary map of the peaks within data points.

True values are assigned to potential peak points, False - to baseline.

Parameters
  • data: 1D array with data points.
Returns

Mapping of data points, where True is potential peak region point, False - baseline.

def combine_map_to_regions(mapping: numpy.ndarray) -> numpy.ndarray:
50def combine_map_to_regions(mapping: np.ndarray) -> np.ndarray:
51    """Combine True values into their indexes arrays.
52
53    :param mapping: Boolean mapping array to extract the indexes from.
54    :return: 2D array with left and right borders of regions, where mapping is True.
55
56    Example:
57    >>> combine_map_to_regions(np.array([True, True, False, True, False]))
58    ...     array([[0, 1], [3, 3]])
59    """
60
61    # No peaks identified, i.e. mapping is all False
62    if not mapping.any():
63        return np.array([], dtype="int64")
64
65    # region borders
66    region_borders = np.diff(mapping)
67
68    # corresponding indexes
69    border_indexes = np.argwhere(region_borders)
70
71    lefts = border_indexes[::2] + 1  # because diff was used to get the index
72
73    # edge case, where first peak doesn't have left border
74    if mapping[border_indexes][0]:
75        # just preppend 0 as first left border
76        # mind the vstack, as np.argwhere produces a vector array
77        lefts = np.vstack((0, lefts))
78
79    rights = border_indexes[1::2]
80
81    # another edge case, where last peak doesn't have a right border
82    if mapping[-1]:  # True if last point identified as potential peak
83        # just append -1 as last peak right border
84        rights = np.vstack((rights, -1))
85
86    # columns as borders, rows as regions, i.e.
87    # :output:[0] -> first peak region
88    return np.hstack((lefts, rights))

Combine True values into their indexes arrays.

Parameters
  • mapping: Boolean mapping array to extract the indexes from.
Returns

2D array with left and right borders of regions, where mapping is True.

Example:

>>> combine_map_to_regions(np.array([True, True, False, True, False]))
...     array([[0, 1], [3, 3]])
def filter_regions(x_data: numpy.ndarray, peaks_regions: numpy.ndarray) -> numpy.ndarray:
 91def filter_regions(x_data: np.ndarray, peaks_regions: np.ndarray) -> np.ndarray:
 92    """Filter peak regions.
 93
 94    Peak regions are filtered to remove potential false positives (e.g. noise
 95        spikes).
 96
 97    :param x_data: X data points, needed to pick up the data resolution and map the region indexes to the corresponding data
 98            points.
 99    :param peaks_regions: 2D Nx2 array with peak regions indexes (rows) as left and right borders (columns).
100
101    :return: 2D Mx2 array with filtered peak regions indexes(rows) as
102            left and right borders (columns).
103    """
104
105    # filter peaks where region is smaller than spectrum resolution
106    # like single spikes, e.g. noise
107    # compute the regions first
108    x_data_regions = np.copy(x_data[peaks_regions])
109
110    # get arguments where absolute difference is greater than data resolution
111    resolution = np.absolute(np.mean(np.diff(x_data)))
112
113    # (N, 1) array!
114    valid_regions_map = np.absolute(np.diff(x_data_regions)) > resolution
115
116    # get their indexes, mind the flattening of all arrays!
117    valid_regions_indexes = np.argwhere(valid_regions_map.flatten()).flatten()
118
119    # filtering!
120    peaks_regions = peaks_regions[valid_regions_indexes]
121
122    return peaks_regions

Filter peak regions.

Peak regions are filtered to remove potential false positives (e.g. noise spikes).

Parameters
  • x_data: X data points, needed to pick up the data resolution and map the region indexes to the corresponding data points.
  • peaks_regions: 2D Nx2 array with peak regions indexes (rows) as left and right borders (columns).
Returns

2D Mx2 array with filtered peak regions indexes(rows) as left and right borders (columns).

def filter_noisy_regions(y_data: numpy.ndarray, peaks_regions: numpy.ndarray) -> numpy.ndarray:
125def filter_noisy_regions(y_data: np.ndarray, peaks_regions: np.ndarray) -> np.ndarray:
126    """Remove noisy regions from given regions array.
127
128    Peak regions are filtered to remove false positive noise regions, e.g.
129        incorrectly assigned due to curvy baseline. Filtering is performed by
130        computing average peak points/data points ratio.
131
132    :param y_data: Y data points, needed to validate if the peaks
133            are actually present in the region and remove invalid regions.
134    :param peaks_regions: 2D Nx2 array with peak regions indexes
135            (rows) as left and right borders (columns).
136
137    :return: 2D Mx2 array with filtered peak regions indexes(rows) as
138            left and right borders (columns).
139    """
140
141    # compute the actual regions data points
142    y_data_regions = []
143    for region in peaks_regions:
144        y_data_regions.append(y_data[region[0] : region[-1]])
145
146    # compute noise data regions, i.e. in between peak regions
147    noise_data_regions = []
148    for row, _ in enumerate(peaks_regions):
149        try:
150            noise_data_regions.append(
151                y_data[peaks_regions[row][1] : peaks_regions[row + 1][0]]
152            )
153        except IndexError:
154            # exception for the last row -> discard
155            pass
156
157    # compute average peaks/data points ratio for noisy regions
158    noise_peaks_ratio = []
159    for region in noise_data_regions:
160        # protection from empty regions
161        if region.size != 0:
162            # minimum height is pretty low to ensure enough noise is picked
163            peaks, _ = scipy.signal.find_peaks(region, height=region.max() * 0.2)
164            noise_peaks_ratio.append(peaks.size / region.size)
165
166    # compute average with weights equal to the region length
167    noise_peaks_ratio = np.average(
168        noise_peaks_ratio, weights=[region.size for region in noise_data_regions]
169    )
170
171    # filtering!
172    valid_regions_indexes = []
173    for row, region in enumerate(y_data_regions):
174        peaks, _ = scipy.signal.find_peaks(region, height=region.max() * 0.2)
175        if peaks.size != 0 and peaks.size / region.size < noise_peaks_ratio:
176            valid_regions_indexes.append(row)
177
178    # protecting from complete cleaning
179    if not valid_regions_indexes:
180        return peaks_regions
181
182    peaks_regions = peaks_regions[np.array(valid_regions_indexes)]
183
184    return peaks_regions

Remove noisy regions from given regions array.

Peak regions are filtered to remove false positive noise regions, e.g. incorrectly assigned due to curvy baseline. Filtering is performed by computing average peak points/data points ratio.

Parameters
  • y_data: Y data points, needed to validate if the peaks are actually present in the region and remove invalid regions.
  • peaks_regions: 2D Nx2 array with peak regions indexes (rows) as left and right borders (columns).
Returns

2D Mx2 array with filtered peak regions indexes(rows) as left and right borders (columns).

def merge_regions( x_data: numpy.ndarray, peaks_regions: numpy.ndarray, d_merge: float, recursively: Optional[bool] = True):
187def merge_regions(
188    x_data: np.ndarray,
189    peaks_regions: np.ndarray,
190    d_merge: float,
191    recursively: Optional[bool] = True,
192):
193    """Merge peak regions if distance between is less than delta.
194
195    :param x_data: X data points.
196    :param peaks_regions: 2D Nx2 array with peak regions indexes
197            (rows) as left and right borders (columns).
198    :param d_merge: Minimum distance in X data points to merge two or more
199            regions together.
200    :param recursively: If True - will repeat the procedure until
201            all regions with distance < than d_merge will merge.
202
203    :return: 2D Mx2 array with peak regions indexes (rows) as left and
204            right borders (columns), merged according to predefined minimal
205            distance.
206
207    Example:
208    >>> regions = np.array(
209    ...     [[1, 10], [11, 20], [25, 45], [50, 75], [100, 120], [122, 134]]
210    ... )
211    >>> data = np.ones_like(regions)  # ones as example
212    >>> merge_regions(data, regions, 1)
213    ...     array([[1, 20],  [ 25,  45],  [ 50,  75],  [100, 120],  [122, 134]])
214    >>> merge_regions(data, regions, 20, True)
215    ...     array([[1, 75], [100, 134]])
216    """
217    # the code is pretty ugly but works
218    merged_regions = []
219
220    # converting to list to drop the data of the fly
221    regions = peaks_regions.tolist()
222
223    for i, _ in enumerate(regions):
224        try:
225            # check left border of i regions with right of i+1
226            if abs(x_data[regions[i][-1]] - x_data[regions[i + 1][0]]) <= d_merge:
227                # if lower append merge the regions
228                merged_regions.append([regions[i][0], regions[i + 1][-1]])
229                # drop the merged one
230                regions.pop(i + 1)
231            else:
232                # if nothing to merge, just append the current region
233                merged_regions.append(regions[i])
234        except IndexError:
235            # last row
236            merged_regions.append(regions[i])
237
238    merged_regions = np.array(merged_regions)
239
240    if not recursively:
241        return merged_regions
242
243    # if recursively, check for the difference
244    if (merged_regions == regions).all():
245        # done
246        return merged_regions
247
248    return merge_regions(x_data, merged_regions, d_merge, recursively=True)

Merge peak regions if distance between is less than delta.

Parameters
  • x_data: X data points.
  • peaks_regions: 2D Nx2 array with peak regions indexes (rows) as left and right borders (columns).
  • d_merge: Minimum distance in X data points to merge two or more regions together.
  • recursively: If True - will repeat the procedure until all regions with distance < than d_merge will merge.
Returns

2D Mx2 array with peak regions indexes (rows) as left and right borders (columns), merged according to predefined minimal distance.

Example:

>>> regions = np.array(
...     [[1, 10], [11, 20], [25, 45], [50, 75], [100, 120], [122, 134]]
... )
>>> data = np.ones_like(regions)  # ones as example
>>> merge_regions(data, regions, 1)
...     array([[1, 20],  [ 25,  45],  [ 50,  75],  [100, 120],  [122, 134]])
>>> merge_regions(data, regions, 20, True)
...     array([[1, 75], [100, 134]])
def expand_regions(x_data: numpy.ndarray, peaks_regions: numpy.ndarray, d_expand: float):
251def expand_regions(x_data: np.ndarray, peaks_regions: np.ndarray, d_expand: float):
252    """Expand the peak regions by the desired value.
253
254    :param x_data: X data points.
255    :param peaks_regions: 2D Nx2 array with peak regions indexes
256            (rows) as left and right borders (columns).
257    :param d_expand: Value to expand borders to (in X data scale).
258
259    :return: 2D Nx2 array with expanded peak regions indexes (rows) as
260            left and right borders (columns).
261    """
262
263    data_regions = np.copy(x_data[peaks_regions])
264
265    # determine scale orientation, i.e. decreasing (e.g. ppm on NMR spectrum)
266    # or increasing (e.g. wavelength on UV spectrum)
267    if (data_regions[:, 0] - data_regions[:, 1]).mean() > 0:
268        # ppm-like scale
269        data_regions[:, 0] += d_expand
270        data_regions[:, -1] -= d_expand
271    else:
272        # wavelength-like scale
273        data_regions[:, 0] -= d_expand
274        data_regions[:, -1] += d_expand
275
276    # converting new values to new indexes
277    for index_, value in np.ndenumerate(data_regions):
278        data_regions[index_] = find_nearest_value_index(x_data, value)[1]
279
280    return data_regions.astype(int)

Expand the peak regions by the desired value.

Parameters
  • x_data: X data points.
  • peaks_regions: 2D Nx2 array with peak regions indexes (rows) as left and right borders (columns).
  • d_expand: Value to expand borders to (in X data scale).
Returns

2D Nx2 array with expanded peak regions indexes (rows) as left and right borders (columns).