I have an integration context which does the following:
- Reads 100 rows from the database and updates those rows' status to `IN_PROGRESS`.
- Hands them to a task executor to call the REST API.
- Based on the API call's success or failure, it updates the individual row's status to `FAILED` or `PROCESSED`.
- I have defined `threadPool=20` and `messageQueueCapacity=4000`.
This integration context is working fine. I have 5 other similar contexts which do almost the same thing. They are all loaded at once. I have a `controlChannel` through which I can start/stop an individual `jdbc:inbound-channel-adapter`.
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring Integration context for the match_center table.

  Flow: poll unprocessed rows over JDBC, mark them IN_PROGRESS, split the batch
  into one message per row, hand each row to a thread pool, POST it to the REST
  API, then write the outcome back to the row.

  Beans referenced but not defined in this file: dataSource, matchCenterService,
  requestFactory. Property placeholders (jdbc.autostart, pollerIntervalTime,
  threadPool, messageQueueCapacity, addmatchCenterPath, apiMaxRetryAttempt,
  apiRetryInterval) must be resolved by a placeholder configurer declared elsewhere.

  The namespace URIs below are the canonical Spring ones. Spring selects its
  namespace handlers by these exact URIs, so they must not be replaced by
  shortened or redirecting links.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:beans="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-http="http://www.springframework.org/schema/integration/http"
       xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc"
       xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       xmlns:task="http://www.springframework.org/schema/task"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/integration
                           http://www.springframework.org/schema/integration/spring-integration.xsd
                           http://www.springframework.org/schema/task
                           http://www.springframework.org/schema/task/spring-task.xsd
                           http://www.springframework.org/schema/integration/jdbc
                           http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd
                           http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context.xsd
                           http://www.springframework.org/schema/integration/http
                           http://www.springframework.org/schema/integration/http/spring-integration-http.xsd">

  <int:message-history />

  <!--
    Source: on every poll, selects up to 100 rows whose processed_status is empty
    or NULL, then runs the update statement to flag the selected ids as IN_PROGRESS.
    Polling errors are sent to matchCenterErrorChannel.
  -->
  <int-jdbc:inbound-channel-adapter
      id="matchCenterInboundJdbcAdapter"
      channel="matchCenterJdbcChannel"
      data-source="dataSource"
      auto-startup="${jdbc.autostart}"
      query="SELECT * FROM match_center WHERE processed_status = '' OR processed_status IS NULL LIMIT 100"
      update="UPDATE match_center SET processed_status = 'IN_PROGRESS', date_processed = NOW() WHERE id IN (:id)">
    <int:poller fixed-rate="${pollerIntervalTime}"
                error-channel="matchCenterErrorChannel" />
  </int-jdbc:inbound-channel-adapter>

  <!-- Breaks the polled batch into individual messages. -->
  <int:splitter input-channel="matchCenterJdbcChannel"
                output-channel="matchCenterQueueChannel" />

  <!--
    Hand-off to the thread pool. Every channel after this one is declared without
    a queue or dispatcher, so the chain, the HTTP call, the router and the final
    JDBC update all run on a matchCenterTaskExec thread. The pool size (plus the
    polling thread, when CALLER_RUNS kicks in on a full queue) is therefore the
    cap on concurrent API calls issued by this context.
  -->
  <int:channel id="matchCenterQueueChannel">
    <int:dispatcher task-executor="matchCenterTaskExec" />
  </int:channel>

  <task:executor id="matchCenterTaskExec"
                 pool-size="${threadPool}"
                 queue-capacity="${messageQueueCapacity}"
                 rejection-policy="CALLER_RUNS" />

  <!-- Sets the error channel and content type headers, then builds the request via matchCenterService. -->
  <int:chain id="matchCenterChain"
             input-channel="matchCenterQueueChannel"
             output-channel="matchCenterApiChannel">
    <int:header-enricher>
      <int:header name="errorChannel" value="matchCenterErrorChannel" />
      <int:header name="content-type" value="application/json" />
    </int:header-enricher>
    <int:service-activator ref="matchCenterService" method="processMasterData" />
  </int:chain>

  <int:channel id="matchCenterApiChannel" />

  <!-- POSTs the message to the REST API; failed calls are retried with a fixed back-off. -->
  <int-http:outbound-gateway
      url="${addmatchCenterPath}"
      http-method="POST"
      expected-response-type="java.lang.String"
      request-channel="matchCenterApiChannel"
      reply-channel="matchCenterResponseChannel"
      request-factory="requestFactory">
    <int-http:request-handler-advice-chain>
      <int:retry-advice max-attempts="${apiMaxRetryAttempt}">
        <int:fixed-back-off interval="${apiRetryInterval}" />
      </int:retry-advice>
    </int-http:request-handler-advice-chain>
  </int-http:outbound-gateway>

  <int:channel id="matchCenterResponseChannel" />

  <!--
    Routes on the HTTP status code header: 200 and 201 go to the success channel,
    anything else falls through to matchCenterErrorResponseChannel.
    NOTE(review): the mappings assume toString() on the status header yields the
    bare numeric code; confirm this holds for the Spring version in use.
  -->
  <int:router input-channel="matchCenterResponseChannel"
              default-output-channel="matchCenterErrorResponseChannel"
              resolution-required="false"
              expression="headers['#{T(org.springframework.integration.http.HttpHeaders).STATUS_CODE}'].toString()">
    <int:mapping value="200" channel="matchCenterAddSuccessChannel" />
    <int:mapping value="201" channel="matchCenterAddSuccessChannel" />
  </int:router>

  <!-- Non-success HTTP responses. -->
  <int:channel id="matchCenterErrorResponseChannel" />
  <int:service-activator input-channel="matchCenterErrorResponseChannel"
                         output-channel="matchCenterUpdateChannel"
                         ref="matchCenterService"
                         method="processResponseError" />

  <!--
    Sink: writes the outcome back to the row. All four SQL parameters are read
    from message headers (see matchCenterSpelSource), so each service activator
    feeding this channel must have set them.
  -->
  <int:channel id="matchCenterUpdateChannel" />
  <int-jdbc:outbound-channel-adapter
      channel="matchCenterUpdateChannel"
      data-source="dataSource"
      sql-parameter-source-factory="matchCenterSpelSource"
      query="UPDATE match_center SET processed_status = :processedStatus, processed_result = :message, date_processed = :dateProcessed WHERE id = :masterDataId" />

  <bean id="matchCenterSpelSource"
        class="org.springframework.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
    <property name="parameterExpressions">
      <map>
        <entry key="masterDataId" value="headers['masterDataId']" />
        <entry key="message" value="headers['message']" />
        <entry key="processedStatus" value="headers['processedStatus']" />
        <entry key="dateProcessed" value="headers['dateProcessed']" />
      </map>
    </property>
  </bean>

  <!-- Errors raised while polling or while processing a row (via the errorChannel header). -->
  <int:channel id="matchCenterErrorChannel" />
  <int:service-activator input-channel="matchCenterErrorChannel"
                         output-channel="matchCenterUpdateChannel"
                         ref="matchCenterService"
                         method="processErrorMessage" />

  <!-- Successful (200/201) API responses. -->
  <int:channel id="matchCenterAddSuccessChannel" />
  <int:service-activator input-channel="matchCenterAddSuccessChannel"
                         output-channel="matchCenterUpdateChannel"
                         ref="matchCenterService"
                         method="processSuccessMessage" />

</beans>
There are thousands of rows in each table. When I start one `jdbc:inbound-channel-adapter`, it works perfectly fine. But when I start another one, it causes the API hosting server to crash. The API server is a Spring-based REST application hosted in a Tomcat server on a different machine.
At first I get a `read timeout` issue for some data. Then the API hosting server kills Tomcat because of an `out of memory error`. However, the integration server stays up and running; it just updates each row's status to FAILED because of 'connection timeout'.
Is it the `threadPool` that is causing the issue? At times I need to start all the `jdbc:inbound-channel-adapter` instances, but it seems the API server cannot handle it. Also, is there any optimization we can achieve on the integration side in the above code?
Aucun commentaire:
Enregistrer un commentaire