This repository was archived by the owner on Apr 4, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathproducer.ts
96 lines (87 loc) · 2.37 KB
/
producer.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import { HttpClient } from "./http";
/**
* Optional parameters for each produced message
*/
export type ProduceOptions = {
/**
* The partition to produce to.
* Will be assigned by kafka if left empty.
*/
partition?: number;
/**
* The unix timestamp in seconds.
* Will be assigned by kafka if left empty.
*/
timestamp?: number;
/**
* Events with the same event key (e.g., a customer or vehicle ID) are written
* to the same partition, and Kafka guarantees that any consumer of a given
* topic-partition will always read that partition's events in exactly the
* same order as they were written.
*/
key?: string;
headers?: { key: string; value: string }[];
};
/**
* Request payload to produce a message to a topic.
*/
export type ProduceRequest = ProduceOptions & {
/**
* The topic where the message gets publish.
* Make sure this exists in upstash before. Otherwise it will throw an error.
*/
topic: string;
/**
* The message itself. This will be serialized using `JSON.stringify`
*/
value: unknown;
};
/**
* Response for each successfull message produced
*/
export type ProduceResponse = {
topic: string;
partition: number;
offset: number;
timestamp: number;
};
export class Producer {
private readonly client: HttpClient;
constructor(client: HttpClient) {
this.client = client;
}
/**
* Produce a single message to a single topic
*/
public async produce<TMessage>(
topic: string,
message: TMessage,
opts?: ProduceOptions,
): Promise<ProduceResponse> {
const request: ProduceRequest = {
topic,
value: typeof message === "string" ? message : JSON.stringify(message),
...opts,
};
const res = await this.client.post<ProduceResponse[]>({
path: ["produce"],
body: request,
});
return res[0];
}
/**
* Produce multiple messages to different topics at the same time
*
* Each entry in the response array belongs to the request with the same order in the requests.
*/
public async produceMany(requests: ProduceRequest[]): Promise<ProduceResponse[]> {
const transformedRequests = requests.map(({ value, ...rest }) => ({
...rest,
value: typeof value === "string" ? value : JSON.stringify(value),
}));
return await this.client.post<ProduceResponse[]>({
path: ["produce"],
body: transformedRequests,
});
}
}