Source: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/index.html
Note: this article uses the Scala API.
Flink programs are regular distributed programs that implement transformations on distributed data sets, for example: filtering, mapping, updating state, joining, grouping, defining windows, and aggregating. Data sets are initially created by reading files, consuming Kafka, or from in-memory data, and results are usually written to files or printed to the terminal. Flink can run standalone or on top of other frameworks; in each execution mode Flink runs in a different context, and programs can execute locally or on a cluster.
The following sections serve as a reference for the operators and advanced features:
import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    counts.print()
  }
}
Data sources create the initial data sets, such as from files or from Java collections. The general mechanism of creating data sets is abstracted behind an InputFormat. Flink comes with several built-in formats to create data sets from common file formats. Many of them have shortcut methods on the ExecutionEnvironment.
File-based:
readTextFile(path) / TextInputFormat - Reads files line-wise and returns them as Strings.
readTextFileWithValue(path) / TextValueInputFormat - Reads files line-wise and returns them as StringValues. StringValues are mutable strings.
readCsvFile(path) / CsvInputFormat - Parses files of comma (or another char) delimited fields. Returns a DataSet of tuples, case class objects, or POJOs. Supports the basic Java types and their Value counterparts as field types.
readFileOfPrimitives(path, delimiter) / PrimitiveInputFormat - Parses files of new-line (or another char sequence) delimited primitive data types such as String or Integer using the given delimiter.
readHadoopFile(FileInputFormat, Key, Value, path) / FileInputFormat - Creates a JobConf and reads a file from the specified path with the specified FileInputFormat, Key class, and Value class, and returns them as Tuple2<Key, Value>.
readSequenceFile(Key, Value, path) / SequenceFileInputFormat - Creates a JobConf and reads a file from the specified path with type SequenceFileInputFormat, Key class, and Value class, and returns them as Tuple2<Key, Value>.
Collection-based:
fromCollection(Seq) - Creates a data set from a Seq. All elements in the collection must be of the same type.
fromCollection(Iterator) - Creates a data set from an Iterator. The class specifies the data type of the elements returned by the iterator.
fromElements(elements: _*) - Creates a data set from the given sequence of objects. All objects must be of the same type.
fromParallelCollection(SplittableIterator) - Creates a data set from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.
generateSequence(from, to) - Generates the sequence of numbers in the given interval, in parallel.
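To make these concrete, here is a minimal sketch of the collection-based sources. The element values are made up for illustration; note that NumberSequenceIterator yields boxed java.lang.Long values.

import org.apache.flink.api.scala._
import org.apache.flink.util.NumberSequenceIterator

val env = ExecutionEnvironment.getExecutionEnvironment

// from a Seq: all elements must share one type
val fromSeq = env.fromCollection(Seq(1, 2, 3))

// from an Iterator: the element type is inferred from the iterator
val fromIter = env.fromCollection(Iterator("a", "b", "c"))

// in parallel, from a SplittableIterator implementation
val parallel: DataSet[java.lang.Long] =
  env.fromParallelCollection(new NumberSequenceIterator(1L, 1000L))

// shorthand for a parallel number sequence
val seq = env.generateSequence(1, 1000)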
Generic:
readFile(inputFormat, path) / FileInputFormat - Accepts a file input format.
createInput(inputFormat) / InputFormat - Accepts a generic input format.
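As a sketch of the generic readFile entry point, Flink's built-in TextInputFormat can be passed in explicitly; the path below is a placeholder.

import org.apache.flink.api.scala._
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path

val env = ExecutionEnvironment.getExecutionEnvironment

// readFile accepts any FileInputFormat; here the built-in TextInputFormat
val path = "file:///path/to/textfile"
val lines: DataSet[String] = env.readFile(new TextInputFormat(new Path(path)), path)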
Examples
val env = ExecutionEnvironment.getExecutionEnvironment

// read text file from local files system
val localLines = env.readTextFile("file:///path/to/my/textfile")

// read text file from a HDFS running at nnHost:nnPort
val hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")

// read a CSV file with three fields
val csvInput = env.readCsvFile[(Int, String, Double)]("hdfs:///the/CSV/file")

// read a CSV file with five fields, taking only two of them
val csvInput = env.readCsvFile[(String, Double)](
  "hdfs:///the/CSV/file",
  includedFields = Array(0, 3)) // take the first and the fourth field

// CSV input can also be used with Case Classes
case class MyCaseClass(str: String, dbl: Double)
val csvInput = env.readCsvFile[MyCaseClass](
  "hdfs:///the/CSV/file",
  includedFields = Array(0, 3)) // take the first and the fourth field

// read a CSV file with three fields into a POJO (Person) with corresponding fields
val csvInput = env.readCsvFile[Person](
  "hdfs:///the/CSV/file",
  pojoFields = Array("name", "age", "zipcode"))

// create a set from some given elements
val values = env.fromElements("Foo", "bar", "foobar", "fubar")

// generate a number sequence
val numbers = env.generateSequence(1, 10000000)

// read a file from the specified path of type TextInputFormat
val tuples = env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text],
  "hdfs://nnHost:nnPort/path/to/file")

// read a file from the specified path of type SequenceFileInputFormat
val tuples = env.readSequenceFile(classOf[IntWritable], classOf[Text],
  "hdfs://nnHost:nnPort/path/to/file")
Flink offers a number of configuration options for CSV parsing:
lineDelimiter: String specifies the delimiter of individual records. The default line delimiter is the new-line character '\n'.
fieldDelimiter: String specifies the delimiter that separates fields of a record. The default field delimiter is the comma character ','.
includeFields: Array[Int] defines which fields to read from the input file (and which to ignore). By default the first n fields (as defined by the number of types in the types() call) are parsed.
pojoFields: Array[String] specifies the fields of a POJO that are mapped to CSV fields. Parsers for CSV fields are automatically initialized based on the type and order of the POJO fields.
parseQuotedStrings: Character enables quoted string parsing. Strings are parsed as quoted strings if the first character of the string field is the quote character (leading or trailing whitespaces are not trimmed). Field delimiters within quoted strings are ignored. Quoted string parsing fails if the last character of a quoted string field is not the quote character. If quoted string parsing is enabled and the first character of the field is not the quote character, the string is parsed as an unquoted string. By default, quoted string parsing is disabled.
ignoreComments: String specifies a comment prefix. All lines that start with the specified comment prefix are not parsed and ignored. By default, no lines are ignored.
lenient: Boolean enables lenient parsing, i.e., lines that cannot be correctly parsed are ignored. By default, lenient parsing is disabled and invalid lines raise an exception.
ignoreFirstLine: Boolean configures the InputFormat to ignore the first line of the input file. By default no line is ignored.
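Several of these options can be combined in a single readCsvFile call. A minimal sketch, where the path, field layout, and delimiters are assumptions for illustration:

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// hypothetical pipe-delimited file with a header line and '#' comments
val records = env.readCsvFile[(String, Int, Double)](
  "hdfs:///path/to/data.csv",
  lineDelimiter = "\n",    // the default, shown for completeness
  fieldDelimiter = "|",    // fields separated by pipes instead of commas
  ignoreComments = "#",    // skip comment lines
  ignoreFirstLine = true,  // skip the header row
  lenient = true)          // drop malformed lines instead of failing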
For file-based inputs, when the input path is a directory, nested files are not enumerated by default. Instead, only the files inside the base directory are read, while nested files are ignored. Recursive enumeration of nested files can be enabled through the recursive.file.enumeration configuration parameter, as in the following example.
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

// enable recursive enumeration of nested input files
val env = ExecutionEnvironment.getExecutionEnvironment

// create a configuration object
val parameters = new Configuration

// set the recursive enumeration parameter
parameters.setBoolean("recursive.file.enumeration", true)

// pass the configuration to the data source
env.readTextFile("file:///path/with.nested/files").withParameters(parameters)
Flink currently supports transparent decompression of input files if these are marked with an appropriate file extension. In particular, this means that no further configuration of the input formats is necessary, and any FileInputFormat supports the compression, including custom input formats. Please note that compressed files might not be read in parallel, thus impacting job scalability.
The following table lists the currently supported compression methods.
Compression method | File extensions | Parallelizable
---|---|---
DEFLATE | .deflate | no
GZip | .gz, .gzip | no
Bzip2 | .bz2 | no
XZ | .xz | no
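Since decompression is transparent, reading a compressed file looks exactly like reading a plain one. A small sketch with a hypothetical path:

val env = ExecutionEnvironment.getExecutionEnvironment

// the .gz extension alone triggers GZip decompression; no extra configuration
// is needed, but the file is not read in parallel (see the table above)
val compressed = env.readTextFile("hdfs:///logs/archive/part-0001.log.gz")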
Data Sinks
writeAsText() / TextOutputFormat - Writes elements line-wise as Strings. The Strings are obtained by calling the toString() method of each element. (Writes the data set out as text.)
writeAsCsv(...) / CsvOutputFormat - Writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the toString() method of the objects. (Writes the data set out as CSV.)
print() / printToErr() - Prints the toString() value of each element on the standard out / standard error stream. (Prints to the console.)
write() / FileOutputFormat - Method and base class for custom file outputs. Supports custom object-to-bytes conversion. (Writes out files in a user-specified file format.)
output() / OutputFormat - Most generic output method, for data sinks that are not file based (such as storing the result in a database). (A generic way to write data out that is not file based, e.g. to a database; write(), by contrast, writes to files.)

// text data
val textData: DataSet[String] = // [...]

// write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS")

// write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS")

// write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE)

// tuples as lines with pipe as the separator "a|b|c"
val values: DataSet[(String, Int, Double)] = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|")

// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file")

// this writes values as strings using a user-defined formatting
values map { tuple => tuple._1 + " - " + tuple._2 }
  .writeAsText("file:///path/to/the/result/file")
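The snippet above covers writeAsText and writeAsCsv. The lower-level write() and output() entry points can be sketched as follows, using Flink's own TextOutputFormat as a stand-in for a custom format; the paths are placeholders.

import org.apache.flink.api.scala._
import org.apache.flink.api.java.io.TextOutputFormat
import org.apache.flink.core.fs.Path

val env = ExecutionEnvironment.getExecutionEnvironment
val data: DataSet[String] = env.fromElements("a", "b", "c")

// write() pairs a FileOutputFormat with a target path
data.write(new TextOutputFormat[String](new Path("file:///tmp/out")), "file:///tmp/out")

// output() accepts any OutputFormat, file based or not; here the same
// TextOutputFormat stands in, but e.g. a JDBC output format fits the same slot
data.output(new TextOutputFormat[String](new Path("file:///tmp/out2")))

env.execute("sink sketch") // sinks other than print() need an explicit execute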
The output of a data sink can be sorted locally on specified fields in specified orders before it is written out:

val tData: DataSet[(Int, String, Double)] = // [...]
val pData: DataSet[(BookPojo, Double)] = // [...]
val sData: DataSet[String] = // [...]

// sort output on String field in ascending order
tData.sortPartition(1, Order.ASCENDING).print()

// sort output on Double field in descending and Int field in ascending order
tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print()

// sort output on the "author" field of nested BookPojo in descending order
pData.sortPartition("_1.author", Order.DESCENDING).writeAsText(...)

// sort output on the full tuple in ascending order
tData.sortPartition("_", Order.ASCENDING).writeAsCsv(...)

// sort atomic type (String) output in descending order
sData.sortPartition("_", Order.DESCENDING).writeAsText(...)