Module hetmatpy.pipeline

Pipeline utilities for combining DWPC results with degree-grouped permutation
statistics and computing gamma-hurdle significance estimates.

View Source
import itertools

import numpy

import pandas

import scipy.special

import scipy.stats

import hetmatpy.degree_group

import hetmatpy.degree_weight

import hetmatpy.hetmat

FLOAT_ERROR_TOLERANCE = 1e-5

def sd_is_positive(sd):
    """
    Return True when the standard deviation is a real value greater than zero.

    A standard deviation of zero, NaN, or None yields False.
    """
    if pandas.isna(sd):
        return False
    return sd > 0

def calculate_sd(sum_of_squares, unsquared_sum, number_nonzero):
    """
    Compute the sample standard deviation from summary statistics,
    validating the incoming data.

    Returns None when fewer than two nonzero observations exist (a sample
    standard deviation is undefined there).
    """
    if number_nonzero < 2:
        return None
    # Sum of squared deviations from the mean, via the computational formula
    ss_deviation = sum_of_squares - unsquared_sum**2 / number_nonzero
    # When all row values are identical the true squared deviation is zero,
    # but float error may push it slightly negative (or positive); clamp
    # tiny magnitudes to exactly zero to avoid imprecision issues downstream.
    if abs(ss_deviation) < FLOAT_ERROR_TOLERANCE:
        return 0.0
    return (ss_deviation / (number_nonzero - 1)) ** 0.5

def add_gamma_hurdle_to_dgp_df(dgp_df):
    """
    Augment a degree-grouped permutation dataframe in place with
    gamma-hurdle distribution parameters (mean_nz, sd_nz, beta, alpha),
    returning the same dataframe.
    """
    # Validate the input before computing anything
    if not isinstance(dgp_df, pandas.DataFrame):
        raise ValueError(
            "add_gamma_hurdle_to_dgp_df: dgp_df must be a pandas.DataFrame"
        )
    required_columns = {"nnz", "sum", "sum_of_squares"}
    missing = required_columns - set(dgp_df.columns)
    if missing:
        raise ValueError(
            "add_gamma_hurdle_to_dgp_df: "
            "dgp_df missing the following required columns: " + ", ".join(missing)
        )
    # Compute gamma-hurdle parameters.
    # to_numeric prevents ZeroDivisionError when nnz is an object-dtype column
    # https://github.com/pandas-dev/pandas/issues/46292
    nnz_numeric = pandas.to_numeric(dgp_df["nnz"])
    dgp_df["mean_nz"] = dgp_df["sum"] / nnz_numeric
    dgp_df["sd_nz"] = dgp_df[["sum_of_squares", "sum", "nnz"]].apply(
        lambda row: calculate_sd(*row), raw=True, axis=1
    )
    # beta is the gamma rate parameter; infinite rates (zero variance) are
    # replaced with NaN
    variance = pandas.to_numeric(dgp_df["sd_nz"] ** 2)
    dgp_df["beta"] = (dgp_df["mean_nz"] / variance).replace(numpy.inf, numpy.nan)
    dgp_df["alpha"] = dgp_df["mean_nz"] * dgp_df["beta"]
    return dgp_df

def calculate_gamma_hurdle_p_value(row):
    """
    Use the gamma hurdle model to compute the p_value for a metapath.

    Missing beta/alpha gamma-hurdle parameters are derived from mean_nz
    and sd_nz and stored back on the row.
    """
    if "beta" not in row:
        row["beta"] = row["mean_nz"] / row["sd_nz"] ** 2
    if numpy.isinf(row["beta"]):
        # An infinite rate (zero variance) cannot parameterize a gamma
        row["beta"] = numpy.nan
    if "alpha" not in row:
        row["alpha"] = row["mean_nz"] * row["beta"]
    # Hurdle probability of a nonzero DWPC in the permuted network
    nonzero_fraction = row["nnz"] / row["n"]
    # Upper tail of the gamma distribution evaluated at the observed DWPC
    tail = scipy.special.gammaincc(row["alpha"], row["beta"] * row["dwpc"])
    return nonzero_fraction * tail

def path_does_not_exist(row):
    """
    Return True when no path connects the source and target nodes.

    A zero path_count indicates no path; when path_count is absent from the
    row, a zero dwpc serves as the indicator instead.
    """
    indicator = "path_count" if "path_count" in row else "dwpc"
    return row[indicator] == 0

def calculate_empirical_p_value(row):
    """
    Compute an empirical p_value for the cases where the gamma hurdle model
    won't work: no paths, no nonzero permuted DWPCs, or zero spread among
    the permuted DWPCs.
    """
    if path_does_not_exist(row):
        # No paths exist between the given source and target nodes
        return 1.0
    if row["nnz"] == 0:
        # No nonzero DWPCs are found in the permuted network, but paths are
        # observed in the true network
        return 0.0
    if sd_is_positive(row["sd_nz"]):
        # A positive spread should be handled by the gamma hurdle model,
        # not this empirical fallback
        raise NotImplementedError
    # The DWPCs in the permuted network are identical
    if row["dwpc"] <= row["mean_nz"] + FLOAT_ERROR_TOLERANCE:
        # The true-network DWPC is smaller than or equal to the permuted ones
        return row["nnz"] / row["n"]
    # The true-network DWPC is larger than every permuted DWPC
    return 0.0

def calculate_p_value(row):
    """
    Compute the p_value for a given metapath row, dispatching between the
    empirical fallback and the gamma hurdle model.
    """
    needs_empirical = (
        row["nnz"] == 0
        or path_does_not_exist(row)
        or not sd_is_positive(row["sd_nz"])
    )
    if needs_empirical:
        return calculate_empirical_p_value(row)
    return calculate_gamma_hurdle_p_value(row)

def combine_dwpc_dgp(graph, metapath, damping, ignore_zeros=False, max_p_value=1.0):
    """
    Generate rows combining DWPC information with degree-grouped permutation
    summary metrics, including gamma-hurdle significance estimates.

    Rows whose p_value exceeds max_p_value are skipped; intermediate DGP
    columns are dropped before each row is yielded.
    """
    stats_path = graph.get_running_degree_group_path(
        metapath, "dwpc", damping, extension=".tsv.gz"
    )
    dgp_df = add_gamma_hurdle_to_dgp_df(pandas.read_csv(stats_path, sep="\t"))
    # Map (source_degree, target_degree) -> DGP summary metrics for that pair
    degrees_to_dgp = dgp_df.set_index(["source_degree", "target_degree"]).to_dict(
        orient="index"
    )
    dwpc_rows = hetmatpy.degree_group.dwpc_to_degrees(
        graph, metapath, damping=damping, ignore_zeros=ignore_zeros
    )
    intermediate_keys = ("sum", "sum_of_squares", "beta", "alpha")
    for row in dwpc_rows:
        degree_pair = row["source_degree"], row["target_degree"]
        row.update(degrees_to_dgp[degree_pair])
        row["p_value"] = calculate_p_value(row)
        # Filter rows above the significance threshold
        if row["p_value"] is not None and row["p_value"] > max_p_value:
            continue
        for key in intermediate_keys:
            del row[key]
        yield row

def grouper(iterable, group_size):
    """
    Lazily split an iterable into chunks of at most group_size items,
    yielding each chunk as an iterator.

    Derived from https://stackoverflow.com/a/8998040/4651668
    """
    iterator = iter(iterable)
    while True:
        chunk = itertools.islice(iterator, group_size)
        # Pull one item to detect exhaustion without consuming the chunk
        head = list(itertools.islice(chunk, 1))
        if not head:
            break
        yield itertools.chain(head, chunk)

def grouped_tsv_writer(
    row_generator, path, group_size=20_000, sep="\t", index=False, **kwargs
):
    """
    Write an iterable of dictionaries to a TSV, where each dictionary is a row.

    Rows are written in chunks of group_size so that generators too plentiful
    to fit in memory can still be serialized with pandas. Extra keyword
    arguments are forwarded to pandas.DataFrame.to_csv.
    """
    chunks = grouper(row_generator, group_size=group_size)
    for chunk_number, chunk in enumerate(chunks):
        frame = pandas.DataFrame.from_records(chunk)
        # Only the first chunk writes the header; later chunks append
        kwargs["header"] = chunk_number == 0
        kwargs["mode"] = "w" if chunk_number == 0 else "a"
        frame.to_csv(path, sep=sep, index=index, **kwargs)

Variables

FLOAT_ERROR_TOLERANCE

Functions

add_gamma_hurdle_to_dgp_df

def add_gamma_hurdle_to_dgp_df(
    dgp_df
)

Edit a degree-grouped permutation dataframe to include gamma-hurdle

distribution parameters.

View Source
def add_gamma_hurdle_to_dgp_df(dgp_df):

    """

    Edit a degree-grouped permutation dataframe to include gamma-hurdle

    distribution parameters.

    """

    # Validate dgp_df

    if not isinstance(dgp_df, pandas.DataFrame):

        raise ValueError(

            "add_gamma_hurdle_to_dgp_df: dgp_df must be a pandas.DataFrame"

        )

    missing = {"nnz", "sum", "sum_of_squares"} - set(dgp_df.columns)

    if missing:

        raise ValueError(

            "add_gamma_hurdle_to_dgp_df: "

            "dgp_df missing the following required columns: " + ", ".join(missing)

        )

    # Compute gamma-hurdle parameters

    # to_numeric prevents ZeroDivisionError when nnz is a column with object dtype

    # https://github.com/pandas-dev/pandas/issues/46292

    dgp_df["mean_nz"] = dgp_df["sum"] / pandas.to_numeric(dgp_df["nnz"])

    dgp_df["sd_nz"] = dgp_df[["sum_of_squares", "sum", "nnz"]].apply(

        lambda row: calculate_sd(*row), raw=True, axis=1

    )

    dgp_df["beta"] = (

        dgp_df["mean_nz"] / pandas.to_numeric(dgp_df["sd_nz"] ** 2)

    ).replace(numpy.inf, numpy.nan)

    dgp_df["alpha"] = dgp_df["mean_nz"] * dgp_df["beta"]

    return dgp_df

calculate_empirical_p_value

def calculate_empirical_p_value(
    row
)

Calculate p_value in cases where the gamma hurdle model won't work

View Source
def calculate_empirical_p_value(row):

    """

    Calculate p_value in cases where the gamma hurdle model won't work

    """

    if path_does_not_exist(row):

        # No paths exist between the given source and target nodes

        return 1.0

    if row["nnz"] == 0:

        # No nonzero DWPCs are found in the permuted network, but paths are

        # observed in the true network

        return 0.0

    if not sd_is_positive(row["sd_nz"]):

        # The DWPCs in the permuted network are identical

        if row["dwpc"] <= row["mean_nz"] + FLOAT_ERROR_TOLERANCE:

            # The DWPC you found in the true network is smaller than or equal

            # to those in the permuted network

            return row["nnz"] / row["n"]

        # The DWPC you found in the true network is larger than those in the

        # permuted network

        return 0.0

    raise NotImplementedError

calculate_gamma_hurdle_p_value

def calculate_gamma_hurdle_p_value(
    row
)

Use the gamma hurdle model to calculate the p_value for a metapath.

If beta and alpha gamma-hurdle parameters are missing, calculate them and add them to row.

View Source
def calculate_gamma_hurdle_p_value(row):

    """

    Use the gamma hurdle model to calculate the p_value for a metapath.

    If beta and alpha gamma-hurdle parameters are missing, calculate them

    and add them to row.

    """

    if "beta" not in row:

        row["beta"] = row["mean_nz"] / row["sd_nz"] ** 2

    if numpy.isinf(row["beta"]):

        row["beta"] = numpy.nan

    if "alpha" not in row:

        row["alpha"] = row["mean_nz"] * row["beta"]

    return (

        row["nnz"]

        / row["n"]

        * scipy.special.gammaincc(row["alpha"], row["beta"] * row["dwpc"])

    )

calculate_p_value

def calculate_p_value(
    row
)

Calculate the p_value for a given metapath

View Source
def calculate_p_value(row):

    """

    Calculate the p_value for a given metapath

    """

    if row["nnz"] == 0 or path_does_not_exist(row) or not sd_is_positive(row["sd_nz"]):

        return calculate_empirical_p_value(row)

    else:

        return calculate_gamma_hurdle_p_value(row)

calculate_sd

def calculate_sd(
    sum_of_squares,
    unsquared_sum,
    number_nonzero
)

Calculate the standard deviation and validate the incoming data

View Source
def calculate_sd(sum_of_squares, unsquared_sum, number_nonzero):

    """

    Calculate the standard deviation and validate the incoming data

    """

    if number_nonzero < 2:

        return None

    squared_deviations = sum_of_squares - unsquared_sum**2 / number_nonzero

    # If all the values in the row are the same we'll manually return zero,

    # because not doing so can lead to some issues with float imprecision

    # The true value of the squared deviation will always be >= zero,

    # but float error may bring it below zero

    if abs(squared_deviations) < FLOAT_ERROR_TOLERANCE:

        return 0.0

    else:

        return (squared_deviations / (number_nonzero - 1)) ** 0.5

combine_dwpc_dgp

def combine_dwpc_dgp(
    graph,
    metapath,
    damping,
    ignore_zeros=False,
    max_p_value=1.0
)

Combine DWPC information with degree-grouped permutation summary metrics.

Includes gamma-hurdle significance estimates.

View Source
def combine_dwpc_dgp(graph, metapath, damping, ignore_zeros=False, max_p_value=1.0):

    """

    Combine DWPC information with degree-grouped permutation summary metrics.

    Includes gamma-hurdle significance estimates.

    """

    stats_path = graph.get_running_degree_group_path(

        metapath, "dwpc", damping, extension=".tsv.gz"

    )

    dgp_df = pandas.read_csv(stats_path, sep="\t")

    dgp_df = add_gamma_hurdle_to_dgp_df(dgp_df)

    degrees_to_dgp = dgp_df.set_index(["source_degree", "target_degree"]).to_dict(

        orient="index"

    )

    dwpc_row_generator = hetmatpy.degree_group.dwpc_to_degrees(

        graph, metapath, damping=damping, ignore_zeros=ignore_zeros

    )

    for row in dwpc_row_generator:

        degrees = row["source_degree"], row["target_degree"]

        dgp = degrees_to_dgp[degrees]

        row.update(dgp)

        row["p_value"] = calculate_p_value(row)

        if row["p_value"] is not None and row["p_value"] > max_p_value:

            continue

        for key in ["sum", "sum_of_squares", "beta", "alpha"]:

            del row[key]

        yield row

grouped_tsv_writer

def grouped_tsv_writer(
    row_generator,
    path,
    group_size=20000,
    sep='\t',
    index=False,
    **kwargs
)

Write an iterable of dictionaries to a TSV, where each dictionary is a row.

Uses pandas (extra keyword arguments are passed to DataFrame.to_csv) to write the TSV, enabling the use of pandas to write generated rows that are too plentiful to fit in memory.

View Source
def grouped_tsv_writer(

    row_generator, path, group_size=20_000, sep="\t", index=False, **kwargs

):

    """

    Write an iterable of dictionaries to a TSV, where each dictionary is a row.

    Uses pandas (extra keyword arguments are passed to DataFrame.to_csv) to

    write the TSV, enabling using pandas to write a generated rows that are too

    plentiful to fit in memory.

    """

    chunks = grouper(row_generator, group_size=group_size)

    for i, chunk in enumerate(chunks):

        df = pandas.DataFrame.from_records(chunk)

        kwargs["header"] = not bool(i)

        kwargs["mode"] = "a" if i else "w"

        df.to_csv(path, sep=sep, index=index, **kwargs)

grouper

def grouper(
    iterable,
    group_size
)

Group an iterable into chunks of group_size.

Derived from https://stackoverflow.com/a/8998040/4651668

View Source
def grouper(iterable, group_size):

    """

    Group an iterable into chunks of group_size.

    Derived from https://stackoverflow.com/a/8998040/4651668

    """

    iterable = iter(iterable)

    while True:

        chunk = itertools.islice(iterable, group_size)

        try:

            head = (next(chunk),)

        except StopIteration:

            break

        yield itertools.chain(head, chunk)

path_does_not_exist

def path_does_not_exist(
    row
)

Check whether any paths exist between the source and target. We know there

isn't a path if the row has a zero path count, or has a zero dwpc if the path count isn't present in the row

View Source
def path_does_not_exist(row):

    """

    Check whether any paths exist between the source and target. We know there

    isn't a path if the row has a zero path count, or has a zero dwpc if the path

    count isn't present in the row

    """

    if "path_count" in row:

        return row["path_count"] == 0

    return row["dwpc"] == 0

sd_is_positive

def sd_is_positive(
    sd
)

Tests whether the standard deviation is greater than zero or if it is

zero/NaN/None

View Source
def sd_is_positive(sd):

    """

    Tests whether the standard deviation is greater than zero or if it is

    zero/NaN/None

    """

    return pandas.notna(sd) and sd > 0