SKA SDP Data Queue Library

The data queue library supports loading data onto and off Kafka queues. The AIOKafka library is used to communicate from Python to Kafka.

For a detailed API of the various data queue classes, refer to SKA SDP Data Queue Classes.

Send data to a topic

The following example pushes a basic string message to a Kafka queue. It assumes that Kafka is running in the Kubernetes namespace given by KUBE_NAMESPACE.

import asyncio
from ska_sdp_dataqueues import DataQueueProducer, Encoding

async def run():
    kafka_host = f"ska-sdp-kafka.{KUBE_NAMESPACE}:9092"
    kafka_topic = "test-topic"

    message = "Testing Testing"
    encoding = Encoding.ASCII

    async with DataQueueProducer(kafka_host, topic=kafka_topic, encoding=encoding) as queue:
        # Return the result of the send so that the caller receives the record metadata
        return await queue.send(message)

record = asyncio.run(run())

The returned record is a RecordMetadata object, which the underlying AIOKafkaProducer produces when it sends a message.

Kafka topics only store raw bytes, so the data queue library provides an encoding to convert the message into bytes. The encoding options currently supported are:

  • utf-8: variable length character encoding

  • ascii: single byte length character encoding

  • msgpack_numpy: msgpack format for numpy arrays

  • xarray: encoding for xarray datasets

  • npy: self describing tensor format for numpy data

  • json: encoding for JSON
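To make the role of an encoding concrete, the sketch below uses only the Python standard library to turn a message into bytes and back for the utf-8, ascii and json options. It illustrates the idea only: the to_bytes and from_bytes helpers are hypothetical and are not part of the data queue library, whose own encoders may differ in detail.

```python
import json


def to_bytes(message, encoding):
    """Hypothetical helper: convert a message to bytes for a given encoding name."""
    if encoding in ("utf-8", "ascii"):
        # Plain strings are encoded with the named character encoding
        return message.encode(encoding)
    if encoding == "json":
        # JSON-compatible objects are serialised to text first, then to bytes
        return json.dumps(message).encode("utf-8")
    raise ValueError(f"unsupported encoding: {encoding}")


def from_bytes(payload, encoding):
    """Hypothetical helper: reverse of to_bytes."""
    if encoding in ("utf-8", "ascii"):
        return payload.decode(encoding)
    if encoding == "json":
        return json.loads(payload.decode("utf-8"))
    raise ValueError(f"unsupported encoding: {encoding}")


raw = to_bytes("Testing Testing", "ascii")
print(raw, from_bytes(raw, "ascii"))
```

The same round trip applies on the consumer side: the bytes read from the topic are only meaningful once they are decoded with the encoding that was used to send them.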

Stream data from multiple topics

A long-running consumer can also stream data directly from a DataQueueConsumer instance by using it as an async iterator. Each item yielded by the iterator contains the topic and the (optionally decoded) message.

import asyncio
from ska_sdp_dataqueues import DataQueueConsumer, DataQueueProducer, Encoding

kafka_host = f"ska-sdp-kafka.{KUBE_NAMESPACE}:9092"
topics = ["test-topic-1", "test-topic-2"]

async def run():
    consumer = DataQueueConsumer(kafka_host, topics=topics, encoding=Encoding.UTF8)
    producer = DataQueueProducer(kafka_host)

    # Send one message to each topic
    async with producer:
        for topic in topics:
            await producer.send(data="message", encoding=Encoding.UTF8, topic=topic)

    async with consumer:
        ind = 0
        async for topic, message in consumer:
            print(topic, message)
            ind += 1
            if ind == 2:  # once we got the data from both topics, exit
                break

asyncio.run(run())

If the encoding argument is removed from the DataQueueConsumer constructor call then the message will be the raw bytes.
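The difference between decoded messages and raw bytes can be tried out without a Kafka broker. The sketch below is a stand-in only: fake_consumer and collect are hypothetical names, and the async generator merely mimics the (topic, message) pairs described above rather than reproducing the DataQueueConsumer implementation.

```python
import asyncio


async def fake_consumer(records, encoding=None):
    """Hypothetical stand-in for a consumer: yields (topic, message) pairs."""
    for topic, payload in records:
        await asyncio.sleep(0)  # hand control back to the event loop
        # Decode only when an encoding was given, otherwise pass the bytes through
        yield topic, (payload.decode(encoding) if encoding else payload)


async def collect(records, encoding=None, limit=2):
    """Read from the stand-in consumer until `limit` messages were received."""
    received = []
    async for topic, message in fake_consumer(records, encoding):
        received.append((topic, message))
        if len(received) == limit:
            break
    return received


records = [("test-topic-1", b"message"), ("test-topic-2", b"message")]
decoded = asyncio.run(collect(records, encoding="utf-8"))
raw = asyncio.run(collect(records))
print(decoded)
print(raw)
```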

Schema Validation

The data sent to the queue can optionally be validated against a schema. The data queue library currently includes three validation schemas, which cover data in the following formats:

  • Pointing Table as xarray Dataset

  • Pointing data as structured numpy array

  • Signal display metrics as a dictionary

To validate a Pointing Table (as an xarray Dataset), the xradio package is used and the schema is defined as an xarray dataclass. The schema can be passed to the send method either as an xradio DatasetSchema or as an xarray dataclass, in which case it is converted to a DatasetSchema internally. All schema validation issues are collected and raised in a single TypeException. Note that the encoding (and decoding) of xarray Datasets is still under development.

xradio is an optional dependency, so you need to install it explicitly if you want to use xarray validation:

poetry install --extras xradio

# or

pip install ska-sdp-dataqueues[xradio] --extra-index-url https://artefact.skao.int/repository/pypi-internal/simple

The other data types are encoded/decoded using msgpack_numpy and are validated against standard dataclass schemas, with basic checks performed on attributes and formats. TypeExceptions are raised separately for each issue found.
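The kind of attribute and format check described here can be sketched with a plain dataclass. This is an illustration under stated assumptions, not the library's validation code: PointingSample is a hypothetical, cut-down schema, find_issues and validate are hypothetical helpers, and the built-in TypeError stands in for the exception the library raises.

```python
from dataclasses import dataclass, fields


@dataclass
class PointingSample:
    """Hypothetical, cut-down schema; not the library's PointingNumpyArray."""

    antenna_name: str
    xel_offset: float
    el_offset: float


def find_issues(data, schema):
    """Return a list of human-readable problems found in `data`."""
    issues = []
    expected = {field.name: field.type for field in fields(schema)}
    for name, expected_type in expected.items():
        if name not in data:
            issues.append(f"missing attribute: {name}")
        elif not isinstance(data[name], expected_type):
            actual = type(data[name]).__name__
            issues.append(f"{name}: expected {expected_type.__name__}, got {actual}")
    # Attributes that the schema does not know about are reported as well
    for name in sorted(data.keys() - expected.keys()):
        issues.append(f"unexpected attribute: {name}")
    return issues


def validate(data, schema):
    """Raise one error listing every issue (TypeError is a stand-in here)."""
    issues = find_issues(data, schema)
    if issues:
        raise TypeError("; ".join(issues))


good = {"antenna_name": "MKT001", "xel_offset": 3.5, "el_offset": 2.8}
bad = {"antenna_name": "MKT001", "el_offset": "2.8"}
validate(good, PointingSample)  # passes silently
print(find_issues(bad, PointingSample))
```

Collecting the issues into a list first makes it possible to report them either one by one or together in a single exception.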

The following is an example of schema validation for a structured numpy array containing pointing data:

import asyncio
from ska_sdp_dataqueues import DataQueueProducer, Encoding
from ska_sdp_dataqueues.schemas.numpy_structured_pointing import PointingNumpyArray

pointing_data = {
     "antenna_name": "MKT001",
     "last_scan_index": 9.0,
     "xel_offset": 3.5,
     "xel_offset_std": 0.1,
     "el_offset": 2.8,
     "el_offset_std": 0.1,
     "expected_width_h": 1.1,
     "expected_width_v": 1.1,
     "fitted_width_h": 1.3,
     "fitted_width_h_std": 0.1,
     "fitted_width_v": 1.2,
     "fitted_width_v_std": 0.1,
     "fitted_height": 100.0,
     "fitted_height_std": 1.2,
}

pointing_data_numpy = PointingNumpyArray(**pointing_data).to_numpy()

kafka_host = "localhost:9092"
kafka_topic = "test-topic"

async def run():
    async with DataQueueProducer(kafka_host, topic=kafka_topic) as queue:
        return await queue.send(
            pointing_data_numpy,
            encoding=Encoding.MSGPACK_NUMPY,
            schema=PointingNumpyArray,
            validate=True,
        )

record = asyncio.run(run())

Dynamically Allocating Topics

Some use cases need a consumer that streams continuously while its topics are allocated from a different async task. This suits long-running consumers whose topic selection changes over time.

import asyncio
from ska_sdp_dataqueues import DataQueueConsumer, DataQueueProducer, Encoding

kafka_host = f"ska-sdp-kafka.{KUBE_NAMESPACE}:9092"
topics = ["test-topic-1", "test-topic-2"]

async def run():
    async with DataQueueProducer(kafka_host) as producer:
        for topic in topics:
            await producer.send("message", encoding=Encoding.UTF8, topic=topic)

    async with DataQueueConsumer(kafka_host, encoding=Encoding.UTF8) as consumer:
        # Topics are allocated after the consumer has been created
        await consumer.update_topics(topics)

        # No exit condition: the loop keeps printing messages as they arrive
        async for topic, message in consumer:
            print(topic, message)

asyncio.run(run())

If you need finer-grained control and also need to specify which partitions are allocated, use consumer.assign_topics instead of consumer.update_topics. It takes a list of tuples, each containing a topic and a partition.

await consumer.assign_topics([(topic, 0), (topic, 3)])
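When several topics and partitions are involved, the list of pairs can be built programmatically. A minimal sketch, assuming the partitions of interest are held in a dictionary; the topic_partition_pairs helper is hypothetical and not part of the data queue library.

```python
def topic_partition_pairs(partitions_by_topic):
    """Flatten {topic: [partition, ...]} into a list of (topic, partition) tuples."""
    return [
        (topic, partition)
        for topic, partitions in partitions_by_topic.items()
        for partition in partitions
    ]


pairs = topic_partition_pairs({"test-topic-1": [0, 3], "test-topic-2": [1]})
print(pairs)
```

The resulting list has the same shape as the argument passed to consumer.assign_topics above.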

Manually Creating Topics

It is advised to create topics before trying to consume from them. Sending to topics that have not been created yet is normally fine, but consuming from them can sometimes lead to delayed processing.

import asyncio
from ska_sdp_dataqueues import DataQueueAdmin

kafka_host = f"ska-sdp-kafka.{KUBE_NAMESPACE}:9092"

async def run():
    async with DataQueueAdmin(kafka_host) as admin:
        # Create topics without specifying partitions
        await admin.create_topics(["topic-1", "topic-2"])

        # Create topics and set the minimum number of partitions they must have
        await admin.create_topics([("topic-3", 5), ("topic-4", 10)])

        # Mix topics with and without a partition count
        await admin.create_topics([("topic-5", 3), "topic-6"])

asyncio.run(run())
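One way to read a mixed list like the last one is to normalise every entry into a (name, partitions) pair before use. The sketch below is purely illustrative: normalise_topic_specs is a hypothetical helper, and the default of one partition for bare topic names is an assumption of this sketch, not documented behaviour of DataQueueAdmin.

```python
def normalise_topic_specs(specs, default_partitions=1):
    """Return (name, partitions) pairs for a mixed list of topic specifications.

    default_partitions is an assumption made for this sketch only.
    """
    normalised = []
    for spec in specs:
        if isinstance(spec, str):
            # A bare topic name carries no partition count
            normalised.append((spec, default_partitions))
        else:
            name, partitions = spec
            normalised.append((name, int(partitions)))
    return normalised


print(normalise_topic_specs([("topic-5", 3), "topic-6"]))
```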

Testing locally

To run either the tests or custom test scripts on your local machine, you also need a Kafka instance running locally. This can be achieved in various ways, with or without ZooKeeper (see e.g. the .gitlab-ci.yaml file). The option below starts Kafka without ZooKeeper, using Docker. The container keeps running until you stop it. Once it has started (which may take a minute), you can run tests and other code by setting kafka_host to localhost:9092 when creating the data queue classes, e.g. DataQueueProducer(kafka_host).

Start Kafka in Docker:

docker run -it --name kafka-zkless -p 9092:9092 -e LOG_DIR=/tmp/logs quay.io/strimzi/kafka:latest-kafka-2.8.1-amd64 /bin/sh -c 'export CLUSTER_ID=$(bin/kafka-storage.sh random-uuid) && bin/kafka-storage.sh format -t $CLUSTER_ID -c config/kraft/server.properties && bin/kafka-server-start.sh config/kraft/server.properties'
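Because the container may take a while to start, a test script can wait until the port accepts connections before it uses the queue. The following is a minimal sketch using only the Python standard library; wait_for_port is a hypothetical helper, and an open port only shows that something is listening, not that the broker is fully ready.

```python
import socket
import time


def wait_for_port(host, port, timeout=60.0, interval=1.0):
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # The connection is closed again straight away; only reachability matters
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


# Example use before running local tests:
# if not wait_for_port("localhost", 9092):
#     raise RuntimeError("Kafka did not become reachable on localhost:9092")
```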