SKA SDP Data Queue Library
The data queue library supports putting data onto and reading data off Kafka queues. The AIOKafka library is used to communicate with Kafka from Python.
For a detailed API of the various data queue classes, refer to SKA SDP Data Queue Classes.
Send data to a topic
The following is an example of how to use the library to push a basic string message to a Kafka queue. This example assumes that Kafka is running in the Kubernetes namespace given by KUBE_NAMESPACE.
import asyncio

from ska_sdp_dataqueues import DataQueueProducer, Encoding

async def run():
    # KUBE_NAMESPACE is the Kubernetes namespace in which Kafka is running
    kafka_host = f"ska-sdp-kafka.{KUBE_NAMESPACE}:9092"
    kafka_topic = "test-topic"
    message = "Testing Testing"
    encoding = Encoding.ASCII
    async with DataQueueProducer(kafka_host, topic=kafka_topic, encoding=encoding) as queue:
        # return the RecordMetadata produced by the send
        return await queue.send(message)

record = asyncio.run(run())
The returned record is the RecordMetadata object that the AIOKafkaProducer returns when it sends a message.
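As a quick illustration (assuming the standard aiokafka RecordMetadata attributes topic, partition and offset), the record returned above can be inspected like this:

# record comes from the example above; topic, partition and offset are
# standard aiokafka RecordMetadata attributes.
print(record.topic, record.partition, record.offset)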
Kafka topics only store raw bytes, so the data queue library provides encodings to convert messages into bytes. The encoding options currently supported are listed below (a short sketch of using the JSON encoding follows the list):
utf-8: variable-length character encoding
ascii: single-byte character encoding
msgpack_numpy: msgpack format for numpy arrays
xarray: encoding for xarray datasets
npy: self-describing tensor format for numpy data
json: encoding for JSON
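For example, here is a minimal sketch of sending a dictionary with the JSON encoding; it assumes that Encoding.JSON accepts a plain dict and that a broker is reachable on localhost:9092:

import asyncio

from ska_sdp_dataqueues import DataQueueProducer, Encoding

async def send_json():
    # Assumption: Encoding.JSON serialises a plain dict to JSON bytes.
    async with DataQueueProducer(
        "localhost:9092", topic="json-topic", encoding=Encoding.JSON
    ) as queue:
        return await queue.send({"scan_id": 1, "status": "ok"})

record = asyncio.run(send_json())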
Stream data from multiple topics
Data can also be streamed via a long-running consumer directly from a DataQueueConsumer instance by using it as an async iterator. Each item yielded by the iterator contains the topic and the (optionally decoded) message.
import asyncio

from ska_sdp_dataqueues import DataQueueConsumer, DataQueueProducer, Encoding

kafka_host = f"ska-sdp-kafka.{KUBE_NAMESPACE}:9092"
topics = ["test-topic-1", "test-topic-2"]

consumer = DataQueueConsumer(kafka_host, topics=topics, encoding=Encoding.UTF8)
producer = DataQueueProducer(kafka_host)

async def run():
    async with producer:
        for topic in topics:
            await producer.send(data="message", encoding=Encoding.UTF8, topic=topic)

    async with consumer:
        ind = 0
        async for topic, message in consumer:
            print(topic, message)
            ind += 1
            if ind == 2:  # once we got the data from both topics, exit
                break

asyncio.run(run())
If the encoding argument is removed from the DataQueueConsumer constructor call, then the message will be the raw bytes.
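For instance, here is a rough sketch of consuming without an encoding and decoding the bytes manually, assuming the messages on the topic are UTF-8 text:

import asyncio

from ska_sdp_dataqueues import DataQueueConsumer

kafka_host = f"ska-sdp-kafka.{KUBE_NAMESPACE}:9092"

async def consume_raw():
    # No encoding is passed, so each message arrives as raw bytes.
    consumer = DataQueueConsumer(kafka_host, topics=["test-topic-1"])
    async with consumer:
        async for topic, raw in consumer:
            print(topic, raw.decode("utf-8"))  # decode manually
            break

asyncio.run(consume_raw())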
Schema Validation
The data sent to the queue can optionally be validated against a schema. The data queue library currently includes three schemas for validation, so data in the following formats can be validated:
Pointing Table as an xarray Dataset
Pointing data as a structured numpy array
Signal display metrics as a dictionary
In order to validate a Pointing Table (as an xarray Dataset), the xradio package is used, and the schema is defined as an xarray dataclass. The schema can be passed to the send_to_queue method either as an xradio DatasetSchema or as an xarray dataclass (in which case it is converted to a DatasetSchema internally). All schema validation issues are collected together and raised in a single TypeException. Note that the correct encoding (and decoding) for xarray Datasets is still under development. xradio is an optional dependency, so you will need to install it explicitly if you want to use xarray validation:
poetry install --extras xradio
# or
pip install ska-sdp-dataqueues[xradio] --extra-index-url https://artefact.skao.int/repository/pypi-internal/simple
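As a rough, hypothetical sketch of the call shape (mirroring the numpy example further down): the broker address, topic name and the Encoding.XARRAY member name are assumptions, and the dataset and schema are left as parameters rather than constructed here:

import asyncio

from ska_sdp_dataqueues import DataQueueProducer, Encoding

async def send_pointing_table(dataset, schema):
    # dataset: an xarray Dataset holding the pointing table
    # schema:  an xradio DatasetSchema or xarray dataclass (see text above)
    async with DataQueueProducer("localhost:9092", topic="pointing-table") as queue:
        return await queue.send(
            dataset,
            encoding=Encoding.XARRAY,  # assumption: enum member for the xarray encoding
            schema=schema,
            validate=True,
        )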
The other data types are encoded/decoded using msgpack_numpy and are validated against standard dataclass schemas, with basic checks performed on attributes and formats. TypeExceptions are raised separately for each issue found.
The following is an example of schema validation for a structured numpy array containing pointing data:
import asyncio

from ska_sdp_dataqueues import DataQueueProducer, Encoding
from ska_sdp_dataqueues.schemas.numpy_structured_pointing import PointingNumpyArray

pointing_data = {
    "antenna_name": "MKT001",
    "last_scan_index": 9.0,
    "xel_offset": 3.5,
    "xel_offset_std": 0.1,
    "el_offset": 2.8,
    "el_offset_std": 0.1,
    "expected_width_h": 1.1,
    "expected_width_v": 1.1,
    "fitted_width_h": 1.3,
    "fitted_width_h_std": 0.1,
    "fitted_width_v": 1.2,
    "fitted_width_v_std": 0.1,
    "fitted_height": 100.0,
    "fitted_height_std": 1.2,
}
pointing_data_numpy = PointingNumpyArray(**pointing_data).to_numpy()

kafka_host = "localhost:9092"
kafka_topic = "test-topic"

async def run():
    async with DataQueueProducer(kafka_host, topic=kafka_topic) as queue:
        return await queue.send(
            pointing_data_numpy,
            encoding=Encoding.MSGPACK_NUMPY,
            schema=PointingNumpyArray,
            validate=True,
        )

record = asyncio.run(run())
Dynamically Allocating Topics
Some use cases call for streaming continuously and allocating topics from a different async task. This is ideal when the consumer should be a long-running process whose topic selection changes over time.
import asyncio

from ska_sdp_dataqueues import DataQueueConsumer, DataQueueProducer, Encoding

kafka_host = f"ska-sdp-kafka.{KUBE_NAMESPACE}:9092"
topics = ["test-topic-1", "test-topic-2"]

async def run():
    async with DataQueueProducer(kafka_host) as producer:
        for topic in topics:
            await producer.send("message", encoding=Encoding.UTF8, topic=topic)

    async with DataQueueConsumer(kafka_host, encoding=Encoding.UTF8) as consumer:
        await consumer.update_topics(topics)
        async for topic, message in consumer:
            print(topic, message)

asyncio.run(run())
If you need further fine-grained control and also need to specify the partitions that should be allocated, then use consumer.assign_topics instead of the consumer.update_topics call. It takes a list of (topic, partition) tuples:
await consumer.assign_topics([(topic, 0), (topic, 3)])
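The following is a rough sketch of the long-running pattern described above, assuming (as the text implies) that consumer.update_topics can be awaited from a separate async task while another task iterates over the consumer; the broker address and topic names are illustrative:

import asyncio

from ska_sdp_dataqueues import DataQueueConsumer, Encoding

async def consume_forever(consumer):
    # Long-running consumer: keeps streaming as the topic selection changes.
    async with consumer:
        async for topic, message in consumer:
            print(topic, message)

async def allocate_topics(consumer):
    # Separate task that changes the topic selection over time.
    await asyncio.sleep(5)
    await consumer.update_topics(["test-topic-3"])

async def main():
    consumer = DataQueueConsumer("localhost:9092", encoding=Encoding.UTF8)
    await asyncio.gather(consume_forever(consumer), allocate_topics(consumer))

asyncio.run(main())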
Manually Creating Topics
It is advised to create topics before trying to consume from them. Sending to topics that have not been created yet is normally fine, but consuming from uncreated topics can sometimes lead to delayed processing.
import asyncio

from ska_sdp_dataqueues import DataQueueAdmin

kafka_host = f"ska-sdp-kafka.{KUBE_NAMESPACE}:9092"

async def run():
    async with DataQueueAdmin(kafka_host) as admin:
        # Create topics with default partitioning
        await admin.create_topics(["topic-1", "topic-2"])
        # Create topics with a minimum number of partitions each
        await admin.create_topics([("topic-3", 5), ("topic-4", 10)])
        # Mix explicit partition counts with defaults
        await admin.create_topics([("topic-5", 3), "topic-6"])

asyncio.run(run())
Testing locally
In order to run either the tests or custom test scripts on your local machine, you need to have a Kafka instance running locally too. This can be achieved in various ways, with or without ZooKeeper running (see e.g. the .gitlab-ci.yaml file).
Here is an option to start Kafka without ZooKeeper, using Docker. The container will keep running until you cancel it. Once it has started (which may take a minute), you can execute tests and other code by setting the kafka_host in DataQueue(kafka_host) to localhost:9092.
Start Kafka in Docker:
docker run -it --name kafka-zkless -p 9092:9092 -e LOG_DIR=/tmp/logs quay.io/strimzi/kafka:latest-kafka-2.8.1-amd64 /bin/sh -c 'export CLUSTER_ID=$(bin/kafka-storage.sh random-uuid) && bin/kafka-storage.sh format -t $CLUSTER_ID -c config/kraft/server.properties && bin/kafka-server-start.sh config/kraft/server.properties'
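As a quick sanity check against this local broker, here is a minimal sketch (the topic name is illustrative) that sends one message and reads it back:

import asyncio

from ska_sdp_dataqueues import DataQueueConsumer, DataQueueProducer, Encoding

kafka_host = "localhost:9092"

async def smoke_test():
    # Produce a single UTF-8 message to a local test topic...
    async with DataQueueProducer(kafka_host, topic="local-test", encoding=Encoding.UTF8) as producer:
        await producer.send("hello from localhost")

    # ...then consume it back and stop after the first message.
    async with DataQueueConsumer(kafka_host, topics=["local-test"], encoding=Encoding.UTF8) as consumer:
        async for topic, message in consumer:
            print(topic, message)
            break

asyncio.run(smoke_test())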