33import functools
44import itertools
55import logging
6+ import posixpath
67from typing import Any , Dict , List , Mapping , Optional , Sequence , Union
78
89import ydb
@@ -78,6 +79,7 @@ def __init__(
7879 session_pool : Union [ydb .SessionPool , ydb .aio .SessionPool ],
7980 tx_mode : ydb .AbstractTransactionModeBuilder ,
8081 tx_context : Optional [ydb .BaseTxContext ] = None ,
82+ root_directory : str = "" ,
8183 ):
8284 self .session_pool = session_pool
8385 self .tx_mode = tx_mode
@@ -86,33 +88,36 @@ def __init__(
8688 self .arraysize = 1
8789 self .rows = None
8890 self ._rows_prefetched = None
91+ self .root_directory = root_directory
8992
9093 @_handle_ydb_errors
9194 def describe_table (self , abs_table_path : str ) -> ydb .TableDescription :
9295 return self ._retry_operation_in_pool (self ._describe_table , abs_table_path )
9396
94- def check_exists (self , table_path : str ) -> bool :
97+ def check_exists (self , abs_table_path : str ) -> bool :
9598 try :
96- self ._retry_operation_in_pool (self ._describe_path , table_path )
99+ self ._retry_operation_in_pool (self ._describe_path , abs_table_path )
97100 return True
98101 except ydb .SchemeError :
99102 return False
100103
101104 @_handle_ydb_errors
102- def get_table_names (self ) -> List [str ]:
103- directory : ydb .Directory = self ._retry_operation_in_pool (self ._list_directory )
104- return [child .name for child in directory .children if child .is_table ()]
105+ def get_table_names (self , abs_dir_path : str ) -> List [str ]:
106+ directory : ydb .Directory = self ._retry_operation_in_pool (self ._list_directory , abs_dir_path )
107+ result = []
108+ for child in directory .children :
109+ child_abs_path = posixpath .join (abs_dir_path , child .name )
110+ if child .is_table ():
111+ result .append (child_abs_path )
112+ elif child .is_directory () and not child .name .startswith ("." ):
113+ result .extend (self .get_table_names (child_abs_path ))
114+ return result
105115
106116 def execute (self , operation : YdbQuery , parameters : Optional [Mapping [str , Any ]] = None ):
107- if operation .is_ddl or not operation .parameters_types :
108- query = operation .yql_text
109- is_ddl = operation .is_ddl
110- else :
111- query = ydb .DataQuery (operation .yql_text , operation .parameters_types )
112- is_ddl = operation .is_ddl
117+ query = self ._get_ydb_query (operation )
113118
114119 logger .info ("execute sql: %s, params: %s" , query , parameters )
115- if is_ddl :
120+ if operation . is_ddl :
116121 chunks = self ._execute_ddl (query )
117122 else :
118123 chunks = self ._execute_dml (query , parameters )
@@ -130,6 +135,15 @@ def execute(self, operation: YdbQuery, parameters: Optional[Mapping[str, Any]] =
130135
131136 self .rows = rows
132137
138+ def _get_ydb_query (self , operation : YdbQuery ) -> Union [ydb .DataQuery , str ]:
139+ pragma = ""
140+ if self .root_directory :
141+ pragma = f'PRAGMA TablePathPrefix = "{ self .root_directory } ";\n '
142+ if operation .is_ddl or not operation .parameters_types :
143+ return pragma + operation .yql_text
144+
145+ return ydb .DataQuery (pragma + operation .yql_text , operation .parameters_types )
146+
133147 @_handle_ydb_errors
134148 def _execute_dml (
135149 self , query : Union [ydb .DataQuery , str ], parameters : Optional [Mapping [str , Any ]] = None
@@ -163,8 +177,8 @@ def _describe_path(session: ydb.Session, table_path: str) -> ydb.SchemeEntry:
163177 return session ._driver .scheme_client .describe_path (table_path )
164178
165179 @staticmethod
166- def _list_directory (session : ydb .Session ) -> ydb .Directory :
167- return session ._driver .scheme_client .list_directory (session . _driver . _driver_config . database )
180+ def _list_directory (session : ydb .Session , abs_dir_path : str ) -> ydb .Directory :
181+ return session ._driver .scheme_client .list_directory (abs_dir_path )
168182
169183 @staticmethod
170184 def _prepare (session : ydb .Session , query : str ) -> ydb .DataQuery :
@@ -264,12 +278,12 @@ async def _describe_table(session: ydb.aio.table.Session, abs_table_path: str) -
264278 return await session .describe_table (abs_table_path )
265279
266280 @staticmethod
267- async def _describe_path (session : ydb .aio .table .Session , table_path : str ) -> ydb .SchemeEntry :
268- return await session ._driver .scheme_client .describe_path (table_path )
281+ async def _describe_path (session : ydb .aio .table .Session , abs_table_path : str ) -> ydb .SchemeEntry :
282+ return await session ._driver .scheme_client .describe_path (abs_table_path )
269283
270284 @staticmethod
271- async def _list_directory (session : ydb .aio .table .Session ) -> ydb .Directory :
272- return await session ._driver .scheme_client .list_directory (session . _driver . _driver_config . database )
285+ async def _list_directory (session : ydb .aio .table .Session , abs_dir_path : str ) -> ydb .Directory :
286+ return await session ._driver .scheme_client .list_directory (abs_dir_path )
273287
274288 @staticmethod
275289 async def _execute_scheme (session : ydb .aio .table .Session , query : str ) -> ydb .convert .ResultSets :
0 commit comments