{"id":1295,"date":"2023-05-13T08:09:48","date_gmt":"2023-05-13T06:09:48","guid":{"rendered":"https:\/\/www.rocworks.at\/wordpress\/?p=1295"},"modified":"2023-05-13T09:07:33","modified_gmt":"2023-05-13T07:07:33","slug":"scada-real-time-data-and-apache-spark","status":"publish","type":"post","link":"https:\/\/www.rocworks.at\/wordpress\/?p=1295","title":{"rendered":"SCADA Real Time Data and Apache SPARK"},"content":{"rendered":"\n<p>The <strong>integration of SCADA with Spark<\/strong> and WinCC Open Architecture offers a powerful and versatile solution that combines <strong>real-time data<\/strong> processing, <strong>advanced analytics<\/strong>, <strong>scalability<\/strong>, and flexibility. This combination empowers you to <strong>optimize industrial processes<\/strong>, make <strong>data-driven decisions<\/strong>, and stay ahead in a rapidly evolving technological landscape.<\/p>\n\n\n\n<p>Building on a project I started five years ago, a native <a href=\"https:\/\/github.com\/vogler75\/oa4j\">Java manager for WinCC Open Architecture<\/a>, I have implemented the <strong>integration of SCADA with Spark<\/strong> for the current WinCC OA version 3.19. 
<\/p>\n\n\n\n<p>A very simple example is counting the values per tag: analyzing how many values each tag in your SCADA system has produced gives valuable insight into the distribution and characteristics of the data.<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>import matplotlib.pyplot as plt\nimport seaborn as sns\n\n# Number of stored values per tag, most active tags first\nres = spark.sql('SELECT tag, count(*) AS count FROM events GROUP BY tag ORDER BY count(*) DESC')\ndata = res.toPandas()\nplt.figure(figsize=(10, 6))\nsns.barplot(x=\"count\", y=\"tag\", data=data)\nplt.show()<\/code><\/pre>\n\n\n\n<figure class=\"wp-block-image size-large\"><a href=\"https:\/\/www.rocworks.at\/wordpress\/wp-content\/uploads\/2023\/05\/Bildschirmfoto-2023-05-13-um-08.04.17.png\"><img loading=\"lazy\" decoding=\"async\" width=\"1024\" height=\"505\" src=\"https:\/\/www.rocworks.at\/wordpress\/wp-content\/uploads\/2023\/05\/Bildschirmfoto-2023-05-13-um-08.04.17-1024x505.png\" alt=\"\" class=\"wp-image-1298\" srcset=\"https:\/\/www.rocworks.at\/wordpress\/wp-content\/uploads\/2023\/05\/Bildschirmfoto-2023-05-13-um-08.04.17-1024x505.png 1024w, https:\/\/www.rocworks.at\/wordpress\/wp-content\/uploads\/2023\/05\/Bildschirmfoto-2023-05-13-um-08.04.17-300x148.png 300w, https:\/\/www.rocworks.at\/wordpress\/wp-content\/uploads\/2023\/05\/Bildschirmfoto-2023-05-13-um-08.04.17-768x379.png 768w, https:\/\/www.rocworks.at\/wordpress\/wp-content\/uploads\/2023\/05\/Bildschirmfoto-2023-05-13-um-08.04.17.png 1069w\" sizes=\"auto, (max-width: 1024px) 100vw, 1024px\" \/><\/a><\/figure>\n\n\n\n<p>Another simple example is a moving average over the 10 preceding and the 10 following values of a data point in a time series, calculated with a sliding window:<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code># Average over up to 21 rows per tag (10 before, the current row, 10 after), ordered by time\ndata = spark.sql(\"\"\"\nSELECT ROUND(value,2) as value, \n  AVG(value) OVER (PARTITION BY tag ORDER BY ts \n  ROWS BETWEEN 10 PRECEDING AND 10 FOLLOWING) avg \n  FROM events \n  WHERE tag = 'System1:ExampleDP_Trend2.'\n  ORDER BY ts DESC\n  LIMIT 100\n  \"\"\").toPandas()\ndata = 
data.reset_index().rename(columns={\"index\": \"nr\"})  # row number as x axis (0 = newest value)\nsns.lineplot(data=data, x='nr', y='value', label='Value')\nsns.lineplot(data=data, x='nr', y='avg', label='Average')\nplt.show()<\/code><\/pre>\n\n\n\n<figure class=\"wp-block-image size-full\"><a href=\"https:\/\/www.rocworks.at\/wordpress\/wp-content\/uploads\/2023\/05\/Bildschirmfoto-2023-05-13-um-08.06.29.png\"><img loading=\"lazy\" decoding=\"async\" width=\"873\" height=\"513\" src=\"https:\/\/www.rocworks.at\/wordpress\/wp-content\/uploads\/2023\/05\/Bildschirmfoto-2023-05-13-um-08.06.29.png\" alt=\"\" class=\"wp-image-1300\" srcset=\"https:\/\/www.rocworks.at\/wordpress\/wp-content\/uploads\/2023\/05\/Bildschirmfoto-2023-05-13-um-08.06.29.png 873w, https:\/\/www.rocworks.at\/wordpress\/wp-content\/uploads\/2023\/05\/Bildschirmfoto-2023-05-13-um-08.06.29-300x176.png 300w, https:\/\/www.rocworks.at\/wordpress\/wp-content\/uploads\/2023\/05\/Bildschirmfoto-2023-05-13-um-08.06.29-768x451.png 768w\" sizes=\"auto, (max-width: 873px) 100vw, 873px\" \/><\/a><\/figure>\n\n\n\n<p>Because the data is stored on a <strong>distributed file system<\/strong>, Spark can read it as a data frame that is <strong>partitioned<\/strong> and <strong>distributed<\/strong> across the <strong>nodes<\/strong> of the Spark <strong>cluster<\/strong>, so that the partitions are <strong>processed<\/strong> in <strong>parallel<\/strong>. This improves <strong>performance<\/strong> and <strong>scalability<\/strong> and allows for <strong>efficient handling of large volumes of real-time SCADA data<\/strong>.<\/p>\n\n\n\n<p>I have implemented <strong>real-time data streaming<\/strong> from WinCC OA to a Spark cluster using a <a href=\"https:\/\/github.com\/vogler75\/oa4j-wss\">WebSocket server<\/a> based on the Java manager. 
The stream <strong>continuously<\/strong> <strong>transfers<\/strong> SCADA <strong>real-time data<\/strong> from the WinCC OA system to the <strong>Spark<\/strong> cluster for further processing and analysis.<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>import json\nimport ssl\nfrom threading import Thread\nfrom websocket import create_connection  # websocket-client package\n\nurl='wss:\/\/192.168.1.190:8443\/winccoa?username=root&amp;password='\n# CERT_NONE skips certificate verification, e.g. for a self-signed certificate\nws = create_connection(url, sslopt={\"cert_reqs\": ssl.CERT_NONE})\n\ndef read():\n    # Receive messages in a background thread; on_message (defined below) must exist before data arrives\n    while True:\n        on_message(ws.recv())\nThread(target=read, daemon=True).start()\n\n# Subscribe to the online values of all ExampleDP_* data points\ncmd={'DpQueryConnect': {'Id': 1, 'Query':\"SELECT '_online.._value' FROM 'ExampleDP_*.'\", 'Answer': False}}\nws.send(json.dumps(cmd))<\/code><\/pre>\n\n\n\n<p>Once the data has been received, I store it as a data frame on the distributed file system (DFS). A data frame is a distributed collection of data organized into named columns, similar to a table in a relational database. Storing it on the distributed file system makes the data persistent and allows for efficient processing and retrieval.<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>from pyspark.sql.types import StructType, StructField, TimestampType, StringType, FloatType\n\nschema = StructType(&#91;\n    StructField(\"ts\", TimestampType(), nullable=False),\n    StructField(\"tag\", StringType(), nullable=False),\n    StructField(\"value\", FloatType(), nullable=False)\n])\n# Empty data frame with the event schema\ndf = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)<\/code><\/pre>\n\n\n\n<pre class=\"wp-block-code\"><code>import datetime\n\nbulk = &#91;]\nlast = datetime.datetime.now()\n\ndef on_message(message):\n    global bulk, last\n    data = json.loads(message)\n\n    if \"DpQueryConnectResult\" in data:\n        values = data&#91;\"DpQueryConnectResult\"]&#91;\"Values\"]\n        for tag, value in values:\n            # ts is the time of reception; FloatType only accepts Python floats\n            bulk.append({\"ts\": datetime.datetime.now(), \"tag\": tag, \"value\": float(value)})\n\n    # On each message: write the buffer if more than 10 s have passed since the last write or 1000 values are buffered\n    now = datetime.datetime.now()\n    if bulk and ((now - last).total_seconds() &gt; 10 or len(bulk) &gt;= 1000):\n        last = now\n\n        # Create a new DataFrame with the received data\n        new_df = spark.createDataFrame(bulk, schema)\n\n        new_df.write \\\n            .format(\"csv\") \\\n            .option(\"header\", \"true\") \\\n            .mode(\"append\") \\\n            .save(\"events.csv\")\n\n        bulk = &#91;]<\/code><\/pre>\n\n\n\n<p>Once the SCADA data is stored on the Spark cluster&#8217;s distributed file system, it can be read back as a data frame and registered as the temporary view <code>events<\/code> that the SQL queries above use. Spark then processes the data in parallel across the cluster.<\/p>\n\n\n\n<pre class=\"wp-block-code\"><code>df = spark.read \\\n    .format(\"csv\") \\\n    .option(\"header\", \"true\") \\\n    .option(\"timezone\", \"UTC\") \\\n    .schema(schema) \\\n    .load(\"events.csv\")\n# Register the data frame as a view so it can be queried with spark.sql()\ndf.createOrReplaceTempView(\"events\")<\/code><\/pre>\n\n\n\n<p>By combining SCADA (Supervisory Control and Data Acquisition) with Spark&#8217;s data processing capabilities, I have created a solution that handles large volumes of real-time data efficiently. This enables faster, better-informed decisions based on the insights derived from the processed data.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>The integration of SCADA with Spark and WinCC Open Architecture offers a powerful and versatile solution that combines real-time data processing, advanced analytics, scalability, and flexibility. This combination empowers you to optimize industrial processes, make data-driven decisions, and stay ahead in a rapidly evolving technological landscape. 
By utilizing my 5-year-old project that implemented a native &hellip; <a href=\"https:\/\/www.rocworks.at\/wordpress\/?p=1295\" class=\"more-link\">Continue reading <span class=\"screen-reader-text\">SCADA Real Time Data and Apache SPARK<\/span> <span class=\"meta-nav\">&rarr;<\/span><\/a><\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"closed","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[1,29,3],"tags":[],"class_list":["post-1295","post","type-post","status-publish","format-standard","hentry","category-allgemein","category-bigdatanosql","category-wincc-oa"],"_links":{"self":[{"href":"https:\/\/www.rocworks.at\/wordpress\/index.php?rest_route=\/wp\/v2\/posts\/1295","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.rocworks.at\/wordpress\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.rocworks.at\/wordpress\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.rocworks.at\/wordpress\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/www.rocworks.at\/wordpress\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=1295"}],"version-history":[{"count":7,"href":"https:\/\/www.rocworks.at\/wordpress\/index.php?rest_route=\/wp\/v2\/posts\/1295\/revisions"}],"predecessor-version":[{"id":1305,"href":"https:\/\/www.rocworks.at\/wordpress\/index.php?rest_route=\/wp\/v2\/posts\/1295\/revisions\/1305"}],"wp:attachment":[{"href":"https:\/\/www.rocworks.at\/wordpress\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=1295"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.rocworks.at\/wordpress\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=1295"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.rocworks.at\/wordpress\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=1295"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","
templated":true}]}}