pack-ops.js 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. "use strict";
  2. var sha1 = require('git-sha1');
  3. var applyDelta = require('../lib/apply-delta.js');
  4. var codec = require('../lib/object-codec.js');
  5. var decodePack = require('../lib/pack-codec.js').decodePack;
  6. var encodePack = require('../lib/pack-codec.js').encodePack;
  7. var makeChannel = require('culvert');
  8. module.exports = function (repo) {
  9. // packChannel is a writable culvert channel {put,drain} containing raw packfile binary data
  10. // opts can contain "onProgress" or "onError" hook functions.
  11. // callback will be called with a list of all unpacked hashes on success.
  12. repo.unpack = unpack; // (packChannel, opts) => hashes
  13. // hashes is an array of hashes to pack
  14. // packChannel will be a readable culvert channel {take} containing raw packfile binary data
  15. repo.pack = pack; // (hashes, opts) => packChannel
  16. };
  /**
   * Unpack a stream of packfile data into the repository bound as `this`.
   *
   * Entries are read one at a time from the decoded stream. Delta entries are
   * applied against their base object (loaded with `repo.loadRaw`) and every
   * resulting object is stored with `repo.saveRaw`. A ref-delta whose base is
   * not available yet is parked until that base has been stored.
   *
   * @param {Object} packChannel - Channel whose `take` yields raw pack data; it
   *   is passed through `decodePack` before being consumed here.
   * @param {Object} [opts] - Options. `opts.onProgress(message)` is called with
   *   progress strings when provided.
   * @param {Function} [callback] - Called at most once, as `callback(err)` on
   *   failure or `callback(null, hashes)` with the hash of every stored object.
   *   When omitted, a function still waiting for the callback is returned.
   */
  function unpack(packChannel, opts, callback) {
    /*jshint validthis:true*/
    if (!callback) return unpack.bind(this, packChannel, opts);
    var repo = this;
    // Tolerate a missing options object instead of crashing on opts.onProgress.
    var options = opts || {};
    var num, numDeltas = 0, count = 0, countDeltas = 0;
    var done = false, startDeltaProgress = false;
    // hashes keyed by offset for ofs-delta resolving
    var hashes = {};
    // key is hash, boolean is cached "has" value of true or false
    var has = {};
    // key is hash we're waiting for, value is array of items that are waiting.
    var pending = {};
    // Parked deltas whose base has just been stored; they are resolved before
    // the next entry is read from the stream.
    var ready = [];

    // Parser failures go through onDone so the callback fires at most once.
    packChannel = applyParser(packChannel, decodePack, onDone);
    return packChannel.take(onStats);

    // Single exit point: reports the first outcome and ignores later ones.
    function onDone(err) {
      if (done) return;
      done = true;
      if (err) return callback(err);
      // Deltas still parked at end of stream can never be rebuilt; report them
      // rather than claiming success with objects missing.
      var missing = Object.keys(pending);
      if (missing.length) {
        return callback(new Error("Unresolved deltas: missing base " + missing.join(", ")));
      }
      return callback(null, values(hashes));
    }

    // First item of the decoded stream is the header carrying the entry count.
    function onStats(err, stats) {
      if (err) return onDone(err);
      if (!stats) return onDone(new Error("Pack stream ended before header"));
      num = stats.num;
      packChannel.take(onRead);
    }

    function objectProgress(more) {
      if (!more) startDeltaProgress = true;
      // An empty pack would otherwise divide by zero and print "NaN%".
      var percent = num ? Math.round(count / num * 100) : 100;
      return options.onProgress("Receiving objects: " + percent + "% (" + (count++) + "/" + num + ") " + (more ? "\r" : "\n"));
    }

    function deltaProgress(more) {
      if (!startDeltaProgress) return;
      var percent = Math.round(countDeltas / numDeltas * 100);
      return options.onProgress("Applying deltas: " + percent + "% (" + (countDeltas++) + "/" + numDeltas + ") " + (more ? "\r" : "\n"));
    }

    // Handles one decoded entry; `undefined` marks the end of the stream.
    function onRead(err, item) {
      if (err) return onDone(err);
      if (options.onProgress) objectProgress(item);
      if (item === undefined) return onDone();
      if (item.size !== item.body.length) {
        return onDone(new Error("Body size mismatch"));
      }
      if (item.type === "ofs-delta") {
        numDeltas++;
        // For ofs-delta, item.ref is a distance back from this entry's offset.
        var baseOffset = item.offset - item.ref;
        var baseHash = hashes[baseOffset];
        if (!baseHash) {
          return onDone(new Error("Missing base object for ofs-delta at offset " + baseOffset));
        }
        item.ref = baseHash;
        return resolveDelta(item);
      }
      if (item.type === "ref-delta") {
        numDeltas++;
        return checkDelta(item);
      }
      return saveValue(item);
    }

    // Loads the base object named by item.ref, applies the delta and stores
    // the result under the base object's type.
    function resolveDelta(item) {
      if (options.onProgress) deltaProgress();
      return repo.loadRaw(item.ref, function (err, buffer) {
        if (err) return onDone(err);
        if (!buffer) return onDone(new Error("Missing base image at " + item.ref));
        var target;
        try {
          target = codec.deframe(buffer);
          item.body = applyDelta(item.body, target.body);
        }
        catch (applyErr) {
          return onDone(applyErr);
        }
        item.type = target.type;
        return saveValue(item);
      });
    }

    // Decides whether a ref-delta can be resolved now or must wait for its
    // base, caching the repository lookup in `has`.
    function checkDelta(item) {
      var hasTarget = has[item.ref];
      if (hasTarget === true) return resolveDelta(item);
      if (hasTarget === false) return enqueueDelta(item);
      return repo.hasHash(item.ref, function (err, value) {
        if (err) return onDone(err);
        has[item.ref] = value;
        if (value) return resolveDelta(item);
        return enqueueDelta(item);
      });
    }

    // Frames, hashes and stores a fully resolved object.
    function saveValue(item) {
      var buffer = codec.frame(item);
      var hash = sha1(buffer);
      hashes[item.offset] = hash;
      has[hash] = true;
      // Release any deltas that were parked waiting for this object; they are
      // resolved from onSave once the object is actually stored.
      var waiting = pending[hash];
      if (waiting) {
        delete pending[hash];
        for (var i = 0; i < waiting.length; i++) ready.push(waiting[i]);
      }
      return repo.saveRaw(hash, buffer, onSave);
    }

    function onSave(err) {
      if (err) return onDone(err);
      if (ready.length) return resolveDelta(ready.shift());
      packChannel.take(onRead);
    }

    // Parks a ref-delta until its base object has been stored.
    function enqueueDelta(item) {
      var list = pending[item.ref];
      if (!list) pending[item.ref] = [item];
      else list.push(item);
      packChannel.take(onRead);
    }
  }
  120. // TODO: Implement delta refs to reduce stream size
  /**
   * Build a packfile stream for the given objects from the repository bound as
   * `this`.
   *
   * A source channel is fed through `encodePack`: it first yields a header
   * item `{num: hashes.length}`, then one deframed object per hash (loaded with
   * `repo.loadRaw`), then `undefined` to mark the end.
   *
   * @param {Array} hashes - Hashes of the objects to pack.
   * @param {Object} [opts] - Options. `opts.onError(err)` is called when an
   *   object cannot be loaded or encoding fails; without it the error is thrown.
   * @param {Function} [callback] - Called as `callback(null, packChannel)` where
   *   `packChannel` is a readable channel {take}. When omitted, a function still
   *   waiting for the callback is returned.
   */
  function pack(hashes, opts, callback) {
    /*jshint validthis:true*/
    if (!callback) return pack.bind(this, hashes, opts);
    var repo = this;
    var i = 0, first = true, done = false;
    return callback(null, applyParser({ take: take }, encodePack, onError));

    // applyParser reports source and encoder failures here. Previously no
    // handler was supplied, so any failure turned into an unrelated TypeError.
    function onError(err) {
      if (opts && typeof opts.onError === "function") return opts.onError(err);
      throw err;
    }

    // Source side of the channel: header first, then each object, then end.
    function take(callback) {
      if (done) return callback();
      if (first) return readFirst(callback);
      var hash = hashes[i++];
      if (hash === undefined) {
        done = true;
        return callback();
      }
      repo.loadRaw(hash, function (err, buffer) {
        if (err) return callback(err);
        if (!buffer) return callback(new Error("Missing hash: " + hash));
        // Reframe with pack format header
        callback(null, codec.deframe(buffer));
      });
    }

    function readFirst(callback) {
      first = false;
      callback(null, {num: hashes.length});
    }
  }
  // Returns a new array holding the value of every own enumerable property of
  // `object`, in the order reported by Object.keys.
  function values(object) {
    return Object.keys(object).map(function (key) {
      return object[key];
    });
  }
  /**
   * Pipe every item taken from `stream` through `parser` and expose the
   * parser's output as a readable channel.
   *
   * @param {Object} stream - Source with a `take(callback)` method; the callback
   *   is invoked as `callback(err, item)`.
   * @param {Function} parser - Called once with the output channel's `put`; it
   *   must return the function that receives each source item. That function's
   *   return value is used as the "keep reading" signal: when falsy, reading
   *   pauses until the output channel drains.
   * @param {Function} [onError] - Receives source, parser and drain errors.
   *   When omitted the error is thrown, so the real cause is not replaced by
   *   "onError is not a function".
   * @returns {Object} Readable channel `{take}` yielding the parser's output.
   */
  function applyParser(stream, parser, onError) {
    var report = typeof onError === "function" ? onError : rethrow;
    var extra = makeChannel();
    extra.put = parser(extra.put);
    stream.take(onData);
    return { take: extra.take };

    function rethrow(err) {
      throw err;
    }

    function onData(err, item) {
      if (err) return report(err);
      var more;
      try { more = extra.put(item); }
      catch (parseErr) { return report(parseErr); }
      if (more) stream.take(onData);
      else extra.drain(onDrain);
    }

    function onDrain(err) {
      if (err) return report(err);
      stream.take(onData);
    }
  }