Ranger安装初始化过程中,有一个setupdb的过程,这个过程会对Ranger的运行环境做一些基础配置,提供基础数据。Ranger把这些一次性的功能抽象成形形色色的Patch,就是Ranger的Patch机制。
1.初始化Patch运行过程
ranger的启动入口在ranger.sh脚本中,会ranger-admin-services.sh start来启动ranger admin web server。
初始化过程如下:
ranger.sh
setup.sh
db_setup.py
import_core_db_schema // 导入Ranger Core DB Schema,脚本为ranger_core_db_mysql.sql
db_setup.py -javapatch
从数据库找到pending的Patch列表
execute_java_patches // 根据Patch列表依次启动patch进程
db_setup.py -changepassword -pair // 这里有大量用户需要修改密码
ranger-admin-services.sh start // 启动ranger admin server
启动过程中,首先会导入数据到数据库中,主要是执行ranger_core_db_mysql.sql脚本。而Ranger的Patch也是作为元数据写入到表x_db_version_h中的,例如:
INSERT INTO x_db_version_h (version,inst_at,inst_by,updated_at,updated_by,active) VALUES ('J10033',UTC_TIMESTAMP(),'Ranger 1.0.0',UTC_TIMESTAMP(),'localhost','Y');
这里需要注意的是,setup.sh并不是每次都执行,而是仅成功执行一次即可。setup.sh成功执行后,会在ranger的安装目录下创建“.setupDone”文件,以此作为初始化成功的标记。如果“.setupDone”文件存在,则不执行setup.sh脚本。
下面,就是启动对应的Patch进程了,也就是启动patch对应的main函数。
2.初始化Patch的源码
Patch源码在security-admin-web模块下,package名称前缀为“org.apache.ranger.patch”。
所有的Patch都继承了org.apache.ranger.patch.BaseLoader,代码层面提供了扩展能力。如果自己想要实现一个自定义的Patch,则只需要继承BaseLoader,实现execLoad方法即可。
当前BaseLoader已实现的Patch如下
ChangePasswordUtil
ChangeUserNameUtil
DbToSolrMigrationUtil
MetricUtil
RoleBasedUserSearchUtil
UpdateUserAndGroupNamesInJson
XXTrxLogUpdateUtil
PatchAssignSecurityZonePersmissionToAdmin_J10026
PatchForAllServiceDefUpdateForResourceSpecificAccesses_J10012
PatchForAtlasAdminAudits_J10043
PatchForAtlasResourceAndAccessTypeUpdate_J10016
PatchForAtlasServiceDefUpdate_J10013
PatchForAtlasToAddEntityLabelAndBusinessMetadata_J10034
PatchForAtlasToAddTypeRead_J10040
PatchForHBaseDefaultPolicyUpdate_J10045
PatchForHBaseServiceDefUpdate_J10035
PatchForHiveServiceDefUpdate_J10006
PatchForHiveServiceDefUpdate_J10007
PatchForHiveServiceDefUpdate_J10009
PatchForHiveServiceDefUpdate_J10010
PatchForHiveServiceDefUpdate_J10017
PatchForHiveServiceDefUpdate_J10027
PatchForHiveServiceDefUpdate_J10030
PatchForKafkaServiceDefUpdate_J10015
PatchForKafkaServiceDefUpdate_J10025
PatchForKafkaServiceDefUpdate_J10033
PatchForMigratingOldRegimePolicyJson_J10046
PatchForMigratingRangerServiceResource_J10037
PatchForNifiResourceUpdateExclude_J10011
PatchForOzoneDefaultPoliciesUpdate_J10044
PatchForOzoneServiceDefUpdate_J10041
PatchForPrestoToSupportPresto333_J10038
PatchForServiceVersionInfo_J10004
PatchForTagServiceDefUpdate_J10008
PatchForTagServiceDefUpdate_J10028
PatchForUpdatingPolicyJson_J10019
PatchForUpdatingTagsJson_J10020
PatchForXGlobalState_J10036
PatchGrantAuditPermissionToKeyRoleUser_J10014
PatchMigration_J10002
PatchPasswordEncryption_J10001
PatchPermissionModel_J10003
PatchTagModulePermission_J10005
所有的Patch类都带有main方法,可以独立运行,也是启动Patch进程的入口。
3.BaseLoader进程运行逻辑
下面,我们以org.apache.ranger.patch.PatchForKafkaServiceDefUpdate_J10033为例,探索BaseLoader main方法的运行逻辑。
ranger使用了spring框架,PatchForKafkaServiceDefUpdate_J10033作为单例存在,内部有大量的单例成员变量,都是用于完成此Patch所需要的接口。
Main方法比较简单,如下所示:
public static void main(String[] args) {
logger.info("main()");
try {
PatchForKafkaServiceDefUpdate_J10033 loader = (PatchForKafkaServiceDefUpdate_J10033) CLIUtil.getBean(PatchForKafkaServiceDefUpdate_J10033.class);
loader.init();
while (loader.isMoreToProcess()) {
loader.load();
}
logger.info("Load complete. Exiting!!!");
System.exit(0);
} catch (Exception e) {
logger.error("Error loading", e);
System.exit(1);
}
}
所有的Patch中,main方法基本结构是一致的,大部分只是类名不同。
最终会落到loader.load方法的调用,如果moreToProcess为true,则会一直执行。
@Transactional(readOnly = false, propagation = Propagation.REQUIRED)
public void load() {
if (firstCall) {
startTime = DateUtil.getUTCDate().getTime();
startProgressMonitor();
firstCall = false;
}
try {
execLoad();
if (batchSize < 0) {
moreToProcess = false;
}
} catch (Throwable t) {
logger.error("Error while loading data.", t);
moreToProcess = false;
}
if (!moreToProcess) {
long endTime = DateUtil.getUTCDate().getTime();
logger.info("###############################################");
printStats();
logger.info("Loading completed!!!. Time taken="
+ formatTimeTaken(endTime - startTime) + " for "
+ countSoFar);
logger.info("###############################################");
synchronized (twoDForm) {
twoDForm.notifyAll();
}
}
}
如果是第一次执行,则启动monitorThread线程,改线程会以30秒为间隔不停的调用printStats打印日志,直到moreToProcess为false。
这里还有个batchSize,表示是否多次执行,默认是-1,表示仅执行一次,一般在init方法中进行设置。
这里还有一个twoDForm对象,可以作为锁来使用。在monitorThread线程中,以twoDForm.wait(30 * 1000)来阻塞当前线程,使线程处于等待状态,避免不停地循环。而load()方法中对twoDForm.notifyAll()的调用表明当前Patch的核心逻辑已执行完成,monitorThread监控线程就可以结束了。
3.1.execLoad方法
execLoad方法是每个Patch所需要实现的核心逻辑。每个Patch类的execLoad都不同,需要根据自己的场景进行编码实现。
其中,PatchForKafkaServiceDefUpdate_J10033主要是针对每个kafka类型的service实例,修改一些参数,添加一些默认策略,比如“all - consumergroup”策略,当前ranger的超级用户拥有所有消费组消费的权限。
4.文章可解答的问题
当前文章可解决ranger使用过程中的以下疑问:
(1)在ranger中注册service后,一般会有一些默认策略生成,这些默认策略是怎么生成的?
就是在各种初始化Patch中定义的。
(2)默认策略中用户是谁?
不同Patch可能不相同,大部分是Service注册时配置的username。也有可能是ranger admin进程配置的超级管理员。
(3)为什么要有默认策略?
因为如果没有策略的话,所有用户都不能访问资源,包括服务端相互之间的访问,为了保证服务启动后,整体平台逻辑的完备性,需要保证平台能够正常运行,就需要为特别的用户配置一些默认策略。