Ciao a tutti
Dopo l’introduzione all’Azure Messaging, è l’ora di iniziare con gli esempi pratici di codice; ma per poterli fare bisogna preparare l’ambiente che useremo per i test che come base sarà una console application.
Come vedrete, non ho usato alcun framework di IoC o configurazione o pattern architetturale o di design nella soluzione, questo per migliorare la leggibilità del codice e la comprensione delle funzioni che in questo momento sono il cuore del nostro interesse, e cioè la messaggistica su Azure. Ovvio che in un uso reale in produzione, molte più accortezze sono da prendersi in considerazione:
Le interfacce di base
public interface IMessageWorker
{
string Name { get; }
}public interface IMessageSender : IMessageWorker
{
void SendMessage(MessageItem arg);
}public interface IMessageReceiver : IMessageWorker
{
event EventHandler<MessageItem> OnReceiveMessage;
}
Il messaggio utilizzato in tutte le code:
public sealed class MessageItem
{
public override string ToString()
{
return JsonConvert.SerializeObject(this);
}public static MessageItem Parse(string arg)
{
return JsonConvert.DeserializeObject<MessageItem>(arg);
}public string Key { get; set; }
public DateTime Timestamp { get; set; }
public double Value { get; set; }
}
Il DataSampler che produce i dati di telemetria per i test:
public class DataSampler : IDisposable
{
public event DataSamplerNewValueHandler OnNewValue;
PerformanceCounter cpuCounter;
PerformanceCounter sqlCounter;
public DataSampler()
{
cpuCounter = new PerformanceCounter(“Processor Information”, “% processor time”, “_Total”, true);
sqlCounter = new PerformanceCounter(“SQLServer:Databases”, “Transactions/sec”, “_Total”, true);
Task.Factory.StartNew(OnInnerWorkerTask, TaskCreationOptions.LongRunning);
}private void OnInnerWorkerTask()
{
while (cpuCounter != null)
{
var time = DateTime.UtcNow;
var cpu = cpuCounter.NextValue();
var sql = sqlCounter.NextValue();
Task.Factory.StartNew(() => OnNewValue?.Invoke(cpuCounter.CounterName, time, cpu), TaskCreationOptions.DenyChildAttach | TaskCreationOptions.PreferFairness);
Task.Factory.StartNew(() => OnNewValue?.Invoke(sqlCounter.CounterName, time, sql), TaskCreationOptions.DenyChildAttach | TaskCreationOptions.PreferFairness);
Sleep(50);
}
}public void Dispose()
{
var counter = cpuCounter;
cpuCounter = null;
counter.Dispose();
}
}
public delegate void DataSamplerNewValueHandler(string key, DateTime timestamp, double value);
Il Program.Main
class Program
{
static DataSampler dataSampler = new DataSampler();
static IList<IMessageSender> messageSenders = new List<IMessageSender>();
static IList<IMessageReceiver> messageReceivers = new List<IMessageReceiver>();static void Main(string[] args)
{
//general cloud messaging optimization
ServicePointManager.UseNagleAlgorithm = false;
ServicePointManager.Expect100Continue = false;
ServicePointManager.DefaultConnectionLimit = 40;//Task/ThreadPool optimization
ThreadPool.SetMinThreads(32, 32);
ThreadPool.SetMaxThreads(256, 256);//register workers
RegisterWorker(new StorageQueueMessageWorker());//attach to data sampler sampling event
dataSampler.OnNewValue += (k, t, v) =>
{
foreach (var sender in messageSenders)
sender.SendMessage(new MessageItem
{
Key = k,
Timestamp = t,
Value = v,
});
};//register on senders callbacks
foreach (var sender in messageReceivers)
{
sender.OnReceiveMessage += (o, arg) =>
{
Console.WriteLine($”{(o as IMessageSender).Name} => {arg.Key}:{arg.Value:N1} ({(DateTime.UtcNow – arg.Timestamp).TotalSeconds:N0}s delay)”);
};
}ReadLine();
messageSenders.Clear();
WriteLine(“Waiting for pending messages…”);
WriteLine(“RETURN to exit”);
ReadLine();
}static void RegisterWorker(IMessageWorker arg)
{
if (arg is IMessageReceiver)
//all workers receive data
//this is useful to test scalability
messageReceivers.Add(arg as IMessageReceiver);if (arg is IMessageSender)
//only one worker per type send data
if (!messageSenders.Any(x => x.GetType().Equals(arg.GetType())))
messageSenders.Add(arg as IMessageSender);
}
}
In futuro, dopo i vari post, metterò un link di download già pronto