Quando si deve scorrere uno stream di EventHub, oltre all’approccio classico che abbiamo già visto nel precedente post, esiste l’approccio implementando l’interfaccia IEventProcessor ed usando l’EventProcessorHost che ci garantisce un’esecuzione thread-safe e persiste autonomamente (su un nostro Storage Account) l’ultima posizione del nostro cursore virtuale di lettura dello stream, per poter proseguire con la lettura da dove ci si era fermati, qualunque sia il motivo dello stop.

Per poter sfruttare questo approccio, dovremo definire una classe che implementerà l’interfaccia IEventProcessor. Qui dentro, andremo a mettere la nostra logica di scodamento dei messaggi. Peculiarità della soluzione, è il fatto che il nostro processor andrà a girare dentro un EventProcessorHost che si occuperà per noi di alcune cose, forse la più importante, il fatto di collaborare con lo Storage Account, dove va a persistere dei dati che servono per sincronizzare eventuali lettori multipli e per conservare la posizione dell’ultimo elemento letto. Per evitare poi di rendere lo storage il collo di bottiglia, la persistenza viene fatta ogni N roundtrips di lettura dallo stream, così al peggio rileggeremo un piccolo numero (max N) di messaggi dallo stream, garantendoci però al contempo la capacità di non lasciare nulla dietro.

NB: Per necessità di semplificazione del codice da leggere, è stata usata una variabile statica di nome Instance che fa da ponte (come stessimo usando un Singleton pattern) per gli eventi dei messaggi ricevuti. Ovviamente questo design è inusabile in produzione, dove invece si va di solito ad usare un altro approccio magari con l’uso della IoC con cui inviare il messaggio direttamente alla classe di business che lo dovrà elaborare, senza dover sollevare un evento così brutalmente in un oggetto statico globale

Diamo uno sguardo al codice di esempio. Ricordo che chi non avesse letto i primi post con la spiegazione del progetto in uso può farlo qui

Il nostro EventProcessor

Prima di tutto, prepariamo una sottoclasse da mettere nel nostro worker per lettura/scrittura dei messaggi che si occuperà dietro le quinte di scodare i messaggi con il sistema dell’EventProcessorHost

sealed class MessageProcessor : IEventProcessor
{
public Task CloseAsync(PartitionContext context, CloseReason reason)
{
return Task.CompletedTask;
}

public Task OpenAsync(PartitionContext context)
{
return Task.CompletedTask;
}

const int MAX_COUNTER_TO_PERSIST_LEASE = 10;
int counter = 0;
public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
return Task.WhenAll(messages.Select(msg => Task.Run(() =>
{
//gestisco il messaggio ricevuto
var buffer = msg.GetBytes();
var item = MessageItem.Parse(Encoding.ASCII.GetString(buffer));
Instance.OnReceiveMessage?.Invoke(Instance, item);

//incremento sempre il contatore dei msg scaricati
Interlocked.Increment(ref counter);

if (counter >= MAX_COUNTER_TO_PERSIST_LEASE)
{
//resetto il counter
Interlocked.Exchange(ref counter, 0);

//informo il contesto di persistere lo stato dell’ultimo messaggio letto
//in caso di problemi, al riavvio del Worker si ripartirà dall’ultimo checkpoint
context.CheckpointAsync().ContinueWith(t =>
{
Console.WriteLine($”Syncing…”);
Debug.WriteLine($”Syncing…”);
});
}
})));
}
}

Come visibile, questo oggetto molto semplice si occupa solo di scodare i messaggi e girarli all’evento statico del worker per poterli poi rendere disponibili al di fuori. Durante questo lavoro, andiamo a contare il numero di messaggi scodati, e ogni tanto (in questo esempio ogni 10 messaggi scodati) andiamo a chiedere all’Host di sincronizzare la posizione dell’ultimo messaggio letto, così da persistere il cursore virtuale che ci consentirà in futuro eventualmente di ripartire la letttura da quella posizione, ed evitare di rileggere molti messaggi già processati. Va sempre tenuto presente che in un contesto di lettura da stream, per definizione, nessuno potrà mai garantire assolutamente la mancanza di messaggi doppi, quindi questi vanno tenuti presenti nella successiva logica di elaborazione!

Inizializzazione del worker

Con il nostro EventProcessor pronto, siamo in grado di inizializzare il nostro worker principale e instanziare il nostro Host che andrà poi autonomamente a gestire il ciclo di vita del nostro processor

if (Instance == null)
Instance = this;

//instanzio un host di eventi per processare i nostri messaggi in ricezione
ProcessorHost = new EventProcessorHost(
Guid.NewGuid().ToString(), //un nome univoco dell’host
HubName,
EventHubConsumerGroup.DefaultGroupName, //stiamo usando il gruppo di default, ma è possibile usarne di più contemporaneamente con più eventprocessor
HubConnectionString,
SyncConnectionString
);

//registro il mio eventprocessor
ProcessorHost.RegisterEventProcessorAsync<MessageProcessor>(new EventProcessorOptions
{
MaxBatchSize = 100,
ReceiveTimeOut = TimeSpan.FromSeconds(30),
});

//instanzio un client normale per poter spedire i messaggi
//la ricezione invece sarà a cura degli event processor
Client = EventHubClient.CreateFromConnectionString($”{HubConnectionString};EntityPath={HubName}”);

Possiamo vedere che infondo una volta registrato il nostro Processor dentro l’Host, non c’è altro da fare.

Importante da tenere in considerazione, è che con questo sistema stiamo lavorando solo su un Group dell’EventHub, per l’esattezza il Default. Dovessimo lavorare in un contesto più grande, con più Group attivi, dovremmo andare a registrare più volte un EventProcessor per far si che questo scodi dal relativo Group. Ma come già detto il codice da scrivere è minimo.

Summary

Riassumendo, con questo approccio andiamo a semplificare il codice ed ad avare qualcosa di thread-safe e di scalabile con facilità. Di contro, questo sistema fa un uso intensivo dello Storage Account che diventa così elemento essenziale e comunque di parte del sistema, ma d’altronde da qualche parte dobbiamo per forza persistere l’ultima posizione del cursore virtuale, e nell’esempio del precedente post ove non i usava questo Processor, andavamo ad usare il disco locale per conservare il Timestamp dell’ultimo messaggio letto e questa soluzione non è per niente scalabile e tantomeno usabile nel contesto d’esecuzione del cloud computing, dove dovremmo nel caso sempre usare o lo Storage Account come tale (blob) o come disco virtuale da attaccare alla macchina che in quel momento sta scodando, con ovvie complicazioni e minore robustezza della soluzione

Codice completo

Di seguito il codice completo del worker in oggetto

//scaricare pacchetto NuGet: Microsoft.Azure.ServiceBus.EventProcessorHost
public sealed class EventHubProcessorHostMessageWorker : IMessageReceiver, IMessageSender
{
public string Name => nameof(EventHubProcessorHostMessageWorker);
string HubConnectionString => ConfigurationManager.ConnectionStrings[“eventprocessorhubconnectionstring”]?.ConnectionString;
string HubName => ConfigurationManager.ConnectionStrings[“eventprocessorhubname”]?.ConnectionString;
string SyncConnectionString => ConfigurationManager.ConnectionStrings[“eventprocessorstorageconnectionstring”]?.ConnectionString;
public event EventHandler<MessageItem> OnReceiveMessage;
readonly EventProcessorHost ProcessorHost;
static EventHubProcessorHostMessageWorker Instance;
readonly EventHubClient Client;

sealed class MessageProcessor : IEventProcessor
{
public Task CloseAsync(PartitionContext context, CloseReason reason)
{
return Task.CompletedTask;
}

public Task OpenAsync(PartitionContext context)
{
return Task.CompletedTask;
}

const int MAX_COUNTER_TO_PERSIST_LEASE = 10;
int counter = 0;
public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
return Task.WhenAll(messages.Select(msg => Task.Run(() =>
{
//gestisco il messaggio ricevuto
var buffer = msg.GetBytes();
var item = MessageItem.Parse(Encoding.ASCII.GetString(buffer));
Instance.OnReceiveMessage?.Invoke(Instance, item);

//incremento sempre il contatore dei msg scaricati
Interlocked.Increment(ref counter);

if (counter >= MAX_COUNTER_TO_PERSIST_LEASE)
{
//resetto il counter
Interlocked.Exchange(ref counter, 0);

//informo il contesto di persistere lo stato dell’ultimo messaggio letto
//in caso di problemi, al riavvio del Worker si ripartirà dall’ultimo checkpoint
context.CheckpointAsync().ContinueWith(t =>
{
Console.WriteLine($”Syncing…”);
Debug.WriteLine($”Syncing…”);
});
}
})));
}
}

public EventHubProcessorHostMessageWorker()
{
if (Instance == null)
Instance = this;

//instanzio un host di eventi per processare i nostri messaggi in ricezione
ProcessorHost = new EventProcessorHost(
Guid.NewGuid().ToString(), //un nome univoco dell’host
HubName,
EventHubConsumerGroup.DefaultGroupName, //stiamo usando il gruppo di default, ma è possibile usarne di più contemporaneamente con più eventprocessor
HubConnectionString,
SyncConnectionString
);

//registro il mio eventprocessor
ProcessorHost.RegisterEventProcessorAsync<MessageProcessor>(new EventProcessorOptions
{
MaxBatchSize = 100,
ReceiveTimeOut = TimeSpan.FromSeconds(30),
});

//instanzio un client normale per poter spedire i messaggi
//la ricezione invece sarà a cura degli event processor
Client = EventHubClient.CreateFromConnectionString($”{HubConnectionString};EntityPath={HubName}”);
}

bool disposing = false;
public void Dispose()
{
if (!disposing)
{
disposing = true;
Client.Close();
ProcessorHost.UnregisterEventProcessorAsync().Wait();
ProcessorHost.Dispose();
}
}

public void SendMessage(MessageItem arg)
{
if (!disposing) Client.SendAsync(new EventData(Encoding.ASCII.GetBytes(arg.ToString())));
}
}