Introduction
We include the following line:
import org.apache.activemq.ActiveMQConnectionFactory;
It implements the ConnectionFactory interface.
Maven
Example
We add the following dependency:
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-broker</artifactId>
  <version>5.14.0-SNAPSHOT</version>
</dependency>
Example
We add the following dependencies:
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-all</artifactId>
  <version>5.10.2</version>
</dependency>
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-http</artifactId>
  <version>5.10.2</version>
</dependency>
Virtual Topic
A producer uses a name of the form "VirtualTopic.<name>".
A consumer uses a name of the form "Consumer.<name>.VirtualTopic.<name>". These names can be changed in activemq.xml. The default configuration is as follows:
<destinationInterceptors>
  <virtualDestinationInterceptor>
    <virtualDestinations>
      <virtualTopic name="VirtualTopic.>" prefix="Consumer.*." selectorAware="false"/>
    </virtualDestinations>
  </virtualDestinationInterceptor>
</destinationInterceptors>
The explanation is as follows:
While the producer sends messages to a topic, consumers will receive a copy of the message on their own dedicated queue.
The explanation is as follows:
Pros
- Multiple consumers can get a message
- Decoupling between producer and consumers (publish-and-subscribe pattern)
- Messages can be recovered by putting them back on the queue
Cons
- Might require additional configuration in the broker
Virtual topics are a combination of topics and queues. Producers will write messages to a topic while listeners will consume from their own queue. ActiveMQ will copy and duplicate each message from the topic to the actual consumer queues.
This combination of topics and queues has some advantages over conventional topics:
- Even if a consumer is offline, no messages will be lost. ActiveMQ will copy all messages to the registered queues. As soon as a consumer comes back online, he can handle the messages stored on his queue.
- If a consumer cannot handle a message, it will be put on a dead letter queue. The consumer can be fixed and the message can be put back on his own dedicated queue without affecting the other consumer (as a topic would do).
- We can register multiple instances of a consumer on a queue to implement a load balancing mechanism.
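The naming convention above can be captured in a small helper. This is a minimal sketch that assumes the default "VirtualTopic." and "Consumer." prefixes from the configuration shown earlier; the class VirtualTopicNames and its methods are hypothetical and not part of ActiveMQ.

```java
/** Hypothetical helper (not an ActiveMQ class) for the default virtual topic naming scheme. */
final class VirtualTopicNames {
    private VirtualTopicNames() {}

    /** Name of the topic the producer publishes to, e.g. "VirtualTopic.orders". */
    static String producerTopic(String topicName) {
        return "VirtualTopic." + topicName;
    }

    /** Name of the dedicated queue a given consumer reads from. */
    static String consumerQueue(String consumerName, String topicName) {
        return "Consumer." + consumerName + "." + producerTopic(topicName);
    }

    public static void main(String[] args) {
        // "orders", "billing" and "shipping" are made-up names for illustration.
        System.out.println(producerTopic("orders"));
        System.out.println(consumerQueue("billing", "orders"));
        System.out.println(consumerQueue("shipping", "orders"));
    }
}
```

Each consumer name yields its own queue, so two instances that share the name "billing" would compete for messages on the same queue, while "shipping" receives its own copy.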
constructor - URL
The explanation is as follows. It uses Netty under the hood.
The connection factory ActiveMQConnection uses netty to connect to ActiveMQ Artemis if the url has a tcp schema. When the application is stopped by tomcat, the shutdownGracefully is called on the netty group to stop all netty threads.
Example
We do it as follows:
@Bean("connectionFactory")
public ConnectionFactory connectionFactory(JmsProperties appProperties) {
  // Broker URL and credentials come from the application's own properties class
  ActiveMQConnectionFactory cf =
    new ActiveMQConnectionFactory(appProperties.getArtemis().getBrokerUrl());
  cf.setUser(appProperties.getArtemis().getUser());
  cf.setPassword(appProperties.getArtemis().getPassword());
  return cf;
}
constructor - boolean + TransportConfiguration
Example
In Spring we do it as follows:
@Bean("connectionFactory")
public ConnectionFactory connectionFactory(TransportConfiguration transportConfiguration)
    throws JMSException {
  // The first argument is the high-availability (ha) flag
  ActiveMQJMSConnectionFactory activeMQJMSConnectionFactory =
    new ActiveMQJMSConnectionFactory(false, transportConfiguration);
  activeMQJMSConnectionFactory.setPassword("admin");
  activeMQJMSConnectionFactory.setUser("admin");
  activeMQJMSConnectionFactory.setClientID("admin");
  return activeMQJMSConnectionFactory;
}
createConnection method
We do it as follows:
@PostConstruct
private void init() throws JMSException {
  Connection connection = activeMQConnectionFactory.createConnection();
  connection.start();
  // Non-transacted session with automatic acknowledgement
  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  Queue queue = session.createQueue("test.queue");
  consumer = session.createConsumer(queue);
  new Thread(() -> {
    while (true) {
      try {
        // receiveNoWait() returns null immediately when no message is available
        Message receive = consumer.receiveNoWait();
        if (receive != null) {
          ...
        }
      } catch (Exception e) {
        ...
      }
    }
  }, "consumerThread").start();
}
getRedeliveryPolicyMap method
Example
We do it as follows:
public void initializeConsumer(String queueName, String topicAddress, int numOfRetry)
    throws JMSException {
  // Register a redelivery policy for every destination matching "*.<queueName>.>"
  factory.getRedeliveryPolicyMap().put(new ActiveMQQueue("*." + queueName + ".>"),
    getRedeliveryPolicy(numOfRetry));
  Connection connection = factory.createConnection();
  connection.start();
  Session consumerSession = connection.createSession(false, CLIENT_ACKNOWLEDGE);
  Queue queue = consumerSession.createQueue("Consumer." + queueName +
    ".VirtualTopic." + topicAddress);
  MessageConsumer consumer = consumerSession.createConsumer(queue);
  consumer.setMessageListener(message -> {
    try {
      System.out.println("in listener --- " +
        ((ActiveMQDestination)message.getJMSDestination()).getPhysicalName());
      // recover() restarts delivery from the oldest unacknowledged message,
      // so the message is redelivered
      consumerSession.recover();
    } catch (JMSException e) {
      e.printStackTrace();
    }
  });
}
private RedeliveryPolicy getRedeliveryPolicy(int numOfRetry) {
  RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
  redeliveryPolicy.setInitialRedeliveryDelay(0);
  redeliveryPolicy.setMaximumRedeliveries(numOfRetry);
  redeliveryPolicy.setMaximumRedeliveryDelay(-1);
  redeliveryPolicy.setRedeliveryDelay(0);
  return redeliveryPolicy;
}
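The key "*." + queueName + ".>" used above relies on ActiveMQ destination wildcards, where "." separates name elements, "*" stands for a single element and ">" stands for the rest of the name. The following is a simplified sketch of that matching rule, not the broker's own implementation; the class DestinationWildcard is hypothetical, and it assumes ">" needs at least one remaining element, which may differ from the broker in that edge case.

```java
/** Hypothetical, simplified model of ActiveMQ destination wildcard matching. */
final class DestinationWildcard {
    private DestinationWildcard() {}

    static boolean matches(String pattern, String destination) {
        String[] p = pattern.split("\\.");
        String[] d = destination.split("\\.");
        for (int i = 0; i < p.length; i++) {
            if (p[i].equals(">")) {
                // ">" is only valid as the last element; assumed here to need
                // at least one remaining element in the destination name.
                return i == p.length - 1 && d.length > i;
            }
            if (i >= d.length) {
                return false; // pattern is longer than the destination name
            }
            if (!p[i].equals("*") && !p[i].equals(d[i])) {
                return false; // literal element does not match
            }
        }
        // Without a trailing ">", both names must have the same number of elements.
        return p.length == d.length;
    }

    public static void main(String[] args) {
        // "billing" and "orders" are made-up names for illustration.
        System.out.println(matches("*.billing.>", "Consumer.billing.VirtualTopic.orders"));
        System.out.println(matches("*.billing.>", "Consumer.shipping.VirtualTopic.orders"));
    }
}
```

Under this model the policy registered for "*.billing.>" applies to the queue "Consumer.billing.VirtualTopic.orders" but not to the queues of other consumers.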
setAlwaysSyncSend method
Messages can be sent synchronously or asynchronously.
Example
We do it as follows:
String brokerUrl = ...
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
factory.setUserName("admin");
factory.setPassword("password");
factory.setAlwaysSyncSend(false);
setBrokerURL method
Example
We do it as follows:
"tcp://78.99.88.72:61616?jms.prefetchPolicy.all=2&jms.useAsyncSend=true&socketBufferSize=131072"
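To see what such a URL carries, it can help to split its query string into individual options. In the sketch below, options starting with "jms." are treated as connection-level settings and the rest as transport settings. The class BrokerUrlOptions is hypothetical, the host is a placeholder, and the code only parses the string; it does not connect to a broker.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that lists the options embedded in a broker URL. */
final class BrokerUrlOptions {
    private BrokerUrlOptions() {}

    /** Returns the query options in the order they appear in the URL. */
    static Map<String, String> parse(String brokerUrl) {
        Map<String, String> options = new LinkedHashMap<>();
        String query = URI.create(brokerUrl).getQuery();
        if (query == null) {
            return options; // URL has no options
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                options.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return options;
    }

    /** Options with the "jms." prefix are treated as connection-level settings. */
    static boolean isConnectionOption(String key) {
        return key.startsWith("jms.");
    }

    public static void main(String[] args) {
        String url = "tcp://localhost:61616?jms.prefetchPolicy.all=2"
            + "&jms.useAsyncSend=true&socketBufferSize=131072";
        parse(url).forEach((key, value) -> System.out.println(
            (isConnectionOption(key) ? "connection " : "transport  ") + key + " = " + value));
    }
}
```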
Example
We do it as follows:
@Bean
public ConnectionFactory connectionFactory() {
  ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
  try {
    // The failover transport tries the listed brokers in order (randomize=false)
    connectionFactory.setBrokerURL("failover:(tcp://activemq-01:61616," +
      "tcp://activemq-02:61616)?randomize=false&maxReconnectDelay=3000&" +
      "maxReconnectAttempts=50");
    connectionFactory.setUserName("abcxyz");
    connectionFactory.setPassword("abcxyz");
  } catch (Exception e) {
    e.printStackTrace();
  }
  return connectionFactory;
}
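A long failover URL written by hand is easy to get wrong, as a stray line break or space inside the string changes the URL. One option is to assemble it from a list of brokers and a map of options. The helper below is a minimal sketch; the class FailoverUrl is hypothetical and not part of ActiveMQ, and it only builds the string.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper that assembles a failover broker URL. */
final class FailoverUrl {
    private FailoverUrl() {}

    static String build(List<String> brokerUris, Map<String, String> options) {
        // Broker URIs go inside the parentheses, separated by commas.
        String url = "failover:(" + String.join(",", brokerUris) + ")";
        // Options follow as key=value pairs joined with '&'.
        String query = options.entrySet().stream()
            .map(e -> e.getKey() + "=" + e.getValue())
            .collect(Collectors.joining("&"));
        return query.isEmpty() ? url : url + "?" + query;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>(); // keeps insertion order
        options.put("randomize", "false");
        options.put("maxReconnectDelay", "3000");
        options.put("maxReconnectAttempts", "50");
        System.out.println(build(
            Arrays.asList("tcp://activemq-01:61616", "tcp://activemq-02:61616"), options));
    }
}
```

The resulting string could then be passed to setBrokerURL in place of the hand-written literal.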