Blogs
If you need to communicate across the cluster, Liferay has you covered...
Introduction
I posted two blog posts this week, and each of them had a particular drawback - lack of cluster coordination.
The first blog post on cluster-enabled upgrade processes
introduced using an @Reference
dependency upon a
specific Release version, the idea being that the component
shouldn't start unless that version was available. When the upgrade
process would run on the cluster leader, it would know the Release
was available (that's an automatic thing the upgrade process
framework will take care of for us), but the other nodes in the
cluster would not be aware the Release version was available until
they were restarted. Kind of defeats the purpose of allowing the
cluster to start at the same time if you have to come back and
restart to get the other nodes working...
In the next example, the logging perisister guy I created knows how to get the changed log level and will write to the DB, but it doesn't announce the change to the other nodes, so they will not know that there is a new logging level configuration until they are restarted...
So here were two clear cases where it would be great if I could send a cluster-wide notification to automatically let the other nodes know that the Release was ready to go or that a log level change needed to be processed...
It so happens that Liferay allows us to build a cluster-wide messaging system to do things like this, so I thought I'd work up a blog to show how we can augment these two examples to make them cluster-friendly.
Precursor to Cluster Messaging
If you're not familiar with it, we should first talk about the Liferay Message Bus (LMB). The LMB is a lightweight messaging system that allows sending messages within the JVM to message listeners, a way to decouple code and turn an otherwise synchronous process into an asynchronous one.
When we want to start cluster-wide messaging, we have to start by incorporating the LMB in as it the backbone for cluster messaging. In fact, our solution is going to be leveraging OOTB functionality to send our LMB messages to the cluster...
Liferay Message Bus
So when you are dealing with the LMB, there's a number of pieces that you're going to be working with.
The first and most important is going to be your
Message. The
com.liferay.portal.kernel.messaging.Message
is a special
container for passing data. A message is kind of like a
Map<String, Object>
that exposes getters for type
coercion and a single put method. Additionally it has a payload to
send a complex java object, and some envelope details such as a
response destination and a field to send a response object. Within a
JVM, there are few restrictions on the Message because it never leaves
the JVM it is in.
When we bridge the LMB to handle the cluster messaging, though,
some hopefully obvious restrictions will apply, namely that to pass
objects between JVMs, your objects must be serializable and they must
be the same code on all of the nodes. Additionally, you should
consider following Java best practices and use a unique
serialVersionUID
value every time you change the
serializable object as this helps to ensure that the objects that Java
is trying to serialize/deserialize are the same version and helps to
avoid screwy outcomes when they otherwise are not the same.
When I'm working with messages, I like to add timestamps into the Message object, especially when those messages are going to the cluster. It doesn't really add much to the message size, but it can help with understanding future issues you might have.
The next important aspect is the Destination. A destination itself is really just a string representing the name. You'll always want to ensure you're picking good names for your destinations and avoid possibility of destination name conflicts, otherwise you might get messages you don't expect or have your messages processed by listeners that can't handle the message correctly. Although the destination is just a string, it does need to be registered before you can start sending or receiving on it. We'll see the registration in a bit.
The next aspect is a message sender. This can
be simply an @Reference
annotation to inject the
com.liferay.portal.kernel.messaging.MessageBus
service
into your class and then use its sendMessage() method to post a new
message. I prefer to create a formal interface and an implementation
to hide the details, so for example I would define a
LoggingLevelChangedMessageSender interface with a
sendLogLevelChangedMessage(final String logName, final String
priority) method. The implementation would take care of creating and
populating the Message object with the details and sending it to the
right destination using the MessageBus service. Structuring the code
this way means any component that needs to send the message can use my
handy sender and won't have to recreate all of the details of sending
the messages on the LMB.
The last piece you need is a MessageListener implementation. The listener is the class that receives and processes the Message object. It may be a call and forget implementation where the listener doesn't respond, or it could use the response details in the Message to send a result back to the caller on another destination.
Implementing the LMB Code
So I'll be adding the code to the logging persistence repo separately, so here let's tackle the upgrade problem from the previous blog.
Our requirement is going to be simple - The last step of any upgrade process executing on the cluster leader should notify the cluster that the upgrade to the new Release version has completed.
So for our message, we're going to send a few minor details as values in the Message. We'll send the "servletContextName" and the "schemaVersion" values that represent the updated Release record.
We're going to combine some of the aspects into our message sender implementation, so let's start by defining the interface:
public interface UpgradeMessageSender { /** * sendUpgradeSuccess: Used to send a success message for a completed upgrade. * @param servletContextName The servlet context the upgrade was for. * @param schemaVersion The version that was upgraded to. * @throws PortalException in case of failure. */ void sendUpgradeSuccess(final String servletContextName, final String schemaVersion) throws PortalException; }
So, pretty simple so far, so we're ready to move on to the destination...
As discussed above, the destination we're going to use,
"upgrade/success", needs to be registered with Liferay, and
we do this using an @Activate
method on a component. You
can use your message sender or your message listener for this (hint:
they're both implemented as components), but I prefer to keep mine
separate from the sender and listener. I wouldn't want some unresolved
reference to prevent my destination from starting, after all. So my
destination registrator becomes:
@Component( immediate = true ) public class UpgradeDestinationRegistrator { @Activate private void _activate(BundleContext bundleContext) { Destination destination = _destinationFactory.createDestination( DestinationConfiguration.createSerialDestinationConfiguration( "upgrade/success")); _serviceRegistration = bundleContext.registerService( Destination.class, destination, MapUtil.singletonDictionary( "destination.name", destination.getName())); } @Deactivate private void _deactivate() { if (_serviceRegistration != null) { _serviceRegistration.unregister(); } } @Reference private DestinationFactory _destinationFactory; private ServiceRegistration<Destination> _serviceRegistration; }
I've highlighted the method we're using to create the destination configuration, in this case it's the serial destination configuration which the Liferay documentation touches on here.
With the destination ready, we can next tackle the custom message sender. He's pretty simple:
@Component( immediate = true, service = UpgradeMessageSender.class ) public class UpgradeMessageSenderImpl implements UpgradeMessageSender { @Override public void sendUpgradeSuccess(String servletContextName, String schemaVersion) throws PortalException { // we need a message to populate Message message = new Message(); // populate with the fields we expect message.put("servletContextName", servletContextName); message.put("schemaVersion", schemaVersion); // add some other fields that might be useful message.put("timestamp", System.currentTimeMillis()); try { InetAddress address = InetAddressUtil.getLocalInetAddress(); message.put("origin", address.getHostName()); } catch (Exception e) { // we do not need this, so we'll just discard the exception } // ready to send the message messageBus.sendMessage("upgrade/success", message); } @Reference(unbind = "-") private MessageBus messageBus; @Reference(unbind = "-") private Portal portal; }
Just looking at this class, it may not seem like a separate, custom message sender benefits us too much. To me, though, all of this detail such as the destination to send to, the message attributes (or perhaps we use the payload to set a POJO instead), these details should be hidden behind the interface so that components that just want to send an upgrade success message can do so w/o having to get all of the details right. To me, it just makes sense to hide even this simple message sender behind an interface to make the rest of the code simpler.
Note that I've tacked some additional attributes onto the message; the timestamp for when the message was created as well as the origin so I know where it is being sent from. When all you are doing is using the internal LMB, you don't really need these as they won't really change, but they will come in handy when you are sending your messages to the cluster...
And that's actually what we're going to do next. The last piece
for our LMB adventure is to create the MessageListener which will
process the messages. We know that our Message contains the details
we need for the Release, and we can use the
com.liferay.portal.upgrade.internal.release.ReleasePublisher
class to publish the new version from our message listener:
@Component( immediate = true, service = MessageListener.class ) public class UpgradeSuccessMessageListener extends BaseMessageListener { @Override protected void doReceive(Message message) throws Exception { String servletContextName = message.getString("servletContextName"); final Release release = releaseLocalService.fetchRelease(servletContextName); RegistryUtil.getRegistry().callService("com.liferay.portal.upgrade.internal.release.ReleasePublisher", (Object obj) -> { try { ReflectionUtil .getDeclaredMethod(obj.getClass(), "publish", Release.class) .invoke(obj, release); } catch (Exception e) { _log.error("Error publishing release: {}", e.getMessage(), e); } return null; }); } @Reference(unbind = "-") private ReleaseLocalService releaseLocalService; private static final Logger _log = LoggerFactory.getLogger(UpgradeSuccessMessageListener.class); }
So this takes care of our LMB-based implementation for supporting the Release update, but we still haven't tackled the cluster aspect yet...
Cluster Bridge Message Listener
I wrote earlier that we were going to be using an OOTB Liferay class to bridge between the LMB and the cluster messaging, and this is it. We will simply need to register an instance of com.liferay.portal.kernel.cluster.messaging.ClusterBridgeMessageListener as a listener on our destination to handle the bridge. This class will receive a message from the LMB and will send it using ClusterLink to the other nodes in the cluster.
We register the bridge for our upgrade as follows:
@Component( immediate = true ) public class ClusterBridgeMessageListenerRegistrator { @Activate protected void activate() { ClusterBridgeMessageListener bridgeMessageListener = new ClusterBridgeMessageListener(); bridgeMessageListener.setPriority(Priority.LEVEL5); _destination.register(bridgeMessageListener); _clusterBridgeMessageListener = bridgeMessageListener; } @Deactivate protected void deactivate() { _destination.unregister(_clusterBridgeMessageListener); } @Reference(target = "(destination.name=upgrade/success)") private Destination _destination; private MessageListener _clusterBridgeMessageListener; }
With this component deployed, every message that our UpgradeMessageSender sends to the upgrade/success destination will be received by the ClusterBridgeMessageListener and broadcast via ClusterLink
And that's basically it! Amazing, huh?
See, you don't have to worry about receiving the cluster message, Liferay already handles that for us. Behind the scenes, there's the ClusterForwardReceiver that actually gets messages from the cluster, and it filters messages originating from the current node (so a node doesn't bombard itself with its own messages), but then it sends them on to the ClusterLinkImpl class which basically takes the message and sends it to the destination, but this time it's the ones on the other nodes.
Wrapping Up The Code
So our final step to pull everything together, we need that last upgrade step that will issue the notification.
I would solve this using a fairly simple upgrade step:
public class ClusterReleaseNotificationUpgradeStep extends UpgradeProcess { public ClusterReleaseNotificationUpgradeStep( String servletContextName, String schemaVersion, UpgradeMessageSender upgradeMessageSender) { this.servletContextName = servletContextName; this.schemaVersion = schemaVersion; this.upgradeMessageSender = upgradeMessageSender; } @Override protected void doUpgrade() throws Exception { // send the message that the upgrade is finished upgradeMessageSender.sendUpgradeSuccess( servletContextName, schemaVersion); } private final String servletContextName; private final String schemaVersion; private final UpgradeMessageSender upgradeMessageSender; }
And I can add it to my upgrade steps in MyCustomModuleUpgrade's register step:
registry.register( "com.liferay.mycustommodule", "1.1.0", "2.0.0", new com.liferay.mycustommodule.upgrade.v2_0_0.UpgradeFoo(), new UpgradeBar(), new ClusterReleaseNotificationUpgradeStep("com.example.mycustom.service", "2.0.0", _upgradeMessageSender));
And now when the cluster leader is done upgrading Foo and Bar,
it will also invoke this new upgrade step and provide the details.
The upgrade step will use the custom message sender to send the
message containing these details, the cluster bridge message
listener will forward the message to ClusterLink, it will receive
the message and forward to the LMB on each of the nodes where a
message listener will process the message, invoking the Liferay code
to report the new release is available, and any of those minions
mentioned on the last blog with the @Reference(target =
"(&(release.bundle.symbolic.name=com.example.mycustom.service)(&release.schema.version>=2.0.0)))")
line will magically spring to life since the Release will now be available.
Conclusion
In the log-persist repo in the messaging branch, I've gone ahead and added similar code to ensure that log level changes are also broadcast throughout the cluster.
So here were two real-world scenarios where I had working solutions to problems, but they were not cluster aware. By adding the support for messaging and incorporating the ClusterBridgeMessageListener, I was able to make both solutions cluster-aware and cluster-friendly, and in a non-clustered environment things would work there too.
Hopefully you now have some ideas how you might take a solution you have and make it cluster-aware in the same way.
Please share your ideas, I'd love to hear them!