dask_ml/model_selection/_incremental.py (57 changes: 32 additions and 25 deletions)

@@ -198,34 +198,40 @@ async def _fit(
else:
    y_test = await client.scatter(y_test)

# Convert to batches of delayed objects of numpy arrays
X_train = sorted(futures_of(X_train), key=lambda f: f.key)
y_train = sorted(futures_of(y_train), key=lambda f: f.key)
assert len(X_train) == len(y_train)

train_eg = await client.gather(client.map(len, y_train))
msg = "[CV%s] For training there are between %d and %d examples in each chunk"
logger.info(msg, prefix, min(train_eg), max(train_eg))
if hasattr(X_train, 'npartitions'):
Member:
Can this if-statement be changed to if isinstance(X_train, da.Array)?

gioxc88 (Author), Oct 29, 2020:
The reason I don't want to restrict the if-statement to isinstance(X_train, da.Array) is that, potentially, any custom data structure, like the hypothetical CustomFrame, could expose a dask-like API by implementing npartitions and __dask_graph__.

In that case, for example, I (or any other user) could later extend CustomFrame to work with data larger than memory, and this would still work with Hyperband even though it is not a da.Array, because the API is compatible.

With this in mind, maybe a good compromise could be:

if dask.is_dask_collection(X_train) ?

Member:
if dask.is_dask_collection(X_train)

I think that's a better choice (though @TomAugspurger might have more input).

Member:
Yeah, I think is_dask_collection is better.
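
To make the duck-typing argument concrete, here is a minimal, self-contained sketch (not part of the PR) of a hypothetical CustomFrame that is not a da.Array but still passes dask.is_dask_collection; the dask releases current at the time of this discussion treat any object whose __dask_graph__() returns a non-None graph as a collection:

import dask
import dask.array as da
import numpy as np


class CustomFrame:
    """Hypothetical dask-like container; illustration only."""

    def __init__(self, blocks):
        self.blocks = [np.asarray(b) for b in blocks]

    @property
    def npartitions(self):
        return len(self.blocks)

    def __dask_graph__(self):
        # A plain dict is a valid low-level dask graph.
        return {("customframe-block", i): block for i, block in enumerate(self.blocks)}


cf = CustomFrame([[1, 2], [3, 4]])
print(isinstance(cf, da.Array))     # False
print(dask.is_dask_collection(cf))  # True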

    # Convert to batches of delayed objects of numpy arrays
    X_train = sorted(futures_of(X_train), key=lambda f: f.key)
    y_train = sorted(futures_of(y_train), key=lambda f: f.key)
    assert len(X_train) == len(y_train)

    train_eg = await client.gather(client.map(len, y_train))
    msg = "[CV%s] For training there are between %d and %d examples in each chunk"
    logger.info(msg, prefix, min(train_eg), max(train_eg))
Member:
Why won't this code work?

train_eg =  ...
msg = ...
if len(train_eg):
    logger.info(msg, prefix, min(train_eg), max(train_eg))

This avoids min([]) as mentioned in #748 (comment).

Then, I think the if-statement could be moved inside get_futures:

def get_futures(partial_fit_calls):
    if not isinstance(X_train, da.Array):
        return X_train, y_train
    ...  # existing implementation

gioxc88 (Author), Oct 29, 2020:
You are right, I was not careful when I read your comment; this would work.

The reason I did not implement it that way is that saying if len(train_eg): (or, even shorter, if train_eg:) is the same as checking isinstance(X_train, da.Array), because train_eg will be empty only if X_train is not a da.Array. This means that in your implementation you are still executing these lines even when they are not needed:

X_train = sorted(futures_of(X_train), key=lambda f: f.key)
y_train = sorted(futures_of(y_train), key=lambda f: f.key)
assert len(X_train) == len(y_train)

train_eg = await client.gather(client.map(len, y_train))

Finally, the reason it might be better not to move the if-statement inside the get_futures function is that it forces you to check the condition every time the function is called, which is not ideal for dask collections with a large number of partitions.

Member:
I think the logging message should be issued if the dataset object has an implementation of __len__. The computation of train_eg only happens once (train_eg is a list of ints).

I don't like two separate definitions of get_futures, especially if they both return training data.


def get_futures(partial_fit_calls):
    """Policy to get training data futures

    Currently we compute once, and then keep in memory.
    Presumably in the future we'll want to let data drop and recompute.
    This function handles that policy internally, and also controls random
    access to training data.
    """
    # Shuffle blocks going forward to get uniform-but-random access
    while partial_fit_calls >= len(order):
        L = list(range(len(X_train)))
        rng.shuffle(L)
        order.extend(L)
    j = order[partial_fit_calls]
    return X_train[j], y_train[j]
# __addition__ start
else:
    def get_futures(partial_fit_calls):
        return X_train, y_train
# __addition__ end
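
For comparison, a self-contained sketch of the single-definition approach discussed above, with plain lists standing in for the per-chunk futures and a use_dask flag standing in for the dask.is_dask_collection(X_train) check; the names mirror the surrounding diff, but this is only an illustration, not the merged code:

import random

rng = random.Random(0)
use_dask = True                     # stand-in for dask.is_dask_collection(X_train)
X_train = [[1, 2], [3, 4], [5, 6]]  # stand-in: one entry per chunk/future
y_train = [[0, 1], [1, 0], [0, 0]]

# Guarding the log line avoids min([]) / max([]) on the in-memory path.
train_eg = [len(y) for y in y_train] if use_dask else []
if train_eg:
    print("between %d and %d examples in each chunk" % (min(train_eg), max(train_eg)))

order = []  # order by which we process training data futures


def get_futures(partial_fit_calls):
    """Single policy function covering both the dask and in-memory cases."""
    if not use_dask:
        return X_train, y_train
    # Shuffle blocks going forward to get uniform-but-random access
    while partial_fit_calls >= len(order):
        L = list(range(len(X_train)))
        rng.shuffle(L)
        order.extend(L)
    j = order[partial_fit_calls]
    return X_train[j], y_train[j]


print(get_futures(0))
print(get_futures(1))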

# Order by which we process training data futures
order = []

def get_futures(partial_fit_calls):
    """Policy to get training data futures

    Currently we compute once, and then keep in memory.
    Presumably in the future we'll want to let data drop and recompute.
    This function handles that policy internally, and also controls random
    access to training data.
    """
    # Shuffle blocks going forward to get uniform-but-random access
    while partial_fit_calls >= len(order):
        L = list(range(len(X_train)))
        rng.shuffle(L)
        order.extend(L)
    j = order[partial_fit_calls]
    return X_train[j], y_train[j]

# Submit initial partial_fit and score computations on first batch of data
X_future, y_future = get_futures(0)
X_future_2, y_future_2 = get_futures(1)
@@ -566,7 +572,8 @@ def _get_train_test_split(self, X, y, **kwargs):
X, y : dask.array.Array
"""
if self.test_size is None:
    test_size = min(0.2, 1 / X.npartitions)
    npartitions = getattr(X, 'npartitions', 1)
    test_size = min(0.2, 1 / npartitions)
else:
    test_size = self.test_size
X_train, X_test, y_train, y_test = train_test_split(
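
A quick illustration of what the getattr fallback changes (FakeCollection is just a stand-in for an object that exposes npartitions; the values are illustrative):

class FakeCollection:
    npartitions = 10  # pretend: a dask collection split into 10 partitions


for X in ([1, 2, 3], FakeCollection()):  # plain in-memory list vs. partitioned object
    npartitions = getattr(X, 'npartitions', 1)
    print(type(X).__name__, min(0.2, 1 / npartitions))
# list 0.2            -> in-memory data falls back to the default 20% test split
# FakeCollection 0.1  -> with many partitions, test_size shrinks to one partition's worth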