Skip to content

Commit b48c48b

Browse files
authored
Merge pull request #16 from Rathan8/clqsh-expansion-truncate
truncate helper script to drop and recreate table
2 parents 3b0fa8a + 58ff206 commit b48c48b

File tree

1 file changed

+241
-0
lines changed

1 file changed

+241
-0
lines changed

bin/toolkit-truncate.py

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
'''The following script will simulate Truncate table in a region for the Amazon Keyspaces service.
2+
This is achieved by dropping the existing table by executing the DROP command. The DROP command itself is asynchronous and success is only a service side
3+
confirmation of the request, but not confirmation that resources have been removed. When the table no longer appears
4+
in the system_schema_mcs.tables table, it is considered fully removed. Once the table is dropped, the script recreates the table with its existing custom properties. The CREATE command itself is asynchronous and success is only a service side
5+
confirmation of the request, but not confirmation that resources have been created. When the table status appears as 'ACTIVE' in the system_schema_mcs.tables table, it is considered ready for use. Before the table is dropped and recreated, the schema of the table is backed up to the ~/.cassandra/truncate log
6+
7+
Currently the script waits up to 15 + 15 minutes for table deletion/creation to complete. In case of exceptions or if the duration is exceeded, the script will fail
8+
9+
Usage - The inputs for the script are the table as keyspace.table (--table) and the Amazon Keyspaces endpoint (--host); for private endpoints, pass the private endpoint as --host or manually change the endpoint variable in the script
10+
11+
## Prerequisites cqlsh-expansion (download required libraries for script execution like cassandra-driver, sigv4 etc)
12+
To install Refer to https://pypi.org/project/cqlsh-expansion/
13+
14+
##
15+
Execution Sample commands: python toolkit-truncate --table aws.test --host cassandra.us-east-1.amazonaws.com --ssl --auth-provider "SigV4AuthProvider"
16+
python toolkit-truncate --table "aws.test" --host "cassandra.us-east-1.amazonaws.com" -u "sri" -p "Rangisetti" --ssl --auth-provider "PlainTextAuthProvider"
17+
python toolkit-truncate --table aws.test --host cassandra.us-east-1.amazonaws.com --ssl --sigv4
18+
19+
@author Sri Rathan Rangisetti
20+
@author Michael Raney
21+
'''
22+
import boto3, time, sys
23+
import getpass
24+
import optparse
25+
import os
26+
import sys
27+
from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
28+
from cassandra.query import dict_factory
29+
from cassandra.metadata import KeyspaceMetadata, TableMetadata
30+
from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED
31+
from cassandra_sigv4.auth import *
32+
from cassandra.auth import PlainTextAuthProvider
33+
import logging
34+
35+
# Defaults mirrored from cqlsh-expansion.
DEFAULT_HOST = '127.0.0.1'
DEFAULT_PORT = 9142
DEFAULT_SSL = False
DEFAULT_SIGV4 = False
DEFAULT_CONNECT_TIMEOUT_SECONDS = 5
DEFAULT_REQUEST_TIMEOUT_SECONDS = 10
DEFAULT_AUTH_PROVIDER = 'PlainTextAuthProvider'

parser = optparse.OptionParser(description='Truncate the table')
parser.add_option('--table', help='keyspace.table name')
parser.add_option('--host', help='Amazon Keyspaces endpoint')
parser.add_option("--sigv4", action='store_true', help='Use SigV4AuthProvider plugin for autentication and authorization', default=DEFAULT_SIGV4)
parser.add_option("--auth-provider", help="The AuthProvider to use when connecting. Default is PlainTextAuthProvider", dest='auth_provider_name', default=DEFAULT_AUTH_PROVIDER)
parser.add_option("-u", "--username", help="Authenticate as user.")
parser.add_option("-p", "--password", help="Authenticate using password.")
parser.add_option('--ssl', action='store_true', help='Use SSL', default=False)

# An empty Values object is passed on purpose: optparse then skips its
# defaults, so an attribute exists on `options` only when the flag was given
# on the command line. The hasattr() checks later in the script rely on this.
optvalues = optparse.Values()
(options, arguments) = parser.parse_args(sys.argv[1:], values=optvalues)

# Because defaults are skipped, a missing required flag would otherwise
# surface as an AttributeError traceback; report it as a usage error instead.
if not hasattr(options, 'table') or not hasattr(options, 'host'):
    parser.error('--table and --host are required')

endpoint = str(options.host).replace("'", '')

# The table must be given as exactly <keyspace>.<table>.
table_parts = str(options.table).replace("'", '').split('.')
if len(table_parts) != 2 or not all(table_parts):
    parser.error('--table must be given as keyspace.table')
ks, tb = table_parts
57+
58+
# `options` only carries an attribute when the flag was supplied on the
# command line. Always bind both names so the later `if username:` check
# cannot raise NameError when -u / -p are omitted.
username = str(options.username) if hasattr(options, 'username') else None
password = str(options.password) if hasattr(options, 'password') else None
63+
64+
# Locate the current user's ~/.cassandra directory, which holds both the
# root certificate and the truncate log.
user = getpass.getuser()
user_dir = os.path.expanduser('~' + user)
config_dir = os.path.join(user_dir, '.cassandra')
cert_dir, truncate_log = (
    os.path.join(config_dir, leaf)
    for leaf in ('sf-class2-root.crt', 'truncate')
)
69+
70+
# Append timestamped messages to the truncate log file.
logging.basicConfig(
    filename=truncate_log,
    filemode='a',
    format='%(asctime)s %(message)s',
)

# Use the root logger and record everything from INFO upwards.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
79+
80+
# TLS 1.2 context that requires the server certificate to verify against
# the root certificate stored at cert_dir.
ssl_context = SSLContext(PROTOCOL_TLSv1_2)
ssl_context.verify_mode = CERT_REQUIRED
ssl_context.load_verify_locations(cafile=cert_dir)  # change the location to access the cert file
83+
84+
'''
85+
The following connection example uses the SigV4 authentication plugin for the DataStax Python driver.
86+
This allows you to use user roles and service roles to delegate authentication and authorization to
87+
Amazon Keyspaces resources and actions.
88+
Documentation for the sigv4connection can be found here:
89+
https://docs.aws.amazon.com/keyspaces/latest/devguide/using_python_driver.html
90+
Github example can be found here: https://github.com/aws/aws-sigv4-auth-cassandra-python-driver-plugin
91+
'''
92+
## Credential Configuration https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html
93+
## https://docs.aws.amazon.com/keyspaces/latest/devguide/programmatic.endpoints.html
94+
95+
# Select the auth provider.
# `options` only has an attribute when the corresponding flag was passed, so
# hasattr()/getattr() is used to detect what the caller asked for.
provider_name = getattr(options, 'auth_provider_name', DEFAULT_AUTH_PROVIDER)

if hasattr(options, 'sigv4') or provider_name == 'SigV4AuthProvider':
    # SigV4 takes its credentials from the boto3 session.
    my_session = boto3.session.Session()
    auth_provider = SigV4AuthProvider(my_session)
elif provider_name == DEFAULT_AUTH_PROVIDER:
    # Plain-text authentication needs a user name; stop with a clear message
    # rather than failing later on an undefined name.
    if not hasattr(options, 'username'):
        sys.exit('PlainTextAuthProvider requires a user name (-u/--username)')
    username = str(options.username)
    # Prompt for the password only when it was not given on the command line.
    if hasattr(options, 'password'):
        password = str(options.password)
    else:
        password = getpass.getpass()
    auth_provider = PlainTextAuthProvider(username=username, password=password)
else:
    # Report the rejected provider name (the original formatted the not yet
    # assigned `auth_provider` variable here, which raised NameError).
    raise SyntaxError('cqlsh-expansion.py Invalid parameter for auth-provider. "%s" is not a valid AuthProvider' % (provider_name,))
118+
119+
# Default execution profile: rows come back as dicts, no request timeout.
profile = ExecutionProfile(row_factory=dict_factory, request_timeout=None)

# Connect to the endpoint over TLS on port 9142.
cluster = Cluster(
    [endpoint],
    port=9142,
    ssl_context=ssl_context,
    auth_provider=auth_provider,
    execution_profiles={EXEC_PROFILE_DEFAULT: profile},
)
session = cluster.connect()
124+
125+
# Read the column definitions of the table and start the CREATE TABLE text.
# pk: position -> partition key column name
# cl: position -> clustering column name
# cl_order: clustering column name -> clustering order
pk, cl, cl_order = {}, {}, {}
columns = session.execute(
    "select column_name, type, kind, position, clustering_order from system_schema.columns WHERE keyspace_name = %s AND table_name = %s",
    (ks, tb),
)

# No rows means the table does not exist, so there is nothing to truncate.
if columns.one() is None:
    sys.exit("No such table exists to truncate")

# Append "<name> <type>[ static], " for every column.
schema_tbl = 'CREATE TABLE ' + ks + '.' + tb + ' ('

for item in columns:
    column_name = item['column_name']
    column_kind = item['kind']
    definition = column_name + ' ' + item['type']
    if column_kind == 'partition_key':
        pk[item['position']] = column_name
    elif column_kind == 'clustering':
        cl[item['position']] = column_name
        cl_order[column_name] = item['clustering_order']
    elif column_kind == 'static':
        definition += ' ' + column_kind
    schema_tbl += definition + ', '
147+
148+
def looper(schema_str, keys):
    """Append the values of *keys*, ordered by key, to *schema_str*.

    Each value is converted with str() and followed by a comma, so the
    result ends with a trailing comma whenever *keys* is non-empty.
    """
    suffix = ''.join(str(keys[position]) + ',' for position in sorted(keys))
    return schema_str + suffix
152+
153+
# Build the PRIMARY KEY and CLUSTERING ORDER clauses from the collected keys.
# pk / cl map key position -> column name; cl_order maps clustering column
# name -> order. Everything is emitted in position order.
sch_par_keys = 'PRIMARY KEY (( ' + ','.join(str(pk[position]) for position in sorted(pk))
schema_cl_keys = ''
schema_cl_order = ''

# Test the dict itself: any(cl) inspects the keys, and a single clustering
# column sits at position 0, which is falsy and was therefore skipped.
if cl:
    clustering_positions = sorted(cl)
    schema_cl_keys = ', ' + ','.join(str(cl[position]) for position in clustering_positions)
    # List the columns by clustering position, not by the order in which the
    # rows happened to be returned.
    schema_cl_order = ' WITH CLUSTERING ORDER BY (' + ', '.join(
        str(cl[position]) + ' ' + str(cl_order[cl[position]])
        for position in clustering_positions
    ) + ')'

# First ')' closes the partition key group, '))' closes PRIMARY KEY and the
# column list.
recreate_table = schema_tbl + sch_par_keys + ')' + schema_cl_keys + '))' + schema_cl_order

# Table options follow: chain with AND after CLUSTERING ORDER, otherwise
# open the options with "with".
recreate_table += ' AND' if cl else ' with'
171+
172+
# Removes the unicode(u) after converting to string
173+
def unicode_fixer(obj):
    """Return str(obj) with Python 2 style ``u'...'`` string prefixes removed.

    Only a ``u`` that directly precedes a quote and is not itself part of a
    word is removed, so text that merely ends in "u" (for example the key
    ``'menu'``) is left intact. A blanket replace of ``u'`` would turn
    ``'menu'`` into ``'men'``.

    Limitation: a quoted value that ends in a standalone "u" after a
    non-word character (for example ``'a u'``) is still altered.
    """
    import re
    return re.sub(r"(?<![A-Za-z0-9_])u(?=')", '', str(obj))
175+
176+
# Retrieves the properties of the table such as TTL, Comments, Provisioning, Encryption
cust_set = session.execute(("select custom_properties from system_schema_mcs.tables WHERE keyspace_name = %s AND table_name = %s"), (ks,tb)).one()
prop_set = session.execute(("select comment, default_time_to_live from system_schema_mcs.tables WHERE keyspace_name = %s AND table_name = %s"), (ks,tb)).one()

ttl_comm = ' default_time_to_live = ' + str(prop_set['default_time_to_live'])
if prop_set['comment']:
    # The comment is free text: double any single quote so it stays a valid
    # CQL string literal, and do not run it through the u'' prefix stripper.
    ttl_comm += ' AND comment = ' + '\'' + str(prop_set['comment']).replace("'", "''") + '\''

# Drop the timestamp entries from the copied maps before they are written
# into the CREATE statement.
capacity_mode = dict(cust_set['custom_properties']['capacity_mode'])
capacity_mode.pop('last_update_to_pay_per_request_timestamp', None)
pitr_status = dict(cust_set['custom_properties']['point_in_time_recovery'])
pitr_status.pop('earliest_restorable_timestamp', None)

encry_st = cust_set['custom_properties']['encryption_specification']
cust_prop = '{ \'capacity_mode\':' + unicode_fixer(capacity_mode) + ',' + ' \'point_in_time_recovery\':' + unicode_fixer(pitr_status) + ',' + ' \'encryption_specification\':'+ unicode_fixer(encry_st) +'}'

# Retrieve the tags associated with the table
tags_result_set = session.execute(("SELECT * FROM system_schema_mcs.tags WHERE keyspace_name = %s AND resource_name = %s "), (ks,tb))
tags_data = tags_result_set.one()

# A table without a tags row (or with a null tags value) gets no TAGS clause
# instead of crashing on None.
tags_sch = ''
tags_clause = ''
if tags_data is not None and tags_data['tags'] is not None:
    tags_sch = unicode_fixer(tags_data['tags'])
    tags_clause = ' AND TAGS = ' + tags_sch

# Building the schema to recreate the table
final_schema = recreate_table + ttl_comm + tags_clause + ' AND CUSTOM_PROPERTIES = ' + cust_prop

# Printing final schema to log as backup for scenarios when table was dropped and unable to recreate successfully
logger.info(final_schema)
204+
205+
# execute the command check the status of the table. If no exception this represents acknowledgement that the service has received the request.
206+
def DDL_status(resultset):
    """Report whether the DDL statement behind *resultset* has taken effect.

    Waits for the asynchronous request to be acknowledged, then reads the
    table's row from system_schema_mcs.tables.

    Returns True when the table has no row (it is gone) or when its status
    is 'ACTIVE'; returns False for any other status or when an exception
    occurs (the exception is printed).

    NOTE(review): for a DROP this also returns True while the table still
    reports 'ACTIVE' - confirm the status has changed by the time the DROP
    is acknowledged.
    """
    try:
        # Raises if the request itself failed; the returned rows are unused.
        resultset.result()
        status = session.execute(('SELECT keyspace_name, table_name, status FROM system_schema_mcs.tables WHERE keyspace_name = %s AND table_name = %s'), (ks,tb))
        row = status.one()
        if row is None:
            return True
        return row['status'] == 'ACTIVE'
    except Exception as execp:
        print (' Exception Encountered :', execp)
        return False
216+
217+
# Time loop function to wait for the table status as commands are executed asynchronously
218+
def duration_loop(t_end, ddl, poll_interval=5):
    """Poll DDL_status(ddl) until it succeeds or the deadline passes.

    t_end is an absolute time.time() deadline. poll_interval is the number
    of seconds to wait between polls, so the system tables are not queried
    in a tight loop; the wait never runs past the deadline.

    Returns True as soon as DDL_status reports success, False when the
    deadline is reached first.
    """
    while time.time() < t_end:
        if DDL_status(ddl):
            return True
        remaining = t_end - time.time()
        if remaining > 0:
            time.sleep(min(poll_interval, remaining))
    return False
225+
drp_tbl_state = 'DROP TABLE ' + ks + '.' + tb
# Executing the drop table async statement
logger.info("Drop Table Initiated")
dp_tbl = session.execute_async(drp_tbl_state)
dp_time = time.time() + 60 * 15 # hard coded to 15 mins from now can increase or decrease time based on need
dp_status = duration_loop(dp_time, dp_tbl)

# Fail loudly when the drop did not complete instead of exiting silently
# with a success status.
if not dp_status:
    logger.error("Drop Table did not complete within the allowed time")
    cluster.shutdown()
    sys.exit("Table drop did not complete; the table was not recreated")

time.sleep(30) # sleeping for 30 sec to make sure metadata is updated after table deletion

# Executing the create table async statement
logger.info("ReCreating Table Initiated")
cr_tbl = session.execute_async(final_schema)
crt_time = time.time() + 60 * 15 # hard coded to 15 mins from now can increase or decrease time based on need
crt_status = duration_loop(crt_time, cr_tbl)
cluster.shutdown()

# The schema was logged before the drop, so point the operator at it when
# the recreate did not finish.
if not crt_status:
    logger.error("ReCreating Table did not complete within the allowed time")
    sys.exit("Table was dropped but not recreated in time; the schema is saved in " + truncate_log)

print("Table Truncate is completed")

0 commit comments

Comments
 (0)