Add new export experience (#12201)

* Use script setup

* Start on export dialog

* Use new system field interface, replace limit with numeric input

* Set placeholder

* Add sort config

* Use folder picker, correct layoutQuery use

* Add local download button

* Allow writing exports to file

* Add notification after export

* Fix sort config, use new export endpoint

* Setup notification hints

* Add information notice

* Fix local limit, cancel button

* Add (basic) docs for export functionality

* Fix json export file format

* Implement xml batch stitching

* Resolve review points
This commit is contained in:
Rijk van Zanten
2022-03-17 15:43:45 -04:00
committed by GitHub
parent aca9ff9709
commit 1c3e94d830
25 changed files with 1095 additions and 416 deletions

View File

@@ -0,0 +1,345 @@
import { Knex } from 'knex';
import getDatabase from '../database';
import { AbstractServiceOptions, File } from '../types';
import { Accountability, Query, SchemaOverview } from '@directus/shared/types';
import {
ForbiddenException,
InvalidPayloadException,
ServiceUnavailableException,
UnsupportedMediaTypeException,
} from '../exceptions';
import StreamArray from 'stream-json/streamers/StreamArray';
import { ItemsService } from './items';
import { queue } from 'async';
import destroyStream from 'destroy';
import csv from 'csv-parser';
import { set, transform } from 'lodash';
import { parse as toXML } from 'js2xmlparser';
import { Parser as CSVParser, transforms as CSVTransforms } from 'json2csv';
import { appendFile, createReadStream } from 'fs-extra';
import { file as createTmpFile } from 'tmp-promise';
import env from '../env';
import { FilesService } from './files';
import { getDateFormatted } from '../utils/get-date-formatted';
import { toArray } from '@directus/shared/utils';
import { NotificationsService } from './notifications';
import logger from '../logger';
export class ImportService {
knex: Knex;
accountability: Accountability | null;
schema: SchemaOverview;
constructor(options: AbstractServiceOptions) {
this.knex = options.knex || getDatabase();
this.accountability = options.accountability || null;
this.schema = options.schema;
}
async import(collection: string, mimetype: string, stream: NodeJS.ReadableStream): Promise<void> {
if (collection.startsWith('directus_')) throw new ForbiddenException();
const createPermissions = this.accountability?.permissions?.find(
(permission) => permission.collection === collection && permission.action === 'create'
);
const updatePermissions = this.accountability?.permissions?.find(
(permission) => permission.collection === collection && permission.action === 'update'
);
if (this.accountability?.admin !== true && (!createPermissions || !updatePermissions)) {
throw new ForbiddenException();
}
switch (mimetype) {
case 'application/json':
return await this.importJSON(collection, stream);
case 'text/csv':
case 'application/vnd.ms-excel':
return await this.importCSV(collection, stream);
default:
throw new UnsupportedMediaTypeException(`Can't import files of type "${mimetype}"`);
}
}
importJSON(collection: string, stream: NodeJS.ReadableStream): Promise<void> {
const extractJSON = StreamArray.withParser();
return this.knex.transaction((trx) => {
const service = new ItemsService(collection, {
knex: trx,
schema: this.schema,
accountability: this.accountability,
});
const saveQueue = queue(async (value: Record<string, unknown>) => {
return await service.upsertOne(value);
});
return new Promise<void>((resolve, reject) => {
stream.pipe(extractJSON);
extractJSON.on('data', ({ value }) => {
saveQueue.push(value);
});
extractJSON.on('error', (err) => {
destroyStream(stream);
destroyStream(extractJSON);
reject(new InvalidPayloadException(err.message));
});
saveQueue.error((err) => {
reject(err);
});
extractJSON.on('end', () => {
saveQueue.drain(() => {
return resolve();
});
});
});
});
}
importCSV(collection: string, stream: NodeJS.ReadableStream): Promise<void> {
return this.knex.transaction((trx) => {
const service = new ItemsService(collection, {
knex: trx,
schema: this.schema,
accountability: this.accountability,
});
const saveQueue = queue(async (value: Record<string, unknown>) => {
return await service.upsertOne(value);
});
return new Promise<void>((resolve, reject) => {
stream
.pipe(csv())
.on('data', (value: Record<string, string>) => {
const obj = transform(value, (result: Record<string, string>, value, key) => {
if (value.length === 0) {
delete result[key];
} else {
try {
const parsedJson = JSON.parse(value);
set(result, key, parsedJson);
} catch {
set(result, key, value);
}
}
});
saveQueue.push(obj);
})
.on('error', (err) => {
destroyStream(stream);
reject(new InvalidPayloadException(err.message));
})
.on('end', () => {
saveQueue.drain(() => {
return resolve();
});
});
saveQueue.error((err) => {
reject(err);
});
});
});
}
}
export class ExportService {
knex: Knex;
accountability: Accountability | null;
schema: SchemaOverview;
constructor(options: AbstractServiceOptions) {
this.knex = options.knex || getDatabase();
this.accountability = options.accountability || null;
this.schema = options.schema;
}
/**
* Export the query results as a named file. Will query in batches, and keep appending a tmp file
* until all the data is retrieved. Uploads the result as a new file using the regular
* FilesService upload method.
*/
async exportToFile(
collection: string,
query: Partial<Query>,
format: 'xml' | 'csv' | 'json',
options?: {
file?: Partial<File>;
}
) {
try {
const mimeTypes = {
xml: 'text/xml',
csv: 'text/csv',
json: 'application/json',
};
const database = getDatabase();
const { path, cleanup } = await createTmpFile();
await database.transaction(async (trx) => {
const service = new ItemsService(collection, {
accountability: this.accountability,
schema: this.schema,
knex: trx,
});
const totalCount = await service
.readByQuery({
...query,
aggregate: {
count: ['*'],
},
})
.then((result) => Number(result?.[0]?.count ?? 0));
const count = query.limit ? Math.min(totalCount, query.limit) : totalCount;
const requestedLimit = query.limit ?? -1;
const batchesRequired = Math.ceil(count / env.EXPORT_BATCH_SIZE);
let readCount = 0;
for (let batch = 0; batch <= batchesRequired; batch++) {
let limit = env.EXPORT_BATCH_SIZE;
if (requestedLimit > 0 && env.EXPORT_BATCH_SIZE > requestedLimit - readCount) {
limit = requestedLimit - readCount;
}
const result = await service.readByQuery({
...query,
limit,
page: batch,
});
readCount += result.length;
if (result.length) {
await appendFile(
path,
this.transform(result, format, {
includeHeader: batch === 0,
includeFooter: batch + 1 === batchesRequired,
})
);
}
}
});
const filesService = new FilesService({
accountability: this.accountability,
schema: this.schema,
});
const storage: string = toArray(env.STORAGE_LOCATIONS)[0];
const title = `export-${collection}-${getDateFormatted()}`;
const filename = `${title}.${format}`;
const fileWithDefaults: Partial<File> & { storage: string; filename_download: string } = {
...(options?.file ?? {}),
title: options?.file?.title ?? title,
filename_download: options?.file?.filename_download ?? filename,
storage: options?.file?.storage ?? storage,
type: mimeTypes[format],
};
const savedFile = await filesService.uploadOne(createReadStream(path), fileWithDefaults);
if (this.accountability?.user) {
const notificationsService = new NotificationsService({
accountability: this.accountability,
schema: this.schema,
});
await notificationsService.createOne({
recipient: this.accountability.user,
sender: this.accountability.user,
subject: `Your export of ${collection} is ready`,
collection: `directus_files`,
item: savedFile,
});
}
await cleanup();
} catch (err: any) {
logger.error(err, `Couldn't export ${collection}: ${err.message}`);
if (this.accountability?.user) {
const notificationsService = new NotificationsService({
accountability: this.accountability,
schema: this.schema,
});
await notificationsService.createOne({
recipient: this.accountability.user,
sender: this.accountability.user,
subject: `Your export of ${collection} failed`,
message: `Please contact your system administrator for more information.`,
});
}
}
}
/**
* Transform a given input object / array to the given type
*/
transform(
input: Record<string, any>[],
format: 'xml' | 'csv' | 'json',
options?: {
includeHeader?: boolean;
includeFooter?: boolean;
}
): string {
if (format === 'json') {
let string = JSON.stringify(input || null, null, '\t');
if (options?.includeHeader === false) string = string.split('\n').slice(1).join('\n');
if (options?.includeFooter === false) {
const lines = string.split('\n');
string = lines.slice(0, lines.length - 1).join('\n');
string += ',\n';
}
return string;
}
if (format === 'xml') {
let string = toXML('data', input);
if (options?.includeHeader === false) string = string.split('\n').slice(2).join('\n');
if (options?.includeFooter === false) {
const lines = string.split('\n');
string = lines.slice(0, lines.length - 1).join('\n');
string += '\n';
}
return string;
}
if (format === 'csv') {
const parser = new CSVParser({
transforms: [CSVTransforms.flatten({ separator: '.' })],
header: options?.includeHeader !== false,
});
return parser.parse(input);
}
throw new ServiceUnavailableException(`Illegal export type used: "${format}"`, { service: 'export' });
}
}

View File

@@ -1,138 +0,0 @@
import { Knex } from 'knex';
import getDatabase from '../database';
import { AbstractServiceOptions } from '../types';
import { Accountability, SchemaOverview } from '@directus/shared/types';
import { ForbiddenException, InvalidPayloadException, UnsupportedMediaTypeException } from '../exceptions';
import StreamArray from 'stream-json/streamers/StreamArray';
import { ItemsService } from './items';
import { queue } from 'async';
import destroyStream from 'destroy';
import csv from 'csv-parser';
import { set, transform } from 'lodash';
export class ImportService {
knex: Knex;
accountability: Accountability | null;
schema: SchemaOverview;
constructor(options: AbstractServiceOptions) {
this.knex = options.knex || getDatabase();
this.accountability = options.accountability || null;
this.schema = options.schema;
}
async import(collection: string, mimetype: string, stream: NodeJS.ReadableStream): Promise<void> {
if (collection.startsWith('directus_')) throw new ForbiddenException();
const createPermissions = this.accountability?.permissions?.find(
(permission) => permission.collection === collection && permission.action === 'create'
);
const updatePermissions = this.accountability?.permissions?.find(
(permission) => permission.collection === collection && permission.action === 'update'
);
if (this.accountability?.admin !== true && (!createPermissions || !updatePermissions)) {
throw new ForbiddenException();
}
switch (mimetype) {
case 'application/json':
return await this.importJSON(collection, stream);
case 'text/csv':
case 'application/vnd.ms-excel':
return await this.importCSV(collection, stream);
default:
throw new UnsupportedMediaTypeException(`Can't import files of type "${mimetype}"`);
}
}
importJSON(collection: string, stream: NodeJS.ReadableStream): Promise<void> {
const extractJSON = StreamArray.withParser();
return this.knex.transaction((trx) => {
const service = new ItemsService(collection, {
knex: trx,
schema: this.schema,
accountability: this.accountability,
});
const saveQueue = queue(async (value: Record<string, unknown>) => {
return await service.upsertOne(value);
});
return new Promise<void>((resolve, reject) => {
stream.pipe(extractJSON);
extractJSON.on('data', ({ value }) => {
saveQueue.push(value);
});
extractJSON.on('error', (err) => {
destroyStream(stream);
destroyStream(extractJSON);
reject(new InvalidPayloadException(err.message));
});
saveQueue.error((err) => {
reject(err);
});
extractJSON.on('end', () => {
saveQueue.drain(() => {
return resolve();
});
});
});
});
}
importCSV(collection: string, stream: NodeJS.ReadableStream): Promise<void> {
return this.knex.transaction((trx) => {
const service = new ItemsService(collection, {
knex: trx,
schema: this.schema,
accountability: this.accountability,
});
const saveQueue = queue(async (value: Record<string, unknown>) => {
return await service.upsertOne(value);
});
return new Promise<void>((resolve, reject) => {
stream
.pipe(csv())
.on('data', (value: Record<string, string>) => {
const obj = transform(value, (result: Record<string, string>, value, key) => {
if (value.length === 0) {
delete result[key];
} else {
try {
const parsedJson = JSON.parse(value);
set(result, key, parsedJson);
} catch {
set(result, key, value);
}
}
});
saveQueue.push(obj);
})
.on('error', (err) => {
destroyStream(stream);
reject(new InvalidPayloadException(err.message));
})
.on('end', () => {
saveQueue.drain(() => {
return resolve();
});
});
saveQueue.error((err) => {
reject(err);
});
});
});
}
}

View File

@@ -10,7 +10,7 @@ export * from './fields';
export * from './files';
export * from './folders';
export * from './graphql';
export * from './import';
export * from './import-export';
export * from './mail';
export * from './meta';
export * from './notifications';