Skip to content

Commit 6a52fd2

Browse files
author
Eric Chlebek
committed
Replace boltdb with modernc.org/sqlite
This commit replaces the storage backend of lasr. Now, sqlite is used. It also removes support for dead-lettering, putting the responsibility for that in the hands of the user. The new storage backend comes with a significant performance penalty. The library is now around 10 times slower. More work will be needed to achieve parity with the boltdb-backed queue. Signed-off-by: Eric Chlebek <eric@sensu.io>
1 parent dc9debd commit 6a52fd2

25 files changed

+492
-961
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
[![GoDoc](https://img.shields.io/badge/godoc-reference-blue.svg?style=flat-square)](https://godoc.org/github.com/echlebek/lasr)
33

44
# lasr
5-
A persistent message queue backed by BoltDB. This queue is useful when the producers and consumers can live in the same process.
5+
A persistent work queue backed by sqlite3. This queue is useful when the producers and consumers can live in the same process.
66

77
Project goals
88
-------------

acknack.go

Lines changed: 22 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -2,77 +2,40 @@ package lasr
22

33
import (
44
"sync/atomic"
5-
6-
"github.com/boltdb/bolt"
75
)
86

9-
func (q *Q) ack(id []byte) error {
10-
q.mu.RLock()
11-
defer q.mu.RUnlock()
12-
var wake bool
13-
err := q.db.Update(func(tx *bolt.Tx) error {
14-
var err error
15-
wake, err = q.stopWaitingOn(tx, id)
16-
if err != nil {
17-
return err
18-
}
19-
bucket, err := q.bucket(tx, q.keys.unacked)
20-
if err != nil {
21-
return err
22-
}
23-
return bucket.Delete(id)
24-
})
25-
if err == nil {
26-
q.inFlight.Done()
27-
}
28-
if wake && !q.isClosed() {
29-
q.waker.Wake()
30-
}
31-
return err
32-
}
7+
func (q *Q) ack(id ID) error {
8+
q.mu.Lock()
9+
defer q.mu.Unlock()
3310

34-
func (q *Q) nack(id []byte, retry bool) error {
35-
q.mu.RLock()
36-
defer q.mu.RUnlock()
37-
var wake bool
38-
err := q.db.Update(func(tx *bolt.Tx) (rerr error) {
39-
bucket, err := q.bucket(tx, q.keys.unacked)
40-
if err != nil {
41-
return err
42-
}
43-
if retry {
44-
wake = true
45-
val := bucket.Get(id)
46-
ready, err := q.bucket(tx, q.keys.ready)
47-
if err != nil {
48-
return err
49-
}
50-
return ready.Put(id, val)
51-
}
52-
wake, err = q.stopWaitingOn(tx, id)
53-
if err != nil {
54-
return err
55-
}
56-
if len(q.keys.returned) > 0 {
57-
val := bucket.Get(id)
58-
returned, err := q.bucket(tx, q.keys.returned)
59-
if err != nil {
60-
return err
61-
}
62-
return returned.Put(id, val)
63-
}
64-
return bucket.Delete(id)
65-
})
11+
_, err := q.db.Exec(ackSQL, id)
6612
if err != nil {
6713
return err
6814
}
15+
6916
q.inFlight.Done()
70-
if wake && !q.isClosed() {
17+
18+
if !q.isClosed() {
7119
q.waker.Wake()
7220
}
21+
7322
return nil
7423
}
7524

25+
func (q *Q) nack(id ID, retry bool) error {
26+
q.mu.Lock()
27+
defer q.mu.Unlock()
28+
if retry {
29+
_, err := q.db.Exec(nackRetrySQL, id)
30+
if err == nil {
31+
q.inFlight.Done()
32+
}
33+
q.waker.Wake()
34+
return err
35+
}
36+
return q.ack(id)
37+
}
38+
7639
// Ack acknowledges successful receipt and processing of the Message.
7740
func (m *Message) Ack() (err error) {
7841
if !atomic.CompareAndSwapInt32(&m.once, 0, 1) {
@@ -90,61 +53,3 @@ func (m *Message) Nack(retry bool) (err error) {
9053
}
9154
return m.q.nack(m.ID, retry)
9255
}
93-
94-
// stopWaitingOn causes all messages waiting on id to not wait on id.
95-
// If stopWaitingOn finds any messages that were waiting on id that are not
96-
// waiting on any other messages, it will move them to the Ready state.
97-
func (q *Q) stopWaitingOn(tx *bolt.Tx, id []byte) (bool, error) {
98-
// blocking -> x blocking y
99-
// blockedOn -> x blocked on y
100-
wake := false
101-
blocking, err := q.bucket(tx, q.keys.blocking)
102-
if err != nil {
103-
return wake, err
104-
}
105-
blocker := blocking.Bucket(id)
106-
if blocker == nil {
107-
return wake, nil
108-
}
109-
blockedOn, err := q.bucket(tx, q.keys.blockedOn)
110-
if err != nil {
111-
return wake, err
112-
}
113-
c := blocker.Cursor()
114-
for k, _ := c.First(); k != nil; k, _ = c.Next() {
115-
blockedMsg := blockedOn.Bucket(k)
116-
if blockedMsg == nil {
117-
continue
118-
}
119-
if err := blockedMsg.Delete(id); err != nil {
120-
return wake, err
121-
}
122-
c = blockedMsg.Cursor()
123-
if k, _ := c.First(); k != nil {
124-
// There are still messages blocking the release of blockedMsg
125-
continue
126-
}
127-
// at this point, nothing is causing 'blockedMsg' to wait anymore.
128-
// a message will be placed into the ready state
129-
if err := blockedOn.DeleteBucket(k); err != nil {
130-
return wake, err
131-
}
132-
waiting, err := q.bucket(tx, q.keys.waiting)
133-
if err != nil {
134-
return wake, err
135-
}
136-
v := waiting.Get(k)
137-
ready, err := q.bucket(tx, q.keys.ready)
138-
if err != nil {
139-
return wake, err
140-
}
141-
if err := ready.Put(k, v); err != nil {
142-
return wake, err
143-
}
144-
wake = true
145-
if err := waiting.Delete(k); err != nil {
146-
return wake, err
147-
}
148-
}
149-
return wake, blocking.DeleteBucket(id)
150-
}

acknack_test.go

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,10 @@ import (
44
"context"
55
"sync"
66
"testing"
7-
8-
"github.com/boltdb/bolt"
97
)
108

119
func TestAckConcurrent(t *testing.T) {
12-
q, cleanup := newQ(t, WithDeadLetters())
13-
defer cleanup()
10+
q := newQ(t)
1411

1512
msg := []byte("foo")
1613
if _, err := q.Send(msg); err != nil {
@@ -58,8 +55,7 @@ func TestAckConcurrent(t *testing.T) {
5855
}
5956

6057
func TestNackConcurrent(t *testing.T) {
61-
q, cleanup := newQ(t, WithDeadLetters())
62-
defer cleanup()
58+
q := newQ(t)
6359

6460
msg := []byte("foo")
6561
if _, err := q.Send(msg); err != nil {
@@ -107,8 +103,7 @@ func TestNackConcurrent(t *testing.T) {
107103
}
108104

109105
func TestNackDeletesMessage_GH5(t *testing.T) {
110-
q, cleanup := newQ(t)
111-
defer cleanup()
106+
q := newQ(t)
112107

113108
_, err := q.Send([]byte("foo"))
114109
if err != nil {
@@ -124,19 +119,12 @@ func TestNackDeletesMessage_GH5(t *testing.T) {
124119
t.Fatal(err)
125120
}
126121

127-
err = q.db.Update(func(tx *bolt.Tx) error {
128-
bucket, err := q.bucket(tx, q.keys.unacked)
129-
if err != nil {
130-
return err
131-
}
132-
k, _ := bucket.Cursor().First()
133-
if k != nil {
134-
t.Error("message remains unacked")
135-
}
136-
return nil
137-
})
138-
139-
if err != nil {
122+
row := q.db.QueryRow("SELECT count(*) FROM queue WHERE id = ?;", msg.ID)
123+
var count int
124+
if err := row.Scan(&count); err != nil {
140125
t.Fatal(err)
141126
}
127+
if count > 0 {
128+
t.Fatalf("bad count: got %d, want 0", count)
129+
}
142130
}

0 commit comments

Comments
 (0)