Show / Hide Table of Contents

Developer Tools

The Developer Tools are currently used internally and are being industrialised. They help you build an end-to-end IoTEdge solution.

CSV Reader

The CSV Reader sample uses the Edge SDK. Its purpose is to push data through the system so that applications which need to consume data from some source can be tested without having to directly connect them to hardware.

The application reads a CSV file and publishes all columns of the CSV file as IoPoints at 1Hz. It illustrates the usage of switch maps on the command line, configuration, and using the message bus to send data.

using static Agora.SDK;
using System.Diagnostics.CodeAnalysis;
using System.Text.RegularExpressions;
using System.Text;
using Agora.Edge.Messages;
using System.Reflection;

namespace CSVReader;

/// <summary>
/// Sample application that replays a CSV file onto the message bus.
/// The first line of the file is treated as the header; for every following
/// line, each selected column that parses as a number is added to an
/// <c>IoDataReportMsg</c> under its header name and the message is sent with
/// <c>Bus.SendData</c>, one line per period.
/// </summary>
[ExcludeFromCodeCoverage]
public static class Program
{
    /// <summary>Maps short command-line switches to the configuration keys they set.</summary>
    static readonly Dictionary<string, string> switchMap = new()
        {
        {"-n", "App:StartLine"},
        {"-p", "App:Period" },
        {"-v", "AEA2:LogLevel" },
        {"-d", "App:DeviceId" },
        {"-f", "App:Filename" }
        };

    /// <summary>
    /// Entry point. The last command-line argument is the CSV file to replay.
    /// </summary>
    /// <param name="args">Command-line arguments; switches first, file name last.</param>
    public static void Main(string[] args)
    {
        SetCommandLineSwitchMap(switchMap);

        // Reads an integer setting. When the setting is missing or not a valid
        // integer the supplied fallback is returned (the previous version
        // logged the fallback but then returned 0).
        static int GetSetting(string name, int fallback)
        {
            string? setting = Config[name];
            $"'{name}': {setting}".LogInfo();
            if (int.TryParse(setting, out int parsed))
                return parsed;

            $"Could not parse '{name}' setting.  Defaulting to {fallback}".LogInfo();
            return fallback;
        }

        if (args.Length == 0)
        {
            $"\nUsage: {Assembly.GetEntryAssembly()!.GetName().Name} [-n <StartLine>] [-p <Period_ms>] [-v <Logging Level>] [-d <Device Id>] <filename>\n".Cout();
            return;
        }

        string filename = args[^1];

        if (!File.Exists(filename))
        {
            $"Cannot find file <{filename}>.".LogError();
            return;
        }

        int startLine = GetSetting("App:StartLine", 0);
        int period = GetSetting("App:Period", 1000);
        int firstColumn = GetSetting("App:FirstColumn", 1);
        int lastColumn = GetSetting("App:LastColumn", int.MaxValue);

        // Validate before acquiring any resources.
        if (period > 10000 || period < 100)
        {
            "The period must be between 100 and 10000 ms.".LogFatal();
            return;
        }

        // Using declarations make sure the file is closed on every exit path.
        using FileStream fs = new(filename, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
        using StreamReader sr = new(fs, Encoding.Default);
        int lineNumber = 0;

        // Reads the next line, keeps the line counter in step and splits it into fields.
        List<string> ReadRow()
        {
            lineNumber++;
            return SplitCSV(sr.ReadLine() ?? string.Empty);
        }

        Bus.Connect(10000);

        if (!Bus.IsConnected)
        {
            "Could not connect to Bus. Cannot proceed.".LogFatal();
            return;
        }

        // Read header. An empty file yields no line at all.
        string? headerLine = sr.ReadLine();
        if (headerLine == null)
        {
            "First line of csv file cannot be read.".LogError();
            return;
        }
        lineNumber++;
        List<string> header = SplitCSV(headerLine);

        // Skip exactly StartLine data rows (the previous loop skipped one more).
        for (int skipped = 0; skipped < startLine && !sr.EndOfStream; skipped++)
            ReadRow();

        // A negative FirstColumn would index out of range; clamp it to the first column.
        int columnStart = Math.Max(0, firstColumn);

        while (!sr.EndOfStream)
        {
            List<string> data = ReadRow();

            IoDataReportMsg msg = new();

            long ts = (long)AgoraTimeStamp();

            if (data.Count != header.Count)
                $"CSV File - Line {lineNumber}: Data Column Count ({data.Count}) != Header Column Count ({header.Count}) - Skipping".LogError();
            else
            {
                int columnEnd = Math.Min(header.Count, lastColumn);
                for (int col = columnStart; col < columnEnd; col++)
                {
                    // CSV is machine-readable data: parse with the invariant culture so
                    // "1.5" means the same thing regardless of the host's locale.
                    if (double.TryParse(
                            data[col],
                            System.Globalization.NumberStyles.Float | System.Globalization.NumberStyles.AllowThousands,
                            System.Globalization.CultureInfo.InvariantCulture,
                            out double v))
                        msg.Add(header[col], new()
                        {
                            quality_code = 0,
                            value = v,
                            timestamp = ts
                        });
                    else
                        $"CSV File - Line|Column {lineNumber}|{col}: Could not parse '{data[col]}' to double.".LogError();
                }
                Bus.SendData(msg);
            }

            // Pace the replay; a skipped (malformed) line still consumes one period.
            Thread.Sleep(period);
        }
    }

    /// <summary>
    /// Splits one CSV line into its fields.
    /// </summary>
    /// <remarks>
    /// A field that starts with a double quote is read up to its closing quote,
    /// so it may contain commas; a doubled quote (<c>""</c>) inside such a field
    /// yields a single quote character. Empty fields are preserved, including a
    /// leading one, so the number of fields is always the number of separating
    /// commas plus one.
    /// </remarks>
    /// <param name="input">A single line of CSV text without the line terminator.</param>
    /// <returns>The fields in order; an empty input yields one empty field.</returns>
    private static List<string> SplitCSV(string input)
    {
        List<string> fields = new();
        StringBuilder field = new();
        bool inQuotes = false;

        for (int pos = 0; pos < input.Length; pos++)
        {
            char c = input[pos];

            if (inQuotes)
            {
                if (c != '"')
                    field.Append(c);
                else if (pos + 1 < input.Length && input[pos + 1] == '"')
                {
                    // Escaped quote inside a quoted field.
                    field.Append('"');
                    pos++;
                }
                else
                    inQuotes = false;
            }
            else if (c == '"' && field.Length == 0)
                inQuotes = true;    // a quote only opens a field at its very start
            else if (c == ',')
            {
                fields.Add(field.ToString());
                field.Clear();
            }
            else
                field.Append(c);
        }

        fields.Add(field.ToString());
        return fields;
    }
}
Important

Data is sent from the CSV Reader as 'DataOut'. If you are using a non-routing broker (not the AEA-Broker), you will need to override the default "DataOut" topic passed to Bus.SendData and use "DataIn" instead:

Bus.SendData(msg, "DataIn");

CSV Writer

Sometimes it is useful to be able to write the values of specific variables to a file that can be monitored. The CSV Writer listens for messages and writes tags, as configured, into a CSV file in the /var folder.

using static Agora.SDK;
using System.Diagnostics.CodeAnalysis;
using System.Text;

namespace CSVWriter;

/// <summary>
/// Sample application that periodically writes the latest value of each
/// configured tag as one row of a CSV file.
/// </summary>
public static class Program
{
    /// <summary>Latest value seen for each tag, keyed by tag name.</summary>
    readonly static Dictionary<string, string> TagValue = new();

    /// <summary>Configured tag names, in output column order.</summary>
    readonly static List<string> Tags = new();

    /// <summary>Writer for the output file; set once in <see cref="Main"/>.</summary>
    static StreamWriter? outfile;

    /// <summary>One-shot timer that is re-armed at the end of every tick.</summary>
    readonly static System.Timers.Timer timer = new();

    /// <summary>
    /// Reads the configuration, writes the CSV header and starts the periodic
    /// writer. Does not return once the writer has been started.
    /// </summary>
    static void Main()
    {
        "Starting".LogHeading();

        var itemArray = Config.GetSection("AEA2:CSVWriter:Points").GetChildren();
        string? csvFileName = Config["AEA2:CSVWriter:CSVFile"];
        string? strPeriod = Config["AEA2:CSVWriter:Period"];

        // Report every configuration problem before giving up, not just the first.
        bool ok = true;
        if (itemArray == null || !itemArray.Any())
        {
            "Configuration setting `AEA2:CSVWriter:Points` is null or missing.  Should contain array of strings for columns of CSV File.".LogError();
            ok = false;
        }

        if (string.IsNullOrEmpty(csvFileName))
        {
            "Configuration setting 'AEA2:CSVWriter:CSVFile' is missing".LogError();
            ok = false;
        }

        if (!int.TryParse(strPeriod, out int period) || period < 500)
        {
            "Configuration setting 'AEA2:CSVWriter:Period' is missing, cannot be parsed, or less than 500ms.".LogError();
            ok = false;
        }

        if (!ok)
            return;

        Bus.Connect(30000);

        using (outfile = new StreamWriter(csvFileName!))
        {
            "Columns will include: ".LogInfo();

            // Entries without a value are ignored.
            Tags.AddRange(itemArray!
                .Where(item => item != null && !string.IsNullOrEmpty(item.Value))
                .Select(item => item.Value!));

            string headerLine = string.Join(",", Tags);
            headerLine.LogInfo();
            outfile.WriteLine(headerLine);
            outfile.Flush();

            // Wait for Bus Connection
            if (!Bus.IsConnected)
            {
                "Waiting on connection to bus...".LogInfo();
                while (!Bus.IsConnected)
                    Thread.Sleep(1000);
                "Connected.".LogInfo();
            }

            // NOTE(review): the bus is already connected at this point, so this
            // second call looks redundant - confirm against the SDK before removing.
            Bus.Connect();
            timer.AutoReset = false;
            timer.Interval = period;
            timer.Elapsed += Time_Elapsed;
            timer.Start();

            // Keep the process (and the open file) alive; rows are written from the timer callback.
            while (true)
                Thread.Sleep(10000);
        }
    }

    /// <summary>
    /// Timer callback: folds all pending data messages into <see cref="TagValue"/>
    /// and, if there were any, appends one CSV row with the current value of
    /// every configured tag (empty when a tag has not been seen yet).
    /// </summary>
    /// <param name="sender">The timer raising the event (unused).</param>
    /// <param name="e">Event data (unused).</param>
    private static void Time_Elapsed(object? sender, System.Timers.ElapsedEventArgs e)
    {
        try
        {
            if (!Bus.IsConnected)
                return;

            TagValue["AGORA_TIME"] = Agora.Utilities.Time.AgoraTimeStamp().ToString();
            if (!Bus.Messages.HasDataInMessages)
            {
                "No Message".LogDebug();
                return;
            }

            StringBuilder received = new();
            foreach (var m in Bus.Messages.GetDataInMessages())
                foreach (var device in m.device)
                    foreach (var tag in device.tags)
                    {
                        TagValue[tag.Key] = tag.Value.value.ToString() ?? string.Empty;
                        received.Append(tag.Key).Append(" = ").Append(tag.Value.value)
                            .Append(Environment.NewLine);
                    }

            received.ToString().Cout();

            string row = string.Join(",", Tags.Select(
                t => TagValue.TryGetValue(t, out string? v) && v != null ? v : string.Empty));

            outfile?.WriteLine(row);
            outfile?.Flush();
        }
        catch (Exception ex)
        {
            // System.Timers.Timer suppresses exceptions thrown by Elapsed handlers,
            // so log the failure here or it would go unnoticed.
            $"Failed to write CSV row: {ex.Message}".LogError();
        }
        finally
        {
            // AutoReset is off: the timer must be re-armed on every exit path,
            // otherwise a single failed tick would stop the writer for good.
            timer.Start();
        }
    }
}

Configuration

Configure the application by modifying the following settings in the /var/config/AEA.json file, or by using key-per-file configuration, environment variables, or command line parameters.

Configurable parameters include:

  • AEA2:CSVWriter:Points - array of tags (as strings) in the order to be written to the output file
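  • AEA2:CSVWriter:CSVFile - the filename to be written to. This is an absolute path.
  • AEA2:CSVWriter:Period - the interval, in milliseconds, between rows written to the output file. Required; must be at least 500.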

If AEA2:CSVWriter:Points contains "AGORA_TIME", the current AgoraTimeStamp will be output.

Example:

{
  "Name": "CSVWriter",
  "AEA2": {
    "CSVWriter": {
      "Points": [
        "AGORA_TIME",
        "BPOS",
        "DBTM",
        "DMEA",
        "FLWI",
        "HKLD",
        "CRPM",
        "SPPA",
        "STOR",
        "RIG_STATE"
      ],
      "CSVFile": "/var/out.csv",
      "Period": 1000
    },    
    "BusClient": {
      "UseDataIn": true
    }
  }
}
  • Edit this page
In this article
Back to top Copyright SLB, Published Work. All rights reserved.