Following the previous post on the TNX.ApplicationJobs library serie on NuGet, it is time to write a new example we often use in our cusomers implementation when on-premise

The goal is very few code lines, a uniquely comprensive job execution environment, easy to maintain, to trace and so on

This time, we will write a very small consoleapplication job host able to consume messages from a RabbitMQ and do whatever or simply output other messages to send the business task to another job agent.

As always, the code example is very, very, very, short! This is an internal lib we used in previous months/some-year to reduce our dev development timings when had to do simple works at our customers HQ

    • NuGet: TNX.ApplicationJobs.Common
    • NuGet: TNX.ApplicationJobs.RabbitMq

Here we go:

class Program
{
     static ConnectionFactory factory => new ConnectionFactory { HostName = “rabbitserver”, UserName = “rabbituser”, Password = “password” };

    static void Main(string[] args) => new ConsoleApplication()
         .Init(args, overrideVerbose: true)
         .ConsumeMessageAsync(factory, “queue_name”, new OnMessageReceivedHandler<Message1>((channel, verbose, msg) =>
         {
             //receive messages via Json serialization

            //transform
             var output = new Message2 { Name = msg.Name, Surname = msg.Surname, NewProperty = “ciao” };

            //output a new message on the queue to continue distributed programming
             //on another job agent or do what ever you like if no output is required

            //channel.BasicPublish(…) is the standard way

            //output the message via Json serialiation
             channel.SendMessage(“output_queue_name”, output);

            //overrides does exists to send string or byte[] messages

            return true; //the true will cause the message deletion on the underlying queue
         }))
         .Wait();
}

We definitely love Fluent notation.

So, the ConsoleApplication starts the job excution agent and environment (see previous post to read more or simply download the package and extend the class by yourself)

The Init method grab application startup commands and init the ServicePoint and ThreadPool setup

Then the ConsumeMessageAsnc extension method register child tasks able to process (consume) messages from the Queue. Consider that the implementation will create a consumer per thread as specified in the initialization parameters: default is 2; setup thread count witht the command “–th:N”

Extension methods make very easy to read and write messages to the queue

The same happens if you prefer dealing with text objects or byte[] buffers without having to serialize/deserialize via Json

.ConsumeMessageAsync(factory, “queue_name”, new OnMessageReceivedHandler((channel, verbose, body) =>
{
     //receive body as string
     return true;
}))
.ConsumeMessageAsync(factory, “queue_name”, new OnBufferReceivedHandler((channel, verbose, body) =>
{
     //receive body as byte[] buffer
     return true;
}))

Thanks for reading