Flink DataSet API Programming Guide: Study Notes & Translation (work in progress)
Published: 2019-04-28


Source: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/index.html

Note: this article uses the Scala API throughout.

Flink programs are regular distributed programs that implement transformations on distributed data sets (for example filtering, mapping, updating state, joining, grouping, defining windows, aggregating). Data sets are initially created from sources such as files, Kafka, or in-memory collections, and results are typically written to files or printed to the terminal. Flink can run standalone or on top of other resource frameworks; depending on the deployment mode, programs run in different contexts, either locally or on a cluster.

The following sections are a reference for the available operators and some advanced features.

 Example Program
 The following is a complete WordCount program that you can copy into an IDE and run locally, provided you add the required dependencies and the necessary imports.
import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    counts.print()
  }
}
 DataSet Transformations
  Data transformations transform one or more DataSets into a new DataSet. Programs can combine multiple transformations into sophisticated pipelines. Most of these operators differ little from their Spark counterparts; for the full transformation reference, see the official documentation. A small sketch of chaining a few of them follows.
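A minimal sketch, chaining filter, map, groupBy/sum, and join; the input DataSets (visits and pages) and their contents are hypothetical and only meant to illustrate the operator style:

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// hypothetical inputs: (userId, url) click events and (url, title) page metadata
val visits: DataSet[(String, String)] = env.fromElements(("u1", "/home"), ("u2", "/docs"))
val pages:  DataSet[(String, String)] = env.fromElements(("/home", "Home"), ("/docs", "Docs"))

// filter + map: keep hits on documentation pages and project to (url, 1)
val docHits = visits
  .filter { case (_, url) => url.startsWith("/docs") }
  .map { case (_, url) => (url, 1) }

// groupBy + sum: count hits per url
val hitsPerUrl = docHits.groupBy(0).sum(1)

// join: attach the page title, matching on the url field of both sides
val titledHits = hitsPerUrl.join(pages).where(0).equalTo(0) {
  (hit, page) => (page._2, hit._2)
}

titledHits.print()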
  
 Data Sources

Data sources create the initial data sets, such as from files or from Java collections. The general mechanism of creating data sets is abstracted behind an InputFormat. Flink comes with several built-in formats to create data sets from common file formats. Many of them have shortcut methods on the ExecutionEnvironment.

File-based:

  • readTextFile(path) / TextInputFormat - Reads files line wise and returns them as Strings.

  • readTextFileWithValue(path) / TextValueInputFormat - Reads files line wise and returns them as StringValues. StringValues are mutable strings.

  • readCsvFile(path) / CsvInputFormat - Parses files of comma (or another char) delimited fields. Returns a DataSet of tuples, case class objects, or POJOs. Supports the basic java types and their Value counterparts as field types.

  • readFileOfPrimitives(path, delimiter) / PrimitiveInputFormat - Parses files of new-line (or another char sequence) delimited primitive data types such as String or Integer using the given delimiter.

  • readHadoopFile(FileInputFormat, Key, Value, path) / FileInputFormat - Creates a JobConf and reads file from the specified path with the specified FileInputFormat, Key class and Value class and returns them as Tuple2<Key, Value>.

  • readSequenceFile(Key, Value, path) / SequenceFileInputFormat - Creates a JobConf and reads file from the specified path with type SequenceFileInputFormat, Key class and Value class and returns them as Tuple2<Key, Value>.

Collection-based:

  • fromCollection(Seq) - Creates a data set from a Seq. All elements in the collection must be of the same type.

  • fromCollection(Iterator) - Creates a data set from an Iterator. The class specifies the data type of the elements returned by the iterator.

  • fromElements(elements: _*) - Creates a data set from the given sequence of objects. All objects must be of the same type.

  • fromParallelCollection(SplittableIterator) - Creates a data set from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.

  • generateSequence(from, to) - Generates the sequence of numbers in the given interval, in parallel.

Generic:

  • readFile(inputFormat, path) / FileInputFormat - Accepts a file input format.

  • createInput(inputFormat) / InputFormat - Accepts a generic input format.

Examples

val env  = ExecutionEnvironment.getExecutionEnvironment

// read text file from local file system
val localLines = env.readTextFile("file:///path/to/my/textfile")

// read text file from a HDFS running at nnHost:nnPort
val hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")

// read a CSV file with three fields
val csvInput = env.readCsvFile[(Int, String, Double)]("hdfs:///the/CSV/file")

// read a CSV file with five fields, taking only two of them
val csvInput = env.readCsvFile[(String, Double)](
  "hdfs:///the/CSV/file",
  includedFields = Array(0, 3)) // take the first and the fourth field

// CSV input can also be used with Case Classes
case class MyCaseClass(str: String, dbl: Double)
val csvInput = env.readCsvFile[MyCaseClass](
  "hdfs:///the/CSV/file",
  includedFields = Array(0, 3)) // take the first and the fourth field

// read a CSV file with three fields into a POJO (Person) with corresponding fields
val csvInput = env.readCsvFile[Person](
  "hdfs:///the/CSV/file",
  pojoFields = Array("name", "age", "zipcode"))

// create a set from some given elements
val values = env.fromElements("Foo", "bar", "foobar", "fubar")

// generate a number sequence
val numbers = env.generateSequence(1, 10000000)

// read a file from the specified path of type TextInputFormat
val tuples = env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text],
  "hdfs://nnHost:nnPort/path/to/file")

// read a file from the specified path of type SequenceFileInputFormat
val tuples = env.readSequenceFile(classOf[IntWritable], classOf[Text],
  "hdfs://nnHost:nnPort/path/to/file")
For configuring CSV parsing and reading compressed files, see the following sections and the official documentation.

Configuring CSV Parsing

Flink offers a number of configuration options for CSV parsing:

  • lineDelimiter: String specifies the delimiter of individual records. The default line delimiter is the new-line character '\n'.

  • fieldDelimiter: String specifies the delimiter that separates fields of a record. The default field delimiter is the comma character ','.

  • includeFields: Array[Int] defines which fields to read from the input file (and which to ignore). By default the first n fields (as defined by the number of types in the types() call) are parsed.

  • pojoFields: Array[String] specifies the fields of a POJO that are mapped to CSV fields. Parsers for CSV fields are automatically initialized based on the type and order of the POJO fields.

  • parseQuotedStrings: Character enables quoted string parsing. Strings are parsed as quoted strings if the first character of the string field is the quote character (leading or trailing whitespace is not trimmed). Field delimiters within quoted strings are ignored. Quoted string parsing fails if the last character of a quoted string field is not the quote character. If quoted string parsing is enabled and the first character of the field is not the quote character, the string is parsed as an unquoted string. By default, quoted string parsing is disabled.

  • ignoreComments: String specifies a comment prefix. All lines that start with the specified comment prefix are not parsed and ignored. By default, no lines are ignored.

  • lenient: Boolean enables lenient parsing, i.e., lines that cannot be correctly parsed are ignored. By default, lenient parsing is disabled and invalid lines raise an exception.

  • ignoreFirstLine: Boolean configures the InputFormat to ignore the first line of the input file. By default no line is ignored.
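Putting a few of these options together, here is a sketch with a hypothetical file path and schema; the named parameters mirror the options above in the Scala readCsvFile API, but check the signature of your Flink version:

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// hypothetical ratings file: one header line, '#' comment lines, ';'-separated fields
val ratings = env.readCsvFile[(String, Double)](
  "hdfs:///path/to/ratings.csv",
  fieldDelimiter = ";",     // fields are separated by ';' instead of the default ','
  ignoreFirstLine = true,   // skip the header row
  ignoreComments = "#",     // drop lines starting with '#'
  lenient = true)           // skip lines that cannot be parsed instead of failing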

Recursive Traversal of the Input Path Directory

For file-based inputs, when the input path is a directory, nested files are not enumerated by default. Instead, only the files inside the base directory are read, while nested files are ignored. Recursive enumeration of nested files can be enabled through the recursive.file.enumeration configuration parameter, like in the following example.

// enable recursive enumeration of nested input files
val env  = ExecutionEnvironment.getExecutionEnvironment

// create a configuration object
val parameters = new Configuration

// set the recursive enumeration parameter
parameters.setBoolean("recursive.file.enumeration", true)

// pass the configuration to the data source
env.readTextFile("file:///path/with.nested/files").withParameters(parameters)

Read Compressed Files

Flink currently supports transparent decompression of input files if they are marked with an appropriate file extension. In particular, this means that no further configuration of the input formats is necessary: any FileInputFormat supports the compression, including custom input formats (see the sketch after the table below). Please note that compressed files might not be read in parallel, thus impacting job scalability.

The following table lists the currently supported compression methods.


Compression method | File extensions | Parallelizable
DEFLATE            | .deflate        | no
GZip               | .gz, .gzip      | no
Bzip2              | .bz2            | no
XZ                 | .xz             | no
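As a small illustration (hypothetical path), a gzipped text file is read exactly like an uncompressed one; the .gz extension alone triggers decompression:

// decompressed transparently because of the .gz extension,
// but read by a single task since GZip is not parallelizable
val compressedLines = env.readTextFile("hdfs:///path/to/logs/access.log.gz")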
 Data Sinks
  A data sink consumes a DataSet and is used to store or return it. Data sink operations are described using an OutputFormat; Flink comes with several built-in output formats:
  • writeAsText() / TextOutputFormat - Writes elements line-wise as Strings. The Strings are obtained by calling the toString() method of each element.
  • writeAsCsv(...) / CsvOutputFormat - Writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the toString() method of the objects.
  • print() / printToErr() - Prints the toString() value of each element on the standard out / standard error stream.
  • write() / FileOutputFormat - Method and base class for custom file outputs. Supports custom object-to-bytes conversion.
  • output() / OutputFormat - Most generic output method, for data sinks that are not file based (such as storing the result in a database); unlike write(), it is not tied to a file output.
Example:
Standard data sink methods:
// text data
val textData: DataSet[String] = // [...]

// write DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS")

// write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS")

// write DataSet to a file and overwrite the file if it exists
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE)

// tuples as lines with pipe as the separator "a|b|c"
val values: DataSet[(String, Int, Double)] = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|")

// this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file")

// this writes values as strings using a user-defined formatting
values map { tuple => tuple._1 + " - " + tuple._2 }
  .writeAsText("file:///path/to/the/result/file")
 Locally Sorted Output
 
The output of a data sink can be locally sorted on specified fields in a specified order. This works for every output format. Example:
val tData: DataSet[(Int, String, Double)] = // [...]
val pData: DataSet[(BookPojo, Double)] = // [...]
val sData: DataSet[String] = // [...]

// sort output on String field in ascending order
tData.sortPartition(1, Order.ASCENDING).print()

// sort output on Double field in descending and Int field in ascending order
tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print()

// sort output on the "author" field of nested BookPojo in descending order
pData.sortPartition("_1.author", Order.DESCENDING).writeAsText(...)

// sort output on the full tuple in ascending order
tData.sortPartition("_", Order.ASCENDING).writeAsCsv(...)

// sort atomic type (String) output in descending order
sData.sortPartition("_", Order.DESCENDING).writeAsText(...)
Currently, only partition-wise (local) sorting is supported; globally sorted output is not supported yet.
 Iteration Operators
   
 Flink programs implement iterations with loop operators. An iteration operator encapsulates a part of the program and executes it repeatedly, feeding the result of one iteration back as input to the next. Flink offers two kinds of iterations: BulkIteration and DeltaIteration. A bulk iteration sketch follows; for usage of both iteration types and further details, refer to the official iterations documentation.
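Below is a minimal bulk iteration sketch in the spirit of the official π-estimation example; DeltaIteration is omitted here, see the official iterations page for it:

import org.apache.flink.api.scala._

object IterativePi {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // start the iteration with a single counter initialized to 0
    val initial = env.fromElements(0)

    // run 10000 iterations; each step throws a random dart into the unit square
    // and increments the counter if it lands inside the quarter circle
    val count = initial.iterate(10000) { iterationInput: DataSet[Int] =>
      iterationInput.map { i =>
        val x = Math.random()
        val y = Math.random()
        i + (if (x * x + y * y < 1) 1 else 0)
      }
    }

    // hits / samples * 4 approximates pi
    val pi = count.map { c => c / 10000.0 * 4 }
    pi.print()  // print() triggers execution of the job
  }
}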

 
 
