All files / format / stream.ts

100.00% Branches 0/0
4.55% Lines 3/66
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
 
x2
x2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x8
 
 
 
 
 
 
 
 
 
 
 
 
 


























































































import type { Bytes, Step } from '../index.ts';
import { Name, Pipe, identity } from './function.ts';
import { byteConcat } from './byte.ts';

/** A reader and a writer combined in one object, plus an optional `close` hook. */
export type RW<T = Bytes> = Reader<T> & Writer<T> & { close?: () => unknown };
/**
 * Turn the owner of a `readable` and a `writable` into a `Read & Write` pair.
 *
 * `state` is mutated in place through `Object.assign` and is also the return
 * value: it gains `read` (built by `toRead`), `write` (built by `toWrite`)
 * and has its own `close` re-assigned. `name` falls back to `null`, and
 * `onRead`/`onWrite` fall back to `identity`, only when they are `undefined`.
 */
export function toRW <T> (state): RW<T> {
  // Fields are picked one at a time, in the same order as a destructuring would.
  const name = state.name ?? null;
  const close = state.close;
  const readable = state.readable;
  let onRead = state.onRead;
  if (onRead === undefined) onRead = identity;
  const writable = state.writable;
  let onWrite = state.onWrite;
  if (onWrite === undefined) onWrite = identity;

  // Build the read side first, then the write side.
  const read = toRead({ name, readable }, onRead) as Read<T>;
  const write = toWrite({ name, writable }, onWrite) as Write<T>;
  return Object.assign(state, { read, write, close });
}

/** Anything that exposes a `read` function. */
export type Reader<T = Bytes> = { read: Read<T> };
/** Pull the next `{ done, value }` result from a stream. */
export type Read<T = Bytes> = () => Promise<{ done: boolean, value: T }>;
/**
 * Build a `Read` function for the owner of a `readable`.
 *
 * A reader is obtained once via `readable.getReader()`; every call of the
 * returned function awaits `reader.read()` and passes the result through the
 * pipeline made by `Pipe(...steps)`. The function is handed to `Name` with the
 * label `${name}>` (or `'read'` when `name` is falsy) and the bag
 * `{ name, readable, reader, pipeline }`.
 */
export function toRead <T> ({ name = null, readable }, ...steps: Step<T>[]): Read<T> {
  const pipeline = Pipe(...steps);
  const reader = readable.getReader();
  const label = name ? `${name}>` : 'read';

  async function read () {
    const result = await reader.read();
    return pipeline(result) as { done: boolean, value: T };
  }

  return Name(label, read, { name, readable, reader, pipeline });
}

/** Anything that exposes a `write` function. */
export type Writer<T = Bytes> = { write: Write<T> };
/** Push one value into a stream. */
export type Write<T = Bytes> = (_: T) => Promise<void>;
/**
 * Build a `Write` function for the owner of a `writable`.
 *
 * A writer is obtained once via `writable.getWriter()`; every call of the
 * returned function runs its arguments through the pipeline made by
 * `Pipe(...steps)` and forwards the outcome to `writer.write`, returning
 * whatever that call returns. The function is handed to `Name` with the label
 * `${name}<` (or `'write'` when `name` is falsy) and the bag
 * `{ name, writable, writer, pipeline }`.
 */
export function toWrite <T> ({ name = null, writable }, ...steps: Step<T>[]): Write<T> {
  const pipeline = Pipe(...steps);
  const writer = writable.getWriter();
  const label = name ? `${name}<` : 'write';

  function write (...args: Parameters<typeof writer["write"]>) {
    const payload = pipeline(...args);
    return writer.write(payload);
  }

  return Name(label, write, { name, writable, writer, pipeline });
}

/**
 * Bind a write target.
 *
 * Returns an async function that, on every call, writes each `prefix` item to
 * `output` and then each item it was called with, awaiting every
 * `output.write` before issuing the next one.
 */
export const writeTo = <T>(output: Writer<T>, ...prefix: T[]) => {
  return async (...data: T[]) => {
    // Prefix items always go out ahead of the per-call payload.
    const queue = [...prefix, ...data];
    for (const item of queue) {
      await output.write(item);
    }
  };
};

/**
 * Describe a write operation.
 *
 * Returns an async function that writes every item of `data`, in order, to the
 * writer it is given (awaiting each `writer.write`) and then resolves to that
 * same writer.
 */
export const write = <T>(...data: T[]) => {
  return async (writer: Writer<T>) => {
    for (let i = 0; i < data.length; i++) {
      await writer.write(data[i]);
    }
    return writer;
  };
};

/**
 * Build a read operation that collects chunks from a reader until at least
 * `max` bytes have been gathered or the reader reports `done`.
 *
 * - `min` is only interpolated into the operation's label and passed to
 *   `Name` as metadata; the loop does not enforce it.
 * - Chunks are never split, so the result can be longer than `max` when the
 *   last chunk overshoots.
 * - A falsy result from `reader.read()` is treated as end of stream.
 * - Reads that carry no `value` (typically the final `{ done: true }`)
 *   contribute nothing, so `byteConcat` only ever receives real chunks.
 *
 * @param options.min Lower bound, used for labelling only (default `0`).
 * @param options.max Byte count at which reading stops (default `256`).
 * @returns The function produced by `Name`, wrapping an async operation that
 *   takes a `Reader<Bytes>` and resolves to `byteConcat` of the chunks read.
 */
export const readBytes = ({ min = 0, max = 256 } = {}) =>
  Name(`read between ${min} and ${max} bytes`,
    async function readSomeBytes (reader: Reader<Bytes>) {
      let total = 0;
      const chunks: Bytes[] = [];
      while (true) {
        const { done, value } = (await reader.read()) || { done: true, value: undefined };
        // Skip value-less reads instead of handing `undefined` to `byteConcat`.
        if (value != null) {
          chunks.push(value);
          total += value.length ?? 0;
        }
        if (done || (total >= max)) break;
      }
      return byteConcat(chunks);
    }, { min, max });

/**
 * Read a stream to its end and return everything it produced.
 *
 * Calls `reader.read()` repeatedly until a result reports `done` (a falsy
 * result is treated as `done`). Reads that carry no `value` are skipped, so
 * only real chunks are collected.
 *
 * @param reader Source to drain.
 * @returns `null` when no chunk was read at all, otherwise the chunks joined
 *   by `byteConcat`.
 */
export async function readUntilDone (reader: Reader<Bytes>): Promise<Bytes|null> {
  const chunks: Bytes[] = [];
  while (true) {
    const { done, value } = (await reader.read()) || { done: true, value: undefined };
    // Pushing unconditionally would make the empty-stream check below unreachable.
    if (value != null) chunks.push(value);
    if (done) break;
  }
  if (chunks.length === 0) return null;
  return byteConcat(chunks);
}