Connected Python to WinCC OA through a Websocket Manager. Python programs can connect to WinCC OA and read/write datapoints. Communication is JSON based, it’s simple to use in Python, see examples below (ws://rocworks.no-ip.org can be used for tests, but will not be available all the time).
https://github.com/vogler75/oa4j-wss
- dpGet
- dpSet
- dpConnect
- dpQueryConnect
- dpGetPeriod
- … more functions will be implemented
Required Python modules:
- pip3 install websocket-client
- pip3 install matplotlib
############################################################ # Open Connection ############################################################ import json import ssl from websocket import create_connection url='ws://rocworks.no-ip.org/winccoa?username=demo&password=demo' ws = create_connection(url, sslopt={"cert_reqs": ssl.CERT_NONE}) ############################################################ # dpGetPeriod ############################################################ cmd={'DpGetPeriod': { 'Dps':['ExampleDP_Trend1.'], 'T1': '2018-02-07T18:10:00.000', 'T2': '2018-02-07T23:59:59.999', 'Count': 0, # Optional (Default=0) 'Ts': 0 # Optional (0...no ts in result, 1...ts as ms since epoch, 2...ts as ISO8601) }} ws.send(json.dumps(cmd)) res=json.loads(ws.recv()) #print(res) if "System1:ExampleDP_Trend1.:_offline.._value" in res["DpGetPeriodResult"]["Values"]: values=res["DpGetPeriodResult"]["Values"]["System1:ExampleDP_Trend1.:_offline.._value"] print(values) else: print("no data found") # Plot result of dpGetPeriod %matplotlib inline import matplotlib.pyplot as plt plt.plot(values) plt.ylabel('ExampleDP_Trend1.') plt.show() ############################################################ # dpGet ############################################################ cmd={'DpGet': {'Dps':['ExampleDP_Trend1.', 'ExampleDP_Trend2.']}} ws.send(json.dumps(cmd)) res=json.loads(ws.recv()) print(json.dumps(res, indent=4, sort_keys=True)) ############################################################ # dpSet ############################################################ from random import randint cmd={'DpSet': {'Wait': True, 'Values':[{'Dp':'ExampleDP_Trend1.','Value': randint(0, 9)}, {'Dp':'ExampleDP_Trend2.','Value': randint(0, 9)}]}} ws.send(json.dumps(cmd)) res=json.loads(ws.recv()) print(json.dumps(res, indent=4, sort_keys=True)) ############################################################ # dpConnect ############################################################ from threading import Thread def read(): while True: res=json.loads(ws.recv()) print(res) Thread(target=read).start() cmd={"DpConnect": {"Id": 1, "Dps": ["ExampleDP_Trend1."]}} ws.send(json.dumps(cmd))