Link to umbrella project: >> System's Configuration Management <<
SCM Message Broker is a service that performs function of gathering messages from system service instances and broadcast it to subscribers. Service use SCM messaging and coordination components to communicate with external components of the system.
Message Broker is Spring Boot application and disctributed as jar file. It placed in jcenter()
repository:
http://jcenter.bintray.com/org/ametiste/scm/scm-message-broker/
Also releases stored on github or you can build artifact by itself from sources.
Installation process include next steps:
- Get a ready-to-deploy jar file:
- download ready artifact from one of available sources;
- build artifact from sources:
project based on Gradle, so to build execute next command from project root directory:gradle build
- Deploy jar file to the target server.
- Configure properties for Broker (see here).
- Start service by executing:
java -jar dph-scm-message-broker-[version].jar
.
- JDK 1.8 or higher.
Application has few sets of properties separated by functional modules.
Name | Type | Description | Default |
---|---|---|---|
org.ametiste.scm.broker.group.timeout |
integer | Group expiration timeout (in milliseconds). Used to define maximum time to collect list of messages by aggregate. |
250 |
org.ametiste.scm.broker.group.threshold |
integer | Maximum group size. When group is filled aggregate push group and start gather next. |
200 |
org.ametiste.scm.broker.broadcast.timeout |
integer | Time delay between broadcasting operations (in milliseconds). | 30000 |
Name | Type | Description | Default |
---|---|---|---|
org.ametiste.scm.broker.sender.client.connect-timeout |
integer | Connection timeout for HTTP client (in milliseconds) | 1000 |
org.ametiste.scm.broker.sender.client.read-timeout |
integer | Read timeout for HTTP client (in milliseconds) | 1000 |
Note: if parameters not defined default values will be used.
Name | Type | Description | Default |
---|---|---|---|
org.ametiste.scm.broker.sender.retry.maxAttempts |
int | Maximum number of attempts. | 5 |
org.ametiste.scm.broker.sender.retry.interval |
int | Interval between retry attempts (in milliseconds). | 1000 |
org.ametiste.scm.broker.sender.retry.exponentialBackOff |
boolean | Enables exponential backoff policy. | false |
org.ametiste.scm.broker.sender.retry.maxInterval |
int | Maximum interval between attempts in case when {@literal exponentialBackOff} is enabled (in milliseconds). | 30000 |
org.ametiste.scm.broker.sender.retry.multiplier |
double | Multiplier for {@literal exponentialBackOff} policy. | 3.0 |
Service use ActiveMQ for storing queues of messages and has next properties:
Name | Type | Description | Default |
---|---|---|---|
org.ametiste.scm.broker.amq.queue-name.raw-event |
string | Name of queue for raw events received from system instances. | queue.raw |
org.ametiste.scm.broker.amq.queue-name.aggregated-event |
string | Name of queue for aggregated lists of messages. | queue.aggregated |
org.ametiste.scm.broker.amq.broker-url |
URL | Configuration URL for ActiveMQ broker (more info). | vm://localhost |
org.ametiste.scm.broker.amq.username |
string | Username for access to ActiveMQ broker. | |
org.ametiste.scm.broker.amq.password |
string | Password for access to ActiveMQ broker. | |
org.ametiste.scm.broker.amq.redelivery.maxRedeliveries |
integer | Number of redelivery retries. | -1 (infinite redelivery) |
org.ametiste.scm.broker.amq.redelivery.initialDelay |
integer | Initial redelivery delay and increase step (if exponential backoff disabled (in milliseconds). | 1000 |
org.ametiste.scm.broker.amq.redelivery.maxDelay |
integer | Maximum value of redelivery delay (in milliseconds). | 30000 |
Broker use SCM SubscribersFetcher configuration from Scm Coordinator Library and require define properties for it:
Name | Type | Description | Default |
---|---|---|---|
spring.application.name |
string | Name of application. | scm-message-broker |
eureka.client.serviceUrl.defaultZone |
URL | URL to Eureka Server instance. |
http://localhost:8761/eureka/ |
You can provide and other spring cloud eureka properties for discovering client.
For receiving and sending transport messages with event payload used SCM Messaging Library.
For fetching subscribers used SCM Coordinator Library. Read more about it here.
This library requires use SCM Coordinator Library for subscribing to event messages. Or interested service must register in Eureka server with required metadata independently.
All messages received by broker published with ApplicationEventPublisher. In broker configuratuion (EventProcessingConfiguration.class
) defines @EventListener that catch message and pass it to EventMessageProcessor interface.
All processing of messages defined as Spring Integration Flow:
Flow:
- EventListener pass message to EventMessageProcessorGateway and store in JMS channel.
- After broadcasting timeout stored messages grep from channel and aggregated to packets of messages.
- Packets stores in other JMS channel (message driven).
- Service Activator grep packets from channel and send it to subscribers.
Aggregator group messages to packets by number that defined in property com.dph.scm.broker.group.threshold
. If group not full and group timeout expired aggregator push partial packet to channel. It provides safe of messages in time when they present in service.
Jms channels backed with embedded ActiveMQ broker with persistence.
Service Activator execution is transnational and all messages rollback to queue if service failed to send it. It save messages in case when no one subscriber is present or service fail send packet to all subscribers.
Broadcasting executed by implementation of MessageBroadcaster interface. This component uses fetcher and sender component for provide own logic.
Class diagram placed below: