Monday, 10 August 2020

ActiveMQ ActiveMQConnectionFactory Class

Introduction
We include the following line.
import org.apache.activemq.ActiveMQConnectionFactory;
It implements the ConnectionFactory interface.

Maven
Example
We include the following dependency.
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-broker</artifactId>
  <version>5.14.0-SNAPSHOT</version>
</dependency>
Example
We include the following dependencies.
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-all</artifactId>
  <version>5.10.2</version>
</dependency>

<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-http</artifactId>
  <version>5.10.2</version>
</dependency>
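Virtual Topic
The producer uses a name of the form "VirtualTopic.<name>".
The consumer uses a name of the form "Consumer.<name>.VirtualTopic.<name>", where the first <name> identifies the consumer and the second is the topic name. These names can be changed in activemq.xml. The default setting is as follows.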
<destinationInterceptors>
  <virtualDestinationInterceptor>
    <virtualDestinations>
      <virtualTopic name="VirtualTopic.>" prefix="Consumer.*." selectorAware="false"/>
    </virtualDestinations>
  </virtualDestinationInterceptor>
</destinationInterceptors>
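The naming convention above can be sketched as a small helper. This is only an illustration that assumes the default "VirtualTopic." and "Consumer.*." prefixes shown in the configuration. The class and method names are hypothetical and are not part of ActiveMQ.

```java
// Hypothetical helper; it only builds names and assumes the default prefixes.
final class VirtualTopicNames {
  private VirtualTopicNames() {}

  // Destination the producer publishes to, e.g. "VirtualTopic.orders".
  static String producerTopic(String topicName) {
    return "VirtualTopic." + topicName;
  }

  // Queue a consumer reads from, e.g. "Consumer.billing.VirtualTopic.orders".
  static String consumerQueue(String consumerName, String topicName) {
    return "Consumer." + consumerName + "." + producerTopic(topicName);
  }
}
```

The returned strings would be passed to session.createTopic() on the producer side and session.createQueue() on the consumer side.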
The explanation is as follows.
While the producer sends messages to a topic, consumers will receive a copy of the message on their own dedicated queue.

Pros
Multiple consumers can get a message
Decoupling between producer and consumers (publish-and-subscribe pattern)
Messages can be recovered by putting them back on the queue

Cons
Might require additional configuration in the broker
The explanation is as follows.
Virtual topics are a combination of topics and queues. Producers will write messages to a topic while listeners will consume from their own queue. ActiveMQ will copy and duplicate each message from the topic to the actual consumer queues.

This combination of topics and queues has some advantages over conventional topics:

- Even if a consumer is offline, no messages will be lost. ActiveMQ will copy all messages to the registered queues. As soon as a consumer comes back online, he can handle the messages stored on his queue.
- If a consumer cannot handle a message, it will be put on a dead letter queue. The consumer can be fixed and the message can be put back on his own dedicated queue without affecting the other consumer (as a topic would do).
- We can register multiple instances of a consumer on a queue to implement a load balancing mechanism.
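The behavior described above can be illustrated with a toy in-memory model. It does not use ActiveMQ at all. The class and its methods are made up for this sketch and only mimic the idea that every registered consumer gets its own copy of each message.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of virtual-topic fan-out; not an ActiveMQ API.
final class VirtualTopicModel {
  // One dedicated queue per registered consumer name.
  private final Map<String, Queue<String>> consumerQueues = new LinkedHashMap<>();

  // Registering creates the queue, so messages pile up even while the consumer is offline.
  void register(String consumerName) {
    consumerQueues.computeIfAbsent(consumerName, name -> new ArrayDeque<>());
  }

  // Publishing copies the message onto every registered consumer queue.
  void publish(String message) {
    for (Queue<String> queue : consumerQueues.values()) {
      queue.add(message);
    }
  }

  // Taking a message from one queue leaves the other consumers' copies untouched.
  String poll(String consumerName) {
    Queue<String> queue = consumerQueues.get(consumerName);
    return queue == null ? null : queue.poll();
  }

  // Number of messages still waiting for the given consumer.
  int pending(String consumerName) {
    Queue<String> queue = consumerQueues.get(consumerName);
    return queue == null ? 0 : queue.size();
  }
}
```

In this model, several instances calling poll() with the same consumer name would share that queue's messages, which corresponds to the load-balancing point in the list.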

constructor - URL
The explanation is as follows. It uses netty under the hood.
The connection factory ActiveMQConnection uses netty to connect to ActiveMQ Artemis if the url has a tcp schema. When the application is stopped by tomcat, the shutdownGracefully is called on the netty group to stop all netty threads.
Example
We do it as follows.
@Bean("connectionFactory")
public ConnectionFactory connectionFactory(JmsProperties appProperties) {
  ActiveMQConnectionFactory cf =
    new ActiveMQConnectionFactory(appProperties.getArtemis().getBrokerUrl());
  cf.setUser(appProperties.getArtemis().getUser());
  cf.setPassword(appProperties.getArtemis().getPassword());
  return cf;
}
constructor - boolean + TransportConfiguration
Example
In Spring we do it as follows.
@Bean("connectionFactory")
public ConnectionFactory connectionFactory(TransportConfiguration
  transportConfiguration) throws JMSException {
  ActiveMQJMSConnectionFactory activeMQJMSConnectionFactory =
    new ActiveMQJMSConnectionFactory( false, transportConfiguration);
  activeMQJMSConnectionFactory.setPassword("admin");
  activeMQJMSConnectionFactory.setUser("admin");
  activeMQJMSConnectionFactory.setClientID("admin");
  return activeMQJMSConnectionFactory;
}
createConnection method
We do it as follows.
@PostConstruct
private void init() throws JMSException {
  Connection connection = activeMQConnectionFactory.createConnection();
  connection.start();
  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  Queue queue = session.createQueue("test.queue");
  consumer = session.createConsumer(queue);

  new Thread(() -> {
    while (true) {
      try {
        Message receive = consumer.receiveNoWait();
        if (receive != null) {
          ...

        }
      } catch (Exception e) {
        ...
      }
    }
  }, "consumerThread").start();
}
getRedeliveryPolicyMap method
Example
We do it as follows.
public void initializeConsumer(String queueName, String topicAddress, int numOfRetry)
 throws JMSException {
  factory.getRedeliveryPolicyMap().put(new ActiveMQQueue("*." + queueName + ".>"),
    getRedeliveryPolicy(numOfRetry));
  Connection connection = factory.createConnection();
  connection.start();
  Session consumerSession = connection.createSession(false, CLIENT_ACKNOWLEDGE);
  Queue queue = consumerSession.createQueue("Consumer." + queueName +
            ".VirtualTopic." + topicAddress);
  MessageConsumer consumer = consumerSession.createConsumer(queue);
  consumer.setMessageListener(message -> {
    try {
      System.out.println("in listener --- " +
        ((ActiveMQDestination)message.getJMSDestination()).getPhysicalName());
      consumerSession.recover();
    } catch (JMSException e) {
      e.printStackTrace();
    }
  });
}

private RedeliveryPolicy getRedeliveryPolicy(int numOfRetry) {
  RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
  redeliveryPolicy.setInitialRedeliveryDelay(0);
  redeliveryPolicy.setMaximumRedeliveries(numOfRetry);
  redeliveryPolicy.setMaximumRedeliveryDelay(-1);
  redeliveryPolicy.setRedeliveryDelay(0);
  return redeliveryPolicy;
}
setAlwaysSyncSend method
A message can be sent synchronously or asynchronously.

Example
We do it as follows.
String brokerUrl = ...
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
factory.setUserName("admin");
factory.setPassword("password");
factory.setAlwaysSyncSend(false);
setBrokerURL method
Example
We do it as follows.
"tcp://78.99.88.72:61616?jms.prefetchPolicy.all=2&jms.useAsyncSend=true&socketBufferSize=131072"
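Long option strings like the one above are easy to mistype, so they can be assembled in code. The following is a minimal sketch. BrokerUrlBuilder is a hypothetical helper, not an ActiveMQ class, and it assumes the option values need no URL encoding.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper for composing a broker URL; not part of the ActiveMQ API.
final class BrokerUrlBuilder {
  private final String baseUri;
  // LinkedHashMap keeps the options in insertion order.
  private final Map<String, String> options = new LinkedHashMap<>();

  BrokerUrlBuilder(String baseUri) {
    this.baseUri = baseUri;
  }

  // Adds one query option, e.g. option("jms.useAsyncSend", true).
  BrokerUrlBuilder option(String name, Object value) {
    options.put(name, String.valueOf(value));
    return this;
  }

  // Joins the options as "?a=1&b=2"; values are assumed to need no URL encoding.
  String build() {
    if (options.isEmpty()) {
      return baseUri;
    }
    StringJoiner query = new StringJoiner("&", "?", "");
    options.forEach((name, value) -> query.add(name + "=" + value));
    return baseUri + query;
  }
}
```

The result of build() would then be passed to the ActiveMQConnectionFactory constructor or to setBrokerURL().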
Example
We do it as follows.
@Bean
public ConnectionFactory connectionFactory() {

  ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
  try {
    // A Java string literal cannot span lines, so the URL is concatenated.
    connectionFactory.setBrokerURL("failover:(tcp://activemq-01:61616,"
      + "tcp://activemq-02:61616)?randomize=false&maxReconnectDelay=3000&"
      + "maxReconnectAttempts=50");
    connectionFactory.setUserName("abcxyz");
    connectionFactory.setPassword("abcxyz");
  }catch(Exception e) {
    e.printStackTrace();
  }
  return connectionFactory;
}      
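When the broker list comes from configuration, the failover URL used above can be built from its parts. This is only a sketch. FailoverUrls and its method are hypothetical names, and the option string is passed through unchanged.

```java
import java.util.List;

// Hypothetical helper; it only formats the failover URL, it does not connect.
final class FailoverUrls {
  private FailoverUrls() {}

  // Produces "failover:(uri1,uri2)?options"; the options part is omitted when empty.
  static String failover(List<String> brokerUris, String options) {
    if (brokerUris.isEmpty()) {
      throw new IllegalArgumentException("at least one broker URI is required");
    }
    String url = "failover:(" + String.join(",", brokerUris) + ")";
    return options == null || options.isEmpty() ? url : url + "?" + options;
  }
}
```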

