Ciao a tutti

Azure Messaging è un argomento estremamente vasto e affascinante. Abbiamo già visto come inviare messaggi con le code, sia con quelle basi dello Storage Account, che con quelle più avanzate del Service Bus, adesso è l’ora dell’Event Hub, lo stream distribuito di Microsoft Azure

Quando però si parla di IoT, spesso si parla di dispositivi che autonomamente sono in grado di produrre (o campionare) dati che poi direttamente (e spesso in maniera grezza) vengono inviati su un sistema remoto più potente, in grado di storicizzarli ed analizzarli. Questo è lo scenario (esemplificando) di telemetria globalmente più usato. Se poi l’analisi serva a produrre report di business, manutenzione predittiva od a fare da insegnante ad un sistema di intelligenza artificiale, non cambia il fatto che il messaging è la parte essenziale dell’intera soluzione. Altrimenti non avremmo alcun dato da analizzare e quindi alcuna risposta da produrre. E questo vale anche in scenari non di telemetria ma di controllo remoto o vicino (nella nebbia) – vedi post Edge Computing

Tornando al messaging con l’Event Hub: ricordiamo bene cosa è uno stream. A differenza delle code, che sono dei sistemi di persistenza temporanei che spediscono messaggi in ottica FIFO (garantita o no, con partizioni e sessioni o no) ove esiste il principio della garanzia di consegna, gli stream sono dei flussi di dati punto e basta. Per flusso si intende un insieme di messaggi omogenei per tipo o no (in genere per comodità sono omogenei) il cui flusso di byte viene inoltrato in maniera sequenziale. Già per definizione quindi sparisce il concetto di messaggio vero e proprio, nel senso di item che c’è invece in una coda, e che qui si trasforma nel significato di dato, cioè gruppo di byte che rappresentano un’informazione, che noi chiamiamo comunque messaggio anche se non è più un messaggio nel senso di comando, ma solo nel senso di informazione. Sparisce il principio della consegna, sparisce il supporto alla transazionalità ed alla relativa capacità di riconsegnare il messaggio ad altro ricevente in caso di problemi, sparisce il concetto di gestione degli scarti (i messaggi non consegnabili o andati in errore troppe volte) dato che semplicemente non esiste il ciclo di vita di un messaggio.

Abbiamo invece dei dati che viaggiano ad altissima velocità, possono essere riletti entro un tot di tempo, possono essere processati da più sistemi tutti contemporaneamente dato che lo stream è condiviso e possono essere shardati in base alle nostre necessità: per cliente, creando un sistema multitenant, per risorsa oppure per gruppo di risorse e quindi partizionando lo stream in quelli che potrebbero essere tanti stream più piccoli.

Sicuramente, il pro più importante che fa pendere nell’uso di uno stream è la sua velocità e scalabilità. Cosa in cui una coda per definizione è limitata. Poi anche il fatto di usare protocolli standard, un consumo minimo della banda, bassa latenza e più processi che ne leggono i dati, fanno dello stream il sistema di scambio dati di telemetria in assoluto di riferimento.

Event Hub è uno stream molto potente, è distribuito, scalabile, supporta più processori (virtual machine) che ne “pompano” i messaggi, supporta il partizionamento, e supporta vari protocolli (vedremo in seguito in altro post i protocolli e le loro differenze). Come tutti gli stream, l’uso dell’Event Hub ci costringe a ricordarci dove eravamo rimasti nell’ultima lettura, al pari di leggere un file gigantesco che cresce all’infinito che dobbiamo continuare a leggere. Nel caso di un file, basterebbe ricordare l’indice dell’ultimo byte letto; nel caso di Event Hub, ricorderemo il timestamp di creazione dell’ultimo messaggio processato. Eh si, perchè quando si parla di code i messaggi si scodano, mentre per gli stream no, si processano, si leggono, ma non si scodano, dato che questo avrebbe un significato particolare che appunto non è applicabile in questo contesto.

Demo con unica partizione

Vediamo adesso l’esempio di codice usuale, in scrittura asincrona e lettura asincrona ad eventi come abbiamo visto già per le code nei post precedenti.

In questo caso, andremo a fare un invio e ricezione semplici sulla stessa partizione. Alla inizializzazione del worker creiamo il client, accediamo alla partizione di base, creiamo un receiver per la partizione di base, e su un thread a parte iniziamo a metterci in ascolto. Sotto questo punto di vista, l’API è più simile a quella della Queue dello Storage Account, dato che non c’è un evento che già pronto ci torna i messaggi quando si presentano

Inizializzazione del client

this.client = EventHubClient.CreateFromConnectionString(ConnectionString);
this.defaultConsumer = client.GetDefaultConsumerGroup();
this.defaultReceiver = defaultConsumer.CreateReceiver(
//scorro lo stream sulla prima partizione soltanto
client.GetRuntimeInformation().PartitionIds[0]
//scorro lo stream partendo dall’ultimo timestamp di messaggio letto
, LastOffset);

Come dicevamo, quando creiamo il client, è importante specificare l’ultimo timestamp già letto.

Invio del messaggio

public void SendMessage(MessageItem arg)
{
if (!disposing) this.client.SendAsync(new EventData(Encoding.ASCII.GetBytes(arg.ToString())));
}

Task in background di scodamento

this.worker = Task.Factory.StartNew(() =>
{
DateTime lastOffset = LastOffset;

while (!disposing && !defaultReceiver.IsClosed && !defaultConsumer.IsClosed && !client.IsClosed)
using (var data = defaultReceiver.Receive(TimeSpan.FromSeconds(3)))
//possono esserci doppioni quindi li scarto
if (data != null)
{
var buffer = data.GetBytes();
var item = MessageItem.Parse(Encoding.ASCII.GetString(buffer));
Task.Factory.StartNew(() => OnReceiveMessage?.Invoke(this, item), TaskCreationOptions.PreferFairness);

//aggiorno l’ultimo timestamp di messaggio letto
lastOffset = data.EnqueuedTimeUtc;
}

//prima di chiudere il task aggiorno il timestamp su disco
LastOffset = lastOffset;
}, TaskCreationOptions.LongRunning);

La logica è semplice: proviamo a processare un nuovo messaggio se presente sullo stream, se dopo 3 secondi non c’è mandiamo avanti il ciclo, così da poter eventualmente uscire al Dispose dell’intero worker. Poi il resto è semplice, conserviamo l’ultimo timestamp e solo all’uscita del ciclo provocato dal Dispose salviamo il timestamp su disco per quando andremo a riavviare il processo. Dato il disallineamento (anche se minimo) dei timestamp, qualche messaggio doppio al riavvio del ciclo può capitare, bisogna tenerne conto.

 

Ciò che colpisce di questa soluzione, è la latenza bassissima ed il basso consumo di banda.

Volendo usare altri reader, poter poter usare i messaggi per alimentare diverse logiche senza accoppiarne il codice a valle, basta usare ancora il CreateReceiver del ConsumerGroup ed ovviamente creare altri Task per processarne i messaggi.

In coda il codice completo del worker funzionante

 

public sealed class EventHubMessageWorker : IMessageSender, IDisposable, IMessageReceiver
{
private EventHubClient client;

public string Name => nameof(EventHubMessageWorker);
string ConnectionString => ConfigurationManager.ConnectionStrings[“eventHub”]?.ConnectionString;

public EventHubMessageWorker()
{
this.client = EventHubClient.CreateFromConnectionString(ConnectionString);
this.defaultConsumer = client.GetDefaultConsumerGroup();
this.defaultReceiver = defaultConsumer.CreateReceiver(
//scorro lo stream sulla prima partizione soltanto
client.GetRuntimeInformation().PartitionIds[0]
//scorro lo stream partendo dall’ultimo timestamp di messaggio letto
, LastOffset);

this.worker = Task.Factory.StartNew(() =>
{
DateTime lastOffset = LastOffset;

while (!disposing && !defaultReceiver.IsClosed && !defaultConsumer.IsClosed && !client.IsClosed)
using (var data = defaultReceiver.Receive(TimeSpan.FromSeconds(3)))
//possono esserci doppioni quindi li scarto
if (data != null)
{
var buffer = data.GetBytes();
var item = MessageItem.Parse(Encoding.ASCII.GetString(buffer));
Task.Factory.StartNew(() => OnReceiveMessage?.Invoke(this, item), TaskCreationOptions.PreferFairness);

//aggiorno l’ultimo timestamp di messaggio letto
lastOffset = data.EnqueuedTimeUtc;
}

//prima di chiudere il task aggiorno il timestamp su disco
LastOffset = lastOffset;
}, TaskCreationOptions.LongRunning);
}

const string fname = “offset.dat”;

DateTime LastOffset
{
get
{
if (File.Exists(fname))
using (var f = File.OpenRead(fname))
return (DateTime)new BinaryFormatter().Deserialize(f);
else
return default(DateTime);
}
set
{
using (var f = File.OpenWrite(fname))
new BinaryFormatter().Serialize(f, value);
}
}

public void SendMessage(MessageItem arg)
{
if (!disposing) this.client.SendAsync(new EventData(Encoding.ASCII.GetBytes(arg.ToString())));
}

bool disposing = false;
private EventHubConsumerGroup defaultConsumer;
private EventHubReceiver defaultReceiver;
private Task worker;

public event EventHandler<MessageItem> OnReceiveMessage;

public void Dispose()
{
if (!disposing)
{
disposing = true;

worker.Wait();
worker.Dispose();
defaultReceiver.Close();
defaultConsumer.Close();
client.Close();
}
}
}