openlp-core team mailing list archive
-
openlp-core team
-
Mailing list archive
-
Message #32682
[Merge] lp:~alisonken1/openlp/pjlink2-p into lp:openlp
Ken Roberts has proposed merging lp:~alisonken1/openlp/pjlink2-p into lp:openlp.
Commit message:
PJLink2 Update P
Requested reviews:
OpenLP Core (openlp-core)
For more details, see:
https://code.launchpad.net/~alisonken1/openlp/pjlink2-p/+merge/337504
- manager: Remove unused signal disconnect projectorNetwork.disconnect()
- Change PJLinkUDP.pjlink_udp_commands to dict with link to processing methods
- Add test_projector_pjlink_udp.test_process_ackn_duplicate
- Add test_projector_pjlink_udp.test_process_ackn_multiple
- Add test_projector_pjlink_udp.test_process_ackn_single
- Add test_projector_pjlink_udp.test_process_srch
- Add PJLinkUDP.get_datagram method
- Add PJLinkUDP._trash_udp_buffer method
- Add PJLinkUDP.process_ackn method
- Add PJLinkUDP.process_srch method
- Move projector tests to tests/openlp_core/projectors/
- Update resources.projector.data.TESTx_DATA to add mac_adx
--------------------------------------------------------------------------------
lp:~alisonken1/openlp/pjlink2-p (revision 2810)
https://ci.openlp.io/job/Branch-01-Pull/2443/ [SUCCESS]
https://ci.openlp.io/job/Branch-02a-Linux-Tests/2344/ [SUCCESS]
https://ci.openlp.io/job/Branch-02b-macOS-Tests/138/ [SUCCESS]
https://ci.openlp.io/job/Branch-03a-Build-Source/60/ [SUCCESS]
https://ci.openlp.io/job/Branch-03b-Build-macOS/58/ [SUCCESS]
https://ci.openlp.io/job/Branch-04a-Code-Analysis/1522/ [SUCCESS]
https://ci.openlp.io/job/Branch-04b-Test-Coverage/1335/ [SUCCESS]
https://ci.openlp.io/job/Branch-05-AppVeyor-Tests/276/ [FAILURE]
--
Your team OpenLP Core is requested to review the proposed merge of lp:~alisonken1/openlp/pjlink2-p into lp:openlp.
=== modified file 'openlp/.version'
--- openlp/.version 2016-12-12 22:16:23 +0000
+++ openlp/.version 2018-02-10 09:09:49 +0000
@@ -1,1 +1,1 @@
-2.5.0
+2.5-bzr2809
\ No newline at end of file
=== modified file 'openlp/core/projectors/constants.py'
--- openlp/core/projectors/constants.py 2018-01-03 00:35:14 +0000
+++ openlp/core/projectors/constants.py 2018-02-10 09:09:49 +0000
@@ -24,6 +24,8 @@
"""
import logging
+from PyQt5.QtNetwork import QAbstractSocket, QHostAddress, QNetworkInterface
+
from openlp.core.common.i18n import translate
log = logging.getLogger(__name__)
@@ -557,3 +559,24 @@
label = "{source}{item}".format(source=source, item=item)
PJLINK_DEFAULT_CODES[label] = "{source} {item}".format(source=PJLINK_DEFAULT_SOURCES[source],
item=PJLINK_DEFAULT_ITEMS[item])
+
+# Get the local IPv4 active address(es) that are NOT localhost (lo or '127.0.0.1')
+log.debug('Getting local IPv4 interface(es) information')
+MY_IP4 = {}
+for iface in QNetworkInterface.allInterfaces():
+ if not iface.isValid() or not (iface.flags() & (QNetworkInterface.IsUp | QNetworkInterface.IsRunning)):
+ continue
+ for address in iface.addressEntries():
+ ip = address.ip()
+ if (ip.protocol() == QAbstractSocket.IPv4Protocol) and (ip != QHostAddress.LocalHost):
+ MY_IP4[iface.name()] = {'ip': ip.toString(),
+ 'broadcast': address.broadcast().toString(),
+ 'netmask': address.netmask().toString(),
+ 'prefix': address.prefixLength(),
+ 'localnet': QHostAddress(address.netmask().toIPv4Address() &
+ ip.toIPv4Address()).toString()
+ }
+ log.debug('Adding {iface} to active list'.format(iface=iface.name()))
+
+if not MY_IP4:
+ log.warning('No active IPv4 interfaces found')
=== modified file 'openlp/core/projectors/manager.py'
--- openlp/core/projectors/manager.py 2018-01-13 05:41:42 +0000
+++ openlp/core/projectors/manager.py 2018-02-10 09:09:49 +0000
@@ -308,7 +308,6 @@
self.settings_section = 'projector'
self.projectordb = projectordb
self.projector_list = []
- self.pjlink_udp = PJLinkUDP(self.projector_list)
self.source_select_form = None
def bootstrap_initialise(self):
@@ -323,6 +322,7 @@
else:
log.debug('Using existing ProjectorDB() instance')
self.get_settings()
+ self.pjlink_udp = PJLinkUDP(self.projector_list)
def bootstrap_post_set_up(self):
"""
@@ -344,6 +344,7 @@
"""
Retrieve the saved settings
"""
+ log.debug('Updating ProjectorManager settings')
settings = Settings()
settings.beginGroup(self.settings_section)
self.autostart = settings.value('connect on start')
@@ -502,10 +503,6 @@
if ans == msg.Cancel:
return
try:
- projector.link.projectorNetwork.disconnect(self.update_status)
- except (AttributeError, TypeError):
- pass
- try:
projector.link.changeStatus.disconnect(self.update_status)
except (AttributeError, TypeError):
pass
=== modified file 'openlp/core/projectors/pjlink.py'
--- openlp/core/projectors/pjlink.py 2018-01-13 05:41:42 +0000
+++ openlp/core/projectors/pjlink.py 2018-02-10 09:09:49 +0000
@@ -64,7 +64,7 @@
log = logging.getLogger(__name__)
log.debug('pjlink loaded')
-__all__ = ['PJLink']
+__all__ = ['PJLink', 'PJLinkUDP']
# Shortcuts
SocketError = QtNetwork.QAbstractSocket.SocketError
@@ -79,22 +79,145 @@
"""
Socket service for PJLink UDP socket.
"""
- # New commands available in PJLink Class 2
- pjlink_udp_commands = [
- 'ACKN', # Class 2 (cmd is SRCH)
- 'ERST', # Class 1/2
- 'INPT', # Class 1/2
- 'LKUP', # Class 2 (reply only - no cmd)
- 'POWR', # Class 1/2
- 'SRCH' # Class 2 (reply is ACKN)
- ]
-
def __init__(self, projector_list, port=PJLINK_PORT):
"""
- Initialize socket
+ Socket services for PJLink UDP packets.
+
+ Since all UDP packets from any projector will come into the same
+ port, process UDP packets here then route to the appropriate
+ projector instance as needed.
"""
+ # Keep track of currently defined projectors so we can route
+ # inbound packets to the correct instance
+ super().__init__()
self.projector_list = projector_list
self.port = port
+ # Local defines
+ self.ackn_list = {} # Replies from online projetors
+ self.search_active = False
+ self.search_time = 30000 # 30 seconds for allowed time
+ self.search_timer = QtCore.QTimer()
+ # New commands available in PJLink Class 2
+ # ACKN/SRCH is processed here since it's used to find available projectors
+ # Other commands are processed by the individual projector instances
+ self.pjlink_udp_functions = {
+ 'ACKN': self.process_ackn, # Class 2, command is 'SRCH'
+ 'ERST': None, # Class 1/2
+ 'INPT': None, # Class 1/2
+ 'LKUP': None, # Class 2 (reply only - no cmd)
+ 'POWR': None, # Class 1/2
+ 'SRCH': self.process_srch # Class 2 (reply is ACKN)
+ }
+
+ self.readyRead.connect(self.get_datagram)
+ log.debug('(UDP) PJLinkUDP() Initialized')
+
+ @QtCore.pyqtSlot()
+ def get_datagram(self):
+ """
+ Retrieve packet and basic checks
+ """
+ log.debug('(UDP) get_datagram() - Receiving data')
+ read = self.pendingDatagramSize()
+ if read < 0:
+ log.warn('(UDP) No data (-1)')
+ return
+ if read < 1:
+ log.warn('(UDP) get_datagram() called when pending data size is 0')
+ return
+ data, peer_address, peer_port = self.readDatagram(self.pendingDatagramSize())
+ log.debug('(UDP) {size} bytes received from {adx} on port {port}'.format(size=len(data),
+ adx=peer_address,
+ port=peer_port))
+ log.debug('(UDP) packet "{data}"'.format(data=data))
+ if len(data) < 0:
+ log.warn('(UDP) No data (-1)')
+ return
+ elif len(data) < 8:
+ # Minimum packet is '%2CCCC='
+ log.warn('(UDP) Invalid packet - not enough data')
+ return
+ elif data is None:
+ log.warn('(UDP) No data (None)')
+ return
+ elif len(data) > PJLINK_MAX_PACKET:
+ log.warn('(UDP) Invalid packet - length too long')
+ return
+ elif not data.startswith(PJLINK_PREFIX):
+ log.warn('(UDP) Invalid packet - does not start with PJLINK_PREFIX')
+ return
+ elif data[1] != '2':
+ log.warn('(UDP) Invalid packet - missing/invalid PJLink class version')
+ return
+ elif data[6] != '=':
+ log.warn('(UDP) Invalid packet - separator missing')
+ return
+ # First two characters are header information we don't need at this time
+ cmd, data = data[2:].split('=')
+ if cmd not in self.pjlink_udp_functions:
+ log.warn('(UDP) Invalid packet - not a valid PJLink UDP reply')
+ return
+ if self.pjlink_udp_functions[cmd] is not None:
+ log.debug('(UDP) Processing {cmd} with "{data}"'.format(cmd=cmd, data=data))
+ return self.pjlink_udp_functions[cmd](data=data, host=peer_address, port=peer_port)
+ else:
+ log.debug('(UDP) Checking projector list for ip {host} to process'.format(host=peer_address))
+ for projector in self.projector_list:
+ if peer_address == projector.ip:
+ if cmd not in projector.pjlink_functions:
+ log.error('(UDP) Could not find method to process '
+ '"{cmd}" in {host}'.format(cmd=cmd, host=projector.ip))
+ return
+ log.debug('(UDP) Calling "{cmd}" in {host}'.format(cmd=cmd, host=projector.ip))
+ return projector.pjlink_functions[cmd](data=data)
+ log.warn('(UDP) Could not find projector with ip {ip} to process packet'.format(ip=peer_address))
+ return
+
+ def process_ackn(self, data, host, port):
+ """
+ Process the ACKN command.
+
+ :param data: Data in packet
+ :param host: IP address of sending host
+ :param port: Port received on
+ """
+ log.debug('(UDP) Processing ACKN packet')
+ if host not in self.ackn_list:
+ log.debug('(UDP) Adding {host} to ACKN list'.format(host=host))
+ self.ackn_list[host] = {'data': data,
+ 'port': port}
+ else:
+ log.warn('(UDP) Host {host} already replied - ignoring'.format(host=host))
+
+ def process_srch(self, data, host, port):
+ """
+ Process the SRCH command.
+
+ SRCH is processed by terminals so we ignore any packet.
+
+ :param data: Data in packet
+ :param host: IP address of sending host
+ :param port: Port received on
+ """
+ log.debug('(UDP) SRCH packet received - ignoring')
+ return
+
+ def search_start(self):
+ """
+ Start search for projectors on local network
+ """
+ self.search_active = True
+ self.ackn_list = {}
+ # TODO: Send SRCH packet here
+ self.search_timer.singleShot(self.search_time, self.search_stop)
+
+ @QtCore.pyqtSlot()
+ def search_stop(self):
+ """
+ Stop search
+ """
+ self.search_active = False
+ self.search_timer.stop()
class PJLinkCommands(object):
@@ -257,8 +380,9 @@
else:
clss = data
self.pjlink_class = clss
- log.debug('({ip}) Setting pjlink_class for this projector to "{data}"'.format(ip=self.entry.name,
- data=self.pjlink_class))
+ log.debug('({ip}) Setting pjlink_class for this projector '
+ 'to "{data}"'.format(ip=self.entry.name,
+ data=self.pjlink_class))
# Since we call this one on first connect, setup polling from here
if not self.no_poll:
log.debug('({ip}) process_pjlink(): Starting timer'.format(ip=self.entry.name))
@@ -276,9 +400,10 @@
"""
if len(data) != PJLINK_ERST_DATA['DATA_LENGTH']:
count = PJLINK_ERST_DATA['DATA_LENGTH']
- log.warning('({ip}) Invalid error status response "{data}": length != {count}'.format(ip=self.entry.name,
- data=data,
- count=count))
+ log.warning('({ip}) Invalid error status response "{data}": '
+ 'length != {count}'.format(ip=self.entry.name,
+ data=data,
+ count=count))
return
try:
datacheck = int(data)
@@ -557,7 +682,7 @@
class PJLink(QtNetwork.QTcpSocket, PJLinkCommands):
"""
- Socket service for PJLink TCP socket.
+ Socket services for PJLink TCP packets.
"""
# Signals sent by this module
changeStatus = QtCore.pyqtSignal(str, int, str)
=== added directory 'tests/openlp_core'
=== added file 'tests/openlp_core/__init__.py'
--- tests/openlp_core/__init__.py 1970-01-01 00:00:00 +0000
+++ tests/openlp_core/__init__.py 2018-02-10 09:09:49 +0000
@@ -0,0 +1,26 @@
+# -*- coding: utf-8 -*-
+# vim: autoindent shiftwidth=4 expandtab textwidth=120 tabstop=4 softtabstop=4
+
+###############################################################################
+# OpenLP - Open Source Lyrics Projection #
+# --------------------------------------------------------------------------- #
+# Copyright (c) 2008-2018 OpenLP Developers #
+# --------------------------------------------------------------------------- #
+# This program is free software; you can redistribute it and/or modify it #
+# under the terms of the GNU General Public License as published by the Free #
+# Software Foundation; version 2 of the License. #
+# #
+# This program is distributed in the hope that it will be useful, but WITHOUT #
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or #
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for #
+# more details. #
+# #
+# You should have received a copy of the GNU General Public License along #
+# with this program; if not, write to the Free Software Foundation, Inc., 59 #
+# Temple Place, Suite 330, Boston, MA 02111-1307 USA #
+###############################################################################
+"""
+:mod: `tests.openlp_core` module
+
+Tests modules/files for module openlp.core
+"""
=== renamed directory 'tests/functional/openlp_core/projectors' => 'tests/openlp_core/projectors'
=== modified file 'tests/openlp_core/projectors/__init__.py'
--- tests/functional/openlp_core/projectors/__init__.py 2017-11-10 11:59:38 +0000
+++ tests/openlp_core/projectors/__init__.py 2018-02-10 09:09:49 +0000
@@ -20,5 +20,5 @@
# Temple Place, Suite 330, Boston, MA 02111-1307 USA #
###############################################################################
"""
-Module-level functions for the functional test suite
+Module-level functions for the projector test suite
"""
=== added file 'tests/openlp_core/projectors/test_projector_pjlink_udp.py'
--- tests/openlp_core/projectors/test_projector_pjlink_udp.py 1970-01-01 00:00:00 +0000
+++ tests/openlp_core/projectors/test_projector_pjlink_udp.py 2018-02-10 09:09:49 +0000
@@ -0,0 +1,360 @@
+
+# -*- coding: utf-8 -*-
+# vim: autoindent shiftwidth=4 expandtab textwidth=120 tabstop=4 softtabstop=4
+
+###############################################################################
+# OpenLP - Open Source Lyrics Projection #
+# --------------------------------------------------------------------------- #
+# Copyright (c) 2008-2018 OpenLP Developers #
+# --------------------------------------------------------------------------- #
+# This program is free software; you can redistribute it and/or modify it #
+# under the terms of the GNU General Public License as published by the Free #
+# Software Foundation; version 2 of the License. #
+# #
+# This program is distributed in the hope that it will be useful, but WITHOUT #
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or #
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for #
+# more details. #
+# #
+# You should have received a copy of the GNU General Public License along #
+# with this program; if not, write to the Free Software Foundation, Inc., 59 #
+# Temple Place, Suite 330, Boston, MA 02111-1307 USA #
+###############################################################################
+"""
+Package to test the PJLink UDP functions
+"""
+
+from unittest import TestCase
+from unittest.mock import call, patch
+
+import openlp.core.projectors.pjlink
+from openlp.core.projectors.constants import PJLINK_MAX_PACKET, PJLINK_PORT, PJLINK_PREFIX
+
+from openlp.core.projectors.db import Projector
+from openlp.core.projectors.pjlink import PJLinkUDP
+from tests.resources.projector.data import TEST1_DATA, TEST2_DATA
+
+
+class TestPJLinkBase(TestCase):
+ """
+ Tests for the PJLinkUDP class
+ """
+ def setUp(self):
+ """
+ Setup generic test conditions
+ """
+ self.test_list = [Projector(**TEST1_DATA), Projector(**TEST2_DATA)]
+
+ def tearDown(self):
+ """
+ Close generic test condidtions
+ """
+ self.test_list = None
+
+ @patch.object(openlp.core.projectors.pjlink, 'log')
+ def test_get_datagram_data_invalid_class(self, mock_log):
+ """
+ Test get_datagram with invalid class number
+ """
+ # GIVEN: Test setup
+ pjlink_udp = PJLinkUDP(projector_list=self.test_list)
+ log_warn_calls = [call('(UDP) Invalid packet - missing/invalid PJLink class version')]
+ log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
+ call('(UDP) get_datagram() - Receiving data'),
+ call('(UDP) 24 bytes received from 111.111.111.111 on port 4352'),
+ call('(UDP) packet "%1ACKN=11:11:11:11:11:11"')]
+ with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
+ patch.object(pjlink_udp, 'readDatagram') as mock_read:
+ mock_datagram.return_value = 24
+ mock_read.return_value = ('{prefix}1ACKN={mac}'.format(prefix=PJLINK_PREFIX, mac=TEST1_DATA['mac_adx']),
+ TEST1_DATA['ip'], PJLINK_PORT)
+
+ # WHEN: get_datagram called with 0 bytes ready
+ pjlink_udp.get_datagram()
+
+ # THEN: Log entries should be made and method returns
+ mock_log.debug.assert_has_calls(log_debug_calls)
+ mock_log.warn.assert_has_calls(log_warn_calls)
+
+ @patch.object(openlp.core.projectors.pjlink, 'log')
+ def test_get_datagram_data_invalid_command(self, mock_log):
+ """
+ Test get_datagram with invalid PJLink UDP command
+ """
+ # GIVEN: Test setup
+ pjlink_udp = PJLinkUDP(projector_list=self.test_list)
+ log_warn_calls = [call('(UDP) Invalid packet - not a valid PJLink UDP reply')]
+ log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
+ call('(UDP) get_datagram() - Receiving data'),
+ call('(UDP) 24 bytes received from 111.111.111.111 on port 4352'),
+ call('(UDP) packet "%2DUMB=11:11:11:11:11:11"')]
+ with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
+ patch.object(pjlink_udp, 'readDatagram') as mock_read:
+ mock_datagram.return_value = 24
+ mock_read.return_value = ('{prefix}2DUMB={mac}'.format(prefix=PJLINK_PREFIX, mac=TEST1_DATA['mac_adx']),
+ TEST1_DATA['ip'], PJLINK_PORT)
+
+ # WHEN: get_datagram called with 0 bytes ready
+ pjlink_udp.get_datagram()
+
+ # THEN: Log entries should be made and method returns
+ mock_log.debug.assert_has_calls(log_debug_calls)
+ mock_log.warn.assert_has_calls(log_warn_calls)
+
+ @patch.object(openlp.core.projectors.pjlink, 'log')
+ def test_get_datagram_data_invalid_prefix(self, mock_log):
+ """
+ Test get_datagram when prefix != PJLINK_PREFIX
+ """
+ # GIVEN: Test setup
+ pjlink_udp = PJLinkUDP(projector_list=self.test_list)
+ log_warn_calls = [call('(UDP) Invalid packet - does not start with PJLINK_PREFIX')]
+ log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
+ call('(UDP) get_datagram() - Receiving data'),
+ call('(UDP) 24 bytes received from 111.111.111.111 on port 4352'),
+ call('(UDP) packet "$2ACKN=11:11:11:11:11:11"')]
+ with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
+ patch.object(pjlink_udp, 'readDatagram') as mock_read:
+ mock_datagram.return_value = 24
+ mock_read.return_value = ('{prefix}2ACKN={mac}'.format(prefix='$', mac=TEST1_DATA['mac_adx']),
+ TEST1_DATA['ip'], PJLINK_PORT)
+
+ # WHEN: get_datagram called with 0 bytes ready
+ pjlink_udp.get_datagram()
+
+ # THEN: Log entries should be made and method returns
+ mock_log.debug.assert_has_calls(log_debug_calls)
+ mock_log.warn.assert_has_calls(log_warn_calls)
+
+ @patch.object(openlp.core.projectors.pjlink, 'log')
+ def test_get_datagram_data_invalid_separator(self, mock_log):
+ """
+ Test get_datagram when separator not equal to =
+ """
+ # GIVEN: Test setup
+ pjlink_udp = PJLinkUDP(projector_list=self.test_list)
+ log_warn_calls = [call('(UDP) Invalid packet - separator missing')]
+ log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
+ call('(UDP) get_datagram() - Receiving data'),
+ call('(UDP) 24 bytes received from 111.111.111.111 on port 4352'),
+ call('(UDP) packet "%2ACKN 11:11:11:11:11:11"')]
+ with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
+ patch.object(pjlink_udp, 'readDatagram') as mock_read:
+ mock_datagram.return_value = 24
+ mock_read.return_value = ('{prefix}2ACKN {mac}'.format(prefix=PJLINK_PREFIX, mac=TEST1_DATA['mac_adx']),
+ TEST1_DATA['ip'], PJLINK_PORT)
+
+ # WHEN: get_datagram called with 0 bytes ready
+ pjlink_udp.get_datagram()
+
+ # THEN: Log entries should be made and method returns
+ mock_log.debug.assert_has_calls(log_debug_calls)
+ mock_log.warn.assert_has_calls(log_warn_calls)
+
+ @patch.object(openlp.core.projectors.pjlink, 'log')
+ def test_get_datagram_data_long(self, mock_log):
+ """
+ Test get_datagram when datagram > PJLINK_MAX_PACKET
+ """
+ # GIVEN: Test setup
+ pjlink_udp = PJLinkUDP(projector_list=self.test_list)
+ log_warn_calls = [call('(UDP) Invalid packet - length too long')]
+ log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
+ call('(UDP) get_datagram() - Receiving data'),
+ call('(UDP) 143 bytes received from 111.111.111.111 on port 4352'),
+ call('(UDP) packet "%2ACKN={long}"'.format(long='X' * PJLINK_MAX_PACKET))]
+ with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
+ patch.object(pjlink_udp, 'readDatagram') as mock_read:
+ mock_datagram.return_value = PJLINK_MAX_PACKET + 7
+ mock_read.return_value = ('{prefix}2ACKN={long}'.format(prefix=PJLINK_PREFIX,
+ long='X' * PJLINK_MAX_PACKET),
+ TEST1_DATA['ip'], PJLINK_PORT)
+
+ # WHEN: get_datagram called with 0 bytes ready
+ pjlink_udp.get_datagram()
+
+ # THEN: Log entries should be made and method returns
+ mock_log.debug.assert_has_calls(log_debug_calls)
+ mock_log.warn.assert_has_calls(log_warn_calls)
+
+ @patch.object(openlp.core.projectors.pjlink, 'log')
+ def test_get_datagram_data_negative_zero_length(self, mock_log):
+ """
+ Test get_datagram when pendingDatagramSize = 0
+ """
+ # GIVEN: Test setup
+ pjlink_udp = PJLinkUDP(projector_list=self.test_list)
+ log_warn_calls = [call('(UDP) No data (-1)')]
+ log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
+ call('(UDP) get_datagram() - Receiving data')]
+ with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
+ patch.object(pjlink_udp, 'readDatagram') as mock_read:
+ mock_datagram.return_value = -1
+ mock_read.return_value = ('', TEST1_DATA['ip'], PJLINK_PORT)
+
+ # WHEN: get_datagram called with 0 bytes ready
+ pjlink_udp.get_datagram()
+
+ # THEN: Log entries should be made and method returns
+ mock_log.warn.assert_has_calls(log_warn_calls)
+ mock_log.debug.assert_has_calls(log_debug_calls)
+
+ @patch.object(openlp.core.projectors.pjlink, 'log')
+ def test_get_datagram_data_no_data(self, mock_log):
+ """
+ Test get_datagram when data length = 0
+ """
+ # GIVEN: Test setup
+ pjlink_udp = PJLinkUDP(projector_list=self.test_list)
+ log_warn_calls = [call('(UDP) Invalid packet - not enough data')]
+ log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
+ call('(UDP) get_datagram() - Receiving data')]
+ with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
+ patch.object(pjlink_udp, 'readDatagram') as mock_read:
+ mock_datagram.return_value = 1
+ mock_read.return_value = ('', TEST1_DATA['ip'], PJLINK_PORT)
+
+ # WHEN: get_datagram called with 0 bytes ready
+ pjlink_udp.get_datagram()
+
+ # THEN: Log entries should be made and method returns
+ mock_log.warn.assert_has_calls(log_warn_calls)
+ mock_log.debug.assert_has_calls(log_debug_calls)
+
+ @patch.object(openlp.core.projectors.pjlink, 'log')
+ def test_get_datagram_data_short(self, mock_log):
+ """
+ Test get_datagram when data length < 8
+ """
+ # GIVEN: Test setup
+ pjlink_udp = PJLinkUDP(projector_list=self.test_list)
+ log_warn_calls = [call('(UDP) Invalid packet - not enough data')]
+ log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
+ call('(UDP) get_datagram() - Receiving data')]
+ with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram, \
+ patch.object(pjlink_udp, 'readDatagram') as mock_read:
+ mock_datagram.return_value = 6
+ mock_read.return_value = ('{prefix}2ACKN'.format(prefix=PJLINK_PREFIX), TEST1_DATA['ip'], PJLINK_PORT)
+
+ # WHEN: get_datagram called with 0 bytes ready
+ pjlink_udp.get_datagram()
+
+ # THEN: Log entries should be made and method returns
+ mock_log.warn.assert_has_calls(log_warn_calls)
+ mock_log.debug.assert_has_calls(log_debug_calls)
+
+ @patch.object(openlp.core.projectors.pjlink, 'log')
+ def test_get_datagram_pending_zero_length(self, mock_log):
+ """
+ Test get_datagram when pendingDatagramSize = 0
+ """
+ # GIVEN: Test setup
+ pjlink_udp = PJLinkUDP(projector_list=self.test_list)
+ log_warn_calls = [call('(UDP) get_datagram() called when pending data size is 0')]
+ log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
+ call('(UDP) get_datagram() - Receiving data')]
+ with patch.object(pjlink_udp, 'pendingDatagramSize') as mock_datagram:
+ mock_datagram.return_value = 0
+
+ # WHEN: get_datagram called with 0 bytes ready
+ pjlink_udp.get_datagram()
+
+ # THEN: Log entries should be made and method returns
+ mock_log.warn.assert_has_calls(log_warn_calls)
+ mock_log.debug.assert_has_calls(log_debug_calls)
+
+ @patch.object(openlp.core.projectors.pjlink, 'log')
+ def test_process_ackn_duplicate(self, mock_log):
+ """
+ Test process_ackn method with multiple calls with same data
+ """
+ # GIVEN: Test setup
+ pjlink_udp = PJLinkUDP(projector_list=self.test_list)
+ check_list = {TEST1_DATA['ip']: {'data': TEST1_DATA['mac_adx'], 'port': PJLINK_PORT}}
+ log_warn_calls = [call('(UDP) Host {host} already replied - ignoring'.format(host=TEST1_DATA['ip']))]
+ log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
+ call('(UDP) Processing ACKN packet'),
+ call('(UDP) Adding {host} to ACKN list'.format(host=TEST1_DATA['ip'])),
+ call('(UDP) Processing ACKN packet')]
+
+ # WHEN: process_ackn called twice with same data
+ pjlink_udp.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT)
+ pjlink_udp.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT)
+
+ # THEN: pjlink_udp.ack_list should equal test_list
+ # NOTE: This assert only returns AssertionError - does not list differences. Maybe add a compare function?
+ if pjlink_udp.ackn_list != check_list:
+ # Check this way so we can print differences to stdout
+ print('\nackn_list: ', pjlink_udp.ackn_list)
+ print('test_list: ', check_list)
+ assert pjlink_udp.ackn_list == check_list
+ mock_log.debug.assert_has_calls(log_debug_calls)
+ mock_log.warn.assert_has_calls(log_warn_calls)
+
+ @patch.object(openlp.core.projectors.pjlink, 'log')
+ def test_process_ackn_multiple(self, mock_log):
+ """
+ Test process_ackn method with multiple calls
+ """
+ # GIVEN: Test setup
+ pjlink_udp = PJLinkUDP(projector_list=self.test_list)
+ check_list = {TEST1_DATA['ip']: {'data': TEST1_DATA['mac_adx'], 'port': PJLINK_PORT},
+ TEST2_DATA['ip']: {'data': TEST2_DATA['mac_adx'], 'port': PJLINK_PORT}}
+ log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
+ call('(UDP) Processing ACKN packet'),
+ call('(UDP) Adding {host} to ACKN list'.format(host=TEST1_DATA['ip'])),
+ call('(UDP) Processing ACKN packet'),
+ call('(UDP) Adding {host} to ACKN list'.format(host=TEST2_DATA['ip']))]
+
+ # WHEN: process_ackn called twice with different data
+ pjlink_udp.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT)
+ pjlink_udp.process_ackn(data=TEST2_DATA['mac_adx'], host=TEST2_DATA['ip'], port=PJLINK_PORT)
+
+ # THEN: pjlink_udp.ack_list should equal test_list
+ # NOTE: This assert only returns AssertionError - does not list differences. Maybe add a compare function?
+ if pjlink_udp.ackn_list != check_list:
+ # Check this way so we can print differences to stdout
+ print('\nackn_list: ', pjlink_udp.ackn_list)
+ print('test_list: ', check_list)
+ assert pjlink_udp.ackn_list == check_list
+ mock_log.debug.assert_has_calls(log_debug_calls)
+
+ @patch.object(openlp.core.projectors.pjlink, 'log')
+ def test_process_ackn_single(self, mock_log):
+ """
+ Test process_ackn method with single call
+ """
+ # GIVEN: Test setup
+ pjlink_udp = PJLinkUDP(projector_list=self.test_list)
+ check_list = {TEST1_DATA['ip']: {'data': TEST1_DATA['mac_adx'], 'port': PJLINK_PORT}}
+ log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
+ call('(UDP) Processing ACKN packet'),
+ call('(UDP) Adding {host} to ACKN list'.format(host=TEST1_DATA['ip']))]
+
+ # WHEN: process_ackn called twice with different data
+ pjlink_udp.process_ackn(data=TEST1_DATA['mac_adx'], host=TEST1_DATA['ip'], port=PJLINK_PORT)
+
+ # THEN: pjlink_udp.ack_list should equal test_list
+ # NOTE: This assert only returns AssertionError - does not list differences. Maybe add a compare function?
+ if pjlink_udp.ackn_list != check_list:
+ # Check this way so we can print differences to stdout
+ print('\nackn_list: ', pjlink_udp.ackn_list)
+ print('test_list: ', check_list)
+ assert pjlink_udp.ackn_list == check_list
+ mock_log.debug.assert_has_calls(log_debug_calls)
+
+ @patch.object(openlp.core.projectors.pjlink, 'log')
+ def test_process_srch(self, mock_log):
+ """
+ Test process_srch method
+ """
+ # GIVEN: Test setup
+ pjlink_udp = PJLinkUDP(projector_list=self.test_list)
+ log_debug_calls = [call('(UDP) PJLinkUDP() Initialized'),
+ call('(UDP) SRCH packet received - ignoring')]
+
+ # WHEN: process_srch called
+ pjlink_udp.process_srch(data=None, host=None, port=None)
+
+ # THEN: debug log entry should be entered
+ mock_log.debug.assert_has_calls(log_debug_calls)
=== renamed file 'tests/functional/openlp_core/common/test_projector_utilities.py' => 'tests/openlp_core/projectors/test_projector_utilities.py'
=== renamed file 'tests/interfaces/openlp_core/ui/test_projectoreditform.py' => 'tests/openlp_core/projectors/test_projectoreditform.py'
=== renamed file 'tests/interfaces/openlp_core/ui/test_projectormanager.py' => 'tests/openlp_core/projectors/test_projectormanager.py'
=== renamed file 'tests/interfaces/openlp_core/ui/test_projectorsourceform.py' => 'tests/openlp_core/projectors/test_projectorsourceform.py'
--- tests/interfaces/openlp_core/ui/test_projectorsourceform.py 2017-12-29 09:15:48 +0000
+++ tests/openlp_core/projectors/test_projectorsourceform.py 2018-02-10 09:09:49 +0000
@@ -125,7 +125,6 @@
select_form = SourceSelectSingle(parent=None, projectordb=self.projectordb)
select_form.edit = True
select_form.exec(projector=self.projector)
- projector = select_form.projector
# THEN: Verify all 4 buttons are available
assert len(select_form.button_box.buttons()) == 4, \
@@ -144,7 +143,6 @@
select_form = SourceSelectSingle(parent=None, projectordb=self.projectordb)
select_form.edit = False
select_form.exec(projector=self.projector)
- projector = select_form.projector
# THEN: Verify only 2 buttons are available
assert len(select_form.button_box.buttons()) == 2, \
=== modified file 'tests/resources/projector/data.py'
--- tests/resources/projector/data.py 2017-12-29 09:15:48 +0000
+++ tests/resources/projector/data.py 2018-02-10 09:09:49 +0000
@@ -45,7 +45,8 @@
serial_no='Serial Number 1',
sw_version='Version 1',
model_filter='Filter type 1',
- model_lamp='Lamp type 1')
+ model_lamp='Lamp type 1',
+ mac_adx='11:11:11:11:11:11')
TEST2_DATA = dict(ip='222.222.222.222',
port='2222',
@@ -56,7 +57,8 @@
serial_no='Serial Number 2',
sw_version='Version 2',
model_filter='Filter type 2',
- model_lamp='Lamp type 2')
+ model_lamp='Lamp type 2',
+ mac_adx='22:22:22:22:22:22')
TEST3_DATA = dict(ip='333.333.333.333',
port='3333',
@@ -67,7 +69,8 @@
serial_no='Serial Number 3',
sw_version='Version 3',
model_filter='Filter type 3',
- model_lamp='Lamp type 3')
+ model_lamp='Lamp type 3',
+ mac_adx='33:33:33:33:33:33')
TEST_VIDEO_CODES = {
'11': 'RGB 1',
Follow ups