Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions api/desecapi/models/records.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,24 +56,17 @@
)


def replace_ip_subnet(queryset, subnet):
def replace_ip_subnet(records, subnet):
"""
Fetches A or AAAA record contents from an RRset queryset (depending on the subnet's
address family) and returns them with their subnet bits replaced accordingly.
Takes a list of A or AAAA records and returns them with their subnet bits replaced accordingly.
"""
subnet = ip_network(subnet, strict=False)
try:
records = queryset.get(
type={IPv4Network: "A", IPv6Network: "AAAA"}[type(subnet)]
).records.all()
except ObjectDoesNotExist:
records = []
return [
str(
ip_address(int(ip_address(record.content)) & int(subnet.hostmask)) # suffix
+ int(subnet.network_address) # prefix
)
for record in records
if isinstance(subnet.network_address, type(ip_address(record.content)))
]


Expand Down
205 changes: 205 additions & 0 deletions api/desecapi/tests/test_dyndns12update.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,20 @@ def test_ddclient_dyndns2_v4_invalid(self):
self.assertStatus(response, status.HTTP_400_BAD_REQUEST)
self.assertIn("malformed", str(response.data))

def test_ddclient_dyndns2_v4_valid_priority(self):
# /nic/update?system=dyndns&hostname=foobar.dedyn.io&myip=10.2.3.4asdf
params = {
"domain_name": self.my_domain.name,
"system": "dyndns",
"hostname": self.my_domain.name,
"myip": "invalid",
"ip": "10.4.2.1",
}
response = self.client.get(self.reverse("v1:dyndns12update"), params)
self.assertStatus(response, status.HTTP_200_OK)
self.assertEqual(response.data, "good")
self.assertIP(ipv4="10.4.2.1")

def test_ddclient_dyndns2_v4_invalid_or_foreign_domain(self):
# /nic/update?system=dyndns&hostname=<...>&myip=10.2.3.4
for name in [
Expand Down Expand Up @@ -296,6 +310,197 @@ def test_subnet(self):
self.assertEqual(response.data, "good")
self.assertIP(ipv6="2a01::3303:72dc:f412:7233", subname="foo")

def test_update_multiple_v4(self):
# /nic/update?hostname=a.io,sub.a.io&myip=1.2.3.4
new_ip = "1.2.3.4"
domain1 = self.my_domain.name
domain2 = "sub." + self.my_domain.name

with self.assertRequests(
Comment thread
peterthomassen marked this conversation as resolved.
self.request_pdns_zone_update(domain1),
self.request_pdns_zone_axfr(domain1),
):
response = self.client.get(
self.reverse("v1:dyndns12update"),
{"hostname": f"{domain1},{domain2}", "myip": new_ip},
)

self.assertStatus(response, status.HTTP_200_OK)
self.assertEqual(response.data, "good")

self.assertIP(ipv4=new_ip)
self.assertIP(subname="sub", ipv4=new_ip)

def test_update_multiple_username_param(self):
# /nic/update?username=a.io,sub.a.io&myip=1.2.3.4
new_ip = "1.2.3.4"
domain1 = self.my_domain.name
domain2 = "sub." + self.my_domain.name

with self.assertRequests(
self.request_pdns_zone_update(domain1),
self.request_pdns_zone_axfr(domain1),
):
response = self.client_token_authorized.get(
self.reverse("v1:dyndns12update"),
{"username": f"{domain1},{domain2}", "myip": new_ip},
)

self.assertStatus(response, status.HTTP_200_OK)
self.assertEqual(response.data, "good")

self.assertIP(ipv4=new_ip)
self.assertIP(subname="sub", ipv4=new_ip)

def test_update_multiple_v4v6(self):
# /nic/update?hostname=a.io,sub.a.io&myip=1.2.3.4&myipv6=1::1
new_ip4 = "1.2.3.4"
new_ip6 = "1::1"
domain1 = self.my_domain.name
domain2 = "sub." + domain1

with self.assertRequests(
self.request_pdns_zone_update(domain1),
self.request_pdns_zone_axfr(domain1),
):
response = self.client.get(
self.reverse("v1:dyndns12update"),
{
"hostname": f"{domain1},{domain2}",
"myip": new_ip4,
"myipv6": new_ip6,
},
)

self.assertStatus(response, status.HTTP_200_OK)
self.assertEqual(response.data, "good")

self.assertIP(ipv4=new_ip4, ipv6=new_ip6)
self.assertIP(subname="sub", ipv4=new_ip4, ipv6=new_ip6)

def test_update_multiple_with_subnet(self):
# /nic/update?hostname=sub1.a.io,sub2.a.io&myip=10.1.0.0/16
domain1 = "sub1." + self.my_domain.name
domain2 = "sub2." + self.my_domain.name
self.create_rr_set(
self.my_domain, ["10.0.0.1"], subname="sub1", type="A", ttl=60
)
self.create_rr_set(
self.my_domain, ["10.0.0.2"], subname="sub2", type="A", ttl=60
)

with self.assertRequests(
self.request_pdns_zone_update(self.my_domain.name),
self.request_pdns_zone_axfr(self.my_domain.name),
):
response = self.client.get(
self.reverse("v1:dyndns12update"),
{"hostname": f"{domain1},{domain2}", "myip": "10.1.0.0/16"},
)

self.assertStatus(response, status.HTTP_200_OK)
self.assertEqual(response.data, "good")

self.assertIP(subname="sub1", ipv4="10.1.0.1")
self.assertIP(subname="sub2", ipv4="10.1.0.2")

def test_update_multiple_with_one_being_already_up_to_date(self):
# /nic/update?hostname=a.io,sub.a.io&myip=1.2.3.4
new_ip = "1.2.3.4"
domain1 = self.my_domain.name
domain2 = "sub." + domain1
self.create_rr_set(self.my_domain, [new_ip], subname="sub", type="A", ttl=60)

with self.assertRequests(
self.request_pdns_zone_update(domain1),
self.request_pdns_zone_axfr(domain1),
):
response = self.client.get(
self.reverse("v1:dyndns12update"),
{"hostname": f"{domain1},{domain2}", "myip": new_ip},
)

self.assertStatus(response, status.HTTP_200_OK)
self.assertEqual(response.data, "good")

self.assertIP(ipv4=new_ip)
self.assertIP(subname="sub", ipv4=new_ip)

def test_update_same_domain_twice(self):
Comment thread
peterthomassen marked this conversation as resolved.
# /nic/update?hostname=foobar.dedyn.io,foobar.dedyn.io&myip=1.2.3.4
new_ip = "1.2.3.4"

with self.assertRequests(
self.request_pdns_zone_update(self.my_domain.name),
self.request_pdns_zone_axfr(self.my_domain.name),
):
response = self.client.get(
self.reverse("v1:dyndns12update"),
{
"hostname": f"{self.my_domain.name},{self.my_domain.name}",
"myip": new_ip,
},
)

self.assertStatus(response, status.HTTP_200_OK)
self.assertEqual(response.data, "good")

self.assertIP(ipv4=new_ip)

def test_update_multiple_with_invalid_subnet(self):
# /nic/update?hostname=sub1.a.io,sub2.a.io&myip=1.2.3.4/64
domain1 = "sub1." + self.my_domain.name
domain2 = "sub2." + self.my_domain.name

with self.assertRequests():
response = self.client.get(
self.reverse("v1:dyndns12update"),
{"hostname": f"{domain1},{domain2}", "myip": "1.2.3.4/64"},
)

self.assertContains(
response, "invalid subnet", status_code=status.HTTP_400_BAD_REQUEST
)

def test_update_multiple_with_subdomains_of_different_domains(self):
# /nic/update?hostname=a.io,b.io&myip=1.2.3.4
domain1 = "sub1." + self.my_domain.name
domain2 = "sub2." + self.create_domain(owner=self.owner).name

with self.assertRequests():
response = self.client.get(
self.reverse("v1:dyndns12update"),
{"hostname": f"{domain1},{domain2}", "myip": "1.2.3.4"},
)

self.assertContains(
response,
"Cannot update subdomains from more than one domain.",
status_code=status.HTTP_400_BAD_REQUEST,
)

def test_update_with_trailing_comma(self):
response = self.client_token_authorized.get(
self.reverse("v1:dyndns12update"),
{"host_id": f"{self.my_domain.name},", "myip": "1.2.3.4"},
)

self.assertStatus(response, status.HTTP_404_NOT_FOUND)
self.assertEqual(response.content, b"nohost")

def test_update_with_partial_ownership(self):
with self.assertRequests():
response = self.client.get(
self.reverse("v1:dyndns12update"),
{
"hostname": f"{self.my_domain.name},{self.other_domain.name}",
"myip": "1.2.3.4",
},
)

self.assertStatus(response, status.HTTP_404_NOT_FOUND)
self.assertEqual(response.content, b"nohost")


class SingleDomainDynDNS12UpdateTest(DynDNS12UpdateTest):
NUM_OWNED_DOMAINS = 1
Expand Down
Loading