package org.tp23.xtomp; import asia.stampy.client.listener.validate.ClientMessageValidationListener; import asia.stampy.client.message.connect.ConnectMessage; import asia.stampy.client.message.subscribe.SubscribeMessage; import asia.stampy.client.netty.ClientNettyChannelHandler; import asia.stampy.client.netty.ClientNettyMessageGateway; import asia.stampy.client.netty.connected.NettyConnectedMessageListener; import asia.stampy.client.netty.disconnect.NettyDisconnectListenerAndInterceptor; import asia.stampy.common.gateway.AbstractStampyMessageGateway; import asia.stampy.common.gateway.HostPort; import asia.stampy.common.gateway.SecurityMessageListener; import asia.stampy.common.gateway.StampyMessageListener; import asia.stampy.common.heartbeat.HeartbeatContainer; import asia.stampy.common.message.StampyMessage; import asia.stampy.common.message.StompMessageType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Xtomp Java client using https://github.com/mrstampy/Stampy * * Current Stampy has a couple of bugs, one that has not been addressed in years. * * Seems this project is unmaintained. * * @author teknopaul */ public class XtompStampyClient { private static final Logger LOG = LoggerFactory.getLogger(XtompStampyClient.class); private ClientNettyChannelHandler channelHandler; private ClientNettyMessageGateway gateway; private HeartbeatContainer heartbeatContainer; public static void main (String... args) throws Exception { XtompStampyClient xtompClient = new XtompStampyClient(); AbstractStampyMessageGateway gateway = xtompClient.initialize(); gateway.connect(); /** * Workaround for https://github.com/mrstampy/Stampy/issues/16 */ Thread.sleep(1000L); HostPort hostPort = gateway.getConnectedHostPorts().iterator().next(); gateway.sendMessage(new ConnectMessage("1.2", "localhost") { /** * Workaround for https://github.com/mrstampy/Stampy/issues/8 */ @Override protected String postHeader() { return ""; } }, hostPort); Thread.sleep(1000L); gateway.sendMessage(new SubscribeMessage("memtop-a", "1") { /** * Workaround for https://github.com/mrstampy/Stampy/issues/8 */ @Override protected String postHeader() { return ""; } }, hostPort); Thread.sleep(1000L); } public AbstractStampyMessageGateway initialize() { heartbeatContainer = new HeartbeatContainer(); gateway = new ClientNettyMessageGateway(); gateway.setPort(61613); gateway.setHost("localhost"); gateway.setHeartbeat(60000); channelHandler = new ClientNettyChannelHandler(); channelHandler.setGateway(gateway); channelHandler.setHeartbeatContainer(heartbeatContainer); gateway.addMessageListener(new SecurityMessageListener(){ public StompMessageType[] getMessageTypes() { return new StompMessageType[] { StompMessageType.MESSAGE }; } public boolean isForMessage(StampyMessage<?> stampyMessage) { return true; } public void messageReceived(StampyMessage<?> stampyMessage, HostPort hostPort) throws Exception { /** * N.B. https://github.com/mrstampy/Stampy/issues/14 * '\n' characters have been stripped from the message content. */ LOG.info(stampyMessage.toStompMessage(false)); } }); gateway.addMessageListener(new StampyMessageListener() { public StompMessageType[] getMessageTypes() { return new StompMessageType[] { StompMessageType.MESSAGE }; } public boolean isForMessage(StampyMessage<?> stampyMessage) { return true; } public void messageReceived(StampyMessage<?> stampyMessage, HostPort hostPort) throws Exception { /** * N.B. https://github.com/mrstampy/Stampy/issues/14 * '\n' characters have been stripped from the message content. */ LOG.info(stampyMessage.toStompMessage(false)); } }); gateway.addMessageListener(new ClientMessageValidationListener()); NettyConnectedMessageListener cml = new NettyConnectedMessageListener(); cml.setHeartbeatContainer(heartbeatContainer); cml.setGateway(gateway); gateway.addMessageListener(cml); NettyDisconnectListenerAndInterceptor disconnect = new NettyDisconnectListenerAndInterceptor(); disconnect.setCloseOnDisconnectMessage(false); gateway.addMessageListener(disconnect); gateway.addOutgoingMessageInterceptor(disconnect); disconnect.setGateway(gateway); gateway.setHandler(channelHandler); return gateway; } }