diff --git a/docker-compose.yml b/docker-compose.yml index 2c8a32b..6f75d0a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -290,12 +290,14 @@ services: - AWS_ACCESS_KEY_ID=${MINIO_ROOT_USER} - AWS_SECRET_ACCESS_KEY=${MINIO_ROOT_PASSWORD} - AWS_JAVA_V1_DISABLE_DEPRECATION_ANNOUNCEMENT=true + - PYSPARK_PYTHON=/opt/python/bin/python ports: - "8091:8091" extra_hosts: - "localhost:host-gateway" volumes: - ./infra/airflow/processing/spark/jobs:/opt/spark/jobs + - ./notebooks:/home/jovyan/work healthcheck: test: ["CMD", "curl", "-f", "http://spark-master:8080"] interval: 30s @@ -328,12 +330,14 @@ services: - AWS_ACCESS_KEY_ID=${MINIO_ROOT_USER} - AWS_SECRET_ACCESS_KEY=${MINIO_ROOT_PASSWORD} - AWS_JAVA_V1_DISABLE_DEPRECATION_ANNOUNCEMENT=true + - PYSPARK_PYTHON=/opt/python/bin/python ports: - "8092:8092" extra_hosts: - "localhost:host-gateway" volumes: - ./infra/airflow/processing/spark/jobs:/opt/spark/jobs + - ./notebooks:/home/jovyan/work healthcheck: test: ["CMD", "curl", "-f", "http://spark-master:8080"] interval: 30s @@ -599,6 +603,8 @@ services: - TRINO_URL=http://trino:8080 - SUPERSET_URL=http://superset:8088 - AIRFLOW_URL=http://airflow-apiserver:8080 + - PYSPARK_PYTHON=/opt/python/bin/python  # shipped to executors by the driver; must be a path that exists in the Spark image + - PYSPARK_DRIVER_PYTHON=/opt/conda/envs/notebook/bin/python profiles: [explore] volumes: diff --git a/infra/jupyterlab/Dockerfile b/infra/jupyterlab/Dockerfile index 3d8bb57..eb6b054 100644 --- a/infra/jupyterlab/Dockerfile +++ b/infra/jupyterlab/Dockerfile @@ -8,6 +8,7 @@ RUN apt-get update && apt-get install -y \ && rm -rf /var/lib/apt/lists/* ARG SPARK_VERSION=3.5.6 +ARG PYTHON_VERSION=3.12 RUN set -eux; \ curl -fSL --connect-timeout 20 --max-time 900 --retry 5 --retry-connrefused \ -o /tmp/spark.tgz "https://downloads.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz"; \ @@ -17,6 +18,18 @@ RUN set -eux; \ ENV SPARK_HOME=/opt/spark ENV PATH="$SPARK_HOME/bin:$PATH" +SHELL ["bash","-o","pipefail","-c"] + +ENV 
CONDA_ENV_PATH=/opt/conda/envs/notebook + +RUN mamba create -y -n notebook python=${PYTHON_VERSION} pip jupyterlab notebook && \ + mamba clean -afy + +ENV PATH="${CONDA_ENV_PATH}/bin:${PATH}" +ENV CONDA_DEFAULT_ENV=notebook +ENV PYSPARK_PYTHON="${CONDA_ENV_PATH}/bin/python" +ENV PYSPARK_DRIVER_PYTHON="${CONDA_ENV_PATH}/bin/python" + USER $NB_UID RUN pip install --no-cache-dir \ trino[sqlalchemy] \ diff --git a/infra/spark/Dockerfile b/infra/spark/Dockerfile index 3f69544..7b2b276 100644 --- a/infra/spark/Dockerfile +++ b/infra/spark/Dockerfile @@ -2,7 +2,25 @@ FROM bitnamilegacy/spark:3.5.6 USER 0 -RUN pip install --no-cache-dir pyarrow pandas fastavro confluent-kafka[avro] +RUN apt-get update && apt-get install -y --no-install-recommends \ + curl \ + bzip2 \ + ca-certificates \ + && rm -rf /var/lib/apt/lists/* + +ARG PYTHON_VERSION=3.12 +ENV MAMBA_ROOT_PREFIX=/opt/micromamba + +RUN curl -Ls "https://micro.mamba.pm/api/micromamba/linux-64/latest" | tar -xj -C /tmp && \ + install -m 0755 /tmp/bin/micromamba /usr/local/bin/micromamba && \ + rm -rf /tmp/bin + +RUN micromamba create -y -p /opt/python python=${PYTHON_VERSION} pip && \ + micromamba clean -afy + +ENV PATH="/opt/python/bin:${PATH}" + +RUN /opt/python/bin/pip install --no-cache-dir pyarrow pandas fastavro confluent-kafka[avro] RUN set -eux; \ mkdir -p "${SPARK_HOME}/jars"; \ @@ -21,5 +39,7 @@ RUN chown -R 1001:1001 ${SPARK_HOME}/conf ENV SPARK_JARS_DIR="${SPARK_HOME}/jars" ENV PYTHONPATH="/opt/spark/jobs:${PYTHONPATH}" +ENV PYSPARK_PYTHON="/opt/python/bin/python" +ENV PYSPARK_DRIVER_PYTHON="/opt/python/bin/python" USER 1001 diff --git a/notebooks/playgrounds/hotels/hotels_iceberg_population.ipynb b/notebooks/playgrounds/hotels/hotels_iceberg_population.ipynb new file mode 100644 index 0000000..d43b2f5 --- /dev/null +++ b/notebooks/playgrounds/hotels/hotels_iceberg_population.ipynb @@ -0,0 +1,872 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "b61b95d0", + "metadata": {}, + "source": [ + "# 🧊 
Hotels Iceberg Population Playground\n", + "\n", + "**Why**: Provide a reproducible Booking-style dataset for SQL/PySpark drills. \n", + "**How**: Synthesize booking-like DataFrames, merge them into Iceberg tables, and keep the run idempotent. \n", + "**Notes**: Safe to rerun; tables live in the `iceberg.hotels_practice` namespace.\n" + ] + }, + { + "cell_type": "markdown", + "id": "c8addbfe", + "metadata": {}, + "source": [ + "## ⚙️ Environment" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0ddf3816", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "\n", + "MINIO_ENDPOINT = os.getenv(\"MINIO_ENDPOINT\", \"http://minio:9000\")\n", + "MINIO_ACCESS_KEY = os.getenv(\"MINIO_ROOT_USER\", \"minio\")\n", + "MINIO_SECRET_KEY = os.getenv(\"MINIO_ROOT_PASSWORD\", \"minio123\")\n", + "HIVE_METASTORE_URI = os.getenv(\"HIVE_METASTORE_URI\", \"thrift://hive-metastore:9083\")\n", + "TRINO_URL = os.getenv(\"TRINO_URL\", \"http://trino:8080\")\n", + "SPARK_MASTER = os.getenv(\"SPARK_MASTER_URL\", \"spark://spark-master:7077\")\n", + "S3_ENDPOINT = os.getenv(\"S3_ENDPOINT\", \"minio:9000\")\n", + "\n", + "os.environ.setdefault(\"AWS_REGION\", \"us-east-1\")\n", + "os.environ.setdefault(\"AWS_DEFAULT_REGION\", os.environ[\"AWS_REGION\"])\n", + "\n", + "print(\"MinIO:\", MINIO_ENDPOINT)\n", + "print(\"Hive metastore:\", HIVE_METASTORE_URI)\n", + "print(\"Spark master:\", SPARK_MASTER)\n", + "print(\"Trino:\", TRINO_URL)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d5ab5d87", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "----------------------------------------\n", + "Exception occurred during processing of request from ('127.0.0.1', 48876)\n", + "Traceback (most recent call last):\n", + " File \"/opt/conda/envs/notebook/lib/python3.12/socketserver.py\", line 318, in _handle_request_noblock\n", + " self.process_request(request, client_address)\n", + " File 
\"/opt/conda/envs/notebook/lib/python3.12/socketserver.py\", line 349, in process_request\n", + " self.finish_request(request, client_address)\n", + " File \"/opt/conda/envs/notebook/lib/python3.12/socketserver.py\", line 362, in finish_request\n", + " self.RequestHandlerClass(request, client_address, self)\n", + " File \"/opt/conda/envs/notebook/lib/python3.12/socketserver.py\", line 766, in __init__\n", + " self.handle()\n", + " File \"/opt/spark/python/pyspark/accumulators.py\", line 295, in handle\n", + " poll(accum_updates)\n", + " File \"/opt/spark/python/pyspark/accumulators.py\", line 267, in poll\n", + " if self.rfile in r and func():\n", + " ^^^^^^\n", + " File \"/opt/spark/python/pyspark/accumulators.py\", line 271, in accum_updates\n", + " num_updates = read_int(self.rfile)\n", + " ^^^^^^^^^^^^^^^^^^^^\n", + " File \"/opt/spark/python/pyspark/serializers.py\", line 596, in read_int\n", + " raise EOFError\n", + "EOFError\n", + "----------------------------------------\n" + ] + } + ], + "source": [ + "from pyspark.sql import SparkSession\n", + "\n", + "packages = [\n", + " \"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.2\",\n", + " \"org.apache.hadoop:hadoop-aws:3.3.4\",\n", + " \"software.amazon.awssdk:bundle:2.20.158\"\n", + "]\n", + "\n", + "print(\"🚀 Creating Spark session with Iceberg support …\")\n", + "\n", + "spark = (\n", + " SparkSession.builder\n", + " .appName(\"HotelsIcebergPopulation\")\n", + " .master(SPARK_MASTER)\n", + " .config(\"spark.jars.packages\", \",\".join(packages))\n", + " .config(\"spark.sql.catalog.spark_catalog\", \"org.apache.iceberg.spark.SparkSessionCatalog\")\n", + " .config(\"spark.sql.catalog.spark_catalog.type\", \"hive\")\n", + " .config(\"spark.sql.catalog.iceberg\", \"org.apache.iceberg.spark.SparkCatalog\")\n", + " .config(\"spark.sql.catalog.iceberg.type\", \"rest\")\n", + " .config(\"spark.sql.catalog.iceberg.uri\", \"http://hive-metastore:9001/iceberg\")\n", + " 
.config(\"spark.sql.catalog.iceberg.warehouse\", \"s3a://iceberg/warehouse\")\n", + " .config(\"spark.sql.catalog.iceberg.s3.endpoint\", f\"http://{S3_ENDPOINT}\")\n", + " .config(\"spark.sql.catalog.iceberg.s3.access-key-id\", MINIO_ACCESS_KEY)\n", + " .config(\"spark.sql.catalog.iceberg.s3.secret-access-key\", MINIO_SECRET_KEY)\n", + " .config(\"spark.sql.catalog.iceberg.s3.region\", os.environ[\"AWS_REGION\"])\n", + " .config(\"spark.sql.catalog.iceberg.s3.path-style-access\", \"true\")\n", + " .config(\"spark.sql.catalog.iceberg.s3.connection-ssl-enabled\", \"false\")\n", + " .config(\"spark.sql.catalog.iceberg.s3.sse.type\", \"none\")\n", + " .config(\"spark.sql.extensions\", \"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\")\n", + " .config(\"spark.hadoop.hive.metastore.uris\", HIVE_METASTORE_URI)\n", + " .config(\"spark.hadoop.fs.s3a.endpoint\", S3_ENDPOINT)\n", + " .config(\"spark.hadoop.fs.s3a.access.key\", MINIO_ACCESS_KEY)\n", + " .config(\"spark.hadoop.fs.s3a.secret.key\", MINIO_SECRET_KEY)\n", + " .config(\"spark.hadoop.fs.s3a.path.style.access\", \"true\")\n", + " .config(\"spark.hadoop.fs.s3a.impl\", \"org.apache.hadoop.fs.s3a.S3AFileSystem\")\n", + " .config(\"spark.hadoop.fs.s3a.connection.ssl.enabled\", \"false\")\n", + " .config(\"spark.hadoop.fs.s3a.aws.credentials.provider\", \"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider\")\n", + " .config(\"spark.sql.defaultCatalog\", \"iceberg\")\n", + " .enableHiveSupport()\n", + " .getOrCreate()\n", + ")\n", + "\n", + "spark.sparkContext.setLogLevel(\"WARN\")\n", + "spark.conf.set(\"spark.sql.session.timeZone\", \"UTC\")\n", + "\n", + "print(\"✅ Spark session ready (Iceberg REST catalog)\")\n", + "print(\" Spark version:\", spark.version)\n" + ] + }, + { + "cell_type": "markdown", + "id": "cc7608ef", + "metadata": {}, + "source": [ + "## 📦 Configuration\n", + "\n", + "Reproducible volumes and paths for the synthetic contract." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "661d720e", + "metadata": {}, + "outputs": [], + "source": [ + "from dataclasses import dataclass, field\n", + "import random\n", + "\n", + "@dataclass\n", + "class HotelsPracticeConfig:\n", + " random_seed: int = 7\n", + " catalog: str = \"iceberg\"\n", + " namespace: str = os.getenv(\"HOTELS_ICEBERG_NAMESPACE\", \"hotels_practice\")\n", + " volumes: dict = field(default_factory=lambda: {\n", + " \"hotels\": 200,\n", + " \"users\": 2000,\n", + " \"bookings\": 8000,\n", + " \"reviews\": 6000,\n", + " \"images\": 2500,\n", + " })\n", + "\n", + "cfg = HotelsPracticeConfig()\n", + "rng = random.Random(cfg.random_seed)\n", + "\n", + "print(cfg)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9fd4b8ba", + "metadata": {}, + "outputs": [], + "source": [ + "import json\n", + "import uuid\n", + "from datetime import date, datetime, timedelta, timezone\n", + "from typing import Optional\n", + "\n", + "from pyspark.sql import functions as F, types as T\n", + "\n", + "\n", + "def random_date(rng, start: date, end: date) -> date:\n", + " return start + timedelta(days=rng.randint(0, (end - start).days))\n", + "\n", + "\n", + "def random_timestamp(rng, start: datetime, end: datetime) -> datetime:\n", + " total_seconds = int((end - start).total_seconds())\n", + " return start + timedelta(seconds=rng.randint(0, total_seconds))\n", + "\n", + "\n", + "def deterministic_uuid(prefix: str, index: int) -> str:\n", + " return str(uuid.uuid5(uuid.NAMESPACE_URL, f\"{prefix}-{index}\"))\n", + "\n", + "\n", + "current_ts = datetime.now(timezone.utc).replace(tzinfo=None)  # single naive-UTC snapshot; datetime.utcnow() is deprecated since Python 3.12\n", + "today = current_ts.date()  # derived from the same snapshot so the date and timestamp cannot straddle midnight\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "36f1a1cf", + "metadata": {}, + "outputs": [], + "source": [ + "country_cities = {\n", + " \"US\": [\"New York\", \"San Francisco\", \"Chicago\", \"Austin\", \"Miami\"],\n", + " \"CA\": [\"Toronto\", \"Vancouver\", \"Montreal\", \"Calgary\", \"Ottawa\"],\n", 
+ " \"GB\": [\"London\", \"Manchester\", \"Edinburgh\", \"Bristol\", \"Bath\"],\n", + " \"FR\": [\"Paris\", \"Lyon\", \"Nice\", \"Bordeaux\", \"Lille\"],\n", + " \"NL\": [\"Amsterdam\", \"Rotterdam\", \"Utrecht\", \"The Hague\", \"Eindhoven\"],\n", + " \"DE\": [\"Berlin\", \"Munich\", \"Frankfurt\", \"Hamburg\", \"Cologne\"],\n", + " \"ES\": [\"Barcelona\", \"Madrid\", \"Seville\", \"Valencia\", \"Bilbao\"],\n", + " \"IT\": [\"Rome\", \"Florence\", \"Milan\", \"Venice\", \"Bologna\"],\n", + " \"IL\": [\"Tel Aviv\", \"Jerusalem\", \"Haifa\", \"Eilat\", \"Herzliya\"],\n", + " \"PT\": [\"Lisbon\", \"Porto\", \"Faro\", \"Braga\", \"Coimbra\"],\n", + " \"SE\": [\"Stockholm\", \"Gothenburg\", \"Malmo\", \"Uppsala\", \"Visby\"],\n", + " \"JP\": [\"Tokyo\", \"Kyoto\", \"Osaka\", \"Sapporo\", \"Fukuoka\"],\n", + " \"AU\": [\"Sydney\", \"Melbourne\", \"Perth\", \"Brisbane\", \"Adelaide\"],\n", + " \"BR\": [\"Rio\", \"Sao Paulo\", \"Brasilia\", \"Salvador\", \"Recife\"],\n", + " \"AE\": [\"Dubai\", \"Abu Dhabi\", \"Sharjah\", \"Al Ain\", \"Ras Al Khaimah\"],\n", + " \"ZA\": [\"Cape Town\", \"Johannesburg\", \"Durban\", \"Pretoria\", \"Bloemfontein\"],\n", + "}\n", + "\n", + "chains = [\"StayPro\", \"BoutiqueX\", \"UrbanNest\", \"Skyline\", \"Coastline\", \"Heritage\", \"Aurora\"]\n", + "adjectives = [\"Grand\", \"Crystal\", \"Sunset\", \"Aurora\", \"Vertex\", \"Harbor\", \"Atlas\", \"Zenith\", \"Summit\", \"Velvet\"]\n", + "nouns = [\"Resort\", \"Suites\", \"Inn\", \"Lodge\", \"Retreat\", \"Villa\", \"Boutique\", \"Haven\", \"Plaza\", \"Terrace\"]\n", + "room_rates = {}\n", + "\n", + "hotels_records = []\n", + "for idx in range(cfg.volumes[\"hotels\"]):\n", + " country = rng.choice(list(country_cities))\n", + " city = rng.choice(country_cities[country])\n", + " stars = rng.choices([1, 2, 3, 4, 5], weights=[0.05, 0.15, 0.35, 0.3, 0.15])[0]\n", + " num_rooms = rng.randint(35, 420)\n", + " chain = rng.choice(chains) if rng.random() < 0.4 else None\n", + " hotel_id = f\"H{100000 
+ idx}\"\n", + " room_rates[hotel_id] = rng.randint(80, 420)\n", + " hotel_name = f\"{rng.choice(adjectives)} {rng.choice(nouns)} {city}\"\n", + " hotels_records.append({\n", + " \"hotel_id\": hotel_id,\n", + " \"hotel_name\": hotel_name,\n", + " \"country\": country,\n", + " \"city\": city,\n", + " \"stars\": stars,\n", + " \"num_rooms\": num_rooms,\n", + " \"chain\": chain,\n", + " })\n", + "\n", + "hotel_schema = T.StructType([\n", + " T.StructField(\"hotel_id\", T.StringType(), False),\n", + " T.StructField(\"hotel_name\", T.StringType(), False),\n", + " T.StructField(\"country\", T.StringType(), False),\n", + " T.StructField(\"city\", T.StringType(), False),\n", + " T.StructField(\"stars\", T.IntegerType(), False),\n", + " T.StructField(\"num_rooms\", T.IntegerType(), False),\n", + " T.StructField(\"chain\", T.StringType(), True),\n", + "])\n", + "\n", + "hotels_df = spark.createDataFrame(hotels_records, schema=hotel_schema)\n", + "print(f\"Hotels: {hotels_df.count()}\")\n", + "hotels_df.orderBy(\"hotel_id\").show(5, truncate=False)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d39cd004", + "metadata": {}, + "outputs": [], + "source": [ + "users_records = []\n", + "user_schema = T.StructType([\n", + " T.StructField(\"user_id\", T.StringType(), False),\n", + " T.StructField(\"home_country\", T.StringType(), False),\n", + " T.StructField(\"signup_date\", T.DateType(), False),\n", + " T.StructField(\"age\", T.IntegerType(), False),\n", + "])\n", + "\n", + "user_ids = []\n", + "for idx in range(cfg.volumes[\"users\"]):\n", + " user_id = f\"U{100000 + idx}\"\n", + " user_ids.append(user_id)\n", + " signup_date = random_date(rng, date(2015, 1, 1), date(2024, 1, 1))\n", + " users_records.append({\n", + " \"user_id\": user_id,\n", + " \"home_country\": rng.choice(list(country_cities)),\n", + " \"signup_date\": signup_date,\n", + " \"age\": rng.randint(18, 78),\n", + " })\n", + "\n", + "users_df = spark.createDataFrame(users_records, 
schema=user_schema)\n", + "print(f\"Users: {users_df.count()}\")\n", + "users_df.orderBy(\"user_id\").show(5, truncate=False)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7d9bda3a", + "metadata": {}, + "outputs": [], + "source": [ + "booking_schema = T.StructType([\n", + " T.StructField(\"booking_id\", T.StringType(), False),\n", + " T.StructField(\"user_id\", T.StringType(), False),\n", + " T.StructField(\"hotel_id\", T.StringType(), False),\n", + " T.StructField(\"checkin_date\", T.DateType(), False),\n", + " T.StructField(\"checkout_date\", T.DateType(), False),\n", + " T.StructField(\"nights\", T.IntegerType(), False),\n", + " T.StructField(\"price_usd\", T.DoubleType(), False),\n", + " T.StructField(\"status\", T.StringType(), False),\n", + "])\n", + "\n", + "booking_records = []\n", + "hotel_ids = [row[\"hotel_id\"] for row in hotels_records]\n", + "status_weights = [(\"completed\", 0.78), (\"cancelled\", 0.14), (\"no_show\", 0.08)]\n", + "\n", + "recent_booking_start = today - timedelta(days=540)\n", + "focus_booking_start = today - timedelta(days=180)\n", + "primary_end = today - timedelta(days=1)\n", + "\n", + "for idx in range(cfg.volumes[\"bookings\"]):\n", + " user_id = rng.choice(user_ids)\n", + " hotel_id = rng.choice(hotel_ids)\n", + " if rng.random() < 0.9:\n", + " window_start = focus_booking_start\n", + " window_end = primary_end\n", + " else:\n", + " window_start = recent_booking_start\n", + " window_end = max(recent_booking_start, focus_booking_start - timedelta(days=1))\n", + " if window_start > window_end:\n", + " window_start, window_end = recent_booking_start, primary_end\n", + " checkin = random_date(rng, window_start, window_end)\n", + " max_nights = max(1, min(14, (today - checkin).days))\n", + " nights = rng.randint(1, max_nights)\n", + " checkout = checkin + timedelta(days=nights)\n", + " status = rng.choices([s for s, _ in status_weights], weights=[w for _, w in status_weights])[0]\n", + " base_price = 
room_rates[hotel_id] * nights * rng.uniform(0.9, 1.3)\n", + " if status == \"completed\":\n", + " price = round(base_price, 2)\n", + " elif status == \"cancelled\":\n", + " price = round(base_price * rng.uniform(0.1, 0.4), 2)\n", + " else:\n", + " price = round(base_price * rng.uniform(0.0, 0.2), 2)\n", + " booking_records.append({\n", + " \"booking_id\": deterministic_uuid(\"booking\", idx),\n", + " \"user_id\": user_id,\n", + " \"hotel_id\": hotel_id,\n", + " \"checkin_date\": checkin,\n", + " \"checkout_date\": checkout,\n", + " \"nights\": nights,\n", + " \"price_usd\": float(price),\n", + " \"status\": status,\n", + " })\n", + "\n", + "bookings_df = spark.createDataFrame(booking_records, schema=booking_schema)\n", + "print(f\"Bookings: {bookings_df.count()}\")\n", + "bookings_df.groupBy(\"status\").count().show()\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ae35f7b9", + "metadata": {}, + "outputs": [], + "source": [ + "review_schema = T.StructType([\n", + " T.StructField(\"review_id\", T.StringType(), False),\n", + " T.StructField(\"user_id\", T.StringType(), False),\n", + " T.StructField(\"hotel_id\", T.StringType(), False),\n", + " T.StructField(\"rating\", T.IntegerType(), False),\n", + " T.StructField(\"created_at\", T.TimestampType(), False),\n", + " T.StructField(\"review_text\", T.StringType(), True),\n", + " T.StructField(\"review_metadata\", T.StringType(), True),\n", + " T.StructField(\"lang\", T.StringType(), True),\n", + "])\n", + "\n", + "hotel_lookup = {record[\"hotel_id\"]: record for record in hotels_records}\n", + "\n", + "rating_templates = {\n", + " 5: [\n", + " {\"text\": \"Exceptional stay at {hotel_name} in {city}; staff anticipated every need.\", \"lang\": \"en\"},\n", + " {\"text\": \"Worth the {price_per_night:.0f} USD nightly rate at {hotel_name}; pure comfort.\", \"lang\": \"en\"},\n", + " {\"text\": \"Servicio excelente en {hotel_name} ({city}), volveremos pronto.\", \"lang\": \"es\"},\n", + " 
{\"text\": \"Esperienza eccellente al {hotel_name} di {city}; ritorneremo certamente.\", \"lang\": \"it\"},\n", + " {\"text\": \"חוויה מושלמת ב-{hotel_name} שב-{city}, הכל היה מדויק.\", \"lang\": \"he\"},\n", + " ],\n", + " 4: [\n", + " {\"text\": \"{hotel_name} in {city} delivered a smooth check-in and quiet room.\", \"lang\": \"en\"},\n", + " {\"text\": \"Bon rapport qualité/prix à {hotel_name}; équipe attentionnée.\", \"lang\": \"fr\"},\n", + " {\"text\": \"Rooftop bar at {hotel_name} made the {nights} nights memorable.\", \"lang\": \"en\"},\n", + " {\"text\": \"Camere spaziose e personale cordiale al {hotel_name} di {city}.\", \"lang\": \"it\"},\n", + " ],\n", + " 3: [\n", + " {\"text\": \"Stay at {hotel_name} in {city} was decent, though amenities felt basic.\", \"lang\": \"en\"},\n", + " {\"text\": \"Habitación correcta en {hotel_name}, pero poco encanto.\", \"lang\": \"es\"},\n", + " {\"text\": \"{hotel_name} bietet solide Lage in {city}, aber Service wirkt routiniert.\", \"lang\": \"de\"},\n", + " {\"text\": \"Camera nella media al {hotel_name}; niente di speciale.\", \"lang\": \"it\"},\n", + " ],\n", + " 2: [\n", + " {\"text\": \"{hotel_name} in {city} struggled with slow check-in and thin walls.\", \"lang\": \"en\"},\n", + " {\"text\": \"Peu de pression d'eau et chauffage capricieux à {hotel_name}.\", \"lang\": \"fr\"},\n", + " {\"text\": \"Servicio frío en {hotel_name}; necesitamos mejorar la limpieza.\", \"lang\": \"es\"},\n", + " {\"text\": \"Personale distratto al {hotel_name}; esperienza deludente.\", \"lang\": \"it\"},\n", + " ],\n", + " 1: [\n", + " {\"text\": \"Worst check-in at {hotel_name}; {nights} nights felt endless.\", \"lang\": \"en\"},\n", + " {\"text\": \"Nos fuimos antes: {hotel_name} en {city} fue un desastre.\", \"lang\": \"es\"},\n", + " {\"text\": \"לעולם לא נחזור ל-{hotel_name}; רעש בלתי נסבל.\", \"lang\": \"he\"},\n", + " {\"text\": \"Esperienza pessima al {hotel_name}; siamo andati via subito.\", \"lang\": \"it\"},\n", + " ],\n", 
+ "}\n", + "\n", + "def build_review_snippet(rating: int, booking: dict):\n", + " templates = rating_templates.get(rating)\n", + " if not templates:\n", + " return None, None\n", + " hotel = hotel_lookup.get(booking[\"hotel_id\"], {})\n", + " context = {\n", + " \"hotel_name\": hotel.get(\"hotel_name\", \"the hotel\"),\n", + " \"city\": hotel.get(\"city\", \"the city\"),\n", + " \"chain\": hotel.get(\"chain\") or \"independent\",\n", + " \"nights\": booking[\"nights\"],\n", + " \"price_per_night\": booking[\"price_usd\"] / booking[\"nights\"] if booking[\"nights\"] else booking[\"price_usd\"],\n", + " }\n", + " template = rng.choice(templates)\n", + " text = template[\"text\"].format(**context)\n", + " if rating in followup_phrases and rng.random() < 0.4:\n", + " text = f\"{text} {rng.choice(followup_phrases[rating])}\"\n", + " return text, template[\"lang\"]\n", + "\n", + "followup_phrases = {\n", + " 5: [\n", + " \"Breakfast buffet was a highlight.\",\n", + " \"Spa booking was effortless.\",\n", + " \"Loved the skyline view from the suite.\",\n", + " ],\n", + " 4: [\n", + " \"Would happily stay again once the gym reopens.\",\n", + " \"Concierge solved transport within minutes.\",\n", + " ],\n", + " 3: [\n", + " \"Staff tried their best, yet small details were missed.\",\n", + " \"Good for a short business hop.\",\n", + " ],\n", + " 2: [\n", + " \"Maintenance team eventually helped, but the wait was long.\",\n", + " \"Noise from hallway kept us up.\",\n", + " ],\n", + " 1: [\n", + " \"Requested refund for the final night.\",\n", + " \"Left after the first evening despite plans.\",\n", + " ],\n", + "}\n", + "\n", + "review_tags_vocab = [\"clean\", \"view\", \"staff\", \"breakfast\", \"location\", \"spa\", \"business\", \"family\", \"quiet\", \"food\", \"design\", \"access\"]\n", + "review_sources = [\"mobile_app\", \"desktop_web\", \"partner_site\"]\n", + "review_devices = [\"ios\", \"android\", \"web\"]\n", + "stay_purposes = [\"business\", \"leisure\", 
\"family\", \"event\"]\n", + "\n", + "completed_bookings = [rec for rec in booking_records if rec[\"status\"] == \"completed\"]\n", + "num_reviews = min(cfg.volumes[\"reviews\"], len(completed_bookings))\n", + "rng.shuffle(completed_bookings)\n", + "review_candidates = completed_bookings[:num_reviews]\n", + "\n", + "recent_review_floor = datetime.combine(today - timedelta(days=540), datetime.min.time())\n", + "user_last_review_ts = {}\n", + "review_records = []\n", + "for idx, booking in enumerate(review_candidates):\n", + " rating = rng.choices([1, 2, 3, 4, 5], weights=[0.04, 0.08, 0.22, 0.36, 0.30])[0]\n", + " base_datetime = (\n", + " datetime.combine(booking[\"checkout_date\"], datetime.min.time())\n", + " + timedelta(hours=rng.randint(7, 22), minutes=rng.randint(0, 59))\n", + " )\n", + " if rng.random() < 0.3 and booking[\"user_id\"] in user_last_review_ts:\n", + " created_at = user_last_review_ts[booking[\"user_id\"]] + timedelta(minutes=rng.randint(5, 25))\n", + " else:\n", + " created_at = base_datetime + timedelta(minutes=rng.randint(0, 240))\n", + " prev_ts = user_last_review_ts.get(booking[\"user_id\"])\n", + " if prev_ts and created_at <= prev_ts:\n", + " created_at = prev_ts + timedelta(minutes=rng.randint(10, 240))\n", + " if created_at > current_ts:\n", + " created_at = current_ts - timedelta(minutes=rng.randint(5, 240))\n", + " if created_at < recent_review_floor:\n", + " created_at = recent_review_floor + timedelta(days=rng.randint(0, 30), minutes=rng.randint(0, 120))\n", + " if rng.random() < 0.1:\n", + " review_text, lang = None, None\n", + " else:\n", + " review_text, lang = build_review_snippet(rating, booking)\n", + " sentiment = \"positive\" if rating >= 4 else \"negative\" if rating <= 2 else \"neutral\"\n", + " tags = rng.sample(review_tags_vocab, k=rng.randint(1, min(3, len(review_tags_vocab))))\n", + " metadata = json.dumps({\n", + " \"tags\": tags,\n", + " \"source\": rng.choice(review_sources),\n", + " \"device\": 
rng.choice(review_devices),\n", + " \"stay_purpose\": rng.choice(stay_purposes),\n", + " \"sentiment\": sentiment,\n", + " })\n", + " review_records.append({\n", + " \"review_id\": deterministic_uuid(\"review\", idx),\n", + " \"user_id\": booking[\"user_id\"],\n", + " \"hotel_id\": booking[\"hotel_id\"],\n", + " \"rating\": int(rating),\n", + " \"created_at\": created_at,\n", + " \"review_text\": review_text,\n", + " \"review_metadata\": metadata,\n", + " \"lang\": lang,\n", + " })\n", + " user_last_review_ts[booking[\"user_id\"]] = created_at\n", + "\n", + "reviews_df = spark.createDataFrame(review_records, schema=review_schema)\n", + "print(f\"Reviews: {reviews_df.count()}\")\n", + "reviews_df.groupBy(\"rating\").count().orderBy(\"rating\").show()\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "65a4005b", + "metadata": {}, + "outputs": [], + "source": [ + "image_schema = T.StructType([\n", + " T.StructField(\"image_id\", T.StringType(), False),\n", + " T.StructField(\"hotel_id\", T.StringType(), False),\n", + " T.StructField(\"url\", T.StringType(), False),\n", + " T.StructField(\"width\", T.IntegerType(), False),\n", + " T.StructField(\"height\", T.IntegerType(), False),\n", + " T.StructField(\"aspect_ratio\", T.DoubleType(), False),\n", + " T.StructField(\"tag\", T.StringType(), False),\n", + " T.StructField(\"quality_score\", T.DoubleType(), False),\n", + " T.StructField(\"created_at\", T.TimestampType(), False),\n", + "])\n", + "\n", + "image_tags = [\"room\", \"lobby\", \"pool\", \"restaurant\", \"gym\", \"spa\", \"exterior\", \"suite\", \"bar\", \"conference\"]\n", + "width_choices = [800, 1024, 1280, 1600, 1920, 2560]\n", + "height_choices = [600, 720, 900, 1080, 1440]\n", + "\n", + "image_records = []\n", + "image_counter = 0\n", + "image_window_start = current_ts - timedelta(days=365)\n", + "image_recent_threshold = current_ts - timedelta(days=90)\n", + "image_end = current_ts\n", + "\n", + "for hotel in hotels_records:\n", + " 
base_images = rng.randint(6, 10)\n", + " for idx in range(base_images):\n", + " width = rng.choice(width_choices)\n", + " height = rng.choice(height_choices)\n", + " aspect_ratio = round(width / height, 4)\n", + " created_at = random_timestamp(rng, image_window_start, image_end)\n", + " quality = rng.uniform(0.6, 0.97)\n", + " if created_at >= image_recent_threshold:\n", + " quality = max(quality, rng.uniform(0.75, 0.98))\n", + " image_records.append({\n", + " \"image_id\": deterministic_uuid(\"image\", image_counter),\n", + " \"hotel_id\": hotel[\"hotel_id\"],\n", + " \"url\": f\"https://cdn.practice.example/hotels/{hotel['hotel_id']}/img_{idx:03d}.jpg\",\n", + " \"width\": width,\n", + " \"height\": height,\n", + " \"aspect_ratio\": float(aspect_ratio),\n", + " \"tag\": rng.choice(image_tags),\n", + " \"quality_score\": round(quality, 3),\n", + " \"created_at\": created_at,\n", + " })\n", + " image_counter += 1\n", + " recent_min = rng.randint(4, 6)\n", + " for r_idx in range(recent_min):\n", + " width = rng.choice(width_choices)\n", + " height = rng.choice(height_choices)\n", + " aspect_ratio = round(width / height, 4)\n", + " created_at = random_timestamp(rng, image_recent_threshold, image_end)\n", + " quality = rng.uniform(0.78, 0.98)\n", + " image_records.append({\n", + " \"image_id\": deterministic_uuid(\"image\", image_counter),\n", + " \"hotel_id\": hotel[\"hotel_id\"],\n", + " \"url\": f\"https://cdn.practice.example/hotels/{hotel['hotel_id']}/recent_{r_idx:03d}.jpg\",\n", + " \"width\": width,\n", + " \"height\": height,\n", + " \"aspect_ratio\": float(aspect_ratio),\n", + " \"tag\": rng.choice(image_tags),\n", + " \"quality_score\": round(quality, 3),\n", + " \"created_at\": created_at,\n", + " })\n", + " image_counter += 1\n", + "\n", + "images_df = spark.createDataFrame(image_records, schema=image_schema)\n", + "print(f\"Images: {images_df.count()}\")\n", + "images_df.groupBy(\"tag\").count().orderBy(\"count\", ascending=False).show(5)\n" + ] + }, + { 
+ "cell_type": "code", + "execution_count": null, + "id": "f6c8b61d", + "metadata": {}, + "outputs": [], + "source": [ + "mismatch = bookings_df.filter(F.datediff(\"checkout_date\", \"checkin_date\") != F.col(\"nights\"))\n", + "assert mismatch.count() == 0, \"Night count mismatch detected\"\n", + "print(\"✅ Nights column validated\")\n" + ] + }, + { + "cell_type": "markdown", + "id": "74386ba4", + "metadata": {}, + "source": [ + "## 🧊 Iceberg Population\n", + "\n", + "Merge the freshly generated DataFrames directly into Iceberg tables and inspect the row counts before cleanup.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d7b61508", + "metadata": {}, + "outputs": [], + "source": [ + "staging_views = {\n", + " \"stg_hotels\": hotels_df,\n", + " \"stg_users\": users_df,\n", + " \"stg_bookings\": bookings_df,\n", + " \"stg_reviews\": reviews_df,\n", + " \"stg_images\": images_df,\n", + "}\n", + "\n", + "for view_name, dataframe in staging_views.items():\n", + " dataframe.createOrReplaceTempView(view_name)\n", + "\n", + "print(\"Staging counts:\")\n", + "for name, df in staging_views.items():\n", + " print(f\" - {name.replace('stg_', '')}: {df.count()}\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "namespace = f\"{cfg.catalog}.{cfg.namespace}\"\n", + "\n", + "spark.sql(f\"CREATE NAMESPACE IF NOT EXISTS {namespace}\")\n", + "\n", + "spark.sql(f\"\"\"\n", + "CREATE TABLE IF NOT EXISTS {namespace}.hotels (\n", + " hotel_id STRING,\n", + " hotel_name STRING,\n", + " country STRING,\n", + " city STRING,\n", + " stars INT,\n", + " num_rooms INT,\n", + " chain STRING\n", + ")\n", + "USING ICEBERG\n", + "PARTITIONED BY (country)\n", + "TBLPROPERTIES ('format-version'='2')\n", + "\"\"\")\n", + "\n", + "spark.sql(f\"\"\"\n", + "CREATE TABLE IF NOT EXISTS {namespace}.users (\n", + " user_id STRING,\n", + " home_country STRING,\n", + " signup_date DATE,\n", + " age INT\n", + 
")\n", + "USING ICEBERG\n", + "PARTITIONED BY (home_country)\n", + "TBLPROPERTIES ('format-version'='2')\n", + "\"\"\")\n", + "\n", + "spark.sql(f\"\"\"\n", + "CREATE TABLE IF NOT EXISTS {namespace}.bookings (\n", + " booking_id STRING,\n", + " user_id STRING,\n", + " hotel_id STRING,\n", + " checkin_date DATE,\n", + " checkout_date DATE,\n", + " nights INT,\n", + " price_usd DOUBLE,\n", + " status STRING\n", + ")\n", + "USING ICEBERG\n", + "PARTITIONED BY (months(checkin_date), bucket(16, hotel_id))\n", + "TBLPROPERTIES ('format-version'='2')\n", + "\"\"\")\n", + "\n", + "spark.sql(f\"\"\"\n", + "CREATE TABLE IF NOT EXISTS {namespace}.reviews (\n", + " review_id STRING,\n", + " user_id STRING,\n", + " hotel_id STRING,\n", + " rating INT,\n", + " created_at TIMESTAMP,\n", + " review_text STRING,\n", + " review_metadata STRING,\n", + " lang STRING\n", + ")\n", + "USING ICEBERG\n", + "PARTITIONED BY (months(created_at))\n", + "TBLPROPERTIES ('format-version'='2')\n", + "\"\"\")\n", + "\n", + "spark.sql(f\"\"\"\n", + "CREATE TABLE IF NOT EXISTS {namespace}.images (\n", + " image_id STRING,\n", + " hotel_id STRING,\n", + " url STRING,\n", + " width INT,\n", + " height INT,\n", + " aspect_ratio DOUBLE,\n", + " tag STRING,\n", + " quality_score DOUBLE,\n", + " created_at TIMESTAMP\n", + ")\n", + "USING ICEBERG\n", + "PARTITIONED BY (bucket(32, hotel_id))\n", + "TBLPROPERTIES ('format-version'='2')\n", + "\"\"\")\n", + "\n", + "print(f\"✅ Tables ensured in {namespace}\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "spark.sql(f\"\"\"\n", + "MERGE INTO {namespace}.hotels AS target\n", + "USING stg_hotels AS source\n", + "ON target.hotel_id = source.hotel_id\n", + "WHEN MATCHED THEN UPDATE SET\n", + " hotel_name = source.hotel_name,\n", + " country = source.country,\n", + " city = source.city,\n", + " stars = source.stars,\n", + " num_rooms = source.num_rooms,\n", + " chain = source.chain\n", + "WHEN 
NOT MATCHED THEN INSERT (hotel_id, hotel_name, country, city, stars, num_rooms, chain)\n", + "VALUES (source.hotel_id, source.hotel_name, source.country, source.city, source.stars, source.num_rooms, source.chain)\n", + "\"\"\")\n", + "\n", + "spark.sql(f\"\"\"\n", + "MERGE INTO {namespace}.users AS target\n", + "USING stg_users AS source\n", + "ON target.user_id = source.user_id\n", + "WHEN MATCHED THEN UPDATE SET\n", + " home_country = source.home_country,\n", + " signup_date = source.signup_date,\n", + " age = source.age\n", + "WHEN NOT MATCHED THEN INSERT (user_id, home_country, signup_date, age)\n", + "VALUES (source.user_id, source.home_country, source.signup_date, source.age)\n", + "\"\"\")\n", + "\n", + "spark.sql(f\"\"\"\n", + "MERGE INTO {namespace}.bookings AS target\n", + "USING stg_bookings AS source\n", + "ON target.booking_id = source.booking_id\n", + "WHEN MATCHED THEN UPDATE SET\n", + " user_id = source.user_id,\n", + " hotel_id = source.hotel_id,\n", + " checkin_date = source.checkin_date,\n", + " checkout_date = source.checkout_date,\n", + " nights = source.nights,\n", + " price_usd = source.price_usd,\n", + " status = source.status\n", + "WHEN NOT MATCHED THEN INSERT (booking_id, user_id, hotel_id, checkin_date, checkout_date, nights, price_usd, status)\n", + "VALUES (source.booking_id, source.user_id, source.hotel_id, source.checkin_date, source.checkout_date, source.nights, source.price_usd, source.status)\n", + "\"\"\")\n", + "\n", + "spark.sql(f\"\"\"\n", + "MERGE INTO {namespace}.reviews AS target\n", + "USING stg_reviews AS source\n", + "ON target.review_id = source.review_id\n", + "WHEN MATCHED THEN UPDATE SET\n", + " user_id = source.user_id,\n", + " hotel_id = source.hotel_id,\n", + " rating = source.rating,\n", + " created_at = source.created_at,\n", + " review_text = source.review_text,\n", + " review_metadata = source.review_metadata,\n", + " lang = source.lang\n", + "WHEN NOT MATCHED THEN INSERT (review_id, user_id, hotel_id, rating, 
created_at, review_text, review_metadata, lang)\n", + "VALUES (source.review_id, source.user_id, source.hotel_id, source.rating, source.created_at, source.review_text, source.review_metadata, source.lang)\n", + "\"\"\")\n", + "\n", + "spark.sql(f\"\"\"\n", + "MERGE INTO {namespace}.images AS target\n", + "USING stg_images AS source\n", + "ON target.image_id = source.image_id\n", + "WHEN MATCHED THEN UPDATE SET\n", + " hotel_id = source.hotel_id,\n", + " url = source.url,\n", + " width = source.width,\n", + " height = source.height,\n", + " aspect_ratio = source.aspect_ratio,\n", + " tag = source.tag,\n", + " quality_score = source.quality_score,\n", + " created_at = source.created_at\n", + "WHEN NOT MATCHED THEN INSERT (image_id, hotel_id, url, width, height, aspect_ratio, tag, quality_score, created_at)\n", + "VALUES (source.image_id, source.hotel_id, source.url, source.width, source.height, source.aspect_ratio, source.tag, source.quality_score, source.created_at)\n", + "\"\"\")\n", + "\n", + "print(\"✅ Iceberg tables merged\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"Row counts in Iceberg namespace:\")\n", + "for table in [\"hotels\", \"users\", \"bookings\", \"reviews\", \"images\"]:\n", + " result = spark.sql(f\"SELECT COUNT(*) AS rows FROM {namespace}.{table}\")\n", + " count = result.collect()[0][\"rows\"]\n", + " print(f\" - {table}: {count}\")\n", + "\n", + "spark.sql(f\"SELECT status, COUNT(*) AS cnt FROM {namespace}.bookings GROUP BY status ORDER BY status\").show()\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"🧹 Cleaning up Iceberg namespace...\")\n", + "print(\" (Comment out this cell if you want to keep the generated tables.)\")\n", + "tables = ['hotels', 'users', 'bookings', 'reviews', 'images']\n", + "for table in tables:\n", + " spark.sql(f\"DROP TABLE IF EXISTS 
{namespace}.{table}\")\n", + "spark.sql(f\"DROP NAMESPACE IF EXISTS {namespace}\")\n", + "print(\"✅ Cleanup complete.\")\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.11" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/notebooks/playgrounds/hotels/hotels_practice_drills.ipynb b/notebooks/playgrounds/hotels/hotels_practice_drills.ipynb new file mode 100644 index 0000000..e4fb085 --- /dev/null +++ b/notebooks/playgrounds/hotels/hotels_practice_drills.ipynb @@ -0,0 +1,318 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# 🧊 Hotels Practice Drills\n", + "\n", + "Use this lab to run the SQL, PySpark, and Python exercises against the hotels practice dataset. The first three code cells load connection settings, spin up Spark with Iceberg support, and configure Trino access. 
All subsequent sections list tasks only—add your own cells beneath each bullet to implement solutions.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "\n", + "MINIO_ENDPOINT = os.getenv(\"MINIO_ENDPOINT\", \"http://minio:9000\")\n", + "MINIO_ACCESS_KEY = os.getenv(\"MINIO_ROOT_USER\", \"minio\")\n", + "MINIO_SECRET_KEY = os.getenv(\"MINIO_ROOT_PASSWORD\", \"minio123\")\n", + "HIVE_METASTORE_URI = os.getenv(\"HIVE_METASTORE_URI\", \"thrift://hive-metastore:9083\")\n", + "TRINO_URL = os.getenv(\"TRINO_URL\", \"http://trino:8080\")\n", + "SPARK_MASTER = os.getenv(\"SPARK_MASTER_URL\", \"spark://spark-master:7077\")\n", + "S3_ENDPOINT = os.getenv(\"S3_ENDPOINT\", \"minio:9000\")\n", + "\n", + "os.environ.setdefault(\"AWS_REGION\", \"us-east-1\")\n", + "os.environ.setdefault(\"AWS_DEFAULT_REGION\", os.environ[\"AWS_REGION\"])\n", + "\n", + "print(\"Spark master:\", SPARK_MASTER)\n", + "print(\"Trino URL:\", TRINO_URL)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.sql import SparkSession\n", + "\n", + "packages = [\n", + " \"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.2\",\n", + " \"org.apache.hadoop:hadoop-aws:3.3.4\",\n", + " \"software.amazon.awssdk:bundle:2.20.158\",\n", + "]\n", + "\n", + "spark = (\n", + " SparkSession.builder\n", + " .appName(\"HotelsPracticeDrills\")\n", + " .master(SPARK_MASTER)\n", + " .config(\"spark.jars.packages\", \",\".join(packages))\n", + " .config(\"spark.sql.catalog.spark_catalog\", \"org.apache.iceberg.spark.SparkSessionCatalog\")\n", + " .config(\"spark.sql.catalog.spark_catalog.type\", \"hive\")\n", + " .config(\"spark.sql.catalog.iceberg\", \"org.apache.iceberg.spark.SparkCatalog\")\n", + " .config(\"spark.sql.catalog.iceberg.type\", \"rest\")\n", + " .config(\"spark.sql.catalog.iceberg.uri\", \"http://hive-metastore:9001/iceberg\")\n", + " 
.config(\"spark.sql.catalog.iceberg.warehouse\", \"s3a://iceberg/warehouse\")\n", + " .config(\"spark.sql.catalog.iceberg.s3.endpoint\", f\"http://{S3_ENDPOINT}\")\n", + " .config(\"spark.sql.catalog.iceberg.s3.access-key-id\", MINIO_ACCESS_KEY)\n", + " .config(\"spark.sql.catalog.iceberg.s3.secret-access-key\", MINIO_SECRET_KEY)\n", + " .config(\"spark.sql.catalog.iceberg.s3.region\", os.environ[\"AWS_REGION\"])\n", + " .config(\"spark.sql.catalog.iceberg.s3.path-style-access\", \"true\")\n", + " .config(\"spark.sql.extensions\", \"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\")\n", + " .config(\"spark.sql.defaultCatalog\", \"iceberg\")\n", + " .enableHiveSupport()\n", + " .getOrCreate()\n", + ")\n", + "\n", + "spark.sparkContext.setLogLevel(\"WARN\")\n", + "spark.conf.set(\"spark.sql.session.timeZone\", \"UTC\")\n", + "\n", + "print(\"Spark session ready:\", spark.version)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import trino\n", + "\n", + "trino_conn = trino.dbapi.connect(\n", + " host=os.getenv(\"TRINO_HOST\", \"trino\"),\n", + " port=int(os.getenv(\"TRINO_PORT\", \"8080\")),\n", + " user=os.getenv(\"TRINO_USER\", \"admin\"),\n", + " catalog=os.getenv(\"TRINO_CATALOG\", \"iceberg\"),\n", + " schema=os.getenv(\"TRINO_SCHEMA\", \"hotels_practice\"),\n", + ")\n", + "print(\"Connected to Trino catalog/schema:\", trino_conn.schema)\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 🧮 Section A — SQL (Trino) Drills\n", + "\n", + "Catalog/schema default: `iceberg.hotels_practice`. Adjust queries if you wrote tables elsewhere.\n", + "\n", + "### A1. Top Hotels by Recent Review Quality (last 180 days)\n", + "- Inputs: `reviews`, `hotels`\n", + "- Compute `review_cnt`, `avg_rating` grouped by `(country, city, hotel_id)` for reviews in the last 180 days.\n", + "- Return the top 20 ordered by `review_cnt` desc, then `avg_rating` desc. 
Handle ties safely.\n", + "\n", + "### A2. Monthly Occupancy Proxy\n", + "- Inputs: `bookings`, `hotels`\n", + "- For `status = 'completed'`, calculate monthly `bookings` and `total_nights` per `(country, city, hotel_id, month)` using `date_trunc('month', checkin_date)`.\n", + "- Order by `month` desc, then `bookings` desc.\n", + "\n", + "### A3. Top-3 Hotels per Country (Window)\n", + "- Inputs: `bookings`, `hotels`\n", + "- For completed bookings, compute total bookings and avg price per `(country, hotel_id)`.\n", + "- Return the top three hotels per country using window functions: `row_number()` yields at most three rows, while `dense_rank()` retains tied peers and can therefore return more than three.\n", + "\n", + "### A4. Rolling Rating per Hotel (Window Frame)\n", + "- Inputs: `reviews`\n", + "- For each `hotel_id`, compute a rolling average of `rating` over the previous 10 rows ordered by `created_at`.\n", + "\n", + "### A5. Cancellation & No-Show Rates\n", + "- Inputs: `bookings`, `hotels`\n", + "- For each `(country, month)` compute cancel and no-show rates using `NULLIF(total, 0)` to avoid divide-by-zero.\n", + "\n", + "### A6. Review Language Mix & Bias Check\n", + "- Inputs: `reviews`, `hotels`\n", + "- Last 90 days, compute language share per country and return those with any language share > 0.6.\n", + "\n", + "### A7. Joining Images for Quality Screening\n", + "- Inputs: `images`, `hotels`\n", + "- Last 60 days, compute `avg_quality`, `image_cnt` per `(country, hotel_id)` and filter to `image_cnt >= 5` and `avg_quality >= 0.8`.\n", + "\n", + "### A8. 
“Trusted” Hotel Surface (Multi-signal)\n", + "- Inputs: `hotels`, `reviews`, `bookings`, `images`\n", + "- Build a view/query where hotels satisfy all of:\n", + " - ≥ 30 reviews in last 180 days with `avg_rating >= 4.2`\n", + " - ≥ 20 completed bookings in last 180 days\n", + " - ≥ 5 images in last 90 days with `avg_quality >= 0.75`\n", + "- Compose with CTEs then join.\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 🔥 Section B — PySpark Drills\n", + "\n", + "Load tables via `spark.table(\"iceberg.hotels_practice.