Write a Custom MQTT Decoder

Three MQTT parsers are included with VTScada: JSONDec, XMLDec, and MQTTPassThruParser.

You specify which to use by naming it in the I/O address for the topic to be read. See: MQTT Client Addressing

If you require custom parsing, you can write your own decoder. As with any VTScada module, the code must be declared in the AppRoot.SRC file and the file changes imported.

As a template for you to use, the code for MQTTPassThruParser.SRC is provided here. Note that it runs as a subroutine.

{========================== MQTTPassThruParser =============================}
{ Returns the payload as is without any parsing. Useful for examining the   }
{ full payload when used with a StringIO tag without any indexing.          }
{===========================================================================}
(
  Payload;  { required parameter }
)
[
]
Only [
  If Watch(1);
  [
    Return(Cast(Payload, \#VTypeText));
  ]
]
{ End of MQTTPassThruParser }

Payload is the value read from the topic address. You are expected to know the structure of the data you intend to read before you write a custom parser. Within the body of the If script block, you are free to manipulate the payload as needed before returning the result. The returned value is not required to be of type text.

A generalized example of a parser follows:

{=========================== MQTTExampleParser =============================}
{ Returns a processed value from the MQTT data                              }
{===========================================================================}
(
  Data  { required parameter };
)
[
  ReturnVal  { processed value };
  { Other local variables as needed };
]
Only [
  If Watch(1);
  [
    { ReturnVal = some set of steps to process the data }
    Return(ReturnVal);  { mandatory statement }
  ]
]
{ End of MQTTExampleParser }

If the payload carries a timestamp and you want VTScada to use it, the decoder must return a structure with the members Value and TimeStamp, declared in exactly the following format:

SomeTimeValue Struct [
                       Value;
                       TimeStamp;
                     ];

Example using a timestamp:

{========================== DataParserX ======================================}
{ Returns a final value. No indexing needed since indexing is done here.      }
{ It expects a 13-byte payload, read in this order: 1 byte for a char,        }
{ 4 bytes for a single-precision float, 8 bytes for an epoch timestamp.       }
{ Returns a structure [Value, TimeStamp] that can be interpreted by VTSRead.  }
{=============================================================================}
(
  Data;
)
[
  X; C; TS;
  Measurement STRUCT [
                Value;
                TimeStamp;
              ];
  M;
]

Only [
  If Watch(1);
  [
    BuffRead(Data, 0, "%c%3b%5b", C, X, TS); { "%3b" is for IEEE single precision float (4 bytes)  }
    M = Measurement();
    M\Value     = X;
    M\TimeStamp = TS;
    Return(M);
  ]
]
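
To exercise a decoder such as DataParserX, it can help to publish a payload whose layout you control. The following Python sketch (Python is used here only as a convenient test tool, not as part of VTScada) packs the three fields in the order in which the BuffRead format string reads them. The function name build_test_payload is hypothetical. The little-endian byte order and the encoding of the 8-byte timestamp as an IEEE double are assumptions; confirm both against the BuffRead documentation and your data source.

```python
import struct

# Assumed layout (13 bytes, no padding), matching the read order "%c%3b%5b":
#   1 byte  char
#   4 bytes IEEE single-precision float
#   8 bytes epoch timestamp (ASSUMPTION: IEEE double, seconds since 1970)
# "<" selects little-endian with no alignment padding (ASSUMPTION).
PAYLOAD_FORMAT = "<cfd"


def build_test_payload(flag: bytes, value: float, epoch_seconds: float) -> bytes:
    """Pack one test message for a DataParserX-style decoder."""
    if len(flag) != 1:
        raise ValueError("flag must be exactly one byte")
    return struct.pack(PAYLOAD_FORMAT, flag, value, epoch_seconds)


if __name__ == "__main__":
    payload = build_test_payload(b"A", 1.5, 1700000000.0)
    print(len(payload), payload.hex())
```

Publishing the resulting bytes to the topic with any MQTT client lets you compare the tag's value and timestamp against known inputs.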

Write example using a custom encoder:

A custom encoder receives two parameters: Value and MemAddr (in the example below, the value parameter is named Data). The driver supports writing single values only, so it passes the encoder the value and the address to write, and the encoder decides how to format the message from those two pieces of information. For example, you could set a specific dictionary entry to the value and then format that dictionary to meet the needs of the protocol you are working with.

Note: The terms Value and Metric are SparkplugB-specific, where data is stored as Metrics within a Topic, and each metric has a value.

{============================ DataEncoder1 =================================}
{ Returns a JSON-encoded message that contains the value to be written.     }
{===========================================================================}
(
  Data          { The value to be written.                                 };
  MemAddr       { Metric in Sparkplug B. Not used in this example.         };

  { The terms Value and Metric are SparkplugB specific where data is stored }
  {   as Metrics within a Topic, and each metric has a value.               }
)
[
  X;
  JsonDict;
]

Only [
  If Watch(1);
  [
    JsonDict = Dictionary();
    JsonDict["V1"] = 51.5; { static value for demonstration purposes only }
    JsonDict["K1"] = Dictionary();
    JsonDict["K1"]["K1_1"] = Dictionary();
    JsonDict["K1"]["K1_1"]["K1_1_1"] = 3;
    JsonDict["K1"]["K1_1"]["K1_1_2"] = "v1_1_2_49";
    JsonDict["K1"]["K1_1"]["K1_1_3"] = New(3); { More demonstration-only values }
    JsonDict["K1"]["K1_1"]["K1_1_3"][0] = 13.50;
    JsonDict["K1"]["K1_1"]["K1_1_3"][1] = 22.88;
    JsonDict["K1"]["K1_1"]["K1_1_3"][2] = 36.09;
    JsonDict["K2"] = Data;
    JsonDict["Timestamp"] = CurrentTime(1); { Now }

    X = JSonEncode(JsonDict);

    Return(X);
  ]
]

This module is declared in AppRoot as:

  { Custom Encoders for MQTT data. }
  DataEncoder1        Module "DataEncoder1.SRC";

Finally, the Write address for the I/O tag is defined as:

VTSDemo/MyTopic+DataEncoder1++1

Where VTSDemo is the name used by the broker.
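
To check what DataEncoder1 writes, you can inspect the message on the subscriber side. The following Python sketch (a test aid only, not part of VTScada) parses a message of the shape built by the encoder above and pulls out the written value. The sample string is hand-written for illustration, on the assumption that JSonEncode emits nested dictionaries as JSON objects and arrays as JSON arrays. The actual number formatting and timestamp value produced by VTScada may differ, and extract_written_value is a hypothetical helper name.

```python
import json


def extract_written_value(message: str):
    """Return the entry stored under "K2", where DataEncoder1 places Data."""
    doc = json.loads(message)
    return doc["K2"]


# Hand-written sample (ASSUMPTION): mirrors the keys set in DataEncoder1.
# The K2 and Timestamp values are placeholders, not real VTScada output.
SAMPLE_MESSAGE = (
    '{"V1": 51.5, '
    '"K1": {"K1_1": {"K1_1_1": 3, "K1_1_2": "v1_1_2_49", '
    '"K1_1_3": [13.5, 22.88, 36.09]}}, '
    '"K2": 42, "Timestamp": 1700000000.0}'
)

if __name__ == "__main__":
    print(extract_written_value(SAMPLE_MESSAGE))
```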