support for non dask arrays for HyperbandSearchCV #751
base: main
```diff
@@ -198,34 +198,40 @@ async def _fit(
     else:
         y_test = await client.scatter(y_test)
 
-    # Convert to batches of delayed objects of numpy arrays
-    X_train = sorted(futures_of(X_train), key=lambda f: f.key)
-    y_train = sorted(futures_of(y_train), key=lambda f: f.key)
-    assert len(X_train) == len(y_train)
-
-    train_eg = await client.gather(client.map(len, y_train))
-    msg = "[CV%s] For training there are between %d and %d examples in each chunk"
-    logger.info(msg, prefix, min(train_eg), max(train_eg))
+    if hasattr(X_train, 'npartitions'):
+        # Convert to batches of delayed objects of numpy arrays
+        X_train = sorted(futures_of(X_train), key=lambda f: f.key)
+        y_train = sorted(futures_of(y_train), key=lambda f: f.key)
+        assert len(X_train) == len(y_train)
+
+        train_eg = await client.gather(client.map(len, y_train))
+        msg = "[CV%s] For training there are between %d and %d examples in each chunk"
+        logger.info(msg, prefix, min(train_eg), max(train_eg))
+
+        def get_futures(partial_fit_calls):
+            """Policy to get training data futures
+
+            Currently we compute once, and then keep in memory.
+            Presumably in the future we'll want to let data drop and recompute.
+            This function handles that policy internally, and also controls random
+            access to training data.
+            """
+            # Shuffle blocks going forward to get uniform-but-random access
+            while partial_fit_calls >= len(order):
+                L = list(range(len(X_train)))
+                rng.shuffle(L)
+                order.extend(L)
+            j = order[partial_fit_calls]
+            return X_train[j], y_train[j]
+    # __addition__ start
+    else:
+        def get_futures(partial_fit_calls):
+            return X_train, y_train
+    # __addition__ end
 
     # Order by which we process training data futures
     order = []
 
-    def get_futures(partial_fit_calls):
-        """Policy to get training data futures
-
-        Currently we compute once, and then keep in memory.
-        Presumably in the future we'll want to let data drop and recompute.
-        This function handles that policy internally, and also controls random
-        access to training data.
-        """
-        # Shuffle blocks going forward to get uniform-but-random access
-        while partial_fit_calls >= len(order):
-            L = list(range(len(X_train)))
-            rng.shuffle(L)
-            order.extend(L)
-        j = order[partial_fit_calls]
-        return X_train[j], y_train[j]
-
     # Submit initial partial_fit and score computations on first batch of data
     X_future, y_future = get_futures(0)
     X_future_2, y_future_2 = get_futures(1)
```
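For reference, here is a standalone sketch of the block-selection policy in the dask-backed branch above. The block count, the number of calls, and the use of Python's `random` module (in place of whatever `rng` is in the real `_fit`) are illustrative assumptions, not part of the PR.

```python
import random

n_blocks = 4            # hypothetical stand-in for len(X_train) futures
rng = random.Random(0)  # the real code uses an rng created elsewhere in _fit
order = []              # order in which training blocks are visited


def get_block_index(partial_fit_calls):
    # Mirror of the shuffle policy: whenever we run past the end of `order`,
    # append a fresh random permutation of all block indices, so every block
    # is visited once per "epoch" but in a random order.
    while partial_fit_calls >= len(order):
        L = list(range(n_blocks))
        rng.shuffle(L)
        order.extend(L)
    return order[partial_fit_calls]


print([get_block_index(i) for i in range(10)])
# Each block index appears exactly once in every consecutive group of 4 calls.
```

The non-dask `else` branch skips all of this bookkeeping: every `get_futures` call simply returns the full `X_train`, `y_train` pair.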
```diff
@@ -566,7 +572,8 @@ def _get_train_test_split(self, X, y, **kwargs):
         X, y : dask.array.Array
         """
         if self.test_size is None:
-            test_size = min(0.2, 1 / X.npartitions)
+            npartitions = getattr(X, 'npartitions', 1)
+            test_size = min(0.2, 1 / npartitions)
         else:
             test_size = self.test_size
         X_train, X_test, y_train, y_test = train_test_split(
```
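Taken together, the two hunks aim to let `HyperbandSearchCV` accept plain in-memory arrays: `_fit` falls back to treating the whole array as a single training block, and `_get_train_test_split` treats data without `npartitions` as one partition, so `test_size` defaults to `min(0.2, 1 / 1) = 0.2`. A hypothetical usage sketch of what this would enable (estimator, parameter grid, and data below are arbitrary examples, not from the PR):

```python
import numpy as np
from distributed import Client
from sklearn.linear_model import SGDClassifier

from dask_ml.model_selection import HyperbandSearchCV

client = Client(processes=False)  # Hyperband still runs through a dask client

# Plain NumPy arrays, no dask chunking
X = np.random.uniform(size=(1000, 20))
y = (X[:, 0] > 0.5).astype(int)

params = {"alpha": np.logspace(-4, 0, num=10)}
search = HyperbandSearchCV(
    SGDClassifier(tol=1e-3), params, max_iter=27, random_state=0
)

# With this patch, X and y would be used as a single block per partial_fit
# call instead of being converted to per-chunk futures.
search.fit(X, y, classes=[0, 1])
print(search.best_params_)
```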
Can this if-statement be changed to `if isinstance(X_train, da.Array)`?
The reason I don't want to restrict the if-statement to `isinstance(X_train, da.Array)` is that potentially any custom data structure, like the hypothetical `CustomFrame`, could have a dask-like API by implementing `npartitions` and `__dask_graph__`. In that case, for example, I (or any other user) could later extend my `CustomFrame` to work with data larger than memory, and this would still work with hyperband even though it is not a `da.Array`, because the API is compatible. With this in mind, maybe a good compromise could be `if dask.is_dask_collection(X_train)`?
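To illustrate the argument (this class is hypothetical and not part of the PR): a minimal container with a callable `__dask_graph__` is enough for `dask.is_dask_collection` to accept it, while the stricter `isinstance(X, da.Array)` check rejects it. Exact protocol requirements can vary across dask versions.

```python
import dask
import dask.array as da
import numpy as np


class CustomFrame:
    """Hypothetical container exposing a dask-like API (illustration only)."""

    def __init__(self, data, npartitions=2):
        self.data = data
        self.npartitions = npartitions

    def __dask_graph__(self):
        # A (trivial) task graph; real collections also implement
        # __dask_keys__, __dask_scheduler__, and related methods.
        return {("customframe", 0): self.data}


cf = CustomFrame(np.arange(10))

print(isinstance(cf, da.Array))     # expected: False -- strict type check rejects it
print(hasattr(cf, "npartitions"))   # expected: True  -- the check currently in this PR
print(dask.is_dask_collection(cf))  # expected: True  -- the proposed compromise
```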
I think that's a better choice (though @TomAugspurger might have more input).
Yeah, I think `is_dask_collection` is better.