This article implements a file upload feature with the following capabilities: file preview, upload validation, chunked upload, instant upload, resumable upload, and progress reporting.
The stack is React + Node.js. Without further ado, let's get started.
The frontend requirements break down as follows: validate the selected file (type and size), preview it, chunk it, and upload the chunks.
File preview flow: when the user selects a file, the browser reads it and renders it on the page. There are two ways to implement this: URL.createObjectURL (synchronous, yields a blob: URL) or FileReader.readAsDataURL (asynchronous, yields a base64 data: URL).
Either one works; here is the object-URL implementation:
const [objectURL, setObjectURL] = React.useState('');
React.useEffect(() => {
  if (!selectedFile) return;
  const imageUrl = URL.createObjectURL(selectedFile);
  setObjectURL(imageUrl);
  // Revoke the blob: URL on cleanup to avoid leaking object references
  return () => URL.revokeObjectURL(imageUrl);
}, [selectedFile]);
return (
  <>
    {objectURL && <img src={objectURL} alt="preview" />}
  </>
);
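For comparison, a minimal sketch of the FileReader alternative, assuming the same selectedFile state:
const [dataURL, setDataURL] = React.useState('');
React.useEffect(() => {
  if (!selectedFile) return;
  const reader = new FileReader();
  // readAsDataURL delivers a base64 data: URL once the whole file is read
  reader.onload = () => setDataURL(reader.result as string);
  reader.readAsDataURL(selectedFile);
}, [selectedFile]);
return <>{dataURL && <img src={dataURL} alt="preview" />}</>;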
After the upload button is clicked, the main validations are:
function handleUpload() {
  if (!currentFile) {
    return message.error('No file selected');
  }
  if (!allowUpload(currentFile)) {
    return message.error('File not supported');
  }
}
function allowUpload(file: File) {
  const isValidFileType = ['image/jpeg', 'image/png', 'application/pdf', 'video/mp4'].includes(file.type);
  if (!isValidFileType) {
    message.error('File type not supported!');
  }
  // Limit uploads to 2GB, matching the isLt2G name
  const isLt2G = file.size / 1024 / 1024 / 1024 < 2;
  if (!isLt2G) {
    message.error('Uploaded file must be smaller than 2GB!');
  }
  return isValidFileType && isLt2G;
}
function Upload() {
  const [currentFile, setCurrentFile] = useState<File>();
  const [objectURL, setObjectURL] = useState('');
  useEffect(() => {
    if (!currentFile) return;
    // createObjectURL needs a matching revoke on cleanup, as in the preview snippet above
    const url = window.URL.createObjectURL(currentFile);
    setObjectURL(url);
    return () => window.URL.revokeObjectURL(url);
  }, [currentFile]);
}
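For completeness, currentFile is assumed to come from a plain file input; a minimal sketch:
<input
  type="file"
  onChange={(event: React.ChangeEvent<HTMLInputElement>) => {
    // take the first selected file and put it into state
    setCurrentFile(event.target.files?.[0]);
  }}
/>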
Chunking approach: use slice to cut the file into chunks and collect them into an array of Part objects (the type is sketched below). Each uploaded part has the shape:
filename, // file name
chunk_name: `${filename}-${index}`, // name of this chunk
chunk: part.chunk, // chunk data (a Blob)
size: part.chunk.size // size of this chunk
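The Part and Uploaded types used by the client snippets are never declared in the article; a plausible definition, inferred from the fields the code accesses:
interface Part {
  chunk: Blob;          // the file slice itself
  size: number;         // size of the slice in bytes
  filename?: string;    // hash-based name of the whole file, filled in before upload
  chunk_name?: string;  // `${filename}-${index}`
  loaded?: number;      // bytes already on the server (used for resuming)
  percent?: number;     // upload progress of this chunk
  xhr?: XMLHttpRequest; // underlying request, kept so it can be aborted to pause
}
interface Uploaded {
  filename: string;     // chunk file name as stored on the server
  size: number;         // bytes the server has received for this chunk
}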
Use Promise.all to run the chunk uploads concurrently:
partList = partList.map((part, index: number) => ({
  filename, // file name
  chunk_name: `${filename}-${index}`, // name of this chunk
  chunk: part.chunk, // chunk data
  size: part.chunk.size // size of this chunk
}));
await uploadParts(partList, filename);
async function uploadParts(partList: Part[], filename: string) {
  const requests = createRequests(partList);
  // Send all chunks concurrently
  await Promise.all(requests);
  // Then ask the server to merge them
  await request({
    url: '/merge',
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    data: JSON.stringify({ filename })
  });
}
// Build one ajax request per chunk
function createRequests(partList: Part[]) {
  return partList.map((part: Part) => {
    return request({
      url: `/upload/${part.filename}/${part.chunk_name!}`,
      method: 'POST',
      headers: { 'Content-Type': 'application/octet-stream' },
      data: part.chunk,
    });
  });
}
Splitting the file into chunks:
const SIZE = 1024 * 1024 * 10; // chunk size; 10MB is an arbitrary but common choice
function createChunks(file: File): Part[] {
  let current = 0;
  const partList: Part[] = [];
  while (current < file.size) {
    // Blob.slice only records offsets; no bytes are copied here
    const chunk = file.slice(current, current + SIZE);
    partList.push({ chunk, size: chunk.size });
    current += SIZE;
  }
  return partList;
}
If the server already has the file, don't upload it again: instant upload.
Approach: compute the file's hash in a Web Worker, and name the upload `${hash}${ext}` (hash plus the original extension), so identical content always maps to the same name on the server.
Main thread / worker protocol:
the main thread hands over the chunks: worker.postMessage({ partList })
and listens for results: worker.onmessage = (event) => { /* event.data */ }
The worker side:
loads spark-md5 via importScripts
feeds each chunk buffer into the hash with spark.append(buffer)
and once hashing is done, notifies the browser with postMessage({ hash })
const calculateHash = (partList: Part[]): Promise<string> => {
  return new Promise(resolve => {
    const worker = new Worker('/hash.js');
    setWorker(worker); // keep the worker in state so it can be terminated later
    worker.postMessage({ partList });
    worker.onmessage = (event) => {
      const { percent, hash } = event.data;
      setHashPercent(percent); // drives the hashing progress bar
      if (hash) {
        resolve(hash);
      }
    };
  });
};
// hash.js (the worker script loaded by calculateHash above)
self.importScripts('/spark-md5.min.js'); // assumes spark-md5 is served alongside the app's static files
self.onmessage = async (event) => {
  const { partList } = event.data;
  const spark = new self.SparkMD5.ArrayBuffer();
  let percent = 0;
  const perSize = 100 / partList.length;
  const buffers = await Promise.all(partList.map(({ chunk }) => new Promise((resolve) => {
    // chunk.arrayBuffer() would also work instead of FileReader
    const reader = new FileReader();
    reader.readAsArrayBuffer(chunk);
    reader.onload = (event) => {
      // report hashing progress as each chunk finishes reading
      percent += perSize;
      self.postMessage({ percent: Number(percent.toFixed(2)) });
      resolve(event.target.result);
    };
  })));
  buffers.forEach(buffer => spark.append(buffer));
  self.postMessage({ percent: 100, hash: spark.end() });
  self.close();
};
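With the hash resolved, the upload name described above (hash plus the original extension) can be built like this; extname here is a local helper for illustration, not something from the article:
const extname = (name: string) => name.slice(name.lastIndexOf('.')); // '.png', '.mp4', ...
const hash = await calculateHash(partList);
const filename = `${hash}${extname(currentFile!.name)}`; // e.g. 'd41d8cd9...427e.png'
partList.forEach(part => { part.filename = filename; });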
Verify approach: before each upload, the frontend sends the hash-based filename to the server, and the server checks whether it has seen the file before. If a directory with that name exists in the temp area, the file was (at least partially) uploaded before.
In that case the server reads the directory to see which chunks arrived and how large each one is, and returns an uploaded list; the frontend then uploads only what is missing.
Resumable upload also needs pause and resume logic for the upload itself (a sketch follows the code below).
async function handleUpload() {
  ...
  // Ask the backend whether this hash-named file (or part of it) already exists
  const verify = async (filename: string) => {
    return request({
      url: '/verify',
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      data: JSON.stringify({ filename })
    });
  };
  // Before uploading, send the hash-based filename to the backend
  const { needUpload, uploadedList } = await verify(filename);
  if (!needUpload) {
    // The server already has the complete file: instant upload
    return message.success('Upload succeeded');
  }
  // Build one upload task per missing (or partial) chunk
  const requests = createRequests(partList, uploadedList);
  await Promise.all(requests);
  await request({
    url: '/merge',
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    data: JSON.stringify({ filename })
  });
  message.info('Upload succeeded!');
// Resume logic: using uploadedList, upload only what has not been uploaded yet
function createRequests(partList: Part[], uploadedList: Uploaded[]) {
  return partList.filter((part: Part) => {
    // Look up this chunk in the server's uploaded list by name
    const uploadedFile = uploadedList.find(item => item.filename === part.chunk_name);
    // Never uploaded: upload from the beginning
    if (!uploadedFile) {
      part.loaded = 0;
      part.percent = 0;
      return true;
    }
    // Partially uploaded: upload the remainder
    if (uploadedFile.size < part.chunk.size) {
      part.loaded = uploadedFile.size;
      part.percent = Number(((part.loaded / part.chunk.size) * 100).toFixed(2));
      return true;
    }
    // Fully uploaded: skip
    return false;
  }).map((part: Part) => {
    return request({
      url: `/upload/${part.filename}/${part.chunk_name!}/${part.loaded!}`,
      method: 'POST',
      headers: { 'Content-Type': 'application/octet-stream' },
      // Only send the bytes the server does not have yet
      data: part.chunk.slice(part.loaded!),
      // Expose the xhr so the UI can pause by aborting it (option.setXHR && setXHR(xhr))
      setXHR: (xhr: XMLHttpRequest) => { part.xhr = xhr; },
      onProgress: (event: ProgressEvent) => {
        part.percent = Number(((part.loaded! + event.loaded) / part.chunk.size * 100).toFixed(2));
        setPartList([...partList]);
      }
    });
  });
}
...
}
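The pause and resume handlers mentioned earlier are not part of the snippets above; a minimal sketch, reusing partList, verify and createRequests from handleUpload (handlePause/handleResume are hypothetical names):
function handlePause() {
  // Abort every in-flight xhr; the server keeps whatever bytes it already received
  partList.forEach((part: Part) => part.xhr && part.xhr.abort());
}
async function handleResume() {
  // Re-verify to learn how far each chunk got, then upload only the remainder
  const { uploadedList } = await verify(filename);
  await Promise.all(createRequests(partList, uploadedList));
}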
The upload UI shows two progress bars: one for hash computation and one for the upload itself.
The worker reports hashing progress as it converts each chunk from Blob to ArrayBuffer:
reader.onload = (event) => {
percent += perSize;
self.postMessage({ percent: Number(percent.toFixed(2)) });
resolve(event.target.result);
}
The upload progress bar is driven by xhr.upload.onprogress: event.loaded is how many bytes have been sent so far, and event.total is the total size of the request body.
const onProgress = (event: ProgressEvent) => {
  part.percent = Number(((part.loaded! + event.loaded) / part.chunk.size * 100).toFixed(2));
  setPartList([...partList]);
};
xhr.upload.onprogress = onProgress;
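The request helper used in all the snippets above is never spelled out; a minimal sketch of a promise-wrapped XMLHttpRequest supporting the setXHR and onProgress options the code relies on (the option names are inferred from usage):
interface RequestOptions {
  url: string;
  method?: string;
  headers?: Record<string, string>;
  data?: Document | XMLHttpRequestBodyInit | null;
  setXHR?: (xhr: XMLHttpRequest) => void;
  onProgress?: (event: ProgressEvent) => void;
}
function request(options: RequestOptions): Promise<any> {
  return new Promise((resolve, reject) => {
    const xhr = new XMLHttpRequest();
    xhr.open(options.method || 'GET', options.url);
    Object.entries(options.headers || {}).forEach(([key, value]) => xhr.setRequestHeader(key, value));
    if (options.setXHR) options.setXHR(xhr); // hand out the xhr so the caller can abort it
    if (options.onProgress) xhr.upload.onprogress = options.onProgress;
    xhr.responseType = 'json';
    xhr.onload = () => resolve(xhr.response);
    xhr.onerror = () => reject(new Error(`request to ${options.url} failed`));
    xhr.send(options.data ?? null);
  });
}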
// Overall upload progress: the average of all chunk percentages
const totalPercent = partList.length > 0
  ? Math.round(partList.reduce((acc, curr) => acc + curr.percent!, 0) / partList.length)
  : 0;
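The two bars can then be rendered with antd's Progress component (the message calls above already assume antd); a minimal sketch:
<div>
  <div>Hash progress</div>
  <Progress percent={hashPercent} />
  <div>Upload progress</div>
  <Progress percent={totalPercent} />
</div>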
Merging chunks on the server: for performance, use streams. Each chunk is piped into a write stream opened at that chunk's byte offset in the target file, so all chunks can be written concurrently.
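The server snippets below assume an Express app with fs-extra (which provides fs.pathExists, fs.mkdirs, and promise-based fs calls); a minimal bootstrap sketch defining the constants they reference:
import express, { Request, Response, NextFunction } from 'express';
import fs from 'fs-extra';
import path from 'path';
import { WriteStream } from 'fs';
const app = express();
app.use(express.json()); // parse JSON bodies for /merge and /verify
const PUBLIC_DIR = path.resolve(__dirname, 'public'); // merged files are served from here
const TEMP_DIR = path.resolve(__dirname, 'temp');     // chunk directories live here
const SIZE = 1024 * 1024 * 10;                        // must match the client-side chunk size
app.listen(8000, () => console.log('server started on port 8000'));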
// Helper: pipe one chunk file into the target write stream, then delete the chunk
const pipeStream = (filePath: string, writeStream: WriteStream) => new Promise<void>(resolve => {
  const readStream = fs.createReadStream(filePath);
  readStream.on('end', async () => {
    await fs.unlink(filePath);
    resolve();
  });
  readStream.pipe(writeStream);
});
export const mergeChunks = async (filename: string, size: number = SIZE) => {
const filePath = path.resolve(PUBLIC_DIR, filename);
const chunksDir = path.resolve(TEMP_DIR, filename);
const chunkFiles = await fs.readdir(chunksDir);
// Sort chunk files by their numeric index suffix (names are `${filename}-${index}`)
chunkFiles.sort((a, b) => Number(a.split('-')[1]) - Number(b.split('-')[1]));
await Promise.all(
  // Pipe each chunk into a write stream positioned at that chunk's byte offset in the final file
  chunkFiles.map((chunkFile, index) => pipeStream(
    path.resolve(chunksDir, chunkFile),
    fs.createWriteStream(filePath, {
      start: index * size
    })
  ))
);
await fs.rmdir(chunksDir);
}
app.post('/merge', async (req: Request, res: Response) => {
let { filename } = req.body;
await mergeChunks(filename);
res.json({
success: true,
url: `http://localhost:8000/${filename}`
});
});
app.post('/upload/:filename/:chunk_name/:start?', async (req: Request, res: Response, _next: NextFunction) => {
  const file_dir = path.resolve(TEMP_DIR, req.params.filename);
  const exist = await fs.pathExists(file_dir);
  if (!exist) {
    await fs.mkdirs(file_dir);
  }
  const filePath = path.resolve(TEMP_DIR, req.params.filename, req.params.chunk_name);
  // Open in append mode: on resume the client only sends the bytes the server is missing
  // (part.chunk.slice(part.loaded)), so appending continues where the last attempt stopped
  const writeStream = fs.createWriteStream(filePath, { flags: 'a' });
  req.pipe(writeStream);
  req.on('end', () => {
    writeStream.close();
    res.json({
      success: true
    });
  });
});
app.post('/verify', async (req: Request, res: Response): Promise<any> => {
const { filename } = req.body;
const filePath = path.resolve(PUBLIC_DIR, filename);
let existFile = await fs.pathExists(filePath);
// If the file already exists in PUBLIC_DIR, it was fully uploaded before: instant upload
if (existFile) {
return res.json({
success: true,
needUpload: false
});
}
let tempFilePath = path.resolve(TEMP_DIR, filename);
let uploadedList: any[] = [];
let existTemporaryFile = await fs.pathExists(tempFilePath);
// The file was uploaded before but never finished
if (existTemporaryFile) {
uploadedList = await fs.readdir(tempFilePath);
uploadedList = await Promise.all(uploadedList.map(async (filename: string) => {
let stat = await fs.stat(path.resolve(tempFilePath, filename));
// Read out each chunk's file name and received size
return {
filename,
size: stat.size
}
}));
}
res.json({
success: true,
needUpload: true,
uploadedList: uploadedList
});
});