Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
""" Utilities for main functions. """
functions: Optional[List[str]] = None, *, adapter: Optional[Callable[[Namespace], None]] = None) -> None: """ A generic ``main`` function for configurable functions.
See :py:func:`dynamake.application.main`. """ adapter(args)
""" Ensure we can use the maximal number of open files at the same time. """
""" Perform last-minute adaptation of the program after parsing the command line options. """
""" Reset the global state (for tests). """
""" Return a range of indices for an indexed invocation.
Each invocation ``index`` will get its own range, where the range sizes will be the same (as much as possible) for each invocation.
If the number of ``invocations`` is zero, it is assumed to be the number of available parallel processes, that is, there will be one invocation per parallel process (at most ``size``). """
""" A logger adapter that performs a file lock around each logged message.
If used consistently in multiple applications, this ensures that logging does not get garbled, even when running across multiple machines. """
""" Create a logger adapter that locks the specified directory path. """ os.makedirs(directory, exist_ok=True)
""" Log a message while locking the directory. """
""" Perform some action while holding a file lock. """ except BaseException: # pylint: disable=broad-except slept += 0.1 if slept > 60: raise RuntimeError('Failed to obtain lock file: %s ' 'for more than 60 seconds' % lock_path) sleep(0.1) continue
finally:
""" Wrap a logger so that messages will not get interleaved with other program invocations and/or the messages from the ``tg_qsub`` script. """ return logger os.path.join(os.getenv('QSUB_TMP_DIR', '.qsub'), 'lock'))
""" Log progress for a (possibly parallel) loop. """
log_with: Optional[int] = None, expected: Optional[int] = None) -> None: #: The format of the start message. self.start = start
#: The format of the progress message. self.progress = progress
#: The format of the completion message. self.completed = completed
#: Emit a log message every this amount of iterations (typically a power of 10). self.log_every = log_every
#: The value in the log message is divided by this amount (typically a power of 1000). self.log_with = log_with or log_every
#: The shared memory iteration counter. self.shared_counter = Value(ctypes.c_int32) # type: ignore
#: Granularity of parallel counting. self.local_every = self.log_every // 10
#: The expected number of increments. self.expected = expected
if self.expected is None or self.expected >= self.log_every: Prog.logger.info(self.start)
return self
self.done()
""" Indicate a loop iteration.
Ideally, this is called at the end of the iteration to indicate the iteration has completed. If the loop code is complex (contains ``continue``, etc.), then it is placed at the start of the code instead. """ with self.shared_counter.get_lock(): self.shared_counter.value += 1 total = self.shared_counter.value
if total % self.log_every > 0: return
if fraction is None and self.expected is not None: fraction = total / self.expected
if fraction is None: Prog.logger.info(self.progress, total // self.log_with) else: Prog.logger.info(self.progress, total // self.log_with, 100 * fraction)
""" Indicate the loop has completed. """ total = self.shared_counter.value if total >= self.log_every or self.expected is None or self.expected >= self.log_every: Prog.logger.info(self.completed, total)
""" Loop on each file line. """ size = Stat.stat(path).st_size number = 0 offset = 0 with open(path, 'r') as file: for line in file: number += 1 yield number, line if loop is None: continue offset += len(line) fraction = offset / size loop.step(fraction) |