Troubleshooting the Flume startup script: mapping out its dependencies and understanding what actually happens when the command runs (startup, dependencies, source code analysis)

First, read the launcher script bin/flume-ng to see how it builds the classpath and assembles the final java command:
[root@hadoop1 apache-flume-1.8.0-bin]# less bin/flume-ng

 

#!/bin/bash
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

################################
# constants
################################

FLUME_AGENT_CLASS="org.apache.flume.node.Application"
FLUME_AVRO_CLIENT_CLASS="org.apache.flume.client.avro.AvroCLIClient"
FLUME_VERSION_CLASS="org.apache.flume.tools.VersionInfo"
FLUME_TOOLS_CLASS="org.apache.flume.tools.FlumeToolsMain"

CLEAN_FLAG=1
################################
# functions
################################

info() {
  if [ ${CLEAN_FLAG} -ne 0 ]; then
    local msg=$1
    echo "Info: $msg" >&2
  fi
}

warn() {
  if [ ${CLEAN_FLAG} -ne 0 ]; then
    local msg=$1
    echo "Warning: $msg" >&2
  fi
}

error() {
  local msg=$1
  local exit_code=$2

  echo "Error: $msg" >&2

  if [ -n "$exit_code" ] ; then
    exit $exit_code
  fi
}

# If avail, add Hadoop paths to the FLUME_CLASSPATH and to the
# FLUME_JAVA_LIBRARY_PATH env vars.
# Requires Flume jars to already be on FLUME_CLASSPATH.
add_hadoop_paths() {
  local HADOOP_IN_PATH=$(PATH="${HADOOP_HOME:-${HADOOP_PREFIX}}/bin:$PATH" \
      which hadoop 2>/dev/null)

  if [ -f "${HADOOP_IN_PATH}" ]; then
    info "Including Hadoop libraries found via ($HADOOP_IN_PATH) for HDFS access"

    # determine hadoop java.library.path and use that for flume
    local HADOOP_CLASSPATH=""
    local HADOOP_JAVA_LIBRARY_PATH=$(HADOOP_CLASSPATH="$FLUME_CLASSPATH" \
        ${HADOOP_IN_PATH} org.apache.flume.tools.GetJavaProperty \
        java.library.path)

    # look for the line that has the desired property value
    # (considering extraneous output from some GC options that write to stdout)
    # IFS = InternalFieldSeparator (set to recognize only newline char as delimiter)
    IFS=$'\n'
    for line in $HADOOP_JAVA_LIBRARY_PATH; do
      if [[ $line =~ ^java\.library\.path=(.*)$ ]]; then
        HADOOP_JAVA_LIBRARY_PATH=${BASH_REMATCH[1]}
        break
      fi
    done
    unset IFS

    if [ -n "${HADOOP_JAVA_LIBRARY_PATH}" ]; then
      FLUME_JAVA_LIBRARY_PATH="$FLUME_JAVA_LIBRARY_PATH:$HADOOP_JAVA_LIBRARY_PATH"
    fi

    # determine hadoop classpath
    HADOOP_CLASSPATH=$($HADOOP_IN_PATH classpath)

    FLUME_CLASSPATH="$FLUME_CLASSPATH:$HADOOP_CLASSPATH"
  fi
}
add_HBASE_paths() {
  local HBASE_IN_PATH=$(PATH="${HBASE_HOME}/bin:$PATH" \
      which hbase 2>/dev/null)

  if [ -f "${HBASE_IN_PATH}" ]; then
    info "Including HBASE libraries found via ($HBASE_IN_PATH) for HBASE access"

    # determine HBASE java.library.path and use that for flume
    local HBASE_CLASSPATH=""
    local HBASE_JAVA_LIBRARY_PATH=$(HBASE_CLASSPATH="$FLUME_CLASSPATH" \
        ${HBASE_IN_PATH} org.apache.flume.tools.GetJavaProperty \
        java.library.path)

    # look for the line that has the desired property value
    # (considering extraneous output from some GC options that write to stdout)
    # IFS = InternalFieldSeparator (set to recognize only newline char as delimiter)
    IFS=$'\n'
    for line in $HBASE_JAVA_LIBRARY_PATH; do
      if [[ $line =~ ^java\.library\.path=(.*)$ ]]; then
        HBASE_JAVA_LIBRARY_PATH=${BASH_REMATCH[1]}
        break
      fi
    done
    unset IFS

    if [ -n "${HBASE_JAVA_LIBRARY_PATH}" ]; then
      FLUME_JAVA_LIBRARY_PATH="$FLUME_JAVA_LIBRARY_PATH:$HBASE_JAVA_LIBRARY_PATH"
    fi

    # determine HBASE classpath
    HBASE_CLASSPATH=$($HBASE_IN_PATH classpath)

    FLUME_CLASSPATH="$FLUME_CLASSPATH:$HBASE_CLASSPATH"
    FLUME_CLASSPATH="$FLUME_CLASSPATH:$HBASE_HOME/conf"

  fi
}

add_hive_paths(){
  if [ -d "${HIVE_HOME}/lib" ]; then
    info "Including Hive libraries found via ($HIVE_HOME) for Hive access"
    FLUME_CLASSPATH="$FLUME_CLASSPATH:$HIVE_HOME/lib/*"
  fi
  if [ -d "${HCAT_HOME}/share/hcatalog" ]; then
    info "Including HCatalog libraries found via ($HCAT_HOME) for Hive access"
    FLUME_CLASSPATH="$FLUME_CLASSPATH:${HCAT_HOME}/share/hcatalog/*"
  fi
}

set_LD_LIBRARY_PATH(){
#Append the FLUME_JAVA_LIBRARY_PATH to whatever the user may have specified in
#flume-env.sh
  if [ -n "${FLUME_JAVA_LIBRARY_PATH}" ]; then
    export LD_LIBRARY_PATH="${LD_LIBRARY_PATH}:${FLUME_JAVA_LIBRARY_PATH}"
  fi
}

display_help() {
  cat <<EOF
Usage: $0 <command> [options]...

commands:
  help                      display this help text
  agent                     run a Flume agent
  avro-client               run an avro Flume client
  version                   show Flume version info

global options:
  --conf,-c <conf>          use configs in <conf> directory
  --classpath,-C <cp>       append to the classpath
  --dryrun,-d               do not actually start Flume, just print the command
  --plugins-path <dirs>     colon-separated list of plugins.d directories. See the
                            plugins.d section in the user guide for more details.
                            Default: \$FLUME_HOME/plugins.d
  -Dproperty=value          sets a Java system property value
  -Xproperty=value          sets a Java -X option

agent options:
  --name,-n <name>          the name of this agent (required)
  --conf-file,-f <file>     specify a config file (required if -z missing)
  --zkConnString,-z <str>   specify the ZooKeeper connection to use (required if -f missing)
  --zkBasePath,-p <path>    specify the base path in ZooKeeper for agent configs
  --no-reload-conf          do not reload config file if changed
  --help,-h                 display help text

avro-client options:
  --rpcProps,-P <file>   RPC client properties file with server connection params
  --host,-H <host>       hostname to which events will be sent
  --port,-p <port>       port of the avro source
  --dirname <dir>        directory to stream to avro source
  --filename,-F <file>   text file to stream to avro source (default: std input)
  --headerFile,-R <file> File containing event headers as key/value pairs on each new line
  --help,-h              display help text

  Either --rpcProps or both --host and --port must be specified.

Note that if <conf> directory is specified, then it is always included first
in the classpath.

EOF
}

run_flume() {
  local FLUME_APPLICATION_CLASS

  if [ "$#" -gt 0 ]; then
    FLUME_APPLICATION_CLASS=$1
    shift
  else
    error "Must specify flume application class" 1
  fi

  if [ ${CLEAN_FLAG} -ne 0 ]; then
    set -x
  fi
  $EXEC $JAVA_HOME/bin/java $JAVA_OPTS $FLUME_JAVA_OPTS "${arr_java_props[@]}" -cp "$FLUME_CLASSPATH" \
      -Djava.library.path=$FLUME_JAVA_LIBRARY_PATH "$FLUME_APPLICATION_CLASS" $*
}

################################
# main
################################

# set default params
FLUME_CLASSPATH=""
FLUME_JAVA_LIBRARY_PATH=""
JAVA_OPTS="-Xmx20m"
LD_LIBRARY_PATH=""

opt_conf=""
opt_classpath=""
opt_plugins_dirs=""
arr_java_props=()
arr_java_props_ct=0
opt_dryrun=""

mode=$1
shift

case "$mode" in
  help)
    display_help
    exit 0
    ;;
  agent)
    opt_agent=1
    ;;
  node)
    opt_agent=1
    warn "The \"node\" command is deprecated. Please use \"agent\" instead."
    ;;
  avro-client)
    opt_avro_client=1
    ;;
  tool)
    opt_tool=1
    ;;
  version)
   opt_version=1
   CLEAN_FLAG=0
   ;;
  *)
    error "Unknown or unspecified command '$mode'"
    echo
    display_help
    exit 1
    ;;
esac

args=""
while [ -n "$*" ] ; do
  arg=$1
  shift

  case "$arg" in
    --conf|-c)
      [ -n "$1" ] || error "Option --conf requires an argument" 1
      opt_conf=$1
      shift
      ;;
    --classpath|-C)
      [ -n "$1" ] || error "Option --classpath requires an argument" 1
      opt_classpath=$1
      shift
      ;;
    --dryrun|-d)
      opt_dryrun="1"
      ;;
    --plugins-path)
      opt_plugins_dirs=$1
      shift
      ;;
    -agentlib*)
      arr_java_props[arr_java_props_ct]=$arg
      ((++arr_java_props_ct))
      ;;
    -agentpath*)
      arr_java_props[arr_java_props_ct]=$arg
      ((++arr_java_props_ct))
      ;;
    -javaagent*)
      arr_java_props[arr_java_props_ct]=$arg
      ((++arr_java_props_ct))
      ;;
    -D*)
      arr_java_props[arr_java_props_ct]=$arg
      ((++arr_java_props_ct))
      ;;
    -X*)
      arr_java_props[arr_java_props_ct]=$arg
      ((++arr_java_props_ct))
      ;;
    *)
      args="$args $arg"
      ;;
  esac
done

# make opt_conf absolute
if [[ -n "$opt_conf" && -d "$opt_conf" ]]; then
  opt_conf=$(cd $opt_conf; pwd)
fi

# allow users to override the default env vars via conf/flume-env.sh
if [ -z "$opt_conf" ]; then
  warn "No configuration directory set! Use --conf <dir> to override."
elif [ -f "$opt_conf/flume-env.sh" ]; then
  info "Sourcing environment configuration script $opt_conf/flume-env.sh"
  source "$opt_conf/flume-env.sh"
fi

# prepend command-line classpath to env script classpath
if [ -n "${opt_classpath}" ]; then
  if [ -n "${FLUME_CLASSPATH}" ]; then
    FLUME_CLASSPATH="${opt_classpath}:${FLUME_CLASSPATH}"
  else
    FLUME_CLASSPATH="${opt_classpath}"
  fi
fi

if [ -z "${FLUME_HOME}" ]; then
  FLUME_HOME=$(cd $(dirname $0)/..; pwd)
fi

# prepend $FLUME_HOME/lib jars to the specified classpath (if any)
if [ -n "${FLUME_CLASSPATH}" ] ; then
  FLUME_CLASSPATH="${FLUME_HOME}/lib/*:$FLUME_CLASSPATH"
else
  FLUME_CLASSPATH="${FLUME_HOME}/lib/*"
fi

# load plugins.d directories
PLUGINS_DIRS=""
if [ -n "${opt_plugins_dirs}" ]; then
  PLUGINS_DIRS=$(sed -e 's/:/ /g' <<<${opt_plugins_dirs})
else
  PLUGINS_DIRS="${FLUME_HOME}/plugins.d"
fi

unset plugin_lib plugin_libext plugin_native
for PLUGINS_DIR in $PLUGINS_DIRS; do
  if [[ -d ${PLUGINS_DIR} ]]; then
    for plugin in ${PLUGINS_DIR}/*; do
      if [[ -d "$plugin/lib" ]]; then
        plugin_lib="${plugin_lib}${plugin_lib+:}${plugin}/lib/*"
      fi
      if [[ -d "$plugin/libext" ]]; then
        plugin_libext="${plugin_libext}${plugin_libext+:}${plugin}/libext/*"
      fi
      if [[ -d "$plugin/native" ]]; then
        plugin_native="${plugin_native}${plugin_native+:}${plugin}/native"
      fi
    done
  fi
done

if [[ -n "${plugin_lib}" ]]
then
  FLUME_CLASSPATH="${FLUME_CLASSPATH}:${plugin_lib}"
fi

if [[ -n "${plugin_libext}" ]]
then
  FLUME_CLASSPATH="${FLUME_CLASSPATH}:${plugin_libext}"
fi

if [[ -n "${plugin_native}" ]]
then
  if [[ -n "${FLUME_JAVA_LIBRARY_PATH}" ]]
  then
    FLUME_JAVA_LIBRARY_PATH="${FLUME_JAVA_LIBRARY_PATH}:${plugin_native}"
  else
    FLUME_JAVA_LIBRARY_PATH="${plugin_native}"
  fi
fi

# find java
if [ -z "${JAVA_HOME}" ] ; then
  warn "JAVA_HOME is not set!"
  # Try to use Bigtop to autodetect JAVA_HOME if it's available
  if [ -e /usr/libexec/bigtop-detect-javahome ] ; then
    . /usr/libexec/bigtop-detect-javahome
  elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ] ; then
    . /usr/lib/bigtop-utils/bigtop-detect-javahome
  fi

  # Using java from path if bigtop is not installed or couldn't find it
  if [ -z "${JAVA_HOME}" ] ; then
    JAVA_DEFAULT=$(type -p java)
    [ -n "$JAVA_DEFAULT" ] || error "Unable to find java executable. Is it in your PATH?" 1
    JAVA_HOME=$(cd $(dirname $JAVA_DEFAULT)/..; pwd)
  fi
fi

# look for hadoop libs
add_hadoop_paths
add_HBASE_paths
add_hive_paths

# prepend conf dir to classpath
if [ -n "$opt_conf" ]; then
  FLUME_CLASSPATH="$opt_conf:$FLUME_CLASSPATH"
fi

set_LD_LIBRARY_PATH
# allow dryrun
EXEC="exec"
if [ -n "${opt_dryrun}" ]; then
  warn "Dryrun mode enabled (will not actually initiate startup)"
  EXEC="echo"
fi

# finally, invoke the appropriate command
if [ -n "$opt_agent" ] ; then
  run_flume $FLUME_AGENT_CLASS $args
elif [ -n "$opt_avro_client" ] ; then
  run_flume $FLUME_AVRO_CLIENT_CLASS $args
elif [ -n "${opt_version}" ] ; then
  run_flume $FLUME_VERSION_CLASS $args
elif [ -n "${opt_tool}" ] ; then
  run_flume $FLUME_TOOLS_CLASS $args
else
  error "This message should never appear" 1
fi

exit 0
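
In short, the script builds FLUME_CLASSPATH from $FLUME_HOME/lib/*, any plugins.d directories, and whatever hadoop, hbase and Hive installations it can locate, then exec's java with that classpath. Its own --dryrun (-d) flag swaps exec for echo, so you can see the exact java command before anything starts. A quick sketch, using the same paths as the runs below:

# Print the java command the script would run, without starting the agent
cd /home/apache-flume-1.8.0-bin
bin/flume-ng agent --dryrun --conf ./conf/ -f conf/flume.conf -n agent1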

The first run, before the Hadoop environment was set up, fails as soon as the HDFS sink is configured: the sink needs Hadoop's SequenceFile classes, and they are not on the classpath.
2018-07-20 08:32:44,613 (conf-file-poller-0) [ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:146)] Failed to start agent because dependencies were not found in classpath. Error follows.
java.lang.NoClassDefFoundError: org/apache/hadoop/io/SequenceFile$CompressionType
	at org.apache.flume.sink.hdfs.HDFSEventSink.configure(HDFSEventSink.java:235)
	at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
	at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:411)
	at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:102)
	at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:141)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.io.SequenceFile$CompressionType
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 12 more
2018-07-20 08:33:14,615 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:conf/flume.conf for changes
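
The NoClassDefFoundError for org/apache/hadoop/io/SequenceFile$CompressionType is thrown from HDFSEventSink.configure(), so the Hadoop client jars are simply not on FLUME_CLASSPATH. Looking back at add_hadoop_paths() in the script, it only appends the output of hadoop classpath when it can resolve the hadoop command via HADOOP_HOME / HADOOP_PREFIX or the PATH. A quick check, with paths assumed from this cluster:

# If this prints nothing, add_hadoop_paths() silently does nothing
which hadoop

# Make hadoop resolvable, then see what flume-ng would append
export HADOOP_HOME=/home/hadoop-2.9.1
export PATH=$HADOOP_HOME/bin:$PATH
hadoop classpath | tr ':' '\n' | head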

  

 

 

Exporting the Hadoop environment variables (in the shell, or in conf/flume-env.sh, which the script sources) lets flume-ng locate hadoop:

HADOOP_PREFIX=/home/hadoop-2.9.1;          export HADOOP_PREFIX
HADOOP_CONF_DIR=$HADOOP_PREFIX/etc/hadoop; export HADOOP_CONF_DIR
HADOOP_HOME=/home/hadoop-2.9.1;            export HADOOP_HOME

 

[root@hadoop1 apache-flume-1.8.0-bin]# bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n agent1  
Info: Sourcing environment configuration script /home/apache-flume-1.8.0-bin/conf/flume-env.sh
Info: Including Hadoop libraries found via (/home/hadoop-2.9.1/bin/hadoop) for HDFS access
Info: Including Hive libraries found via () for Hive access
+ exec /usr/local/jdk/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp '/home/apache-flume-1.8.0-bin/conf:/home/apache-flume-1.8.0-bin/lib/*:/home/hadoop-2.9.1/etc/hadoop:/home/hadoop-2.9.1/share/hadoop/common/lib/*:/home/hadoop-2.9.1/share/hadoop/common/*:/home/hadoop-2.9.1/share/hadoop/hdfs:/home/hadoop-2.9.1/share/hadoop/hdfs/lib/*:/home/hadoop-2.9.1/share/hadoop/hdfs/*:/home/hadoop-2.9.1/share/hadoop/yarn:/home/hadoop-2.9.1/share/hadoop/yarn/lib/*:/home/hadoop-2.9.1/share/hadoop/yarn/*:/home/hadoop-2.9.1/share/hadoop/mapreduce/lib/*:/home/hadoop-2.9.1/share/hadoop/mapreduce/*:/home/hadoop-2.9.1/contrib/capacity-scheduler/*.jar:/lib/*' -Djava.library.path=:/home/hadoop-2.9.1/lib/native org.apache.flume.node.Application -f conf/flume.conf -n agent1
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/apache-flume-1.8.0-bin/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hadoop-2.9.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
2018-07-20 10:49:26,713 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:62)] Configuration provider starting
2018-07-20 10:49:26,723 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:79)] Configuration provider started
2018-07-20 10:49:26,727 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:conf/flume.conf for changes
2018-07-20 10:49:26,728 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:134)] Reloading configuration file:conf/flume.conf
2018-07-20 10:49:26,733 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:HDFS
2018-07-20 10:49:26,733 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1020)] Created context for HDFS: hdfs.useLocalTimeStamp
2018-07-20 10:49:26,735 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:HDFS
2018-07-20 10:49:26,738 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:HDFS
2018-07-20 10:49:26,738 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:HDFS
2018-07-20 10:49:26,739 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:HDFS
2018-07-20 10:49:26,739 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:HDFS
2018-07-20 10:49:26,739 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:HDFS
2018-07-20 10:49:26,740 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:930)] Added sinks: HDFS Agent: agent1
2018-07-20 10:49:26,740 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:HDFS
2018-07-20 10:49:26,741 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:HDFS
2018-07-20 10:49:26,741 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:HDFS
2018-07-20 10:49:26,745 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:HDFS
2018-07-20 10:49:26,746 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:313)] Starting validation of configuration for agent: agent1
2018-07-20 10:49:26,749 (conf-file-poller-0) [INFO - org.apache.flume.conf.LogPrivacyUtil.<clinit>(LogPrivacyUtil.java:51)] Logging of configuration details is disabled. To see configuration details in the log run the agent with -Dorg.apache.flume.log.printconfig=true JVM argument. Please note that this is not recommended in production systems as it may leak private information to the logfile.
2018-07-20 10:49:26,756 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateChannels(FlumeConfiguration.java:467)] Created channel ch1
2018-07-20 10:49:26,764 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:674)] Creating sink: HDFS using HDFS
2018-07-20 10:49:26,767 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:135)] Channels:ch1

2018-07-20 10:49:26,768 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:136)] Sinks HDFS

2018-07-20 10:49:26,768 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:137)] Sources avro-source1

2018-07-20 10:49:26,768 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [agent1]
2018-07-20 10:49:26,768 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:147)] Creating channels
2018-07-20 10:49:26,779 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel ch1 type memory
2018-07-20 10:49:26,783 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:201)] Created channel ch1
2018-07-20 10:49:26,784 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source avro-source1, type avro
2018-07-20 10:49:26,801 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: HDFS, type: hdfs
2018-07-20 10:49:26,813 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:116)] Channel ch1 connected to [avro-source1, HDFS]
2018-07-20 10:49:26,821 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:137)] Starting new configuration:{ sourceRunners:{avro-source1=EventDrivenSourceRunner: { source:Avro source avro-source1: { bindAddress: hadoop1, port: 41414 } }} sinkRunners:{HDFS=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@62589e29 counterGroup:{ name:null counters:{} } }} channels:{ch1=org.apache.flume.channel.MemoryChannel{name: ch1}} }
2018-07-20 10:49:26,834 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:144)] Starting Channel ch1
2018-07-20 10:49:26,908 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: ch1: Successfully registered new MBean.
2018-07-20 10:49:26,908 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: ch1 started
2018-07-20 10:49:26,908 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:171)] Starting Sink HDFS
2018-07-20 10:49:26,911 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: HDFS: Successfully registered new MBean.
2018-07-20 10:49:26,911 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: HDFS started
2018-07-20 10:49:26,920 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:182)] Starting Source avro-source1
2018-07-20 10:49:26,921 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:141)] Polling sink runner starting
2018-07-20 10:49:26,921 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:234)] Starting Avro source avro-source1: { bindAddress: hadoop1, port: 41414 }...
2018-07-20 10:49:27,264 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: avro-source1: Successfully registered new MBean.
2018-07-20 10:49:27,264 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: avro-source1 started
2018-07-20 10:49:27,266 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:260)] Avro source avro-source1 started.
2018-07-20 10:49:56,924 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:conf/flume.conf for changes
2018-07-20 10:50:26,924 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:conf/flume.conf for changes

 

 

Error details: the Avro source receives the events, but the HDFS sink then runs into DataStreamer errors while writing to HDFS:

2018-07-20 10:57:33,800 (New I/O server boss #5) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xffe40d8c, /192.168.3.101:45426 => /192.168.3.101:41414] OPEN
2018-07-20 10:57:33,801 (New I/O worker #2) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xffe40d8c, /192.168.3.101:45426 => /192.168.3.101:41414] BOUND: /192.168.3.101:41414
2018-07-20 10:57:33,801 (New I/O worker #2) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xffe40d8c, /192.168.3.101:45426 => /192.168.3.101:41414] CONNECTED: /192.168.3.101:45426
2018-07-20 10:57:34,012 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
2018-07-20 10:57:34,032 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
2018-07-20 10:57:34,033 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
2018-07-20 10:57:34,035 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
2018-07-20 10:57:34,036 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
2018-07-20 10:57:34,037 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
2018-07-20 10:57:34,039 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
2018-07-20 10:57:34,040 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
2018-07-20 10:57:34,042 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
2018-07-20 10:57:34,043 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
2018-07-20 10:57:34,044 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
2018-07-20 10:57:34,046 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
2018-07-20 10:57:34,047 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
2018-07-20 10:57:34,049 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
2018-07-20 10:57:34,050 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
2018-07-20 10:57:34,052 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
2018-07-20 10:57:34,054 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
2018-07-20 10:57:34,055 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
2018-07-20 10:57:34,057 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
2018-07-20 10:57:34,059 (New I/O worker #2) [DEBUG - org.apache.flume.source.AvroSource.append(AvroSource.java:351)] Avro source avro-source1: Received avro event
2018-07-20 10:57:34,062 (New I/O worker #2) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xffe40d8c, /192.168.3.101:45426 :> /192.168.3.101:41414] DISCONNECTED
2018-07-20 10:57:34,062 (New I/O worker #2) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xffe40d8c, /192.168.3.101:45426 :> /192.168.3.101:41414] UNBOUND
2018-07-20 10:57:34,062 (New I/O worker #2) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xffe40d8c, /192.168.3.101:45426 :> /192.168.3.101:41414] CLOSED
2018-07-20 10:57:34,062 (New I/O worker #2) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(NettyServer.java:209)] Connection to /192.168.3.101:45426 disconnected.
2018-07-20 10:57:35,527 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSDataStream.configure(HDFSDataStream.java:57)] Serializer = TEXT, UseRawLocalFileSystem = false
2018-07-20 10:57:35,552 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:251)] Creating hdfs://hadoop1:9001/home/hadoop-2.9.1/logs/2018-07-20/10/flumeHdfs.1532055455528.tmp
2018-07-20 10:57:35,608 (hdfs-HDFS-call-runner-4) [DEBUG - org.apache.flume.sink.hdfs.AbstractHDFSWriter.reflectGetNumCurrentReplicas(AbstractHDFSWriter.java:200)] Using getNumCurrentReplicas--HDFS-826
2018-07-20 10:57:35,608 (hdfs-HDFS-call-runner-4) [DEBUG - org.apache.flume.sink.hdfs.AbstractHDFSWriter.reflectGetDefaultReplication(AbstractHDFSWriter.java:228)] Using FileSystem.getDefaultReplication(Path) from HADOOP-8014
2018-07-20 10:57:36,609 (hdfs-HDFS-roll-timer-0) [DEBUG - org.apache.flume.sink.hdfs.BucketWriter$2.call(BucketWriter.java:291)] Rolling file (hdfs://hadoop1:9001/home/hadoop-2.9.1/logs/2018-07-20/10/flumeHdfs.1532055455528.tmp): Roll scheduled after 1 sec elapsed.
2018-07-20 10:57:36,624 (Thread-11) [INFO - org.apache.hadoop.hdfs.DataStreamer.createBlockOutputStream(DataStreamer.java:1762)] Exception in createBlockOutputStream
java.io.IOException: Got error, status=ERROR, status message , ack with firstBadLink as 192.168.3.102:50010
	at org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:118)
	at org.apache.hadoop.hdfs.DataStreamer.createBlockOutputStream(DataStreamer.java:1751)
	at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1655)
	at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:710)
2018-07-20 10:57:36,625 (Thread-11) [WARN - org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1658)] Abandoning BP-1823215470-192.168.3.101-1531885297754:blk_1073741828_1004
2018-07-20 10:57:36,630 (Thread-11) [WARN - org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1663)] Excluding datanode DatanodeInfoWithStorage[192.168.3.102:50010,DS-56002022-a16c-46fa-8b64-a78c65fac104,DISK]
2018-07-20 10:57:36,651 (Thread-11) [INFO - org.apache.hadoop.hdfs.DataStreamer.createBlockOutputStream(DataStreamer.java:1762)] Exception in createBlockOutputStream
java.io.IOException: Got error, status=ERROR, status message , ack with firstBadLink as 192.168.3.103:50010
	at org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:118)
	at org.apache.hadoop.hdfs.DataStreamer.createBlockOutputStream(DataStreamer.java:1751)
	at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1655)
	at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:710)
2018-07-20 10:57:36,652 (Thread-11) [WARN - org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1658)] Abandoning BP-1823215470-192.168.3.101-1531885297754:blk_1073741829_1005
2018-07-20 10:57:36,663 (Thread-11) [WARN - org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1663)] Excluding datanode DatanodeInfoWithStorage[192.168.3.103:50010,DS-2b0f3ddc-d575-4400-9fdb-629cd71cd8e9,DISK]
2018-07-20 10:57:36,688 (hdfs-HDFS-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.close(BucketWriter.java:393)] Closing hdfs://hadoop1:9001/home/hadoop-2.9.1/logs/2018-07-20/10/flumeHdfs.1532055455528.tmp
2018-07-20 10:57:36,699 (hdfs-HDFS-call-runner-7) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:655)] Renaming hdfs://hadoop1:9001/home/hadoop-2.9.1/logs/2018-07-20/10/flumeHdfs.1532055455528.tmp to hdfs://hadoop1:9001/home/hadoop-2.9.1/logs/2018-07-20/10/flumeHdfs.1532055455528
2018-07-20 10:57:36,705 (hdfs-HDFS-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:382)] Writer callback called.
2018-07-20 10:57:56,931 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:conf/flume.conf for changes
2018-07-20 10:58:26,931 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:conf/flume.conf for changes
2018-07-20 10:58:56,932 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:conf/flume.conf for changes
2018-07-20 10:59:26,932 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:conf/flume.conf for changes
2018-07-20 10:59:56,933 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:conf/flume.conf for changes
2018-07-20 11:00:26,933 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:conf/flume.conf for changes
2018-07-20 11:00:56,934 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:127)] Checking file:conf/flume.conf for changes
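
The write pipeline reports firstBadLink for 192.168.3.102:50010 and then 192.168.3.103:50010 and excludes both datanodes, which usually points at those datanodes being unreachable on their data-transfer port or otherwise unhealthy, rather than at Flume itself. A few hedged checks from the agent host, with the IPs, port and HDFS path taken from the log above:

# Are all datanodes registered and healthy from the NameNode's point of view?
hdfs dfsadmin -report

# Can this host reach the datanodes' data-transfer port at all? (pure-bash TCP probe)
timeout 3 bash -c '</dev/tcp/192.168.3.102/50010' && echo "50010 reachable on hadoop2"
timeout 3 bash -c '</dev/tcp/192.168.3.103/50010' && echo "50010 reachable on hadoop3"

# The .tmp file was still renamed, so check what actually landed in HDFS
hdfs dfs -ls hdfs://hadoop1:9001/home/hadoop-2.9.1/logs/2018-07-20/10/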

  

Download the source code and analyze it

<!-- https://mvnrepository.com/artifact/org.apache.flume.flume-ng-sinks/flume-hdfs-sink -->
<dependency>
  <groupId>org.apache.flume.flume-ng-sinks</groupId>
  <artifactId>flume-hdfs-sink</artifactId>
  <version>1.8.0</version>
</dependency>
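
With Maven installed you can pull the artifact (and its sources jar, if you prefer real sources over the IDEA/Fernflower decompilation shown below) into the local repository; a hedged sketch:

# Fetch the sink jar, and its -sources jar if one is published, into ~/.m2
mvn dependency:get -Dartifact=org.apache.flume.flume-ng-sinks:flume-hdfs-sink:1.8.0
mvn dependency:get -Dartifact=org.apache.flume.flume-ng-sinks:flume-hdfs-sink:1.8.0:jar:sources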



org.apache.flume.sink.hdfs.BucketWriter
C:\Users\mymy\.m2\repository\org\apache\flume\flume-ng-sinks\flume-hdfs-sink\1.8.0\flume-hdfs-sink-1.8.0.jar!\org\apache\flume\sink\hdfs\BucketWriter.class


//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.apache.flume.sink.hdfs;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import java.io.IOException;
import java.lang.reflect.Method;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flume.Clock;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.SystemClock;
import org.apache.flume.auth.PrivilegedExecutor;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.hdfs.HDFSEventSink.WriterCallback;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class BucketWriter {
    private static final Logger LOG = LoggerFactory.getLogger(BucketWriter.class);
    private static final Integer staticLock = new Integer(1);
    private Method isClosedMethod;
    private HDFSWriter writer;
    private final long rollInterval;
    private final long rollSize;
    private final long rollCount;
    private final long batchSize;
    private final CompressionCodec codeC;
    private final CompressionType compType;
    private final ScheduledExecutorService timedRollerPool;
    private final PrivilegedExecutor proxyUser;
    private final AtomicLong fileExtensionCounter;
    private long eventCounter;
    private long processSize;
    private FileSystem fileSystem;
    private volatile String filePath;
    private volatile String fileName;
    private volatile String inUsePrefix;
    private volatile String inUseSuffix;
    private volatile String fileSuffix;
    private volatile String bucketPath;
    private volatile String targetPath;
    private volatile long batchCounter;
    private volatile boolean isOpen;
    private volatile boolean isUnderReplicated;
    private volatile int consecutiveUnderReplRotateCount;
    private volatile ScheduledFuture<Void> timedRollFuture;
    private SinkCounter sinkCounter;
    private final int idleTimeout;
    private volatile ScheduledFuture<Void> idleFuture;
    private final WriterCallback onCloseCallback;
    private final String onCloseCallbackPath;
    private final long callTimeout;
    private final ExecutorService callTimeoutPool;
    private final int maxConsecUnderReplRotations;
    private boolean mockFsInjected;
    private final long retryInterval;
    private final int maxRenameTries;
    protected boolean closed;
    AtomicInteger renameTries;

    BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize, Context context, String filePath, String fileName, String inUsePrefix, String inUseSuffix, String fileSuffix, CompressionCodec codeC, CompressionType compType, HDFSWriter writer, ScheduledExecutorService timedRollerPool, PrivilegedExecutor proxyUser, SinkCounter sinkCounter, int idleTimeout, WriterCallback onCloseCallback, String onCloseCallbackPath, long callTimeout, ExecutorService callTimeoutPool, long retryInterval, int maxCloseTries) {
        this(rollInterval, rollSize, rollCount, batchSize, context, filePath, fileName, inUsePrefix, inUseSuffix, fileSuffix, codeC, compType, writer, timedRollerPool, proxyUser, sinkCounter, idleTimeout, onCloseCallback, onCloseCallbackPath, callTimeout, callTimeoutPool, retryInterval, maxCloseTries, new SystemClock());
    }

    BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize, Context context, String filePath, String fileName, String inUsePrefix, String inUseSuffix, String fileSuffix, CompressionCodec codeC, CompressionType compType, HDFSWriter writer, ScheduledExecutorService timedRollerPool, PrivilegedExecutor proxyUser, SinkCounter sinkCounter, int idleTimeout, WriterCallback onCloseCallback, String onCloseCallbackPath, long callTimeout, ExecutorService callTimeoutPool, long retryInterval, int maxCloseTries, Clock clock) {
        this.isClosedMethod = null;
        this.consecutiveUnderReplRotateCount = 0;
        this.maxConsecUnderReplRotations = 30;
        this.mockFsInjected = false;
        this.closed = false;
        this.renameTries = new AtomicInteger(0);
        this.rollInterval = rollInterval;
        this.rollSize = rollSize;
        this.rollCount = rollCount;
        this.batchSize = batchSize;
        this.filePath = filePath;
        this.fileName = fileName;
        this.inUsePrefix = inUsePrefix;
        this.inUseSuffix = inUseSuffix;
        this.fileSuffix = fileSuffix;
        this.codeC = codeC;
        this.compType = compType;
        this.writer = writer;
        this.timedRollerPool = timedRollerPool;
        this.proxyUser = proxyUser;
        this.sinkCounter = sinkCounter;
        this.idleTimeout = idleTimeout;
        this.onCloseCallback = onCloseCallback;
        this.onCloseCallbackPath = onCloseCallbackPath;
        this.callTimeout = callTimeout;
        this.callTimeoutPool = callTimeoutPool;
        this.fileExtensionCounter = new AtomicLong(clock.currentTimeMillis());
        this.retryInterval = retryInterval;
        this.maxRenameTries = maxCloseTries;
        this.isOpen = false;
        this.isUnderReplicated = false;
        this.writer.configure(context);
    }

    @VisibleForTesting
    void setFileSystem(FileSystem fs) {
        this.fileSystem = fs;
        this.mockFsInjected = true;
    }

    @VisibleForTesting
    void setMockStream(HDFSWriter dataWriter) {
        this.writer = dataWriter;
    }

    private void resetCounters() {
        this.eventCounter = 0L;
        this.processSize = 0L;
        this.batchCounter = 0L;
    }

    private Method getRefIsClosed() {
        try {
            return this.fileSystem.getClass().getMethod("isFileClosed", Path.class);
        } catch (Exception var2) {
            LOG.info("isFileClosed() is not available in the version of the distributed filesystem being used. Flume will not attempt to re-close files if the close fails on the first attempt");
            return null;
        }
    }

    private Boolean isFileClosed(FileSystem fs, Path tmpFilePath) throws Exception {
        return (Boolean)((Boolean)this.isClosedMethod.invoke(fs, tmpFilePath));
    }
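
    // open(): builds the in-use path (inUsePrefix + fileName + counter + inUseSuffix, the ".tmp" file
    // seen in the log) and the final targetPath, opens the HDFSWriter under the static lock, and, when
    // rollInterval > 0, schedules a timed close(true). That timer produced the
    // "Rolling file (...): Roll scheduled after 1 sec elapsed." line above.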

    private void open() throws IOException, InterruptedException {
        if (this.filePath != null && this.writer != null) {
            final Configuration config = new Configuration();
            config.setBoolean("fs.automatic.close", false);
            Integer var2 = staticLock;
            synchronized(staticLock) {
                checkAndThrowInterruptedException();

                try {
                    long counter = this.fileExtensionCounter.incrementAndGet();
                    String fullFileName = this.fileName + "." + counter;
                    if (this.fileSuffix != null && this.fileSuffix.length() > 0) {
                        fullFileName = fullFileName + this.fileSuffix;
                    } else if (this.codeC != null) {
                        fullFileName = fullFileName + this.codeC.getDefaultExtension();
                    }

                    this.bucketPath = this.filePath + "/" + this.inUsePrefix + fullFileName + this.inUseSuffix;
                    this.targetPath = this.filePath + "/" + fullFileName;
                    LOG.info("Creating " + this.bucketPath);
                    this.callWithTimeout(new BucketWriter.CallRunner<Void>() {
                        public Void call() throws Exception {
                            if (BucketWriter.this.codeC == null) {
                                if (!BucketWriter.this.mockFsInjected) {
                                    BucketWriter.this.fileSystem = (new Path(BucketWriter.this.bucketPath)).getFileSystem(config);
                                }

                                BucketWriter.this.writer.open(BucketWriter.this.bucketPath);
                            } else {
                                if (!BucketWriter.this.mockFsInjected) {
                                    BucketWriter.this.fileSystem = (new Path(BucketWriter.this.bucketPath)).getFileSystem(config);
                                }

                                BucketWriter.this.writer.open(BucketWriter.this.bucketPath, BucketWriter.this.codeC, BucketWriter.this.compType);
                            }

                            return null;
                        }
                    });
                } catch (Exception var7) {
                    this.sinkCounter.incrementConnectionFailedCount();
                    if (var7 instanceof IOException) {
                        throw (IOException)var7;
                    }

                    throw Throwables.propagate(var7);
                }
            }

            this.isClosedMethod = this.getRefIsClosed();
            this.sinkCounter.incrementConnectionCreatedCount();
            this.resetCounters();
            if (this.rollInterval > 0L) {
                Callable<Void> action = new Callable<Void>() {
                    public Void call() throws Exception {
                        BucketWriter.LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.", BucketWriter.this.bucketPath, BucketWriter.this.rollInterval);

                        try {
                            BucketWriter.this.close(true);
                        } catch (Throwable var2) {
                            BucketWriter.LOG.error("Unexpected error", var2);
                        }

                        return null;
                    }
                };
                this.timedRollFuture = this.timedRollerPool.schedule(action, this.rollInterval, TimeUnit.SECONDS);
            }

            this.isOpen = true;
        } else {
            throw new IOException("Invalid file settings");
        }
    }

    public synchronized void close() throws IOException, InterruptedException {
        this.close(false);
    }

    private BucketWriter.CallRunner<Void> createCloseCallRunner() {
        return new BucketWriter.CallRunner<Void>() {
            private final HDFSWriter localWriter;

            {
                this.localWriter = BucketWriter.this.writer;
            }

            public Void call() throws Exception {
                this.localWriter.close();
                return null;
            }
        };
    }

    private Callable<Void> createScheduledRenameCallable() {
        return new Callable<Void>() {
            private final String path;
            private final String finalPath;
            private FileSystem fs;
            private int renameTries;

            {
                this.path = BucketWriter.this.bucketPath;
                this.finalPath = BucketWriter.this.targetPath;
                this.fs = BucketWriter.this.fileSystem;
                this.renameTries = 1;
            }

            public Void call() throws Exception {
                if (this.renameTries >= BucketWriter.this.maxRenameTries) {
                    BucketWriter.LOG.warn("Unsuccessfully attempted to rename " + this.path + " " + BucketWriter.this.maxRenameTries + " times. File may still be open.");
                    return null;
                } else {
                    ++this.renameTries;

                    try {
                        BucketWriter.this.renameBucket(this.path, this.finalPath, this.fs);
                        return null;
                    } catch (Exception var2) {
                        BucketWriter.LOG.warn("Renaming file: " + this.path + " failed. Will retry again in " + BucketWriter.this.retryInterval + " seconds.", var2);
                        BucketWriter.this.timedRollerPool.schedule(this, BucketWriter.this.retryInterval, TimeUnit.SECONDS);
                        return null;
                    }
                }
            }
        };
    }

    private synchronized void recoverLease() {
        if (this.bucketPath != null && this.fileSystem instanceof DistributedFileSystem) {
            try {
                LOG.debug("Starting lease recovery for {}", this.bucketPath);
                ((DistributedFileSystem)this.fileSystem).recoverLease(new Path(this.bucketPath));
            } catch (IOException var2) {
                LOG.warn("Lease recovery failed for {}", this.bucketPath, var2);
            }
        }

    }
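
    // close(callCloseCallback): flushes, closes the HDFSWriter with a timeout, cancels the roll and idle
    // timers, then renames the in-use ".tmp" file to its final name via renameBucket(); if the rename
    // fails it is retried on the timedRollerPool. This is the "Closing ..." / "Renaming ... .tmp to ..."
    // sequence in the log above.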

    public synchronized void close(boolean callCloseCallback) throws IOException, InterruptedException {
        checkAndThrowInterruptedException();

        try {
            this.flush();
        } catch (IOException var7) {
            LOG.warn("pre-close flush failed", var7);
        }

        LOG.info("Closing {}", this.bucketPath);
        BucketWriter.CallRunner<Void> closeCallRunner = this.createCloseCallRunner();
        if (this.isOpen) {
            try {
                this.callWithTimeout(closeCallRunner);
                this.sinkCounter.incrementConnectionClosedCount();
            } catch (IOException var6) {
                LOG.warn("failed to close() HDFSWriter for file (" + this.bucketPath + "). Exception follows.", var6);
                this.sinkCounter.incrementConnectionFailedCount();
                this.recoverLease();
            }

            this.isOpen = false;
        } else {
            LOG.info("HDFSWriter is already closed: {}", this.bucketPath);
        }

        if (this.timedRollFuture != null && !this.timedRollFuture.isDone()) {
            this.timedRollFuture.cancel(false);
            this.timedRollFuture = null;
        }

        if (this.idleFuture != null && !this.idleFuture.isDone()) {
            this.idleFuture.cancel(false);
            this.idleFuture = null;
        }

        if (this.bucketPath != null && this.fileSystem != null) {
            try {
                this.renameBucket(this.bucketPath, this.targetPath, this.fileSystem);
            } catch (Exception var5) {
                LOG.warn("failed to rename() file (" + this.bucketPath + "). Exception follows.", var5);
                this.sinkCounter.incrementConnectionFailedCount();
                Callable<Void> scheduledRename = this.createScheduledRenameCallable();
                this.timedRollerPool.schedule(scheduledRename, this.retryInterval, TimeUnit.SECONDS);
            }
        }

        if (callCloseCallback) {
            this.runCloseAction();
            this.closed = true;
        }

    }

    public synchronized void flush() throws IOException, InterruptedException {
        checkAndThrowInterruptedException();
        if (!this.isBatchComplete()) {
            this.doFlush();
            if (this.idleTimeout > 0 && (this.idleFuture == null || this.idleFuture.cancel(false))) {
                Callable<Void> idleAction = new Callable<Void>() {
                    public Void call() throws Exception {
                        BucketWriter.LOG.info("Closing idle bucketWriter {} at {}", BucketWriter.this.bucketPath, System.currentTimeMillis());
                        if (BucketWriter.this.isOpen) {
                            BucketWriter.this.close(true);
                        }

                        return null;
                    }
                };
                this.idleFuture = this.timedRollerPool.schedule(idleAction, (long)this.idleTimeout, TimeUnit.SECONDS);
            }
        }

    }

    private void runCloseAction() {
        try {
            if (this.onCloseCallback != null) {
                this.onCloseCallback.run(this.onCloseCallbackPath);
            }
        } catch (Throwable var2) {
            LOG.error("Unexpected error", var2);
        }

    }

    private void doFlush() throws IOException, InterruptedException {
        this.callWithTimeout(new BucketWriter.CallRunner<Void>() {
            public Void call() throws Exception {
                BucketWriter.this.writer.sync();
                return null;
            }
        });
        this.batchCounter = 0L;
    }
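
    // append(): cancels any pending idle close, reopens the file if needed, rotates (close + open) when
    // shouldRotate() says so, writes the event through callWithTimeout(), and flushes once batchCounter
    // reaches batchSize.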

    public synchronized void append(final Event event) throws IOException, InterruptedException {
        checkAndThrowInterruptedException();
        if (this.idleFuture != null) {
            this.idleFuture.cancel(false);
            if (!this.idleFuture.isDone()) {
                try {
                    this.idleFuture.get(this.callTimeout, TimeUnit.MILLISECONDS);
                } catch (TimeoutException var6) {
                    LOG.warn("Timeout while trying to cancel closing of idle file. Idle file close may have failed", var6);
                } catch (Exception var7) {
                    LOG.warn("Error while trying to cancel closing of idle file. ", var7);
                }
            }

            this.idleFuture = null;
        }

        if (!this.isOpen) {
            if (this.closed) {
                throw new BucketClosedException("This bucket writer was closed and this handle is thus no longer valid");
            }

            this.open();
        }

        if (this.shouldRotate()) {
            boolean doRotate = true;
            if (this.isUnderReplicated) {
                if (this.consecutiveUnderReplRotateCount >= 30) {
                    doRotate = false;
                    if (this.consecutiveUnderReplRotateCount == 30) {
                        LOG.error("Hit max consecutive under-replication rotations ({}); will not continue rolling files under this path due to under-replication", 30);
                    }
                } else {
                    LOG.warn("Block Under-replication detected. Rotating file.");
                }

                ++this.consecutiveUnderReplRotateCount;
            } else {
                this.consecutiveUnderReplRotateCount = 0;
            }

            if (doRotate) {
                this.close();
                this.open();
            }
        }

        try {
            this.sinkCounter.incrementEventDrainAttemptCount();
            this.callWithTimeout(new BucketWriter.CallRunner<Void>() {
                public Void call() throws Exception {
                    BucketWriter.this.writer.append(event);
                    return null;
                }
            });
        } catch (IOException var5) {
            LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file (" + this.bucketPath + ") and rethrowing exception.", var5.getMessage());

            try {
                this.close(true);
            } catch (IOException var4) {
                LOG.warn("Caught IOException while closing file (" + this.bucketPath + "). Exception follows.", var4);
            }

            throw var5;
        }

        this.processSize += (long)event.getBody().length;
        ++this.eventCounter;
        ++this.batchCounter;
        if (this.batchCounter == this.batchSize) {
            this.flush();
        }

    }
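
    // shouldRotate(): size- and count-based rolling (rollSize / rollCount), plus rotation when the
    // current block is under-replicated; time-based rolling is handled by the timer scheduled in open().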

    private boolean shouldRotate() {
        boolean doRotate = false;
        if (this.writer.isUnderReplicated()) {
            this.isUnderReplicated = true;
            doRotate = true;
        } else {
            this.isUnderReplicated = false;
        }

        if (this.rollCount > 0L && this.rollCount <= this.eventCounter) {
            LOG.debug("rolling: rollCount: {}, events: {}", this.rollCount, this.eventCounter);
            doRotate = true;
        }

        if (this.rollSize > 0L && this.rollSize <= this.processSize) {
            LOG.debug("rolling: rollSize: {}, bytes: {}", this.rollSize, this.processSize);
            doRotate = true;
        }

        return doRotate;
    }
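
    // renameBucket(): renames the in-use ".tmp" file to its final name, using the same call timeout as
    // the other HDFS operations.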

    private void renameBucket(String bucketPath, String targetPath, final FileSystem fs) throws IOException, InterruptedException {
        if (!bucketPath.equals(targetPath)) {
            final Path srcPath = new Path(bucketPath);
            final Path dstPath = new Path(targetPath);
            this.callWithTimeout(new BucketWriter.CallRunner<Void>() {
                public Void call() throws Exception {
                    if (fs.exists(srcPath)) {
                        BucketWriter.LOG.info("Renaming " + srcPath + " to " + dstPath);
                        BucketWriter.this.renameTries.incrementAndGet();
                        fs.rename(srcPath, dstPath);
                    }

                    return null;
                }
            });
        }
    }

    public String toString() {
        return "[ " + this.getClass().getSimpleName() + " targetPath = " + this.targetPath + ", bucketPath = " + this.bucketPath + " ]";
    }

    private boolean isBatchComplete() {
        return this.batchCounter == 0L;
    }

    private static void checkAndThrowInterruptedException() throws InterruptedException {
        Thread.currentThread();
        if (Thread.interrupted()) {
            throw new InterruptedException("Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.");
        }
    }

    private <T> T callWithTimeout(final BucketWriter.CallRunner<T> callRunner) throws IOException, InterruptedException {
        Future future = this.callTimeoutPool.submit(new Callable<T>() {
            public T call() throws Exception {
                return BucketWriter.this.proxyUser.execute(new PrivilegedExceptionAction<T>() {
                    public T run() throws Exception {
                        return callRunner.call();
                    }
                });
            }
        });

        try {
            return this.callTimeout > 0L ? future.get(this.callTimeout, TimeUnit.MILLISECONDS) : future.get();
        } catch (TimeoutException var5) {
            future.cancel(true);
            this.sinkCounter.incrementConnectionFailedCount();
            throw new IOException("Callable timed out after " + this.callTimeout + " ms on file: " + this.bucketPath, var5);
        } catch (ExecutionException var6) {
            this.sinkCounter.incrementConnectionFailedCount();
            Throwable cause = var6.getCause();
            if (cause instanceof IOException) {
                throw (IOException)cause;
            } else if (cause instanceof InterruptedException) {
                throw (InterruptedException)cause;
            } else if (cause instanceof RuntimeException) {
                throw (RuntimeException)cause;
            } else if (cause instanceof Error) {
                throw (Error)cause;
            } else {
                throw new RuntimeException(var6);
            }
        } catch (CancellationException var7) {
            throw new InterruptedException("Blocked callable interrupted by rotation event");
        } catch (InterruptedException var8) {
            LOG.warn("Unexpected Exception " + var8.getMessage(), var8);
            throw var8;
        }
    }

    private interface CallRunner<T> {
        T call() throws Exception;
    }
}
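
Reading the class against the log: the constructor arguments roughly map onto the hdfs sink properties (hdfs.rollInterval -> rollInterval, hdfs.rollCount -> rollCount, hdfs.rollSize -> rollSize, hdfs.batchSize -> batchSize, hdfs.filePrefix -> fileName, and so on), and "Roll scheduled after 1 sec elapsed" means hdfs.rollInterval was 1 here. The actual conf/flume.conf is not shown in this post; a hedged reconstruction of the sink side, inferred only from the log output above, might look like this:

# Sketch only; values inferred from the log, not the author's actual file
cat > /tmp/flume.conf.sketch <<'EOF'
agent1.sinks.HDFS.type = hdfs
agent1.sinks.HDFS.channel = ch1
agent1.sinks.HDFS.hdfs.path = hdfs://hadoop1:9001/home/hadoop-2.9.1/logs/%Y-%m-%d/%H
agent1.sinks.HDFS.hdfs.filePrefix = flumeHdfs
agent1.sinks.HDFS.hdfs.fileType = DataStream
agent1.sinks.HDFS.hdfs.useLocalTimeStamp = true
agent1.sinks.HDFS.hdfs.rollInterval = 1
EOF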

  

 

Check the HDFS daemons with jps on each host:

[root@hadoop1 hadoop-2.9.1]# jps
2485 DataNode
2950 ResourceManager
2264 NameNode
4057 Jps
3052 NodeManager
2815 SecondaryNameNode
[root@hadoop1 hadoop-2.9.1]#

 

 

[root@hadoop2 ~]# jps
29251 DataNode
31144 Jps
[root@hadoop2 ~]#

 

[root@hadoop3 ~]# jps
18513 DataNode
1709 Jps
[root@hadoop3 ~]#

 

So jps shows the HDFS daemons are running on all three hosts, but the datanodes are still being excluded during writes; the next thing to check is the Hadoop configuration.


[root@hadoop2 hadoop-2.9.1]# cat etc/hadoop/core-site.xml
<!-- Put site-specific property overrides in this file. -->
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://hadoop1:9001/</value>
  </property>
  <property>
    <name>io.file.buffer.size</name>
    <value>131072</value>
  </property>
</configuration>

 

The HDFS DataNodes had not been configured as described in the Hadoop Cluster Setup guide (Apache Hadoop 2.9.1): http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/ClusterSetup.html

This section deals with important parameters to be specified in the given configuration files:

  • etc/hadoop/core-site.xml
    - fs.defaultFS: NameNode URI (hdfs://host:port/)
    - io.file.buffer.size: 131072. Size of read/write buffer used in SequenceFiles.

  • etc/hadoop/hdfs-site.xml

  • Configurations for NameNode:
    - dfs.namenode.name.dir: Path on the local filesystem where the NameNode stores the namespace and transaction logs persistently. If this is a comma-delimited list of directories then the name table is replicated in all of the directories, for redundancy.
    - dfs.hosts / dfs.hosts.exclude: List of permitted/excluded DataNodes. If necessary, use these files to control the list of allowable datanodes.
    - dfs.blocksize: 268435456. HDFS blocksize of 256MB for large file-systems.
    - dfs.namenode.handler.count: 100. More NameNode server threads to handle RPCs from a large number of DataNodes.

  • Configurations for DataNode:
    - dfs.datanode.data.dir: Comma separated list of paths on the local filesystem of a DataNode where it should store its blocks. If this is a comma-delimited list of directories, then data will be stored in all named directories, typically on different devices.

 

 

 

[root@hadoop1 hadoop-2.9.1]# cat etc/hadoop/hdfs-site.xml
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>2</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>file:///home/hadoop-2.9.1/mydata/datanode</value>
  </property>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>file:///home/hadoop-2.9.1/mydata/namenode</value>
  </property>
  <property>
    <name>dfs.blocksize</name>
    <value>268435456</value>
  </property>
  <property>
    <name>dfs.namenode.handler.count</name>
    <value>100</value>
  </property>
</configuration>
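
If hadoop2 and hadoop3 never received the same hdfs-site.xml, one straightforward fix is to push the NameNode host's config out and restart the DataNodes (assuming the same install path on every host and passwordless ssh; adjust to taste):

# Copy the HDFS config from hadoop1 to the datanode hosts, then restart their DataNodes
for h in hadoop2 hadoop3; do
  scp /home/hadoop-2.9.1/etc/hadoop/core-site.xml /home/hadoop-2.9.1/etc/hadoop/hdfs-site.xml \
      "$h":/home/hadoop-2.9.1/etc/hadoop/
  ssh "$h" "/home/hadoop-2.9.1/sbin/hadoop-daemon.sh stop datanode; \
            /home/hadoop-2.9.1/sbin/hadoop-daemon.sh start datanode"
done

# Confirm all three datanodes report in
hdfs dfsadmin -report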
