123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
- "use strict";
- var sha1 = require('git-sha1');
- var applyDelta = require('../lib/apply-delta.js');
- var codec = require('../lib/object-codec.js');
- var decodePack = require('../lib/pack-codec.js').decodePack;
- var encodePack = require('../lib/pack-codec.js').encodePack;
- var makeChannel = require('culvert');
- module.exports = function (repo) {
- // packChannel is a writable culvert channel {put,drain} containing raw packfile binary data
- // opts can contain "onProgress" or "onError" hook functions.
- // callback will be called with a list of all unpacked hashes on success.
- repo.unpack = unpack; // (packChannel, opts) => hashes
- // hashes is an array of hashes to pack
- // packChannel will be a readable culvert channel {take} containing raw packfile binary data
- repo.pack = pack; // (hashes, opts) => packChannel
- };
/**
 * Unpack a packfile stream into the repository, one object at a time.
 *
 * Must run with the repository as `this`; the repository has to provide
 * `loadRaw(hash, cb)`, `hasHash(hash, cb)` and `saveRaw(hash, buffer, cb)`.
 *
 * Every failure is reported through `callback` exactly once; after the first
 * report nothing more is read from the stream.
 *
 * @param {Object} packChannel - Channel exposing `take(callback)`. It is run
 *   through `decodePack` before being consumed here.
 * @param {Object} [opts] - Optional settings. `opts.onProgress(message)` is
 *   called with progress text when present.
 * @param {Function} [callback] - `callback(err, hashes)` where `hashes` lists
 *   the hash of every object that was saved. When omitted, a function with
 *   the first two arguments pre-bound is returned instead.
 * @returns {*} The bound function when `callback` is omitted, otherwise the
 *   (unspecified) result of the first `take` call.
 */
function unpack(packChannel, opts, callback) {
  /*jshint validthis:true*/
  if (!callback) return unpack.bind(this, packChannel, opts);
  // Callers may leave the options out; only `onProgress` is ever read from it.
  opts = opts || {};

  var repo = this;
  var num, numDeltas = 0, count = 0, countDeltas = 0;
  var done = false, startDeltaProgress = false;
  // hashes keyed by offset for ofs-delta resolving
  var hashes = {};
  // key is hash, boolean is cached "has" value of true or false
  var has = {};
  // key is hash we're waiting for, value is array of items that are waiting.
  var pending = {};

  // Decoder failures are funnelled through onDone so the callback cannot fire
  // again after the unpack has already finished or failed.
  packChannel = applyParser(packChannel, decodePack, onDone);
  return packChannel.take(onStats);

  // Single exit point: reports the first outcome and ignores later ones.
  function onDone(err) {
    if (done) return;
    done = true;
    if (err) return callback(err);
    return callback(null, values(hashes));
  }

  // First item of the decoded stream: the header carrying the object count.
  function onStats(err, stats) {
    if (err) return onDone(err);
    if (!stats) return onDone(new Error("Pack stream ended before its header"));
    num = stats.num;
    packChannel.take(onRead);
  }

  // Emits one "Receiving objects" line; `more` is falsy for the final call.
  function objectProgress(more) {
    if (!more) startDeltaProgress = true;
    var percent = Math.round(count / num * 100);
    return opts.onProgress("Receiving objects: " + percent + "% (" + (count++) + "/" + num + ") " + (more ? "\r" : "\n"));
  }

  // Emits one "Applying deltas" line, but only once the object stream ended.
  function deltaProgress(more) {
    if (!startDeltaProgress) return;
    var percent = Math.round(countDeltas / numDeltas * 100);
    return opts.onProgress("Applying deltas: " + percent + "% (" + (countDeltas++) + "/" + numDeltas + ") " + (more ? "\r" : "\n"));
  }

  // Handles one decoded item; `undefined` marks the end of the stream.
  function onRead(err, item) {
    if (err) return onDone(err);
    if (opts.onProgress) objectProgress(item);
    if (item === undefined) return onEnd();
    if (item.size !== item.body.length) {
      return onDone(new Error("Body size mismatch"));
    }
    if (item.type === "ofs-delta") {
      numDeltas++;
      // The base is addressed by its distance back from this item's offset.
      var base = hashes[item.offset - item.ref];
      if (base === undefined) {
        return onDone(new Error("Missing ofs-delta base for object at offset " + item.offset));
      }
      item.ref = base;
      return resolveDelta(item);
    }
    if (item.type === "ref-delta") {
      numDeltas++;
      return checkDelta(item);
    }
    return saveValue(item);
  }

  // End of stream: succeed only if no ref-delta is still waiting for a base.
  function onEnd() {
    var waiting = Object.keys(pending);
    if (waiting.length) {
      return onDone(new Error("Unresolved ref-delta base(s): " + waiting.join(", ")));
    }
    return onDone();
  }

  // Loads the base object named by item.ref, applies the delta and saves.
  function resolveDelta(item) {
    if (opts.onProgress) deltaProgress();
    return repo.loadRaw(item.ref, function (err, buffer) {
      if (err) return onDone(err);
      if (!buffer) return onDone(new Error("Missing base image at " + item.ref));
      var target = codec.deframe(buffer);
      item.type = target.type;
      item.body = applyDelta(item.body, target.body);
      return saveValue(item);
    });
  }

  // Decides whether a ref-delta can be resolved now or has to wait, caching
  // the repository's answer per base hash.
  function checkDelta(item) {
    var hasTarget = has[item.ref];
    if (hasTarget === true) return resolveDelta(item);
    if (hasTarget === false) return enqueueDelta(item);
    return repo.hasHash(item.ref, function (err, value) {
      if (err) return onDone(err);
      has[item.ref] = value;
      if (value) return resolveDelta(item);
      return enqueueDelta(item);
    });
  }

  // Frames and hashes a complete object, records it and writes it out.
  function saveValue(item) {
    var buffer = codec.frame(item);
    var hash = sha1(buffer);
    hashes[item.offset] = hash;
    has[hash] = true;
    if (hash in pending) {
      // I have yet to come across a pack stream that actually needs this.
      // So I will only implement it when I have concrete data to test against.
      console.error({
        list: pending[hash],
        item: item
      });
      // Reported like every other failure instead of throwing a bare string
      // out of an asynchronous callback.
      return onDone(new Error("TODO: pending value was found, resolve it"));
    }
    return repo.saveRaw(hash, buffer, onSave);
  }

  // After a successful write, move on to the next item.
  function onSave(err) {
    if (err) return onDone(err);
    packChannel.take(onRead);
  }

  // Parks a ref-delta whose base is not available yet and keeps reading.
  function enqueueDelta(item) {
    var list = pending[item.ref];
    if (!list) pending[item.ref] = [item];
    else list.push(item);
    packChannel.take(onRead);
  }
}
- // TODO: Implement delta refs to reduce stream size
/**
 * Produce a packfile stream for the given objects.
 *
 * Must run with the repository as `this`; the repository has to provide
 * `loadRaw(hash, cb)`.
 *
 * The stream fed into `encodePack` consists of one `{num}` header item, then
 * one deframed object per hash, then `undefined` to signal the end.
 *
 * @param {Array} hashes - Hashes of the objects to include, in order.
 * @param {Object} opts - Accepted for interface compatibility; not read here.
 * @param {Function} [callback] - `callback(err, packChannel)`. It is called
 *   with the readable channel (`{take}`) right away. Because that channel has
 *   no error path of its own, a later load or encoding failure is reported by
 *   calling `callback` again with the error. When omitted, a function with the
 *   first two arguments pre-bound is returned instead.
 * @returns {*} The bound function when `callback` is omitted, otherwise
 *   whatever `callback` returns.
 */
function pack(hashes, opts, callback) {
  /*jshint validthis:true*/
  if (!callback) return pack.bind(this, hashes, opts);
  var repo = this;
  var i = 0, first = true, done = false;
  // Hand the parser an error handler; without one, any failure while loading
  // or encoding would surface as "onError is not a function".
  return callback(null, applyParser({ take: take }, encodePack, callback));

  // Source side of the encoder: header first, then each object, then end.
  function take(next) {
    if (done) return next();
    if (first) return readFirst(next);
    var hash = hashes[i++];
    if (hash === undefined) {
      // Remember that the list is exhausted so later reads end immediately.
      done = true;
      return next();
    }
    repo.loadRaw(hash, function (err, buffer) {
      if (err) return next(err);
      if (!buffer) return next(new Error("Missing hash: " + hash));
      // Reframe with pack format header
      next(null, codec.deframe(buffer));
    });
  }

  // Emits the header item announcing how many objects will follow.
  function readFirst(next) {
    first = false;
    next(null, {num: hashes.length});
  }
}
/**
 * List the values of an object's own enumerable properties, in the same
 * order that `Object.keys` reports the keys.
 *
 * @param {Object} object - Object to read.
 * @returns {Array} A new array holding one value per key.
 */
function values(object) {
  return Object.keys(object).map(function (key) {
    return object[key];
  });
}
/**
 * Pipe a readable stream through a push-style parser and expose the parser's
 * output as a new readable channel.
 *
 * A fresh channel is created with `makeChannel()`. Its `put` is replaced by
 * `parser(put)`, so each item pulled from `stream` is handed to the parser,
 * which in turn emits into the channel. Pulling pauses while the wrapped
 * `put` returns a falsy value and resumes from the channel's `drain` callback.
 * Once the end-of-stream marker (`undefined`) has been forwarded, nothing
 * further is read from `stream`.
 *
 * @param {Object} stream - Source exposing `take(callback)`, where callback
 *   receives `(err, item)`.
 * @param {Function} parser - `parser(emit)` returning the function that
 *   accepts input items.
 * @param {Function} [onError] - Receives any error from the source, the
 *   parser or the drain callback. When omitted, the error is thrown instead
 *   of being lost.
 * @returns {{take: Function}} Readable end of the parsed channel.
 */
function applyParser(stream, parser, onError) {
  var extra = makeChannel();
  extra.put = parser(extra.put);
  stream.take(onData);
  return { take: extra.take };

  // Routes a failure to the handler, or rethrows it when there is none.
  function fail(err) {
    if (typeof onError === "function") return onError(err);
    throw err;
  }

  function onData(err, item) {
    if (err) return fail(err);
    // `undefined` is the end-of-stream marker used by the producers and
    // consumers in this file; forward it, then stop pulling.
    var isLast = item === undefined;
    var more;
    try { more = extra.put(item); }
    catch (putErr) { return fail(putErr); }
    if (isLast) return;
    if (more) stream.take(onData);
    else extra.drain(onDrain);
  }

  // The channel has room again: resume pulling from the source.
  function onDrain(err) {
    if (err) return fail(err);
    stream.take(onData);
  }
}
|