Ciao a tutti

continuando il precedente post sull’argomento, proseguo parlando della prima e più economica soluzione per gestire tanti messaggi su coda asincrona in Azure: la Queue dello Storage Account

Non mi dilungo su pro e contro, ne ho già parlato abbondantemente nello scorso post. Parleremo invece di come implementare una soluzione solida, vedremo come creare un worker pool dinamico, in grado quindi di aumentare/diminuire il numero di scodatori in modo del tutto automatico in base al numero di messaggi presenti in coda. Ovviamente questa soluzione non va del tutto d’accordo con il numero di sistemi che stanno scodando, nel senso che nell’esempio diamo per scontato che abbiamo un unico nodo di scodamento messaggi. Invece, nella realtà, in genere si usa un WebJob o un WorkerRole in cui hostare la propria logica. In questi casi si tende ad usare la scalabilità (scale out) orizzontale dell’host della nostra logica al posto di aumentare/ridurre il numero di processi logici che scodano. Comunque non è sempre detto. Se fissiamo il numero di scodatori in maniera hard-coded potremmo comunque sprecare risorse o tempo, mentre un sistema del tutto dinamico spesso offre risultati migliori, con ovvi maggiori responsabilità nella sua architettura.

Chiudiamo il preambolo e passiamo alla soluzione. Dicevamo che la coda dello Storage è la più economica ed anche quella con meno funzioni. Io ad esempio la uso per conservare temporaneamente tutti i messaggi che entrano nei miei sistemi basati su dati che arrivano da dispositivi IoT satellitari con SIM integrata. Non uso uno Stream solo perché tanto serve comunque uno Storage per gestire la sincronizzazione di lettura con costi ovviamente superiori (poi vedremo meglio quell’aspetto nei successivi post), oltre al fatto di non aver benefici oggettivi (nella mia soluzione).

Il progetto dimostrativo usa due contenitori di pubblicatori e consumatori di messaggi che devono implementare una ben precisa interfaccia: IMessageSender ed IMessageReceiver. Senza entrare troppo nel dettaglio di come funziona l’architettura demo, basti sapere che c’è una classe per la gestione di ogni tipologia di Coda. Adesso analizzeremo la classe StorageQueueMessageWorker, che implementa una logica di base di invio/ricezione di messaggi dalla Queue dello Storage Account.

Ogni worker che voglia ricevere messaggi, deve esporre (implementando l’IMessageReceiver) un evento OnReceiveMessage per comunicare al mondo di aver ricevuto un messaggio. Qui il primo handicap della coda dello storage: funziona in polling, quindi dobbiamo creare un Client, poi uno o più processi asincroni su altri thread (Thread, ThreadPool, Task, etc) per pollare la ricezione dei messaggi. Nel nostro caso, andiamo a creare un worker per ogni CPU logica del PC

Questa classe, andrà poi a collezionare i vari Task (che ho scelto di usare per contenere i worker di scodamento) in una lista, per poi eliminarli del tutto alla distruzione dell’oggetto intero

Inizializzazione:

this.Account = CloudStorageAccount.Parse(ConfigurationManager.ConnectionStrings[“storageQueueStorageAccountConnectionString”].ConnectionString);

this.Client = Account.CreateCloudQueueClient();
this.Queue = Client.GetQueueReference(“messages”);
Queue.CreateIfNotExists();

this.WorkingTasks = new List<Task>();

//create the first message pump
CreateWorker();

//static message pump amount based on logical CPU count
for (int i = 0; i < Environment.ProcessorCount; i++)
     CreateWorker();

Creazione di un worker interno (scodatore di messaggi):

Task task = null;

task = Task.Factory.StartNew(() =>
{
     while (WorkingTasks.Contains(task))
     {
         //no more than 3 second to get message or null
         var msg = Queue.GetMessage(options: new QueueRequestOptions { ServerTimeout = TimeSpan.FromSeconds(3) });
         if (msg != null)
         {
             //thelemetry data does not require transactional queue design
             Queue.DeleteMessage(msg);
             Task.Factory.StartNew(() => OnReceiveMessage?.Invoke(this, MessageItem.Parse(msg.AsString)), TaskCreationOptions.DenyChildAttach | TaskCreationOptions.PreferFairness);
         }
     }
});

this.WorkingTasks.Add(task);
Debug.WriteLine($”{Name}: now working with {WorkingTasks.Count:N0} workers”);
Console.WriteLine($”{Name}: now working with {WorkingTasks.Count:N0} workers”);

Aggiunta di messaggi alla coda:

public void Enqueue(MessageItem arg) => Queue.AddMessageAsync(new CloudQueueMessage(arg.ToString()));

Quando avviamo questo progetto demo, ed andiamo ad eseguire questo codice, quello che succede è molto semplice: l’aggiunta è immediata anche se asincrona, non dobbiamo scrivere quasi niente, mentre la ricezione ci costringe a creare più Task di scodamento che tutti insieme andranno a pollare la coda in attesa di messaggi in attesa.

In questo progetto dimostrativo, sto campionando due variabili di sistema (performance counter) del mio PC: CPU e transazioni sul SQL locale ad una frequenza di 20 hertz cad. Il risultato è che se non usassi più worker di scodamento, il sistema non sarebbe mai online a tempo zero, in pratica andrebbe sempre a collezionare nuovo ritardo. Con almeno 4 worker di scodamento invece tutto procede con 1 o 2 secondi di ritardo, che è nella norma per una coda cloud utilizzata da una connessione domestica fuori regione.