Publish/Subscribe

Publish/Subscribe

This tutorial will introduce you to the fundamentals of the Solace API by connecting a client, adding a topic subscription and sending a message matching this topic subscription. This forms the basis for any publish / subscribe message exchange.

Assumptions

This tutorial assumes the following:

  • You are familiar with Solace core concepts.
  • You have access to Solace messaging with the following configuration details:
    • Connectivity information for a Solace message-VPN
    • Enabled client username and password

One simple way to get access to Solace messaging quickly is to create a messaging service in Solace Cloud as outlined here. You can find other ways to get access to Solace messaging below.

Goals

The goal of this tutorial is to demonstrate the most basic messaging interaction using Solace. This tutorial will show you:

  1. How to build and send a message on a topic
  2. How to subscribe to a topic and receive a message

Get Solace Messaging

This tutorial requires access Solace messaging and requires that you know several connectivity properties about your Solace messaging. Specifically you need to know the following:

Resource Value Description
Host String This is the address clients use when connecting to the Solace messaging to send and receive messages. (Format: DNS_NAME:Port or IP:Port)
Message VPN String The Solace message router Message VPN that this client should connect to.
Client Username String The client username. (See Notes below)
Client Password String The client password. (See Notes below)

There are several ways you can get access to Solace Messaging and find these required properties.

Option 1: Use Solace Cloud

  • Follow these instructions to quickly spin up a cloud-based Solace messaging service for your applications.
  • The messaging connectivity information is found in the service details in the connectivity tab (shown below). You will need:
    • Host:Port (use the SMF URI)
    • Message VPN
    • Client Username
    • Client Password

Option 2: Start a Solace VMR

  • Follow these instructions to start the Solace VMR in leading Clouds, Container Platforms or Hypervisors. The tutorials outline where to download and how to install the Solace VMR.
  • The messaging connectivity information are the following:
    • Host: <public_ip> (IP address assigned to the VMR in tutorial instructions)
    • Message VPN: default
    • Client Username: sampleUser (can be any value)
    • Client Password: samplePassword (can be any value)

    Note: By default, the Solace VMR “default” message VPN has authentication disabled.

Option 3: Get access to a Solace appliance

  • Contact your Solace appliance administrators and obtain the following:
    • A Solace Message-VPN where you can produce and consume direct and persistent messages
    • The host name or IP address of the Solace appliance hosting your Message-VPN
    • A username and password to access the Solace appliance

Obtaining the Solace API

This tutorial depends on you having the Solace Messaging API for Java Real-Time Optimized (JavaRTO). The JavaRTO API library can be downloaded here. The JavaRTO API is distributed as a tar file containing the required jars and libs, API documentation, and examples. The instructions in the Building section assume you’re using Gradle and using the JavaRTO API for Linux x86_64 platform. If your environment or target runtime platform differs, then download the appropriate JavaRTO API and adjust the build instructions appropriately.

NOTE: The Solace JavaRTO API (a.k.a solclientj) is a low-latency Java Native Interface (JNI) wrapper for the Solace C Messaging API (SolClient). Therefore, during compile time the JNI wrapper jar solclient.jar must be added a dependency and C API library packaged with the JavaRTO API must be included in your runtime library path.

Compile Dependency: Using Gradle

repositories {
    flatDir(dir: "<PATH_TO_EXTRACTED_API>/solclientj/lib", name: 'Java RTO API lib directory')
}

dependencies {
    // Solace Messaging API for JavaRTO Dependencies
    compile("com.solacesystems:solclientj:")
}

Compile Dependency: Using Maven

<repositories>
  <repository>
    <id>localrepository</id>
    <url>file://<PATH_TO_EXTRACTED_API>/solclientj/lib/</url>
  </repository>
</repositories>

[...]

<dependency>
  <groupId>com.solacesystems</groupId>
  <artifactId>solclientj</artifactId>
  <version>+</version>
</dependency>

Connecting to the Solace message router

In order to send or receive messages, an application must connect a Solace session. The Solace session is the basis for all client communication with the Solace message router.

In the Solace messaging API for JavaRTO (solclientj), a few distinct steps are required to create and connect a Solace session.

  • The API must be initialized
  • A Context is needed to control application threading
  • Appropriate asynchronous callbacks must be declared
  • The Session must be created

Initializing the JavaRTO API

To initialize the JavaRTO API, you call the initialize method with arguments that control global configuration properties (Solclient.GLOBAL_PROPERTIES), or an empty String array, if there are no global properties to set.

// Solclient needs to be initialized before any other API calls.
Solclient.init(new String[0]);

This call must be made prior to making any other calls to the JavaRTO API. It allows the API to initialize internal state and buffer pools.

Context Creation

As outlined in the core concepts, the context object is used to control threading that drives network I/O and message delivery and acts as containers for sessions. In the JavaRTO API, application must first instantiate an object with the ContextHandle interface by calling Solclient.Allocator.newContextHandle(), then create the Context, which will be bound to the given unbound ContextHandle.

// Context
final ContextHandle contextHandle = Solclient.Allocator.newContextHandle();
Solclient.createContextForHandle(contextHandle, new String[0]);

Asynchronous Callbacks

The JavaRTO API is predominantly an asynchronous API designed for the highest speed and lowest latency. As such most events and notifications occur through callbacks. In order to get up and running, the following basic callbacks are required at a minimum.

final CountDownLatch latch = new CountDownLatch(1);

// A message callback to receive messages asynchronously
MessageCallback messageCallback = new MessageCallback() {
    @Override
    public void onMessage(Handle handle) {
        try {
            // Get the received msg from the handle
            MessageSupport messageSupport = (MessageSupport) handle;
            MessageHandle rxMessage = messageSupport.getRxMessage();

            // Process the message ...

            // To display the contents of a message in human-readable form
            System.out.println(" Received message: ");
            rxMessage.dump(SolEnum.MessageDumpMode.FULL);

        } catch (SolclientException e) {
            // Handle exception
        } finally {
            latch.countDown(); // unblock main thread
        }     
    }
};

// A session event callback to events such as connect/disconnect events
SessionEventCallback sessionEventCallback = new SessionEventCallback() {
    @Override
    public void onEvent(SessionHandle sessionHandle) {
        System.out.println("Received SessionEvent:" + sessionHandle.getSessionEvent());
    }
};

The messageCallback is invoked for each Direct message received by the Session. In this sample, the message is printed to the screen. For the purpose of this tutorial a countdown latch is used to block the consumer thread until a single message has been received.

The sessionEventCallback is invoked for various significant session events like connection, disconnection, and other API session events. In this sample, simply prints the events. See the JavaRTO API documentation and samples for details on the session events.

Session Creation

Finally a session is needed to actually connect to the Solace message router. This is accomplished by creating a properties array and connecting the session.

// Configure the Session properties
ArrayList<String> sessionProperties = new ArrayList<String>();
sessionProperties.add(SessionHandle.PROPERTIES.HOST);
sessionProperties.add(host);
sessionProperties.add(SessionHandle.PROPERTIES.USERNAME);
sessionProperties.add(username);
sessionProperties.add(SessionHandle.PROPERTIES.PASSWORD);
sessionProperties.add(password);
sessionProperties.add(SessionHandle.PROPERTIES.VPN_NAME);
sessionProperties.add(vpnName);
String[] props = new String[sessionProperties.size()];

// Instantiate a new SessionHandle instance
final SessionHandle sessionHandle = Solclient.Allocator.newSessionHandle();

// Create the Session
contextHandle.createSessionForHandle(sessionHandle, sessionProperties.toArray(props), messageCallback, sessionEventCallback);

// Connect the Session
sessionHandle.connect();

Similar to the creating a Context, a new instance of SessionHandle must be created first and then allocated using the previously created contextHandle instance. The bounded SessionHandle, session properties, the message callback, and the session callback functions are passed as parameters. The final call to SessionHandle.connect establishes the connection to the Solace message router which makes the session ready for use.

At this point your client is connected to the Solace message router. You can use SolAdmin to view the client connection and related details.

Receiving a message

This tutorial is uses “Direct” messages which are at most once delivery messages. So first, let’s express interest in the messages by subscribing to a Solace topic. Then you can look at publishing a matching message and see it received.

With a session connected in the previous step, then you must subscribe to a topic in order to express interest in receiving messages. This tutorial uses the topics “tutorial/topic”.

Topic topic = Solclient.Allocator.newTopic("tutorial/topic");
sessionHandle.subscribe(topic, SolEnum.SubscribeFlags.WAIT_FOR_CONFIRM, 0);

Then after the subscription is added, the consumer is started. At this point the consumer is ready to receive messages. For the purpose of this tutorial a countdown latch is used here to block the consumer thread until a single message has been received.

try {
    latch.await(); // block here until message received, and latch will
                    // flip
} catch (InterruptedException e) {
    System.out.println("I was awoken while waiting");
}

Sending a message

Now it is time to send a message to the waiting consumer.

To send a message, you must create a message and a topic destination. This tutorial will send a Solace binary message with contents “Hello world!”. Then you must send the message to the Solace message router.

// Create a new binary message
final MessageHandle messageHandle = Solclient.Allocator.newMessageHandle();
Solclient.createMessageForHandle(messageHandle);

// Set the destination on the message
Topic topic = Solclient.Allocator.newTopic("tutorial/topic");
messageHandle.setDestination(topic);

// Create the content to publish and attach to message
String contentStr = "Hello world!";
ByteBuffer content = ByteBuffer.allocateDirect(contentStr.length());
content.put(contentStr.getBytes());
content.flip();
messageHandle.setBinaryAttachment(content);

// Send the message
sessionHandle.send(messageHandle);

// Finally free the allocated message
try {
    messageHandle.destroy();
} catch (Throwable t) {
    System.err.println("Unable to call destroy on messageCallback " + t.getCause());
}

In the JavaRTO API, messages are allocated and freed from an internal API message pool for greatest performance and efficiency. Therefore as shown, messages must be acquired by calls to Solclient.createMessageForHandle using an unbound MessageHandle instance, and then later freed back to the pool by calls to MessageHandle.destroy.

The minimum properties required to create a JavaRTO message that can be sent is to set the delivery mode, queue or topic destination, and message contents as shown in the above code. Once the message is created it is sent to the Solace message router with a call to SessionHandle.send.

At this point the producer has sent a message to the Solace message router and your waiting consumer will have received the message and printed its contents to the screen.

Summarizing

The full source code for this example is available in GitHub. If you combine the example source code shown above results in the following source:

Getting the Source

This tutorial is available in GitHub. To get started, clone the GitHub repository containing the Solace samples.

git clone https://github.com/SolaceSamples/solace-samples-javarto
cd solace-samples-javarto

Building

Building these examples is simple. You can simply build the project using Gradle.

./gradlew assemble

This builds all of the JavaRTO Getting Started Samples with OS specific launch scripts. The files are staged in the build/staged directory.

Running the Sample

Before you can run the sample, you must set the appropriate library paths for your specific runtime platform. Below instructions assume your runtime platform is Linux x86_64 and the corresponding JavaRTO API tar has been unpacked into solclientj subdirectory of your GitHub repository for this tutorial. For ideas on how to build on other platforms you can consult the README of the JavaRTO API library.

First, include the lib directory of the unpacked solclientj in your LD_LIBRARY_PATH:

export LD_LIBRARY_PATH=`pwd`/solclientj/lib:$LD_LIBRARY_PATH

If you start the TopicSubscriber, with the required arguments of your Solace messaging, it will connect and wait for a message.

$ ./build/staged/bin/TopicSubscriber <host:port> <client-username>@<message-vpn> <client-password>
TopicSubscriber initializing...
 Initializing the Java RTO Messaging API...
 Creating a context ...
 Creating a session ...
 Connecting session ...
 Received SessionEvent:ResponseCode [0] Info [] SessionEventEnum[0]:[UP_NOTICE] CorrelationKey [0]
 Subscribing to topic: tutorial/topic
 Subscribed. Awaiting message...

Then you can send a message using the TopicPublisher with the same arguments. If successful, the output for the producer will look like the following:

$ ./build/staged/bin/topicPublisher <host:port> <client-username>@<message-vpn> <client-password>
TopicPublisher initializing...
 Initializing the Java RTO Messaging API...
 Creating a context ...
 Creating a session ...
 Connecting session ...
 Received SessionEvent:ResponseCode [0] Info [] SessionEventEnum[0]:[UP_NOTICE] CorrelationKey [0]
 Creating message to publish ...
 Sending message with content: Hello world!
 Message Sent. Existing.

With the message delivered the subscriber output will look like the following:

 Received a message with content: Hello world!
 Complete message dump:
Destination:                            Topic 'tutorial/topic'
Class Of Service:                       COS_1
DeliveryMode:                           DIRECT
Binary Attachment:                      len=12
  48 65 6c 6c 6f 20 77 6f  72 6c 64 21                  Hello wo   rld!

 Existing.

The received message is printed to the screen. The binary message contents was “Hello world!” as expected and the message dump contains extra information about the Solace message that was received.

You have now successfully connected a client, subscribed to a topic and exchanged messages using this topic.

If you have any issues sending and receiving a message, check the Solace community for answers to common issues.