This commit is contained in:
Jan Schär 2025-03-24 09:36:38 +00:00 committed by GitHub
commit f609898485
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 234 additions and 900 deletions

View File

@ -140,7 +140,7 @@ func (cc *Conn) AddChain(c *Chain) *Chain {
{Type: unix.NFTA_CHAIN_TYPE, Data: []byte(c.Type + "\x00")},
})...)
}
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_NEWCHAIN),
Flags: netlink.Request | netlink.Acknowledge | netlink.Create,
@ -161,7 +161,7 @@ func (cc *Conn) DelChain(c *Chain) {
{Type: unix.NFTA_CHAIN_NAME, Data: []byte(c.Name + "\x00")},
})
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELCHAIN),
Flags: netlink.Request | netlink.Acknowledge,
@ -179,7 +179,7 @@ func (cc *Conn) FlushChain(c *Chain) {
{Type: unix.NFTA_RULE_TABLE, Data: []byte(c.Table.Name + "\x00")},
{Type: unix.NFTA_RULE_CHAIN, Data: []byte(c.Name + "\x00")},
})
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELRULE),
Flags: netlink.Request | netlink.Acknowledge,

99
conn.go
View File

@ -41,7 +41,7 @@ type Conn struct {
lasting bool // establish a lasting connection to be used across multiple netlink operations.
mu sync.Mutex // protects the following state
messages []netlink.Message
messages []netlinkMessage
err error
nlconn *netlink.Conn // netlink socket using NETLINK_NETFILTER protocol.
sockOptions []SockOption
@ -49,6 +49,12 @@ type Conn struct {
allocatedIDs uint32
}
type netlinkMessage struct {
Header netlink.Header
Data []byte
rule *Rule
}
// ConnOption is an option to change the behavior of the nftables Conn returned by Open.
type ConnOption func(*Conn)
@ -171,24 +177,6 @@ func receiveAckAware(nlconn *netlink.Conn, sentMsgFlags netlink.HeaderFlags) ([]
return reply, nil
}
if len(reply) != 0 {
last := reply[len(reply)-1]
for re := last.Header.Type; (re&netlink.Overrun) == netlink.Overrun && (re&netlink.Done) != netlink.Done; re = last.Header.Type {
// we are not finished, the message is overrun
r, err := nlconn.Receive()
if err != nil {
return nil, err
}
reply = append(reply, r...)
last = reply[len(reply)-1]
}
if last.Header.Type == netlink.Error && binaryutil.BigEndian.Uint32(last.Data[:4]) == 0 {
// we have already collected an ack
return reply, nil
}
}
// Now we expect an ack
ack, err := nlconn.Receive()
if err != nil {
@ -196,8 +184,7 @@ func receiveAckAware(nlconn *netlink.Conn, sentMsgFlags netlink.HeaderFlags) ([]
}
if len(ack) == 0 {
// received an empty ack?
return reply, nil
return nil, errors.New("received an empty ack")
}
msg := ack[0]
@ -263,15 +250,54 @@ func (cc *Conn) Flush() error {
}
defer func() { _ = closer() }()
if _, err := conn.SendMessages(batch(cc.messages)); err != nil {
messages, err := conn.SendMessages(batch(cc.messages))
if err != nil {
return fmt.Errorf("SendMessages: %w", err)
}
var errs error
// Fetch replies. Each message with the Echo flag triggers a reply of the same
// type. Additionally, if the first message of the batch has the Echo flag, we
// get a reply of type NFT_MSG_NEWGEN, which we ignore.
replyIndex := 0
for replyIndex < len(cc.messages) && cc.messages[replyIndex].Header.Flags&netlink.Echo == 0 {
replyIndex++
}
replies, err := conn.Receive()
for err == nil && len(replies) != 0 {
reply := replies[0]
if reply.Header.Type == netlink.Error && reply.Header.Sequence == messages[1].Header.Sequence {
// The next message is the acknowledgement for the first message in the
// batch; stop looking for replies.
break
} else if replyIndex < len(cc.messages) {
msg := messages[replyIndex+1]
if msg.Header.Sequence == reply.Header.Sequence && msg.Header.Type == reply.Header.Type {
// The only messages which set the echo flag are rule create messages.
err := cc.messages[replyIndex].rule.handleCreateReply(reply)
if err != nil {
errs = errors.Join(errs, err)
}
replyIndex++
for replyIndex < len(cc.messages) && cc.messages[replyIndex].Header.Flags&netlink.Echo == 0 {
replyIndex++
}
}
}
replies = replies[1:]
if len(replies) == 0 {
replies, err = conn.Receive()
}
}
// Fetch the requested acknowledgement for each message we sent.
for _, msg := range cc.messages {
if _, err := receiveAckAware(conn, msg.Header.Flags); err != nil {
if errors.Is(err, os.ErrPermission) || errors.Is(err, syscall.ENOBUFS) {
for i := range cc.messages {
if i != 0 {
_, err = conn.Receive()
}
if err != nil {
if errors.Is(err, os.ErrPermission) || errors.Is(err, syscall.ENOBUFS) || errors.Is(err, syscall.ENOMEM) {
// Kernel will only send one error to user space.
return err
}
@ -282,6 +308,9 @@ func (cc *Conn) Flush() error {
if errs != nil {
return fmt.Errorf("conn.Receive: %w", errs)
}
if replyIndex < len(cc.messages) {
return fmt.Errorf("missing reply for message %d in batch", replyIndex)
}
return nil
}
@ -291,7 +320,7 @@ func (cc *Conn) Flush() error {
func (cc *Conn) FlushRuleset() {
cc.mu.Lock()
defer cc.mu.Unlock()
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELTABLE),
Flags: netlink.Request | netlink.Acknowledge | netlink.Create,
@ -350,26 +379,30 @@ func (cc *Conn) marshalExpr(fam byte, e expr.Any) []byte {
return b
}
func batch(messages []netlink.Message) []netlink.Message {
batch := []netlink.Message{
{
func batch(messages []netlinkMessage) []netlink.Message {
batch := make([]netlink.Message, len(messages)+2)
batch[0] = netlink.Message{
Header: netlink.Header{
Type: netlink.HeaderType(unix.NFNL_MSG_BATCH_BEGIN),
Flags: netlink.Request,
},
Data: extraHeader(0, unix.NFNL_SUBSYS_NFTABLES),
},
}
batch = append(batch, messages...)
for i, msg := range messages {
batch[i+1] = netlink.Message{
Header: msg.Header,
Data: msg.Data,
}
}
batch = append(batch, netlink.Message{
batch[len(messages)+1] = netlink.Message{
Header: netlink.Header{
Type: netlink.HeaderType(unix.NFNL_MSG_BATCH_END),
Flags: netlink.Request,
},
Data: extraHeader(0, unix.NFNL_SUBSYS_NFTABLES),
})
}
return batch
}

View File

@ -142,7 +142,7 @@ func (cc *Conn) AddFlowtable(f *Flowtable) *Flowtable {
{Type: unix.NLA_F_NESTED | NFTA_FLOWTABLE_HOOK, Data: cc.marshalAttr(hookAttr)},
})...)
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | NFT_MSG_NEWFLOWTABLE),
Flags: netlink.Request | netlink.Acknowledge | netlink.Create,
@ -162,7 +162,7 @@ func (cc *Conn) DelFlowtable(f *Flowtable) {
{Type: NFTA_FLOWTABLE_NAME, Data: []byte(f.Name)},
})
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | NFT_MSG_DELFLOWTABLE),
Flags: netlink.Request | netlink.Acknowledge,

View File

@ -8,7 +8,9 @@ import (
"testing"
"github.com/google/nftables"
"github.com/google/nftables/binaryutil"
"github.com/mdlayher/netlink"
"golang.org/x/sys/unix"
)
// Recorder provides an nftables connection that does not send to the Linux
@ -21,14 +23,34 @@ type Recorder struct {
// Conn opens an nftables connection that records netlink messages into the
// Recorder.
func (r *Recorder) Conn() (*nftables.Conn, error) {
nextHandle := uint64(1)
return nftables.New(nftables.WithTestDial(
func(req []netlink.Message) ([]netlink.Message, error) {
r.requests = append(r.requests, req...)
acks := make([]netlink.Message, 0, len(req))
replies := make([]netlink.Message, 0, len(req))
// Generate replies.
for _, msg := range req {
if msg.Header.Flags&netlink.Echo != 0 {
data := append([]byte{}, msg.Data...)
switch msg.Header.Type {
case netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_NEWRULE):
attrs, _ := netlink.MarshalAttributes([]netlink.Attribute{
{Type: unix.NFTA_RULE_HANDLE, Data: binaryutil.BigEndian.PutUint64(nextHandle)},
})
nextHandle++
data = append(data, attrs...)
}
replies = append(replies, netlink.Message{
Header: msg.Header,
Data: data,
})
}
}
// Generate acknowledgements.
for _, msg := range req {
if msg.Header.Flags&netlink.Acknowledge != 0 {
acks = append(acks, netlink.Message{
replies = append(replies, netlink.Message{
Header: netlink.Header{
Length: 4,
Type: netlink.Error,
@ -39,7 +61,7 @@ func (r *Recorder) Conn() (*nftables.Conn, error) {
})
}
}
return acks, nil
return replies, nil
}))
}

File diff suppressed because it is too large Load Diff

4
obj.go
View File

@ -124,7 +124,7 @@ func (cc *Conn) AddObj(o Obj) Obj {
attrs = append(attrs, netlink.Attribute{Type: unix.NLA_F_NESTED | unix.NFTA_OBJ_DATA, Data: data})
}
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_NEWOBJ),
Flags: netlink.Request | netlink.Acknowledge | netlink.Create,
@ -146,7 +146,7 @@ func (cc *Conn) DeleteObject(o Obj) {
data := cc.marshalAttr(attrs)
data = append(data, cc.marshalAttr([]netlink.Attribute{{Type: unix.NLA_F_NESTED | unix.NFTA_OBJ_DATA}})...)
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELOBJ),
Flags: netlink.Request | netlink.Acknowledge,

47
rule.go
View File

@ -48,10 +48,13 @@ const (
type Rule struct {
Table *Table
Chain *Chain
// Handle identifies an existing Rule.
// Handle identifies an existing Rule. For a new Rule, this field is set
// during the Flush() in which the rule is committed. Make sure to not access
// this field concurrently with this Flush() to avoid data races.
Handle uint64
// ID is an identifier for a new Rule, which is assigned by
// AddRule/InsertRule, and only valid before the rule is committed by Flush().
// The field is set to 0 during Flush().
ID uint32
// Position can be set to the Handle of another Rule to insert the new Rule
// before (InsertRule) or after (AddRule) the existing rule.
@ -94,7 +97,7 @@ func (cc *Conn) GetRules(t *Table, c *Chain) ([]*Rule, error) {
message := netlink.Message{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_GETRULE),
Flags: netlink.Request | netlink.Acknowledge | netlink.Dump | unix.NLM_F_ECHO,
Flags: netlink.Request | netlink.Acknowledge | netlink.Dump,
},
Data: append(extraHeader(uint8(t.Family), 0), data...),
}
@ -164,20 +167,23 @@ func (cc *Conn) newRule(r *Rule, op ruleOperation) *Rule {
msgData := []byte{}
msgData = append(msgData, data...)
var flags netlink.HeaderFlags
if r.UserData != nil {
msgData = append(msgData, cc.marshalAttr([]netlink.Attribute{
{Type: unix.NFTA_RULE_USERDATA, Data: r.UserData},
})...)
}
var flags netlink.HeaderFlags
var ruleRef *Rule
switch op {
case operationAdd:
flags = netlink.Request | netlink.Acknowledge | netlink.Create | unix.NLM_F_ECHO | unix.NLM_F_APPEND
flags = netlink.Request | netlink.Acknowledge | netlink.Create | netlink.Echo | netlink.Append
ruleRef = r
case operationInsert:
flags = netlink.Request | netlink.Acknowledge | netlink.Create | unix.NLM_F_ECHO
flags = netlink.Request | netlink.Acknowledge | netlink.Create | netlink.Echo
ruleRef = r
case operationReplace:
flags = netlink.Request | netlink.Acknowledge | netlink.Replace | unix.NLM_F_ECHO | unix.NLM_F_REPLACE
flags = netlink.Request | netlink.Acknowledge | netlink.Replace
}
if r.Position != 0 || (r.Flags&(1<<unix.NFTA_RULE_POSITION)) != 0 {
@ -190,17 +196,42 @@ func (cc *Conn) newRule(r *Rule, op ruleOperation) *Rule {
})...)
}
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: newRuleHeaderType,
Flags: flags,
},
Data: append(extraHeader(uint8(r.Table.Family), 0), msgData...),
rule: ruleRef,
})
return r
}
func (r *Rule) handleCreateReply(reply netlink.Message) error {
ad, err := netlink.NewAttributeDecoder(reply.Data[4:])
if err != nil {
return err
}
ad.ByteOrder = binary.BigEndian
var handle uint64
for ad.Next() {
switch ad.Type() {
case unix.NFTA_RULE_HANDLE:
handle = ad.Uint64()
}
}
if ad.Err() != nil {
return ad.Err()
}
if handle == 0 {
return fmt.Errorf("missing rule handle in create reply")
}
r.Handle = handle
r.ID = 0
return nil
}
func (cc *Conn) ReplaceRule(r *Rule) *Rule {
return cc.newRule(r, operationReplace)
}
@ -247,7 +278,7 @@ func (cc *Conn) DelRule(r *Rule) error {
}
flags := netlink.Request | netlink.Acknowledge
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: delRuleHeaderType,
Flags: flags,

8
set.go
View File

@ -506,7 +506,7 @@ func (cc *Conn) appendElemList(s *Set, vals []SetElement, hdrType uint16) error
{Type: unix.NFTA_SET_ELEM_LIST_ELEMENTS | unix.NLA_F_NESTED, Data: encodedElem},
}
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | hdrType),
Flags: netlink.Request | netlink.Acknowledge | netlink.Create,
@ -680,7 +680,7 @@ func (cc *Conn) AddSet(s *Set, vals []SetElement) error {
tableInfo = append(tableInfo, netlink.Attribute{Type: unix.NLA_F_NESTED | NFTA_SET_ELEM_EXPRESSIONS, Data: data})
}
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_NEWSET),
Flags: netlink.Request | netlink.Acknowledge | netlink.Create,
@ -700,7 +700,7 @@ func (cc *Conn) DelSet(s *Set) {
{Type: unix.NFTA_SET_TABLE, Data: []byte(s.Table.Name + "\x00")},
{Type: unix.NFTA_SET_NAME, Data: []byte(s.Name + "\x00")},
})
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELSET),
Flags: netlink.Request | netlink.Acknowledge,
@ -717,7 +717,7 @@ func (cc *Conn) FlushSet(s *Set) {
{Type: unix.NFTA_SET_TABLE, Data: []byte(s.Table.Name + "\x00")},
{Type: unix.NFTA_SET_NAME, Data: []byte(s.Name + "\x00")},
})
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELSETELEM),
Flags: netlink.Request | netlink.Acknowledge,

View File

@ -254,7 +254,10 @@ func TestMarshalSet(t *testing.T) {
}
msg := c.messages[connMsgSetIdx]
nset, err := setsFromMsg(msg)
nset, err := setsFromMsg(netlink.Message{
Header: msg.Header,
Data: msg.Data,
})
if err != nil {
t.Fatalf("setsFromMsg() error: %+v", err)
}

View File

@ -57,7 +57,7 @@ func (cc *Conn) DelTable(t *Table) {
{Type: unix.NFTA_TABLE_NAME, Data: []byte(t.Name + "\x00")},
{Type: unix.NFTA_TABLE_FLAGS, Data: []byte{0, 0, 0, 0}},
})
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELTABLE),
Flags: netlink.Request | netlink.Acknowledge,
@ -73,7 +73,7 @@ func (cc *Conn) addTable(t *Table, flag netlink.HeaderFlags) *Table {
{Type: unix.NFTA_TABLE_NAME, Data: []byte(t.Name + "\x00")},
{Type: unix.NFTA_TABLE_FLAGS, Data: []byte{0, 0, 0, 0}},
})
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_NEWTABLE),
Flags: netlink.Request | netlink.Acknowledge | flag,
@ -103,7 +103,7 @@ func (cc *Conn) FlushTable(t *Table) {
data := cc.marshalAttr([]netlink.Attribute{
{Type: unix.NFTA_RULE_TABLE, Data: []byte(t.Name + "\x00")},
})
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELRULE),
Flags: netlink.Request | netlink.Acknowledge,