Compare commits

...

2 Commits

Author SHA1 Message Date
Jan Schär b3a72503fb
Merge 7a668d7c79 into d11ef81b6a 2025-03-18 10:55:37 +01:00
Jan Schär 7a668d7c79 Set rule handle during flush
This change makes it possible to delete rules after inserting them,
without needing to query the rules first. Additionally, this allows
positioning a new rule next to an existing rule.

There are two ways to refer to a rule: Either by ID or by handle. The ID
is assigned by userspace, and is only valid within a transaction, so it
can only be used before the flush. The handle is assigned by the kernel
when the transaction is committed, and can thus only be used after the
flush. We thus need to set an ID on each newly created rule, and
retrieve the handle of the rule during the flush.

I implemented a new mechanism for retrieving replies in Flush, and
handling these replies by adding a callback to netlink messages. There
was some existing code to handle "overrun", which I deleted, because it
was nonsensical and just worked by accident. NLMSG_OVERRUN is in fact
not a flag, but a complete message type, so the (re&netlink.Overrun)
masking makes no sense. Even better, NLMSG_OVERRUN is never actually
used by Linux. What this code was actually doing was skipping over the
NFT_MSG_NEWRULE replies, and possibly a NFT_MSG_NEWGEN reply.

I updated tests to generate replies for the NFT_MSG_NEWRULE messages
with a handle added.
2025-03-18 09:46:35 +00:00
10 changed files with 234 additions and 901 deletions

View File

@ -140,7 +140,7 @@ func (cc *Conn) AddChain(c *Chain) *Chain {
{Type: unix.NFTA_CHAIN_TYPE, Data: []byte(c.Type + "\x00")},
})...)
}
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_NEWCHAIN),
Flags: netlink.Request | netlink.Acknowledge | netlink.Create,
@ -161,7 +161,7 @@ func (cc *Conn) DelChain(c *Chain) {
{Type: unix.NFTA_CHAIN_NAME, Data: []byte(c.Name + "\x00")},
})
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELCHAIN),
Flags: netlink.Request | netlink.Acknowledge,
@ -179,7 +179,7 @@ func (cc *Conn) FlushChain(c *Chain) {
{Type: unix.NFTA_RULE_TABLE, Data: []byte(c.Table.Name + "\x00")},
{Type: unix.NFTA_RULE_CHAIN, Data: []byte(c.Name + "\x00")},
})
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELRULE),
Flags: netlink.Request | netlink.Acknowledge,

106
conn.go
View File

@ -41,7 +41,7 @@ type Conn struct {
lasting bool // establish a lasting connection to be used across multiple netlink operations.
mu sync.Mutex // protects the following state
messages []netlink.Message
messages []netlinkMessage
err error
nlconn *netlink.Conn // netlink socket using NETLINK_NETFILTER protocol.
sockOptions []SockOption
@ -49,6 +49,12 @@ type Conn struct {
allocatedIDs uint32
}
// netlinkMessage is a queued netlink request together with an optional
// callback for processing the reply to that request.
type netlinkMessage struct {
// Header is copied into the netlink.Message that batch builds for sending.
Header netlink.Header
// Data is the message payload; batch copies it alongside Header.
Data []byte
// handleReply is called by Flush with the reply whose sequence number and
// type match this message. Flush only looks for replies to messages that
// have the netlink.Echo flag set, and it calls handleReply without a nil
// check, so such messages must set it.
handleReply func(reply netlink.Message) error
}
// ConnOption is an option to change the behavior of the nftables Conn returned by Open.
type ConnOption func(*Conn)
@ -171,24 +177,6 @@ func receiveAckAware(nlconn *netlink.Conn, sentMsgFlags netlink.HeaderFlags) ([]
return reply, nil
}
if len(reply) != 0 {
last := reply[len(reply)-1]
for re := last.Header.Type; (re&netlink.Overrun) == netlink.Overrun && (re&netlink.Done) != netlink.Done; re = last.Header.Type {
// we are not finished, the message is overrun
r, err := nlconn.Receive()
if err != nil {
return nil, err
}
reply = append(reply, r...)
last = reply[len(reply)-1]
}
if last.Header.Type == netlink.Error && binaryutil.BigEndian.Uint32(last.Data[:4]) == 0 {
// we have already collected an ack
return reply, nil
}
}
// Now we expect an ack
ack, err := nlconn.Receive()
if err != nil {
@ -196,8 +184,7 @@ func receiveAckAware(nlconn *netlink.Conn, sentMsgFlags netlink.HeaderFlags) ([]
}
if len(ack) == 0 {
// received an empty ack?
return reply, nil
return nil, errors.New("received an empty ack")
}
msg := ack[0]
@ -263,15 +250,53 @@ func (cc *Conn) Flush() error {
}
defer func() { _ = closer() }()
if _, err := conn.SendMessages(batch(cc.messages)); err != nil {
messages, err := conn.SendMessages(batch(cc.messages))
if err != nil {
return fmt.Errorf("SendMessages: %w", err)
}
var errs error
// Fetch replies. Each message with the Echo flag triggers a reply of the same
// type. Additionally, if the first message of the batch has the Echo flag, we
// get a reply of type NFT_MSG_NEWGEN, which we ignore.
replyIndex := 0
for replyIndex < len(cc.messages) && cc.messages[replyIndex].Header.Flags&netlink.Echo == 0 {
replyIndex++
}
replies, err := conn.Receive()
for err == nil && len(replies) != 0 {
reply := replies[0]
if reply.Header.Type == netlink.Error && reply.Header.Sequence == messages[1].Header.Sequence {
// The next message is the acknowledgement for the first message in the
// batch; stop looking for replies.
break
} else if replyIndex < len(cc.messages) {
msg := messages[replyIndex+1]
if msg.Header.Sequence == reply.Header.Sequence && msg.Header.Type == reply.Header.Type {
err := cc.messages[replyIndex].handleReply(reply)
if err != nil {
errs = errors.Join(errs, err)
}
replyIndex++
for replyIndex < len(cc.messages) && cc.messages[replyIndex].Header.Flags&netlink.Echo == 0 {
replyIndex++
}
}
}
replies = replies[1:]
if len(replies) == 0 {
replies, err = conn.Receive()
}
}
// Fetch the requested acknowledgement for each message we sent.
for _, msg := range cc.messages {
if _, err := receiveAckAware(conn, msg.Header.Flags); err != nil {
if errors.Is(err, os.ErrPermission) || errors.Is(err, syscall.ENOBUFS) {
for i := range cc.messages {
if i != 0 {
_, err = conn.Receive()
}
if err != nil {
if errors.Is(err, os.ErrPermission) || errors.Is(err, syscall.ENOBUFS) || errors.Is(err, syscall.ENOMEM) {
// Kernel will only send one error to user space.
return err
}
@ -282,6 +307,9 @@ func (cc *Conn) Flush() error {
if errs != nil {
return fmt.Errorf("conn.Receive: %w", errs)
}
if replyIndex < len(cc.messages) {
return fmt.Errorf("missing reply for message %d in batch", replyIndex)
}
return nil
}
@ -291,7 +319,7 @@ func (cc *Conn) Flush() error {
func (cc *Conn) FlushRuleset() {
cc.mu.Lock()
defer cc.mu.Unlock()
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELTABLE),
Flags: netlink.Request | netlink.Acknowledge | netlink.Create,
@ -350,26 +378,30 @@ func (cc *Conn) marshalExpr(fam byte, e expr.Any) []byte {
return b
}
func batch(messages []netlink.Message) []netlink.Message {
batch := []netlink.Message{
{
Header: netlink.Header{
Type: netlink.HeaderType(unix.NFNL_MSG_BATCH_BEGIN),
Flags: netlink.Request,
},
Data: extraHeader(0, unix.NFNL_SUBSYS_NFTABLES),
func batch(messages []netlinkMessage) []netlink.Message {
batch := make([]netlink.Message, len(messages)+2)
batch[0] = netlink.Message{
Header: netlink.Header{
Type: netlink.HeaderType(unix.NFNL_MSG_BATCH_BEGIN),
Flags: netlink.Request,
},
Data: extraHeader(0, unix.NFNL_SUBSYS_NFTABLES),
}
batch = append(batch, messages...)
for i, msg := range messages {
batch[i+1] = netlink.Message{
Header: msg.Header,
Data: msg.Data,
}
}
batch = append(batch, netlink.Message{
batch[len(messages)+1] = netlink.Message{
Header: netlink.Header{
Type: netlink.HeaderType(unix.NFNL_MSG_BATCH_END),
Flags: netlink.Request,
},
Data: extraHeader(0, unix.NFNL_SUBSYS_NFTABLES),
})
}
return batch
}

View File

@ -142,7 +142,7 @@ func (cc *Conn) AddFlowtable(f *Flowtable) *Flowtable {
{Type: unix.NLA_F_NESTED | NFTA_FLOWTABLE_HOOK, Data: cc.marshalAttr(hookAttr)},
})...)
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | NFT_MSG_NEWFLOWTABLE),
Flags: netlink.Request | netlink.Acknowledge | netlink.Create,
@ -162,7 +162,7 @@ func (cc *Conn) DelFlowtable(f *Flowtable) {
{Type: NFTA_FLOWTABLE_NAME, Data: []byte(f.Name)},
})
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | NFT_MSG_DELFLOWTABLE),
Flags: netlink.Request | netlink.Acknowledge,

View File

@ -8,7 +8,9 @@ import (
"testing"
"github.com/google/nftables"
"github.com/google/nftables/binaryutil"
"github.com/mdlayher/netlink"
"golang.org/x/sys/unix"
)
// Recorder provides an nftables connection that does not send to the Linux
@ -21,14 +23,34 @@ type Recorder struct {
// Conn opens an nftables connection that records netlink messages into the
// Recorder.
func (r *Recorder) Conn() (*nftables.Conn, error) {
nextHandle := uint64(1)
return nftables.New(nftables.WithTestDial(
func(req []netlink.Message) ([]netlink.Message, error) {
r.requests = append(r.requests, req...)
acks := make([]netlink.Message, 0, len(req))
replies := make([]netlink.Message, 0, len(req))
// Generate replies.
for _, msg := range req {
if msg.Header.Flags&netlink.Echo != 0 {
data := append([]byte{}, msg.Data...)
switch msg.Header.Type {
case netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_NEWRULE):
attrs, _ := netlink.MarshalAttributes([]netlink.Attribute{
{Type: unix.NFTA_RULE_HANDLE, Data: binaryutil.BigEndian.PutUint64(nextHandle)},
})
nextHandle++
data = append(data, attrs...)
}
replies = append(replies, netlink.Message{
Header: msg.Header,
Data: data,
})
}
}
// Generate acknowledgements.
for _, msg := range req {
if msg.Header.Flags&netlink.Acknowledge != 0 {
acks = append(acks, netlink.Message{
replies = append(replies, netlink.Message{
Header: netlink.Header{
Length: 4,
Type: netlink.Error,
@ -39,7 +61,7 @@ func (r *Recorder) Conn() (*nftables.Conn, error) {
})
}
}
return acks, nil
return replies, nil
}))
}

File diff suppressed because it is too large Load Diff

4
obj.go
View File

@ -124,7 +124,7 @@ func (cc *Conn) AddObj(o Obj) Obj {
attrs = append(attrs, netlink.Attribute{Type: unix.NLA_F_NESTED | unix.NFTA_OBJ_DATA, Data: data})
}
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_NEWOBJ),
Flags: netlink.Request | netlink.Acknowledge | netlink.Create,
@ -146,7 +146,7 @@ func (cc *Conn) DeleteObject(o Obj) {
data := cc.marshalAttr(attrs)
data = append(data, cc.marshalAttr([]netlink.Attribute{{Type: unix.NLA_F_NESTED | unix.NFTA_OBJ_DATA}})...)
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELOBJ),
Flags: netlink.Request | netlink.Acknowledge,

49
rule.go
View File

@ -48,10 +48,13 @@ const (
type Rule struct {
Table *Table
Chain *Chain
// Handle identifies an existing Rule.
// Handle identifies an existing Rule. For a new Rule, this field is set
// during the Flush() in which the rule is committed. Make sure to not access
// this field concurrently with this Flush() to avoid data races.
Handle uint64
// ID is an identifier for a new Rule, which is assigned by
// AddRule/InsertRule, and only valid before the rule is committed by Flush().
// The field is set to 0 during Flush().
ID uint32
// Position can be set to the Handle of another Rule to insert the new Rule
// before (InsertRule) or after (AddRule) the existing rule.
@ -94,7 +97,7 @@ func (cc *Conn) GetRules(t *Table, c *Chain) ([]*Rule, error) {
message := netlink.Message{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_GETRULE),
Flags: netlink.Request | netlink.Acknowledge | netlink.Dump | unix.NLM_F_ECHO,
Flags: netlink.Request | netlink.Acknowledge | netlink.Dump,
},
Data: append(extraHeader(uint8(t.Family), 0), data...),
}
@ -164,20 +167,23 @@ func (cc *Conn) newRule(r *Rule, op ruleOperation) *Rule {
msgData := []byte{}
msgData = append(msgData, data...)
var flags netlink.HeaderFlags
if r.UserData != nil {
msgData = append(msgData, cc.marshalAttr([]netlink.Attribute{
{Type: unix.NFTA_RULE_USERDATA, Data: r.UserData},
})...)
}
var flags netlink.HeaderFlags
var handleReply func(reply netlink.Message) error
switch op {
case operationAdd:
flags = netlink.Request | netlink.Acknowledge | netlink.Create | unix.NLM_F_ECHO | unix.NLM_F_APPEND
flags = netlink.Request | netlink.Acknowledge | netlink.Create | netlink.Echo | netlink.Append
handleReply = r.handleCreateReply
case operationInsert:
flags = netlink.Request | netlink.Acknowledge | netlink.Create | unix.NLM_F_ECHO
flags = netlink.Request | netlink.Acknowledge | netlink.Create | netlink.Echo
handleReply = r.handleCreateReply
case operationReplace:
flags = netlink.Request | netlink.Acknowledge | netlink.Replace | unix.NLM_F_ECHO | unix.NLM_F_REPLACE
flags = netlink.Request | netlink.Acknowledge | netlink.Replace
}
if r.Position != 0 || (r.Flags&(1<<unix.NFTA_RULE_POSITION)) != 0 {
@ -190,17 +196,42 @@ func (cc *Conn) newRule(r *Rule, op ruleOperation) *Rule {
})...)
}
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: newRuleHeaderType,
Flags: flags,
},
Data: append(extraHeader(uint8(r.Table.Family), 0), msgData...),
Data: append(extraHeader(uint8(r.Table.Family), 0), msgData...),
handleReply: handleReply,
})
return r
}
// handleCreateReply processes the echoed reply to a rule creation request.
// It extracts the handle from the NFTA_RULE_HANDLE attribute of the reply,
// stores it in r.Handle, and resets r.ID to 0.
//
// An error is returned when the reply is too short to contain the 4-byte
// header that precedes the attributes, when the attributes cannot be decoded,
// or when no non-zero NFTA_RULE_HANDLE attribute is present. On error, r is
// left unmodified.
func (r *Rule) handleCreateReply(reply netlink.Message) error {
	// The attributes start after a 4-byte header. Guard the slice expression
	// so that a truncated reply yields an error instead of a panic.
	const headerLen = 4
	if len(reply.Data) < headerLen {
		return fmt.Errorf("create reply too short: got %d bytes, want at least %d", len(reply.Data), headerLen)
	}

	ad, err := netlink.NewAttributeDecoder(reply.Data[headerLen:])
	if err != nil {
		return err
	}
	ad.ByteOrder = binary.BigEndian

	var handle uint64
	for ad.Next() {
		switch ad.Type() {
		case unix.NFTA_RULE_HANDLE:
			handle = ad.Uint64()
		}
	}
	if err := ad.Err(); err != nil {
		return err
	}
	// A zero handle means the attribute was absent from the reply.
	if handle == 0 {
		return fmt.Errorf("missing rule handle in create reply")
	}

	r.Handle = handle
	r.ID = 0
	return nil
}
func (cc *Conn) ReplaceRule(r *Rule) *Rule {
return cc.newRule(r, operationReplace)
}
@ -247,7 +278,7 @@ func (cc *Conn) DelRule(r *Rule) error {
}
flags := netlink.Request | netlink.Acknowledge
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: delRuleHeaderType,
Flags: flags,

8
set.go
View File

@ -506,7 +506,7 @@ func (cc *Conn) appendElemList(s *Set, vals []SetElement, hdrType uint16) error
{Type: unix.NFTA_SET_ELEM_LIST_ELEMENTS | unix.NLA_F_NESTED, Data: encodedElem},
}
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | hdrType),
Flags: netlink.Request | netlink.Acknowledge | netlink.Create,
@ -680,7 +680,7 @@ func (cc *Conn) AddSet(s *Set, vals []SetElement) error {
tableInfo = append(tableInfo, netlink.Attribute{Type: unix.NLA_F_NESTED | NFTA_SET_ELEM_EXPRESSIONS, Data: data})
}
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_NEWSET),
Flags: netlink.Request | netlink.Acknowledge | netlink.Create,
@ -700,7 +700,7 @@ func (cc *Conn) DelSet(s *Set) {
{Type: unix.NFTA_SET_TABLE, Data: []byte(s.Table.Name + "\x00")},
{Type: unix.NFTA_SET_NAME, Data: []byte(s.Name + "\x00")},
})
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELSET),
Flags: netlink.Request | netlink.Acknowledge,
@ -717,7 +717,7 @@ func (cc *Conn) FlushSet(s *Set) {
{Type: unix.NFTA_SET_TABLE, Data: []byte(s.Table.Name + "\x00")},
{Type: unix.NFTA_SET_NAME, Data: []byte(s.Name + "\x00")},
})
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELSETELEM),
Flags: netlink.Request | netlink.Acknowledge,

View File

@ -254,7 +254,10 @@ func TestMarshalSet(t *testing.T) {
}
msg := c.messages[connMsgSetIdx]
nset, err := setsFromMsg(msg)
nset, err := setsFromMsg(netlink.Message{
Header: msg.Header,
Data: msg.Data,
})
if err != nil {
t.Fatalf("setsFromMsg() error: %+v", err)
}

View File

@ -57,7 +57,7 @@ func (cc *Conn) DelTable(t *Table) {
{Type: unix.NFTA_TABLE_NAME, Data: []byte(t.Name + "\x00")},
{Type: unix.NFTA_TABLE_FLAGS, Data: []byte{0, 0, 0, 0}},
})
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELTABLE),
Flags: netlink.Request | netlink.Acknowledge,
@ -73,7 +73,7 @@ func (cc *Conn) addTable(t *Table, flag netlink.HeaderFlags) *Table {
{Type: unix.NFTA_TABLE_NAME, Data: []byte(t.Name + "\x00")},
{Type: unix.NFTA_TABLE_FLAGS, Data: []byte{0, 0, 0, 0}},
})
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_NEWTABLE),
Flags: netlink.Request | netlink.Acknowledge | flag,
@ -103,7 +103,7 @@ func (cc *Conn) FlushTable(t *Table) {
data := cc.marshalAttr([]netlink.Attribute{
{Type: unix.NFTA_RULE_TABLE, Data: []byte(t.Name + "\x00")},
})
cc.messages = append(cc.messages, netlink.Message{
cc.messages = append(cc.messages, netlinkMessage{
Header: netlink.Header{
Type: netlink.HeaderType((unix.NFNL_SUBSYS_NFTABLES << 8) | unix.NFT_MSG_DELRULE),
Flags: netlink.Request | netlink.Acknowledge,