Mule Aggregator Connector – DZone Integration

What is the Aggregator connector?

As the name implies, an aggregator accumulates a set of data. Aggregation can be achieved in three different ways:

  1. Size-Based Aggregator
  2. Group-Based Aggregator
  3. Time-Based Aggregator

Let’s start exploring each of these aggregators in detail with a demo.

1. Size-Based Aggregator

This enables you to aggregate incoming requests (the full payload or any specified object) up to the size specified in the aggregator configuration.

For example, if you want to batch incoming orders in groups of 3, set the max size to 3 in the aggregator configuration. Let’s see the implementation now.

  • Create a flow and drag the HTTP connector Listener and the size-based aggregator into it.
  • The size-based aggregator has two sections, as follows:
    • Incremental aggregation: This section runs as each new element is added, before the aggregation completes, so you can track the elements collected so far.
    • Aggregation complete: This section runs once the completion condition is met, in this case when Max Size is reached. From here, you can either use a flow reference or use an aggregator listener for further processing.
  • Below is the Mule flow configuration.

    [Figure: Size-based aggregator flow]

    [Figure: Aggregator listener flow]

  • Below is the connector configuration.

    [Figure: Size-based aggregator configuration]

    [Figure: Aggregator listener configuration]
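As a rough XML sketch of the setup described above, the configuration might look something like the following. The flow names are taken from the console logs in this article; the aggregator name `sizeAggregator`, the listener config name, the host, the port, and the `/aggregate` path are assumptions made for illustration, and the listener flow in the original demo may contain additional processors.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:http="http://www.mulesoft.org/schema/mule/http"
      xmlns:aggregators="http://www.mulesoft.org/schema/mule/aggregators"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
        http://www.mulesoft.org/schema/mule/aggregators http://www.mulesoft.org/schema/mule/aggregators/current/mule-aggregators.xsd">

    <!-- Assumed HTTP settings: the name, host, and port are placeholders. -->
    <http:listener-config name="HTTP_Listener_config">
        <http:listener-connection host="0.0.0.0" port="8081"/>
    </http:listener-config>

    <flow name="aggregatordemoFlow">
        <!-- The path is an assumption; any path works. -->
        <http:listener config-ref="HTTP_Listener_config" path="/aggregate"/>

        <!-- Collect elements until three have arrived. -->
        <aggregators:size-based-aggregator name="sizeAggregator" maxSize="3">
            <!-- Runs for each element added before the aggregation completes. -->
            <aggregators:incremental-aggregation>
                <logger level="INFO" message="#[payload]"/>
            </aggregators:incremental-aggregation>
            <!-- Runs once, when maxSize is reached. -->
            <aggregators:aggregation-complete>
                <logger level="INFO" message="#[payload]"/>
            </aggregators:aggregation-complete>
        </aggregators:size-based-aggregator>
    </flow>

    <!-- Separate flow triggered by the aggregator named above. -->
    <flow name="aggregatordemoFlow1">
        <aggregators:aggregator-listener aggregatorName="sizeAggregator"/>
        <logger level="INFO" message="#[payload]"/>
    </flow>
</mule>
```

The listener refers to the aggregator by its `name`, so the two values have to match for the second flow to be triggered.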

Now run the code and send 3 HTTP requests. In the console, you will see an aggregated output.

**Incremental Aggregation Output**

INFO  2022-01-24 14:02:07,202 [[MuleRuntime].uber.08: [aggregatordemo].aggregatordemoFlow.CPU_LITE @2f2eb4b3] [processor: aggregatordemoFlow/processors/0/route/0/processors/0; event: 1d95e200-7cf0-11ec-a0e1-00155df07a06] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [TypedValue[value: '[B@63d6397f', dataType: 'SimpleDataType{type=[B, mimeType="application/json; charset=UTF-8"}']]
INFO  2022-01-24 14:02:11,755 [[MuleRuntime].uber.08: [aggregatordemo].aggregatordemoFlow.CPU_LITE @2f2eb4b3] [processor: aggregatordemoFlow/processors/0/route/0/processors/0; event: 204c7680-7cf0-11ec-a0e1-00155df07a06] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [TypedValue[value: '[B@46c24667', dataType: 'SimpleDataType{type=[B, mimeType="application/json; charset=UTF-8"}'], TypedValue[value: '[B@bd58c3c', dataType: 'SimpleDataType{type=[B, mimeType="application/json; charset=UTF-8"}']]
INFO  2022-01-24 14:02:15,460 [[MuleRuntime].uber.08: [aggregatordemo].aggregatordemoFlow.CPU_LITE @2f2eb4b3] [processor: aggregatordemoFlow/processors/0/route/1/processors/0; event: 2281a600-7cf0-11ec-a0e1-00155df07a06] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [TypedValue[value: '[B@4a986e9', dataType: 'SimpleDataType{type=[B, mimeType="application/json; charset=UTF-8"}'], TypedValue[value: '[B@6326951f', dataType: 'SimpleDataType{type=[B, mimeType="application/json; charset=UTF-8"}'], TypedValue[value: '[B@b79f9dd', dataType: 'SimpleDataType{type=[B, mimeType="application/json; charset=UTF-8"}']]

**Aggregation Complete Output**

INFO  2022-01-24 14:02:15,462 [[MuleRuntime].uber.05: [aggregatordemo].aggregatordemoFlow1.CPU_INTENSIVE @58aef977] [processor: aggregatordemoFlow1/processors/1; event: 1787be10-7cf0-11ec-a0e1-00155df07a06] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [
  {
    "Text": 1
  },
  {
    "Text": 2
  },
  {
    "Text": 3
  }
]

2. Group-Based Aggregator

This enables you to aggregate data (the full payload or any specified element) into groups, based on a group ID and the group size specified in the configuration.

For example, if you want to group employees based on their status as active or inactive, you can use this aggregator. Let’s see the implementation now.

  • Create a flow and drag the HTTP connector Listener and the group-based aggregator into it.
  • Now set the Name, Group Id, and Group Size in the group-based aggregator as per your requirement.
  • Eviction Time is how long the aggregator remembers a group ID after the group completes; the default is 180 seconds.
  • The rest of the configuration remains the same as above.

    [Figure: Group-based aggregator configuration]
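The settings listed above might translate into XML along these lines. The `name`, `groupId`, and `groupSize` values match the Element DSL printed in the error log later in this section; the `evictionTime` and `evictionTimeUnit` attributes are written out with what should be their default values, and the listener config name and `/employees` path are assumptions. The fragment is meant to sit inside a `<mule>` root element that declares the `http` and `aggregators` namespaces.

```xml
<flow name="aggregatordemoFlow">
    <!-- Assumed listener config name and path. -->
    <http:listener config-ref="HTTP_Listener_config" path="/employees"/>

    <!-- Group elements by the "status" field; a group completes at 3 elements.
         The group ID is remembered for 180 seconds after completion. -->
    <aggregators:group-based-aggregator name="groupAggregator"
                                        groupId="#[payload.status]"
                                        groupSize="3"
                                        evictionTime="180"
                                        evictionTimeUnit="SECONDS">
        <aggregators:incremental-aggregation>
            <logger level="INFO"/>
        </aggregators:incremental-aggregation>
        <aggregators:aggregation-complete>
            <logger level="INFO"/>
        </aggregators:aggregation-complete>
    </aggregators:group-based-aggregator>
</flow>
```

Because `groupId` is an expression, each distinct status value (for example Active and Inactive) gets its own group, and each group fills up independently.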

Now run the code and submit employee data with the status set to Active and Inactive. You can see that the output is grouped by the respective status.

**Incremental Aggregation Output**

INFO  2022-01-24 14:50:37,833 [[MuleRuntime].uber.05: [aggregatordemo].aggregatordemoFlow.CPU_LITE @689874a9] [processor: aggregatordemoFlow/processors/0/route/0/processors/0; event: e4742a20-7cf6-11ec-b9da-00155df07a06] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: 
org.mule.runtime.core.internal.message.DefaultMessageBuilder$MessageImplementation
{
  payload=[TypedValue[value: '[B@4e7a5601', dataType: 'SimpleDataType{type=[B, mimeType="application/json; charset=UTF-8"}']]
  mediaType=*/*
  attributes=org.mule.extension.aggregator.api.AggregationAttributes@217e7427
  attributesMediaType=*/*
}
INFO  2022-01-24 14:50:44,767 [[MuleRuntime].uber.05: [aggregatordemo].aggregatordemoFlow.CPU_LITE @689874a9] [processor: aggregatordemoFlow/processors/0/route/0/processors/0; event: e89683a0-7cf6-11ec-b9da-00155df07a06] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: 
org.mule.runtime.core.internal.message.DefaultMessageBuilder$MessageImplementation
{
  payload=[TypedValue[value: '[B@51e94f99', dataType: 'SimpleDataType{type=[B, mimeType="application/json; charset=UTF-8"}'], TypedValue[value: '[B@2e7fcd2d', dataType: 'SimpleDataType{type=[B, mimeType="application/json; charset=UTF-8"}']]
  mediaType=*/*
  attributes=org.mule.extension.aggregator.api.AggregationAttributes@2064cc7e
  attributesMediaType=*/*
}
INFO  2022-01-24 14:50:50,556 [[MuleRuntime].uber.05: [aggregatordemo].aggregatordemoFlow.CPU_LITE @689874a9] [processor: aggregatordemoFlow/processors/0/route/1/processors/0; event: ec09ff80-7cf6-11ec-b9da-00155df07a06] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: 
org.mule.runtime.core.internal.message.DefaultMessageBuilder$MessageImplementation
{
  payload=[TypedValue[value: '[B@4b29977e', dataType: 'SimpleDataType{type=[B, mimeType="application/json; charset=UTF-8"}'], TypedValue[value: '[B@3e61fe9e', dataType: 'SimpleDataType{type=[B, mimeType="application/json; charset=UTF-8"}'], TypedValue[value: '[B@3e104949', dataType: 'SimpleDataType{type=[B, mimeType="application/json; charset=UTF-8"}']]
  mediaType=*/*
  attributes=org.mule.extension.aggregator.api.AggregationAttributes@623f8ee0
  attributesMediaType=*/*
}

**Aggregation Complete Output**

INFO  2022-01-24 14:50:50,561 [[MuleRuntime].uber.03: [aggregatordemo].aggregatordemoFlow1.CPU_INTENSIVE @61e80f72] [processor: aggregatordemoFlow1/processors/1; event: Active] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [
  {
    "name": "ABC",
    "status": "Active"
  },
  {
    "name": "MNO",
    "status": "Active"
  },
  {
    "name": "XYZ",
    "status": "Active"
  }
]

Since we have set the eviction time to 180 seconds, we cannot submit another request with the status Active until 180 seconds have expired. If you do, you will receive the error below in the console.

ERROR 2022-01-24 14:52:03,275 [[MuleRuntime].uber.05: [aggregatordemo].aggregatordemoFlow.CPU_LITE @689874a9] [processor: aggregatordemoFlow/processors/0; event: 1761bb50-7cf7-11ec-b9da-00155df07a06] org.mule.runtime.core.internal.exception.OnErrorPropagateHandler: 
********************************************************************************
Message               : Trying to aggregate a new element to the group with id: Active ,but it's already complete
Element               : aggregatordemoFlow/processors/0 @ aggregatordemo:aggregatordemo.xml:14 (Group based aggregator)
Element DSL           : <aggregators:group-based-aggregator doc:name="Group based aggregator" doc:id="dee44800-5b56-4f64-bb0a-ec2336df6334" name="groupAggregator" groupId="#[payload.status]" groupSize="3">
<aggregators:incremental-aggregation>
<logger level="INFO" doc:name="Logger" doc:id="f05a8637-8422-4764-b58b-f81e2b6455ed"></logger>
</aggregators:incremental-aggregation>
<aggregators:aggregation-complete>
<logger level="INFO" doc:name="Logger" doc:id="b99e2e44-21ff-4d83-a00f-4c6a94533b4f"></logger>
</aggregators:aggregation-complete>
</aggregators:group-based-aggregator>
Error type            : AGGREGATORS:GROUP_COMPLETED
FlowStack             : at aggregatordemoFlow(aggregatordemoFlow/processors/0 @ aggregatordemo:aggregatordemo.xml:14 (Group based aggregator))

  (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************
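If rejecting late elements this way is not what you want, one option is to catch the error. The fragment below is only a sketch of that idea, not part of the original demo: it wraps the aggregator in a Try scope and handles the `AGGREGATORS:GROUP_COMPLETED` error type reported in the log above. It assumes it is placed inside a flow, within a `<mule>` root that declares the `aggregators` namespace.

```xml
<try>
    <aggregators:group-based-aggregator name="groupAggregator"
                                        groupId="#[payload.status]"
                                        groupSize="3">
        <aggregators:incremental-aggregation>
            <logger level="INFO"/>
        </aggregators:incremental-aggregation>
        <aggregators:aggregation-complete>
            <logger level="INFO"/>
        </aggregators:aggregation-complete>
    </aggregators:group-based-aggregator>
    <error-handler>
        <!-- Swallow only the "group already complete" error; other errors still propagate. -->
        <on-error-continue type="AGGREGATORS:GROUP_COMPLETED">
            <logger level="WARN" message="#[error.description]"/>
        </on-error-continue>
    </error-handler>
</try>
```

With `on-error-continue`, the late element is dropped and the flow carries on; whether dropping is acceptable depends on your use case.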

3. Time-Based Aggregator

This enables you to aggregate requests (the full payload or any specified item) for the time period specified in the aggregator configuration.

For example, if you want to aggregate all incoming requests over a time period of 30 seconds, you need to set this time period in the time-based aggregator configuration. You can also limit the size by setting the maximum size. Let’s see the implementation now.

  • Create a flow and drag the HTTP Listener and the time-based aggregator into it.
  • Now set the Period and Max Size in the time-based aggregator as per your requirement.
  • The rest of the configuration remains the same as above.

    [Figure: Time-based aggregator configuration]
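A possible XML form of this setup is sketched below, using the 30-second period from the example above. The aggregator name `timeAggregator`, the `maxSize` value of 10, the listener config name, and the `/aggregate` path are assumptions for illustration. The fragment is meant to sit inside a `<mule>` root element that declares the `http` and `aggregators` namespaces.

```xml
<flow name="aggregatordemoFlow">
    <!-- Assumed listener config name and path. -->
    <http:listener config-ref="HTTP_Listener_config" path="/aggregate"/>

    <!-- Collect elements for 30 seconds; maxSize is the optional size limit. -->
    <aggregators:time-based-aggregator name="timeAggregator"
                                       period="30"
                                       periodUnit="SECONDS"
                                       maxSize="10">
        <!-- Runs as each element is added during the period. -->
        <aggregators:incremental-aggregation>
            <logger level="INFO" message="#[payload]"/>
        </aggregators:incremental-aggregation>
    </aggregators:time-based-aggregator>
</flow>

<!-- The aggregated elements are delivered to the listener flow. -->
<flow name="aggregatordemoFlow1">
    <aggregators:aggregator-listener aggregatorName="timeAggregator"/>
    <logger level="INFO" message="#[payload]"/>
</flow>
```

In this sketch the final result is picked up only through the aggregator listener, which matches the log output in this section, where the combined payload is printed by `aggregatordemoFlow1`.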

Now run the code and send 2-3 requests within the given time period. You will then see the combined response in the output.

INFO  2022-01-24 15:28:20,908 [[MuleRuntime].uber.03: [aggregatordemo].aggregatordemoFlow.CPU_LITE @61cf581] [processor: aggregatordemoFlow/processors/0/route/0/processors/0; event: 294037c0-7cfc-11ec-91f8-00155df07a06] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [TypedValue[value: '[B@502dd49e', dataType: 'SimpleDataType{type=[B, mimeType="application/json; charset=UTF-8"}']]
INFO  2022-01-24 15:28:21,873 [[MuleRuntime].uber.03: [aggregatordemo].aggregatordemoFlow.CPU_LITE @61cf581] [processor: aggregatordemoFlow/processors/0/route/0/processors/0; event: 29ed8ec0-7cfc-11ec-91f8-00155df07a06] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [TypedValue[value: '[B@7d50b4f4', dataType: 'SimpleDataType{type=[B, mimeType="application/json; charset=UTF-8"}'], TypedValue[value: '[B@42282847', dataType: 'SimpleDataType{type=[B, mimeType="application/json; charset=UTF-8"}']]
INFO  2022-01-24 15:28:22,859 [[MuleRuntime].uber.03: [aggregatordemo].aggregatordemoFlow.CPU_LITE @61cf581] [processor: aggregatordemoFlow/processors/0/route/0/processors/0; event: 2a83db50-7cfc-11ec-91f8-00155df07a06] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [TypedValue[value: '[B@56bd5e36', dataType: 'SimpleDataType{type=[B, mimeType="application/json; charset=UTF-8"}'], TypedValue[value: '[B@c2bfde6', dataType: 'SimpleDataType{type=[B, mimeType="application/json; charset=UTF-8"}'], TypedValue[value: '[B@4e655dbd', dataType: 'SimpleDataType{type=[B, mimeType="application/json; charset=UTF-8"}']]
INFO  2022-01-24 15:28:40,878 [[MuleRuntime].uber.05: [aggregatordemo].aggregatordemoFlow1.CPU_INTENSIVE @31dd2e47] [processor: aggregatordemoFlow1/processors/1; event: 294dcc50-7cfc-11ec-91f8-00155df07a06] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: [
  {
    "Text": 1
  },
  {
    "Text": 1
  },
  {
    "Text": 1
  }
]
