Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ The REST API is documented using Swagger (OpenAPI). After installing and running


## Change Log
- 1.1.4 - minor bug fixes and code cleanup
- 1.1.3 - introduced configurable footer menu for links in bottom of the default template
- 1.1.2 - minor security updates (removed unused JS files), setup.py now reads dependencies from requirements.txt
- 1.1.1 - Machine API Key rewrited.
Expand Down
2 changes: 1 addition & 1 deletion flowapp/__about__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.1.3"
__version__ = "1.1.4"
11 changes: 7 additions & 4 deletions flowapp/flowspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,12 @@ def filter_rules_action(user_actions, rules):
editable = []
viewonly = []
for rule in rules:
if rule.action_id in user_actions:
editable.append(rule)
else:
viewonly.append(rule)
try:
if rule.action_id in user_actions:
editable.append(rule)
else:
viewonly.append(rule)
except AttributeError:
editable.append(rule) # If rule has no action_id, treat it as editable

return editable, viewonly
191 changes: 191 additions & 0 deletions flowapp/tests/test_validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
editable_range,
network_in_range,
range_in_network,
filter_rules_in_network,
split_rules_for_user,
filter_rtbh_rules,
split_rtbh_rules_for_user,
)


Expand Down Expand Up @@ -354,3 +358,190 @@ def test_network_validator_invalid(field, address, mask):
form = MockForm(address, mask)
with pytest.raises(ValidationError):
validator(form, field)


# Mock rule classes for testing robust attribute handling
class MockRule:
"""Mock rule with all expected attributes"""

def __init__(self, source=None, source_mask=None, dest=None, dest_mask=None):
self.source = source
self.source_mask = source_mask
self.dest = dest
self.dest_mask = dest_mask


class MockRuleIncomplete:
"""Mock rule with missing attributes"""

def __init__(self, name=None):
self.name = name
# Intentionally missing source, source_mask, dest, dest_mask attributes


class MockRulePartial:
"""Mock rule with some attributes"""

def __init__(self, source=None):
self.source = source
# Missing source_mask, dest, dest_mask attributes


class MockRTBHRule:
"""Mock RTBH rule with all expected attributes"""

def __init__(self, ipv4=None, ipv4_mask=None, ipv6=None, ipv6_mask=None):
self.ipv4 = ipv4
self.ipv4_mask = ipv4_mask
self.ipv6 = ipv6
self.ipv6_mask = ipv6_mask


class MockRTBHRuleIncomplete:
"""Mock RTBH rule with missing attributes"""

def __init__(self, name=None):
self.name = name
# Intentionally missing ipv4, ipv4_mask, ipv6, ipv6_mask attributes


# Tests for filter_rules_in_network with robust attribute handling
def test_filter_rules_in_network_normal_rules():
"""Test filter_rules_in_network with normal rule objects"""
net_ranges = ["192.168.0.0/16", "10.0.0.0/8"]
rules = [
MockRule("192.168.1.0", "24", "10.0.1.0", "24"), # Should match
MockRule("172.16.1.0", "24", "172.16.2.0", "24"), # Should not match
MockRule("10.1.0.0", "16", None, None), # Should match (source only)
]

filtered = filter_rules_in_network(net_ranges, rules)
assert len(filtered) == 2
assert rules[0] in filtered # 192.168.x.x rule
assert rules[2] in filtered # 10.x.x.x rule
assert rules[1] not in filtered # 172.16.x.x rule


def test_filter_rules_in_network_missing_attributes():
"""Test filter_rules_in_network with rules missing required attributes"""
net_ranges = ["192.168.0.0/16"]
rules = [
MockRule("192.168.1.0", "24", "10.0.1.0", "24"), # Normal rule - should match
MockRuleIncomplete("rule_without_network_attrs"), # Missing attrs - should be included
MockRulePartial("172.16.1.0"), # Partial attrs - should be included
]

filtered = filter_rules_in_network(net_ranges, rules)
assert len(filtered) == 3 # All rules should be included
assert all(rule in filtered for rule in rules)


def test_filter_rules_in_network_none_values():
"""Test filter_rules_in_network with None values in attributes"""
net_ranges = ["192.168.0.0/16"]
rules = [
MockRule("192.168.1.0", "24", None, None), # Should match on source
MockRule(None, None, "192.168.2.0", "24"), # Should match on dest
MockRule(None, None, None, None), # Should not match
]

filtered = filter_rules_in_network(net_ranges, rules)
assert len(filtered) == 2
assert rules[0] in filtered
assert rules[1] in filtered
assert rules[2] not in filtered


# Tests for split_rules_for_user with robust attribute handling
def test_split_rules_for_user_normal_rules():
"""Test split_rules_for_user with normal rule objects"""
net_ranges = ["192.168.0.0/16"]
rules = [
MockRule("192.168.1.0", "24", "10.0.1.0", "24"), # Should be user rule
MockRule("172.16.1.0", "24", "172.16.2.0", "24"), # Should be rest rule
]

user_rules, rest_rules = split_rules_for_user(net_ranges, rules)
assert len(user_rules) == 1
assert len(rest_rules) == 1
assert rules[0] in user_rules
assert rules[1] in rest_rules


def test_split_rules_for_user_missing_attributes():
"""Test split_rules_for_user with rules missing required attributes"""
net_ranges = ["192.168.0.0/16"]
rules = [
MockRule("192.168.1.0", "24", "10.0.1.0", "24"), # Normal rule - user rule
MockRuleIncomplete("rule_without_attrs"), # Missing attrs - should be user rule
MockRule("172.16.1.0", "24", "172.16.2.0", "24"), # Normal rule - rest rule
]

user_rules, rest_rules = split_rules_for_user(net_ranges, rules)
assert len(user_rules) == 2 # Normal matching rule + incomplete rule
assert len(rest_rules) == 1
assert rules[0] in user_rules # Matching rule
assert rules[1] in user_rules # Incomplete rule treated as editable
assert rules[2] in rest_rules # Non-matching rule


# Tests for filter_rtbh_rules with robust attribute handling
def test_filter_rtbh_rules_normal_rules():
"""Test filter_rtbh_rules with normal RTBH rule objects"""
net_ranges = ["192.168.0.0/16", "2001:db8::/32"]
rules = [
MockRTBHRule("192.168.1.0", "24", None, None), # Should match on IPv4
MockRTBHRule(None, None, "2001:db8:1::", "48"), # Should match on IPv6
MockRTBHRule("172.16.1.0", "24", "2001:db9::", "32"), # Should not match
]

filtered = filter_rtbh_rules(net_ranges, rules)
assert len(filtered) == 2
assert rules[0] in filtered
assert rules[1] in filtered
assert rules[2] not in filtered


# Tests for split_rtbh_rules_for_user with robust attribute handling
def test_split_rtbh_rules_for_user_normal_rules():
"""Test split_rtbh_rules_for_user with normal RTBH rule objects"""
net_ranges = ["192.168.0.0/16"]
rules = [
MockRTBHRule("192.168.1.0", "24", None, None), # Should be filtered (user)
MockRTBHRule("172.16.1.0", "24", None, None), # Should be read-only
]

filtered, read_only = split_rtbh_rules_for_user(net_ranges, rules)
assert len(filtered) == 1
assert len(read_only) == 1
assert rules[0] in filtered
assert rules[1] in read_only


# Edge case tests
def test_filter_functions_empty_input():
"""Test all filter functions with empty input"""
net_ranges = ["192.168.0.0/16"]

# Empty rules list
assert filter_rules_in_network(net_ranges, []) == []
assert split_rules_for_user(net_ranges, []) == ([], [])
assert filter_rtbh_rules(net_ranges, []) == []
assert split_rtbh_rules_for_user(net_ranges, []) == ([], [])


def test_filter_functions_empty_net_ranges():
"""Test filter functions with empty network ranges"""
rules = [MockRule("192.168.1.0", "24", None, None)]
rtbh_rules = [MockRTBHRule("192.168.1.0", "24", None, None)]

# Empty network ranges - nothing should match
assert filter_rules_in_network([], rules) == []
user_rules, rest_rules = split_rules_for_user([], rules)
assert user_rules == []
assert rest_rules == rules

assert filter_rtbh_rules([], rtbh_rules) == []
filtered, read_only = split_rtbh_rules_for_user([], rtbh_rules)
assert filtered == []
assert read_only == rtbh_rules
29 changes: 18 additions & 11 deletions flowapp/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@ def filter_rules_in_network(net_ranges, rules):
:param rules: list of rules (ipv4 or ipv6
:return: filtered list of rules
"""
return [
rule
for rule in rules
if network_in_range(rule.source, rule.source_mask, net_ranges)
or network_in_range(rule.dest, rule.dest_mask, net_ranges)
]
filtered_rules = []
for rule in rules:
try:
if network_in_range(rule.source, rule.source_mask, net_ranges) or network_in_range(rule.dest, rule.dest_mask, net_ranges):
filtered_rules.append(rule)
except AttributeError:
# If rule has no source or dest, include it (consistent with split_rules_for_user)
filtered_rules.append(rule)
return filtered_rules


def split_rules_for_user(net_ranges, rules):
Expand All @@ -30,12 +33,16 @@ def split_rules_for_user(net_ranges, rules):
user_rules = []
rest_rules = []
for rule in rules:
if network_in_range(rule.source, rule.source_mask, net_ranges) or network_in_range(
rule.dest, rule.dest_mask, net_ranges
):
try:
if network_in_range(rule.source, rule.source_mask, net_ranges) or network_in_range(
rule.dest, rule.dest_mask, net_ranges
):
user_rules.append(rule)
else:
rest_rules.append(rule)
except AttributeError:
# If rule has no source or dest, the split is not possible
user_rules.append(rule)
Comment on lines +36 to 45
Copy link

Copilot AI Sep 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The try-catch block only catches AttributeError but not other exceptions that network_in_range might raise. Based on the context, network_in_range can raise ValueError for invalid network formats, which would not be caught here and could cause unexpected crashes.

Copilot uses AI. Check for mistakes.
Copy link
Collaborator Author

@jirivrany jirivrany Sep 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ValueError is catched in the network_in_range validator

def network_in_range(address, mask, net_ranges):
    """
    check if given ip address is in user network ranges
    :param address: string ip_address
    :param net_ranges: list of network ranges
    :return: boolean
    """
    result = False
    network = "{}/{}".format(address, mask)
    for adr_range in net_ranges:
        try:
            result = result or subnet_of(ipaddress.ip_network(network), ipaddress.ip_network(adr_range))
        except TypeError:  # V4 can't be a subnet of V6 and vice versa
            pass
        except ValueError:
            return False

    return result

else:
rest_rules.append(rule)

return user_rules, rest_rules

Expand Down