Developer Tools
Developer Tools are currently being internally used and being industrialised. It helps to build an end to end IoTEdge solution.
CSV Reader
The CSV Reader sample uses the Edge SDK. Its purpose is to push data through the system so that applications which need to consume data from some source can be tested without having to directly connect them to hardware.
The application reads a CSV file and publishes all columns of the CSV file as IoPoints at 1Hz. It illustrates the usage of switch maps on the command line, configuration, and using the message bus to send data.
using static Agora.SDK;
using System.Diagnostics.CodeAnalysis;
using System.Text.RegularExpressions;
using System.Text;
using Agora.Edge.Messages;
using System.Reflection;
namespace CSVReader;
[ExcludeFromCodeCoverage]
public static class Program
{
static Dictionary<string, string> switchMap = new ()
{
{"-n", "App:StartLine"},
{"-p", "App:Period" },
{"-v", "AEA2:LogLevel" },
{"-d", "App:DeviceId" },
{"-f", "App:Filename" }
};
static public void Main(string[] args)
{
SetCommandLineSwitchMap( switchMap );
static void GetSetting(string name, int _default, out int val)
{
string? setting = Config[name];
$"'{name}': {setting}".LogInfo();
if (!int.TryParse(setting, out int v))
$"Could not parse '{name}' setting. Defaulting to {_default}".LogInfo();
val = v;
}
if (args.Length == 0)
{
$"\nUsage: {Assembly.GetEntryAssembly()!.GetName().Name} [-n <StartLine>] [-p <Period_ms>] [-v <Logging Level>] [-d <Device Id>] <filename>\n".Cout();
return;
}
string filename = args[args.Length-1];
if (!File.Exists(filename))
{
$"Cannot find file <{filename}>.".LogError();
return;
}
GetSetting("App:StartLine", 0, out int startLine);
GetSetting("App:Period", 1000, out int period);
GetSetting("App:FirstColumn", 1, out int firstColumn);
GetSetting("App:LastColumn", int.MaxValue, out int lastColumn);
FileStream fs = new(filename, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
StreamReader sr = new(fs, Encoding.Default);
int lineNumber = 0;
List<string>? ReadLine()
{
lineNumber++;
return SplitCSV(sr.ReadLine() ?? string.Empty);
}
if (fs == null || sr == null)
{
"Either the FileStream or StreamReader are null.".LogFatal();
return;
}
if (period > 10000 || period < 100)
{
"The period must be between 100 and 10000 ms.".LogFatal();
return;
}
Bus.Connect(10000);
if (!Bus.IsConnected)
{
"Could not connect to Bus. Cannot proceed.".LogFatal();
return;
}
if (fs != null)
fs.Position = 0;
// Read header
var header = ReadLine();
if (header == null)
{
"First line of csv file cannot be read.".LogError();
return;
}
int i = -1;
while (!sr.EndOfStream && i++ < startLine)
ReadLine();
while (!sr.EndOfStream)
{
var data = ReadLine();
if (data == null) continue;
IoDataReportMsg msg = new();
long ts = (long)AgoraTimeStamp();
if (data.Count != header.Count)
$"CSV File - Line {lineNumber}: Data Column Count ({data.Count}) != Header Column Count ({header.Count}) - Skipping".LogError();
else
{
int iEnd = Math.Min(header.Count, lastColumn);
for (i = firstColumn; i < iEnd; i++)
{
if (double.TryParse(data[i], out double v))
msg.Add(header[i], new()
{
quality_code = 0,
value = v,
timestamp = ts
});
else
$"CSV File - Line|Column {lineNumber}|{i}: Could not parse '{data[i]}' to double.".LogError();
}
Bus.SendData(msg);
}
Thread.Sleep(period);
}
}
static readonly Regex csvSplit = new("(?:^|,)(\"(?:[^\"]+|\"\")*\"|[^,]*)", RegexOptions.Compiled);
private static List<string> SplitCSV(string input)
{
List<string> list = new();
foreach (Match match in csvSplit.Matches(input))
{
string curr = match.Value.Trim(new char[] { '"', '\\', ',' });
list.Add(0 == curr.Length ? string.Empty : curr);
}
return list;
}
}
Important
Data is sent from the CSV Reader as 'DataOut'. If you are using a non-routing broker (not the AEA-Broker), you will need to override the default "DataOut" topic passed to Bus.SendData and use "DataIn" instead:
Bus.SendData(msg, "DataIn");
CSV Writer
Sometimes it is useful to be able to write the output specific variables to a file that can be monitored. The CSV Writer listens for messages and writes tags, as configured, into a csv file in the /var
folder.
using static Agora.SDK;
using System.Diagnostics.CodeAnalysis;
using System.Text;
namespace CSVWriter;
public static class Program
{
readonly static Dictionary<string, string> TagValue = new();
readonly static List<string> Tags = new();
static StreamWriter? outfile;
readonly static System.Timers.Timer timer = new();
static void Main()
{
"Starting".LogHeading();
var itemArray = Config.GetSection("AEA2:CSVWriter:Points").GetChildren();
string? CSVFileName = Config["AEA2:CSVWriter:CSVFile"];
string? strPeriod = Config["AEA2:CSVWriter:Period"];
int Period = int.MaxValue;
bool ok = true;
if (itemArray == null || !itemArray.Any())
{
"Configuration setting `AEA2:CSVWriter:Points` is null or missing. Should contain array of strings for columns of CSV File.".LogError();
ok = false;
}
if (string.IsNullOrEmpty(CSVFileName))
{
"Configuration setting 'AEA2:CSVWriter:CSVFile' is missing".LogError();
ok = false;
}
if (!int.TryParse(strPeriod, out Period) || Period < 500)
{
"Configuration setting 'AEA2:CSVWriter:Period' is missing, cannot be parsed, or less than 500ms.".LogError();
ok = false;
}
if (ok)
{
Bus.Connect(30000);
using (outfile = new StreamWriter(CSVFileName!))
{
StringBuilder sb = new();
"Columns will include: ".LogInfo();
bool first = true;
foreach (var i in itemArray!
.Where(item => item != null && !string.IsNullOrEmpty(item.Value)))
{
Tags.Add(i.Value!);
if (first)
{
sb.Append(i.Value);
first = false;
}
else
sb.Append(',').Append(i.Value);
}
sb.ToString().LogInfo();
outfile?.WriteLine(sb.ToString());
outfile?.Flush();
// Wait for Bus Connection
if (!Bus.IsConnected)
{
"Waiting on connection to bus...".LogInfo();
while (!Bus.IsConnected)
Thread.Sleep(1000);
"Connected.".LogInfo();
}
Bus.Connect();
timer.AutoReset = false;
timer.Interval = Period;
timer.Elapsed += Time_Elapsed;
timer.Start();
while (true)
Thread.Sleep(10000);
}
}
}
private static void Time_Elapsed(object? sender, System.Timers.ElapsedEventArgs e)
{
if (!Bus.IsConnected)
{
timer.Start();
return;
}
TagValue["AGORA_TIME"] = Agora.Utilities.Time.AgoraTimeStamp().ToString();
if (Bus.Messages.HasDataInMessages)
{
StringBuilder sb = new();
foreach (var m in Bus.Messages.GetDataInMessages())
foreach (var device in m.device)
foreach (var tag in device.tags)
{
TagValue[tag.Key] = tag.Value.value.ToString() ?? string.Empty;
sb.Append(tag.Key).Append(" = ").Append(tag.Value.value)
.Append(Environment.NewLine);
}
sb.ToString().Cout();
sb.Clear();
bool first = true;
foreach (var t in Tags)
{
TagValue.TryGetValue(t, out string? V);
if (V == null) V = string.Empty;
if (first)
{
sb.Append(V);
first = false;
}
else
sb.Append(',').Append(V);
}
outfile?.WriteLine(sb.ToString());
outfile?.Flush();
}
else
"No Message".LogDebug();
timer.Start();
}
}
Configuration
Configure the application by modifying the following settings in the /var/config/AEA.json
file, key-per-files, using environment variables, or command line parameters.
Configurable parameters include:
AEA2:CSVWriter:Points
- array of tags (as strings) in the order to be written to the output fileAEA2:CSVWriter:CSVFile
- the filename to be written to. This is an absolute path.
If AEA2:CSVWriter:Points
contains "AGORA_TIME", the current AgoraTimeStamp will be output.
Example:
{
"Name": "CSVWriter",
"AEA2": {
"CSVWriter": {
"Points": [
"AGORA_TIME",
"BPOS",
"DBTM",
"DMEA",
"FLWI",
"HKLD",
"CRPM",
"SPPA",
"STOR",
"RIG_STATE"
],
"CSVFile": "/var/out.csv",
"Period": 1000
},
"BusClient": {
"UseDataIn": true
}
}
}