Cluster-wide Messaging

If you need to communicate across the cluster, Liferay has you covered...

Introduction

I posted two blog posts this week, and each of them had a particular drawback - lack of cluster coordination.

The first blog post, on cluster-enabled upgrade processes, introduced an @Reference dependency on a specific Release version, the idea being that the component shouldn't start unless that version was available. When the upgrade process runs on the cluster leader, the leader knows the Release is available (that's an automatic thing the upgrade process framework takes care of for us), but the other nodes in the cluster are not aware that the Release version is available until they are restarted. Kind of defeats the purpose of allowing the cluster to start at the same time if you have to come back and restart to get the other nodes working...

In the second post, the logging persister I created knows how to capture a changed log level and write it to the DB, but it doesn't announce the change to the other nodes, so they will not know that there is a new logging level configuration until they are restarted...

So here were two clear cases where it would be great if I could send a cluster-wide notification to automatically let the other nodes know that the Release was ready to go or that a log level change needed to be processed...

It so happens that Liferay allows us to build a cluster-wide messaging system to do things like this, so I thought I'd work up a blog to show how we can augment these two examples to make them cluster-friendly.

Precursor to Cluster Messaging

If you're not familiar with it, we should first talk about the Liferay Message Bus (LMB). The LMB is a lightweight messaging system that allows sending messages within the JVM to message listeners, a way to decouple code and turn an otherwise synchronous process into an asynchronous one.
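To make the idea concrete, here is a minimal sketch of what an in-JVM message bus does. This is not Liferay's implementation: TinyMessageBus, its methods, and the "demo/..." destination names are all hypothetical, and plain strings stand in for real messages. It only shows how a sender can hand off a message and return while a listener processes it on another thread.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for an in-JVM message bus; NOT Liferay's MessageBus.
class TinyMessageBus {

  // destination name -> listeners registered on it
  private final Map<String, List<Consumer<String>>> listeners =
    new ConcurrentHashMap<>();

  // one worker thread delivers messages in order, off the caller's thread
  private final ExecutorService worker = Executors.newSingleThreadExecutor();

  void register(String destination, Consumer<String> listener) {
    listeners.computeIfAbsent(
      destination, name -> new CopyOnWriteArrayList<>()).add(listener);
  }

  // returns immediately; listeners run later on the worker thread
  void send(String destination, String message) {
    for (Consumer<String> listener :
        listeners.getOrDefault(destination, List.of())) {
      worker.execute(() -> listener.accept(message));
    }
  }

  // stop accepting messages and wait for queued deliveries to finish
  void shutdown() {
    worker.shutdown();
    try {
      worker.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

class TinyMessageBusDemo {
  static List<String> run() {
    TinyMessageBus bus = new TinyMessageBus();
    List<String> received = new CopyOnWriteArrayList<>();

    bus.register("demo/greetings", received::add);
    bus.send("demo/greetings", "hello");
    bus.send("demo/greetings", "world");
    bus.send("demo/unregistered", "dropped"); // no listener, nothing happens

    bus.shutdown();
    return received;
  }
}
```

The sender never references the listener directly; the destination name is the only thing they share, which is the decoupling the LMB gives us.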

When we want to start cluster-wide messaging, we have to start by incorporating the LMB, as it is the backbone for cluster messaging. In fact, our solution is going to leverage OOTB functionality to send our LMB messages to the cluster...

Liferay Message Bus

So when you are dealing with the LMB, there are a number of pieces that you're going to be working with.

The first and most important is going to be your Message. The com.liferay.portal.kernel.messaging.Message is a special container for passing data. A message is kind of like a Map<String, Object> that exposes getters for type coercion and a single put method. Additionally it has a payload to send a complex java object, and some envelope details such as a response destination and a field to send a response object. Within a JVM, there are few restrictions on the Message because it never leaves the JVM it is in.
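If the "Map with coercing getters plus a payload" description feels abstract, the following sketch shows the shape I have in mind. SketchMessage is a hypothetical class written only for illustration; the real com.liferay.portal.kernel.messaging.Message has more features, and its coercion rules may differ from the simple ones assumed here.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical message container for illustration only; the real class is
// com.liferay.portal.kernel.messaging.Message and may behave differently.
class SketchMessage {

  private final Map<String, Object> values = new HashMap<>();
  private Object payload;

  // the single way to add a value
  void put(String key, Object value) {
    values.put(key, value);
  }

  // getters coerce whatever was stored into the requested type
  String getString(String key) {
    Object value = values.get(key);
    return (value == null) ? null : String.valueOf(value);
  }

  long getLong(String key) {
    Object value = values.get(key);
    if (value instanceof Number) {
      return ((Number) value).longValue();
    }
    return (value == null) ? 0L : Long.parseLong(value.toString());
  }

  // the payload carries one arbitrary object alongside the values
  void setPayload(Object payload) {
    this.payload = payload;
  }

  Object getPayload() {
    return payload;
  }
}

class SketchMessageDemo {
  static SketchMessage build() {
    SketchMessage message = new SketchMessage();
    message.put("schemaVersion", "2.0.0");
    message.put("timestamp", 1700000000000L);
    message.setPayload(List.of("any", "object"));
    return message;
  }
}
```

Note how the timestamp goes in as a long but can come back out as either a long or a String, depending on which getter the listener calls.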

When we bridge the LMB to handle the cluster messaging, though, some hopefully obvious restrictions will apply: to pass objects between JVMs, your objects must be serializable, and the same version of their classes must be deployed on all of the nodes. Additionally, you should consider following Java best practices and declare an explicit serialVersionUID, changing its value every time you change the serializable class in an incompatible way. This lets Java detect when the object it is trying to deserialize was written by a different version of the class and fail fast instead of producing screwy outcomes.
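As a quick illustration of the serialization requirement, here is a sketch of a payload class that could cross JVM boundaries, along with a round trip through standard Java serialization. ReleaseNotice and its fields are hypothetical names I made up for this example; the round trip through a byte array simply simulates what happens when an object is written on one node and read on another.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical payload class; the name and fields are illustrative only.
class ReleaseNotice implements Serializable {

  // change this value whenever the fields change in an incompatible way
  private static final long serialVersionUID = 2L;

  final String servletContextName;
  final String schemaVersion;

  ReleaseNotice(String servletContextName, String schemaVersion) {
    this.servletContextName = servletContextName;
    this.schemaVersion = schemaVersion;
  }
}

class SerializationRoundTrip {

  // write the object to bytes and read it back, as a second JVM would
  static ReleaseNotice roundTrip(ReleaseNotice notice) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();

      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(notice);
      }

      try (ObjectInputStream in = new ObjectInputStream(
          new ByteArrayInputStream(bytes.toByteArray()))) {
        return (ReleaseNotice) in.readObject();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

If the reading side had a ReleaseNotice class with a different serialVersionUID, readObject() would throw an InvalidClassException rather than hand back a half-populated object.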

When I'm working with messages, I like to add timestamps into the Message object, especially when those messages are going to the cluster. It doesn't really add much to the message size, but it can help with understanding future issues you might have.

The next important aspect is the Destination. A destination itself is really just a string representing the name. You'll always want to ensure you're picking good names for your destinations and avoid the possibility of destination name conflicts, otherwise you might get messages you don't expect or have your messages processed by listeners that can't handle the message correctly. Although the destination is just a string, it does need to be registered before you can start sending or receiving on it. We'll see the registration in a bit.

The next aspect is a message sender. This can be simply an @Reference annotation to inject the com.liferay.portal.kernel.messaging.MessageBus service into your class and then use its sendMessage() method to post a new message. I prefer to create a formal interface and an implementation to hide the details, so for example I would define a LoggingLevelChangedMessageSender interface with a sendLogLevelChangedMessage(final String logName, final String priority) method. The implementation would take care of creating and populating the Message object with the details and sending it to the right destination using the MessageBus service. Structuring the code this way means any component that needs to send the message can use my handy sender and won't have to recreate all of the details of sending the messages on the LMB.
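A sketch of that logging sender might look like the following. To keep it self-contained I've made some assumptions: a BiConsumer stands in for the MessageBus service's sendMessage(destination, message) call, a plain Map stands in for the Message, and the "logging/level_changed" destination name is invented for the example rather than taken from the log-persist repo.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// The interface described in the text; callers depend only on this.
interface LoggingLevelChangedMessageSender {
  void sendLogLevelChangedMessage(final String logName, final String priority);
}

// Hypothetical implementation. The BiConsumer stands in for the MessageBus
// service and the Map stands in for the Message object.
class LoggingLevelChangedMessageSenderSketch
    implements LoggingLevelChangedMessageSender {

  // assumed destination name, made up for this sketch
  static final String DESTINATION = "logging/level_changed";

  private final BiConsumer<String, Map<String, Object>> bus;

  LoggingLevelChangedMessageSenderSketch(
      BiConsumer<String, Map<String, Object>> bus) {
    this.bus = bus;
  }

  @Override
  public void sendLogLevelChangedMessage(String logName, String priority) {
    // all message details live here, hidden from the callers
    Map<String, Object> message = new LinkedHashMap<>();
    message.put("logName", logName);
    message.put("priority", priority);
    message.put("timestamp", System.currentTimeMillis());

    bus.accept(DESTINATION, message);
  }
}

class SenderDemo {
  static List<String> run() {
    List<String> sent = new ArrayList<>();

    // a fake bus that just records what it was asked to send
    LoggingLevelChangedMessageSender sender =
      new LoggingLevelChangedMessageSenderSketch(
        (destination, message) -> sent.add(
          destination + ":" + message.get("logName") + "=" +
            message.get("priority")));

    sender.sendLogLevelChangedMessage("com.example.foo", "DEBUG");
    return sent;
  }
}
```

A nice side effect of this structure is that the sender is easy to test: swap in a fake bus, as SenderDemo does, and check what would have been sent.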

The last piece you need is a MessageListener implementation. The listener is the class that receives and processes the Message object. It may be a call and forget implementation where the listener doesn't respond, or it could use the response details in the Message to send a result back to the caller on another destination.
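The two listener styles can be sketched like this. Everything here is hypothetical (the Msg and Bus classes, the "demo/..." destination names), and delivery is synchronous to keep the example short; the point is only that the listener replies when the message carries a response destination and stays silent when it doesn't.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Consumer;

class ReplyDemo {

  // Hypothetical message: a body plus an optional response destination.
  static final class Msg {
    final String body;
    final String responseDestination; // null means "call and forget"

    Msg(String body, String responseDestination) {
      this.body = body;
      this.responseDestination = responseDestination;
    }
  }

  // Hypothetical bus: one listener per destination, synchronous delivery.
  static final class Bus {
    private final Map<String, Consumer<Msg>> listeners = new HashMap<>();

    void register(String destination, Consumer<Msg> listener) {
      listeners.put(destination, listener);
    }

    void send(String destination, Msg msg) {
      Consumer<Msg> listener = listeners.get(destination);
      if (listener != null) {
        listener.accept(msg);
      }
    }
  }

  static List<String> run() {
    Bus bus = new Bus();
    List<String> replies = new ArrayList<>();

    // worker listener: does its job, replies only when asked to
    bus.register("demo/work", msg -> {
      String result = msg.body.toUpperCase(Locale.ROOT);
      if (msg.responseDestination != null) {
        bus.send(msg.responseDestination, new Msg(result, null));
      }
    });

    // the caller listens for results on its own destination
    bus.register("demo/work/reply", msg -> replies.add(msg.body));

    bus.send("demo/work", new Msg("fire and forget", null));
    bus.send("demo/work", new Msg("please reply", "demo/work/reply"));

    return replies;
  }
}
```

Only the second message produces a reply, because only it named a destination to respond on.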

Implementing the LMB Code

I'll be adding the code to the logging persistence repo separately, so here let's tackle the upgrade problem from the previous blog.

Our requirement is going to be simple: the last step of any upgrade process executing on the cluster leader should notify the cluster that the upgrade to the new Release version has completed.

So for our message, we're going to send a few minor details as values in the Message. We'll send the "servletContextName" and the "schemaVersion" values that represent the updated Release record.

We're going to combine some of the aspects into our message sender implementation, so let's start by defining the interface:

public interface UpgradeMessageSender {

  /**
   * sendUpgradeSuccess: Used to send a success message for a completed upgrade.
   * @param servletContextName The servlet context the upgrade was for.
   * @param schemaVersion The version that was upgraded to.
   * @throws PortalException in case of failure.
   */
  void sendUpgradeSuccess(final String servletContextName, final String schemaVersion)
    throws PortalException;
}

So, pretty simple so far, so we're ready to move on to the destination...

As discussed above, the destination we're going to use, "upgrade/success", needs to be registered with Liferay, and we do this using an @Activate method on a component. You can use your message sender or your message listener for this (hint: they're both implemented as components), but I prefer to keep mine separate from the sender and listener. I wouldn't want some unresolved reference to prevent my destination from starting, after all. So my destination registrator becomes:

@Component(
  immediate = true
)
public class UpgradeDestinationRegistrator {
  
  @Activate
  private void _activate(BundleContext bundleContext) {
    Destination destination = _destinationFactory.createDestination(
      DestinationConfiguration.createSerialDestinationConfiguration(
        "upgrade/success"));
    
    _serviceRegistration = bundleContext.registerService(
      Destination.class, destination,
      MapUtil.singletonDictionary(
        "destination.name", destination.getName()));
  }
  
  @Deactivate
  private void _deactivate() {
    if (_serviceRegistration != null) {
      _serviceRegistration.unregister();
    }
  }
  
  @Reference
  private DestinationFactory _destinationFactory;
  
  private ServiceRegistration<Destination> _serviceRegistration;
}

Note the method we're using to create the destination configuration: in this case it's the serial destination configuration, which the Liferay Message Bus documentation touches on.

With the destination ready, we can next tackle the custom message sender. He's pretty simple:

@Component(
  immediate = true,
  service = UpgradeMessageSender.class
)
public class UpgradeMessageSenderImpl implements UpgradeMessageSender {
  @Override
  public void sendUpgradeSuccess(String servletContextName, String schemaVersion) 
      throws PortalException {

    // we need a message to populate
    Message message = new Message();
    
    // populate with the fields we expect
    message.put("servletContextName", servletContextName);
    message.put("schemaVersion", schemaVersion);
    
    // add some other fields that might be useful
    message.put("timestamp", System.currentTimeMillis());

    try {
      InetAddress address = InetAddressUtil.getLocalInetAddress();
      message.put("origin", address.getHostName());
    } catch (Exception e) {
      // we do not need this, so we'll just discard the exception
    }
    
    // ready to send the message
    messageBus.sendMessage("upgrade/success", message);
  }

  @Reference(unbind = "-")
  private MessageBus messageBus;
}

Just looking at this class, it may not seem like a separate, custom message sender benefits us too much. To me, though, details such as the destination to send to and the message attributes (or perhaps a POJO set as the payload instead) should be hidden behind the interface, so that components that just want to send an upgrade success message can do so without having to get all of those details right. Hiding even this simple message sender behind an interface makes the rest of the code simpler.

Note that I've tacked some additional attributes onto the message; the timestamp for when the message was created as well as the origin so I know where it is being sent from. When all you are doing is using the internal LMB, you don't really need these as they won't really change, but they will come in handy when you are sending your messages to the cluster...

Before we get to the cluster, though, the last piece of our LMB adventure is to create the MessageListener which will process the messages. We know that our Message contains the details we need for the Release, and we can use the com.liferay.portal.upgrade.internal.release.ReleasePublisher class to publish the new version from our message listener:

@Component(
  immediate = true,
  property = "destination.name=upgrade/success",
  service = MessageListener.class
)
public class UpgradeSuccessMessageListener extends BaseMessageListener {
  @Override
  protected void doReceive(Message message) throws Exception {
    String servletContextName = message.getString("servletContextName");

    // look up the Release record that the cluster leader just updated
    final Release release = releaseLocalService.fetchRelease(servletContextName);

    if (release == null) {
      _log.warn("No Release found for {}, nothing to publish", servletContextName);

      return;
    }

    // ReleasePublisher is internal to Liferay, so we invoke it reflectively
    RegistryUtil.getRegistry().callService("com.liferay.portal.upgrade.internal.release.ReleasePublisher", 
        (Object obj) -> {
      try {
        ReflectionUtil
          .getDeclaredMethod(obj.getClass(), "publish", Release.class)
          .invoke(obj, release);
      } catch (Exception e) {
        _log.error("Error publishing release: {}", e.getMessage(), e);
      }

      return null;
    });
  }

  @Reference(unbind = "-")
  private ReleaseLocalService releaseLocalService;

  private static final Logger _log = 
      LoggerFactory.getLogger(UpgradeSuccessMessageListener.class);
}

So this takes care of our LMB-based implementation for supporting the Release update, but we still haven't tackled the cluster aspect yet...

Cluster Bridge Message Listener

I wrote earlier that we were going to be using an OOTB Liferay class to bridge between the LMB and the cluster messaging, and this is it. We will simply need to register an instance of com.liferay.portal.kernel.cluster.messaging.ClusterBridgeMessageListener as a listener on our destination to handle the bridge. This class will receive a message from the LMB and will send it using ClusterLink to the other nodes in the cluster.

We register the bridge for our upgrade as follows:

@Component(
    immediate = true
)
public class ClusterBridgeMessageListenerRegistrator {

  @Activate
  protected void activate() {
    ClusterBridgeMessageListener bridgeMessageListener = 
      new ClusterBridgeMessageListener();
    bridgeMessageListener.setPriority(Priority.LEVEL5);
    _destination.register(bridgeMessageListener);
    _clusterBridgeMessageListener = bridgeMessageListener;
  }

  @Deactivate
  protected void deactivate() {
    _destination.unregister(_clusterBridgeMessageListener);
  }

  @Reference(target = "(destination.name=upgrade/success)")
  private Destination _destination;

  private MessageListener _clusterBridgeMessageListener;
}

With this component deployed, every message that our UpgradeMessageSender sends to the upgrade/success destination will be received by the ClusterBridgeMessageListener and broadcast via ClusterLink.

And that's basically it! Amazing, huh?

See, you don't have to worry about receiving the cluster message; Liferay already handles that for us. Behind the scenes, the ClusterForwardReceiver is what actually gets messages from the cluster. It filters out messages originating from the current node (so a node doesn't bombard itself with its own messages) and hands the rest to the ClusterLinkImpl class, which basically takes each message and sends it to the destination, but this time on the receiving nodes.
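The overall flow can be simulated in a few lines of plain Java. This is a toy model, not Liferay's code: the Node class and node names are hypothetical, the "cluster" is just a shared list, and I'm assuming that a message delivered from the cluster is handed to local listeners only and is not bridged back out again.

```java
import java.util.ArrayList;
import java.util.List;

class ClusterSketch {

  // Hypothetical node; its local listener is reduced to a "received" list.
  static final class Node {
    final String name;
    final List<String> received = new ArrayList<>();
    private final List<Node> cluster;

    Node(String name, List<Node> cluster) {
      this.name = name;
      this.cluster = cluster;
      cluster.add(this);
    }

    // local send: deliver to the local listener, then bridge to the cluster
    void send(String body) {
      received.add(body);

      // this loop plays the role of the bridge listener's broadcast
      for (Node peer : cluster) {
        peer.receiveFromCluster(name, body);
      }
    }

    // cluster receive: skip our own broadcasts, deliver the rest locally
    void receiveFromCluster(String origin, String body) {
      if (origin.equals(name)) {
        return; // a node doesn't bombard itself with its own messages
      }

      // assumption of this sketch: delivered locally only, not re-bridged
      received.add(body);
    }
  }

  static List<Integer> run() {
    List<Node> cluster = new ArrayList<>();
    Node leader = new Node("node-a", cluster);
    Node second = new Node("node-b", cluster);
    Node third = new Node("node-c", cluster);

    leader.send("upgrade/success 2.0.0");

    return List.of(
      leader.received.size(), second.received.size(), third.received.size());
  }
}
```

Each node ends up processing the message exactly once: the leader through its own local listener, and the other two through the cluster delivery.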

Wrapping Up The Code

So our final step to pull everything together is that last upgrade step, the one that will issue the notification.

I would solve this using a fairly simple upgrade step:

public class ClusterReleaseNotificationUpgradeStep extends UpgradeProcess {
  public ClusterReleaseNotificationUpgradeStep(
      String servletContextName, String schemaVersion, 
      UpgradeMessageSender upgradeMessageSender) {
    this.servletContextName = servletContextName;
    this.schemaVersion = schemaVersion;
    this.upgradeMessageSender = upgradeMessageSender;
  }

  @Override
  protected void doUpgrade() throws Exception {
    // send the message that the upgrade is finished
    upgradeMessageSender.sendUpgradeSuccess(
      servletContextName, schemaVersion);
  }

  private final String servletContextName;
  private final String schemaVersion;
  private final UpgradeMessageSender upgradeMessageSender;
}

And I can add it to my upgrade steps in MyCustomModuleUpgrade's register step:

registry.register(
  "com.liferay.mycustommodule", "1.1.0", "2.0.0",
  new com.liferay.mycustommodule.upgrade.v2_0_0.UpgradeFoo(),
  new UpgradeBar(), 
  new ClusterReleaseNotificationUpgradeStep("com.example.mycustom.service", 
    "2.0.0", _upgradeMessageSender));

And now when the cluster leader is done upgrading Foo and Bar, it will also invoke this new upgrade step and provide the details. The upgrade step will use the custom message sender to send the message containing these details, and the cluster bridge message listener will forward the message to ClusterLink. On each of the other nodes, ClusterLink will receive the message and forward it to the LMB, where our message listener will process it, invoking the Liferay code to report that the new release is available. Any of those minions mentioned in the last blog with the @Reference(target = "(&(release.bundle.symbolic.name=com.example.mycustom.service)(&(release.schema.version>=2.0.0)))") line will magically spring to life since the Release will now be available.

Conclusion

In the log-persist repo in the messaging branch, I've gone ahead and added similar code to ensure that log level changes are also broadcast throughout the cluster.

So here were two real-world scenarios where I had working solutions to problems, but they were not cluster-aware. By adding the support for messaging and incorporating the ClusterBridgeMessageListener, I was able to make both solutions cluster-aware and cluster-friendly, and both still work in a non-clustered environment too.

Hopefully you now have some ideas how you might take a solution you have and make it cluster-aware in the same way.

Please share your ideas, I'd love to hear them!
