# -----------------------------------------------------------------------------
# OwlPy - AGPLv3
#
# This file is part of the OwlPy library. For licensing information see the
# accompanying file `LICENSE`.
#
# The OwlPy Developers, 21st century
# -----------------------------------------------------------------------------
import math
import numpy as np
try:
from pyrocko.trace import Trace as PTrace
except ImportError:
PTrace = None
try:
from obspy import Trace as OTrace
except ImportError:
OTrace = None
from .error import OwlPyError
def _all_same(xs):
return all(x == xs[0] for x in xs)
def _unpack_trace(tr):
'''
Get trace attributes in a framework agnostic way.
'''
if PTrace and isinstance(tr, PTrace):
return (
tr.ydata,
tr.deltat,
tr.tmin)
elif OTrace and isinstance(tr, OTrace):
return (
tr.data,
1.0/tr.stats.sampling_rate,
tr.starttime.timestamp)
else:
raise TypeError(
'Expected ObsPy or Pyrocko trace but got object of type "%s".'
% str(type(tr)))
[docs]def get_traces_data_as_array(traces):
'''
Merge data samples from multiple traces into a 2D array.
:param traces:
Input waveforms.
:type traces:
list of :py:class:`obspy.Trace <obspy.core.trace.Trace>`
or :py:class:`pyrocko.Trace <pyrocko.trace.Trace>` objects
:raises:
:py:class:`~owlpy.error.OwlPyError` if traces have different time
span, sample rate or data type, or if traces is an empty list.
:returns:
2D array as ``data[itrace, isample]``.
:rtype:
:py:class:`numpy.ndarray`
'''
if not traces:
raise OwlPyError('Need at least one trace.')
udata = [_unpack_trace(tr) for tr in traces]
if not _all_same([
(data.size, data.dtype, deltat, tmin)
for (data, deltat, tmin)
in udata]):
raise OwlPyError(
'Given traces are incompatible. Unable to join multiple '
'components into a single 2D array. Sampling rate, start time, '
'number of samples and data type must match.')
return np.vstack([data for (data, _, _) in udata])
[docs]class ArangeError(Exception):
pass
[docs]def arange2(start, stop, step, dtype=float, epsilon=1e-6, error='raise'):
'''
Return evenly spaced numbers over a specified interval.
Like :py:func:`numpy.arange` but returning floating point numbers by
default and with defined behaviour when stepsize is inconsistent with
interval bounds. It is considered inconsistent if the difference between
the closest multiple of ``step`` and ``stop`` is larger than ``epsilon *
step``. Inconsistencies are handled according to the ``error`` parameter.
If it is set to ``'raise'`` an exception of type :py:exc:`ArangeError` is
raised. If it is set to ``'round'``, ``'floor'``, or ``'ceil'``, ``stop``
is silently changed to the closest, the next smaller, or next larger
multiple of ``step``, respectively.
This function has been adapted from Pyrocko (pyrocko.util.arange2).
'''
assert error in ('raise', 'round', 'floor', 'ceil')
start = dtype(start)
stop = dtype(stop)
step = dtype(step)
rnd = {'floor': math.floor, 'ceil': math.ceil}.get(error, round)
n = int(rnd((stop - start) / step)) + 1
stop_check = start + (n-1) * step
if error == 'raise' and abs(stop_check - stop) > step * epsilon:
raise ArangeError(
'inconsistent range specification: start=%g, stop=%g, step=%g'
% (start, stop, step))
x = np.arange(n, dtype=dtype)
x *= step
x += start
return x
def moving_sum(x, n, mode='valid'):
n = int(n)
cx = np.cumsum(x, axis=-1)
nn = x.shape[-1]
def xzeros(n):
return np.zeros(shape=x.shape[:-1] + (n,), dtype=cx.dtype)
if mode == 'valid':
if nn-n+1 <= 0:
return xzeros(0)
y = xzeros(nn-n+1)
y[..., 0] = cx[..., n-1]
y[..., 1:nn-n+1] = cx[..., n:nn]-cx[..., 0:nn-n]
if mode == 'full':
y = xzeros(nn+n-1)
if n <= nn:
y[..., 0:n] = cx[..., 0:n]
y[..., n:nn] = cx[..., n:nn] - cx[..., 0:nn-n]
y[..., nn:nn+n-1] = cx[..., -1]-cx[..., nn-n:nn-1]
else:
y[..., 0:nn] = cx[..., 0:nn]
y[..., nn:n] = cx[..., nn-1]
y[..., n:nn+n-1] = cx[..., nn-1] - cx[..., 0:nn-1]
if mode == 'same':
n1 = (n-1)//2
y = xzeros(nn)
if n <= nn:
y[..., 0:n-n1] = cx[..., n1:n]
y[..., n-n1:nn-n1] = cx[..., n:nn]-cx[..., 0:nn-n]
y[..., nn-n1:nn] = cx[..., nn-1, np.newaxis] \
- cx[..., nn-n:nn-n+n1]
else:
y[..., 0:max(0, nn-n1)] = cx[..., min(n1, nn):nn]
y[..., max(nn-n1, 0):min(n-n1, nn)] = cx[..., nn-1]
y[..., min(n-n1, nn):nn] = cx[..., nn-1] \
- cx[..., 0:max(0, nn-(n-n1))]
return y