diff --git a/spec/resource-pool-spec.js b/spec/resource-pool-spec.js new file mode 100644 index 000000000..27893360a --- /dev/null +++ b/spec/resource-pool-spec.js @@ -0,0 +1,66 @@ +/** @babel */ + +import ResourcePool from '../src/resource-pool' + +import {it} from './async-spec-helpers' + +describe('ResourcePool', () => { + let queue + + beforeEach(() => { + queue = new ResourcePool([{}]) + }) + + describe('.enqueue', () => { + it('calls the enqueued function', async () => { + let called = false + await queue.enqueue(() => { + called = true + return Promise.resolve() + }) + expect(called).toBe(true) + }) + + it('forwards values from the inner promise', async () => { + const result = await queue.enqueue(() => Promise.resolve(42)) + expect(result).toBe(42) + }) + + it('forwards errors from the inner promise', async () => { + let threw = false + try { + await queue.enqueue(() => Promise.reject(new Error('down with the sickness'))) + } catch (e) { + threw = true + } + expect(threw).toBe(true) + }) + + it('continues to dequeue work after a promise has been rejected', async () => { + try { + await queue.enqueue(() => Promise.reject(new Error('down with the sickness'))) + } catch (e) {} + + const result = await queue.enqueue(() => Promise.resolve(42)) + expect(result).toBe(42) + }) + + it('queues up work', async () => { + let resolve = null + queue.enqueue(() => { + return new Promise((resolve_, reject) => { + resolve = resolve_ + }) + }) + + expect(queue.getQueueDepth()).toBe(0) + + queue.enqueue(() => new Promise((resolve, reject) => {})) + + expect(queue.getQueueDepth()).toBe(1) + resolve() + + waitsFor(() => queue.getQueueDepth() === 0) + }) + }) +}) diff --git a/src/git-repository-async.js b/src/git-repository-async.js index 17d293c14..e45c1e47e 100644 --- a/src/git-repository-async.js +++ b/src/git-repository-async.js @@ -3,6 +3,7 @@ import fs from 'fs-plus' import path from 'path' import Git from 'nodegit' +import ResourcePool from './resource-pool' import {Emitter, CompositeDisposable, Disposable} from 'event-kit' const modifiedStatusFlags = Git.Status.STATUS.WT_MODIFIED | Git.Status.STATUS.INDEX_MODIFIED | Git.Status.STATUS.WT_DELETED | Git.Status.STATUS.INDEX_DELETED | Git.Status.STATUS.WT_TYPECHANGE | Git.Status.STATUS.INDEX_TYPECHANGE @@ -38,7 +39,8 @@ export default class GitRepositoryAsync { } constructor (_path, options = {}) { - Git.enableThreadSafety() + // We'll serialize our access manually. + Git.disableThreadSafety() this.emitter = new Emitter() this.subscriptions = new CompositeDisposable() @@ -50,6 +52,11 @@ export default class GitRepositoryAsync { this._openExactPath = options.openExactPath || false this.repoPromise = this.openRepository() + // NB: We don't currently _use_ the pooled object. But by giving it one + // thing, we're really just serializing all the work. Down the road, we + // could open multiple connections to the repository. + this.repoPool = new ResourcePool([this.repoPromise]) + this.isCaseInsensitive = fs.isCaseInsensitive() this.upstream = {} this.submodules = {} @@ -81,6 +88,7 @@ export default class GitRepositoryAsync { this.emitter.dispose() this.emitter = null } + if (this.subscriptions) { this.subscriptions.dispose() this.subscriptions = null @@ -260,10 +268,12 @@ export default class GitRepositoryAsync { // Public: Returns a {Promise} which resolves to whether the given branch // exists. hasBranch (branch) { - return this.getRepo() - .then(repo => repo.getBranch(branch)) - .then(branch => branch != null) - .catch(_ => false) + return this.repoPool.enqueue(() => { + return this.getRepo() + .then(repo => repo.getBranch(branch)) + .then(branch => branch != null) + .catch(_ => false) + }) } // Public: Retrieves a shortened version of the HEAD reference value. @@ -277,9 +287,11 @@ export default class GitRepositoryAsync { // // Returns a {Promise} which resolves to a {String}. getShortHead (_path) { - return this.getRepo(_path) - .then(repo => repo.getCurrentBranch()) - .then(branch => branch.shorthand()) + return this.repoPool.enqueue(() => { + return this.getRepo(_path) + .then(repo => repo.getCurrentBranch()) + .then(branch => branch.shorthand()) + }) } // Public: Is the given path a submodule in the repository? @@ -289,14 +301,18 @@ export default class GitRepositoryAsync { // Returns a {Promise} that resolves true if the given path is a submodule in // the repository. isSubmodule (_path) { - return this.getRepo() - .then(repo => repo.openIndex()) - .then(index => Promise.all([index, this.relativizeToWorkingDirectory(_path)])) - .then(([index, relativePath]) => { - const entry = index.getByPath(relativePath) - if (!entry) return false + return this.relativizeToWorkingDirectory(_path) + .then(relativePath => { + return this.repoPool.enqueue(() => { + return this.getRepo() + .then(repo => repo.openIndex()) + .then(index => { + const entry = index.getByPath(relativePath) + if (!entry) return false - return entry.mode === submoduleMode + return entry.mode === submoduleMode + }) + }) }) } @@ -311,16 +327,18 @@ export default class GitRepositoryAsync { // * `ahead` The {Number} of commits ahead. // * `behind` The {Number} of commits behind. getAheadBehindCount (reference, _path) { - return this.getRepo(_path) - .then(repo => Promise.all([repo, repo.getBranch(reference)])) - .then(([repo, local]) => { - const upstream = Git.Branch.upstream(local) - return Promise.all([repo, local, upstream]) - }) - .then(([repo, local, upstream]) => { - return Git.Graph.aheadBehind(repo, local.target(), upstream.target()) - }) - .catch(_ => ({ahead: 0, behind: 0})) + return this.repoPool.enqueue(() => { + return this.getRepo(_path) + .then(repo => Promise.all([repo, repo.getBranch(reference)])) + .then(([repo, local]) => { + const upstream = Git.Branch.upstream(local) + return Promise.all([repo, local, upstream]) + }) + .then(([repo, local, upstream]) => { + return Git.Graph.aheadBehind(repo, local.target(), upstream.target()) + }) + .catch(_ => ({ahead: 0, behind: 0})) + }) } // Public: Get the cached ahead/behind commit counts for the current branch's @@ -352,10 +370,12 @@ export default class GitRepositoryAsync { // Returns a {Promise} which resolves to the {String} git configuration value // specified by the key. getConfigValue (key, _path) { - return this.getRepo(_path) - .then(repo => repo.configSnapshot()) - .then(config => config.getStringBuf(key)) - .catch(_ => null) + return this.repoPool.enqueue(() => { + return this.getRepo(_path) + .then(repo => repo.configSnapshot()) + .then(config => config.getStringBuf(key)) + .catch(_ => null) + }) } // Public: Get the URL for the 'origin' remote. @@ -378,9 +398,11 @@ export default class GitRepositoryAsync { // Returns a {Promise} which resolves to a {String} branch name such as // `refs/remotes/origin/master`. getUpstreamBranch (_path) { - return this.getRepo(_path) - .then(repo => repo.getCurrentBranch()) - .then(branch => Git.Branch.upstream(branch)) + return this.repoPool.enqueue(() => { + return this.getRepo(_path) + .then(repo => repo.getCurrentBranch()) + .then(branch => Git.Branch.upstream(branch)) + }) } // Public: Gets all the local and remote references. @@ -393,23 +415,25 @@ export default class GitRepositoryAsync { // * `remotes` An {Array} of remote reference names. // * `tags` An {Array} of tag reference names. getReferences (_path) { - return this.getRepo(_path) - .then(repo => repo.getReferences(Git.Reference.TYPE.LISTALL)) - .then(refs => { - const heads = [] - const remotes = [] - const tags = [] - for (const ref of refs) { - if (ref.isTag()) { - tags.push(ref.name()) - } else if (ref.isRemote()) { - remotes.push(ref.name()) - } else if (ref.isBranch()) { - heads.push(ref.name()) + return this.repoPool.enqueue(() => { + return this.getRepo(_path) + .then(repo => repo.getReferences(Git.Reference.TYPE.LISTALL)) + .then(refs => { + const heads = [] + const remotes = [] + const tags = [] + for (const ref of refs) { + if (ref.isTag()) { + tags.push(ref.name()) + } else if (ref.isRemote()) { + remotes.push(ref.name()) + } else if (ref.isBranch()) { + heads.push(ref.name()) + } } - } - return {heads, remotes, tags} - }) + return {heads, remotes, tags} + }) + }) } // Public: Get the SHA for the given reference. @@ -421,9 +445,11 @@ export default class GitRepositoryAsync { // Returns a {Promise} which resolves to the current {String} SHA for the // given reference. getReferenceTarget (reference, _path) { - return this.getRepo(_path) - .then(repo => Git.Reference.nameToId(repo, reference)) - .then(oid => oid.tostrS()) + return this.repoPool.enqueue(() => { + return this.getRepo(_path) + .then(repo => Git.Reference.nameToId(repo, reference)) + .then(oid => oid.tostrS()) + }) } // Reading Status @@ -460,12 +486,17 @@ export default class GitRepositoryAsync { // Returns a {Promise} which resolves to a {Boolean} that's true if the `path` // is ignored. isPathIgnored (_path) { - return Promise.all([this.getRepo(), this.getWorkingDirectory()]) - .then(([repo, wd]) => { - const relativePath = this.relativize(_path, wd) - return Git.Ignore.pathIsIgnored(repo, relativePath) + return this.getWorkingDirectory() + .then(wd => { + return this.repoPool.enqueue(() => { + return this.getRepo() + .then(repo => { + const relativePath = this.relativize(_path, wd) + return Git.Ignore.pathIsIgnored(repo, relativePath) + }) + .then(ignored => Boolean(ignored)) + }) }) - .then(ignored => Boolean(ignored)) } // Get the status of a directory in the repository's working directory. @@ -501,8 +532,8 @@ export default class GitRepositoryAsync { // status bit for the path. refreshStatusForPath (_path) { let relativePath - return Promise.all([this.getRepo(), this.getWorkingDirectory()]) - .then(([repo, wd]) => { + return this.getWorkingDirectory() + .then(wd => { relativePath = this.relativize(_path, wd) return this._getStatus([relativePath]) }) @@ -609,34 +640,39 @@ export default class GitRepositoryAsync { // * `added` The {Number} of added lines. // * `deleted` The {Number} of deleted lines. getDiffStats (_path) { - return this.getRepo(_path) - .then(repo => Promise.all([repo, repo.getHeadCommit()])) - .then(([repo, headCommit]) => Promise.all([repo, headCommit.getTree(), this.getWorkingDirectory(_path)])) - .then(([repo, tree, wd]) => { - const options = new Git.DiffOptions() - options.contextLines = 0 - options.flags = Git.Diff.OPTION.DISABLE_PATHSPEC_MATCH - options.pathspec = this.relativize(_path, wd) - if (process.platform === 'win32') { - // Ignore eol of line differences on windows so that files checked in - // as LF don't report every line modified when the text contains CRLF - // endings. - options.flags |= Git.Diff.OPTION.IGNORE_WHITESPACE_EOL - } - return Git.Diff.treeToWorkdir(repo, tree, options) - }) - .then(diff => this._getDiffLines(diff)) - .then(lines => { - const stats = {added: 0, deleted: 0} - for (const line of lines) { - const origin = line.origin() - if (origin === Git.Diff.LINE.ADDITION) { - stats.added++ - } else if (origin === Git.Diff.LINE.DELETION) { - stats.deleted++ - } - } - return stats + return this.getWorkingDirectory(_path) + .then(wd => { + return this.repoPool.enqueue(() => { + return this.getRepo(_path) + .then(repo => Promise.all([repo, repo.getHeadCommit()])) + .then(([repo, headCommit]) => Promise.all([repo, headCommit.getTree()])) + .then(([repo, tree]) => { + const options = new Git.DiffOptions() + options.contextLines = 0 + options.flags = Git.Diff.OPTION.DISABLE_PATHSPEC_MATCH + options.pathspec = this.relativize(_path, wd) + if (process.platform === 'win32') { + // Ignore eol of line differences on windows so that files checked in + // as LF don't report every line modified when the text contains CRLF + // endings. + options.flags |= Git.Diff.OPTION.IGNORE_WHITESPACE_EOL + } + return Git.Diff.treeToWorkdir(repo, tree, options) + }) + .then(diff => this._getDiffLines(diff)) + .then(lines => { + const stats = {added: 0, deleted: 0} + for (const line of lines) { + const origin = line.origin() + if (origin === Git.Diff.LINE.ADDITION) { + stats.added++ + } else if (origin === Git.Diff.LINE.DELETION) { + stats.deleted++ + } + } + return stats + }) + }) }) } @@ -652,24 +688,29 @@ export default class GitRepositoryAsync { // * `oldLines` The {Number} of lines in the old hunk. // * `newLines` The {Number} of lines in the new hunk getLineDiffs (_path, text) { - let relativePath = null - return Promise.all([this.getRepo(_path), this.getWorkingDirectory(_path)]) - .then(([repo, wd]) => { - relativePath = this.relativize(_path, wd) - return repo.getHeadCommit() - }) - .then(commit => commit.getEntry(relativePath)) - .then(entry => entry.getBlob()) - .then(blob => { - const options = new Git.DiffOptions() - options.contextLines = 0 - if (process.platform === 'win32') { - // Ignore eol of line differences on windows so that files checked in - // as LF don't report every line modified when the text contains CRLF - // endings. - options.flags = Git.Diff.OPTION.IGNORE_WHITESPACE_EOL - } - return this._diffBlobToBuffer(blob, text, options) + return this.getWorkingDirectory(_path) + .then(wd => { + let relativePath = null + return this.repoPool.enqueue(() => { + return this.getRepo(_path) + .then(repo => { + relativePath = this.relativize(_path, wd) + return repo.getHeadCommit() + }) + .then(commit => commit.getEntry(relativePath)) + .then(entry => entry.getBlob()) + .then(blob => { + const options = new Git.DiffOptions() + options.contextLines = 0 + if (process.platform === 'win32') { + // Ignore eol of line differences on windows so that files checked in + // as LF don't report every line modified when the text contains CRLF + // endings. + options.flags = Git.Diff.OPTION.IGNORE_WHITESPACE_EOL + } + return this._diffBlobToBuffer(blob, text, options) + }) + }) }) } @@ -691,12 +732,17 @@ export default class GitRepositoryAsync { // Returns a {Promise} that resolves or rejects depending on whether the // method was successful. checkoutHead (_path) { - return Promise.all([this.getRepo(_path), this.getWorkingDirectory(_path)]) - .then(([repo, wd]) => { - const checkoutOptions = new Git.CheckoutOptions() - checkoutOptions.paths = [this.relativize(_path, wd)] - checkoutOptions.checkoutStrategy = Git.Checkout.STRATEGY.FORCE | Git.Checkout.STRATEGY.DISABLE_PATHSPEC_MATCH - return Git.Checkout.head(repo, checkoutOptions) + return this.getWorkingDirectory(_path) + .then(wd => { + return this.repoPool.enqueue(() => { + return this.getRepo(_path) + .then(repo => { + const checkoutOptions = new Git.CheckoutOptions() + checkoutOptions.paths = [this.relativize(_path, wd)] + checkoutOptions.checkoutStrategy = Git.Checkout.STRATEGY.FORCE | Git.Checkout.STRATEGY.DISABLE_PATHSPEC_MATCH + return Git.Checkout.head(repo, checkoutOptions) + }) + }) }) .then(() => this.refreshStatusForPath(_path)) } @@ -709,17 +755,19 @@ export default class GitRepositoryAsync { // // Returns a {Promise} that resolves if the method was successful. checkoutReference (reference, create) { - return this.getRepo() - .then(repo => repo.checkoutBranch(reference)) - .catch(error => { - if (create) { - return this._createBranch(reference) - .then(_ => this.checkoutReference(reference, false)) - } else { - throw error - } - }) - .then(_ => null) + return this.repoPool.enqueue(() => { + return this.getRepo() + .then(repo => repo.checkoutBranch(reference)) + }) + .catch(error => { + if (create) { + return this._createBranch(reference) + .then(_ => this.checkoutReference(reference, false)) + } else { + throw error + } + }) + .then(_ => null) } // Private @@ -745,9 +793,11 @@ export default class GitRepositoryAsync { // Returns a {Promise} which resolves to a {NodeGit.Ref} reference to the // created branch. _createBranch (name) { - return this.getRepo() - .then(repo => Promise.all([repo, repo.getHeadCommit()])) - .then(([repo, commit]) => repo.createBranch(name, commit)) + return this.repoPool.enqueue(() => { + return this.getRepo() + .then(repo => Promise.all([repo, repo.getHeadCommit()])) + .then(([repo, commit]) => repo.createBranch(name, commit)) + }) } // Get all the hunks in the diff. @@ -804,14 +854,16 @@ export default class GitRepositoryAsync { // Returns a {Promise} which resolves to a {boolean} indicating whether the // branch name changed. _refreshBranch () { - return this.getRepo() - .then(repo => repo.getCurrentBranch()) - .then(ref => ref.name()) - .then(branchName => { - const changed = branchName !== this.branch - this.branch = branchName - return changed - }) + return this.repoPool.enqueue(() => { + return this.getRepo() + .then(repo => repo.getCurrentBranch()) + .then(ref => ref.name()) + .then(branchName => { + const changed = branchName !== this.branch + this.branch = branchName + return changed + }) + }) } // Refresh the cached ahead/behind count with the given branch. @@ -1077,18 +1129,20 @@ export default class GitRepositoryAsync { // // Returns a {Promise} which resolves to an {Array} of {NodeGit.StatusFile} // statuses for the paths. - _getStatus (paths, repo) { - return this.getRepo() - .then(repo => { - const opts = { - flags: Git.Status.OPT.INCLUDE_UNTRACKED | Git.Status.OPT.RECURSE_UNTRACKED_DIRS - } + _getStatus (paths) { + return this.repoPool.enqueue(() => { + return this.getRepo() + .then(repo => { + const opts = { + flags: Git.Status.OPT.INCLUDE_UNTRACKED | Git.Status.OPT.RECURSE_UNTRACKED_DIRS + } - if (paths) { - opts.pathspec = paths - } + if (paths) { + opts.pathspec = paths + } - return repo.getStatusExt(opts) - }) + return repo.getStatusExt(opts) + }) + }) } } diff --git a/src/resource-pool.js b/src/resource-pool.js new file mode 100644 index 000000000..ae7cb71d0 --- /dev/null +++ b/src/resource-pool.js @@ -0,0 +1,57 @@ +/** @babel */ + +// Manages a pool of some resource. +export default class ResourcePool { + constructor (pool) { + this.pool = pool + + this.queue = [] + } + + // Enqueue the given function. The function will be given an object from the + // pool. The function must return a {Promise}. + enqueue (fn) { + let resolve = null + let reject = null + const wrapperPromise = new Promise((resolve_, reject_) => { + resolve = resolve_ + reject = reject_ + }) + + this.queue.push(this.wrapFunction(fn, resolve, reject)) + + this.dequeueIfAble() + + return wrapperPromise + } + + wrapFunction (fn, resolve, reject) { + return (resource) => { + const promise = fn(resource) + promise + .then(result => { + resolve(result) + this.taskDidComplete(resource) + }, error => { + reject(error) + this.taskDidComplete(resource) + }) + } + } + + taskDidComplete (resource) { + this.pool.push(resource) + + this.dequeueIfAble() + } + + dequeueIfAble () { + if (!this.pool.length || !this.queue.length) return + + const fn = this.queue.shift() + const resource = this.pool.shift() + fn(resource) + } + + getQueueDepth () { return this.queue.length } +}