Ciao a tutti

Azure Service Bus Logo
Azure Service Bus Logo

come detto nella prima parte di questa serie di post, la coda del Service Bus è quella più completa a disposizione su Azure, al prezzo ovviamente delle maggiori funzioni che abbiamo a disposizione, il che la rende ovviamente più costosa della coda dello Storage Account.

Non mi dilungo sull’elencare le differenze. Andremo invece direttamente a vedere come lo stesso modulo (reader e writer) già scritto per la coda dello Storage Account, scritto per la coda del SB, diventi estremamente più concisa, dato il naturale design della soluzione già conforme con quanto ci serve: reader in ascolto ad eventi e writer asincrono.

public sealed class SBQueueMessageWorker : IMessageSender
{
public string Name => nameof(SBQueueMessageWorker);
public QueueClient Client { get; private set; }
public MessagingFactory MsgFactory { get; private set; }
public event EventHandler<MessageItem> OnReceiveMessage;

public SBQueueMessageWorker()
{
this.Client = QueueClient.CreateFromConnectionString(ConfigurationManager.ConnectionStrings[“sbQueueConnectionString”].ConnectionString
, ReceiveMode.PeekLock);
this.Client.OnMessage(OnMessageReceived, new OnMessageOptions { MaxConcurrentCalls = Environment.ProcessorCount, AutoComplete = true });

//I prefer using PeekLock receive mode (the default) instead of the ReceiveAndDelete
//because the second one has poor scalability and reduces the overall receive speed
//when using multiple receive pipelines as in this case
}

private void OnMessageReceived(BrokeredMessage obj)
{
var item = MessageItem.Parse(obj.GetBody<string>());

Task.Factory.StartNew(() => OnReceiveMessage?.Invoke(this, item)
, TaskCreationOptions.DenyChildAttach | TaskCreationOptions.PreferFairness);
}

public void SendMessage(MessageItem arg)
{
Client.SendAsync(new BrokeredMessage(arg.ToString()) { PartitionKey = arg.Key });
}
}

Questo approccio andrà a creare una comunicazione asincrona in scrittura, di tipo FIFO (non garantito) nel senso che essendo un sistema completamente asincrono, accavallamenti possono capitare, sia tra messaggi appartenenti a diverse variabili reali da campionare, che anche tra valori della stessa variabile reale. Per noi sarebbe anche accettabile, dato che stiamo facendo telemetria, e la perdita di un pacchetto è meno importante di una statistica riduzione delle performance della soluzione.

Andando ad eseguire la test console, andremo a notare subito come all’inizio i dati entrano con un ritardo massimo di 1~3 secondi, normale usando una connessione WiFi di scarsa qualità. Semplificando, succede che la coda del Service Bus cerca di spingere i messaggi verso il reader in modo da averne almeno qualcuno il prima possibile….. Il problema con questa soluzione è che appena passano 10 o 20 secondi, iniziano ad arrivare sia qualche messaggio degli ultimi, giusto per avere dati aggiornati, ma anche i messaggi vecchi che non avevano raggiunto ancora il reader, quindi vedremo la test console mostrare un output tipo questo:

SBQueueMessageWorker => % processor time:9,0 (3s delay)
SBQueueMessageWorker => Transactions/sec:0,0 (72s delay)
SBQueueMessageWorker => % processor time:32,5 (3s delay)
SBQueueMessageWorker => % processor time:7,8 (3s delay)
SBQueueMessageWorker => Transactions/sec:0,0 (72s delay)
SBQueueMessageWorker => % processor time:0,0 (3s delay)
SBQueueMessageWorker => Transactions/sec:0,0 (72s delay)
SBQueueMessageWorker => % processor time:23,3 (3s delay)
SBQueueMessageWorker => Transactions/sec:0,0 (72s delay)
SBQueueMessageWorker => Transactions/sec:0,0 (72s delay)
SBQueueMessageWorker => % processor time:16,1 (4s delay)
SBQueueMessageWorker => Transactions/sec:0,0 (72s delay)
SBQueueMessageWorker => % processor time:0,0 (4s delay)
SBQueueMessageWorker => % processor time:9,7 (4s delay)
SBQueueMessageWorker => Transactions/sec:0,0 (72s delay)
SBQueueMessageWorker => % processor time:2,4 (4s delay)

Aggiunta delle sessioni

Con le code del Service Bus è possibile utilizzare le sessioni, una per ogni variabile reale da campionare, che agisce come fosse una sotto coda per ogni variabile reale, così da avere un supporto FIFO garantito (a meno di eccezioni durante la fase di scodamento che farebbero come sempre ritornare il messaggio sul reader). Vediamo il codice

public sealed class SBOrderedQueueMessageWorker : IMessageSender, IMessageReceiver
{
public string Name => nameof(SBOrderedQueueMessageWorker);
public QueueClient Client { get; private set; }
public event EventHandler<MessageItem> OnReceiveMessage;
private static SBOrderedQueueMessageWorker SendingInstance = null;

public SBOrderedQueueMessageWorker()
{
if (SendingInstance == null)
SendingInstance = this;

this.Client = QueueClient.CreateFromConnectionString(ConfigurationManager.ConnectionStrings[“sbOrderedQueueConnectionString”].ConnectionString
, ReceiveMode.PeekLock);

this.Client.RegisterSessionHandler(typeof(SBOrderedQueueSessionHandler), new SessionHandlerOptions
{
AutoComplete = true,
MaxConcurrentSessions = Environment.ProcessorCount,
});
}

public class SBOrderedQueueSessionHandler : IMessageSessionHandler
{
public void OnCloseSession(MessageSession session)
{
session.Close();
Console.WriteLine($”Closing connection to session {session.SessionId}”);
}

public void OnMessage(MessageSession session, BrokeredMessage obj)
{
var item = MessageItem.Parse(obj.GetBody<string>());

Task.Factory.StartNew(() => SBOrderedQueueMessageWorker.SendingInstance.OnReceiveMessage?.Invoke(SBOrderedQueueMessageWorker.SendingInstance, item)
, TaskCreationOptions.DenyChildAttach | TaskCreationOptions.PreferFairness);
}

public void OnSessionLost(Exception exception)
{
Console.WriteLine($”Receiving exception from session: {exception.Message} {exception.InnerException?.Message}”);
}
}

public void SendMessage(MessageItem arg)
{
Client.SendAsync(new BrokeredMessage(arg.ToString()) { SessionId = arg.Key, PartitionKey = arg.Key });
}
}

Come sempre, per semplificare la lettura non abbiamo usato alcun framework/design particolare, per poter soffermare solo sul funzionamento della tecnologia che vogliamo usare. In questo esempio, abbiamo anche usato una specie di Singleton mal implementato per poter gestire l’evento di output dei messaggi ricevuto che poi la test console andrà a mostrare.

La differenza principale con il worker semplice, è che qui andiamo a creare una classe per poter gestire propriamente le nostre sessioni. Poi andiamo a registrare il tipo del nostro gestore delle sessioni sul client, che andrà a instanziare il giusto numero in base a quante sessioni effettivamente troveremo sulla coda. In invio la modifica da fare è minima, semplicemente andiamo a specificare la chiave della sessione e della partizione (che per noi sono uguali), nulla di più.

La differenza nell’esecuzione invece è estremamente forte: l’avvio è quasi a scatti…. un tantino strozzato…. poi dopo 10 o 20 secondi il client si stabilizza ed i dati iniziano ad arrivare quasi senza ritardi, recuperando in pochissimo il ritardo iniziale acquisito. Di seguito l’output della test console

SBOrderedQueueMessageWorker => Transactions/sec:0,0 (0s delay)
SBOrderedQueueMessageWorker => % processor time:32,1 (0s delay)
SBOrderedQueueMessageWorker => % processor time:15,9 (0s delay)
SBOrderedQueueMessageWorker => Transactions/sec:0,0 (0s delay)
SBOrderedQueueMessageWorker => % processor time:16,3 (0s delay)
SBOrderedQueueMessageWorker => Transactions/sec:0,0 (0s delay)
SBOrderedQueueMessageWorker => Transactions/sec:0,0 (0s delay)
SBOrderedQueueMessageWorker => Transactions/sec:0,0 (0s delay)
SBOrderedQueueMessageWorker => % processor time:0,0 (0s delay)
SBOrderedQueueMessageWorker => % processor time:39,2 (0s delay)
SBOrderedQueueMessageWorker => % processor time:31,8 (0s delay)
SBOrderedQueueMessageWorker => % processor time:9,5 (0s delay)

 

Chiudiamo dicendo cosa abbiamo visto: la coda del Service Bus di Azure, è sicuramente più potente di quella dello Storage Account. Si dice che in telemetria la certezza della consegna di tutti i dati non è del tutto essenziale… ma dipende, dobbiamo solo fare analisi statistica? allora è così, altrimenti dovessimo fare analisi strategiche, o pilotare qualche output in funzione dei dati che riceviamo, allora si che vogliamo i dati quanto prima e con la garanzia di sequenzialità.

Nel prossimo post approfondiremo invece l’approccio con gli Stream, dove abbiamo maggiore banda e la possibilità di utilizzare più sottoscrittori dello stream per fare magari attività diverse