1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214 |
x2
x2
x2
x2
x2
x2
x2
x6
x2
x2
x10
x2
x2
x8
x6
x2
x2
x2
x2
x3
x3
|
|
import type { Async, AsyncIter, Log, Reader } from '../../index.ts';
import { Connect, Listen } from '../../context.ts';
import { UTF8, Bytes, Fn, Name, Pipe, Flag,
byteParse, readBytes, readUntilDone, write, merged } from '../../format.ts';
/**
 * A ZeroMQ connection: either an async iterable of frames combined with
 * the stream options in {@link ConnOpts}, or an object that may carry a
 * raw `socket` of unspecified type.
 */
export type Conn = (AsyncIter<Frame> & ConnOpts) | { socket?: unknown };
/**
 * Options for creating a ZeroMQ connection: the `Log` shape plus a `close`
 * method and a readable/writable stream pair.
 */
export type ConnOpts = Log & {
close (): void, readable: ReadableStream, writable: WritableStream, };
/** ZeroMQ protocol version, as separate major and minor numbers. */
export type Ver = { maj: number, min: number };
/** Known ZeroMQ security mechanisms. */
export type Sec = 'NULL'|'PLAIN'|'CURVE';
/**
 * ZeroMQ command strings. Only 'READY' is named explicitly; the union with
 * `string` means any other command name is accepted by the type as well.
 */
export type Cmd = 'READY'|string;
/**
 * ZeroMQ publisher (server listener): a connection tagged `mode: 'PUB'`
 * that can be closed and can send one or more messages.
 */
export type Pub = Conn & {
mode: 'PUB'; close: Fn<[]>; send (...msgs: Bytes[]): Promise<void>; };
/**
 * ZeroMQ subscriber (client connection): a connection tagged `mode: 'SUB'`
 * that can subscribe to a topic and receive the frames of a message.
 */
export type Sub = Conn & {
mode: 'SUB'; subscribe (topic: string): Promise<void>; receive (): Promise<Frame[]>; };
/**
 * Create ZeroMQ publisher.
 *
 * `Pub(at, handler)` returns a function named `ZMQ PUB <at>` which, when
 * invoked, listens on `at`. Every incoming connection goes through
 * `Pub.shake` and is then passed to `handler`.
 *
 * The resolved publisher exposes `mode`, the listening `socket`, `close`
 * and a `send` that is not implemented yet (it throws when called).
 */
export const Pub = merged(function zmqPub (at: number|string|URL, handler: Fn<[unknown], Async<{ close: Fn }>>) {
  const label = `ZMQ PUB ${at}`;
  const options = { at, handler };
  return Name(label, async function zmqPublisher (_: unknown): Promise<Pub> {
    // Set by close(); nothing reads this flag yet.
    let halted = false;
    const socket = await Listen(at, async peer => {
      // Rethrow connection errors from the event handler.
      peer.on('error', failure => { throw failure });
      await Pub.shake(peer);
      return handler(peer);
    });
    const send = (...ignored: unknown[]) => {
      throw new Error('zmq pub send: not implemented')
    };
    const close = () => {
      halted = true;
      (socket as any).close();
    };
    return { mode: 'PUB', socket, send, close };
  }, options)
}, {
  /**
   * Publisher side of the handshake: send our greeting, perform two reads
   * on the connection, then send the READY command frame.
   */
  shake: Name('ZMQ PUB shake', async (peer) => {
    const greeting = Hello({ pub: true });
    await writeCb(peer, greeting);
    await peer.read();
    await peer.read();
    const ready = Frame.ready();
    await writeCb(peer, ready);
  })
});
/**
 * Create ZeroMQ subscriber.
 *
 * `Sub(to, handler)` resolves (the outer function is `async`) to a function
 * named `ZMQ SUB <to>` which, when invoked, connects to `to`, runs
 * `Sub.shake` and resolves to the subscriber object. `handler` is only
 * forwarded as metadata to `Name`; this block never calls it.
 *
 * Subscriber methods:
 * - `subscribe(topic)` writes a `0x01` byte followed by the UTF-8 topic.
 * - `receive()` reads frames until (and including) the first frame whose
 *   MORE flag is not set, i.e. one complete multi-part message.
 * - `close()` ends the socket.
 */
export const Sub = merged(async function zmqSub (to: number|string|URL, handler: Fn<[Sub]>) {
  return Name(`ZMQ SUB ${to}`, async function zmqSubscriber (
    _: unknown
  ): Promise<Sub> {
    const socket = await Connect(to);
    await Sub.shake(socket);

    const subscribe = async (topicName: string) => {
      const topic = UTF8.encode(topicName) as Uint8Array;
      const payload = new Uint8Array(topic.length + 1);
      payload[0] = 0x01; // subscribe marker
      payload.set(topic, 1);
      // NOTE(review): `payload` is iterable, so zmqFrame takes its parse
      // branch and the bytes go out without a flags/size header. This looks
      // unintended but is kept as-is; confirm against the peer's expectations.
      await writeCb(socket, zmqFrame(payload));
    };

    const receive = async () => {
      const res: Frame[] = [];
      for (;;) {
        const f: Frame | null | undefined = await (Frame as any).read(socket);
        if (!f) break; // nothing (more) to read
        // Collect the frame BEFORE testing MORE: the final frame of a message
        // has MORE cleared and must not be dropped.
        res.push(zmqFrame(f));
        if (!f.more) break;
      }
      return res;
    };

    const close = () => { socket.end() };

    return { mode: 'SUB', socket, subscribe, receive, close };
  }, { to, handler });
}, {
  /**
   * Subscriber side of the handshake: send our greeting, perform one read,
   * send the READY command frame, then perform another read.
   */
  shake: Name('ZMQ SUB shake', async (socket) => {
    await writeCb(socket, Hello({ pub: false }));
    await socket.read();
    await writeCb(socket, Frame.ready());
    await socket.read();
  })
});
/**
 * ZeroMQ greeting options: signature bytes, security mechanism, protocol
 * version, and the `pub` flag.
 */
export type Hello = { sig: Bytes, sec: Sec, ver: Ver, pub: boolean };
/**
 * Greeting helper: callable as `zmqHello` (parse bytes or build from
 * options), with the greeting size and reader/writer helpers attached.
 */
export const Hello = merged(zmqHello, {
// Size in bytes of the buffer zmqHello allocates when building a greeting.
size: 64,
// Composes readBytes({ max: 64 }) with zmqHello.
// NOTE(review): presumably reads up to 64 bytes and parses them — confirm Pipe semantics.
read: Name('Hello', Pipe(readBytes({ max: 64 }), zmqHello)),
// Composes zmqHello with `write`.
// NOTE(review): presumably builds a greeting and writes it — confirm Fn/Pipe semantics.
write: Name('Hello', Fn(Pipe(zmqHello, write))),
});
/**
 * Parse or build a ZeroMQ greeting.
 *
 * - Byte-like input (recognised by a truthy first element; a greeting
 *   starts with 0xFF) is parsed. The result is the byte buffer decorated
 *   with `sig`, `sec`, `ver` and `pub`.
 * - Any other input is treated as partial options (`pub`, `sec`, `ver`).
 *   A fresh `Hello.size`-byte greeting is built and decorated with them.
 *   Defaults: `pub = false`, `sec = 'NULL'`, version 3.0.
 *
 * Layout used in both directions: signature in bytes 0-9, version in bytes
 * 10-11, NUL-padded mechanism name in bytes 12-31, `pub` flag in byte 32.
 *
 * @param input Greeting bytes to parse, or options to build from.
 * @returns The greeting bytes with the `Hello` fields attached.
 */
function zmqHello (input?: Bytes): Hello & Bytes;
function zmqHello (input?: Partial<Hello>): Hello & Bytes;
function zmqHello (input?: unknown): Hello & Bytes {
  const SEC_START = 12; // first byte of the mechanism name
  const SEC_END = 32;   // one past the last byte of the mechanism name
  const PUB_AT = 32;    // flag byte; reader and writer must agree on it

  if (input && (input as ArrayLike<unknown>)[0]) {
    const bytes = Bytes(input);
    const secBytes = bytes.subarray(SEC_START, SEC_END);
    // The name ends at the first NUL; without a NUL the whole field is the name.
    const nul = secBytes.indexOf(0);
    const sec = UTF8.decode(secBytes.slice(0, nul === -1 ? secBytes.length : nul));
    const sig = bytes.subarray(0, 10);
    const ver = { maj: bytes[10] & 0xFF, min: bytes[11] & 0xFF };
    const pub = bytes[PUB_AT] === 0x01;
    return merged(bytes, { sig, sec, ver, pub }) as Hello & Bytes;
  }

  const bytes = new Uint8Array(Hello.size);
  const { pub = false, sec = 'NULL', ver: { maj = 3, min = 0 } = {} } = (input || {}) as Partial<Hello>;
  const secBytes = UTF8.encode(sec);
  // Signature: 0xFF, padding (byte 8 set to 0x01), 0x7F.
  bytes[0x00] = 0xFF;
  bytes[0x08] = 0x01;
  bytes[0x09] = 0x7F;
  bytes[0x0a] = maj & 0xFF;
  bytes[0x0b] = min & 0xFF;
  // Mechanism name, NUL-padded; anything beyond the field width is dropped.
  for (let i = 0; i < SEC_END - SEC_START; i++) {
    bytes[SEC_START + i] = secBytes[i] ?? 0;
  }
  bytes[PUB_AT] = pub ? 0x01 : 0x00;
  const props = { pub, sec, ver: { maj, min } };
  return merged(bytes, input || {}, props) as Hello & Bytes;
}
/**
 * A ZeroMQ frame packet: the three flags plus the declared body `size`,
 * and optionally a command name, an offset and a payload.
 */
export type Frame = Flags & { size: number, command?: Cmd, offset?: number, payload?: Bytes };
/** Known ZeroMQ frame flags. */
export type Flags = { more: Flag, long: Flag, cmd: Flag, };
/**
 * Flag accessors. zmqFrame calls each one with the frame bytes to test the
 * flag and reads `.mask` when building the flags byte.
 * NOTE(review): the numeric argument is presumably the bit position in the
 * frame's first byte — confirm against `Flag` in format.ts.
 */
export const Flags = { cmd: Flag('CMD', 2), long: Flag('LONG', 1), more: Flag('MORE', 0), };
/**
 * Frame helper: callable as `zmqFrame` (parse bytes or build from options),
 * with reader/writer helpers, a prebuilt empty frame, a payload extractor
 * and the READY command attached.
 */
export const Frame = merged(zmqFrame, {
  /** Composes `readUntilDone` with `zmqFrame`. */
  read: Pipe(readUntilDone, zmqFrame),
  /** Returns a function that writes the encoded frame to the given writer. */
  write: (frame: Frame) => w => w.write(zmqFrame(frame)),
  /** Two-byte frame: first byte 1, size byte 0. */
  empty: new Uint8Array([1, 0]),
  /**
   * Extract the body of an encoded frame.
   *
   * The size field starts at `offset` (default 1, right after the flags
   * byte). It is a single byte for short frames and a 64-bit value for long
   * frames, so the body starts 1 or 8 bytes later respectively.
   *
   * @param frame Encoded frame carrying its parsed `long` flag.
   * @param offset Position of the size field within the frame.
   * @returns The body as produced by `byteParse(frame).buf(size, bodyOffset)`.
   */
  payload: (frame: Frame & Bytes, offset = 1): Uint8Array => {
    const { buf, u8, u64 } = byteParse(frame);
    const n = frame.long ? Number(u64(offset)) : u8(offset);
    // A u64 size field occupies 8 bytes, not 4.
    const sizeFieldWidth = frame.long ? 8 : 1;
    return buf(n, offset + sizeFieldWidth);
  },
  /** READY command: builder plus matching reader and writer. */
  ready: merged(Fn(zmqFrame, { command: 'READY' }), {
    id: 'READY',
    read: Name('ZMQ>ready', Pipe(readUntilDone, zmqFrame, zmqExpectCmd('READY'))),
    write: Name('ZMQ<ready', writable => writable.write(Frame.ready())),
  })
});
/**
 * Parse or build a short ZeroMQ frame.
 *
 * - Iterable input is treated as encoded frame bytes. The bytes are
 *   decorated with `size`, `more`, `long`, `cmd` and, for command frames,
 *   the decoded `command` name.
 * - Any other input is treated as partial frame options. A header is built
 *   from `more` and `command`; all other options are copied onto the result
 *   unchanged. Bodies other than a command name are not encoded yet.
 *
 * Encoded layout: flags byte, one-byte body size, then for command frames a
 * one-byte name length followed by the UTF-8 command name.
 *
 * @param input Encoded bytes to parse, or options to build from.
 * @returns The frame bytes with the frame fields attached.
 * @throws Error('empty frame') when given a zero-length byte sequence.
 * @throws Error('long frames not supported yet') when the LONG flag is set
 *   on parsed bytes, or when the built body would not fit a one-byte size.
 */
export function zmqFrame (input?: Bytes): Frame & Bytes;
export function zmqFrame (input?: Partial<Frame>): Frame & Bytes;
export function zmqFrame (input?: unknown): Frame & Bytes {
  if (input && typeof input === 'object' && Symbol.iterator in input) {
    if ((input as []).length === 0) throw new Error('empty frame');
    const bytes = Bytes(input);
    const long = Flags.long(bytes);
    if (long) throw new Error("long frames not supported yet");
    // Long frames were rejected above, so the size is always the single byte at offset 1.
    const size = Number(byteParse(bytes).u8(1));
    const more = Flags.more(bytes);
    const cmd = Flags.cmd(bytes);
    const props = { size, more, long, cmd };
    // Return the decorated buffer itself rather than the raw input, so the
    // fields survive even when Bytes() had to convert the input.
    if (!cmd) return merged(bytes, props) as Frame & Bytes;
    const command = UTF8.decode(bytes.subarray(3, 3 + bytes[2]));
    return merged(bytes, { ...props, command }) as Frame & Bytes;
  }

  const { more = false, command = null as Cmd|null, ...rest } =
    (input || {}) as Partial<Frame>;
  let flag = 0;
  if (more) flag |= Flags.more.mask;
  if (command) flag |= Flags.cmd.mask;
  // Lengths are measured in encoded bytes, not UTF-16 code units.
  const nameBytes: number[] = command ? [...UTF8.encode(command)] : [];
  // Command body = name-length byte + name; without a command the body is empty.
  const size = command ? 1 + nameBytes.length : 0;
  // Refuse to emit a header that would lie about the body length.
  if (size > 0xFF) throw new Error("long frames not supported yet");
  const encoded = command
    ? [flag, size, nameBytes.length, ...nameBytes]
    : [flag, size];
  return merged(new Uint8Array(encoded), rest, {
    flag, more, command,
    long: false,
  }) as Frame & Bytes;
}
// TODO: write frame payload (metadata, payload and long frames are not encoded yet).
// Unimplemented sketch kept for reference:
///** Send messages over pubsub channel. */
//async function zmqPubSend (write: Write, ...msgs: Bytes[]): Promise<void> {
//await write(zmqEmptyMore);
//if (msgs.length > 0) {
//const b = [];
//for (let i = 0; i < msgs.length - 1; i++) b.push(zmqFrame(msgs[i], { more: true }));
//b.push(zmqFrame(msgs[msgs.length - 1]));
//const output = new Uint8Array(b.reduce((l,b)=>l+b.length, 0));
//let i = 0; for (const c of b) for (const d of c) output[i++] = d;
//await write(output);
//}
//}
/**
 * Build a check that lets a frame through only when it carries the
 * expected command.
 *
 * @param id Command name the frame must carry.
 * @returns A function that returns its frame unchanged, or throws when the
 *   frame has no command or a different one.
 */
function zmqExpectCmd (id: Cmd) {
  return (frame: Frame) => {
    if (frame.command) {
      if (frame.command === id) return frame;
      throw new Error(`command not ${id} but ${frame.command}`)
    }
    throw new Error(`not a command frame`);
  }
}
/**
 * Promise wrapper around the callback form `writable.write(data, callback)`.
 *
 * Resolves once the write callback fires without an error. Rejects when
 * `write` throws synchronously, when the callback receives an error, or when
 * the writable emits `'error'` before the callback fires.
 *
 * The temporary `'error'` listener stays attached until the write settles
 * and is removed on every exit path.
 *
 * @param writable Event emitter with `once`/`off` and `write(data, callback)`.
 * @param data Chunk to write.
 */
const writeCb = (writable, data): Promise<void> =>
  new Promise((resolve, reject) => {
    const detach = () => { writable.off('error', onError) };
    const onError = (e: unknown) => {
      detach();
      reject(e);
    };
    writable.once('error', onError);
    try {
      writable.write(data, (e?: unknown) => {
        detach();
        if (e) reject(e);
        else resolve();
      });
    } catch (e) {
      detach();
      reject(e);
    }
  });
|