package org.tp23.xtomp;

import asia.stampy.client.listener.validate.ClientMessageValidationListener;
import asia.stampy.client.message.connect.ConnectMessage;
import asia.stampy.client.message.subscribe.SubscribeMessage;
import asia.stampy.client.netty.ClientNettyChannelHandler;
import asia.stampy.client.netty.ClientNettyMessageGateway;
import asia.stampy.client.netty.connected.NettyConnectedMessageListener;
import asia.stampy.client.netty.disconnect.NettyDisconnectListenerAndInterceptor;
import asia.stampy.common.gateway.AbstractStampyMessageGateway;
import asia.stampy.common.gateway.HostPort;
import asia.stampy.common.gateway.SecurityMessageListener;
import asia.stampy.common.gateway.StampyMessageListener;
import asia.stampy.common.heartbeat.HeartbeatContainer;
import asia.stampy.common.message.StampyMessage;
import asia.stampy.common.message.StompMessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Minimal Xtomp Java client built on Stampy (https://github.com/mrstampy/Stampy).
 *
 * {@link #main(String...)} connects to a STOMP server on localhost:61613, sends a
 * CONNECT frame followed by a SUBSCRIBE frame, and logs every MESSAGE frame that
 * the registered listeners receive.
 *
 * The current Stampy release has a couple of bugs (see the issue links in the
 * code below), one of which has not been addressed in years. Stampy itself
 * appears to be unmaintained.
 *
 * @author teknopaul
 */
public class XtompStampyClient {

	private static final Logger LOG = LoggerFactory.getLogger(XtompStampyClient.class);

	/** Host the gateway connects to. */
	private static final String HOST = "localhost";

	/** Port the gateway connects to. */
	private static final int PORT = 61613;

	/** Value handed to the gateway's setHeartbeat; presumably milliseconds - TODO confirm against Stampy. */
	private static final int HEARTBEAT = 60000;

	/** Fixed pause, in milliseconds, inserted after connecting and after each frame sent. */
	private static final long PAUSE_MILLIS = 1000L;

	private ClientNettyChannelHandler channelHandler;
	private ClientNettyMessageGateway gateway;
	private HeartbeatContainer heartbeatContainer;

	/**
	 * Connects to the server, sends CONNECT and SUBSCRIBE frames and then returns,
	 * leaving the registered listeners to log incoming MESSAGE frames.
	 *
	 * @param args ignored
	 * @throws IllegalStateException if the gateway reports no connected host/port
	 *         after the initial pause
	 * @throws Exception if connecting or sending fails, or a pause is interrupted
	 */
	public static void main (String... args) throws Exception {
		XtompStampyClient xtompClient = new XtompStampyClient();
		AbstractStampyMessageGateway gateway = xtompClient.initialize();

		gateway.connect();
		// Workaround for https://github.com/mrstampy/Stampy/issues/16
		Thread.sleep(PAUSE_MILLIS);

		// The pause above is a fixed delay, not a guarantee: fail with a clear
		// message instead of a bare NoSuchElementException when nothing connected.
		java.util.Iterator<? extends HostPort> connected = gateway.getConnectedHostPorts().iterator();
		if (!connected.hasNext()) {
			throw new IllegalStateException("No connection to " + HOST + ":" + PORT
					+ " was established within " + PAUSE_MILLIS + "ms");
		}
		HostPort hostPort = connected.next();

		// Arguments are presumably the accepted STOMP version and the host header - confirm against Stampy.
		gateway.sendMessage(new ConnectMessage("1.2", "localhost") {
			// Workaround for https://github.com/mrstampy/Stampy/issues/8
			@Override
			protected String postHeader() {
				return "";
			}
		}, hostPort);

		Thread.sleep(PAUSE_MILLIS);

		// Arguments are presumably the destination and the subscription id - confirm against Stampy.
		gateway.sendMessage(new SubscribeMessage("memtop-a", "1") {
			// Workaround for https://github.com/mrstampy/Stampy/issues/8
			@Override
			protected String postHeader() {
				return "";
			}
		}, hostPort);

		Thread.sleep(PAUSE_MILLIS);

	}



	/**
	 * Builds and wires the Netty client gateway: channel handler, heartbeat
	 * container, message listeners and the disconnect interceptor. The gateway is
	 * configured but not connected; the caller is expected to call connect().
	 *
	 * @return the configured gateway, also kept in this instance
	 */
	public AbstractStampyMessageGateway initialize() {
		heartbeatContainer = new HeartbeatContainer();

		gateway = new ClientNettyMessageGateway();
		gateway.setPort(PORT);
		gateway.setHost(HOST);
		gateway.setHeartbeat(HEARTBEAT);

		channelHandler = new ClientNettyChannelHandler();
		channelHandler.setGateway(gateway);
		channelHandler.setHeartbeatContainer(heartbeatContainer);

		// Security listener that performs no checks; it only logs MESSAGE frames.
		// NOTE(review): the plain listener registered next logs the same frames,
		// so each MESSAGE is presumably logged twice - confirm whether intended.
		gateway.addMessageListener(new SecurityMessageListener(){
			public StompMessageType[] getMessageTypes() {
				return new StompMessageType[] {
						StompMessageType.MESSAGE
				};
			}
			public boolean isForMessage(StampyMessage<?> stampyMessage) {
				return true;
			}
			public void messageReceived(StampyMessage<?> stampyMessage, HostPort hostPort) throws Exception {
				// N.B. https://github.com/mrstampy/Stampy/issues/14
				// '\n' characters have been stripped from the message content.
				LOG.info(stampyMessage.toStompMessage(false));
			}
		});

		// Application listener: logs every MESSAGE frame received.
		gateway.addMessageListener(new StampyMessageListener() {

			public StompMessageType[] getMessageTypes() {
				return new StompMessageType[] {
					StompMessageType.MESSAGE
				};
			}

			public boolean isForMessage(StampyMessage<?> stampyMessage) {
				return true;
			}

			public void messageReceived(StampyMessage<?> stampyMessage, HostPort hostPort) throws Exception {
				// N.B. https://github.com/mrstampy/Stampy/issues/14
				// '\n' characters have been stripped from the message content.
				LOG.info(stampyMessage.toStompMessage(false));
			}
		});

		gateway.addMessageListener(new ClientMessageValidationListener());

		// Connected-frame listener shares the heartbeat container with the channel handler.
		NettyConnectedMessageListener cml = new NettyConnectedMessageListener();
		cml.setHeartbeatContainer(heartbeatContainer);
		cml.setGateway(gateway);
		gateway.addMessageListener(cml);

		// Registered both as a message listener and as an outgoing interceptor.
		NettyDisconnectListenerAndInterceptor disconnect = new NettyDisconnectListenerAndInterceptor();
		disconnect.setCloseOnDisconnectMessage(false);
		gateway.addMessageListener(disconnect);
		gateway.addOutgoingMessageInterceptor(disconnect);
		disconnect.setGateway(gateway);

		gateway.setHandler(channelHandler);

		return gateway;
	}

}