diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MapreduceRestoreSnapshotHelper.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MapreduceRestoreSnapshotHelper.java new file mode 100644 index 000000000000..628ac63ef2c1 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MapreduceRestoreSnapshotHelper.java @@ -0,0 +1,831 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.io.HFileLink; +import org.apache.hadoop.hbase.io.Reference; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.monitoring.TaskMonitor; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; +import org.apache.hadoop.hbase.regionserver.StoreContext; +import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.regionserver.StoreUtils; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker; +import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; +import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotManifest; +import org.apache.hadoop.hbase.snapshot.SnapshotTTLExpiredException; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.MapreduceHFileArchiver; +import org.apache.hadoop.hbase.util.ModifyRegionUtils; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.io.IOUtils; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Helper to Restore/Clone a Snapshot + *

+ * The helper assumes that a table is already created, and by calling restore() the content present + * in the snapshot will be restored as the new content of the table. + *

+ * Clone from Snapshot: If the target table is empty, the restore operation is just a "clone + * operation", where the only operations are: + *

+ *

+ * Restore from Snapshot: + *

+ */ +@InterfaceAudience.Private +public final class MapreduceRestoreSnapshotHelper { + + private static final Logger LOG = LoggerFactory.getLogger(MapreduceRestoreSnapshotHelper.class); + private final Map regionsMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); + + private final Map> parentsMap = new HashMap<>(); + + private final ForeignExceptionDispatcher monitor; + private final MonitoredTask status; + + private final SnapshotManifest snapshotManifest; + private final SnapshotDescription snapshotDesc; + private final TableName snapshotTable; + + private final TableDescriptor tableDesc; + private final Path rootDir; + private final Path tableDir; + + private final Configuration conf; + private final FileSystem fs; + private final boolean createBackRefs; + + public MapreduceRestoreSnapshotHelper(final Configuration conf, final FileSystem fs, + final SnapshotManifest manifest, final TableDescriptor tableDescriptor, final Path rootDir, + final ForeignExceptionDispatcher monitor, final MonitoredTask status, + final boolean createBackRefs) { + this.fs = fs; + this.conf = conf; + this.snapshotManifest = manifest; + this.snapshotDesc = manifest.getSnapshotDescription(); + this.snapshotTable = TableName.valueOf(snapshotDesc.getTable()); + this.tableDesc = tableDescriptor; + this.rootDir = rootDir; + this.tableDir = CommonFSUtils.getTableDir(rootDir, tableDesc.getTableName()); + this.monitor = monitor; + this.status = status; + this.createBackRefs = createBackRefs; + } + + /** + * Restore the on-disk table to a specified snapshot state. + * @return the set of regions touched by the restore operation + */ + public MapreduceRestoreSnapshotHelper.RestoreMetaChanges restoreHdfsRegions() throws IOException { + ThreadPoolExecutor exec = SnapshotManifest.createExecutor(conf, "RestoreSnapshot"); + try { + return restoreHdfsRegions(exec); + } finally { + exec.shutdown(); + } + } + + private MapreduceRestoreSnapshotHelper.RestoreMetaChanges restoreHdfsRegions(final ThreadPoolExecutor exec) throws IOException { + LOG.info("starting restore table regions using snapshot={}", snapshotDesc); + + Map regionManifests = snapshotManifest.getRegionManifestsMap(); + if (regionManifests == null) { + LOG.warn("Nothing to restore. Snapshot {} looks empty", snapshotDesc); + return null; + } + + MapreduceRestoreSnapshotHelper.RestoreMetaChanges + metaChanges = new MapreduceRestoreSnapshotHelper.RestoreMetaChanges(tableDesc, parentsMap); + + // Take a copy of the manifest.keySet() since we are going to modify + // this instance, by removing the regions already present in the restore dir. + Set regionNames = new HashSet<>(regionManifests.keySet()); + + List tableRegions = getTableRegions(); + + RegionInfo mobRegion = + MobUtils.getMobRegionInfo(snapshotManifest.getTableDescriptor().getTableName()); + if (tableRegions != null) { + // restore the mob region in case + if (regionNames.contains(mobRegion.getEncodedName())) { + monitor.rethrowException(); + status.setStatus("Restoring mob region..."); + List mobRegions = new ArrayList<>(1); + mobRegions.add(mobRegion); + restoreHdfsMobRegions(exec, regionManifests, mobRegions); + regionNames.remove(mobRegion.getEncodedName()); + status.setStatus("Finished restoring mob region."); + } + } + if (regionNames.contains(mobRegion.getEncodedName())) { + // add the mob region + monitor.rethrowException(); + status.setStatus("Cloning mob region..."); + cloneHdfsMobRegion(regionManifests, mobRegion); + regionNames.remove(mobRegion.getEncodedName()); + status.setStatus("Finished cloning mob region."); + } + + // Identify which region are still available and which not. + // NOTE: we rely upon the region name as: "table name, start key, end key" + if (tableRegions != null) { + monitor.rethrowException(); + for (RegionInfo regionInfo : tableRegions) { + String regionName = regionInfo.getEncodedName(); + if (regionNames.contains(regionName)) { + LOG.info("region to restore: {}", regionName); + regionNames.remove(regionName); + metaChanges.addRegionToRestore( + ProtobufUtil.toRegionInfo(regionManifests.get(regionName).getRegionInfo())); + } else { + LOG.info("region to remove: {}", regionName); + metaChanges.addRegionToRemove(regionInfo); + } + } + } + + // Regions to Add: present in the snapshot but not in the current table + List regionsToAdd = new ArrayList<>(regionNames.size()); + if (!regionNames.isEmpty()) { + monitor.rethrowException(); + for (String regionName : regionNames) { + LOG.info("region to add: {}", regionName); + regionsToAdd + .add(ProtobufUtil.toRegionInfo(regionManifests.get(regionName).getRegionInfo())); + } + } + + // Create new regions cloning from the snapshot + // HBASE-19980: We need to call cloneHdfsRegions() before restoreHdfsRegions() because + // regionsMap is constructed in cloneHdfsRegions() and it can be used in restoreHdfsRegions(). + monitor.rethrowException(); + status.setStatus("Cloning regions..."); + RegionInfo[] clonedRegions = cloneHdfsRegions(exec, regionManifests, regionsToAdd); + metaChanges.setNewRegions(clonedRegions); + status.setStatus("Finished cloning regions."); + + // Restore regions using the snapshot data + monitor.rethrowException(); + status.setStatus("Restoring table regions..."); + restoreHdfsRegions(exec, regionManifests, metaChanges.getRegionsToRestore()); + status.setStatus("Finished restoring all table regions."); + + // Remove regions from the current table + monitor.rethrowException(); + status.setStatus("Starting to delete excess regions from table"); + removeHdfsRegions(exec, metaChanges.getRegionsToRemove()); + status.setStatus("Finished deleting excess regions from table."); + + LOG.info("finishing restore table regions using snapshot={}", snapshotDesc); + + return metaChanges; + } + + /** + * Describe the set of operations needed to update hbase:meta after restore. + */ + private static class RestoreMetaChanges { + private final Map> parentsMap; + private final TableDescriptor htd; + + private List regionsToRestore = null; + private List regionsToRemove = null; + private List regionsToAdd = null; + + public RestoreMetaChanges(TableDescriptor htd, Map> parentsMap) { + this.parentsMap = parentsMap; + this.htd = htd; + } + + /** + * Returns the list of 'restored regions' during the on-disk restore. The caller is responsible + * to add the regions to hbase:meta if not present. + * @return the list of regions restored + */ + public List getRegionsToRestore() { + return this.regionsToRestore; + } + + /** + * Returns the list of regions removed during the on-disk restore. The caller is responsible to + * remove the regions from META. e.g. MetaTableAccessor.deleteRegions(...) + * @return the list of regions to remove from META + */ + public List getRegionsToRemove() { + return this.regionsToRemove; + } + + void setNewRegions(final RegionInfo[] hris) { + if (hris != null) { + regionsToAdd = Arrays.asList(hris); + } else { + regionsToAdd = null; + } + } + + void addRegionToRemove(final RegionInfo hri) { + if (regionsToRemove == null) { + regionsToRemove = new LinkedList<>(); + } + regionsToRemove.add(hri); + } + + void addRegionToRestore(final RegionInfo hri) { + if (regionsToRestore == null) { + regionsToRestore = new LinkedList<>(); + } + regionsToRestore.add(hri); + } + } + + /** + * Remove specified regions from the file-system, using the archiver. + */ + private void removeHdfsRegions(final ThreadPoolExecutor exec, final List regions) + throws IOException { + if (regions == null || regions.isEmpty()) return; + ModifyRegionUtils.editRegions(exec, regions, + (ModifyRegionUtils.RegionEditTask) hri -> MapreduceHFileArchiver.archiveRegion(conf, fs, hri, rootDir, tableDir)); + } + + /** + * Restore specified regions by restoring content to the snapshot state. + */ + private void restoreHdfsRegions(final ThreadPoolExecutor exec, + final Map regionManifests, final List regions) + throws IOException { + if (regions == null || regions.isEmpty()) return; + ModifyRegionUtils.editRegions(exec, regions, + (ModifyRegionUtils.RegionEditTask) hri -> restoreRegion(hri, regionManifests.get(hri.getEncodedName()))); + } + + /** + * Restore specified mob regions by restoring content to the snapshot state. + */ + private void restoreHdfsMobRegions(final ThreadPoolExecutor exec, + final Map regionManifests, final List regions) + throws IOException { + if (regions == null || regions.isEmpty()) return; + ModifyRegionUtils.editRegions(exec, regions, + (ModifyRegionUtils.RegionEditTask) hri -> restoreMobRegion(hri, regionManifests.get(hri.getEncodedName()))); + } + + private Map> + getRegionHFileReferences(final SnapshotProtos.SnapshotRegionManifest manifest) { + Map> familyMap = + new HashMap<>(manifest.getFamilyFilesCount()); + for (SnapshotProtos.SnapshotRegionManifest.FamilyFiles familyFiles : manifest.getFamilyFilesList()) { + familyMap.put(familyFiles.getFamilyName().toStringUtf8(), + new ArrayList<>(familyFiles.getStoreFilesList())); + } + return familyMap; + } + + /** + * Restore region by removing files not in the snapshot and adding the missing ones from the + * snapshot. + */ + private void restoreRegion(final RegionInfo regionInfo, + final SnapshotProtos.SnapshotRegionManifest regionManifest) throws IOException { + restoreRegion(regionInfo, regionManifest, new Path(tableDir, regionInfo.getEncodedName()), + tableDir); + } + + /** + * Restore mob region by removing files not in the snapshot and adding the missing ones from the + * snapshot. + */ + private void restoreMobRegion(final RegionInfo regionInfo, + final SnapshotProtos.SnapshotRegionManifest regionManifest) throws IOException { + if (regionManifest == null) { + return; + } + restoreRegion(regionInfo, regionManifest, + MobUtils.getMobRegionPath(conf, tableDesc.getTableName()), + MobUtils.getMobTableDir(conf, tableDesc.getTableName())); + } + + /** + * Restore region by removing files not in the snapshot and adding the missing ones from the + * snapshot. + */ + private void restoreRegion(final RegionInfo regionInfo, + final SnapshotProtos.SnapshotRegionManifest regionManifest, Path regionDir, Path tableDir) throws IOException { + Map> snapshotFiles = + getRegionHFileReferences(regionManifest); + + String tableName = tableDesc.getTableName().getNameAsString(); + final String snapshotName = snapshotDesc.getName(); + + HRegionFileSystem regionFS = (fs.exists(regionDir)) + ? HRegionFileSystem.openRegionFromFileSystem(conf, fs, tableDir, regionInfo, false) + : HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, regionInfo); + + // Restore families present in the table + for (Path familyDir : FSUtils.getFamilyDirs(fs, regionDir)) { + byte[] family = Bytes.toBytes(familyDir.getName()); + ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.of(family); + StoreFileTracker tracker = StoreFileTrackerFactory.create(conf, true, + StoreContext.getBuilder().withColumnFamilyDescriptor(familyDescriptor) + .withFamilyStoreDirectoryPath(familyDir).withRegionFileSystem(regionFS).build()); + List storeFileInfos = tracker.load(); + List familyFiles = storeFileInfos.stream() + .map(storeFileInfo -> storeFileInfo.getPath().getName()).collect(Collectors.toList()); + List snapshotFamilyFiles = + snapshotFiles.remove(familyDir.getName()); + List filesToTrack = new ArrayList<>(); + if (snapshotFamilyFiles != null) { + List hfilesToAdd = new ArrayList<>(); + for (SnapshotProtos.SnapshotRegionManifest.StoreFile storeFile : snapshotFamilyFiles) { + if (familyFiles.contains(storeFile.getName())) { + // HFile already present + familyFiles.remove(storeFile.getName()); + // no need to restore already present files, but we need to add those to tracker + filesToTrack + .add(tracker.getStoreFileInfo(new Path(familyDir, storeFile.getName()), true)); + } else { + // HFile missing + hfilesToAdd.add(storeFile); + } + } + + // Remove hfiles not present in the snapshot + for (String hfileName : familyFiles) { + for (StoreFileInfo storeFileInfo : storeFileInfos) { + if (hfileName.equals(storeFileInfo.getPath().getName())) { + tracker.removeStoreFiles( + StoreUtils.toHStoreFile(Collections.singletonList(storeFileInfo), null, null)); + LOG.trace("Removing HFile={} not present in snapshot={} from region={} table={}", + hfileName, snapshotName, regionInfo.getEncodedName(), tableName); + } + } + } + + // Restore Missing files + for (SnapshotProtos.SnapshotRegionManifest.StoreFile storeFile : hfilesToAdd) { + LOG.debug("Restoring missing HFileLink {} of snapshot={} to region={} table={}", + storeFile.getName(), snapshotName, regionInfo.getEncodedName(), tableName); + StoreFileInfo storeFileInfo = + restoreStoreFile(familyDir, regionInfo, storeFile, createBackRefs, tracker); + // mark the reference file to be added to tracker + filesToTrack.add(storeFileInfo); + } + } else { + // Family doesn't exist in the snapshot + LOG.trace("Removing family={} in snapshot={} from region={} table={}", + Bytes.toString(family), snapshotName, regionInfo.getEncodedName(), tableName); + LOG.debug("Removing family={} in snapshot={} from region={} table={}", + Bytes.toString(family), snapshotName, regionInfo.getEncodedName(), tableName); + MapreduceHFileArchiver.archiveFamilyByFamilyDir(fs, conf, regionInfo, familyDir, family); + fs.delete(familyDir, true); + } + + // simply reset list of tracked files with the matching files + // and the extra one present in the snapshot + tracker.set(filesToTrack); + } + + // Add families not present in the table + for (Map.Entry> familyEntry : snapshotFiles + .entrySet()) { + Path familyDir = new Path(regionDir, familyEntry.getKey()); + StoreFileTracker tracker = + StoreFileTrackerFactory.create(conf, true, StoreContext.getBuilder() + .withFamilyStoreDirectoryPath(familyDir).withRegionFileSystem(regionFS).build()); + List files = new ArrayList<>(); + if (!fs.mkdirs(familyDir)) { + throw new IOException("Unable to create familyDir=" + familyDir); + } + + for (SnapshotProtos.SnapshotRegionManifest.StoreFile storeFile : familyEntry.getValue()) { + LOG.trace("Adding HFileLink (Not present in the table) {} of snapshot {} to table={}", + storeFile.getName(), snapshotName, tableName); + StoreFileInfo storeFileInfo = + restoreStoreFile(familyDir, regionInfo, storeFile, createBackRefs, tracker); + files.add(storeFileInfo); + } + tracker.set(files); + } + } + + /** + * Clone specified regions. For each region create a new region and create a HFileLink for each + * hfile. + */ + private RegionInfo[] cloneHdfsRegions(final ThreadPoolExecutor exec, + final Map regionManifests, final List regions) + throws IOException { + if (regions == null || regions.isEmpty()) return null; + + final Map snapshotRegions = new HashMap<>(regions.size()); + final String snapshotName = snapshotDesc.getName(); + + // clone region info (change embedded tableName with the new one) + RegionInfo[] clonedRegionsInfo = new RegionInfo[regions.size()]; + for (int i = 0; i < clonedRegionsInfo.length; ++i) { + // clone the region info from the snapshot region info + RegionInfo snapshotRegionInfo = regions.get(i); + clonedRegionsInfo[i] = cloneRegionInfo(snapshotRegionInfo); + + // add the region name mapping between snapshot and cloned + String snapshotRegionName = snapshotRegionInfo.getEncodedName(); + String clonedRegionName = clonedRegionsInfo[i].getEncodedName(); + regionsMap.put(Bytes.toBytes(snapshotRegionName), Bytes.toBytes(clonedRegionName)); + LOG.info("clone region={} as {} in snapshot {}", snapshotRegionName, clonedRegionName, + snapshotName); + + // Add mapping between cloned region name and snapshot region info + snapshotRegions.put(clonedRegionName, snapshotRegionInfo); + } + + // create the regions on disk + ModifyRegionUtils.createRegions(exec, conf, rootDir, tableDesc, clonedRegionsInfo, + (ModifyRegionUtils.RegionFillTask) region -> { + RegionInfo snapshotHri = snapshotRegions.get(region.getRegionInfo().getEncodedName()); + cloneRegion(region, snapshotHri, regionManifests.get(snapshotHri.getEncodedName())); + }); + + return clonedRegionsInfo; + } + + /** + * Clone the mob region. For the region create a new region and create a HFileLink for each hfile. + */ + private void cloneHdfsMobRegion(final Map regionManifests, + final RegionInfo region) throws IOException { + // clone region info (change embedded tableName with the new one) + Path clonedRegionPath = MobUtils.getMobRegionPath(rootDir, tableDesc.getTableName()); + cloneRegion(MobUtils.getMobRegionInfo(tableDesc.getTableName()), clonedRegionPath, region, + regionManifests.get(region.getEncodedName())); + } + + /** + * Clone region directory content from the snapshot info. Each region is encoded with the table + * name, so the cloned region will have a different region name. Instead of copying the hfiles a + * HFileLink is created. + * @param regionDir {@link Path} cloned dir + */ + private void cloneRegion(final RegionInfo newRegionInfo, final Path regionDir, + final RegionInfo snapshotRegionInfo, final SnapshotProtos.SnapshotRegionManifest manifest) throws IOException { + final String tableName = tableDesc.getTableName().getNameAsString(); + final String snapshotName = snapshotDesc.getName(); + for (SnapshotProtos.SnapshotRegionManifest.FamilyFiles familyFiles : manifest.getFamilyFilesList()) { + Path familyDir = new Path(regionDir, familyFiles.getFamilyName().toStringUtf8()); + List clonedFiles = new ArrayList<>(); + HRegionFileSystem regionFS = (fs.exists(regionDir)) + ? HRegionFileSystem.openRegionFromFileSystem(conf, fs, tableDir, newRegionInfo, false) + : HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, newRegionInfo); + + Configuration sftConf = StoreUtils.createStoreConfiguration(conf, tableDesc, + tableDesc.getColumnFamily(familyFiles.getFamilyName().toByteArray())); + StoreFileTracker tracker = + StoreFileTrackerFactory + .create(sftConf, true, + StoreContext.getBuilder() + .withFamilyStoreDirectoryPath( + new Path(regionDir, familyFiles.getFamilyName().toStringUtf8())) + .withRegionFileSystem(regionFS) + .withColumnFamilyDescriptor( + ColumnFamilyDescriptorBuilder.of(familyFiles.getFamilyName().toByteArray())) + .build()); + tracker.load(); + for (SnapshotProtos.SnapshotRegionManifest.StoreFile storeFile : familyFiles.getStoreFilesList()) { + LOG.info("Adding HFileLink {} from cloned region in snapshot {} to table={}", + storeFile.getName(), snapshotName, tableName); + if (MobUtils.isMobRegionInfo(newRegionInfo)) { + String mobFileName = + HFileLink.createHFileLinkName(snapshotRegionInfo, storeFile.getName()); + Path mobPath = new Path(familyDir, mobFileName); + if (fs.exists(mobPath)) { + fs.delete(mobPath, true); + } + StoreFileInfo storeFileInfo = + restoreStoreFile(familyDir, snapshotRegionInfo, storeFile, createBackRefs, tracker); + clonedFiles.add(storeFileInfo); + } else { + StoreFileInfo storeFileInfo = + restoreStoreFile(familyDir, snapshotRegionInfo, storeFile, createBackRefs, tracker); + clonedFiles.add(storeFileInfo); + } + } + tracker.add(clonedFiles); + } + + } + + /** + * Clone region directory content from the snapshot info. Each region is encoded with the table + * name, so the cloned region will have a different region name. Instead of copying the hfiles a + * HFileLink is created. + * @param region {@link HRegion} cloned + */ + private void cloneRegion(final HRegion region, final RegionInfo snapshotRegionInfo, + final SnapshotProtos.SnapshotRegionManifest manifest) throws IOException { + cloneRegion(region.getRegionInfo(), new Path(tableDir, region.getRegionInfo().getEncodedName()), + snapshotRegionInfo, manifest); + } + + /** + * Create a new {@link HFileLink} to reference the store file. + *

+ * The store file in the snapshot can be a simple hfile, an HFileLink or a reference. + *

    + *
  • hfile: abc -> table=region-abc + *
  • reference: abc.1234 -> table=region-abc.1234 + *
  • hfilelink: table=region-hfile -> table=region-hfile + *
+ * @param familyDir destination directory for the store file + * @param regionInfo destination region info for the table + * @param createBackRef - Whether back reference should be created. Defaults to true. + * @param storeFile store file name (can be a Reference, HFileLink or simple HFile) + */ + private StoreFileInfo restoreStoreFile(final Path familyDir, final RegionInfo regionInfo, + final SnapshotProtos.SnapshotRegionManifest.StoreFile storeFile, final boolean createBackRef, + final StoreFileTracker tracker) throws IOException { + String hfileName = storeFile.getName(); + StoreFileInfo info = null; + if (HFileLink.isHFileLink(hfileName) || StoreFileInfo.isMobFileLink(hfileName)) { + HFileLink hfileLink = tracker.createFromHFileLink(hfileName, createBackRef); + info = new StoreFileInfo(conf, fs, new Path(familyDir, hfileName), hfileLink); + return info; + } else if (StoreFileInfo.isReference(hfileName)) { + return restoreReferenceFile(familyDir, regionInfo, storeFile, tracker); + } else { + HFileLink hfileLink = tracker.createAndCommitHFileLink(regionInfo.getTable(), + regionInfo.getEncodedName(), hfileName, createBackRef); + return new StoreFileInfo(conf, fs, new Path(familyDir, HFileLink + .createHFileLinkName(regionInfo.getTable(), regionInfo.getEncodedName(), hfileName)), + hfileLink); + } + } + + /** + * Create a new {@link Reference} as copy of the source one. + *

+ *

+ * + *
+   * The source table looks like:
+   *    1234/abc      (original file)
+   *    5678/abc.1234 (reference file)
+   *
+   * After the clone operation looks like:
+   *   wxyz/table=1234-abc
+   *   stuv/table=1234-abc.wxyz
+   *
+   * NOTE that the region name in the clone changes (md5 of regioninfo)
+   * and the reference should reflect that change.
+   * 
+ * + *
+ * @param familyDir destination directory for the store file + * @param regionInfo destination region info for the table + * @param storeFile reference file name + */ + private StoreFileInfo restoreReferenceFile(final Path familyDir, final RegionInfo regionInfo, + final SnapshotProtos.SnapshotRegionManifest.StoreFile storeFile, final StoreFileTracker tracker) + throws IOException { + String hfileName = storeFile.getName(); + StoreFileInfo storeFileInfo = null; + + // Extract the referred information (hfile name and parent region) + Path refPath = + StoreFileInfo + .getReferredToFile( + new Path( + new Path( + new Path(new Path(snapshotTable.getNamespaceAsString(), + snapshotTable.getQualifierAsString()), regionInfo.getEncodedName()), + familyDir.getName()), + hfileName)); + String snapshotRegionName = refPath.getParent().getParent().getName(); + String fileName = refPath.getName(); + + // The new reference should have the cloned region name as parent, if it is a clone. + String clonedRegionName = Bytes.toString(regionsMap.get(Bytes.toBytes(snapshotRegionName))); + if (clonedRegionName == null) clonedRegionName = snapshotRegionName; + + // The output file should be a reference link table=snapshotRegion-fileName.clonedRegionName + Path linkPath = null; + String refLink = fileName; + if (!HFileLink.isHFileLink(fileName)) { + refLink = HFileLink.createHFileLinkName(snapshotTable, snapshotRegionName, fileName); + linkPath = new Path(familyDir, + HFileLink.createHFileLinkName(snapshotTable, regionInfo.getEncodedName(), hfileName)); + } + + Path outPath = new Path(familyDir, refLink + '.' + clonedRegionName); + + // Create the new reference + if (storeFile.hasReference()) { + Reference reference = Reference.convert(storeFile.getReference()); + tracker.createAndCommitReference(reference, outPath); + storeFileInfo = new StoreFileInfo(conf, fs, outPath, reference); + } else { + InputStream in; + if (linkPath != null) { + HFileLink hfileLink = HFileLink.buildFromHFileLinkPattern(conf, linkPath); + storeFileInfo = new StoreFileInfo(conf, fs, outPath, hfileLink); + tracker.add(Collections.singletonList(storeFileInfo)); + in = hfileLink.open(fs); + } else { + linkPath = new Path(new Path( + HRegion.getRegionDir(snapshotManifest.getSnapshotDir(), regionInfo.getEncodedName()), + familyDir.getName()), hfileName); + in = fs.open(linkPath); + } + OutputStream out = fs.create(outPath); + IOUtils.copyBytes(in, out, conf); + } + + // Add the daughter region to the map + String regionName = Bytes.toString(regionsMap.get(regionInfo.getEncodedNameAsBytes())); + if (regionName == null) { + regionName = regionInfo.getEncodedName(); + } + LOG.debug("Restore reference {} to {}", regionName, clonedRegionName); + synchronized (parentsMap) { + Pair daughters = parentsMap.get(clonedRegionName); + if (daughters == null) { + // In case one side of the split is already compacted, regionName is put as both first and + // second of Pair + daughters = new Pair<>(regionName, regionName); + parentsMap.put(clonedRegionName, daughters); + } else if (!regionName.equals(daughters.getFirst())) { + daughters.setSecond(regionName); + } + } + return storeFileInfo; + } + + /** + * Create a new {@link RegionInfo} from the snapshot region info. Keep the same startKey, endKey, + * regionId and split information but change the table name. + * @param snapshotRegionInfo Info for region to clone. + * @return the new HRegion instance + */ + public RegionInfo cloneRegionInfo(final RegionInfo snapshotRegionInfo) { + return cloneRegionInfo(tableDesc.getTableName(), snapshotRegionInfo); + } + + public static RegionInfo cloneRegionInfo(TableName tableName, RegionInfo snapshotRegionInfo) { + return RegionInfoBuilder.newBuilder(tableName).setStartKey(snapshotRegionInfo.getStartKey()) + .setEndKey(snapshotRegionInfo.getEndKey()).setSplit(snapshotRegionInfo.isSplit()) + .setRegionId(snapshotRegionInfo.getRegionId()).setOffline(snapshotRegionInfo.isOffline()) + .build(); + } + + /** Returns the set of the regions contained in the table */ + private List getTableRegions() throws IOException { + LOG.debug("get table regions: {}", tableDir); + FileStatus[] regionDirs = + CommonFSUtils.listStatus(fs, tableDir, new FSUtils.RegionDirFilter(fs)); + if (regionDirs == null) { + return null; + } + + List regions = new ArrayList<>(regionDirs.length); + for (FileStatus regionDir : regionDirs) { + RegionInfo hri = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir.getPath()); + regions.add(hri); + } + LOG.debug("found {} regions for table={}", regions.size(), + tableDesc.getTableName().getNameAsString()); + return regions; + } + + /** + * Copy the snapshot files for a snapshot scanner, discards meta changes. + */ + public static MapreduceRestoreSnapshotHelper.RestoreMetaChanges copySnapshotForScanner(Configuration conf, FileSystem fs, + Path rootDir, Path restoreDir, String snapshotName) throws IOException { + // ensure that restore dir is not under root dir + if (!restoreDir.getFileSystem(conf).getUri().equals(rootDir.getFileSystem(conf).getUri())) { + throw new IllegalArgumentException( + "Filesystems for restore directory and HBase root " + "directory should be the same"); + } + if (restoreDir.toUri().getPath().startsWith(rootDir.toUri().getPath() + "/")) { + throw new IllegalArgumentException("Restore directory cannot be a sub directory of HBase " + + "root directory. RootDir: " + rootDir + ", restoreDir: " + restoreDir); + } + String restorePath = restoreDir.toUri().getPath(); + String rootPath = rootDir.toUri().getPath(); + if (restorePath.equals(rootPath) || restorePath.startsWith(rootPath + "/")) { + String message = "BLOCKED: MapReduce restore directory cannot be the HBase root directory " + + "or a sub directory of it. This could lead to accidental archival and permanent " + + "data loss if the path falls under " + rootDir + "/data/. Use a temporary directory " + + "outside of hbase.rootdir for MR snapshot scanning. RootDir: " + rootDir + + ", restoreDir: " + restoreDir; + LOG.error(message); + throw new IllegalArgumentException(message); + } + + Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir); + SnapshotDescription snapshotDesc = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir); + // check if the snapshot is expired. + boolean isExpired = SnapshotDescriptionUtils.isExpiredSnapshot(snapshotDesc.getTtl(), + snapshotDesc.getCreationTime(), EnvironmentEdgeManager.currentTime()); + if (isExpired) { + throw new SnapshotTTLExpiredException(ProtobufUtil.createSnapshotDesc(snapshotDesc)); + } + SnapshotManifest manifest = SnapshotManifest.open(conf, fs, snapshotDir, snapshotDesc); + + MonitoredTask status = TaskMonitor.get() + .createStatus("Restoring snapshot '" + snapshotName + "' to directory " + restoreDir); + ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(); + + // we send createBackRefs=false so that restored hfiles do not create back reference links + // in the base hbase root dir. + MapreduceRestoreSnapshotHelper helper = new MapreduceRestoreSnapshotHelper(conf, fs, manifest, + manifest.getTableDescriptor(), restoreDir, monitor, status, false); + MapreduceRestoreSnapshotHelper.RestoreMetaChanges metaChanges = helper.restoreHdfsRegions(); // TODO: parallelize. + + if (LOG.isDebugEnabled()) { + LOG.debug("Restored table dir:{}", restoreDir); + CommonFSUtils.logFileSystemState(fs, restoreDir, LOG); + } + return metaChanges; + } +} diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java index 42db1db5f87f..2d22031d851f 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableSnapshotInputFormatImpl.java @@ -28,7 +28,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; import org.apache.hadoop.hbase.snapshot.SnapshotManifest; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.ConfigurationUtil; @@ -226,7 +225,8 @@ public void restoreSnapshots(Configuration conf, Map snapshotToDir void restoreSnapshot(Configuration conf, String snapshotName, Path rootDir, Path restoreDir, FileSystem fs) throws IOException { - RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName); + MapreduceRestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, + snapshotName); } } diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java index 633a9b25cdd2..e70286700011 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java @@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; import org.apache.hadoop.hbase.snapshot.SnapshotManifest; import org.apache.hadoop.hbase.util.Bytes; @@ -616,7 +615,8 @@ public static void setInput(Configuration conf, String snapshotName, Path restor restoreDir = new Path(restoreDir, UUID.randomUUID().toString()); - RestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName); + MapreduceRestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, + snapshotName); conf.set(RESTORE_DIR_KEY, restoreDir.toString()); } diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index 36422b6e9f4a..3b0bc1a6fe7e 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.MapreduceRestoreSnapshotHelper; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; @@ -58,7 +59,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; -import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.Pair; @@ -435,8 +435,10 @@ private void restoreSnapshotForPeerCluster(Configuration conf, String peerQuorum FileSystem.setDefaultUri(peerConf, peerFSAddress); CommonFSUtils.setRootDir(peerConf, new Path(peerFSAddress, peerHBaseRootAddress)); FileSystem fs = FileSystem.get(peerConf); - RestoreSnapshotHelper.copySnapshotForScanner(peerConf, fs, CommonFSUtils.getRootDir(peerConf), - new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName); + Path peerRootDir = CommonFSUtils.getRootDir(peerConf); + Path peerRestoreDir = new Path(peerFSAddress, peerSnapshotTmpDir); + MapreduceRestoreSnapshotHelper.copySnapshotForScanner(peerConf, fs, peerRootDir, peerRestoreDir, + peerSnapshotName); } /** diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/util/MapreduceHFileArchiver.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/util/MapreduceHFileArchiver.java new file mode 100644 index 000000000000..9b819f04f532 --- /dev/null +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/util/MapreduceHFileArchiver.java @@ -0,0 +1,570 @@ +package org.apache.hadoop.hbase.util; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.backup.FailedArchiveException; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; + +public class MapreduceHFileArchiver { + + private static final Logger LOG = LoggerFactory.getLogger(MapreduceHFileArchiver.class); + private static final String SEPARATOR = "."; + + /** Number of retries in case of fs operation failure */ + private static final int DEFAULT_RETRIES_NUMBER = 3; + + private static final Function FUNC_FILE_TO_PATH = new Function() { + @Override + public Path apply(MapreduceHFileArchiver.File file) { + return file == null ? null : file.getPath(); + } + }; + + private static ThreadPoolExecutor archiveExecutor; + + /** Returns True if the Region exits in the filesystem. */ + public static boolean exists(Configuration conf, FileSystem fs, RegionInfo info) + throws IOException { + Path rootDir = CommonFSUtils.getRootDir(conf); + Path regionDir = FSUtils.getRegionDirFromRootDir(rootDir, info); + return fs.exists(regionDir); + } + + /** + * Cleans up all the files for a HRegion by archiving the HFiles to the archive directory + * @param conf the configuration to use + * @param fs the file system object + * @param info RegionInfo for region to be deleted + * @param rootDir {@link Path} to the root directory where hbase files are stored (for building + * the archive path) + * @param tableDir {@link Path} to where the table is being stored (for building the archive path) + */ + public static void archiveRegion(Configuration conf, FileSystem fs, RegionInfo info, Path rootDir, + Path tableDir) throws IOException { + archiveRegion(conf, fs, rootDir, tableDir, FSUtils.getRegionDirFromRootDir(rootDir, info)); + } + + /** + * Remove an entire region from the table directory via archiving the region's hfiles. + * @param fs {@link FileSystem} from which to remove the region + * @param rootdir {@link Path} to the root directory where hbase files are stored (for building + * the archive path) + * @param tableDir {@link Path} to where the table is being stored (for building the archive + * path) + * @param regionDir {@link Path} to where a region is being stored (for building the archive path) + * @return true if the region was successfully deleted. false if the filesystem + * operations could not complete. + * @throws IOException if the request cannot be completed + */ + public static boolean archiveRegion(Configuration conf, FileSystem fs, Path rootdir, + Path tableDir, Path regionDir) throws IOException { + // otherwise, we archive the files + // make sure we can archive + if (tableDir == null || regionDir == null) { + LOG.error("No archive directory could be found because tabledir (" + tableDir + + ") or regiondir (" + regionDir + "was null. Deleting files instead."); + if (regionDir != null) { + deleteRegionWithoutArchiving(fs, regionDir); + } + // we should have archived, but failed to. Doesn't matter if we deleted + // the archived files correctly or not. + return false; + } + + LOG.debug("ARCHIVING {}", regionDir); + + // make sure the regiondir lives under the tabledir + Preconditions.checkArgument(regionDir.toString().startsWith(tableDir.toString())); + Path regionArchiveDir = HFileArchiveUtil.getRegionArchiveDir(rootdir, + CommonFSUtils.getTableName(tableDir), regionDir.getName()); + + MapreduceHFileArchiver.FileStatusConverter getAsFile = new MapreduceHFileArchiver.FileStatusConverter(fs); + // otherwise, we attempt to archive the store files + + // build collection of just the store directories to archive + Collection toArchive = new ArrayList<>(); + final PathFilter dirFilter = new FSUtils.DirFilter(fs); + PathFilter nonHidden = new PathFilter() { + @Override + public boolean accept(Path file) { + return dirFilter.accept(file) && !file.getName().startsWith("."); + } + }; + FileStatus[] storeDirs = CommonFSUtils.listStatus(fs, regionDir, nonHidden); + // if there no files, we can just delete the directory and return; + if (storeDirs == null) { + LOG.debug("Directory {} empty.", regionDir); + return deleteRegionWithoutArchiving(fs, regionDir); + } + + // convert the files in the region to a File + Stream.of(storeDirs).map(getAsFile).forEachOrdered(toArchive::add); + LOG.debug("Archiving " + toArchive); + List failedArchive = resolveAndArchive(conf, fs, regionArchiveDir, toArchive, + EnvironmentEdgeManager.currentTime()); + if (!failedArchive.isEmpty()) { + throw new FailedArchiveException( + "Failed to archive/delete all the files for region:" + regionDir.getName() + " into " + + regionArchiveDir + ". Something is probably awry on the filesystem.", + failedArchive.stream().map(FUNC_FILE_TO_PATH).collect(Collectors.toList())); + } + // if that was successful, then we delete the region + return deleteRegionWithoutArchiving(fs, regionDir); + } + + // We need this method instead of Threads.getNamedThreadFactory() to pass some tests. + // The difference from Threads.getNamedThreadFactory() is that it doesn't fix ThreadGroup for + // new threads. If we use Threads.getNamedThreadFactory(), we will face ThreadGroup related + // issues in some tests. + private static ThreadFactory getThreadFactory(String archiverName) { + return new ThreadFactory() { + final AtomicInteger threadNumber = new AtomicInteger(1); + + @Override + public Thread newThread(Runnable r) { + final String name = archiverName + "-" + threadNumber.getAndIncrement(); + Thread t = new Thread(r, name); + t.setDaemon(true); + return t; + } + }; + } + + /** + * Removes from the specified region the store files of the specified column family, either by + * archiving them or outright deletion + * @param fs the filesystem where the store files live + * @param conf {@link Configuration} to examine to determine the archive directory + * @param parent Parent region hosting the store files + * @param familyDir {@link Path} to where the family is being stored + * @param family the family hosting the store files + * @throws IOException if the files could not be correctly disposed. + */ + public static void archiveFamilyByFamilyDir(FileSystem fs, Configuration conf, RegionInfo parent, + Path familyDir, byte[] family) throws IOException { + FileStatus[] storeFiles = CommonFSUtils.listStatus(fs, familyDir); + if (storeFiles == null) { + LOG.debug("No files to dispose of in {}, family={}", parent.getRegionNameAsString(), + Bytes.toString(family)); + return; + } + + MapreduceHFileArchiver.FileStatusConverter getAsFile = new MapreduceHFileArchiver.FileStatusConverter(fs); + Collection toArchive = Stream.of(storeFiles).map(getAsFile).collect(Collectors.toList()); + Path storeArchiveDir = HFileArchiveUtil.getStoreArchivePath(conf, parent, family); + + // do the actual archive + List failedArchive = + resolveAndArchive(conf, fs, storeArchiveDir, toArchive, EnvironmentEdgeManager.currentTime()); + if (!failedArchive.isEmpty()) { + throw new FailedArchiveException( + "Failed to archive/delete all the files for region:" + + Bytes.toString(parent.getRegionName()) + ", family:" + Bytes.toString(family) + " into " + + storeArchiveDir + ". Something is probably awry on the filesystem.", + failedArchive.stream().map(FUNC_FILE_TO_PATH).collect(Collectors.toList())); + } + } + + /** + * Resolve any conflict with an existing archive file via timestamp-append renaming of the + * existing file and then archive the passed in files. + * @param fs {@link FileSystem} on which to archive the files + * @param baseArchiveDir base archive directory to store the files. If any of the files to archive + * are directories, will append the name of the directory to the base + * archive directory name, creating a parallel structure. + * @param toArchive files/directories that need to be archvied + * @param start time the archiving started - used for resolving archive conflicts. + * @return the list of failed to archive files. + * @throws IOException if an unexpected file operation exception occurred + */ + private static List resolveAndArchive(Configuration conf, FileSystem fs, + Path baseArchiveDir, Collection toArchive, long start) throws IOException { + // Early exit if no files to archive + if (toArchive.isEmpty()) { + LOG.trace("No files to archive, returning an empty list."); + return Collections.emptyList(); + } + + LOG.trace("Preparing to archive files into directory: {}", baseArchiveDir); + + // Ensure the archive directory exists + ensureArchiveDirectoryExists(fs, baseArchiveDir); + + // Thread-safe collection for storing failures + Queue failures = new ConcurrentLinkedQueue<>(); + String startTime = Long.toString(start); + + // Separate files and directories for processing + List filesOnly = new ArrayList<>(); + for (MapreduceHFileArchiver.File file : toArchive) { + if (file.isFile()) { + filesOnly.add(file); + } else { + handleDirectory(conf, fs, baseArchiveDir, failures, file, start); + } + } + + // Archive files concurrently + archiveFilesConcurrently(conf, baseArchiveDir, filesOnly, failures, startTime); + + return new ArrayList<>(failures); // Convert to a List for the return value + } + + private static void ensureArchiveDirectoryExists(FileSystem fs, Path baseArchiveDir) + throws IOException { + if (!fs.exists(baseArchiveDir) && !fs.mkdirs(baseArchiveDir)) { + throw new IOException("Failed to create the archive directory: " + baseArchiveDir); + } + LOG.trace("Archive directory ready: {}", baseArchiveDir); + } + + private static void handleDirectory(Configuration conf, FileSystem fs, Path baseArchiveDir, + Queue failures, MapreduceHFileArchiver.File directory, long start) { + LOG.trace("Processing directory: {}, archiving its children.", directory); + Path subArchiveDir = new Path(baseArchiveDir, directory.getName()); + + try { + Collection children = directory.getChildren(); + failures.addAll(resolveAndArchive(conf, fs, subArchiveDir, children, start)); + } catch (IOException e) { + LOG.warn("Failed to archive directory: {}", directory, e); + failures.add(directory); + } + } + + private static void archiveFilesConcurrently(Configuration conf, Path baseArchiveDir, + List files, Queue failures, String startTime) { + LOG.trace("Archiving {} files concurrently into directory: {}", files.size(), baseArchiveDir); + Map> futureMap = new HashMap<>(); + // Submit file archiving tasks + // default is 16 which comes equal hbase.hstore.blockingStoreFiles default value + int maxThreads = conf.getInt("hbase.hfilearchiver.per.region.thread.pool.max", 16); + ThreadPoolExecutor hfilesArchiveExecutor = Threads.getBoundedCachedThreadPool(maxThreads, 30L, + TimeUnit.SECONDS, getThreadFactory("HFileArchiverPerRegion-")); + try { + for (MapreduceHFileArchiver.File file : files) { + Future future = hfilesArchiveExecutor + .submit(() -> resolveAndArchiveFile(baseArchiveDir, file, startTime)); + futureMap.put(file, future); + } + + // Process results of each task + for (Map.Entry> entry : futureMap.entrySet()) { + MapreduceHFileArchiver.File file = entry.getKey(); + try { + if (!entry.getValue().get()) { + LOG.warn("Failed to archive file: {} into directory: {}", file, baseArchiveDir); + failures.add(file); + } + } catch (InterruptedException e) { + LOG.error("Archiving interrupted for file: {}", file, e); + Thread.currentThread().interrupt(); // Restore interrupt status + failures.add(file); + } catch (ExecutionException e) { + LOG.error("Archiving failed for file: {}", file, e); + failures.add(file); + } + } + } finally { + hfilesArchiveExecutor.shutdown(); + } + } + + /** + * Attempt to archive the passed in file to the archive directory. + *

+ * If the same file already exists in the archive, it is moved to a timestamped directory under + * the archive directory and the new file is put in its place. + * @param archiveDir {@link Path} to the directory that stores the archives of the hfiles + * @param currentFile {@link Path} to the original HFile that will be archived + * @param archiveStartTime time the archiving started, to resolve naming conflicts + * @return true if the file is successfully archived. false if there was a + * problem, but the operation still completed. + * @throws IOException on failure to complete {@link FileSystem} operations. + */ + private static boolean resolveAndArchiveFile(Path archiveDir, MapreduceHFileArchiver.File currentFile, + String archiveStartTime) throws IOException { + // build path as it should be in the archive + String filename = currentFile.getName(); + Path archiveFile = new Path(archiveDir, filename); + FileSystem fs = currentFile.getFileSystem(); + + // An existing destination file in the archive is unexpected, but we handle it here. + if (fs.exists(archiveFile)) { + if (!fs.exists(currentFile.getPath())) { + // If the file already exists in the archive, and there is no current file to archive, then + // assume that the file in archive is correct. This is an unexpected situation, suggesting a + // race condition or split brain. + // In HBASE-26718 this was found when compaction incorrectly happened during warmupRegion. + LOG.warn("{} exists in archive. Attempted to archive nonexistent file {}.", archiveFile, + currentFile); + // We return success to match existing behavior in this method, where FileNotFoundException + // in moveAndClose is ignored. + return true; + } + // There is a conflict between the current file and the already existing archived file. + // Move the archived file to a timestamped backup. This is a really, really unlikely + // situation, where we get the same name for the existing file, but is included just for that + // 1 in trillion chance. We are potentially incurring data loss in the archive directory if + // the files are not identical. The timestamped backup will be cleaned by HFileCleaner as it + // has no references. + FileStatus curStatus = fs.getFileStatus(currentFile.getPath()); + FileStatus archiveStatus = fs.getFileStatus(archiveFile); + long curLen = curStatus.getLen(); + long archiveLen = archiveStatus.getLen(); + long curMtime = curStatus.getModificationTime(); + long archiveMtime = archiveStatus.getModificationTime(); + if (curLen != archiveLen) { + LOG.error( + "{} already exists in archive with different size than current {}." + + " archiveLen: {} currentLen: {} archiveMtime: {} currentMtime: {}", + archiveFile, currentFile, archiveLen, curLen, archiveMtime, curMtime); + throw new IOException( + archiveFile + " already exists in archive with different size" + " than " + currentFile); + } + + LOG.error( + "{} already exists in archive, moving to timestamped backup and overwriting" + + " current {}. archiveLen: {} currentLen: {} archiveMtime: {} currentMtime: {}", + archiveFile, currentFile, archiveLen, curLen, archiveMtime, curMtime); + + // move the archive file to the stamped backup + Path backedupArchiveFile = new Path(archiveDir, filename + SEPARATOR + archiveStartTime); + if (!fs.rename(archiveFile, backedupArchiveFile)) { + LOG.error("Could not rename archive file to backup: " + backedupArchiveFile + + ", deleting existing file in favor of newer."); + // try to delete the existing file, if we can't rename it + if (!fs.delete(archiveFile, false)) { + throw new IOException("Couldn't delete existing archive file (" + archiveFile + + ") or rename it to the backup file (" + backedupArchiveFile + + ") to make room for similarly named file."); + } + } else { + LOG.info("Backed up archive file from {} to {}.", archiveFile, backedupArchiveFile); + } + } + + LOG.trace("No existing file in archive for {}, free to archive original file.", archiveFile); + + // at this point, we should have a free spot for the archive file + boolean success = false; + for (int i = 0; !success && i < DEFAULT_RETRIES_NUMBER; ++i) { + if (i > 0) { + // Ensure that the archive directory exists. + // The previous "move to archive" operation has failed probably because + // the cleaner has removed our archive directory (HBASE-7643). + // (we're in a retry loop, so don't worry too much about the exception) + try { + if (!fs.exists(archiveDir)) { + if (fs.mkdirs(archiveDir)) { + LOG.debug("Created archive directory {}", archiveDir); + } + } + } catch (IOException e) { + LOG.warn("Failed to create directory {}", archiveDir, e); + } + } + + try { + success = currentFile.moveAndClose(archiveFile); + } catch (FileNotFoundException fnfe) { + LOG.warn("Failed to archive " + currentFile + + " because it does not exist! Skipping and continuing on.", fnfe); + success = true; + } catch (IOException e) { + success = false; + // When HFiles are placed on a filesystem other than HDFS a rename operation can be a + // non-atomic file copy operation. It can take a long time to copy a large hfile and if + // interrupted there may be a partially copied file present at the destination. We must + // remove the partially copied file, if any, or otherwise the archive operation will fail + // indefinitely from this point. + LOG.warn("Failed to archive " + currentFile + " on try #" + i, e); + try { + fs.delete(archiveFile, false); + } catch (FileNotFoundException fnfe) { + // This case is fine. + } catch (IOException ee) { + // Complain about other IO exceptions + LOG.warn("Failed to clean up from failure to archive " + currentFile + " on try #" + i, + ee); + } + } + } + + if (!success) { + LOG.error("Failed to archive " + currentFile); + return false; + } + + LOG.debug("Archived from {} to {}", currentFile, archiveFile); + return true; + } + + /** + * Without regard for backup, delete a region. Should be used with caution. + * @param regionDir {@link Path} to the region to be deleted. + * @param fs FileSystem from which to delete the region + * @return true on successful deletion, false otherwise + * @throws IOException on filesystem operation failure + */ + private static boolean deleteRegionWithoutArchiving(FileSystem fs, Path regionDir) + throws IOException { + if (fs.delete(regionDir, true)) { + LOG.debug("Deleted {}", regionDir); + return true; + } + LOG.debug("Failed to delete directory {}", regionDir); + return false; + } + + + /** + * Adapt a type to match the {@link MapreduceHFileArchiver.File} interface, which is used internally for handling + * archival/removal of files + * @param type to adapt to the {@link MapreduceHFileArchiver.File} interface + */ + private static abstract class FileConverter implements Function { + protected final FileSystem fs; + + public FileConverter(FileSystem fs) { + this.fs = fs; + } + } + + /** + * Convert a FileStatus to something we can manage in the archiving + */ + private static class FileStatusConverter extends MapreduceHFileArchiver.FileConverter { + public FileStatusConverter(FileSystem fs) { + super(fs); + } + + @Override + public MapreduceHFileArchiver.File apply(FileStatus input) { + return new MapreduceHFileArchiver.FileablePath(fs, input.getPath()); + } + } + + /** + * Wrapper to handle file operations uniformly + */ + private static abstract class File { + protected final FileSystem fs; + + public File(FileSystem fs) { + this.fs = fs; + } + + /** + * Check to see if this is a file or a directory + * @return true if it is a file, false otherwise + * @throws IOException on {@link FileSystem} connection error + */ + abstract boolean isFile() throws IOException; + + /** + * @return if this is a directory, returns all the children in the directory, otherwise returns + * an empty list + */ + abstract Collection getChildren() throws IOException; + + /** + * close any outside readers of the file + */ + abstract void close() throws IOException; + + /** Returns the name of the file (not the full fs path, just the individual file name) */ + abstract String getName(); + + /** Returns the path to this file */ + abstract Path getPath(); + + /** + * Move the file to the given destination + * @return true on success + */ + public boolean moveAndClose(Path dest) throws IOException { + this.close(); + Path p = this.getPath(); + return CommonFSUtils.renameAndSetModifyTime(fs, p, dest); + } + + /** Returns the {@link FileSystem} on which this file resides */ + public FileSystem getFileSystem() { + return this.fs; + } + + @Override + public String toString() { + return this.getClass().getSimpleName() + ", " + getPath().toString(); + } + } + + /** + * A {@link MapreduceHFileArchiver.File} that wraps a simple {@link Path} on a {@link FileSystem}. + */ + private static class FileablePath extends MapreduceHFileArchiver.File { + private final Path file; + private final MapreduceHFileArchiver.FileStatusConverter getAsFile; + + public FileablePath(FileSystem fs, Path file) { + super(fs); + this.file = file; + this.getAsFile = new MapreduceHFileArchiver.FileStatusConverter(fs); + } + + @Override + public String getName() { + return file.getName(); + } + + @Override + public Collection getChildren() throws IOException { + if (fs.isFile(file)) { + return Collections.emptyList(); + } + return Stream.of(fs.listStatus(file)).map(getAsFile).collect(Collectors.toList()); + } + + @Override + public boolean isFile() throws IOException { + return fs.isFile(file); + } + + @Override + public void close() throws IOException { + // NOOP - files are implicitly closed on removal + } + + @Override + Path getPath() { + return file; + } + } +} diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMapreduceRestoreSnapshotHelper.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMapreduceRestoreSnapshotHelper.java new file mode 100644 index 000000000000..0935ecc7780f --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMapreduceRestoreSnapshotHelper.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.SnapshotType; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; +import org.apache.hadoop.hbase.io.HFileLink; +import org.apache.hadoop.hbase.master.assignment.AssignmentManager; +import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.StoreFileInfo; +import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; +import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotManifest; +import org.apache.hadoop.hbase.snapshot.SnapshotTTLExpiredException; +import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils.SnapshotMock; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSTableDescriptors; +import org.apache.hadoop.hbase.wal.WALSplitUtil; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; + +/** + * Test the restore/clone operation from a file-system point of view. + */ +@Tag(RegionServerTests.TAG) +@Tag(MediumTests.TAG) +public class TestMapreduceRestoreSnapshotHelper { + + private static final Logger LOG = LoggerFactory.getLogger(TestMapreduceRestoreSnapshotHelper.class); + + protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + protected final static String TEST_HFILE = "abc"; + + protected Configuration conf; + protected Path archiveDir; + protected FileSystem fs; + protected Path rootDir; + + protected void setupConf(Configuration conf) { + } + + @BeforeAll + public static void setupCluster() throws Exception { + TEST_UTIL.getConfiguration().setInt(AssignmentManager.ASSIGN_MAX_ATTEMPTS, 3); + TEST_UTIL.startMiniCluster(); + } + + @AfterAll + public static void tearDownCluster() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @BeforeEach + public void setup() throws Exception { + rootDir = TEST_UTIL.getDataTestDir("testRestore"); + archiveDir = new Path(rootDir, HConstants.HFILE_ARCHIVE_DIRECTORY); + fs = TEST_UTIL.getTestFileSystem(); + conf = TEST_UTIL.getConfiguration(); + setupConf(conf); + CommonFSUtils.setRootDir(conf, rootDir); + // Turn off balancer so it doesn't cut in and mess up our placements. + TEST_UTIL.getAdmin().balancerSwitch(false, true); + } + + @AfterEach + public void tearDown() throws Exception { + fs.delete(TEST_UTIL.getDataTestDir(), true); + } + + protected SnapshotMock createSnapshotMock() throws IOException { + return new SnapshotMock(TEST_UTIL.getConfiguration(), fs, rootDir); + } + + @Test + public void testNoHFileLinkInRootDir() throws IOException { + rootDir = TEST_UTIL.getDefaultRootDirPath(); + CommonFSUtils.setRootDir(conf, rootDir); + fs = rootDir.getFileSystem(conf); + + TableName tableName = TableName.valueOf("testNoHFileLinkInRootDir"); + String snapshotName = tableName.getNameAsString() + "-snapshot"; + createTableAndSnapshot(tableName, snapshotName); + + Path restoreDir = new Path("/hbase/.tmp-restore"); + MapreduceRestoreSnapshotHelper.copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName); + checkNoHFileLinkInTableDir(tableName); + } + + @Test + public void testCopyExpiredSnapshotForScanner() throws IOException, InterruptedException { + rootDir = TEST_UTIL.getDefaultRootDirPath(); + CommonFSUtils.setRootDir(conf, rootDir); + TableName tableName = TableName.valueOf("testCopyExpiredSnapshotForScanner"); + String snapshotName = tableName.getNameAsString() + "-snapshot"; + Path restoreDir = new Path("/hbase/.tmp-expired-snapshot/copySnapshotDest"); + // create table and put some data into the table + byte[] columnFamily = Bytes.toBytes("A"); + Table table = TEST_UTIL.createTable(tableName, columnFamily); + TEST_UTIL.loadTable(table, columnFamily); + // create snapshot with ttl = 10 sec + Map properties = new HashMap<>(); + properties.put("TTL", 10); + org.apache.hadoop.hbase.client.SnapshotDescription snapshotDesc = + new org.apache.hadoop.hbase.client.SnapshotDescription(snapshotName, tableName, + SnapshotType.FLUSH, null, EnvironmentEdgeManager.currentTime(), -1, properties); + TEST_UTIL.getAdmin().snapshot(snapshotDesc); + boolean isExist = TEST_UTIL.getAdmin().listSnapshots().stream() + .anyMatch(ele -> snapshotName.equals(ele.getName())); + assertTrue(isExist); + int retry = 6; + while ( + !SnapshotDescriptionUtils.isExpiredSnapshot(snapshotDesc.getTtl(), + snapshotDesc.getCreationTime(), EnvironmentEdgeManager.currentTime()) && retry > 0 + ) { + retry--; + Thread.sleep(10 * 1000); + } + boolean isExpiredSnapshot = SnapshotDescriptionUtils.isExpiredSnapshot(snapshotDesc.getTtl(), + snapshotDesc.getCreationTime(), EnvironmentEdgeManager.currentTime()); + assertTrue(isExpiredSnapshot); + assertThrows(SnapshotTTLExpiredException.class, () -> MapreduceRestoreSnapshotHelper + .copySnapshotForScanner(conf, fs, rootDir, restoreDir, snapshotName)); + } + + private void createAndAssertSnapshot(TableName tableName, String snapshotName) + throws SnapshotCreationException, IllegalArgumentException, IOException { + org.apache.hadoop.hbase.client.SnapshotDescription snapshotDescOne = + new org.apache.hadoop.hbase.client.SnapshotDescription(snapshotName, tableName, + SnapshotType.FLUSH, null, EnvironmentEdgeManager.currentTime(), -1); + TEST_UTIL.getAdmin().snapshot(snapshotDescOne); + boolean isExist = TEST_UTIL.getAdmin().listSnapshots().stream() + .anyMatch(ele -> snapshotName.equals(ele.getName())); + assertTrue(isExist); + + } + + private void flipCompactions(boolean isEnable) { + int numLiveRegionServers = TEST_UTIL.getHBaseCluster().getNumLiveRegionServers(); + for (int serverNumber = 0; serverNumber < numLiveRegionServers; serverNumber++) { + HRegionServer regionServer = TEST_UTIL.getHBaseCluster().getRegionServer(serverNumber); + regionServer.getCompactSplitThread().setCompactionsEnabled(isEnable); + } + + } + + private ProcedureExecutor getMasterProcedureExecutor() { + return TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor(); + } + + protected void createTableAndSnapshot(TableName tableName, String snapshotName) + throws IOException { + byte[] column = Bytes.toBytes("A"); + Table table = TEST_UTIL.createTable(tableName, column, 2); + TEST_UTIL.loadTable(table, column); + TEST_UTIL.getAdmin().snapshot(snapshotName, tableName); + } + + private void checkNoHFileLinkInTableDir(TableName tableName) throws IOException { + Path[] tableDirs = new Path[] { CommonFSUtils.getTableDir(rootDir, tableName), + CommonFSUtils.getTableDir(new Path(rootDir, HConstants.HFILE_ARCHIVE_DIRECTORY), tableName), + CommonFSUtils.getTableDir(MobUtils.getMobHome(rootDir), tableName) }; + for (Path tableDir : tableDirs) { + assertFalse(hasHFileLink(tableDir)); + } + } + + private boolean hasHFileLink(Path tableDir) throws IOException { + if (fs.exists(tableDir)) { + RemoteIterator iterator = fs.listFiles(tableDir, true); + while (iterator.hasNext()) { + LocatedFileStatus fileStatus = iterator.next(); + if (fileStatus.isFile() && HFileLink.isHFileLink(fileStatus.getPath())) { + return true; + } + } + } + return false; + } + + private void verifyRestore(final Path rootDir, final TableDescriptor sourceHtd, + final TableDescriptor htdClone) throws IOException { + List files = SnapshotTestingUtils.listHFileNames(fs, + CommonFSUtils.getTableDir(rootDir, htdClone.getTableName())); + assertEquals(12, files.size()); + for (int i = 0; i < files.size(); i += 2) { + String linkFile = files.get(i); + String refFile = files.get(i + 1); + assertTrue(HFileLink.isHFileLink(linkFile), linkFile + " should be a HFileLink"); + assertTrue(StoreFileInfo.isReference(refFile), refFile + " should be a Reference"); + assertEquals(sourceHtd.getTableName(), HFileLink.getReferencedTableName(linkFile)); + Path refPath = getReferredToFile(refFile); + LOG.debug("get reference name for file " + refFile + " = " + refPath); + assertTrue(HFileLink.isHFileLink(refPath.getName()), + refPath.getName() + " should be a HFileLink"); + assertEquals(linkFile, refPath.getName()); + } + } + + private Path getReferredToFile(final String referenceName) { + Path fakeBasePath = new Path(new Path("table", "region"), "cf"); + return StoreFileInfo.getReferredToFile(new Path(fakeBasePath, referenceName)); + } +} diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java index 7a9f8d5dfb63..6529048f2e41 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java @@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionRecordReader; import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit; -import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; @@ -708,7 +707,7 @@ public void testReadFromRestoredSnapshotViaMR() throws Exception { SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES), null, snapshotName, rootDir, fs, true); Path tempRestoreDir = UTIL.getDataTestDirOnTestFS("restore_" + snapshotName); - RestoreSnapshotHelper.copySnapshotForScanner(UTIL.getConfiguration(), fs, rootDir, + MapreduceRestoreSnapshotHelper.copySnapshotForScanner(UTIL.getConfiguration(), fs, rootDir, tempRestoreDir, snapshotName); assertTrue(fs.exists(tempRestoreDir), "Restore directory should exist");