Skip to content

Commit e20280c

Browse files
committed
add filter_results
1 parent eabdff9 commit e20280c

File tree

5 files changed

+211
-22
lines changed

5 files changed

+211
-22
lines changed

netbox_scripthelper/api/views.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,20 @@ def get_results_limit(request):
1818
return None
1919

2020

21+
def filter_results(request, objects):
22+
q = request.query_params.get('q', '')
23+
limit = get_results_limit(request)
24+
filtered = []
25+
for x in objects:
26+
if limit and len(filtered) > limit:
27+
break
28+
if q and not str(x).startswith(str(q)):
29+
continue
30+
filtered.append(x)
31+
32+
return filtered[:limit]
33+
34+
2135
class AvailableIPAddressesView(ObjectValidationMixin, APIView):
2236
queryset = IPAddress.objects.all()
2337

@@ -26,14 +40,9 @@ def get_parent(self, request, pk):
2640

2741
def get(self, request, pk):
2842
parent = self.get_parent(request, pk)
29-
limit = get_results_limit(request)
3043

3144
# Calculate available IPs within the parent
32-
ip_list = []
33-
for index, ip in enumerate(parent.get_available_ips(), start=1):
34-
if index == limit:
35-
break
36-
ip_list.append(ip)
45+
ip_list = filter_results(request, parent.get_available_ips())
3746
serializer = AvailableIPSerializer(ip_list, many=True, context={
3847
'request': request,
3948
'parent': parent,
@@ -63,9 +72,8 @@ class AvailableVLANsView(ObjectValidationMixin, APIView):
6372

6473
def get(self, request, pk):
6574
vlangroup = get_object_or_404(VLANGroup.objects.restrict(request.user), pk=pk)
66-
limit = get_results_limit(request)
6775

68-
available_vlans = vlangroup.get_available_vids()[:limit]
76+
available_vlans = filter_results(request, vlangroup.get_available_vids())
6977
serializer = AvailableVLANSerializer(available_vlans, many=True, context={
7078
'request': request,
7179
'group': vlangroup,
@@ -84,8 +92,8 @@ def get(self, request, pk):
8492
prefix = get_object_or_404(Prefix.objects.restrict(request.user), pk=pk)
8593
available_prefixes = prefix.get_available_prefixes()
8694
prefix_len = int(request.query_params.get('prefixlen', 0))
87-
limit = get_results_limit(request)
88-
subnets = IPSplitter(available_prefixes).split(prefix_len, limit)
95+
subnets = IPSplitter(available_prefixes).split(prefix_len)
96+
subnets = filter_results(request, subnets)
8997

9098
serializer = AvailablePrefixSerializer(subnets, many=True, context={
9199
'request': request,

netbox_scripthelper/tests/__init__.py

Whitespace-only changes.
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import unittest
2+
from netaddr import IPSet, IPNetwork, IPAddress, IPRange
3+
from netbox_scripthelper.utils import get_available_ips_list, IPSplitter
4+
5+
6+
class TestGetAvailableIPList(unittest.TestCase):
7+
8+
def test_full_network(self):
9+
ipset = IPSet([IPNetwork('192.168.1.0/24'),])
10+
iplist = get_available_ips_list(ipset, '192.168.1.10', 3)
11+
self.assertListEqual(iplist, [IPAddress('192.168.1.10'), IPAddress('192.168.1.11'), IPAddress('192.168.1.12')])
12+
13+
def test_range_with_hole(self):
14+
ipset = IPSet([IPRange('192.168.1.1', '192.168.1.11'), IPRange('192.168.1.13', '192.168.1.20')])
15+
iplist = get_available_ips_list(ipset, '192.168.1.10', 3)
16+
self.assertListEqual(iplist, [IPAddress('192.168.1.10'), IPAddress('192.168.1.11'), IPAddress('192.168.1.13')])
17+
18+
def test_not_enough(self):
19+
ipset = IPSet([IPRange('192.168.1.10', '192.168.1.11'), IPRange('192.168.1.13', '192.168.1.20')])
20+
raised = False
21+
try:
22+
get_available_ips_list(ipset, '192.168.1.10', 30)
23+
except IndexError:
24+
raised = True
25+
self.assertTrue(raised)
26+
27+
28+
class TestIPSplitter(unittest.TestCase):
29+
30+
def test(self):
31+
cases = [
32+
("limit", IPSet([IPNetwork('192.168.1.0/24'),]), (30, 2), [IPNetwork('192.168.1.0/30'), IPNetwork('192.168.1.4/30')]),
33+
("no limit", IPSet([IPNetwork('192.168.1.0/29'),]), (30,), [IPNetwork('192.168.1.0/30'), IPNetwork('192.168.1.4/30')]),
34+
("no free space", IPSet([IPNetwork('192.168.1.0/29'),]), (28,), [])
35+
]
36+
for case in cases:
37+
got = IPSplitter(case[1]).split(*case[2])
38+
self.assertListEqual(case[-1], got, case[0])
39+
IPSplitter(IPSet([IPNetwork("10.0.0.0/16"),])).split(24)
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
import unittest
2+
import unittest.mock as mock
3+
from utilities.testing.base import TestCase
4+
5+
from django.urls import reverse
6+
from django.test import override_settings
7+
from rest_framework import status
8+
9+
from ipam.models import VLANGroup, VLAN, Prefix, IPAddress
10+
from netbox_scripthelper.api.views import filter_results
11+
12+
13+
class TestFilterResults(unittest.TestCase):
14+
15+
def create_request(self, **kwargs):
16+
mm = mock.MagicMock()
17+
mm.query_params = kwargs
18+
return mm
19+
20+
def test(self):
21+
22+
cases = [
23+
("empty", self.create_request(), [], []),
24+
("only limit", self.create_request(limit=2), [1, 2, 3, 4, 5], [1, 2]),
25+
("only limit: all", self.create_request(limit=20), [1, 2, 3, 4, 5], [1, 2, 3, 4, 5]),
26+
("only q: matched", self.create_request(q=2), [1, 2, 3, 20, 5], [2, 20]),
27+
("only q: not matched", self.create_request(q=7), [1, 2, 3, 20, 5], []),
28+
("q and limit", self.create_request(q=2, limit=2), [1, 2, 3, 20, 21], [2, 20]),
29+
("q and limit: all", self.create_request(q=2, limit=10), [1, 2, 3, 20, 21], [2, 20, 21]),
30+
]
31+
32+
for case in cases:
33+
got = filter_results(case[1], case[2])
34+
self.assertListEqual(got, case[-1], case[0])
35+
36+
37+
class TestAvailablesVLANS(TestCase):
38+
39+
@classmethod
40+
def setUpTestData(cls):
41+
VLANGroup.objects.create(name='TestVG1', slug='testvg1', min_vid=100, max_vid=1000)
42+
vg2 = VLANGroup.objects.create(name='TestVG2', slug='testvg2', min_vid=10, max_vid=15)
43+
VLAN.objects.create(name='TestVlan2', vid=12, group=vg2)
44+
45+
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], LOGIN_REQUIRED=False)
46+
def test(self):
47+
cases = [
48+
("no_limit_no_filter", "", 5),
49+
("with_limit_no_filter", "?limit=10", 5),
50+
("with_limit_with_filter_not_match", "?limit=10&q=2", 0),
51+
("with_limit_with_filter_match", "?limit=2&q=1", 2),
52+
("no_limit_with_filter", "?q=1", 5),
53+
]
54+
vg = VLANGroup.objects.get(name='TestVG2')
55+
url = reverse('plugins-api:netbox_scripthelper-api:vlangroup-available-vlans', kwargs={'pk': vg.pk})
56+
for case in cases:
57+
response = self.client.get(f'{url}{case[1]}')
58+
self.assertEqual(response.status_code, status.HTTP_200_OK, case[0])
59+
self.assertEqual(len(response.data['results']), case[-1], case[0])
60+
61+
62+
class TestAvailablesPrefixes(TestCase):
63+
64+
@classmethod
65+
def setUpTestData(cls):
66+
Prefix.objects.create(prefix='172.20.0.0/24')
67+
Prefix.objects.create(prefix='172.20.0.128/27')
68+
Prefix.objects.create(prefix='172.20.0.192/27')
69+
70+
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], LOGIN_REQUIRED=False)
71+
def test_fixed_mask(self):
72+
cases = [
73+
("no_limit_no_filter", "?prefixlen=27", 6),
74+
("with_limit_no_filter", "?prefixlen=27&limit=3", 3),
75+
("with_limit_with_filter", "?prefixlen=27&limit=3&q=172.20.0.1", 1),
76+
("no_limit_with_filter", "?prefixlen=27&q=172.20.0.1", 1),
77+
]
78+
p = Prefix.objects.get(prefix='172.20.0.0/24')
79+
url = reverse('plugins-api:netbox_scripthelper-api:prefix-available-prefixes', kwargs={'pk': p.pk})
80+
for case in cases:
81+
response = self.client.get(f'{url}{case[1]}')
82+
self.assertEqual(response.status_code, status.HTTP_200_OK, case[0])
83+
self.assertEqual(len(response.data['results']), case[-1], case[0])
84+
85+
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], LOGIN_REQUIRED=False)
86+
def test(self):
87+
cases = [
88+
("no_limit_no_filter", "", 3),
89+
("with_limit_no_filter", "?limit=2", 2),
90+
("with_limit_with_filter", "?limit=3&q=172.20.0.1", 1),
91+
("empty", "?limit=3&q=172.30.0.1", 0),
92+
]
93+
p = Prefix.objects.get(prefix='172.20.0.0/24')
94+
url = reverse('plugins-api:netbox_scripthelper-api:prefix-available-prefixes', kwargs={'pk': p.pk})
95+
for case in cases:
96+
response = self.client.get(f'{url}{case[1]}')
97+
self.assertEqual(response.status_code, status.HTTP_200_OK, case[0])
98+
self.assertEqual(len(response.data['results']), case[-1], case[0])
99+
100+
101+
class TestAvailablesIPAddresses(TestCase):
102+
103+
@classmethod
104+
def setUpTestData(cls):
105+
Prefix.objects.create(prefix='192.168.0.0/28')
106+
IPAddress.objects.bulk_create([
107+
IPAddress(address='192.168.0.2/28'),
108+
IPAddress(address='192.168.0.5/28')
109+
])
110+
111+
@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], LOGIN_REQUIRED=False)
112+
def test(self):
113+
cases = [
114+
("no_limit_no_filter", "", 12),
115+
("with_limit_no_filter", "?limit=5", 5),
116+
("with_limit_with_filter", "?limit=3&q=192.168.0.1", 3),
117+
("empty", "?limit=3&q=172.30.0.1", 0),
118+
]
119+
p = Prefix.objects.get(prefix='192.168.0.0/28')
120+
url = reverse('plugins-api:netbox_scripthelper-api:prefix-available-ips', kwargs={'pk': p.pk})
121+
for case in cases:
122+
response = self.client.get(f'{url}{case[1]}')
123+
self.assertEqual(response.status_code, status.HTTP_200_OK, case[0])
124+
self.assertEqual(len(response.data['results']), case[-1], case[0])

netbox_scripthelper/utils.py

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
1-
from typing import List
2-
from netaddr import IPSet, IPNetwork, IPAddress
3-
from django.core.exceptions import ValidationError
4-
from ipam.models import Prefix
1+
from typing import List, Any
2+
from netaddr import IPSet, IPNetwork, IPAddress, IPRange
53

64

75
class IPSplitter:
@@ -12,7 +10,7 @@ class IPSplitter:
1210
def __init__(self, prefixes: IPSet):
1311
self.prefixes = prefixes
1412

15-
def split(self, prefix_len: int, limit: int) -> List[IPNetwork]:
13+
def split(self, prefix_len: int, limit: int = None) -> List[IPNetwork]:
1614
subnets = []
1715
if prefix_len == 0:
1816
return list(self.prefixes.iter_cidrs())
@@ -27,14 +25,34 @@ def split(self, prefix_len: int, limit: int) -> List[IPNetwork]:
2725
return subnets
2826

2927

30-
def get_available_ip_list(prefix: Prefix, base_addr: str, size: int) -> List[IPAddress]:
31-
available_addresses = prefix.get_available_ips()
28+
def get_available_ips_list(ipset: IPSet, base_addr: str, size: int) -> List[IPAddress]:
29+
"""
30+
Returns a list of addresses from `ipset`, starting with `base_addr`.
31+
The size of the list is limited by the "size" argument.
32+
Raises IndexError if there are not enough free addresses.
33+
"""
34+
3235
base_addr = base_addr.split('/')[0]
36+
all = IPSet(['0.0.0.0/0'])
37+
excluded = IPSet([IPRange('0.0.0.0', IPAddress(base_addr) - 1),])
38+
39+
availables = (all ^ excluded) & ipset
3340
addresses = []
34-
for i in range(0, size):
35-
next_addr = IPAddress(base_addr) + i
36-
if next_addr in available_addresses:
37-
addresses.append(next_addr)
41+
for ip in availables:
42+
if len(addresses) >= size:
43+
break
44+
addresses.append(ip)
3845
if len(addresses) != size:
39-
raise ValidationError("not enough free addresses")
46+
raise IndexError("not enough free addresses")
4047
return addresses
48+
49+
50+
def make_link(obj: Any) -> str:
51+
"""
52+
Returns a reference to the object enclosed in the <a> tag.
53+
"""
54+
55+
try:
56+
return f'<a href="{obj.get_absolute_url()}">{str(obj)}</a>'
57+
except Exception:
58+
return str(obj)

0 commit comments

Comments
 (0)