SKA SDP Data Queue Library
==========================

The data queue library supports loading data onto and off Kafka queues.
The `AIOKafka `_ library is used to communicate from Python to Kafka.

For a detailed API of the various data queue classes, refer to :ref:`data_queue_api`.

Send data to a topic
--------------------

The following is an example of how to use the library to push a basic string
message to a Kafka queue. In this example, it is assumed that Kafka is running
on the system in the "KUBE_NAMESPACE" namespace.

.. code-block:: python

    import asyncio

    from ska_sdp_dataqueues import DataQueueProducer, Encoding


    async def run():
        kafka_host = f"ska-sdp-kafka.{KUBE_NAMESPACE}:9092"
        kafka_topic = "test-topic"
        message = "Testing Testing"
        encoding = Encoding.ASCII

        async with DataQueueProducer(
            kafka_host, topic=kafka_topic, encoding=encoding
        ) as queue:
            return await queue.send(message)


    record = asyncio.run(run())

The returned ``record`` is a `RecordMetadata `_ object, which the underlying
AIOKafkaProducer returns when it sends a message.

Kafka topics only store raw bytes, so the data queue library provides an
encoding to convert the message into bytes. The encoding options currently
supported are:

* **utf-8**: variable-length character encoding
* **ascii**: single-byte character encoding
* **msgpack_numpy**: msgpack format for numpy arrays
* **xarray**: encoding for xarray datasets
* **npy**: self-describing tensor format for numpy data
* **json**: encoding for JSON

Stream data from multiple topics
--------------------------------

Streaming of data via a long-running consumer can also be done directly from an
instance of a ``DataQueueConsumer``, by using it as an async iterator. Each item
yielded by the iterator contains the topic and the (optionally decoded) message.

.. code-block:: python

    from ska_sdp_dataqueues import DataQueueConsumer, DataQueueProducer, Encoding

    kafka_host = f"ska-sdp-kafka.{KUBE_NAMESPACE}:9092"
    topics = ["test-topic-1", "test-topic-2"]

    consumer = DataQueueConsumer(kafka_host, topics=topics, encoding=Encoding.UTF8)
    producer = DataQueueProducer(kafka_host)

    async with producer:
        for topic in topics:
            await producer.send(data="message", encoding=Encoding.UTF8, topic=topic)

    async with consumer:
        ind = 0
        async for topic, message in consumer:
            print(topic, message)
            ind += 1
            if ind == 2:
                # once we have the data from both topics, exit
                break

If the ``encoding`` argument is removed from the ``DataQueueConsumer``
constructor call then the ``message`` will be the raw bytes.

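When no encoding is given, decoding is left to the caller. The following is a
minimal sketch of consuming raw bytes and decoding them manually; it assumes
the messages on these topics were produced as UTF-8 strings, as in the example
above.

.. code-block:: python

    import asyncio

    from ska_sdp_dataqueues import DataQueueConsumer

    kafka_host = f"ska-sdp-kafka.{KUBE_NAMESPACE}:9092"
    topics = ["test-topic-1", "test-topic-2"]


    async def read_raw():
        # No encoding given, so each message arrives as raw bytes
        async with DataQueueConsumer(kafka_host, topics=topics) as consumer:
            async for topic, message in consumer:
                # Decode manually; this assumes the producer sent UTF-8 text
                print(topic, message.decode("utf-8"))
                break


    asyncio.run(read_raw())
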
Schema Validation
-----------------

The data sent to the queue can (optionally) be validated against a schema.
Currently the three :ref:`schemas` included in the data queue library are
supported, and data in the following formats can be validated:

* Pointing Table as an xarray Dataset
* Pointing data as a structured numpy array
* Signal display metrics as a dictionary

In order to validate a Pointing Table (as an xarray Dataset), the `xradio`_
package is used, and the schema is defined as an xarray dataclass. The schema
can either be passed to the ``send_to_queue`` method as an xradio
DatasetSchema, or as an xarray dataclass (in which case it is converted to a
DatasetSchema internally). All schema validation issues are collected together
and raised in a single TypeException. Note that the correct encoding (and
decoding) for xarray Datasets is still under development.

Note that `xradio`_ is an optional dependency, and you will need to explicitly
install it if you want to use xarray validation:

.. code-block:: bash

    poetry install --extras xradio
    # or
    pip install ska-sdp-dataqueues[xradio] --extra-index-url https://artefact.skao.int/repository/pypi-internal/simple

.. _xradio: https://github.com/casangi/xradio.git

The other data types are encoded/decoded using ``msgpack_numpy`` and are
validated against standard dataclass schemas, with basic checks performed on
attributes and formats. TypeExceptions are raised separately for each issue
found.

The following is an example of schema validation for a structured numpy array
containing pointing data:

.. code-block:: python

    import asyncio

    from ska_sdp_dataqueues import DataQueueProducer, Encoding
    from ska_sdp_dataqueues.schemas.numpy_structured_pointing import PointingNumpyArray

    pointing_data = {
        "antenna_name": "MKT001",
        "last_scan_index": 9.0,
        "xel_offset": 3.5,
        "xel_offset_std": 0.1,
        "el_offset": 2.8,
        "el_offset_std": 0.1,
        "expected_width_h": 1.1,
        "expected_width_v": 1.1,
        "fitted_width_h": 1.3,
        "fitted_width_h_std": 0.1,
        "fitted_width_v": 1.2,
        "fitted_width_v_std": 0.1,
        "fitted_height": 100.0,
        "fitted_height_std": 1.2,
    }
    pointing_data_numpy = PointingNumpyArray(**pointing_data).to_numpy()

    kafka_host = "localhost:9092"
    kafka_topic = "test-topic"


    async def run():
        async with DataQueueProducer(kafka_host, topic=kafka_topic) as queue:
            return await queue.send(
                pointing_data_numpy,
                encoding=Encoding.MSGPACK_NUMPY,
                schema=PointingNumpyArray,
                validate=True,
            )


    record = asyncio.run(run())

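On the consuming side, the same encoding can be used to turn the message back
into a structured numpy array. The following is a minimal consumer-side
sketch, assuming a pointing-data message like the one above is already
available on ``test-topic`` and that ``Encoding.MSGPACK_NUMPY`` restores the
structured array on decoding.

.. code-block:: python

    import asyncio

    from ska_sdp_dataqueues import DataQueueConsumer, Encoding

    kafka_host = "localhost:9092"
    kafka_topic = "test-topic"


    async def consume_pointing():
        async with DataQueueConsumer(
            kafka_host, topics=[kafka_topic], encoding=Encoding.MSGPACK_NUMPY
        ) as consumer:
            async for topic, message in consumer:
                # `message` should be the structured numpy array sent above,
                # decoded by the msgpack_numpy encoding
                return topic, message


    topic, pointing_array = asyncio.run(consume_pointing())
    print(topic, pointing_array)
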
Dynamically Allocating Topics
-----------------------------

Some use cases would prefer to constantly stream and only allocate topics in a
different async task. This is ideal for use cases where you want the consumer
to be a long-running process in which the topic selection changes over time.

.. code-block:: python

    from ska_sdp_dataqueues import DataQueueConsumer, DataQueueProducer, Encoding

    kafka_host = f"ska-sdp-kafka.{KUBE_NAMESPACE}:9092"
    topics = ["test-topic-1", "test-topic-2"]

    async with DataQueueProducer(kafka_host) as producer:
        for topic in topics:
            await producer.send("message", encoding=Encoding.UTF8, topic=topic)

    async with DataQueueConsumer(kafka_host, encoding=Encoding.UTF8) as consumer:
        consumer.update_topics(topics)
        async for topic, message in consumer:
            print(topic, message)

If you need finer-grained control and also need to specify the partitions that
should be allocated, then instead of the ``consumer.update_topics`` call you
should use ``consumer.assign_topics``, passing in a list of tuples, each
containing a topic and partition pair.

.. code-block:: python

    consumer.assign_topics([(topic, 0), (topic, 3)])

Manually Creating Topics
------------------------

It is advised that topics are created before trying to consume from them.
Sending to topics that have not been created yet is normally fine, but
consuming from un-created topics can sometimes lead to delayed processing.

.. code-block:: python

    from ska_sdp_dataqueues import DataQueueAdmin

    kafka_host = f"ska-sdp-kafka.{KUBE_NAMESPACE}:9092"

    async with DataQueueAdmin(kafka_host) as admin:
        # Creating normal topics
        await admin.create_topics(["topic-1", "topic-2"])

        # Creating topics and setting the minimum number of partitions they must have
        await admin.create_topics([("topic-3", 5), ("topic-4", 10)])

        # Mixing explicit partition counts with default partitions:
        await admin.create_topics([("topic-5", 3), "topic-6"])

Testing locally
---------------

In order to run either the tests or custom test scripts on your local machine,
you need to have a Kafka instance running locally too. This can be achieved in
various ways, with or without zookeeper running (see e.g. the
``.gitlab-ci.yaml`` file). Here is an option to start Kafka without zookeeper,
using docker. This image will keep running until you cancel it. Once it has
started (this may take a minute), you can execute tests and other code by
setting the ``kafka_host`` argument of the data queue classes to
``localhost:9092``.

Start Kafka in docker:

.. code-block:: bash

    docker run -it --name kafka-zkless -p 9092:9092 -e LOG_DIR=/tmp/logs quay.io/strimzi/kafka:latest-kafka-2.8.1-amd64 /bin/sh -c 'export CLUSTER_ID=$(bin/kafka-storage.sh random-uuid) && bin/kafka-storage.sh format -t $CLUSTER_ID -c config/kraft/server.properties && bin/kafka-server-start.sh config/kraft/server.properties'

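Once Kafka is up, a quick way to verify the connection is a small send/receive
round trip against the local broker. This is only a sketch, assuming the
auto-created ``test-topic`` and that, as in the streaming example above,
messages sent before the consumer starts are still read back.

.. code-block:: python

    import asyncio

    from ska_sdp_dataqueues import DataQueueConsumer, DataQueueProducer, Encoding

    kafka_host = "localhost:9092"
    kafka_topic = "test-topic"


    async def round_trip():
        # Send a single UTF-8 encoded message to the local broker
        async with DataQueueProducer(
            kafka_host, topic=kafka_topic, encoding=Encoding.UTF8
        ) as producer:
            await producer.send("hello local kafka")

        # Read it back with the matching decoding
        async with DataQueueConsumer(
            kafka_host, topics=[kafka_topic], encoding=Encoding.UTF8
        ) as consumer:
            async for topic, message in consumer:
                print(topic, message)
                break


    asyncio.run(round_trip())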