Skip to content

Commit 616be90

Browse files
authored
Merge pull request #14 from francoispqt/update/add-examples
Add examples directory containing example of implementations
2 parents dd3bc12 + b38a7ba commit 616be90

File tree

4 files changed

+291
-0
lines changed

4 files changed

+291
-0
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package client
2+
3+
import (
4+
"github.com/francoispqt/gojay/examples/websocket/comm"
5+
"golang.org/x/net/websocket"
6+
)
7+
8+
type client struct {
9+
comm.SenderReceiver
10+
id int
11+
}
12+
13+
func NewClient(id int) *client {
14+
c := new(client)
15+
c.id = id
16+
return c
17+
}
18+
19+
func (c *client) Dial(url, origin string) error {
20+
conn, err := websocket.Dial(url, "", origin)
21+
if err != nil {
22+
return err
23+
}
24+
c.Conn = conn
25+
c.Init(10)
26+
return nil
27+
}

examples/websocket/comm/comm.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package comm
2+
3+
import (
4+
"errors"
5+
"log"
6+
7+
"github.com/francoispqt/gojay"
8+
"golang.org/x/net/websocket"
9+
)
10+
11+
// A basic message for our WebSocket app
12+
13+
type Message struct {
14+
Message string
15+
UserName string
16+
}
17+
18+
func (m *Message) UnmarshalObject(dec *gojay.Decoder, k string) error {
19+
switch k {
20+
case "message":
21+
return dec.AddString(&m.Message)
22+
case "userName":
23+
return dec.AddString(&m.UserName)
24+
}
25+
return nil
26+
}
27+
func (m *Message) NKeys() int {
28+
return 2
29+
}
30+
31+
func (m *Message) MarshalObject(enc *gojay.Encoder) {
32+
enc.AddStringKey("message", m.Message)
33+
enc.AddStringKey("userName", m.UserName)
34+
}
35+
func (u *Message) IsNil() bool {
36+
return u == nil
37+
}
38+
39+
// Here are defined our communication types
40+
type Sender chan gojay.MarshalerObject
41+
42+
func (s Sender) MarshalStream(enc *gojay.StreamEncoder) {
43+
select {
44+
case <-enc.Done():
45+
return
46+
case m := <-s:
47+
enc.AddObject(m)
48+
}
49+
}
50+
51+
type Receiver chan *Message
52+
53+
func (s Receiver) UnmarshalStream(dec *gojay.StreamDecoder) error {
54+
m := &Message{}
55+
if err := dec.AddObject(m); err != nil {
56+
return err
57+
}
58+
s <- m
59+
return nil
60+
}
61+
62+
type SenderReceiver struct {
63+
Send Sender
64+
Receive Receiver
65+
Dec *gojay.StreamDecoder
66+
Enc *gojay.StreamEncoder
67+
Conn *websocket.Conn
68+
}
69+
70+
func (sc *SenderReceiver) SetReceiver() {
71+
sc.Receive = Receiver(make(chan *Message))
72+
sc.Dec = gojay.Stream.BorrowDecoder(sc.Conn)
73+
go sc.Dec.DecodeStream(sc.Receive)
74+
}
75+
76+
func (sc *SenderReceiver) SetSender(nCons int) {
77+
sc.Send = Sender(make(chan gojay.MarshalerObject))
78+
sc.Enc = gojay.Stream.BorrowEncoder(sc.Conn).NConsumer(nCons).LineDelimited()
79+
go sc.Enc.EncodeStream(sc.Send)
80+
}
81+
82+
func (sc *SenderReceiver) SendMessage(m gojay.MarshalerObject) error {
83+
select {
84+
case <-sc.Enc.Done():
85+
return errors.New("sender closed")
86+
case sc.Send <- m:
87+
log.Print("message sent by client: ", m)
88+
return nil
89+
}
90+
}
91+
92+
func (c *SenderReceiver) OnMessage(f func(*Message)) error {
93+
for {
94+
select {
95+
case <-c.Dec.Done():
96+
return errors.New("receiver closed")
97+
case m := <-c.Receive:
98+
f(m)
99+
}
100+
}
101+
}
102+
103+
func (sc *SenderReceiver) Init(sender int) *SenderReceiver {
104+
sc.SetSender(sender)
105+
sc.SetReceiver()
106+
return sc
107+
}

examples/websocket/main.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// package main simulates a conversation between
2+
// a given set of websocket clients and a server.
3+
//
4+
// It spins up a web socket server.
5+
// On a client's connection it creates a SenderReceiver which handles JSON Stream
6+
// encoding and decoding using gojay's streaming API to abstract JSON communication
7+
// between server and client, only having to handle go values.
8+
//
9+
// To simulate a conversation:
10+
// - the server sends a welcome message to the client
11+
// - when the client receives the message, it sends a message back to the server
12+
// - when the server receives the ack message, it will send a message randomly to a client
13+
// - when the client receives the message, it sends a message back to the server... and so on.
14+
package main
15+
16+
import (
17+
"log"
18+
"strconv"
19+
20+
"github.com/francoispqt/gojay/examples/websocket/client"
21+
"github.com/francoispqt/gojay/examples/websocket/comm"
22+
"github.com/francoispqt/gojay/examples/websocket/server"
23+
)
24+
25+
func createServer(done chan error) {
26+
// create our server, with a done signal
27+
s := server.NewServer()
28+
// set our connection handler
29+
s.OnConnection(func(c *server.Client) {
30+
// send welcome message to initiate the conversation
31+
c.SendMessage(&comm.Message{
32+
UserName: "server",
33+
Message: "Welcome !",
34+
})
35+
// start handling messages
36+
c.OnMessage(func(m *comm.Message) {
37+
log.Print("message received from client: ", m)
38+
s.BroadCastRandom(c, m)
39+
})
40+
})
41+
go s.Listen(":8070", done)
42+
}
43+
44+
func createClient(url, origin string, i int) {
45+
// create our client
46+
c := client.NewClient(i)
47+
// Dial connection to the WS server
48+
err := c.Dial(url, origin)
49+
if err != nil {
50+
panic(err)
51+
}
52+
str := strconv.Itoa(i)
53+
// Init client's sender and receiver
54+
// Set the OnMessage handler
55+
c.OnMessage(func(m *comm.Message) {
56+
log.Print("client "+str+" received from "+m.UserName+" message: ", m)
57+
c.SendMessage(&comm.Message{
58+
UserName: str,
59+
Message: "Responding to: " + m.UserName + " | old message: " + m.Message,
60+
})
61+
})
62+
}
63+
64+
// Our main function
65+
func main() {
66+
done := make(chan error)
67+
createServer(done)
68+
// add our clients connection
69+
for i := 0; i < 100; i++ {
70+
i := i
71+
go createClient("ws://localhost:8070/ws", "http://localhost/", i)
72+
}
73+
// handle server's termination
74+
log.Fatal(<-done)
75+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package server
2+
3+
import (
4+
"log"
5+
"math/rand"
6+
"net/http"
7+
"sync"
8+
"time"
9+
10+
"github.com/francoispqt/gojay/examples/websocket/comm"
11+
"golang.org/x/net/websocket"
12+
)
13+
14+
type server struct {
15+
clients []*Client
16+
mux *sync.RWMutex
17+
handle func(c *Client)
18+
}
19+
20+
type Client struct {
21+
comm.SenderReceiver
22+
server *server
23+
}
24+
25+
func NewClient(s *server, conn *websocket.Conn) *Client {
26+
sC := new(Client)
27+
sC.Conn = conn
28+
sC.server = s
29+
return sC
30+
}
31+
32+
func NewServer() *server {
33+
s := new(server)
34+
s.mux = new(sync.RWMutex)
35+
s.clients = make([]*Client, 0, 100)
36+
return s
37+
}
38+
39+
func (c *Client) Close() {
40+
c.Conn.Close()
41+
}
42+
43+
func (s *server) Handle(conn *websocket.Conn) {
44+
defer func() {
45+
err := conn.Close()
46+
if err != nil {
47+
log.Fatal(err)
48+
}
49+
}()
50+
c := NewClient(s, conn)
51+
// add our server client to the list of clients
52+
s.mux.Lock()
53+
s.clients = append(s.clients, c)
54+
s.mux.Unlock()
55+
// init Client's sender and receiver
56+
c.Init(10)
57+
s.handle(c)
58+
// block until reader is done
59+
<-c.Dec.Done()
60+
}
61+
62+
func (s *server) Listen(port string, done chan error) {
63+
http.Handle("/ws", websocket.Handler(s.Handle))
64+
done <- http.ListenAndServe(port, nil)
65+
}
66+
67+
func (s *server) OnConnection(h func(c *Client)) {
68+
s.handle = h
69+
}
70+
71+
func random(min, max int) int {
72+
rand.Seed(time.Now().Unix())
73+
return rand.Intn(max-min) + min
74+
}
75+
76+
func (s *server) BroadCastRandom(sC *Client, m *comm.Message) {
77+
m.Message = "Random message"
78+
s.mux.RLock()
79+
r := random(0, len(s.clients))
80+
s.clients[r].SendMessage(m)
81+
s.mux.RUnlock()
82+
}

0 commit comments

Comments
 (0)