Skip to content

Commit 8ad6839

Browse files
committed
Support forwarding options to Manifold.send/2
1 parent f43c5be commit 8ad6839

File tree

4 files changed

+327
-8
lines changed

4 files changed

+327
-8
lines changed

README.md

Lines changed: 114 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,121 @@ pub fn broadcast(pids: List(process.Pid), message: String) {
5151
}
5252
```
5353

54-
## Current Limitations
54+
### Advanced Options
5555

56-
**No options parameter support**: The Elixir Manifold library supports an options parameter for tuning performance characteristics (like partition size). This Gleam wrapper doesn't currently expose these options, though this could be added in the future.
56+
The library supports the same options as the Elixir Manifold library for tuning performance:
57+
58+
#### Pack Modes
59+
60+
Control how messages are serialized before sending:
61+
62+
```gleam
63+
import gleam_manifold as manifold
64+
65+
pub fn send_with_packing() {
66+
let subject = manifold.new_subject()
67+
let pid = process.self()
68+
69+
// Binary packing - efficient for large messages sent to many processes
70+
manifold.send_with_options(
71+
pid,
72+
subject,
73+
large_data,
74+
[manifold.PackModeOption(manifold.Binary)]
75+
)
76+
77+
// ETF (Erlang Term Format) - default behavior
78+
manifold.send_with_options(
79+
pid,
80+
subject,
81+
data,
82+
[manifold.PackModeOption(manifold.Etf)]
83+
)
84+
85+
// No packing
86+
manifold.send_with_options(
87+
pid,
88+
subject,
89+
data,
90+
[manifold.PackModeOption(manifold.NoPacking)]
91+
)
92+
}
93+
```
94+
95+
#### Send Modes
96+
97+
Control how messages are delivered:
98+
99+
```gleam
100+
import gleam_manifold as manifold
101+
102+
pub fn send_with_offload() {
103+
let subject = manifold.new_subject()
104+
let pids = get_many_pids()
105+
106+
// Offload mode - non-blocking, routes through sender processes
107+
manifold.send_multi_with_options(
108+
pids,
109+
subject,
110+
message,
111+
[manifold.SendModeOption(manifold.Offload)]
112+
)
113+
114+
// Direct mode (default) - sends directly
115+
manifold.send_multi_with_options(
116+
pids,
117+
subject,
118+
message,
119+
[manifold.SendModeOption(manifold.Direct)]
120+
)
121+
}
122+
```
123+
124+
#### Combining Options
125+
126+
You can combine multiple options for fine-tuned control:
127+
128+
```gleam
129+
pub fn optimized_broadcast(pids: List(process.Pid), message: String) {
130+
let subject = manifold.new_subject()
131+
132+
// Use binary packing with offload mode for optimal performance
133+
manifold.send_multi_with_options(
134+
pids,
135+
subject,
136+
message,
137+
[
138+
manifold.PackModeOption(manifold.Binary),
139+
manifold.SendModeOption(manifold.Offload)
140+
]
141+
)
142+
}
143+
```
144+
145+
### Partitioner and Sender Keys
146+
147+
Control load distribution across Manifold's internal processes:
148+
149+
```gleam
150+
import gleam_manifold as manifold
151+
152+
pub fn with_custom_routing() {
153+
// Set a custom partitioner key for consistent routing
154+
manifold.set_partitioner_key("user_123")
155+
156+
// Set a custom sender key for offloaded messages
157+
manifold.set_sender_key("channel_456")
158+
159+
// Messages will be routed based on these keys
160+
manifold.send(pid, subject, message)
161+
}
162+
```
163+
164+
This is useful for:
165+
166+
- Ensuring message ordering for specific entities
167+
- Load balancing across partitioner processes
168+
- Preventing hot spots in message distribution
57169

58170
## Testing
59171

src/gleam_manifold.gleam

Lines changed: 127 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@
22
////
33
//// More information about Manifold can be found on its
44
//// [GitHub page](https://github.com/discord/manifold).
5-
////
5+
////
66
//// Unlike Gleam's erlang module, Subjects in gleam_manifold
77
//// are not bound to a specific process. They're merely a unique
88
//// tag which provides runtime guarantees about type-safe
99
//// message delivery.
1010

1111
import gleam/erlang/process
1212
import gleam/erlang/reference
13+
import gleam/list
1314

1415
pub opaque type Subject(message) {
1516
Subject(tag: reference.Reference)
@@ -18,13 +19,43 @@ pub opaque type Subject(message) {
1819
type Message(message) =
1920
#(reference.Reference, message)
2021

22+
// Type aliases for Erlang interop
23+
type Atom
24+
25+
type DoNotLeak
26+
27+
/// Pack mode determines how messages are serialized before sending
28+
pub type PackMode {
29+
/// Pack messages as binary using term_to_binary
30+
Binary
31+
/// Use Erlang Term Format (default)
32+
Etf
33+
/// No special packing
34+
NoPacking
35+
}
36+
37+
/// Send mode determines how messages are sent
38+
pub type SendMode {
39+
/// Route messages through a sender process (non-blocking)
40+
Offload
41+
/// Send directly (default)
42+
Direct
43+
}
44+
45+
/// Options for configuring message sending behavior
46+
pub type ManifoldOption {
47+
PackModeOption(PackMode)
48+
SendModeOption(SendMode)
49+
}
50+
2151
/// Create a new Manifold subject for sending and receiving messages
2252
/// of the specified type.
2353
pub fn new_subject() -> Subject(message) {
2454
Subject(reference.new())
2555
}
2656

27-
/// You almost certainly do not want to use this.
57+
/// Send a message to a process using the default settings.
58+
/// You almost certainly do not want to use this.
2859
/// Absolutely prefer to use the type-safe variant of
2960
/// this function `manifold.send(subject, message)`
3061
pub fn send(
@@ -36,9 +67,22 @@ pub fn send(
3667
Nil
3768
}
3869

70+
/// Send a message to a process with custom options.
71+
/// This allows you to control packing and send mode.
72+
pub fn send_with_options(
73+
pid: process.Pid,
74+
subject: Subject(message),
75+
message: message,
76+
options: List(ManifoldOption),
77+
) -> Nil {
78+
let opts = options_to_keyword_list(options)
79+
manifold_send_with_options(pid, #(subject.tag, message), opts)
80+
Nil
81+
}
82+
3983
/// Call Manifold.send with a list of pids as the first argument, as
4084
/// is supported in Manifold
41-
///
85+
///
4286
/// Sending multi does not support sending to subjects, because
4387
/// subjects themselves EACH have a unique tag along with them. The
4488
/// subject expects the tag to be included in the message itself, so
@@ -55,19 +99,97 @@ pub fn send_multi(
5599
Nil
56100
}
57101

102+
/// Send a message to multiple processes with custom options.
103+
/// This allows you to control packing and send mode.
104+
pub fn send_multi_with_options(
105+
pids: List(process.Pid),
106+
subject: Subject(message),
107+
message: message,
108+
options: List(ManifoldOption),
109+
) -> Nil {
110+
let opts = options_to_keyword_list(options)
111+
manifold_send_multi_with_options(pids, #(subject.tag, message), opts)
112+
Nil
113+
}
114+
58115
@external(erlang, "gleam_manifold_ffi", "receive")
59116
pub fn receive(subject: Subject(message), timeout: Int) -> Result(message, Nil)
60117

61118
@external(erlang, "gleam_manifold_ffi", "receive")
62119
pub fn receive_forever(subject: Subject(message)) -> message
63120

64-
type DoNotLeak
121+
/// Set a custom partitioner key for the current process.
122+
/// This controls which partitioner process will handle your messages,
123+
/// useful for load balancing and ensuring message ordering.
124+
pub fn set_partitioner_key(key: String) -> Nil {
125+
do_set_partitioner_key(key)
126+
Nil
127+
}
128+
129+
/// Set a custom sender key for the current process.
130+
/// This controls which sender process will handle offloaded messages.
131+
pub fn set_sender_key(key: String) -> Nil {
132+
do_set_sender_key(key)
133+
Nil
134+
}
135+
136+
fn options_to_keyword_list(options: List(ManifoldOption)) -> List(#(Atom, Term)) {
137+
list.map(options, fn(opt) {
138+
case opt {
139+
PackModeOption(Binary) -> #(
140+
create_atom("pack_mode"),
141+
dynamic_atom(create_atom("binary")),
142+
)
143+
PackModeOption(Etf) -> #(
144+
create_atom("pack_mode"),
145+
dynamic_atom(create_atom("etf")),
146+
)
147+
PackModeOption(NoPacking) -> #(create_atom("pack_mode"), dynamic_nil())
148+
SendModeOption(Offload) -> #(
149+
create_atom("send_mode"),
150+
dynamic_atom(create_atom("offload")),
151+
)
152+
SendModeOption(Direct) -> #(create_atom("send_mode"), dynamic_nil())
153+
}
154+
})
155+
}
156+
157+
type Term
158+
159+
@external(erlang, "gleam_manifold_ffi", "create_atom")
160+
fn create_atom(name: String) -> Atom
161+
162+
@external(erlang, "gleam_manifold_ffi", "dynamic_atom")
163+
fn dynamic_atom(atom: Atom) -> Term
164+
165+
@external(erlang, "gleam_manifold_ffi", "dynamic_nil")
166+
fn dynamic_nil() -> Term
65167

66168
@external(erlang, "Elixir.Manifold", "send")
67169
fn manifold_send(pid: process.Pid, message: Message(message)) -> DoNotLeak
68170

69171
@external(erlang, "Elixir.Manifold", "send")
70172
fn manifold_send_multi(
71-
pid: List(process.Pid),
173+
pids: List(process.Pid),
72174
message: Message(message),
73175
) -> DoNotLeak
176+
177+
@external(erlang, "Elixir.Manifold", "send")
178+
fn manifold_send_with_options(
179+
pid: process.Pid,
180+
message: Message(message),
181+
options: List(#(Atom, Term)),
182+
) -> DoNotLeak
183+
184+
@external(erlang, "Elixir.Manifold", "send")
185+
fn manifold_send_multi_with_options(
186+
pids: List(process.Pid),
187+
message: Message(message),
188+
options: List(#(Atom, Term)),
189+
) -> DoNotLeak
190+
191+
@external(erlang, "Elixir.Manifold", "set_partitioner_key")
192+
fn do_set_partitioner_key(key: String) -> DoNotLeak
193+
194+
@external(erlang, "Elixir.Manifold", "set_sender_key")
195+
fn do_set_sender_key(key: String) -> DoNotLeak

src/gleam_manifold_ffi.erl

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
-module(gleam_manifold_ffi).
2-
-export(['receive'/1, 'receive'/2]).
2+
-export(['receive'/1, 'receive'/2, create_atom/1, dynamic_atom/1, dynamic_nil/0]).
33

44
'receive'({subject, Ref}) ->
55
receive
@@ -12,3 +12,12 @@
1212
after Timeout ->
1313
{error, nil}
1414
end.
15+
16+
create_atom(Binary) when is_binary(Binary) ->
17+
binary_to_atom(Binary, utf8).
18+
19+
dynamic_atom(Atom) when is_atom(Atom) ->
20+
Atom.
21+
22+
dynamic_nil() ->
23+
nil.

0 commit comments

Comments
 (0)