mercredi 25 février 2015

Spring Integration: API call causes server crash

I have an integration context which does the following:



  • Reads 100 rows from database and updates those rows status to IN_PROGRESS.

  • Hands them to task executor to call the rest api.

  • Based on the API call success or failure it updates the individual row's status to FAILED or PROCESSED.

  • I have defined the threadPool=20 and messageQueueCapacity=4000


This integration context is working fine. I have 5 other similar contexts which do almost the same thing. They are all loaded at once. I have a controlChannel through which I can start/stop each individual jdbc:inbound-channel-adapter.



<?xml version="1.0" encoding="UTF-8"?>
<!-- Namespace URIs restored to their canonical Spring values. Spring looks up
     namespace handlers and bundled XSDs by exact URI, so link-shortened URIs
     cannot be resolved and the context would fail to parse.
     NOTE(review): unversioned XSD names are assumed here; pin versioned XSDs if
     the project requires a specific schema version. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:beans="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-http="http://www.springframework.org/schema/integration/http"
       xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
       xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       xmlns:task="http://www.springframework.org/schema/task"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/integration
           http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/task
           http://www.springframework.org/schema/task/spring-task.xsd
           http://www.springframework.org/schema/integration/jdbc
           http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/integration/http
           http://www.springframework.org/schema/integration/http/spring-integration-http.xsd">

<!-- Enables Spring Integration message-history tracking for this context. -->
<int:message-history/>

<!-- Polls match_center for rows whose processed_status is empty or NULL (the
     query caps each poll at 100 rows), flags the selected rows as IN_PROGRESS
     through the update statement, and publishes the result to
     matchCenterJdbcChannel. Failures raised on the poller thread are routed to
     matchCenterErrorChannel. Start-up and the poll interval come from
     property placeholders. -->
<int-jdbc:inbound-channel-adapter
    id="matchCenterInboundJdbcAdapter"
    data-source="dataSource"
    channel="matchCenterJdbcChannel"
    auto-startup="${jdbc.autostart}"
    query="SELECT * FROM match_center WHERE processed_status = '' OR processed_status IS NULL LIMIT 100"
    update="UPDATE match_center SET processed_status = 'IN_PROGRESS', date_processed = NOW() WHERE id IN (:id)">
  <int:poller error-channel="matchCenterErrorChannel" fixed-rate="${pollerIntervalTime}"/>
</int-jdbc:inbound-channel-adapter>
<!-- Breaks the polled result into individual messages and forwards each one to
     matchCenterQueueChannel. -->
<int:splitter
    input-channel="matchCenterJdbcChannel"
    output-channel="matchCenterQueueChannel"/>

<!-- Thread pool backing matchCenterQueueChannel. Pool size and queue capacity
     come from property placeholders; with CALLER_RUNS, a task the pool cannot
     accept is executed by the submitting thread instead of being dropped. -->
<task:executor
    id="matchCenterTaskExec"
    pool-size="${threadPool}"
    queue-capacity="${messageQueueCapacity}"
    rejection-policy="CALLER_RUNS"/>

<!-- Channel whose messages are dispatched on matchCenterTaskExec. -->
<int:channel id="matchCenterQueueChannel">
  <int:dispatcher task-executor="matchCenterTaskExec"/>
</int:channel>

<!-- Per-row preparation: sets the errorChannel and content-type headers, then
     calls matchCenterService.processMasterData and sends the result to
     matchCenterApiChannel. -->
<int:chain id="matchCenterChain"
           input-channel="matchCenterQueueChannel"
           output-channel="matchCenterApiChannel">
  <int:header-enricher>
    <int:header name="errorChannel" value="matchCenterErrorChannel"/>
    <int:header name="content-type" value="application/json"/>
  </int:header-enricher>
  <int:service-activator ref="matchCenterService" method="processMasterData"/>
</int:chain>

<int:channel id="matchCenterApiChannel"/>

<!-- POSTs each message from matchCenterApiChannel to the configured REST
     endpoint, expects a String response body, and sends the reply to
     matchCenterResponseChannel. The call is wrapped in retry advice whose
     attempt count and fixed back-off interval come from property placeholders. -->
<int-http:outbound-gateway
    request-channel="matchCenterApiChannel"
    reply-channel="matchCenterResponseChannel"
    url="${addmatchCenterPath}"
    http-method="POST"
    expected-response-type="java.lang.String"
    request-factory="requestFactory">
  <int-http:request-handler-advice-chain>
    <int:retry-advice max-attempts="${apiMaxRetryAttempt}">
      <int:fixed-back-off interval="${apiRetryInterval}"/>
    </int:retry-advice>
  </int-http:request-handler-advice-chain>
</int-http:outbound-gateway>
<int:channel id="matchCenterResponseChannel"/>

<!-- Routes API replies by HTTP status: 200 and 201 go to
     matchCenterAddSuccessChannel, anything else falls through to
     matchCenterErrorResponseChannel.
     The key is built from the numeric status (value()) rather than
     HttpStatus.toString(), because the toString() format is not stable across
     Spring versions (newer versions append the reason phrase, e.g. "200 OK"),
     which would make the "200"/"201" mappings never match. -->
<int:router
    input-channel="matchCenterResponseChannel"
    default-output-channel="matchCenterErrorResponseChannel"
    resolution-required="false"
    expression="headers['#{T(org.springframework.integration.http.HttpHeaders).STATUS_CODE}'].value().toString()">
  <int:mapping value="200" channel="matchCenterAddSuccessChannel"/>
  <int:mapping value="201" channel="matchCenterAddSuccessChannel"/>
</int:router>

<!-- Replies that matched no success mapping: matchCenterService.processResponseError
     handles them and its result is sent on to matchCenterUpdateChannel. -->
<int:channel id="matchCenterErrorResponseChannel"/>
<int:service-activator
    input-channel="matchCenterErrorResponseChannel"
    output-channel="matchCenterUpdateChannel"
    ref="matchCenterService"
    method="processResponseError"/>

<!-- Final step for every outcome: writes status, result message and processing
     date back to the match_center row identified by :masterDataId. The named
     parameters are resolved by matchCenterSpelSource. -->
<int:channel id="matchCenterUpdateChannel"/>
<int-jdbc:outbound-channel-adapter
    channel="matchCenterUpdateChannel"
    data-source="dataSource"
    sql-parameter-source-factory="matchCenterSpelSource"
    query="UPDATE match_center SET processed_status = :processedStatus, processed_result = :message, date_processed = :dateProcessed WHERE id = :masterDataId"/>
<!-- Maps each named SQL parameter of the update statement to the message
     header of the same name. -->
<bean id="matchCenterSpelSource"
      class="org.springframework.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
  <property name="parameterExpressions">
    <map>
      <entry key="masterDataId" value="headers['masterDataId']"/>
      <entry key="message" value="headers['message']"/>
      <entry key="processedStatus" value="headers['processedStatus']"/>
      <entry key="dateProcessed" value="headers['dateProcessed']"/>
    </map>
  </property>
</bean>

<!-- Error path (poller error-channel and errorChannel header):
     matchCenterService.processErrorMessage handles the error message and its
     result is sent on to matchCenterUpdateChannel. -->
<int:channel id="matchCenterErrorChannel"/>
<int:service-activator
    input-channel="matchCenterErrorChannel"
    output-channel="matchCenterUpdateChannel"
    ref="matchCenterService"
    method="processErrorMessage"/>

<!-- Success path (HTTP 200/201): matchCenterService.processSuccessMessage
     handles the reply and its result is sent on to matchCenterUpdateChannel. -->
<int:channel id="matchCenterAddSuccessChannel"/>
<int:service-activator
    input-channel="matchCenterAddSuccessChannel"
    output-channel="matchCenterUpdateChannel"
    ref="matchCenterService"
    method="processSuccessMessage"/>
</beans>


There are thousands of rows in each table. When I start one jdbc:inbound-channel-adapter it works perfectly fine. But when I start another one, it causes the API hosting server to crash. The API server is a Spring-based REST application hosted in a Tomcat server on a different machine.


At first I get read timeouts for some of the data. Then the API hosting server kills Tomcat because of an out-of-memory error. The integration server, however, stays up and running; it just updates each row's status to FAILED because of 'connection timeout'.


Is it the threadPool that is causing the issue? At times I need to start all the jdbc:inbound-channel-adapters at once, but it seems the API server cannot handle that. Also, is there any optimization we can achieve on the integration side in the above code?


Aucun commentaire:

Enregistrer un commentaire