""" Generic utilities. """
""" Compute a checksum of a disk file. """
""" Use ``md5sum`` to combine multiple UUIDs into a single UUID. """ md5 = hashlib.md5() for uuid in uuids: md5.update(uuid.bytes) return UUID(bytes=md5.digest())


def downsample_frame(frame: pd.FrameFloat32,  # Name and leading parameters reconstructed (assumption).
                     samples: int,
                     loop: Optional[Loop] = None) -> pd.FrameFloat32:
    """
    Downsample each of the columns of a data frame.
    """
    downsampled_matrix = map_columns(np.MatrixFloat32.am(frame.values),
                                     lambda column: downsample_array(column, samples),
                                     loop)
    return pd.FrameFloat32.be(downsampled_matrix, index=frame.index, columns=frame.columns)
""" Downsample an array so its sum will not exceed the specified number of samples. """ rounded = np.ArrayFloat32.am(np.floor(array))
fractions = np.ArrayFloat32.am(array - rounded) if fractions.max() > 0: random_fractions = np.random.rand(fractions.size) extra_bools = np.ArrayBool.am(random_fractions < fractions) rounded += np.ArrayFloat32.be(extra_bools) # type: ignore
if rounded.sum() <= samples: return np.ArrayFloat32.am(rounded)
integers = np.ArrayInt32.be(rounded) distribution = np.repeat(np.arange(integers.size), integers) sampled = np.random.choice(distribution, size=samples, replace=False) downsampled = np.bincount(sampled, minlength=array.size)
return np.ArrayFloat32.be(downsampled)
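

# Hedged usage sketch (not part of the original module): downsampling keeps the array's
# length and caps its total at the requested number of samples. Assumes the ``np`` wrapper
# re-exports ``numpy.array`` like the other numpy functions used above.
def _example_downsample_array() -> None:
    array = np.ArrayFloat32.be(np.array([0.0, 1.5, 2.5, 4.0], dtype='float32'))  # Sums to 8.
    downsampled = downsample_array(array, 5)
    assert downsampled.size == array.size
    assert downsampled.sum() <= 5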


# The rolling-window functions supported by the sliding-window utility below.
_ROLLING_FUNCTIONS = {
    'mean': pd.core.window.Rolling.mean,
    'median': pd.core.window.Rolling.median,
    'std': pd.core.window.Rolling.std,
    'var': pd.core.window.Rolling.var,
}


def sliding_window_function(values: np.ArrayFloat32,  # Name and leading parameters reconstructed (assumption).
                            order_by: np.ArrayFloat32,
                            window_size: int,
                            functions: List[str]) -> Dict[str, np.ArrayFloat32]:
    """
    Compute some function(s) on a sliding window.

    The values are first sorted by the content of a second array, then the sliding window is
    applied (repeating the first and last elements as needed), the function(s) are applied,
    and the results are unsorted back into their proper positions.

    Currently supported functions are ``median``, ``mean``, ``std`` and ``var``.
    """
    assert len(values) == len(order_by)

    # Force an odd window size so each result has a well-defined center element.
    if window_size % 2 == 0:
        window_size += 1
    half_window_size = (window_size - 1) // 2

    order_indices = np.argsort(order_by)
    reverse_order_indices = np.argsort(order_indices)

    # Pad both ends with the extreme elements so every window is full.
    minimal_index = order_indices[0]
    maximal_index = order_indices[-1]
    extended_order_indices = np.concatenate([
        np.repeat(minimal_index, half_window_size),
        order_indices,
        np.repeat(maximal_index, half_window_size),
    ])
    extended_series = pd.SeriesFloat32.be(values[extended_order_indices])

    rolling_windows = extended_series.rolling(window_size)
    results: Dict[str, np.ArrayFloat32] = {}
    for name in functions:
        function = _ROLLING_FUNCTIONS[name]
        computed = np.ArrayFloat32.be(function(rolling_windows).values)
        # The first ``window_size - 1`` entries are partial windows; drop them, then undo the
        # sort so each result lands back at its original position.
        reordered = np.ArrayFloat32.am(computed[window_size - 1:])
        results[name] = np.ArrayFloat32.am(reordered[reverse_order_indices])

    return results
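

# Hedged usage sketch (not part of the original module): a sliding mean over values ordered
# by a second array, with the results returned in the original positions. The
# ``sliding_window_function`` name above is itself an assumption, as noted, and ``np.array``
# is assumed to be re-exported by the wrapper.
def _example_sliding_window() -> None:
    values = np.ArrayFloat32.be(np.array([0.0, 60.0, 30.0], dtype='float32'))
    order_by = np.ArrayFloat32.be(np.array([0.0, 2.0, 1.0], dtype='float32'))
    results = sliding_window_function(values, order_by, 3, ['mean'])
    # In ``order_by`` order the values are 0, 30, 60; the padded 3-wide means are 10, 30, 50,
    # reported back at each value's original position.
    assert list(results['mean']) == [10.0, 50.0, 30.0]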
""" Convert an array of values to an array of ranks.
The result is still an array of float where NaN values get a NaN rank. """ ranks = np.ArrayFloat32.be(ss.rankdata(array, method='ordinal')) ranks[ranks > non_nans_count(array)] = None return ranks
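

# Hedged usage sketch (not part of the original module): NaN values keep a NaN rank. The
# ``rank_array`` name above is itself an assumption, as noted, and ``np.array`` is assumed
# to be re-exported by the wrapper.
def _example_rank_array() -> None:
    array = np.ArrayFloat32.be(np.array([30.0, float('nan'), 10.0, 20.0], dtype='float32'))
    ranks = rank_array(array)
    assert list(ranks[[0, 2, 3]]) == [3.0, 1.0, 2.0]
    assert np.isnan(ranks[1])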
""" Return only the lowest few values in an array. """ result = np.ArrayFloat32.am(array.copy())
if non_nans_count(array) <= keep_count: return result
indices = np.argpartition(array, keep_count) result[indices[keep_count:]] = None
return result
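

# Hedged usage sketch (not part of the original module): everything except the ``keep_count``
# lowest values becomes NaN in the returned copy. The ``keep_lowest_values`` name above is
# itself an assumption, as noted, and ``np.array`` is assumed to be re-exported by the wrapper.
def _example_keep_lowest_values() -> None:
    array = np.ArrayFloat32.be(np.array([5.0, 1.0, 4.0, 2.0], dtype='float32'))
    result = keep_lowest_values(array, 2)
    assert list(result[[1, 3]]) == [1.0, 2.0]
    assert nans_count(result) == 2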
""" Count the number of non-NaN values in an array. """ return len(array) - nans_count(array)
""" Count the number of NaN values in an array. """ return np.count_nonzero(np.isnan(array))
""" Create a logged loop for summing profiles. """ return Loop(start='Sum profiles...', progress='Summed %sK profiles (%.2f%%)...', completed='Summed %s profiles', log_every=100_000, log_with=1_000, expected=expected)


def map_rows(matrix: np.MatrixFloat32,  # Name and first parameter reconstructed (assumption).
             row_function: Callable[[np.ArrayFloat32], np.ArrayFloat32],
             loop: Optional[Loop] = None) -> np.MatrixFloat32:
    """
    Create a new matrix whose rows are the results of applying a function to each input
    matrix row.
    """
    return _map_axis(matrix, 0, row_function, loop, with_index=False)


def map_columns(matrix: np.MatrixFloat32,  # First parameter reconstructed; the name is used above.
                column_function: Callable[[np.ArrayFloat32], np.ArrayFloat32],
                loop: Optional[Loop] = None) -> np.MatrixFloat32:
    """
    Create a new matrix whose columns are the results of applying a function to each input
    matrix column.
    """
    return _map_axis(matrix, 1, column_function, loop, with_index=False)
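

# Hedged usage sketch (not part of the original module): doubling every column of a matrix,
# using the same shared-memory constructor the private machinery below relies on.
def _example_map_columns() -> None:
    matrix = np.MatrixFloat32.shared_memory_zeros((2, 3))
    matrix[:, :] = 1.0
    doubled = map_columns(matrix, lambda column: column * 2)
    assert doubled.shape == matrix.shape
    assert doubled.sum() == 12.0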


# @config()
# def map_indexed_rows(matrix: np.MatrixFloat32,
#                      row_function: Callable[[int, np.ArrayFloat32], np.ArrayFloat32],
#                      loop: Optional[Loop] = None) -> np.MatrixFloat32:
#     """
#     Create a new matrix whose rows are the results of applying a function to each input matrix row.
#     """
#     return _map_axis(matrix, 0, row_function, loop, with_index=True)


def map_indexed_columns(matrix: np.MatrixFloat32,  # Name and first parameter reconstructed (assumption).
                        column_function: Callable[[int, np.ArrayFloat32], np.ArrayFloat32],
                        loop: Optional[Loop] = None) -> np.MatrixFloat32:
    """
    Create a new matrix whose columns are the results of applying a function to each input
    matrix column; the function also receives each column's index.
    """
    return _map_axis(matrix, 1, column_function, loop, with_index=True)


def _map_axis(input_matrix: np.MatrixFloat32,  # Leading parameters reconstructed from the calls above.
              axis: int,
              array_function: Any,
              loop: Optional[Loop], *, with_index: bool) -> np.MatrixFloat32:
    arrays_count = input_matrix.shape[axis]
    processes_count = processes_for(arrays_count)

    # Collect the results in shared memory so worker processes can write into it directly.
    result_matrix = np.MatrixFloat32.shared_memory_zeros(input_matrix.shape)

    parallel(processes_count, _apply_array_function,
             input_matrix, axis, array_function, with_index, result_matrix, loop,
             kwargs=lambda index: dict(arrays_indices=indexed_range(index, size=arrays_count)))

    return result_matrix


def _apply_array_function(input_matrix: np.MatrixFloat32,  # Leading parameters reconstructed from the ``parallel`` call above.
                          axis: int,
                          array_function: Any,
                          with_index: bool,
                          result_matrix: np.MatrixFloat32,
                          loop: Optional[Loop], *,
                          arrays_indices: range) -> None:
    # Apply the function to each row (axis 0) or column (axis 1) in this worker's slice.
    for index in arrays_indices:
        if axis == 0:
            if with_index:
                result_matrix[index, :] = array_function(index, input_matrix[index, :])
            else:
                result_matrix[index, :] = array_function(input_matrix[index, :])
        else:
            if with_index:
                result_matrix[:, index] = array_function(index, input_matrix[:, index])
            else:
                result_matrix[:, index] = array_function(input_matrix[:, index])
        if loop is not None:
            loop.step()
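

# Hedged usage sketch (not part of the original module): the indexed variant also passes
# each column's index to the function. The ``map_indexed_columns`` name above is itself an
# assumption, as noted.
def _example_map_indexed_columns() -> None:
    matrix = np.MatrixFloat32.shared_memory_zeros((2, 3))
    indexed = map_indexed_columns(matrix, lambda index, column: column + index)
    assert list(indexed[0, :]) == [0.0, 1.0, 2.0]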


# Presumably the tail of the declaration of the ``base_fraction_umis`` configuration
# parameter read via ``env()`` below (the leading part of the call is elided):
    parser=str2float(min=0, include_min=False),
    description="""
    The number of UMIs to add to both the numerator and the denominator when dividing UMI
    fractions. This is divided by the representative total number of UMIs.
    """)

# Presumably the tail of the declaration of the ``representative_umis_quantile``
# configuration parameter read via ``env()`` below (the leading part of the call is elided):
    parser=str2float(min=0, max=1, include_min=False),
    description="""
    The quantile of the total number of UMIs to use as the representative number of UMIs
    when dividing UMI fractions. This is then used to scale the base fraction of UMIs.
    """)


def base_fraction(  # Name reconstructed (assumption).
        counts_of_umis: np.ArrayFloat32, *,
        base_fraction_umis: float = env(),
        representative_umis_quantile: float = env(),
) -> float:
    """
    Compute the base fraction for normalizing divided fractions.

    The idea is to add a small fraction whose scale represents the uncertainty, based on the
    total number of samples.
    """
    if counts_of_umis.size == 0:
        return 1.0
    representative_umis_count = np.quantile(counts_of_umis, representative_umis_quantile)
    if representative_umis_count == 0:
        return 1.0
    return base_fraction_umis / representative_umis_count
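

# Hedged usage sketch (not part of the original module): with an explicit
# ``base_fraction_umis`` of one UMI and the median as the representative quantile, the base
# fraction is one UMI divided by the median total UMI count. The ``base_fraction`` name
# above is itself an assumption, as noted, and ``np.array`` is assumed to be re-exported by
# the wrapper.
def _example_base_fraction() -> None:
    counts_of_umis = np.ArrayFloat32.be(np.array([100.0, 200.0, 300.0], dtype='float32'))
    fraction = base_fraction(counts_of_umis,
                             base_fraction_umis=1.0,
                             representative_umis_quantile=0.5)
    assert fraction == 1.0 / 200.0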