Skip to content
118 changes: 118 additions & 0 deletions feature_engine/creation/distance_features.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
from typing import Optional, Union

import numpy as np
import pandas as pd

from feature_engine._base_transformers.base_numerical import BaseNumericalTransformer
from feature_engine._base_transformers.mixins import (
FitFromDictMixin,
GetFeatureNamesOutMixin,
)
from feature_engine._check_input_parameters.check_init_input_params import (
_check_param_drop_original,
)


class DistanceFeatures(
    BaseNumericalTransformer, FitFromDictMixin, GetFeatureNamesOutMixin
):
    """
    Add a column holding the distance between two points described by
    latitude/longitude columns of a dataframe.

    The distance is obtained with the Haversine formula and scaled by
    ``EARTH_RADIUS``, so the result is expressed in kilometres.
    """

    # Radius of the Earth in kilometres; scales the Haversine central angle
    # into a distance.
    EARTH_RADIUS: float = 6371.

def __init__(
    self,
    a_latitude: str,
    a_longitude: str,
    b_latitude: str,
    b_longitude: str,
    output_column_name: Union[str, None] = None,
    drop_original: bool = False,
) -> None:
    """
    Validate and store the configuration of the transformer.

    Parameters
    ----------
    a_latitude: str
        Name of the column holding the latitude of the first point.

    a_longitude: str
        Name of the column holding the longitude of the first point.

    b_latitude: str
        Name of the column holding the latitude of the second point.

    b_longitude: str
        Name of the column holding the longitude of the second point.

    output_column_name: str or None, default=None
        Name of the column that receives the computed distance. When None,
        the name "distance_between_a_and_b" is used.

    drop_original: bool, default=False
        Whether ``transform`` removes the four coordinate columns after
        computing the distance.

    Raises
    ------
    ValueError
        If any column name is not a string. ``drop_original`` is validated
        by ``_check_param_drop_original`` (presumably also raising
        ValueError for non-boolean values -- confirm against that helper).
    """
    self.a_latitude = self._check_column_name(a_latitude)
    self.a_longitude = self._check_column_name(a_longitude)
    self.b_latitude = self._check_column_name(b_latitude)
    self.b_longitude = self._check_column_name(b_longitude)

    # None is the declared default, but _check_column_name rejects anything
    # that is not a string, so the default used to raise unconditionally.
    # Resolve it to a concrete column name before validating.
    if output_column_name is None:
        output_column_name = "distance_between_a_and_b"
    self.output_column_name = self._check_column_name(
        column_name=output_column_name
    )

    _check_param_drop_original(drop_original=drop_original)
    self.drop_original = drop_original

    # NOTE(review): presumably read by the inherited fit logic to select
    # the columns to inspect -- confirm against BaseNumericalTransformer.
    self.variables = None
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need this parameter. I'd suggest using RelativeFeatures as template to model this class: https://github.com/VascoSch92/feature_engine/blob/e1e927625678ee73c5c3a9edcf79e955ff9c5e8e/feature_engine/creation/relative_features.py

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

variables is a parameter that we have in all transformers, so I would stick to this name instead of using coordinate_columns

In short, let's replace coordinate_columns by variables.


@staticmethod
def _check_column_name(column_name: str) -> str:
if not isinstance(column_name, str):
raise ValueError(
"column_name takes only string as value. "
f"Got {column_name} instead."
)

return column_name

def transform(self, X: pd.DataFrame):
    """
    Add the Haversine distance between the two configured points to X.

    Parameters
    ----------
    X: Pandas DataFrame of shape = [n_samples, n_features]
        The data to be transformed.

    Returns
    -------
    X_new: Pandas dataframe.
        The dataframe returned by the input checks, with the distance
        column added. The four coordinate columns are removed when
        ``drop_original`` is truthy.
    """
    # Validation chain: each step hands back the dataframe to work on.
    # NOTE(review): _check_transform_input_and_state is inherited; whether
    # it returns a copy of X is not visible here -- confirm.
    X = self._check_transform_input_and_state(X)
    X = self._check_lat_lon_columns_are_in_df(X)
    X = self._check_correctness_of_coordinates(X)

    # Writes the distance column into X in place.
    self.compute_distance(X)

    if not self.drop_original:
        return X

    coordinate_columns = [
        self.a_latitude,
        self.a_longitude,
        self.b_latitude,
        self.b_longitude,
    ]
    X.drop(columns=coordinate_columns, inplace=True)
    return X

def compute_distance(self, X: pd.DataFrame):
    """
    Compute the Haversine distance between the two configured points and
    store it in ``X[self.output_column_name]``.

    The dataframe is modified in place and nothing is returned. Coordinates
    are read as degrees (they are converted with ``np.radians``) and the
    result is scaled by ``EARTH_RADIUS``.

    Parameters
    ----------
    X: Pandas DataFrame
        Dataframe containing the four coordinate columns.
    """
    # convert latitude and longitude to radians
    phi_1 = np.radians(X[self.a_latitude])
    phi_2 = np.radians(X[self.b_latitude])
    lambda_1 = np.radians(X[self.a_longitude])
    lambda_2 = np.radians(X[self.b_longitude])

    # differences between the two points, in radians
    delta_phi = phi_2 - phi_1
    delta_lambda = lambda_2 - lambda_1

    # haversine of the central angle between the two points
    haversine = (
        np.sin(delta_phi / 2) ** 2
        + np.cos(phi_1) * np.cos(phi_2) * np.sin(delta_lambda / 2) ** 2
    )

    # Mathematically the term lies in [0, 1], but floating-point rounding
    # can push it marginally outside for (near-)antipodal points, which
    # would make sqrt/arcsin return NaN. Clamp before inverting.
    haversine = haversine.clip(lower=0.0, upper=1.0)

    X[self.output_column_name] = (
        self.EARTH_RADIUS * 2 * np.arcsin(np.sqrt(haversine))
    )

def _check_lat_lon_columns_are_in_df(self, X) -> pd.DataFrame:
df_columns = set(X.columns)
input_columns = {self.a_latitude, self.a_longitude, self.b_latitude, self.b_latitude}

if input_columns.issubset(df_columns) is False:
raise ValueError(f'The columns {input_columns.difference(df_columns)} were not found in the dataframe.')

return X

def _check_correctness_of_coordinates(self, X: pd.DataFrame) -> pd.DataFrame:
irregular_latitudes = X[(X[self.a_latitude].abs() > 90) | (X[self.b_latitude].abs() > 90)]
irregular_longitudes = X[(X[self.a_longitude].abs() > 180) | (X[self.b_longitude].abs() > 180)]

if irregular_latitudes.empty is False:
raise ValueError(f'The dataframe contains irregular latitudes: {irregular_latitudes}')
if irregular_longitudes.empty is False:
raise ValueError(f'The dataframe contains irregular longitudes: {irregular_longitudes}')

return X
253 changes: 253 additions & 0 deletions tests/test_creation/test_distance_features.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
import pandas as pd
import pytest
from pandas.testing import assert_frame_equal

from feature_engine.creation.distance_features import DistanceFeatures


@pytest.mark.parametrize(
    'input_data, expected_data, output_column_name, drop_original',
    [
        (
            {
                'a_latitude': [0., 0., 46.948579],
                'a_longitude': [0., 0., 7.436925],
                'b_latitude': [0., 12.34, 59.91054],
                'b_longitude': [0., 123.45, 10.752695],
            },
            {'distance_between_a_and_b': [0., 13630.28, 1457.49]},
            'distance_between_a_and_b',
            False,
        ),
    ],
)
def test_compute_distance_without_dropping_lat_lon_columns(
    input_data,
    expected_data,
    output_column_name,
    drop_original,
):
    """The output keeps the coordinate columns and appends the distance."""
    coordinates = pd.DataFrame(input_data)
    # Coordinate columns first, distance column last.
    expected = pd.DataFrame({**input_data, **expected_data})

    transformer = DistanceFeatures(
        a_latitude='a_latitude',
        a_longitude='a_longitude',
        b_latitude='b_latitude',
        b_longitude='b_longitude',
        output_column_name=output_column_name,
        drop_original=drop_original,
    )
    transformer.fit(coordinates)
    result = transformer.transform(X=coordinates)

    assert_frame_equal(result, expected)


@pytest.mark.parametrize(
    'input_data, expected_data, output_column_name, drop_original',
    [
        (
            {
                'a_latitude': [0., 0., 46.948579],
                'a_longitude': [0., 0., 7.436925],
                'b_latitude': [0., 12.34, 59.91054],
                'b_longitude': [0., 123.45, 10.752695],
            },
            {'distance_between_a_and_b': [0., 13630.28, 1457.49]},
            'distance_between_a_and_b',
            True,
        ),
    ],
)
def test_compute_distance_with_dropping_lat_lon_columns(
    input_data,
    expected_data,
    output_column_name,
    drop_original,
):
    """With drop_original set, only the distance column remains."""
    coordinates = pd.DataFrame(input_data)
    expected = pd.DataFrame(expected_data)

    transformer = DistanceFeatures(
        a_latitude='a_latitude',
        a_longitude='a_longitude',
        b_latitude='b_latitude',
        b_longitude='b_longitude',
        output_column_name=output_column_name,
        drop_original=drop_original,
    )
    transformer.fit(coordinates)
    result = transformer.transform(X=coordinates)

    assert_frame_equal(result, expected)


@pytest.mark.parametrize(
    'input_data, output_column_name, drop_original',
    [
        (
            {
                'a_latitude': [6, 7, 5],
                'a_longitude': [3, 7, 9],
                'b_latitude': [0, 0, 0],
                'b_longitude': [0, 0, 0],
            },
            'distance_between_a_and_b',
            True,
        ),
    ],
)
def test_output_column_name(input_data, output_column_name, drop_original):
    """The transformed dataframe exposes the requested output column."""
    coordinates = pd.DataFrame(input_data)

    transformer = DistanceFeatures(
        a_latitude='a_latitude',
        a_longitude='a_longitude',
        b_latitude='b_latitude',
        b_longitude='b_longitude',
        output_column_name=output_column_name,
        drop_original=drop_original,
    )
    transformer.fit(coordinates)
    output_df = transformer.transform(X=coordinates)

    assert output_column_name in output_df.columns, (
        f'column_name: {output_column_name} '
        f'is not in {output_df.columns} '
    )


@pytest.mark.parametrize(
    'input_data',
    [
        {
            'a_latitude': [0, -100.],
            'a_longitude': [0, 0],
            'b_latitude': [0, 0],
            'b_longitude': [0, 0],
        },
    ],
)
def test_latitude_is_incorrect(input_data):
    """A latitude beyond +/-90 makes the construct/fit/transform run fail."""
    coordinates = pd.DataFrame(input_data)
    column_names = {
        'a_latitude': 'a_latitude',
        'a_longitude': 'a_longitude',
        'b_latitude': 'b_latitude',
        'b_longitude': 'b_longitude',
    }

    with pytest.raises(ValueError):
        transformer = DistanceFeatures(
            output_column_name='distance_between_a_and_b',
            drop_original=False,
            **column_names,
        )
        transformer.fit(coordinates)
        transformer.transform(X=coordinates)


@pytest.mark.parametrize(
    'input_data',
    [
        {
            'a_latitude': [0, 0],
            'a_longitude': [-1_000, 0],
            'b_latitude': [0, 0],
            'b_longitude': [0, 0],
        },
    ],
)
def test_longitude_is_incorrect(input_data):
    """A longitude beyond +/-180 makes the construct/fit/transform run fail."""
    coordinates = pd.DataFrame(input_data)
    column_names = {
        'a_latitude': 'a_latitude',
        'a_longitude': 'a_longitude',
        'b_latitude': 'b_latitude',
        'b_longitude': 'b_longitude',
    }

    with pytest.raises(ValueError):
        transformer = DistanceFeatures(
            output_column_name='distance_between_a_and_b',
            drop_original=False,
            **column_names,
        )
        transformer.fit(coordinates)
        transformer.transform(X=coordinates)


@pytest.mark.parametrize(
    'input_data',
    [
        {
            'a_latitude': [0, 0],
            'a_longitude': [None, 0],
            'b_latitude': [0, 0],
            'b_longitude': [0, 0],
        },
    ],
)
def test_fit_raises_error_if_na_in_df(input_data):
    """A missing coordinate value makes the construct/fit/transform run fail."""
    coordinates = pd.DataFrame(input_data)
    column_names = {
        'a_latitude': 'a_latitude',
        'a_longitude': 'a_longitude',
        'b_latitude': 'b_latitude',
        'b_longitude': 'b_longitude',
    }

    # The ValueError may come from any of the three calls below.
    with pytest.raises(ValueError):
        transformer = DistanceFeatures(
            output_column_name='distance_between_a_and_b',
            drop_original=False,
            **column_names,
        )
        transformer.fit(coordinates)
        transformer.transform(X=coordinates)


@pytest.mark.parametrize(
    'input_data',
    [
        {
            'a_latitude': [0, 0],
            'a_longitude': [0, 0],
            'b_latitude': [0, 0],
            'b_longitude': [0, 0],
        },
    ],
)
def test_fit_raises_error_if_lat_lon_columns_not_in_df(input_data):
    """Configuring a column absent from the dataframe raises ValueError."""
    coordinates = pd.DataFrame(input_data)
    column_names = {
        'a_latitude': 'a_latitude',
        'a_longitude': 'a_longitude',
        # Deliberately not a column of the dataframe.
        'b_latitude': '<wrong-name>',
        'b_longitude': 'b_longitude',
    }

    # The ValueError may come from any of the three calls below.
    with pytest.raises(ValueError):
        transformer = DistanceFeatures(
            output_column_name='distance_between_a_and_b',
            drop_original=False,
            **column_names,
        )
        transformer.fit(coordinates)
        transformer.transform(X=coordinates)


def test_raises_error_when_init_parameters_not_permitted():
    """Each invalid constructor argument is rejected with a ValueError."""
    valid_arguments = {
        'a_latitude': 'a_latitude',
        'a_longitude': 'a_longitude',
        'b_latitude': 'b_latitude',
        'b_longitude': 'b_longitude',
        'output_column_name': 'distance_between_a_and_b',
        'drop_original': False,
    }

    # One invalid argument at a time, checked in the same order as before:
    # a string drop_original, a non-string input column, a non-string
    # output column.
    invalid_overrides = (
        {'drop_original': 'False'},
        {'a_latitude': 123},
        {'output_column_name': 123},
    )

    for override in invalid_overrides:
        arguments = {**valid_arguments, **override}
        with pytest.raises(ValueError):
            DistanceFeatures(**arguments)