diff --git a/src/reporter/reporter.py b/src/reporter/reporter.py index a3388fe6..a478ffa2 100644 --- a/src/reporter/reporter.py +++ b/src/reporter/reporter.py @@ -32,7 +32,7 @@ from geocoding.factory import get_geo_cache, is_geo_coding_available from requests import RequestException from translators.sql_translator import SQLTranslator -from utils.common import iter_entity_attrs, TIME_INDEX_NAME +from utils.common import iter_entity_attrs, TIME_INDEX_NAME, TIME_INDEX_ATTRIBUTE_NAME import json import logging import requests @@ -42,7 +42,7 @@ from exceptions.exceptions import NGSIUsageError, InvalidParameterValue, InvalidHeaderValue from wq.ql.notify import InsertAction from reporter.httputil import fiware_correlator, fiware_s, fiware_sp - +from reporter.timex import select_time_index_attr def log(): logger = logging.getLogger(__name__) @@ -133,6 +133,7 @@ def _filter_empty_entities(payload): def _filter_no_type_no_value_entities(payload): attrs = list(iter_entity_attrs(payload)) attrs.remove('time_index') + attrs.remove('time_index_attribute') for i in attrs: attr = payload.get(i, {}) try: @@ -143,7 +144,6 @@ def _filter_no_type_no_value_entities(payload): # remove attributes without value or type except Exception as e: del payload[i] - return payload @@ -170,6 +170,10 @@ def notify(): custom_index = request.headers.get(TIME_INDEX_HEADER_NAME, None) entity[TIME_INDEX_NAME] = \ select_time_index_value_as_iso(custom_index, entity) + # Add TIME_INDEX_ATTRIBUTE_NAME + attr_name = iter_entity_attrs(entity) + entity[TIME_INDEX_ATTRIBUTE_NAME] = \ + select_time_index_attr(attr_name, entity) # Add GEO-DATE if enabled if not entity.get(LOCATION_ATTR_NAME, None): add_geodata(entity) diff --git a/src/reporter/timex.py b/src/reporter/timex.py index 584de775..7ed13fa6 100644 --- a/src/reporter/timex.py +++ b/src/reporter/timex.py @@ -5,6 +5,7 @@ from utils.timestr import latest_from_str_rep, to_datetime TIME_INDEX_HEADER_NAME = 'Fiware-TimeIndex-Attribute' +TIME_INDEX_ATTRIBUTE_NAME = 'time_index_attribute' MaybeString = Union[str, None] @@ -61,7 +62,6 @@ def time_index_priority_list( """ # Custom time index attribute yield to_datetime(_attribute(notification, custom_index)) - # The most recent custom time index metadata yield latest_from_str_rep(_iter_metadata(notification, custom_index)) @@ -104,7 +104,6 @@ def select_time_index_value(custom_index: str, notification: dict) -> datetime: be converted to a ``datetime``. Items are considered from top to bottom, so that if multiple values are present and they can all be converted to ``datetime``, the topmost value is chosen. - - Custom time index. The value of the ``TIME_INDEX_HEADER_NAME``. Note that for a notification to contain such header, the corresponding subscription has to be created with an ``httpCustom`` block as detailed @@ -124,7 +123,6 @@ def select_time_index_value(custom_index: str, notification: dict) -> datetime: - Current time. This is the default value we use if any of the above isn't present or none of the values found can actually be converted to a ``datetime``. - :param custom_index: name of the custom_index (if requested, None otherwise) :param notification: the notification JSON payload as received from Orion. @@ -136,10 +134,14 @@ def select_time_index_value(custom_index: str, notification: dict) -> datetime: custom_index, notification): if index_candidate: return index_candidate - - # use the current time as a last resort return current_time - + +def select_time_index_attr(attr_name:str, notification: dict): + attr_names = [] + for attr_name in iter_entity_attrs(notification): + if attr_name != 'time_index': + attr_names.append(attr_name) + return attr_names def select_time_index_value_as_iso(custom_index: str, notification: dict) -> \ str: diff --git a/src/translators/base_translator.py b/src/translators/base_translator.py index 4d367c2a..37530949 100644 --- a/src/translators/base_translator.py +++ b/src/translators/base_translator.py @@ -1,5 +1,4 @@ -from utils.common import TIME_INDEX_NAME - +from utils.common import TIME_INDEX_NAME, TIME_INDEX_ATTRIBUTE_NAME class BaseTranslator(object): """ @@ -11,7 +10,8 @@ class BaseTranslator(object): # Note: Some databases will restrict the possible names for tables and # columns. TIME_INDEX_NAME = TIME_INDEX_NAME - + TIME_INDEX_ATTRIBUTE_NAME = TIME_INDEX_ATTRIBUTE_NAME + def __init__(self, host, port, db_name): self.host = host self.port = port diff --git a/src/translators/crate.py b/src/translators/crate.py index 0aecc820..7591f5eb 100644 --- a/src/translators/crate.py +++ b/src/translators/crate.py @@ -11,7 +11,7 @@ from translators.errors import CrateErrorAnalyzer from translators.sql_translator import NGSI_ISO8601, NGSI_DATETIME, \ NGSI_GEOJSON, NGSI_GEOPOINT, NGSI_TEXT, NGSI_STRUCTURED_VALUE, \ - NGSI_LD_GEOMETRY, TIME_INDEX, METADATA_TABLE_NAME, FIWARE_SERVICEPATH + NGSI_LD_GEOMETRY, TIME_INDEX, TIME_INDEX_ATTRIBUTE, METADATA_TABLE_NAME, FIWARE_SERVICEPATH import logging from .crate_geo_query import from_ngsi_query from utils.cfgreader import EnvReader, StrVar, IntVar, FloatVar @@ -34,7 +34,8 @@ "Number": 'real', NGSI_TEXT: 'text', NGSI_STRUCTURED_VALUE: 'object', - TIME_INDEX: 'timestamptz' + TIME_INDEX: 'timestamptz', + TIME_INDEX_ATTRIBUTE: CRATE_ARRAY_STR } CRATE_TO_NGSI = dict((v, k) for (k, v) in NGSI_TO_SQL.items()) diff --git a/src/translators/sql_translator.py b/src/translators/sql_translator.py index 115136bb..cd53fe32 100644 --- a/src/translators/sql_translator.py +++ b/src/translators/sql_translator.py @@ -12,6 +12,7 @@ import dateutil.parser from typing import Any, List, Optional, Sequence from uuid import uuid4 +from reporter.timex import select_time_index_attr from cache.factory import get_cache, is_cache_available from translators.insert_splitter import to_insert_batches @@ -35,6 +36,7 @@ TENANT_PREFIX = 'mt' TYPE_PREFIX = 'et' TIME_INDEX = 'timeindex' +TIME_INDEX_ATTRIBUTE = 'time_index_attribute' VALID_AGGR_METHODS = ['count', 'sum', 'avg', 'min', 'max'] VALID_AGGR_PERIODS = ['year', 'month', 'day', 'hour', 'minute', 'second'] # The name of the column where we store the original JSON entity received @@ -63,7 +65,8 @@ NGSI_TEXT: 'text', # NOT all databases supports JSON objects NGSI_STRUCTURED_VALUE: 'text', - TIME_INDEX: 'timestamp WITH TIME ZONE NOT NULL' + TIME_INDEX: 'timestamp WITH TIME ZONE NOT NULL', + TIME_INDEX_ATTRIBUTE: 'array(string)' } @@ -269,18 +272,17 @@ def _insert_entities_of_type(self, "It should have been inserted by the 'Reporter'. {}" warnings.warn(msg.format(e)) e[self.TIME_INDEX_NAME] = current_timex() - if ORIGINAL_ENTITY_COL in e: raise ValueError( f"Entity {e[NGSI_ID]} has a reserved attribute name: " + "'{ORIGINAL_ENTITY_COL_NAME}'") - # Define column types # {column_name -> crate_column_type} table = { 'entity_id': self.NGSI_TO_SQL['Text'], 'entity_type': self.NGSI_TO_SQL['Text'], self.TIME_INDEX_NAME: self.NGSI_TO_SQL[TIME_INDEX], + self.TIME_INDEX_ATTRIBUTE_NAME: self.NGSI_TO_SQL[TIME_INDEX_ATTRIBUTE], FIWARE_SERVICEPATH: self.NGSI_TO_SQL['Text'], ORIGINAL_ENTITY_COL: self.NGSI_TO_SQL[NGSI_STRUCTURED_VALUE], 'instanceId': self.NGSI_TO_SQL['Text'] @@ -293,13 +295,13 @@ def _insert_entities_of_type(self, 'entity_id': (NGSI_ID, NGSI_TEXT), self.TIME_INDEX_NAME: (self.TIME_INDEX_NAME, NGSI_DATETIME), } - for e in entities: entity_id = e.get('id') for attr in iter_entity_attrs(e): if attr == self.TIME_INDEX_NAME: continue - + if attr == self.TIME_INDEX_ATTRIBUTE_NAME: + continue if isinstance(e[attr], dict) and 'type' in e[attr] \ and e[attr]['type'] != 'Property': attr_t = e[attr]['type'] @@ -481,6 +483,8 @@ def _preprocess_values(self, e, original_attrs, col_names, values.append(e['id']) elif cn == self.TIME_INDEX_NAME: values.append(e[self.TIME_INDEX_NAME]) + elif cn == self.TIME_INDEX_ATTRIBUTE_NAME: + values.append(e[self.TIME_INDEX_ATTRIBUTE_NAME]) elif cn == FIWARE_SERVICEPATH: values.append(fiware_servicepath or '') elif cn == 'instanceId': @@ -805,11 +809,10 @@ def _get_limit(self, limit, last_n): f"last_n should be >=1 and <= {default_limit}.") return min(last_n, limit) - def _get_where_clause(self, entity_ids, from_date, to_date, fiware_sp=None, + def _get_where_clause(self, attr_names, entity_ids, from_date, to_date, fiware_sp=None, geo_query=None, prefix=''): clauses = [] where_clause = "" - if entity_ids: ids = ",".join("'{}'".format(e) for e in entity_ids) clauses.append(" {}entity_id in ({}) ".format(prefix, ids)) @@ -819,7 +822,6 @@ def _get_where_clause(self, entity_ids, from_date, to_date, fiware_sp=None, if to_date: clauses.append(" {}{} <= '{}'".format(prefix, self.TIME_INDEX_NAME, self._parse_date(to_date))) - if fiware_sp: # Match prefix of fiware service path if fiware_sp == '/': @@ -832,13 +834,22 @@ def _get_where_clause(self, entity_ids, from_date, to_date, fiware_sp=None, else: # Match prefix of fiware service path clauses.append(" " + prefix + FIWARE_SERVICEPATH + " = ''") - # TODO implement prefix also for geo_clause geo_clause = self._get_geo_clause(geo_query) + attrs = '' + attrs_clauses = [] + if attr_names: + attrs_clauses.append(" and ") + for a in attr_names: + attrs = '\'' + a + '\'' + attrs_clauses.append(" " + attrs + " = any(time_index_attribute) or") + attrs_clauses.append(" time_index_attribute = NULL") + # TODO implement prefix also for geo_clause if geo_clause: clauses.append(geo_clause) if len(clauses) > 0: - where_clause = "where" + " and ".join(clauses) + where_clause = "where" + " and ".join(clauses) + where_clause = where_clause + "".join(attrs_clauses) return where_clause @staticmethod @@ -1078,7 +1089,8 @@ def query(self, aggr_method, aggr_period) if not where_clause: - where_clause = self._get_where_clause(entity_ids, + where_clause = self._get_where_clause(lower_attr_names, + entity_ids, from_date, to_date, fiware_servicepath, diff --git a/src/translators/tests/test_crate.py b/src/translators/tests/test_crate.py index 47f0c436..eae3530d 100644 --- a/src/translators/tests/test_crate.py +++ b/src/translators/tests/test_crate.py @@ -1,10 +1,9 @@ from translators.sql_translator import METADATA_TABLE_NAME, TYPE_PREFIX from conftest import crate_translator as translator, entity -from utils.common import TIME_INDEX_NAME +from utils.common import TIME_INDEX_NAME, TIME_INDEX_ATTRIBUTE_NAME from utils.tests.common import * from datetime import datetime, timezone - def test_db_version(translator): version = translator.get_db_version() major = int(version.split('.')[0]) @@ -17,6 +16,7 @@ def test_geo_point(translator): 'id': 'Room1', 'type': 'Room', TIME_INDEX_NAME: datetime.now(timezone.utc).isoformat(timespec='milliseconds'), + TIME_INDEX_ATTRIBUTE_NAME: ['temperature','pressure'], 'location': { 'type': 'geo:point', 'value': "19.6389474, -98.9109537" # lat, long @@ -47,6 +47,7 @@ def test_geo_point_null_values(translator): 'id': 'Room1', 'type': 'Room', TIME_INDEX_NAME: datetime.now(timezone.utc).isoformat(timespec='milliseconds'), + TIME_INDEX_ATTRIBUTE_NAME: ['temperature','pressure'], 'location': { 'type': 'geo:point', 'value': "19.6389474, -98.9109537" # lat, long @@ -63,6 +64,7 @@ def test_geo_point_null_values(translator): TIME_INDEX_NAME: datetime.now( timezone.utc).isoformat( timespec='milliseconds'), + TIME_INDEX_ATTRIBUTE_NAME: ['temperature','pressure'], 'temperature': { 'type': 'Number', 'value': 19}} diff --git a/src/translators/tests/test_insert.py b/src/translators/tests/test_insert.py index d312d658..44a21e80 100644 --- a/src/translators/tests/test_insert.py +++ b/src/translators/tests/test_insert.py @@ -253,6 +253,7 @@ def test_insert_entity(translator, entity): now = datetime.now(timezone.utc) now_iso = now.isoformat(timespec='milliseconds') entity[BaseTranslator.TIME_INDEX_NAME] = now_iso + entity[BaseTranslator.TIME_INDEX_ATTRIBUTE_NAME] = ['temperature','pressure'] result = translator.insert([entity]) assert result.rowcount != 0 @@ -274,7 +275,7 @@ def test_insert_same_entity_with_different_attrs( # of temperature. for entity in sameEntityWithDifferentAttrs: entity[BaseTranslator.TIME_INDEX_NAME] = entity['temperature']['metadata']['dateModified']['value'] - + entity[BaseTranslator.TIME_INDEX_ATTRIBUTE_NAME] = ['temperature','pressure'] result = translator.insert(sameEntityWithDifferentAttrs) assert result.rowcount != 0 @@ -521,6 +522,7 @@ def test_accept_unknown_ngsi_type(translator): TIME_INDEX_NAME: datetime.now( timezone.utc).isoformat( timespec='milliseconds'), + TIME_INDEX_ATTRIBUTE_NAME: ['temperature','pressure'], "address": { "type": "PostalAddress", "value": { @@ -549,6 +551,7 @@ def test_accept_special_chars(translator): TIME_INDEX_NAME: datetime.now( timezone.utc).isoformat( timespec='milliseconds'), + TIME_INDEX_ATTRIBUTE_NAME: ['temperature','pressure'], "address": { "type": "Address-Type", "value": { @@ -572,6 +575,7 @@ def test_missing_type_defaults_to_string(translator): TIME_INDEX_NAME: datetime.now( timezone.utc).isoformat( timespec='milliseconds'), + TIME_INDEX_ATTRIBUTE_NAME: ['temperature','pressure'], "foo": { "value": "BaR", }, @@ -590,7 +594,8 @@ def test_missing_type_defaults_to_string(translator): def test_capitals(translator): entity_type = "SoMeWeIrDtYpE" e1 = { - "type": entity_type, "id": "sOmEwEiRdId", TIME_INDEX_NAME: datetime.now( + "type": entity_type, "id": "sOmEwEiRdId",TIME_INDEX_ATTRIBUTE_NAME: + ['temperature','pressure'], TIME_INDEX_NAME: datetime.now( timezone.utc).isoformat( timespec='milliseconds'), "Foo": { "type": "Text", "value": "FoO", }, "bAr": { @@ -606,7 +611,7 @@ def test_capitals(translator): e2['NewAttr'] = {"type": "Text", "value": "NewAttrValue!"} e2[TIME_INDEX_NAME] = datetime.now( timezone.utc).isoformat(timespec='milliseconds') - + e2[TIME_INDEX_ATTRIBUTE_NAME]: ['temperature','pressure'] translator.insert([e2]) entities, err = translator.query() assert len(entities) == 2 @@ -646,6 +651,7 @@ def test_long_json(translator): big_entity = { 'id': 'entityId1', 'type': 'type1', + TIME_INDEX_ATTRIBUTE_NAME: ['temperature','pressure'], TIME_INDEX_NAME: datetime.now( timezone.utc).isoformat( timespec='milliseconds'), @@ -666,6 +672,7 @@ def test_structured_value_to_array(translator): entity = { 'id': '8906', 'type': 'AirQualityObserved', + TIME_INDEX_ATTRIBUTE_NAME: ['temperature','pressure'], TIME_INDEX_NAME: datetime.now(timezone.utc).isoformat(timespec='milliseconds'), 'aqi': {'type': 'Number', 'value': 43}, 'city': {'type': 'Text', 'value': 'Antwerpen'}, @@ -699,6 +706,7 @@ def test_ISO8601(translator): e = { "type": "MyType", "id": "MyId", + TIME_INDEX_ATTRIBUTE_NAME: ['temperature','pressure'], TIME_INDEX_NAME: datetime.now( timezone.utc).isoformat( timespec='milliseconds'), diff --git a/src/utils/common.py b/src/utils/common.py index dea6c5e3..3a712528 100644 --- a/src/utils/common.py +++ b/src/utils/common.py @@ -1,5 +1,5 @@ TIME_INDEX_NAME = 'time_index' - +TIME_INDEX_ATTRIBUTE_NAME = 'time_index_attribute' def entity_pk(entity): """