This article introduces two methods of appending content to the end of a file under the HDFS framework.
- Framework: Hadoop 1.0.3
- Test Environment: Single Node
Method 1: Using Hadoop's built-in append mechanism
Earlier versions of HDFS did not support an append operation, for reliability and consistency reasons. Since release 0.21, HDFS has provided a configuration parameter, dfs.support.append, to enable or disable the append functionality; by default it is false.
(Note that the append functionality is still unstable, so this flag should be set to true only on development or test clusters.)
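You can check how the flag is seen by client code with a few lines of Java (a minimal sketch; CheckAppendFlag is a hypothetical class name, and it assumes the cluster configuration files are on the classpath):
import org.apache.hadoop.conf.Configuration;

public class CheckAppendFlag {
    public static void main(String[] args) {
        // loads core-default.xml and core-site.xml from the classpath
        Configuration conf = new Configuration();
        // make sure the HDFS site file is consulted as well
        conf.addResource("hdfs-site.xml");
        // getBoolean parses the configured value, falling back to false if unset
        System.out.println("dfs.support.append = "
                + conf.getBoolean("dfs.support.append", false));
    }
}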
First, you need to set dfs.support.append to true.
Procedure:
- Open the file ${HADOOP_HOME}/conf/hdfs-site.xml
- Copy the following property into the file, between the <configuration> tags:
<property>
  <name>dfs.support.append</name>
  <value>true</value>
  <description>Does HDFS allow appends to files?
  This is currently set to false because there are bugs in the
  "append code" and it is not supported in any production cluster.</description>
</property>
Second, compile the following source code and place the compiled class file, together with its full package directory structure (that is, all parent directories), under the ${HADOOP_HOME} directory.
package hadoop.demo;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class FileAppend1 {
    public static final String uri = "hdfs://localhost:9000/user/zhouyaofei/log.txt";

    /**
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            System.err.println("wrong argument list");
            return;
        }
        // get the content the user wants to append
        String content = args[0];
        // instantiate a configuration object
        Configuration conf = new Configuration();
        // get an HDFS filesystem instance
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        // check whether the file append functionality is enabled
        boolean flag = fs.getConf().getBoolean("dfs.support.append", false);
        System.out.println("dfs.support.append is set to " + flag);
        if (flag) {
            FSDataOutputStream fsout = fs.append(new Path(uri));
            // wrap the output stream with a writer
            PrintWriter writer = new PrintWriter(fsout);
            writer.append(content);
            writer.close();
        } else {
            System.err.println("please set dfs.support.append to true");
        }
        fs.close();
    }
}
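You can then run the class through the hadoop launcher script, for example (a hypothetical invocation, assuming the compiled class is on the Hadoop classpath and the NameNode listens on localhost:9000):
${HADOOP_HOME}/bin/hadoop hadoop.demo.FileAppend1 "text to append"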
For more information about method 1, please refer to File Appends in HDFS.
Method 2: Write your own code to append content when dfs.support.append is false.
With method 2, you need to write more code than in method 1 to achieve the same functionality. The idea is to use a temporary file to back up the existing content of the target file, and then rewrite the target file with the previous content followed by the new content.
In this example, we use GenericOptionsParser to handle the two options -t and -c, which specify the target file name and the content to append. For the details of the command format, just issue the command without any options; the standard HelpFormatter from the Apache Commons CLI library then prints the usage of the command.
The core of this code reads the backup file content in a loop using read(long position, byte[] buffer, int offset, int length) of FSDataInputStream. For details, refer to the detailed specification.
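In isolation, the pattern looks like the following sketch (CopySketch and copyAll are hypothetical names, not part of the program below; unlike the loop in processCommand, this version also tolerates short reads, which the positioned-read contract allows):
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.fs.FSDataInputStream;

public class CopySketch {
    // copy everything from a positioned-readable stream to an output stream
    static void copyAll(FSDataInputStream in, OutputStream out) throws IOException {
        long offset = 0;
        byte[] buffer = new byte[4096];
        // positioned read: does not move the stream's own file pointer
        int n = in.read(offset, buffer, 0, buffer.length);
        while (n > 0) {
            out.write(buffer, 0, n); // write only the bytes actually read
            offset += n;
            n = in.read(offset, buffer, 0, buffer.length);
        }
    }
}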
First, set dfs.support.append to false.
Second, compile the following source code and place the compiled class file, together with its full package directory structure, under the ${HADOOP_HOME} directory.
package hadoop.demo;
import java.io.IOException;
import java.io.PrintWriter;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.GenericOptionsParser;
public class FileAppend2 {
    public static final String uriBackUp = "hdfs://localhost:9000/user/zhouyaofei/backup.tmp";
    public static final String uriTargetPrefix = "hdfs://localhost:9000/user/zhouyaofei/";

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Options customizedOptions = buildOption();
        if (args.length == 0) {
            printUsage(customizedOptions);
            return;
        }
        try {
            GenericOptionsParser parser = new GenericOptionsParser(conf,
                    customizedOptions, args);
            CommandLine cmdLine = parser.getCommandLine();
            // getCommandLine() returns null if parsing failed, e.g. a required option is missing
            if (cmdLine != null && cmdLine.hasOption("t") && cmdLine.hasOption("c")) {
                String targetPath = cmdLine.getOptionValue("t");
                String content = cmdLine.getOptionValue("c");
                processCommand(targetPath, content, conf);
            } else {
                printUsage(customizedOptions);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public static void backup(Configuration conf, FileSystem fs,
            FSDataInputStream sourceContent) throws IOException {
        // overwrite the backup file with the current content of the source stream
        FSDataOutputStream out = fs.create(new Path(uriBackUp), true);
        IOUtils.copyBytes(sourceContent, out, 4096, false);
        out.close();
    }
    private static void processCommand(String targetPath, String content,
            Configuration conf) throws Exception {
        Path path = new Path(uriTargetPrefix + targetPath);
        FileSystem fs = FileSystem.get(conf);
        // back up the existing content of the target file
        FSDataInputStream in = fs.open(path);
        backup(conf, fs, in);
        in.close();
        // recreate the target file, then copy the old content back from the backup
        FSDataOutputStream out = fs.create(path, true);
        FSDataInputStream backup = fs.open(new Path(uriBackUp));
        int offset = 0;
        int bufferSize = 4096;
        byte[] buffer = new byte[bufferSize];
        // pre-read one buffer of content from the backup stream
        int result = backup.read(offset, buffer, 0, bufferSize);
        // loop as long as the read fills the whole buffer
        while (result == bufferSize) {
            out.write(buffer);
            // read the next segment by moving the offset pointer forward
            offset += bufferSize;
            result = backup.read(offset, buffer, 0, bufferSize);
        }
        // write the final, partially filled buffer, if any
        if (result > 0) {
            out.write(buffer, 0, result);
        }
        // finally, append the new content
        out.writeBytes(content + "\n");
        out.close();
        backup.close();
    }
    public static void printUsage(Options options) {
        HelpFormatter formatter = new HelpFormatter();
        PrintWriter writer = new PrintWriter(System.out);
        formatter.printHelp(writer, 80, "FileAppend [generic option] ", "Begin",
                options, 4, 5, "end", true);
        writer.close();
    }
    @SuppressWarnings("static-access")
    public static Options buildOption() {
        Options options = new Options();
        Option t = OptionBuilder
                .hasArg()
                .withArgName("TargetName")
                .withDescription("hdfs file path you want to append content to")
                .isRequired().withLongOpt("target").create("t");
        Option c = OptionBuilder
                .hasArg()
                .withArgName("Content")
                .withDescription("content you want to append to the target file")
                .isRequired().withLongOpt("content").create("c");
        options.addOption(t);
        options.addOption(c);
        return options;
    }
}
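You can then run the class through the hadoop launcher script, for example (a hypothetical invocation, assuming the compiled class is on the Hadoop classpath; the target path is resolved against uriTargetPrefix):
${HADOOP_HOME}/bin/hadoop hadoop.demo.FileAppend2 -t log.txt -c "one more line"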