dimanche 1 mars 2015

Architect design JMS Topic messages to JSF 2.2 (PrimeFaces) client

I have the following project:


There is ActimeMQ JMS topic where GPS devices (unique ID is IMEI) sends (indirect) his current status (GPS position and other IO values) messages real-time (pro 8-12 secs). I have to show this data per devices (devices is associated with customers) real-time in a map in the browser for a user. (JMS subscriber can filter messages with IMEI).


I have an AppFuse 3.0 framework project:



  • JSF Mojarra 2.2.10 (PrimeFaces 5.1)

  • OmniFaces 1.8.1

  • PrettyFaces 3.3.3

  • Hibernate 4.1

  • Spring 4.1.2

  • Spring Security 3.2,

  • EL 2.2

  • MySQL 5.x

  • (etc)


in a normal servlet (3.x) container like Tomcat 7.x.


No EJB, no CDI(?).


I (want) use the PrimeFaces Push (WebSocket) to sends JMS messages to browser.


I'am very confused how to design the architecture and implement to properly open/close JMS connection/session, who is careing about opened JMS sessions to avoid memory eating, closing JMS to avoid receive message tn non authenticated user, etc


Situations where problem can occur



  • HttpSession expired

  • any ajax/non ajax request exception

  • user close browser

  • root user can view other customer devices (switch via dropdown), I have to change filtering on JMS messages real-time

  • ?


I've made some tests with a @SessionScoped @ManagedBean implements JMS MessageListener, but for example @PreDestroy never invoked...JMS is still opened after HttpSession is expired.



@Component
@ManagedBean
@SessionScoped
public class HomePage extends BasePage implements Serializable, MessageListener { // SessionAwareMessageListener

private static final long serialVersionUID = 1L;

/*
* Channel name (url) for client push request
*/
private final static String CHANNEL = "/notify";

@Autowired
private CarManager carManager;

@Autowired
private TopicConnectionFactory connectionFactory;

@Autowired
private Topic jmsTopic;

private List<Car> carsOnMap;

private Map<String, CarDTO> carByIMEI;

private TopicConnection connection;
private TopicSession session;
private TopicSubscriber subscriber;

private ObjectMapper jsonMapper = new ObjectMapper();

public HomePage() {
super();
}

public void startJMSTopicSubscriber(String msgSelector) {
try {
if (connection == null) {
LOG.info("JMS connection not started yet, init and start now");
connection = connectionFactory.createTopicConnection();
connection.start();
} else {
LOG.info("JMS connection already started, close subscriber");
if (subscriber != null) {
subscriber.close();
}
// if (session != null) { session.close(); }
// connection.stop();
subscriber = null;
// session = null;
}
if (session == null) {
LOG.info("JMS connection session not created yet, create it");
session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
}
LOG.info("create JMS subscriber with selector: {}", msgSelector);
subscriber = session.createSubscriber(jmsTopic, msgSelector, false);
subscriber.setMessageListener(this);

} catch (Exception e) {
LOG.error("Error in JMS Topic start", e);
}
}

public void stopJMSTopicSubscriber() {
try {
if (subscriber != null) {
subscriber.close();
LOG.debug("JMS subscriber closed on leaving HomePage view");
}
} catch (JMSException e) {
LOG.error("Error closing JMS subscriber", e);
}
try {
if (session != null) {
session.close();
LOG.debug("JMS session closed on leaving HomePage view");
}
} catch (JMSException e) {
LOG.error("Error closing JMS session", e);
}
try {
if (connection != null) {
connection.stop();
connection.close();
LOG.debug("JMS connection stopped/closed on leaving HomePage view");
}
} catch (JMSException e) {
LOG.error("Error stopping/closing JMS connection", e);
}

}

@PostConstruct
public void init() {
reloadCars();
}

@PreDestroy
public void destroy() {
LOG.debug("PreDestroy called, try stoppping JMS");
stopJMSTopicSubscriber();
}

public void reloadCars() {
LOG.debug("Reloading cars from DB");
if (managedPartner == null) {
managedPartner = getLoggedInUser().getPartner();
}
if (managedPartner != null) {
carsOnMap = getCarManager().getCarsByPartner(managedPartner);
} else {
carsOnMap = new ArrayList<Car>();
}

LOG.debug("Generate IMEI filter for JMS subscriber; build CarByIME map");
String imeis = "";
StringBuilder sb = new StringBuilder("imei IN (");
carByIMEI = new HashMap<String, CarDTO>();
int i = 0;
for (Car car : carsOnMap) {
if (car.getDevice() != null && StringUtils.isNotBlank(car.getDevice().getMobileDevice())) {
CarDTO carDTO = new CarDTO(car);
carByIMEI.put(car.getDevice().getMobileDevice(), carDTO);
if (i > 0) {
sb.append(',');
}
sb.append('\'');
sb.append(car.getDevice().getMobileDevice());
sb.append('\'');
}
i++;
}
sb.append(')');
imeis = sb.toString();

startJMSTopicSubscriber(imeis);

RequestContext reqCtx = RequestContext.getCurrentInstance();
try {
reqCtx.addCallbackParam("carsIMEI", jsonMapper.writeValueAsString(carByIMEI));
} catch (Exception e) {
LOG.error("Error in mapping car imeis to JSON", e);
}
} // reloadCars

@Override
public void onMessage(Message msg) {
try {
if (msg instanceof ActiveMQBytesMessage) {
ActiveMQBytesMessage byteMsg = (ActiveMQBytesMessage) msg;
WMessage wmsg = null;
String invalidMsg = "";
try {
wmsg = new WMessage(correctbuf);
} catch (Exception e1) {
invalidMsg = processDeviceJMSMsgData(buf);
}
if (wmsg != null) {
EventBus eventBus = EventBusFactory.getDefault().eventBus();
eventBus.publish(CHANNEL, wmsg);
} else {
LOG.warn("INVALID Msg: {}", invalidMsg);
}
} else {
LOG.trace(String.format("* * * JMS onMessage, type: %s, msg:%s", msg.getClass().getName(), msg));
}
} catch (Exception jmsex) {
LOG.error("device JMS onMessage error:", jmsex);
}
} // onMessage

}


@PushEndpoint("/notify")
@Singleton
public class NotifyResource {

//protected final Log log = LogFactory.getLog(getClass());
private static final Logger LOG = LoggerFactory.getLogger(NotifyResource.class);


@OnOpen
public void onOpen(RemoteEndpoint r, EventBus e) {
LOG.debug("push onOpen");
}

@OnClose
public void onClose(RemoteEndpoint r, EventBus e) {
LOG.debug("push onClose");
}

@OnMessage(encoders = { JSONEncoder.class })
public WMessage onMessage(WMessage message) {
//log.debug(message);
LOG.debug(message.toString());
return message;
}
}


I implemented a Spring HttpSessionEventPublisher to catch session destroy but I don't know how to get "httpsession active" JMS connection to close. Should I manage the JMS connections pro user at my own?!?



public class JMSContainerShutDownHook extends HttpSessionEventPublisher implements ServletContextListener {

private static final Log log = LogFactory.getLog(JMSContainerShutDownHook.class);

private void shutDownJMSContainer(ServletContext servletContext) {
log.info("Destroyed called");
...
log.info("Exiting.");
}

@Override
public void contextInitialized(ServletContextEvent servletContextEvent) {
log.info("contextInitialized called");
}

@Override
public void contextDestroyed(ServletContextEvent servletContextEvent) {
log.info("contextDestroyed called");
ServletContext servletContext = servletContextEvent.getServletContext();
shutDownJMSContainer(servletContext);
}

@Override
public void sessionCreated(HttpSessionEvent event) {
log.info("HTTP sessionCreated called");
ServletContext servletContext = event.getSession().getServletContext();
super.sessionCreated(event);
}

@Override
public void sessionDestroyed(HttpSessionEvent event) {
log.info("HTTP sessionDestroyed called");
ServletContext servletContext = event.getSession().getServletContext();
shutDownJMSContainer(servletContext);
super.sessionDestroyed(event);
}


}


Any kind of help is welcome


@BalusC :-)


Aucun commentaire:

Enregistrer un commentaire