I have the following project:
There is an ActiveMQ JMS topic to which GPS devices (identified by a unique IMEI) send, indirectly, their current status (GPS position and other I/O values) in real time, one message every 8-12 seconds. I have to show this data per device (devices are associated with customers) in real time on a map in the user's browser. (A JMS subscriber can filter messages by IMEI.)
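To make the IMEI filtering concrete, here is a minimal sketch of how such a selector string could be built. The class name `ImeiSelector` and the method `build` are hypothetical; `imei` is the message property name used in my code further down. It assumes only standard JMS selector syntax (an `IN` list of string literals, with a single quote escaped by doubling it) and returns an empty `Optional` when there is no usable IMEI, because an empty `IN ()` list is not a valid selector and a `null` selector would mean "no filter".

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.StringJoiner;

// Hypothetical helper, not part of AppFuse, PrimeFaces or ActiveMQ.
final class ImeiSelector {

    private ImeiSelector() {
    }

    /**
     * Builds a selector such as: imei IN ('111','222').
     * Null or blank IMEIs are skipped. Returns Optional.empty() if nothing
     * is left, so the caller can decide not to subscribe at all.
     */
    static Optional<String> build(String property, Collection<String> imeis) {
        StringJoiner in = new StringJoiner(",", property + " IN (", ")");
        int count = 0;
        for (String imei : imeis) {
            if (imei == null || imei.trim().isEmpty()) {
                continue; // device without an IMEI
            }
            // JMS string literals escape a single quote by doubling it.
            in.add("'" + imei.replace("'", "''") + "'");
            count++;
        }
        return count == 0 ? Optional.<String>empty() : Optional.of(in.toString());
    }

    public static void main(String[] args) {
        System.out.println(build("imei", Arrays.asList("111", null, " ", "222")));
    }
}
```

The returned string is what I would pass as the message selector when creating the topic subscriber; when the result is empty I would simply not create a subscriber.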
I have an AppFuse 3.0 framework project:
- JSF Mojarra 2.2.10 (PrimeFaces 5.1)
- OmniFaces 1.8.1
- PrettyFaces 3.3.3
- Hibernate 4.1
- Spring 4.1.2
- Spring Security 3.2
- EL 2.2
- MySQL 5.x
- (etc)
running in a normal servlet (3.x) container like Tomcat 7.x.
No EJB, no CDI(?).
I want to use PrimeFaces Push (WebSocket) to send the JMS messages to the browser.
I'm very confused about how to design the architecture and how to properly open and close the JMS connection/session: who takes care of the open JMS sessions so that they don't eat memory, how to close JMS so that a non-authenticated user doesn't receive messages, etc.
Situations where problems can occur:
- the HttpSession expires
- any exception in an Ajax or non-Ajax request
- the user closes the browser
- the root user can view other customers' devices (switching via a dropdown), so I have to change the JMS message filter in real time
- ?
I've made some tests with a @SessionScoped @ManagedBean that implements the JMS MessageListener, but, for example, @PreDestroy is never invoked and the JMS connection is still open after the HttpSession has expired.
@Component
@ManagedBean
@SessionScoped
public class HomePage extends BasePage implements Serializable, MessageListener { // SessionAwareMessageListener

    private static final long serialVersionUID = 1L;

    /*
     * Channel name (url) for client push request
     */
    private final static String CHANNEL = "/notify";

    @Autowired
    private CarManager carManager;
    @Autowired
    private TopicConnectionFactory connectionFactory;
    @Autowired
    private Topic jmsTopic;

    private List<Car> carsOnMap;
    private Map<String, CarDTO> carByIMEI;
    private TopicConnection connection;
    private TopicSession session;
    private TopicSubscriber subscriber;
    private ObjectMapper jsonMapper = new ObjectMapper();

    public HomePage() {
        super();
    }

    public void startJMSTopicSubscriber(String msgSelector) {
        try {
            if (connection == null) {
                LOG.info("JMS connection not started yet, init and start now");
                connection = connectionFactory.createTopicConnection();
                connection.start();
            } else {
                LOG.info("JMS connection already started, close subscriber");
                if (subscriber != null) {
                    subscriber.close();
                }
                // if (session != null) { session.close(); }
                // connection.stop();
                subscriber = null;
                // session = null;
            }
            if (session == null) {
                LOG.info("JMS connection session not created yet, create it");
                session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            }
            LOG.info("create JMS subscriber with selector: {}", msgSelector);
            subscriber = session.createSubscriber(jmsTopic, msgSelector, false);
            subscriber.setMessageListener(this);
        } catch (Exception e) {
            LOG.error("Error in JMS Topic start", e);
        }
    }

    public void stopJMSTopicSubscriber() {
        try {
            if (subscriber != null) {
                subscriber.close();
                LOG.debug("JMS subscriber closed on leaving HomePage view");
            }
        } catch (JMSException e) {
            LOG.error("Error closing JMS subscriber", e);
        }
        try {
            if (session != null) {
                session.close();
                LOG.debug("JMS session closed on leaving HomePage view");
            }
        } catch (JMSException e) {
            LOG.error("Error closing JMS session", e);
        }
        try {
            if (connection != null) {
                connection.stop();
                connection.close();
                LOG.debug("JMS connection stopped/closed on leaving HomePage view");
            }
        } catch (JMSException e) {
            LOG.error("Error stopping/closing JMS connection", e);
        }
    }

    @PostConstruct
    public void init() {
        reloadCars();
    }

    @PreDestroy
    public void destroy() {
        LOG.debug("PreDestroy called, try stopping JMS");
        stopJMSTopicSubscriber();
    }

    public void reloadCars() {
        LOG.debug("Reloading cars from DB");
        if (managedPartner == null) {
            managedPartner = getLoggedInUser().getPartner();
        }
        if (managedPartner != null) {
            carsOnMap = getCarManager().getCarsByPartner(managedPartner);
        } else {
            carsOnMap = new ArrayList<Car>();
        }
        LOG.debug("Generate IMEI filter for JMS subscriber; build carByIMEI map");
        String imeis = "";
        StringBuilder sb = new StringBuilder("imei IN (");
        carByIMEI = new HashMap<String, CarDTO>();
        int i = 0; // number of IMEIs appended to the selector so far
        for (Car car : carsOnMap) {
            if (car.getDevice() != null && StringUtils.isNotBlank(car.getDevice().getMobileDevice())) {
                CarDTO carDTO = new CarDTO(car);
                carByIMEI.put(car.getDevice().getMobileDevice(), carDTO);
                if (i > 0) {
                    sb.append(',');
                }
                sb.append('\'');
                sb.append(car.getDevice().getMobileDevice());
                sb.append('\'');
                // count only appended IMEIs, otherwise a car without a device
                // at the start of the list produces a leading comma
                i++;
            }
        }
        sb.append(')');
        imeis = sb.toString();
        startJMSTopicSubscriber(imeis);
        RequestContext reqCtx = RequestContext.getCurrentInstance();
        try {
            reqCtx.addCallbackParam("carsIMEI", jsonMapper.writeValueAsString(carByIMEI));
        } catch (Exception e) {
            LOG.error("Error in mapping car imeis to JSON", e);
        }
    } // reloadCars

    @Override
    public void onMessage(Message msg) {
        try {
            if (msg instanceof ActiveMQBytesMessage) {
                ActiveMQBytesMessage byteMsg = (ActiveMQBytesMessage) msg;
                WMessage wmsg = null;
                String invalidMsg = "";
                try {
                    wmsg = new WMessage(correctbuf);
                } catch (Exception e1) {
                    invalidMsg = processDeviceJMSMsgData(buf);
                }
                if (wmsg != null) {
                    EventBus eventBus = EventBusFactory.getDefault().eventBus();
                    eventBus.publish(CHANNEL, wmsg);
                } else {
                    LOG.warn("INVALID Msg: {}", invalidMsg);
                }
            } else {
                LOG.trace(String.format("* * * JMS onMessage, type: %s, msg:%s", msg.getClass().getName(), msg));
            }
        } catch (Exception jmsex) {
            LOG.error("device JMS onMessage error:", jmsex);
        }
    } // onMessage
}

@PushEndpoint("/notify")
@Singleton
public class NotifyResource {

    //protected final Log log = LogFactory.getLog(getClass());
    private static final Logger LOG = LoggerFactory.getLogger(NotifyResource.class);

    @OnOpen
    public void onOpen(RemoteEndpoint r, EventBus e) {
        LOG.debug("push onOpen");
    }

    @OnClose
    public void onClose(RemoteEndpoint r, EventBus e) {
        LOG.debug("push onClose");
    }

    @OnMessage(encoders = { JSONEncoder.class })
    public WMessage onMessage(WMessage message) {
        //log.debug(message);
        LOG.debug(message.toString());
        return message;
    }
}
I implemented a Spring HttpSessionEventPublisher to catch session destruction, but I don't know how to get hold of the JMS connection that belongs to that HTTP session in order to close it. Should I manage the JMS connections per user on my own?
public class JMSContainerShutDownHook extends HttpSessionEventPublisher implements ServletContextListener {

    private static final Log log = LogFactory.getLog(JMSContainerShutDownHook.class);

    private void shutDownJMSContainer(ServletContext servletContext) {
        log.info("Destroyed called");
        ...
        log.info("Exiting.");
    }

    @Override
    public void contextInitialized(ServletContextEvent servletContextEvent) {
        log.info("contextInitialized called");
    }

    @Override
    public void contextDestroyed(ServletContextEvent servletContextEvent) {
        log.info("contextDestroyed called");
        ServletContext servletContext = servletContextEvent.getServletContext();
        shutDownJMSContainer(servletContext);
    }

    @Override
    public void sessionCreated(HttpSessionEvent event) {
        log.info("HTTP sessionCreated called");
        ServletContext servletContext = event.getSession().getServletContext();
        super.sessionCreated(event);
    }

    @Override
    public void sessionDestroyed(HttpSessionEvent event) {
        log.info("HTTP sessionDestroyed called");
        ServletContext servletContext = event.getSession().getServletContext();
        shutDownJMSContainer(servletContext);
        super.sessionDestroyed(event);
    }
}
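
To show what I mean by managing the connections per user myself, here is a rough sketch of the kind of registry I have in mind. It is not working project code and I am not sure it is the right design. The class `SessionResourceRegistry` and its methods are hypothetical names; it uses only `java.util.concurrent` and assumes Java 8, so that a JMS 1.1 resource (which is not `AutoCloseable`) can be registered as a lambda or method reference. The idea is that one application-wide instance maps the HTTP session id to whatever has to be closed for that session.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical application-scoped registry: HTTP session id -> resource to close.
final class SessionResourceRegistry {

    private final Map<String, AutoCloseable> bySessionId = new ConcurrentHashMap<>();

    /** Registers a resource for the session and closes the one it replaces, if any. */
    void register(String sessionId, AutoCloseable resource) {
        AutoCloseable previous = bySessionId.put(sessionId, resource);
        closeQuietly(previous);
    }

    /** Closes and forgets the session's resource; returns true if there was one. */
    boolean release(String sessionId) {
        AutoCloseable resource = bySessionId.remove(sessionId);
        closeQuietly(resource);
        return resource != null;
    }

    /** Number of sessions that currently hold a resource. */
    int size() {
        return bySessionId.size();
    }

    private static void closeQuietly(AutoCloseable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception e) {
            // A real implementation would log this; the sketch just continues.
        }
    }

    public static void main(String[] args) {
        SessionResourceRegistry registry = new SessionResourceRegistry();
        registry.register("session-1", () -> System.out.println("closed JMS resources of session-1"));
        registry.release("session-1");
    }
}
```

With something like this, the bean would call `register(httpSessionId, connection::close)` after opening the connection, and `sessionDestroyed` could call `release(event.getSession().getId())`. Changing the filter for the root user would then be a second `register` call for the same session id, which closes the old resource first.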
Any kind of help is welcome
@BalusC :-)