Skip to content
This repository was archived by the owner on Jun 30, 2025. It is now read-only.

Commit 5c0e39a

Browse files
author
Carter Shanklin
committed
Let user provide the namespace so we don't look it up (which requires
extra privileges)
1 parent dc106a9 commit 5c0e39a

File tree

1 file changed

+39
-19
lines changed

1 file changed

+39
-19
lines changed

scripts/plugins/operators/oci_object_storage.py

Lines changed: 39 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ class MakeBucket(BaseOperator):
3232
:type bucket_name: str
3333
:param compartment_ocid: Compartment ID
3434
:type compartment_id: str
35+
:param namespace_name: Object storage namespace
36+
:type namespace_name: str
3537
:param oci_conn_id: Airflow connection ID
3638
:type oci_conn_id: str
3739
"""
@@ -41,13 +43,15 @@ def __init__(
4143
self,
4244
bucket_name: str,
4345
compartment_ocid: str,
46+
namespace_name: Optional[str] = None,
4447
oci_conn_id: Optional[str] = "oci_default",
4548
*args,
4649
**kwargs
4750
) -> None:
4851
super().__init__(*args, **kwargs)
4952
self.bucket_name = bucket_name
5053
self.compartment_id = compartment_ocid
54+
self.namespace_name = namespace_name
5155
self.oci_conn_id = oci_conn_id
5256
self._oci_hook = None
5357
self.oci_client = oci.object_storage.ObjectStorageClient
@@ -58,17 +62,18 @@ def execute(self, context, **kwargs):
5862
client = self._oci_hook.get_client(self.oci_client)
5963
self.log.info("Validating OCI Config")
6064
self._oci_hook.validate_config()
61-
namespace = self._oci_hook.get_namespace()
65+
if not self.namespace_name:
66+
self.namespace_name = self._oci_hook.get_namespace()
6267
details = oci.object_storage.models.CreateBucketDetails(
6368
compartment_id=self.compartment_id, name=self.bucket_name
6469
)
6570
self.log.info("Checking if Bucket {} exists".format(self.bucket_name))
66-
bucket_exists = self._oci_hook.check_for_bucket(namespace_name=namespace, bucket_name=self.bucket_name)
71+
bucket_exists = self._oci_hook.check_for_bucket(namespace_name=self.namespace_name, bucket_name=self.bucket_name)
6772
if bucket_exists is True:
6873
self.log.info("Bucket {0} exists, skipping creation".format(self.bucket_name))
6974
else:
70-
self.log.info("Creating Bucket {0} in {1}".format(self.bucket_name, namespace))
71-
client.create_bucket(namespace_name=namespace, create_bucket_details=details, **kwargs)
75+
self.log.info("Creating Bucket {0} in {1}".format(self.bucket_name, self.namespace_name))
76+
client.create_bucket(namespace_name=self.namespace_name, create_bucket_details=details, **kwargs)
7277
self.log.info("Create bucket complete")
7378

7479

@@ -84,6 +89,8 @@ class CopyFileToOCIObjectStorageOperator(BaseOperator):
8489
:type object_name: str
8590
:param local_file_path: Path to local file
8691
:type local_file_path: str
92+
:param namespace_name: Object storage namespace
93+
:type namespace_name: str
8794
:param oci_conn_id: Airflow connection ID
8895
:type oci_conn_id: str
8996
"""
@@ -95,13 +102,15 @@ def __init__(
95102
compartment_ocid: str,
96103
object_name: str,
97104
local_file_path: str,
105+
namespace_name: Optional[str] = None,
98106
oci_conn_id: Optional[str] = "oci_default",
99107
*args,
100108
**kwargs
101109
) -> None:
102110
super().__init__(*args, **kwargs)
103111
self.bucket_name = bucket_name
104112
self.compartment_id = compartment_ocid
113+
self.namespace_name = namespace_name
105114
self.object_name = object_name
106115
self.local_file_path = local_file_path
107116
self.oci_conn_id = oci_conn_id
@@ -114,20 +123,21 @@ def execute(self, context, **kwargs):
114123
client = self._oci_hook.get_client(self.oci_client)
115124
self.log.info("Validating OCI Config")
116125
self._oci_hook.validate_config()
117-
namespace = self._oci_hook.get_namespace()
126+
if not self.namespace_name:
127+
self.namespace_name = self._oci_hook.get_namespace()
118128
details = oci.object_storage.models.CreateBucketDetails(
119129
compartment_id=self.compartment_id, name=self.bucket_name
120130
)
121131
self.log.info("Checking if Bucket {} exists".format(self.bucket_name))
122-
bucket_exists = self._oci_hook.check_for_bucket(namespace_name=namespace, bucket_name=self.bucket_name)
132+
bucket_exists = self._oci_hook.check_for_bucket(namespace_name=self.namespace_name, bucket_name=self.bucket_name)
123133
if bucket_exists is True:
124134
self.log.info("Bucket {0} exists, skipping creation".format(self.bucket_name))
125135
else:
126-
self.log.info("Creating Bucket {0} in {1}".format(self.bucket_name, namespace))
127-
client.create_bucket(namespace_name=namespace, create_bucket_details=details)
136+
self.log.info("Creating Bucket {0} in {1}".format(self.bucket_name, self.namespace_name))
137+
client.create_bucket(namespace_name=self.namespace_name, create_bucket_details=details)
128138
self.log.info("Create bucket complete")
129139
self.log.info("Checking if {0} exists in {1}".format(self.object_name, self.bucket_name))
130-
object_exists = self._oci_hook.check_for_object(namespace_name=namespace, bucket_name=self.bucket_name,
140+
object_exists = self._oci_hook.check_for_object(namespace_name=self.namespace_name, bucket_name=self.bucket_name,
131141
object_name=self.object_name)
132142
if object_exists is True:
133143
self.log.info("Object {0} exists already in {1}".format(self.object_name, self.bucket_name))
@@ -139,7 +149,7 @@ def execute(self, context, **kwargs):
139149
self.log.info("Copying {0} to {1}".format(self.local_file, self.bucket_name))
140150
self.put_object_body = open(self.local_file, 'rb')
141151
self._oci_hook.copy_to_bucket(bucket_name=self.bucket_name,
142-
namespace_name=namespace,
152+
namespace_name=self.namespace_name,
143153
object_name=self.object_name,
144154
put_object_body=self.put_object_body, **kwargs)
145155
else:
@@ -160,6 +170,8 @@ class CopyToOCIObjectStorageOperator(BaseOperator):
160170
:type object_name: str
161171
:param put_object_body: Contents of object_name
162172
:type put_object_body: stream
173+
:param namespace_name: Object storage namespace
174+
:type namespace_name: str
163175
:param oci_conn_id: Airflow connection ID
164176
:type oci_conn_id: str
165177
"""
@@ -171,13 +183,15 @@ def __init__(
171183
compartment_ocid: str,
172184
object_name: str,
173185
put_object_body: str,
186+
namespace_name: Optional[str] = None,
174187
oci_conn_id: Optional[str] = "oci_default",
175188
*args,
176189
**kwargs
177190
) -> None:
178191
super().__init__(*args, **kwargs)
179192
self.bucket_name = bucket_name
180193
self.compartment_id = compartment_ocid
194+
self.namespace_name = namespace_name
181195
self.object_name = object_name
182196
self.put_object_body = put_object_body
183197
self.oci_conn_id = oci_conn_id
@@ -190,26 +204,27 @@ def execute(self, context, **kwargs):
190204
client = self._oci_hook.get_client(self.oci_client)
191205
self.log.info("Validating OCI Config")
192206
self._oci_hook.validate_config()
193-
namespace = self._oci_hook.get_namespace()
207+
if not self.namespace_name:
208+
self.namespace_name = self._oci_hook.get_namespace()
194209
details = oci.object_storage.models.CreateBucketDetails(
195210
compartment_id=self.compartment_id, name=self.bucket_name
196211
)
197212
self.log.info("Checking if Bucket {} exists".format(self.bucket_name))
198-
bucket_exists = self._oci_hook.check_for_bucket(namespace_name=namespace, bucket_name=self.bucket_name)
213+
bucket_exists = self._oci_hook.check_for_bucket(namespace_name=self.namespace_name, bucket_name=self.bucket_name)
199214
if bucket_exists is True:
200215
self.log.info("Bucket {0} exists, skipping creation".format(self.bucket_name))
201216
else:
202-
self.log.info("Creating Bucket {0} in {1}".format(self.bucket_name, namespace))
203-
client.create_bucket(namespace_name=namespace, create_bucket_details=details)
217+
self.log.info("Creating Bucket {0} in {1}".format(self.bucket_name, self.namespace_name))
218+
client.create_bucket(namespace_name=self.namespace_name, create_bucket_details=details)
204219
self.log.info("Create bucket complete")
205220
self.log.info("Checking if {0} exists in {1}".format(self.object_name, self.bucket_name))
206-
object_exists = self._oci_hook.check_for_object(namespace_name=namespace, bucket_name=self.bucket_name,
221+
object_exists = self._oci_hook.check_for_object(namespace_name=self.namespace_name, bucket_name=self.bucket_name,
207222
object_name=self.object_name)
208223
if object_exists is True:
209224
self.log.info("Object {0} exists already in {1}".format(self.object_name, self.bucket_name))
210225
else:
211226
self.log.info("Copying {0} to {1}".format(self.object_name, self.bucket_name))
212-
self._oci_hook.copy_to_bucket(bucket_name=self.bucket_name, namespace_name=namespace,
227+
self._oci_hook.copy_to_bucket(bucket_name=self.bucket_name, namespace_name=self.namespace_name,
213228
object_name=self.object_name, put_object_body=self.put_object_body, **kwargs)
214229

215230

@@ -225,6 +240,8 @@ class CopyFromOCIObjectStorage(BaseOperator):
225240
:type object_name: str
226241
:param put_object_body: Contents of object_name
227242
:type put_object_body: stream
243+
:param namespace_name: Object storage namespace
244+
:type namespace_name: str
228245
:param oci_conn_id: Airflow connection ID
229246
:type oci_conn_id: str
230247
"""
@@ -234,13 +251,15 @@ def __init__(
234251
bucket_name: str,
235252
compartment_id: str,
236253
object_name: str,
254+
namespace_name: Optional[str] = None,
237255
oci_conn_id: Optional[str] = "oci_default",
238256
*args,
239257
**kwargs
240258
) -> None:
241259
super().__init__(*args, **kwargs)
242260
self.bucket_name = bucket_name
243261
self.compartment_id = compartment_id
262+
self.namespace_name = namespace_name
244263
self.object_name = object_name
245264
self.oci_conn_id = oci_conn_id
246265
self._oci_hook = None
@@ -252,13 +271,14 @@ def execute(self, context, **kwargs):
252271
client = self._oci_hook.get_client(self.oci_client)
253272
self.log.info("Validating OCI Config")
254273
self._oci_hook.validate_config()
255-
namespace = self._oci_hook.get_namespace()
274+
if not self.namespace_name:
275+
self.namespace_name = self._oci_hook.get_namespace()
256276
self.log.info("Checking if {0} exists in {1}".format(self.object_name, self.bucket_name))
257-
object_exists = self._oci_hook.check_for_object(namespace_name=namespace, bucket_name=self.bucket_name,
277+
object_exists = self._oci_hook.check_for_object(namespace_name=self.namespace_name, bucket_name=self.bucket_name,
258278
object_name=self.object_name, **kwargs)
259279
if object_exists is True:
260280
self.log.info("Reading {0} from {1}".format(self.object_name, self.bucket_name))
261-
return client.get_object(namespace_name=namespace, object_name=self.object_name,
281+
return client.get_object(namespace_name=self.namespace_name, object_name=self.object_name,
262282
bucket_name=self.bucket_name, **kwargs)
263283
else:
264284
raise AirflowException("{0} does not exist in {1}".format(self.object_name, self.bucket_name))

0 commit comments

Comments
 (0)