Ranger初始化Patch机制


发布于 2024-04-25 / 30 阅读 / 0 评论 /
Ranger在初始化时,会进行一系列的数据库操作,以完成服务运行所需的基础环境配置。本文基于ranger-2.4.0。

Ranger安装初始化过程中,有一个setupdb的过程,这个过程会对Ranger的运行环境做一些基础配置,提供基础数据。Ranger把这些一次性的功能抽象成形形色色的Patch,就是Ranger的Patch机制。

1.初始化Patch运行过程

ranger的启动入口在ranger.sh脚本中,会ranger-admin-services.sh start来启动ranger admin web server。

初始化过程如下:

ranger.sh
   setup.sh
      db_setup.py
         import_core_db_schema // 导入Ranger Core DB Schema,脚本为ranger_core_db_mysql.sql
      db_setup.py -javapatch
         从数据库找到pending的Patch列表
         execute_java_patches // 根据Patch列表依次启动patch进程
      db_setup.py -changepassword -pair // 这里有大量用户需要修改密码
   ranger-admin-services.sh start // 启动ranger admin server

启动过程中,首先会导入数据到数据库中,主要是执行ranger_core_db_mysql.sql脚本。而Ranger的Patch也是作为元数据写入到表x_db_version_h中的,例如:

INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10033',UTC_TIMESTAMP(),'Ranger 1.0.0',UTC_TIMESTAMP(),'localhost','Y');

这里需要注意的是,setup.sh并不是每次都执行,而是仅成功执行一次即可。setup.sh成功执行后,会在ranger的安装目录下创建“.setupDone”文件,以此作为初始化成功的标记。如果“.setupDone”文件存在,则不执行setup.sh脚本。

下面,就是启动对应的Patch进程了,也就是启动patch对应的main函数。

2.初始化Patch的源码

Patch源码在security-admin-web模块下,package名称前缀为“org.apache.ranger.patch”。

所有的Patch都继承了org.apache.ranger.patch.BaseLoader,代码层面提供了扩展能力。如果自己想要实现一个自定义的Patch,则只需要继承BaseLoader,实现execLoad方法即可。

当前BaseLoader已实现的Patch如下

ChangePasswordUtil
ChangeUserNameUtil
DbToSolrMigrationUtil
MetricUtil
RoleBasedUserSearchUtil
UpdateUserAndGroupNamesInJson
XXTrxLogUpdateUtil
PatchAssignSecurityZonePersmissionToAdmin_J10026
PatchForAllServiceDefUpdateForResourceSpecificAccesses_J10012
PatchForAtlasAdminAudits_J10043
PatchForAtlasResourceAndAccessTypeUpdate_J10016
PatchForAtlasServiceDefUpdate_J10013
PatchForAtlasToAddEntityLabelAndBusinessMetadata_J10034
PatchForAtlasToAddTypeRead_J10040
PatchForHBaseDefaultPolicyUpdate_J10045
PatchForHBaseServiceDefUpdate_J10035
PatchForHiveServiceDefUpdate_J10006
PatchForHiveServiceDefUpdate_J10007
PatchForHiveServiceDefUpdate_J10009
PatchForHiveServiceDefUpdate_J10010
PatchForHiveServiceDefUpdate_J10017
PatchForHiveServiceDefUpdate_J10027
PatchForHiveServiceDefUpdate_J10030
PatchForKafkaServiceDefUpdate_J10015
PatchForKafkaServiceDefUpdate_J10025
PatchForKafkaServiceDefUpdate_J10033
PatchForMigratingOldRegimePolicyJson_J10046
PatchForMigratingRangerServiceResource_J10037
PatchForNifiResourceUpdateExclude_J10011
PatchForOzoneDefaultPoliciesUpdate_J10044
PatchForOzoneServiceDefUpdate_J10041
PatchForPrestoToSupportPresto333_J10038
PatchForServiceVersionInfo_J10004
PatchForTagServiceDefUpdate_J10008
PatchForTagServiceDefUpdate_J10028
PatchForUpdatingPolicyJson_J10019
PatchForUpdatingTagsJson_J10020
PatchForXGlobalState_J10036
PatchGrantAuditPermissionToKeyRoleUser_J10014
PatchMigration_J10002
PatchPasswordEncryption_J10001
PatchPermissionModel_J10003
PatchTagModulePermission_J10005

所有的Patch类都带有main方法,可以独立运行,也是启动Patch进程的入口。

3.BaseLoader进程运行逻辑

下面,我们以org.apache.ranger.patch.PatchForKafkaServiceDefUpdate_J10033为例,探索BaseLoader main方法的运行逻辑。

ranger使用了spring框架,PatchForKafkaServiceDefUpdate_J10033作为单例存在,内部有大量的单例成员变量,都是用于完成此Patch所需要的接口。

Main方法比较简单,如下所示:

   public static void main(String[] args) {
      logger.info("main()");
      try {
         PatchForKafkaServiceDefUpdate_J10033 loader = (PatchForKafkaServiceDefUpdate_J10033) CLIUtil.getBean(PatchForKafkaServiceDefUpdate_J10033.class);
         loader.init();
         while (loader.isMoreToProcess()) {
            loader.load();
         }
         logger.info("Load complete. Exiting!!!");
         System.exit(0);
      } catch (Exception e) {
         logger.error("Error loading", e);
         System.exit(1);
      }
   }

所有的Patch中,main方法基本结构是一致的,大部分只是类名不同。

最终会落到loader.load方法的调用,如果moreToProcess为true,则会一直执行。

@Transactional(readOnly = false, propagation = Propagation.REQUIRED)
public void load() {
   if (firstCall) {
      startTime =  DateUtil.getUTCDate().getTime();
      startProgressMonitor();
      firstCall = false;
   }
   try {
      execLoad();
      if (batchSize < 0) {
         moreToProcess = false;
      }
   } catch (Throwable t) {
      logger.error("Error while loading data.", t);
      moreToProcess = false;
   }
   if (!moreToProcess) {
      long endTime =  DateUtil.getUTCDate().getTime();
      logger.info("###############################################");
      printStats();
      logger.info("Loading completed!!!. Time taken="
         + formatTimeTaken(endTime - startTime) + " for "
         + countSoFar);
      logger.info("###############################################");
      synchronized (twoDForm) {
         twoDForm.notifyAll();
      }
   }
}

如果是第一次执行,则启动monitorThread线程,改线程会以30秒为间隔不停的调用printStats打印日志,直到moreToProcess为false。

这里还有个batchSize,表示是否多次执行,默认是-1,表示仅执行一次,一般在init方法中进行设置。

这里还有一个twoDForm对象,可以作为锁来使用。在monitorThread线程中,以twoDForm.wait(30 * 1000)来阻塞当前线程,使线程处于等待状态,避免不停地循环。而load()方法中对twoDForm.notifyAll()的调用表明当前Patch的核心逻辑已执行完成,monitorThread监控线程就可以结束了。

3.1.execLoad方法

execLoad方法是每个Patch所需要实现的核心逻辑。每个Patch类的execLoad都不同,需要根据自己的场景进行编码实现。

其中,PatchForKafkaServiceDefUpdate_J10033主要是针对每个kafka类型的service实例,修改一些参数,添加一些默认策略,比如“all - consumergroup”策略,当前ranger的超级用户拥有所有消费组消费的权限。

4.文章可解答的问题

当前文章可解决ranger使用过程中的以下疑问:

(1)在ranger中注册service后,一般会有一些默认策略生成,这些默认策略是怎么生成的?

就是在各种初始化Patch中定义的。

(2)默认策略中用户是谁?

不同Patch可能不相同,大部分是Service注册时配置的username。也有可能是ranger admin进程配置的超级管理员。

(3)为什么要有默认策略?

因为如果没有策略的话,所有用户都不能访问资源,包括服务端相互之间的访问,为了保证服务启动后,整体平台逻辑的完备性,需要保证平台能够正常运行,就需要为特别的用户配置一些默认策略。