diff --git a/dpkt/bgp.py b/dpkt/bgp.py index ea4d3159..352dfa0c 100755 --- a/dpkt/bgp.py +++ b/dpkt/bgp.py @@ -152,7 +152,7 @@ def unpack(self, buf): elif self.type == UPDATE: self.data = self.update = self.Update(self.data) elif self.type == NOTIFICATION: - self.data = self.notifiation = self.Notification(self.data) + self.data = self.notification = self.Notification(self.data) elif self.type == KEEPALIVE: self.data = self.keepalive = self.Keepalive(self.data) elif self.type == ROUTE_REFRESH: @@ -601,7 +601,7 @@ def __bytes__(self): b''.join(map(bytes, self.snpas)) + \ b''.join(map(bytes, self.announced)) - class SNPA(object): + class SNPA(dpkt.Packet): __hdr__ = ( ('len', 'B', 0), ) @@ -1211,3 +1211,373 @@ def test_bgp_add_path_6_1_as_path(): assert (attribute.flags == 0x80) assert (attribute.len == 4) assert (socket.inet_ntop(socket.AF_INET, bytes(attribute.originator_id)) == '10.0.15.1') + + +def test_attribute_accessors(): + from binascii import unhexlify + + buf = unhexlify( + '00' # flags + '01' # type (ORIGIN) + + '01' # length + '00' # Origin type + ) + attribute = BGP.Update.Attribute(buf) + assert isinstance(attribute.data, BGP.Update.Attribute.Origin) + for attr in ['optional', 'transitive', 'partial', 'extended_length']: + assert getattr(attribute, attr) == 0 + + # check we can set.. + setattr(attribute, attr, 1) + assert getattr(attribute, attr) == 1 + + # and also unset + setattr(attribute, attr, 0) + assert getattr(attribute, attr) == 0 + + +def test_snpa(): + from binascii import unhexlify + + buf = unhexlify( + '04' # len (in semi-octets) + '1234' # data + ) + snpa = BGP.Update.Attribute.MPReachNLRI.SNPA(buf) + assert snpa.len == 4 # length of the data in semi-octets + assert len(snpa) == 3 # length of the snpa in bytes (including header) + assert bytes(snpa) == buf + + +def test_mpreachnlri(): + from binascii import unhexlify + + buf = unhexlify( + '0000' # afi + '00' # safi + + '00' # nlen + '01' # num SNPAs + + # SNPA + '04' # len + '1234' # data + ) + mp = BGP.Update.Attribute.MPReachNLRI(buf) + assert len(mp.snpas) == 1 + assert bytes(mp) == buf + + +def test_notification(): + from binascii import unhexlify + + buf_notification = unhexlify( + '11' # code + '22' # subcode + + '33' # error + ) + notification = BGP.Notification(buf_notification) + assert notification.code == 0x11 + assert notification.subcode == 0x22 + assert notification.error == b'\x33' + assert bytes(notification) == buf_notification + + buf_bgp_hdr = unhexlify( + '11111111111111111111111111111111' # marker + '0016' # len + '03' # type (NOTIFICATION) + ) + bgp = BGP(buf_bgp_hdr + buf_notification) + + assert hasattr(bgp, 'notification') + assert isinstance(bgp.data, BGP.Notification) + assert bgp.data.code == 0x11 + assert bgp.data.subcode == 0x22 + assert bgp.data.error == b'\x33' + assert bytes(bgp) == buf_bgp_hdr + buf_notification + + +def test_keepalive(): + keepalive = BGP.Keepalive(b'\x11') + assert len(keepalive) == 0 + assert bytes(keepalive) == b'' + + +def test_routegeneric(): + from binascii import unhexlify + + buf = unhexlify( + '08' # len (bits) + '11' # prefix + ) + routegeneric = RouteGeneric(buf) + assert routegeneric.len == 8 + assert routegeneric.prefix == b'\x11' + + assert bytes(routegeneric) == buf + assert len(routegeneric) == 2 + + +def test_routeipv4(): + from binascii import unhexlify + + buf = unhexlify( + '08' # len (bits) + + '11' # prefix + ) + routeipv4 = RouteIPV4(buf) + assert routeipv4.len == 8 # prefix len in bits + assert routeipv4.prefix == b'\x11\x00\x00\x00' + + assert repr(routeipv4) == "RouteIPV4(17.0.0.0/8)" + assert bytes(routeipv4) == buf + assert len(routeipv4) == 2 # header + prefix(bytes) + + +def test_routeipv6(): + from binascii import unhexlify + + buf = unhexlify( + '08' # len (bits) + '22' # prefix + ) + + routeipv6 = RouteIPV4(buf) + assert routeipv6.len == 8 # prefix len in bits + assert routeipv6.prefix == b'\x22\x00\x00\x00' + + assert bytes(routeipv6) == buf + assert len(routeipv6) == 2 # header + prefix(bytes) + + +def test_extendedrouteipv4(): + from binascii import unhexlify + + buf = unhexlify( + '00000001' # path_id + '20' # len (bits) + '05050505' # prefix + ) + extendedrouteipv4 = ExtendedRouteIPV4(buf) + assert extendedrouteipv4.path_id == 1 + assert extendedrouteipv4.len == 32 + assert extendedrouteipv4.prefix == unhexlify('05050505') + assert repr(extendedrouteipv4) == "ExtendedRouteIPV4(5.5.5.5/32 PathId 1)" + + assert bytes(extendedrouteipv4) == buf + assert len(extendedrouteipv4) == len(buf) + + +def test_routeevpn(): + from binascii import unhexlify + + buf = unhexlify( + '02' # type + '1a' # len + + # route distinguisher + '1111111111111111' + + # esi + '22222222222222222222' + + # eth_id + '33333333' + + # mac address + '00' # len (bits) + + # ip address + '00' # len (bits) + + # mpls + '6666' # label stack + ) + + routeevpn = RouteEVPN(buf) + assert routeevpn.type == 2 + assert routeevpn.len == 26 + + assert routeevpn.esi == unhexlify('22222222222222222222') + assert routeevpn.eth_id == unhexlify('33333333') + + assert routeevpn.mac_address_length == 0 + assert routeevpn.mac_address is None + + assert routeevpn.ip_address_length == 0 + assert routeevpn.ip_address is None + + assert routeevpn.mpls_label_stack == unhexlify('6666') + + assert bytes(routeevpn) == buf + assert len(routeevpn) == len(buf) + + +def test_route_refresh(): + from binascii import unhexlify + buf_route_refresh = unhexlify( + '1111' # afi + '22' # rsvd + '33' # safi + ) + route_refresh = BGP.RouteRefresh(buf_route_refresh) + assert route_refresh.afi == 0x1111 + assert route_refresh.rsvd == 0x22 + assert route_refresh.safi == 0x33 + assert bytes(route_refresh) == buf_route_refresh + + buf_bgp_hdr = unhexlify( + '11111111111111111111111111111111' # marker + '0017' # len + '05' # type (ROUTE_REFRESH) + ) + bgp = BGP(buf_bgp_hdr + buf_route_refresh) + + assert hasattr(bgp, 'route_refresh') + assert isinstance(bgp.data, BGP.RouteRefresh) + assert bgp.data.afi == 0x1111 + assert bgp.data.rsvd == 0x22 + assert bgp.data.safi == 0x33 + assert bytes(bgp) == buf_bgp_hdr + buf_route_refresh + + +def test_mpunreachnlri(): + from binascii import unhexlify + buf_routeipv4 = unhexlify( + '08' # len (bits) + '11' # prefix + ) + + buf_routeipv6 = unhexlify( + '08' # len (bits) + '22' # prefix + ) + + buf_routeevpn = unhexlify( + '02' # type + '1a' # len + + # route distinguisher + '1111111111111111' + + # esi + '22222222222222222222' + + # eth_id + '33333333' + + # mac address + '00' # len (bits) + + # ip address + '00' # len (bits) + + # mpls + '6666' # label stack + ) + + buf_routegeneric = unhexlify( + '08' # len (bits) + '33' # prefix + ) + + afi = struct.Struct('>H') + routes = ( + (AFI_IPV4, buf_routeipv4, RouteIPV4), + (AFI_IPV6, buf_routeipv6, RouteIPV6), + (AFI_L2VPN, buf_routeevpn, RouteEVPN), + # this afi does not exist, so we will parse as RouteGeneric + (1234, buf_routegeneric, RouteGeneric), + ) + + for afi_id, buf, cls in routes: + buf = afi.pack(afi_id) + b'\xcc' + buf + mpu = BGP.Update.Attribute.MPUnreachNLRI(buf) + + assert mpu.afi == afi_id + assert mpu.safi == 0xcc + assert len(mpu.data) == 1 + route = mpu.data[0] + assert isinstance(route, cls) + + assert bytes(mpu) == buf + assert len(mpu) == len(buf) + + # test the unpacking of the routes, as an Attribute + attribute_hdr = struct.Struct('BBB') + for afi_id, buf, cls in routes: + buf_mpunreachnlri = afi.pack(afi_id) + b'\xcc' + buf + buf_attribute_hdr = attribute_hdr.pack(0, MP_UNREACH_NLRI, len(buf_mpunreachnlri)) + buf = buf_attribute_hdr + buf_mpunreachnlri + + attribute = BGP.Update.Attribute(buf) + assert isinstance(attribute.data, BGP.Update.Attribute.MPUnreachNLRI) + routes = attribute.data.data + assert len(routes) == 1 + assert isinstance(routes[0], cls) + + +def test_update_withdrawn(): + from binascii import unhexlify + buf_ipv4 = unhexlify( + '08' # len (bits) + '11' # prefix + ) + packed_length = struct.Struct('>H').pack + wlen, plen = packed_length(len(buf_ipv4)), packed_length(0) + + buf = wlen + buf_ipv4 + plen + update = BGP.Update(buf) + + assert len(update.withdrawn) == 1 + route = update.withdrawn[0] + assert isinstance(route, RouteIPV4) + assert bytes(update) == buf + + +def test_parameters(): + from binascii import unhexlify + buf = unhexlify( + '44' # v + '1111' # asn + '2222' # holdtime + '33333333' # identifier + '03' # param_len + + # Parameter + '01' # type (AUTHENTICATION) + '01' # len + + # Authentication + '11' # code + ) + bgp_open = BGP.Open(buf) + assert len(bgp_open.parameters) == 1 + parameter = bgp_open.parameters[0] + + assert isinstance(parameter, BGP.Open.Parameter) + assert isinstance(parameter.data, BGP.Open.Parameter.Authentication) + + assert bytes(bgp_open) == buf + assert len(bgp_open) == len(buf) + + +def test_reservedcommunities(): + from binascii import unhexlify + buf = unhexlify( + # ReservedCommunity + '00002222' # value + ) + communities = BGP.Update.Attribute.Communities(buf) + assert len(communities.data) == 1 + + community = communities.data[0] + assert isinstance(community, BGP.Update.Attribute.Communities.ReservedCommunity) + assert len(community) == 4 + assert bytes(community) == buf + + assert len(communities) == 4 + assert bytes(communities) == buf