Ciao a tutti

immaginiamo di voler gestire dati telemetrici provenienti da vari dispositivi sparsi per il globo

la soluzione odierna più accattivante, è usare l’Azure Event Hub per collezionare dati, e l’Azure Stream Analytics per analizzarli

 

Chi già conosce le code (Queue) di Azure Service Bus, non avrà alcuna difficoltà a comprendere il funzionamento di un Event Hub. Difatti quest’ultimo è parte del service bus esattamente come le code. La differenza è che una coda ha una forte sequenzialità con supporto a transazionalità e rielaborazione automatica del messaggio in caso di mancata conferma, tutto di fabbrica, per garantire appunto la tanto sognata sequenzialità.

Differentemente, l’Event Hub è un multi-stream di dati ad altissima velocità, senza supporto transazionale e senza logiche di rielaborazione, tant’è che per essere usato, necessità di salvare l’indice dell’ultimo elemento letto… proprio come avverrebbe se stessimo tentando di leggere un FileStream che qualcun’altro continua a popolare per noi. Nella scrittura, invece, l’Hub si comporta quasi in modo identico ad una coda… semplicemente spariamo tutti questi eventi nell’etere, sperando che qualcuno sia eventualmente in ascolto!

Entrando più nel dettaglio, subito vediamo che l’Hub è composto da tanti stream che funzionano in parallelo. Difatti noi dobbiamo leggere/scrivere tutti gli stream insieme per raggiungere il massimo delle performance. Alternativamente, possiamo dividere gli stream per usi diversi come avviene in ogni scenario di partizionamento, vedi l’uso della cache o delle pagine nei database.

 

Per utilizzare l’Event Hub abbiamo bisogno di scaricare il pacchetto nuget dell’Azure Service Bus. Qui dentro, troviamo il NamespaceManager in grado di darci accesso all’intero bus, e di creare i client per l’Event Hub. E’ sufficiente scaricare la connection string dal manager di Azure per avere accesso al proprio bus. Per scaricarlo, basta selezionare (senza entrare nel dettaglio) il singolo bus nella pagina dei vari bus di Azure. In basso cliccando sul pulsante apposito avremo accesso alla connection string completa. Invece, dell’Hub basta prendere il nome dalla lista. Qui un esempio.

public static string BUS_CN = “[CONNECTION STRING]”;
public static string HUB_NAME = “[HUB NAME]”;
static void Main(string[] args)
{
    var ns = NamespaceManager.CreateFromConnectionString(BUS_CN);
    var hub1 = ns.CreateEventHubIfNotExists(HUB_NAME);
    var client = EventHubClient.CreateFromConnectionString(BUS_CN, hub1.Path);
    var group = client.GetConsumerGroup(“$Default”);

 

Con questi oggetti a nostra disposizione, è sufficiente usare l’EventHubClient per inviare/ricevere messaggi. Nel nostro esempio lo useremo solo in invio. Per continuare, infatti, andremo ad usare un altro pacchetto NuGet, il mio TNX.SystemMonitor (http://www.antonioesposito.it/?p=591), che ci consente con semplicità di accedere ai performance counter di CPU, RAM e LAN del nostro PC, in poco tempo, e già ad eventi.

Sempre da NuGet, è necessario scaricare anche il pacchetto Newtonsoft.Json

Con questa ulteriore porzione di codice, completiamo la nostra applicazione Console (DOS) per leggere la telemetria del nostro PC in realtime sparandola sull’Event Hub.

    var healthCollector = new PerformanceCollector()
    {
        SamplingDelayMs = 500,
        NotifyOnlyOnDataChange = true,
        DecimalDigits = 1,
    };

    healthCollector.ProcessorTimePercentValueChanged += (o, e) =>
        client.SendAsync(NewEvent(Environment.MachineName, “ProcessorTime”, healthCollector.ProcessorTimePercentValue));

    healthCollector.MemoryUsagePercentValueChanged += (o, e) =>
        client.SendAsync(NewEvent(Environment.MachineName, “MemoryUsage”, healthCollector.MemoryUsagePercentValue));

    healthCollector.NetworkInMBitValueChanged += (o, e) =>
        client.SendAsync(NewEvent(Environment.MachineName, “NetworkIn”, healthCollector.NetworkInMBitValue));

    healthCollector.NetworkOutMBitValueChanged += (o, e) =>
        client.SendAsync(NewEvent(Environment.MachineName, “NetworkOut”, healthCollector.NetworkOutMBitValue));

    Console.WriteLine(“Collecting telemetry data…”);
    Console.WriteLine(“PRESS RETURN TO EXIT”);
    Console.ReadLine();
}

static EventData NewEvent(string system, string counter, double value)
{
    if (Debugger.IsAttached)
        Debug.WriteLine(string.Format(“{0}: {1}”, counter, value));

    using (var w = new StringWriter())
    {
        JsonSerializer.Create().Serialize(w, new
            {
                UtcTime = DateTime.UtcNow,
                Value = value,
                SystemName = system,
                CounterName = counter,
            });

        return new EventData(Encoding.ASCII.GetBytes(w.ToString()));
    }
}

}

 

La demo in alto, non fa altro che leggere tutti i dati del nostro PC dai vari eventi, per poi spararli sull’Hub impostando nel messaggio tutte le proprietà utili: timestamp, il nome del PC ed il nome del Counter, oltre ovviamente al valore letto dal counter. 

Potremmo leggerci da noi i messaggi su Azure, ad esempio creando un Receiver per ogni partizione… un simile esempio è in una mia demo di qualche settimana fa: http://www.antonioesposito.it/?p=511

Ma invece, oggi vedremo un’altra opzione di lettura, sicuramente più potente e funzionale: l’Azure Stream Analytics

Come il nome lascia intendere, questi non è altro che un lettore di stream di dati, nel nostro caso l’Event Hub

Una volta banalmente creato un nuovo analytics (associando regione ed un [solito] storage account), andremo ad aggiungere un Input di tipo Data Stream, di tipo Event Hub, selezioniamo il nostro Hub da cui leggere, e gli diamo un alias interno allo Stream Analytics.

Una volta impostato un input, è necessario impostare un output. Ad esempio possiamo usare un DB SQL Azure, in alternativa il Blob Storage, un altro Event Hub, Power BI, Table Storage. Sceglieremo per questo esempio un output di tipo SQL Azure, Basta dare tutta la configurazione necessaria, come il nome del server (solo nome senza dominio), specificare che eventualmente è un’altra sottoscrizione (il mio esempio), nome del DB, username, password e tabella.

Questa la mia tabella:

CREATE TABLE [dbo].[History](
    [UtcTime] [datetime] NOT NULL,
    [Value] [float] NOT NULL,
    [SystemName] [varchar](50) NOT NULL,
    [CounterName] [varchar](50) NOT NULL,
    [HistoryID] [bigint] IDENTITY(1,1) NOT NULL,
CONSTRAINT [PK_History] PRIMARY KEY CLUSTERED
(
    [HistoryID] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON)
)

Adesso è il momento di leggere i dati inviati in JSON nell’Hub dall’interno dello Stream Analitics. Qui è sufficiente impostare una Query che legga dal nostro alias di input (come specificato prima) per poi spararli in un alias di output. Il lavoro importante qui è far rispettare la struttura della tabella che in pratica deve combaciare alla perfezione con l’output della query.

Questa la mia Query:

SELECT UtcTime, Value, SystemName, CounterName

INTO
    [HistoryTable]
FROM
    [SystemHealthFromHub]

la sintassi è molto semplice!

Una volta che il tutto è completato, sarà sufficiente premere play nello Stream Analytics per far avviare la procedura di data routing!

Occhio che

  • Le Stream Units (gli engine che processano e girano i dati) non leggono eventuali dati preesistenti nell’Hub. Unica eccezione, se stoppiamo l’Analytics e poi lo vogliamo riavviare, possiamo dirgli di ripartire da dove si era fermato (ricordiamo che l’Hub ha un indice per fare ciò)
  • Eventuali errori li leggiamo negli Output

Ovviamente le stream units sono scalabili e configurabili.

Oltre quanto detto, ricordiamo che la Query è il punto ideale per “aggiustare” i dati e per pre-elaborarli.

Una volta che il sistema sarà operativo, è possibile analizzare i dati nel nostro nuovo DB come più ci aggrada. Ovviamente, la possibilità di analizzare questi dati con strumenti di BI rende il tutto più potente. Oggi ci siamo soffermati alla parte di “routing” dei dati telemetrici (o eventi qualunque) dall’Hub ad un nostro database, tramite lo Stream Analytics.

 

a presto con ulteriori aggiornamenti

Lascia un commento