帮助文档 Logo
平台使用
阿里云
百度云
移动云
智算服务
教育生态
登录 →
帮助文档 Logo
平台使用 阿里云 百度云 移动云 智算服务 教育生态
登录
  1. 首页
  2. 阿里云
  3. 日志服务
  4. 操作指南
  5. 查询与分析
  6. 最佳实践
  7. 同步定时SQL

同步定时SQL

  • 最佳实践
  • 发布于 2025-04-22
  • 0 次阅读
文档编辑
文档编辑

本文向您介绍将定时SQL任务同步到其他Project中。

配置文件说明​

[
  {
    "target_schedule_sql_config": {
      "s_sql_job_name": "sql-1704166982-166294",
      "project": "sls-ml-demo",
      "endpoint": "cn-chengdu.log.aliyuncs.com"
    },
    "newly_job_config": {
      "description": "",
      "displayName": "",
      "fromTime": 0,
      "toTime": 0,
      "source": {
        "project": "sls-ml-demo",
        "logstore": "cdn_access_log",
        "endpoint": "cn-chengdu.log.aliyuncs.com",
        "roleArn": "acs:ram::xxxxx:role/aliyunlogetlrole"
      },
      "destination": {
        "project": "sls-ml-demo",
        "logstore": "test_temp",
        "endpoint": "cn-chengdu-intranet.log.aliyuncs.com",
        "roleArn": "acs:ram::xxxxx:role/aliyunlogetlrole"
      }
    }
  }
]

以上配置内容中:

  • target_schedule_sql_config字段:描述原始定时SQL任务的基本信息。其中 s_sql_job_name为任务名,您可以通过控制台查看定时SQL任务基础信息。

  • newly_job_config字段:描述将上述定时SQL任务复制到哪个Project中。

    • description和displayName字段如果为空字符串,则使用原始的Job的相关字段进行填充

    • fromTime和toTime字段用来设置,新创建任务的开始时间和结束时间,如果设置的值小于等于零,则复用原始Job的时间区间。

    • source字段用来设置,新的Job创建在哪个Project中。

    • destination字段用来设置,新的Job中的SQL结果数据存储到哪个Logstore中。如果 destination 和 source 在相同的Region,则将destination中的endpoint设置为私网地址,可以降低流量费用。

    • roleArn字段代表写目标授权角色ARN。

示例代码​

# -*- coding: utf-8 -*-
import json
import time

from aliyun.log import *
from aliyun.log.scheduled_sql import *

global_ak_id = ""
global_ak_key = ""

client_map = {}


def get_sls_client(endpoint: str) -> LogClient:
    sls_client = LogClient(endpoint, global_ak_id, global_ak_key)
    if endpoint in client_map.keys():
        return client_map[endpoint]
    client_map[endpoint] = sls_client
    return sls_client


def check_store_item(store_item: dict):
    key_names = ["project", "logstore", "endpoint"]
    for key in key_names:
        if key not in store_item.keys():
            raise ValueError(f"logstore config miss key {key}")
        if len(store_item[key]) == "":
            raise ValueError(f"logstore config miss value [{key}]")


def get_schedule_sql_job_config(endpoint: str, project: str, ssql_job_name: str) -> dict:
    sls_client = get_sls_client(endpoint)
    ssql_job_resp = sls_client.get_scheduled_sql(project, ssql_job_name)
    ssql_job = ssql_job_resp.get_scheduled_sql()
    # print(type(ssql_job))
    # print(json.dumps(ssql_job))
    return ssql_job


def make_schedule_sql_name() -> str:
    # sql-1704166982-166294
    now_stamp = int(time.time())
    postfix = time.time_ns() % 1000000
    job_name = f"sql-{now_stamp}-{postfix}"
    return job_name


def create_schedule_sql(s_sql_config: dict):
    """
    1. 仅拷贝原始的S-SQL中的Query部分的配置,在新建任务中,需要确认对应的开始和结束时间
    2. 要判断下 source 和 destination 中的 logstore 是否在相同的Region,如果是相同的Region,则使用内网地址;
       如果是不同Region则需要使用公网地址,但是公网地址会产生费用
    """

    def make_scheduled_sql_schedule(origin_ssql_job: dict):
        origin_job_schedule = origin_ssql_job["schedule"]
        job_schedule = JobSchedule()
        job_schedule.setJobName("")
        job_schedule.setDisplayName("")
        job_schedule.setDescription("")
        job_schedule.setType(origin_job_schedule["type"])
        job_schedule.setInterval(origin_job_schedule["interval"])
        job_schedule.setDelay(origin_job_schedule["delay"])
        job_schedule.setRunImmediately(origin_job_schedule["runImmediately"])
        if "hour" in origin_job_schedule.keys():
            job_schedule.setHour(origin_job_schedule["hour"])
        if "dayOfWeek" in origin_job_schedule.keys():
            job_schedule.setDayOfWeek(origin_job_schedule["dayOfWeek"])
        if "timeZone" in origin_job_schedule.keys():
            job_schedule.setTimeZone(origin_job_schedule["timeZone"])
        if "cronExpression" in origin_job_schedule.keys():
            job_schedule.setCronExpression(origin_job_schedule["cronExpression"])
        return job_schedule

    def make_scheduled_sql_config(origin_ssql_job: dict, from_time: int, to_time: int, source_store_config: dict, dest_store_config: dict):
        source_role_arn = source_store_config["roleArn"]
        dest_role_arn = dest_store_config["roleArn"]

        origin_job_config = origin_ssql_job["configuration"]
        schedule_sql_config = ScheduledSQLConfiguration()
        schedule_sql_config.setScript(origin_job_config["script"])
        schedule_sql_config.setSqlType(origin_job_config["sqlType"])
        schedule_sql_config.setRoleArn(origin_job_config["roleArn"])
        if len(source_role_arn) > 0:
            schedule_sql_config.setRoleArn(source_role_arn)
        schedule_sql_config.setDestRoleArn(origin_job_config["destRoleArn"])
        if len(dest_role_arn) > 0:
            schedule_sql_config.setDestRoleArn(dest_role_arn)
        schedule_sql_config.setSourceLogstore(origin_job_config["sourceLogstore"])
        if len(source_store_config["logstore"]) > 0:
            schedule_sql_config.setSourceLogstore(source_store_config["logstore"])

        schedule_sql_config.setDestEndpoint(origin_job_config["destEndpoint"])
        schedule_sql_config.setDestProject(origin_job_config["destProject"])
        schedule_sql_config.setDestLogstore(origin_job_config["destLogstore"])
        schedule_sql_config.setDestRoleArn(origin_job_config["destRoleArn"])
        if len(dest_store_config["project"]) > 0:
            schedule_sql_config.setDestProject(dest_store_config["project"])
        if len(dest_store_config["logstore"]) > 0:
            schedule_sql_config.setDestLogstore(dest_store_config["logstore"])
        if len(dest_store_config["endpoint"]) > 0:
            schedule_sql_config.setDestEndpoint(dest_store_config["endpoint"])

        schedule_sql_config.setMaxRetries(origin_job_config["maxRetries"])
        schedule_sql_config.setMaxRunTimeInSeconds(origin_job_config["maxRunTimeInSeconds"])
        schedule_sql_config.setDataFormat(origin_job_config["dataFormat"])
        schedule_sql_config.setResourcePool(origin_job_config["resourcePool"])
        schedule_sql_config.setFromTime(origin_job_config["fromTime"])
        schedule_sql_config.setFromTimeExpr(origin_job_config["fromTimeExpr"])
        if from_time > 0:
            schedule_sql_config.setFromTime(from_time)
        schedule_sql_config.setToTime(origin_job_config["toTime"])
        schedule_sql_config.setToTimeExpr(origin_job_config["toTimeExpr"])
        if to_time > 0:
            schedule_sql_config.setToTime(to_time)
        schedule_sql_config.setParameters(origin_job_config["parameters"])
        return schedule_sql_config

    target_job_config = s_sql_config["target_schedule_sql_config"]
    ssql_job_name = target_job_config["s_sql_job_name"]
    project = target_job_config["project"]
    endpoint = target_job_config["endpoint"]
    ssql_job = get_schedule_sql_job_config(endpoint, project, ssql_job_name)
    newly_job_config = s_sql_config["newly_job_config"]
    source_config = newly_job_config["source"]
    dest_config = newly_job_config["destination"]
    check_store_item(source_config)
    check_store_item(dest_config)

    from_time, to_time = newly_job_config["fromTime"], newly_job_config["toTime"]
    schedule_sql_config = make_scheduled_sql_config(ssql_job, from_time, to_time, source_config, dest_config)
    job_schedule = make_scheduled_sql_schedule(ssql_job)
    scheduled_sql = ScheduledSQL()
    job_name = make_schedule_sql_name()
    scheduled_sql.setName(job_name)
    scheduled_sql.setConfiguration(schedule_sql_config)
    scheduled_sql.setSchedule(job_schedule)
    if len(newly_job_config["description"]) > 0:
        scheduled_sql.setDescription(newly_job_config["description"])
    else:
        scheduled_sql.setDescription(ssql_job["description"])
    if len(newly_job_config["displayName"]) > 0:
        scheduled_sql.setDisplayName(newly_job_config["displayName"])
    else:
        scheduled_sql.setDisplayName(ssql_job["displayName"])

    sls_client = get_sls_client(source_config["endpoint"])
    sls_client.create_scheduled_sql(source_config["project"], scheduled_sql)
    print(f"sync to \n\tsrc_project {source_config}\n\tdest_project {dest_config}\n\tjob_name {job_name}")


if __name__ == "__main__":
    sync_store_config_path = "./sls_tools/sync_ssql.json"
    with open(sync_store_config_path, "r") as reader:
        sync_map = json.load(reader)
    for ssql_config in sync_map:
        try:
            create_schedule_sql(ssql_config)
        except Exception as e:
            print(e)
相关文章

常见分析案例 2025-04-22 10:56

本文为您提供日志数据分析的一些案例。 5分钟错误率超过40%时触发报警 统计每分

提高查询分析日志速度的方法 2025-04-22 10:56

您可以使用以下方式,提高日志查询分析的速度。 增加Shard数量或开启SQL独享版

使用SQL语句查询分析日志 2025-04-22 10:56

当您需要使用SQL查询和分析日志服务中的数据时,可以通过JDBC、Python MySQLdb、MySQL命令行工具等方式连接日志服务来实现。本文主要为您介绍JDBC、Python MySQLdb、MySQL命令行工具连接日志服务的操作步骤。 使用限制

查询和分析网站日志 2025-04-22 10:56

本文以查询和分析网站日志为例,帮助您快速上手查询和分析操作。 前提条件 已采集到网站访问日志。配置Logtail采集配置的步骤,请参见采集主机文本日志。

查询和分析JSON日志 2025-04-22 10:56

本文以JSON类型的网站日志为例,介绍查询和分析的步骤,并提供SQL示例。 前提条件 为了进行后续的日志分析,您需要先采集JSON格式文本日志。

关联Logstore与MySQL数据库进行查询分析 2025-04-22 10:56

本文以游戏公司数据分析场景为例,介绍日志服务Logstore与MySQL数据库关联分析功能。 前提条件

目录
Copyright © 2025 your company All Rights Reserved. Powered by 博智数字服务平台.
闽ICP备08105208号-1