|
| 1 | +### KIP-848 - Migration Guide |
| 2 | + |
| 3 | +#### Overview |
| 4 | + |
| 5 | +- **What changed:** |
| 6 | + |
| 7 | + The **Group Leader role** (consumer member) is removed. Assignments are calculated by the **Group Coordinator (broker)** and distributed via **heartbeats**. |
| 8 | + |
| 9 | +- **Requirements:** |
| 10 | + |
| 11 | + - Broker version **4.0.0+** |
| 12 | + - `Confluent.Kafka` version **2.12.0+**: GA (production-ready) |
| 13 | + |
| 14 | +- **Enablement (client-side):** |
| 15 | + |
| 16 | + - `GroupProtocol=Consumer` |
| 17 | + - `GroupRemoteAssignor=<assignor>` (optional; broker-controlled if `null`; default broker assignor is `uniform`) |
| 18 | + |
| 19 | +#### Available Features |
| 20 | + |
| 21 | +All KIP-848 features are supported, including: |
| 22 | + |
| 23 | +- Subscription to one or more topics, including **regular expression (regex) subscriptions** |
| 24 | +- Rebalance handlers (**incremental only**) |
| 25 | +- Static group membership |
| 26 | +- Configurable remote assignor |
| 27 | +- Enforced max poll interval |
| 28 | +- Upgrade from `classic` protocol or downgrade from `consumer` protocol |
| 29 | +- AdminClient changes as per KIP |
| 30 | + |
| 31 | +#### Contract Changes |
| 32 | + |
| 33 | +##### Client Configuration changes |
| 34 | + |
| 35 | +| Classic Protocol (Deprecated Configs in KIP-848) | KIP-848 / Next-Gen Replacement | |
| 36 | +|--------------------------------------------------|--------------------------------------------------------------------------------| |
| 37 | +| `PartitionAssignmentStrategy` | `GroupRemoteAssignor` | |
| 38 | +| `SessionTimeoutMs` | Broker config: `group.consumer.session.timeout.ms` (configurable per group) | |
| 39 | +| `HeartbeatIntervalMs` | Broker config: `group.consumer.heartbeat.interval.ms` (configurable per group) | |
| 40 | +| `GroupProtocolType` | Not used in the new protocol | |
| 41 | + |
| 42 | +##### Rebalance Handler changes |
| 43 | + |
| 44 | +- The **protocol is fully incremental** in KIP-848. |
| 45 | +- ⚠️ The `partitions` list passed to `PartitionsAssignedHandler()` and `PartitionsRevokedHandler()` contains only the **incremental changes** (partitions being **added** or **revoked**), **not the full assignment**, as was the case with `Range` or `RoundRobin` in the classic protocol. |
| 46 | +This is similar to the `CooperativeSticky` incremental handler contract, but the |
| 47 | +number of calls can vary: there isn't one call for each rebalance. |
| 48 | +- All assignors under KIP-848 are now **sticky**, including `range`, which was **not sticky** in the classic protocol. |
| 49 | + |
| 50 | +##### Manual partition assignment |
| 51 | + |
| 52 | +- You can still use `Assign()` before subscribing, but after subscribing you can only use `IncrementalAssign()` and `IncrementalUnassign()`. |
| 53 | + |
| 54 | +##### Static Group Membership |
| 55 | + |
| 56 | +- Duplicate `GroupInstanceId` handling: |
| 57 | + - **Newly joining member** is fenced with **UnreleasedInstanceId (fatal)**. |
| 58 | + - (Classic protocol fenced the **existing** member instead.) |
| 59 | +- Implications: |
| 60 | + - Ensure only **one active instance per** `GroupInstanceId`. |
| 61 | + - Consumers must shut down cleanly to avoid blocking replacements until session timeout expires. |
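
The shutdown implication above can be illustrated with a minimal sketch. It assumes a static member whose `GroupInstanceId` is supplied by the caller; the class name `StaticMember` and all parameter values are hypothetical. The point is only that `Close()` runs on every exit path, so the member leaves the group instead of waiting for the session timeout.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

// Hypothetical helper, not part of Confluent.Kafka.
public static class StaticMember
{
    // Builds a KIP-848 config for a static member. Each running process
    // must use a distinct instanceId within the group.
    public static ConsumerConfig CreateConfig(string bootstrapServers, string groupId, string instanceId)
    {
        return new ConsumerConfig
        {
            BootstrapServers = bootstrapServers,
            GroupId = groupId,
            GroupProtocol = GroupProtocol.Consumer,
            GroupInstanceId = instanceId
        };
    }

    // Consumes until cancellation, then always closes the consumer.
    public static void Run(ConsumerConfig config, string topic, CancellationToken token)
    {
        using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
        {
            consumer.Subscribe(topic);
            try
            {
                while (!token.IsCancellationRequested)
                {
                    var result = consumer.Consume(token);
                    Console.WriteLine($"{result.TopicPartitionOffset}: {result.Message.Value}");
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when the token is cancelled during shutdown.
            }
            finally
            {
                // Leave the group explicitly rather than relying on the session timeout.
                consumer.Close();
            }
        }
    }
}
```

A caller would typically cancel the token from a `Console.CancelKeyPress` or host shutdown hook so that the `finally` block gets a chance to run.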
| 62 | + |
| 63 | +##### Session Timeout & Fetching |
| 64 | + |
| 65 | +- **Session timeout is broker-controlled**: |
| 66 | + - If the Coordinator is unreachable, a consumer **continues fetching messages** but cannot commit offsets. |
| 67 | + - Consumer is fenced once a heartbeat response is received from the Coordinator. |
| 68 | +- In the classic protocol, the client stopped fetching when session timeout expired. |
| 69 | + |
| 70 | +##### Closing / Auto-Commit |
| 71 | + |
| 72 | +- On `Close()` or `Unsubscribe()` with auto-commit enabled: |
| 73 | + - Member retries committing offsets until a timeout expires. |
| 74 | + - Currently uses the **default remote session timeout**. |
| 75 | + - Future **KIP-1092** will allow custom commit timeouts. |
| 76 | + |
| 77 | +##### Error Handling Changes |
| 78 | + |
| 79 | +- `UnknownTopicOrPart` (**subscription case**): |
| 80 | + - No longer returned if a topic is missing in the **local cache** when subscribing; the subscription proceeds. |
| 81 | +- `TopicAuthorizationFailed`: |
| 82 | + - Reported once per heartbeat or subscription change, even if only one topic is unauthorized. |
| 83 | + |
| 84 | +##### Summary of Key Differences (Classic vs Next-Gen) |
| 85 | + |
| 86 | +- **Assignment:** Classic protocol calculated by **Group Leader (consumer)**; KIP-848 calculated by **Group Coordinator (broker)** |
| 87 | +- **Assignors:** Classic range assignor was **not sticky**; KIP-848 assignors are **sticky**, including range |
| 88 | +- **Deprecated configs:** Classic client configs are replaced by `GroupRemoteAssignor` and broker-controlled session/heartbeat configs |
| 89 | +- **Static membership fencing:** KIP-848 fences **new member** on duplicate `GroupInstanceId` |
| 90 | +- **Session timeout:** Classic enforced on client; KIP-848 enforced on broker |
| 91 | +- **Auto-commit on close:** Classic stops at client session timeout; KIP-848 retries until remote timeout |
| 92 | +- **Unknown topics:** KIP-848 does not return error on subscription if topic missing |
| 93 | +- **Upgrade/Downgrade:** KIP-848 supports upgrading a group from the `classic` to the `consumer` protocol and downgrading it from `consumer` back to `classic` |
| 94 | + |
| 95 | +#### Minimal Example Config |
| 96 | + |
| 97 | +##### Classic Protocol |
| 98 | + |
| 99 | +``` properties |
| 100 | +# Optional; default is 'classic' |
| 101 | +GroupProtocol=Classic |
| 102 | +PartitionAssignmentStrategy=<Range,RoundRobin,CooperativeSticky> |
| 103 | +SessionTimeoutMs=45000 |
| 104 | +HeartbeatIntervalMs=15000 |
| 105 | +``` |
| 106 | + |
| 107 | +##### Next-Gen Protocol / KIP-848 |
| 108 | + |
| 109 | +``` properties |
| 110 | +GroupProtocol=Consumer |
| 111 | + |
| 112 | +# Optional: select a remote assignor |
| 113 | +# Valid options currently: 'uniform' or 'range' |
| 114 | +# GroupRemoteAssignor=<uniform,range> |
| 115 | +# If unset, broker chooses the assignor (default: 'uniform') |
| 116 | + |
| 117 | +# Session & heartbeat now controlled by broker: |
| 118 | +# group.consumer.session.timeout.ms |
| 119 | +# group.consumer.heartbeat.interval.ms |
| 120 | +``` |
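
In `Confluent.Kafka`, the property-style settings above correspond to properties on `ConsumerConfig`. The following is a minimal sketch of the next-gen variant; the class name `NextGenConfig` is hypothetical and the bootstrap address and group id are placeholders supplied by the caller.

```csharp
using Confluent.Kafka;

// Hypothetical helper, not part of Confluent.Kafka.
public static class NextGenConfig
{
    // Builds a consumer configuration that opts in to the KIP-848 protocol.
    public static ConsumerConfig Create(string bootstrapServers, string groupId)
    {
        return new ConsumerConfig
        {
            BootstrapServers = bootstrapServers,
            GroupId = groupId,
            GroupProtocol = GroupProtocol.Consumer,
            // Optional: remove this line to let the broker choose (default: uniform).
            GroupRemoteAssignor = "uniform"
            // SessionTimeoutMs, HeartbeatIntervalMs and PartitionAssignmentStrategy
            // are deliberately left unset: they belong to the classic protocol.
        };
    }
}
```

For example, `NextGenConfig.Create("localhost:9092", "example-group")` returns a config that can be passed straight to `ConsumerBuilder<TKey, TValue>`.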
| 121 | + |
| 122 | +#### Rebalance Callback Migration |
| 123 | + |
| 124 | +**Note:** The `partitions` list contains **only the partitions being added or revoked**, not the full partition list as in the eager protocol. |
| 125 | +If your handlers update state tied to the assigned partitions, make sure they apply these changes incrementally rather than replacing that state. |
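
One way to handle the incremental lists is to keep the full assignment in a set and apply each callback as a delta. The sketch below assumes that approach; `AssignmentTracker` and `ConsumerFactory` are hypothetical names, while `SetPartitionsAssignedHandler` and `SetPartitionsRevokedHandler` are the existing `ConsumerBuilder` methods.

```csharp
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;

// Hypothetical helper that rebuilds the full assignment from the
// incremental lists passed to the rebalance handlers.
public sealed class AssignmentTracker
{
    private readonly HashSet<TopicPartition> _owned = new HashSet<TopicPartition>();

    // The partitions this member currently owns.
    public IReadOnlyCollection<TopicPartition> Current => _owned;

    // Receives only the partitions newly added to this member.
    public void OnAssigned(IEnumerable<TopicPartition> added) => _owned.UnionWith(added);

    // Receives only the partitions being taken away from this member.
    public void OnRevoked(IEnumerable<TopicPartitionOffset> revoked) =>
        _owned.ExceptWith(revoked.Select(tpo => tpo.TopicPartition));
}

public static class ConsumerFactory
{
    // Wires the tracker into a consumer; key and value types are illustrative.
    public static IConsumer<Ignore, string> Build(ConsumerConfig config, AssignmentTracker tracker)
    {
        return new ConsumerBuilder<Ignore, string>(config)
            .SetPartitionsAssignedHandler((consumer, partitions) => tracker.OnAssigned(partitions))
            .SetPartitionsRevokedHandler((consumer, partitions) => tracker.OnRevoked(partitions))
            .Build();
    }
}
```

A handler written for the eager protocol that clears and refills its state on every assignment would lose partitions here, which is why the tracker only ever adds or removes the partitions it is given.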
| 126 | + |
| 127 | +#### Upgrade and Downgrade |
| 128 | + |
| 129 | +- A group made up entirely of `classic` consumers runs under the classic protocol. |
| 130 | +- The group is **upgraded to the consumer protocol** as soon as at least one `consumer` protocol member joins. |
| 131 | +- The group is **downgraded back to the classic protocol** if the last `consumer` protocol member leaves while `classic` members remain. |
| 132 | +- Both **rolling upgrade** (classic → consumer) and **rolling downgrade** (consumer → classic) are supported. |
| 133 | + |
| 134 | +#### Migration Checklist (Next-Gen Protocol / KIP-848) |
| 135 | + |
| 136 | +1. Upgrade to **Confluent.Kafka ≥ 2.12.0** (GA release) |
| 137 | +2. Run against **Kafka brokers ≥ 4.0.0** |
| 138 | +3. Set `GroupProtocol=Consumer` |
| 139 | +4. Optionally set `GroupRemoteAssignor` (valid options: `uniform` or `range`); leave it unset or `null` to let the broker choose (default: `uniform`) |
| 140 | +5. Replace deprecated configs with new ones |
| 141 | +6. Update rebalance callbacks to expect **incremental changes only** |
| 142 | +7. Review static membership handling (`GroupInstanceId`) |
| 143 | +8. Ensure proper shutdown to avoid fencing issues |
| 144 | +9. Adjust error handling for unknown topics and authorization failures |