1- //TODO rebase use super::udf_xirr::XirrAccumulator;
2- use crate :: queryplanner:: coalesce:: SUPPORTED_COALESCE_TYPES ;
31use crate :: queryplanner:: hll:: { Hll , HllUnion } ;
42use crate :: queryplanner:: info_schema:: timestamp_nanos_or_panic;
53use crate :: queryplanner:: udf_xirr:: { XirrUDF , XIRR_UDAF_NAME } ;
@@ -32,6 +30,7 @@ pub fn registerable_scalar_udfs_iter() -> impl Iterator<Item = ScalarUDF> {
3230 ScalarUDF :: new_from_impl ( DateAddSub :: new_sub ( ) ) ,
3331 ScalarUDF :: new_from_impl ( UnixTimestamp :: new ( ) ) ,
3432 ScalarUDF :: new_from_impl ( ConvertTz :: new ( ) ) ,
33+ ScalarUDF :: new_from_impl ( Now :: new ( ) ) ,
3534 ]
3635 . into_iter ( )
3736}
@@ -72,62 +71,74 @@ pub fn aggregate_kind_by_name(n: &str) -> Option<CubeAggregateUDFKind> {
7271// TODO: add custom type and use it instead of `Binary` for HLL columns.
7372
7473#[ derive( Debug ) ]
75- struct UnixTimestamp {
74+ struct Now {
7675 signature : Signature ,
7776}
7877
79- struct Now { }
8078impl Now {
79+ fn new ( ) -> Self {
80+ Now {
81+ signature : Self :: signature ( ) ,
82+ }
83+ }
84+
8185 fn signature ( ) -> Signature {
82- Signature :: Exact ( Vec :: new ( ) )
86+ Signature :: exact ( Vec :: new ( ) , Volatility :: Stable )
8387 }
8488}
85- /* TODO rebase - reimplement for new interface
86- impl CubeScalarUDF for Now {
87- fn kind(&self) -> CubeScalarUDFKind {
88- CubeScalarUDFKind::Now
89- }
9089
90+ impl ScalarUDFImpl for Now {
9191 fn name ( & self ) -> & str {
9292 "NOW"
9393 }
9494
95- fn descriptor(&self) -> ScalarUDF {
96- ScalarUDF {
97- name: self.name().to_string(),
98- signature: Self::signature(),
99- return_type: Arc::new(|inputs| {
100- assert!(inputs.is_empty());
101- Ok(Arc::new(DataType::Timestamp(TimeUnit::Nanosecond, None)))
102- }),
103- fun: Arc::new(|_| {
104- let t = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
105- Ok(t) => t,
106- Err(e) => {
107- return Err(DataFusionError::Internal(format!(
108- "Failed to get current timestamp: {}",
109- e
110- )))
111- }
112- };
113-
114- let nanos = match i64::try_from(t.as_nanos()) {
115- Ok(t) => t,
116- Err(e) => {
117- return Err(DataFusionError::Internal(format!(
118- "Failed to convert timestamp to i64: {}",
119- e
120- )))
121- }
122- };
95+ fn as_any ( & self ) -> & dyn Any {
96+ self
97+ }
12398
124- Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
125- Some(nanos),
99+ fn signature ( & self ) -> & Signature {
100+ & self . signature
101+ }
102+
103+ fn return_type ( & self , _arg_types : & [ DataType ] ) -> datafusion:: common:: Result < DataType > {
104+ Ok ( DataType :: Timestamp ( TimeUnit :: Nanosecond , None ) )
105+ }
106+
107+ fn invoke_with_args (
108+ & self ,
109+ _args : datafusion:: logical_expr:: ScalarFunctionArgs ,
110+ ) -> datafusion:: error:: Result < ColumnarValue > {
111+ let t = match SystemTime :: now ( ) . duration_since ( SystemTime :: UNIX_EPOCH ) {
112+ Ok ( t) => t,
113+ Err ( e) => {
114+ return Err ( DataFusionError :: Internal ( format ! (
115+ "Failed to get current timestamp: {}" ,
116+ e
126117 ) ) )
127- }),
128- }
118+ }
119+ } ;
120+
121+ let nanos = match i64:: try_from ( t. as_nanos ( ) ) {
122+ Ok ( t) => t,
123+ Err ( e) => {
124+ return Err ( DataFusionError :: Internal ( format ! (
125+ "Failed to convert timestamp to i64: {}" ,
126+ e
127+ ) ) )
128+ }
129+ } ;
130+
131+ Ok ( ColumnarValue :: Scalar ( ScalarValue :: TimestampNanosecond (
132+ Some ( nanos) ,
133+ None ,
134+ ) ) )
129135 }
130- } */
136+ }
137+
138+ #[ derive( Debug ) ]
139+ struct UnixTimestamp {
140+ signature : Signature ,
141+ }
131142
132143impl UnixTimestamp {
133144 pub fn new ( ) -> Self {
@@ -145,40 +156,6 @@ impl ScalarUDFImpl for UnixTimestamp {
145156 "unix_timestamp"
146157 }
147158
148- /* TODO rebase - reimplement for new interface
149- * fn descriptor(&self) -> ScalarUDF {
150- ScalarUDF {
151- name: self.name().to_string(),
152- signature: Self::signature(),
153- return_type: Arc::new(|inputs| {
154- assert!(inputs.is_empty());
155- Ok(Arc::new(DataType::Int64))
156- }),
157- fun: Arc::new(|_| {
158- let t = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
159- Ok(t) => t,
160- Err(e) => {
161- return Err(DataFusionError::Internal(format!(
162- "Failed to get current timestamp: {}",
163- e
164- )))
165- }
166- };
167-
168- let seconds = match i64::try_from(t.as_secs()) {
169- Ok(t) => t,
170- Err(e) => {
171- return Err(DataFusionError::Internal(format!(
172- "Failed to convert timestamp to i64: {}",
173- e
174- )))
175- }
176- };
177-
178- Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(seconds))))
179- }),
180- }
181- }*/
182159 fn as_any ( & self ) -> & dyn Any {
183160 self
184161 }
@@ -191,16 +168,31 @@ impl ScalarUDFImpl for UnixTimestamp {
191168 Ok ( DataType :: Int64 )
192169 }
193170
194- fn invoke ( & self , _args : & [ ColumnarValue ] ) -> datafusion:: common:: Result < ColumnarValue > {
195- Err ( DataFusionError :: Internal (
196- "UNIX_TIMESTAMP() was not optimized away" . to_string ( ) ,
197- ) )
198- }
171+ fn invoke_with_args (
172+ & self ,
173+ _args : datafusion:: logical_expr:: ScalarFunctionArgs ,
174+ ) -> datafusion:: error:: Result < ColumnarValue > {
175+ let t = match SystemTime :: now ( ) . duration_since ( SystemTime :: UNIX_EPOCH ) {
176+ Ok ( t) => t,
177+ Err ( e) => {
178+ return Err ( DataFusionError :: Internal ( format ! (
179+ "Failed to get current timestamp: {}" ,
180+ e
181+ ) ) )
182+ }
183+ } ;
199184
200- fn invoke_no_args ( & self , _number_rows : usize ) -> datafusion:: common:: Result < ColumnarValue > {
201- Err ( DataFusionError :: Internal (
202- "UNIX_TIMESTAMP() was not optimized away" . to_string ( ) ,
203- ) )
185+ let seconds = match i64:: try_from ( t. as_secs ( ) ) {
186+ Ok ( t) => t,
187+ Err ( e) => {
188+ return Err ( DataFusionError :: Internal ( format ! (
189+ "Failed to convert timestamp to i64: {}" ,
190+ e
191+ ) ) )
192+ }
193+ } ;
194+
195+ Ok ( ColumnarValue :: Scalar ( ScalarValue :: Int64 ( Some ( seconds) ) ) )
204196 }
205197
206198 fn simplify (
0 commit comments