Consider a situation where you are streaming options trades in real-time through a WebSocket API connection. As the data flows in, you detect an anomaly in a series of trades, perhaps an unusual spike in trading volume or a significant price movement. To gain more insight into this anomaly, you want to quickly access detailed information about the options chain related to these trades. This situation requires making a REST API call to fetch the options contract data while continuing to monitor the ongoing trades without interruption.

This scenario is not just hypothetical; it's a common challenge faced by anyone looking to quickly respond to market events by fetching additional data without losing track of the real-time data stream. However, a frequent coding mistake in such scenarios is handling REST API calls synchronously (blocking) while processing WebSocket events. This approach can lead to a significant backlog of real-time events that need processing, potentially causing the WebSocket stream to disconnect if the backlog grows too large. In this tutorial, we will walk you through how to handle this effectively using Python, focusing on asynchronous (non-blocking) operations to maintain a continuous and responsive data stream.

Blocking vs Non-blocking

When handling streaming data feeds with high message volumes, efficient non-blocking message processing is essential. A common issue occurs when message processing becomes 'blocking.' This happens when subtasks, such as making a REST API call, are performed synchronously within the same code block that receives or parses the streaming messages. In our scenario of detecting an anomaly and fetching extra data via a REST API, doing this synchronously inside the message handler pauses the entire data stream until the call returns. That pause creates a bottleneck and a backlog of unprocessed messages. The consequences are missed or delayed updates and, potentially, server disconnections due to slow message processing.

Let's explore this further with a simple pseudo-code example demonstrating how a blocking scenario can occur and contrast it with a non-blocking approach.

function handleWebSocketMessages():
    while True:
        message = receiveWebSocketMessage()
        if processMessage(message) == anomaly:  # This might take time
            fetchAdditionalData(message)  # Synchronous REST API call, blocking

# Main flow
start handleWebSocketMessages()
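To see the backlog concretely, here is a minimal runnable sketch of the blocking loop in Python. The feed and the REST call are simulated, not real: `fetch_additional_data` is a hypothetical stand-in that sleeps for 50 ms, messages are assumed to arrive every 10 ms, and every third message is treated as an anomaly.

```python
import time

FEED_INTERVAL = 0.01  # assumption: a new message arrives every 10 ms
REST_LATENCY = 0.05   # assumption: each REST call takes about 50 ms


def fetch_additional_data(message_id: int) -> dict:
    """Hypothetical stand-in for a synchronous (blocking) REST call."""
    time.sleep(REST_LATENCY)
    return {"id": message_id}


def handle_messages_blocking(n: int) -> list:
    """Handle n simulated messages; return how late each one was handled (s)."""
    start = time.perf_counter()
    lags = []
    for i in range(n):
        arrival = i * FEED_INTERVAL  # when message i "arrives" on the socket
        wait = arrival - (time.perf_counter() - start)
        if wait > 0:  # we are ahead of the feed, so wait for the next message
            time.sleep(wait)
        if i % 3 == 0:  # pretend every third message is an anomaly
            fetch_additional_data(i)  # blocks: no other message is handled meanwhile
        lags.append((time.perf_counter() - start) - arrival)
    return lags


if __name__ == "__main__":
    lags = handle_messages_blocking(10)
    print([round(lag * 1000) for lag in lags])  # lag in ms keeps growing
```

Because every fetch holds up the loop, lag accumulates: the tenth message arrives at 90 ms but cannot be handled until all four 50 ms fetches have finished, so it is handled at least about 110 ms late.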

To overcome the challenges of blocking while streaming WebSocket messages, we turn to non-blocking operations built on queues together with asynchronous tasks or threads. This approach decouples receiving data from the WebSocket stream from processing it or fetching additional data through REST API calls. In this non-blocking pseudo-code, handleWebSocketMessages only puts incoming messages into a queue, and a separate asynchronous function, processMessages, takes them off the queue and processes them. Crucially, fetchAdditionalDataAsync is a non-blocking asynchronous call, so while processMessages waits for the REST API response, handleWebSocketMessages keeps receiving new WebSocket messages.

queue messageQueue

async function handleWebSocketMessages():
    while True:
        message = await receiveWebSocketMessage()  # Yields control while waiting
        await messageQueue.put(message)

async function processMessages():
    while True:
        message = await messageQueue.get()
        if processMessage(message) == anomaly:
            await fetchAdditionalDataAsync(message)  # Asynchronous REST API call

# Main flow: both coroutines run concurrently
start handleWebSocketMessages()
start processMessages()
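A runnable asyncio version of the non-blocking pseudo-code might look like the sketch below, under the same simulated assumptions (10 ms feed interval, a hypothetical 50 ms `fetch_additional_data`, every third message an anomaly). Here `asyncio.to_thread` (Python 3.9+) plays the role of fetchAdditionalDataAsync by moving the blocking call onto a worker thread; the function names are illustrative, not from any library.

```python
import asyncio
import contextlib
import time

FEED_INTERVAL = 0.01  # assumption: a new message arrives every 10 ms
REST_LATENCY = 0.05   # assumption: each REST call takes about 50 ms


def fetch_additional_data(message_id: int) -> dict:
    """Hypothetical stand-in for a blocking REST call."""
    time.sleep(REST_LATENCY)
    return {"id": message_id}


async def handle_websocket_messages(queue: asyncio.Queue, n: int, start: float,
                                    receive_lags: list) -> None:
    """Simulated reader: it only receives and enqueues, never fetches."""
    loop = asyncio.get_running_loop()
    for i in range(n):
        arrival = i * FEED_INTERVAL
        await asyncio.sleep(max(0.0, arrival - (loop.time() - start)))
        receive_lags.append((loop.time() - start) - arrival)
        await queue.put(i)


async def process_messages(queue: asyncio.Queue, results: list) -> None:
    """Consumer: takes messages off the queue and fetches extra data."""
    while True:
        i = await queue.get()
        try:
            if i % 3 == 0:  # pretend every third message is an anomaly
                # The blocking call runs in a worker thread, so the event loop
                # (and therefore the reader) keeps running while we wait.
                results.append(await asyncio.to_thread(fetch_additional_data, i))
        finally:
            queue.task_done()


async def run(n: int = 10) -> tuple:
    queue: asyncio.Queue = asyncio.Queue()
    receive_lags: list = []
    results: list = []
    start = asyncio.get_running_loop().time()
    consumer = asyncio.create_task(process_messages(queue, results))
    await handle_websocket_messages(queue, n, start, receive_lags)
    await queue.join()  # wait until every queued message has been processed
    consumer.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await consumer
    return receive_lags, results


if __name__ == "__main__":
    lags, fetched = asyncio.run(run())
    print(f"max receive lag: {max(lags) * 1000:.1f} ms, fetched {len(fetched)}")
```

The reader records how late it was in receiving each message. That lag stays small because the reader never waits on a fetch, while the consumer works through the anomalies in the background.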

By implementing non-blocking data fetching using threads and queues, applications that rely on WebSocket connections for real-time data can achieve higher performance and reliability. This method ensures that the processing of data, whether it be through REST API calls or other means, does not interfere with the critical task of data reception, thus maintaining the integrity and responsiveness of the data feed.
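If your WebSocket library is callback-based rather than asyncio-based, the same threads-and-queues decoupling can be sketched with the standard library's `queue.Queue` and `threading.Thread`. As before, `fetch_additional_data` is a hypothetical stand-in that sleeps for 50 ms, and the `None` sentinel used to stop the worker is a design choice of this sketch.

```python
import queue
import threading
import time

REST_LATENCY = 0.05  # assumption: each REST call takes about 50 ms


def fetch_additional_data(message_id: int) -> dict:
    """Hypothetical stand-in for a blocking REST call."""
    time.sleep(REST_LATENCY)
    return {"id": message_id}


def worker(work: queue.Queue, results: list) -> None:
    """Runs in its own thread and performs the slow fetches."""
    while True:
        message_id = work.get()
        try:
            if message_id is None:  # sentinel: no more work
                return
            results.append(fetch_additional_data(message_id))
        finally:
            work.task_done()


def run(n: int = 10) -> tuple:
    work: queue.Queue = queue.Queue()
    results: list = []
    thread = threading.Thread(target=worker, args=(work, results), daemon=True)
    thread.start()

    start = time.perf_counter()
    for i in range(n):  # the "reader": hands anomalies off and moves on
        if i % 3 == 0:
            work.put(i)
    enqueue_time = time.perf_counter() - start

    work.put(None)  # ask the worker to stop once the queue drains
    thread.join()
    return enqueue_time, results


if __name__ == "__main__":
    elapsed, fetched = run()
    print(f"reader finished in {elapsed * 1000:.2f} ms; fetched {len(fetched)}")
```

The reader finishes almost instantly even though the four fetches take at least 200 ms in total, because the fetches happen on the worker thread.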

Real World Pattern

Now that we have covered the theory behind non-blocking WebSocket communication and how it contrasts with blocking operations, let's dive into a real-world example. This section shows how non-blocking asynchronous operations using asyncio can be implemented in Python to handle streaming data and REST API calls concurrently.

In this pattern, the reception of WebSocket messages is decoupled from their processing, which means you can keep pace with messages as they arrive. The example below watches the options trade stream for NVDA contracts and, for each matching trade, fetches the associated options contract without blocking the stream. It is designed to show how this pattern lets data streaming and external API calls run side by side without one hindering the other. While the example focuses on NVDA options trades, the overall pattern is flexible and can be adapted to a variety of use cases that combine real-time data processing with external data fetching.

import asyncio
import logging
import os
import re
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional
from polygon import RESTClient, WebSocketClient
from polygon.websocket.models import Market, Feed, WebSocketMessage


class ApiCallHandler:
    def __init__(self):
        self.api_call_queue = asyncio.Queue()
        self.executor = ThreadPoolExecutor()  # Thread pool for running synchronous code
        self.client = RESTClient()  # Assumes POLYGON_API_KEY is set in the environment

    async def enqueue_api_call(self, options_ticker):
        await self.api_call_queue.put(options_ticker)

    async def start_processing_api_calls(self):
        while True:
            options_ticker = await self.api_call_queue.get()
            try:
                # TODO:
                # Here, you can process the REST API requests as needed
                # Example: Get the options contract for this ticker. The blocking
                # RESTClient call runs in the thread pool, so the event loop stays free.
                contract = await asyncio.get_running_loop().run_in_executor(
                    self.executor, self.get_options_contract, options_ticker
                )
                print(contract)  # Or process the contract data as needed
            except Exception as e:
                logging.error(f"Error processing API call for {options_ticker}: {e}")
            finally:
                self.api_call_queue.task_done()

    def get_options_contract(self, options_ticker):
        return self.client.get_options_contract(options_ticker)


class MessageHandler:
    def __init__(self, api_call_handler):
        self.handler_queue = asyncio.Queue()
        self.api_call_handler = api_call_handler

    async def add(self, message_response: List[WebSocketMessage]) -> None:
        # Called by WebSocketClient.connect(); it only enqueues, so reading never stalls
        await self.handler_queue.put(message_response)

    async def start_handling(self) -> None:
        while True:
            message_response = await self.handler_queue.get()
            logging.info(f"Received message: {message_response}")
            try:
                # TODO:
                # Here, you can process the websocket messages as needed
                # Example: Extract ticker symbol and enqueue REST API call
                # to get the options contract for this trade (non-blocking)
                for trade in message_response:
                    symbol = getattr(trade, "symbol", None)  # Skip messages without a symbol
                    if symbol and self.extract_symbol(symbol) == "NVDA":
                        # The queue is unbounded, so this put returns immediately
                        await self.api_call_handler.enqueue_api_call(symbol)
            except Exception as e:
                logging.error(f"Error handling message: {e}")
            finally:
                self.handler_queue.task_done()

    def extract_symbol(self, input_string: str) -> Optional[str]:
        # "O:NVDA240315C00850000" -> "NVDA" (the underlying root)
        match = re.search(r"O:([A-Z]+)", input_string)
        if match:
            return match.group(1)
        else:
            return None


class MyClient:
    def __init__(self, feed, market, subscriptions):
        api_key = os.getenv("POLYGON_API_KEY")
        self.polygon_websocket_client = WebSocketClient(
            api_key=api_key,
            feed=feed,
            market=market,
            verbose=True,
            subscriptions=subscriptions,
        )
        self.api_call_handler = ApiCallHandler()
        self.message_handler = MessageHandler(self.api_call_handler)

    async def start_event_stream(self):
        try:
            # Reader, message handler, and REST worker all run concurrently
            await asyncio.gather(
                self.polygon_websocket_client.connect(self.message_handler.add),
                self.message_handler.start_handling(),
                self.api_call_handler.start_processing_api_calls(),
            )
        except Exception as e:
            logging.error(f"Error in event stream: {e}")


async def main():
    logging.basicConfig(level=logging.INFO)
    my_client = MyClient(
        feed=Feed.RealTime, market=Market.Options, subscriptions=["T.*"]
    )
    await my_client.start_event_stream()


if __name__ == "__main__":
    # Entry point for the asyncio program
    asyncio.run(main())

Please see the polygon-io/client-python GitHub repository for the code example.
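The `extract_symbol` helper only recovers the underlying root (`NVDA`). If you also want the expiration, call/put flag, or strike before deciding whether a REST call is worth making, the rest of the ticker can be decoded locally. This is a sketch that assumes options tickers follow the OCC-style layout `O:{root}{YYMMDD}{C|P}{strike × 1000, zero-padded to 8 digits}`, as in `O:NVDA240315C00850000`; `parse_option_ticker` and `OptionSymbol` are hypothetical names, not part of the polygon client.

```python
import re
from dataclasses import dataclass
from datetime import date
from typing import Optional

# Assumed layout: "O:" + root + YYMMDD + "C" or "P" + strike * 1000 (8 digits)
OPTION_TICKER = re.compile(r"^O:([A-Z0-9]+)(\d{2})(\d{2})(\d{2})([CP])(\d{8})$")


@dataclass(frozen=True)
class OptionSymbol:
    underlying: str
    expiration: date
    contract_type: str  # "call" or "put"
    strike: float


def parse_option_ticker(ticker: str) -> Optional[OptionSymbol]:
    """Decode an options ticker locally; return None if it doesn't match."""
    match = OPTION_TICKER.match(ticker)
    if match is None:
        return None
    root, yy, mm, dd, cp, strike = match.groups()
    return OptionSymbol(
        underlying=root,
        expiration=date(2000 + int(yy), int(mm), int(dd)),  # assumes 20xx years
        contract_type="call" if cp == "C" else "put",
        strike=int(strike) / 1000,
    )


if __name__ == "__main__":
    print(parse_option_ticker("O:NVDA240315C00850000"))
```

Parsing locally is cheap, so it can run inside the message handler without blocking, leaving the REST call for the details the ticker does not encode.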

This real-world example demonstrates the advantages of using asynchronous operations in Python to handle WebSocket streams and REST API calls concurrently. Decoupling message ingestion from on-demand external data retrieval makes your scripts considerably more useful. By adopting this pattern and other non-blocking techniques, your applications can remain responsive and efficient even while processing high volumes of streaming data and fetching external data at the same time. The principles and techniques illustrated here are not limited to financial data streams; they can be adapted to a wide range of scenarios where real-time data processing is crucial.
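One detail worth noting in `ApiCallHandler`: `start_processing_api_calls` awaits each `run_in_executor` call before taking the next ticker off the queue, so only one REST request is in flight at a time even though a `ThreadPoolExecutor` is available. If contract lookups start to queue up, one option is to run several copies of the consumer against the same queue. The sketch below assumes a hypothetical `get_options_contract` stand-in that sleeps for 50 ms; `api_worker` and `fetch_all` are illustrative names. In a real client, keep `num_workers` modest so you stay within whatever request limits apply to your API plan.

```python
import asyncio
import logging
import time
from concurrent.futures import ThreadPoolExecutor

REST_LATENCY = 0.05  # assumption: each REST call takes about 50 ms


def get_options_contract(ticker: str) -> dict:
    """Hypothetical stand-in for a blocking call like RESTClient.get_options_contract."""
    time.sleep(REST_LATENCY)
    return {"ticker": ticker}


async def api_worker(queue: asyncio.Queue, executor: ThreadPoolExecutor,
                     results: list) -> None:
    """One consumer; several of these share the same queue."""
    loop = asyncio.get_running_loop()
    while True:
        ticker = await queue.get()
        try:
            results.append(await loop.run_in_executor(executor, get_options_contract, ticker))
        except Exception as e:  # keep the worker alive, as ApiCallHandler does
            logging.error(f"Error fetching {ticker}: {e}")
        finally:
            queue.task_done()


async def fetch_all(tickers: list, num_workers: int) -> tuple:
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        workers = [asyncio.create_task(api_worker(queue, executor, results))
                   for _ in range(num_workers)]
        for ticker in tickers:
            queue.put_nowait(ticker)
        start = time.perf_counter()
        await queue.join()  # returns once every ticker has been fetched
        elapsed = time.perf_counter() - start
        for w in workers:
            w.cancel()
        await asyncio.gather(*workers, return_exceptions=True)
    return elapsed, results


if __name__ == "__main__":
    tickers = ["O:NVDA240315C00850000", "O:NVDA240315P00850000",
               "O:NVDA240322C00900000", "O:NVDA240322P00900000"]
    for n in (1, 4):
        elapsed, _ = asyncio.run(fetch_all(tickers, n))
        print(f"{n} worker(s): {elapsed * 1000:.0f} ms")
```

With one worker the four lookups run back to back (at least 200 ms); with four workers they overlap and finish in roughly the time of a single call.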

Next Steps

While this tutorial presents a practical approach using asyncio in Python for handling concurrent WebSocket and REST API operations, it's important to recognize that this is just one solution in a range of possibilities. For applications dealing with higher volumes of data and complexity, more advanced solutions such as publish/subscribe systems, distributed queues, and remote workers might be necessary. These approaches offer greater scalability and robustness but also add complexity and infrastructure requirements. As your project's needs evolve, carefully consider these options to strike the right balance between efficiency, scalability, and maintainability.