# Copyright (c) 2022 Darren Erik Vengroff
from typing import Any, Iterable, Mapping, Optional, Tuple, Union
import numpy as np
import pandas as pd
import pandas.api.types
def _diversity_of_df(
df_communities: pd.DataFrame,
) -> Tuple[pd.Series, pd.Series]:
"""
Compute the diversity of each row of a dataframe.
Each row is assumed to be an independent community.
Parameters
----------
df_communities
The DataFrame
Returns
-------
A tuple of a series containing the diversity metric for each
row and a series of the total population of each row.
"""
# Compute the total population of each row, as
# a series.
s_total_population = df_communities.sum(axis="columns", numeric_only=True)
# Compute the fraction of the population in each
# group. This is also known as p.
df_p = df_communities.div(s_total_population, axis="rows")
# Now let q = 1 - p and compute pq.
df_pq = df_p * (1 - df_p)
s_diversity = df_pq.sum(axis=1)
s_diversity.name = "diversity"
return s_diversity, s_total_population
[docs]def diversity(
communities: Union[pd.DataFrame, Iterable[float]]
) -> Union[pd.Series, float]:
"""
Compute the diversity of one or more communities.
Parameters
----------
communities
The communities. This is either an iterable over
the population of each group in the community or,
more commonly, a :py:class:`~pd.DataFrame` with
each row representing a community and each column
representing a group.
Returns
-------
The diversity of the community or, if passed a
:py:class:`~pd.DataFrame` then a :py:class:`~pd.Series`
with one entry for the diversity of each community.
"""
if isinstance(communities, pd.DataFrame):
return _diversity_of_df(communities)[0]
else:
return _diversity_of_df(pd.DataFrame([communities]))[0][0]
def _drop_non_numeric_except(
df: pd.DataFrame,
by: Optional[Any],
over: Optional[Any],
) -> pd.DataFrame:
"""
A helper function to drop non-numeric columns that
may be present, e.g. for different political boundaries
that are not needed for the current diversity or integration
calculation.
Parameters
----------
df
The original dataframe
by
What we are grouping by. This column will not be removed.
over
What we are computing inner diversity over. This column
will not be removed.
Returns
-------
The data with non-numeric columns other than `by` and
`over` removed.
"""
drop_cols = [
col
for col in df.columns
if (
not pandas.api.types.is_numeric_dtype(df[col]) and col != by and col != over
)
]
if len(drop_cols) > 0:
return df.drop(drop_cols, axis="columns")
return df
[docs]def integration(
df_communities: pd.DataFrame,
by=None,
over=None,
*,
drop_non_numeric: bool = False,
) -> pd.DataFrame:
"""
Compute the integration of one of more communities
over a nested level of population aggregation. For
example, with US census data we might compute integration
of block groups over blocks.
Parameters
----------
df_communities
A :py:class:`pd.DataFrame` of communities.
by
The column or index to group by in order to
partition the rows into communities.
over
The column to group by in order to partition
the rows of each community into smaller
aggregation units where the base diversity will
be computed. If `None` then each row is assumed
to represent a different community.
drop_non_numeric
If `True`, then any non-numeric column other than
those specified by `by` and `over` will be
implicitly dropped. This is useful if there are columns
naming other levels of geographic aggregation that
should be ignored.
Returns
-------
A :py:class:`~pd.Series` containing the integration of
each community.
"""
def integration_of_group(df_group: pd.DataFrame) -> float:
"""
A helper method to compute the integration of each group.
"""
if over is not None:
df_group = df_group.groupby(over).sum(numeric_only=True)
s_div, s_total = _diversity_of_df(df_group)
# If there is no population, then there is no integration.
if s_total.sum() == 0:
return 0.0
return np.average(s_div, weights=s_total)
if drop_non_numeric:
df_communities = _drop_non_numeric_except(df_communities, by, over)
df_integration = pd.DataFrame(
df_communities.groupby(by=by).apply(integration_of_group),
columns=["integration"],
)
return df_integration
[docs]def segregation(
df_communities: pd.DataFrame,
by=None,
over=None,
*,
drop_non_numeric: bool = False,
) -> pd.DataFrame:
"""
Compute the segregation of one of more communities
over a nested level of population aggregation. For
example, with US census data we might compute integration
of block groups over blocks.
Parameters
----------
df_communities
A :py:class:`pd.DataFrame` of communities.
by
The column or index to group by in order to
partition the rows into communities.
over
The column to group by in order to partition
the rows of each community into smaller
aggregation units where the base diversity will
be computed. If `None` then each row is assumed
to represent a different community.
drop_non_numeric
If `True`, then any non-numeric column other than
those specified by `by` and `over` will be
implicitly dropped. This is useful if there are columns
naming other levels of geographic aggregation that
should be ignored.
Returns
-------
A :py:class:`~pd.Series` containing the segregation of
each community.
"""
df_segregation = 1.0 - integration(
df_communities, by=by, over=over, drop_non_numeric=drop_non_numeric
)
df_segregation.columns = ["segregation"]
return df_segregation
[docs]def di(
df_communities: pd.DataFrame,
by=None,
over=None,
*,
add_segregation: bool = False,
drop_non_numeric: bool = False,
) -> pd.DataFrame:
"""
Compute the diversity, integration, and optionally
the segregation of each of a collection of communities.
Parameters
----------
df_communities
A :py:class:`pd.DataFrame` of communities.
by
The column or index to group by in order to
partition the rows into communities.
over
The column to group by in order to partition
the rows of each community into smaller
aggregation units where the base diversity will
be computed. If `None` then each row is assumed
to represent a different community.
add_segregation
if `True` add a column to the results for
segregation.
drop_non_numeric
If `True`, then any non-numeric column other than
those specified by `by` and `over` will be
implicitly dropped. This is useful if there are columns
naming other levels of geographic aggregation that
should be ignored.
Returns
-------
A :py:class:`~pd.Series` containing the diversity,
integration, and optionally the segregation of
each community.
"""
if drop_non_numeric:
df_communities = _drop_non_numeric_except(df_communities, by, over)
if over is not None:
df_sum_by = (
df_communities.drop(over, axis="columns")
.groupby(by=by)
.sum(numeric_only=True)
)
else:
df_sum_by = df_communities.groupby(by=by).sum(numeric_only=True)
df_diversity = diversity(df_sum_by)
df_integration = integration(df_communities, by=by, over=over)
df_di = pd.concat([df_diversity, df_integration], axis=1)
if add_segregation:
df_di["segregation"] = 1.0 - df_di["integration"]
return df_di
[docs]def dissimilarity(
df_communities: pd.DataFrame,
reference: Union[pd.DataFrame, Mapping[str, Union[int, float]]],
) -> pd.Series:
"""
Compute the dissimilarity index of one or more communities relative to a reference community.
If you want compute dissimilarity or similarity many times against a
common reference, then creating at :py:class:`~SimularityReference`
is a more efficient option.
Parameters
----------
df_communities
The communities. This is a :py:class:`~pd.DataFrame` with
each row representing a community and each column
representing a group.
reference
The reference community. It should be a single row with
a column with the reference population of each group.
Returns
-------
The dissimilarity index of the each community relative to the reference
community.
"""
return SimilarityReference(reference).dissimilarity(df_communities)
[docs]def similarity(
df_communities: pd.DataFrame,
reference: Union[pd.DataFrame, Mapping[str, Union[int, float]]],
) -> pd.Series:
"""
Compute the similarity index of one or more communities relative to a reference community.
Note that similarity is just one minus dissimilarity.
If you want compute dissimilarity or similarity many times against a
common reference, then creating at :py:class:`~SimularityReference`
is a more efficient option.
Parameters
----------
df_communities
The communities. This is a :py:class:`~pd.DataFrame` with
each row representing a community and each column
representing a group.
reference
The reference community. It should be a single row with
a column with the reference population of each group.
Returns
-------
The similarity of the each community relative to the reference
community.
"""
return SimilarityReference(reference).similarity(df_communities)
[docs]class SimilarityReference:
"""
An object that computes dissimilarty from a reference.
Parameters
----------
reference
The reference community. It should be a mapping from
name to count or a dataframe with a single row with
a column with the reference population of each group.
"""
def __init__(
self,
reference: Union[pd.DataFrame, Mapping[str, Union[int, float]]],
):
if isinstance(reference, pd.DataFrame):
self._df_reference = reference
else:
self._df_reference = pd.DataFrame([reference])
if len(self._df_reference.index) != 1:
raise ValueError("Reference community should have a single row.")
self._reference_total = self._df_reference.sum(axis="columns").iloc[0]
self._df_reference_fractions = self._df_reference / self._reference_total
[docs] def dissimilarity(
self,
df_communities: pd.DataFrame,
) -> pd.Series:
"""
Compute the dissimilarity index of one or more communities
relative to a reference community.
Parameters
----------
df_communities
The communities. This is a :py:class:`~pd.DataFrame` with
each row representing a community and each column
representing a group.
Returns
-------
The dissimilarity index of the each community relative to the reference
community.
"""
community_totals = df_communities.sum(axis="columns")
df_community_fractions = df_communities.div(community_totals, axis="rows")
df_abs_differences = abs(
df_community_fractions.sub(self._df_reference_fractions.iloc[0])
)
df_dissimilarity_index = 0.5 * df_abs_differences.sum(axis="columns")
return df_dissimilarity_index
[docs] def similarity(
self,
df_communities: pd.DataFrame,
) -> pd.Series:
"""
Compute the representation index of one or more communities.
If `sim_ref` is a `SimilarityReference`, then `sim_ref.similarity(communities)-
is equal to `1.0 - sim_ref.similarity(communities)`.
Parameters
----------
df_communities
The communities. This is a :py:class:`~pd.DataFrame` with
each row representing a community and each column
representing a group.
Returns
-------
The dissimilarity index of the each community relative to the reference
community.
"""
return 1.0 - self.dissimilarity(df_communities)
def likelihood_populationfrac_product(
df_grouped: pd.DataFrame, likelihood: pd.Series, by: str, group_name: str
) -> pd.Series:
"""
Helper function for isolation and exposure to multiply the likelihood series
and the population fraction.
Parameters
----------
df_grouped
A :py:class:`pd.DataFrame` created by grouping over both `by` and `over.
likelihood
A :py:class:`pd.Series` representing the probability of picking a
specific group in a certain community.
by
The column or index to group by in order to
partition the rows into communities.
group_name
The name of the group of which to take the population fraction of, which
would then be multiplied with `likelihood`.
Returns
-------
A series containing `likelihood` multiplied element-wise with
the fraction of `group_name` in a certain region.
"""
region_population = df_grouped.groupby(by)[group_name].sum(numeric_only=True)
frac_in_region = df_grouped[group_name] / region_population[
df_grouped[by]
].reset_index(drop=True)
df_grouped["Product"] = likelihood * frac_in_region
product_sum = df_grouped.groupby(by)["Product"].sum().reset_index(drop=True)
return product_sum
[docs]def isolation(
df_communities: pd.DataFrame,
group_name: str,
by: str,
over: str,
) -> pd.DataFrame:
"""
Compute the isolation of a group in a community. Isolation is the
average, over all members of a group in a community, of the proportion
of the smaller area they reside in that are not members of their group.
Parameters
----------
df_communities
A :py:class:`pd.DataFrame` of communities.
group_name
The name of the group (name of a column in `df_communities`)
whose isolation we wish to compute.
by
The column or index to group by in order to
partition the rows into communities.
over
The column to group by in order to partition
the rows of each community into smaller
aggregation units where the base diversity will
be computed.
Returns
-------
A dataframe with one row for each unique value of the `by`
column indicating the isolation of the `group_name` column
with respect to all of the other columns in the data frame.
Examples
--------
>>> import pandas as pd
...
... df = pd.DataFrame(
... [
... ['Region 1', 'Subregion A', 100, 0],
... ['Region 1', 'Subregion B', 50, 50],
... ['Region 2', 'Subregion C', 0, 100],
... ['Region 2', 'Subregion D', 0, 50],
... ['Region 2', 'Subregion E', 10, 90],
... ],
... columns=['REGION', 'SUBREGION', 'S', 'T']
... )
...
... df
REGION SUBREGION S T
0 Region 1 Subregion A 100 0
1 Region 1 Subregion B 50 50
2 Region 2 Subregion C 0 100
3 Region 2 Subregion D 0 50
4 Region 2 Subregion E 10 90
>>> from divintseg import isolation
...
... isolation(df, "S", by="REGION", over="SUBREGION")
REGION S
0 Region 1 0.83333
1 Region 2 0.1
Let's look at what this example computed. First, we have
to see how likely each person in group S is to see other
members of their own group in their subregion. This is as
follows:
+----------+-------------+-----------------------+
| Region | Subregion | Likelihood of an S |
+==========+=============+=======================+
| Region 1 | Subregion A | 100 / (100 + 0) = 1.0 |
+----------+-------------+-----------------------+
| Region 1 | Subregion B | 50 / (50 + 50) = 0.5 |
+----------+-------------+-----------------------+
| Region 2 | Subregion C | 0 / (0 + 100) = 0.0 |
+----------+-------------+-----------------------+
| Region 2 | Subregion D | 0 / (0 + 50) = 0.0 |
+----------+-------------+-----------------------+
| Region 2 | Subregion E | 10 / (10 + 90) = 0.1 |
+----------+-------------+-----------------------+
Next, we can compute the fraction of all S's in
each subregion of each region. There are 150 S's
in Region 1 and 10 S's in region 2, therefore, we have:
+----------+-------------+--------------------------------+
| Region | Subregion | Fraction of all As in Region |
+==========+=============+================================+
| Region 1 | Subregion A | 100 / 150 = 0.6667 |
+----------+-------------+--------------------------------+
| Region 1 | Subregion B | 50 / 150 = 0.3333 |
+----------+-------------+--------------------------------+
| Region 2 | Subregion C | 0 / 10 = 0.0000 |
+----------+-------------+--------------------------------+
| Region 2 | Subregion D | 0 / 10 = 0.0000 |
+----------+-------------+--------------------------------+
| Region 2 | Subregion E | 10 / 10 = 1.0000 |
+----------+-------------+--------------------------------+
Finally, for each subregion, we multiply these together and
add them up the values for the subregions in each region.
For Region 1, we get
.. math::
(0.6667 * 1.0) + (0.3333 * 0.5) = 0.8333.
For Region 2, we get
.. math::
0.0 * 0.0 + 0.0 * 0.0 + 1.000 * 0.1 = 0.1.
Note that the implentation may not do this math exactly as
specified here, but it will do something equivalent.
"""
df_grouped = df_communities.groupby([by, over], as_index=False).sum(
numeric_only=True
)
likelihood = df_grouped[group_name] / df_grouped.sum(
axis="columns", numeric_only=True
)
final_df = pd.DataFrame(df_communities[by].unique(), columns=[by])
final_df[group_name] = likelihood_populationfrac_product(
df_grouped, likelihood, by, group_name
)
return final_df
[docs]def bells(
df_communities: pd.DataFrame,
group_name: str,
by: str,
over: str,
) -> pd.DataFrame:
"""
Computes the isolation of a group using the isolation index by Wendell Bell.
Parameters
----------
df_communities
A :py:class:`pd.DataFrame` of communities.
group_name
The name of the group (name of a column in `df_communities`)
whose isolation we wish to compute.
by
The column or index to group by in order to
partition the rows into communities.
over
The column to group by in order to partition
the rows of each community into smaller
aggregation units where the base diversity will
be computed.
Returns
-------
A dataframe with one row for each unique value of the `by`
column indicating the Bell's Index of the `group_name` column
with respect to all of the other columns in the data frame. If community
population consists exclusively of `group_name`, 1.0 will take place in
the dataframe cell corresponding to that region.
"""
df_grouped = df_communities.groupby([by, over], as_index=False).sum(
numeric_only=True
)
group_pop = df_grouped.groupby(by, as_index=False).sum(numeric_only=True)[
group_name
]
region_pop = (
df_grouped.groupby(by, as_index=False)
.sum(numeric_only=True)
.sum(numeric_only=True, axis=1)
)
px = group_pop / region_pop # proportion of sample made of group_name
pxx = isolation(
df_communities, group_name, by, over
) # minimum probable interaction within group
pxx[group_name] = (pxx[group_name] - px) / (1 - px)
# NaN if px=1, happens when subgroup population consists only of group_name
return pxx.fillna(1)
[docs]def exposure(
df_communities: pd.DataFrame,
primary_group_name: str,
by: str,
over: str,
secondary_group_name: str = None,
) -> pd.DataFrame:
"""
Compute the exposure of a group in a community. Exposure measures a group's
average local exposure to members of another group.
Parameters
----------
df_communities
A :py:class:`pd.DataFrame` of communities.
primary_group_name
The name of the group (name of a column in `df_communities`)
whose exposure we wish to compute relative to the group specified
in `secondary_group_name`.
by
The column or index to group by in order to
partition the rows into communities.
over
The column to group by in order to partition
the rows of each community into smaller
aggregation units where the base diversity will
be computed.
secondary_group_name
The name of the group whose exposure should be calculated relative to
`primary_group_name`. If None, every single group will have its
exposure calculated.
Returns
-------
A dataframe with one row for each unique value of the `by`
column and one column for each value of the `over` column other
than `primary_group_name` indicating the exposure of the
`primary_group_name` column with respect to another `over` column in
the data frame. If `secondary_group_name` is not None, the only column
in the returned dataframe will be the exposure of `primary_group_name`
to `secondary_group_name`.
Example
--------
>>> import pandas as pd
...
... df = pd.DataFrame(
... [
... ['Region 1', 'Subregion A', 100, 0, 0],
... ['Region 1', 'Subregion B', 50, 50, 50],
... ['Region 2', 'Subregion C', 0, 110, 100],
... ['Region 2', 'Subregion D', 0, 50, 0],
... ['Region 2', 'Subregion E', 10, 90, 0],
... ],
... columns=['REGION', 'SUBREGION', 'A', 'B', 'C']
... )
...
... df
REGION SUBREGION A B C
0 Region 1 Subregion A 100 0 0
1 Region 1 Subregion B 50 50 50
2 Region 2 Subregion C 0 110 100
3 Region 2 Subregion D 0 50 0
4 Region 2 Subregion E 10 90 0
>>> from divintseg import exposure
...
... exposure(df, "A", by="REGION", over="SUBREGION")
REGION B C
0 Region 1 0.3333 0.3333
1 Region 2 0.036 0
Row R and column C represents the exposure of `groupname` to C in region R:
the exposure of A to B is 0.036 in region 2.
Calculating the likelihood of A:
+----------+-------------+------------------------------+
| Region | Subregion | Likelihood of A |
+==========+=============+==============================+
| Region 1 | Subregion A | 100 / (100 + 0 + 0) = 1 |
+----------+-------------+------------------------------+
| Region 1 | Subregion B | 50 / (50 + 50 + 50) = 0.3333 |
+----------+-------------+------------------------------+
| Region 2 | Subregion C | 0 / (0 + 110 + 100) = 0 |
+----------+-------------+------------------------------+
| Region 2 | Subregion D | 0 / (0 + 50 + 0) = 0 |
+----------+-------------+------------------------------+
| Region 2 | Subregion E | 10 / (10 + 90 + 0) = 0.1 |
+----------+-------------+------------------------------+
Computing the fraction of all B's in each subregion of each region:
+----------+-------------+--------------------------------+
| Region | Subregion | Fraction of all B's in Region |
+==========+=============+================================+
| Region 1 | Subregion A | 0 / 50 = 0 |
+----------+-------------+--------------------------------+
| Region 1 | Subregion B | 50 / 50 = 1 |
+----------+-------------+--------------------------------+
| Region 2 | Subregion C | 110 / 250 = 0.44 |
+----------+-------------+--------------------------------+
| Region 2 | Subregion D | 50 / 250 = 0.2 |
+----------+-------------+--------------------------------+
| Region 2 | Subregion E | 90 / 250 = 0.36 |
+----------+-------------+--------------------------------+
Multiplying and summing for the subregions in each region:
For Region 1, we get
.. math::
(1 * 0) + (0.3333 * 1) = 0.3333.
For Region 2, we get
.. math::
0 * 0.44 + 0 * 0.2 + 0.1 * 0.36 = 0.036.
Repeating the process for C:
+----------+-------------+--------------------------------+
| Region | Subregion | Fraction of all C's in Region |
+==========+=============+================================+
| Region 1 | Subregion A | 0 / 50 = 0 |
+----------+-------------+--------------------------------+
| Region 1 | Subregion B | 50 / 50 = 1 |
+----------+-------------+--------------------------------+
| Region 2 | Subregion C | 100 / 100 = 1 |
+----------+-------------+--------------------------------+
| Region 2 | Subregion D | 0 / 100 = 0 |
+----------+-------------+--------------------------------+
| Region 2 | Subregion E | 0 / 100 = 0 |
+----------+-------------+--------------------------------+
For Region 1, we get
.. math::
(1 * 0) + (0.3333 * 1) = 0.3333.
For Region 2, we get
.. math::
0 * 1 + 0 * 0 + 0.1 * 0 = 0.
"""
df_grouped = df_communities.groupby([by, over], as_index=False).sum(
numeric_only=True
)
likelihood = df_grouped[primary_group_name] / df_grouped.sum(
axis="columns", numeric_only=True
)
if secondary_group_name is None:
pairwise_columns = df_grouped.select_dtypes(include="number").columns.tolist()
pairwise_columns.remove(primary_group_name)
final_df = pd.DataFrame(df_communities[by].unique(), columns=[by])
for col in pairwise_columns:
final_df[col] = likelihood_populationfrac_product(
df_grouped, likelihood, by, col
)
return final_df
final_df = pd.DataFrame(df_communities[by].unique(), columns=[by])
final_df[secondary_group_name] = likelihood_populationfrac_product(
df_grouped, likelihood, by, secondary_group_name
)
return final_df