Saturday, 28 February 2015

Spring Integration file inbound-channel-adapter: aggregating multiple files into one master file for job processing

I have written code to combine multiple files into a single master file. The problem is in the int:transformer: it receives one file at a time, even though the composite filter of the file inbound-channel-adapter returns the full list of files. The list size logged in the filter is correct, but in the transformer bean the List<File> always has size one, so the transformer never receives the full list produced by the filter.


Here is my config:



<!-- Auto Wiring -->
<context:component-scan base-package="com.nt.na21.nam.integration.*" />
<!-- intercept and log every message -->
<int:logging-channel-adapter id="logger"
    level="DEBUG" />
<int:wire-tap channel="logger" />

<!-- Aggregating the processed Output for OSS processing -->

<int:channel id="networkData" />
<int:channel id="requests" />

<int-file:inbound-channel-adapter id="pollProcessedNetworkData"
    directory="file:${processing.files.directory}" filter="compositeProcessedFileFilter"
    channel="networkData">
    <int:poller default="true" cron="*/20 * * * * *" />
</int-file:inbound-channel-adapter>

<bean id="compositeProcessedFileFilter"
    class="com.nt.na21.nam.integration.file.filter.CompositeFileListFilterForBaseLine" />

<int:transformer id="aggregateNetworkData"
    input-channel="networkData" output-channel="requests">
    <bean id="networkData" class="com.nt.na21.nam.integration.helper.CSVFileAggregator">
    </bean>
</int:transformer>


CompositeFileListFilterForBaseLine:


public class CompositeFileListFilterForBaseLine implements FileListFilter<File> {

    private final static Logger LOG = Logger
            .getLogger(CompositeFileListFilterForBaseLine.class);

    // Keeps only the CSV files whose name carries today's date.
    @Override
    public List<File> filterFiles(File[] files) {
        List<File> filteredFile = new ArrayList<File>();
        int index;
        String fetchedFileName = null;
        String fileCreatedDate = null;
        String todayDate = DateHelper.toddMM(new Date());
        LOG.debug("Date - dd-MM: " + todayDate);

        for (File f : files) {
            fetchedFileName = StringUtils.removeEnd(f.getName(), ".csv");
            index = fetchedFileName.indexOf("_");

            // Add plus one to index to skip underscore
            fileCreatedDate = fetchedFileName.substring(index + 1);
            // Format the created file date
            fileCreatedDate = DateHelper.formatFileNameDateForAggregation(fileCreatedDate);
            LOG.debug("file created date: " + fileCreatedDate + " today Date: "
                    + todayDate);
            if (fileCreatedDate.equalsIgnoreCase(todayDate)) {
                filteredFile.add(f);
                LOG.debug("File added to List of File: " + f.getAbsolutePath());
            }
        }
        LOG.debug("SIZE: " + filteredFile.size());
        LOG.debug("filterFiles method end.");
        return filteredFile;
    }

}
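
The date check in the filter can be tried outside Spring with a small standalone sketch. It assumes the date is the last underscore-separated token of the file name in ddMM form, as in file1_base_0103.csv from the log below. DateSuffixMatcher and its methods are hypothetical names for illustration and are not the DateHelper used in the post, whose behaviour is not shown.

```java
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

// Hypothetical helper, not part of the original project.
class DateSuffixMatcher {

    // Returns the token after the last underscore, without the ".csv"
    // extension, or null when the name contains no underscore.
    static String dateSuffix(String fileName) {
        String base = fileName.endsWith(".csv")
                ? fileName.substring(0, fileName.length() - ".csv".length())
                : fileName;
        int index = base.lastIndexOf('_');
        return index < 0 ? null : base.substring(index + 1);
    }

    // Keeps only the names whose suffix equals the given ddMM string.
    static List<String> matching(List<String> names, String ddMM) {
        List<String> result = new ArrayList<String>();
        for (String name : names) {
            if (ddMM.equals(dateSuffix(name))) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumption: "today" is formatted as ddMM, e.g. 0103 for 1 March.
        String today = new SimpleDateFormat("ddMM").format(new Date());
        List<String> names = Arrays.asList(
                "file1_base_" + today + ".csv", "file2_base_0000.csv");
        System.out.println(matching(names, today));
    }
}
```

Running the filter logic this way only confirms which names pass the date check; it says nothing about how the adapter later delivers them.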


CSVFileAggregator:


public class CSVFileAggregator {

    private final static Logger LOG = Logger.getLogger(CSVFileAggregator.class);

    private int snePostion;

    protected String masterFileSourcePath = null;

    // Invoked by the int:transformer; expected to receive all filtered files at once.
    public File handleAggregateFiles(List<File> files) throws IOException {
        LOG.debug("materFileSourcePath: " + masterFileSourcePath);
        LinkedHashSet<String> allAttributes = null;
        Map<String, LinkedHashSet<String>> allAttrBase = null;
        Map<String, LinkedHashSet<String>> allAttrDelta = null;
        LOG.info("Aggregator releasing [" + files.size() + "] files");
        // (rest of the method is not shown in the post)
    }

}


Log Output:


INFO : com.nt.na21.nam.integration.aggregator.NetFileAggregatorClient - NetFileAggregator context initialized. Polling input folder...
INFO : com.nt.na21.nam.integration.aggregator.NetFileAggregatorClient - Input directory is: D:\Projects\csv\processing
DEBUG: com.nt.na21.nam.integration.file.filter.CompositeFileListFilterForBaseLine - Date - dd-MM: 0103
DEBUG: com.nt.na21.nam.integration.file.filter.CompositeFileListFilterForBaseLine - file created date: 0103 today Date: 0103
DEBUG: com.nt.na21.nam.integration.file.filter.CompositeFileListFilterForBaseLine - File added to List of File: D:\Projects\NA21\NAMworkspace\na21_nam_integration\csv\processing\file1_base_0103.csv
DEBUG: com.nt.na21.nam.integration.file.filter.CompositeFileListFilterForBaseLine - file created date: 0103 today Date: 0103
DEBUG: com.nt.na21.nam.integration.file.filter.CompositeFileListFilterForBaseLine - File added to List of File: D:\Projects\NA21\NAMworkspace\na21_nam_integration\csv\processing\file2_base_0103.csv
DEBUG: com.nt.na21.nam.integration.file.filter.CompositeFileListFilterForBaseLine - SIZE: 2
DEBUG: com.nt.na21.nam.integration.file.filter.CompositeFileListFilterForBaseLine - filterFiles method end.
DEBUG: org.springframework.integration.file.FileReadingMessageSource - Added to queue: [csv\processing\file1_base_0103.csv, csv\processing\file2_base_0103.csv]
INFO : org.springframework.integration.file.FileReadingMessageSource - Created message: [GenericMessage [payload=csv\processing\file2_base_0103.csv, headers={timestamp=1425158920029, id=cb3c8505-0ee5-7476-5b06-01d14380e24a}]]
DEBUG: org.springframework.integration.endpoint.SourcePollingChannelAdapter - Poll resulted in Message: GenericMessage [payload=csv\processing\file2_base_0103.csv, headers={timestamp=1425158920029, id=cb3c8505-0ee5-7476-5b06-01d14380e24a}]
DEBUG: org.springframework.integration.channel.DirectChannel - preSend on channel 'networkData', message: GenericMessage [payload=csv\processing\file2_base_0103.csv, headers={timestamp=1425158920029, id=cb3c8505-0ee5-7476-5b06-01d14380e24a}]
DEBUG: org.springframework.integration.handler.LoggingHandler - org.springframework.integration.handler.LoggingHandler#0 received message: GenericMessage [payload=csv\processing\file2_base_0103.csv, headers={timestamp=1425158920029, id=cb3c8505-0ee5-7476-5b06-01d14380e24a}]
DEBUG: org.springframework.integration.handler.LoggingHandler - csv\processing\file2_base_0103.csv
DEBUG: org.springframework.integration.channel.DirectChannel - postSend (sent=true) on channel 'logger', message: GenericMessage [payload=csv\processing\file2_base_0103.csv, headers={timestamp=1425158920029, id=cb3c8505-0ee5-7476-5b06-01d14380e24a}]
DEBUG: org.springframework.integration.transformer.MessageTransformingHandler - org.springframework.integration.transformer.MessageTransformingHandler@606f8b2b received message: GenericMessage [payload=csv\processing\file2_base_0103.csv, headers={timestamp=1425158920029, id=cb3c8505-0ee5-7476-5b06-01d14380e24a}]
DEBUG: com.nt.na21.nam.integration.helper.CSVFileAggregator - materFileSourcePath: null
INFO : com.nt.na21.nam.integration.helper.CSVFileAggregator - Aggregator releasing [1] files
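
The log suggests what is happening: both filtered files are added to the adapter's internal queue ("Added to queue"), and the poll then creates a message whose payload is a single file. The standalone sketch below imitates that behaviour with plain Java collections. OneFilePerPollSource is a hypothetical class written for illustration, not Spring Integration's FileReadingMessageSource, and the real adapter may hand out files in a different order.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for a polled file source; illustration only.
class OneFilePerPollSource {

    private final Queue<String> toBeReceived = new ArrayDeque<String>();

    // Corresponds to the "Added to queue" step: every filtered file is queued.
    void addFiltered(List<String> filteredFiles) {
        toBeReceived.addAll(filteredFiles);
    }

    // Corresponds to one poll: at most one queued file becomes the payload.
    // Returns null when nothing is queued.
    String receive() {
        return toBeReceived.poll();
    }

    public static void main(String[] args) {
        OneFilePerPollSource source = new OneFilePerPollSource();
        source.addFiltered(Arrays.asList("file1_base_0103.csv", "file2_base_0103.csv"));
        System.out.println(source.receive()); // first poll: one file
        System.out.println(source.receive()); // second poll: the other file
        System.out.println(source.receive()); // third poll: queue is empty
    }
}
```

Under this model a handler method declared with List<File> would see a one-element list on each message, which would be consistent with the "Aggregator releasing [1] files" line.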


Can someone help me identify why the files collected by the filter are not passed together to the transformer?


Thanks in advance.

