Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion dask_sql/physical/rel/custom/create_model.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
from typing import TYPE_CHECKING

import numpy as np
from dask import delayed

from dask_sql.datacontainer import DataContainer
Expand Down Expand Up @@ -183,7 +184,13 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai

delayed_model = [delayed(model.fit)(x_p, y_p) for x_p, y_p in zip(X_d, y_d)]
model = delayed_model[0].compute()
model = ParallelPostFit(estimator=model)
output_meta = np.array([])
model = ParallelPostFit(
estimator=model,
predict_meta=output_meta,
predict_proba_meta=output_meta,
transform_meta=output_meta,
)

else:
model.fit(X, y, **fit_kwargs)
Expand Down
56 changes: 55 additions & 1 deletion dask_sql/physical/rel/custom/predict.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@
import uuid
from typing import TYPE_CHECKING

import numpy as np

try:
from dask_ml.wrappers import ParallelPostFit
except ImportError: # pragma: no cover
raise ValueError("Wrapping requires dask-ml to be installed.")

from dask_sql.datacontainer import ColumnContainer, DataContainer
from dask_sql.physical.rel.base import BaseRelPlugin

Expand Down Expand Up @@ -59,7 +66,18 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai

model, training_columns = context.schema[schema_name].models[model_name]
df = context.sql(sql_select)
prediction = model.predict(df[training_columns])
part = df[training_columns]

if isinstance(model, ParallelPostFit):
output_meta = model.predict_meta
if output_meta is None:
output_meta = model.estimator.predict(part._meta_nonempty)
prediction = part.map_partitions(
self._predict, output_meta, model.estimator, meta=output_meta
)
else:
prediction = model.predict(part)

predicted_df = df.assign(target=prediction)

# Create a temporary context, which includes the
Expand All @@ -79,3 +97,39 @@ def convert(self, rel: "LogicalPlan", context: "dask_sql.Context") -> DataContai
dc = DataContainer(predicted_df, cc)

return dc

def _predict(self, part, predict_meta, estimator):
    """Predict on a single partition, short-circuiting empty partitions.

    Parameters
    ----------
    part
        One partition of the input; only ``part.shape[0]`` is inspected here.
    predict_meta
        Example of the prediction output, or ``None`` when unknown.
    estimator
        Object exposing ``predict(part)``.

    Returns
    -------
    The placeholder built by ``handle_empty_partitions`` when the partition
    has no rows and a placeholder could be built from ``predict_meta``;
    otherwise whatever ``estimator.predict(part)`` returns.
    """
    has_no_rows = part.shape[0] == 0
    if not has_no_rows or predict_meta is None:
        return estimator.predict(part)

    # Empty partition with known meta: try to answer without the estimator.
    placeholder = self.handle_empty_partitions(predict_meta)
    return estimator.predict(part) if placeholder is None else placeholder

def handle_empty_partitions(self, output_meta):
    """Build a zero-row object of the same kind as ``output_meta``.

    Parameters
    ----------
    output_meta
        Example of a prediction output. Supported kinds are objects that
        implement ``__array_function__`` (e.g. NumPy arrays), sparse
        matrices whose type lives in a ``scipy.sparse`` module path, and
        objects exposing ``iloc`` (e.g. pandas ``Series``/``DataFrame``).

    Returns
    -------
    An empty container with zero entries along the first axis that keeps
    the trailing dimensions and dtype of ``output_meta``, or ``None`` when
    ``output_meta`` is none of the supported kinds.
    """
    if hasattr(output_meta, "__array_function__"):
        # Zero along the first axis, trailing dimensions preserved.
        # `like=` lets array libraries other than NumPy build their own type.
        return np.zeros(
            shape=(0,) + tuple(output_meta.shape[1:]),
            dtype=output_meta.dtype,
            like=output_meta,
        )

    if "scipy.sparse" in type(output_meta).__module__:
        # Sparse matrices don't support `like` because they do not
        # implement __array_function__.
        # Refer https://github.com/scipy/scipy/issues/10362
        # The shape MUST be a tuple: sparse constructors only treat a tuple
        # as a shape, while a list is interpreted as dense data and would
        # produce a non-empty matrix.
        empty_shape = (0,) + tuple(output_meta.shape[1:])
        return type(output_meta)(empty_shape, dtype=output_meta.dtype)

    if hasattr(output_meta, "iloc"):
        # A single row slicer works for both 1-D (Series) and 2-D
        # (DataFrame) objects; adding a column slicer fails on a Series.
        return output_meta.iloc[:0]

    return None
74 changes: 74 additions & 0 deletions tests/integration/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -924,3 +924,77 @@ def test_experiment_automl_regressor(c, client, training_df):
), "Best model was not registered"

check_trained_model(c, "my_automl_exp2")


def test_predict_with_nullable_types(c):
    """PREDICT gives the same result for plain and nullable/narrow dtypes.

    The same model is trained and queried twice on ``train_set``: first with
    default pandas dtypes, then with an ``Int32`` and a ``float32`` feature
    column. Both prediction results are compared ignoring dtypes.
    """
    model_class = "'sklearn.linear_model.LogisticRegression'"

    create_model_query = f"""
CREATE OR REPLACE MODEL model WITH (
model_class = {model_class},
wrap_predict = True,
wrap_fit = False,
target_column = 'rained'
) AS (
SELECT *
FROM train_set
)
"""
    predict_query = """
SELECT * FROM PREDICT(
MODEL model,
SELECT * FROM train_set
)
"""

    def train_and_predict(frame):
        # (Re-)register the table, refit the model on it, then predict on it.
        c.create_table("train_set", frame)
        c.sql(create_model_query)
        return c.sql(predict_query)

    plain_frame = pd.DataFrame(
        {
            "rough_day_of_year": [0, 1, 2, 3],
            "prev_day_inches_rained": [0, 1, 2, 3],
            "rained": [False, False, False, True],
        }
    )
    expected = train_and_predict(plain_frame)

    typed_frame = pd.DataFrame(
        {
            "rough_day_of_year": pd.Series([0, 1, 2, 3], dtype="Int32"),
            "prev_day_inches_rained": pd.Series([0, 1, 2, 3], dtype="float32"),
            "rained": pd.Series([False, False, False, True]),
        }
    )
    result = train_and_predict(typed_frame)

    assert_eq(expected, result, check_dtype=False)