Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

""" 

Generic utilities. 

""" 

 

from tgutils.application import * # pylint: disable=wildcard-import,unused-wildcard-import 

from typing import List 

from typing import Optional 

from uuid import UUID 

 

import ctypes 

import hashlib 

import mmap 

import scipy.stats as ss # type: ignore 

import tgutils.numpy as np 

import tgutils.pandas as pd 

 

 

def file_uuid(path: str) -> UUID:
    """
    Compute a checksum of a disk file.

    The file content is memory-mapped and fed through MD5; the 16-byte digest
    is reinterpreted as a UUID. An empty file yields the MD5-of-nothing UUID,
    since ``mmap`` refuses zero-length mappings.
    """
    md5 = hashlib.md5()

    with open(path, 'rb') as file:
        try:
            # ``access=mmap.ACCESS_READ`` is portable across POSIX and
            # Windows, unlike the POSIX-only ``prot=mmap.PROT_READ`` form.
            mapped = mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_READ)
        except ValueError:
            pass  # Empty file: nothing to feed into the digest.
        else:
            # Context manager guarantees the mapping is closed even if
            # ``update`` raises.
            with mapped:
                md5.update(mapped)  # type: ignore

    return UUID(bytes=md5.digest())

 

 

def combine_uuids(uuids: List[UUID]) -> UUID:
    """
    Use ``md5sum`` to combine multiple UUIDs into a single UUID.
    """
    # Concatenating all the raw UUID bytes and hashing once is equivalent
    # to feeding them to MD5 one at a time.
    blob = b''.join(uuid.bytes for uuid in uuids)
    return UUID(bytes=hashlib.md5(blob).digest())

 

 

@config()
def downsample_frame_columns(frame: pd.FrameFloat32, samples: int,
                             loop: Optional[Loop] = None) -> pd.FrameFloat32:
    """
    Downsample each of the columns of a data frame.
    """
    def _downsample_column(column: np.ArrayFloat32) -> np.ArrayFloat32:
        # Each column is downsampled independently to the same budget.
        return downsample_array(column, samples)

    input_matrix = np.MatrixFloat32.am(frame.values)
    output_matrix = map_columns(input_matrix, _downsample_column, loop)
    # Preserve the original index and column labels on the result.
    return pd.FrameFloat32.be(output_matrix, index=frame.index, columns=frame.columns)

 

 

@config(random=True)
def downsample_array(array: np.ArrayFloat32, samples: int) -> np.ArrayFloat32:
    """
    Downsample an array so its sum will not exceed the specified number of samples.
    """
    # Stochastic rounding: floor every value, then round each fractional
    # remainder up with probability equal to the remainder itself.
    floored = np.ArrayFloat32.am(np.floor(array))

    remainders = np.ArrayFloat32.am(array - floored)
    if remainders.max() > 0:
        coin_flips = np.random.rand(remainders.size)
        promotions = np.ArrayBool.am(coin_flips < remainders)
        floored += np.ArrayFloat32.be(promotions)  # type: ignore

    # Already within budget - no sampling needed.
    if floored.sum() <= samples:
        return np.ArrayFloat32.am(floored)

    # Draw exactly ``samples`` events without replacement from the discrete
    # distribution described by the rounded counts, then re-tally per index.
    counts = np.ArrayInt32.be(floored)
    population = np.repeat(np.arange(counts.size), counts)
    chosen = np.random.choice(population, size=samples, replace=False)
    tallies = np.bincount(chosen, minlength=array.size)

    return np.ArrayFloat32.be(tallies)

 

 

# Maps a supported function name to the matching unbound ``pandas``
# rolling-window method; used by ``sliding_window_functions``.
_ROLLING_FUNCTIONS = {
    name: getattr(pd.core.window.Rolling, name)
    for name in ('mean', 'median', 'std', 'var')
}

 

 

def sliding_window_functions(values: np.ArrayFloat32, order_by: np.ArrayFloat32, window_size: int,
                             functions: List[str]) -> Dict[str, np.ArrayFloat32]:
    """
    Compute some function(s) on a sliding window.

    The values are first sorted by the content of a second array, then the sliding window is applied
    (repeating the 1st and last elements as needed), the function(s) are applied, and the results
    are unsorted back into the proper positions.

    Currently supported functions are ``median``, ``mean``, ``std`` and ``var``.
    """
    assert len(values) == len(order_by)

    # Force an odd window size so each window has a well-defined center.
    if window_size % 2 == 0:
        window_size += 1

    half_window_size = (window_size - 1) // 2

    # ``order_indices`` sorts the values by ``order_by``; applying argsort to
    # it again yields the inverse permutation that restores original order.
    order_indices = np.argsort(order_by)
    reverse_order_indices = np.argsort(order_indices)

    # Pad both ends with repeats of the extreme elements so every original
    # position gets a complete window.
    minimal_index = order_indices[0]
    maximal_index = order_indices[-1]
    extended_order_indices = np.concatenate([
        np.repeat(minimal_index, half_window_size),
        order_indices,
        np.repeat(maximal_index, half_window_size),
    ])
    extended_series = pd.SeriesFloat32.be(values[extended_order_indices])

    rolling_windows = extended_series.rolling(window_size)
    results: Dict[str, np.ArrayFloat32] = {}
    for name in functions:
        function = _ROLLING_FUNCTIONS[name]
        computed = np.ArrayFloat32.be(function(rolling_windows).values)
        # The first ``window_size - 1`` rolling results come from incomplete
        # windows; dropping them leaves exactly one result per element.
        reordered = np.ArrayFloat32.am(computed[window_size - 1:])
        # Undo the sort so results line up with the caller's positions.
        results[name] = np.ArrayFloat32.am(reordered[reverse_order_indices])

    return results

 

 

def rank_array(array: np.ArrayFloat32) -> np.ArrayFloat32:
    """
    Convert an array of values to an array of ranks.

    The result is still an array of float where NaN values get a NaN rank.
    """
    result = np.ArrayFloat32.be(ss.rankdata(array, method='ordinal'))
    # Assumes ``rankdata`` ranks NaN entries after all real values, so every
    # rank beyond the non-NaN count belongs to a NaN entry - TODO confirm
    # against the installed scipy version.
    valid_count = non_nans_count(array)
    result[result > valid_count] = None
    return result

 

 

def only_lowest_ranks(array: np.ArrayFloat32, keep_count: int) -> np.ArrayFloat32:
    """
    Return only the lowest few values in an array.
    """
    kept = np.ArrayFloat32.am(array.copy())

    # Nothing to drop if there are few enough real values already.
    if non_nans_count(array) <= keep_count:
        return kept

    # Partition so the ``keep_count`` smallest entries come first, then
    # blank out everything past them (assigning None stores NaN).
    partition = np.argpartition(array, keep_count)
    kept[partition[keep_count:]] = None

    return kept

 

 

def non_nans_count(array: np.ArrayFloat32) -> int:
    """
    Count the number of non-NaN values in an array.
    """
    # Complement of the NaN count over the full length.
    nan_total = nans_count(array)
    return len(array) - nan_total

 

 

def nans_count(array: np.ArrayFloat32) -> int:
    """
    Count the number of NaN values in an array.
    """
    nan_mask = np.isnan(array)
    return np.count_nonzero(nan_mask)

 

 

def sum_profiles_loop(expected: int) -> Loop:
    """
    Create a logged loop for summing profiles.
    """
    # All arguments are passed by keyword, so the order is cosmetic.
    return Loop(expected=expected,
                log_every=100_000,
                log_with=1_000,
                start='Sum profiles...',
                progress='Summed %sK profiles (%.2f%%)...',
                completed='Summed %s profiles')

 

 

@config()
def map_rows(matrix: np.MatrixFloat32,
             row_function: Callable[[np.ArrayFloat32], np.ArrayFloat32],
             loop: Optional[Loop] = None) -> np.MatrixFloat32:
    """
    Create a new matrix whose rows are the results of applying a function to each input matrix row.
    """
    # Axis 0 walks rows; the mapped function receives no row index.
    return _map_axis(matrix, axis=0, array_function=row_function,
                     loop=loop, with_index=False)

 

 

@config()
def map_columns(matrix: np.MatrixFloat32,
                column_function: Callable[[np.ArrayFloat32], np.ArrayFloat32],
                loop: Optional[Loop] = None) -> np.MatrixFloat32:
    """
    Create a new matrix whose columns are the results of applying a function to each input matrix
    column.
    """
    # Axis 1 walks columns; the mapped function receives no column index.
    return _map_axis(matrix, axis=1, array_function=column_function,
                     loop=loop, with_index=False)

 

 

# @config() 

# def map_indexed_rows(matrix: np.MatrixFloat32, 

# row_function: Callable[[int, np.ArrayFloat32], np.ArrayFloat32], 

# loop: Optional[Loop] = None) -> np.MatrixFloat32: 

# """ 

# Create a new matrix whose rows are the results of applying a function to each input matrix row. 

# """ 

# return _map_axis(matrix, 0, row_function, loop, with_index=True) 

 

 

@config()
def map_indexed_columns(matrix: np.MatrixFloat32,
                        column_function: Callable[[int, np.ArrayFloat32], np.ArrayFloat32],
                        loop: Optional[Loop] = None) -> np.MatrixFloat32:
    """
    Create a new matrix whose columns are the results of applying a function to each input matrix
    column.
    """
    # Axis 1 walks columns; the mapped function also receives the column index.
    return _map_axis(matrix, axis=1, array_function=column_function,
                     loop=loop, with_index=True)

 

 

@config(parallel=True)
def _map_axis(input_matrix: np.MatrixFloat32, axis: int, array_function: Any,
              loop: Optional[Loop], *, with_index: bool) -> np.MatrixFloat32:
    # Shared implementation for map_rows/map_columns/map_indexed_columns:
    # applies ``array_function`` along ``axis`` (0 = rows, 1 = columns),
    # fanning the work out across processes.
    arrays_count = input_matrix.shape[axis]
    processes_count = processes_for(arrays_count)

    # Workers write their results directly into this shared-memory matrix.
    result_matrix = np.MatrixFloat32.shared_memory_zeros(input_matrix.shape)

    # Each worker handles a distinct index range (via ``indexed_range``);
    # NOTE(review): exact partitioning semantics come from tgutils - confirm.
    parallel(processes_count,
             _apply_array_function,
             input_matrix,
             axis,
             array_function,
             with_index,
             result_matrix,
             loop,
             kwargs=lambda index: dict(arrays_indices=indexed_range(index, size=arrays_count)))

    return result_matrix

 

 

def _apply_array_function(input_matrix: np.MatrixFloat32, axis: int,
                          array_function: Any,
                          with_index: bool,
                          result_matrix: np.MatrixFloat32,
                          loop: Optional[Loop], *,
                          arrays_indices: range) -> None:
    # Worker body for ``_map_axis``: apply the function to each assigned
    # row (axis 0) or column (axis 1), writing results in place.
    for position in arrays_indices:
        if axis == 0:
            source = input_matrix[position, :]
        else:
            source = input_matrix[:, position]

        if with_index:
            mapped = array_function(position, source)
        else:
            mapped = array_function(source)

        if axis == 0:
            result_matrix[position, :] = mapped
        else:
            result_matrix[:, position] = mapped

        if loop is not None:
            loop.step()

 

 

# Registers the configurable base number of UMIs added to both sides of a
# UMI-fraction division; consumed by ``normalization_base_fraction`` via
# ``env()``. Fixes the "nominmator" typo in the user-facing help text.
Param(name='base_fraction_umis', metavar='FLOAT', default=3,
      parser=str2float(min=0, include_min=False),
      description="""
      The number of UMIs to add to both numerator and denominator when dividing UMI
      fractions. This is divided by the representative total number of UMIs.
      """)

 

 

# Registers the quantile used to pick a "representative" total-UMIs value;
# consumed by ``normalization_base_fraction`` via ``env()``.
Param(name='representative_umis_quantile', metavar='FLOAT', default=0.05,
      parser=str2float(min=0, max=1, include_min=False),
      description="""
      The quantile of the total number of UMIs to use as the representative number of
      UMIs when dividing UMI fractions. This is then used to scale the base fractions
      UMIs.
      """)

 

 

@config()
def normalization_base_fraction(  #
    counts_of_umis: np.ArrayFloat32,
    *,
    base_fraction_umis: float = env(),
    representative_umis_quantile: float = env(),
) -> float:
    """
    Compute the base fraction for normalizing divided fractions.

    The idea is to add a small fraction whose scale represents the uncertainty, based on the total
    number of samples.
    """
    # Guard: no data at all falls back to a neutral base fraction of 1.
    if counts_of_umis.size == 0:
        return 1.0

    representative = np.quantile(counts_of_umis, representative_umis_quantile)

    # Guard: a zero representative count would divide by zero below.
    if representative == 0:
        return 1.0

    return base_fraction_umis / representative