admin管理员组文章数量:1024593
below is my file upload function with the respective files with the router initialization, I am trying to stream write a file in this code, I am still relatively new to the concepts of streaming in nodejs and uWebsocket at this low level considering I've used express for most of my time with nodejs development. To manage backpressure I've added some conditions which would pause the res(uWs.HttpResponse) from further sending the data & once the data is drained from the writeStream I resume the response, what I don't understand is the consoles that I am getting immediately the first console console.log("Backpressure detected, pausing response.") which is strange for me because at the start it the writeStream should be empty? furthermore the processing pauses infinitely, after some tries at Backpressure detected, pausing response and no other console is logged it does not resume the response or drain the file for some reason? even the file does not finish writing & that console is also not logged it just pauses, is there something I may be doing wrong? or something related to uWs.HttpResponse?
I have tried uploading the file using postman & curl, it's the same in both, on the other hand if I use the nodejs:http module for this streaming this whole implementation with pausing/resuming works without an issue.
Would really appreciate any help, thanks :)
funcs/index.ts // File Upload Function
function streamFileUpload(res: HttpResponse, req: HttpRequest) {
const filename = `upload_${Date.now()}.bin`;
const filepath = path.join(__dirname, "../uploads", filename);
const writeStream = fs.createWriteStream("testfile.txt");
let isPaused = false;
writeStream.on("drain", () => {
if (isPaused) {
console.log("Draining complete, resuming response.");
isPaused = false;
res.resume();
}
});
writeStream.on("finish", () => {
console.log("File upload complete.");
res.writeStatus("200 OK").end(JSON.stringify({ success: true, filepath }));
});
writeStream.on("error", (err) => {
console.error("Error writing to file:", err.message);
res
.writeStatus("500 Internal Server Error")
.end(JSON.stringify({ error: "File upload failed" }));
});
res.onData((ab: any, isLast: any) => {
const chunk = Buffer.from(ab);
const backpressure = writeStream.write(chunk);
if (!backpressure) {
console.log("Backpressure detected, pausing response.");
isPaused = true;
res.pause();
}
if (isLast) {
console.log("Received last chunk, ending file stream.");
writeStream.end();
}
});
res.onAborted(() => {
console.error("Request aborted by the client.");
writeStream.destroy();
});
}
router.ts // Router that calls the function
type UWSApp = ReturnType<typeof uWs.App>;
export default (app: UWSApp) => {
app.post("/api/upload", (res, req) => {
setCORSHeaders(res);
streamFileUpload(res, req);
});
};
app.ts
const app = uWs.App();
// registering the routes
registerUserRoutes(app);
indexRoutes(app);
app
.ws("/*", {
compression: uWs.SHARED_COMPRESSOR,
maxPayloadLength: 16 * 1024 * 1024,
idleTimeout: 10,
open: (ws) => {
console.log("A WebSocket connected!");
ws.send(JSON.stringify({ message: "thanks for connecting" }));
},
message: (ws, message, isBinary) => {
let data = Buffer.from(message).toString();
console.log("Received data:", data);
redisPublisher.publish("chatroom", data);
messageFunction(ws as any, message);
},
drain: (ws) => {
console.log("WebSocket backpressure: " + ws.getBufferedAmount());
},
close: (ws, code, message) => {
console.log("WebSocket closed");
},
})
.get("/test", (res: HttpResponse, req: HttpRequest) => {
console.log("testing this route");
// res.writeHeader();
// res.
res.onAborted(() => {
res.close();
});
});
(async () => {
try {
const result = await db.execute(`SELECT NOW()`);
console.log("Postgres & Server is up & running");
app.listen(3200, (listenSocket) => {
if (listenSocket) console.log("listening on port 3200");
});
} catch (error) {
console.error("Failed to connect to the database:", error);
}
await redisClient.connect();
await redisPublisher.connect();
await redisClient.subscribe("chatroom", (message: string) => {
const parsed: MessagePayload = JSON.parse(message);
console.log("publishing message to chat room=>", parsed.chatRoomId);
console.log("parsed message", parsed.message);
app.publish(parsed.chatRoomId, JSON.stringify(parsed.message));
});
})();
below is my file upload function with the respective files with the router initialization, I am trying to stream write a file in this code, I am still relatively new to the concepts of streaming in nodejs and uWebsocket at this low level considering I've used express for most of my time with nodejs development. To manage backpressure I've added some conditions which would pause the res(uWs.HttpResponse) from further sending the data & once the data is drained from the writeStream I resume the response, what I don't understand is the consoles that I am getting immediately the first console console.log("Backpressure detected, pausing response.") which is strange for me because at the start it the writeStream should be empty? furthermore the processing pauses infinitely, after some tries at Backpressure detected, pausing response and no other console is logged it does not resume the response or drain the file for some reason? even the file does not finish writing & that console is also not logged it just pauses, is there something I may be doing wrong? or something related to uWs.HttpResponse?
I have tried uploading the file using postman & curl, it's the same in both, on the other hand if I use the nodejs:http module for this streaming this whole implementation with pausing/resuming works without an issue.
Would really appreciate any help, thanks :)
funcs/index.ts // File Upload Function
function streamFileUpload(res: HttpResponse, req: HttpRequest) {
const filename = `upload_${Date.now()}.bin`;
const filepath = path.join(__dirname, "../uploads", filename);
const writeStream = fs.createWriteStream("testfile.txt");
let isPaused = false;
writeStream.on("drain", () => {
if (isPaused) {
console.log("Draining complete, resuming response.");
isPaused = false;
res.resume();
}
});
writeStream.on("finish", () => {
console.log("File upload complete.");
res.writeStatus("200 OK").end(JSON.stringify({ success: true, filepath }));
});
writeStream.on("error", (err) => {
console.error("Error writing to file:", err.message);
res
.writeStatus("500 Internal Server Error")
.end(JSON.stringify({ error: "File upload failed" }));
});
res.onData((ab: any, isLast: any) => {
const chunk = Buffer.from(ab);
const backpressure = writeStream.write(chunk);
if (!backpressure) {
console.log("Backpressure detected, pausing response.");
isPaused = true;
res.pause();
}
if (isLast) {
console.log("Received last chunk, ending file stream.");
writeStream.end();
}
});
res.onAborted(() => {
console.error("Request aborted by the client.");
writeStream.destroy();
});
}
router.ts // Router that calls the function
type UWSApp = ReturnType<typeof uWs.App>;
export default (app: UWSApp) => {
app.post("/api/upload", (res, req) => {
setCORSHeaders(res);
streamFileUpload(res, req);
});
};
app.ts
const app = uWs.App();
// registering the routes
registerUserRoutes(app);
indexRoutes(app);
app
.ws("/*", {
compression: uWs.SHARED_COMPRESSOR,
maxPayloadLength: 16 * 1024 * 1024,
idleTimeout: 10,
open: (ws) => {
console.log("A WebSocket connected!");
ws.send(JSON.stringify({ message: "thanks for connecting" }));
},
message: (ws, message, isBinary) => {
let data = Buffer.from(message).toString();
console.log("Received data:", data);
redisPublisher.publish("chatroom", data);
messageFunction(ws as any, message);
},
drain: (ws) => {
console.log("WebSocket backpressure: " + ws.getBufferedAmount());
},
close: (ws, code, message) => {
console.log("WebSocket closed");
},
})
.get("/test", (res: HttpResponse, req: HttpRequest) => {
console.log("testing this route");
// res.writeHeader();
// res.
res.onAborted(() => {
res.close();
});
});
(async () => {
try {
const result = await db.execute(`SELECT NOW()`);
console.log("Postgres & Server is up & running");
app.listen(3200, (listenSocket) => {
if (listenSocket) console.log("listening on port 3200");
});
} catch (error) {
console.error("Failed to connect to the database:", error);
}
await redisClient.connect();
await redisPublisher.connect();
await redisClient.subscribe("chatroom", (message: string) => {
const parsed: MessagePayload = JSON.parse(message);
console.log("publishing message to chat room=>", parsed.chatRoomId);
console.log("parsed message", parsed.message);
app.publish(parsed.chatRoomId, JSON.stringify(parsed.message));
});
})();
本文标签: nodejsuWebsocket HttpResponse not resumingpausing properlyStack Overflow
版权声明:本文标题:node.js - uWebsocket HttpResponse not resumingpausing properly - Stack Overflow 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://it.en369.cn/questions/1745610951a2159000.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论