...
 
Commits (2)
......@@ -6,7 +6,7 @@ Maintainer: Newterm Developers <devs@newterm.pl>
Section: misc
Standards-Version: 3.6.1
X-Python-Version: >= 2.6
Build-Depends: debhelper (>= 5.0.38), ccache, cmake (>=3.2), libwxgtk3.0-dev, wx-common, libwxbase3.0-dev, libxml2-dev, bison, flex, imagemagick, jadetex, gettext, xsltproc, libxslt1-dev, libnewt-dev, libssl-dev | libssl1.0-dev, libpam-dev | libpam0g-dev, libcurl3-dev | libcurl4-gnutls-dev, perl, librsync-dev, libgtk2.0-dev, libsqlite3-dev, libldap2-dev (>= 2.3.5), python-setuptools (>= 0.6b3-1), libzip-dev, openjade, python-pybabel | python-babel, libicu-dev, libxt-dev, libc-ares-dev, libxmlrpc-epi-dev, libluajit-5.1-dev | liblua5.1-0-dev, libftgl-dev, libboost-dev (>=1.55), libboost-system-dev (>=1.55), libboost-thread-dev (>=1.55), libboost-program-options-dev (>=1.55), libboost-regex-dev (>=1.55), libboost-filesystem-dev (>=1.55), libboost-python-dev (>=1.55), libboost-locale-dev (>=1.55), libboost-signals-dev (>=1.55), libasio-dev | libasio1.55-dev, libstdc++-dev, libxpm-dev, libevent-dev(>=2.0), docbook-dsssl, docbook, rsync, dh-python, konwert, pyqt4-dev-tools, libtool, python-all-dev, python-dev, pyqt4-dev-tools, qt4-linguist-tools, libzmq3-dev, libcppunit-dev, protobuf-compiler, python-zmq, libprotobuf-dev, python-sip, python-lxml, python-protobuf, texlive-generic-recommended, libsnap7-dev, libsystemd-dev
Build-Depends: debhelper (>= 5.0.38), ccache, cmake (>=3.2), libwxgtk3.0-dev, wx-common, libwxbase3.0-dev, libxml2-dev, bison, flex, imagemagick, jadetex, gettext, xsltproc, libxslt1-dev, libnewt-dev, libssl-dev | libssl1.0-dev, libpam-dev | libpam0g-dev, libcurl3-dev | libcurl4-gnutls-dev, perl, librsync-dev, libgtk2.0-dev, libsqlite3-dev, libldap2-dev (>= 2.3.5), python-setuptools (>= 0.6b3-1), libzip-dev, openjade, python-pybabel | python-babel, libicu-dev, libxt-dev, libc-ares-dev, libxmlrpc-epi-dev, libluajit-5.1-dev | liblua5.1-0-dev, libftgl-dev, libboost-dev (>=1.55), libboost-system-dev (>=1.55), libboost-thread-dev (>=1.55), libboost-program-options-dev (>=1.55), libboost-regex-dev (>=1.55), libboost-filesystem-dev (>=1.55), libboost-python-dev (>=1.55), libboost-locale-dev (>=1.55), libboost-signals-dev (>=1.55), libasio-dev | libasio1.55-dev, libstdc++-dev, libxpm-dev, libevent-dev(>=2.0), docbook-dsssl, docbook, rsync, dh-python, konwert, pyqt4-dev-tools, libtool, python-all-dev, python-dev, pyqt4-dev-tools, qt4-linguist-tools, libzmq3-dev, libcppunit-dev, protobuf-compiler, python-zmq, libprotobuf-dev, python-sip, python-lxml, python-protobuf, python3-coverage, texlive-generic-recommended, libsnap7-dev, libsystemd-dev
Package: szarp-base
Section: misc
......
add_subdirectory(meaner4)
add_subdirectory(meaner5)
add_subdirectory(parhub)
add_subdirectory(protobuf)
add_subdirectory(pysz4)
add_subdirectory(pysz5)
add_subdirectory(pyszbase)
add_subdirectory(pytimer)
add_subdirectory(utils)
......@@ -17,12 +17,22 @@ class Meaner5:
self.time_source = time_source
self.heartbeat_param_name = "Status/Meaner3/program_uruchomiony"
self.terminated = False
self.probe_save_time_shift = self.saving_interval * 0.5
self.max_future_shift = self.saving_interval * 2
assert(self.probe_save_time_shift > 0) # if equal to zero, daemons will send us probes at the same time that we would like to store data
assert(self.probe_save_time_shift < self.saving_interval) # with a small redesign we could use larger values too
def calculate_next_save_time(self, now):
def calculate_first_probe_time(self, now):
next_save_time = math.trunc(now) + self.saving_interval
next_save_time_aligned = next_save_time - (next_save_time % self.saving_interval)
return next_save_time_aligned
def calculate_next_probe_time(self, last_probe_time):
return last_probe_time + self.saving_interval
def calculate_save_time(self, probe_time):
return probe_time + self.probe_save_time_shift
def read_socket(self):
try:
while True:
......@@ -30,17 +40,31 @@ class Meaner5:
params_values = paramsvalues_pb2.ParamsValues.FromString(msg)
for param_value in params_values.param_values:
index = param_value.param_no
if index not in self.msgs.keys():
self.logger.error("Cannot store param with index: %d, no param name", index)
continue
self.msgs[index].append(param_value)
self.logger.debug("Param '%s' value received: %d with time: %d" % (self.param_names_ordered[index], param_value.value_before_prec, param_value.time))
except zmq.ZMQError as e:
if e.errno != zmq.EAGAIN:
raise
def msg_list_to_param_value(self, msg_list):
def msg_list_to_param_value(self, msg_list, probe_timestamp):
numeric_value = 0
count = 0
prec = None
is_nodata = True
unused_msgs = []
for msg in msg_list:
if msg.time <= probe_timestamp - self.saving_interval:
self.logger.debug("Ignoring msg, because it's too old, sth is malfunctioning...")
continue
if msg.time > probe_timestamp + self.max_future_shift:
self.logger.debug("Ignoring msg, because it's too far from the future, sth is malfunctioning...")
continue
if msg.time > probe_timestamp:
unused_msgs.append(msg)
continue
if msg.is_nan:
continue
numeric_value = numeric_value + msg.value_before_prec
......@@ -51,26 +75,36 @@ class Meaner5:
else:
if msg.prec != prec:
self.logger.error("Precision changed during time step, cannot continue")
return self.base_handler.ParamValue()
return (self.base_handler.ParamValue(), unused_msgs)
if is_nodata:
return self.base_handler.ParamValue()
return self.base_handler.ParamValue(value_before_prec=int(numeric_value / count), prec=prec, is_no_data=False)
return (self.base_handler.ParamValue(), unused_msgs)
return (self.base_handler.ParamValue(value_before_prec=int(numeric_value / count), prec=prec, is_no_data=False), unused_msgs)
def db_store(self, timestamp):
def db_store(self, probe_timestamp):
for index in self.msgs.keys():
value = self.msg_list_to_param_value(self.msgs[index])
value, unused_msgs = self.msg_list_to_param_value(self.msgs[index], probe_timestamp)
self.msgs[index] = unused_msgs
param_name = self.param_names_ordered[index]
self.db.write_value(param_name, timestamp, value)
self.db.write_value(self.heartbeat_param_name, timestamp, self.base_handler.ParamValue(value_before_prec=1, prec=0, is_no_data=False))
self.db.write_value(param_name, probe_timestamp, value)
self.logger.debug("Param '%s' value store: %d with time: %d" % (param_name, value.value_before_prec, probe_timestamp))
self.db.write_value(self.heartbeat_param_name, probe_timestamp, self.base_handler.ParamValue(value_before_prec=1, prec=0, is_no_data=False))
def loop(self):
now = self.time_source.time()
next_probe_time = self.calculate_first_probe_time(now)
while not interrupt_caught:
self.clear_msgs()
next_save_time = self.calculate_save_time(next_probe_time)
now = self.time_source.time()
next_save_time = self.calculate_next_save_time(now)
if next_save_time < now:
self.logger.error("next_save_time (%d) < now (%d), shifting next_probe_time" % (next_save_time, t))
next_probe_time = self.calculate_first_probe_time(now)
next_save_time = self.calculate_save_time(next_probe_time)
wait_ms = (next_save_time - now) * 1000
wait_ms_step = wait_ms / 10.0
while self.time_source.time() < next_save_time:
t = now
while t < next_save_time:
t = self.time_source.time()
if interrupt_caught:
break
try:
......@@ -82,7 +116,8 @@ class Meaner5:
self.read_socket()
if interrupt_caught:
break
self.db_store(next_save_time)
self.db_store(next_probe_time)
next_probe_time = self.calculate_next_probe_time(next_probe_time)
def clear_msgs(self):
for index in range(len(self.param_names_ordered)):
......
#!/usr/bin/python2
#!/usr/bin/python3
import fcntl, sys, os, struct
import time
......@@ -12,7 +12,7 @@ from simpleparamsreader import SimpleParamsReader
"""
Meaner5 daemon.
To run manually: env PYTHONPATH="${PYTHONPATH}:.:../protobuf:../pysz5:../../utils/probes_server" python ./meaner5dmn.py
To run manually: env PYTHONPATH="${PYTHONPATH}:.:../protobuf:../pysz5:../../utils/probes_server" python3 ./meaner5dmn.py
"""
daemon_name = "meaner5"
......
......@@ -2,4 +2,5 @@
# ugly way of calling python tests, it should be integrated somehow with python pybuild tests launched by dh_auto_test
env PYTHONPATH="${PYTHONPATH}:.:../protobuf:../pysz5" /usr/bin/python2 -m unittest discover -s ./unittests/
env PYTHONPATH="${PYTHONPATH}:.:../protobuf:../pysz5" python3-coverage run --source . --omit "unittests/*" -m unittest discover -s ./unittests/
python3-coverage report -m
......@@ -15,26 +15,64 @@ def run_server():
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind(parhub_uri)
for i in range(0, 200):
for i in range(0, 2000):
values = paramsvalues_pb2.ParamsValues()
value = values.param_values.add()
value.param_no = 1
value.time = 0
value.time = 1
value.value_before_prec = 5
value.prec = 0
value.is_nan = False
value = values.param_values.add()
value.param_no = 2
value.time = 0
value.time = 1
value.value_before_prec = 6
value.prec = 1
value.is_nan = False
value = values.param_values.add()
value.param_no = 3
value.time = 0
value.time = 1
value.value_before_prec = 7
value.prec = i % 2 # varying prec should result in nodata
value.is_nan = False
value = values.param_values.add()
value.param_no = 100
value.time = 1
value.value_before_prec = 5
value.prec = 0
value.is_nan = False
value = values.param_values.add()
value.param_no = 1
value.time = 20
value.value_before_prec = 5
value.prec = 0
value.is_nan = True
value = values.param_values.add()
value.param_no = 2
value.time = 20
value.value_before_prec = 16
value.prec = 1
value.is_nan = False
value = values.param_values.add()
value.param_no = 3
value.time = 20
value.value_before_prec = 17
value.prec = 2
value.is_nan = False
value = values.param_values.add()
value.param_no = 1
value.time = 60 # this one should trigger adding to unused_msgs
value.value_before_prec = 50
value.prec = 0
value.is_nan = False
value = values.param_values.add()
value.param_no = 3
value.time = 100 # this one should trigger discarding too far from the future msgs
value.value_before_prec = 17
value.prec = 2
value.is_nan = False
socket.send(values.SerializeToString())
sleep(0.01)
......@@ -85,8 +123,8 @@ class SimpleTest(unittest.TestCase):
db_handler = TestDbHandler(db)
param_names_ordered = ["zero", "first", "second", "third"]
ipk_handler = TestIpkHandler(param_names_ordered)
time_handler = TimeHandler([0, 0, 0.1, 0.2, 0.3, 1])
meaner = meaner5.Meaner5(base_name="test", parhub_uri=parhub_uri, saving_interval=1, logger=logger, base_handler=db_handler, ipk_handler=ipk_handler, time_source=time_handler)
time_handler = TimeHandler([0, 0, 1, 2, 3, 10, 15, 20, 21, 25, 50, 65]) # time=50 should trigger shifting next probe save time
meaner = meaner5.Meaner5(base_name="test", parhub_uri=parhub_uri, saving_interval=10, logger=logger, base_handler=db_handler, ipk_handler=ipk_handler, time_source=time_handler)
server = Process(target=run_server)
server.start()
try:
......@@ -96,10 +134,10 @@ class SimpleTest(unittest.TestCase):
server.terminate() # TODO less violent termination, that would close zmq gracefully?
param_names = [heartbeat_param_name] + param_names_ordered
self.assertEqual(set(param_names), set(db.params.keys()))
expected_store_no = 1
expected_store_no = 3
for key in db.params.keys():
self.assertEqual(expected_store_no, len(db.params[key]))
expected_timestamp = 1
expected_timestamp = 10
for param_name in param_names:
self.assertEqual(expected_timestamp, db.params[param_name][0][0])
self.assertEqual(True, db.params["zero"][0][1].is_no_data)
......@@ -113,3 +151,29 @@ class SimpleTest(unittest.TestCase):
self.assertEqual(0, db.params["first"][0][1].prec)
self.assertEqual(1, db.params["second"][0][1].prec)
self.assertEqual(0, db.params[heartbeat_param_name][0][1].prec)
expected_timestamp = 20
for param_name in param_names:
self.assertEqual(expected_timestamp, db.params[param_name][1][0])
self.assertEqual(True, db.params["zero"][1][1].is_no_data)
self.assertEqual(True, db.params["first"][1][1].is_no_data)
self.assertEqual(False, db.params["second"][1][1].is_no_data)
self.assertEqual(False, db.params["third"][1][1].is_no_data)
self.assertEqual(False, db.params[heartbeat_param_name][1][1].is_no_data)
self.assertEqual(16, db.params["second"][1][1].value_before_prec)
self.assertEqual(17, db.params["third"][1][1].value_before_prec)
self.assertEqual(1, db.params[heartbeat_param_name][1][1].value_before_prec)
self.assertEqual(1, db.params["second"][1][1].prec)
self.assertEqual(2, db.params["third"][1][1].prec)
self.assertEqual(0, db.params[heartbeat_param_name][1][1].prec)
expected_timestamp = 60
for param_name in param_names:
self.assertEqual(expected_timestamp, db.params[param_name][2][0])
self.assertEqual(True, db.params["zero"][2][1].is_no_data)
self.assertEqual(False, db.params["first"][2][1].is_no_data)
self.assertEqual(True, db.params["second"][2][1].is_no_data)
self.assertEqual(True, db.params["third"][2][1].is_no_data)
self.assertEqual(False, db.params[heartbeat_param_name][2][1].is_no_data)
self.assertEqual(50, db.params["first"][2][1].value_before_prec)
self.assertEqual(1, db.params[heartbeat_param_name][2][1].value_before_prec)
self.assertEqual(0, db.params["first"][2][1].prec)
self.assertEqual(0, db.params[heartbeat_param_name][2][1].prec)