Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
374 changes: 372 additions & 2 deletions dpkt/bgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def unpack(self, buf):
elif self.type == UPDATE:
self.data = self.update = self.Update(self.data)
elif self.type == NOTIFICATION:
self.data = self.notifiation = self.Notification(self.data)
self.data = self.notification = self.Notification(self.data)
elif self.type == KEEPALIVE:
self.data = self.keepalive = self.Keepalive(self.data)
elif self.type == ROUTE_REFRESH:
Expand Down Expand Up @@ -601,7 +601,7 @@ def __bytes__(self):
b''.join(map(bytes, self.snpas)) + \
b''.join(map(bytes, self.announced))

class SNPA(object):
class SNPA(dpkt.Packet):
__hdr__ = (
('len', 'B', 0),
)
Expand Down Expand Up @@ -1211,3 +1211,373 @@ def test_bgp_add_path_6_1_as_path():
assert (attribute.flags == 0x80)
assert (attribute.len == 4)
assert (socket.inet_ntop(socket.AF_INET, bytes(attribute.originator_id)) == '10.0.15.1')


def test_attribute_accessors():
from binascii import unhexlify

buf = unhexlify(
'00' # flags
'01' # type (ORIGIN)

'01' # length
'00' # Origin type
)
attribute = BGP.Update.Attribute(buf)
assert isinstance(attribute.data, BGP.Update.Attribute.Origin)
for attr in ['optional', 'transitive', 'partial', 'extended_length']:
assert getattr(attribute, attr) == 0

# check we can set..
setattr(attribute, attr, 1)
assert getattr(attribute, attr) == 1

# and also unset
setattr(attribute, attr, 0)
assert getattr(attribute, attr) == 0


def test_snpa():
from binascii import unhexlify

buf = unhexlify(
'04' # len (in semi-octets)
'1234' # data
)
snpa = BGP.Update.Attribute.MPReachNLRI.SNPA(buf)
assert snpa.len == 4 # length of the data in semi-octets
assert len(snpa) == 3 # length of the snpa in bytes (including header)
assert bytes(snpa) == buf


def test_mpreachnlri():
from binascii import unhexlify

buf = unhexlify(
'0000' # afi
'00' # safi

'00' # nlen
'01' # num SNPAs

# SNPA
'04' # len
'1234' # data
)
mp = BGP.Update.Attribute.MPReachNLRI(buf)
assert len(mp.snpas) == 1
assert bytes(mp) == buf


def test_notification():
from binascii import unhexlify

buf_notification = unhexlify(
'11' # code
'22' # subcode

'33' # error
)
notification = BGP.Notification(buf_notification)
assert notification.code == 0x11
assert notification.subcode == 0x22
assert notification.error == b'\x33'
assert bytes(notification) == buf_notification

buf_bgp_hdr = unhexlify(
'11111111111111111111111111111111' # marker
'0016' # len
'03' # type (NOTIFICATION)
)
bgp = BGP(buf_bgp_hdr + buf_notification)

assert hasattr(bgp, 'notification')
assert isinstance(bgp.data, BGP.Notification)
assert bgp.data.code == 0x11
assert bgp.data.subcode == 0x22
assert bgp.data.error == b'\x33'
assert bytes(bgp) == buf_bgp_hdr + buf_notification


def test_keepalive():
keepalive = BGP.Keepalive(b'\x11')
assert len(keepalive) == 0
assert bytes(keepalive) == b''


def test_routegeneric():
from binascii import unhexlify

buf = unhexlify(
'08' # len (bits)
'11' # prefix
)
routegeneric = RouteGeneric(buf)
assert routegeneric.len == 8
assert routegeneric.prefix == b'\x11'

assert bytes(routegeneric) == buf
assert len(routegeneric) == 2


def test_routeipv4():
from binascii import unhexlify

buf = unhexlify(
'08' # len (bits)

'11' # prefix
)
routeipv4 = RouteIPV4(buf)
assert routeipv4.len == 8 # prefix len in bits
assert routeipv4.prefix == b'\x11\x00\x00\x00'

assert repr(routeipv4) == "RouteIPV4(17.0.0.0/8)"
assert bytes(routeipv4) == buf
assert len(routeipv4) == 2 # header + prefix(bytes)


def test_routeipv6():
from binascii import unhexlify

buf = unhexlify(
'08' # len (bits)
'22' # prefix
)

routeipv6 = RouteIPV4(buf)
assert routeipv6.len == 8 # prefix len in bits
assert routeipv6.prefix == b'\x22\x00\x00\x00'

assert bytes(routeipv6) == buf
assert len(routeipv6) == 2 # header + prefix(bytes)


def test_extendedrouteipv4():
from binascii import unhexlify

buf = unhexlify(
'00000001' # path_id
'20' # len (bits)
'05050505' # prefix
)
extendedrouteipv4 = ExtendedRouteIPV4(buf)
assert extendedrouteipv4.path_id == 1
assert extendedrouteipv4.len == 32
assert extendedrouteipv4.prefix == unhexlify('05050505')
assert repr(extendedrouteipv4) == "ExtendedRouteIPV4(5.5.5.5/32 PathId 1)"

assert bytes(extendedrouteipv4) == buf
assert len(extendedrouteipv4) == len(buf)


def test_routeevpn():
from binascii import unhexlify

buf = unhexlify(
'02' # type
'1a' # len

# route distinguisher
'1111111111111111'

# esi
'22222222222222222222'

# eth_id
'33333333'

# mac address
'00' # len (bits)

# ip address
'00' # len (bits)

# mpls
'6666' # label stack
)

routeevpn = RouteEVPN(buf)
assert routeevpn.type == 2
assert routeevpn.len == 26

assert routeevpn.esi == unhexlify('22222222222222222222')
assert routeevpn.eth_id == unhexlify('33333333')

assert routeevpn.mac_address_length == 0
assert routeevpn.mac_address is None

assert routeevpn.ip_address_length == 0
assert routeevpn.ip_address is None

assert routeevpn.mpls_label_stack == unhexlify('6666')

assert bytes(routeevpn) == buf
assert len(routeevpn) == len(buf)


def test_route_refresh():
from binascii import unhexlify
buf_route_refresh = unhexlify(
'1111' # afi
'22' # rsvd
'33' # safi
)
route_refresh = BGP.RouteRefresh(buf_route_refresh)
assert route_refresh.afi == 0x1111
assert route_refresh.rsvd == 0x22
assert route_refresh.safi == 0x33
assert bytes(route_refresh) == buf_route_refresh

buf_bgp_hdr = unhexlify(
'11111111111111111111111111111111' # marker
'0017' # len
'05' # type (ROUTE_REFRESH)
)
bgp = BGP(buf_bgp_hdr + buf_route_refresh)

assert hasattr(bgp, 'route_refresh')
assert isinstance(bgp.data, BGP.RouteRefresh)
assert bgp.data.afi == 0x1111
assert bgp.data.rsvd == 0x22
assert bgp.data.safi == 0x33
assert bytes(bgp) == buf_bgp_hdr + buf_route_refresh


def test_mpunreachnlri():
from binascii import unhexlify
buf_routeipv4 = unhexlify(
'08' # len (bits)
'11' # prefix
)

buf_routeipv6 = unhexlify(
'08' # len (bits)
'22' # prefix
)

buf_routeevpn = unhexlify(
'02' # type
'1a' # len

# route distinguisher
'1111111111111111'

# esi
'22222222222222222222'

# eth_id
'33333333'

# mac address
'00' # len (bits)

# ip address
'00' # len (bits)

# mpls
'6666' # label stack
)

buf_routegeneric = unhexlify(
'08' # len (bits)
'33' # prefix
)

afi = struct.Struct('>H')
routes = (
(AFI_IPV4, buf_routeipv4, RouteIPV4),
(AFI_IPV6, buf_routeipv6, RouteIPV6),
(AFI_L2VPN, buf_routeevpn, RouteEVPN),
# this afi does not exist, so we will parse as RouteGeneric
(1234, buf_routegeneric, RouteGeneric),
)

for afi_id, buf, cls in routes:
buf = afi.pack(afi_id) + b'\xcc' + buf
mpu = BGP.Update.Attribute.MPUnreachNLRI(buf)

assert mpu.afi == afi_id
assert mpu.safi == 0xcc
assert len(mpu.data) == 1
route = mpu.data[0]
assert isinstance(route, cls)

assert bytes(mpu) == buf
assert len(mpu) == len(buf)

# test the unpacking of the routes, as an Attribute
attribute_hdr = struct.Struct('BBB')
for afi_id, buf, cls in routes:
buf_mpunreachnlri = afi.pack(afi_id) + b'\xcc' + buf
buf_attribute_hdr = attribute_hdr.pack(0, MP_UNREACH_NLRI, len(buf_mpunreachnlri))
buf = buf_attribute_hdr + buf_mpunreachnlri

attribute = BGP.Update.Attribute(buf)
assert isinstance(attribute.data, BGP.Update.Attribute.MPUnreachNLRI)
routes = attribute.data.data
assert len(routes) == 1
assert isinstance(routes[0], cls)


def test_update_withdrawn():
from binascii import unhexlify
buf_ipv4 = unhexlify(
'08' # len (bits)
'11' # prefix
)
packed_length = struct.Struct('>H').pack
wlen, plen = packed_length(len(buf_ipv4)), packed_length(0)

buf = wlen + buf_ipv4 + plen
update = BGP.Update(buf)

assert len(update.withdrawn) == 1
route = update.withdrawn[0]
assert isinstance(route, RouteIPV4)
assert bytes(update) == buf


def test_parameters():
from binascii import unhexlify
buf = unhexlify(
'44' # v
'1111' # asn
'2222' # holdtime
'33333333' # identifier
'03' # param_len

# Parameter
'01' # type (AUTHENTICATION)
'01' # len

# Authentication
'11' # code
)
bgp_open = BGP.Open(buf)
assert len(bgp_open.parameters) == 1
parameter = bgp_open.parameters[0]

assert isinstance(parameter, BGP.Open.Parameter)
assert isinstance(parameter.data, BGP.Open.Parameter.Authentication)

assert bytes(bgp_open) == buf
assert len(bgp_open) == len(buf)


def test_reservedcommunities():
from binascii import unhexlify
buf = unhexlify(
# ReservedCommunity
'00002222' # value
)
communities = BGP.Update.Attribute.Communities(buf)
assert len(communities.data) == 1

community = communities.data[0]
assert isinstance(community, BGP.Update.Attribute.Communities.ReservedCommunity)
assert len(community) == 4
assert bytes(community) == buf

assert len(communities) == 4
assert bytes(communities) == buf