Ciao a tutti

Il post di domenica ci aveva fatto vedere come scodare da una Queue dello Storage Account di Azure. Adesso vediamo una piccola modifica da fare a quel codice, per avere un numero dinamico di worker asincroni che scodano messaggi.

La soluzione è molto semplice: andremo a valutare il numero residuo di messaggi sulla coda e andiamo a valutare se questo numero è minore o superiore alle nostre soglie prestabilite. Abbiamo due soglie, una per l’aumento dei worker, ed una per la riduzione dei worker, con ovviamente un delta tra le due per evitare di fare aggiungi/rimuovi all’infinito. Nel progetto di esempio: abbiamo 50 come soglia di aumento, e 10 come soglia di riduzione.

Alla creazione della nostra classe di elaborazione dati, andiamo a creare un Task aggiuntivo di controllo del numero di scodatori. Il resto del codice è identico alla soluzione già vista e quindi non l’andrò a ricopiare. Il Task di controllo eseguirà ad una velocità

Da un punto di vista di soluzione, va detto che questa implementazione andrà ovviamente ad aumentare il ritardo globale nella gestione dei messaggi perché se prima questo ritardo non supera dei livelli critici, non andranno ad aumentare i worker di scodamento. D’altro canto, è virtualmente capace di gestire un numero molto maggiore di messaggi quando questi sono disponibili, e ridurre il polling inutile in caso di mancanza di messaggi sulla coda nella quantità tale da soddisfare tutti i worker, riducendone appunto il numero.

Nuova inizializzazione:

//dynamic message pump amount
//we dont need using multiple Clients, only multiple runners
Task.Factory.StartNew(() =>
{
while (!disposing)
{
Thread.Sleep(5000);
Queue.FetchAttributes();
var lastCount = Queue.ApproximateMessageCount;
if (lastCount != null)
{
Debug.WriteLine($”{typeof(StorageQueueMessageWorker)}: now pending {lastCount:N0} messages”);

if (lastCount.Value > MIN_MESSAGECOUNT_TO_INCREMENT_WORKERS * WorkingTasks.Count)
//add another pump
CreateWorker();
else if (WorkingTasks.Count > 1 && lastCount.Value < MAX_MESSAGECOUNT_TO_DECREMENT_WORKERS * WorkingTasks.Count)
this.WorkingTasks.RemoveAt(0);
}
}
}, TaskCreationOptions.LongRunning);