diff --git a/docker-compose.yml b/docker-compose.yml index 2c8a32b..6f75d0a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -290,12 +290,14 @@ services: - AWS_ACCESS_KEY_ID=${MINIO_ROOT_USER} - AWS_SECRET_ACCESS_KEY=${MINIO_ROOT_PASSWORD} - AWS_JAVA_V1_DISABLE_DEPRECATION_ANNOUNCEMENT=true + - PYSPARK_PYTHON=/opt/python/bin/python ports: - "8091:8091" extra_hosts: - "localhost:host-gateway" volumes: - ./infra/airflow/processing/spark/jobs:/opt/spark/jobs + - ./notebooks:/home/jovyan/work healthcheck: test: ["CMD", "curl", "-f", "http://spark-master:8080"] interval: 30s @@ -328,12 +330,14 @@ services: - AWS_ACCESS_KEY_ID=${MINIO_ROOT_USER} - AWS_SECRET_ACCESS_KEY=${MINIO_ROOT_PASSWORD} - AWS_JAVA_V1_DISABLE_DEPRECATION_ANNOUNCEMENT=true + - PYSPARK_PYTHON=/opt/python/bin/python ports: - "8092:8092" extra_hosts: - "localhost:host-gateway" volumes: - ./infra/airflow/processing/spark/jobs:/opt/spark/jobs + - ./notebooks:/home/jovyan/work healthcheck: test: ["CMD", "curl", "-f", "http://spark-master:8080"] interval: 30s @@ -599,6 +603,8 @@ services: - TRINO_URL=http://trino:8080 - SUPERSET_URL=http://superset:8088 - AIRFLOW_URL=http://airflow-apiserver:8080 + - PYSPARK_PYTHON=/opt/conda/envs/notebook/bin/python + - PYSPARK_DRIVER_PYTHON=/opt/conda/envs/notebook/bin/python profiles: [explore] volumes: diff --git a/infra/jupyterlab/Dockerfile b/infra/jupyterlab/Dockerfile index 3d8bb57..eb6b054 100644 --- a/infra/jupyterlab/Dockerfile +++ b/infra/jupyterlab/Dockerfile @@ -8,6 +8,7 @@ RUN apt-get update && apt-get install -y \ && rm -rf /var/lib/apt/lists/* ARG SPARK_VERSION=3.5.6 +ARG PYTHON_VERSION=3.12 RUN set -eux; \ curl -fSL --connect-timeout 20 --max-time 900 --retry 5 --retry-connrefused \ -o /tmp/spark.tgz "https://downloads.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz"; \ @@ -17,6 +18,18 @@ RUN set -eux; \ ENV SPARK_HOME=/opt/spark ENV PATH="$SPARK_HOME/bin:$PATH" +SHELL ["bash","-o","pipefail","-c"] + +ENV CONDA_ENV_PATH=/opt/conda/envs/notebook + +RUN mamba create -y -n notebook python=${PYTHON_VERSION} pip jupyterlab notebook && \ + mamba clean -afy + +ENV PATH="${CONDA_ENV_PATH}/bin:${PATH}" +ENV CONDA_DEFAULT_ENV=notebook +ENV PYSPARK_PYTHON="${CONDA_ENV_PATH}/bin/python" +ENV PYSPARK_DRIVER_PYTHON="${CONDA_ENV_PATH}/bin/python" + USER $NB_UID RUN pip install --no-cache-dir \ trino[sqlalchemy] \ diff --git a/infra/spark/Dockerfile b/infra/spark/Dockerfile index 3f69544..7b2b276 100644 --- a/infra/spark/Dockerfile +++ b/infra/spark/Dockerfile @@ -2,7 +2,25 @@ FROM bitnamilegacy/spark:3.5.6 USER 0 -RUN pip install --no-cache-dir pyarrow pandas fastavro confluent-kafka[avro] +RUN apt-get update && apt-get install -y --no-install-recommends \ + curl \ + bzip2 \ + ca-certificates \ + && rm -rf /var/lib/apt/lists/* + +ARG PYTHON_VERSION=3.12 +ENV MAMBA_ROOT_PREFIX=/opt/micromamba + +RUN curl -Ls "https://micro.mamba.pm/api/micromamba/linux-64/latest" | tar -xj -C /tmp && \ + install -m 0755 /tmp/bin/micromamba /usr/local/bin/micromamba && \ + rm -rf /tmp/bin + +RUN micromamba create -y -p /opt/python python=${PYTHON_VERSION} pip && \ + micromamba clean -afy + +ENV PATH="/opt/python/bin:${PATH}" + +RUN /opt/python/bin/pip install --no-cache-dir pyarrow pandas fastavro confluent-kafka[avro] RUN set -eux; \ mkdir -p "${SPARK_HOME}/jars"; \ @@ -21,5 +39,7 @@ RUN chown -R 1001:1001 ${SPARK_HOME}/conf ENV SPARK_JARS_DIR="${SPARK_HOME}/jars" ENV PYTHONPATH="/opt/spark/jobs:${PYTHONPATH}" +ENV PYSPARK_PYTHON="/opt/python/bin/python" +ENV 
PYSPARK_DRIVER_PYTHON="/opt/python/bin/python" USER 1001 diff --git a/notebooks/playgrounds/hotels/hotels_iceberg_population.ipynb b/notebooks/playgrounds/hotels/hotels_iceberg_population.ipynb new file mode 100644 index 0000000..d43b2f5 --- /dev/null +++ b/notebooks/playgrounds/hotels/hotels_iceberg_population.ipynb @@ -0,0 +1,842 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "b61b95d0", + "metadata": {}, + "source": [ + "# 🧊 Hotels Iceberg Population Playground\n", + "\n", + "**Why**: Provide a reproducible Booking-style dataset for SQL/PySpark drills. \n", + "**How**: Synthesize booking-like DataFrames, merge them into Iceberg tables, and keep the run idempotent. \n", + "**Notes**: Safe to rerun; tables live in the `iceberg.hotels_practice` namespace.\n" + ] + }, + { + "cell_type": "markdown", + "id": "c8addbfe", + "metadata": {}, + "source": [ + "## ⚙️ Environment" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0ddf3816", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "\n", + "MINIO_ENDPOINT = os.getenv(\"MINIO_ENDPOINT\", \"http://minio:9000\")\n", + "MINIO_ACCESS_KEY = os.getenv(\"MINIO_ROOT_USER\", \"minio\")\n", + "MINIO_SECRET_KEY = os.getenv(\"MINIO_ROOT_PASSWORD\", \"minio123\")\n", + "HIVE_METASTORE_URI = os.getenv(\"HIVE_METASTORE_URI\", \"thrift://hive-metastore:9083\")\n", + "TRINO_URL = os.getenv(\"TRINO_URL\", \"http://trino:8080\")\n", + "SPARK_MASTER = os.getenv(\"SPARK_MASTER_URL\", \"spark://spark-master:7077\")\n", + "S3_ENDPOINT = os.getenv(\"S3_ENDPOINT\", \"minio:9000\")\n", + "\n", + "os.environ.setdefault(\"AWS_REGION\", \"us-east-1\")\n", + "os.environ.setdefault(\"AWS_DEFAULT_REGION\", os.environ[\"AWS_REGION\"])\n", + "\n", + "print(\"MinIO:\", MINIO_ENDPOINT)\n", + "print(\"Hive metastore:\", HIVE_METASTORE_URI)\n", + "print(\"Spark master:\", SPARK_MASTER)\n", + "print(\"Trino:\", TRINO_URL)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d5ab5d87", + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.sql import SparkSession\n", + "\n", + "packages = [\n", + " \
\"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.2\",\n", + " \"org.apache.hadoop:hadoop-aws:3.3.4\",\n", + " \"software.amazon.awssdk:bundle:2.20.158\"\n", + "]\n", + "\n", + "print(\"🚀 Creating Spark session with Iceberg support …\")\n", + "\n", + "spark = (\n", + " SparkSession.builder\n", + " .appName(\"HotelsIcebergPopulation\")\n", + " .master(SPARK_MASTER)\n", + " .config(\"spark.jars.packages\", \",\".join(packages))\n", + " .config(\"spark.sql.catalog.spark_catalog\", \"org.apache.iceberg.spark.SparkSessionCatalog\")\n", + " .config(\"spark.sql.catalog.spark_catalog.type\", \"hive\")\n", + " .config(\"spark.sql.catalog.iceberg\", \"org.apache.iceberg.spark.SparkCatalog\")\n", + " .config(\"spark.sql.catalog.iceberg.type\", \"rest\")\n", + " .config(\"spark.sql.catalog.iceberg.uri\", \"http://hive-metastore:9001/iceberg\")\n", + " .config(\"spark.sql.catalog.iceberg.warehouse\", \"s3a://iceberg/warehouse\")\n", + " .config(\"spark.sql.catalog.iceberg.s3.endpoint\", f\"http://{S3_ENDPOINT}\")\n", + " .config(\"spark.sql.catalog.iceberg.s3.access-key-id\", MINIO_ACCESS_KEY)\n", + " .config(\"spark.sql.catalog.iceberg.s3.secret-access-key\", MINIO_SECRET_KEY)\n", + " .config(\"spark.sql.catalog.iceberg.s3.region\", os.environ[\"AWS_REGION\"])\n", + " .config(\"spark.sql.catalog.iceberg.s3.path-style-access\", \"true\")\n", + " .config(\"spark.sql.catalog.iceberg.s3.connection-ssl-enabled\", \"false\")\n", + " .config(\"spark.sql.catalog.iceberg.s3.sse.type\", \"none\")\n", + " .config(\"spark.sql.extensions\", \"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\")\n", + " .config(\"spark.hadoop.hive.metastore.uris\", HIVE_METASTORE_URI)\n", + " .config(\"spark.hadoop.fs.s3a.endpoint\", S3_ENDPOINT)\n", + " .config(\"spark.hadoop.fs.s3a.access.key\", MINIO_ACCESS_KEY)\n", + " .config(\"spark.hadoop.fs.s3a.secret.key\", MINIO_SECRET_KEY)\n", + " .config(\"spark.hadoop.fs.s3a.path.style.access\", \"true\")\n", + " .config(\"spark.hadoop.fs.s3a.impl\", \"org.apache.hadoop.fs.s3a.S3AFileSystem\")\n", + " .config(\"spark.hadoop.fs.s3a.connection.ssl.enabled\", \"false\")\n", + " .config(\"spark.hadoop.fs.s3a.aws.credentials.provider\", \"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider\")\n", + " .config(\"spark.sql.defaultCatalog\", \"iceberg\")\n", + " .enableHiveSupport()\n", + " .getOrCreate()\n", + ")\n", + "\n", + "spark.sparkContext.setLogLevel(\"WARN\")\n", + "spark.conf.set(\"spark.sql.session.timeZone\", \"UTC\")\n", + "\n", + "print(\"✅ Spark session ready (Iceberg REST catalog)\")\n", + "print(\" Spark version:\", spark.version)\n" + ] + }, + { + "cell_type": "markdown", + "id": "cc7608ef", + "metadata": {}, + "source": [ + "## 📦 Configuration\n", + "\n", + "Reproducible volumes and paths for the synthetic contract." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "661d720e", + "metadata": {}, + "outputs": [], + "source": [ + "from dataclasses import dataclass, field\n", + "import random\n", + "\n", + "@dataclass\n", + "class HotelsPracticeConfig:\n", + " random_seed: int = 7\n", + " catalog: str = \"iceberg\"\n", + " namespace: str = os.getenv(\"HOTELS_ICEBERG_NAMESPACE\", \"hotels_practice\")\n", + " volumes: dict = field(default_factory=lambda: {\n", + " \"hotels\": 200,\n", + " \"users\": 2000,\n", + " \"bookings\": 8000,\n", + " \"reviews\": 6000,\n", + " \"images\": 2500,\n", + " })\n", + "\n", + "cfg = HotelsPracticeConfig()\n", + "rng = random.Random(cfg.random_seed)\n", + "\n", + "print(cfg)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9fd4b8ba", + "metadata": {}, + "outputs": [], + "source": [ + "import json\n", + "import uuid\n", + "from datetime import date, datetime, timedelta\n", + "from typing import Optional\n", + "\n", + "from pyspark.sql import functions as F, types as T\n", + "\n", + "\n", + "def random_date(rng, start: date, end: date) -> date:\n", + " return start + timedelta(days=rng.randint(0, (end - start).days))\n", + "\n", + "\n", + "def random_timestamp(rng, start: datetime, end: datetime) -> datetime:\n", + " total_seconds = int((end - start).total_seconds())\n", + " return start + timedelta(seconds=rng.randint(0, total_seconds))\n", + "\n", + "\n", + "def deterministic_uuid(prefix: str, index: int) -> str:\n", + " return str(uuid.uuid5(uuid.NAMESPACE_URL, f\"{prefix}-{index}\"))\n", + "\n", + "\n", + "today = datetime.utcnow().date()\n", + "current_ts = datetime.utcnow()\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "36f1a1cf", + "metadata": {}, + "outputs": [], + "source": [ + "country_cities = {\n", + " \"US\": [\"New York\", \"San Francisco\", \"Chicago\", \"Austin\", \"Miami\"],\n", + " \"CA\": [\"Toronto\", \"Vancouver\", \"Montreal\", \"Calgary\", \"Ottawa\"],\n", + " \"GB\": [\"London\", \"Manchester\", \"Edinburgh\", \"Bristol\", \"Bath\"],\n", + " \"FR\": [\"Paris\", \"Lyon\", \"Nice\", \"Bordeaux\", \"Lille\"],\n", + " \"NL\": [\"Amsterdam\", \"Rotterdam\", \"Utrecht\", \"The Hague\", \"Eindhoven\"],\n", + " \"DE\": [\"Berlin\", \"Munich\", \"Frankfurt\", \"Hamburg\", \"Cologne\"],\n", + " \"ES\": [\"Barcelona\", \"Madrid\", \"Seville\", \"Valencia\", \"Bilbao\"],\n", + " \"IT\": [\"Rome\", \"Florence\", \"Milan\", \"Venice\", \"Bologna\"],\n", + " \"IL\": [\"Tel Aviv\", \"Jerusalem\", \"Haifa\", \"Eilat\", \"Herzliya\"],\n", + " \"PT\": [\"Lisbon\", \"Porto\", \"Faro\", \"Braga\", \"Coimbra\"],\n", + " \"SE\": [\"Stockholm\", \"Gothenburg\", \"Malmo\", \"Uppsala\", \"Visby\"],\n", + " \"JP\": [\"Tokyo\", \"Kyoto\", \"Osaka\", \"Sapporo\", \"Fukuoka\"],\n", + " \"AU\": [\"Sydney\", \"Melbourne\", \"Perth\", \"Brisbane\", \"Adelaide\"],\n", + " \"BR\": [\"Rio\", \"Sao Paulo\", \"Brasilia\", \"Salvador\", \"Recife\"],\n", + " \"AE\": [\"Dubai\", \"Abu Dhabi\", \"Sharjah\", \"Al Ain\", \"Ras Al Khaimah\"],\n", + " \"ZA\": [\"Cape Town\", \"Johannesburg\", \"Durban\", \"Pretoria\", \"Bloemfontein\"],\n", + "}\n", + "\n", + "chains = [\"StayPro\", \"BoutiqueX\", \"UrbanNest\", \"Skyline\", \"Coastline\", \"Heritage\", \"Aurora\"]\n", + "adjectives = [\"Grand\", \"Crystal\", \"Sunset\", \"Aurora\", \"Vertex\", \"Harbor\", \"Atlas\", \"Zenith\", \"Summit\", \"Velvet\"]\n", + "nouns = [\"Resort\", \"Suites\", \"Inn\", \"Lodge\", \"Retreat\", \"Villa\", \"Boutique\", \"Haven\", \"Plaza\", 
\"Terrace\"]\n", + "room_rates = {}\n", + "\n", + "hotels_records = []\n", + "for idx in range(cfg.volumes[\"hotels\"]):\n", + " country = rng.choice(list(country_cities))\n", + " city = rng.choice(country_cities[country])\n", + " stars = rng.choices([1, 2, 3, 4, 5], weights=[0.05, 0.15, 0.35, 0.3, 0.15])[0]\n", + " num_rooms = rng.randint(35, 420)\n", + " chain = rng.choice(chains) if rng.random() < 0.4 else None\n", + " hotel_id = f\"H{100000 + idx}\"\n", + " room_rates[hotel_id] = rng.randint(80, 420)\n", + " hotel_name = f\"{rng.choice(adjectives)} {rng.choice(nouns)} {city}\"\n", + " hotels_records.append({\n", + " \"hotel_id\": hotel_id,\n", + " \"hotel_name\": hotel_name,\n", + " \"country\": country,\n", + " \"city\": city,\n", + " \"stars\": stars,\n", + " \"num_rooms\": num_rooms,\n", + " \"chain\": chain,\n", + " })\n", + "\n", + "hotel_schema = T.StructType([\n", + " T.StructField(\"hotel_id\", T.StringType(), False),\n", + " T.StructField(\"hotel_name\", T.StringType(), False),\n", + " T.StructField(\"country\", T.StringType(), False),\n", + " T.StructField(\"city\", T.StringType(), False),\n", + " T.StructField(\"stars\", T.IntegerType(), False),\n", + " T.StructField(\"num_rooms\", T.IntegerType(), False),\n", + " T.StructField(\"chain\", T.StringType(), True),\n", + "])\n", + "\n", + "hotels_df = spark.createDataFrame(hotels_records, schema=hotel_schema)\n", + "print(f\"Hotels: {hotels_df.count()}\")\n", + "hotels_df.orderBy(\"hotel_id\").show(5, truncate=False)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d39cd004", + "metadata": {}, + "outputs": [], + "source": [ + "users_records = []\n", + "user_schema = T.StructType([\n", + " T.StructField(\"user_id\", T.StringType(), False),\n", + " T.StructField(\"home_country\", T.StringType(), False),\n", + " T.StructField(\"signup_date\", T.DateType(), False),\n", + " T.StructField(\"age\", T.IntegerType(), False),\n", + "])\n", + "\n", + "user_ids = []\n", + "for idx in range(cfg.volumes[\"users\"]):\n", + " user_id = f\"U{100000 + idx}\"\n", + " user_ids.append(user_id)\n", + " signup_date = random_date(rng, date(2015, 1, 1), date(2024, 1, 1))\n", + " users_records.append({\n", + " \"user_id\": user_id,\n", + " \"home_country\": rng.choice(list(country_cities)),\n", + " \"signup_date\": signup_date,\n", + " \"age\": rng.randint(18, 78),\n", + " })\n", + "\n", + "users_df = spark.createDataFrame(users_records, schema=user_schema)\n", + "print(f\"Users: {users_df.count()}\")\n", + "users_df.orderBy(\"user_id\").show(5, truncate=False)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7d9bda3a", + "metadata": {}, + "outputs": [], + "source": [ + "booking_schema = T.StructType([\n", + " T.StructField(\"booking_id\", T.StringType(), False),\n", + " T.StructField(\"user_id\", T.StringType(), False),\n", + " T.StructField(\"hotel_id\", T.StringType(), False),\n", + " T.StructField(\"checkin_date\", T.DateType(), False),\n", + " T.StructField(\"checkout_date\", T.DateType(), False),\n", + " T.StructField(\"nights\", T.IntegerType(), False),\n", + " T.StructField(\"price_usd\", T.DoubleType(), False),\n", + " T.StructField(\"status\", T.StringType(), False),\n", + "])\n", + "\n", + "booking_records = []\n", + "hotel_ids = [row[\"hotel_id\"] for row in hotels_records]\n", + "status_weights = [(\"completed\", 0.78), (\"cancelled\", 0.14), (\"no_show\", 0.08)]\n", + "\n", + "recent_booking_start = today - timedelta(days=540)\n", + "focus_booking_start = today - timedelta(days=180)\n", + 
"primary_end = today - timedelta(days=1)\n", + "\n", + "for idx in range(cfg.volumes[\"bookings\"]):\n", + " user_id = rng.choice(user_ids)\n", + " hotel_id = rng.choice(hotel_ids)\n", + " if rng.random() < 0.9:\n", + " window_start = focus_booking_start\n", + " window_end = primary_end\n", + " else:\n", + " window_start = recent_booking_start\n", + " window_end = max(recent_booking_start, focus_booking_start - timedelta(days=1))\n", + " if window_start > window_end:\n", + " window_start, window_end = recent_booking_start, primary_end\n", + " checkin = random_date(rng, window_start, window_end)\n", + " max_nights = max(1, min(14, (today - checkin).days))\n", + " nights = rng.randint(1, max_nights)\n", + " checkout = checkin + timedelta(days=nights)\n", + " status = rng.choices([s for s, _ in status_weights], weights=[w for _, w in status_weights])[0]\n", + " base_price = room_rates[hotel_id] * nights * rng.uniform(0.9, 1.3)\n", + " if status == \"completed\":\n", + " price = round(base_price, 2)\n", + " elif status == \"cancelled\":\n", + " price = round(base_price * rng.uniform(0.1, 0.4), 2)\n", + " else:\n", + " price = round(base_price * rng.uniform(0.0, 0.2), 2)\n", + " booking_records.append({\n", + " \"booking_id\": deterministic_uuid(\"booking\", idx),\n", + " \"user_id\": user_id,\n", + " \"hotel_id\": hotel_id,\n", + " \"checkin_date\": checkin,\n", + " \"checkout_date\": checkout,\n", + " \"nights\": nights,\n", + " \"price_usd\": float(price),\n", + " \"status\": status,\n", + " })\n", + "\n", + "bookings_df = spark.createDataFrame(booking_records, schema=booking_schema)\n", + "print(f\"Bookings: {bookings_df.count()}\")\n", + "bookings_df.groupBy(\"status\").count().show()\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ae35f7b9", + "metadata": {}, + "outputs": [], + "source": [ + "review_schema = T.StructType([\n", + " T.StructField(\"review_id\", T.StringType(), False),\n", + " T.StructField(\"user_id\", T.StringType(), False),\n", + " T.StructField(\"hotel_id\", T.StringType(), False),\n", + " T.StructField(\"rating\", T.IntegerType(), False),\n", + " T.StructField(\"created_at\", T.TimestampType(), False),\n", + " T.StructField(\"review_text\", T.StringType(), True),\n", + " T.StructField(\"review_metadata\", T.StringType(), True),\n", + " T.StructField(\"lang\", T.StringType(), True),\n", + "])\n", + "\n", + "hotel_lookup = {record[\"hotel_id\"]: record for record in hotels_records}\n", + "\n", + "rating_templates = {\n", + " 5: [\n", + " {\"text\": \"Exceptional stay at {hotel_name} in {city}; staff anticipated every need.\", \"lang\": \"en\"},\n", + " {\"text\": \"Worth the {price_per_night:.0f} USD nightly rate at {hotel_name}; pure comfort.\", \"lang\": \"en\"},\n", + " {\"text\": \"Servicio excelente en {hotel_name} ({city}), volveremos pronto.\", \"lang\": \"es\"},\n", + " {\"text\": \"Esperienza eccellente al {hotel_name} di {city}; ritorneremo certamente.\", \"lang\": \"it\"},\n", + " {\"text\": \"חוויה מושלמת ב-{hotel_name} שב-{city}, הכל היה מדויק.\", \"lang\": \"he\"},\n", + " ],\n", + " 4: [\n", + " {\"text\": \"{hotel_name} in {city} delivered a smooth check-in and quiet room.\", \"lang\": \"en\"},\n", + " {\"text\": \"Bon rapport qualité/prix à {hotel_name}; équipe attentionnée.\", \"lang\": \"fr\"},\n", + " {\"text\": \"Rooftop bar at {hotel_name} made the {nights} nights memorable.\", \"lang\": \"en\"},\n", + " {\"text\": \"Camere spaziose e personale cordiale al {hotel_name} di {city}.\", \"lang\": \"it\"},\n", + " ],\n", + " 3: 
[\n", + " {\"text\": \"Stay at {hotel_name} in {city} was decent, though amenities felt basic.\", \"lang\": \"en\"},\n", + " {\"text\": \"Habitación correcta en {hotel_name}, pero poco encanto.\", \"lang\": \"es\"},\n", + " {\"text\": \"{hotel_name} bietet solide Lage in {city}, aber Service wirkt routiniert.\", \"lang\": \"de\"},\n", + " {\"text\": \"Camera nella media al {hotel_name}; niente di speciale.\", \"lang\": \"it\"},\n", + " ],\n", + " 2: [\n", + " {\"text\": \"{hotel_name} in {city} struggled with slow check-in and thin walls.\", \"lang\": \"en\"},\n", + " {\"text\": \"Peu de pression d'eau et chauffage capricieux à {hotel_name}.\", \"lang\": \"fr\"},\n", + " {\"text\": \"Servicio frío en {hotel_name}; necesitamos mejorar la limpieza.\", \"lang\": \"es\"},\n", + " {\"text\": \"Personale distratto al {hotel_name}; esperienza deludente.\", \"lang\": \"it\"},\n", + " ],\n", + " 1: [\n", + " {\"text\": \"Worst check-in at {hotel_name}; {nights} nights felt endless.\", \"lang\": \"en\"},\n", + " {\"text\": \"Nos fuimos antes: {hotel_name} en {city} fue un desastre.\", \"lang\": \"es\"},\n", + " {\"text\": \"לעולם לא נחזור ל-{hotel_name}; רעש בלתי נסבל.\", \"lang\": \"he\"},\n", + " {\"text\": \"Esperienza pessima al {hotel_name}; siamo andati via subito.\", \"lang\": \"it\"},\n", + " ],\n", + "}\n", + "\n", + "def build_review_snippet(rating: int, booking: dict):\n", + " templates = rating_templates.get(rating)\n", + " if not templates:\n", + " return None, None\n", + " hotel = hotel_lookup.get(booking[\"hotel_id\"], {})\n", + " context = {\n", + " \"hotel_name\": hotel.get(\"hotel_name\", \"the hotel\"),\n", + " \"city\": hotel.get(\"city\", \"the city\"),\n", + " \"chain\": hotel.get(\"chain\") or \"independent\",\n", + " \"nights\": booking[\"nights\"],\n", + " \"price_per_night\": booking[\"price_usd\"] / booking[\"nights\"] if booking[\"nights\"] else booking[\"price_usd\"],\n", + " }\n", + " template = rng.choice(templates)\n", + " text = template[\"text\"].format(**context)\n", + " if rating in followup_phrases and rng.random() < 0.4:\n", + " text = f\"{text} {rng.choice(followup_phrases[rating])}\"\n", + " return text, template[\"lang\"]\n", + "\n", + "followup_phrases = {\n", + " 5: [\n", + " \"Breakfast buffet was a highlight.\",\n", + " \"Spa booking was effortless.\",\n", + " \"Loved the skyline view from the suite.\",\n", + " ],\n", + " 4: [\n", + " \"Would happily stay again once the gym reopens.\",\n", + " \"Concierge solved transport within minutes.\",\n", + " ],\n", + " 3: [\n", + " \"Staff tried their best, yet small details were missed.\",\n", + " \"Good for a short business hop.\",\n", + " ],\n", + " 2: [\n", + " \"Maintenance team eventually helped, but the wait was long.\",\n", + " \"Noise from hallway kept us up.\",\n", + " ],\n", + " 1: [\n", + " \"Requested refund for the final night.\",\n", + " \"Left after the first evening despite plans.\",\n", + " ],\n", + "}\n", + "\n", + "review_tags_vocab = [\"clean\", \"view\", \"staff\", \"breakfast\", \"location\", \"spa\", \"business\", \"family\", \"quiet\", \"food\", \"design\", \"access\"]\n", + "review_sources = [\"mobile_app\", \"desktop_web\", \"partner_site\"]\n", + "review_devices = [\"ios\", \"android\", \"web\"]\n", + "stay_purposes = [\"business\", \"leisure\", \"family\", \"event\"]\n", + "\n", + "completed_bookings = [rec for rec in booking_records if rec[\"status\"] == \"completed\"]\n", + "num_reviews = min(cfg.volumes[\"reviews\"], len(completed_bookings))\n", + "rng.shuffle(completed_bookings)\n", 
+ "review_candidates = completed_bookings[:num_reviews]\n", + "\n", + "recent_review_floor = datetime.combine(today - timedelta(days=540), datetime.min.time())\n", + "user_last_review_ts = {}\n", + "review_records = []\n", + "for idx, booking in enumerate(review_candidates):\n", + " rating = rng.choices([1, 2, 3, 4, 5], weights=[0.04, 0.08, 0.22, 0.36, 0.30])[0]\n", + " base_datetime = (\n", + " datetime.combine(booking[\"checkout_date\"], datetime.min.time())\n", + " + timedelta(hours=rng.randint(7, 22), minutes=rng.randint(0, 59))\n", + " )\n", + " if rng.random() < 0.3 and booking[\"user_id\"] in user_last_review_ts:\n", + " created_at = user_last_review_ts[booking[\"user_id\"]] + timedelta(minutes=rng.randint(5, 25))\n", + " else:\n", + " created_at = base_datetime + timedelta(minutes=rng.randint(0, 240))\n", + " prev_ts = user_last_review_ts.get(booking[\"user_id\"])\n", + " if prev_ts and created_at <= prev_ts:\n", + " created_at = prev_ts + timedelta(minutes=rng.randint(10, 240))\n", + " if created_at > current_ts:\n", + " created_at = current_ts - timedelta(minutes=rng.randint(5, 240))\n", + " if created_at < recent_review_floor:\n", + " created_at = recent_review_floor + timedelta(days=rng.randint(0, 30), minutes=rng.randint(0, 120))\n", + " if rng.random() < 0.1:\n", + " review_text, lang = None, None\n", + " else:\n", + " review_text, lang = build_review_snippet(rating, booking)\n", + " sentiment = \"positive\" if rating >= 4 else \"negative\" if rating <= 2 else \"neutral\"\n", + " tags = rng.sample(review_tags_vocab, k=rng.randint(1, min(3, len(review_tags_vocab))))\n", + " metadata = json.dumps({\n", + " \"tags\": tags,\n", + " \"source\": rng.choice(review_sources),\n", + " \"device\": rng.choice(review_devices),\n", + " \"stay_purpose\": rng.choice(stay_purposes),\n", + " \"sentiment\": sentiment,\n", + " })\n", + " review_records.append({\n", + " \"review_id\": deterministic_uuid(\"review\", idx),\n", + " \"user_id\": booking[\"user_id\"],\n", + " \"hotel_id\": booking[\"hotel_id\"],\n", + " \"rating\": int(rating),\n", + " \"created_at\": created_at,\n", + " \"review_text\": review_text,\n", + " \"review_metadata\": metadata,\n", + " \"lang\": lang,\n", + " })\n", + " user_last_review_ts[booking[\"user_id\"]] = created_at\n", + "\n", + "reviews_df = spark.createDataFrame(review_records, schema=review_schema)\n", + "print(f\"Reviews: {reviews_df.count()}\")\n", + "reviews_df.groupBy(\"rating\").count().orderBy(\"rating\").show()\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "65a4005b", + "metadata": {}, + "outputs": [], + "source": [ + "image_schema = T.StructType([\n", + " T.StructField(\"image_id\", T.StringType(), False),\n", + " T.StructField(\"hotel_id\", T.StringType(), False),\n", + " T.StructField(\"url\", T.StringType(), False),\n", + " T.StructField(\"width\", T.IntegerType(), False),\n", + " T.StructField(\"height\", T.IntegerType(), False),\n", + " T.StructField(\"aspect_ratio\", T.DoubleType(), False),\n", + " T.StructField(\"tag\", T.StringType(), False),\n", + " T.StructField(\"quality_score\", T.DoubleType(), False),\n", + " T.StructField(\"created_at\", T.TimestampType(), False),\n", + "])\n", + "\n", + "image_tags = [\"room\", \"lobby\", \"pool\", \"restaurant\", \"gym\", \"spa\", \"exterior\", \"suite\", \"bar\", \"conference\"]\n", + "width_choices = [800, 1024, 1280, 1600, 1920, 2560]\n", + "height_choices = [600, 720, 900, 1080, 1440]\n", + "\n", + "image_records = []\n", + "image_counter = 0\n", + "image_window_start = 
current_ts - timedelta(days=365)\n", + "image_recent_threshold = current_ts - timedelta(days=90)\n", + "image_end = current_ts\n", + "\n", + "for hotel in hotels_records:\n", + " base_images = rng.randint(6, 10)\n", + " for idx in range(base_images):\n", + " width = rng.choice(width_choices)\n", + " height = rng.choice(height_choices)\n", + " aspect_ratio = round(width / height, 4)\n", + " created_at = random_timestamp(rng, image_window_start, image_end)\n", + " quality = rng.uniform(0.6, 0.97)\n", + " if created_at >= image_recent_threshold:\n", + " quality = max(quality, rng.uniform(0.75, 0.98))\n", + " image_records.append({\n", + " \"image_id\": deterministic_uuid(\"image\", image_counter),\n", + " \"hotel_id\": hotel[\"hotel_id\"],\n", + " \"url\": f\"https://cdn.practice.example/hotels/{hotel['hotel_id']}/img_{idx:03d}.jpg\",\n", + " \"width\": width,\n", + " \"height\": height,\n", + " \"aspect_ratio\": float(aspect_ratio),\n", + " \"tag\": rng.choice(image_tags),\n", + " \"quality_score\": round(quality, 3),\n", + " \"created_at\": created_at,\n", + " })\n", + " image_counter += 1\n", + " recent_min = rng.randint(4, 6)\n", + " for r_idx in range(recent_min):\n", + " width = rng.choice(width_choices)\n", + " height = rng.choice(height_choices)\n", + " aspect_ratio = round(width / height, 4)\n", + " created_at = random_timestamp(rng, image_recent_threshold, image_end)\n", + " quality = rng.uniform(0.78, 0.98)\n", + " image_records.append({\n", + " \"image_id\": deterministic_uuid(\"image\", image_counter),\n", + " \"hotel_id\": hotel[\"hotel_id\"],\n", + " \"url\": f\"https://cdn.practice.example/hotels/{hotel['hotel_id']}/recent_{r_idx:03d}.jpg\",\n", + " \"width\": width,\n", + " \"height\": height,\n", + " \"aspect_ratio\": float(aspect_ratio),\n", + " \"tag\": rng.choice(image_tags),\n", + " \"quality_score\": round(quality, 3),\n", + " \"created_at\": created_at,\n", + " })\n", + " image_counter += 1\n", + "\n", + "images_df = spark.createDataFrame(image_records, schema=image_schema)\n", + "print(f\"Images: {images_df.count()}\")\n", + "images_df.groupBy(\"tag\").count().orderBy(\"count\", ascending=False).show(5)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f6c8b61d", + "metadata": {}, + "outputs": [], + "source": [ + "mismatch = bookings_df.filter(F.datediff(\"checkout_date\", \"checkin_date\") != F.col(\"nights\"))\n", + "assert mismatch.count() == 0, \"Night count mismatch detected\"\n", + "print(\"✅ Nights column validated\")\n" + ] + }, + { + "cell_type": "markdown", + "id": "74386ba4", + "metadata": {}, + "source": [ + "## 🧊 Iceberg Population\n", + "\n", + "Merge the freshly generated DataFrames directly into Iceberg tables and inspect the row counts before cleanup.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d7b61508", + "metadata": {}, + "outputs": [], + "source": [ + "staging_views = {\n", + " \"stg_hotels\": hotels_df,\n", + " \"stg_users\": users_df,\n", + " \"stg_bookings\": bookings_df,\n", + " \"stg_reviews\": reviews_df,\n", + " \"stg_images\": images_df,\n", + "}\n", + "\n", + "for view_name, dataframe in staging_views.items():\n", + " dataframe.createOrReplaceTempView(view_name)\n", + "\n", + "print(\"Staging counts:\")\n", + "for name, df in staging_views.items():\n", + " print(f\" - {name.replace('stg_', '')}: {df.count()}\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "namespace = f\"{cfg.catalog}.{cfg.namespace}\"\n", + 
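"\n", + "# Added referential guard (review addition): staged bookings must reference\n", + "# known hotels before the MERGE below runs; uses the temp views registered above.\n", + "orphans = spark.sql(\"SELECT COUNT(*) AS c FROM stg_bookings b LEFT ANTI JOIN stg_hotels h ON b.hotel_id = h.hotel_id\").collect()[0][\"c\"]\n", + "assert orphans == 0, f\"{orphans} bookings reference unknown hotels\"\n", + 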
"\n", + "spark.sql(f\"CREATE NAMESPACE IF NOT EXISTS {namespace}\")\n", + "\n", + "spark.sql(f\"\"\"\n", + "CREATE TABLE IF NOT EXISTS {namespace}.hotels (\n", + " hotel_id STRING,\n", + " hotel_name STRING,\n", + " country STRING,\n", + " city STRING,\n", + " stars INT,\n", + " num_rooms INT,\n", + " chain STRING\n", + ")\n", + "USING ICEBERG\n", + "PARTITIONED BY (country)\n", + "TBLPROPERTIES ('format-version'='2')\n", + "\"\"\")\n", + "\n", + "spark.sql(f\"\"\"\n", + "CREATE TABLE IF NOT EXISTS {namespace}.users (\n", + " user_id STRING,\n", + " home_country STRING,\n", + " signup_date DATE,\n", + " age INT\n", + ")\n", + "USING ICEBERG\n", + "PARTITIONED BY (home_country)\n", + "TBLPROPERTIES ('format-version'='2')\n", + "\"\"\")\n", + "\n", + "spark.sql(f\"\"\"\n", + "CREATE TABLE IF NOT EXISTS {namespace}.bookings (\n", + " booking_id STRING,\n", + " user_id STRING,\n", + " hotel_id STRING,\n", + " checkin_date DATE,\n", + " checkout_date DATE,\n", + " nights INT,\n", + " price_usd DOUBLE,\n", + " status STRING\n", + ")\n", + "USING ICEBERG\n", + "PARTITIONED BY (months(checkin_date), bucket(16, hotel_id))\n", + "TBLPROPERTIES ('format-version'='2')\n", + "\"\"\")\n", + "\n", + "spark.sql(f\"\"\"\n", + "CREATE TABLE IF NOT EXISTS {namespace}.reviews (\n", + " review_id STRING,\n", + " user_id STRING,\n", + " hotel_id STRING,\n", + " rating INT,\n", + " created_at TIMESTAMP,\n", + " review_text STRING,\n", + " review_metadata STRING,\n", + " lang STRING\n", + ")\n", + "USING ICEBERG\n", + "PARTITIONED BY (months(created_at))\n", + "TBLPROPERTIES ('format-version'='2')\n", + "\"\"\")\n", + "\n", + "spark.sql(f\"\"\"\n", + "CREATE TABLE IF NOT EXISTS {namespace}.images (\n", + " image_id STRING,\n", + " hotel_id STRING,\n", + " url STRING,\n", + " width INT,\n", + " height INT,\n", + " aspect_ratio DOUBLE,\n", + " tag STRING,\n", + " quality_score DOUBLE,\n", + " created_at TIMESTAMP\n", + ")\n", + "USING ICEBERG\n", + "PARTITIONED BY (bucket(32, hotel_id))\n", + "TBLPROPERTIES ('format-version'='2')\n", + "\"\"\")\n", + "\n", + "print(f\"✅ Tables ensured in {namespace}\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "spark.sql(f\"\"\"\n", + "MERGE INTO {namespace}.hotels AS target\n", + "USING stg_hotels AS source\n", + "ON target.hotel_id = source.hotel_id\n", + "WHEN MATCHED THEN UPDATE SET\n", + " hotel_name = source.hotel_name,\n", + " country = source.country,\n", + " city = source.city,\n", + " stars = source.stars,\n", + " num_rooms = source.num_rooms,\n", + " chain = source.chain\n", + "WHEN NOT MATCHED THEN INSERT (hotel_id, hotel_name, country, city, stars, num_rooms, chain)\n", + "VALUES (source.hotel_id, source.hotel_name, source.country, source.city, source.stars, source.num_rooms, source.chain)\n", + "\"\"\")\n", + "\n", + "spark.sql(f\"\"\"\n", + "MERGE INTO {namespace}.users AS target\n", + "USING stg_users AS source\n", + "ON target.user_id = source.user_id\n", + "WHEN MATCHED THEN UPDATE SET\n", + " home_country = source.home_country,\n", + " signup_date = source.signup_date,\n", + " age = source.age\n", + "WHEN NOT MATCHED THEN INSERT (user_id, home_country, signup_date, age)\n", + "VALUES (source.user_id, source.home_country, source.signup_date, source.age)\n", + "\"\"\")\n", + "\n", + "spark.sql(f\"\"\"\n", + "MERGE INTO {namespace}.bookings AS target\n", + "USING stg_bookings AS source\n", + "ON target.booking_id = source.booking_id\n", + "WHEN MATCHED THEN UPDATE SET\n", + " user_id = 
source.user_id,\n", + " hotel_id = source.hotel_id,\n", + " checkin_date = source.checkin_date,\n", + " checkout_date = source.checkout_date,\n", + " nights = source.nights,\n", + " price_usd = source.price_usd,\n", + " status = source.status\n", + "WHEN NOT MATCHED THEN INSERT (booking_id, user_id, hotel_id, checkin_date, checkout_date, nights, price_usd, status)\n", + "VALUES (source.booking_id, source.user_id, source.hotel_id, source.checkin_date, source.checkout_date, source.nights, source.price_usd, source.status)\n", + "\"\"\")\n", + "\n", + "spark.sql(f\"\"\"\n", + "MERGE INTO {namespace}.reviews AS target\n", + "USING stg_reviews AS source\n", + "ON target.review_id = source.review_id\n", + "WHEN MATCHED THEN UPDATE SET\n", + " user_id = source.user_id,\n", + " hotel_id = source.hotel_id,\n", + " rating = source.rating,\n", + " created_at = source.created_at,\n", + " review_text = source.review_text,\n", + " review_metadata = source.review_metadata,\n", + " lang = source.lang\n", + "WHEN NOT MATCHED THEN INSERT (review_id, user_id, hotel_id, rating, created_at, review_text, review_metadata, lang)\n", + "VALUES (source.review_id, source.user_id, source.hotel_id, source.rating, source.created_at, source.review_text, source.review_metadata, source.lang)\n", + "\"\"\")\n", + "\n", + "spark.sql(f\"\"\"\n", + "MERGE INTO {namespace}.images AS target\n", + "USING stg_images AS source\n", + "ON target.image_id = source.image_id\n", + "WHEN MATCHED THEN UPDATE SET\n", + " hotel_id = source.hotel_id,\n", + " url = source.url,\n", + " width = source.width,\n", + " height = source.height,\n", + " aspect_ratio = source.aspect_ratio,\n", + " tag = source.tag,\n", + " quality_score = source.quality_score,\n", + " created_at = source.created_at\n", + "WHEN NOT MATCHED THEN INSERT (image_id, hotel_id, url, width, height, aspect_ratio, tag, quality_score, created_at)\n", + "VALUES (source.image_id, source.hotel_id, source.url, source.width, source.height, source.aspect_ratio, source.tag, source.quality_score, source.created_at)\n", + "\"\"\")\n", + "\n", + "print(\"✅ Iceberg tables merged\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"Row counts in Iceberg namespace:\")\n", + "for table in [\"hotels\", \"users\", \"bookings\", \"reviews\", \"images\"]:\n", + " result = spark.sql(f\"SELECT COUNT(*) AS rows FROM {namespace}.{table}\")\n", + " count = result.collect()[0][\"rows\"]\n", + " print(f\" - {table}: {count}\")\n", + "\n", + "spark.sql(f\"SELECT status, COUNT(*) AS cnt FROM {namespace}.bookings GROUP BY status ORDER BY status\").show()\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"🧹 Cleaning up Iceberg namespace...\")\n", + "print(\" (Comment out this cell if you want to keep the generated tables.)\")\n", + "tables = ['hotels', 'users', 'bookings', 'reviews', 'images']\n", + "for table in tables:\n", + " spark.sql(f\"DROP TABLE IF EXISTS {namespace}.{table}\")\n", + "spark.sql(f\"DROP NAMESPACE IF EXISTS {namespace}\")\n", + "print(\"✅ Cleanup complete.\")\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": 
"3.12.11" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/notebooks/playgrounds/hotels/hotels_practice_drills.ipynb b/notebooks/playgrounds/hotels/hotels_practice_drills.ipynb new file mode 100644 index 0000000..e4fb085 --- /dev/null +++ b/notebooks/playgrounds/hotels/hotels_practice_drills.ipynb @@ -0,0 +1,318 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# 🧊 Hotels Practice Drills\n", + "\n", + "Use this lab to run the SQL, PySpark, and Python exercises against the hotels practice dataset. The first two cells spin up Spark with Iceberg support and configure Trino access. All subsequent sections list tasks only—add your own cells beneath each bullet to implement solutions.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "\n", + "MINIO_ENDPOINT = os.getenv(\"MINIO_ENDPOINT\", \"http://minio:9000\")\n", + "MINIO_ACCESS_KEY = os.getenv(\"MINIO_ROOT_USER\", \"minio\")\n", + "MINIO_SECRET_KEY = os.getenv(\"MINIO_ROOT_PASSWORD\", \"minio123\")\n", + "HIVE_METASTORE_URI = os.getenv(\"HIVE_METASTORE_URI\", \"thrift://hive-metastore:9083\")\n", + "TRINO_URL = os.getenv(\"TRINO_URL\", \"http://trino:8080\")\n", + "SPARK_MASTER = os.getenv(\"SPARK_MASTER_URL\", \"spark://spark-master:7077\")\n", + "S3_ENDPOINT = os.getenv(\"S3_ENDPOINT\", \"minio:9000\")\n", + "\n", + "os.environ.setdefault(\"AWS_REGION\", \"us-east-1\")\n", + "os.environ.setdefault(\"AWS_DEFAULT_REGION\", os.environ[\"AWS_REGION\"])\n", + "\n", + "print(\"Spark master:\", SPARK_MASTER)\n", + "print(\"Trino URL:\", TRINO_URL)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.sql import SparkSession\n", + "\n", + "packages = [\n", + " \"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.2\",\n", + " \"org.apache.hadoop:hadoop-aws:3.3.4\",\n", + " \"software.amazon.awssdk:bundle:2.20.158\",\n", + "]\n", + "\n", + "spark = (\n", + " SparkSession.builder\n", + " .appName(\"HotelsPracticeDrills\")\n", + " .master(SPARK_MASTER)\n", + " .config(\"spark.jars.packages\", \",\".join(packages))\n", + " .config(\"spark.sql.catalog.spark_catalog\", \"org.apache.iceberg.spark.SparkSessionCatalog\")\n", + " .config(\"spark.sql.catalog.spark_catalog.type\", \"hive\")\n", + " .config(\"spark.sql.catalog.iceberg\", \"org.apache.iceberg.spark.SparkCatalog\")\n", + " .config(\"spark.sql.catalog.iceberg.type\", \"rest\")\n", + " .config(\"spark.sql.catalog.iceberg.uri\", \"http://hive-metastore:9001/iceberg\")\n", + " .config(\"spark.sql.catalog.iceberg.warehouse\", \"s3a://iceberg/warehouse\")\n", + " .config(\"spark.sql.catalog.iceberg.s3.endpoint\", f\"http://{S3_ENDPOINT}\")\n", + " .config(\"spark.sql.catalog.iceberg.s3.access-key-id\", MINIO_ACCESS_KEY)\n", + " .config(\"spark.sql.catalog.iceberg.s3.secret-access-key\", MINIO_SECRET_KEY)\n", + " .config(\"spark.sql.catalog.iceberg.s3.region\", os.environ[\"AWS_REGION\"])\n", + " .config(\"spark.sql.catalog.iceberg.s3.path-style-access\", \"true\")\n", + " .config(\"spark.sql.extensions\", \"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\")\n", + " .config(\"spark.sql.defaultCatalog\", \"iceberg\")\n", + " .enableHiveSupport()\n", + " .getOrCreate()\n", + ")\n", + "\n", + "spark.sparkContext.setLogLevel(\"WARN\")\n", + "spark.conf.set(\"spark.sql.session.timeZone\", \"UTC\")\n", + "\n", + "print(\"Spark session ready:\", spark.version)\n" + ] + }, + { + 
"cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import trino\n", + "\n", + "trino_conn = trino.dbapi.connect(\n", + " host=os.getenv(\"TRINO_HOST\", \"trino\"),\n", + " port=int(os.getenv(\"TRINO_PORT\", \"8080\")),\n", + " user=os.getenv(\"TRINO_USER\", \"admin\"),\n", + " catalog=os.getenv(\"TRINO_CATALOG\", \"iceberg\"),\n", + " schema=os.getenv(\"TRINO_SCHEMA\", \"hotels_practice\"),\n", + ")\n", + "print(\"Connected to Trino catalog/schema:\", trino_conn.schema)\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 🧮 Section A — SQL (Trino) Drills\n", + "\n", + "Catalog/schema default: `iceberg.hotels_practice`. Adjust queries if you wrote tables elsewhere.\n", + "\n", + "### A1. Top Hotels by Recent Review Quality (last 180 days)\n", + "- Inputs: `reviews`, `hotels`\n", + "- Compute `review_cnt`, `avg_rating` grouped by `(country, city, hotel_id)` for reviews in the last 180 days.\n", + "- Return the top 20 ordered by `review_cnt` desc, then `avg_rating` desc. Handle ties safely.\n", + "\n", + "### A2. Monthly Occupancy Proxy\n", + "- Inputs: `bookings`, `hotels`\n", + "- For `status = 'completed'`, calculate monthly `bookings` and `total_nights` per `(country, city, hotel_id, month)` using `date_trunc('month', checkin_date)`.\n", + "- Order by `month` desc, then `bookings` desc.\n", + "\n", + "### A3. Top-3 Hotels per Country (Window)\n", + "- Inputs: `bookings`, `hotels`\n", + "- For completed bookings, compute total bookings and avg price per `(country, hotel_id)`.\n", + "- Return at most three rows per country using window functions (e.g., `row_number()` or `dense_rank()` when ties should retain peers).\n", + "\n", + "### A4. Rolling Rating per Hotel (Window Frame)\n", + "- Inputs: `reviews`\n", + "- For each `hotel_id`, compute a rolling average of `rating` over the previous 10 rows ordered by `created_at`.\n", + "\n", + "### A5. Cancellation & No-Show Rates\n", + "- Inputs: `bookings`, `hotels`\n", + "- For each `(country, month)` compute cancel and no-show rates using `NULLIF(total, 0)` to avoid divide-by-zero.\n", + "\n", + "### A6. Review Language Mix & Bias Check\n", + "- Inputs: `reviews`, `hotels`\n", + "- Last 90 days, compute language share per country and return those with any language share > 0.6.\n", + "\n", + "### A7. Joining Images for Quality Screening\n", + "- Inputs: `images`, `hotels`\n", + "- Last 60 days, compute `avg_quality`, `image_cnt` per `(country, hotel_id)` and filter to `image_cnt >= 5` and `avg_quality >= 0.8`.\n", + "\n", + "### A8. “Trusted” Hotel Surface (Multi-signal)\n", + "- Inputs: `hotels`, `reviews`, `bookings`, `images`\n", + "- Build a view/query where hotels satisfy all of:\n", + " - ≥ 30 reviews in last 180 days with `avg_rating >= 4.2`\n", + " - ≥ 20 completed bookings in last 180 days\n", + " - ≥ 5 images in last 90 days with `avg_quality >= 0.75`\n", + "- Compose with CTEs then join.\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 🔥 Section B — PySpark Drills\n", + "\n", + "Load tables via `spark.table(\"iceberg.hotels_practice.\")` or `spark.sql`.\n", + "\n", + "### B1. Sessionize Reviews by User (30-minute gaps)\n", + "- Build sessions per `user_id` using window + `lag` to reset when gap > 30 minutes.\n", + "\n", + "### B2. 
Late-arriving Event Simulation\n", + "- Use `spark.readStream` with a rate source or file source and the static reviews data to demonstrate a 5-minute tumbling aggregation with a 10-minute watermark.\n", + "\n", + "### B3. Skew Handling in Joins (Hotels × Reviews)\n", + "- Join reviews to hotels and compute avg rating per hotel. Show one skew mitigation strategy (broadcast or salting).\n", + "\n", + "### B4. Partition-Aware Writes (Iceberg)\n", + "- Rewrite reviews into a temp Iceberg table partitioned by `months(created_at)` and set a target file size property. Inspect metadata/EXPLAIN.\n", + "\n", + "### B5. Deduplicate Near-Duplicates by Text Fingerprint\n", + "- Normalize `review_text` (lowercase, strip punctuation) and drop duplicates by `(hotel_id, fingerprint)`.\n", + "\n", + "### B6. Curate Balanced ML Training Slices\n", + "- Sample balanced subsets across rating buckets (1–5) and languages (`en,de,fr,es,it,he`) with max N per bucket, writing to `iceberg.hotels_practice.ml_reviews_balanced`.\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 🧠 Section C — Python / Data Prep Drills\n", + "\n", + "You can use pandas or PySpark DataFrames. Parse `review_metadata` JSON as needed.\n", + "\n", + "### C1. JSON Tag Explosion + Top Tags per Hotel\n", + "- Parse tags from `review_metadata`, handle invalid JSON, and compute top three tags per hotel.\n", + "\n", + "### C2. Text Cleaning + Chunking for SFT\n", + "- Normalize `review_text`, strip emojis/punctuation, and chunk into ~60-word segments per review.\n", + "\n", + "### C3. Language Filter + Coverage Report\n", + "- Keep languages in `{en,de,fr,es,it,he}`. Report coverage % per language and top 10 hotels by distinct language count.\n", + "\n", + "### C4. Toxicity/PII Placeholder Filter\n", + "- Implement simple regex filters for profanity, emails, phone numbers. Output counts of filtered rows.\n", + "\n", + "### C5. Train/Val/Test Split by Hotel + Time\n", + "- For each hotel, assign latest month to test, previous month to val, rest to train. Ensure no leakage.\n", + "\n", + "### C6. Simple Embedding Cache Index (Mock)\n", + "- Build TF-IDF vectors for chunks (from C2), store metadata mapping, and implement a cosine-similarity top-K search helper.\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## ☁️ Section D — Advanced Dataset Workloads (Iceberg Only)\n", + "\n", + "All tasks rely exclusively on the tables produced by `hotels_iceberg_population.ipynb` (namespace default: `iceberg.hotels_practice`).\n", + "\n", + "### D1. Historical Snapshot Audits\n", + "- Use Iceberg time travel to compare yesterday's snapshot with today and report row count deltas per table (a hedged sketch follows this section).\n", + "\n", + "### D2. Partition Health Checks\n", + "- For `bookings` and `reviews`, compute partition sizes (`months(checkin_date)` / `months(created_at)`) and flag skewed partitions (e.g., >3× median).\n", + "\n", + "### D3. Compaction Strategy Proposal\n", + "- Analyze file counts/average file sizes via `table.files` metadata and outline a compaction cadence (commands + triggers).\n" + ] + },
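+ { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "**Added sketch (hedged):** a minimal D1 starting point. It assumes at least two snapshots exist on `bookings` (rerun the population merge to create more) and that `VERSION AS OF` is available (Spark 3.3+ with the Iceberg extensions configured above).\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# D1 starter sketch (review addition): diff row counts between snapshots.\n", + "ns = \"iceberg.hotels_practice\"\n", + "snaps = spark.sql(f\"SELECT snapshot_id, committed_at FROM {ns}.bookings.snapshots ORDER BY committed_at\").collect()\n", + "if len(snaps) < 2:\n", + "    print(\"Need at least two snapshots to diff; rerun the population merge first.\")\n", + "else:\n", + "    old_id = snaps[0][\"snapshot_id\"]\n", + "    old_cnt = spark.sql(f\"SELECT COUNT(*) AS c FROM {ns}.bookings VERSION AS OF {old_id}\").collect()[0][\"c\"]\n", + "    new_cnt = spark.table(f\"{ns}.bookings\").count()\n", + "    print(f\"bookings: {old_cnt} rows at snapshot {old_id} -> {new_cnt} now (delta {new_cnt - old_cnt})\")\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 🔄 Section E — Streaming Simulation on Existing Data\n", + "\n", + "### E1. Micro-batch Replay\n", + "- Use Structured Streaming with the static `reviews` table as a rate-limited source (e.g., `spark.readStream.format(\"iceberg\").load(...)`).\n", + "- Demonstrate watermarking and exactly-once upserts back into an Iceberg staging table.\n", + "\n", + "### E2. 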
Late Data Handling\n", + "- Inject artificial delays by duplicating rows with older timestamps; verify logic handles duplicates without off-stack sources.\n", + "\n", + "### E3. Quality Metrics\n", + "- Compute per-batch metrics (records processed, duplicates dropped) and write them to `iceberg.hotels_practice.stream_metrics`.\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## ✅ Section F — Data Quality & Governance\n", + "\n", + "### F1. Automated Profiling\n", + "- Write profiling helpers that operate on the Iceberg tables (min/max, distinct counts, null ratios) and persist to `data_quality_profile`.\n", + "\n", + "### F2. Contract Enforcement\n", + "- Express a contract for `bookings` as code (dictionary). Validate nightly and log violations to `data_quality_violations`.\n", + "\n", + "### F3. Catalog Documentation\n", + "- Build a small metadata table with columns (table_name, description, owner, quality_score, sample_query) sourced from the current dataset.\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 🧪 Section G — Experimentation & Monitoring (Dataset Only)\n", + "\n", + "### G1. Prompt/Response Logging Stub\n", + "- Create a framework that logs generated summaries derived from `reviews` into `genai_prompt_log` (Iceberg table).\n", + "- Include latency, prompt hash, response hash columns.\n", + "\n", + "### G2. Offline Metrics\n", + "- Compute text similarity metrics between review_text and generated summaries using only local libraries (e.g., cosine over TF-IDF).\n", + "- Store metrics per run in `genai_eval_metrics`.\n", + "\n", + "### G3. A/B Simulation\n", + "- Split hotels into pseudo A/B cohorts using existing data; compute uplift in review conversion or image engagement using window functions.\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## ☕ Section H — JVM & API Exercises within Dataset Scope\n", + "\n", + "### H1. Scala/Java Spark Translation\n", + "- Translate the PySpark booking aggregation into Scala/Java using the same Iceberg tables (include code snippet or sbt skeleton).\n", + "\n", + "### H2. Data Access Service Sketch\n", + "- Design a REST/gRPC service that reads from Iceberg via Spark or Trino to serve hotel insights.\n", + "- Keep the architecture grounded in the existing dataset (no new storages).\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 🔐 Section I — Privacy & Compliance on Generated Tables\n", + "\n", + "### I1. PII Masking\n", + "- Re-write `reviews` masking emails/phones using regex UDFs; store results in `reviews_redacted`.\n", + "\n", + "### I2. Auditing Access\n", + "- Capture query history against `hotels_practice` tables by parsing Spark event logs or Trino query logs (simulated).\n", + "\n", + "### I3. Data Retention Drill\n", + "- Implement a delete workflow for a user (`user_id`) across bookings/reviews/images inside Iceberg, preserving an audit trail table.\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.11" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}