Friday, 20 February 2015

Spring AMQP sendAndReceive returns null before the reply timeout elapses

As the Spring AMQP reference documentation says, "When the reply times out (replyTimeout), the sendAndReceive() methods return null". As I understand it, the default value for replyTimeout is 5 seconds.


So what I am doing is starting 100 threads. Each thread sends a message to the same queue using the sendAndReceive operation. A @RabbitListener listens on that queue, receives the message and sends a reply.


Some of the threads get null earlier than 5 seconds. As I understand it, RabbitTemplate should first wait the full 5 seconds and return null only if no reply has arrived by then. Please help: what am I doing wrong?
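To make the expectation concrete, here is a minimal sketch of the behaviour I assume from the documentation, written with the JDK only. ReplyWaiter and awaitReply are hypothetical names and this is not how RabbitTemplate is implemented; it only shows a call that blocks for up to a timeout and returns null if nothing arrives in that time.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a blocking request/reply call; this is NOT Spring AMQP code.
final class ReplyWaiter {

    // Blocks for up to timeoutMillis waiting for a reply.
    // Returns the reply, or null if none arrived in time (or the thread was interrupted).
    static Integer awaitReply(BlockingQueue<Integer> replies, long timeoutMillis) {
        try {
            return replies.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    public static void main(String[] args) {
        // No reply is ever queued here, so the call should run into the timeout.
        BlockingQueue<Integer> replies = new LinkedBlockingQueue<>();
        long start = System.nanoTime();
        Integer resp = awaitReply(replies, 200);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("reply=" + resp + ", elapsed ms=" + elapsedMs);
    }
}
```

Under this model a null should only ever appear after roughly the full timeout, which is not what I observe with the real template.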


Context:



<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://ift.tt/GArMu6"
xmlns:xsi="http://ift.tt/ra1lAU"
xmlns:context="http://ift.tt/GArMu7"
xmlns:rabbit="http://ift.tt/VwJ3iM"

xsi:schemaLocation="http://ift.tt/GArMu6 http://ift.tt/1jdM0fG
http://ift.tt/GArMu7 http://ift.tt/1jdLYo7
http://ift.tt/VwJ3iM http://ift.tt/1ECnZrZ">

    <context:component-scan base-package="ru.cib" />
    <rabbit:annotation-driven />
    <rabbit:connection-factory id="connFactory" host="localhost" username="guest" password="guest" channel-cache-size="10" />
    <rabbit:admin id="admin" connection-factory="connFactory" />
    <rabbit:queue name="requestQueue" />
    <rabbit:direct-exchange name="directExchange">
        <rabbit:bindings>
            <rabbit:binding key="directRequestKey" queue="requestQueue" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <rabbit:template id="requestTemplate" connection-factory="connFactory" channel-transacted="true" reply-timeout="10000" />

    <bean id="rabbitTxManager" class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
        <property name="connectionFactory" ref="connFactory" />
    </bean>

    <bean id="rabbitListenerContainerFactory" class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
        <property name="connectionFactory" ref="connFactory" />
        <property name="concurrentConsumers" value="10" />
        <property name="maxConcurrentConsumers" value="20" />
        <property name="transactionManager" ref="rabbitTxManager" />
        <property name="channelTransacted" value="true" />
    </bean>
</beans>


Methods:



// Client side: submit 100 request tasks to the thread pool.
for (int i = 1; i <= 100; i++) {
    pool.submit(new RunnableImpl(i));
}

// RunnableImpl: send one request and block until the reply (or null) comes back.
public void run() {
    LOG.info("thread send request {}", i);
    Integer resp = (Integer) requestTemplate.convertSendAndReceive("directExchange", "directRequestKey", i);
    if (resp == null) {
        LOG.error("thread get null for request {}!!!", i);
    } else {
        LOG.info("thread get response {}", resp);
    }
}

// Server side: handle a request, simulate 2 seconds of work, and return the reply.
@RabbitListener(queues = "requestQueue")
public int receivedRequestMessage(int data, Message request) throws Exception {
    LOG.info("receive request {}, {}", data, request);
    LOG.info("sleep for 2 seconds");
    Thread.sleep(2000);
    int reply = data * 100;
    LOG.info("sending response {}", reply);
    return reply;
}
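
To tell a null that arrives early apart from one that arrives only after the timeout, each call could be timed. The following is only a sketch under my own assumptions: TimedCall, time and isEarlyNull are hypothetical names, not part of Spring AMQP, and the helper just wraps an arbitrary call and records how long it took.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper, not part of Spring AMQP: records the result and duration of one call.
final class TimedCall {
    final Object reply;        // whatever the wrapped call returned, possibly null
    final long elapsedMillis;  // wall-clock duration of the wrapped call

    private TimedCall(Object reply, long elapsedMillis) {
        this.reply = reply;
        this.elapsedMillis = elapsedMillis;
    }

    // Runs the call once and measures how long it blocked.
    static TimedCall time(Supplier<?> call) {
        long start = System.nanoTime();
        Object reply = call.get();
        long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        return new TimedCall(reply, elapsed);
    }

    // True when the call returned null in less time than the configured reply timeout.
    boolean isEarlyNull(long replyTimeoutMillis) {
        return reply == null && elapsedMillis < replyTimeoutMillis;
    }
}
```

In run() this would wrap the existing call, for example TimedCall.time(() -> requestTemplate.convertSendAndReceive("directExchange", "directRequestKey", i)), and isEarlyNull(10000) would be checked against the reply-timeout set on requestTemplate in the context above.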
