/* * (c) 2021 Collibra Inc. This software is protected under international copyright law. You may only install and use this * software subject to the license agreement available at https://marketplace.collibra.com/binary-code-license-agreement/. * If such an agreement is not in place, you may not use the software. */ package com.collibra.marketplace.azure.purview.component; import java.time.Duration; import java.util.Iterator; import java.util.Set; import javax.annotation.PostConstruct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.web.client.RestTemplateBuilder; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; import com.collibra.marketplace.azure.purview.util.JsonUtil; import org.json.simple.JSONArray; import org.json.simple.JSONObject; @Component public class AzurePurviewSearchComponent { private static final Logger LOGGER = LoggerFactory.getLogger(AzurePurviewSearchComponent.class); @Value("${purview.rest.api.host}") private String purviewHost; @Value("${purview.rest.api.port}") private String purviewPort; @Value("${purview.rest.api.base.path}") private String purviewBasePath; @Value("${azure.connection.timeout:30000000}") private Long connectionTimeout; @Value("${azure.read.timeout:60000000}") private Long readTimeout; @Value("${azure.api.max.retry:5}") private Integer maxRetry; @Value("${azure.api.retry.delay:1000}") private Long retryDelay; @Value("${azure.uri.limit:60}") private int uriLimit; private RestTemplate purviewRestTemplate; private String postPurviewSearchUrl; private String getPurviewEntitiesUrl; private String accessToken; private AzureAuthenticationComponent authComponent; @Autowired public AzurePurviewSearchComponent(AzureAuthenticationComponent authComponent) { this.authComponent = authComponent; } @PostConstruct public void initialise() { postPurviewSearchUrl = "https://" + purviewHost + ":" + purviewPort + purviewBasePath + "/search/advanced"; getPurviewEntitiesUrl = "https://" + purviewHost + ":" + purviewPort + purviewBasePath + "/entity/bulk?"; purviewRestTemplate = new RestTemplateBuilder().setConnectTimeout(Duration.ofMillis(connectionTimeout)) .setReadTimeout(Duration.ofMillis(readTimeout)).build(); } /** * Adds the default request header for setting the content type to * application/json, and OAuth access token * * @param headers The HttpHeaders to be set. * @return The headers object with the included content type. */ private HttpHeaders addAuthenticationToHeaders(HttpHeaders headers) { accessToken = authComponent.getAccessToken(); headers.add("Authorization", "Bearer " + accessToken); headers.setContentType(MediaType.APPLICATION_JSON); LOGGER.trace("Search Request Headers::\n{}", headers); return headers; } /** * Return the entities matching query parameter from the purview API. * * @param limit Number of maximum results to return in a call to the purview * API. * @return The JSONArray with the results. */ public JSONArray searchEntity(String assetType, Integer limit) { return (JSONArray) searchEntityFull(assetType, limit, true).get("entities"); } /** * Return the entities matching query parameter from the purview API. * * @param limit Number of maximum results to return in a call to the purview * API. * @return The JSONArray with the results. */ public JSONArray searchEntity(String assetType, Integer limit, Boolean minExtInfo, JSONArray additionalFilters) { return (JSONArray) searchEntityFull(assetType, limit, minExtInfo, additionalFilters).get("entities"); } /** * Return the entities matching query parameter from the purview API. * * @param limit Number of maximum results to return in a call to the purview * API. * @return The JSONArray with the results. */ public JSONObject searchEntityFull(String assetType, Integer limit, Boolean minExtInfo) { return searchEntityFull(assetType, limit, minExtInfo, null); } public JSONObject searchEntityFull(String assetType, Integer limit, Boolean minExtInfo, JSONArray additionalFilters) { return searchEntityFull(assetType, limit, minExtInfo, additionalFilters, false); } /** * Return the entities matching query parameter from the purview API. * * @param limit Number of maximum results to return in a call to the purview * API. * @return The JSONArray with the results. */ @SuppressWarnings("unchecked") public JSONObject searchEntityFull(String assetType, Integer limit, Boolean minExtInfo, JSONArray additionalFilters, Boolean includeSubTypes) { JSONArray jsonResults = new JSONArray(); JSONArray jsonRelatedResults = new JSONArray(); JSONObject jsonTotalResults = new JSONObject(); JSONObject entityFilterJSON = new JSONObject(); entityFilterJSON.put("entityType", assetType); entityFilterJSON.put("includeSubTypes", includeSubTypes); JSONArray filters = new JSONArray(); filters.add(entityFilterJSON); if (additionalFilters != null) { filters.addAll(additionalFilters); } JSONObject filterJSON = new JSONObject(); filterJSON.put("and", filters); JSONObject searchJSON = new JSONObject(); searchJSON.put("keywords", "*"); searchJSON.put("limit", limit); searchJSON.put("filter", filterJSON); Long offset = 0L; Long limitLong = new Long(limit); LOGGER.info("Initiating Search for :: {}", assetType); while (true) { int tryCount = 0; try { searchJSON.put("offset", offset); LOGGER.info( "Preparing and posting to Purview advanced search for {} starting at {} and limit to {} with:\n{}", assetType, offset, limitLong, searchJSON); HttpHeaders headers = new HttpHeaders(); headers = this.addAuthenticationToHeaders(headers); HttpEntity entity = new HttpEntity(searchJSON.toString(), headers); ResponseEntity response = purviewRestTemplate.exchange(postPurviewSearchUrl, HttpMethod.POST, entity, String.class); LOGGER.debug("Search Response :: {}", response.toString()); JSONObject jsonResponse = new JsonUtil().parseJsonStringToObject(response.getBody()); LOGGER.debug("Response Json :: {}", jsonResponse.toString()); Long searchCount = (Long) jsonResponse.get("@search.count"); if (searchCount > 100000L) { LOGGER.error("There are {} entities of type {}. Due to a limitation on the Azure Purview API, not more than 100,000 entities can be retrieved. Therefore, the connector will not retrieve any of these entities.",searchCount,assetType); break; } JSONArray valueArray = (JSONArray) jsonResponse.get("value"); if (valueArray != null && valueArray.size() > 0) { JSONObject jsonEntityResults = entityResults(valueArray, "id", minExtInfo); jsonResults.addAll((JSONArray) jsonEntityResults.get("entities")); jsonRelatedResults.addAll((JSONArray) jsonEntityResults.get("referredEntities")); } LOGGER.debug("Current Search Count for :: {}, {}", assetType, searchCount); if ((offset + limitLong) >= searchCount) break; else offset = offset + limit; // End happy path logic } catch (Exception e) { LOGGER.error(e.getLocalizedMessage()); tryCount++; if (tryCount > maxRetry) { throw new RuntimeException(e); } else try { Thread.sleep(retryDelay); } catch (InterruptedException ie) { LOGGER.error(ie.getLocalizedMessage()); } } } jsonTotalResults.put("entities", jsonResults); jsonTotalResults.put("relatedEntities", jsonRelatedResults); LOGGER.info("Finished Search for :: {}\n", assetType); return jsonTotalResults; } public JSONArray entityDetails(JSONArray entitiesArray) { JSONArray entityResults = null; if (entitiesArray != null && entitiesArray.size() > 0) { JSONObject jsonResponse = entityResults(entitiesArray, "id"); entityResults = (JSONArray) jsonResponse.get("entities"); } return entityResults; } public JSONObject entityResults(JSONArray entitiesArray, String idField) { return entityResults(entitiesArray, idField, true); } @SuppressWarnings("unchecked") public JSONObject entityResults(JSONArray entitiesArray, String idField, Boolean minExtInfo) { JSONObject jsonResponse = null; JSONArray jsonEntities = new JSONArray(); JSONArray jsonReferredEntities = new JSONArray(); if (entitiesArray != null && entitiesArray.size() > 0) { jsonResponse = new JSONObject(); int pageCount = 0; StringBuilder sb = null; for (int i = 0; i < entitiesArray.size(); i++) { if (pageCount == 0) { sb = new StringBuilder(); sb.append(getPurviewEntitiesUrl); } else { sb.append("&"); } sb.append("guid="); JSONObject entity = (JSONObject) entitiesArray.get(i); sb.append((String) entity.get(idField)); pageCount++; if (pageCount == uriLimit) { ResponseEntity pageResponse = getBulkEntities(sb, minExtInfo); LOGGER.debug("Entity GUID Search Response at {} :: {}", i, pageResponse.toString()); String body = pageResponse.getBody(); if(body != null) { JSONObject jsonPagedResponse = new JsonUtil().parseJsonStringToObject(body); LOGGER.debug("Response Json :: {}", jsonPagedResponse.toString()); // process jsonEntities.addAll((JSONArray) jsonPagedResponse.get("entities")); JSONObject referredRecords = (JSONObject) jsonPagedResponse.get("referredEntities"); if (referredRecords != null) { Iterator referredIds = referredRecords.keySet().iterator(); // Find all the related entities while (referredIds.hasNext()) { String referredId = referredIds.next(); jsonReferredEntities.add(referredRecords.get(referredId)); } } } // Reset pageCount = 0; } } // Catch remaining records if (pageCount > 0) { ResponseEntity pageResponse = getBulkEntities(sb, minExtInfo); LOGGER.debug("Entity GUID Search Response at {} :: {}", entitiesArray.size(), pageResponse.toString()); JSONObject jsonPagedResponse = new JsonUtil().parseJsonStringToObject(pageResponse.getBody()); LOGGER.debug("Response Json :: {}", jsonPagedResponse.toString()); // process jsonEntities.addAll((JSONArray) jsonPagedResponse.get("entities")); JSONObject referredRecords = (JSONObject) jsonPagedResponse.get("referredEntities"); if (referredRecords != null) { Iterator referredIds = referredRecords.keySet().iterator(); // Find all the related entities while (referredIds.hasNext()) { String referredId = referredIds.next(); jsonReferredEntities.add(referredRecords.get(referredId)); } } } jsonResponse.put("entities", jsonEntities); jsonResponse.put("referredEntities", jsonReferredEntities); } return jsonResponse; } private ResponseEntity getBulkEntities(StringBuilder sbBulkSearch, Boolean minExtInfo) { int tryCount = 0; while (true) { try { HttpHeaders headers = new HttpHeaders(); headers = this.addAuthenticationToHeaders(headers); HttpEntity entity = new HttpEntity(headers); if (minExtInfo != null) sbBulkSearch.append("&minExtInfo=" + minExtInfo.toString()); LOGGER.debug("Bulk GUID search URL ::\n{}", sbBulkSearch.toString()); return purviewRestTemplate.exchange(sbBulkSearch.toString(), HttpMethod.GET, entity, String.class); } catch (Exception e) { LOGGER.error(e.getLocalizedMessage()); tryCount++; if (tryCount > maxRetry) { throw new RuntimeException(e); } else try { Thread.sleep(retryDelay); } catch (InterruptedException ie) { LOGGER.error(ie.getLocalizedMessage()); } } } } @SuppressWarnings("unchecked") public void filterByWorkspaces(JSONArray entitiesArray, Set workspaceNames, Set workspaceIds) { entitiesArray.removeIf(entity -> !workspaceNames.contains(getWorkspaceName((JSONObject) entity))); entitiesArray.removeIf(entity -> !workspaceIds.contains(getWorkspaceId((JSONObject) entity))); } private String getWorkspaceName(JSONObject entity) { return (String) ((JSONObject) entity.get("attributes")).get("workspaceName"); } private String getWorkspaceId(JSONObject entity) { return (String) ((JSONObject) ((JSONObject) entity.get("relationshipAttributes")).get("workspace")).get("guid"); } }