Commit ab410e63 authored by Dennis Neidherr's avatar Dennis Neidherr 😲
Browse files

Added mqtt2opc subscription file

parent 5be5993c
*.log
\ No newline at end of file
......@@ -4,7 +4,7 @@ save_data_on_exit = True
save_history = True
save_non_project_files = False
project_type = 'empty-project-type'
recent_files = ['opc2mqtt.py']
recent_files = ['opc2mqtt.py', 'mqtt2opc.py', 'MQTT_Examples\\MQTT-Queue-Template.py', 'OPC_Examples\\OPC_ReadWriteExample.py', 'OPC_Examples\\OPC_BrowseExample.py']
[main]
version = 0.2.0
......
......@@ -4,7 +4,7 @@ save_data_on_exit = True
save_history = True
save_non_project_files = False
project_type = 'empty-project-type'
recent_files = ['opc2mqtt.py']
recent_files = ['opc2mqtt.py', 'mqtt2opc.py', 'OPC_Examples\\OPC_ReadWriteExample.py', 'MQTT_Examples\\MQTT-Queue-Template.py', 'OPC_Examples\\OPC_BrowseExample.py']
[main]
version = 0.2.0
......
# -*- coding: utf-8 -*-
"""
This is an interface between OPC UA and MQTT. It subscribes to a list of
MQTT topics and publish them to OPC nodes.
Copyright 2022 GSI Helmholtzzentrum für Schwerionenforschung GmbH
D. Neidherr, EEL, Planckstraße 1, 64291 Darmstadt, Germany
eMail: D.Neidherr@gsi.de
Web: https://www.gsi.de/work/forschung/experimentelektronik/kontrollsysteme
Published under EUPL.
"""
import logging
import platform
# import os
import re
import sys
import datetime
import paho.mqtt.client as mqtt
import asyncio
from asyncua import Client, ua
import nest_asyncio
nest_asyncio.apply()
q = asyncio.Queue()
logging.basicConfig(
filename="mqtt2opc.log",
level=logging.WARNING,
style="{",
format="{asctime} [{levelname:8}] {message}",
datefmt="%d.%m.%Y %H:%M:%S"
)
hostname = platform.node()
client_name = hostname + "_" + "opc2mqtt" # re.split(".py", os.path.basename(__file__))[0]
# client_name = hostname + "_" + re.split(".py", os.path.basename(__file__))[0]
date_time = re.split(" ", str(datetime.datetime.now()))
client_id = client_name # + "_" + date_time[0] + "_" + date_time[1]
def on_mqtt_connect(client, userdata, flags, rc):
"""
Handle MQTT broker connected callback.
Publish all topic once.
Subscribe to desired topics.
"""
if rc == 0:
client.connected_flag = True
logging.info("Connection OK")
else:
logging.error("Bad connection, returned code =", rc)
def on_mqtt_disconnect(client, userdata, flags, rc=0):
"""Handle MQTT broker disconnected callback."""
# client.connected_flag = False
logging.info("Disconnected result code =" + str(rc))
def on_mqtt_subscribe(client, userdata, mid, granted_qos):
"""Handle subscribed callback."""
logging.info("Client subscribed message ID =", mid, "with qos =", granted_qos)
def on_mqtt_message(client, userdata, msg):
"""
Handle MQTT message received callback.
Decode received MQTT message data and insert into command processor queue.
"""
m_decode = str(msg.payload.decode("utf-8", "ignore"))
logging.info("Message received. Topic:", msg.topic, "Payload:", m_decode)
q.put_nowait((msg.topic, m_decode))
class Mqtt2OPC():
def __init__(self):
self._connected = False
self._opcnodes = []
self._mqtt_topics = []
self._opcsubscribed = False
self._background_tasks = set()
async def _mqtt_command_processor(self):
"""
Implement command processing here.
Get decoded message from queue and process.
Example: Handle topic 'Command'.
"""
print("cmd Processor running")
while True:
received = await q.get()
topic = received[0]
data = received[1]
logging.info("Command processing:", topic)
if topic in self._mqtt_topics:
datatype = await self._opcclient.get_node(self._opcnodes[self._mqtt_topics.index(topic)]).read_data_type_as_variant_type()
print(datatype)
print(type(datatype))
if datatype is ua.VariantType.String:
await self._opcnodes[self._mqtt_topics.index(topic)].set_value(ua.DataValue(ua.Variant(data, ua.VariantType.String)))
elif datatype is ua.VariantType.Float:
await self._opcnodes[self._mqtt_topics.index(topic)].set_value(ua.DataValue(ua.Variant(float(data), ua.VariantType.Float)))
elif datatype is ua.VariantType.Double:
await self._opcnodes[self._mqtt_topics.index(topic)].set_value(ua.DataValue(ua.Variant(float(data), ua.VariantType.Double)))
else:
print(type(datatype))
else:
logging.error("Received unkown:", topic)
logging.info("Command processing done.")
q.task_done()
async def open_connection(self, OPCUrl, MQTTBroker="localhost"):
"""
Opens the connection to the OPC server as well as the MQTT broker.
Parameters
----------
OPCUrl : String
URL to the OPC server. The format is 'opc.tcp://xxx:48010/', where 48010 is the default opc port.
MQTTBroker : String
URL to the MQTT broker.
Returns
-------
None.
"""
if not self._connected:
self._mqttbroker = MQTTBroker
self._opcclient = Client(url=OPCUrl, timeout=4)
await self._opcclient.connect()
self._mqttclient = mqtt.Client(client_id, clean_session=True)
self._mqttclient.connected_flag = False
self._mqttclient.will_set(client_id, "Offline", 1, False)
self._mqttclient.on_connect = on_mqtt_connect
self._mqttclient.on_disconnect = on_mqtt_disconnect
self._mqttclient.on_subscribe = on_mqtt_subscribe
self._mqttclient.on_message = on_mqtt_message
self._mqttclient.loop_start()
self._mqttclient.connect(self._mqttbroker, port=1883, keepalive=60, bind_address="")
while not self._mqttclient.connected_flag:
logging.info("Waiting for", self._mqttbroker, "...")
await asyncio.sleep(1)
self._subscriptiontask = asyncio.create_task(self._mqtt_command_processor())
self._background_tasks.add(self._subscriptiontask)
self._connected = True
async def subscribe(self, NodeIDs):
"""
Subscribes to MQTT topics and updates OPC. This function can be executed
mulitple times and just adds NodeIDs.
Parameters
----------
Nodes : OPCNodeID
List with OPCNodeIDs to subscribe. The NodeIDs are strings of the form:
'ns=Y;s=zzz', where Y is the OPCNameSpace number and zzz the OPCIdentifier.
Returns
-------
None.
"""
for eachNodeID in NodeIDs:
opcnode = self._opcclient.get_node(eachNodeID)
opcpathlist = await opcnode.get_path(as_string=(True))
opcpath = client_id
del opcpathlist[0]
for each in opcpathlist:
opcpath += "/" + re.split(":", each)[1]
rc, mid = self._mqttclient.subscribe(opcpath)
logging.info("Subscribing to: '", eachNodeID, "'returned rc =", rc, "mid = ", mid)
self._opcnodes.append(self._opcclient.get_node(eachNodeID))
self._mqtt_topics.append(opcpath)
async def unsubscribe(self, NodeIDs=[]):
"""
Cancel existing MQTT subscriptions.If the NodeIDs list is kept empty
this function cancels all subscriptions.
Parameters
----------
Nodes : OPCNodeID
List with OPCNodeIDs to unsubscribe. The NodeIDs are strings of the form:
'ns=Y;s=zzz', where Y is the OPCNameSpace number and zzz the OPCIdentifier.
The default is [].
Returns
-------
None.
"""
if NodeIDs == []:
for eachMqttTopic in self._mqtt_topics:
rc, mid = self._mqttclient.unsubscribe(eachMqttTopic)
logging.info("Unsubscribing to: '", eachMqttTopic, "'returned rc =", rc, "mid = ", mid)
self._opcnodes = []
self._mqtt_topics = []
else:
for eachNodeID in NodeIDs:
opcnode = self._opcclient.get_node(eachNodeID)
opcpathlist = await opcnode.get_path(as_string=(True))
opcpath = client_id
del opcpathlist[0]
for each in opcpathlist:
opcpath += "/" + re.split(":", each)[1]
rc, mid = self._mqttclient.unsubscribe(opcpath)
logging.info("Unsubscribing to: '", eachNodeID, "'returned rc =", rc, "mid = ", mid)
self._opcnodes.remove(self._opcclient.get_node(eachNodeID))
self._mqtt_topics.remove(opcpath)
async def close_connection(self):
"""
Closes the connection to the OPC server and the MQTT broker.
Returns
-------
None.
"""
self._subscriptiontask.cancel()
self._subscriptiontask.add_done_callback(self._background_tasks.discard)
await self._opcclient.disconnect()
logging.info("Publishing: 'disconnected'")
rc, mid = self._mqttclient.publish(client_id, "disconnected")
logging.info("Publishing: 'disconnected' returned rc =", rc, "mid = ", mid)
logging.info("Disonnecting from broker", self._mqttbroker)
self._mqttclient.disconnect()
await asyncio.sleep(1)
logging.info("Stopping message loop")
self._mqttclient.loop_stop(force=False)
self._connected = False
async def mqtt2opc():
try:
Mqtt2OPCIntstance = Mqtt2OPC()
await Mqtt2OPCIntstance.open_connection('opc.tcp://DLPC104:48010/', "localhost")
await Mqtt2OPCIntstance.subscribe(["ns=2;s=Demo.Static.Scalar.String", "ns=2;s=Demo.Static.Scalar.Int32", "ns=2;s=Demo.Static.Scalar.Float", "ns=2;s=Demo.Static.Scalar.Double"])
# await Mqtt2OPCIntstance.unsubscribe(["ns=2;s=Demo.Static.Scalar.String"])
# await Mqtt2OPCIntstance.unsubscribe()
# await Mqtt2OPCIntstance.subscribe(["ns=2;s=Demo.Static.Scalar.String"])
while Mqtt2OPCIntstance._mqttclient.connected_flag:
await asyncio.sleep(1)
except BaseException as e:
logging.error("Exception catched!", e)
Mqtt2OPCIntstance._mqttclient.connected_flag = False
finally:
await Mqtt2OPCIntstance.unsubscribe()
await Mqtt2OPCIntstance.close_connection()
logging.info("End")
logging.shutdown()
sys.exit()
async def main():
await mqtt2opc()
if __name__ == "__main__":
asyncio.run(main())
09.08.2022 07:46:32 [ERROR ] Task exception was never retrieved
future: <Task finished name='Task-196' coro=<main() done, defined at d:\git-python\opcua_mqtt_interface\fromopc2mqtt_example.py:270> exception=SystemExit()>
Traceback (most recent call last):
File "C:\WINDOWS\system32\spyder-env\lib\site-packages\nest_asyncio.py", line 35, in run
return loop.run_until_complete(task)
File "C:\WINDOWS\system32\spyder-env\lib\site-packages\nest_asyncio.py", line 83, in run_until_complete
self._run_once()
File "C:\WINDOWS\system32\spyder-env\lib\site-packages\nest_asyncio.py", line 106, in _run_once
event_list = self._selector.select(timeout)
File "C:\Program Files\Python310\lib\selectors.py", line 324, in select
r, w, _ = self._select(self._readers, self._writers, [], timeout)
File "C:\Program Files\Python310\lib\selectors.py", line 315, in _select
r, w, x = select.select(r, w, w, timeout)
KeyboardInterrupt
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\WINDOWS\system32\spyder-env\lib\site-packages\spyder_kernels\customize\spydercustomize.py", line 469, in exec_code
exec_fun(compile(ast_code, filename, 'exec'), ns_globals, ns_locals)
File "C:\WINDOWS\system32\spyder-env\lib\site-packages\spyder_kernels\py3compat.py", line 356, in compat_exec
exec(code, globals, locals)
File "d:\git-python\opcua_mqtt_interface\fromopc2mqtt_example.py", line 275, in <module>
File "C:\WINDOWS\system32\spyder-env\lib\site-packages\nest_asyncio.py", line 40, in run
loop.run_until_complete(task)
File "C:\WINDOWS\system32\spyder-env\lib\site-packages\nest_asyncio.py", line 83, in run_until_complete
self._run_once()
File "C:\WINDOWS\system32\spyder-env\lib\site-packages\nest_asyncio.py", line 119, in _run_once
handle._run()
File "C:\Program Files\Python310\lib\asyncio\events.py", line 80, in _run
self._context.run(self._callback, *self._args)
File "C:\Program Files\Python310\lib\asyncio\tasks.py", line 315, in __wakeup
self.__step()
File "C:\WINDOWS\system32\spyder-env\lib\site-packages\nest_asyncio.py", line 195, in step
step_orig(task, exc)
File "C:\Program Files\Python310\lib\asyncio\tasks.py", line 232, in __step
result = coro.send(None)
File "d:\git-python\opcua_mqtt_interface\fromopc2mqtt_example.py", line 271, in main
File "d:\git-python\opcua_mqtt_interface\fromopc2mqtt_example.py", line 267, in opc2mqtt
SystemExit
09.08.2022 07:46:32 [ERROR ] Task exception was never retrieved
future: <Task finished name='Task-242' coro=<main() done, defined at d:\git-python\opcua_mqtt_interface\fromopc2mqtt_example.py:267> exception=SystemExit()>
Traceback (most recent call last):
File "C:\WINDOWS\system32\spyder-env\lib\site-packages\nest_asyncio.py", line 35, in run
return loop.run_until_complete(task)
File "C:\WINDOWS\system32\spyder-env\lib\site-packages\nest_asyncio.py", line 83, in run_until_complete
self._run_once()
File "C:\WINDOWS\system32\spyder-env\lib\site-packages\nest_asyncio.py", line 106, in _run_once
event_list = self._selector.select(timeout)
File "C:\Program Files\Python310\lib\selectors.py", line 324, in select
r, w, _ = self._select(self._readers, self._writers, [], timeout)
File "C:\Program Files\Python310\lib\selectors.py", line 315, in _select
r, w, x = select.select(r, w, w, timeout)
KeyboardInterrupt
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\WINDOWS\system32\spyder-env\lib\site-packages\spyder_kernels\customize\spydercustomize.py", line 469, in exec_code
exec_fun(compile(ast_code, filename, 'exec'), ns_globals, ns_locals)
File "C:\WINDOWS\system32\spyder-env\lib\site-packages\spyder_kernels\py3compat.py", line 356, in compat_exec
exec(code, globals, locals)
File "d:\git-python\opcua_mqtt_interface\fromopc2mqtt_example.py", line 272, in <module>
File "C:\WINDOWS\system32\spyder-env\lib\site-packages\nest_asyncio.py", line 40, in run
loop.run_until_complete(task)
File "C:\WINDOWS\system32\spyder-env\lib\site-packages\nest_asyncio.py", line 83, in run_until_complete
self._run_once()
File "C:\WINDOWS\system32\spyder-env\lib\site-packages\nest_asyncio.py", line 119, in _run_once
handle._run()
File "C:\Program Files\Python310\lib\asyncio\events.py", line 80, in _run
self._context.run(self._callback, *self._args)
File "C:\Program Files\Python310\lib\asyncio\tasks.py", line 315, in __wakeup
self.__step()
File "C:\WINDOWS\system32\spyder-env\lib\site-packages\nest_asyncio.py", line 195, in step
step_orig(task, exc)
File "C:\Program Files\Python310\lib\asyncio\tasks.py", line 232, in __step
result = coro.send(None)
File "d:\git-python\opcua_mqtt_interface\fromopc2mqtt_example.py", line 268, in main
File "d:\git-python\opcua_mqtt_interface\fromopc2mqtt_example.py", line 264, in opc2mqtt
SystemExit
......@@ -5,7 +5,7 @@ This is an interface between OPC UA and MQTT. It subscribes to a list of
OPC nodes and publish them to MQTT topics.
Copyright 2022 GSI Helmholtzzentrum für Schwerionenforschung GmbH
Dr. D. Neidherr, EEL, Planckstraße 1, 64291 Darmstadt, Germany
D. Neidherr, EEL, Planckstraße 1, 64291 Darmstadt, Germany
eMail: D.Neidherr@gsi.de
Web: https://www.gsi.de/work/forschung/experimentelektronik/kontrollsysteme
......@@ -17,7 +17,6 @@ import platform
import os
import re
import datetime
import time
import sys
import json
import logging
......@@ -251,6 +250,7 @@ async def opc2mqtt():
await OPC2MqttIntstance.subscribe(["ns=2;s=Demo.Dynamic.Scalar.Int32", "ns=2;s=Demo.Dynamic.Scalar.Float", "ns=2;s=Demo.Dynamic.Scalar.String"])
await OPC2MqttIntstance.subscribe_add(["ns=2;s=Demo.Dynamic.Arrays.Int32"])
await OPC2MqttIntstance.subscribe_sub(["ns=2;s=Demo.Dynamic.Scalar.Float"])
await OPC2MqttIntstance.subscribe_add(["ns=2;s=Demo.Static.Scalar.String", "ns=2;s=Demo.Static.Scalar.Float", "ns=2;s=Demo.Static.Scalar.Double"])
while OPC2MqttIntstance._mqttclient.connected_flag:
await asyncio.sleep(1)
except BaseException as e:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment