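Preface
An earlier iteration implemented importing MySQL metadata into Apache Atlas. It was a first version modeled on the Atlas Hive Hook, so the implementation was far from polished.
This iteration improves on it and supports importing metadata from multiple relational databases into Apache Atlas.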
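Database schema and catalog
According to the SQL standard, Catalog and Schema are both abstract concepts in a SQL environment. They can be thought of as containers, or as levels in the namespace of database objects, and their main purpose is to resolve naming conflicts. Conceptually, a database system contains multiple Catalogs, each Catalog contains multiple Schemas, and each Schema contains multiple database objects (tables, views, fields, and so on). Conversely, every database object belongs to a Schema, and that Schema belongs to a Catalog. This gives each database object a fully qualified name, which resolves naming conflicts. For example, the fully qualified name of a table can be written as CatalogName.SchemaName.TableName. Note also that the SQL standard does not require the fully qualified name of every database object to be unique.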
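The fully qualified name described above can be sketched as a small helper. This is only an illustration and not code from the project: the class name QualifiedNames and the method of are assumptions, and levels that a given database does not support are simply passed as null or blank and skipped.

```java
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper: joins catalog, schema and object name into a dotted name. */
final class QualifiedNames {

    private QualifiedNames() {
    }

    /**
     * Builds "catalog.schema.object", skipping levels that are null or blank
     * (for example, a database that has no catalog level).
     */
    static String of(String catalog, String schema, String objectName) {
        Objects.requireNonNull(objectName, "objectName");
        return Stream.of(catalog, schema, objectName)
                .filter(part -> part != null && !part.isBlank())
                .collect(Collectors.joining("."));
    }

    public static void main(String[] args) {
        System.out.println(of("sales_db", "dbo", "orders")); // sales_db.dbo.orders
        System.out.println(of(null, "HR", "EMPLOYEES"));     // HR.EMPLOYEES
    }
}
```

Skipping missing levels keeps one code path for databases with and without a catalog level, at the cost of not distinguishing "catalog.table" from "schema.table" in the resulting string.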
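In practice, database systems differ widely in how they support and implement Catalog and Schema, so specific questions should be checked against the documentation of the product in question. A simple and common approach is to use the database name as the Catalog name and the user name as the Schema name; see the table below: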
Table 1. Common databases
Vendor | Catalog support | Schema support |
---|---|---|
Oracle | Not supported | Oracle User ID |
MySQL | Not supported | Database name |
MS SQL Server | Database name | Object owner name (changed starting with the 2005 release) |
DB2 | Catalog part is omitted when specifying a database object | Catalog owner name |
Sybase | Database name | Database owner name |
Informix | Not supported | Not needed |
PointBase | Not supported | Database name |
Source: https://www.cnblogs.com/ECNB/p/4611309.html
Hierarchy abstraction of the metadata model
Relational databases organize their objects differently. Each one corresponds to one of the following hierarchies:
- Datasource -> Catalog -> Schema -> Table -> Column
- Datasource -> Catalog -> Table -> Column
- Datasource -> Schema -> Table -> Column
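One way to express the three hierarchies in code is an enum that records the levels of each shape. The sketch below is an assumption for illustration only; the name MetadataHierarchy and its methods do not come from the project. The two flags could, for instance, be derived from JDBC's DatabaseMetaData.supportsCatalogsInTableDefinitions() and supportsSchemasInTableDefinitions(), although drivers do not always report these consistently.

```java
import java.util.List;

/** Hypothetical enum naming the three hierarchy shapes listed above. */
enum MetadataHierarchy {
    CATALOG_AND_SCHEMA(List.of("Datasource", "Catalog", "Schema", "Table", "Column")),
    CATALOG_ONLY(List.of("Datasource", "Catalog", "Table", "Column")),
    SCHEMA_ONLY(List.of("Datasource", "Schema", "Table", "Column"));

    private final List<String> levels;

    MetadataHierarchy(List<String> levels) {
        this.levels = levels;
    }

    /** Levels from the datasource down to the column, in order. */
    List<String> levels() {
        return levels;
    }

    /** Picks a shape from two capability flags; at least one must be true. */
    static MetadataHierarchy of(boolean hasCatalog, boolean hasSchema) {
        if (hasCatalog && hasSchema) {
            return CATALOG_AND_SCHEMA;
        }
        if (hasCatalog) {
            return CATALOG_ONLY;
        }
        if (hasSchema) {
            return SCHEMA_ONLY;
        }
        throw new IllegalArgumentException("a datasource needs a catalog or a schema level");
    }
}
```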
Metadata conversion design
Providing metadata
The Connection is obtained the same way as in Apache DolphinScheduler, so the details are not repeated here.

    public Connection getConnection(DbType dbType, ConnectionParam connectionParam) throws ExecutionException {
        BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam;
        String datasourceUniqueId = DataSourceUtils.getDatasourceUniqueId(baseConnectionParam, dbType);
        logger.info("Get connection from datasource {}", datasourceUniqueId);

        // Reuse a cached client per datasource; create it on first use
        DataSourceClient dataSourceClient = uniqueId2dataSourceClientCache.get(datasourceUniqueId, () -> {
            Map<String, DataSourceChannel> dataSourceChannelMap = dataSourcePluginManager.getDataSourceChannelMap();
            DataSourceChannel dataSourceChannel = dataSourceChannelMap.get(dbType.getDescp());
            if (null == dataSourceChannel) {
                throw new RuntimeException(String.format("datasource plugin '%s' is not found", dbType.getDescp()));
            }
            return dataSourceChannel.createDataSourceClient(baseConnectionParam, dbType);
        });
        return dataSourceClient.getConnection();
    }
Converting metadata
- Metadata model
Create the metadata model for a database

    private AtlasEntityDef createJdbcDatabaseDef() {
        AtlasEntityDef typeDef = createClassTypeDef(
                DatabaseProperties.JDBC_TYPE_DATABASE,
                Collections.singleton(DatabaseProperties.ENTITY_TYPE_DATASET),
                createOptionalAttrDef(DatabaseProperties.ATTR_URL, "string"),
                createOptionalAttrDef(DatabaseProperties.ATTR_DRIVER_NAME, "string"),
                createOptionalAttrDef(DatabaseProperties.ATTR_PRODUCT_NAME, "string"),
                createOptionalAttrDef(DatabaseProperties.ATTR_PRODUCT_VERSION, "string"));
        typeDef.setServiceType(DatabaseProperties.ENTITY_SERVICE_TYPE);
        return typeDef;
    }
Create the metadata model for a database schema

    private AtlasEntityDef createJdbcSchemaDef() {
        AtlasEntityDef typeDef = AtlasTypeUtil.createClassTypeDef(
                SchemaProperties.JDBC_TYPE_SCHEMA,
                Collections.singleton(SchemaProperties.ENTITY_TYPE_DATASET));
        typeDef.setServiceType(SchemaProperties.ENTITY_SERVICE_TYPE);

        Map<String, String> options = new HashMap<>();
        options.put("schemaElementsAttribute", "tables");
        typeDef.setOptions(options);
        return typeDef;
    }
Create the metadata model for a database table

    private AtlasEntityDef createJdbcTableDef() {
        AtlasEntityDef typeDef = createClassTypeDef(
                TableProperties.JDBC_TYPE_TABLE,
                Collections.singleton(TableProperties.ENTITY_TYPE_DATASET),
                createOptionalAttrDef(TableProperties.ATTR_TABLE_TYPE, "string"));
        typeDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);

        Map<String, String> options = new HashMap<>();
        options.put("schemaElementsAttribute", "columns");
        typeDef.setOptions(options);
        return typeDef;
    }
Create the metadata model for a database column

    private AtlasEntityDef createJdbcColumnDef() {
        AtlasEntityDef typeDef = createClassTypeDef(
                ColumnProperties.JDBC_TYPE_COLUMN,
                Collections.singleton(ColumnProperties.ENTITY_TYPE_DATASET),
                createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_TYPE, "string"),
                createOptionalAttrDef(ColumnProperties.ATTR_IS_PRIMARY_KEY, "string"),
                createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_IS_NULLABLE, "string"),
                createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_DEFAULT_VALUE, "string"),
                createOptionalAttrDef(ColumnProperties.ATTR_COLUMN_AUTO_INCREMENT, "string"));
        typeDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);

        Map<String, String> options = new HashMap<>();
        options.put("schemaAttributes",
                "[\"name\", \"isPrimaryKey\", \"columnType\", \"isNullable\" , \"isAutoIncrement\", \"description\"]");
        typeDef.setOptions(options);
        return typeDef;
    }
Create the relationship model between entities

    private List<AtlasRelationshipDef> createAtlasRelationshipDef() {
        String version = "1.0";

        // Relationship between database and schema
        AtlasRelationshipDef databaseSchemasDef = createRelationshipTypeDef(
                BaseProperties.RELATIONSHIP_DATABASE_SCHEMAS,
                BaseProperties.RELATIONSHIP_DATABASE_SCHEMAS,
                version, COMPOSITION, AtlasRelationshipDef.PropagateTags.NONE,
                createRelationshipEndDef(BaseProperties.JDBC_TYPE_DATABASE, "schemas", SET, true),
                createRelationshipEndDef(BaseProperties.JDBC_TYPE_SCHEMA, "database", SINGLE, false));
        databaseSchemasDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);

        // Relationship between database and table
        AtlasRelationshipDef databaseTablesDef = createRelationshipTypeDef(
                BaseProperties.RELATIONSHIP_DATABASE_TABLES,
                BaseProperties.RELATIONSHIP_DATABASE_TABLES,
                version, AGGREGATION, AtlasRelationshipDef.PropagateTags.NONE,
                createRelationshipEndDef(BaseProperties.JDBC_TYPE_DATABASE, "tables", SET, true),
                createRelationshipEndDef(BaseProperties.JDBC_TYPE_TABLE, "database", SINGLE, false));
        databaseTablesDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);

        // Relationship between schema and table.
        // Note: the attribute name "schema" is already in use and must be replaced
        // to avoid a conflict, e.g. with Jschema (jdbc_schema).
        AtlasRelationshipDef schemaTablesDef = createRelationshipTypeDef(
                BaseProperties.RELATIONSHIP_SCHEMA_TABLES,
                BaseProperties.RELATIONSHIP_SCHEMA_TABLES,
                version, AGGREGATION, AtlasRelationshipDef.PropagateTags.NONE,
                createRelationshipEndDef(BaseProperties.JDBC_TYPE_SCHEMA, "tables", SET, true),
                createRelationshipEndDef(BaseProperties.JDBC_TYPE_TABLE, "Jschema", SINGLE, false));
        schemaTablesDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);

        // Relationship between table and column
        AtlasRelationshipDef tableColumnsDef = createRelationshipTypeDef(
                BaseProperties.RELATIONSHIP_TABLE_COLUMNS,
                BaseProperties.RELATIONSHIP_TABLE_COLUMNS,
                version, COMPOSITION, AtlasRelationshipDef.PropagateTags.NONE,
                createRelationshipEndDef(BaseProperties.JDBC_TYPE_TABLE, "columns", SET, true),
                createRelationshipEndDef(BaseProperties.JDBC_TYPE_COLUMN, "table", SINGLE, false));
        tableColumnsDef.setServiceType(BaseProperties.ENTITY_SERVICE_TYPE);

        return Arrays.asList(databaseSchemasDef, databaseTablesDef, schemaTablesDef, tableColumnsDef);
    }
- Extracting metadata
Not covered in detail here.
- Converting metadata
The factory pattern is used to provide a separate conversion for each type of database.

    public interface JdbcTransferFactory {

        JdbcTransfer getTransfer(DatabaseMetaData metaData, AtlasClientV2 client);

        boolean supportType(String type);

        String getName();
    }
List<Pattern> ignorePatterns filters out database metadata that should not be imported, for example MySQL's information_schema.

    public interface JdbcTransfer {

        void transfer();

        JdbcTransfer setIgnorePatterns(List<Pattern> ignorePatterns);
    }
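The filtering idea behind ignorePatterns can be tried out in isolation. The following is a minimal sketch under stated assumptions: the class IgnorePatternDemo, the method keepNotIgnored and the sample names are made up for illustration, and only java.util.regex from the JDK is used. Like the MySQL implementation in this post, it uses matches(), so a pattern must cover the whole name.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Hypothetical demo of filtering names with a list of ignore patterns. */
final class IgnorePatternDemo {

    private IgnorePatternDemo() {
    }

    /** Keeps only the names that are not fully matched by any ignore pattern. */
    static List<String> keepNotIgnored(List<String> names, List<Pattern> ignorePatterns) {
        return names.stream()
                .filter(name -> ignorePatterns.stream()
                        .noneMatch(pattern -> pattern.matcher(name).matches()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Pattern> ignore = List.of(
                Pattern.compile("information_schema"),
                Pattern.compile("tmp_.*"));
        List<String> names = List.of("information_schema", "demo", "tmp_orders", "orders");
        System.out.println(keepNotIgnored(names, ignore)); // [demo, orders]
    }
}
```

Because matches() anchors the pattern at both ends, "tmp_.*" excludes tmp_orders but a bare "tmp" would exclude nothing here; find() would be the alternative if substring matching were wanted.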
Example: JdbcMysqlTransfer and MysqlTransferFactory

    @AutoService(JdbcTransferFactory.class)
    public class MysqlTransferFactory implements JdbcTransferFactory {

        public static final String MYSQL = "mysql";

        @Override
        public JdbcTransfer getTransfer(DatabaseMetaData metaData, AtlasClientV2 client) {
            return new JdbcMysqlTransfer(metaData, client);
        }

        @Override
        public boolean supportType(String type) {
            return MYSQL.equalsIgnoreCase(type);
        }

        @Override
        public String getName() {
            return MYSQL;
        }
    }

    public class JdbcMysqlTransfer implements JdbcTransfer {

        private final Jdbc jdbc;
        private final AtlasService atlasService;
        private List<Pattern> ignorePatterns;

        public JdbcMysqlTransfer(DatabaseMetaData metaData, AtlasClientV2 client) {
            this.jdbc = new Jdbc(new JdbcMetadata(metaData));
            this.atlasService = new AtlasService(client);
            this.ignorePatterns = Collections.emptyList();
        }

        @Override
        public JdbcTransfer setIgnorePatterns(List<Pattern> ignorePatterns) {
            this.ignorePatterns = ignorePatterns;
            return this;
        }

        private boolean tableIsNotIgnored(String tableName) {
            return ignorePatterns.stream().noneMatch(regex -> regex.matcher(tableName).matches());
        }

        @Override
        public void transfer() {
            // 1. Convert the database entity
            DatabaseTransfer databaseTransfer = new DatabaseTransfer(atlasService);
            AtlasEntity databaseEntity = databaseTransfer.apply(jdbc);

            // 2. Convert the table entities
            String catalog = (String) databaseEntity.getAttribute(BaseProperties.ATTR_NAME);
            List<AtlasEntity> tableEntities = jdbc.getTables(catalog, catalog).parallelStream()
                    .filter(jdbcTable -> tableIsNotIgnored(jdbcTable.getTableName()))
                    .map(new TableTransfer(atlasService, databaseEntity))
                    .toList();

            // 3. Convert the columns of each table
            for (AtlasEntity tableEntity : tableEntities) {
                String tableName = (String) tableEntity.getAttribute(BaseProperties.ATTR_NAME);
                List<JdbcPrimaryKey> primaryKeys = jdbc.getPrimaryKeys(catalog, tableName);
                jdbc.getColumns(catalog, catalog, tableName).parallelStream()
                        .forEach(new ColumnTransfer(atlasService, tableEntity, primaryKeys));
            }
        }
    }
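How a caller might pick the right factory by database type can be sketched as follows. This is an assumption about usage, not code from the project: TransferFactory is a simplified stand-in for JdbcTransferFactory that keeps only the lookup methods, and FactoryLookupDemo, MysqlFactory, OracleFactory and find are hypothetical names. The factories are passed in as a list to keep the sketch self-contained.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical demo of selecting a factory through supportType. */
final class FactoryLookupDemo {

    private FactoryLookupDemo() {
    }

    /** Simplified stand-in for JdbcTransferFactory; only the lookup methods are kept. */
    interface TransferFactory {
        boolean supportType(String type);

        String getName();
    }

    /** Mirrors the case-insensitive matching rule of MysqlTransferFactory. */
    static final class MysqlFactory implements TransferFactory {
        @Override
        public boolean supportType(String type) {
            return "mysql".equalsIgnoreCase(type);
        }

        @Override
        public String getName() {
            return "mysql";
        }
    }

    /** A second, made-up factory so that the lookup has something to choose from. */
    static final class OracleFactory implements TransferFactory {
        @Override
        public boolean supportType(String type) {
            return "oracle".equalsIgnoreCase(type);
        }

        @Override
        public String getName() {
            return "oracle";
        }
    }

    /** Returns the first factory that declares support for the given type, if any. */
    static Optional<TransferFactory> find(List<TransferFactory> factories, String type) {
        return factories.stream()
                .filter(factory -> factory.supportType(type))
                .findFirst();
    }

    public static void main(String[] args) {
        List<TransferFactory> factories = List.of(new MysqlFactory(), new OracleFactory());
        System.out.println(find(factories, "MySQL").map(TransferFactory::getName).orElse("none")); // mysql
        System.out.println(find(factories, "db2").map(TransferFactory::getName).orElse("none"));   // none
    }
}
```

Since the factories are annotated with @AutoService, which generates the META-INF/services entries, the real list could presumably be obtained with java.util.ServiceLoader.load(JdbcTransferFactory.class) instead of being built by hand.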
- Storing metadata in Atlas

    public class DatabaseTransfer implements Function<Jdbc, AtlasEntity> {

        private final AtlasService atlasService;

        public DatabaseTransfer(AtlasService atlasService) {
            this.atlasService = atlasService;
        }

        @Override
        public AtlasEntity apply(Jdbc jdbc) {
            String userName = jdbc.getUserName();
            String driverName = jdbc.getDriverName();
            String productName = jdbc.getDatabaseProductName();
            String productVersion = jdbc.getDatabaseProductVersion();
            String url = jdbc.getUrl();

            // Strip the query parameters; the last path segment is the catalog name
            String urlWithNoParams = url.contains("?") ? url.substring(0, url.indexOf("?")) : url;
            String catalogName = urlWithNoParams.substring(urlWithNoParams.lastIndexOf("/") + 1);

            // Special handling for Oracle: the upper-cased user name serves as the catalog name
            if (productName.equalsIgnoreCase("oracle")) {
                catalogName = userName.toUpperCase();
                urlWithNoParams = urlWithNoParams + "/" + catalogName;
            }

            DatabaseProperties properties = new DatabaseProperties();
            properties.setQualifiedName(urlWithNoParams);
            properties.setDisplayName(catalogName);
            properties.setOwner(userName);
            properties.setUrl(url);
            properties.setDriverName(driverName);
            properties.setProductName(productName);
            properties.setProductVersion(productVersion);

            // 1. Create the Atlas entity
            AtlasEntity atlasEntity = new AtlasEntity(DatabaseProperties.JDBC_TYPE_DATABASE, properties.getAttributes());

            // 2. Check whether the entity already exists; if so, fill in its GUID
            Map<String, String> searchParam = Collections.singletonMap(DatabaseProperties.ATTR_QUALIFIED_NAME, urlWithNoParams);
            Optional<AtlasEntityHeader> entityHeader = atlasService.checkAtlasEntityExists(DatabaseProperties.JDBC_TYPE_DATABASE, searchParam);
            entityHeader.ifPresent(header -> atlasEntity.setGuid(header.getGuid()));

            // 3. Store the entity in Atlas, or update it there
            if (entityHeader.isPresent()) {
                // The GUID was already set in step 2
                atlasService.createAtlasEntity(new AtlasEntity.AtlasEntityWithExtInfo(atlasEntity));
            } else {
                AtlasEntityHeader header = atlasService.createAtlasEntity(new AtlasEntity.AtlasEntityWithExtInfo(atlasEntity));
                atlasEntity.setGuid(header.getGuid());
            }
            return atlasEntity;
        }
    }
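The URL handling in DatabaseTransfer is easy to check on its own. The sketch below restates that logic as pure string functions; the class JdbcUrlNames, its method names and the sample URLs are assumptions for illustration. Unlike the original, it upper-cases with Locale.ROOT so that the result does not depend on the JVM's default locale.

```java
import java.util.Locale;

/** Hypothetical helper restating the URL handling of DatabaseTransfer. */
final class JdbcUrlNames {

    private JdbcUrlNames() {
    }

    /** Removes everything from the first '?' onward, if present. */
    static String stripParams(String url) {
        int queryStart = url.indexOf('?');
        return queryStart >= 0 ? url.substring(0, queryStart) : url;
    }

    /** Takes the last path segment of the parameter-free URL as the catalog name. */
    static String catalogName(String url) {
        String base = stripParams(url);
        return base.substring(base.lastIndexOf('/') + 1);
    }

    /** For Oracle, appends the upper-cased user name; otherwise returns the bare URL. */
    static String qualifiedName(String url, String productName, String userName) {
        String base = stripParams(url);
        if ("oracle".equalsIgnoreCase(productName)) {
            return base + "/" + userName.toUpperCase(Locale.ROOT);
        }
        return base;
    }

    public static void main(String[] args) {
        String mysqlUrl = "jdbc:mysql://localhost:3306/demo?useSSL=false";
        System.out.println(catalogName(mysqlUrl));                    // demo
        System.out.println(qualifiedName(mysqlUrl, "MySQL", "root")); // jdbc:mysql://localhost:3306/demo
        System.out.println(qualifiedName("jdbc:oracle:thin:@//dbhost:1521/ORCL", "Oracle", "scott"));
        // jdbc:oracle:thin:@//dbhost:1521/ORCL/SCOTT
    }
}
```

This approach assumes the database name is the last '/'-separated segment of the URL. URLs that carry the database name as a property, such as the ";databaseName=" form used by SQL Server, would need separate handling.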
Results
- Metadata type definitions
- Test import of metadata
Because MySQL does not use schemas, jdbc_schema is empty.
As shown in the figure, the tables of the demo database in MySQL are clearly visible.
For table metadata, qualifiedName takes the form of the database connection URL followed by .tableName.
As shown in the figure, the metadata of each column in the table gives a clear view of the table's fields.