
Pulsar, Musings from a DataStax and Confluent Partner

Summary:
Blog #1 in a series that will highlight the aspects of Pulsar that make it an attractive prospect for your messaging and data streaming needs.

Like many, we were excited and intrigued to see DataStax’s acquisition of Kesque. In almost every data-intensive project, we end up needing some form of “get this data from here over to there reliably”. I can count on one hand the times when neither Kafka nor Confluent was at least some part of that conversation. This is certainly true with most of our large DataStax/Cassandra/Scylla clients.


Kesque is a commercial cloud messaging service built on top of Pulsar. Upon review, Pulsar is simpler to work with, yet can scale to greater complexity with more flexibility than other messaging platform offerings. In this blog series I will highlight the aspects of Pulsar that make it an attractive prospect for your messaging and data streaming needs, with key points of interest being:


  • A streamlined Client API
  • Integrated Tiered Storage
  • A Stateless Broker architecture with a decoupled Distributed Ledger system for data persistence
  • Multi-Tenancy support and management
  • Cloud-Native Design, Kesque & Luna Streaming by DataStax and production-ready Kubernetes support
  • Integrated Schema Registry for type control and management
  • Effectively-Once Transactional processing
  • Integrated Geo-Replication
  • Integrated monitoring support with Prometheus (No JMX necessary Yay!)
  • Drop-in Pulsar Adaptors for existing Kafka implementations
  • Pulsar IO, an API and support for 3rd-party connectors
  • Pulsar SQL, SQL-like access to in-flight streams
  • Security, with end-to-end encryption


Messaging applications like Pulsar, Kafka, RabbitMQ and others are becoming very popular because they provide the means by which disparate legacy and modern enterprise distributed application workflows can be loosely coupled to one another. They provide centralized, easier-to-manage pipelines that move your data over a common platform, to and from the applications that do the analysis and computation necessary to do your work.


In this light, DataStax opting to include a streaming component in their ‘Big Data One-Stop Shop’ offering makes complete sense. We had not done all that much with Apache Pulsar before, but with this development we expect to encounter a lot more of it.



Simplicity as a Foundation


From digging into Pulsar over a few days, it became clear that a lot of thought has been applied to lessons learned from competing applications. It offers many features comparable to Kafka and other messaging and streaming apps, with a focus on getting up and running quickly out of the box. From configuring a cluster, to writing producers and consumers, to the administration tools, everything about Pulsar is straightforward, streamlined, and well organized. I like simplicity!


To be sure, there are many elements and options for tuning and refining a cluster configuration as you scale (this is where more complexity comes into play, of course), but it is pretty easy to get a working cluster up and running in short order.


Out of the box, getting a stand-alone developer instance up and running took me about 15 minutes. This is something I look for when I start working with a new application. How fast can I start tinkering! Once I had a dev instance running it took just a short amount of time to get a sample producer and consumer operational in Java as per the example code in the Pulsar documentation. I love it when the sample code documentation works without a fuss!


Next I decided to set up a simple cluster. This too was pretty easy, but the Apache Pulsar documentation, although accurate, was not as precise as I would like about what you really need to do to configure a 3-Broker, 3-Bookie cluster on bare metal. After some searching and tinkering I was able to stand up a cluster pretty quickly. (Later in this blog series, I will offer detailed instructions on how you can do this for yourself.)


Once you have ZooKeeper, a Bookie, and a Broker operational, topics are auto-created when first referenced by a producer or a consumer. (Direct management of topics is provided by the pulsar-admin tool.) So sending messages right away was a breeze, without the need to go to the admin tools to set up a topic before testing my sample code.


Pulsar is oriented around a pub-sub architecture in general, and this is how it works when you first start it up. So if what you seek is to start pushing message data from one point to another from the get-go, Pulsar gives you what you need at the onset to start writing apps, without having to go through a host of configuration and setup before you can do so.


Producing and Consuming


Pulsar is built upon a Publish-Subscribe (pub-sub) pattern, but it is much more than that. It offers high-rate, real-time streaming capacity like Kafka, as well as supporting more conventional queue-oriented message publishing and consumption patterns like RabbitMQ, via competing consumers, failover subscriptions, and easy message fan-out.


Thus Pulsar is an ideal choice where you want the best of both worlds in a single platform. If you need high throughput capacity with the ability to partition your topics, you can do that. If your requirements are simpler, with less need for speed but simple yet flexible processing semantics, you can do that too. All while guaranteeing delivery of your data.


The Producer


Like many messaging apps, the Producer is what you use to send or publish your message data to Pulsar. Here I will cover some general examples of how easy it is to work with producers.


To produce messages to Pulsar, you first need a broker URL list like this:


pulsar://192.168.1.6:6650,192.168.1.6:6651,192.168.1.6:6652


The client API (client libraries exist for Java, Python, Go, C++, Node.js, C#, and WebSockets) connects to the first available broker on the URL list; if a given broker is unavailable, it moves to the next on the list.


Here is a sample of starting up a client in Java against a 3 broker cluster:


PulsarClient client = PulsarClient.builder()

    .serviceUrl("pulsar://192.168.1.6:6650,192.168.1.6:6651,192.168.1.6:6652")

    .build();


Once you have a client instance established you can construct a Producer, Consumer, or a Reader (more on this later).


Here is how you may establish a producer instance via the newly created client instance:


Producer<String> stringProducer = client.newProducer(Schema.STRING)

    .topic("my-test-topic")

    .create();


Once you have a producer established, you can start sending messages. Like so:

stringProducer.send(message);


As you can see, the setup for connecting to a Pulsar broker, establishing a producer and sending a message is simplicity at its best.

Real applications will require a little more complexity of course to be robust. But the core semantics required to work with Pulsar is very straightforward and simple.


Notice that when creating a new producer, you can define the schema type of the message being sent. Pulsar has a built-in schema registry whereby you can enforce type safety for your messages. Pulsar supports a variety of schemas to choose from by default, and you can create custom schemas as needed. This is similar in fashion to the Schema Registry in Kafka; however, instead of relying on a separate service for the registry, it is built into the Pulsar framework itself. It supports varying degrees of compatibility levels as well, so that message schemas can support older schema types through schema evolution. And it has internal support for Protobuf; Avro is also an option.


Here is a simple example of sending a JSON typed message:


Producer<User> producer = client.newProducer(JSONSchema.of(User.class))

        .topic(topic)

        .create();

User user = new User("Tom", 28);

producer.send(user);


If you don’t specify a schema, then data is sent in raw byte form via byte[] arrays. In this case it is up to the producer and the consumer code to marshal and unmarshal the data to convert it to the appropriate type. This may be useful where you may only be interested in sending raw blocks of data from one point to another. Like chunked file blocks.


There are two forms of send. The one depicted so far is a blocking send, where the producer blocks until an ack is returned by the broker.


The other form is async sending like so:


CompletableFuture<MessageId> sendFuture = stringProducer.sendAsync(message);


An async send returns a CompletableFuture, whereby you can apply any post-processing of a send in a separate thread, unblocking the producer for the next message and handling the ack when it is returned, or any failure that occurs. I find this very elegant and easy to use.
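As a sketch, reusing the stringProducer and message from the earlier examples, the returned future might be chained like so (the log messages are just for illustration):

```java
// Assumes the stringProducer and message from the earlier examples
stringProducer.sendAsync(message)
    .thenAccept(msgId ->
        // Runs once the broker acks; msgId identifies the stored message
        System.out.println("Published as " + msgId))
    .exceptionally(ex -> {
        // Runs if the send fails (e.g. timeout or broker unavailable)
        System.err.println("Send failed: " + ex.getMessage());
        return null;
    });
```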


In addition to the payload, you can define a key for the message. If partitioned topics are used, messages with the same key will be posted to the same topic partition. Without topic partitioning, keyed messages are guaranteed to be received by the same consumer in a shared subscription scenario. You can also add custom properties to a message, a nifty feature whereby you can assign additional metadata to your message.
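Keys and properties are set through the producer's typed message builder; here is a minimal sketch, again reusing the stringProducer from earlier (the key and property names are made up for illustration):

```java
// Assumes the stringProducer from the earlier examples
stringProducer.newMessage()
    .key("customer-42")                  // keyed: routes to one partition/consumer
    .property("origin", "web-checkout")  // custom metadata; names are hypothetical
    .value("order received")
    .send();
```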


Pulsar supports tunable quorum writes and reads on a per-topic basis, to provide varying degrees of consistency and availability according to your application needs. Thus you can ensure your messages are delivered and guaranteed to be available for consumption.


For faster throughput Pulsar provides batching support. It is pretty easy to enable, like so:


stringProducer = client.newProducer(Schema.STRING)

    .topic(pulsarConfig.getTopic())

    .enableBatching(true)

    .batchingMaxBytes(1000000)

    .create();


Batching of messages is essentially always used in Kafka. In Pulsar you can choose to use it or not. In some cases not enabling batching may be desirable for simple low throughput messaging requirements.



Consumers and Readers


There are two ways to consume messages from Pulsar, each with distinct read semantics against Pulsar topics.


With a client established as described in creating a Producer, the Pulsar client is also used to create a consumer like so:


consumer = client.newConsumer()

   .topic(pulsarConfig.getTopic())

   .subscriptionName(pulsarConfig.getSubscription())

   .subscriptionType(SubscriptionType.Shared)

   .subscribe();


With a consumer, a subscription to a topic is specified. Named subscriptions, along with namespaces (a multi-tenancy feature I will get to later), allow for logical grouping of consumers to topics. A consumer instance can also subscribe to more than one topic, and regex-style patterns may be applied for a single consumer subscribing to multiple topics. There are also a variety of subscription modes, listed as follows:


  • Exclusive - Only the first consumer to connect will be allowed to subscribe. All others will receive an error.
  • Failover - Multiple consumers can connect with the same subscription. But the first one in will be active, all others will be on stand-by. If the primary consumer dies, one of the others will take over.
  • Shared - All consumers subscribed to the same topic receive messages in round-robin fashion. (Order is not preserved in this case)
  • Key_Shared - Multiple consumers can subscribe to the same topic and receive messages, but messages with the same key will always go to the same consumer, in order. (Much like in Kafka)
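For example, a Key_Shared consumer differs from the earlier Shared example only in the subscription type. A sketch, assuming the client from earlier; the topic and subscription names are placeholders:

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

// Assumes the PulsarClient 'client' from the earlier examples.
// All consumers created this way share the work, but messages with the
// same key are always delivered to the same consumer, in order.
Consumer<String> keySharedConsumer = client.newConsumer(Schema.STRING)
        .topic("orders")                       // hypothetical topic
        .subscriptionName("order-processors")  // hypothetical subscription
        .subscriptionType(SubscriptionType.Key_Shared)
        .subscribe();
```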


There are additional behaviors when a topic is partitioned. When consumers are subscribed to a partitioned topic, messages for a given partition are delivered to a single consumer, with message-key support to ensure ordering and delivery to the same consumer for the same key. Partitioned topics allow for greater throughput in some cases (streaming support), a strategy proven in Kafka which is also available to you in Pulsar. But unlike Kafka, a partition is not stored as a single partition file bound to a specific broker; it is distributed across multiple ledgers on multiple BookKeeper servers. (More on this when I get to the Pulsar architecture.)


To receive data from a consumer instance, the method to do so looks like this:


Message<byte[]> msg = consumer.receive();


try {

   // Acknowledge the message so that it can be deleted by the message broker

   consumer.acknowledge(msg);


} catch (Exception e) {

   // Message failed to process, redeliver later

   consumer.negativeAcknowledge(msg);

}


A nifty feature: if a consumer was unable to fully receive and process a message, you can send a negative acknowledgement for the message ID and the broker will resend it.


Consumers are designed to read messages and then acknowledge them right away. The pointers to messages in Pulsar are referred to as cursors. Cursors for Consumers are managed automatically.


Not having to manage cursors explicitly in a consumer instance eases some of the complexity around consumer message management. In production implementations it is a good idea to persist references to the last-processed message IDs in case of catastrophic failure; however, it appears this aspect was considered in the Pulsar broker design, and effort was made to alleviate the bookkeeping around which message was processed and where to resume after a restart or failure. For many applications, once a message is consumed and acknowledged, that is good enough, and the message can be discarded from the persistence layer.


Another interesting feature is dead-letter queue support: messages that cannot be handled for some reason for a period of time are moved to a special topic by Pulsar, where they can be retried or failed out properly, without slowing down the boat.
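The dead-letter behavior is configured on the consumer. A sketch, assuming the client from earlier, a Shared subscription, and made-up topic and subscription names:

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

// Assumes the PulsarClient 'client' from the earlier examples
Consumer<String> consumerWithDlq = client.newConsumer(Schema.STRING)
        .topic("orders")                      // hypothetical topic
        .subscriptionName("order-processors") // hypothetical subscription
        .subscriptionType(SubscriptionType.Shared)
        .deadLetterPolicy(DeadLetterPolicy.builder()
                .maxRedeliverCount(3)          // after 3 failed redeliveries...
                .deadLetterTopic("orders-dlq") // ...park the message here
                .build())
        .subscribe();
```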


Consumers may receive messages in batch form as well. One thing to note: if the consumer is processing batched messages, keyed messages may not be grouped together unless the producer side uses key-based batching.
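Batch receiving is driven by a batch-receive policy on the consumer. A sketch, assuming the client from earlier and placeholder names:

```java
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.Schema;

// Assumes the PulsarClient 'client' from the earlier examples
Consumer<String> batchConsumer = client.newConsumer(Schema.STRING)
        .topic("orders")                   // hypothetical topic
        .subscriptionName("batch-readers") // hypothetical subscription
        .batchReceivePolicy(BatchReceivePolicy.builder()
                .maxNumMessages(100)                 // up to 100 messages...
                .timeout(200, TimeUnit.MILLISECONDS) // ...or whatever arrives in 200 ms
                .build())
        .subscribe();

Messages<String> batch = batchConsumer.batchReceive();
for (Message<String> m : batch) {
    // Process each message in the batch
}
batchConsumer.acknowledge(batch); // acknowledge the whole batch at once
```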


The form of consumer just described is a blocking receive. You can do async receives as well, where a CompletableFuture is returned like so:


CompletableFuture<Message<byte[]>> receiveFuture = consumer.receiveAsync();


Again providing an elegant mechanism to asynchronously process messages consumed.


Pulsar provides metaphors to flexibly manage message data in a variety of ways to suit the many needs of your data pipeline architecture. For applications that require retaining messages over time, you can set retention policies for message data so that a topic can be replayed. This is where Readers come into play.


Consumers are designed specifically to consume; thus an acknowledgement is expected to follow a successful receipt of a message. Readers are designed to start reading at any point in the topic stream, provided a retention setting exists so that data remains available for a period of time. To me, this is an elegant design approach: separating cursor management from the Consumer pattern.


Here is a simple example of using a reader:


byte[] msgIdBytes = // Some message ID byte array

MessageId id = MessageId.fromByteArray(msgIdBytes);

Reader reader = pulsarClient.newReader()

        .topic(topic)

        .startMessageId(id)

        .create();


while (true) {

    Message message = reader.readNext();

    // Process message

}


Readers do not send an acknowledgement, thus other readers may come in and read through the message stream for another purpose. The period of time that messages are retained for successive reads via Readers is managed by the data retention policy established for a topic.


The code sample above demonstrates pointing the Reader object to a specific message (by ID). You can also use MessageId.earliest to point to the earliest available message on the topic or MessageId.latest to point to the most recent available message.
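For instance, here is a sketch of a reader that replays a topic from the beginning, using a read timeout so the loop can end when it catches up (assumes the client from earlier; the topic name is a placeholder):

```java
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;

// Assumes the PulsarClient 'client' from the earlier examples
Reader<String> replayReader = client.newReader(Schema.STRING)
        .topic("orders")                    // hypothetical topic
        .startMessageId(MessageId.earliest) // replay from the first retained message
        .create();

while (replayReader.hasMessageAvailable()) {
    // readNext with a timeout returns null if nothing arrives in time
    Message<String> message = replayReader.readNext(5, TimeUnit.SECONDS);
    // Process message
}
replayReader.close();
```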


For producers, consumers, and readers there exists a wide variety of additional options to control their behavior. I have only touched the surface in this regard, mainly to demonstrate how simple they are to work with. Instead of setting properties before construction, the builder pattern is used to set behavior options, ending with a create(), subscribe(), or send() via method chaining, which makes the implemented code cleaner and simpler to read through.


Other features of note when working with messages in Pulsar: a variety of end-to-end encryption and compression options are available. By design, data can be encrypted when persisted to the ledger, a feature missing in many other messaging applications, for cases where security in the storage layer is a distinct requirement (without the need to encrypt the entire disk). Using encryption for persistence does add overhead on the client producers and consumers, as one may expect. Even so, Pulsar provides by design a feature that clients have asked me for in other messaging stacks that lack it.


Cloud-Native by Design

Sending and receiving messages is great, but how does Pulsar scale? This is where the complexity comes in. Anyone who has worked on Big Data projects wants to know: how can I leverage my data pipeline to scale to terabytes of data with many thousands of connections? And more importantly, how difficult is it to manage?


This is where Pulsar looks very interesting.


Pulsar has been designed from the ground up to operate in the cloud. The Pulsar documentation provides a comprehensive, production-ready Helm chart that you can modify to your needs and deploy to the cloud provider of your choice. Anyone who has set up a cloud deployment specification in Helm for Kubernetes can appreciate this: there are many elements requiring consideration, from resource management to authentication and authorization, TLS, etc., which can be time consuming to define and tweak to get working properly. Pulsar provides the basis for you, so all you need to focus on is the configuration specific to the cluster you want to deploy into Kubernetes pods on your cloud provider of choice via Helm.


Additionally, the Pulsar admin tools are comprehensive, well organized and straightforward to figure out and use.


If you don’t want to spend the time managing your Kubernetes environment, Kesque by DataStax will do this for you. I created an account on Kesque to check this out. You just specify the cloud provider and region you wish your cluster to reside in, define a handful of configuration items, and within minutes you have an operational cluster in the cloud to start working with.


The Kesque dashboard looks to be pretty comprehensive in its options for defining and managing your cluster, from topic and namespace management, to adding additional clusters, providing sinks and sources, credentials, functions, etc. And it makes it easy to do so.



With inherent cloud-native support, and by the nature of its architecture (coming up next), Pulsar provides the means to scale your messaging infrastructure up and down according to demand and cost requirements. This is a very exciting development in the Big Data messaging world, as the Kesque/Pulsar offering makes large-scale messaging development and management very approachable for a small company, and from what I can see so far, much easier to manage and develop against for larger organizations with extensive regional infrastructure requirements.


There are many more features I have yet to touch upon: multi-tenancy, built-in geo-replication, tiered storage, stateless brokers, the use of ledgers over commit logs, Pulsar IO connectors, Pulsar SQL, monitoring integration via Prometheus, adaptors for existing Kafka code, and much more, which I will dive into in this series.


Stay tuned for my next Blog in this series where I will dig deeper into the Pulsar architecture with more specifics on how it can scale to Big Data capacities.


