| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158 |
- """
- Routines for filling missing data.
- """
- from __future__ import annotations
- from functools import wraps
- from typing import (
- TYPE_CHECKING,
- Any,
- Literal,
- cast,
- overload,
- )
- import numpy as np
- from pandas._libs import (
- NaT,
- algos,
- lib,
- )
- from pandas._typing import (
- ArrayLike,
- AxisInt,
- F,
- ReindexMethod,
- npt,
- )
- from pandas.compat._optional import import_optional_dependency
- from pandas.core.dtypes.cast import infer_dtype_from
- from pandas.core.dtypes.common import (
- is_array_like,
- is_bool_dtype,
- is_numeric_dtype,
- is_numeric_v_string_like,
- is_object_dtype,
- needs_i8_conversion,
- )
- from pandas.core.dtypes.dtypes import DatetimeTZDtype
- from pandas.core.dtypes.missing import (
- is_valid_na_for_dtype,
- isna,
- na_value_for_dtype,
- )
- if TYPE_CHECKING:
- from pandas import Index
def check_value_size(value, mask: npt.NDArray[np.bool_], length: int):
    """
    Check a fill value passed to ExtensionArray.fillna against ``length``.

    Parameters
    ----------
    value : scalar or array-like
        Candidate fill value.
    mask : np.ndarray[bool]
        Boolean indexer applied to an array-like ``value``.
    length : int
        Length an array-like ``value`` is required to have.

    Returns
    -------
    object
        ``value`` unchanged when it is not array-like, otherwise
        ``value[mask]``.

    Raises
    ------
    ValueError
        If ``value`` is array-like and ``len(value) != length``.
    """
    if not is_array_like(value):
        # Non-array values are passed through without any size check.
        return value

    n_given = len(value)
    if n_given != length:
        raise ValueError(
            f"Length of 'value' does not match. Got ({n_given}) "
            f" expected {length}"
        )
    return value[mask]
def mask_missing(arr: ArrayLike, values_to_mask) -> npt.NDArray[np.bool_]:
    """
    Build a boolean mask, shaped like ``arr``, that is True wherever an
    entry of ``arr`` equals one of ``values_to_mask``.

    Parameters
    ----------
    arr : ArrayLike
    values_to_mask: list, tuple, or scalar

    Returns
    -------
    np.ndarray[bool]
    """
    # When called from Block.replace/replace_list, values_to_mask is a scalar
    #  known to be holdable by arr.
    # When called from Series._single_replace, values_to_mask is tuple or list
    inferred_dtype, values_to_mask = infer_dtype_from(values_to_mask)

    if isinstance(inferred_dtype, np.dtype):
        values_to_mask = np.array(values_to_mask, dtype=inferred_dtype)
    else:
        if not lib.is_list_like(values_to_mask):
            values_to_mask = [values_to_mask]
        array_type = inferred_dtype.construct_array_type()
        values_to_mask = array_type._from_sequence(
            values_to_mask, dtype=inferred_dtype, copy=False
        )

    # For object arrays, pre-compute the non-NA positions so that elements
    # are never compared against NA.
    arr_is_object = is_object_dtype(arr.dtype)
    if arr_is_object:
        notna_positions = ~isna(arr)

    targets_na = isna(values_to_mask)
    targets = values_to_mask[~targets_na]

    result = np.zeros(arr.shape, dtype=bool)

    # GH 21977: bool targets are not matched against non-bool numeric data,
    # nor non-bool numeric targets against bool data.
    arr_is_bool = is_bool_dtype(arr.dtype)
    targets_are_bool = is_bool_dtype(targets.dtype)
    bool_numeric_mix = (
        is_numeric_dtype(arr.dtype) and not arr_is_bool and targets_are_bool
    ) or (
        arr_is_bool and is_numeric_dtype(targets.dtype) and not targets_are_bool
    )

    if not bool_numeric_mix:
        for target in targets:
            if is_numeric_v_string_like(arr, target):
                # GH#29553 prevent numpy deprecation warnings
                continue

            if arr_is_object:
                hits = np.zeros(arr.shape, dtype=np.bool_)
                hits[notna_positions] = arr[notna_positions] == target
            else:
                hits = arr == target
                if not isinstance(hits, np.ndarray):
                    # usually BooleanArray
                    hits = hits.to_numpy(dtype=bool, na_value=False)

            result |= hits

    # Any NA among the requested values also selects the NA entries of arr.
    if targets_na.any():
        result |= isna(arr)

    return result
@overload
def clean_fill_method(
    method: Literal["ffill", "pad", "bfill", "backfill"],
    *,
    allow_nearest: Literal[False] = ...,
) -> Literal["pad", "backfill"]:
    ...


@overload
def clean_fill_method(
    method: Literal["ffill", "pad", "bfill", "backfill", "nearest"],
    *,
    allow_nearest: Literal[True],
) -> Literal["pad", "backfill", "nearest"]:
    ...


def clean_fill_method(
    method: Literal["ffill", "pad", "bfill", "backfill", "nearest"],
    *,
    allow_nearest: bool = False,
) -> Literal["pad", "backfill", "nearest"]:
    """
    Normalize a fill-method name to its canonical spelling.

    String input is lower-cased and the aliases "ffill"/"bfill" are mapped
    to "pad"/"backfill".

    Parameters
    ----------
    method : str
        Requested fill method.
    allow_nearest : bool, default False
        Whether "nearest" is accepted in addition to "pad" and "backfill".

    Returns
    -------
    str
        "pad", "backfill" or (only with ``allow_nearest``) "nearest".

    Raises
    ------
    ValueError
        If the normalized method is not one of the accepted names.
    """
    if isinstance(method, str):
        lowered = method.lower()
        aliases = {"ffill": "pad", "bfill": "backfill"}
        # error: Incompatible types in assignment (expression has type "str",
        # variable has type "Literal['ffill', 'pad', 'bfill', 'backfill', 'nearest']")
        method = aliases.get(lowered, lowered)  # type: ignore[assignment]

    if allow_nearest:
        valid_methods = ("pad", "backfill", "nearest")
        expecting = "pad (ffill), backfill (bfill) or nearest"
    else:
        valid_methods = ("pad", "backfill")
        expecting = "pad (ffill) or backfill (bfill)"

    if method not in valid_methods:
        raise ValueError(f"Invalid fill method. Expecting {expecting}. Got {method}")
    return method
# Interpolation methods evaluated with np.interp.
NP_METHODS = [
    "linear",
    "time",
    "index",
    "values",
]

# Interpolation methods routed through _interpolate_scipy_wrapper.
# The element order is user-visible: the combined list is printed in the
# "method must be one of ..." error raised by clean_interp_method.
SP_METHODS = [
    "nearest", "zero", "slinear", "quadratic", "cubic",
    "barycentric", "krogh",
    "spline", "polynomial",
    "from_derivatives", "piecewise_polynomial",
    "pchip", "akima", "cubicspline",
]
def clean_interp_method(method: str, index: Index, **kwargs) -> str:
    """
    Validate an interpolation method name against the index and kwargs.

    Parameters
    ----------
    method : str
        Interpolation method; must appear in NP_METHODS or SP_METHODS.
    index : Index
        Index being interpolated over; only its monotonicity is inspected.
    **kwargs
        Only ``order`` is read here.

    Returns
    -------
    str
        ``method`` unchanged.

    Raises
    ------
    ValueError
        If "spline"/"polynomial" is requested without ``order``, if
        ``method`` is unknown, or if "krogh", "piecewise_polynomial" or
        "pchip" is requested on an index that is not monotonic increasing.
    """
    needs_order = method in ("spline", "polynomial")
    if needs_order and kwargs.get("order") is None:
        raise ValueError("You must specify the order of the spline or polynomial.")

    valid = NP_METHODS + SP_METHODS
    if method not in valid:
        raise ValueError(f"method must be one of {valid}. Got '{method}' instead.")

    needs_monotonic = method in ("krogh", "piecewise_polynomial", "pchip")
    if needs_monotonic and not index.is_monotonic_increasing:
        raise ValueError(
            f"{method} interpolation requires that the index be monotonic."
        )

    return method
def find_valid_index(how: str, is_valid: npt.NDArray[np.bool_]) -> int | None:
    """
    Return the position of the first or last valid entry.

    Parameters
    ----------
    how : {'first', 'last'}
        Whether to look for the first or the last valid position.
    is_valid: np.ndarray
        Boolean mask that is True where a value is valid. A 2D mask is
        reduced with ``any`` along axis 1 before searching.

    Returns
    -------
    int or None
        The position found, or None if the mask is empty or has no True.
    """
    assert how in ["first", "last"]

    if len(is_valid) == 0:  # early stop
        return None

    if is_valid.ndim == 2:
        # A row counts as valid if any of its entries is valid.
        is_valid = is_valid.any(axis=1)

    if how == "first":
        idxpos = is_valid.argmax()
    elif how == "last":
        # argmax on the reversed mask, mapped back to a forward position
        idxpos = len(is_valid) - 1 - is_valid[::-1].argmax()

    # argmax also yields 0 when nothing is True, so confirm the hit.
    if not is_valid[idxpos]:
        return None

    # Incompatible return value type (got "signedinteger[Any]",
    # expected "Optional[int]")
    return idxpos  # type: ignore[return-value]
def validate_limit_direction(
    limit_direction: str,
) -> Literal["forward", "backward", "both"]:
    """
    Lower-case ``limit_direction`` and check that it is a known direction.

    Returns
    -------
    str
        One of "forward", "backward" or "both".

    Raises
    ------
    ValueError
        If the lower-cased value is not one of the three directions.
    """
    valid_limit_directions = ["forward", "backward", "both"]
    normalized = limit_direction.lower()
    if normalized in valid_limit_directions:
        # error: Incompatible return value type (got "str", expected
        # "Literal['forward', 'backward', 'both']")
        return normalized  # type: ignore[return-value]

    raise ValueError(
        "Invalid limit_direction: expecting one of "
        f"{valid_limit_directions}, got '{normalized}'."
    )
def validate_limit_area(limit_area: str | None) -> Literal["inside", "outside"] | None:
    """
    Lower-case ``limit_area`` and check that it is "inside" or "outside".

    None is passed through unchanged.

    Raises
    ------
    ValueError
        If a non-None value is neither "inside" nor "outside".
    """
    if limit_area is None:
        return None

    valid_limit_areas = ["inside", "outside"]
    normalized = limit_area.lower()
    if normalized not in valid_limit_areas:
        raise ValueError(
            f"Invalid limit_area: expecting one of {valid_limit_areas}, got "
            f"{normalized}."
        )
    # error: Incompatible return value type (got "Optional[str]", expected
    # "Optional[Literal['inside', 'outside']]")
    return normalized  # type: ignore[return-value]
def infer_limit_direction(
    limit_direction: Literal["backward", "forward", "both"] | None, method: str
) -> Literal["backward", "forward", "both"]:
    """
    Pick a default ``limit_direction`` for ``method``, or check the given one.

    With ``limit_direction=None`` the result is "backward" for
    "backfill"/"bfill" and "forward" for every other method. An explicit
    direction is returned unchanged after checking that it does not
    contradict a pad or backfill method.

    Raises
    ------
    ValueError
        If "pad"/"ffill" is combined with a direction other than "forward",
        or "backfill"/"bfill" with a direction other than "backward".
    """
    forward_only = ("pad", "ffill")
    backward_only = ("backfill", "bfill")

    if limit_direction is None:
        return "backward" if method in backward_only else "forward"

    if method in forward_only and limit_direction != "forward":
        raise ValueError(
            f"`limit_direction` must be 'forward' for method `{method}`"
        )
    if method in backward_only and limit_direction != "backward":
        raise ValueError(
            f"`limit_direction` must be 'backward' for method `{method}`"
        )
    return limit_direction
- def get_interp_index(method, index: Index) -> Index:
- # create/use the index
- if method == "linear":
- # prior default
- from pandas import Index
- index = Index(np.arange(len(index)))
- else:
- methods = {"index", "values", "nearest", "time"}
- is_numeric_or_datetime = (
- is_numeric_dtype(index.dtype)
- or isinstance(index.dtype, DatetimeTZDtype)
- or lib.is_np_dtype(index.dtype, "mM")
- )
- if method not in methods and not is_numeric_or_datetime:
- raise ValueError(
- "Index column must be numeric or datetime type when "
- f"using {method} method other than linear. "
- "Try setting a numeric or datetime index column before "
- "interpolating."
- )
- if isna(index).any():
- raise NotImplementedError(
- "Interpolation with NaNs in the index "
- "has not been implemented. Try filling "
- "those NaNs before interpolating."
- )
- return index
def interpolate_2d_inplace(
    data: np.ndarray,  # floating dtype
    index: Index,
    axis: AxisInt,
    method: str = "linear",
    limit: int | None = None,
    limit_direction: str = "forward",
    limit_area: str | None = None,
    fill_value: Any | None = None,
    mask=None,
    **kwargs,
) -> None:
    """
    Apply _interpolate_1d to every 1-d slice of ``data`` along ``axis``.

    Parameters
    ----------
    data : np.ndarray
        Array to interpolate; modified in place.
    index : Index
        Supplies the x-coordinates (see _index_to_interp_indices).
    axis : int
        Axis along which the 1-d slices are taken.
    method : str, default "linear"
        Interpolation method; "time" is handled as "values" after checking
        that the index is datetime-like.
    limit : int, optional
        Validated with ``algos.validate_limit``.
    limit_direction : str, default "forward"
    limit_area : str, optional
    fill_value : Any, optional
        Replaced by the dtype's NA value when it is a valid NA for
        ``data.dtype``.
    mask : optional
        Forwarded unchanged to _interpolate_1d.
    **kwargs
        Forwarded unchanged to _interpolate_1d.

    Notes
    -----
    Alters 'data' in-place.

    The signature does differ from _interpolate_1d because it only
    includes what is needed for Block.interpolate.
    """
    # validate the interp method
    clean_interp_method(method, index, **kwargs)

    if is_valid_na_for_dtype(fill_value, data.dtype):
        fill_value = na_value_for_dtype(data.dtype, compat=False)

    if method == "time":
        if not needs_i8_conversion(index.dtype):
            raise ValueError(
                "time-weighted interpolation only works "
                "on Series or DataFrames with a "
                "DatetimeIndex"
            )
        method = "values"

    # default limit is unlimited GH #16282
    shared = {
        "indices": _index_to_interp_indices(index, method),
        "method": method,
        "limit": algos.validate_limit(nobs=None, limit=limit),
        "limit_direction": validate_limit_direction(limit_direction),
        "limit_area": validate_limit_area(limit_area),
        "fill_value": fill_value,
        "bounds_error": False,
        "mask": mask,
    }

    def _fill_slice(yvalues: np.ndarray) -> None:
        # process one 1-d slice in the axis direction
        _interpolate_1d(yvalues=yvalues, **shared, **kwargs)

    # error: Argument 1 to "apply_along_axis" has incompatible type
    # "Callable[[ndarray[Any, Any]], None]"; numpy's stubs expect the
    # callable to return something array-like.
    np.apply_along_axis(_fill_slice, axis, data)  # type: ignore[arg-type]
- def _index_to_interp_indices(index: Index, method: str) -> np.ndarray:
- """
- Convert Index to ndarray of indices to pass to NumPy/SciPy.
- """
- xarr = index._values
- if needs_i8_conversion(xarr.dtype):
- # GH#1646 for dt64tz
- xarr = xarr.view("i8")
- if method == "linear":
- inds = xarr
- inds = cast(np.ndarray, inds)
- else:
- inds = np.asarray(xarr)
- if method in ("values", "index"):
- if inds.dtype == np.object_:
- inds = lib.maybe_convert_objects(inds)
- return inds
def _interpolate_1d(
    indices: np.ndarray,
    yvalues: np.ndarray,
    method: str = "linear",
    limit: int | None = None,
    limit_direction: str = "forward",
    limit_area: Literal["inside", "outside"] | None = None,
    fill_value: Any | None = None,
    bounds_error: bool = False,
    order: int | None = None,
    mask=None,
    **kwargs,
) -> None:
    """
    Interpolate the missing entries of a single 1-d array.

    ``indices`` and ``yvalues`` are 1-d arrays of the same length.
    ``bounds_error`` is currently hardcoded to False by the caller since the
    non-scipy methods don't take it as an argument.

    When ``mask`` is given it marks the entries to fill and is rewritten in
    place to mark the entries that stay missing; otherwise missing entries
    are found with ``isna`` and the preserved ones are written back as
    NaN (or NaT for datetime-like data).

    Notes
    -----
    Fills 'yvalues' in-place.
    """
    invalid = isna(yvalues) if mask is None else mask
    valid = ~invalid

    # Nothing to interpolate from, or nothing to interpolate.
    if not valid.any() or valid.all():
        return

    # Positions of the leading and trailing runs of missing values.
    first_valid_index = find_valid_index(how="first", is_valid=valid)
    if first_valid_index is None:  # no nan found in start
        first_valid_index = 0
    start_nans = set(range(first_valid_index))

    last_valid_index = find_valid_index(how="last", is_valid=valid)
    if last_valid_index is None:  # no nan found in end
        last_valid_index = len(yvalues)
    end_nans = set(range(1 + last_valid_index, len(valid)))

    # preserve_nans collects the positions that must still be missing after
    # interpolating. For example, with limit_direction='forward' it holds
    # the NaNs at the beginning plus those more than 'limit' away from the
    # prior non-NaN. _interp_limit supplies the limit-based part.
    preserve_nans: list | set
    if limit_direction == "forward":
        preserve_nans = start_nans | set(_interp_limit(invalid, limit, 0))
    elif limit_direction == "backward":
        preserve_nans = end_nans | set(_interp_limit(invalid, 0, limit))
    else:
        # both directions... just use _interp_limit
        preserve_nans = set(_interp_limit(invalid, limit, limit))

    # limit_area adds either the outer or the interior NaNs, GH #16284
    if limit_area == "inside":
        # preserve NaNs on the outside
        preserve_nans |= start_nans | end_nans
    elif limit_area == "outside":
        # preserve NaNs on the inside
        all_nans = set(np.flatnonzero(invalid))
        preserve_nans |= all_nans - start_nans - end_nans

    # a sorted list is used as the positional indexer below
    preserve_nans = sorted(preserve_nans)

    is_datetimelike = yvalues.dtype.kind in "mM"
    if is_datetimelike:
        yvalues = yvalues.view("i8")

    x_known = indices[valid]
    x_missing = indices[invalid]
    if method in NP_METHODS:
        # np.interp requires sorted X values, #21037
        sorter = np.argsort(x_known)
        yvalues[invalid] = np.interp(
            x_missing, x_known[sorter], yvalues[valid][sorter]
        )
    else:
        yvalues[invalid] = _interpolate_scipy_wrapper(
            x_known,
            yvalues[valid],
            x_missing,
            method=method,
            fill_value=fill_value,
            bounds_error=bounds_error,
            order=order,
            **kwargs,
        )

    if mask is not None:
        # Report what is still missing through the caller's mask.
        mask[:] = False
        mask[preserve_nans] = True
    elif is_datetimelike:
        yvalues[preserve_nans] = NaT.value
    else:
        yvalues[preserve_nans] = np.nan
    return
def _interpolate_scipy_wrapper(
    x: np.ndarray,
    y: np.ndarray,
    new_x: np.ndarray,
    method: str,
    fill_value=None,
    bounds_error: bool = False,
    order=None,
    **kwargs,
):
    """
    Evaluate a SciPy-based interpolation of ``(x, y)`` at ``new_x``.

    "nearest", "zero", "slinear", "quadratic", "cubic" and "polynomial" go
    through ``scipy.interpolate.interp1d`` (``method`` is scipy's ``kind``;
    for "polynomial" the ``order`` is used as ``kind``). "spline" uses
    ``UnivariateSpline``. Every other method is looked up in a table of
    dedicated callables. Add any new methods to the list in
    _clean_interp_method.

    Returns
    -------
    The interpolated values at ``new_x``.

    Raises
    ------
    ValueError
        For "spline" when ``order`` is missing or not greater than 0.
    """
    import_optional_dependency(
        "scipy", extra=f"{method} interpolation requires SciPy."
    )
    from scipy import interpolate

    new_x = np.asarray(new_x)

    # ignores some kwargs that could be passed along.
    alt_methods = {
        "barycentric": interpolate.barycentric_interpolate,
        "krogh": interpolate.krogh_interpolate,
        "from_derivatives": _from_derivatives,
        "piecewise_polynomial": _from_derivatives,
        "cubicspline": _cubicspline_interpolate,
        "akima": _akima_interpolate,
        "pchip": interpolate.pchip_interpolate,
    }

    interp1d_methods = [
        "nearest",
        "zero",
        "slinear",
        "quadratic",
        "cubic",
        "polynomial",
    ]

    if method in interp1d_methods:
        kind = order if method == "polynomial" else method
        interpolator = interpolate.interp1d(
            x, y, kind=kind, fill_value=fill_value, bounds_error=bounds_error
        )
        return interpolator(new_x)

    if method == "spline":
        # GH #10633, #24014
        if isna(order) or (order <= 0):
            raise ValueError(
                f"order needs to be specified and greater than 0; got order: {order}"
            )
        spline = interpolate.UnivariateSpline(x, y, k=order, **kwargs)
        return spline(new_x)

    # GH 7295: need to be able to write for some reason
    # in some circumstances: check all three
    if not x.flags.writeable:
        x = x.copy()
    if not y.flags.writeable:
        y = y.copy()
    if not new_x.flags.writeable:
        new_x = new_x.copy()

    interpolator = alt_methods[method]
    return interpolator(x, y, new_x, **kwargs)
- def _from_derivatives(
- xi: np.ndarray,
- yi: np.ndarray,
- x: np.ndarray,
- order=None,
- der: int | list[int] | None = 0,
- extrapolate: bool = False,
- ):
- """
- Convenience function for interpolate.BPoly.from_derivatives.
- Construct a piecewise polynomial in the Bernstein basis, compatible
- with the specified values and derivatives at breakpoints.
- Parameters
- ----------
- xi : array-like
- sorted 1D array of x-coordinates
- yi : array-like or list of array-likes
- yi[i][j] is the j-th derivative known at xi[i]
- order: None or int or array-like of ints. Default: None.
- Specifies the degree of local polynomials. If not None, some
- derivatives are ignored.
- der : int or list
- How many derivatives to extract; None for all potentially nonzero
- derivatives (that is a number equal to the number of points), or a
- list of derivatives to extract. This number includes the function
- value as 0th derivative.
- extrapolate : bool, optional
- Whether to extrapolate to ouf-of-bounds points based on first and last
- intervals, or to return NaNs. Default: True.
- See Also
- --------
- scipy.interpolate.BPoly.from_derivatives
- Returns
- -------
- y : scalar or array-like
- The result, of length R or length M or M by R.
- """
- from scipy import interpolate
- # return the method for compat with scipy version & backwards compat
- method = interpolate.BPoly.from_derivatives
- m = method(xi, yi.reshape(-1, 1), orders=order, extrapolate=extrapolate)
- return m(x)
- def _akima_interpolate(
- xi: np.ndarray,
- yi: np.ndarray,
- x: np.ndarray,
- der: int | list[int] | None = 0,
- axis: AxisInt = 0,
- ):
- """
- Convenience function for akima interpolation.
- xi and yi are arrays of values used to approximate some function f,
- with ``yi = f(xi)``.
- See `Akima1DInterpolator` for details.
- Parameters
- ----------
- xi : np.ndarray
- A sorted list of x-coordinates, of length N.
- yi : np.ndarray
- A 1-D array of real values. `yi`'s length along the interpolation
- axis must be equal to the length of `xi`. If N-D array, use axis
- parameter to select correct axis.
- x : np.ndarray
- Of length M.
- der : int, optional
- How many derivatives to extract; None for all potentially
- nonzero derivatives (that is a number equal to the number
- of points), or a list of derivatives to extract. This number
- includes the function value as 0th derivative.
- axis : int, optional
- Axis in the yi array corresponding to the x-coordinate values.
- See Also
- --------
- scipy.interpolate.Akima1DInterpolator
- Returns
- -------
- y : scalar or array-like
- The result, of length R or length M or M by R,
- """
- from scipy import interpolate
- P = interpolate.Akima1DInterpolator(xi, yi, axis=axis)
- return P(x, nu=der)
- def _cubicspline_interpolate(
- xi: np.ndarray,
- yi: np.ndarray,
- x: np.ndarray,
- axis: AxisInt = 0,
- bc_type: str | tuple[Any, Any] = "not-a-knot",
- extrapolate=None,
- ):
- """
- Convenience function for cubic spline data interpolator.
- See `scipy.interpolate.CubicSpline` for details.
- Parameters
- ----------
- xi : np.ndarray, shape (n,)
- 1-d array containing values of the independent variable.
- Values must be real, finite and in strictly increasing order.
- yi : np.ndarray
- Array containing values of the dependent variable. It can have
- arbitrary number of dimensions, but the length along ``axis``
- (see below) must match the length of ``x``. Values must be finite.
- x : np.ndarray, shape (m,)
- axis : int, optional
- Axis along which `y` is assumed to be varying. Meaning that for
- ``x[i]`` the corresponding values are ``np.take(y, i, axis=axis)``.
- Default is 0.
- bc_type : string or 2-tuple, optional
- Boundary condition type. Two additional equations, given by the
- boundary conditions, are required to determine all coefficients of
- polynomials on each segment [2]_.
- If `bc_type` is a string, then the specified condition will be applied
- at both ends of a spline. Available conditions are:
- * 'not-a-knot' (default): The first and second segment at a curve end
- are the same polynomial. It is a good default when there is no
- information on boundary conditions.
- * 'periodic': The interpolated functions is assumed to be periodic
- of period ``x[-1] - x[0]``. The first and last value of `y` must be
- identical: ``y[0] == y[-1]``. This boundary condition will result in
- ``y'[0] == y'[-1]`` and ``y''[0] == y''[-1]``.
- * 'clamped': The first derivative at curves ends are zero. Assuming
- a 1D `y`, ``bc_type=((1, 0.0), (1, 0.0))`` is the same condition.
- * 'natural': The second derivative at curve ends are zero. Assuming
- a 1D `y`, ``bc_type=((2, 0.0), (2, 0.0))`` is the same condition.
- If `bc_type` is a 2-tuple, the first and the second value will be
- applied at the curve start and end respectively. The tuple values can
- be one of the previously mentioned strings (except 'periodic') or a
- tuple `(order, deriv_values)` allowing to specify arbitrary
- derivatives at curve ends:
- * `order`: the derivative order, 1 or 2.
- * `deriv_value`: array-like containing derivative values, shape must
- be the same as `y`, excluding ``axis`` dimension. For example, if
- `y` is 1D, then `deriv_value` must be a scalar. If `y` is 3D with
- the shape (n0, n1, n2) and axis=2, then `deriv_value` must be 2D
- and have the shape (n0, n1).
- extrapolate : {bool, 'periodic', None}, optional
- If bool, determines whether to extrapolate to out-of-bounds points
- based on first and last intervals, or to return NaNs. If 'periodic',
- periodic extrapolation is used. If None (default), ``extrapolate`` is
- set to 'periodic' for ``bc_type='periodic'`` and to True otherwise.
- See Also
- --------
- scipy.interpolate.CubicHermiteSpline
- Returns
- -------
- y : scalar or array-like
- The result, of shape (m,)
- References
- ----------
- .. [1] `Cubic Spline Interpolation
- <https://en.wikiversity.org/wiki/Cubic_Spline_Interpolation>`_
- on Wikiversity.
- .. [2] Carl de Boor, "A Practical Guide to Splines", Springer-Verlag, 1978.
- """
- from scipy import interpolate
- P = interpolate.CubicSpline(
- xi, yi, axis=axis, bc_type=bc_type, extrapolate=extrapolate
- )
- return P(x)
def _interpolate_with_limit_area(
    values: np.ndarray,
    method: Literal["pad", "backfill"],
    limit: int | None,
    limit_area: Literal["inside", "outside"],
) -> None:
    """
    Apply interpolation and limit_area logic to values along a to-be-specified axis.

    Parameters
    ----------
    values: np.ndarray
        Input array.
    method: str
        Interpolation method. Could be "bfill" or "pad"
    limit: int, optional
        Index limit on interpolation.
    limit_area: {'inside', 'outside'}
        Limit area for interpolation.

    Raises
    ------
    ValueError
        If ``values`` has at least one non-missing entry and ``limit_area``
        is neither "inside" nor "outside". The check runs before anything
        is filled, so ``values`` is left untouched when it fails.

    Notes
    -----
    Modifies values in-place.
    """
    invalid = isna(values)
    if invalid.all():
        # No valid entry to fill from; nothing is done (and, as before,
        # limit_area is not validated in this case).
        return

    # Validate up front: the fill below mutates ``values`` in place, so
    # raising only afterwards would leave the caller with altered data.
    if limit_area not in ("inside", "outside"):
        raise ValueError("limit_area should be 'inside' or 'outside'")

    is_valid = ~invalid
    first = find_valid_index(how="first", is_valid=is_valid)
    if first is None:
        first = 0
    last = find_valid_index(how="last", is_valid=is_valid)
    if last is None:
        last = len(values)

    pad_or_backfill_inplace(
        values,
        method=method,
        limit=limit,
        limit_area=limit_area,
    )

    # Keep only the originally-missing positions that lie in the excluded
    # region marked, then reset those positions to NaN.
    if limit_area == "inside":
        invalid[first : last + 1] = False
    else:
        invalid[:first] = invalid[last + 1 :] = False

    values[invalid] = np.nan
def pad_or_backfill_inplace(
    values: np.ndarray,
    method: Literal["pad", "backfill"] = "pad",
    axis: AxisInt = 0,
    limit: int | None = None,
    limit_area: Literal["inside", "outside"] | None = None,
) -> None:
    """
    Pad or backfill ``values`` in place using the 2-d fill routines.

    A 1-d input is reshaped to a single row first; for ``axis != 0`` the
    transpose is what gets handed to the fill routine.

    Parameters
    ----------
    values: np.ndarray
        Input array.
    method: str, default "pad"
        Interpolation method. Could be "bfill" or "pad"
    axis: 0 or 1
        Interpolation axis
    limit: int, optional
        Index limit on interpolation.
    limit_area: str, optional
        Limit area for interpolation. Can be "inside" or "outside"

    Raises
    ------
    AssertionError
        If ``values`` is 1-d and ``axis`` is not 0.

    Notes
    -----
    Modifies values in-place.
    """
    # reshape a 1 dim if needed
    if values.ndim == 1:
        if axis != 0:  # pragma: no cover
            raise AssertionError("cannot interpolate on a ndim == 1 with axis != 0")
        values = values.reshape((1,) + values.shape)

    method = clean_fill_method(method)
    tvalues = values if axis == 0 else values.T

    # _pad_2d and _backfill_2d both modify tvalues inplace
    fill_2d = get_fill_func(method, ndim=2)
    fill_2d(tvalues, limit=limit, limit_area=limit_area)
- def _fillna_prep(
- values, mask: npt.NDArray[np.bool_] | None = None
- ) -> npt.NDArray[np.bool_]:
- # boilerplate for _pad_1d, _backfill_1d, _pad_2d, _backfill_2d
- if mask is None:
- mask = isna(values)
- return mask
- def _datetimelike_compat(func: F) -> F:
- """
- Wrapper to handle datetime64 and timedelta64 dtypes.
- """
- @wraps(func)
- def new_func(
- values,
- limit: int | None = None,
- limit_area: Literal["inside", "outside"] | None = None,
- mask=None,
- ):
- if needs_i8_conversion(values.dtype):
- if mask is None:
- # This needs to occur before casting to int64
- mask = isna(values)
- result, mask = func(
- values.view("i8"), limit=limit, limit_area=limit_area, mask=mask
- )
- return result.view(values.dtype), mask
- return func(values, limit=limit, limit_area=limit_area, mask=mask)
- return cast(F, new_func)
@_datetimelike_compat
def _pad_1d(
    values: np.ndarray,
    limit: int | None = None,
    limit_area: Literal["inside", "outside"] | None = None,
    mask: npt.NDArray[np.bool_] | None = None,
) -> tuple[np.ndarray, npt.NDArray[np.bool_]]:
    """
    Forward-fill a 1-d array in place via ``algos.pad_inplace``.

    Returns ``(values, mask)``; when ``limit_area`` is given the mask is
    adjusted by _fill_limit_area_1d before filling.
    """
    mask = _fillna_prep(values, mask)
    if limit_area is not None:
        # _fill_limit_area_1d requires at least one False in the mask.
        if not mask.all():
            _fill_limit_area_1d(mask, limit_area)
    algos.pad_inplace(values, mask, limit=limit)
    return values, mask
@_datetimelike_compat
def _backfill_1d(
    values: np.ndarray,
    limit: int | None = None,
    limit_area: Literal["inside", "outside"] | None = None,
    mask: npt.NDArray[np.bool_] | None = None,
) -> tuple[np.ndarray, npt.NDArray[np.bool_]]:
    """
    Backward-fill a 1-d array in place via ``algos.backfill_inplace``.

    Returns ``(values, mask)``; when ``limit_area`` is given the mask is
    adjusted by _fill_limit_area_1d before filling.
    """
    mask = _fillna_prep(values, mask)
    if limit_area is not None:
        # _fill_limit_area_1d requires at least one False in the mask.
        if not mask.all():
            _fill_limit_area_1d(mask, limit_area)
    algos.backfill_inplace(values, mask, limit=limit)
    return values, mask
@_datetimelike_compat
def _pad_2d(
    values: np.ndarray,
    limit: int | None = None,
    limit_area: Literal["inside", "outside"] | None = None,
    mask: npt.NDArray[np.bool_] | None = None,
):
    """
    Forward-fill a 2-d array in place via ``algos.pad_2d_inplace``.

    Returns ``(values, mask)``; when ``limit_area`` is given the mask is
    adjusted by _fill_limit_area_2d before filling. Empty input skips the
    fill call.
    """
    mask = _fillna_prep(values, mask)
    if limit_area is not None:
        _fill_limit_area_2d(mask, limit_area)

    if values.size:
        algos.pad_2d_inplace(values, mask, limit=limit)
    return values, mask
@_datetimelike_compat
def _backfill_2d(
    values,
    limit: int | None = None,
    limit_area: Literal["inside", "outside"] | None = None,
    mask: npt.NDArray[np.bool_] | None = None,
):
    """
    Backward-fill a 2-d array in place via ``algos.backfill_2d_inplace``.

    Returns ``(values, mask)``; when ``limit_area`` is given the mask is
    adjusted by _fill_limit_area_2d before filling. Empty input skips the
    fill call.
    """
    mask = _fillna_prep(values, mask)
    if limit_area is not None:
        _fill_limit_area_2d(mask, limit_area)

    if values.size:
        algos.backfill_2d_inplace(values, mask, limit=limit)
    return values, mask
- def _fill_limit_area_1d(
- mask: npt.NDArray[np.bool_], limit_area: Literal["outside", "inside"]
- ) -> None:
- """Prepare 1d mask for ffill/bfill with limit_area.
- Caller is responsible for checking at least one value of mask is False.
- When called, mask will no longer faithfully represent when
- the corresponding are NA or not.
- Parameters
- ----------
- mask : np.ndarray[bool, ndim=1]
- Mask representing NA values when filling.
- limit_area : { "outside", "inside" }
- Whether to limit filling to outside or inside the outer most non-NA value.
- """
- neg_mask = ~mask
- first = neg_mask.argmax()
- last = len(neg_mask) - neg_mask[::-1].argmax() - 1
- if limit_area == "inside":
- mask[:first] = False
- mask[last + 1 :] = False
- elif limit_area == "outside":
- mask[first + 1 : last] = False
- def _fill_limit_area_2d(
- mask: npt.NDArray[np.bool_], limit_area: Literal["outside", "inside"]
- ) -> None:
- """Prepare 2d mask for ffill/bfill with limit_area.
- When called, mask will no longer faithfully represent when
- the corresponding are NA or not.
- Parameters
- ----------
- mask : np.ndarray[bool, ndim=1]
- Mask representing NA values when filling.
- limit_area : { "outside", "inside" }
- Whether to limit filling to outside or inside the outer most non-NA value.
- """
- neg_mask = ~mask.T
- if limit_area == "outside":
- # Identify inside
- la_mask = (
- np.maximum.accumulate(neg_mask, axis=0)
- & np.maximum.accumulate(neg_mask[::-1], axis=0)[::-1]
- )
- else:
- # Identify outside
- la_mask = (
- ~np.maximum.accumulate(neg_mask, axis=0)
- | ~np.maximum.accumulate(neg_mask[::-1], axis=0)[::-1]
- )
- mask[la_mask.T] = False
# 1-d fill routines keyed by canonical method name.
_fill_methods = {"pad": _pad_1d, "backfill": _backfill_1d}


def get_fill_func(method, ndim: int = 1):
    """
    Return the fill routine for ``method``.

    ``method`` is normalized with clean_fill_method first. ``ndim == 1``
    selects the 1-d routines, any other value the 2-d ones.
    """
    method = clean_fill_method(method)
    if ndim == 1:
        fillers = _fill_methods
    else:
        fillers = {"pad": _pad_2d, "backfill": _backfill_2d}
    return fillers[method]
def clean_reindex_fill_method(method) -> ReindexMethod | None:
    """
    Normalize a reindex fill method, passing None through unchanged.

    Anything else is handed to clean_fill_method with "nearest" allowed.
    """
    return None if method is None else clean_fill_method(method, allow_nearest=True)
def _interp_limit(
    invalid: npt.NDArray[np.bool_], fw_limit: int | None, bw_limit: int | None
):
    """
    Get indexers of values that won't be filled
    because they exceed the limits.

    Parameters
    ----------
    invalid : np.ndarray[bool]
    fw_limit : int or None
        forward limit to index
    bw_limit : int or None
        backward limit to index

    Returns
    -------
    set of indexers

    Notes
    -----
    This is equivalent to the more readable, but slower

    .. code-block:: python

        def _interp_limit(invalid, fw_limit, bw_limit):
            for x in np.where(invalid)[0]:
                if invalid[max(0, x - fw_limit):x + bw_limit + 1].all():
                    yield x
    """
    # The forward direction is handled first; the backward direction is the
    # same except that it
    # 1. operates on the reversed array
    # 2. subtracts the returned indices from n - 1
    n = len(invalid)

    def _beyond(flags, limit: int):
        # Positions whose preceding window of limit + 1 entries is entirely
        # invalid, plus the leading positions seen before any valid entry.
        limit = min(limit, n)
        full_windows = _rolling_window(flags, limit + 1).all(1)
        window_hits = np.where(full_windows)[0] + limit
        leading_hits = np.where((~flags[: limit + 1]).cumsum() == 0)[0]
        return set(window_hits) | set(leading_hits)

    if fw_limit is None:
        f_idx = set()
    elif fw_limit == 0:
        f_idx = set(np.where(invalid)[0])
    else:
        f_idx = _beyond(invalid, fw_limit)

    if bw_limit is None:
        # The backward set stays empty, so the intersection is empty too.
        return f_idx & set()

    if bw_limit == 0:
        # then we don't even need to care about backwards
        # just use forwards
        return f_idx

    reversed_hits = list(_beyond(invalid[::-1], bw_limit))
    b_idx = set(n - 1 - np.asarray(reversed_hits))
    if fw_limit == 0:
        return b_idx
    return f_idx & b_idx
- def _rolling_window(a: npt.NDArray[np.bool_], window: int) -> npt.NDArray[np.bool_]:
- """
- [True, True, False, True, False], 2 ->
- [
- [True, True],
- [True, False],
- [False, True],
- [True, False],
- ]
- """
- # https://stackoverflow.com/a/6811241
- shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
- strides = a.strides + (a.strides[-1],)
- return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)
|