 

How to avoid continuous "Resetting offset" and "Seeking to LATEST offset"?

I'm trying to follow this guide: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

But I don't understand why, most of the time, nothing is written to the console, and why the stream execution thread keeps spamming the log. Do I need to configure something? This is my code:

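import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.StructType;

SparkSession spark = SparkSession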
  .builder()
  .appName("Testing")
  .config("spark.master", "local")
  .getOrCreate();

StructType recordSchema = new StructType()
  .add("description", "string")
  .add("location", "string")
  .add("id", "string")
  .add("title", "string")
  .add("company", "string")
  .add("place", "string")
  .add("date", "string")
  .add("senorityLevel", "string")
  .add("function", "string")
  .add("employmentType", "string")
  .add("industries", "string");

Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "127.0.0.1:9092")
  .option("subscribe", "linkedin-producer")
  .option("startingOffsets", "earliest")
  .option("kafka.group.id","test")
  .load();

StreamingQuery query = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .select(from_json(col("value").cast("string"), recordSchema).as("data"))
  .select("data.*")
  .writeStream()
  .outputMode(OutputMode.Append())
  .format("console")
  .start();

try {
  query.awaitTermination(10000); // waits at most 10 s (10000 ms), then returns even if the query is still running
} catch (StreamingQueryException e) {
  e.printStackTrace();
}

Sometimes I do get the DataFrame printed in the console, but the console is full of this:

[Executor task launch worker for task 1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.7.0
[Executor task launch worker for task 1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 448719dc99a19793
[Executor task launch worker for task 1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1613492229792
[Executor task launch worker for task 1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-3, groupId=test] Subscribed to partition(s): linkedin-producer-0
[Executor task launch worker for task 1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-3, groupId=test] Seeking to offset 0 for partition linkedin-producer-0
[Executor task launch worker for task 1] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-3, groupId=test] Cluster ID: N88wfukWTIS-ycMeSGhhng
[task-result-getter-0] INFO org.apache.spark.network.client.TransportClientFactory - Successfully created connection to /10.0.0.9:44237 after 76 ms (0 ms spent in bootstraps)
[Executor task launch worker for task 1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-3, groupId=test] Seeking to offset 500 for partition linkedin-producer-0
[task-result-getter-0] INFO org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 0.0 (TID 0) in 1069 ms on 10.0.0.9 (executor driver) (1/3)
[dispatcher-BlockManagerMaster] INFO org.apache.spark.storage.BlockManagerInfo - Removed taskresult_0 on 10.0.0.9:44237 in memory (size: 2.9 MiB, free: 848.4 MiB)
[Executor task launch worker for task 1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-3, groupId=test] Seeking to offset 909 for partition linkedin-producer-0
[Executor task launch worker for task 1] INFO org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask - Commit authorized for partition 1 (task 1, attempt 0, stage 0.0)
[Executor task launch worker for task 1] INFO org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask - Committed partition 1 (task 1, attempt 0, stage 0.0)
[Executor task launch worker for task 1] INFO org.apache.spark.storage.memory.MemoryStore - Block taskresult_1 stored as bytes in memory (estimated size 2.9 MiB, free 845.5 MiB)
[dispatcher-BlockManagerMaster] INFO org.apache.spark.storage.BlockManagerInfo - Added taskresult_1 in memory on 10.0.0.9:44237 (size: 2.9 MiB, free: 845.5 MiB)
[Executor task launch worker for task 1] INFO org.apache.spark.executor.Executor - Finished task 1.0 in stage 0.0 (TID 1). 3003495 bytes result sent via BlockManager)
[dispatcher-event-loop-1] INFO org.apache.spark.scheduler.TaskSetManager - Starting task 2.0 in stage 0.0 (TID 2, 10.0.0.9, executor driver, partition 2, PROCESS_LOCAL, 8103 bytes)
[Executor task launch worker for task 2] INFO org.apache.spark.executor.Executor - Running task 2.0 in stage 0.0 (TID 2)
[task-result-getter-1] INFO org.apache.spark.scheduler.TaskSetManager - Finished task 1.0 in stage 0.0 (TID 1) in 304 ms on 10.0.0.9 (executor driver) (2/3)
[dispatcher-BlockManagerMaster] INFO org.apache.spark.storage.BlockManagerInfo - Removed taskresult_1 on 10.0.0.9:44237 in memory (size: 2.9 MiB, free: 848.4 MiB)
[Executor task launch worker for task 2] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = none
    bootstrap.servers = [127.0.0.1:9092]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-test-4
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = test
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    socket.connection.setup.timeout.max.ms = 127000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

[Executor task launch worker for task 2] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.7.0
[Executor task launch worker for task 2] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 448719dc99a19793
[Executor task launch worker for task 2] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1613492230087
[Executor task launch worker for task 2] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-4, groupId=test] Subscribed to partition(s): linkedin-producer-2
[Executor task launch worker for task 2] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-4, groupId=test] Seeking to offset 0 for partition linkedin-producer-2
[Executor task launch worker for task 2] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-test-4, groupId=test] Cluster ID: N88wfukWTIS-ycMeSGhhng
[Executor task launch worker for task 2] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-4, groupId=test] Seeking to offset 500 for partition linkedin-producer-2
[Executor task launch worker for task 2] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-test-4, groupId=test] Seeking to offset 905 for partition linkedin-producer-2
[Executor task launch worker for task 2] INFO org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask - Commit authorized for partition 2 (task 2, attempt 0, stage 0.0)
[Executor task launch worker for task 2] INFO org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask - Committed partition 2 (task 2, attempt 0, stage 0.0)
[Executor task launch worker for task 2] INFO org.apache.spark.storage.memory.MemoryStore - Block taskresult_2 stored as bytes in memory (estimated size 2.9 MiB, free 845.5 MiB)
[dispatcher-BlockManagerMaster] INFO org.apache.spark.storage.BlockManagerInfo - Added taskresult_2 in memory on 10.0.0.9:44237 (size: 2.9 MiB, free: 845.5 MiB)
[Executor task launch worker for task 2] INFO org.apache.spark.executor.Executor - Finished task 2.0 in stage 0.0 (TID 2). 3001144 bytes result sent via BlockManager)
[task-result-getter-2] INFO org.apache.spark.scheduler.TaskSetManager - Finished task 2.0 in stage 0.0 (TID 2) in 240 ms on 10.0.0.9 (executor driver) (3/3)
[dispatcher-BlockManagerMaster] INFO org.apache.spark.storage.BlockManagerInfo - Removed taskresult_2 on 10.0.0.9:44237 in memory (size: 2.9 MiB, free: 848.4 MiB)
[task-result-getter-2] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 0.0, whose tasks have all completed, from pool 
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - ResultStage 0 (start at Spark.java:73) finished in 1.730 s
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Killing all running tasks in stage 0: Stage finished
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.spark.scheduler.DAGScheduler - Job 0 finished: start at Spark.java:73, took 1.768779 s
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec - Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@52eea1c3 is committing.
-------------------------------------------
Batch: 0
-------------------------------------------
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator - Code generated in 29.841333 ms
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator - Code generated in 30.563541 ms
+--------------------+--------+----------+--------------------+--------------------+--------------------+----------+----------------+--------------------+--------------+--------------------+
|         description|location|        id|               title|             company|               place|      date|   senorityLevel|            function|employmentType|          industries|
+--------------------+--------+----------+--------------------+--------------------+--------------------+----------+----------------+--------------------+--------------+--------------------+
|Job Summary We ar...|  Israel|2406654159|       Retail Intern|Disney Media & En...|Tel Aviv, Tel Avi...|2021-02-11|Mid-Senior level|General Business,...|     Full-time|Marketing and Adv...|
|We're looking for...|  Israel|2404180635|  Personal Assistant|            Lemonade|Tel Aviv, Tel Avi...|2021-01-07|     Entry level|      Administrative|     Full-time|Marketing and Adv...|
|Job Summary We ar...|  Israel|2398561147|Retail intern -12...|The Walt Disney C...|    Tel Aviv, Israel|2021-02-10|      Internship|           Marketing|     Full-time|       Entertainment|
|We're looking for...|  Israel|2404180635|  Personal Assistant|            Lemonade|Tel Aviv, Tel Avi...|2021-01-07|     Entry level|      Administrative|     Full-time|Marketing and Adv...|
|We're looking for...|  Israel|2404180635|  Personal Assistant|            Lemonade|Tel Aviv, Tel Avi...|2021-01-07|     Entry level|      Administrative|     Full-time|Marketing and Adv...|
|Job Summary We ar...|  Israel|2406654159|       Retail Intern|Disney Media & En...|Tel Aviv, Tel Avi...|2021-02-11|Mid-Senior level|General Business,...|     Full-time|Marketing and Adv...|
|Job Summary We ar...|  Israel|2398561147|Retail intern -12...|The Walt Disney C...|    Tel Aviv, Israel|2021-02-10|      Internship|           Marketing|     Full-time|       Entertainment|
|At CrowdStrike we...|  Israel|2406653801|       HR Generalist|         CrowdStrike|Ramat Gan, Tel Av...|2021-02-11|       Associate|     Human Resources|     Full-time|Information Techn...|
|Job Description W...|  Israel|2406699205|HR Administrator ...| Akamai Technologies|Tel Aviv, Tel Avi...|2021-02-11|  Not Applicable|     Human Resources|     Full-time|Computer Networki...|
|JOB PURPOSE To as...|  Israel|2403563715|Research, Campaig...|Amnesty Internati...|Jerusalem Municip...|2021-02-09|     Entry level|            Research|      Contract|Nonprofit Organiz...|
|Job Description A...|  Israel|2383126490|Receptionist – Pa...|    Ceragon Networks|Tel Aviv, Tel Avi...|2021-02-01|  Not Applicable|      Administrative|     Full-time|Computer Networki...|
|Fiverr is looking...|  Israel|2419715658|        Data Analyst|        About Fiverr|    Tel Aviv, Israel|2021-02-11|Mid-Senior level|Information Techn...|     Full-time|            Internet|
|חברת AlfaCloud - ...|  Israel|2400094107|     Project Manager|AlfaCloud - ERP S...|    Tel Aviv, Israel|2021-02-11|     Entry level|Project Managemen...|     Full-time|   Computer Software|
|טדי הפקות מחפשת א...|  Israel|2396568054|       Booking Agent|    Tedy Productions|    Tel Aviv, Israel|2021-02-09|     Entry level|Design, Art/Creat...|     Full-time|                    |
|The Norman Tel Av...|  Israel|2418149015|    Front Desk Staff| The Norman Tel Aviv|    Tel Aviv, Israel|2021-02-10|     Entry level|      Administrative|     Full-time|         Hospitality|
|Are you a stellar...|  Israel|2405797088|Regional Operatio...|                Wolt|Tel Aviv, Tel Avi...|2021-02-11|        Director|          Management|     Full-time|Marketing and Adv...|
|About CXBuzz Inte...|  Israel|2400078284|   Journalism Intern|              CXBuzz|    Tel Aviv, Israel|2021-02-11|      Internship| Education, Training|    Internship|          Publishing|
|Job Summary We ar...|  Israel|2406654159|       Retail Intern|Disney Media & En...|Tel Aviv, Tel Avi...|2021-02-11|Mid-Senior level|General Business,...|     Full-time|Marketing and Adv...|
|Job Summary We ar...|  Israel|2398561147|Retail intern -12...|The Walt Disney C...|    Tel Aviv, Israel|2021-02-10|      Internship|           Marketing|     Full-time|       Entertainment|
|At CrowdStrike we...|  Israel|2406653801|       HR Generalist|         CrowdStrike|Ramat Gan, Tel Av...|2021-02-11|       Associate|     Human Resources|     Full-time|Information Techn...|
+--------------------+--------+----------+--------------------+--------------------+--------------------+----------+----------------+--------------------+--------------+--------------------+
only showing top 20 rows

[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec - Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@52eea1c3 committed.
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.spark.sql.execution.streaming.CheckpointFileManager - Writing atomically to file:/tmp/temporary-c9ccc957-c729-4f8f-8635-1a029de31511/commits/0 using temp file file:/tmp/temporary-c9ccc957-c729-4f8f-8635-1a029de31511/commits/.0.0cdf78cd-795c-4c3c-94d1-91341e38187f.tmp
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.spark.sql.execution.streaming.CheckpointFileManager - Renamed temp file file:/tmp/temporary-c9ccc957-c729-4f8f-8635-1a029de31511/commits/.0.0cdf78cd-795c-4c3c-94d1-91341e38187f.tmp to file:/tmp/temporary-c9ccc957-c729-4f8f-8635-1a029de31511/commits/0
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.spark.sql.execution.streaming.MicroBatchExecution - Streaming query made progress: {
  "id" : "9d193cbf-379e-495e-87e3-18f9f09145ea",
  "runId" : "2e9f6d84-23af-4b23-89cd-73ecef66d290",
  "name" : null,
  "timestamp" : "2021-02-16T16:17:06.949Z",
  "batchId" : 0,
  "numInputRows" : 3813,
  "processedRowsPerSecond" : 1035.5784899511136,
  "durationMs" : {
    "addBatch" : 2786,
    "getBatch" : 22,
    "latestOffset" : 446,
    "queryPlanning" : 363,
    "triggerExecution" : 3681,
    "walCommit" : 23
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[linkedin-producer]]",
    "startOffset" : null,
    "endOffset" : {
      "linkedin-producer" : {
        "2" : 1269,
        "1" : 1272,
        "0" : 1272
      }
    },
    "numInputRows" : 3813,
    "processedRowsPerSecond" : 1035.5784899511136
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@793ec5d7",
    "numOutputRows" : 3813
  }
}
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Seeking to LATEST offset of partition linkedin-producer-0
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Seeking to LATEST offset of partition linkedin-producer-2
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Seeking to LATEST offset of partition linkedin-producer-1
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Resetting offset for partition linkedin-producer-0 to position FetchPosition{offset=1272, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omri-Lenovo-YOGA-920-13IKB:9092 (id: 0 rack: null)], epoch=0}}.
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Resetting offset for partition linkedin-producer-2 to position FetchPosition{offset=1269, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omri-Lenovo-YOGA-920-13IKB:9092 (id: 0 rack: null)], epoch=0}}.
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Resetting offset for partition linkedin-producer-1 to position FetchPosition{offset=1272, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omri-Lenovo-YOGA-920-13IKB:9092 (id: 0 rack: null)], epoch=0}}.
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.spark.sql.execution.streaming.MicroBatchExecution - Streaming query made progress: {
  "id" : "9d193cbf-379e-495e-87e3-18f9f09145ea",
  "runId" : "2e9f6d84-23af-4b23-89cd-73ecef66d290",
  "name" : null,
  "timestamp" : "2021-02-16T16:17:10.664Z",
  "batchId" : 1,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 3,
    "triggerExecution" : 4
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[linkedin-producer]]",
    "startOffset" : {
      "linkedin-producer" : {
        "2" : 1269,
        "1" : 1272,
        "0" : 1272
      }
    },
    "endOffset" : {
      "linkedin-producer" : {
        "2" : 1269,
        "1" : 1272,
        "0" : 1272
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleTable$@793ec5d7",
    "numOutputRows" : 0
  }
}
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Seeking to LATEST offset of partition linkedin-producer-0
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Seeking to LATEST offset of partition linkedin-producer-2
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Seeking to LATEST offset of partition linkedin-producer-1
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Resetting offset for partition linkedin-producer-0 to position FetchPosition{offset=1272, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omri-Lenovo-YOGA-920-13IKB:9092 (id: 0 rack: null)], epoch=0}}.
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Resetting offset for partition linkedin-producer-2 to position FetchPosition{offset=1269, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omri-Lenovo-YOGA-920-13IKB:9092 (id: 0 rack: null)], epoch=0}}.
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Resetting offset for partition linkedin-producer-1 to position FetchPosition{offset=1272, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omri-Lenovo-YOGA-920-13IKB:9092 (id: 0 rack: null)], epoch=0}}.
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Seeking to LATEST offset of partition linkedin-producer-0
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Seeking to LATEST offset of partition linkedin-producer-2
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Seeking to LATEST offset of partition linkedin-producer-1
[stream execution thread for [id = 9d193cbf-379e-495e-87e3-18f9f09145ea, runId = 2e9f6d84-23af-4b23-89cd-73ecef66d290]] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-test-1, groupId=test] Resetting offset for partition linkedin-producer-0 to position FetchPosition{offset=1272, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[omri-Lenovo-YOGA-920-13IKB:9092 (id: 0 rack: null)], epoch=0}}.
.
.
.

pom.xml:

        <!--Spark-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
asked Dec 13 '25 03:12 by Zamkie


1 Answer

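You are getting this logger output because the logging level is the default, INFO. Set the logging level to WARN with spark.sparkContext().setLogLevel("WARN") in Java (spark.sparkContext.setLogLevel("WARN") in Scala or PySpark). The "Seeking to LATEST offset" and "Resetting offset" lines are not errors: on every trigger, the Kafka source looks up the latest offset of each partition to decide whether there is new data, and the Kafka client logs that lookup at INFO level.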
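
A fuller sketch of a quieter version of the question's job, in Java, assuming the Spark 3.0.0 dependencies from the pom.xml above and the question's broker and topic. It makes three changes, each an assumption rather than a confirmed fix. First, the log format "[thread] INFO logger - message" looks like the slf4j-simple binding, which ignores Spark's log4j level and instead reads the org.slf4j.simpleLogger.defaultLogLevel system property, and only before the first logger is created. Second, a Trigger.ProcessingTime interval makes the driver look up the latest offsets once per interval instead of back to back. Third, awaitTermination() without a timeout keeps main blocked, whereas awaitTermination(10000) returns after 10 seconds. Note also that after the first batch, the console sink only prints a new batch when new records have arrived on the topic. The class name QuietKafkaConsole and the helper quietSession() are hypothetical.

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

import java.util.concurrent.TimeoutException;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.StructType;

public class QuietKafkaConsole {

    // Hypothetical helper: builds the session with logging turned down to WARN.
    public static SparkSession quietSession() {
        // Assumption: the project logs through slf4j-simple. It reads this property
        // once, when the first logger is created, so set it before anything logs.
        // With other SLF4J bindings the property is simply ignored.
        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "warn");

        SparkSession spark = SparkSession.builder()
            .appName("Testing")
            .master("local")
            .getOrCreate();

        // Java needs sparkContext() with parentheses. This lowers the log4j root
        // level, which is what Spark's default log4j setup uses.
        spark.sparkContext().setLogLevel("WARN");
        return spark;
    }

    public static void main(String[] args) throws TimeoutException, StreamingQueryException {
        SparkSession spark = quietSession();

        StructType recordSchema = new StructType()
            .add("description", "string")
            .add("location", "string")
            .add("id", "string")
            .add("title", "string")
            .add("company", "string")
            .add("place", "string")
            .add("date", "string")
            .add("senorityLevel", "string")
            .add("function", "string")
            .add("employmentType", "string")
            .add("industries", "string");

        Dataset<Row> df = spark.readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "127.0.0.1:9092")
            .option("subscribe", "linkedin-producer")
            .option("startingOffsets", "earliest")
            .option("kafka.group.id", "test")
            .load();

        StreamingQuery query = df
            .select(from_json(col("value").cast("string"), recordSchema).as("data"))
            .select("data.*")
            .writeStream()
            .outputMode(OutputMode.Append())
            .format("console")
            // Start a micro-batch at most every 10 seconds rather than back to back,
            // so the latest-offset lookup ("Seeking to LATEST offset") runs once per interval.
            .trigger(Trigger.ProcessingTime("10 seconds"))
            .start();

        // No timeout: keep main blocked until the query stops or fails.
        query.awaitTermination();
    }
}
```

The trigger interval trades latency for fewer offset lookups; a longer interval means fewer log lines and fewer Kafka metadata requests, but new records show up in the console later.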

answered Dec 14 '25 20:12 by himanshu singh


