Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Task distribution in Apache Flink

Tags:

apache-flink

Consider a Flink cluster with some nodes where each node has a multi-core processor. If we configure the number of the slots based on the number of cores and equal share of memory, how does Apache Flink distribute the tasks between the nodes and the free slots? Are they fairly treated?
Is there any way to make/configure Flink to treat the slots equally when we configure the task slots based on the number of the cores available on a node
     For instance, assume that we partition the data equally and run the same task over the partitions. Flink uses all the slots from some nodes and at the same time some nodes are totally free. The node which has less number of CPU cores involved outputs the result much faster than the node with more number of CPU cores involved in the process. Apart from that, this ratio of speedup is not proportional to the number of used cores in each node. In other words, if in one node one core is occupied and in another node two cores are occupied, in fairly treating each core as a slot, each slot should output the result over the same task in almost equal amount of time irrespective of which node they belong to. But, this is not the case here.
     With this assumption, I would say that the nodes are not treated equally. This in turn produces a result time wise that is not proportional to the number of the nodes available. We can not say that increasing the number of the slots necessarily decreases the time cost.

I would appreciate any comment from the Apache Flink Community!!

like image 267
Ahmad.S Avatar asked Jan 13 '16 17:01

Ahmad.S


People also ask

What are task slots in Flink?

Task Slots and ResourcesTo control how many tasks a worker accepts, a worker has so called task slots (at least one). Each task slot represents a fixed subset of resources of the TaskManager. A TaskManager with three slots, for example, will dedicate 1/3 of its managed memory to each slot.

Is Apache Flink distributed?

Apache Flink is a distributed system and requires compute resources in order to execute applications.

What is a Flink task?

For distributed execution, Flink chains operator subtasks together into tasks. Each task is executed by one thread. Chaining operators together into tasks is a useful optimization: it reduces the overhead of thread-to-thread handover and buffering, and increases overall throughput while decreasing latency.

What is task manager in Flink?

Job client submits the Flink job to the Job manager. Job manager then orchestrates jobs on different task managers and manage the resources. Task Managers are the actual worker nodes doing computations on the data and updating the job manager about their progress.


1 Answers

Flink's default strategy as of version >= 1.5 considers every slot to be resource-wise the same. With this assumption, it should not matter wrt resources where you place the tasks since all slots should be the same. Given this, the main objective for placing tasks is to colocate them with their inputs in order to minimize network I/O.

If we are now in a standalone setup where we have a fixed number of TaskManagers running, Flink will pick slots in an arbitrary fashion (no guarantee given) for the sources and then colocate their consumers in the same slots if possible.

When running Flink on Yarn or Mesos where Flink can start new TaskManagers, Flink will first use up all slots of an existing TaskManager before it requests a new one. In this case, you will see that all sources will end up on as few TaskManagers as possible.

Since CPUs are not isolated wrt slots (they are a shared resource), the above-mentioned assumption does not hold true in all cases. Hence, in some cases where you have a fixed set of TaskManagers it is actually beneficial to spread the tasks out as much as possible to make use of the shared CPU resources.

In order to support this kind of scheduling strategy, the Flink community added the task spread out strategy via FLINK-12122. In order to use a scheduling strategy which is more similar to the pre FLIP-6 behaviour where Flink tries to spread out the workload across all available TaskExecutors, one needs to set cluster.evenly-spread-out-slots: true in the flink-conf.yaml

like image 163
Till Rohrmann Avatar answered Oct 22 '22 04:10

Till Rohrmann