Monday, 24 August 2020

ActiveMQ activemq.xml File

schedulerSupport Attribute
The explanation is as follows
By default scheduled message support is disabled. We must enable it in the broker XML configuration file.
Example
We do it as follows
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost"
   dataDirectory="${activemq.data}" schedulerSupport="true">
On the message, we set the delay as follows
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, scheduledDelay);
With Spring's JmsTemplate, we do it as follows
public void send(Object object) {

  jmsTemplate.convertAndSend(QUEUE_NAME, object, m -> {
    m.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10000);
    return m;
  });
}
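
The delay property is a number of milliseconds, so a delivery time given as a point in time first has to be converted. The following is a minimal sketch of that conversion using only the JDK. The class name ScheduledDelayCalculator and the method delayMillis are hypothetical helpers, not part of ActiveMQ.

```java
import java.time.Duration;
import java.time.Instant;

public class ScheduledDelayCalculator {

    /**
     * Returns the delay in milliseconds between now and deliverAt.
     * A target in the past yields 0, meaning "deliver immediately".
     */
    public static long delayMillis(Instant now, Instant deliverAt) {
        long millis = Duration.between(now, deliverAt).toMillis();
        return Math.max(0L, millis);
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2020-08-24T10:00:00Z");
        Instant deliverAt = now.plusSeconds(10);
        // The result would be passed as the value of the scheduled delay property.
        System.out.println(delayMillis(now, deliverAt)); // prints 10000
    }
}
```

The returned value could then be passed to setLongProperty in place of the hard-coded 10000 above.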

Monday, 17 August 2020

Camel Splitter

Example
We do it as follows
from("timer://poll?period=10000").process(new Processor(){
  public void process(Exchange exchange){
    ArrayList<String> list = new ArrayList<>();
    list.add("one");
    list.add("two");
    list.add("three");
    exchange.getIn().setBody(list, ArrayList.class);
  }
})
.split(body())
.log("${body}")
.to("file:some/dir?fileName=splitted-${id}");

Wednesday, 12 August 2020

Camel DirectComponent Class

Introduction
DirectComponent can be thought of as the junction point of many different channels. Inputs arriving from different channels are sent to the DirectComponent. Another channel that listens on the DirectComponent then writes the data to, for example, a queue or a database. The explanation is as follows
The direct: component provides direct, synchronous invocation of any consumers when a producer sends a message exchange.
This component does not use a thread pool. The explanation is as follows
Another difference is Direct component doesn't have any thread pool, the direct consumer process method is invoked by the calling thread of direct producer.
Other routes normally send to a direct endpoint with to("direct:..."), while the route that handles the messages consumes from it with from("direct:..."). The code that feeds or triggers the DirectComponent usually lives elsewhere.
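
The synchronous, no-thread-pool behaviour described above can be modelled in plain Java. This is only a conceptual sketch and not Camel's implementation: the class DirectChannelSketch and its from/to methods are hypothetical names chosen to resemble the route DSL, and keeping a single consumer per name is a simplification made for this sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class DirectChannelSketch {

    // Endpoint name -> the single consumer registered for it (a simplification).
    private final Map<String, Function<String, String>> consumers = new ConcurrentHashMap<>();

    /** Registers the consumer for an endpoint name, similar in spirit to from("direct:name"). */
    public void from(String name, Function<String, String> consumer) {
        if (consumers.putIfAbsent(name, consumer) != null) {
            throw new IllegalStateException("Endpoint already has a consumer: " + name);
        }
    }

    /** Sends synchronously: the consumer runs on the caller's thread, no pool involved. */
    public String to(String name, String body) {
        Function<String, String> consumer = consumers.get(name);
        if (consumer == null) {
            throw new IllegalStateException("No consumer for: " + name);
        }
        return consumer.apply(body);
    }

    public static void main(String[] args) {
        DirectChannelSketch direct = new DirectChannelSketch();
        direct.from("processOrder",
            body -> body + " handled on " + Thread.currentThread().getName());
        // The thread name in the output is the caller's thread.
        System.out.println(direct.to("processOrder", "order-1"));
    }
}
```

Because the consumer is invoked directly, the caller blocks until processing finishes and any exception propagates back to the sender.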

Example - Log + Consumer
We do it as follows
import org.apache.camel.builder.RouteBuilder;

public class SampleDirectRoute extends RouteBuilder {

  @Override
  public void configure() throws Exception {

    from("direct:sampleInput")
      .log("Received Message is ${body} and headers are ${headers}")
      .to("file:sampleOutput?fileName=output.txt")
    .end();
  }
}
Usage Examples

Example
- A Spring Boot RestController sends a select call to the DirectComponent. The DirectComponent builds an SQL statement, sends it to the JDBCComponent, and returns the response.
- A Spring Boot RestController sends an insert call to the DirectComponent. The DirectComponent builds an SQL statement, sends it to the JDBCComponent, and returns the response.

Example
We do it as follows. The route reads from ActiveMQ and sends the message to the OrderServer.validate() method. The result is then forwarded to the processOrder channel. OrderService.process(), which listens on this channel, sends its output to ActiveMQ.
from("activemq:queue:order.in")
    .to("bean:orderServer?method=validate")
    .to("direct:processOrder");

from("direct:processOrder")
    .to("bean:orderService?method=process")
    .to("activemq:queue:order.out");
Example
We do it as follows.
// Three endpoints to one "main" route.
from("activemq:queue:order.in")
  .to("direct:processOrder");

from("file:some/file/path")
  .to("direct:processOrder");

from("jetty:http://0.0.0.0/order/in")
  .to("direct:processOrder");

from("direct:processOrder")
  .to("bean:orderService?method=process")
  .to("activemq:queue:order.out");
marshal method
We do it as follows
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jackson.JacksonDataFormat;
import org.springframework.stereotype.Component;

import com.javainuse.model.Employee;


@Component
public class RabbitMQRoute extends RouteBuilder {

  @Override
  public void configure() throws Exception {

    JacksonDataFormat jsonDataFormat = new JacksonDataFormat(Employee.class);

    // The endpoint URI is split with string concatenation so that it compiles.
    from("direct:startQueuePoint").id("idOfQueueHere").marshal(jsonDataFormat)
    .to("rabbitmq://localhost:5672/javainuse.exchange?queue=javainuse.queue"
        + "&autoDelete=false")
    .end();
  }
}

Monday, 10 August 2020

ActiveMQ ActiveMQConnectionFactory Class

Introduction
We include the following line
import org.apache.activemq.ActiveMQConnectionFactory;
It implements the ConnectionFactory interface.

Maven
Example
We include the following dependency.
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-broker</artifactId>
  <version>5.14.0-SNAPSHOT</version>
</dependency>
Example
We include the following dependencies.
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-all</artifactId>
  <version>5.10.2</version>
</dependency>

<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-http</artifactId>
  <version>5.10.2</version>
</dependency>
Virtual Topic
The producer uses a name of the form "VirtualTopic.<name>".
The consumer uses a name of the form "Consumer.<name>.VirtualTopic.<name>". These names can be changed in activemq.xml. The default setting is as follows
<destinationInterceptors>
  <virtualDestinationInterceptor>
    <virtualDestinations>
      <virtualTopic name="VirtualTopic.>" prefix="Consumer.*." selectorAware="false"/>
    </virtualDestinations>
  </virtualDestinationInterceptor>
</destinationInterceptors>
The explanation is as follows
While the producer sends messages to a topic, consumers will receive a copy of the message on their own dedicated queue.

Pros
Multiple consumers can get a message
Decoupling between producer and consumers (publish-and-subscribe pattern)
Messages can be recovered by putting them back on the queue

Cons
Might require additional configuration in the broker
Another explanation is as follows
Virtual topics are a combination of topics and queues. Producers will write messages to a topic while listeners will consume from their own queue. ActiveMQ will copy and duplicate each message from the topic to the actual consumer queues.

This combination of topics and queues has some advantages over conventional topics:

- Even if a consumer is offline, no messages will be lost. ActiveMQ will copy all messages to the registered queues. As soon as a consumer comes back online, he can handle the messages stored on his queue.
- If a consumer cannot handle a message, it will be put on a dead letter queue. The consumer can be fixed and the message can be put back on his own dedicated queue without affecting the other consumer (as a topic would do).
- We can register multiple instances of a consumer on a queue to implement a load balancing mechanism.
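
The copy-per-consumer-queue behaviour and the default naming scheme can be illustrated with a small in-memory model. This is a sketch under stated assumptions and not broker code: VirtualTopicSketch and its methods are hypothetical, and the names "orders", "billing" and "shipping" are made up for the example.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

public class VirtualTopicSketch {

    private final String topicName;
    // Queue name -> pending messages; one dedicated queue per logical consumer.
    private final Map<String, Deque<String>> queues = new LinkedHashMap<>();

    public VirtualTopicSketch(String name) {
        this.topicName = "VirtualTopic." + name;
    }

    public String topicName() {
        return topicName;
    }

    /** Registers a queue named Consumer.<consumerName>.VirtualTopic.<name> and returns its name. */
    public String subscribe(String consumerName) {
        String queueName = "Consumer." + consumerName + "." + topicName;
        queues.putIfAbsent(queueName, new ArrayDeque<>());
        return queueName;
    }

    /** Copies the message into every registered consumer queue. */
    public void publish(String message) {
        for (Deque<String> queue : queues.values()) {
            queue.addLast(message);
        }
    }

    /** Takes the next message from one consumer queue, or returns null if none is pending. */
    public String poll(String queueName) {
        Deque<String> queue = queues.get(queueName);
        return queue == null ? null : queue.pollFirst();
    }

    public static void main(String[] args) {
        VirtualTopicSketch topic = new VirtualTopicSketch("orders");
        String billing = topic.subscribe("billing");
        String shipping = topic.subscribe("shipping");
        topic.publish("order-1");
        // Each consumer queue holds its own copy, even if the consumer reads it later.
        System.out.println(billing + " -> " + topic.poll(billing));
        System.out.println(shipping + " -> " + topic.poll(shipping));
    }
}
```

Several instances polling the same queue name would share that queue's messages, which corresponds to the load balancing point in the list above.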

constructor - URL
The explanation is as follows. It uses Netty under the hood.
The connection factory ActiveMQConnection uses netty to connect to ActiveMQ Artemis if the url has a tcp schema. When the application is stopped by tomcat, the shutdownGracefully is called on the netty group to stop all netty threads.
Example
We do it as follows.
@Bean("connectionFactory")
public ConnectionFactory connectionFactory(JmsProperties appProperties) {
  ActiveMQConnectionFactory cf =
    new ActiveMQConnectionFactory(appProperties.getArtemis().getBrokerUrl());
  cf.setUser(appProperties.getArtemis().getUser());
  cf.setPassword(appProperties.getArtemis().getPassword());
  return cf;
}
constructor - boolean + TransportConfiguration
Example
In Spring, we do it as follows.
@Bean("connectionFactory")
public ConnectionFactory connectionFactory(TransportConfiguration
  transportConfiguration) throws JMSException {
  ActiveMQJMSConnectionFactory activeMQJMSConnectionFactory =
    new ActiveMQJMSConnectionFactory( false, transportConfiguration);
  activeMQJMSConnectionFactory.setPassword("admin");
  activeMQJMSConnectionFactory.setUser("admin");
  activeMQJMSConnectionFactory.setClientID("admin");
  return activeMQJMSConnectionFactory;
}
createConnection method
We do it as follows
@PostConstruct
private void init() throws JMSException {
  Connection connection = activeMQConnectionFactory.createConnection();
  connection.start();
  Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  Queue queue = session.createQueue("test.queue");
  consumer = session.createConsumer(queue);

  new Thread(() -> {
    while (true) {
      try {
        Message receive = consumer.receiveNoWait();
        if (receive != null) {
          ...

        }
      } catch (Exception e) {
        ...
      }
    }
  }, "consumerThread").start();
}
getRedeliveryPolicyMap method
Example
We do it as follows
public void initializeConsumer(String queueName, String topicAddress, int numOfRetry)
 throws JMSException {
  factory.getRedeliveryPolicyMap().put(new ActiveMQQueue("*." + queueName + ".>"),
    getRedeliveryPolicy(numOfRetry));
  Connection connection = factory.createConnection();
  connection.start();
  Session consumerSession = connection.createSession(false, CLIENT_ACKNOWLEDGE);
  Queue queue = consumerSession.createQueue("Consumer." + queueName +
            ".VirtualTopic." + topicAddress);
  MessageConsumer consumer = consumerSession.createConsumer(queue);
  consumer.setMessageListener(message -> {
    try {
      System.out.println("in listener --- " +
        ((ActiveMQDestination)message.getJMSDestination()).getPhysicalName());
      consumerSession.recover();
    } catch (JMSException e) {
      e.printStackTrace();
    }
  });
}

private RedeliveryPolicy getRedeliveryPolicy(int numOfRetry) {
  RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
  redeliveryPolicy.setInitialRedeliveryDelay(0);
  redeliveryPolicy.setMaximumRedeliveries(numOfRetry);
  redeliveryPolicy.setMaximumRedeliveryDelay(-1);
  redeliveryPolicy.setRedeliveryDelay(0);
  return redeliveryPolicy;
}
setAlwaysSyncSend method
A message can be sent synchronously or asynchronously.

Example
We do it as follows
String brokerUrl = ...
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
factory.setUserName("admin");
factory.setPassword("password");
factory.setAlwaysSyncSend(false);
setBrokerURL method
Example
We pass a broker URL such as the following
"tcp://78.99.88.72:61616?jms.prefetchPolicy.all=2"
  + "&jms.useAsyncSend=true"
  + "&socketBufferSize=131072"
Example
We do it as follows.
@Bean
public ConnectionFactory connectionFactory() {

  ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
  try {
    // The URL is built with concatenation so that no whitespace ends up inside it.
    connectionFactory.setBrokerURL("failover:(tcp://activemq-01:61616,"
      + "tcp://activemq-02:61616)?randomize=false&maxReconnectDelay=3000"
      + "&maxReconnectAttempts=50");
    connectionFactory.setUserName("abcxyz");
    connectionFactory.setPassword("abcxyz");
  }catch(Exception e) {
    e.printStackTrace();
  }
  return connectionFactory;
}
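
Long failover URLs like the one above are easy to break when written by hand. As a rough sketch, the string can be assembled from a host list and an option map using only the JDK. FailoverUrlBuilder and its build method are hypothetical helpers, not part of ActiveMQ; the host names and options are the ones from the example above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class FailoverUrlBuilder {

    /**
     * Builds "failover:(tcp://h1,tcp://h2)?k1=v1&k2=v2" from host:port pairs
     * and options. Options are appended in the iteration order of the map.
     */
    public static String build(List<String> hostPorts, Map<String, String> options) {
        String transports = hostPorts.stream()
            .map(hostPort -> "tcp://" + hostPort)
            .collect(Collectors.joining(","));
        String url = "failover:(" + transports + ")";
        if (options.isEmpty()) {
            return url;
        }
        String query = options.entrySet().stream()
            .map(entry -> entry.getKey() + "=" + entry.getValue())
            .collect(Collectors.joining("&"));
        return url + "?" + query;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("randomize", "false");
        options.put("maxReconnectDelay", "3000");
        options.put("maxReconnectAttempts", "50");
        System.out.println(
            build(Arrays.asList("activemq-01:61616", "activemq-02:61616"), options));
    }
}
```

The resulting string could be passed to setBrokerURL instead of a hand-written literal.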