diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml
index ba0af6d..8bc2e9d 100644
--- a/.github/workflows/linux.yml
+++ b/.github/workflows/linux.yml
@@ -30,7 +30,6 @@ jobs:
mkdir ./coverage-ci
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/frame.out -covermode=atomic ./pkg/frame
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/pipe.out -covermode=atomic ./pkg/pipe
- go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/rpc.out -covermode=atomic ./pkg/rpc
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/socket.out -covermode=atomic ./pkg/socket
go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage-ci/internal.out -covermode=atomic ./internal
- name: Run fuzz tests
diff --git a/.github/workflows/macos.yml b/.github/workflows/macos.yml
index b7e1a55..1663cbe 100644
--- a/.github/workflows/macos.yml
+++ b/.github/workflows/macos.yml
@@ -29,6 +29,5 @@ jobs:
run: |
go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/frame
go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/pipe
- go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/rpc
go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./pkg/socket
go test -v -race -cover -tags=debug -coverpkg=./... -covermode=atomic ./internal
diff --git a/.github/workflows/windows.yml b/.github/workflows/windows.yml
index c9d6347..dbe50af 100644
--- a/.github/workflows/windows.yml
+++ b/.github/workflows/windows.yml
@@ -35,6 +35,5 @@ jobs:
run: |
go test -v -race -tags=debug ./pkg/frame
go test -v -race -tags=debug ./pkg/pipe
- go test -v -race -tags=debug ./pkg/rpc
go test -v -race -tags=debug ./pkg/socket
go test -v -race -tags=debug ./internal
diff --git a/.gitignore b/.gitignore
index cb5f39b..8520e93 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,3 +18,4 @@
.idea
bench
**vendor
+.claude/settings.local.json
diff --git a/Makefile b/Makefile
index ae9ef02..29c3ff9 100644
--- a/Makefile
+++ b/Makefile
@@ -3,7 +3,6 @@ test:
go test -v -race -cover -tags=debug ./internal
go test -v -race -cover -tags=debug ./pkg/frame
go test -v -race -cover -tags=debug ./pkg/pipe
- go test -v -race -cover -tags=debug ./pkg/rpc
go test -v -race -cover -tags=debug ./pkg/socket
.PHONY: fuzz
diff --git a/README.md b/README.md
index 4ade3ab..60a65d1 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-High-performance PHP-to-Golang IPC bridge
+High-performance PHP-to-Golang IPC transport
=================================================
[](https://godoc.org/github.com/roadrunner-server/goridge/v4)

@@ -10,8 +10,7 @@ High-performance PHP-to-Golang IPC bridge
-Goridge is high performance PHP-to-Golang codec library which works over native PHP sockets and Golang net/rpc package.
-The library allows you to call Go service methods from PHP with a minimal footprint, structures and `[]byte` support.
+Goridge is a binary frame protocol with pipe and socket transports for inter-process communication between PHP and Go (or between any two processes that speak the goridge frame format). It exposes a 12-byte CRC-checked framing header, a small `Relay` interface, and ready-made pipe and TCP/Unix-socket implementations.
PHP source code can be found in this repository: [goridge-php](https://github.com/roadrunner-php/goridge)
@@ -21,65 +20,83 @@ See https://github.com/roadrunner-server/roadrunner - High-performance PHP appli
Features
--------
-- no external dependencies or services, drop-in (64bit PHP version required)
+- no external dependencies, drop-in (64-bit PHP required for the PHP side)
- low message footprint (12 bytes over any binary payload), binary error detection
- CRC32 header verification
-- sockets over TCP or Unix (ext-sockets is required), standard pipes
-- very fast (300k calls per second on Ryzen 1700X over 20 threads)
-- native `net/rpc` integration, ability to connect to existed application(s)
-- standalone protocol usage
-- structured data transfer using json
-- `[]byte` transfer, including big payloads
+- sockets over TCP or Unix domain, standard pipes
+- standalone protocol usage; bring your own RPC layer or none at all
+- structured data transfer (json / msgpack / proto / gob / raw via codec flags)
+- `[]byte` transfer, including large payloads
- service, message and transport level error handling
- hackable
-- works on Windows
-- unix sockets powered (also on Windows)
+- works on Windows (named pipes, Unix-domain sockets via AF_UNIX)
Installation
------------
-```go
-GO111MODULE=on go get github.com/roadrunner-server/goridge/v4
+```bash
+go get github.com/roadrunner-server/goridge/v4
```
-### Sample of usage
+Sample usage
+------------
+
+A minimal echo server using the socket relay:
+
```go
package main
import (
- "fmt"
+ "log"
"net"
- "net/rpc"
- goridgeRpc "github.com/roadrunner-server/goridge/v4/pkg/rpc"
+ "github.com/roadrunner-server/goridge/v4/pkg/frame"
+ "github.com/roadrunner-server/goridge/v4/pkg/socket"
)
-type App struct{}
-
-func (s *App) Hi(name string, r *string) error {
- *r = fmt.Sprintf("Hello, %s!", name)
- return nil
-}
-
func main() {
- ln, err := net.Listen("tcp", ":6001")
+ ln, err := net.Listen("tcp", "127.0.0.1:6001")
if err != nil {
- panic(err)
+ log.Fatal(err)
}
-
- _ = rpc.Register(new(App))
-
for {
conn, err := ln.Accept()
if err != nil {
- continue
+ log.Printf("accept: %v", err)
+ return
+ }
+ go serve(conn)
+ }
+}
+
+func serve(conn net.Conn) {
+ relay := socket.NewSocketRelay(conn)
+ defer func() { _ = relay.Close() }()
+
+ for {
+ in := frame.NewFrame()
+ if err := relay.Receive(in); err != nil {
+ return
+ }
+
+ // Mirror the incoming header so the response preserves the client's
+ // version, codec and any options/stream bits — a faithful echo.
+ out := frame.NewFrame()
+ out.WriteVersion(out.Header(), in.ReadVersion(in.Header()))
+ out.WriteFlags(out.Header(), in.ReadFlags())
+ out.WritePayload(in.Payload())
+ out.WritePayloadLen(out.Header(), uint32(len(in.Payload())))
+ out.WriteCRC(out.Header())
+
+ if err := relay.Send(out); err != nil {
+ return
}
- _ = conn
- go rpc.ServeCodec(goridgeRpc.NewCodec(conn))
}
}
```
+For the pipe-based variant, swap `socket.NewSocketRelay(conn)` for `pipe.NewPipeRelay(in, out)` where `in` is an `io.ReadCloser` and `out` is an `io.WriteCloser` — typically `os.Stdin`/`os.Stdout` on a child process.
+
License
-------
diff --git a/benchmarks/main.go b/benchmarks/main.go
deleted file mode 100644
index 0217eb9..0000000
--- a/benchmarks/main.go
+++ /dev/null
@@ -1,136 +0,0 @@
-package main
-
-import (
- "context"
- "net"
- "net/http"
- "net/http/pprof"
- "net/rpc"
- "time"
-
- goridgeRpc "github.com/roadrunner-server/goridge/v4/pkg/rpc"
- "github.com/roadrunner-server/goridge/v4/tests"
-)
-
-func main() {
- s := NewServer()
- go func() {
- _ = s.Start("localhost:6061")
- }()
- defer func() {
- _ = s.Stop(context.Background())
- }()
-
- time.Sleep(time.Second * 5)
- // create an pprof server
- server()
- time.Sleep(time.Second * 1)
-
- client()
-}
-
-// testService sample
-type testService struct{}
-
-func (s *testService) ProtoMessage(payload *tests.Payload, item *tests.Item) error {
- (*item).Key = payload.Items[0].Key
- return nil
-}
-
-func client() {
- err := rpc.RegisterName("testbench", new(testService))
- if err != nil {
- panic(err)
- }
-
- conn, err := net.Dial("tcp", "127.0.0.1:18321")
- if err != nil {
- panic(err)
- }
-
- client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
- defer func() {
- err := client.Close()
- if err != nil {
- panic(err)
- }
- }()
-
- tt := time.Now().String()
-
- keysP := &tests.Payload{
- Storage: "memory-rr",
- Items: []*tests.Item{
- {
- Key: "a",
- Value: "hhhhhhhhhhhhhhhhheeeeeeeeeeeeeeeeeeeeeeeeeeeelllllllllllllllllllllllllllllllllloooooooooooooooooooooooooooooo",
- Timeout: tt,
- },
- {
- Key: "b",
- Value: "hhhhhhhhhhhhhhhhheeeeeeeeeeeeeeeeeeeeeeeeeeeelllllllllllllllllllllllllllllllllloooooooooooooooooooooooooooooo",
- Timeout: tt,
- },
- {
- Key: "c",
- Value: "hhhhhhhhhhhhhhhhheeeeeeeeeeeeeeeeeeeeeeeeeeeelllllllllllllllllllllllllllllllllloooooooooooooooooooooooooooooo",
- Timeout: tt,
- },
- },
- }
-
- item := &tests.Item{}
- for range 1000000 {
- err = client.Call("testbench.ProtoMessage", keysP, item)
- if err != nil {
- panic(err)
- }
- }
-}
-
-func server() {
- ln, err := net.Listen("tcp", "127.0.0.1:18321")
- if err != nil {
- panic(err)
- }
-
- go func() {
- for {
- conn, err2 := ln.Accept()
- if err2 != nil {
- panic(err2)
- }
- rpc.ServeCodec(goridgeRpc.NewCodec(conn))
- }
- }()
-}
-
-// Server is a HTTP server for debugging.
-type Server struct {
- srv *http.Server
-}
-
-// NewServer creates new HTTP server for debugging.
-func NewServer() Server {
- mux := http.NewServeMux()
-
- mux.HandleFunc("/debug/pprof/", pprof.Index)
- mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
- mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
- mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
- mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
-
- return Server{srv: &http.Server{Handler: mux}}
-}
-
-// Start debug server.
-func (s *Server) Start(addr string) error {
- s.srv.Addr = addr
-
- return s.srv.ListenAndServe()
-}
-
-// Stop debug server.
-func (s *Server) Stop(ctx context.Context) error {
- return s.srv.Shutdown(ctx)
-}
diff --git a/go.mod b/go.mod
index 119d277..0518e1a 100644
--- a/go.mod
+++ b/go.mod
@@ -3,9 +3,8 @@ module github.com/roadrunner-server/goridge/v4
go 1.26
require (
- github.com/roadrunner-server/errors v1.4.1
+ github.com/roadrunner-server/errors v1.5.0
github.com/stretchr/testify v1.11.1
- google.golang.org/protobuf v1.36.11
)
require (
diff --git a/go.sum b/go.sum
index d152103..54c320a 100644
--- a/go.sum
+++ b/go.sum
@@ -1,8 +1,6 @@
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
-github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
@@ -12,15 +10,13 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-github.com/roadrunner-server/errors v1.4.1 h1:LKNeaCGiwd3t8IaL840ZNF3UA9yDQlpvHnKddnh0YRQ=
-github.com/roadrunner-server/errors v1.4.1/go.mod h1:qeffnIKG0e4j1dzGpa+OGY5VKSfMphizvqWIw8s2lAo=
+github.com/roadrunner-server/errors v1.5.0 h1:unG7LKIZrSzkCCF3YLRLA5VyqE0KKomofXVJUXJe00g=
+github.com/roadrunner-server/errors v1.5.0/go.mod h1:g9fo/T2C13cWRDR9PW1r0ZAOSQfNhWAZawyfkGiaHuI=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
-google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
-google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go
deleted file mode 100644
index 788310e..0000000
--- a/pkg/rpc/client.go
+++ /dev/null
@@ -1,229 +0,0 @@
-package rpc
-
-import (
- "bytes"
- "encoding/gob"
- "encoding/json"
- "io"
- "net/rpc"
- "sync"
-
- "github.com/roadrunner-server/errors"
- "github.com/roadrunner-server/goridge/v4/pkg/frame"
- "github.com/roadrunner-server/goridge/v4/pkg/relay"
- "github.com/roadrunner-server/goridge/v4/pkg/socket"
- "google.golang.org/protobuf/proto"
-)
-
-// ClientCodec is codec for goridge connection.
-type ClientCodec struct {
- // bytes sync.Pool
- bPool sync.Pool
- fPool sync.Pool
-
- relay relay.Relay
- closed bool
- frame *frame.Frame
-}
-
-// NewClientCodec initiates new server rpc codec over socket connection.
-func NewClientCodec(rwc io.ReadWriteCloser) *ClientCodec {
- return &ClientCodec{
- bPool: sync.Pool{New: func() any {
- return new(bytes.Buffer)
- }},
-
- fPool: sync.Pool{New: func() any {
- return frame.NewFrame()
- }},
-
- relay: socket.NewSocketRelay(rwc),
- }
-}
-
-func (c *ClientCodec) get() *bytes.Buffer {
- return c.bPool.Get().(*bytes.Buffer)
-}
-
-func (c *ClientCodec) put(b *bytes.Buffer) {
- b.Reset()
- c.bPool.Put(b)
-}
-
-func (c *ClientCodec) getFrame() *frame.Frame {
- return c.fPool.Get().(*frame.Frame)
-}
-
-func (c *ClientCodec) putFrame(f *frame.Frame) {
- f.Reset()
- c.fPool.Put(f)
-}
-
-// WriteRequest writes request to the connection. Sequential.
-func (c *ClientCodec) WriteRequest(r *rpc.Request, body any) error {
- const op = errors.Op("goridge_write_request")
-
- // get a frame from the pool
- fr := c.getFrame()
- defer c.putFrame(fr)
-
- // get a buffer from the pool
- buf := c.get()
- defer c.put(buf)
-
- // writeServiceMethod to the buffer
- buf.WriteString(r.ServiceMethod)
- // use fallback as gob
- fr.WriteFlags(fr.Header(), frame.CodecGob)
-
- if body != nil {
- // if body is proto message, use proto codec
- switch m := body.(type) {
- // check if message is PROTO
- case proto.Message:
- fr.WriteFlags(fr.Header(), frame.CodecProto)
- b, err := proto.Marshal(m)
- if err != nil {
- return errors.E(op, err)
- }
- buf.Write(b)
- default:
- enc := gob.NewEncoder(buf)
- // write data to the gob
- err := enc.Encode(body)
- if err != nil {
- return errors.E(op, err)
- }
- }
- }
-
- // SEQ_ID + METHOD_NAME_LEN
- fr.WriteOptions(fr.HeaderPtr(), uint32(r.Seq), uint32(len(r.ServiceMethod))) //nolint:gosec
- fr.WriteVersion(fr.Header(), frame.Version1)
-
- fr.WritePayloadLen(fr.Header(), uint32(buf.Len())) //nolint:gosec
- fr.WritePayload(buf.Bytes())
- fr.WriteCRC(fr.Header())
-
- err := c.relay.Send(fr)
- if err != nil {
- return errors.E(op, err)
- }
- return nil
-}
-
-// ReadResponseHeader reads response from the connection.
-func (c *ClientCodec) ReadResponseHeader(r *rpc.Response) error {
- const op = errors.Op("client_read_response_header")
-
- // get a frame from sync.Pool
- fr := c.getFrame()
-
- err := c.relay.Receive(fr)
- if err != nil {
- return errors.E(op, err)
- }
- if !fr.VerifyCRC(fr.Header()) {
- return errors.E(op, errors.Str("CRC verification failed"))
- }
-
- // save the frame after CRC verification
- c.frame = fr
-
- opts := fr.ReadOptions(fr.Header())
- if len(opts) != 2 {
- return errors.E(op, errors.Str(errOpts))
- }
-
- if int(opts[1]) > len(fr.Payload()) {
- return errors.E(op, errors.Str("method length offset exceeds payload bounds"))
- }
-
- // check for error
- if fr.ReadFlags()&frame.ERROR != 0 {
- r.Error = string(fr.Payload()[opts[1]:])
- }
-
- r.Seq = uint64(opts[0])
- r.ServiceMethod = string(fr.Payload()[:opts[1]])
-
- return nil
-}
-
-// ReadResponseBody response from the connection.
-func (c *ClientCodec) ReadResponseBody(out any) error {
- const op = errors.Op("client_read_response_body")
-
- // put frame after response was sent
- defer c.putFrame(c.frame)
- // if there is no out interface to unmarshall the body, skip
- if out == nil {
- return nil
- }
-
- flags := c.frame.ReadFlags()
-
- opts := c.frame.ReadOptions(c.frame.Header())
- if len(opts) != 2 {
- return errors.E(op, errors.Str(errOpts))
- }
-
- if int(opts[1]) > len(c.frame.Payload()) {
- return errors.E(op, errors.Str("method length offset exceeds payload bounds"))
- }
-
- payload := c.frame.Payload()[opts[1]:]
- if len(payload) == 0 {
- return nil
- }
-
- switch { //nolint:dupl
- case flags&frame.CodecProto != 0:
- // check if the out message is a correct proto.Message
- // instead send an error
- if pOut, ok := out.(proto.Message); ok {
- err := proto.Unmarshal(payload, pOut)
- if err != nil {
- return errors.E(op, err)
- }
- return nil
- }
-
- return errors.E(op, errors.Str("message type is not a proto"))
- case flags&frame.CodecJSON != 0:
- return json.Unmarshal(payload, out)
- case flags&frame.CodecGob != 0:
- buf := c.get()
- defer c.put(buf)
-
- dec := gob.NewDecoder(buf)
- buf.Write(payload)
-
- err := dec.Decode(out)
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
- case flags&frame.CodecRaw != 0:
- if raw, ok := out.(*[]byte); ok {
- *raw = append(*raw, payload...)
- }
-
- return nil
- case flags&frame.CodecMsgpack != 0:
- return errors.E(op, errors.Str(errMsgpackV4))
- default:
- return errors.E(op, errors.Str("unknown decoder used in frame"))
- }
-}
-
-// Close closes the client connection.
-func (c *ClientCodec) Close() error {
- if c.closed {
- return nil
- }
-
- c.closed = true
- return c.relay.Close()
-}
diff --git a/pkg/rpc/client_server_test.go b/pkg/rpc/client_server_test.go
deleted file mode 100644
index dd02418..0000000
--- a/pkg/rpc/client_server_test.go
+++ /dev/null
@@ -1,407 +0,0 @@
-package rpc
-
-import (
- "crypto/rand"
- "net"
- "net/rpc"
- "strings"
- "sync"
- "testing"
-
- "github.com/roadrunner-server/errors"
- "github.com/roadrunner-server/goridge/v4/tests"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
- "google.golang.org/protobuf/proto"
-)
-
-// testService sample
-type testService struct{}
-
-// Payload sample
-type Payload struct {
- Name string `json:"name"`
- Value int `json:"value"`
- Keys map[string]string `json:"keys,omitempty"`
-}
-
-// Echo returns incoming message
-func (s *testService) Echo(msg string, r *string) error {
- *r = msg
- return nil
-}
-
-// Echo returns error
-func (s *testService) EchoR(_ string, r *string) error {
- *r = "error"
- return errors.Str("echoR error")
-}
-
-// Process performs payload conversion
-func (s *testService) Process(msg Payload, r *Payload) error {
- r.Name = strings.ToUpper(msg.Name)
- r.Value = -msg.Value
-
- if len(msg.Keys) != 0 {
- r.Keys = make(map[string]string)
- for n, v := range msg.Keys {
- r.Keys[v] = n
- }
- }
-
- return nil
-}
-
-// EchoBinary work over binary data
-func (s *testService) EchoBinary(msg []byte, out *[]byte) error {
- *out = append(*out, msg...)
- return nil
-}
-
-// Test Proto
-func (s *testService) ProtoMessage(payload *tests.Payload, item *tests.Item) error {
- (*item).Key = payload.Items[0].Key
- return nil
-}
-
-func TestClientServerProto(t *testing.T) {
- ln, err := net.Listen("tcp", "127.0.0.1:18935")
- assert.NoError(t, err)
-
- go func() {
- for {
- conn, err2 := ln.Accept()
- assert.NoError(t, err2)
- rpc.ServeCodec(NewCodec(conn))
- }
- }()
-
- err = rpc.RegisterName("test123", new(testService))
- assert.NoError(t, err)
-
- conn, err := net.Dial("tcp", "127.0.0.1:18935")
- assert.NoError(t, err)
-
- client := rpc.NewClientWithCodec(NewClientCodec(conn))
- keysP := &tests.Payload{
- Storage: "memory-rr",
- Items: []*tests.Item{
- {
- Key: "a",
- },
- {
- Key: "b",
- },
- {
- Key: "c",
- },
- },
- }
-
- item := &tests.Item{}
- assert.NoError(t, client.Call("test123.ProtoMessage", keysP, item))
- assert.Equal(t, "a", item.Key)
-
- t.Cleanup(func() {
- err2 := client.Close()
- if err2 != nil {
- t.Fatal(err2)
- }
- })
-}
-
-func TestClientServerProtoError(t *testing.T) {
- ln, err := net.Listen("tcp", "127.0.0.1:18321")
- assert.NoError(t, err)
-
- go func() {
- for {
- conn, err2 := ln.Accept()
- assert.NoError(t, err2)
- rpc.ServeCodec(NewCodec(conn))
- }
- }()
-
- err = rpc.RegisterName("testError", new(testService))
- assert.NoError(t, err)
-
- conn, err := net.Dial("tcp", "127.0.0.1:18321")
- assert.NoError(t, err)
-
- client := rpc.NewClientWithCodec(NewClientCodec(conn))
- keysP := &tests.Payload{
- Storage: "memory-rr",
- Items: []*tests.Item{
- {
- Key: "a",
- },
- {
- Key: "b",
- },
- {
- Key: "c",
- },
- },
- }
-
- keys, err := proto.Marshal(keysP)
- if err != nil {
- t.Fatal(err)
- }
-
- item := &tests.Item{}
- assert.Error(t, client.Call("testError.ProtoMessage", keys, item))
-
- t.Cleanup(func() {
- err2 := client.Close()
- if err2 != nil {
- t.Fatal(err2)
- }
- })
-}
-
-func TestClientServerJSON(t *testing.T) {
- ln, err := net.Listen("tcp", "127.0.0.1:18936")
- assert.NoError(t, err)
-
- go func() {
- for {
- conn, err2 := ln.Accept()
- assert.NoError(t, err2)
- rpc.ServeCodec(NewCodec(conn))
- }
- }()
-
- err = rpc.RegisterName("test2", new(testService))
- assert.NoError(t, err)
-
- conn, err := net.Dial("tcp", "127.0.0.1:18936")
- assert.NoError(t, err)
-
- client := rpc.NewClientWithCodec(NewClientCodec(conn))
-
- var rp = Payload{}
- assert.NoError(t, client.Call("test2.Process", Payload{
- Name: "name",
- Value: 1000,
- Keys: map[string]string{"key": "value"},
- }, &rp))
-
- assert.Equal(t, "NAME", rp.Name)
- assert.Equal(t, -1000, rp.Value)
- assert.Equal(t, "key", rp.Keys["value"])
-
- t.Cleanup(func() {
- err2 := client.Close()
- if err2 != nil {
- t.Fatal(err2)
- }
- })
-}
-
-func TestClientServerRaw(t *testing.T) {
- ln, err := net.Listen("tcp", "127.0.0.1:18937")
- assert.NoError(t, err)
-
- go func() {
- for {
- conn, err2 := ln.Accept()
- assert.NoError(t, err2)
- rpc.ServeCodec(NewCodec(conn))
- }
- }()
-
- err = rpc.RegisterName("testBinary", new(testService))
- assert.NoError(t, err)
-
- conn, err := net.Dial("tcp", "127.0.0.1:18937")
- assert.NoError(t, err)
-
- client := rpc.NewClientWithCodec(NewClientCodec(conn))
-
- data := make([]byte, 100000)
- _, _ = rand.Read(data)
-
- resp := make([]byte, 0, 10000)
- assert.NoError(t, client.Call("testBinary.EchoBinary", data, &resp))
- require.Equal(t, data, resp)
-
- t.Cleanup(func() {
- err2 := client.Close()
- if err2 != nil {
- t.Fatal(err2)
- }
- })
-}
-
-func TestClientServerError(t *testing.T) {
- ln, err := net.Listen("tcp", "127.0.0.1:12336")
- assert.NoError(t, err)
-
- go func() {
- for {
- conn, err2 := ln.Accept()
- assert.NoError(t, err2)
- rpc.ServeCodec(NewCodec(conn))
- }
- }()
-
- err = rpc.RegisterName("testError2", new(testService))
- assert.NoError(t, err)
-
- conn, err := net.Dial("tcp", "127.0.0.1:12336")
- assert.NoError(t, err)
-
- client := rpc.NewClientWithCodec(NewClientCodec(conn))
-
- err = client.Call("unknown", nil, nil)
- assert.Error(t, err)
- assert.Equal(t, "rpc: service/method request ill-formed: unknown", err.Error())
-
- t.Cleanup(func() {
- err2 := client.Close()
- if err2 != nil {
- t.Fatal(err2)
- }
- })
-}
-
-func TestClientServerConcurrent(t *testing.T) {
- ln, err := net.Listen("tcp", "127.0.0.1:22385")
- if err != nil {
- panic(err)
- }
-
- go func() {
- for {
- conn, err2 := ln.Accept()
- assert.NoError(t, err2)
- rpc.ServeCodec(NewCodec(conn))
- }
- }()
-
- err = rpc.RegisterName("test", new(testService))
- assert.NoError(t, err)
-
- conn, err := net.Dial("tcp", "127.0.0.1:22385")
- assert.NoError(t, err)
-
- client := rpc.NewClientWithCodec(NewClientCodec(conn))
-
- wg := &sync.WaitGroup{}
- wg.Add(300)
-
- // this test uses random inputs
- for range 100 {
- go func() {
- defer wg.Done()
- var rp = Payload{}
- b := make([]byte, 15)
- _, err := rand.Read(b)
- assert.NoError(t, err)
-
- <-client.Go("test.Process", Payload{
- Name: string(b),
- Value: 1000,
- Keys: map[string]string{"key": string(b)},
- }, &rp, nil).Done
-
- assert.Equal(t, strings.ToUpper(string(b)), rp.Name)
- assert.Equal(t, -1000, rp.Value)
- assert.Equal(t, "key", rp.Keys[string(b)])
- }()
-
- go func() {
- var rs = ""
- b := make([]byte, 15)
- _, err := rand.Read(b)
- assert.NoError(t, err)
- <-client.Go("test.Echo", string(b), &rs, nil).Done
- assert.Equal(t, string(b), rs)
- wg.Done()
- }()
-
- go func() {
- rs := ""
- rb := make([]byte, 0)
-
- r := make([]byte, 15)
- _, err := rand.Read(r)
- assert.NoError(t, err)
- a := client.Go("test.Echo", string(r), &rs, nil)
- b := client.Go("test.EchoBinary", []byte("hello world"), &rb, nil)
- c := client.Go("test.EchoR", "hi", &rs, nil)
-
- <-a.Done
- assert.Equal(t, string(r), rs)
- <-b.Done
- assert.Equal(t, []byte("hello world"), rb)
- resC := <-c.Done
- assert.Error(t, resC.Error)
- wg.Done()
- }()
- }
-
- wg.Wait()
-
- wg2 := &sync.WaitGroup{}
- wg2.Add(300)
-
- for range 100 {
- go func() {
- defer wg2.Done()
- var rp = Payload{}
- b := make([]byte, 15)
- _, err := rand.Read(b)
- assert.NoError(t, err)
-
- assert.NoError(t, client.Call("test.Process", Payload{
- Name: string(b),
- Value: 1000,
- Keys: map[string]string{"key": string(b)},
- }, &rp))
-
- assert.Equal(t, strings.ToUpper(string(b)), rp.Name)
- assert.Equal(t, -1000, rp.Value)
- assert.Equal(t, "key", rp.Keys[string(b)])
- }()
-
- go func() {
- defer wg2.Done()
- var rs = ""
- r := make([]byte, 15)
- _, err := rand.Read(r)
- assert.NoError(t, err)
-
- assert.NoError(t, client.Call("test.Echo", string(r), &rs))
- assert.Equal(t, string(r), rs)
- }()
-
- go func() {
- defer wg2.Done()
- rs := ""
- rb := make([]byte, 0, len("hello world"))
-
- r := make([]byte, 15)
- _, err := rand.Read(r)
- assert.NoError(t, err)
-
- assert.NoError(t, client.Call("test.Echo", string(r), &rs))
- assert.Equal(t, string(r), rs)
-
- assert.NoError(t, client.Call("test.EchoBinary", r, &rb))
- assert.Equal(t, r, rb)
-
- assert.Error(t, client.Call("test.EchoR", "hi", &rs))
- }()
- }
-
- wg2.Wait()
-
- t.Cleanup(func() {
- err2 := client.Close()
- if err2 != nil {
- t.Fatal(err2)
- }
- })
-}
diff --git a/pkg/rpc/codec.go b/pkg/rpc/codec.go
deleted file mode 100644
index b6a74db..0000000
--- a/pkg/rpc/codec.go
+++ /dev/null
@@ -1,355 +0,0 @@
-package rpc
-
-import (
- "bytes"
- "encoding/gob"
- "encoding/json"
- "io"
- "net/rpc"
- "sync"
-
- "github.com/roadrunner-server/errors"
- "github.com/roadrunner-server/goridge/v4/pkg/frame"
- "github.com/roadrunner-server/goridge/v4/pkg/relay"
- "github.com/roadrunner-server/goridge/v4/pkg/socket"
- "google.golang.org/protobuf/proto"
-)
-
-const errOpts = "should be 2 options. SEQ_ID and METHOD_LEN"
-const errMsgpackV4 = "msgpack codec is not supported in v4"
-
-// Codec represent net/rpc bridge over Goridge socket relay.
-type Codec struct {
- relay relay.Relay
- closed bool
- frame *frame.Frame
- codec sync.Map
-
- bPool sync.Pool
- fPool sync.Pool
-}
-
-// NewCodec initiates new server rpc codec over socket connection.
-func NewCodec(rwc io.ReadWriteCloser) *Codec {
- return &Codec{
- relay: socket.NewSocketRelay(rwc),
- codec: sync.Map{},
-
- bPool: sync.Pool{New: func() any {
- return new(bytes.Buffer)
- }},
-
- fPool: sync.Pool{New: func() any {
- return frame.NewFrame()
- }},
- }
-}
-
-// NewCodecWithRelay initiates new server rpc codec with a relay of choice.
-func NewCodecWithRelay(relay relay.Relay) *Codec {
- return &Codec{relay: relay}
-}
-
-func (c *Codec) get() *bytes.Buffer {
- return c.bPool.Get().(*bytes.Buffer)
-}
-
-func (c *Codec) put(b *bytes.Buffer) {
- b.Reset()
- c.bPool.Put(b)
-}
-
-func (c *Codec) getFrame() *frame.Frame {
- return c.fPool.Get().(*frame.Frame)
-}
-
-func (c *Codec) putFrame(f *frame.Frame) {
- f.Reset()
- c.fPool.Put(f)
-}
-
-// WriteResponse marshals response, byte slice or error to remote.
-func (c *Codec) WriteResponse(r *rpc.Response, body any) error { //nolint:funlen
- const op = errors.Op("goridge_write_response")
- fr := c.getFrame()
- defer c.putFrame(fr)
-
- // SEQ_ID + METHOD_NAME_LEN
- fr.WriteOptions(fr.HeaderPtr(), uint32(r.Seq), uint32(len(r.ServiceMethod))) //nolint:gosec
- // Write protocol version
- fr.WriteVersion(fr.Header(), frame.Version1)
-
- // load and delete associated codec to not waste memory
- // because we write it to the fr and don't need more information about it
- codec, ok := c.codec.LoadAndDelete(r.Seq)
- if !ok {
- // fallback codec
- fr.WriteFlags(fr.Header(), frame.CodecGob)
- } else {
- fr.WriteFlags(fr.Header(), codec.(byte))
- }
-
- // if error returned, we sending it via relay and return error from WriteResponse
- if r.Error != "" {
- // Append error flag
- return c.handleError(r, fr, r.Error)
- }
-
- if codec == nil {
- return c.handleError(r, fr, errors.E(op, errors.Str("codec not found for response")).Error())
- }
-
- switch {
- case codec.(byte)&frame.CodecProto != 0:
- d, err := proto.Marshal(body.(proto.Message))
- if err != nil {
- return c.handleError(r, fr, err.Error())
- }
-
- // initialize buffer
- buf := c.get()
- defer c.put(buf)
-
- buf.Grow(len(d) + len(r.ServiceMethod))
- // writeServiceMethod to the buffer
- buf.WriteString(r.ServiceMethod)
- buf.Write(d)
-
- fr.WritePayloadLen(fr.Header(), uint32(buf.Len())) //nolint:gosec
- // copy inside
- fr.WritePayload(buf.Bytes())
- fr.WriteCRC(fr.Header())
- // send buffer
- return c.relay.Send(fr)
- case codec.(byte)&frame.CodecRaw != 0:
- // initialize buffer
- buf := c.get()
- defer c.put(buf)
-
- switch data := body.(type) {
- case []byte:
- buf.Grow(len(data) + len(r.ServiceMethod))
- // writeServiceMethod to the buffer
- buf.WriteString(r.ServiceMethod)
- buf.Write(data)
-
- fr.WritePayloadLen(fr.Header(), uint32(buf.Len())) //nolint:gosec
- fr.WritePayload(buf.Bytes())
- case *[]byte:
- buf.Grow(len(*data) + len(r.ServiceMethod))
- // writeServiceMethod to the buffer
- buf.WriteString(r.ServiceMethod)
- buf.Write(*data)
-
- fr.WritePayloadLen(fr.Header(), uint32(buf.Len())) //nolint:gosec
- fr.WritePayload(buf.Bytes())
- default:
- return c.handleError(r, fr, "unknown Raw payload type")
- }
-
- // send buffer
- fr.WriteCRC(fr.Header())
- return c.relay.Send(fr)
-
- case codec.(byte)&frame.CodecJSON != 0:
- data, err := json.Marshal(body)
- if err != nil {
- return c.handleError(r, fr, err.Error())
- }
-
- // initialize buffer
- buf := c.get()
- defer c.put(buf)
-
- buf.Grow(len(data) + len(r.ServiceMethod))
- // writeServiceMethod to the buffer
- buf.WriteString(r.ServiceMethod)
- buf.Write(data)
-
- fr.WritePayloadLen(fr.Header(), uint32(buf.Len())) //nolint:gosec
- // copy inside
- fr.WritePayload(buf.Bytes())
- fr.WriteCRC(fr.Header())
- // send buffer
- return c.relay.Send(fr)
-
- case codec.(byte)&frame.CodecMsgpack != 0:
- return c.handleError(r, fr, errMsgpackV4)
-
- case codec.(byte)&frame.CodecGob != 0:
- // initialize buffer
- buf := c.get()
- defer c.put(buf)
-
- buf.WriteString(r.ServiceMethod)
-
- dec := gob.NewEncoder(buf)
- err := dec.Encode(body)
- if err != nil {
- return errors.E(op, err)
- }
-
- fr.WritePayloadLen(fr.Header(), uint32(buf.Len())) //nolint:gosec
- // copy inside
- fr.WritePayload(buf.Bytes())
- fr.WriteCRC(fr.Header())
- // send buffer
- return c.relay.Send(fr)
- default:
- return c.handleError(r, fr, errors.E(op, errors.Str("unknown codec")).Error())
- }
-}
-
-func (c *Codec) handleError(r *rpc.Response, fr *frame.Frame, err string) error {
- buf := c.get()
- defer c.put(buf)
-
- // write all possible errors
- buf.WriteString(r.ServiceMethod)
-
- const op = errors.Op("handle codec error")
- fr.WriteFlags(fr.Header(), frame.ERROR)
- // error should be here
- if err != "" {
- buf.WriteString(err)
- }
- fr.WritePayloadLen(fr.Header(), uint32(buf.Len())) //nolint:gosec
- fr.WritePayload(buf.Bytes())
-
- fr.WriteCRC(fr.Header())
- _ = c.relay.Send(fr)
- return errors.E(op, errors.Str(err))
-}
-
-// ReadRequestHeader receives frame with options
-// options should have 2 values
-// [0] - integer, sequence ID
-// [1] - integer, offset for method name
-// For example:
-// 15Test.Payload
-// SEQ_ID: 15
-// METHOD_LEN: 12 and we take 12 bytes from the payload as method name
-func (c *Codec) ReadRequestHeader(r *rpc.Request) error {
- const op = errors.Op("goridge_read_request_header")
- f := c.getFrame()
-
- err := c.relay.Receive(f)
- if err != nil {
- c.putFrame(f)
- return err
- }
-
- // opts[0] sequence ID
- // opts[1] service method name offset from payload in bytes
- opts := f.ReadOptions(f.Header())
- if len(opts) != 2 {
- c.putFrame(f)
- return errors.E(op, errors.Str(errOpts))
- }
-
- if int(opts[1]) > len(f.Payload()) {
- c.putFrame(f)
- return errors.E(op, errors.Str("method length offset exceeds payload bounds"))
- }
-
- r.Seq = uint64(opts[0])
- r.ServiceMethod = string(f.Payload()[:opts[1]])
- c.frame = f
- return c.storeCodec(r, f.ReadFlags())
-}
-
-func (c *Codec) storeCodec(r *rpc.Request, flag byte) error {
- switch {
- case flag&frame.CodecProto != 0:
- c.codec.Store(r.Seq, frame.CodecProto)
- case flag&frame.CodecJSON != 0:
- c.codec.Store(r.Seq, frame.CodecJSON)
- case flag&frame.CodecRaw != 0:
- c.codec.Store(r.Seq, frame.CodecRaw)
- case flag&frame.CodecMsgpack != 0:
- return errors.E(errors.Op("store_codec"), errors.Str(errMsgpackV4))
- case flag&frame.CodecGob != 0:
- c.codec.Store(r.Seq, frame.CodecGob)
- default:
- c.codec.Store(r.Seq, frame.CodecGob)
- }
-
- return nil
-}
-
-// ReadRequestBody fetches prefixed body data and automatically unmarshal it as json. RawBody flag will populate
-// []byte lice argument for rpc method.
-func (c *Codec) ReadRequestBody(out any) error {
- const op = errors.Op("goridge_read_request_body")
- if out == nil {
- return nil
- }
-
- defer c.putFrame(c.frame)
-
- flags := c.frame.ReadFlags()
-
- opts := c.frame.ReadOptions(c.frame.Header())
- if len(opts) != 2 {
- return errors.E(op, errors.Str(errOpts))
- }
-
- if int(opts[1]) > len(c.frame.Payload()) {
- return errors.E(op, errors.Str("method length offset exceeds payload bounds"))
- }
-
- payload := c.frame.Payload()[opts[1]:]
- if len(payload) == 0 {
- return nil
- }
-
- switch { //nolint:dupl
- case flags&frame.CodecProto != 0:
- // check if the out message is a correct proto.Message
- // instead send an error
- if pOut, ok := out.(proto.Message); ok {
- err := proto.Unmarshal(payload, pOut)
- if err != nil {
- return errors.E(op, err)
- }
- return nil
- }
-
- return errors.E(op, errors.Str("message type is not a proto"))
- case flags&frame.CodecJSON != 0:
- return json.Unmarshal(payload, out)
- case flags&frame.CodecGob != 0:
- buf := c.get()
- defer c.put(buf)
-
- dec := gob.NewDecoder(buf)
- buf.Write(payload)
-
- err := dec.Decode(out)
- if err != nil {
- return errors.E(op, err)
- }
-
- return nil
- case flags&frame.CodecRaw != 0:
- if raw, ok := out.(*[]byte); ok {
- *raw = append(*raw, payload...)
- }
-
- return nil
- case flags&frame.CodecMsgpack != 0:
- return errors.E(op, errors.Str(errMsgpackV4))
- default:
- return errors.E(op, errors.Str("unknown decoder used in frame"))
- }
-}
-
-// Close underlying socket.
-func (c *Codec) Close() error {
- if c.closed {
- return nil
- }
-
- c.closed = true
- return c.relay.Close()
-}
diff --git a/pkg/rpc/codec_edge_test.go b/pkg/rpc/codec_edge_test.go
deleted file mode 100644
index ea669c2..0000000
--- a/pkg/rpc/codec_edge_test.go
+++ /dev/null
@@ -1,138 +0,0 @@
-package rpc
-
-import (
- "bytes"
- "io"
- "net/rpc"
- "testing"
-
- "github.com/roadrunner-server/goridge/v4/pkg/frame"
- "github.com/stretchr/testify/assert"
-)
-
-// nopCloserRWC wraps an io.ReadWriter with a no-op Close method.
-type nopCloserRWC struct {
- io.ReadWriter
-}
-
-func (nopCloserRWC) Close() error { return nil }
-
-func TestWriteResponse_NilCodecPanic_Bug2(t *testing.T) {
- // Bug 2 regression test:
- // When Seq is not in the sync.Map and r.Error is empty,
- // WriteResponse reaches the switch at codec.go:97 where codec is nil.
- // The type assertion codec.(byte) panics on nil interface.
- defer func() {
- r := recover()
- assert.Nil(t, r, "WriteResponse panicked on nil codec, expected no panic")
- }()
-
- buf := &bytes.Buffer{}
- c := NewCodec(nopCloserRWC{buf})
-
- resp := &rpc.Response{
- ServiceMethod: "Test.Method",
- Seq: 99999, // Seq not stored in sync.Map
- Error: "", // No error → doesn't short-circuit to handleError
- }
-
- // Previously panicked at codec.(byte) due to LoadAndDelete returning nil; ensure no panic now.
- _ = c.WriteResponse(resp, "some body")
-}
-
-func TestWriteResponse_ErrorPath_NilCodecSafe(t *testing.T) {
- // When Seq is not in sync.Map but r.Error is non-empty,
- // the code takes the handleError path before reaching the switch.
- // This should not panic.
- buf := &bytes.Buffer{}
- c := NewCodec(nopCloserRWC{buf})
-
- resp := &rpc.Response{
- ServiceMethod: "Test.Method",
- Seq: 88888,
- Error: "some error occurred",
- }
-
- // Should not panic — error path short-circuits before nil codec assertion
- err := c.WriteResponse(resp, nil)
- // handleError returns an error wrapping r.Error
- assert.Error(t, err)
-}
-
-func TestCodec_DoubleClose(t *testing.T) {
- buf := &bytes.Buffer{}
- c := NewCodec(nopCloserRWC{buf})
-
- err := c.Close()
- assert.NoError(t, err)
-
- // Second close should return nil
- err = c.Close()
- assert.NoError(t, err)
-}
-
-func TestClientCodec_DoubleClose(t *testing.T) {
- buf := &bytes.Buffer{}
- c := NewClientCodec(nopCloserRWC{buf})
-
- err := c.Close()
- assert.NoError(t, err)
-
- // Second close should return nil
- err = c.Close()
- assert.NoError(t, err)
-}
-
-func TestReadRequestBody_NilOut(t *testing.T) {
- buf := &bytes.Buffer{}
- c := NewCodec(nopCloserRWC{buf})
-
- // ReadRequestBody with nil out should return nil immediately
- err := c.ReadRequestBody(nil)
- assert.NoError(t, err)
-}
-
-func TestReadResponseBody_NilOut(t *testing.T) {
- buf := &bytes.Buffer{}
- c := NewClientCodec(nopCloserRWC{buf})
-
- // Set a valid frame so putFrame in defer doesn't panic
- c.frame = frame.NewFrame()
-
- err := c.ReadResponseBody(nil)
- assert.NoError(t, err)
-}
-
-func TestStoreCodec_AllCodecs(t *testing.T) {
- cases := []struct {
- name string
- flag byte
- wantErr bool
- }{
- {"proto", frame.CodecProto, false},
- {"json", frame.CodecJSON, false},
- {"raw", frame.CodecRaw, false},
- {"gob", frame.CodecGob, false},
- {"msgpack", frame.CodecMsgpack, true},
- }
-
- for _, tc := range cases {
- t.Run(tc.name, func(t *testing.T) {
- buf := &bytes.Buffer{}
- c := NewCodec(nopCloserRWC{buf})
-
- req := &rpc.Request{Seq: 1}
- err := c.storeCodec(req, tc.flag)
-
- if tc.wantErr {
- assert.Error(t, err)
- return
- }
-
- assert.NoError(t, err)
- val, ok := c.codec.Load(req.Seq)
- assert.True(t, ok)
- assert.Equal(t, tc.flag, val.(byte))
- })
- }
-}
diff --git a/pkg/rpc/doc.go b/pkg/rpc/doc.go
deleted file mode 100644
index da6bb6a..0000000
--- a/pkg/rpc/doc.go
+++ /dev/null
@@ -1,11 +0,0 @@
-// Package rpc provides [net/rpc] codec implementations that use the goridge
-// frame protocol as a transport.
-//
-// [Codec] is a server-side codec (implements [rpc.ServerCodec]) and
-// [ClientCodec] is the corresponding client-side codec (implements
-// [rpc.ClientCodec]). Both support multiple serialization formats selected
-// per-call via frame flags: Proto (protobuf), JSON, Gob, and Raw byte slices.
-//
-// Frames and byte buffers are managed through [sync.Pool] to reduce
-// allocations on the hot path.
-package rpc
diff --git a/tests/issues/issue_185_test.go b/tests/issues/issue_185_test.go
deleted file mode 100644
index 589eec4..0000000
--- a/tests/issues/issue_185_test.go
+++ /dev/null
@@ -1,60 +0,0 @@
-package issues
-
-import (
- "context"
- "fmt"
- "net"
- "net/rpc"
- "os/exec"
- "testing"
- "time"
-
- goridgeRpc "github.com/roadrunner-server/goridge/v4/pkg/rpc"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
-)
-
-type App struct{}
-
-func (s *App) Hi(name string, r *string) error {
- *r = fmt.Sprintf("Hello, %s!", name)
- return nil
-}
-
-func TestRPC_Issue185(t *testing.T) {
- var lc net.ListenConfig
- ln, err := lc.Listen(context.Background(), "tcp", "127.0.0.1:6001")
- require.NoError(t, err)
-
- err = rpc.Register(new(App))
- require.NoError(t, err)
-
- stopCh := make(chan struct{}, 1)
-
- go func() {
- time.Sleep(time.Second * 2)
- out, err := exec.CommandContext(context.Background(), "php", "../php_test_files/issue_185.php").Output()
- assert.NoError(t, err)
-
- assert.Equal(t, out, []byte("Hello, Antony!"))
-
- stopCh <- struct{}{}
- }()
-
- go func() {
- for range stopCh {
- _ = ln.Close()
- }
- }()
-
- for {
- conn, err := ln.Accept()
- if err != nil {
- return
- }
-
- fmt.Printf("New connection from %s\n", conn.RemoteAddr().String())
-
- go rpc.ServeCodec(goridgeRpc.NewCodec(conn))
- }
-}
diff --git a/tests/message.pb.go b/tests/message.pb.go
deleted file mode 100644
index 1365cd4..0000000
--- a/tests/message.pb.go
+++ /dev/null
@@ -1,301 +0,0 @@
-// Code generated by protoc-gen-go. DO NOT EDIT.
-// versions:
-// protoc-gen-go v1.27.1
-// protoc v3.17.3
-// source: message.proto
-
-package tests
-
-import (
- reflect "reflect"
- sync "sync"
-
- protoreflect "google.golang.org/protobuf/reflect/protoreflect"
- protoimpl "google.golang.org/protobuf/runtime/protoimpl"
-)
-
-const (
- // Verify that this generated code is sufficiently up-to-date.
- _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
- // Verify that runtime/protoimpl is sufficiently up-to-date.
- _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
-)
-
-type Payload struct {
- state protoimpl.MessageState
- sizeCache protoimpl.SizeCache
- unknownFields protoimpl.UnknownFields
-
- // could be an enum in the future
- Storage string `protobuf:"bytes,1,opt,name=storage,proto3" json:"storage,omitempty"`
- Items []*Item `protobuf:"bytes,2,rep,name=items,proto3" json:"items,omitempty"`
-}
-
-func (x *Payload) Reset() {
- *x = Payload{}
- if protoimpl.UnsafeEnabled {
- mi := &file_message_proto_msgTypes[0]
- ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
- ms.StoreMessageInfo(mi)
- }
-}
-
-func (x *Payload) String() string {
- return protoimpl.X.MessageStringOf(x)
-}
-
-func (*Payload) ProtoMessage() {}
-
-func (x *Payload) ProtoReflect() protoreflect.Message {
- mi := &file_message_proto_msgTypes[0]
- if protoimpl.UnsafeEnabled && x != nil {
- ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
- if ms.LoadMessageInfo() == nil {
- ms.StoreMessageInfo(mi)
- }
- return ms
- }
- return mi.MessageOf(x)
-}
-
-// Deprecated: Use Payload.ProtoReflect.Descriptor instead.
-func (*Payload) Descriptor() ([]byte, []int) {
- return file_message_proto_rawDescGZIP(), []int{0}
-}
-
-func (x *Payload) GetStorage() string {
- if x != nil {
- return x.Storage
- }
- return ""
-}
-
-func (x *Payload) GetItems() []*Item {
- if x != nil {
- return x.Items
- }
- return nil
-}
-
-type Item struct {
- state protoimpl.MessageState
- sizeCache protoimpl.SizeCache
- unknownFields protoimpl.UnknownFields
-
- Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
- Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
- // RFC 3339
- Timeout string `protobuf:"bytes,3,opt,name=timeout,proto3" json:"timeout,omitempty"`
-}
-
-func (x *Item) Reset() {
- *x = Item{}
- if protoimpl.UnsafeEnabled {
- mi := &file_message_proto_msgTypes[1]
- ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
- ms.StoreMessageInfo(mi)
- }
-}
-
-func (x *Item) String() string {
- return protoimpl.X.MessageStringOf(x)
-}
-
-func (*Item) ProtoMessage() {}
-
-func (x *Item) ProtoReflect() protoreflect.Message {
- mi := &file_message_proto_msgTypes[1]
- if protoimpl.UnsafeEnabled && x != nil {
- ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
- if ms.LoadMessageInfo() == nil {
- ms.StoreMessageInfo(mi)
- }
- return ms
- }
- return mi.MessageOf(x)
-}
-
-// Deprecated: Use Item.ProtoReflect.Descriptor instead.
-func (*Item) Descriptor() ([]byte, []int) {
- return file_message_proto_rawDescGZIP(), []int{1}
-}
-
-func (x *Item) GetKey() string {
- if x != nil {
- return x.Key
- }
- return ""
-}
-
-func (x *Item) GetValue() string {
- if x != nil {
- return x.Value
- }
- return ""
-}
-
-func (x *Item) GetTimeout() string {
- if x != nil {
- return x.Timeout
- }
- return ""
-}
-
-// KV response for the KV RPC methods
-type Response struct {
- state protoimpl.MessageState
- sizeCache protoimpl.SizeCache
- unknownFields protoimpl.UnknownFields
-
- Items []*Item `protobuf:"bytes,1,rep,name=items,proto3" json:"items,omitempty"`
-}
-
-func (x *Response) Reset() {
- *x = Response{}
- if protoimpl.UnsafeEnabled {
- mi := &file_message_proto_msgTypes[2]
- ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
- ms.StoreMessageInfo(mi)
- }
-}
-
-func (x *Response) String() string {
- return protoimpl.X.MessageStringOf(x)
-}
-
-func (*Response) ProtoMessage() {}
-
-func (x *Response) ProtoReflect() protoreflect.Message {
- mi := &file_message_proto_msgTypes[2]
- if protoimpl.UnsafeEnabled && x != nil {
- ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
- if ms.LoadMessageInfo() == nil {
- ms.StoreMessageInfo(mi)
- }
- return ms
- }
- return mi.MessageOf(x)
-}
-
-// Deprecated: Use Response.ProtoReflect.Descriptor instead.
-func (*Response) Descriptor() ([]byte, []int) {
- return file_message_proto_rawDescGZIP(), []int{2}
-}
-
-func (x *Response) GetItems() []*Item {
- if x != nil {
- return x.Items
- }
- return nil
-}
-
-var File_message_proto protoreflect.FileDescriptor
-
-var file_message_proto_rawDesc = []byte{
- 0x0a, 0x0d, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12,
- 0x05, 0x74, 0x65, 0x73, 0x74, 0x73, 0x22, 0x46, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61,
- 0x64, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01,
- 0x28, 0x09, 0x52, 0x07, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x12, 0x21, 0x0a, 0x05, 0x69,
- 0x74, 0x65, 0x6d, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0b, 0x2e, 0x74, 0x65, 0x73,
- 0x74, 0x73, 0x2e, 0x49, 0x74, 0x65, 0x6d, 0x52, 0x05, 0x69, 0x74, 0x65, 0x6d, 0x73, 0x22, 0x48,
- 0x0a, 0x04, 0x49, 0x74, 0x65, 0x6d, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20,
- 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75,
- 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x18,
- 0x0a, 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52,
- 0x07, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x22, 0x2d, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70,
- 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x21, 0x0a, 0x05, 0x69, 0x74, 0x65, 0x6d, 0x73, 0x18, 0x01, 0x20,
- 0x03, 0x28, 0x0b, 0x32, 0x0b, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x73, 0x2e, 0x49, 0x74, 0x65, 0x6d,
- 0x52, 0x05, 0x69, 0x74, 0x65, 0x6d, 0x73, 0x42, 0x09, 0x5a, 0x07, 0x2e, 0x2f, 0x3b, 0x74, 0x65,
- 0x73, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
-}
-
-var (
- file_message_proto_rawDescOnce sync.Once
- file_message_proto_rawDescData = file_message_proto_rawDesc
-)
-
-func file_message_proto_rawDescGZIP() []byte {
- file_message_proto_rawDescOnce.Do(func() {
- file_message_proto_rawDescData = protoimpl.X.CompressGZIP(file_message_proto_rawDescData)
- })
- return file_message_proto_rawDescData
-}
-
-var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
-var file_message_proto_goTypes = []interface{}{
- (*Payload)(nil), // 0: tests.Payload
- (*Item)(nil), // 1: tests.Item
- (*Response)(nil), // 2: tests.Response
-}
-var file_message_proto_depIdxs = []int32{
- 1, // 0: tests.Payload.items:type_name -> tests.Item
- 1, // 1: tests.Response.items:type_name -> tests.Item
- 2, // [2:2] is the sub-list for method output_type
- 2, // [2:2] is the sub-list for method input_type
- 2, // [2:2] is the sub-list for extension type_name
- 2, // [2:2] is the sub-list for extension extendee
- 0, // [0:2] is the sub-list for field type_name
-}
-
-func init() { file_message_proto_init() }
-func file_message_proto_init() {
- if File_message_proto != nil {
- return
- }
- if !protoimpl.UnsafeEnabled {
- file_message_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*Payload); i {
- case 0:
- return &v.state
- case 1:
- return &v.sizeCache
- case 2:
- return &v.unknownFields
- default:
- return nil
- }
- }
- file_message_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*Item); i {
- case 0:
- return &v.state
- case 1:
- return &v.sizeCache
- case 2:
- return &v.unknownFields
- default:
- return nil
- }
- }
- file_message_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*Response); i {
- case 0:
- return &v.state
- case 1:
- return &v.sizeCache
- case 2:
- return &v.unknownFields
- default:
- return nil
- }
- }
- }
- type x struct{}
- out := protoimpl.TypeBuilder{
- File: protoimpl.DescBuilder{
- GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
- RawDescriptor: file_message_proto_rawDesc,
- NumEnums: 0,
- NumMessages: 3,
- NumExtensions: 0,
- NumServices: 0,
- },
- GoTypes: file_message_proto_goTypes,
- DependencyIndexes: file_message_proto_depIdxs,
- MessageInfos: file_message_proto_msgTypes,
- }.Build()
- File_message_proto = out.File
- file_message_proto_rawDesc = nil
- file_message_proto_goTypes = nil
- file_message_proto_depIdxs = nil
-}
diff --git a/tests/message.proto b/tests/message.proto
deleted file mode 100644
index 1fc66c7..0000000
--- a/tests/message.proto
+++ /dev/null
@@ -1,22 +0,0 @@
-syntax = "proto3";
-
-package tests;
-option go_package = "./;test";
-
-message Payload {
- // could be an enum in the future
- string storage = 1;
- repeated Item items = 2;
-}
-
-message Item {
- string key = 1;
- string value = 2;
- // RFC 3339
- string timeout = 3;
-}
-
-// KV response for the KV RPC methods
-message Response {
- repeated Item items = 1;
-}
diff --git a/tests/php_test_files/composer.json b/tests/php_test_files/composer.json
deleted file mode 100644
index f082ea7..0000000
--- a/tests/php_test_files/composer.json
+++ /dev/null
@@ -1,5 +0,0 @@
-{
- "require": {
- "spiral/goridge": "^4.0"
- }
-}
diff --git a/tests/php_test_files/composer.lock b/tests/php_test_files/composer.lock
deleted file mode 100644
index 42337cd..0000000
--- a/tests/php_test_files/composer.lock
+++ /dev/null
@@ -1,155 +0,0 @@
-{
- "_readme": [
- "This file locks the dependencies of your project to a known state",
- "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
- "This file is @generated automatically"
- ],
- "content-hash": "bc305d74c8fd505987b069cf59894b45",
- "packages": [
- {
- "name": "spiral/goridge",
- "version": "4.2.1",
- "source": {
- "type": "git",
- "url": "https://github.com/roadrunner-php/goridge.git",
- "reference": "2a372118dac1f0c0511e2862f963ce649fefd9fa"
- },
- "dist": {
- "type": "zip",
- "url": "https://api.github.com/repos/roadrunner-php/goridge/zipball/2a372118dac1f0c0511e2862f963ce649fefd9fa",
- "reference": "2a372118dac1f0c0511e2862f963ce649fefd9fa",
- "shasum": ""
- },
- "require": {
- "ext-json": "*",
- "ext-sockets": "*",
- "php": ">=8.1",
- "spiral/roadrunner": "^2023 || ^2024.1 || ^2025.1"
- },
- "require-dev": {
- "google/protobuf": "^3.22 || ^4.0",
- "infection/infection": "^0.29.0",
- "jetbrains/phpstorm-attributes": "^1.0",
- "phpunit/phpunit": "^10.5.45",
- "rybakit/msgpack": "^0.7",
- "spiral/code-style": "*",
- "vimeo/psalm": "^6.0"
- },
- "suggest": {
- "ext-msgpack": "MessagePack codec support",
- "ext-protobuf": "Protobuf codec support",
- "google/protobuf": "(^3.0) Protobuf codec support",
- "rybakit/msgpack": "(^0.7) MessagePack codec support"
- },
- "type": "goridge",
- "autoload": {
- "psr-4": {
- "Spiral\\Goridge\\": "src"
- }
- },
- "notification-url": "https://packagist.org/downloads/",
- "license": [
- "MIT"
- ],
- "authors": [
- {
- "name": "Anton Titov (wolfy-j)",
- "email": "wolfy-j@spiralscout.com"
- },
- {
- "name": "Valery Piashchynski",
- "homepage": "https://github.com/rustatian"
- },
- {
- "name": "Aleksei Gagarin (roxblnfk)",
- "homepage": "https://github.com/roxblnfk"
- },
- {
- "name": "Pavel Buchnev (butschster)",
- "email": "pavel.buchnev@spiralscout.com"
- },
- {
- "name": "Maksim Smakouz (msmakouz)",
- "email": "maksim.smakouz@spiralscout.com"
- },
- {
- "name": "RoadRunner Community",
- "homepage": "https://github.com/roadrunner-server/roadrunner/graphs/contributors"
- }
- ],
- "description": "High-performance PHP-to-Golang RPC bridge",
- "homepage": "https://spiral.dev/",
- "support": {
- "chat": "https://discord.gg/V6EK4he",
- "docs": "https://docs.roadrunner.dev",
- "issues": "https://github.com/roadrunner-server/roadrunner/issues",
- "source": "https://github.com/roadrunner-php/goridge/tree/4.2.1"
- },
- "funding": [
- {
- "url": "https://github.com/sponsors/roadrunner-server",
- "type": "github"
- }
- ],
- "time": "2025-05-05T13:55:33+00:00"
- },
- {
- "name": "spiral/roadrunner",
- "version": "v2025.1.8",
- "source": {
- "type": "git",
- "url": "https://github.com/roadrunner-server/roadrunner.git",
- "reference": "e3b05713e8d65ad4d5104d839890cc4892d63b2d"
- },
- "dist": {
- "type": "zip",
- "url": "https://api.github.com/repos/roadrunner-server/roadrunner/zipball/e3b05713e8d65ad4d5104d839890cc4892d63b2d",
- "reference": "e3b05713e8d65ad4d5104d839890cc4892d63b2d",
- "shasum": ""
- },
- "type": "metapackage",
- "notification-url": "https://packagist.org/downloads/",
- "license": [
- "MIT"
- ],
- "authors": [
- {
- "name": "Anton Titov / Wolfy-J",
- "email": "wolfy.jd@gmail.com"
- },
- {
- "name": "Valery Piashchynski",
- "homepage": "https://github.com/rustatian"
- },
- {
- "name": "RoadRunner Community",
- "homepage": "https://github.com/roadrunner-server/roadrunner/graphs/contributors"
- }
- ],
- "description": "RoadRunner: High-performance PHP application server and process manager written in Go and powered with plugins",
- "homepage": "https://roadrunner.dev/",
- "support": {
- "chat": "https://discord.gg/V6EK4he",
- "docs": "https://docs.roadrunner.dev/",
- "issues": "https://github.com/roadrunner-server/roadrunner/issues",
- "source": "https://github.com/roadrunner-server/roadrunner/tree/v2025.1.8"
- },
- "funding": [
- {
- "url": "https://github.com/sponsors/roadrunner-server",
- "type": "github"
- }
- ],
- "time": "2026-02-19T14:32:44+00:00"
- }
- ],
- "packages-dev": [],
- "aliases": [],
- "minimum-stability": "stable",
- "stability-flags": {},
- "prefer-stable": false,
- "prefer-lowest": false,
- "platform": {},
- "platform-dev": {},
- "plugin-api-version": "2.9.0"
-}
diff --git a/tests/php_test_files/issue_185.php b/tests/php_test_files/issue_185.php
deleted file mode 100644
index 67a6053..0000000
--- a/tests/php_test_files/issue_185.php
+++ /dev/null
@@ -1,15 +0,0 @@
-call("App.Hi", "Antony");
\ No newline at end of file