pychemstation.utils.spec_utils
Module contains various utility functions for spectral data processing and analysis.
"""
Module contains various utility functions for spectral data processing and
analysis.
"""

from typing import Optional

import numpy as np
import scipy

from ..utils.num_utils import find_nearest_value_index


def create_binary_peak_map(data: np.ndarray) -> np.ndarray:
    """Return binary map of the peaks within data points.

    True values are assigned to potential peak points, False - to baseline.

    :param data: 1D array with data points.

    :return: Mapping of data points, where True is potential peak region
        point, False - baseline.
    """
    # working copy; flagged points are zeroed out between iterations
    data_c = np.copy(data)

    # placeholder for the peak mapping
    peak_map = np.full_like(data_c, False, dtype=bool)

    # iteratively flag 3-sigma outliers and remove them from the estimate,
    # so that mean/std converge towards the baseline statistics
    for _ in range(100500):  # shouldn't take more iterations
        mean = np.mean(data_c)
        spread = np.std(data_c)
        peaks_found = np.logical_or(
            data_c > mean + spread * 3,
            data_c < mean - spread * 3,
        )

        # if no new peaks found - converged
        if not peaks_found.any():
            break

        # merging with peak mapping
        np.logical_or(peak_map, peaks_found, out=peak_map)

        # setting values to 0 and iterating again
        data_c[peaks_found] = 0

    return peak_map


def combine_map_to_regions(mapping: np.ndarray) -> np.ndarray:
    """Combine True values into their indexes arrays.

    :param mapping: Boolean mapping array to extract the indexes from.
    :return: 2D array with left and right borders of regions, where mapping
        is True.

    Example:
        >>> combine_map_to_regions(np.array([True, True, False, True, False]))
        ... array([[0, 1], [3, 3]])
    """
    # No peaks identified, i.e. mapping is all False
    if not mapping.any():
        return np.array([], dtype="int64")

    # flip points; a flip at index i happens between points i and i + 1
    border_indexes = np.argwhere(np.diff(mapping))

    if mapping[0]:
        # first region has no left border, so the very first flip is a
        # right (True -> False) border and the pairing is shifted by one;
        # mind the vstack, as np.argwhere produces a vector array
        rights = border_indexes[::2]
        lefts = np.vstack((0, border_indexes[1::2] + 1))
    else:
        # flips alternate between left (False -> True, +1 to point at the
        # first True value) and right (True -> False) borders
        lefts = border_indexes[::2] + 1
        rights = border_indexes[1::2]

    # edge case, where last peak doesn't have a right border
    if mapping[-1]:
        # just append -1 as last peak right border
        rights = np.vstack((rights, -1))

    # columns as borders, rows as regions, i.e.
    # :output:[0] -> first peak region
    return np.hstack((lefts, rights))


def filter_regions(x_data: np.ndarray, peaks_regions: np.ndarray) -> np.ndarray:
    """Filter peak regions.

    Peak regions are filtered to remove potential false positives (e.g. noise
    spikes).

    :param x_data: X data points, needed to pick up the data resolution and
        map the region indexes to the corresponding data points.
    :param peaks_regions: 2D Nx2 array with peak regions indexes (rows) as
        left and right borders (columns).

    :return: 2D Mx2 array with filtered peak regions indexes (rows) as
        left and right borders (columns).
    """
    # filter peaks where region is smaller than spectrum resolution,
    # like single spikes, e.g. noise; compute the regions first
    x_data_regions = np.copy(x_data[peaks_regions])

    # average spacing between consecutive X points = data resolution
    resolution = np.absolute(np.mean(np.diff(x_data)))

    # (N, 1) array!
    valid_regions_map = np.absolute(np.diff(x_data_regions)) > resolution

    # get their indexes, mind the flattening of all arrays!
    valid_regions_indexes = np.argwhere(valid_regions_map.flatten()).flatten()

    # filtering!
    return peaks_regions[valid_regions_indexes]


def filter_noisy_regions(y_data: np.ndarray, peaks_regions: np.ndarray) -> np.ndarray:
    """Remove noisy regions from given regions array.

    Peak regions are filtered to remove false positive noise regions, e.g.
    incorrectly assigned due to curvy baseline. Filtering is performed by
    computing average peak points/data points ratio.

    :param y_data: Y data points, needed to validate if the peaks
        are actually present in the region and remove invalid regions.
    :param peaks_regions: 2D Nx2 array with peak regions indexes
        (rows) as left and right borders (columns).

    :return: 2D Mx2 array with filtered peak regions indexes (rows) as
        left and right borders (columns).
    """
    # compute the actual regions data points
    y_data_regions = [y_data[left:right] for left, right in peaks_regions]

    # compute noise data regions, i.e. in between peak regions; the last
    # region has no right neighbour and therefore no trailing noise region
    noise_data_regions = [
        y_data[peaks_regions[row][1] : peaks_regions[row + 1][0]]
        for row in range(len(peaks_regions) - 1)
    ]

    # drop empty noise regions (adjacent peak regions) up front, so that
    # the ratios and the averaging weights below stay aligned
    noise_data_regions = [region for region in noise_data_regions if region.size]

    # without any noise data there is nothing to compare against - return
    # the regions untouched rather than failing on an empty average
    if not noise_data_regions:
        return peaks_regions

    # compute peaks/data points ratio for each noisy region; minimum height
    # is pretty low to ensure enough noise is picked
    noise_peaks_ratios = [
        scipy.signal.find_peaks(region, height=region.max() * 0.2)[0].size
        / region.size
        for region in noise_data_regions
    ]

    # compute average with weights equal to the region length
    noise_peaks_ratio = np.average(
        noise_peaks_ratios, weights=[region.size for region in noise_data_regions]
    )

    # keep only regions that contain at least one peak and are less "peaky"
    # than the average noise region
    valid_regions_indexes = []
    for row, region in enumerate(y_data_regions):
        peaks, _ = scipy.signal.find_peaks(region, height=region.max() * 0.2)
        if peaks.size != 0 and peaks.size / region.size < noise_peaks_ratio:
            valid_regions_indexes.append(row)

    # protecting from complete cleaning
    if not valid_regions_indexes:
        return peaks_regions

    return peaks_regions[np.array(valid_regions_indexes)]


def merge_regions(
    x_data: np.ndarray,
    peaks_regions: np.ndarray,
    d_merge: float,
    recursively: Optional[bool] = True,
):
    """Merge peak regions if distance between is less than delta.

    :param x_data: X data points.
    :param peaks_regions: 2D Nx2 array with peak regions indexes
        (rows) as left and right borders (columns).
    :param d_merge: Minimum distance in X data points to merge two or more
        regions together.
    :param recursively: If True, repeat the procedure until no two regions
        closer than d_merge remain.

    :return: 2D Mx2 array with peak regions indexes (rows) as left and
        right borders (columns), merged according to predefined minimal
        distance.

    Example:
        >>> x = np.arange(200)
        >>> regions = np.array([[1, 10], [11, 20], [25, 45]])
        >>> merge_regions(x, regions, 1)
        ... array([[ 1, 20], [25, 45]])
        >>> merge_regions(x, regions, 5)
        ... array([[ 1, 45]])
    """
    # converting to list for convenient pairwise walking
    regions = peaks_regions.tolist()

    merged_regions = []
    merged_any = False

    i = 0
    while i < len(regions):
        # compare the right border of region i with the left border of i + 1
        if (
            i + 1 < len(regions)
            and abs(x_data[regions[i][-1]] - x_data[regions[i + 1][0]]) <= d_merge
        ):
            # close enough: fuse the pair and skip the consumed neighbour
            merged_regions.append([regions[i][0], regions[i + 1][-1]])
            merged_any = True
            i += 2
        else:
            # nothing to merge, just keep the current region
            merged_regions.append(regions[i])
            i += 1

    merged_regions = np.array(merged_regions)

    # single pass requested, or fixed point reached (no merge happened)
    if not recursively or not merged_any:
        return merged_regions

    return merge_regions(x_data, merged_regions, d_merge, recursively=True)


def expand_regions(x_data: np.ndarray, peaks_regions: np.ndarray, d_expand: float):
    """Expand the peak regions by the desired value.

    :param x_data: X data points.
    :param peaks_regions: 2D Nx2 array with peak regions indexes
        (rows) as left and right borders (columns).
    :param d_expand: Value to expand borders to (in X data scale).

    :return: 2D Nx2 array with expanded peak regions indexes (rows) as
        left and right borders (columns).
    """
    data_regions = np.copy(x_data[peaks_regions])

    # determine scale orientation, i.e. decreasing (e.g. ppm on NMR spectrum)
    # or increasing (e.g. wavelength on UV spectrum)
    if (data_regions[:, 0] - data_regions[:, 1]).mean() > 0:
        # ppm-like scale
        data_regions[:, 0] += d_expand
        data_regions[:, -1] -= d_expand
    else:
        # wavelength-like scale
        data_regions[:, 0] -= d_expand
        data_regions[:, -1] += d_expand

    # converting new values to new indexes
    for index_, value in np.ndenumerate(data_regions):
        data_regions[index_] = find_nearest_value_index(x_data, value)[1]

    return data_regions.astype(int)
def create_binary_peak_map(data: np.ndarray) -> np.ndarray:
    """Return a boolean mask flagging potential peak points in the data.

    True values are assigned to potential peak points, False - to baseline.

    :param data: 1D array with data points.

    :return: Mapping of data points, where True is potential peak region
        point, False - baseline.
    """
    # working copy: flagged points get zeroed out between passes
    residual = np.copy(data)

    # accumulator for all points ever flagged as peaks
    mask = np.zeros_like(data, dtype=bool)

    # iteratively flag 3-sigma outliers and remove them from the estimate,
    # so that mean/std converge towards the baseline statistics
    for _ in range(100500):  # generous upper bound on iterations
        center = np.mean(residual)
        spread = np.std(residual)
        outliers = np.logical_or(
            residual > center + spread * 3,
            residual < center - spread * 3,
        )

        # merge the new outliers into the accumulated mask
        mask |= outliers

        # converged: no new outliers on this pass
        if not outliers.any():
            break

        # zero the flagged points and re-estimate the baseline
        residual[outliers] = 0

    return mask
Return binary map of the peaks within data points.
True values are assigned to potential peak points, False - to baseline.
Parameters
- data: 1D array with data points.
Returns
Mapping of data points, where True is potential peak region point, False - baseline.
def combine_map_to_regions(mapping: np.ndarray) -> np.ndarray:
    """Combine True values into their indexes arrays.

    :param mapping: Boolean mapping array to extract the indexes from.
    :return: 2D array with left and right borders of regions, where mapping
        is True.

    Example:
        >>> combine_map_to_regions(np.array([True, True, False, True, False]))
        ... array([[0, 1], [3, 3]])
    """
    # No peaks identified, i.e. mapping is all False
    if not mapping.any():
        return np.array([], dtype="int64")

    # flip points; a flip at index i happens between points i and i + 1
    border_indexes = np.argwhere(np.diff(mapping))

    if mapping[0]:
        # first region has no left border, so the very first flip is a
        # right (True -> False) border and the pairing is shifted by one;
        # mind the vstack, as np.argwhere produces a vector array
        rights = border_indexes[::2]
        lefts = np.vstack((0, border_indexes[1::2] + 1))
    else:
        # flips alternate between left (False -> True, +1 to point at the
        # first True value) and right (True -> False) borders
        lefts = border_indexes[::2] + 1
        rights = border_indexes[1::2]

    # edge case, where last peak doesn't have a right border
    if mapping[-1]:
        # just append -1 as last peak right border
        rights = np.vstack((rights, -1))

    # columns as borders, rows as regions, i.e.
    # :output:[0] -> first peak region
    return np.hstack((lefts, rights))
Combine True values into their indexes arrays.
Parameters
- mapping: Boolean mapping array to extract the indexes from.
Returns
2D array with left and right borders of regions, where mapping is True.
Example:
>>> combine_map_to_regions(np.array([True, True, False, True, False]))
... array([[0, 1], [3, 3]])
def filter_regions(x_data: np.ndarray, peaks_regions: np.ndarray) -> np.ndarray:
    """Filter peak regions.

    Peak regions are filtered to remove potential false positives (e.g. noise
    spikes).

    :param x_data: X data points, needed to pick up the data resolution and
        map the region indexes to the corresponding data points.
    :param peaks_regions: 2D Nx2 array with peak regions indexes (rows) as
        left and right borders (columns).

    :return: 2D Mx2 array with filtered peak regions indexes (rows) as left
        and right borders (columns).
    """
    # average spacing between consecutive X points = spectrum resolution
    resolution = np.absolute(np.mean(np.diff(x_data)))

    # width of each region on the X scale (right border minus left border)
    region_widths = np.absolute(
        x_data[peaks_regions[:, -1]] - x_data[peaks_regions[:, 0]]
    )

    # keep only regions wider than a single data point, i.e. drop
    # single-spike noise artefacts
    survivors = np.flatnonzero(region_widths > resolution)

    return peaks_regions[survivors]
Filter peak regions.
Peak regions are filtered to remove potential false positives (e.g. noise spikes).
Parameters
- x_data: X data points, needed to pick up the data resolution and map the region indexes to the corresponding data points.
- peaks_regions: 2D Nx2 array with peak regions indexes (rows) as left and right borders (columns).
Returns
2D Mx2 array with filtered peak regions indexes (rows) as left and right borders (columns).
def filter_noisy_regions(y_data: np.ndarray, peaks_regions: np.ndarray) -> np.ndarray:
    """Remove noisy regions from given regions array.

    Peak regions are filtered to remove false positive noise regions, e.g.
    incorrectly assigned due to curvy baseline. Filtering is performed by
    computing average peak points/data points ratio.

    :param y_data: Y data points, needed to validate if the peaks
        are actually present in the region and remove invalid regions.
    :param peaks_regions: 2D Nx2 array with peak regions indexes
        (rows) as left and right borders (columns).

    :return: 2D Mx2 array with filtered peak regions indexes (rows) as
        left and right borders (columns).
    """
    # compute the actual regions data points
    y_data_regions = [y_data[left:right] for left, right in peaks_regions]

    # compute noise data regions, i.e. in between peak regions; the last
    # region has no right neighbour and therefore no trailing noise region
    noise_data_regions = [
        y_data[peaks_regions[row][1] : peaks_regions[row + 1][0]]
        for row in range(len(peaks_regions) - 1)
    ]

    # drop empty noise regions (adjacent peak regions) up front, so that
    # the ratios and the averaging weights below stay aligned
    noise_data_regions = [region for region in noise_data_regions if region.size]

    # without any noise data there is nothing to compare against - return
    # the regions untouched rather than failing on an empty average
    if not noise_data_regions:
        return peaks_regions

    # compute peaks/data points ratio for each noisy region; minimum height
    # is pretty low to ensure enough noise is picked
    noise_peaks_ratios = [
        scipy.signal.find_peaks(region, height=region.max() * 0.2)[0].size
        / region.size
        for region in noise_data_regions
    ]

    # compute average with weights equal to the region length
    noise_peaks_ratio = np.average(
        noise_peaks_ratios, weights=[region.size for region in noise_data_regions]
    )

    # keep only regions that contain at least one peak and are less "peaky"
    # than the average noise region
    valid_regions_indexes = []
    for row, region in enumerate(y_data_regions):
        peaks, _ = scipy.signal.find_peaks(region, height=region.max() * 0.2)
        if peaks.size != 0 and peaks.size / region.size < noise_peaks_ratio:
            valid_regions_indexes.append(row)

    # protecting from complete cleaning
    if not valid_regions_indexes:
        return peaks_regions

    return peaks_regions[np.array(valid_regions_indexes)]
Remove noisy regions from given regions array.
Peak regions are filtered to remove false positive noise regions, e.g. incorrectly assigned due to curvy baseline. Filtering is performed by computing average peak points/data points ratio.
Parameters
- y_data: Y data points, needed to validate if the peaks are actually present in the region and remove invalid regions.
- peaks_regions: 2D Nx2 array with peak regions indexes (rows) as left and right borders (columns).
Returns
2D Mx2 array with filtered peak regions indexes (rows) as left and right borders (columns).
def merge_regions(
    x_data: np.ndarray,
    peaks_regions: np.ndarray,
    d_merge: float,
    recursively: Optional[bool] = True,
):
    """Merge peak regions if distance between is less than delta.

    :param x_data: X data points.
    :param peaks_regions: 2D Nx2 array with peak regions indexes
        (rows) as left and right borders (columns).
    :param d_merge: Minimum distance in X data points to merge two or more
        regions together.
    :param recursively: If True, repeat the procedure until no two regions
        closer than d_merge remain.

    :return: 2D Mx2 array with peak regions indexes (rows) as left and
        right borders (columns), merged according to predefined minimal
        distance.

    Example:
        >>> x = np.arange(200)
        >>> regions = np.array([[1, 10], [11, 20], [25, 45]])
        >>> merge_regions(x, regions, 1)
        ... array([[ 1, 20], [25, 45]])
        >>> merge_regions(x, regions, 5)
        ... array([[ 1, 45]])
    """
    # converting to list for convenient pairwise walking
    regions = peaks_regions.tolist()

    merged_regions = []
    merged_any = False

    i = 0
    while i < len(regions):
        # compare the right border of region i with the left border of i + 1
        if (
            i + 1 < len(regions)
            and abs(x_data[regions[i][-1]] - x_data[regions[i + 1][0]]) <= d_merge
        ):
            # close enough: fuse the pair and skip the consumed neighbour
            merged_regions.append([regions[i][0], regions[i + 1][-1]])
            merged_any = True
            i += 2
        else:
            # nothing to merge, just keep the current region
            merged_regions.append(regions[i])
            i += 1

    merged_regions = np.array(merged_regions)

    # single pass requested, or fixed point reached (no merge happened this
    # pass); tracking an explicit flag avoids premature termination when a
    # merged region happens to equal one of the inputs
    if not recursively or not merged_any:
        return merged_regions

    return merge_regions(x_data, merged_regions, d_merge, recursively=True)
Merge peak regions if distance between is less than delta.
Parameters
- x_data: X data points.
- peaks_regions: 2D Nx2 array with peak regions indexes (rows) as left and right borders (columns).
- d_merge: Minimum distance in X data points to merge two or more regions together.
- recursively: If True, repeat the procedure until no two regions closer than d_merge remain.
Returns
2D Mx2 array with peak regions indexes (rows) as left and right borders (columns), merged according to predefined minimal distance.
Example:
>>> regions = np.array(
... [[1, 10], [11, 20], [25, 45], [50, 75], [100, 120], [122, 134]]
... )
>>> data = np.ones_like(regions) # ones as example
>>> merge_regions(data, regions, 1)
... array([[1, 20], [ 25, 45], [ 50, 75], [100, 120], [122, 134]])
>>> merge_regions(data, regions, 20, True)
... array([[1, 75], [100, 134]])
def expand_regions(x_data: np.ndarray, peaks_regions: np.ndarray, d_expand: float):
    """Expand the peak regions by the desired value.

    :param x_data: X data points.
    :param peaks_regions: 2D Nx2 array with peak regions indexes (rows) as
        left and right borders (columns).
    :param d_expand: Value to expand borders to (in X data scale).

    :return: 2D Nx2 array with expanded peak regions indexes (rows) as left
        and right borders (columns).
    """
    # border values on the X scale for every region
    border_values = np.copy(x_data[peaks_regions])

    # scale orientation: a positive mean left-minus-right difference means
    # a decreasing axis (e.g. ppm on an NMR spectrum), otherwise the axis is
    # increasing (e.g. wavelength on a UV spectrum)
    descending = (border_values[:, 0] - border_values[:, 1]).mean() > 0

    if descending:
        # ppm-like scale: left borders grow, right borders shrink
        border_values[:, 0] += d_expand
        border_values[:, -1] -= d_expand
    else:
        # wavelength-like scale: left borders shrink, right borders grow
        border_values[:, 0] -= d_expand
        border_values[:, -1] += d_expand

    # map each expanded border value back onto the nearest x_data index
    for position, border in np.ndenumerate(border_values):
        border_values[position] = find_nearest_value_index(x_data, border)[1]

    return border_values.astype(int)
Expand the peak regions by the desired value.
Parameters
- x_data: X data points.
- peaks_regions: 2D Nx2 array with peak regions indexes (rows) as left and right borders (columns).
- d_expand: Value to expand borders to (in X data scale).
Returns
2D Nx2 array with expanded peak regions indexes (rows) as left and right borders (columns).