package org.tp23.xtomp;
import asia.stampy.client.listener.validate.ClientMessageValidationListener;
import asia.stampy.client.message.connect.ConnectMessage;
import asia.stampy.client.message.subscribe.SubscribeMessage;
import asia.stampy.client.netty.ClientNettyChannelHandler;
import asia.stampy.client.netty.ClientNettyMessageGateway;
import asia.stampy.client.netty.connected.NettyConnectedMessageListener;
import asia.stampy.client.netty.disconnect.NettyDisconnectListenerAndInterceptor;
import asia.stampy.common.gateway.AbstractStampyMessageGateway;
import asia.stampy.common.gateway.HostPort;
import asia.stampy.common.gateway.SecurityMessageListener;
import asia.stampy.common.gateway.StampyMessageListener;
import asia.stampy.common.heartbeat.HeartbeatContainer;
import asia.stampy.common.message.StampyMessage;
import asia.stampy.common.message.StompMessageType;
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Xtomp Java client built on Stampy (https://github.com/mrstampy/Stampy).
*
* The current Stampy release has a couple of bugs, one of which has not been addressed in years.
*
* The project appears to be unmaintained.
*
* @author teknopaul
*/
public class XtompStampyClient {

    private static final Logger LOG = LoggerFactory.getLogger(XtompStampyClient.class);

    /** Host the gateway connects to; also sent as the second CONNECT argument. */
    private static final String HOST = "localhost";

    /** Port the gateway connects to. */
    private static final int PORT = 61613;

    /** Value handed to the gateway's heartbeat setting (presumably milliseconds; confirm against Stampy). */
    private static final int HEARTBEAT = 60000;

    /** First CONNECT argument (presumably the accepted STOMP version; confirm against Stampy). */
    private static final String STOMP_VERSION = "1.2";

    /** First SUBSCRIBE argument (presumably the destination; confirm against Stampy). */
    private static final String DESTINATION = "memtop-a";

    /** Second SUBSCRIBE argument (presumably the subscription id; confirm against Stampy). */
    private static final String SUBSCRIPTION_ID = "1";

    /** Fixed pause, in milliseconds, inserted after connecting and after each sent frame. */
    private static final long PAUSE_MILLIS = 1000L;

    private ClientNettyChannelHandler channelHandler;
    private ClientNettyMessageGateway gateway;
    private HeartbeatContainer heartbeatContainer;

    /**
     * Demo entry point: builds the gateway, connects, sends a CONNECT frame followed by a
     * SUBSCRIBE frame, pausing {@link #PAUSE_MILLIS} after each step.
     *
     * @param args ignored
     * @throws IllegalStateException if the gateway reports no connected host/port once the
     *         initial pause has elapsed
     * @throws Exception anything propagated by the gateway calls or by {@link Thread#sleep(long)}
     */
    public static void main(String... args) throws Exception {
        XtompStampyClient xtompClient = new XtompStampyClient();
        AbstractStampyMessageGateway gateway = xtompClient.initialize();
        gateway.connect();

        // Workaround for https://github.com/mrstampy/Stampy/issues/16
        Thread.sleep(PAUSE_MILLIS);

        // Fail with a descriptive error instead of a bare NoSuchElementException when the
        // connection has not been established by the time the pause is over.
        Iterator<HostPort> connected = gateway.getConnectedHostPorts().iterator();
        if (!connected.hasNext()) {
            throw new IllegalStateException(
                    "No connection to " + HOST + ":" + PORT + " was reported "
                            + PAUSE_MILLIS + " ms after connect()");
        }
        HostPort hostPort = connected.next();

        gateway.sendMessage(new ConnectMessage(STOMP_VERSION, HOST) {
            // Workaround for https://github.com/mrstampy/Stampy/issues/8
            @Override
            protected String postHeader() {
                return "";
            }
        }, hostPort);
        Thread.sleep(PAUSE_MILLIS);

        gateway.sendMessage(new SubscribeMessage(DESTINATION, SUBSCRIPTION_ID) {
            // Workaround for https://github.com/mrstampy/Stampy/issues/8
            @Override
            protected String postHeader() {
                return "";
            }
        }, hostPort);
        Thread.sleep(PAUSE_MILLIS);
    }

    /**
     * Creates the heartbeat container, gateway and channel handler, registers the message
     * listeners and the disconnect interceptor, and installs the handler on the gateway.
     * This method does not call {@code connect()}; the caller does.
     *
     * @return the configured gateway
     */
    public AbstractStampyMessageGateway initialize() {
        heartbeatContainer = new HeartbeatContainer();

        gateway = new ClientNettyMessageGateway();
        gateway.setPort(PORT);
        gateway.setHost(HOST);
        gateway.setHeartbeat(HEARTBEAT);

        channelHandler = new ClientNettyChannelHandler();
        channelHandler.setGateway(gateway);
        channelHandler.setHeartbeatContainer(heartbeatContainer);

        // NOTE(review): this listener and the next one both accept every MESSAGE frame and
        // log it, so each frame is presumably logged twice - confirm that is intended.
        gateway.addMessageListener(new SecurityMessageListener() {
            @Override
            public StompMessageType[] getMessageTypes() {
                return new StompMessageType[] {StompMessageType.MESSAGE};
            }

            @Override
            public boolean isForMessage(StampyMessage<?> stampyMessage) {
                return true;
            }

            @Override
            public void messageReceived(StampyMessage<?> stampyMessage, HostPort hostPort)
                    throws Exception {
                logMessage(stampyMessage);
            }
        });

        gateway.addMessageListener(new StampyMessageListener() {
            @Override
            public StompMessageType[] getMessageTypes() {
                return new StompMessageType[] {StompMessageType.MESSAGE};
            }

            @Override
            public boolean isForMessage(StampyMessage<?> stampyMessage) {
                return true;
            }

            @Override
            public void messageReceived(StampyMessage<?> stampyMessage, HostPort hostPort)
                    throws Exception {
                logMessage(stampyMessage);
            }
        });

        gateway.addMessageListener(new ClientMessageValidationListener());

        NettyConnectedMessageListener cml = new NettyConnectedMessageListener();
        cml.setHeartbeatContainer(heartbeatContainer);
        cml.setGateway(gateway);
        gateway.addMessageListener(cml);

        // The same object is registered both as a message listener and as an outgoing
        // message interceptor.
        NettyDisconnectListenerAndInterceptor disconnect = new NettyDisconnectListenerAndInterceptor();
        disconnect.setCloseOnDisconnectMessage(false);
        gateway.addMessageListener(disconnect);
        gateway.addOutgoingMessageInterceptor(disconnect);
        disconnect.setGateway(gateway);

        gateway.setHandler(channelHandler);
        return gateway;
    }

    /**
     * Logs a received message at INFO level.
     *
     * <p>N.B. https://github.com/mrstampy/Stampy/issues/14 - '\n' characters have been
     * stripped from the message content.
     *
     * @param stampyMessage the message to log
     */
    private static void logMessage(StampyMessage<?> stampyMessage) {
        LOG.info(stampyMessage.toStompMessage(false));
    }
}