> ## Documentation Index
> Fetch the complete documentation index at: https://docs.sx.bet/llms.txt
> Use this file to discover all available pages before exploring further.

# Migrating from Ably

> Step-by-step guide for migrating from the legacy Ably WebSocket API to Centrifugo.

The legacy Ably-based WebSocket API is being deprecated. This guide covers everything you need to migrate to the Centrifugo-based API.

<Warning>
  **Deprecation deadline: July 1, 2026.** The legacy Ably WebSocket API will be shut down on this date. All integrations must complete migration to Centrifugo before then.
</Warning>

The core concepts are the same — channels, subscriptions, and message payloads are structurally identical. There are six concrete changes to make.

<Steps>
  <Step title="Swap the SDK">
    Uninstall `ably` and install `centrifuge`:

    ```bash theme={null}
    npm uninstall ably
    npm install centrifuge
    ```
  </Step>

  <Step title="Update authentication">
    The auth endpoint and callback pattern have both changed.

    |          | Ably (old)                            | Centrifugo (new)                             |
    | -------- | ------------------------------------- | -------------------------------------------- |
    | Endpoint | `GET /user/token`                     | `GET /user/realtime-token/api-key`           |
    | Callback | `authCallback(tokenParams, callback)` | `getToken: async () => token`                |
    | WS URL   | Managed by SDK                        | `wss://realtime.sx.bet/connection/websocket` |

    **Before:**

    ```javascript theme={null}
    import * as ably from "ably";

    const realtime = new ably.Realtime.Promise({
      authCallback: async (tokenParams, callback) => {
        try {
          const response = await fetch("https://api.sx.bet/user/token", {
            headers: { "X-Api-Key": process.env.SX_BET_API_KEY },
          });
          callback(null, await response.json());
        } catch (error) {
          callback(error, null);
        }
      },
    });

    await realtime.connection.once("connected");
    ```

    **After:**

    <CodeGroup>
      ```javascript JavaScript theme={null}
      import { Centrifuge } from "centrifuge";

      const client = new Centrifuge("wss://realtime.sx.bet/connection/websocket", {
        getToken: async () => {
          const res = await fetch("https://api.sx.bet/user/realtime-token/api-key", {
            headers: { "x-api-key": process.env.SX_BET_API_KEY },
          });
          if (!res.ok) throw new Error(`Token endpoint returned ${res.status}`);
          const { token } = await res.json();
          return token;
        },
      });

      client.connect();
      ```

      ```python Python theme={null}
      import asyncio
      import os
      import requests
      from centrifuge import Client

      def fetch_token():
          res = requests.get(
              "https://api.sx.bet/user/realtime-token/api-key",
              headers={"x-api-key": os.environ["SX_BET_API_KEY"]},
          )
          if not res.ok:
              raise Exception(f"Token endpoint returned {res.status_code}")
          return res.json()["token"]

      async def main():
          client = Client(
              "wss://realtime.sx.bet/connection/websocket",
              get_token=fetch_token,
          )
          await client.connect()
          await asyncio.Future()  # keep running

      asyncio.run(main())
      ```
    </CodeGroup>

    `getToken` is called automatically on initial connect and on token refresh — no manual token management needed. To signal a permanent auth failure (so the client stops retrying), throw `UnauthorizedError` from `centrifuge` on a 401/403 response. Any other thrown error is treated as transient and retried with backoff. See [Real-time Data → Auth](/developers/real-time#auth) for the full pattern.
  </Step>

  <Step title="Update channel subscriptions">
    The subscription API has changed. Message data moves from `message.data` to `ctx.data`.

    **Before:**

    ```javascript theme={null}
    const channel = realtime.channels.get("order_book_v2:0x6629...:0xabc...", {
      params: { rewind: "10s" },
    });

    channel.subscribe((message) => {
      console.log(message.data);
    });
    ```

    **After:**

    <CodeGroup>
      ```javascript JavaScript theme={null}
      const sub = client.newSubscription("order_book:market_0xabc...", {
        positioned: true,
        recoverable: true,
      });

      sub.on("subscribed", async (ctx) => {
        if (!ctx.wasRecovering || (ctx.wasRecovering && !ctx.recovered)) {
          // Seed state from REST on fresh connect or failed recovery
          const res = await fetch(`https://api.sx.bet/orders?marketHashes=0xabc...`);
          applySnapshot(await res.json());
        }
      });

      sub.on("publication", (ctx) => {
        applyUpdate(ctx.data);
      });

      sub.subscribe();
      ```

      ```python Python theme={null}
      import asyncio
      import aiohttp
      from centrifuge import (
          Client,
          PublicationContext,
          SubscribedContext,
          SubscriptionEventHandler,
      )

      async def on_subscribed(ctx: SubscribedContext) -> None:
          if not ctx.was_recovering or (ctx.was_recovering and not ctx.recovered):
              async with aiohttp.ClientSession() as session:
                  async with session.get(
                      "https://api.sx.bet/orders?marketHashes=0xabc..."
                  ) as res:
                      apply_snapshot(await res.json())

      async def on_publication(ctx: PublicationContext) -> None:
          apply_update(ctx.data)

      async def main():
          client = Client(
              "wss://realtime.sx.bet/connection/websocket",
              get_token=fetch_token,
          )
          await client.connect()
          handler = SubscriptionEventHandler(
              on_subscribed=on_subscribed,
              on_publication=on_publication,
          )
          sub = client.new_subscription("order_book:market_0xabc...", handler)
          await sub.subscribe()
          await asyncio.Future()

      asyncio.run(main())
      ```
    </CodeGroup>

    **`rewind` is replaced by `positioned` + `recoverable`.** These two flags together give you at-least-once delivery with automatic gap recovery on reconnect — equivalent to what `rewind` was doing, but more reliable. See [Real-time Data → Recovery & reliability](/developers/real-time#recovery--reliability) for how to handle the three reconnect states.
  </Step>

  <Step title="Update channel names">
    All channel names have changed. Update every channel string in your code:

    | Old channel (Ably)                       | New channel (Centrifuge)            | Notes                                                                                                     |
    | ---------------------------------------- | ----------------------------------- | --------------------------------------------------------------------------------------------------------- |
    | `active_orders_v2:{baseToken}:{maker}`   | `active_orders:{maker}`             | `baseToken` removed, `_v2` dropped                                                                        |
    | `order_book_v2:{baseToken}:{marketHash}` | `order_book:market_{marketHash}`    | `baseToken` removed                                                                                       |
    | —                                        | `order_book:event_{sportXEventId}`  | New — alternative to `order_book:market_` that covers every market in an event with a single subscription |
    | `best_odds:{baseToken}`                  | `best_odds:global`                  | `baseToken` removed from channel name                                                                     |
    | `markets`                                | `markets:global`                    | Renamed                                                                                                   |
    | `recent_trades`                          | `recent_trades:global`              | Renamed                                                                                                   |
    | `recent_trades_consolidated`             | `recent_trades_consolidated:global` | Renamed                                                                                                   |
    | `main_line`                              | `main_line:global`                  | Renamed                                                                                                   |
    | `fixtures:fixture_update`                | `fixtures:global`                   | Renamed                                                                                                   |
    | `live_scores:{sportXEventId}`            | `fixtures:live_scores`              | Now global — filter by `sportXEventId` client-side                                                        |
    | `ce_refunds:{user}`                      | `ce_refunds:{bettor}`               | Parameter renamed `user` → `bettor`                                                                       |
    | `markets:parlay`                         | `parlay_markets:global`             | Renamed                                                                                                   |

    All message payloads are unchanged.

    **`live_scores` is now a global channel.** The old `live_scores:{sportXEventId}` channel was per-event. The new `fixtures:live_scores` delivers all live score updates globally — filter the `sportXEventId` field in the payload to isolate the events you care about.
  </Step>

  <Step title="Replace rewind with the snapshot + subscribe pattern">
    The Ably `rewind` parameter (which replayed recent messages automatically on subscribe) has no direct equivalent in Centrifugo. The replacement is to set `positioned: true` and `recoverable: true` on the subscription, then seed from REST inside the `subscribed` handler — which fires on every connect and tells you whether history replay already filled the gap.

    **Before:**

    ```javascript theme={null}
    const channel = realtime.channels.get(`order_book_v2:${token}:${marketHash}`, {
      params: { rewind: "10s" },
    });
    channel.subscribe((message) => {
      applyUpdate(message.data);
    });
    ```

    **After:**

    <CodeGroup>
      ```javascript JavaScript theme={null}
      let ready = false;
      const buffer = [];

      const sub = client.newSubscription(`order_book:market_${marketHash}`, {
        positioned: true,
        recoverable: true,
      });

      sub.on("publication", (ctx) => {
        if (!ready) {
          buffer.push(ctx.data);
        } else {
          applyUpdate(ctx.data);
        }
      });

      sub.on("subscribed", async (ctx) => {
        if (ctx.wasRecovering && ctx.recovered) {
          // Centrifugo replayed all missed messages — no REST call needed.
          ready = true;
          return;
        }

        // Fresh connect or failed recovery (history gap too large / expired).
        // Reset and re-seed from REST.
        ready = false;
        buffer.length = 0;

        const res = await fetch(`https://api.sx.bet/orders?marketHashes=${marketHash}`);
        applySnapshot(await res.json());

        // Drain buffered publications on top of the snapshot.
        // applyUpdate deduplicates by entity ID so any
        // overlap between the snapshot and the buffer is handled safely.
        for (const data of buffer) applyUpdate(data);
        buffer.length = 0;
        ready = true;
      });

      sub.subscribe();
      ```

      ```python Python theme={null}
      import asyncio
      import aiohttp
      from centrifuge import (
          Client,
          PublicationContext,
          SubscribedContext,
          SubscriptionEventHandler,
          SubscriptionOptions,
      )

      async def subscribe_order_book(client: Client, market_hash: str) -> None:
          ready = False
          buffer: list = []

          async def on_subscribed(ctx: SubscribedContext) -> None:
              nonlocal ready
              if ctx.was_recovering and ctx.recovered:
                  ready = True
                  return

              ready = False
              buffer.clear()

              async with aiohttp.ClientSession() as session:
                  async with session.get(
                      f"https://api.sx.bet/orders?marketHashes={market_hash}"
                  ) as res:
                      apply_snapshot(await res.json())

              for data in buffer:
                  apply_update(data)
              buffer.clear()
              ready = True

          async def on_publication(ctx: PublicationContext) -> None:
              if not ready:
                  buffer.append(ctx.data)
              else:
                  apply_update(ctx.data)

          handler = SubscriptionEventHandler(
              on_subscribed=on_subscribed,
              on_publication=on_publication,
          )
          options = SubscriptionOptions(positioned=True, recoverable=True)
          sub = client.new_subscription(
              f"order_book:market_{market_hash}", handler, options
          )
          await sub.subscribe()
      ```
    </CodeGroup>

    For channels with no history (`best_odds:global`, `parlay_markets:global`), omit the flags and always seed from REST in the `subscribed` handler — recovery is not available for those channels. See [Real-time Data → Recovery & reliability](/developers/real-time#recovery--reliability) for the full three-state logic.
  </Step>

  <Step title="Handle duplicate messages">
    Centrifugo provides at-least-once delivery: during history recovery a message may be replayed more than once. Use `ctx.tags.messageId` to deduplicate:

    <CodeGroup>
      ```javascript JavaScript theme={null}
      const seen = new Set();
      const MAX_SEEN = 10_000;

      sub.on("publication", (ctx) => {
        const id = ctx.tags?.messageId;
        if (id !== undefined) {
          if (seen.has(id)) return;
          seen.add(id);
          if (seen.size > MAX_SEEN) {
            seen.delete(seen.values().next().value);
          }
        }
        applyUpdate(ctx.data);
      });
      ```

      ```python Python theme={null}
      from collections import OrderedDict
      from centrifuge import PublicationContext, SubscriptionEventHandler

      MAX_SEEN = 10_000
      seen: OrderedDict[str, None] = OrderedDict()

      async def on_publication(ctx: PublicationContext) -> None:
          msg_id = (ctx.tags or {}).get("messageId")
          if msg_id is not None:
              if msg_id in seen:
                  return
              seen[msg_id] = None
              if len(seen) > MAX_SEEN:
                  seen.popitem(last=False)
          apply_update(ctx.data)

      handler = SubscriptionEventHandler(on_publication=on_publication)
      ```
    </CodeGroup>

    For long-running processes, bound the set size (e.g., keep only the last 1,000 IDs) to avoid unbounded memory growth.
  </Step>
</Steps>

***

## Further reading

* [Real-time Data →](/developers/real-time) — connecting, subscribing, recovery, and common failures
* [WebSocket Channels →](/api-reference/centrifugo-overview) — namespace history capabilities and connection limits
* [WebSocket Initialization →](/api-reference/centrifugo-initialization) — full connection reference
