-
Notifications
You must be signed in to change notification settings - Fork 41
Add support for scan fetch size in storage adapters #2731
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,6 +38,7 @@ public class DatabaseConfig { | |
| private boolean crossPartitionScanFilteringEnabled; | ||
| private boolean crossPartitionScanOrderingEnabled; | ||
| private String systemNamespaceName; | ||
| private int scannerFetchSize; | ||
|
|
||
| public static final String PREFIX = "scalar.db."; | ||
| public static final String CONTACT_POINTS = PREFIX + "contact_points"; | ||
|
|
@@ -56,9 +57,11 @@ public class DatabaseConfig { | |
| public static final String CROSS_PARTITION_SCAN_FILTERING = SCAN_PREFIX + "filtering.enabled"; | ||
| public static final String CROSS_PARTITION_SCAN_ORDERING = SCAN_PREFIX + "ordering.enabled"; | ||
| public static final String SYSTEM_NAMESPACE_NAME = PREFIX + "system_namespace_name"; | ||
| public static final String SCANNER_FETCH_SIZE = PREFIX + "scanner_fetch_size"; | ||
|
||
|
|
||
| public static final int DEFAULT_METADATA_CACHE_EXPIRATION_TIME_SECS = 60; | ||
| public static final String DEFAULT_SYSTEM_NAMESPACE_NAME = "scalardb"; | ||
| public static final int DEFAULT_SCANNER_FETCH_SIZE = 10; | ||
|
|
||
| public DatabaseConfig(File propertiesFile) throws IOException { | ||
| try (FileInputStream stream = new FileInputStream(propertiesFile)) { | ||
|
|
@@ -118,6 +121,8 @@ protected void load() { | |
| } | ||
|
|
||
| systemNamespaceName = getSystemNamespaceName(getProperties()); | ||
|
|
||
| scannerFetchSize = getInt(getProperties(), SCANNER_FETCH_SIZE, DEFAULT_SCANNER_FETCH_SIZE); | ||
|
||
| } | ||
|
|
||
| public List<String> getContactPoints() { | ||
|
|
@@ -172,6 +177,10 @@ public String getSystemNamespaceName() { | |
| return systemNamespaceName; | ||
| } | ||
|
|
||
| public int getScannerFetchSize() { | ||
| return scannerFetchSize; | ||
| } | ||
|
|
||
| public static String getTransactionManager(Properties properties) { | ||
| return getString(properties, TRANSACTION_MANAGER, "consensus-commit"); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -41,13 +41,18 @@ | |
| */ | ||
| @ThreadSafe | ||
| public class SelectStatementHandler extends StatementHandler { | ||
|
|
||
| private final int fetchSize; | ||
|
|
||
| /** | ||
| * Constructs {@code SelectStatementHandler} with the specified {@code Session} | ||
| * | ||
| * @param session session to be used with this statement | ||
| * @param fetchSize the number of rows to be fetched at once | ||
| */ | ||
| public SelectStatementHandler(Session session) { | ||
| public SelectStatementHandler(Session session, int fetchSize) { | ||
| super(session); | ||
| this.fetchSize = fetchSize; | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -94,6 +99,7 @@ protected BoundStatement bind(PreparedStatement prepared, Operation operation) { | |
| @Override | ||
| @Nonnull | ||
| protected ResultSet execute(BoundStatement bound, Operation operation) { | ||
| bound.setFetchSize(fetchSize); | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For Cassandra, call |
||
| return session.execute(bound); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -44,8 +44,12 @@ | |
| @ThreadSafe | ||
| public class SelectStatementHandler extends StatementHandler { | ||
|
|
||
| public SelectStatementHandler(CosmosClient client, TableMetadataManager metadataManager) { | ||
| private final int fetchSize; | ||
|
|
||
| public SelectStatementHandler( | ||
| CosmosClient client, TableMetadataManager metadataManager, int fetchSize) { | ||
| super(client, metadataManager); | ||
| this.fetchSize = fetchSize; | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -85,9 +89,10 @@ private Scanner executeRead(Get get, TableMetadata tableMetadata) throws CosmosE | |
| return executeReadWithIndex(get, tableMetadata); | ||
| } | ||
|
|
||
| PartitionKey partitionKey = cosmosOperation.getCosmosPartitionKey(); | ||
|
|
||
| if (get.getProjections().isEmpty()) { | ||
| String id = cosmosOperation.getId(); | ||
| PartitionKey partitionKey = cosmosOperation.getCosmosPartitionKey(); | ||
| Record record = getContainer(get).readItem(id, partitionKey, Record.class).getItem(); | ||
| return new SingleRecordScanner( | ||
| record, new ResultInterpreter(get.getProjections(), tableMetadata)); | ||
|
|
@@ -100,8 +105,10 @@ private Scanner executeRead(Get get, TableMetadata tableMetadata) throws CosmosE | |
| .eq(cosmosOperation.getConcatenatedPartitionKey()), | ||
| DSL.field("r.id").eq(cosmosOperation.getId())) | ||
| .getSQL(ParamType.INLINED); | ||
| CosmosQueryRequestOptions options = | ||
| new CosmosQueryRequestOptions().setPartitionKey(partitionKey); | ||
|
|
||
| return executeQuery(get, tableMetadata, query); | ||
| return executeQuery(get, tableMetadata, query, options); | ||
| } | ||
|
|
||
| private Scanner executeReadWithIndex(Selection selection, TableMetadata tableMetadata) | ||
|
|
@@ -327,8 +334,8 @@ private Scanner executeQuery( | |
| CosmosQueryRequestOptions queryOptions) { | ||
| Iterator<FeedResponse<Record>> pagesIterator = | ||
| getContainer(selection) | ||
| .queryItems(query, queryOptions, Record.class) | ||
| .iterableByPage() | ||
| .queryItems(query, queryOptions.setMaxBufferedItemCount(fetchSize), Record.class) | ||
| .iterableByPage(fetchSize) | ||
|
Comment on lines
+337
to
+338
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the main change for Cosmos. |
||
| .iterator(); | ||
|
|
||
| return new ScannerImpl( | ||
|
|
||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the main change for Dynamo. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -59,7 +59,12 @@ public JdbcDatabase(DatabaseConfig databaseConfig) { | |
| databaseConfig.getMetadataCacheExpirationTimeSecs()); | ||
|
|
||
| OperationChecker operationChecker = new OperationChecker(databaseConfig, tableMetadataManager); | ||
| jdbcService = new JdbcService(tableMetadataManager, operationChecker, rdbEngine); | ||
| jdbcService = | ||
| new JdbcService( | ||
| tableMetadataManager, | ||
| operationChecker, | ||
| rdbEngine, | ||
| databaseConfig.getScannerFetchSize()); | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
|
|
@@ -98,9 +103,18 @@ public Scanner scan(Scan scan) throws ExecutionException { | |
| Connection connection = null; | ||
| try { | ||
| connection = dataSource.getConnection(); | ||
| connection.setAutoCommit(false); | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For JDBC, call |
||
| rdbEngine.setReadOnly(connection, true); | ||
| return jdbcService.getScanner(scan, connection); | ||
| } catch (SQLException e) { | ||
| try { | ||
| if (connection != null) { | ||
| connection.rollback(); | ||
| } | ||
| } catch (SQLException ex) { | ||
| e.addSuppressed(ex); | ||
| } | ||
|
|
||
| close(connection); | ||
| throw new ExecutionException( | ||
| CoreError.JDBC_ERROR_OCCURRED_IN_SELECTION.buildMessage(e.getMessage()), e); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a new property
scalar.db.scan_fetch_size.