Ciao a tutti

Dopo l’introduzione all’Azure Messaging, è l’ora di iniziare con gli esempi pratici di codice; ma per poterli fare bisogna preparare l’ambiente che useremo per i test che come base sarà una console application.

Come vedrete, non ho usato alcun framework di IoC o configurazione o pattern architetturale o di design nella soluzione, questo per migliorare la leggibilità del codice e la comprensione delle funzioni che in questo momento sono il cuore del nostro interesse, e cioè la messaggistica su Azure. Ovvio che in un uso reale in produzione, molte più accortezze sono da prendersi in considerazione:

Le interfacce di base

public interface IMessageWorker
{
string Name { get; }
}

public interface IMessageSender : IMessageWorker
{
void SendMessage(MessageItem arg);
}

public interface IMessageReceiver : IMessageWorker
{
event EventHandler<MessageItem> OnReceiveMessage;
}

Il messaggio utilizzato in tutte le code:

public sealed class MessageItem
{
public override string ToString()
{
return JsonConvert.SerializeObject(this);
}

public static MessageItem Parse(string arg)
{
return JsonConvert.DeserializeObject<MessageItem>(arg);
}

public string Key { get; set; }
public DateTime Timestamp { get; set; }
public double Value { get; set; }
}

Il DataSampler che produce i dati di telemetria per i test:

public class DataSampler : IDisposable
{
public event DataSamplerNewValueHandler OnNewValue;
PerformanceCounter cpuCounter;
PerformanceCounter sqlCounter;
public DataSampler()
{
cpuCounter = new PerformanceCounter(“Processor Information”, “% processor time”, “_Total”, true);
sqlCounter = new PerformanceCounter(“SQLServer:Databases”, “Transactions/sec”, “_Total”, true);
Task.Factory.StartNew(OnInnerWorkerTask, TaskCreationOptions.LongRunning);
}

private void OnInnerWorkerTask()
{
while (cpuCounter != null)
{
var time = DateTime.UtcNow;
var cpu = cpuCounter.NextValue();
var sql = sqlCounter.NextValue();
Task.Factory.StartNew(() => OnNewValue?.Invoke(cpuCounter.CounterName, time, cpu), TaskCreationOptions.DenyChildAttach | TaskCreationOptions.PreferFairness);
Task.Factory.StartNew(() => OnNewValue?.Invoke(sqlCounter.CounterName, time, sql), TaskCreationOptions.DenyChildAttach | TaskCreationOptions.PreferFairness);
Sleep(50);
}
}

public void Dispose()
{
var counter = cpuCounter;
cpuCounter = null;
counter.Dispose();
}
}

 

public delegate void DataSamplerNewValueHandler(string key, DateTime timestamp, double value);

Il Program.Main

class Program
{
static DataSampler dataSampler = new DataSampler();
static IList<IMessageSender> messageSenders = new List<IMessageSender>();
static IList<IMessageReceiver> messageReceivers = new List<IMessageReceiver>();

static void Main(string[] args)
{
//general cloud messaging optimization
ServicePointManager.UseNagleAlgorithm = false;
ServicePointManager.Expect100Continue = false;
ServicePointManager.DefaultConnectionLimit = 40;

//Task/ThreadPool optimization
ThreadPool.SetMinThreads(32, 32);
ThreadPool.SetMaxThreads(256, 256);

//register workers
RegisterWorker(new StorageQueueMessageWorker());

//attach to data sampler sampling event
dataSampler.OnNewValue += (k, t, v) =>
{
foreach (var sender in messageSenders)
sender.SendMessage(new MessageItem
{
Key = k,
Timestamp = t,
Value = v,
});
};

//register on senders callbacks
foreach (var sender in messageReceivers)
{
sender.OnReceiveMessage += (o, arg) =>
{
Console.WriteLine($”{(o as IMessageSender).Name} => {arg.Key}:{arg.Value:N1} ({(DateTime.UtcNow – arg.Timestamp).TotalSeconds:N0}s delay)”);
};
}

ReadLine();
messageSenders.Clear();
WriteLine(“Waiting for pending messages…”);
WriteLine(“RETURN to exit”);
ReadLine();
}

static void RegisterWorker(IMessageWorker arg)
{
if (arg is IMessageReceiver)
//all workers receive data
//this is useful to test scalability
messageReceivers.Add(arg as IMessageReceiver);

if (arg is IMessageSender)
//only one worker per type send data
if (!messageSenders.Any(x => x.GetType().Equals(arg.GetType())))
messageSenders.Add(arg as IMessageSender);
}
}

In futuro, dopo i vari post, metterò un link di download già pronto